forked from Psiphon-Labs/psiphon-tunnel-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
meekBuffer.go
304 lines (256 loc) · 8.77 KB
/
meekBuffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
/*
* Copyright (c) 2017, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package server
import (
"errors"
"io"
)
// CachedResponse is a data structure that supports meek
// protocol connection interruption resiliency: it stores
// payload data from the most recent response so that it
// may be resent if the client fails to receive it.
//
// The meek server maintains one CachedResponse for each
// meek client. Psiphon's variant of meek streams response
// data, so responses are not fixed size. To limit the memory
// overhead of response caching, each CachedResponse has a
// fixed-size buffer that operates as a ring buffer,
// discarding older response bytes when the buffer fills.
// A CachedResponse that has discarded data may still satisfy
// a client retry where the client has already received part
// of the response payload.
//
// A CachedResponse will also extend its capacity by
// borrowing buffers from a CachedResponseBufferPool, if
// available. When Reset is called, borrowed buffers are
// released back to the pool.
type CachedResponse struct {
// buffers[0] is the fixed buffer allocated by
// NewCachedResponse; any further entries are extended
// buffers borrowed from extendedBufferPool by Write and
// handed back by Reset.
buffers [][]byte
// readPosition is the position, within the response, of
// the earliest byte still held in the cache. It advances
// as older bytes are overwritten.
readPosition int
// readAvailable is the number of response bytes currently
// held in the cache.
readAvailable int
// writeIndex is the offset, within the buffer selected by
// writeBufferIndex, at which the next byte is written.
writeIndex int
// writeBufferIndex is the index into buffers of the buffer
// currently being written.
writeBufferIndex int
// overwriting is set once writing has wrapped around from
// the last buffer back to the first; from then on each
// write replaces the oldest cached bytes.
overwriting bool
// extendedBufferPool is the pool from which Write borrows
// extended buffers and to which Reset returns them.
extendedBufferPool *CachedResponseBufferPool
}
// NewCachedResponse returns an empty CachedResponse whose fixed
// buffer is bufferSize bytes long. extendedBufferPool is recorded
// as the pool from which additional buffers are borrowed when the
// fixed buffer fills.
func NewCachedResponse(
	bufferSize int,
	extendedBufferPool *CachedResponseBufferPool) *CachedResponse {

	response := &CachedResponse{extendedBufferPool: extendedBufferPool}

	// The fixed buffer always occupies slot 0; extended buffers,
	// if any, are appended after it.
	response.buffers = [][]byte{make([]byte, bufferSize)}

	return response
}
// Reset discards any buffered response and returns the
// CachedResponse to its initial, empty state. Every extended
// buffer is handed back to the pool, leaving only the fixed
// buffer.
//
// Reset _must_ be called before discarding a CachedResponse,
// otherwise its extended buffers are never released.
func (response *CachedResponse) Reset() {

	// Slot 0 is the fixed buffer and is kept; everything after
	// it was borrowed from the pool.
	for i := 1; i < len(response.buffers); i++ {
		response.extendedBufferPool.Put(response.buffers[i])
	}
	response.buffers = response.buffers[:1]

	response.readPosition, response.readAvailable = 0, 0
	response.writeIndex, response.writeBufferIndex = 0, 0
	response.overwriting = false
}
// Available returns the size of the buffered response data,
// i.e. the number of response bytes currently held in the
// cache. The count grows as data is written and stops growing
// once older bytes start being overwritten.
func (response *CachedResponse) Available() int {
return response.readAvailable
}
// HasPosition reports whether the CachedResponse holds any
// data and its earliest cached byte is at or before the
// specified position.
//
// Only the start of the cached range is checked: a position
// beyond the end of the cached data also reports true.
func (response *CachedResponse) HasPosition(position int) bool {
	if response.readAvailable <= 0 {
		// Nothing is cached, so no position can be served.
		return false
	}
	return position >= response.readPosition
}
// CopyFromPosition writes the cached response data, from the
// specified position onward, to writer and returns the total
// byte count reported by writer. Cached data before the
// position is skipped.
//
// An error is returned when data is cached and the position
// precedes the earliest cached byte, or when writer fails; in
// the latter case the count written so far is returned with
// the writer's error. A position at the end of the cached data
// copies nothing and returns no error, and the same holds for
// a position past the end.
//
// CopyFromPosition does not advance or modify the
// CachedResponse, so it can be called repeatedly to read the
// same data.
func (response *CachedResponse) CopyFromPosition(
	position int, writer io.Writer) (int, error) {

	start := response.readPosition
	size := response.readAvailable

	if size > 0 && position < start {
		return 0, errors.New("position unavailable")
	}

	// Special case: the caller already has everything cached.
	if position == start+size {
		return 0, nil
	}

	// Locate the oldest cached byte. Before any overwriting it
	// is at the start of the first buffer; afterwards it is
	// wherever the next write would land, which may be midway
	// through the buffer(s).
	bufferIndex, offset := 0, 0
	if response.overwriting {
		bufferIndex, offset = response.writeBufferIndex, response.writeIndex
		if offset >= len(response.buffers[bufferIndex]) {
			// The write cursor is parked at the end of a full
			// buffer; the oldest byte begins the next one.
			bufferIndex = (bufferIndex + 1) % len(response.buffers)
			offset = 0
		}
	}

	// Walk the cached data buffer by buffer, dropping bytes
	// until the requested position is reached and writing
	// everything after it.
	total := 0
	toSkip := position - start
	for remaining := size; remaining > 0; {

		chunk := response.buffers[bufferIndex][offset:]
		if len(chunk) > remaining {
			chunk = chunk[:remaining]
		}
		remaining -= len(chunk)

		if toSkip > 0 {
			if len(chunk) >= toSkip {
				chunk = chunk[toSkip:]
				toSkip = 0
			} else {
				toSkip -= len(chunk)
			}
		}

		if toSkip == 0 {
			written, err := writer.Write(chunk)
			total += written
			if err != nil {
				return total, err
			}
		}

		// Every buffer after the first visited is read from
		// its beginning.
		offset = 0
		bufferIndex = (bufferIndex + 1) % len(response.buffers)
	}

	return total, nil
}
// Write appends data to the CachedResponse. All writes will
// succeed, but only the most recent bytes will be retained
// once the fixed buffer is full and no extended buffers are
// available.
//
// A CachedResponse created without an extended buffer pool
// (nil) never extends and simply operates as a ring over its
// fixed buffer. If the total capacity is zero, nothing can be
// retained: the data is discarded and only the read position
// advances.
//
// Write may be called multiple times to record a single
// response; Reset should be called between responses.
//
// Write conforms to the io.Writer interface. It always returns
// len(data) and a nil error.
func (response *CachedResponse) Write(data []byte) (int, error) {

	dataIndex := 0

	for dataIndex < len(data) {

		// Overwriting only begins after every buffer has been
		// filled to its end, so at that point readAvailable
		// equals the total capacity. A total of zero means
		// every buffer is zero-length: no iteration could ever
		// consume input and the loop would spin forever. Treat
		// the remaining bytes as written and immediately
		// discarded.
		if response.overwriting && response.readAvailable == 0 {
			response.readPosition += len(data) - dataIndex
			break
		}

		// Write into available space in the current buffer

		buffer := response.buffers[response.writeBufferIndex]
		canWriteLen := len(buffer) - response.writeIndex
		needWriteLen := len(data) - dataIndex
		writeLen := min(canWriteLen, needWriteLen)

		if writeLen > 0 {
			copy(
				buffer[response.writeIndex:response.writeIndex+writeLen],
				data[dataIndex:dataIndex+writeLen])

			response.writeIndex += writeLen

			// readPosition tracks the earliest position in
			// the response that remains in the cached response.
			// Once the buffer is full (and cannot be extended),
			// older data is overwritten and readPosition advances.
			//
			// readAvailable is the amount of data in the cached
			// response, which may be less than the buffer capacity.

			if response.overwriting {
				response.readPosition += writeLen
			} else {
				response.readAvailable += writeLen
			}

			dataIndex += writeLen
		}

		if needWriteLen > canWriteLen {

			// Add an extended buffer to increase capacity. This is
			// only attempted from the last buffer and before any
			// data has been overwritten, and only when a pool was
			// configured.
			//
			// TODO: could capacity also be extended in other states
			// where no cached data has been discarded yet?

			if response.writeBufferIndex == len(response.buffers)-1 &&
				!response.overwriting &&
				response.extendedBufferPool != nil {

				extendedBuffer := response.extendedBufferPool.Get()
				if extendedBuffer != nil {
					response.buffers = append(response.buffers, extendedBuffer)
				}
			}

			// Move to the next buffer, which may wrap around

			// This isn't a general ring buffer: Reset is called at
			// start of each response, so the initial data is always
			// at the beginning of the first buffer. It follows that
			// data is overwritten once the buffer wraps around back
			// to the beginning.

			response.writeBufferIndex++
			if response.writeBufferIndex >= len(response.buffers) {
				response.writeBufferIndex = 0
				response.overwriting = true
			}
			response.writeIndex = 0
		}
	}

	return len(data), nil
}
// CachedResponseBufferPool is a fixed-size pool of
// fixed-size buffers that are used to temporarily extend
// the capacity of CachedResponses.
type CachedResponseBufferPool struct {
// bufferSize is the length of each buffer that Get
// allocates.
bufferSize int
// buffers holds the buffers currently available to Get.
// A zero-length entry is a placeholder for a buffer that
// has not been allocated yet.
buffers chan []byte
}
// NewCachedResponseBufferPool returns a CachedResponseBufferPool
// that can lend out up to bufferCount buffers of bufferSize
// bytes each. No buffer memory is allocated here: the pool is
// seeded with zero-length placeholders, and each real buffer is
// allocated by Get the first time its placeholder is drawn.
// Once allocated, buffers remain allocated.
func NewCachedResponseBufferPool(
	bufferSize, bufferCount int) *CachedResponseBufferPool {

	pool := &CachedResponseBufferPool{
		bufferSize: bufferSize,
		buffers:    make(chan []byte, bufferCount),
	}

	// Fill the channel to capacity with placeholders.
	for remaining := bufferCount; remaining > 0; remaining-- {
		pool.buffers <- []byte{}
	}

	return pool
}
// Get takes a buffer from the pool without blocking. It returns
// nil when the pool currently has no buffer to lend. A buffer
// obtained from Get is released back to the pool with Put.
//
// Note: currently, buffers are not zeroed between use by
// different CachedResponses owned by different clients.
// A bug resulting in cross-client data transfer exposes
// only OSSH ciphertext in the case of meek's use of
// CachedResponses.
func (pool *CachedResponseBufferPool) Get() []byte {

	var buffer []byte

	select {
	case buffer = <-pool.buffers:
	default:
		// Nothing available right now.
		return nil
	}

	if len(buffer) == 0 {
		// Drew a placeholder: this slot's buffer has not been
		// allocated yet, so allocate it now.
		buffer = make([]byte, pool.bufferSize)
	}

	return buffer
}
// Put releases a buffer back to the pool. The buffer must
// have been obtained from Get.
//
// Put is a plain channel send: it does not return until the
// pool's channel accepts the buffer. NewCachedResponseBufferPool
// fills the channel to its capacity, so a send only has room
// after a matching Get has removed an entry.
func (pool *CachedResponseBufferPool) Put(buffer []byte) {
pool.buffers <- buffer
}