// Package jobs provides JobManager, which registers named jobs and runs each
// of them repeatedly after a configured delay.
//
// Source file: jobManager.go
package jobs
import (
"sync"
"time"
"github.com/thoas/go-funk"
)
// jobDetail describes one registered job: its name, the delay between runs,
// and the Job implementation to execute.
type jobDetail struct {
	// Name identifies the job. RegisterJob skips registration when a job with
	// the same Name already exists, and runJob passes it as the id to the
	// run-log repository.
	Name string
	// Duration is how long runJob waits (via time.After) before each run
	// attempt.
	Duration time.Duration
	// Job is the unit of work; runJob calls its Run method.
	Job Job
}
// JobManager keeps the list of registered jobs and controls the goroutines
// that run them.
type JobManager struct {
	// JobDb gives access to the job run-log repository
	// (through GetJobRunLogRepository).
	JobDb JobDb
	// JobDetails holds the registered jobs in registration order.
	JobDetails []jobDetail
	// stopChan is closed by Stop to tell every runJob goroutine to exit.
	stopChan chan struct{}
	// mutex is held by RegisterJob, Start and Stop for their whole duration,
	// which guards JobDetails and serializes those calls. runJob does not
	// take it.
	mutex *sync.Mutex
	// wg counts runJob goroutines so that Stop can wait on them.
	wg *sync.WaitGroup
}
// NewJobManager returns a JobManager with no registered jobs.
//
// database is stored in the manager's JobDb as its Database field. The stop
// channel, mutex and wait group are freshly allocated, so each call yields an
// independent manager.
func NewJobManager(database string) *JobManager {
	manager := new(JobManager)

	manager.JobDb = JobDb{Database: database}
	// Keep an allocated, empty (non-nil) slice, exactly as a literal
	// make([]jobDetail, 0) field initializer would.
	manager.JobDetails = make([]jobDetail, 0)
	manager.stopChan = make(chan struct{})
	manager.mutex = new(sync.Mutex)
	manager.wg = new(sync.WaitGroup)

	return manager
}
// RegisterJob adds a job to the manager under the given name.
//
// name identifies the job; when a job with the same name is already
// registered the call returns without changing anything. duration is the
// delay runJob waits before each run attempt. job is the work to execute.
//
// Before the job is appended to JobDetails, a run-log record with status
// JobStatusScheduled is handed to the run-log repository's Save method.
// The whole call runs with the manager's mutex held.
func (j *JobManager) RegisterJob(name string, duration time.Duration, job Job) {
	j.mutex.Lock()
	defer j.mutex.Unlock()

	// funk.Find yields nil when no element satisfies the predicate, so a
	// non-nil result means the name is already taken.
	existing := funk.Find(j.JobDetails, func(d jobDetail) bool {
		return d.Name == name
	})
	if existing != nil {
		return
	}

	// Go layouts are spelled with the reference time
	// "Mon Jan 2 15:04:05 MST 2006". A pattern such as "dd-MMM-yyyy HH:mm:ss"
	// contains no layout tokens and would be emitted verbatim instead of a
	// timestamp; this is the equivalent day-month-year 24h layout.
	scheduledAt := time.Now().Format("02-Jan-2006 15:04:05")

	jobRunLogModel := &JobRunLogModel{
		Name:                  name,
		LastExecutionDuration: duration,
		LastExecutionDateTime: scheduledAt,
		LastExecutionStatus:   JobStatusScheduled,
		LastExecutionMessage:  "Job scheduled",
	}
	// NOTE(review): whatever Save returns is discarded here; confirm whether
	// it can report a failure that should be surfaced.
	j.JobDb.GetJobRunLogRepository().Save(jobRunLogModel)

	j.JobDetails = append(j.JobDetails, jobDetail{
		Name:     name,
		Duration: duration,
		Job:      job,
	})
}
// runJob is the worker loop for a single registered job. It is meant to run
// in its own goroutine and returns only after the manager's stop channel has
// been closed.
//
// Each iteration waits jd.Duration (a fresh time.After, so the delay is
// measured from the end of the previous iteration) and then, unless the
// run-log repository reports the job as already running, marks it running,
// calls jd.Job.Run and records the outcome. Every MarkJob / IsRunning call
// returns a channel that is received from before the loop continues.
//
// The method registers itself with the manager's WaitGroup on entry and
// releases it on return.
func (j *JobManager) runJob(jd jobDetail) {
	j.wg.Add(1)
	defer j.wg.Done()

	for {
		select {
		case <-j.stopChan:
			// Stop was requested: record it and leave the loop.
			<-j.JobDb.GetJobRunLogRepository().MarkJob(jd.Name, "Job stopped", JobStatusStopped)
			return

		case <-time.After(jd.Duration):
			alreadyRunning := <-j.JobDb.GetJobRunLogRepository().IsRunning(jd.Name)
			if alreadyRunning {
				// Skip this tick; try again after another full delay.
				continue
			}

			<-j.JobDb.GetJobRunLogRepository().MarkJob(jd.Name, "Job running", JobStatusRunning)

			if runErr := jd.Job.Run(); runErr != nil {
				<-j.JobDb.GetJobRunLogRepository().MarkJob(jd.Name, "Job failed: "+runErr.Error(), JobStatusFailed)
				continue
			}

			// Success: put the job back into the scheduled state.
			<-j.JobDb.GetJobRunLogRepository().MarkJob(jd.Name, "Job scheduled", JobStatusScheduled)
		}
	}
}
// Stop closes the manager's stop channel, which signals the job goroutines to
// exit, and then waits on the manager's WaitGroup. If the stop channel has
// already been closed, Stop returns immediately without waiting.
//
// The manager's mutex is held for the whole call.
func (j *JobManager) Stop() {
	j.mutex.Lock()
	defer j.mutex.Unlock()

	// Non-blocking probe: a receive on a closed channel succeeds at once,
	// while an open channel with no pending value falls through to default.
	alreadyClosed := false
	select {
	case <-j.stopChan:
		alreadyClosed = true
	default:
	}
	if alreadyClosed {
		return
	}

	close(j.stopChan)
	j.wg.Wait()
}
// Start launches one goroutine per registered job; each goroutine executes
// runJob for its job until the stop channel is closed.
//
// The WaitGroup is incremented here, before the goroutine is started, and
// released when runJob returns. Incrementing only from inside the new
// goroutine would let a Stop issued right after Start reach Wait while the
// counter is still zero, returning before the workers have shut down (and
// violating the sync.WaitGroup rule that Add from zero must happen before
// Wait).
//
// The manager's mutex is held while the goroutines are launched.
func (j *JobManager) Start() {
	j.mutex.Lock()
	defer j.mutex.Unlock()

	for _, jd := range j.JobDetails {
		jd := jd // per-iteration copy; closures share the loop variable before Go 1.22
		j.wg.Add(1)
		go func() {
			defer j.wg.Done()
			j.runJob(jd)
		}()
	}
}