-
Notifications
You must be signed in to change notification settings - Fork 70
/
peerconn.go
128 lines (110 loc) · 3.39 KB
/
peerconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package peerconn
import (
"io"
"net"
"time"
"github.com/cenkalti/rain/internal/logger"
"github.com/cenkalti/rain/internal/peerconn/peerreader"
"github.com/cenkalti/rain/internal/peerconn/peerwriter"
"github.com/cenkalti/rain/internal/peerprotocol"
"github.com/juju/ratelimit"
)
// Conn wraps a net.Conn to a single peer. Incoming traffic is exposed through
// the channel returned by Messages, and outgoing traffic is queued with the
// Send* methods. Nothing flows until Run is started.
type Conn struct {
	// conn is the underlying network connection handed to New.
	conn net.Conn

	// reader and writer are built on top of conn in New and are started by Run.
	reader *peerreader.PeerReader
	writer *peerwriter.PeerWriter

	// messages is the channel handed out by Messages. Run forwards everything
	// emitted by reader and writer into it and closes it on exit.
	messages chan any

	// log is the logger supplied to New; it is returned by Logger.
	log logger.Logger

	// closeC is closed by Close to ask Run to stop.
	closeC chan struct{}
	// doneC is closed by Run right before it returns.
	doneC chan struct{}
}
// New wraps conn in a *Conn.
//
// pieceTimeout and br are passed to the peer reader; maxRequestsIn,
// fastEnabled and bw are passed to the peer writer. l is handed to both and is
// also the logger returned by Logger. The returned Conn is idle until Run is
// called.
func New(conn net.Conn, l logger.Logger, pieceTimeout time.Duration, maxRequestsIn int, fastEnabled bool, br, bw *ratelimit.Bucket) *Conn {
	r := peerreader.New(conn, l, pieceTimeout, br)
	w := peerwriter.New(conn, l, maxRequestsIn, fastEnabled, bw)

	return &Conn{
		conn:   conn,
		reader: r,
		writer: w,
		log:    l,
		// Unbuffered: Run hands each message directly to the receiver.
		messages: make(chan any),
		closeC:   make(chan struct{}),
		doneC:    make(chan struct{}),
	}
}
// Addr returns the remote address of the peer as a *net.TCPAddr.
// It panics if the underlying connection's remote address is not a
// *net.TCPAddr, because the type assertion is unchecked.
func (p *Conn) Addr() *net.TCPAddr {
	remote := p.conn.RemoteAddr()
	return remote.(*net.TCPAddr)
}
// IP returns the peer's IP address in string form, without the port.
// Like Addr, it panics if the remote address is not a *net.TCPAddr.
func (p *Conn) IP() string {
	tcpAddr := p.conn.RemoteAddr().(*net.TCPAddr)
	return tcpAddr.IP.String()
}
// String returns the string form of the peer's remote address, as produced by
// the underlying net.Addr.
func (p *Conn) String() string {
	remote := p.conn.RemoteAddr()
	return remote.String()
}
// Close asks Run to stop and blocks until Run has returned, by which point the
// underlying net.Conn has been closed.
//
// Close must be called at most once: a second call panics because closeC is
// already closed. It also blocks forever if Run is never started, since only
// Run closes doneC.
func (p *Conn) Close() {
	// Signal Run to shut down the reader and the writer.
	close(p.closeC)

	// Wait for Run to finish its cleanup.
	<-p.doneC
}
// Logger returns the logger that was given to New for this peer.
// NOTE(review): the original comment says messages are prefixed with the peer
// address; that depends on how the caller built the logger — confirm.
func (p *Conn) Logger() logger.Logger {
	peerLog := p.log
	return peerLog
}
// Messages returns the receive-only channel on which Run delivers everything
// emitted by the peer reader and the peer writer.
// The channel is closed when Run returns, which happens after Close is called
// or after the reader or the writer finishes; the underlying net.Conn is
// closed at that point as well.
func (p *Conn) Messages() <-chan any {
	var out <-chan any = p.messages
	return out
}
// SendMessage hands msg to the peer writer, which queues it for sending.
// The call does not block.
func (p *Conn) SendMessage(msg peerprotocol.Message) {
	w := p.writer
	w.SendMessage(msg)
}
// SendPiece hands a piece request to the peer writer, which queues the
// matching piece message for sending. The call does not block.
// The piece data is read from pi just before the message is sent.
// If the number of queued messages exceeds the `maxRequestsIn` value given to
// New, the last message is dropped.
func (p *Conn) SendPiece(msg peerprotocol.RequestMessage, pi io.ReaderAt) {
	w := p.writer
	w.SendPiece(msg, pi)
}
// CancelRequest asks the peer writer to remove a previously queued piece
// message that matches msg.
func (p *Conn) CancelRequest(msg peerprotocol.CancelMessage) {
	w := p.writer
	w.CancelRequest(msg)
}
// Run starts the peer reader and the peer writer in their own goroutines and
// relays everything they emit to the Messages channel. It returns when Close
// is called or when either the reader or the writer finishes.
//
// On return the underlying net.Conn is closed, Run waits for the writer and
// then the reader to finish, the Messages channel is closed, and finally
// doneC is closed, which releases a pending Close.
func (p *Conn) Run() {
	// Deferred calls run last-in-first-out, so the order of the defer
	// statements below is the reverse of the shutdown order described above.
	defer close(p.doneC)
	defer close(p.messages)

	p.log.Debugln("Communicating peer", p.conn.RemoteAddr())

	go p.reader.Run()
	defer func() { <-p.reader.Done() }()

	go p.writer.Run()
	defer func() { <-p.writer.Done() }()

	defer p.conn.Close()

	// relay delivers msg to whoever is receiving from Messages. It gives up
	// once Close has been called so that Run cannot get stuck on a receiver
	// that has stopped reading.
	relay := func(msg any) {
		select {
		case p.messages <- msg:
		case <-p.closeC:
		}
	}

	for {
		select {
		case incoming := <-p.reader.Messages():
			relay(incoming)

		case outgoing := <-p.writer.Messages():
			relay(outgoing)

		case <-p.closeC:
			// Close was called: stop both sides.
			p.reader.Stop()
			p.writer.Stop()
			return

		case <-p.reader.Done():
			// The reader finished on its own; stop the writer too.
			p.writer.Stop()
			return

		case <-p.writer.Done():
			// The writer finished on its own; stop the reader too.
			p.reader.Stop()
			return
		}
	}
}