-
Notifications
You must be signed in to change notification settings - Fork 27
/
queue.go
95 lines (80 loc) · 1.97 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package async
import (
"runtime"
"sync"
)
// Job is the generic interface object representing the data being
// pushed to the jobs queue and being passed to the workers.
type Job interface{}
// Logic is the implementation of the logic each worker will execute.
type Logic func(arg Job)
// WorkQueue is the object representing an async jobs queue with
// a given number of concurrent workers.
type WorkQueue struct {
workers int
jobChan chan Job
stopChan chan struct{}
jobs sync.WaitGroup
done sync.WaitGroup
logic Logic
}
// NewQueue creates a new job queue with a specific worker logic.
// If workers is greater or equal than zero, it will be auto
// scaled to the number of logical CPUs usable by the current
// process.
func NewQueue(workers int, logic Logic) *WorkQueue {
return createQueue(workers, logic, 0)
}
func NewBufferedQueue(workers int, logic Logic, size int) *WorkQueue {
return createQueue(workers, logic, size)
}
func createQueue(workers int, logic Logic, bufferSize int) *WorkQueue {
if workers <= 0 {
workers = runtime.NumCPU()
}
wq := &WorkQueue{
workers: workers,
jobChan: make(chan Job, bufferSize),
stopChan: make(chan struct{}),
jobs: sync.WaitGroup{},
done: sync.WaitGroup{},
logic: logic,
}
for i := 0; i < workers; i++ {
wq.done.Add(1)
go wq.worker(i)
}
return wq
}
func (wq *WorkQueue) worker(id int) {
defer wq.done.Done()
for {
select {
case <-wq.stopChan:
return
case job := <-wq.jobChan:
wq.logic(job)
wq.jobs.Done()
}
}
}
// Add pushes a new job to the queue.
func (wq *WorkQueue) Add(job Job) {
wq.jobs.Add(1)
wq.jobChan <- job
}
// Wait stops until all the workers stopped.
func (wq *WorkQueue) Wait() {
wq.done.Wait()
}
// WaitDone stops until all jobs on the queue have been processed.
func (wq *WorkQueue) WaitDone() {
wq.jobs.Wait()
}
// Stop stops the job queue and the workers.
func (wq *WorkQueue) Stop() {
close(wq.stopChan)
wq.jobs.Wait()
wq.done.Wait()
close(wq.jobChan)
}