forked from couchbase/indexing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mutation_queue_atomic.go
451 lines (370 loc) · 13.3 KB
/
mutation_queue_atomic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package indexer
import (
"errors"
"sync/atomic"
"time"
"unsafe"
"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
)
// MutationQueue interface specifies methods which a mutation queue for indexer
// needs to implement.
type MutationQueue interface {

	//enqueue a mutation reference based on vbucket. This is a blocking call which
	//will wait in case there is no free slot available for allocation.
	//caller can close the appch to force this call to return.
	Enqueue(mutation *MutationKeys, vbucket Vbucket, appch StopChannel) error

	//dequeue a vbucket's mutation and keep sending on a channel until stop signal
	//is sent on the returned stop channel
	Dequeue(vbucket Vbucket) (<-chan *MutationKeys, chan<- bool, error)

	//dequeue a vbucket's mutation upto seqno (waits if mutations upto seqno
	//are not yet available); the bool channel is closed to signal an abort
	DequeueUptoSeqno(vbucket Vbucket, seqno Seqno) (<-chan *MutationKeys, chan bool, error)

	//dequeue a single element for a vbucket and return it (nil if queue is empty)
	DequeueSingleElement(vbucket Vbucket) *MutationKeys

	//return reference to a vbucket's mutation at Tail of queue without dequeue
	PeekTail(vbucket Vbucket) *MutationKeys

	//return reference to a vbucket's mutation at Head of queue without dequeue
	PeekHead(vbucket Vbucket) *MutationKeys

	//return size of queue per vbucket
	GetSize(vbucket Vbucket) int64

	//returns the numbers of vbuckets for the queue
	GetNumVbuckets() uint16

	//destroy the resources
	Destroy()
}
// AtomicMutationQueue is a lock-free multi-queue with internal queue per
// vbucket for storing mutation references. This is loosely based on
// http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448?pgno=1
// with the main difference being that free nodes are being reused here to reduce GC.
//
// It doesn't copy the mutation and it is the caller's responsibility
// to allocate/deallocate KeyVersions struct. A mutation which is currently in queue
// shouldn't be freed.
//
// This implementation uses Go "atomic" pkg to provide safe concurrent access
// for a single reader and writer per vbucket queue without using mutex locks.
//
// It provides safe concurrent read/write access across vbucket queues.
type atomicMutationQueue struct {
	head []unsafe.Pointer //head pointer per vbucket queue (points at a sentinel node)
	tail []unsafe.Pointer //tail pointer per vbucket queue
	size []int64          //size of queue per vbucket (updated atomically)

	memUsed   *int64 //memory used by queue (shared counter, updated atomically)
	maxMemory *int64 //max memory to be used

	allocPollInterval   uint64 //poll interval (ms) for new allocs, if queue is full
	dequeuePollInterval uint64 //poll interval (ms) for dequeue, if waiting for mutations
	resultChanSize      uint64 //size of buffered result channel
	minQueueLen         uint64 //queue length allowed even when over the memory budget

	free   []*node       //free pointer per vbucket queue (freelist of recycled nodes)
	stopch []StopChannel //closed on Destroy to unblock pending allocs

	numVbuckets uint16 //num vbuckets for the queue
	isDestroyed bool   //set by Destroy; further Enqueue calls become no-ops
	bucket      string //bucket name, used in log messages
}
// NewAtomicMutationQueue allocates a new Atomic Mutation Queue and initializes
// every per-vbucket queue with a sentinel node shared by head, tail and the
// freelist pointer.
func NewAtomicMutationQueue(bucket string, numVbuckets uint16, maxMemory *int64,
	memUsed *int64, config common.Config) *atomicMutationQueue {

	q := &atomicMutationQueue{
		head:                make([]unsafe.Pointer, numVbuckets),
		tail:                make([]unsafe.Pointer, numVbuckets),
		free:                make([]*node, numVbuckets),
		size:                make([]int64, numVbuckets),
		stopch:              make([]StopChannel, numVbuckets),
		numVbuckets:         numVbuckets,
		maxMemory:           maxMemory,
		memUsed:             memUsed,
		allocPollInterval:   getAllocPollInterval(config),
		dequeuePollInterval: config["mutation_queue.dequeuePollInterval"].Uint64(),
		resultChanSize:      config["mutation_queue.resultChanSize"].Uint64(),
		minQueueLen:         config["settings.minVbQueueLength"].Uint64(),
		bucket:              bucket,
	}

	for vb := uint16(0); vb < numVbuckets; vb++ {
		sentinel := &node{} //sentinel node for the queue
		q.head[vb] = unsafe.Pointer(sentinel)
		q.tail[vb] = unsafe.Pointer(sentinel)
		q.free[vb] = sentinel
		q.stopch[vb] = make(StopChannel)
		q.size[vb] = 0
	}

	return q
}
// node represents a single element in the queue. Nodes form a singly-linked
// list per vbucket and are recycled through the freelist to reduce GC.
type node struct {
	mutation *MutationKeys //payload reference; nil for sentinel/free nodes
	next     *node         //next node towards the tail
}
// Enqueue will enqueue the mutation reference for given vbucket.
// Caller should not free the mutation till it is dequeued.
// Mutation will not be copied internally by the queue.
// caller can close appch to force this call to return. Otherwise
// this is a blocking call till there is a slot available for enqueue.
func (q *atomicMutationQueue) Enqueue(mutation *MutationKeys,
	vbucket Vbucket, appch StopChannel) error {

	//NOTE(review): if Vbucket is an unsigned type the "< 0" half of this
	//range check can never be true — confirm against Vbucket's definition.
	if vbucket < 0 || vbucket > Vbucket(q.numVbuckets)-1 {
		return errors.New("vbucket out of range")
	}

	//no more requests are taken once queue
	//is marked as destroyed
	//NOTE(review): isDestroyed is read without synchronization; an Enqueue
	//racing with Destroy may proceed past this check — confirm callers
	//serialize Enqueue against Destroy.
	if q.isDestroyed {
		return nil
	}

	//create a new node (blocks until memory allows; returns nil if the
	//queue is stopped or the caller closes appch)
	n := q.allocNode(vbucket, appch)
	if n == nil {
		return nil
	}

	n.mutation = mutation
	n.next = nil

	atomic.AddInt64(q.memUsed, n.mutation.Size())

	//point tail's next to new node; only the single writer for this
	//vbucket modifies tail, so a plain write to tail.next is safe here
	tail := (*node)(atomic.LoadPointer(&q.tail[vbucket]))
	tail.next = n
	//update tail to new node; the atomic store publishes the fully
	//initialized node to the single reader
	atomic.StorePointer(&q.tail[vbucket], unsafe.Pointer(tail.next))

	atomic.AddInt64(&q.size[vbucket], 1)

	return nil
}
// DequeueUptoSeqno returns a channel on which it will return mutation references
// for the specified vbucket upto the sequence number specified.
// It keeps polling till mutations upto seqno are available to be sent, and
// terminates when it finds a mutation with seqno higher than the one specified
// as argument. This allows for multiple mutations with the same seqno
// (e.g. in case of multiple indexes).
// The mutation channel is closed to indicate completion; the returned bool
// channel is closed if the dequeue has to abort.
func (q *atomicMutationQueue) DequeueUptoSeqno(vbucket Vbucket, seqno Seqno) (
	<-chan *MutationKeys, chan bool, error) {

	mutch := make(chan *MutationKeys, q.resultChanSize)
	abortch := make(chan bool)

	go q.dequeueUptoSeqno(vbucket, seqno, mutch, abortch)

	return mutch, abortch, nil
}
// dequeueUptoSeqno polls the vbucket queue and streams mutations with
// meta.seqno <= seqno on datach, closing datach once the target seqno has
// been dequeued. If the mutation at the head has a seqno greater than the
// target before the target is reached, the request cannot be satisfied and
// errch is closed instead.
func (q *atomicMutationQueue) dequeueUptoSeqno(vbucket Vbucket, seqno Seqno,
	datach chan *MutationKeys, errch chan bool) {

	var dequeueSeq Seqno
	var totalWait int

	for {
		totalWait += int(q.dequeuePollInterval)
		if totalWait > 30000 {
			//NOTE(review): this logs only when totalWait is an exact multiple
			//of 5000; if dequeuePollInterval does not divide 5000 evenly the
			//warning may never fire — confirm intended.
			if totalWait%5000 == 0 {
				logging.Warnf("Indexer::MutationQueue Dequeue Waiting For "+
					"Seqno %v Bucket %v Vbucket %v for %v ms. Last Dequeue %v.", seqno,
					q.bucket, vbucket, totalWait, dequeueSeq)
			}
		}
		for atomic.LoadPointer(&q.head[vbucket]) !=
			atomic.LoadPointer(&q.tail[vbucket]) { //if queue is nonempty

			//head is a sentinel; the oldest live mutation is head.next
			head := (*node)(atomic.LoadPointer(&q.head[vbucket]))
			//copy the mutation pointer
			m := head.next.mutation
			if seqno >= m.meta.seqno {
				//free mutation pointer
				head.next.mutation = nil
				//move head to next; the dequeued node becomes the new sentinel
				atomic.StorePointer(&q.head[vbucket], unsafe.Pointer(head.next))
				atomic.AddInt64(&q.size[vbucket], -1)
				atomic.AddInt64(q.memUsed, -m.Size())
				//send mutation to caller
				dequeueSeq = m.meta.seqno
				datach <- m
			} else {
				//head seqno is already past the target; abort
				logging.Warnf("Indexer::MutationQueue Dequeue Aborted For "+
					"Seqno %v Bucket %v Vbucket %v. Last Dequeue %v Head Seqno %v.", seqno,
					q.bucket, vbucket, dequeueSeq, m.meta.seqno)
				close(errch)
				return
			}

			//once the seqno is reached, close the channel
			if seqno <= dequeueSeq {
				close(datach)
				return
			}
		}
		//queue drained below the target seqno; wait for more mutations
		time.Sleep(time.Millisecond * time.Duration(q.dequeuePollInterval))
	}
}
// Dequeue returns a channel on which it will return mutation references for
// the specified vbucket. It keeps polling and sends mutations as they become
// available. It also returns a stop channel on which the caller can signal it
// to stop; the data channel is closed on stop.
func (q *atomicMutationQueue) Dequeue(vbucket Vbucket) (<-chan *MutationKeys,
	chan<- bool, error) {

	mutch := make(chan *MutationKeys)
	stopch := make(chan bool)

	go func() {
		//check for new mutations every dequeuePollInterval milliseconds
		ticker := time.NewTicker(time.Millisecond * time.Duration(q.dequeuePollInterval))
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				q.dequeue(vbucket, mutch)
			case <-stopch:
				close(mutch)
				return
			}
		}
	}()

	return mutch, stopch, nil
}
// dequeue drains the vbucket queue, sending each mutation to datach, and
// returns once the queue is empty.
func (q *atomicMutationQueue) dequeue(vbucket Vbucket, datach chan *MutationKeys) {
	for m := q.DequeueSingleElement(vbucket); m != nil; m = q.DequeueSingleElement(vbucket) {
		//send mutation to caller
		datach <- m
	}
}
// DequeueSingleElement dequeues a single element and returns it.
// Returns nil in case of an empty queue.
func (q *atomicMutationQueue) DequeueSingleElement(vbucket Vbucket) *MutationKeys {

	//empty queue: head and tail point at the same (sentinel) node
	if atomic.LoadPointer(&q.head[vbucket]) ==
		atomic.LoadPointer(&q.tail[vbucket]) {
		return nil
	}

	sentinel := (*node)(atomic.LoadPointer(&q.head[vbucket]))
	//grab the mutation reference from the first live node
	mut := sentinel.next.mutation
	//release the reference so the node can be recycled
	sentinel.next.mutation = nil
	//advance head; the dequeued node becomes the new sentinel
	atomic.StorePointer(&q.head[vbucket], unsafe.Pointer(sentinel.next))
	atomic.AddInt64(&q.size[vbucket], -1)
	atomic.AddInt64(q.memUsed, -mut.Size())
	return mut
}
// PeekTail returns a reference to the mutation at the tail of a vbucket's
// queue without dequeueing it, or nil if the queue is empty.
func (q *atomicMutationQueue) PeekTail(vbucket Vbucket) *MutationKeys {
	//empty queue: head and tail point at the same (sentinel) node
	if atomic.LoadPointer(&q.head[vbucket]) ==
		atomic.LoadPointer(&q.tail[vbucket]) {
		return nil
	}
	return (*node)(atomic.LoadPointer(&q.tail[vbucket])).mutation
}
// PeekHead returns a reference to the mutation at the head of a vbucket's
// queue without dequeueing it, or nil if the queue is empty.
//
// The node at q.head is a sentinel: its mutation field is nil initially and
// is set to nil again on every dequeue, so the oldest live mutation lives at
// head.next (exactly where DequeueSingleElement reads it). Returning
// head.mutation would always yield nil/stale data; fixed to head.next.mutation.
func (q *atomicMutationQueue) PeekHead(vbucket Vbucket) *MutationKeys {
	if atomic.LoadPointer(&q.head[vbucket]) !=
		atomic.LoadPointer(&q.tail[vbucket]) { //if queue is nonempty
		head := (*node)(atomic.LoadPointer(&q.head[vbucket]))
		//head is a sentinel; the first live element is head.next
		return head.next.mutation
	}
	return nil
}
// GetSize returns the current number of queued mutations for the vbucket.
func (q *atomicMutationQueue) GetSize(vbucket Vbucket) int64 {
	sz := atomic.LoadInt64(&q.size[vbucket])
	return sz
}
// GetNumVbuckets returns the numbers of vbuckets for the queue.
// numVbuckets is set once at construction, so no synchronization is needed.
func (q *atomicMutationQueue) GetNumVbuckets() uint16 {
	return q.numVbuckets
}
// allocNode returns a queue node for a new mutation: first from the freelist,
// otherwise freshly allocated. If the memory budget is exhausted it polls
// every allocPollInterval milliseconds until memory frees up, the caller
// gives up (appch closed — a node is force-allocated), the queue is stopped
// (nil is returned), or the max wait period of 5 minutes expires (a node is
// force-allocated despite memory pressure).
//
// Fix: removed the unreachable "return nil" that followed the infinite
// for-select loop — every path exits via a return inside the loop, so the
// trailing statement was dead code (flagged by go vet).
func (q *atomicMutationQueue) allocNode(vbucket Vbucket, appch StopChannel) *node {

	//fast path: memory available (or queue below its guaranteed min length)
	n := q.checkMemAndAlloc(vbucket)
	if n != nil {
		return n
	}

	//every allocPollInterval milliseconds, check for memory usage
	ticker := time.NewTicker(time.Millisecond * time.Duration(q.allocPollInterval))
	defer ticker.Stop()

	var totalWait uint64
	for {
		select {
		case <-ticker.C:
			totalWait += q.allocPollInterval

			n := q.checkMemAndAlloc(vbucket)
			if n != nil {
				return n
			}

			if totalWait > 300000 { // 5mins
				logging.Warnf("Indexer::MutationQueue Max Wait Period for Node "+
					"Alloc Expired %v. Forcing Alloc. Bucket %v Vbucket %v", totalWait, q.bucket, vbucket)
				return &node{}
			} else if totalWait > 5000 {
				//NOTE(review): logs only when totalWait is an exact multiple
				//of 3000; if allocPollInterval does not divide 3000 evenly
				//this warning may never fire — confirm intended.
				if totalWait%3000 == 0 {
					logging.Warnf("Indexer::MutationQueue Waiting for Node "+
						"Alloc for %v Milliseconds Bucket %v Vbucket %v", totalWait, q.bucket, vbucket)
				}
			}

		case <-q.stopch[vbucket]:
			//queue is being destroyed
			return nil

		case <-appch:
			//caller no longer wants to wait
			//allocate new node and return
			return &node{}
		}
	}
}
// checkMemAndAlloc returns a node if allocation is currently permitted,
// preferring the freelist over a fresh allocation. It returns nil when the
// queue is over its memory budget and the vbucket queue already holds at
// least minQueueLen elements.
func (q *atomicMutationQueue) checkMemAndAlloc(vbucket Vbucket) *node {

	memInUse := atomic.LoadInt64(q.memUsed)
	memLimit := atomic.LoadInt64(q.maxMemory)
	queueLen := atomic.LoadInt64(&q.size[vbucket])

	//over budget and the queue already has its guaranteed minimum: deny
	if memInUse >= memLimit && queueLen >= int64(q.minQueueLen) {
		return nil
	}

	//recycle a node from the freelist when possible
	if n := q.popFreeList(vbucket); n != nil {
		return n
	}
	//freelist empty: allocate a new node
	return &node{}
}
// popFreeList removes a node from the freelist and returns it to the caller.
// If the freelist is empty, it returns nil.
func (q *atomicMutationQueue) popFreeList(vbucket Vbucket) *node {
	//the freelist ends at the queue's current head (sentinel); if the free
	//pointer has caught up with head there is nothing left to recycle
	if q.free[vbucket] == (*node)(atomic.LoadPointer(&q.head[vbucket])) {
		return nil
	}
	n := q.free[vbucket]
	q.free[vbucket] = n.next
	n.mutation = nil
	n.next = nil
	return n
}
// Destroy will free up all resources of the queue.
// Importantly it will free up pending mutations as well.
// Once destroy has been called, further enqueue operations
// will be no-op.
func (q *atomicMutationQueue) Destroy() {

	//set the flag so no more Enqueue requests
	//are taken on this queue
	//NOTE(review): written without synchronization; see the matching
	//unsynchronized read in Enqueue — confirm callers serialize Enqueue
	//against Destroy.
	q.isDestroyed = true

	//ensure all pending allocs get stopped
	var i uint16
	for i = 0; i < q.numVbuckets; i++ {
		close(q.stopch[i])
	}

	//dequeue all the items in the queue and free
	for i = 0; i < q.numVbuckets; i++ {
		//the draining goroutine consumes mutations while dequeue below
		//sends them on the unbuffered channel; it exits when the channel
		//is closed after the vbucket queue has been fully drained
		mutch := make(chan *MutationKeys)
		go func() {
			for mutk := range mutch {
				mutk.Free()
			}
		}()
		q.dequeue(Vbucket(i), mutch)
		close(mutch)
	}
}
// getAllocPollInterval returns the node-alloc poll interval from config
// based on the active storage mode: ForestDB has its own tuning knob, all
// other storage modes use the MOI setting.
// (Idiom fix: dropped the redundant else after a terminating return.)
func getAllocPollInterval(config common.Config) uint64 {
	if common.GetStorageMode() == common.FORESTDB {
		return config["mutation_queue.fdb.allocPollInterval"].Uint64()
	}
	return config["mutation_queue.moi.allocPollInterval"].Uint64()
}