-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
jobmanager.go
355 lines (292 loc) · 9.5 KB
/
jobmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package fairshare
import (
"container/list"
"fmt"
"io/ioutil"
"math"
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/logging"
)
type JobManager struct {
name string
queues map[string]*list.List
quit chan struct{}
newWork chan struct{} // must be buffered
workerPool *dispatcher
workerCount map[string]int
onceStart sync.Once
onceStop sync.Once
logger log.Logger
totalJobs int
metricSink *metricsutil.ClusterMetricSink
// waitgroup for testing stop functionality
wg sync.WaitGroup
// protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed`
l sync.RWMutex
// track queues by index for round robin worker assignment
queuesIndex []string
lastQueueAccessed int
}
// NewJobManager builds a job manager whose dispatcher is created with
// numWorkers workers.
//
// name is optional: when empty, "jobmanager-<uuid>" is used (or
// "jobmanager-no-uuid" if UUID generation fails). l is optional: when nil, a
// logger writing to ioutil.Discard is used. metricSink may be nil, in which
// case no metric samples are emitted.
func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metricsutil.ClusterMetricSink) *JobManager {
	if l == nil {
		l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
	}

	if name == "" {
		suffix, err := uuid.GenerateUUID()
		if err != nil {
			l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
			suffix = "no-uuid"
		}
		name = fmt.Sprintf("jobmanager-%s", suffix)
	}

	pool := newDispatcher(fmt.Sprintf("%s-dispatcher", name), numWorkers, l)

	manager := &JobManager{
		name:        name,
		logger:      l,
		metricSink:  metricSink,
		workerPool:  pool,
		quit:        make(chan struct{}),
		newWork:     make(chan struct{}, 1), // buffered: see AddJob
		queues:      make(map[string]*list.List),
		workerCount: make(map[string]int),
		queuesIndex: make([]string, 0),
		// -1 means "no queue accessed yet"
		lastQueueAccessed: -1,
	}

	manager.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
	return manager
}
// Start starts the worker pool and then the work-assignment goroutine. Only
// the first call does anything; later calls are no-ops.
// note: a given job manager cannot be restarted after it has been stopped
func (j *JobManager) Start() {
	launch := func() {
		j.logger.Trace("starting job manager", "name", j.name)
		j.workerPool.start()
		j.assignWork()
	}
	j.onceStart.Do(launch)
}
// Stop closes the quit channel, which tells the work-assignment goroutine to
// exit, and then stops the worker pool. It does not wait for the assignment
// goroutine to finish. Only the first call does anything.
func (j *JobManager) Stop() {
	shutdown := func() {
		j.logger.Trace("terminating job manager...")
		close(j.quit)
		j.workerPool.stop()
	}
	j.onceStop.Do(shutdown)
}
// AddJob adds a job to the given queue, creating the queue if it doesn't exist.
//
// If the job manager held no queues when AddJob was called, a wake-up is
// signalled on newWork once the lock has been released. The signal is sent
// without blocking: newWork has a buffer of one, and a wake-up that is already
// pending is just as good as a second one. A blocking send here could hang
// the caller forever if the assignment goroutine has exited (after Stop)
// while a previous wake-up was still sitting in the buffer.
func (j *JobManager) AddJob(job Job, queueID string) {
	j.l.Lock()
	if len(j.queues) == 0 {
		// registered before the Unlock defer so that it runs after the lock
		// has been released
		defer func() {
			select {
			case j.newWork <- struct{}{}:
			default:
				// a wake-up is already pending; the assignment loop will see
				// this job when it handles that one (it also re-checks the
				// queues on a ticker)
			}
		}()
	}
	defer j.l.Unlock()

	if _, ok := j.queues[queueID]; !ok {
		j.addQueue(queueID)
	}

	queue := j.queues[queueID]
	queue.PushBack(job)
	j.totalJobs++

	if j.metricSink != nil {
		j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(queue.Len()), []metrics.Label{{"queue_id", queueID}})
		j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
	}
}
// GetPendingJobCount returns the number of jobs still waiting in a queue,
// summed over every queue in the job manager. It takes the read lock.
func (j *JobManager) GetPendingJobCount() int {
	j.l.RLock()
	defer j.l.RUnlock()

	var pending int
	for queueID := range j.queues {
		pending += j.queues[queueID].Len()
	}
	return pending
}
// GetWorkerCounts returns a map of queue ID to number of active workers.
//
// The returned map is a copy made while the read lock is held. Handing out
// the internal map would let callers read it after the lock is released,
// racing with incrementWorkerCount and decrementWorkerCount, and would let
// them modify the job manager's own bookkeeping.
func (j *JobManager) GetWorkerCounts() map[string]int {
	j.l.RLock()
	defer j.l.RUnlock()

	counts := make(map[string]int, len(j.workerCount))
	for queueID, n := range j.workerCount {
		counts[queueID] = n
	}
	return counts
}
// GetWorkQueueLengths returns a freshly built map of queue ID to the number of
// jobs currently in that queue. It takes the read lock while building it.
func (j *JobManager) GetWorkQueueLengths() map[string]int {
	j.l.RLock()
	defer j.l.RUnlock()

	lengths := make(map[string]int)
	for queueID, queue := range j.queues {
		lengths[queueID] = queue.Len()
	}
	return lengths
}
// getNextJob removes and returns the job at the front of the next eligible
// queue, together with that queue's ID. It returns (nil, "") when there are
// no queues or when getNextQueue reports that no queue can take a worker.
// A queue that becomes empty is pruned before returning.
func (j *JobManager) getNextJob() (Job, string) {
	j.l.Lock()
	defer j.l.Unlock()

	if len(j.queues) == 0 {
		return nil, ""
	}

	queueID, eligible := j.getNextQueue()
	if !eligible {
		return nil, ""
	}

	queue := j.queues[queueID]
	popped := queue.Remove(queue.Front())
	j.totalJobs--

	if j.metricSink != nil {
		j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(queue.Len()), []metrics.Label{{"queue_id", queueID}})
		j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
	}

	if queue.Len() == 0 {
		// the empty queue is dropped, but its worker count is left alone:
		// jobs taken from it earlier may still be running. that cleanup
		// happens in j.decrementWorkerCount
		j.removeLastQueueAccessed()
	}

	return popped.(Job), queueID
}
// getNextQueue picks the queue the next job should come from. Starting just
// after j.lastQueueAccessed, it checks each indexed queue once, in round-robin
// order, and returns the first one whose workers are not saturated along with
// true; j.lastQueueAccessed is updated to that queue's index. If every queue
// is saturated it returns ("", false) and leaves j.lastQueueAccessed alone.
// note: this must be called with j.l held
func (j *JobManager) getNextQueue() (string, bool) {
	idx := j.nextQueueIndex(j.lastQueueAccessed)

	// one pass over the index is enough to consider every queue
	for remaining := len(j.queuesIndex); remaining > 0; remaining-- {
		candidate := j.queuesIndex[idx]
		if !j.queueWorkersSaturated(candidate) {
			j.lastQueueAccessed = idx
			return candidate, true
		}
		idx = j.nextQueueIndex(idx)
	}

	return "", false
}
// nextQueueIndex returns the index that follows currentIdx in round-robin
// order over j.queuesIndex, wrapping back to 0 after the last entry. Passing
// -1 (no queue accessed yet) yields 0.
//
// When no queues are indexed it returns 0 instead of panicking with an
// integer divide by zero; 0 is not a valid index in that case, so callers
// must still check the length of j.queuesIndex before indexing with it.
// note: this must be called with j.l held
func (j *JobManager) nextQueueIndex(currentIdx int) int {
	numQueues := len(j.queuesIndex)
	if numQueues == 0 {
		return 0
	}
	return (currentIdx + 1) % numQueues
}
// queueWorkersSaturated reports whether queueID already has its full share of
// workers. The per-queue cap is ceil(0.9 * total workers / number of queues),
// and the queue is saturated once its worker count reaches that cap.
// note: this must be called with j.l held (at least for read).
// note: we may want to eventually factor in queue length relative to num queues
func (j *JobManager) queueWorkersSaturated(queueID string) bool {
	activeQueues := float64(len(j.queues))
	totalWorkers := float64(j.workerPool.numWorkers)

	perQueueCap := int(math.Ceil(0.9 * totalWorkers / activeQueues))

	return j.workerCount[queueID] >= perQueueCap
}
// incrementWorkerCount adds one to the worker count recorded for queueID,
// taking the write lock while it does so. A queue ID with no entry yet starts
// from zero.
func (j *JobManager) incrementWorkerCount(queueID string) {
	j.l.Lock()
	defer j.l.Unlock()

	active := j.workerCount[queueID]
	j.workerCount[queueID] = active + 1
}
// decrementWorkerCount subtracts one from the worker count recorded for
// queueID, taking the write lock while it does so. If the queue itself no
// longer exists and the count has dropped below one, the count entry is
// removed as well.
func (j *JobManager) decrementWorkerCount(queueID string) {
	j.l.Lock()
	defer j.l.Unlock()

	remaining := j.workerCount[queueID] - 1
	j.workerCount[queueID] = remaining

	// keep tracking while the queue is live or workers are still counted
	if _, live := j.queues[queueID]; live || remaining >= 1 {
		return
	}
	delete(j.workerCount, queueID)
}
// assignWork starts a goroutine that repeatedly pulls jobs with getNextJob and
// dispatches them to the worker pool until j.quit is closed. The goroutine is
// registered on j.wg and marks itself done right before it returns.
//
// Each round has two phases: first it dispatches jobs until getNextJob has
// nothing to hand out, then it sleeps until it is woken by j.newWork, by the
// ticker, or by j.quit.
func (j *JobManager) assignWork() {
	const pollInterval = 50 * time.Millisecond

	j.wg.Add(1)
	go func() {
		// a single reusable ticker avoids the memory leak of calling
		// time.After in a for - select pattern
		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()

		for {
			// phase 1: hand out work for as long as a job is available
			for {
				select {
				case <-j.quit:
					j.wg.Done()
					return
				case <-j.newWork:
					// work is already being processed, so just keep the
					// channel empty
				default:
				}

				job, queueID := j.getNextJob()
				if job == nil {
					break
				}

				// the two callbacks keep the per-queue worker count in step
				// with this job; the dispatcher decides when they run
				// (presumably around job execution - confirm in dispatcher)
				addWorker := func() {
					j.incrementWorkerCount(queueID)
				}
				dropWorker := func() {
					j.decrementWorkerCount(queueID)
				}
				j.workerPool.dispatch(job, addWorker, dropWorker)
			}

			// phase 2: wait for a reason to look at the queues again
			ticker.Reset(pollInterval)
			select {
			case <-j.quit:
				j.wg.Done()
				return
			case <-j.newWork:
				// an empty job manager has been given work
			case <-ticker.C:
				// periodic re-check: with fair-share worker distribution
				// there can be work waiting while no queue is currently
				// eligible for another worker
			}
		}
	}()
}
// addQueue makes sure queueID has both a job list and a worker-count entry.
// A missing queue is created empty and appended to the round-robin index; an
// existing queue is left untouched.
// note: this must be called with j.l held for write
func (j *JobManager) addQueue(queueID string) {
	// a queue can be pruned once it runs out of work while workers are still
	// busy with jobs that came from it, and those workers are still counted.
	// only start the count at zero when nothing is being tracked, so that
	// re-creating the queue doesn't wipe out a live count.
	if _, tracked := j.workerCount[queueID]; !tracked {
		j.workerCount[queueID] = 0
	}

	if _, exists := j.queues[queueID]; exists {
		return
	}
	j.queues[queueID] = list.New()
	j.queuesIndex = append(j.queuesIndex, queueID)
}
// removeLastQueueAccessed drops the queue at j.lastQueueAccessed from both
// j.queues and j.queuesIndex, then moves j.lastQueueAccessed back by one so
// that round-robin continues with the entry that followed the removed one.
// When the removed entry was at index 0, j.lastQueueAccessed wraps to the last
// remaining index (-1 if none remain).
// It is meant to be used when the last queue accessed has emptied. If
// j.lastQueueAccessed is -1 or past the end of the index, a warning is logged
// and nothing is removed.
// note: this must be called with j.l held.
func (j *JobManager) removeLastQueueAccessed() {
	idx := j.lastQueueAccessed
	if idx == -1 || idx >= len(j.queuesIndex) {
		j.logger.Warn("call to remove queue out of bounds", "idx", idx)
		return
	}

	// remove the queue
	delete(j.queues, j.queuesIndex[idx])

	// remove the index for the queue by shifting the tail left one slot
	copy(j.queuesIndex[idx:], j.queuesIndex[idx+1:])
	j.queuesIndex = j.queuesIndex[:len(j.queuesIndex)-1]

	// correct the last queue accessed for round robining
	if idx > 0 {
		j.lastQueueAccessed = idx - 1
		return
	}
	j.lastQueueAccessed = len(j.queuesIndex) - 1
}