-
Notifications
You must be signed in to change notification settings - Fork 10
/
link-waiter.go
100 lines (92 loc) · 2.17 KB
/
link-waiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package transport_controller
import (
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
)
// linkWaiter is a registered callback waiting for link events from a peer.
//
// Waiters are created by pushLinkWaiter and stored in Controller.linkWaiters
// keyed by peerID. cb is NOT called concurrently (mtx will be locked while
// calling).
type linkWaiter struct {
// peerID is the remote peer this waiter is registered under.
// pushLinkWaiter treats an empty ID as matching every existing link.
peerID peer.ID
// cb is invoked with the link and a flag that is true for an add event
// and false for a remove event.
cb func(link.Link, bool)
// cbOnce marks a one-shot waiter: remove events are skipped and the
// waiter is dropped after its first add callback.
cbOnce bool
}
// pushLinkWaiter registers a callback for links with the given peer id.
//
// Every link currently tracked in c.links whose remote peer equals peerID is
// reported to cb right away as an add event; an empty peerID matches all
// tracked links. When cbOnce is set, the first such match ends the call: cb
// has fired once, no waiter is stored, and nil is returned.
//
// Otherwise a new waiter is stored under peerID in c.linkWaiters and
// returned, so that it can later be removed with clearLinkWaiter. With cbOnce
// only add events are delivered and cb is called at most once; without it,
// the added argument of cb tells add events (true) from remove events (false).
//
// mtx should be locked.
func (c *Controller) pushLinkWaiter(
	peerID peer.ID,
	cbOnce bool,
	cb func(lnk link.Link, added bool),
) *linkWaiter {
	wildcard := peerID == ""
	for _, tracked := range c.links {
		existing := tracked.lnk
		if !wildcard && existing.GetRemotePeer() != peerID {
			continue
		}
		// Replay the already-open link as an add event.
		cb(existing, true)
		if cbOnce {
			// One-shot waiter is satisfied; nothing left to register.
			return nil
		}
	}

	waiter := &linkWaiter{peerID: peerID, cb: cb, cbOnce: cbOnce}
	c.linkWaiters[peerID] = append(c.linkWaiters[peerID], waiter)
	return waiter
}
// clearLinkWaiter unregisters a waiter previously returned by pushLinkWaiter.
//
// The waiter is looked up by pointer identity in the bucket for its peer id.
// Removal swaps the last element into the vacated slot, so the order of the
// remaining waiters is not preserved. An emptied bucket is deleted from
// c.linkWaiters.
//
// Returns true if the waiter was found and removed; false if w is nil or is
// not registered.
//
// mtx should be locked.
func (c *Controller) clearLinkWaiter(w *linkWaiter) bool {
	if w == nil {
		return false
	}

	key := w.peerID
	bucket := c.linkWaiters[key]

	// Locate the waiter by pointer identity.
	idx := -1
	for i := range bucket {
		if bucket[i] == w {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}

	// Swap-remove, clearing the vacated tail slot so the waiter can be
	// garbage collected.
	last := len(bucket) - 1
	bucket[idx] = bucket[last]
	bucket[last] = nil
	bucket = bucket[:last]

	if len(bucket) == 0 {
		delete(c.linkWaiters, key)
		return true
	}
	c.linkWaiters[key] = bucket
	return true
}
// resolveLinkWaiters notifies waiters about a link add or remove event.
//
// Two buckets of c.linkWaiters are resolved: the one keyed by the link's
// remote peer id, and the one keyed by the empty peer id. pushLinkWaiter
// treats an empty peer id as "match any link" and stores such waiters under
// the empty key, so they must be resolved for every link; previously they
// were never notified of links that appeared after registration.
//
// For each waiter in a bucket:
//   - cbOnce waiters ignore remove events (added == false);
//   - otherwise cb(lnk, added) is called;
//   - cbOnce waiters are dropped from the bucket after their callback fires.
//
// A bucket that becomes empty is deleted from c.linkWaiters.
//
// mtx should be locked.
func (c *Controller) resolveLinkWaiters(lnk link.Link, added bool) {
	// resolveBucket delivers the event to every waiter stored under key.
	resolveBucket := func(key peer.ID) {
		waiters, ok := c.linkWaiters[key]
		if !ok {
			return
		}
		for i := 0; i < len(waiters); i++ {
			w := waiters[i]
			if w.cbOnce && !added {
				// One-shot waiters only care about add events.
				continue
			}
			w.cb(lnk, added)
			if w.cbOnce {
				// Swap-remove the satisfied waiter and revisit index i,
				// which now holds the element moved in from the tail.
				last := len(waiters) - 1
				waiters[i] = waiters[last]
				waiters[last] = nil
				waiters = waiters[:last]
				i--
			}
		}
		if len(waiters) == 0 {
			delete(c.linkWaiters, key)
		} else {
			c.linkWaiters[key] = waiters
		}
	}

	peerID := lnk.GetRemotePeer()
	resolveBucket(peerID)

	// Also deliver to wildcard waiters. Skip when the link itself reports an
	// empty peer id, since that bucket was just resolved above.
	if wildcard := peer.ID(""); peerID != wildcard {
		resolveBucket(wildcard)
	}
}