-
Notifications
You must be signed in to change notification settings - Fork 8
/
srpc.go
132 lines (122 loc) · 2.97 KB
/
srpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package stream_srpc
import (
"context"
"errors"
"time"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/protocol"
"github.com/aperturerobotics/bifrost/stream"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/starpc/srpc"
"github.com/sirupsen/logrus"
)
// NewOpenStreamFunc constructs a new OpenStreamFunc which establishes a
// connection with the given peer on-demand when a RPC call is made.
//
// transportID and srcPeer are optional
// starts a read pump in a goroutine
func NewOpenStreamFunc(
b bus.Bus,
protocolID protocol.ID,
srcPeer, destPeer peer.ID,
transportID uint64,
) srpc.OpenStreamFunc {
return func(
ctx context.Context,
msgHandler srpc.PacketDataHandler,
closeHandler srpc.CloseHandler,
) (srpc.PacketWriter, error) {
return EstablishSrpcStream(
ctx,
b,
protocolID,
srcPeer, destPeer,
transportID,
msgHandler, closeHandler,
)
}
}
// NewMultiOpenStreamFunc builds a func which attempts multiple peers.
//
// if timeoutDur <= 0, uses no timeout.
func NewMultiOpenStreamFunc(
b bus.Bus,
le *logrus.Entry,
protocolID protocol.ID,
srcPeer peer.ID, destPeers []peer.ID,
transportID uint64,
timeoutDur time.Duration,
) srpc.OpenStreamFunc {
return func(
ctx context.Context,
msgHandler srpc.PacketDataHandler,
closeHandler srpc.CloseHandler,
) (srpc.PacketWriter, error) {
var lastErr error
for _, destPeer := range destPeers {
var estCtx context.Context
var estCtxCancel context.CancelFunc
if timeoutDur > 0 {
estCtx, estCtxCancel = context.WithTimeout(ctx, timeoutDur)
} else {
estCtx, estCtxCancel = context.WithCancel(ctx)
}
le := le.WithField("server-peer-id", destPeer.String())
writer, err := EstablishSrpcStream(
estCtx,
b,
protocolID,
srcPeer, destPeer,
transportID,
msgHandler,
closeHandler,
)
estCtxCancel()
if err != nil {
// detect deadline exceeded
if err == context.Canceled && estCtx.Err() != nil && ctx.Err() == nil {
err = context.DeadlineExceeded
}
le.WithError(err).Warn("unable to establish srpc conn")
lastErr = err
continue
}
return writer, nil
}
if lastErr == nil {
lastErr = errors.New("connection failed")
}
return nil, lastErr
}
}
// EstablishSrpcStream establishes a srpc stream via a Bifrost stream.
//
// transportID and srcPeer are optional
// starts a read pump in a goroutine
func EstablishSrpcStream(
ctx context.Context,
b bus.Bus,
protocolID protocol.ID,
srcPeer, destPeer peer.ID,
transportID uint64,
msgHandler srpc.PacketDataHandler,
closeHandler srpc.CloseHandler,
) (srpc.PacketWriter, error) {
ms, msRel, err := link.OpenStreamWithPeerEx(
ctx,
b,
protocolID,
srcPeer, destPeer, transportID,
stream.OpenOpts{},
)
if err != nil {
return nil, err
}
rw := srpc.NewPacketReadWriter(ms.GetStream())
go func() {
rw.ReadPump(msgHandler, closeHandler)
msRel()
}()
return rw, nil
}