/
fast_queue.go
67 lines (59 loc) · 1.69 KB
/
fast_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package datastreams
import (
"sync/atomic"
"time"
)
// queueSize is the number of slots in the fastQueue ring buffer.
const queueSize = 10000
// fastQueue is a fixed-size ring buffer of *processorInput values built on
// atomic operations.
// There are many writers, but only one reader.
// Each value will be read at most once.
// The reader stops when it catches up with the writers.
// If the reader is too slow, there is no guarantee about the order in which
// values will be dropped.
type fastQueue struct {
// elements holds the ring slots; a position maps to the slot at index
// position % queueSize.
elements [queueSize]atomic.Pointer[processorInput]
// writePos counts the positions handed out to writers; push increments it
// once per pushed element.
writePos atomic.Int64
// readPos is the position of the next element to read; pop increments it
// once per returned element.
readPos atomic.Int64
}
// newFastQueue allocates and returns a zero-valued fastQueue: both positions
// start at 0 and every slot is empty.
func newFastQueue() *fastQueue {
	return new(fastQueue)
}
// push adds p to the queue and reports whether the queue was over capacity
// when the element was added.
//
// It reserves the next write position, records that position in p.queuePos
// and then publishes p in the matching ring slot. dropped is true when more
// than queueSize elements were written but not yet read, i.e. the slot used
// for p still belonged to an element the reader had not consumed.
func (q *fastQueue) push(p *processorInput) (dropped bool) {
	// Add returns the counter after the increment, so p's own position is
	// one less than the returned value.
	end := q.writePos.Add(1)
	// pending is the length of the queue after this element has been added
	// and before the next element has been read.
	pending := end - q.readPos.Load()
	pos := end - 1
	// queuePos is set before the pointer is stored so that pop can tell this
	// element apart from one left in the slot by a previous cycle.
	p.queuePos = pos
	q.elements[pos%queueSize].Store(p)
	dropped = pending > queueSize
	return dropped
}
// pop returns the element at the current read position and advances the read
// position, or returns nil when no element is ready. It is meant to be called
// by the queue's single reader.
func (q *fastQueue) pop() *processorInput {
	written := q.writePos.Load()
	next := q.readPos.Load()
	if next >= written {
		// The reader has caught up with the writers.
		return nil
	}
	item := q.elements[next%queueSize].Load()
	switch {
	case item == nil:
		// The write started, but hasn't finished yet: nothing has ever been
		// stored in this slot.
		return nil
	case item.queuePos < next:
		// The write started, but hasn't finished yet: the element we read is
		// the one from the previous cycle.
		return nil
	}
	q.readPos.Add(1)
	return item
}
// poll returns the next element of the queue, retrying pop until an element
// is available or timeout has elapsed. It returns nil when the timeout expires
// without an element becoming available.
//
// Between attempts poll sleeps for at most 10ms, and never for longer than the
// time left before the deadline, so a short timeout is not overrun by a full
// polling interval. A zero or negative timeout results in a single pop
// attempt.
func (q *fastQueue) poll(timeout time.Duration) *processorInput {
	const interval = 10 * time.Millisecond
	deadline := time.Now().Add(timeout)
	for {
		if p := q.pop(); p != nil {
			return p
		}
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil
		}
		// Cap the sleep at the time left so that the final pop attempt
		// happens at the deadline rather than up to one interval after it.
		if remaining > interval {
			remaining = interval
		}
		time.Sleep(remaining)
	}
}