/
schedule_watcher.go
216 lines (197 loc) · 7.04 KB
/
schedule_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package cdule
import (
"encoding/json"
"reflect"
"sort"
"sync"
"time"
"github.com/deepaksinghvi/cdule/pkg"
"github.com/deepaksinghvi/cdule/pkg/model"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"gorm.io/gorm"
)
// ScheduleWatcher drives the periodic scheduling loop: Run evaluates due
// schedules on every Ticker tick, Closed signals the loop to exit, and WG
// lets Stop block until outstanding work has drained.
type ScheduleWatcher struct {
	// Closed is closed by Stop to terminate the Run loop.
	Closed chan struct{}
	// WG is waited on by Stop; NOTE(review): Run itself never calls
	// WG.Add/Done, so the Add is presumably done by the caller — confirm.
	WG sync.WaitGroup
	// Ticker paces how often due schedules are evaluated.
	Ticker *time.Ticker
}
// lastScheduleExecutionTime and nextScheduleExecutionTime bound the current
// evaluation window in UnixNano; both are rewritten by Run on every tick.
var lastScheduleExecutionTime int64
var nextScheduleExecutionTime int64
// Run loops until Closed is closed. On every tick it computes a one-minute
// lookback window ending at the current instant and executes all schedules
// that fall inside it.
func (t *ScheduleWatcher) Run() {
	for {
		select {
		case <-t.Ticker.C:
			current := time.Now()
			lastScheduleExecutionTime = current.Add(-1 * time.Minute).UnixNano()
			nextScheduleExecutionTime = current.UnixNano()
			log.Infof("lastScheduleExecutionTime %d, nextScheduleExecutionTime %d", lastScheduleExecutionTime, nextScheduleExecutionTime)
			runNextScheduleJobs(lastScheduleExecutionTime, nextScheduleExecutionTime)
		case <-t.Closed:
			return
		}
	}
}
// Stop terminates the watcher: closing Closed makes Run return, then the
// WaitGroup is drained so any in-flight work finishes before Stop returns.
// Calling Stop more than once panics (double close of Closed).
func (t *ScheduleWatcher) Stop() {
	close(t.Closed)
	t.WG.Wait()
}
// runNextScheduleJobs executes every schedule for this worker whose
// execution time falls inside the [scheduleStart, scheduleEnd] window
// (UnixNano timestamps), records a JobHistory row per run, and creates the
// next occurrence of each job from its cron expression. Any panic raised
// while processing the window is absorbed by panicRecoveryForSchedule.
func runNextScheduleJobs(scheduleStart, scheduleEnd int64) {
	defer panicRecoveryForSchedule()
	schedules, err := model.CduleRepos.CduleRepository.GetScheduleBetween(scheduleStart, scheduleEnd, WorkerID)
	if nil != err {
		log.Error(err)
		return
	}
	workers, err := model.CduleRepos.CduleRepository.GetWorkers()
	if nil != err {
		log.Error(err)
		return
	}
	// The parser configuration is identical for every job, so build it once
	// instead of once per schedule iteration.
	cronParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	for _, schedule := range schedules {
		scheduledJob, err := model.CduleRepos.CduleRepository.GetJob(schedule.JobID)
		if nil != err {
			log.Errorf("Error while running Schedule for %d : %s", schedule.JobID, err.Error())
			continue
		}
		log.Info("====START====")
		// FIX: log message typo "Exeuction" corrected.
		log.Infof("Schedule for JobName: %s, Execution Time %d at Worker %s", scheduledJob.JobName, schedule.ExecutionID, schedule.WorkerID)
		jobDataStr := schedule.JobData
		var jobDataMap map[string]string
		if pkg.EMPTYSTRING != jobDataStr {
			err = json.Unmarshal([]byte(jobDataStr), &jobDataMap)
			if nil != err {
				log.Error(err)
				continue
			}
		}
		var jobHistory *model.JobHistory
		// err is always nil here (both failure paths above continue); the
		// guard is kept to preserve the original control flow.
		if err == nil {
			jobHistory, err = model.CduleRepos.CduleRepository.GetJobHistoryForSchedule(schedule.ExecutionID)
			j := JobRegistry[scheduledJob.JobName]
			jobInstance := reflect.New(j).Elem().Interface()
			if err != nil && err.Error() == "record not found" && jobHistory != nil {
				// History row exists but the job never ran: resume it.
				// NOTE(review): with a "record not found" error jobHistory is
				// normally nil, which would make this branch unreachable —
				// confirm against the repository implementation.
				if jobHistory.Status == model.JobStatusNew {
					jobHistory.Status = model.JobStatusInProgress
					model.CduleRepos.CduleRepository.UpdateJobHistory(jobHistory)
					jobInstance.(Job).Execute(jobDataMap)
					jobDataMap = jobInstance.(Job).GetJobData()
				}
			} else {
				// No history for this schedule yet: create one, mark it
				// in-progress, and execute the job.
				jobHistory = &model.JobHistory{
					JobID:       schedule.JobID,
					ExecutionID: schedule.ExecutionID,
					DeletedAt:   gorm.DeletedAt{},
					Status:      model.JobStatusNew,
					WorkerID:    schedule.WorkerID,
					RetryCount:  0,
				}
				model.CduleRepos.CduleRepository.CreateJobHistory(jobHistory)
				jobHistory.Status = model.JobStatusInProgress
				model.CduleRepos.CduleRepository.UpdateJobHistory(jobHistory)
				jobDataMap = executeJob(jobInstance, jobHistory, &jobDataMap)
				log.Infof("Job Execution Completed For JobName: %s JobID: %d on Worker: %s", scheduledJob.JobName, schedule.JobID, schedule.WorkerID)
				log.Info("====END====\n")
			}
			// Calculate and persist the next schedule for the current job.
			jobDataBytes, err := json.Marshal(jobDataMap)
			if nil != err {
				log.Errorf("Error %s for JobName %s and Schedule ID %d ", err.Error(), scheduledJob.JobName, schedule.ExecutionID)
			}
			if string(jobDataBytes) != pkg.EMPTYSTRING {
				jobDataStr = string(jobDataBytes)
			}
			cronSchedule, err := cronParser.Parse(scheduledJob.CronExpression)
			if err != nil {
				// FIX: was `return`, which aborted scheduling of all
				// remaining due jobs when a single cron expression failed
				// to parse; skip just this job instead.
				log.Error(err.Error())
				continue
			}
			nextRunTime := cronSchedule.Next(time.Now()).UnixNano()
			workerIDForNextRun, _ := findNextAvailableWorker(workers, schedule)
			newSchedule := model.Schedule{
				ExecutionID: nextRunTime,
				CreatedAt:   time.Now(),
				UpdatedAt:   time.Now(),
				DeletedAt:   gorm.DeletedAt{},
				WorkerID:    workerIDForNextRun,
				JobID:       schedule.JobID,
				JobData:     jobDataStr,
			}
			model.CduleRepos.CduleRepository.CreateSchedule(&newSchedule)
			log.Infof("*** Next Job Scheduled Info ***\n JobName: %s,\n Schedule Cron: %s,\n Job Scheduled Time: %d,\n Worker: %s ",
				scheduledJob.JobName, scheduledJob.CronExpression, newSchedule.ExecutionID, newSchedule.WorkerID)
		}
	}
	log.Infof("Schedules Completed For StartTime %d To EndTime %d", scheduleStart, scheduleEnd)
}
// WorkerJobCount pairs a worker with the number of job_histories rows it
// holds for a given job; used by findNextAvailableWorker to pick the
// least-loaded worker for the next run.
type WorkerJobCount struct {
	WorkerID string `json:"worker_id"`
	Count    int64  `json:"count"`
}
// findNextAvailableWorker selects the worker that has executed the
// schedule's job the fewest times, so repeated runs spread across workers.
// Workers with no history for the job are treated as count 0. When no
// metrics are available (or the query fails) it falls back to the
// schedule's current worker. The error return is always nil; the signature
// is kept for callers.
func findNextAvailableWorker(workers []model.Worker, schedule model.Schedule) (string, error) {
	workerName := schedule.WorkerID
	var workerJobCountMetrics []WorkerJobCount
	result := model.DB.Raw("SELECT worker_id, count(1) FROM job_histories WHERE job_id = ? group by worker_id", schedule.JobID).Scan(&workerJobCountMetrics)
	if result.Error != nil {
		// FIX: the query error was silently discarded; log it and fall back
		// to the schedule's current worker (same outcome as the original's
		// empty-metrics path).
		log.Error(result.Error)
		return workerName, nil
	}
	log.Infof("workerJobCountMetrics %v", workerJobCountMetrics)
	if len(workerJobCountMetrics) == 0 {
		log.Infof("workerName %s would be used", workerName)
		return workerName, nil
	}
	// Give every worker with no history a zero-count entry so it can win
	// the sort below.
	for _, worker := range workers {
		seen := false
		for _, v := range workerJobCountMetrics {
			if v.WorkerID == worker.WorkerID {
				seen = true
				break
			}
		}
		if !seen {
			workerJobCountMetrics = append(workerJobCountMetrics, WorkerJobCount{
				WorkerID: worker.WorkerID,
				Count:    0,
			})
		}
	}
	// FIX: dropped the redundant full-slice expression ([:]) — sort.Slice
	// takes the slice value directly.
	sort.Slice(workerJobCountMetrics, func(i, j int) bool {
		return workerJobCountMetrics[i].Count < workerJobCountMetrics[j].Count
	})
	return workerJobCountMetrics[0].WorkerID, nil
}
/*
For go 1.17 following method can be used.
func executeJob(jobInstance interface{}, jobHistory *model.JobHistory, jobDataMap map[string]string) {
defer panicRecovery(jobHistory)
jobInstance.(job.Job).Execute(jobDataMap)
}
*/
/*
cdule library has been built and developed using go 1.18 (go1.18beta2), if you need to use it for 1.17
then build from source by uncommenting the above method and comment the following
*/
// executeJob runs the given job instance with the supplied job data and
// returns the data the instance reports afterwards. The deferred
// panicRecovery finalizes jobHistory (COMPLETED or FAILED) even if the
// job panics. jobInstance must implement Job; a failed assertion panics
// and is likewise absorbed by panicRecovery.
func executeJob(jobInstance any, jobHistory *model.JobHistory, jobDataMap *map[string]string) map[string]string {
	defer panicRecovery(jobHistory)
	runnable := jobInstance.(Job)
	runnable.Execute(*jobDataMap)
	return runnable.GetJobData()
}
// panicRecovery finalizes a job's history row after execution: COMPLETED on
// a normal return, FAILED when the job panicked. It must be deferred by the
// function that runs the job for recover() to take effect.
// TODO should be handled for any panic and set the status as FAILED for job history with error message
func panicRecovery(jobHistory *model.JobHistory) {
	status := model.JobStatusCompleted
	if r := recover(); r != nil {
		log.Warning("Recovered in panicRecovery for job execution ", r)
		status = model.JobStatusFailed
	}
	jobHistory.Status = status
	model.CduleRepos.CduleRepository.UpdateJobHistory(jobHistory)
}
// panicRecoveryForSchedule absorbs any panic raised while processing a
// schedule window so the watcher loop keeps ticking. It must be deferred
// by runNextScheduleJobs for recover() to take effect.
func panicRecoveryForSchedule() {
	r := recover()
	if r != nil {
		log.Warning("Recovered in runNextScheduleJobs ", r)
	}
}