forked from twmb/franz-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ring.go
269 lines (217 loc) · 5.81 KB
/
ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package kgo
import "sync"
// The ring types below are fixed sized blocking MPSC ringbuffers. These
// replace channels in a few places in this client. The *main* advantage they
// provide is to allow loops that terminate.
//
// With channels, we always have to have a goroutine draining the channel. We
// cannot start the goroutine when we add the first element, because the
// goroutine will immediately drain the first and if something produces right
// away, it will start a second concurrent draining goroutine.
//
// We cannot fix that by adding a "working" field, because we would need a lock
// around checking if the goroutine still has elements *and* around setting the
// working field to false. If a push was blocked, it would be holding the lock,
// which would block the worker from grabbing the lock. Any other lock ordering
// has TOCTOU problems as well.
//
// We could use a slice that we always push to and pop the front of. This is a
// bit easier to reason about, but constantly reallocates and has no bounded
// capacity. The second we think about adding bounded capacity, we get this
// ringbuffer below.
//
// The key insight is that we only pop the front *after* we are done with it.
// If there are still more elements, the worker goroutine can continue working.
// If there are no more elements, it can quit. When pushing, if the pusher
// pushed the first element, it starts the worker.
//
// Pushes fail if the ring is dead, allowing the pusher to fail any promise.
// If a die happens while a worker is running, all future pops will see the
// ring is dead and can fail promises immediately. If a worker is not running,
// then there are no promises that need to be called.
//
// We use size 8 buffers because eh why not. This gives us a small optimization
// of masking to increment and decrement, rather than modulo arithmetic.
const (
	// mask7 masks a ring index into [0, 7]; advancing an index with
	// "& mask7" avoids modulo arithmetic.
	mask7 = 0b0000_0111
	// eight is the fixed capacity of every ring below.
	eight = mask7 + 1
)
// ringReq is a fixed-size blocking MPSC ring buffer of promisedReq.
type ringReq struct {
	mu sync.Mutex
	// c is created lazily, only once a pusher actually blocks on a
	// full ring; see push and dropPeek.
	c *sync.Cond

	elems [eight]promisedReq

	head uint8 // index of the element currently being worked on
	tail uint8 // index of the next free slot
	l    uint8 // number of live elements

	dead bool // set by die; all pushes fail afterwards
}
// die marks the ring dead and wakes every blocked pusher so it can
// observe the dead state and fail its promise.
func (r *ringReq) die() {
	r.mu.Lock()
	r.dead = true
	if c := r.c; c != nil {
		c.Broadcast()
	}
	r.mu.Unlock()
}
// push appends pr to the ring, blocking while the ring is full. It
// reports whether pr became the first element (the caller must then
// start a worker) and whether the ring is dead (the push was dropped).
func (r *ringReq) push(pr promisedReq) (first, dead bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Wait for room; the cond is created only on first need.
	for !r.dead && r.l == eight {
		if r.c == nil {
			r.c = sync.NewCond(&r.mu)
		}
		r.c.Wait()
	}

	if r.dead {
		return false, true
	}

	wasEmpty := r.l == 0
	r.elems[r.tail] = pr
	r.tail = (r.tail + 1) & mask7
	r.l++
	return wasEmpty, false
}
// dropPeek discards the element the worker just finished and returns
// the next one, whether any elements remain, and whether the ring died.
func (r *ringReq) dropPeek() (next promisedReq, more, dead bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Zero the finished slot so it does not pin memory, then advance.
	r.elems[r.head] = promisedReq{}
	r.head = (r.head + 1) & mask7
	r.l--

	// A non-nil cond means a pusher may be (or once was) blocked on a
	// full ring; a slot just opened, so we must always signal.
	if r.c != nil {
		r.c.Signal()
	}

	next = r.elems[r.head]
	more = r.l > 0
	dead = r.dead
	return
}
// ringResp duplicates the code above, but for promisedResp
type ringResp struct {
	mu sync.Mutex
	// c is created lazily, only once a pusher actually blocks on a
	// full ring; see push and dropPeek.
	c *sync.Cond

	elems [eight]promisedResp

	head uint8 // index of the element currently being worked on
	tail uint8 // index of the next free slot
	l    uint8 // number of live elements

	dead bool // set by die; all pushes fail afterwards
}
// die marks the ring dead and wakes every blocked pusher so it can
// observe the dead state and fail its promise.
func (r *ringResp) die() {
	r.mu.Lock()
	r.dead = true
	if c := r.c; c != nil {
		c.Broadcast()
	}
	r.mu.Unlock()
}
// push appends pr to the ring, blocking while the ring is full. It
// reports whether pr became the first element (the caller must then
// start a worker) and whether the ring is dead (the push was dropped).
func (r *ringResp) push(pr promisedResp) (first, dead bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Wait for room; the cond is created only on first need.
	for !r.dead && r.l == eight {
		if r.c == nil {
			r.c = sync.NewCond(&r.mu)
		}
		r.c.Wait()
	}

	if r.dead {
		return false, true
	}

	wasEmpty := r.l == 0
	r.elems[r.tail] = pr
	r.tail = (r.tail + 1) & mask7
	r.l++
	return wasEmpty, false
}
// dropPeek discards the element the worker just finished and returns
// the next one, whether any elements remain, and whether the ring died.
func (r *ringResp) dropPeek() (next promisedResp, more, dead bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Zero the finished slot so it does not pin memory, then advance.
	r.elems[r.head] = promisedResp{}
	r.head = (r.head + 1) & mask7
	r.l--

	// A non-nil cond means a pusher may be (or once was) blocked on a
	// full ring; a slot just opened, so we must always signal.
	if r.c != nil {
		r.c.Signal()
	}

	next = r.elems[r.head]
	more = r.l > 0
	dead = r.dead
	return
}
// ringSeqResp duplicates the code above, but for *seqResp. We leave off die
// because we do not use it, but we keep `c` for testing lowering eight/mask7.
type ringSeqResp struct {
	mu sync.Mutex
	// c is created lazily, only once a pusher actually blocks on a
	// full ring; see push and dropPeek.
	c *sync.Cond

	elems [eight]*seqResp

	head uint8 // index of the element currently being worked on
	tail uint8 // index of the next free slot
	l    uint8 // number of live elements
}
// push appends sr to the ring, blocking while the ring is full, and
// reports whether sr became the first element (the caller must then
// start a worker).
func (r *ringSeqResp) push(sr *seqResp) (first bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Wait for room; the cond is created only on first need.
	for r.l == eight {
		if r.c == nil {
			r.c = sync.NewCond(&r.mu)
		}
		r.c.Wait()
	}

	wasEmpty := r.l == 0
	r.elems[r.tail] = sr
	r.tail = (r.tail + 1) & mask7
	r.l++
	return wasEmpty
}
// dropPeek discards the element the worker just finished and returns
// the next one along with whether any elements remain.
func (r *ringSeqResp) dropPeek() (next *seqResp, more bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Nil the finished slot so it does not pin memory, then advance.
	r.elems[r.head] = nil
	r.head = (r.head + 1) & mask7
	r.l--

	// A non-nil cond means a pusher may be (or once was) blocked on a
	// full ring; a slot just opened, so we must always signal.
	if r.c != nil {
		r.c.Signal()
	}

	next, more = r.elems[r.head], r.l > 0
	return
}
// Also no die; this type is slightly different because we can have overflow.
// If we have overflow, we add to overflow until overflow is drained -- we
// always want strict ordering.
// ringBatchPromise is an unbounded MPSC queue of batchPromise: a fixed
// ring fronting an overflow slice that preserves strict FIFO order.
type ringBatchPromise struct {
	mu sync.Mutex

	elems [eight]batchPromise

	head uint8 // index of the element currently being worked on
	tail uint8 // index of the next free slot
	l    uint8 // number of live elements in the ring

	// overflow holds elements pushed while the ring was full; it is
	// drained fully before the ring is used again, keeping order.
	overflow []batchPromise
}
// push appends b and reports whether b became the first queued element
// (the caller must then start a worker). push never blocks: when the
// ring is full, b spills into the overflow slice instead.
func (r *ringBatchPromise) push(b batchPromise) (first bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Strict ordering: once anything lives in overflow, every new
	// element must follow it there; we return to the ring only after
	// overflow fully drains.
	if len(r.overflow) > 0 || r.l == eight {
		r.overflow = append(r.overflow, b)
		return false
	}

	wasEmpty := r.l == 0
	r.elems[r.tail] = b
	r.tail = (r.tail + 1) & mask7
	r.l++
	return wasEmpty
}
// dropPeek discards the element the worker just finished and returns
// the next one along with whether any elements remain. The ring is
// always drained before overflow.
func (r *ringBatchPromise) dropPeek() (next batchPromise, more bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// If the ring is empty, the element being dropped must live in
	// overflow: pushes only spill once the ring is full, and we never
	// refill the ring while overflow is non-empty.
	if r.l == 0 {
		r.overflow = r.overflow[1:]
		if len(r.overflow) > 0 {
			return r.overflow[0], true
		}
		return next, false
	}

	// Drop from the ring, zeroing the slot so it does not pin memory.
	r.elems[r.head] = batchPromise{}
	r.head = (r.head + 1) & mask7
	r.l--

	if r.l > 0 {
		return r.elems[r.head], true
	}
	// Ring drained; the next element, if any, is the front of overflow
	// (it is dropped on the following dropPeek, not now).
	if len(r.overflow) > 0 {
		return r.overflow[0], true
	}
	return next, false
}