-
Notifications
You must be signed in to change notification settings - Fork 199
/
peersOnChannel.go
133 lines (110 loc) · 3.56 KB
/
peersOnChannel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package libp2p
import (
"context"
"sync"
"time"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/libp2p/go-libp2p-core/peer"
)
// peersOnChannel manages peers on topics
// it buffers the data and refresh the peers list continuously (in refreshInterval intervals)
type peersOnChannel struct {
mutPeers sync.RWMutex
peers map[string][]core.PeerID
lastUpdated map[string]time.Time
refreshInterval time.Duration
ttlInterval time.Duration
fetchPeersHandler func(topic string) []peer.ID
getTimeHandler func() time.Time
cancelFunc context.CancelFunc
}
// newPeersOnChannel returns a new peersOnChannel object
func newPeersOnChannel(
fetchPeersHandler func(topic string) []peer.ID,
refreshInterval time.Duration,
ttlInterval time.Duration,
) (*peersOnChannel, error) {
if fetchPeersHandler == nil {
return nil, p2p.ErrNilFetchPeersOnTopicHandler
}
if refreshInterval == 0 {
return nil, p2p.ErrInvalidDurationProvided
}
if ttlInterval == 0 {
return nil, p2p.ErrInvalidDurationProvided
}
ctx, cancelFunc := context.WithCancel(context.Background())
poc := &peersOnChannel{
peers: make(map[string][]core.PeerID),
lastUpdated: make(map[string]time.Time),
refreshInterval: refreshInterval,
ttlInterval: ttlInterval,
fetchPeersHandler: fetchPeersHandler,
cancelFunc: cancelFunc,
}
poc.getTimeHandler = poc.clockTime
go poc.refreshPeersOnAllKnownTopics(ctx)
return poc, nil
}
func (poc *peersOnChannel) clockTime() time.Time {
return time.Now()
}
// ConnectedPeersOnChannel returns the known peers on a topic
// if the list was not initialized, it will trigger a manual fetch
func (poc *peersOnChannel) ConnectedPeersOnChannel(topic string) []core.PeerID {
poc.mutPeers.RLock()
peers := poc.peers[topic]
poc.mutPeers.RUnlock()
if peers != nil {
return peers
}
return poc.refreshPeersOnTopic(topic)
}
// updateConnectedPeersOnTopic updates the connected peers on a topic and the last update timestamp
func (poc *peersOnChannel) updateConnectedPeersOnTopic(topic string, connectedPeers []core.PeerID) {
poc.mutPeers.Lock()
poc.peers[topic] = connectedPeers
poc.lastUpdated[topic] = poc.getTimeHandler()
poc.mutPeers.Unlock()
}
// refreshPeersOnAllKnownTopics iterates each topic, fetching its last timestamp
// it the timestamp + ttlInterval < time.Now, will trigger a fetch of connected peers on topic
func (poc *peersOnChannel) refreshPeersOnAllKnownTopics(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Debug("refreshPeersOnAllKnownTopics's go routine is stopping...")
return
case <-time.After(poc.refreshInterval):
}
listTopicsToBeRefreshed := make([]string, 0)
//build required topic list
poc.mutPeers.RLock()
for topic, lastRefreshed := range poc.lastUpdated {
needsToBeRefreshed := poc.getTimeHandler().Sub(lastRefreshed) > poc.ttlInterval
if needsToBeRefreshed {
listTopicsToBeRefreshed = append(listTopicsToBeRefreshed, topic)
}
}
poc.mutPeers.RUnlock()
for _, topic := range listTopicsToBeRefreshed {
_ = poc.refreshPeersOnTopic(topic)
}
}
}
// refreshPeersOnTopic
func (poc *peersOnChannel) refreshPeersOnTopic(topic string) []core.PeerID {
list := poc.fetchPeersHandler(topic)
connectedPeers := make([]core.PeerID, len(list))
for i, pid := range list {
connectedPeers[i] = core.PeerID(pid)
}
poc.updateConnectedPeersOnTopic(topic, connectedPeers)
return connectedPeers
}
// Close closes all underlying components
func (poc *peersOnChannel) Close() error {
poc.cancelFunc()
return nil
}