-
Notifications
You must be signed in to change notification settings - Fork 376
/
lifecycle.go
175 lines (146 loc) · 4.31 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package ipfsutil
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"go.uber.org/zap"
"berty.tech/berty/v2/go/internal/lifecycle"
"berty.tech/berty/v2/go/internal/logutil"
)
// Tunables for the connection lifecycle manager. They are variables rather
// than constants, so they can be overridden at runtime.
var (
	// ConnLifecycleGracePeriod is the minimum time that must elapse between
	// the start of a wait and the resulting app state change for that change
	// to be acted upon; faster transitions are ignored.
	ConnLifecycleGracePeriod = time.Second

	// ConnLifecyclePingTimeout bounds how long a peer is given to answer a
	// ping before its connections are considered dead.
	ConnLifecyclePingTimeout = 5 * time.Second

	// ConnPeerOfInterestMinScore is the connection manager tag score at or
	// above which a peer is handed to the peering service.
	ConnPeerOfInterestMinScore = 20
)
// ConnLifecycle reacts to application state changes and connection manager
// tag events: it drops connections to peers that no longer answer pings when
// the app becomes active again, and keeps the peering service in sync with
// the set of "peers of interest".
type ConnLifecycle struct {
	// NOTE(review): NewConnLifecycle in this file never assigns this embedded
	// interface, so calling a promoted ConnManager method would hit a nil
	// interface unless it is set elsewhere — confirm whether it is still needed.
	connmgr.ConnManager
	// rootCtx is the context given to NewConnLifecycle; pings issued by
	// dropUnavailableConn derive their context from it.
	rootCtx context.Context
	logger *zap.Logger
	// peering receives AddPeer/RemovePeer calls for peers of interest.
	peering *PeeringService
	// ps is the ping service created on top of h.
	ps *ping.PingService
	// h is the host whose connections, peerstore and event bus are used.
	h host.Host
	// lm is the source of application state changes.
	lm *lifecycle.Manager
}
// NewConnLifecycle builds a ConnLifecycle on top of host h and starts its two
// background processes: the peer-of-interest monitor and the app state
// monitor. Both are handed ctx.
//
// Note that the ps parameter is the peering service; the ping service stored
// in the struct's ps field is created here from h.
//
// It returns an error only when the peer-of-interest monitor cannot be
// started, in which case the app state monitor is not launched.
func NewConnLifecycle(ctx context.Context, logger *zap.Logger, h host.Host, ps *PeeringService, lm *lifecycle.Manager) (*ConnLifecycle, error) {
	cl := new(ConnLifecycle)
	cl.peering = ps
	cl.rootCtx = ctx
	cl.logger = logger
	cl.ps = ping.NewPingService(h)
	cl.h = h
	cl.lm = lm

	// The peer-of-interest monitor subscribes synchronously, so a failure
	// surfaces here before anything else is started.
	err := cl.monitorPeerOfInterest(ctx)
	if err != nil {
		return nil, err
	}

	// The app state monitor loops on state changes, hence its own goroutine.
	go cl.monitorAppState(ctx)

	cl.logger.Debug("lifecycle conn started")
	return cl, nil
}
// monitorAppState loops on application state changes reported by the
// lifecycle manager, starting from the assumption that the app is active.
//
// A change that arrives no later than ConnLifecycleGracePeriod after the wait
// began is recorded but otherwise ignored. When the app becomes active after
// a longer wait, a sweep of unavailable connections is started in the
// background. The loop ends when WaitForStateChange reports false
// (presumably because ctx is done — confirm against lifecycle.Manager).
func (cl *ConnLifecycle) monitorAppState(ctx context.Context) {
	state := lifecycle.StateActive

	for {
		waitStarted := time.Now()

		if changed := cl.lm.WaitForStateChange(ctx, state); !changed {
			return
		}

		// Always track the latest state, even for transitions we skip below.
		state = cl.lm.GetCurrentState()

		if elapsed := time.Since(waitStarted); elapsed <= ConnLifecycleGracePeriod {
			continue
		}

		if state == lifecycle.StateInactive {
			cl.logger.Debug("inactive mode")
		} else if state == lifecycle.StateActive {
			cl.logger.Debug("active mode")
			// Run the sweep asynchronously so state changes keep being observed.
			go cl.dropUnavailableConn()
		}
	}
}
// dropUnavailableConn pings every peer the host currently has a connection
// to and closes the connections of those that do not answer successfully
// within ConnLifecyclePingTimeout. It blocks until every peer has been
// checked, then logs a summary.
//
// Each peer is probed in its own goroutine under its own deadline. Per
// go-libp2p's ping implementation, Ping opens its stream before returning the
// result channel, so issuing the pings from the loop itself would probe peers
// one after another, with nothing but the root context bounding each stream
// open.
func (cl *ConnLifecycle) dropUnavailableConn() {
	cl.logger.Debug("droping unavailable conn")

	// Collapse the connection list into the set of distinct remote peers.
	peers := make(map[peer.ID]struct{})
	for _, c := range cl.h.Network().Conns() {
		peers[c.RemotePeer()] = struct{}{}
	}

	var (
		unavailable uint32
		wg          sync.WaitGroup
	)

	for p := range peers {
		wg.Add(1)
		go func(pid peer.ID) {
			defer wg.Done()

			// A single deadline covers both opening the ping stream and
			// waiting for the first answer; cancelling on exit also stops
			// the ping service from pinging this peer any further.
			ctx, cancel := context.WithTimeout(cl.rootCtx, ConnLifecyclePingTimeout)
			defer cancel()

			select {
			case ret, ok := <-cl.ps.Ping(ctx, pid):
				if ok && ret.Error == nil {
					return // peer answered, connection is alive
				}
				// !ok means the channel was closed without a result, i.e. the
				// context ended before the peer answered.
			case <-ctx.Done():
			}

			// The root context being done means we are shutting down, which
			// says nothing about the peer: leave its connections alone.
			if cl.rootCtx.Err() != nil {
				return
			}

			// The peer failed to answer in time: count it and close its
			// connections.
			atomic.AddUint32(&unavailable, 1)
			if err := cl.h.Network().ClosePeer(pid); err != nil {
				cl.logger.Warn("unable to close connection", zap.Error(err))
			}
		}(p)
	}

	wg.Wait()

	dropped := atomic.LoadUint32(&unavailable)
	if dropped == 0 {
		cl.logger.Debug("all peers are available")
		return
	}

	available := uint32(len(peers)) - dropped
	cl.logger.Debug("dropped unavailable peers", zap.Uint32("available", available), zap.Uint32("unavailable", dropped))
}
// monitorPeerOfInterest keeps the peering service in sync with the set of
// peers whose connection manager tag score is at least
// ConnPeerOfInterestMinScore.
//
// It subscribes to EvtPeerTag on the host's event bus, registers every peer
// of the peerstore that already meets the threshold, then starts a goroutine
// that adds a peer when its score crosses the threshold upwards and removes
// it when it crosses downwards. The goroutine stops, and closes the
// subscription, when ctx is done or the subscription channel is closed.
//
// An error is returned only if the subscription cannot be created.
func (cl *ConnLifecycle) monitorPeerOfInterest(ctx context.Context) error {
	sub, err := cl.h.EventBus().Subscribe([]interface{}{
		new(EvtPeerTag),
	})
	if err != nil {
		return fmt.Errorf("unable to subscribe to `EvtPeerTag`: %w", err)
	}

	// Seed the peering service with peers that already qualify; tag events
	// only report changes from now on.
	for _, p := range cl.h.Peerstore().Peers() {
		if tag := cl.h.ConnManager().GetTagInfo(p); tag != nil && tag.Value >= ConnPeerOfInterestMinScore {
			infos := cl.h.Peerstore().PeerInfo(p)
			cl.peering.AddPeer(infos)
			cl.logger.Debug("adding peer of interest", logutil.PrivateStringer("peer", p), zap.Int("score", tag.Value))
		}
	}

	go func() {
		defer sub.Close()

		for {
			var evt EvtPeerTag

			select {
			case <-ctx.Done():
				return
			case e, open := <-sub.Out():
				if !open {
					// A closed subscription only ever yields nil values;
					// stop instead of spinning or panicking on them.
					return
				}

				var ok bool
				if evt, ok = e.(EvtPeerTag); !ok {
					cl.logger.Warn("unexpected event type on peer tag subscription", zap.String("type", fmt.Sprintf("%T", e)))
					continue
				}
			}

			// Score before this event was applied.
			oldTotal := evt.Total - evt.Diff

			switch {
			case evt.Total >= ConnPeerOfInterestMinScore && oldTotal < ConnPeerOfInterestMinScore:
				// Crossed the threshold upwards.
				infos := cl.h.Peerstore().PeerInfo(evt.Peer)
				cl.peering.AddPeer(infos)
				cl.logger.Debug("marking peer as peer of interest",
					logutil.PrivateStringer("peer", evt.Peer), zap.Int("score", evt.Total), zap.Int("diff", evt.Diff), zap.String("last_tag", evt.Tag))
			case evt.Total < ConnPeerOfInterestMinScore && oldTotal >= ConnPeerOfInterestMinScore:
				// Crossed the threshold downwards.
				cl.peering.RemovePeer(evt.Peer)
				cl.logger.Debug("unmarking peer as peer of interest",
					logutil.PrivateStringer("peer", evt.Peer), zap.Int("score", evt.Total), zap.Int("diff", evt.Diff), zap.String("last_tag", evt.Tag))
			}
		}
	}()

	return nil
}