forked from SlyMarbo/spdy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
io.go
221 lines (195 loc) · 5.1 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package spdy3
import (
"runtime"
"github.com/SlyMarbo/spdy/common"
"github.com/SlyMarbo/spdy/spdy3/frames"
)
// readFrames is the main processing loop, where frames
// are read from the connection and processed individually.
// Returning from readFrames begins the cleanup and exit
// process for this connection.
func (c *Conn) readFrames() {
// Ensure no panics happen.
defer func() {
if v := recover(); v != nil {
if !c.Closed() {
log.Printf("Encountered receive error: %v (%[1]T)\n", v)
}
}
}()
for {
// This is the mechanism for handling too many benign errors.
// By default MaxBenignErrors is 0, which ignores errors.
too_many := c.numBenignErrors > common.MaxBenignErrors && common.MaxBenignErrors > 0
if c.criticalCheck(too_many, 0, "Ending connection for benign error buildup") {
return
}
// ReadFrame takes care of the frame parsing for us.
c.refreshReadTimeout()
frame, err := frames.ReadFrame(c.buf, c.Subversion)
if err != nil {
c.handleReadWriteError(err)
return
}
debug.Printf("Receiving %s:\n", frame.Name()) // Print frame type.
// Decompress the frame's headers, if there are any.
err = frame.Decompress(c.decompressor)
if c.criticalCheck(err != nil, 0, "Decompression: %v", err) {
return
}
debug.Println(frame) // Print frame once the content's been decompressed.
if c.processFrame(frame) {
return
}
}
}
// send is run in a separate goroutine. It's used
// to ensure clear interleaving of frames and to
// provide assurances of priority and structure.
func (c *Conn) send() {
// Catch any panics.
defer func() {
if v := recover(); v != nil {
if !c.Closed() {
log.Printf("Encountered send error: %v (%[1]T)\n", v)
}
}
}()
for i := 1; ; i++ {
if i >= 5 {
i = 0 // Once per 5 frames, pick randomly.
}
var frame common.Frame
if i == 0 { // Ignore priority.
frame = c.selectFrameToSend(false)
} else { // Normal selection.
frame = c.selectFrameToSend(true)
}
if frame == nil {
c.Close()
return
}
// Process connection-level flow control.
if c.Subversion > 0 {
c.connectionWindowLock.Lock()
if frame, ok := frame.(*frames.DATA); ok {
size := int64(len(frame.Data))
constrained := false
sending := size
if sending > c.connectionWindowSize {
sending = c.connectionWindowSize
constrained = true
}
if sending < 0 {
sending = 0
}
c.connectionWindowSize -= sending
if constrained {
// Chop off what we can send now.
partial := new(frames.DATA)
partial.Flags = frame.Flags
partial.StreamID = frame.StreamID
partial.Data = make([]byte, int(sending))
copy(partial.Data, frame.Data[:sending])
frame.Data = frame.Data[sending:]
// Buffer this frame and try again.
if c.dataBuffer == nil {
c.dataBuffer = []*frames.DATA{frame}
} else {
buffer := make([]*frames.DATA, 1, len(c.dataBuffer)+1)
buffer[0] = frame
buffer = append(buffer, c.dataBuffer...)
c.dataBuffer = buffer
}
frame = partial
}
}
c.connectionWindowLock.Unlock()
}
// Compress any name/value header blocks.
err := frame.Compress(c.compressor)
if err != nil {
log.Printf("Error in compression: %v (type %T).\n", err, frame)
c.Close()
return
}
debug.Printf("Sending %s:\n", frame.Name())
debug.Println(frame)
// Leave the specifics of writing to the
// connection up to the frame.
c.refreshWriteTimeout()
if _, err = frame.WriteTo(c.conn); err != nil {
c.handleReadWriteError(err)
return
}
}
}
// selectFrameToSend follows the specification's guidance
// on frame priority, sending frames with higher priority
// (a smaller number) first. If the given boolean is false,
// this priority is temporarily ignored, which can be used
// when high load is ignoring low-priority frames.
func (c *Conn) selectFrameToSend(prioritise bool) (frame common.Frame) {
if c.Closed() {
return nil
}
// Try buffered DATA frames first.
if c.Subversion > 0 {
if c.dataBuffer != nil {
if len(c.dataBuffer) == 0 {
c.dataBuffer = nil
} else {
first := c.dataBuffer[0]
if c.connectionWindowSize >= int64(8+len(first.Data)) {
if len(c.dataBuffer) > 1 {
c.dataBuffer = c.dataBuffer[1:]
} else {
c.dataBuffer = nil
}
return first
}
}
}
}
// Then in priority order.
if prioritise {
for i := 0; i < 8; i++ {
select {
case frame = <-c.output[i]:
return frame
default:
}
}
// No frames are immediately pending, so if the
// cection is being closed, cease sending
// safely.
c.sendingLock.Lock()
if c.sending != nil {
close(c.sending)
c.sendingLock.Unlock()
runtime.Goexit()
}
c.sendingLock.Unlock()
}
// Wait for any frame.
select {
case frame = <-c.output[0]:
return frame
case frame = <-c.output[1]:
return frame
case frame = <-c.output[2]:
return frame
case frame = <-c.output[3]:
return frame
case frame = <-c.output[4]:
return frame
case frame = <-c.output[5]:
return frame
case frame = <-c.output[6]:
return frame
case frame = <-c.output[7]:
return frame
case _ = <-c.stop:
return nil
}
}