-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue.go
55 lines (49 loc) · 1.4 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package queue
import (
"github.com/enriquebris/goconcurrentqueue"
"golang.org/x/sync/errgroup"
)
// QueueWorker allow us to run N concurrent jobs in order
type QueueWorker struct {
queue goconcurrentqueue.Queue
workers int
}
// Closure is the function used for each item in the Queue to handle the logic
type Closure func(v interface{}) error
// New initiates a new FIFO queue
func New(capacity, workers int) *QueueWorker {
q := goconcurrentqueue.NewFixedFIFO(capacity)
// dont allow more workers than there are items in the queue
if workers > q.GetCap() {
workers = q.GetCap()
}
return &QueueWorker{
queue: q,
workers: workers,
}
}
// Push item to the queue
func (qw *QueueWorker) Push(v interface{}) error {
return qw.queue.Enqueue(v)
}
// Work starts N coroutines based on QueueWorker.workers
// The queue will stop at first failure if any.
// The workers expect the queue to be populated before any work is started.
func (qw *QueueWorker) Work(closure Closure) error {
g := new(errgroup.Group)
for i := 0; i < qw.workers; i++ {
g.Go(func() error {
for qw.queue.GetLen() > 0 {
// we will explicitly suppress any Dequeue errors
// since our queue implementation will not support
// concurrently push to the queue while we are working it.
value, _ := qw.queue.Dequeue()
if err := closure(value); err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}