buffer.go

/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package buffer provides a high-performance, lock-free implementation of a
// circular buffer used by the profiling code.
package buffer
import (
"errors"
"math/bits"
"runtime"
"sync"
"sync/atomic"
"unsafe"
)
type queue struct {
// An array of pointers as references to the items stored in this queue.
arr []unsafe.Pointer
// The maximum number of elements this queue may store before it wraps around
// and overwrites older values. Must be a power of two.
size uint32
// Always size - 1. A bitwise AND is performed with this mask in place of a
// modulo operation by the Push operation.
mask uint32
// Each Push operation into this queue increments the acquired counter before
// proceeding with the actual write to arr. This counter is also
// used by the Drain operation's drainWait subroutine to wait for all pushes
// to complete.
acquired uint32 // Accessed atomically.
// After the completion of a Push operation, the written counter is
// incremented. Also used by drainWait to wait for all pushes to complete.
written uint32
}
// Allocates and returns a new *queue. size must be a power of two.
func newQueue(size uint32) *queue {
return &queue{
arr: make([]unsafe.Pointer, size),
size: size,
mask: size - 1,
}
}
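
// As an illustrative sketch (the numbers below are arbitrary and chosen only
// for demonstration): with size == 8, mask is 7 (0b111), so an acquired count
// of 9 maps to
//
//	index := uint32(9) & uint32(7) // == 1, identical to 9 % 8
//
// which is why size is required to be a power of two.
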
// drainWait blocks the caller until all Pushes on this queue are complete.
func (q *queue) drainWait() {
for atomic.LoadUint32(&q.acquired) != atomic.LoadUint32(&q.written) {
runtime.Gosched()
}
}
// A queuePair has two queues. At any given time, Pushes go into the queue
// referenced by queuePair.q. The active queue gets switched when there's a
// drain operation on the circular buffer.
type queuePair struct {
q0 unsafe.Pointer
q1 unsafe.Pointer
q unsafe.Pointer
}
// Allocates and returns a new *queuePair with its internal queues allocated.
func newQueuePair(size uint32) *queuePair {
qp := &queuePair{}
qp.q0 = unsafe.Pointer(newQueue(size))
qp.q1 = unsafe.Pointer(newQueue(size))
qp.q = qp.q0
return qp
}
// Switches the current queue for future Pushes to proceed to the other queue
// so that there's no blocking in Push. Returns a pointer to the old queue that
// was in place before the switch.
func (qp *queuePair) switchQueues() *queue {
// Even though we have mutual exclusion across drainers (thanks to
// drainMutex.Lock in Drain), Push operations may access qp.q while we're
// writing to it.
if atomic.CompareAndSwapPointer(&qp.q, qp.q0, qp.q1) {
return (*queue)(qp.q0)
}
atomic.CompareAndSwapPointer(&qp.q, qp.q1, qp.q0)
return (*queue)(qp.q1)
}
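
// A hedged sketch of how a drainer uses this toggle (it mirrors what Drain
// below already does and is not additional API):
//
//	old := qp.switchQueues() // new Pushes now land in the other queue
//	old.drainWait()          // wait for in-flight Pushes on the old queue
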
// To avoid expensive modulo operations, we require the maximum number of
// elements in the circular buffer (N) to be a power of two so that a bitwise
// AND mask can be used instead. Since a CircularBuffer is a collection of
// queuePairs (see below), we need to divide N; since powers of two are only
// divisible by other powers of two, we use floorCPUCount() queuePairs within
// each CircularBuffer.
//
// The floor of the number of CPUs (rather than the ceiling) was found to be
// the optimal number through experiments.
func floorCPUCount() uint32 {
floorExponent := bits.Len32(uint32(runtime.NumCPU())) - 1
if floorExponent < 0 {
floorExponent = 0
}
return 1 << uint32(floorExponent)
}
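
// For example (assuming runtime.NumCPU reports the counts on the left):
//
//	1 CPU   -> 1
//	6 CPUs  -> 4  (the largest power of two <= 6)
//	16 CPUs -> 16
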
var numCircularBufferPairs = floorCPUCount()
// CircularBuffer is a lock-free data structure that supports Push and Drain
// operations.
//
// Note that CircularBuffer is built for performance more than reliability.
// That is, some Push operations may fail without retries in some situations
// (such as during a Drain operation). Order of pushes is not maintained
// either; that is, if A was pushed before B, the Drain operation may return an
// array with B before A. These restrictions are acceptable within gRPC's
// profiling, but if your use-case does not permit these relaxed constraints
// or if performance is not a primary concern, you should probably use a
// lock-based data structure such as internal/buffer.UnboundedBuffer.
type CircularBuffer struct {
drainMutex sync.Mutex
qp []*queuePair
// qpn is a monotonically increasing counter used to determine
// which queuePair a Push operation should write to. This approach's
// performance was found to be better than writing to a random queue.
qpn uint32
qpMask uint32
}
var errInvalidCircularBufferSize = errors.New("buffer size is not a power of two")
// NewCircularBuffer allocates a circular buffer of size size and returns a
// reference to the struct. Only circular buffers of size 2^k are allowed
// (saves us from having to do expensive modulo operations).
func NewCircularBuffer(size uint32) (*CircularBuffer, error) {
if size&(size-1) != 0 {
return nil, errInvalidCircularBufferSize
}
n := numCircularBufferPairs
if size/numCircularBufferPairs < 8 {
// If each circular buffer is going to hold fewer than a very small number
// of items (say, 8), using multiple circular buffers is very likely
// wasteful. Instead, fall back to a single circular buffer holding everything.
n = 1
}
cb := &CircularBuffer{
qp: make([]*queuePair, n),
qpMask: n - 1,
}
for i := uint32(0); i < n; i++ {
cb.qp[i] = newQueuePair(size / n)
}
return cb, nil
}
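
// A minimal construction sketch (the 4096 below is an arbitrary power of two
// chosen purely for illustration):
//
//	cb, err := NewCircularBuffer(4096)
//	if err != nil {
//		// the requested size was not a power of two
//	}
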
// Push pushes an element into the circular buffer. Guaranteed to complete in
// a finite number of steps (also lock-free). Does not guarantee that push
// order will be retained. Does not guarantee that the operation will succeed
// if a Drain operation concurrently begins execution.
func (cb *CircularBuffer) Push(x any) {
n := atomic.AddUint32(&cb.qpn, 1) & cb.qpMask
qptr := atomic.LoadPointer(&cb.qp[n].q)
q := (*queue)(qptr)
acquired := atomic.AddUint32(&q.acquired, 1) - 1
// If the queue pointer is unchanged, it means that we incremented acquired
// before any queuePair was switched, and therefore before any drainWait
// completion; it is safe to proceed with the Push operation on this queue.
// Otherwise, a Drain operation has begun execution, but we don't know how far
// along the process it is. If it is past the drainWait check, it is not
// safe to proceed with the Push operation. We choose to drop this sample
// entirely instead of retrying, as retrying may potentially send the Push
// operation into a spin loop (we want to guarantee completion of the Push
// operation within a finite time). Before exiting, we increment written so
// that any existing drainWaits can proceed.
if atomic.LoadPointer(&cb.qp[n].q) != qptr {
atomic.AddUint32(&q.written, 1)
return
}
// At this point, we're definitely writing to the right queue. That is, one
// of the following is true:
// 1. No drainer is in execution on this queue.
// 2. A drainer is in execution on this queue and it is waiting at the
// acquired == written barrier.
//
// Let's say two Pushes A and B happen on the same queue. Say A and B are
// q.size apart; i.e. they get the same index. That is,
//
// index_A = index_B
// acquired_A + q.size = acquired_B
//
// We say "B has wrapped around A" when this happens. In this case, since A
// occurred before B, B's Push should be the final value. However, we
// accommodate A being the final value because wrap-arounds are extremely
// rare and accounting for them requires an additional counter and a
// significant performance penalty. Note that the below approach never leads
// to any data corruption.
index := acquired & q.mask
atomic.StorePointer(&q.arr[index], unsafe.Pointer(&x))
// Allows any drainWait checks to proceed.
atomic.AddUint32(&q.written, 1)
}
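
// Continuing the wrap-around note above with concrete numbers (chosen only
// for illustration): if q.size == 8, then acquired_A == 3 and acquired_B == 11
// both map to index 3 (3 & 7 == 11 & 7), so whichever StorePointer happens to
// land last (possibly A's) is the value a later Drain observes.
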
// Dereferences non-nil pointers from arr and appends them to result. The range of elements from
// arr that are copied is [from, to). Assumes that the result slice is already
// allocated and is large enough to hold all the elements that might be copied.
// Also assumes mutual exclusion on the array of pointers.
func dereferenceAppend(result []any, arr []unsafe.Pointer, from, to uint32) []any {
for i := from; i < to; i++ {
// We have mutual exclusion on arr, there's no need for atomics.
x := (*any)(arr[i])
if x != nil {
result = append(result, *x)
}
}
return result
}
// Drain allocates and returns a slice of the elements Pushed into the circular
// buffer. Push order is not maintained; that is, if B was Pushed after A,
// Drain may return B at a lower index than A in the returned slice.
func (cb *CircularBuffer) Drain() []any {
cb.drainMutex.Lock()
qs := make([]*queue, len(cb.qp))
for i := 0; i < len(cb.qp); i++ {
qs[i] = cb.qp[i].switchQueues()
}
var wg sync.WaitGroup
wg.Add(len(qs))
for i := 0; i < len(qs); i++ {
go func(qi int) {
qs[qi].drainWait()
wg.Done()
}(i)
}
wg.Wait()
result := make([]any, 0)
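	// If a queue has not wrapped around, only its first acquired slots were
	// ever written; otherwise every slot may hold a value, so the whole array
	// is copied.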
for i := 0; i < len(qs); i++ {
if acquired := atomic.LoadUint32(&qs[i].acquired); acquired < qs[i].size {
result = dereferenceAppend(result, qs[i].arr, 0, acquired)
} else {
result = dereferenceAppend(result, qs[i].arr, 0, qs[i].size)
}
}
for i := 0; i < len(qs); i++ {
atomic.StoreUint32(&qs[i].acquired, 0)
atomic.StoreUint32(&qs[i].written, 0)
}
cb.drainMutex.Unlock()
return result
}
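
// A hedged end-to-end usage sketch (the samples variable below is a
// placeholder for whatever the profiling code records, not part of this
// package's API):
//
//	cb, _ := NewCircularBuffer(1 << 12)
//	for _, sample := range samples {
//		cb.Push(sample) // may be silently dropped if a Drain is in flight
//	}
//	collected := cb.Drain() // element order is not guaranteed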