-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue.go
175 lines (160 loc) · 3.91 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package conc
import (
"context"
"sync"
"github.com/aperturerobotics/util/broadcast"
"github.com/aperturerobotics/util/linkedlist"
)
// ConcurrentQueue is a pool of goroutines processing a stream of jobs.
// Job callbacks are called in the order they are added.
//
// A ConcurrentQueue contains a sync.Mutex and must not be copied after
// construction; use the pointer returned by NewConcurrentQueue.
type ConcurrentQueue struct {
	// mtx guards below fields
	mtx sync.Mutex
	// bcast is broadcasted (while mtx is held) to wake callers blocked in
	// WaitIdle and WatchState so that they re-read the fields below.
	bcast broadcast.Broadcast
	// maxConcurrency is the concurrency limit.
	// Any value <= 0 is treated as "no limit".
	maxConcurrency int
	// running is the number of running goroutines.
	running int
	// jobQueue is the job queue linked list.
	// It holds the jobs that are waiting for a free goroutine.
	jobQueue *linkedlist.LinkedList[func()]
	// jobQueueSize is the current size of jobQueue.
	// It is tracked by hand: incremented on every Push and decremented on
	// every successful Pop.
	jobQueueSize int
}
// NewConcurrentQueue builds a ConcurrentQueue with the given concurrency limit.
//
// maxConcurrency is the maximum number of jobs running at once; a value <= 0
// removes the limit, so every job gets its own goroutine.
// initialElems are queued up front and are started (as far as the limit
// allows) before the constructor returns.
func NewConcurrentQueue(maxConcurrency int, initialElems ...func()) *ConcurrentQueue {
	q := &ConcurrentQueue{
		maxConcurrency: maxConcurrency,
		jobQueue:       linkedlist.NewLinkedList(initialElems...),
		jobQueueSize:   len(initialElems),
	}

	// Nothing queued: no goroutines need to be spawned yet.
	if len(initialElems) == 0 {
		return q
	}

	// updateLocked requires the mutex to be held by the caller.
	q.mtx.Lock()
	q.updateLocked()
	q.mtx.Unlock()
	return q
}
// Enqueue adds job callbacks to the stream.
//
// While a concurrency slot is free (or there is no limit) a job is handed
// straight to a new goroutine and bypasses the queue; otherwise it is pushed
// onto the queue. Waiters are notified once after all jobs were processed.
// Calling Enqueue with no jobs only reports the current counters.
//
// Returns the current number of queued and running jobs.
func (s *ConcurrentQueue) Enqueue(jobs ...func()) (queued, running int) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if len(jobs) != 0 {
		// maxConcurrency never changes after construction, so decide once.
		unbounded := s.maxConcurrency <= 0
		for i := range jobs {
			fn := jobs[i]
			switch {
			case unbounded, s.running < s.maxConcurrency:
				// A slot is available: run right away.
				s.running++
				go s.executeJob(fn)
			default:
				// All slots are busy: park the job until a worker frees up.
				s.jobQueue.Push(fn)
				s.jobQueueSize++
			}
		}
		s.bcast.Broadcast()
	}

	return s.jobQueueSize, s.running
}
// WaitIdle blocks until the queue is idle, i.e. no job is running and no job
// is queued.
//
// Returns nil once idle.
// Returns context.Canceled if ctx is done, or if errCh is closed.
// errCh is an optional error channel (may be nil): a non-nil error received
// from it is returned as-is, a nil error is ignored and the wait continues.
func (s *ConcurrentQueue) WaitIdle(ctx context.Context, errCh <-chan error) error {
	for {
		s.mtx.Lock()
		if s.running == 0 && s.jobQueueSize == 0 {
			s.mtx.Unlock()
			return nil
		}
		// Grab the wait channel under the lock so that no state change between
		// the check above and the select below can be missed.
		changed := s.bcast.GetWaitCh()
		s.mtx.Unlock()

		select {
		case <-changed:
			// state changed: loop around and check again
		case <-ctx.Done():
			return context.Canceled
		case err, open := <-errCh:
			switch {
			case !open:
				// errCh was non-nil but was closed
				// treat this as context canceled
				return context.Canceled
			case err != nil:
				return err
			}
		}
	}
}
// WatchState watches the concurrent queue state.
//
// cb is called with the current number of queued and running jobs, once
// immediately and then again each time the loop is woken up (by a state
// broadcast or by a nil error arriving on errCh).
// If the callback returns an error or false, returns that error or nil.
// Returns nil immediately if callback is nil.
// Returns context.Canceled if ctx is canceled, or if errCh is closed.
// errCh is an optional error channel (may be nil): a non-nil error received
// from it is returned as-is.
func (s *ConcurrentQueue) WatchState(
	ctx context.Context,
	errCh <-chan error,
	cb func(queued, running int) (bool, error),
) error {
	if cb == nil {
		return nil
	}

	for {
		// Snapshot the counters and the wait channel under the same lock so
		// that a change happening after the snapshot is guaranteed to wake us.
		s.mtx.Lock()
		queued, running := s.jobQueueSize, s.running
		waitCh := s.bcast.GetWaitCh()
		s.mtx.Unlock()

		// The callback runs without the lock held.
		cntu, err := cb(queued, running)
		if err != nil || !cntu {
			return err
		}

		select {
		case <-ctx.Done():
			return context.Canceled
		case err, ok := <-errCh:
			// A nil errCh blocks forever in select, so this case is simply
			// never chosen when the caller passes no error channel.
			if !ok {
				// errCh was non-nil but was closed
				// treat this as context canceled
				return context.Canceled
			}
			if err != nil {
				return err
			}
		case <-waitCh:
		}
	}
}
// updateLocked moves queued jobs onto new goroutines for as long as the
// concurrency limit allows (always, if there is no limit) and the queue is not
// empty. Waiters are notified once if at least one job was started.
// caller must hold mtx
func (s *ConcurrentQueue) updateLocked() {
	started := 0
	for {
		// Stop once a positive limit has been reached.
		if s.maxConcurrency > 0 && s.running >= s.maxConcurrency {
			break
		}

		next, ok := s.jobQueue.Pop()
		if !ok {
			// queue drained
			break
		}

		s.running++
		s.jobQueueSize--
		started++
		go s.executeJob(next)
	}

	if started > 0 {
		s.bcast.Broadcast()
	}
}
// executeJob is a goroutine to execute a job function.
// will continue to run until there are no more jobs.
//
// After running job (nil jobs are skipped) it takes the next job from the
// queue and runs that one on the same goroutine. When the queue is empty the
// goroutine gives its slot back by decrementing running and exits.
// Waiters are notified after every change to the counters, so observers see
// the queue draining and not only the moment a goroutine exits.
func (s *ConcurrentQueue) executeJob(job func()) {
	for {
		if job != nil {
			job()
		}

		s.mtx.Lock()
		next, ok := s.jobQueue.Pop()
		if ok {
			// Reuse this goroutine for the next queued job; running stays
			// unchanged because the slot remains occupied.
			s.jobQueueSize--
		} else {
			s.running--
		}
		// Either jobQueueSize or running just changed: wake the watchers.
		s.bcast.Broadcast()
		s.mtx.Unlock()

		if !ok {
			return
		}
		job = next
	}
}