-
Notifications
You must be signed in to change notification settings - Fork 13
/
unordered.go
60 lines (51 loc) · 947 Bytes
/
unordered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package parallel
import (
"sync"
"github.com/circonus-labs/circonus-unified-agent/cua"
)
// Unordered feeds metrics through a processing function on a pool of worker
// goroutines and hands every resulting metric to an accumulator. Workers pull
// from one shared queue, so results reach the accumulator in whatever order
// the workers finish, not necessarily the order metrics were enqueued.
type Unordered struct {
	// fn is applied to each queued metric; every metric it returns is added to acc.
	fn func(cua.Metric) []cua.Metric
	// acc receives the metrics produced by fn.
	acc cua.Accumulator

	// inQueue carries metrics from Enqueue to the workers; Stop closes it.
	inQueue chan cua.Metric
	// wg is what Stop waits on; it is released after all workers have exited.
	wg sync.WaitGroup
}
// NewUnordered creates an Unordered that applies fn to enqueued metrics on
// workerCount goroutines and adds every metric fn returns to acc.
//
// workerCount also sizes the input queue's buffer. Values below 1 are treated
// as 1: a negative count would panic while creating the queue, and a count of
// zero would start no workers, leaving the first Enqueue blocked forever.
//
// The returned Unordered is already running; call Stop to shut it down.
func NewUnordered(
	acc cua.Accumulator,
	fn func(cua.Metric) []cua.Metric,
	workerCount int,
) *Unordered {
	if workerCount < 1 {
		workerCount = 1
	}

	p := &Unordered{
		acc:     acc,
		inQueue: make(chan cua.Metric, workerCount),
		fn:      fn,
	}

	// A supervising goroutine holds p.wg until startWorkers returns, i.e.
	// until every worker has exited, so Stop only has to wait on one counter.
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.startWorkers(workerCount)
	}()

	return p
}
// startWorkers launches count goroutines that consume inQueue and blocks
// until all of them have returned, which happens once inQueue has been closed
// and drained.
func (p *Unordered) startWorkers(count int) {
	var running sync.WaitGroup

	// worker passes each queued metric through fn and forwards every
	// resulting metric to the accumulator.
	worker := func() {
		for in := range p.inQueue {
			results := p.fn(in)
			for idx := range results {
				p.acc.AddMetric(results[idx])
			}
		}
		running.Done()
	}

	running.Add(count)
	for remaining := count; remaining > 0; remaining-- {
		go worker()
	}
	running.Wait()
}
// Stop closes the input queue and then blocks until the wait group is
// released, which NewUnordered arranges to happen after every worker has
// finished the metrics still queued. Because it closes the channel, Stop must
// be called at most once, and Enqueue must not be called afterwards.
func (p *Unordered) Stop() {
	queue := p.inQueue

	// Close first: workers only leave their receive loop once the queue is
	// closed, so waiting before closing would never return.
	close(queue)
	p.wg.Wait()
}
// Enqueue hands m to the workers by sending it on the input queue. The send
// blocks while the queue's buffer is full, and it panics if Stop has already
// closed the queue.
func (p *Unordered) Enqueue(m cua.Metric) {
	var in chan<- cua.Metric = p.inQueue
	in <- m
}