forked from kubernetes/kubernetes
/
batcher.go
142 lines (121 loc) · 3.19 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package tsdb
import (
"sync"
"sync/atomic"
"time"
)
// PointBatcher accepts Points and will emit a batch of those points when either
// a) the batch reaches a certain size, or b) a certain time passes.
type PointBatcher struct {
size int
duration time.Duration
stop chan struct{}
in chan Point
out chan []Point
flush chan struct{}
stats PointBatcherStats
wg *sync.WaitGroup
}
// NewPointBatcher builds a PointBatcher that emits a batch once sz points are
// pending or, when d is positive, once d has elapsed since the first point of
// the pending batch arrived. Every channel it creates is unbuffered. The
// returned batcher does nothing until Start is called.
func NewPointBatcher(sz int, d time.Duration) *PointBatcher {
	pb := new(PointBatcher)

	pb.size = sz
	pb.duration = d

	pb.stop = make(chan struct{})
	pb.in = make(chan Point)
	pb.out = make(chan []Point)
	pb.flush = make(chan struct{})

	return pb
}
// PointBatcherStats holds the counters maintained by a PointBatcher. The
// batcher updates and reads each counter individually with sync/atomic.
type PointBatcherStats struct {
	// BatchTotal is the total count of batches transmitted.
	BatchTotal uint64

	// PointTotal is the total count of points processed.
	PointTotal uint64

	// SizeTotal is the number of batches that reached the size threshold.
	SizeTotal uint64

	// TimeoutTotal is the number of timeouts that occurred.
	TimeoutTotal uint64
}
// Start launches the batching goroutine. It returns nothing: points are sent
// on the channel returned by In and batches are received from the channel
// returned by Out. Calling Start on a batcher that was already started is a
// no-op.
//
// A pending batch is emitted when it reaches the configured size, when the
// configured duration (if positive) has elapsed since the batch's first point,
// when Flush is called, or when Stop is called.
func (b *PointBatcher) Start() {
	// Already running?
	if b.wg != nil {
		return
	}

	var (
		batch   []Point
		timer   *time.Timer
		timerCh <-chan time.Time // nil while no timeout is armed, so its case never fires.
	)

	// emit hands the pending batch to the consumer and resets the batching
	// state. The timer is stopped here so that batches sent because of size,
	// Flush or Stop do not leave a live timer behind until it expires.
	emit := func() {
		b.out <- batch
		atomic.AddUint64(&b.stats.BatchTotal, 1)
		batch = nil
		if timer != nil {
			timer.Stop()
			timer = nil
		}
		timerCh = nil
	}

	b.wg = &sync.WaitGroup{}
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()
		for {
			select {
			case <-b.stop:
				// Deliver whatever is pending before shutting down.
				if len(batch) > 0 {
					emit()
				}
				return

			case p := <-b.in:
				atomic.AddUint64(&b.stats.PointTotal, 1)
				if batch == nil {
					batch = make([]Point, 0, b.size)
					// The timeout is measured from the first point of a batch.
					if b.duration > 0 {
						timer = time.NewTimer(b.duration)
						timerCh = timer.C
					}
				}

				batch = append(batch, p)
				if len(batch) >= b.size { // 0 means send immediately.
					atomic.AddUint64(&b.stats.SizeTotal, 1)
					emit()
				}

			case <-b.flush:
				if len(batch) > 0 {
					emit()
				}

			case <-timerCh:
				atomic.AddUint64(&b.stats.TimeoutTotal, 1)
				emit()
			}
		}
	}()
}
// Stop signals the batching goroutine to exit and waits for it to finish. If
// points are still pending, the goroutine sends them as a final batch on the
// Out channel before exiting, so Stop blocks until that batch has been read.
//
// Stop is a no-op if Start was never called. Calling Stop again after it has
// returned is safe; concurrent calls to Stop are not synchronized.
func (b *PointBatcher) Stop() {
	// If not running, nothing to stop.
	if b.wg == nil {
		return
	}

	// Closing an already-closed channel panics, so only close it the first
	// time. A receive on a closed channel succeeds immediately, which is how
	// an earlier Stop is detected.
	select {
	case <-b.stop:
		// Already closed by an earlier Stop.
	default:
		close(b.stop)
	}
	b.wg.Wait()
}
// In returns the send-only channel to which points should be written. The
// channel created by NewPointBatcher is unbuffered, so a send blocks until the
// batching goroutine launched by Start receives the point.
func (b *PointBatcher) In() chan<- Point {
return b.in
}
// Out returns the receive-only channel from which batches should be read. Each
// value received is one batch of points. The channel created by
// NewPointBatcher is unbuffered, so the batching goroutine blocks while a
// batch is waiting to be read.
func (b *PointBatcher) Out() <-chan []Point {
return b.out
}
// Flush instructs the batcher to emit any pending points in a batch, regardless of batch size.
// If there are no pending points, no batch is emitted.
//
// The flush channel created by NewPointBatcher is unbuffered, so this call
// blocks until the batching goroutine launched by Start receives the signal.
func (b *PointBatcher) Flush() {
b.flush <- struct{}{}
}
// Stats returns a freshly allocated snapshot of the batcher's counters. Each
// counter is read with its own atomic load, so while the values should be
// closely correlated with one another, the snapshot as a whole is not
// guaranteed to be mutually consistent.
func (b *PointBatcher) Stats() *PointBatcherStats {
	return &PointBatcherStats{
		BatchTotal:   atomic.LoadUint64(&b.stats.BatchTotal),
		PointTotal:   atomic.LoadUint64(&b.stats.PointTotal),
		SizeTotal:    atomic.LoadUint64(&b.stats.SizeTotal),
		TimeoutTotal: atomic.LoadUint64(&b.stats.TimeoutTotal),
	}
}