/
job_context.go
109 lines (93 loc) · 2.86 KB
/
job_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package utils
import (
"context"
"sync"
"time"
)
// JobContext is a synchronization object that combines the roles of a
// context.Context and a sync.WaitGroup.
//
// The expected lifecycle is:
//   - The caller calls NewJob with a parent context and the number of workers expected.
//   - The caller launches that number of workers, passing the JobContext to each.
//   - Workers check for termination by using the JobContext as a context.Context.
//   - Workers can cancel the overall job by calling Cancel.
//   - Workers must call WorkerDone when they complete, like sync.WaitGroup.Done.
//   - The caller, or other goroutines, can call Wait to wait for job completion.
//
// A single JobContext can only run one job at a time. If NewJob is called while
// a job is already running, NewJob either returns false without touching that
// job, or cancels it and waits for it before starting the new one, depending on
// its returnIfRunning argument.
type JobContext struct {
	// ctx is the context created for the most recently started job and cancel
	// is its cancel function. Both are nil until NewJob has been called.
	ctx    context.Context
	cancel context.CancelFunc

	// wg counts the workers of the most recently started job that have not yet
	// called WorkerDone. NewJob installs a fresh WaitGroup for every job.
	wg *sync.WaitGroup

	// running is true while a job is in progress. It is read and written only
	// while holding runningLock, which NewJob allocates on first use.
	running     bool
	runningLock *sync.Mutex
}
// NewJob starts a new job that expects the given number of workers and reports
// whether the job was started.
//
// If a previous job is still running, the outcome depends on returnIfRunning:
// when it is true, NewJob leaves that job alone and returns false; when it is
// false, NewJob cancels that job and blocks until its workers have finished
// before starting the new one. The new job's context is derived from ctx.
func (mw *JobContext) NewJob(ctx context.Context, workers int, returnIfRunning bool) bool {
	// The mutex is allocated on first use so that a zero-value JobContext works.
	if mw.runningLock == nil {
		mw.runningLock = new(sync.Mutex)
	}

	mw.runningLock.Lock()
	if mw.running && returnIfRunning {
		mw.runningLock.Unlock()
		return false
	}
	// Cancel the previous job and wait for its workers. The lock is released
	// while waiting so the watcher goroutine of that job can clear running;
	// the flag is re-checked because it is only cleared after Wait returns.
	for mw.running {
		mw.cancel()
		mw.runningLock.Unlock()
		mw.Wait()
		mw.runningLock.Lock()
	}

	mw.running = true
	mw.ctx, mw.cancel = context.WithCancel(ctx)
	mw.wg = new(sync.WaitGroup)
	mw.wg.Add(workers)
	mw.runningLock.Unlock()

	// Watcher: once every worker has called WorkerDone, mark the job as
	// finished and cancel its context.
	go func() {
		mw.wg.Wait()
		mw.runningLock.Lock()
		defer mw.runningLock.Unlock()
		mw.running = false
		mw.cancel()
	}()

	return true
}
// WorkerDone records that one worker of the current job has finished. It is the
// JobContext counterpart of sync.WaitGroup.Done and must be called by every
// worker when it completes.
func (mw *JobContext) WorkerDone() {
	group := mw.wg
	group.Done()
}
// Wait blocks until every worker of the current job has called WorkerDone,
// like sync.WaitGroup.Wait. It returns immediately if no job has ever been
// started.
func (mw *JobContext) Wait() {
	group := mw.wg
	if group == nil {
		return
	}
	group.Wait()
}
// Done implements Context.Done() by returning the Done channel of the current
// job's context.
//
// If no job has been started there is no context yet, so Done returns a nil
// channel (which never becomes ready) instead of panicking, in line with
// Cancel and Wait, which also tolerate that state.
func (mw *JobContext) Done() <-chan struct{} {
	if mw.ctx == nil {
		return nil
	}
	return mw.ctx.Done()
}
// Err implements Context.Err() by returning the error of the current job's
// context.
//
// If no job has been started there is no context yet, so Err returns nil
// instead of panicking, in line with Cancel and Wait, which also tolerate that
// state.
func (mw *JobContext) Err() error {
	if mw.ctx == nil {
		return nil
	}
	return mw.ctx.Err()
}
// Deadline implements Context.Deadline() by returning the deadline of the
// current job's context.
//
// If no job has been started there is no context yet, so Deadline reports that
// no deadline is set (zero time, ok == false) instead of panicking.
func (mw *JobContext) Deadline() (deadline time.Time, ok bool) {
	// The result is named deadline rather than time so that it does not shadow
	// the time package inside the body.
	if mw.ctx == nil {
		return time.Time{}, false
	}
	return mw.ctx.Deadline()
}
// Value implements Context.Value() by looking key up in the current job's
// context.
//
// If no job has been started there is no context yet, so Value returns nil
// instead of panicking.
func (mw *JobContext) Value(key interface{}) interface{} {
	if mw.ctx == nil {
		return nil
	}
	return mw.ctx.Value(key)
}
// Cancel cancels the context of the current job. It is a no-op if no job has
// ever been started.
func (mw *JobContext) Cancel() {
	stop := mw.cancel
	if stop == nil {
		return
	}
	stop()
}
// Running reports whether a job is currently running.
//
// The mutex guarding the running flag is only created by the first call to
// NewJob, so a nil mutex means no job has ever been started; Running then
// returns false instead of dereferencing the nil mutex.
func (mw *JobContext) Running() bool {
	if mw.runningLock == nil {
		return false
	}
	mw.runningLock.Lock()
	defer mw.runningLock.Unlock()
	return mw.running
}