Skip to content

Commit

Permalink
cmd/bosun: centralize schedule for alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Peterson authored and kylebrandt committed Mar 6, 2017
1 parent ea52fac commit 5ceb838
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
61 changes: 38 additions & 23 deletions cmd/bosun/sched/alertRunner.go
Expand Up @@ -16,53 +16,68 @@ func (s *Schedule) Run() error {
}
s.nc = make(chan interface{}, 1)
go s.dispatchNotifications()
go s.updateCheckContext()
type alertCh struct {
ch chan<- *checkContext
modulo int
}
chs := []alertCh{}
for _, a := range s.RuleConf.GetAlerts() {
go s.RunAlert(a)
ch := make(chan *checkContext, 1)
re := a.RunEvery
if re == 0 {
re = s.SystemConf.GetDefaultRunEvery()
}
go s.runAlert(a, ch)
chs = append(chs, alertCh{ch: ch, modulo: re})
}
return nil
}

func (s *Schedule) updateCheckContext() {
i := 0
for {
select {
case <-s.runnerContext.Done():
return nil
default:
}
ctx := &checkContext{utcNow(), cache.New(0)}
s.ctx = ctx
s.LastCheck = utcNow()
for _, a := range chs {
if i%a.modulo != 0 {
continue
}
// Put on channel. If that fails, the alert is backed up pretty bad.
// Because channel is buffered size 1, it will continue as soon as it finishes.
// Master scheduler will never block here.
select {
case a.ch <- ctx:
default:
}
}
i++
time.Sleep(s.SystemConf.GetCheckFrequency())
s.Lock("CollectStates")
s.CollectStates()
s.Unlock()
}
}

func (s *Schedule) RunAlert(a *conf.Alert) {
func (s *Schedule) runAlert(a *conf.Alert, ch <-chan *checkContext) {
// Add to waitgroup for running alert
s.checksRunning.Add(1)
// ensure when an alert is done it is removed from the wait group
defer s.checksRunning.Done()
for {
// Calcaulate runEvery based on system default and override if an alert has a
// custom runEvery
runEvery := s.SystemConf.GetDefaultRunEvery()
if a.RunEvery != 0 {
runEvery = a.RunEvery
}
wait := time.After(s.SystemConf.GetCheckFrequency() * time.Duration(runEvery))
s.checkAlert(a)
s.LastCheck = utcNow()
select {
case <-wait:
case <-s.runnerContext.Done():
// If an alert is waiting we cancel it
slog.Infof("Stopping alert routine for %v\n", a.Name)
return
case ctx := <-ch:
s.checkAlert(a, ctx)

}
}
}

func (s *Schedule) checkAlert(a *conf.Alert) {
checkTime := s.ctx.runTime
checkCache := s.ctx.checkCache
rh := s.NewRunHistory(checkTime, checkCache)
func (s *Schedule) checkAlert(a *conf.Alert, ctx *checkContext) {
rh := s.NewRunHistory(ctx.runTime, ctx.checkCache)
// s.CheckAlert will return early if the schedule has been closed
cancelled := s.CheckAlert(nil, rh, a)
if cancelled {
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/sched/sched_test.go
Expand Up @@ -50,7 +50,7 @@ func check(s *Schedule, t time.Time) {
for _, n := range names {
a := s.RuleConf.GetAlerts()[n]
s.ctx.runTime = t
s.checkAlert(a)
s.checkAlert(a, s.ctx)
}
}

Expand Down

0 comments on commit 5ceb838

Please sign in to comment.