-
Notifications
You must be signed in to change notification settings - Fork 378
/
notify_network.go
145 lines (122 loc) · 3.24 KB
/
notify_network.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package tinder
import (
"context"
"sync"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"berty.tech/berty/v2/go/internal/notify"
)
// NetworkUpdate keeps the last known list of local addresses of a host, as
// reported through an event subscription, and lets callers block until that
// list differs from the one they already know.
type NetworkUpdate struct {
// logger receives the debug output emitted on subscription start and on
// every address update.
logger *zap.Logger
// notify is broadcast each time currentAddrs is replaced; it is built over
// the same mutex as locker.
notify *notify.Notify
// locker guards currentAddrs.
locker *sync.Mutex
// sub is the event bus subscription for local address updates.
sub event.Subscription
// once makes sure sub is closed a single time, even if Close is called
// repeatedly.
once sync.Once
// currentAddrs is the most recently recorded address list.
currentAddrs []ma.Multiaddr
}
// NewNetworkUpdate subscribes to EvtLocalAddressesUpdated on the event bus of
// h, seeds the tracked address list with the current listen addresses of the
// host's network, and starts a goroutine that consumes the subscription.
//
// It returns the subscription error unchanged if subscribing fails. Call Close
// on the returned value to close the subscription.
func NewNetworkUpdate(logger *zap.Logger, h host.Host) (*NetworkUpdate, error) {
	subscription, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
	if err != nil {
		return nil, err
	}

	// a single mutex is shared by the struct and by the notifier
	mu := new(sync.Mutex)

	nu := new(NetworkUpdate)
	nu.logger = logger
	nu.sub = subscription
	nu.locker = mu
	nu.notify = notify.New(mu)
	nu.currentAddrs = h.Network().ListenAddresses()

	go nu.subscribeToNetworkUpdate()
	logger.Debug("network update subscribe started")

	return nu, nil
}
// WaitForUpdate blocks until the tracked address list differs from
// currentAddrs, or until waiting on ctx fails.
//
// When factory is nil, any difference makes it return true. Otherwise the
// differing addresses are passed through factory and true is returned only if
// at least one address remains after filtering. It returns false as soon as
// the notifier's Wait reports failure for ctx.
//
// The locker is held for the duration of the call, including while factory
// runs. NOTE(review): notify.Wait presumably releases the locker while
// blocked, since the notifier is built over the same mutex — confirm against
// the notify package.
func (n *NetworkUpdate) WaitForUpdate(ctx context.Context, currentAddrs []ma.Multiaddr, factory bhost.AddrsFactory) bool {
	n.locker.Lock()
	defer n.locker.Unlock()

	for {
		changed := diffAddrs(currentAddrs, n.currentAddrs)

		switch {
		case len(changed) == 0:
			// nothing new or removed yet, go wait
		case factory == nil:
			// no filter: any change counts
			return true
		case len(factory(changed)) > 0:
			// at least one changed addr survived the filter
			return true
		}

		// block until the network is updated or the context is done
		if !n.notify.Wait(ctx) {
			return false
		}
	}
}
// GetLastUpdatedAddrs returns the most recently recorded address list, read
// under the locker. The returned slice is the stored one, not a copy. The ctx
// argument is accepted but not used.
func (n *NetworkUpdate) GetLastUpdatedAddrs(ctx context.Context) (addrs []ma.Multiaddr) {
	n.locker.Lock()
	defer n.locker.Unlock()

	return n.currentAddrs
}
// subscribeToNetworkUpdate consumes the address update subscription until its
// channel is closed. For every event carrying diffs it logs the added and
// removed addresses, replaces the tracked address list with the event's
// current addresses and broadcasts to waiters.
//
// Events without diffs are ignored, as are values of an unexpected type.
func (n *NetworkUpdate) subscribeToNetworkUpdate() {
	for evt := range n.sub.Out() {
		e, ok := evt.(event.EvtLocalAddressesUpdated)
		if !ok {
			// skip instead of panicking the goroutine on a foreign value
			n.logger.Warn("unexpected event on local addresses subscription", zap.Any("event", evt))
			continue
		}
		if !e.Diffs {
			continue
		}

		// log diffs
		var nadd, ndel int
		for _, uaddr := range e.Current {
			if uaddr.Action == event.Added {
				n.logger.Debug("new addr", zap.String("addr", uaddr.Address.String()))
				nadd++
			}
		}
		// removed addrs are reported in e.Removed, not in e.Current
		for _, uaddr := range e.Removed {
			n.logger.Debug("removed addr", zap.String("addr", uaddr.Address.String()))
			ndel++
		}
		n.logger.Debug("network update", zap.Int("del", ndel), zap.Int("add", nadd), zap.Int("total", nadd+ndel))

		// update current addrs and wake up waiters under the same lock
		n.locker.Lock()
		n.currentAddrs = getAddrsFromUpdatedAddress(e.Current)
		n.notify.Broadcast()
		n.locker.Unlock()
	}
}
// Close closes the event subscription. Only the first call closes it and
// returns the result; later calls do nothing and return nil.
func (n *NetworkUpdate) Close() (err error) {
	closeSub := func() {
		err = n.sub.Close()
	}

	// guarded by once so the subscription is never closed twice
	n.once.Do(closeSub)

	return err
}
// diffAddrs returns the addresses that appear in only one of a and b, compared
// by their string form.
//
// Addresses found only in b come first, in the order of b; addresses found
// only in a follow, in the order of a, each listed once. The result is never
// nil; it is empty when both lists hold the same addresses.
func diffAddrs(a, b []ma.Multiaddr) []ma.Multiaddr {
	diff := []ma.Multiaddr{}

	inA := make(map[string]struct{}, len(a))
	for _, addr := range a {
		inA[addr.String()] = struct{}{}
	}

	inBoth := make(map[string]struct{}, len(b))
	for _, addr := range b {
		key := addr.String()
		if _, found := inA[key]; found {
			inBoth[key] = struct{}{}
			continue
		}
		diff = append(diff, addr)
	}

	// walk the slice rather than the map so the output order is deterministic
	for _, addr := range a {
		key := addr.String()
		if _, pending := inA[key]; !pending {
			// duplicate entry of a that was already handled
			continue
		}
		delete(inA, key)

		if _, found := inBoth[key]; !found {
			diff = append(diff, addr)
		}
	}

	return diff
}
// getAddrsFromUpdatedAddress extracts the Address field of every entry of
// updated, keeping the order. The result is a non-nil slice of the same
// length as updated.
func getAddrsFromUpdatedAddress(updated []event.UpdatedAddress) []ma.Multiaddr {
	out := make([]ma.Multiaddr, 0, len(updated))
	for idx := range updated {
		out = append(out, updated[idx].Address)
	}
	return out
}