-
Notifications
You must be signed in to change notification settings - Fork 25
/
job-tracker.go
213 lines (163 loc) · 4.31 KB
/
job-tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package jobs
import (
"context"
"errors"
"sync"
"time"
"google.golang.org/protobuf/proto"
statusv1 "github.com/fluxninja/aperture/v2/api/gen/proto/go/aperture/status/v1"
"github.com/fluxninja/aperture/v2/pkg/status"
)
// errInvalidJob is returned when a job cannot be used as given: its name is
// empty, or no job is currently tracked under the requested name.
var errInvalidJob = errors.New("job is nil or invalid name is provided")

// errExistingJob is returned on a name collision: a job (possibly a different
// instance) is already tracked under the same name.
var errExistingJob = errors.New("job with same name already exists")
// JobInfo contains information such as run count, last run time, etc. for a Job.
// groupTracker.updateJobInfo sets these fields just before the job is executed.
type JobInfo struct {
	// LastExecuteTime is the start time of the most recent execution.
	LastExecuteTime time.Time
	// ExecuteCount is the number of executions started so far; it is
	// incremented before the job actually runs.
	ExecuteCount int
}
// jobTracker is the per-job bookkeeping kept by groupTracker: the job itself,
// the status registry its results are written to, and its execution info.
type jobTracker struct {
	job Job
	// statusRegistry is the child registry created in newJobTracker; it is
	// detached when the job is deregistered or the group is reset.
	statusRegistry status.Registry
	// jobInfo is updated by groupTracker.updateJobInfo on every execution.
	jobInfo JobInfo
}
// newJobTracker creates the bookkeeping for job. The job's status is kept in
// the child registry returned by statusRegistry.Child("job-tracker", job.Name()).
func newJobTracker(job Job, statusRegistry status.Registry) *jobTracker {
	t := new(jobTracker)
	t.job = job
	t.statusRegistry = statusRegistry.Child("job-tracker", job.Name())
	return t
}
// groupTracker tracks a set of jobs keyed by name. It keeps one jobTracker per
// registered job, forwards job lifecycle events (registered, deregistered,
// scheduled, completed) to groupWatchers, and creates each job's status
// registry as a child of statusRegistry.
type groupTracker struct {
	// mu guards trackers and the per-job state reached through it.
	mu       sync.Mutex
	trackers map[string]*jobTracker
	// statusRegistry is the parent registry for every job's child registry.
	statusRegistry status.Registry
	groupWatchers  GroupWatchers
}
// newGroupTracker returns a groupTracker with no jobs registered. Lifecycle
// events are forwarded to gws, and per-job status registries are created as
// children of statusRegistry.
func newGroupTracker(gws GroupWatchers, statusRegistry status.Registry) *groupTracker {
	return &groupTracker{
		groupWatchers:  gws,
		statusRegistry: statusRegistry,
		trackers:       map[string]*jobTracker{},
	}
}
// updateStatus stores s as the current status of job.
//
// It returns errInvalidJob when no job with job's name is tracked any more. It
// returns errExistingJob when the tracked job with that name is a different
// instance, i.e. job was replaced; the replacement's status is left untouched.
func (gt *groupTracker) updateStatus(job Job, s *statusv1.Status) error {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	switch tracker, found := gt.trackers[job.Name()]; {
	case !found:
		return errInvalidJob
	case tracker.job != job:
		// Same name, different job: don't clobber the newer job's status.
		return errExistingJob
	default:
		tracker.statusRegistry.SetStatus(s)
		return nil
	}
}
// registerJob starts tracking job under its name and notifies the group
// watchers via OnJobRegistered.
//
// It returns errInvalidJob for a nil job or an empty name, and errExistingJob
// when a job with the same name is already tracked.
func (gt *groupTracker) registerJob(job Job) error {
	// A nil interface would panic on job.Name(); errInvalidJob already
	// advertises the nil case, so reject it explicitly.
	if job == nil {
		return errInvalidJob
	}
	name := job.Name()
	if name == "" {
		return errInvalidJob
	}

	gt.mu.Lock()
	defer gt.mu.Unlock()

	if _, exists := gt.trackers[name]; exists {
		return errExistingJob
	}
	gt.trackers[name] = newJobTracker(job, gt.statusRegistry)
	gt.groupWatchers.OnJobRegistered(name)
	return nil
}
// deregisterJob removes the job tracked under name and returns it. Before
// returning, it notifies the group watchers via OnJobDeregistered and detaches
// the job's status registry. It returns errInvalidJob if name is not tracked.
func (gt *groupTracker) deregisterJob(name string) (Job, error) {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	if tracker, found := gt.trackers[name]; found {
		delete(gt.trackers, name)
		gt.groupWatchers.OnJobDeregistered(name)
		tracker.statusRegistry.Detach()
		return tracker.job, nil
	}
	return nil, errInvalidJob
}
// reset stops tracking every job. For each job it notifies the group watchers
// via OnJobDeregistered and detaches the job's status registry. It returns the
// removed jobs in map-iteration (i.e. unspecified) order; the slice is empty,
// not nil, when nothing was tracked.
func (gt *groupTracker) reset() []Job {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	removed := make([]Job, 0, len(gt.trackers))
	for _, t := range gt.trackers {
		removed = append(removed, t.job)
		gt.groupWatchers.OnJobDeregistered(t.job.Name())
		t.statusRegistry.Detach()
	}
	gt.trackers = make(map[string]*jobTracker)
	return removed
}
// isHealthy reports whether none of the tracked jobs currently has an error
// set in its status.
func (gt *groupTracker) isHealthy() bool {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	healthy := true
	for _, t := range gt.trackers {
		if t.statusRegistry.GetStatus().GetError() != nil {
			healthy = false
			break
		}
	}
	return healthy
}
// results returns the group status reported by the group's status registry,
// plus a flag that is true when that registry reports no error.
func (gt *groupTracker) results() (*statusv1.GroupStatus, bool) {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	groupStatus := gt.statusRegistry.GetGroupStatus()
	ok := !gt.statusRegistry.HasError()
	return groupStatus, ok
}
// getJobs returns a snapshot of all currently tracked jobs in unspecified
// order. The slice is empty, not nil, when nothing is tracked.
func (gt *groupTracker) getJobs() []Job {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	snapshot := make([]Job, 0, len(gt.trackers))
	for _, t := range gt.trackers {
		snapshot = append(snapshot, t.job)
	}
	return snapshot
}
// updateJobInfo records that job started at startTime. It sets the tracker's
// LastExecuteTime and increments its ExecuteCount.
//
// Like updateStatus, it returns errInvalidJob when no job with job's name is
// tracked. It returns errExistingJob when a different job instance is now
// registered under that name, so a stale job never bumps the bookkeeping of
// its replacement.
func (gt *groupTracker) updateJobInfo(job Job, startTime time.Time) error {
	gt.mu.Lock()
	defer gt.mu.Unlock()

	tracker, ok := gt.trackers[job.Name()]
	if !ok {
		return errInvalidJob
	}
	if tracker.job != job {
		return errExistingJob
	}
	tracker.jobInfo.LastExecuteTime = startTime
	tracker.jobInfo.ExecuteCount++
	return nil
}
// execute runs job once on behalf of the group, in these steps:
//   - notify the group and job watchers that the job is scheduled;
//   - record the start in the job's JobInfo;
//   - run job.Execute(ctx);
//   - store the outcome (details and/or error) in the job's status registry;
//   - notify the watchers of completion with the elapsed duration.
//
// It returns whatever job.Execute returned. If the job is no longer tracked
// (or was replaced) before it starts, or is removed while it runs, the tracker
// error is returned instead and no completion is reported.
func (gt *groupTracker) execute(ctx context.Context, job Job) (proto.Message, error) {
	gt.groupWatchers.OnJobScheduled(job.Name())
	job.JobWatchers().OnJobScheduled()

	startTime := time.Now()
	if err := gt.updateJobInfo(job, startTime); err != nil {
		return nil, err
	}

	// Do not return early on failure: the execution error must be recorded in
	// the job's status so health checks and watchers can observe it.
	details, execErr := job.Execute(ctx)
	duration := time.Since(startTime)

	s := status.NewStatus(details, execErr)
	if err := gt.updateStatus(job, s); err != nil {
		gt.statusRegistry.GetLogger().Error().Err(err).Str("job", job.Name()).Msg("Recently completed job has been removed from tracker and is not reporting results")
		return nil, err
	}

	jobStats := JobStats{Duration: duration}
	job.JobWatchers().OnJobCompleted(s, jobStats)
	gt.groupWatchers.OnJobCompleted(job.Name(), s, jobStats)
	return details, execErr
}