-
Notifications
You must be signed in to change notification settings - Fork 75
/
pex.go
102 lines (86 loc) · 1.84 KB
/
pex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package peer
import (
"net"
"time"
"github.com/cenkalti/rain/internal/peerconn"
"github.com/cenkalti/rain/internal/peerprotocol"
"github.com/cenkalti/rain/internal/pexlist"
)
// pex holds the peer-exchange state for a single connection: the list of
// pending added/dropped peers and the channels that drive the run loop.
type pex struct {
// conn is the connection that flushed PEX messages are sent on.
conn *peerconn.Conn
// extID is copied into ExtendedMessageID of every outgoing PEX message.
extID uint8
// pexList collects added and dropped peers until they are flushed.
pexList *pexlist.PEXList
// pexTicker triggers the periodic flush. It is created by run (one-minute
// period), so it is nil until run has started.
pexTicker *time.Ticker
// pexAddPeerC and pexDropPeerC carry addresses from Add and Drop to the
// run loop. newPEX creates them unbuffered, so senders block until the
// loop receives.
pexAddPeerC chan *net.TCPAddr
pexDropPeerC chan *net.TCPAddr
// closeC is closed by close to ask the run loop to stop.
closeC chan struct{}
// doneC is closed by run when it returns.
doneC chan struct{}
}
// newPEX builds the PEX state for conn. The peer list is seeded with the
// address of every peer in initialPeers except those whose address string
// equals conn's own address. The run loop is not started here.
func newPEX(conn *peerconn.Conn, extID uint8, initialPeers map[*Peer]struct{}) *pex {
	p := &pex{
		conn:         conn,
		extID:        extID,
		pexList:      pexlist.New(),
		pexAddPeerC:  make(chan *net.TCPAddr),
		pexDropPeerC: make(chan *net.TCPAddr),
		closeC:       make(chan struct{}),
		doneC:        make(chan struct{}),
	}
	for peer := range initialPeers {
		// Skip the entry that has the same address as this connection.
		if peer.Addr().String() == conn.Addr().String() {
			continue
		}
		p.pexList.Add(peer.Addr())
	}
	return p
}
// close asks the run loop to stop by closing closeC, then blocks until run
// has closed doneC. Because it waits on doneC unconditionally it only returns
// once run has exited, and it must be called at most once: closing closeC a
// second time would panic.
func (p *pex) close() {
// Signal first, then wait; run closes doneC on its way out.
close(p.closeC)
<-p.doneC
}
// run is the PEX event loop. It flushes the peer list once on entry, then
// applies add/drop requests to the list and flushes again on every tick of a
// one-minute ticker, until closeC is closed. doneC is closed when it returns.
func (p *pex) run() {
	defer close(p.doneC)

	// Flush once up front instead of waiting for the first tick.
	p.pexFlushPeers()

	ticker := time.NewTicker(time.Minute)
	p.pexTicker = ticker
	defer ticker.Stop()

	for {
		select {
		case <-p.closeC:
			return
		case <-ticker.C:
			p.pexFlushPeers()
		case joined := <-p.pexAddPeerC:
			p.pexList.Add(joined)
		case left := <-p.pexDropPeerC:
			p.pexList.Drop(left)
		}
	}
}
// Add hands addr to the run loop so that it is recorded as an added peer.
// It blocks until the loop receives the address or doneC is closed; in the
// latter case the address is discarded.
func (p *pex) Add(addr *net.TCPAddr) {
	select {
	case <-p.doneC:
		// run has exited; nothing will receive the address.
	case p.pexAddPeerC <- addr:
	}
}
// Drop hands addr to the run loop so that it is recorded as a dropped peer.
// It blocks until the loop receives the address or doneC is closed; in the
// latter case the address is discarded.
func (p *pex) Drop(addr *net.TCPAddr) {
	select {
	case <-p.doneC:
		// run has exited; nothing will receive the address.
	case p.pexDropPeerC <- addr:
	}
}
// pexFlushPeers takes the added and dropped entries returned by
// pexList.Flush and sends them on conn as one extension message tagged with
// extID. Nothing is sent when both are empty.
func (p *pex) pexFlushPeers() {
	added, dropped := p.pexList.Flush()
	if len(added)+len(dropped) == 0 {
		// No changes to report; skip the message entirely.
		return
	}
	p.conn.SendMessage(peerprotocol.ExtensionMessage{
		ExtendedMessageID: p.extID,
		Payload: peerprotocol.ExtensionPEXMessage{
			Added:   added,
			Dropped: dropped,
		},
	})
}