-
Notifications
You must be signed in to change notification settings - Fork 470
/
nocopy.go
302 lines (265 loc) · 9.43 KB
/
nocopy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package netpoll
import (
"io"
"reflect"
"unsafe"
"github.com/bytedance/gopkg/lang/dirtmake"
"github.com/bytedance/gopkg/lang/mcache"
)
// Reader is a collection of operations for nocopy reads.
//
// For ease of use, it is recommended to implement Reader as a blocking interface,
// rather than simply fetching the buffer.
// For example, a call to Next(n) should block while fewer than n bytes are available, unless it times out.
// The returned value is guaranteed to meet the requirements; otherwise an error is returned.
type Reader interface {
	// Next returns a slice containing the next n bytes from the buffer,
	// advancing the buffer as if the bytes had been returned by Read.
	//
	// If there are fewer than n bytes in the buffer, Next blocks
	// until enough data is available or an error occurs (such as a wait timeout).
	//
	// The slice p is only valid until the next call to the Release method.
	// Next is not globally optimal; the Skip, ReadString and ReadBinary methods
	// are recommended for specific scenarios.
	//
	// Return: len(p) must be n or 0, and p and err cannot both be nil.
	Next(n int) (p []byte, err error)
	// Peek returns the next n bytes without advancing the reader.
	// Other behavior is the same as Next.
	Peek(n int) (buf []byte, err error)
	// Skip skips the next n bytes and advances the reader. It is
	// a faster implementation of Next for when the skipped data is not used.
	Skip(n int) (err error)
	// Until reads until the first occurrence of delim in the input,
	// returning a slice that ends with delim.
	// If Until encounters an error before finding a delimiter,
	// it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
	// Until returns err != nil only if line does not end in delim.
	Until(delim byte) (line []byte, err error)
	// ReadString is a faster implementation of Next when a string needs to be returned.
	// It replaces:
	//
	//	var p, err = Next(n)
	//	return string(p), err
	//
	ReadString(n int) (s string, err error)
	// ReadBinary is a faster implementation of Next when the caller needs
	// a copy of the slice that is not shared with the underlying layer.
	// It replaces:
	//
	//	var p, err = Next(n)
	//	var b = make([]byte, n)
	//	copy(b, p)
	//	return b, err
	//
	ReadBinary(n int) (p []byte, err error)
	// ReadByte is a faster implementation of Next when a single byte needs to be returned.
	// It replaces:
	//
	//	var p, err = Next(1)
	//	return p[0], err
	//
	ReadByte() (b byte, err error)
	// Slice returns a new Reader containing the next n bytes from this Reader.
	//
	// If you want to build a new Reader from the []byte returned by Next, Slice already does that,
	// and the operation is zero-copy. In addition, Slice also releases this Reader.
	// The logic is similar to the following pseudocode:
	//
	//	var p, err = this.Next(n)
	//	var reader = new Reader(p) // pseudocode
	//	this.Release()
	//	return reader, err
	//
	Slice(n int) (r Reader, err error)
	// Release frees the memory space occupied by all read slices. It must be called explicitly to
	// recycle the memory once the previously read data is confirmed to be no longer in use.
	// After Release is invoked, slices obtained from methods such as Next and Peek
	// point to invalid memory and must not be used anymore.
	Release() (err error)
	// Len returns the total length of the readable data in the reader.
	Len() (length int)
}
// Writer is a collection of operations for nocopy writes.
//
// It is designed around a two-step operation: first request a section of memory,
// then fill it and submit it. E.g:
//
//	var buf, _ = Malloc(n)
//	buf = append(buf[:0], ...)
//	Flush()
//
// Note that submitting self-managed buffers to Writer is not recommended.
// Because the writer is processed asynchronously, a self-managed buffer that is recycled after submission
// may cause lifecycle inconsistencies. Supporting that usage is outside the scope of this design.
type Writer interface {
	// Malloc returns a slice containing the next n bytes from the buffer,
	// which will be written after submission (e.g. Flush).
	//
	// The slice p is only valid until the next submission (e.g. Flush).
	// Therefore, make sure that all data has been written into the slice before submitting.
	Malloc(n int) (buf []byte, err error)
	// WriteString is a faster implementation of Malloc when a string needs to be written.
	// It replaces:
	//
	//	var buf, err = Malloc(len(s))
	//	n = copy(buf, s)
	//	return n, err
	//
	// The argument string s is referenced by its original address and is not copied,
	// so make sure that the string s will not be changed.
	WriteString(s string) (n int, err error)
	// WriteBinary is a faster implementation of Malloc when a slice needs to be written.
	// It replaces:
	//
	//	var buf, err = Malloc(len(b))
	//	n = copy(buf, b)
	//	return n, err
	//
	// The argument slice b is referenced by its original address and is not copied,
	// so make sure that the slice b will not be changed.
	WriteBinary(b []byte) (n int, err error)
	// WriteByte is a faster implementation of Malloc when a single byte needs to be written.
	// It replaces:
	//
	//	var buf, _ = Malloc(1)
	//	buf[0] = b
	//
	WriteByte(b byte) (err error)
	// WriteDirect is used to insert an additional slice of data into the current write stream.
	// For example, if you plan to execute:
	//
	//	var bufA, _ = Malloc(nA)
	//	WriteBinary(b)
	//	var bufB, _ = Malloc(nB)
	//
	// It can be replaced by:
	//
	//	var buf, _ = Malloc(nA+nB)
	//	WriteDirect(b, nB)
	//
	// where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
	WriteDirect(p []byte, remainCap int) error
	// MallocAck keeps the first n malloc'd bytes and discards the rest.
	// The following sequence:
	//
	//	var buf, _ = Malloc(8)
	//	buf = buf[:5]
	//	MallocAck(5)
	//
	// is equivalent to:
	//
	//	var buf, _ = Malloc(5)
	//
	MallocAck(n int) (err error)
	// Append appends the argument writer to the tail of this writer and sets the argument writer to nil.
	// The operation is zero-copy, similar to p = append(p, w.p).
	Append(w Writer) (err error)
	// Flush submits all malloc'd data; the caller must make sure that the allocated bytes have been correctly assigned.
	// Its behavior is equivalent to that of an io.Writer that already has its parameter (slice b).
	Flush() (err error)
	// MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
	MallocLen() (length int)
}
// ReadWriter combines the Reader and Writer interfaces:
// it embeds both, so an implementation must provide every method of each.
type ReadWriter interface {
	Reader
	Writer
}
// NewReader converts an io.Reader into a nocopy Reader.
// The conversion is delegated entirely to newZCReader.
func NewReader(r io.Reader) Reader {
	zc := newZCReader(r)
	return zc
}
// NewWriter converts an io.Writer into a nocopy Writer.
// The conversion is delegated entirely to newZCWriter.
func NewWriter(w io.Writer) Writer {
	zc := newZCWriter(w)
	return zc
}
// NewReadWriter converts an io.ReadWriter into a nocopy ReadWriter.
// The same rw backs both halves: a reader built by newZCReader and
// a writer built by newZCWriter.
func NewReadWriter(rw io.ReadWriter) ReadWriter {
	zrw := new(zcReadWriter)
	// Build the reader side first, then the writer side, as a literal would.
	zrw.zcReader = newZCReader(rw)
	zrw.zcWriter = newZCWriter(rw)
	return zrw
}
// NewIOReader converts a nocopy Reader into an io.Reader.
// If r already implements io.Reader it is returned as is;
// otherwise it is wrapped by newIOReader.
func NewIOReader(r Reader) io.Reader {
	switch native := r.(type) {
	case io.Reader:
		return native
	default:
		return newIOReader(r)
	}
}
// NewIOWriter converts a nocopy Writer into an io.Writer.
// If w already implements io.Writer it is returned as is;
// otherwise it is wrapped by newIOWriter.
func NewIOWriter(w Writer) io.Writer {
	switch native := w.(type) {
	case io.Writer:
		return native
	default:
		return newIOWriter(w)
	}
}
// NewIOReadWriter converts a nocopy ReadWriter into an io.ReadWriter.
// If rw already implements io.ReadWriter it is returned as is; otherwise
// an ioReadWriter is assembled from newIOReader(rw) and newIOWriter(rw).
func NewIOReadWriter(rw ReadWriter) io.ReadWriter {
	native, ok := rw.(io.ReadWriter)
	if !ok {
		wrapped := new(ioReadWriter)
		// Reader half first, then writer half, as a literal would.
		wrapped.ioReader = newIOReader(rw)
		wrapped.ioWriter = newIOWriter(rw)
		return wrapped
	}
	return native
}
const (
	// Size constants, expressed as multiples of 1024.
	block1k = 1 * 1024
	block2k = 2 * 1024
	block4k = 4 * 1024
	block8k = 8 * 1024
	block32k = 32 * 1024
	pagesize = block8k
	mallocMax = block8k * block1k // mallocMax is 8MB
	minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
	defaultLinkBufferMode = 0
	// readonlyMask is used to set readonly mode,
	// which indicates that the buffer node's memory is not controlled by the node itself,
	// so we cannot reuse the buffer or nocopy read it.
	readonlyMask uint8 = 1 << 0 // 0000 0001
	// nocopyReadMask is used to set nocopyRead mode,
	// which indicates that the buffer node has been nocopy read and its buffer cannot be reused.
	nocopyReadMask uint8 = 1 << 1 // 0000 0010
)
// unsafeSliceToString reinterprets the byte slice b as a string without copying.
// The returned string shares b's backing memory, so later writes to b are
// visible through the string.
func unsafeSliceToString(b []byte) string {
	// Reinterpret the slice header in place as a string header.
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
// unsafeStringToSlice reinterprets the string s as a byte slice without copying.
// The returned slice has len and cap equal to len(s) and points at the string's
// own data, so it must not be written to.
func unsafeStringToSlice(s string) (b []byte) {
	src := (*reflect.StringHeader)(unsafe.Pointer(&s))
	dst := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	// Hold the data address as an unsafe.Pointer before storing it in the slice header.
	data := unsafe.Pointer(src.Data)
	n := len(s)
	dst.Data = uintptr(data)
	// Cap is written before Len (tuple assignment stores left to right).
	dst.Cap, dst.Len = n, n
	return b
}
// malloc allocates a buffer for the given size and capacity.
// Requests whose capacity is at most mallocMax are served by mcache.Malloc;
// larger requests bypass mcache and are allocated with dirtmake.Bytes.
func malloc(size, capacity int) []byte {
	if capacity <= mallocMax {
		return mcache.Malloc(size, capacity)
	}
	return dirtmake.Bytes(size, capacity)
}
// free hands buf back to mcache via mcache.Free, but only when cap(buf)
// is at most mallocMax, the same limit malloc applies before using mcache.
// Buffers with a larger capacity are left untouched.
func free(buf []byte) {
	if cap(buf) <= mallocMax {
		mcache.Free(buf)
	}
}