/
peer_queuehandler.go
140 lines (128 loc) · 3.99 KB
/
peer_queuehandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright (c) 2020-2021 The bitcoinpay developers
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"container/list"
"github.com/btceasypay/bitcoinpay/core/message"
"github.com/btceasypay/bitcoinpay/log"
"sync/atomic"
"time"
)
// queueHandler multiplexes every source of outbound traffic for the peer
// (direct messages on outputQueue and inventory on outputInvChan) so that
// producers never block on the network write itself. At most one message is
// outstanding on sendQueue at a time; everything else is held in an internal
// backlog until sendDoneQueue signals that the previous message has gone out.
// Inventory is batched and released on the trickle ticker.
//
// The function runs until p.quit fires, then releases anyone waiting on a
// doneChan, drains the input channels, and closes p.queueQuit.
func (p *Peer) queueHandler() {
	// backlog holds outMsg values that have not yet been handed to
	// sendQueue; invBacklog holds *message.InvVect values waiting for the
	// next trickle tick.
	backlog := list.New()
	invBacklog := list.New()

	trickle := time.NewTicker(p.cfg.TrickleInterval)
	defer trickle.Stop()

	// inFlight records whether a message has been pushed onto sendQueue and
	// not yet acknowledged via sendDoneQueue. A dedicated flag is used
	// rather than inspecting the head of the backlog so that, at cleanup
	// time, it is unambiguous who is responsible for signalling a message's
	// done channel: the backlog only ever contains messages that were never
	// passed along.
	inFlight := false

	// dispatch either hands the message straight to sendQueue (when nothing
	// is outstanding) or parks it on the backlog. Either way a message is
	// outstanding afterwards.
	dispatch := func(m outMsg) {
		if inFlight {
			backlog.PushBack(m)
			return
		}
		p.sendQueue <- m
		inFlight = true
	}

muxLoop:
	for {
		select {
		case m := <-p.outputQueue:
			dispatch(m)

		case <-p.sendDoneQueue:
			// The previous message has been written to the socket.
			// Feed the next backlog entry, or go idle when there is
			// none.
			if head := backlog.Front(); head != nil {
				p.sendQueue <- backlog.Remove(head).(outMsg)
			} else {
				inFlight = false
			}

		case vect := <-p.outputInvChan:
			// Inventory offered before the version is known is
			// silently dropped.
			if !p.VersionKnown() {
				continue muxLoop
			}
			invBacklog.PushBack(vect)

		case <-trickle.C:
			// Nothing to do while disconnecting or when no inventory
			// is queued. (Entries are only queued once the version is
			// known, so no extra handshake check is needed here.)
			if atomic.LoadInt32(&p.disconnect) == 0 && invBacklog.Len() != 0 {
				// Emit as many inv messages as it takes to empty
				// the inventory backlog.
				batch := message.NewMsgInvSizeHint(uint(invBacklog.Len()))
				for invBacklog.Len() > 0 {
					vect := invBacklog.Remove(invBacklog.Front()).(*message.InvVect)

					// Skip anything the peer learned about since
					// it was queued.
					if p.knownInventory.Exists(vect) {
						continue
					}

					batch.AddInvVect(vect)
					batch.GS = p.GetGraphState()

					// Ship a full batch and start a fresh one.
					if len(batch.InvList) >= maxInvTrickleSize {
						dispatch(outMsg{msg: batch})
						batch = message.NewMsgInvSizeHint(uint(invBacklog.Len()))
					}

					// Remember that this inventory has been
					// relayed to the peer.
					p.AddKnownInventory(vect)
				}

				// Ship whatever is left in a partial batch.
				if len(batch.InvList) > 0 {
					dispatch(outMsg{msg: batch})
				}
			}

		case <-p.quit:
			break muxLoop
		}
	}

	// Release every caller still waiting on a backlogged message so nobody
	// is left blocked after this handler exits.
	for backlog.Len() > 0 {
		m := backlog.Remove(backlog.Front()).(outMsg)
		if m.doneChan != nil {
			m.doneChan <- struct{}{}
		}
	}

	// Empty the input channels without blocking. sendDoneQueue is buffered
	// and therefore does not need draining.
	for drained := false; !drained; {
		select {
		case m := <-p.outputQueue:
			if m.doneChan != nil {
				m.doneChan <- struct{}{}
			}
		case <-p.outputInvChan:
			// Discard.
		default:
			drained = true
		}
	}

	close(p.queueQuit)
	log.Trace("Peer queue handler done", "peer", p.addr)
}