-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc.go
106 lines (83 loc) · 2.55 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package fleet
import (
"context"
"fmt"
"io/fs"
"log"
)
// RpcEndpoint is the signature of a locally registered RPC handler: it
// receives an arbitrary payload and returns a response value or an error.
type RpcEndpoint func(any) (any, error)
var (
// rpcE maps an endpoint name to its handler. It is written by
// SetRpcEndpoint and read by CallRpcEndpoint.
// NOTE(review): this file accesses the map without any locking; confirm
// that endpoints are registered before concurrent calls can happen.
rpcE = make(map[string]RpcEndpoint)
)
// RPC is a named channel for exchanging byte payloads with other peers of the
// fleet. Instances are obtained from Agent.NewRpcInstance.
type RPC interface {
// All sends the given data to all other RPC instances on the fleet and
// collects their responses.
All(ctx context.Context, data []byte) ([]any, error)
// Broadcast does the same as All but does not wait for responses.
Broadcast(ctx context.Context, data []byte) error
// Request sends the given data to a specific peer and returns its response.
Request(ctx context.Context, id string, data []byte) ([]byte, error)
// Send sends the given data to a specific peer but ignores the response.
Send(ctx context.Context, id string, data []byte) error
// Self returns the id of the local peer, which other instances can use to
// contact this one with Send().
Self() string
// Connect routes this RPC instance's incoming events to the given function,
// which is called each time an event is received.
Connect(cb func(context.Context, []byte) ([]byte, error))
}
// rpcInstance is the RPC implementation returned by Agent.NewRpcInstance.
type rpcInstance struct {
// a is the agent that carries outgoing requests and provides the local id.
a *Agent
// name is the endpoint name this instance is registered under and that it
// passes to the agent on every outgoing call.
name string
// cb is the handler for incoming payloads, installed by Connect; it is nil
// until Connect has been called.
cb func(context.Context, []byte) ([]byte, error)
}
// SetRpcEndpoint registers f as the local handler for the endpoint named e,
// replacing any handler previously registered under that name.
// NOTE(review): the registry map is written here without locking; confirm
// registration cannot race with CallRpcEndpoint.
func SetRpcEndpoint(e string, f RpcEndpoint) {
rpcE[e] = f
}
// CallRpcEndpoint invokes the RPC endpoint registered under e on the local
// machine, passing it p, and returns whatever the endpoint returns.
//
// If no endpoint is registered under e it returns fs.ErrNotExist. A panic
// raised by the endpoint is recovered, logged, and reported to the caller as
// an error (with a nil result) instead of propagating.
func CallRpcEndpoint(e string, p any) (res any, err error) {
	defer func() {
		if r := recover(); r != nil {
			// A panic value can be of any type (error, string, int, ...).
			// %v renders all of them; %s would print non-string values as
			// "%!s(T=...)".
			log.Printf("[fleet] Panic in RPC %s: %v", e, r)
			err = fmt.Errorf("rpc call panic recovered: %v", r)
		}
	}()

	ep, ok := rpcE[e]
	if !ok {
		return nil, fs.ErrNotExist
	}
	return ep(p)
}
// NewRpcInstance creates an RPC handle called name that is bound to this
// agent, and registers the handle's dispatcher as the local endpoint for that
// name, replacing any endpoint previously registered under it.
//
// The returned error is always nil in the current implementation.
func (a *Agent) NewRpcInstance(name string) (RPC, error) {
	inst := new(rpcInstance)
	inst.a = a
	inst.name = name

	// Route calls addressed to name into this instance.
	SetRpcEndpoint(name, inst.call)

	return inst, nil
}
// All implements RPC.All by forwarding data to the agent's AllRpcRequest
// under this instance's endpoint name and returning its result unchanged.
func (i *rpcInstance) All(ctx context.Context, data []byte) ([]any, error) {
return i.a.AllRpcRequest(ctx, i.name, data)
}
// Broadcast implements RPC.Broadcast by forwarding data to the agent's
// BroadcastRpcBin under this instance's endpoint name. The first value
// returned by the agent is discarded; only the error is reported.
func (i *rpcInstance) Broadcast(ctx context.Context, data []byte) error {
_, err := i.a.BroadcastRpcBin(ctx, i.name, data)
return err
}
// Request implements RPC.Request by forwarding data to the agent's
// RpcRequest for peer id under this instance's endpoint name and returning
// its result unchanged.
func (i *rpcInstance) Request(ctx context.Context, id string, data []byte) ([]byte, error) {
return i.a.RpcRequest(ctx, id, i.name, data)
}
// Send implements RPC.Send by forwarding data to the agent's RpcSend for
// peer id under this instance's endpoint name.
func (i *rpcInstance) Send(ctx context.Context, id string, data []byte) error {
return i.a.RpcSend(ctx, id, i.name, data)
}
// Self implements RPC.Self by returning the id reported by the underlying
// agent.
func (i *rpcInstance) Self() string {
return i.a.Id()
}
// Connect implements RPC.Connect by storing cb as the handler for incoming
// payloads, replacing any handler set earlier. Until a non-nil handler is
// stored, incoming calls fail with fs.ErrNotExist.
// NOTE(review): cb is stored without synchronization; confirm Connect is
// called before this instance can receive traffic.
func (i *rpcInstance) Connect(cb func(context.Context, []byte) ([]byte, error)) {
i.cb = cb
}
// call is the endpoint registered for this instance: it hands an incoming
// payload to the callback installed with Connect and returns its reply.
//
// It returns fs.ErrNotExist when no callback has been connected, and an
// error (instead of panicking) when v is not a []byte.
func (i *rpcInstance) call(v any) (any, error) {
	cb := i.cb
	if cb == nil {
		return nil, fs.ErrNotExist
	}

	// Endpoints receive an untyped value, so verify the payload type rather
	// than letting a bare type assertion panic on unexpected input.
	data, ok := v.([]byte)
	if !ok {
		return nil, fmt.Errorf("rpc %s: unexpected payload type %T, want []byte", i.name, v)
	}

	// The endpoint signature carries no context, so the callback runs with
	// a background context.
	return cb(context.Background(), data)
}