-
Notifications
You must be signed in to change notification settings - Fork 8
/
drpc.go
58 lines (52 loc) · 1.25 KB
/
drpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package stream_drpc
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/protocol"
"github.com/aperturerobotics/bifrost/stream"
"github.com/aperturerobotics/controllerbus/bus"
"storj.io/drpc/drpcconn"
)
// NewDrpcConn wraps an existing stream in a drpc connection.
//
// The drpc options are derived from opts via BuildOpts; if building them
// fails, that error is returned and no connection is created.
// ctx is accepted for API symmetry but is not used by this function.
func NewDrpcConn(ctx context.Context, strm stream.Stream, opts *DrpcOpts) (*drpcconn.Conn, error) {
	drpcOpts, buildErr := opts.BuildOpts()
	if buildErr != nil {
		return nil, buildErr
	}

	conn := drpcconn.NewWithOptions(strm, drpcOpts)
	return conn, nil
}
// EstablishDrpcConn establishes a drpc connection with a peer.
//
// The options are validated first. A reliable, encrypted stream is then
// opened to destPeer for protocolID with link.OpenStreamWithPeerEx, and the
// resulting stream is wrapped with NewDrpcConn.
//
// srcPeer, transportID can be empty. transportID is forwarded to
// link.OpenStreamWithPeerEx as given.
//
// On success it returns the connection together with the function returned
// by link.OpenStreamWithPeerEx alongside the stream; callers are expected to
// invoke it once they are done with the connection. On failure both the
// connection and the function are nil, and any stream that was already
// opened has been released.
func EstablishDrpcConn(
	ctx context.Context,
	b bus.Bus,
	opts *DrpcOpts,
	protocolID protocol.ID,
	srcPeer, destPeer peer.ID,
	transportID uint64,
) (*drpcconn.Conn, func(), error) {
	if err := opts.Validate(); err != nil {
		return nil, nil, err
	}

	ms, msRef, err := link.OpenStreamWithPeerEx(
		ctx,
		b,
		protocolID,
		// Pass the caller's transportID through; it was previously
		// hard-coded to 0, which silently ignored the parameter.
		srcPeer, destPeer, transportID,
		stream.OpenOpts{
			Reliable:  true,
			Encrypted: true,
		},
	)
	if err != nil {
		return nil, nil, err
	}

	conn, err := NewDrpcConn(ctx, ms.GetStream(), opts)
	if err != nil {
		// Release the stream reference so it does not leak when the
		// drpc connection could not be constructed.
		msRef()
		return nil, nil, err
	}

	// err is known to be nil here; return a literal nil for clarity.
	return conn, msRef, nil
}