-
Notifications
You must be signed in to change notification settings - Fork 199
/
libp2pConnectionMonitor.go
116 lines (95 loc) · 3.66 KB
/
libp2pConnectionMonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package libp2p
import (
"math"
"time"
"github.com/ElrondNetwork/elrond-go/p2p"
ns "github.com/ElrondNetwork/elrond-go/p2p/libp2p/networksharding"
"github.com/libp2p/go-libp2p-core/network"
"github.com/multiformats/go-multiaddr"
)
// DurationBetweenReconnectAttempts is the pause inserted after each reconnect
// attempt, so that reconnecter.ReconnectToNetwork() is not called too often
// when many peers disconnect while reconnection to the initial nodes succeeds.
var DurationBetweenReconnectAttempts = 5 * time.Second
// libp2pConnectionMonitor reacts to connection events: it requests reconnects
// when too few peers are connected, pauses/resumes the reconnecter around the
// target connection count, and trims connections above the trim threshold.
type libp2pConnectionMonitor struct {
	// chDoReconnect carries reconnect requests from doReconn to the
	// doReconnection goroutine.
	chDoReconnect chan struct{}
	// reconnecter performs the reconnects and is paused/resumed by the
	// monitor; the constructor accepts a nil value.
	reconnecter p2p.Reconnecter
	// thresholdMinConnectedPeers is the connection count below which the
	// monitor no longer considers itself connected to the network.
	thresholdMinConnectedPeers int
	// if the number of connections drops under this value, the discovery is restarted
	thresholdDiscoveryResume int
	// if the number of connections is over this value, the discovery is stopped
	thresholdDiscoveryPause int
	// if the number of connections is over this value, we start trimming
	thresholdConnTrim int
}
// newLibp2pConnectionMonitor builds a connection monitor.
//
// It returns p2p.ErrInvalidValue when thresholdMinConnectedPeers is negative.
// When targetConnCount is positive, the resume/pause/trim thresholds are set
// to 80%, 100% and 120% of it (integer arithmetic); otherwise resume is 0 and
// pause/trim are math.MaxInt32, so those thresholds are never crossed.
// When reconnecter is non-nil, a background goroutine running doReconnection
// is started.
func newLibp2pConnectionMonitor(reconnecter p2p.Reconnecter, thresholdMinConnectedPeers int, targetConnCount int) (*libp2pConnectionMonitor, error) {
	if thresholdMinConnectedPeers < 0 {
		return nil, p2p.ErrInvalidValue
	}

	// Defaults leave discovery control and trimming effectively disabled.
	resume, pause, trim := 0, math.MaxInt32, math.MaxInt32
	if targetConnCount > 0 {
		resume = targetConnCount * 4 / 5
		pause = targetConnCount
		trim = targetConnCount * 6 / 5
	}

	monitor := &libp2pConnectionMonitor{
		reconnecter: reconnecter,
		// Unbuffered: doReconn's non-blocking send only succeeds while the
		// doReconnection goroutine is waiting to receive.
		chDoReconnect:              make(chan struct{}),
		thresholdMinConnectedPeers: thresholdMinConnectedPeers,
		thresholdDiscoveryResume:   resume,
		thresholdDiscoveryPause:    pause,
		thresholdConnTrim:          trim,
	}

	if reconnecter != nil {
		go monitor.doReconnection()
	}

	return monitor, nil
}
// Listen is called when the network starts listening on an address.
// The monitor takes no action on this event.
func (lcm *libp2pConnectionMonitor) Listen(_ network.Network, _ multiaddr.Multiaddr) {
	// intentionally empty
}
// ListenClose is called when the network stops listening on an address.
// The monitor takes no action on this event.
func (lcm *libp2pConnectionMonitor) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {
	// intentionally empty
}
// doReconn requests a reconnect by signalling chDoReconnect without blocking.
// If the send cannot proceed immediately, the request is dropped.
func (lcm *libp2pConnectionMonitor) doReconn() {
	var signal struct{}

	select {
	case lcm.chDoReconnect <- signal:
		// request delivered
	default:
		// no receiver ready right now; drop the request rather than block the caller
	}
}
// Connected is called when a connection opened.
//
// When the connection count exceeds thresholdDiscoveryPause, the reconnecter
// (if any) is paused. When it exceeds thresholdConnTrim, the peers are sorted
// and every peer at index thresholdDiscoveryPause or beyond in the sorted
// list is closed, after which a reconnect is requested.
func (lcm *libp2pConnectionMonitor) Connected(netw network.Network, conn network.Conn) {
	// Take a single snapshot so both threshold checks see the same count.
	connCount := len(netw.Conns())

	// The constructor accepts a nil reconnecter and Disconnected guards
	// against it; guard here too so a nil reconnecter cannot panic.
	if connCount > lcm.thresholdDiscoveryPause && lcm.reconnecter != nil {
		lcm.reconnecter.Pause()
	}

	if connCount > lcm.thresholdConnTrim {
		// NOTE(review): SortList presumably orders peers by sharding
		// preference with the most desirable first - confirm against the
		// networksharding package.
		sorted := ns.Get().SortList(netw.Peers(), netw.LocalPeer())
		for i := lcm.thresholdDiscoveryPause; i < len(sorted); i++ {
			// Close error deliberately ignored: trimming continues with the
			// remaining peers regardless.
			_ = netw.ClosePeer(sorted[i])
		}
		lcm.doReconn()
	}
}
// Disconnected is called when a connection closed.
//
// It first requests a reconnect if the monitor is no longer connected to the
// network. Then, if the connection count fell below thresholdDiscoveryResume
// and a reconnecter is set, it resumes the reconnecter and requests a reconnect.
func (lcm *libp2pConnectionMonitor) Disconnected(netw network.Network, conn network.Conn) {
	lcm.doReconnectionIfNeeded(netw)

	if len(netw.Conns()) >= lcm.thresholdDiscoveryResume {
		return
	}
	if lcm.reconnecter == nil {
		return
	}

	lcm.reconnecter.Resume()
	lcm.doReconn()
}
// doReconnectionIfNeeded requests a reconnect when the monitor is not
// considered connected to the network.
func (lcm *libp2pConnectionMonitor) doReconnectionIfNeeded(netw network.Network) {
	if lcm.isConnectedToTheNetwork(netw) {
		return
	}

	lcm.doReconn()
}
// OpenedStream is called when a stream opened.
// The monitor takes no action on this event.
func (lcm *libp2pConnectionMonitor) OpenedStream(_ network.Network, _ network.Stream) {
	// intentionally empty
}
// ClosedStream is called when a stream closed.
// The monitor takes no action on this event.
func (lcm *libp2pConnectionMonitor) ClosedStream(_ network.Network, _ network.Stream) {
	// intentionally empty
}
// doReconnection is the reconnect worker loop. It never returns: for each
// request received on chDoReconnect it calls reconnecter.ReconnectToNetwork(),
// waits for the returned channel to deliver, and then sleeps for
// DurationBetweenReconnectAttempts before accepting the next request.
// It dereferences lcm.reconnecter, so it must only run with a non-nil reconnecter.
func (lcm *libp2pConnectionMonitor) doReconnection() {
	for {
		// A plain receive replaces the former single-case select; the
		// blocking behaviour is the same.
		<-lcm.chDoReconnect
		<-lcm.reconnecter.ReconnectToNetwork()

		// Rate-limit consecutive reconnect attempts.
		time.Sleep(DurationBetweenReconnectAttempts)
	}
}
// isConnectedToTheNetwork reports whether the number of open connections is
// at least thresholdMinConnectedPeers.
func (lcm *libp2pConnectionMonitor) isConnectedToTheNetwork(netw network.Network) bool {
	openConns := len(netw.Conns())

	return openConns >= lcm.thresholdMinConnectedPeers
}