forked from qdeconinck/mp-quic
/
flow_controller.go
220 lines (181 loc) · 7.5 KB
/
flow_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package flowcontrol
import (
"errors"
"time"
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)
type flowController struct {
streamID protocol.StreamID
contributesToConnection bool // does the stream contribute to connection level flow control
connectionParameters handshake.ConnectionParametersManager
rttStats *congestion.RTTStats
remoteRTTs map[protocol.PathID]time.Duration
bytesSent protocol.ByteCount
sendWindow protocol.ByteCount
// For statistics purpose
bytesRetrans protocol.ByteCount
lastWindowUpdateTime time.Time
bytesRead protocol.ByteCount
highestReceived protocol.ByteCount
receiveWindow protocol.ByteCount
receiveWindowIncrement protocol.ByteCount
maxReceiveWindowIncrement protocol.ByteCount
}
// ErrReceivedSmallerByteOffset occurs if the ByteOffset received is smaller than a ByteOffset that was set previously
var ErrReceivedSmallerByteOffset = errors.New("Received a smaller byte offset")
// newFlowController gets a new flow controller
func newFlowController(streamID protocol.StreamID, contributesToConnection bool, connectionParameters handshake.ConnectionParametersManager, rttStats *congestion.RTTStats, remoteRTTs map[protocol.PathID]time.Duration) *flowController {
fc := flowController{
streamID: streamID,
contributesToConnection: contributesToConnection,
connectionParameters: connectionParameters,
rttStats: rttStats,
remoteRTTs: remoteRTTs,
}
if streamID == 0 {
fc.receiveWindow = connectionParameters.GetReceiveConnectionFlowControlWindow()
fc.receiveWindowIncrement = fc.receiveWindow
fc.maxReceiveWindowIncrement = connectionParameters.GetMaxReceiveConnectionFlowControlWindow()
} else {
fc.receiveWindow = connectionParameters.GetReceiveStreamFlowControlWindow()
fc.receiveWindowIncrement = fc.receiveWindow
fc.maxReceiveWindowIncrement = connectionParameters.GetMaxReceiveStreamFlowControlWindow()
}
return &fc
}
func (c *flowController) ContributesToConnection() bool {
return c.contributesToConnection
}
func (c *flowController) getSendWindow() protocol.ByteCount {
if c.sendWindow == 0 {
if c.streamID == 0 {
return c.connectionParameters.GetSendConnectionFlowControlWindow()
}
return c.connectionParameters.GetSendStreamFlowControlWindow()
}
return c.sendWindow
}
func (c *flowController) AddBytesSent(n protocol.ByteCount) {
c.bytesSent += n
}
func (c *flowController) GetBytesSent() protocol.ByteCount {
return c.bytesSent
}
func (c *flowController) AddBytesRetrans(n protocol.ByteCount) {
c.bytesRetrans += n
}
func (c *flowController) GetBytesRetrans() protocol.ByteCount {
return c.bytesRetrans
}
// UpdateSendWindow should be called after receiving a WindowUpdateFrame
// it returns true if the window was actually updated
func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool {
if newOffset > c.sendWindow {
c.sendWindow = newOffset
return true
}
return false
}
func (c *flowController) SendWindowSize() protocol.ByteCount {
sendWindow := c.getSendWindow()
if c.bytesSent > sendWindow { // should never happen, but make sure we don't do an underflow here
return 0
}
return sendWindow - c.bytesSent
}
func (c *flowController) SendWindowOffset() protocol.ByteCount {
return c.getSendWindow()
}
// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
// Should **only** be used for the stream-level FlowController
// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before
// This error occurs every time StreamFrames get reordered and has to be ignored in that case
// It should only be treated as an error when resetting a stream
func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) (protocol.ByteCount, error) {
if byteOffset == c.highestReceived {
return 0, nil
}
if byteOffset > c.highestReceived {
increment := byteOffset - c.highestReceived
c.highestReceived = byteOffset
return increment, nil
}
return 0, ErrReceivedSmallerByteOffset
}
// IncrementHighestReceived adds an increment to the highestReceived value
// Should **only** be used for the connection-level FlowController
func (c *flowController) IncrementHighestReceived(increment protocol.ByteCount) {
c.highestReceived += increment
}
func (c *flowController) AddBytesRead(n protocol.ByteCount) {
// pretend we sent a WindowUpdate when reading the first byte
// this way auto-tuning of the window increment already works for the first WindowUpdate
if c.bytesRead == 0 {
c.lastWindowUpdateTime = time.Now()
}
c.bytesRead += n
}
// MaybeUpdateWindow updates the receive window, if necessary
// if the receive window increment is changed, the new value is returned, otherwise a 0
// the last return value is the new offset of the receive window
func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) {
diff := c.receiveWindow - c.bytesRead
// Chromium implements the same threshold
if diff < (c.receiveWindowIncrement / 2) {
var newWindowIncrement protocol.ByteCount
oldWindowIncrement := c.receiveWindowIncrement
c.maybeAdjustWindowIncrement()
if c.receiveWindowIncrement != oldWindowIncrement {
newWindowIncrement = c.receiveWindowIncrement
}
c.lastWindowUpdateTime = time.Now()
c.receiveWindow = c.bytesRead + c.receiveWindowIncrement
return true, newWindowIncrement, c.receiveWindow
}
return false, c.receiveWindowIncrement, c.receiveWindow
}
// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often
func (c *flowController) maybeAdjustWindowIncrement() {
if c.lastWindowUpdateTime.IsZero() {
return
}
rtt := c.rttStats.SmoothedRTT()
if rtt == 0 {
return
}
timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime)
var maxRemoteRTT time.Duration
for _, remoteRTT := range(c.remoteRTTs) {
maxRemoteRTT = utils.MaxDuration(maxRemoteRTT, remoteRTT)
}
// interval between the window updates is sufficiently large, no need to increase the increment
if timeSinceLastWindowUpdate >= 2*utils.MaxDuration(rtt, maxRemoteRTT) {
return
}
oldWindowSize := c.receiveWindowIncrement
c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement)
// debug log, if the window size was actually increased
if oldWindowSize < c.receiveWindowIncrement {
newWindowSize := c.receiveWindowIncrement / (1 << 10)
if c.streamID == 0 {
utils.Debugf("Increasing receive flow control window for the connection to %d kB", newWindowSize)
} else {
utils.Debugf("Increasing receive flow control window increment for stream %d to %d kB", c.streamID, newWindowSize)
}
}
}
// EnsureMinimumWindowIncrement sets a minimum window increment
// it is intended be used for the connection-level flow controller
// it should make sure that the connection-level window is increased when a stream-level window grows
func (c *flowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) {
if inc > c.receiveWindowIncrement {
c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement)
c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update
}
}
func (c *flowController) CheckFlowControlViolation() bool {
return c.highestReceived > c.receiveWindow
}