-
Notifications
You must be signed in to change notification settings - Fork 15
/
statemonitor.go
92 lines (80 loc) · 1.67 KB
/
statemonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package datasetworker
import (
"context"
"sync"
"time"
"github.com/data-preservation-programs/singularity/model"
"gorm.io/gorm"
)
const jobCheckInterval = 5 * time.Second
func NewStateMonitor(db *gorm.DB) *StateMonitor {
return &StateMonitor{
db: db,
jobs: make(map[model.JobID]context.CancelFunc),
done: make(chan struct{}),
}
}
type StateMonitor struct {
db *gorm.DB
jobs map[model.JobID]context.CancelFunc
mu sync.Mutex
done chan struct{}
}
func (s *StateMonitor) AddJob(jobID model.JobID, cancel context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.jobs[jobID] = cancel
}
func (s *StateMonitor) RemoveJob(jobID model.JobID) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.jobs, jobID)
}
func (s *StateMonitor) Start(ctx context.Context) {
db := s.db.WithContext(ctx)
go func() {
defer close(s.done)
var timer *time.Timer
for {
var i int
s.mu.Lock()
jobIDs := make([]model.JobID, len(s.jobs))
for jobID := range s.jobs {
jobIDs[i] = jobID
i++
}
s.mu.Unlock()
var jobs []model.Job
if len(jobIDs) > 0 {
err := db.Where("state = ?", model.Paused).Find(&jobs, jobIDs).Error
if err != nil {
logger.Errorf("failed to fetch paused jobs: %v", err)
}
}
s.mu.Lock()
for _, job := range jobs {
jobID := job.ID
cancel, ok := s.jobs[jobID]
if ok {
cancel()
delete(s.jobs, jobID)
}
}
s.mu.Unlock()
if timer == nil {
timer = time.NewTimer(jobCheckInterval)
defer timer.Stop()
} else {
timer.Reset(jobCheckInterval)
}
select {
case <-ctx.Done():
return
case <-timer.C:
}
}
}()
}
func (s *StateMonitor) Done() <-chan struct{} {
return s.done
}