/
requeuer.go
116 lines (94 loc) · 2.66 KB
/
requeuer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package work
import (
"errors"
"fmt"
"time"
"github.com/Daskott/kronus/colors"
"github.com/Daskott/kronus/server/models"
"gorm.io/gorm"
)
// requeuer pulls jobs from a source queue and puts them back into the
// enqueued state (see requeue).
type requeuer struct {
	// fromQueue is the name of the queue jobs are fetched from.
	fromQueue string
	// stopChan carries the signal that tells loop to exit (sent by stop).
	stopChan chan struct{}
}
// supportedQueues is the set of queue names newRequeuer accepts as a
// source queue.
var supportedQueues = map[string]bool{
	models.IN_PROGRESS_JOB: true,
	models.SCHEDULED_JOB:   true,
}
// newRequeuer creates a requeuer that pulls jobs from fromQueue.
//
// It returns an error when fromQueue is not marked as supported in
// supportedQueues. The returned requeuer is idle until start is called.
func newRequeuer(fromQueue string) (*requeuer, error) {
	supported := supportedQueues[fromQueue]
	if !supported {
		return nil, fmt.Errorf("%v is not a supported queue, must be in %v", fromQueue, supportedQueues)
	}

	rq := new(requeuer)
	rq.fromQueue = fromQueue
	rq.stopChan = make(chan struct{})
	return rq, nil
}
// start launches the requeuer loop on its own goroutine and returns
// immediately. The loop keeps fetching jobs from r.fromQueue and
// requeuing them until stop is called.
func (r *requeuer) start() {
	go r.loop()
}
// stop signals the requeuer loop to exit.
//
// stopChan is created unbuffered by newRequeuer, so this send blocks
// until loop receives the signal.
func (r *requeuer) stop() {
	r.stopChan <- struct{}{}
}
// loop is the requeuer's polling loop. On every tick it fetches the next
// job via nextJob and requeues it, and it returns once a value is received
// on r.stopChan.
//
// The ticker interval is adjusted after each tick:
//   - no job found (gorm.ErrRecordNotFound): idle back-off of 5 seconds
//   - any other error: TickerDurationOnError
//   - job requeued: DefaultTickerDuration
func (r *requeuer) loop() {
	// At some point we may need an exponential back-off,
	// but for now keep it simple.
	const idleBackOffSeconds = 5

	ticker := time.NewTicker(DefaultTickerDuration)
	defer ticker.Stop()

	logg.Infof("Starting %s job requeuer", r.fromQueue)

	for {
		select {
		case <-r.stopChan:
			logg.Infof("Stopping %s job requeuer", r.fromQueue)
			return

		case <-ticker.C:
			next, err := r.nextJob()

			switch {
			case errors.Is(err, gorm.ErrRecordNotFound):
				// Nothing to requeue right now: slow the polling down.
				ticker.Reset(idleBackOffSeconds * time.Second)

			case err != nil:
				r.logError(err)
				ticker.Reset(TickerDurationOnError)

			default:
				r.logInfof("fetched job with id=%v, status_id=%v, job.claimed=%v",
					next.ID, next.JobStatusID, next.Claimed)
				r.requeue(next)
				ticker.Reset(DefaultTickerDuration)
			}
		}
	}
}
// nextJob fetches the next job to requeue from r.fromQueue.
//
// For the in-progress queue it delegates to models.LastJobLastUpdated with
// a threshold of 10 (NOTE(review): presumably minutes since the last
// update — confirm against the models package). For any other queue it
// delegates to models.FirstScheduledJobToBeQueued.
func (r *requeuer) nextJob() (*models.Job, error) {
	switch r.fromQueue {
	case models.IN_PROGRESS_JOB:
		return models.LastJobLastUpdated(10, models.IN_PROGRESS_JOB)
	default:
		return models.FirstScheduledJobToBeQueued()
	}
}
// requeue puts job back into the enqueued state: it clears the claimed
// flag, sets the job status to the ENQUEUED_JOB status and stamps
// enqueued_at with the current time.
//
// Failures are logged and not returned. If the enqueued status cannot be
// looked up or the update fails, the job is left untouched and no success
// message is logged.
func (r *requeuer) requeue(job *models.Job) {
	jobStatus, err := models.FindJobStatus(models.ENQUEUED_JOB)
	if err != nil {
		logg.Error(err)
		return
	}

	update := map[string]interface{}{
		"claimed":       false,
		"job_status_id": jobStatus.ID,
		"enqueued_at":   time.Now(),
	}

	if err := job.Update(update); err != nil {
		r.logError(err)
		// Bail out so a failed update is not reported as a successful requeue.
		return
	}

	r.logInfof("job with id=%v requeued", job.ID)
}
// logInfof logs a formatted info message prefixed with a yellow
// "[<queue> job requeuer] " tag identifying this requeuer.
func (r *requeuer) logInfof(template string, args ...interface{}) {
	tag := fmt.Sprintf("[%s job requeuer] ", r.fromQueue)
	logg.Infof(colors.Yellow(tag)+template, args...)
}
// logError logs args at error level, prefixed with a red
// "[<queue> job requeuer] " tag identifying this requeuer.
//
// The prefix is passed as an argument rather than used as the format
// string: it contains no verbs, so using it as the format would render
// args as "%!(EXTRA ...)" noise and misread any '%' in the prefix.
func (r *requeuer) logError(args ...interface{}) {
	prefix := colors.Red(fmt.Sprintf("[%s job requeuer] ", r.fromQueue))
	logg.Errorf("%s%s", prefix, fmt.Sprint(args...))
}