-
Notifications
You must be signed in to change notification settings - Fork 10
/
link.go
101 lines (91 loc) · 2.48 KB
/
link.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package transport_controller
import (
	"context"
	"errors"
	"io"

	"github.com/aperturerobotics/bifrost/link"
	"github.com/aperturerobotics/controllerbus/bus"
	"github.com/aperturerobotics/controllerbus/directive"
	"github.com/sirupsen/logrus"
)
// establishedLink bundles an established link with the logger, controller,
// directive instance, and cancel function used to manage it.
type establishedLink struct {
	// le is the logger for messages about this link.
	le *logrus.Entry
	// c is the transport controller that handles the link's incoming streams.
	c *Controller
	// lnk is the established link itself.
	lnk link.Link
	// di is the directive instance associated with the link.
	di directive.Instance
	// cancel cancels the context shared by the link's goroutines.
	cancel context.CancelFunc
}
// newEstablishedLink builds the establishedLink state for lnk.
//
// It adds an EstablishLink directive for the link's local and remote peers to
// the bus, registers a dispose callback that closes the link and cancels the
// link context, starts the accept-stream pump goroutine, and finally releases
// its own directive reference. An error from AddDirective is returned as-is
// and no state is created.
func newEstablishedLink(
	le *logrus.Entry,
	rctx context.Context,
	b bus.Bus,
	lnk link.Link,
	ctrl *Controller,
) (*establishedLink, error) {
	// Build and add the EstablishLink directive for this peer pair.
	// The controller will match the directive to this link.
	// Note: the directive carries an UnrefDisposeDur (minimum hold-open time).
	estLink := link.NewEstablishLinkWithPeer(lnk.GetLocalPeer(), lnk.GetRemotePeer())
	inst, ref, err := b.AddDirective(estLink, nil)
	if err != nil {
		return nil, err
	}

	// linkCtx is derived from rctx and is canceled when the link goes away.
	linkCtx, linkCancel := context.WithCancel(rctx)
	logger := le.WithField("peer-id", lnk.GetRemotePeer().String())
	established := &establishedLink{
		le:     logger,
		c:      ctrl,
		lnk:    lnk,
		di:     inst,
		cancel: linkCancel,
	}

	// Once the directive is disposed, close the link in the background and
	// cancel the link context.
	onDispose := func() {
		go lnk.Close()
		linkCancel()
	}
	inst.AddDisposeCallback(onDispose)

	go established.acceptStreamPump(linkCtx)

	// Release our own reference right away.
	//
	// If nothing else references the directive it enters the UnrefDisposeDur
	// hold-open timer, and the dispose callback registered above only runs
	// when the directive is removed after that timeout.
	ref.Release()
	return established, nil
}
// acceptStreamPump accepts incoming streams from the link until AcceptStream
// returns an error, passing each non-nil stream to the controller's
// HandleIncomingStream in a new goroutine.
//
// When the loop exits it asks the directive instance to close if it is
// unreferenced, cancels the link context, and closes the link.
//
// An AcceptStream error is logged as a warning unless it is (or wraps)
// context.Canceled or io.EOF, or ctx has already been canceled.
func (e *establishedLink) acceptStreamPump(ctx context.Context) {
	lnk, ctrl := e.lnk, e.c
	defer func() {
		// close the directive instance early if there are no non-weak refs.
		// this skips the hold-open (unref dispose dur) timer.
		_ = e.di.CloseIfUnreferenced(false)
		e.cancel()
		lnk.Close()
	}()

	for {
		strm, strmOpts, err := lnk.AcceptStream()
		if err != nil {
			// Cancellation and EOF are expected ways for the pump to end.
			// errors.Is also matches them when the link wraps the sentinel.
			expected := errors.Is(err, context.Canceled) || errors.Is(err, io.EOF)
			// don't log if the context was canceled
			if !expected && ctx.Err() == nil {
				e.le.WithError(err).Warn("link accept stream errored")
			}
			return
		}
		if strm != nil {
			go ctrl.HandleIncomingStream(ctx, lnk, strm, strmOpts)
		}
	}
}