forked from sensu/sensu-go
/
interval_scheduler.go
130 lines (110 loc) · 3.3 KB
/
interval_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package schedulerd
import (
"context"
corev2 "github.com/sensu/sensu-go/api/core/v2"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/store"
"github.com/sensu/sensu-go/types"
"github.com/sirupsen/logrus"
)
// IntervalScheduler schedules a single check to be executed on a timer whose
// period is taken from the check's Interval field.
type IntervalScheduler struct {
// lastIntervalState is the check interval most recently observed; it is
// compared against the current check to detect a schedule change.
lastIntervalState uint32
// check is the configuration currently being scheduled. The run loop
// replaces it whenever a revised configuration arrives via Interrupt.
check *types.CheckConfig
// store and bus are handed to the CheckExecutor built by the run loop.
store store.Store
bus messaging.MessageBus
// logger is a log entry carrying the check name, namespace and scheduler
// type as fields.
logger *logrus.Entry
// ctx is derived from the context given to NewIntervalScheduler and is
// cancelled by Stop via cancel.
ctx context.Context
cancel context.CancelFunc
// interrupt is an unbuffered channel that delivers revised check
// configurations to the run loop.
interrupt chan *corev2.CheckConfig
// entityCache is handed to the CheckExecutor built by the run loop.
entityCache *EntityCache
}
// NewIntervalScheduler builds an IntervalScheduler for the given check. The
// scheduler is not running when returned; call Start to launch it.
//
// The scheduler's context is a cancellable child of ctx that is additionally
// passed through types.SetContextFromResource with the check. Its logger is
// tagged with the check's name, namespace and the interval scheduler type.
func NewIntervalScheduler(ctx context.Context, store store.Store, bus messaging.MessageBus, check *types.CheckConfig, cache *EntityCache) *IntervalScheduler {
	entry := logger.WithFields(logrus.Fields{
		"name":           check.Name,
		"namespace":      check.Namespace,
		"scheduler_type": IntervalType.String(),
	})

	// Stop cancels this child context; the parent is left untouched.
	childCtx, cancel := context.WithCancel(ctx)

	return &IntervalScheduler{
		lastIntervalState: check.Interval,
		check:             check,
		store:             store,
		bus:               bus,
		logger:            entry,
		ctx:               types.SetContextFromResource(childCtx, check),
		cancel:            cancel,
		interrupt:         make(chan *corev2.CheckConfig),
		entityCache:       cache,
	}
}
// schedule performs one scheduling pass for the check: if the check is
// subdued nothing is executed, otherwise the check is handed to the executor.
// In both cases the timer is reset afterwards so the next tick is armed.
//
// Execution errors are logged, not returned.
func (s *IntervalScheduler) schedule(timer CheckTimer, executor *CheckExecutor) {
	// Deferred so the timer is re-armed on every exit path, including the
	// subdued early return.
	defer s.resetTimer(timer)

	if s.check.IsSubdued() {
		s.logger.Debug("check is subdued")
		return
	}
	s.logger.Debug("check is not subdued")

	if err := executor.processCheck(s.ctx, s.check); err != nil {
		// Log through the scheduler's own entry so the error carries the
		// check name, namespace and scheduler type like every other message.
		s.logger.WithError(err).Error("error executing check")
	}
}
// Start starts the IntervalScheduler by launching its run loop in a new
// goroutine; it returns immediately. The run loop exits when the scheduler's
// context is cancelled (see Stop).
func (s *IntervalScheduler) Start() {
go s.start()
}
// start is the scheduler's run loop. It creates an interval timer and a check
// executor from the current check, starts the timer, and then services three
// events until it returns:
//
//   - context cancellation: the timer is stopped and the loop exits;
//   - a revised check on the interrupt channel: the check is swapped in and,
//     if its interval changed, the timer is stopped and the scheduler is
//     restarted so a fresh timer picks up the new interval;
//   - a timer tick: one scheduling pass is run.
func (s *IntervalScheduler) start() {
	s.logger.Info("starting new interval scheduler")

	timer := NewIntervalTimer(s.check.Name, uint(s.check.Interval))
	executor := NewCheckExecutor(s.bus, s.check.Namespace, s.store, s.entityCache)
	timer.Start()

	for {
		select {
		case <-s.ctx.Done():
			timer.Stop()
			return

		case revised := <-s.interrupt:
			s.check = revised
			// Only an interval change requires a new timer; any other edit
			// is picked up on the next tick through s.check.
			if s.toggleSchedule() {
				timer.Stop()
				// Relaunch once this loop has unwound.
				defer s.Start()
				return
			}

		case <-timer.C():
			s.schedule(timer, executor)
		}
	}
}
// Interrupt refreshes the scheduler with a revised check config by handing it
// to the run loop over the interrupt channel.
//
// The channel is unbuffered, so the call blocks until the run loop receives
// the config. If the scheduler's context has been cancelled (for example by
// Stop), the run loop is no longer receiving; in that case Interrupt returns
// without delivering the config instead of blocking forever.
func (s *IntervalScheduler) Interrupt(check *corev2.CheckConfig) {
	select {
	case s.interrupt <- check:
	case <-s.ctx.Done():
		// Scheduler stopped: nobody will read the channel, so drop the update.
	}
}
// Stop stops the IntervalScheduler by cancelling its context, which makes the
// run loop stop its timer and exit. It always returns nil.
func (s *IntervalScheduler) Stop() error {
	// Runs after the log line below, matching log-then-cancel ordering.
	defer s.cancel()

	s.logger.Info("stopping scheduler")
	return nil
}
// toggleSchedule reports whether the check's interval differs from the one
// last recorded, which tells the caller that the timer needs to be rebuilt.
// The outcome is logged, and the recorded interval is then updated to the
// current one regardless of the result.
func (s *IntervalScheduler) toggleSchedule() (stateChanged bool) {
	stateChanged = s.lastIntervalState != s.check.Interval

	if stateChanged {
		s.logger.Info("interval schedule has changed")
	} else {
		s.logger.Info("check schedule has not changed")
	}

	// Record the current interval only after the comparison and logging.
	s.setLastState()
	return stateChanged
}
// setLastState records the current check's interval as the last observed
// interval, so that the next toggleSchedule call compares against it.
func (s *IntervalScheduler) setLastState() {
s.lastIntervalState = s.check.Interval
}
// resetTimer re-arms the timer for the next run: it sets the timer's duration
// from the check's current interval and then advances it with Next.
func (s *IntervalScheduler) resetTimer(timer CheckTimer) {
	interval := uint(s.check.Interval)

	// NOTE(review): the empty first argument is passed through unchanged;
	// its meaning is defined by CheckTimer.SetDuration — confirm there.
	timer.SetDuration("", interval)
	timer.Next()
}
// Type returns the scheduler's type, which is always IntervalType.
func (s *IntervalScheduler) Type() SchedulerType {
return IntervalType
}