-
Notifications
You must be signed in to change notification settings - Fork 8
/
dial.go
64 lines (56 loc) · 1.31 KB
/
dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package stream_api_dial
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/protocol"
"github.com/aperturerobotics/bifrost/stream"
stream_api "github.com/aperturerobotics/bifrost/stream/api/rpc"
"github.com/aperturerobotics/controllerbus/bus"
)
// ProcessRPC processes an RPC by dialing the desired target.
func ProcessRPC(
ctx context.Context,
b bus.Bus,
conf *Config,
rpc stream_api.RPC,
) error {
if err := conf.Validate(); err != nil {
return err
}
localPeerID, err := conf.ParseLocalPeerID()
if err != nil {
return err
}
remotePeerID, err := conf.ParsePeerID()
if err != nil {
return err
}
// Dial the target.
if err := rpc.Send(&stream_api.Data{
State: stream_api.StreamState_StreamState_ESTABLISHING,
}); err != nil {
return err
}
strm, rel, err := link.OpenStreamWithPeerEx(
ctx,
b,
protocol.ID(conf.GetProtocolId()),
localPeerID, remotePeerID,
conf.GetTransportId(),
stream.OpenOpts{
Reliable: conf.GetReliable(),
Encrypted: conf.GetEncrypted(),
},
)
if err != nil {
return err
}
defer rel()
defer strm.GetStream().Close()
if err := rpc.Send(&stream_api.Data{
State: stream_api.StreamState_StreamState_ESTABLISHED,
}); err != nil {
return err
}
return stream_api.AttachRPCToStream(rpc, strm.GetStream(), nil)
}