-
Notifications
You must be signed in to change notification settings - Fork 8
/
establish-link.go
103 lines (92 loc) · 2.52 KB
/
establish-link.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package pubsub_controller
import (
"context"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/pubsub"
"github.com/aperturerobotics/controllerbus/directive"
)
// establishLinkHandler is the reference handler attached to an EstablishLink
// directive. It receives value added / removed / disposed callbacks for the
// directive and forwards link changes to the owning Controller.
type establishLinkHandler struct {
// c is the controller that owns this handler; its lock, logger, and link
// bookkeeping are accessed from the callbacks.
c *Controller
// ref is the directive reference this handler was registered with.
// It is set after registration and cleared once the instance is disposed.
ref directive.Reference
}
// newEstablishLinkHandler builds an establishLinkHandler bound to the given
// controller. The handler's ref field is left nil; the caller assigns it once
// the handler has been registered with a directive instance.
func newEstablishLinkHandler(c *Controller) *establishLinkHandler {
	h := new(establishLinkHandler)
	h.c = c
	return h
}
// handleEstablishLink handles an EstablishLink directive.
//
// It registers a new establishLinkHandler on the directive instance via
// AddReference and, if a reference is returned, records it on both the handler
// and the controller's cleanupRefs list so it can be released later.
//
// ctx and d are currently unused. If AddReference returns nil, nothing is
// recorded and the function returns immediately.
func (c *Controller) handleEstablishLink(
	ctx context.Context,
	di directive.Instance,
	d link.EstablishLinkWithPeer,
) {
	handler := newEstablishLinkHandler(c)
	// NOTE(review): the second argument is presumably the weak-reference
	// flag of directive.Instance.AddReference — confirm against controllerbus.
	ref := di.AddReference(handler, true)
	if ref == nil {
		return
	}
	handler.ref = ref

	// cleanupRefs is also mutated by HandleInstanceDisposed while holding
	// c.mtx, so the append must take the same lock to avoid a data race.
	// NOTE(review): assumes callers never invoke this with c.mtx already
	// held — confirm against HandleDirective.
	c.mtx.Lock()
	c.cleanupRefs = append(c.cleanupRefs, ref)
	c.mtx.Unlock()
}
// HandleValueAdded is called when a value is added to the directive.
//
// Values that are not a link.Link are reported with a warning and ignored.
// Otherwise the link is appended to the controller's incLinks list under the
// controller lock, and the controller's wake() is called afterwards.
func (e *establishLinkHandler) HandleValueAdded(inst directive.Instance, val directive.AttachedValue) {
	ctrl := e.c

	lnk, isLink := val.GetValue().(link.Link)
	if !isLink {
		ctrl.le.Warn("EstablishLink value was not a Link")
		return
	}
	ctrl.le.Debugf("got link with uuid %v", lnk.GetUUID())

	// Queue the new link while holding the lock; wake only after releasing it.
	ctrl.mtx.Lock()
	ctrl.incLinks = append(ctrl.incLinks, lnk)
	ctrl.mtx.Unlock()

	ctrl.wake()
}
// HandleValueRemoved is called when a value is removed from the directive.
//
// Non-link values are ignored. For a link, the first matching entry is
// dropped from the controller's incLinks list, and if the controller tracks
// an entry for the link's peer/link tuple in links, that entry's ctxCancel is
// invoked. Both steps run under the controller lock.
func (e *establishLinkHandler) HandleValueRemoved(inst directive.Instance, val directive.AttachedValue) {
	lnk, isLink := val.GetValue().(link.Link)
	if !isLink {
		return
	}

	ctrl := e.c
	ctrl.le.Debugf("lost link with uuid %v", lnk.GetUUID())
	key := pubsub.NewPeerLinkTuple(lnk)

	ctrl.mtx.Lock()

	// Swap-remove: move the last element into the vacated slot, clear the
	// tail so it no longer references the link, then shrink the slice.
	// Element order is not preserved.
	for idx := range ctrl.incLinks {
		if ctrl.incLinks[idx] != lnk {
			continue
		}
		last := len(ctrl.incLinks) - 1
		ctrl.incLinks[idx] = ctrl.incLinks[last]
		ctrl.incLinks[last] = nil
		ctrl.incLinks = ctrl.incLinks[:last]
		break
	}

	if tracked, found := ctrl.links[key]; found {
		tracked.ctxCancel()
	}

	ctrl.mtx.Unlock()
}
// HandleInstanceDisposed is called when a directive instance is disposed.
//
// If the handler still holds a reference, it is cleared and the first
// matching entry is removed from the controller's cleanupRefs list under the
// controller lock. A handler whose ref is already nil does nothing.
func (e *establishLinkHandler) HandleInstanceDisposed(inst directive.Instance) {
	disposed := e.ref
	if disposed == nil {
		return
	}
	e.ref = nil

	ctrl := e.c
	ctrl.mtx.Lock()
	// Swap-remove the reference; the order of cleanupRefs is not preserved.
	for idx := range ctrl.cleanupRefs {
		if ctrl.cleanupRefs[idx] != disposed {
			continue
		}
		last := len(ctrl.cleanupRefs) - 1
		ctrl.cleanupRefs[idx] = ctrl.cleanupRefs[last]
		ctrl.cleanupRefs[last] = nil
		ctrl.cleanupRefs = ctrl.cleanupRefs[:last]
		break
	}
	ctrl.mtx.Unlock()
}
// Compile-time check that *establishLinkHandler satisfies directive.ReferenceHandler.
var _ directive.ReferenceHandler = (*establishLinkHandler)(nil)