forked from moira-alert/moira
/
worker.go
129 lines (110 loc) · 3.92 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package worker
import (
"runtime"
"time"
"github.com/moira-alert/moira/remote"
"github.com/patrickmn/go-cache"
"gopkg.in/tomb.v2"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/checker"
"github.com/moira-alert/moira/metrics/graphite"
)
// Checker bundles the dependencies and runtime state of the workers that
// check triggers in response to new metric events. Start launches the
// workers and Stop shuts them down.
type Checker struct {
// Logger receives the informational messages written while starting and stopping.
Logger moira.Logger
// Database is the storage backend; the visible code calls SubscribeMetricEvents,
// GetTriggersToCheckCount, GetRemoteTriggersToCheckCount and
// DeregisterNodataChecker on it.
Database moira.Database
// Config holds the checker settings. Start replaces a zero MaxParallelChecks
// (and, when remote checks are enabled, a zero MaxParallelRemoteChecks) with
// runtime.NumCPU().
Config *checker.Config
// RemoteConfig decides, through IsEnabled, whether remote checking is started.
RemoteConfig *remote.Config
// Metrics groups the gauges updated by the workers; MoiraMetrics,
// RemoteMetrics and MetricEventsChannelLen are used in the visible code.
Metrics *graphite.CheckerMetrics
// TriggerCache, LazyTriggersCache and PatternCache are not touched in the
// visible code.
// NOTE(review): presumably used by sibling worker files — confirm there.
TriggerCache *cache.Cache
LazyTriggersCache *cache.Cache
PatternCache *cache.Cache
// lazyTriggerIDs is (re)created as an empty map by Start.
lazyTriggerIDs map[string]bool
// lastData is set by Start to the current UTC Unix timestamp.
lastData int64
// tomb tracks the worker goroutines launched through tomb.Go in Start and is
// killed by Stop.
tomb tomb.Tomb
// remoteEnabled stores the result of RemoteConfig.IsEnabled() evaluated in Start.
remoteEnabled bool
}
// Start subscribes to metric events and launches every background goroutine
// of the checker under the worker's tomb: the lazy-triggers worker, the NODATA
// checker, the optional remote checker, the parallel metric checkers and
// trigger handlers, and the two metric-reporting loops.
//
// Zero values of MaxParallelChecks and (when remote checks are enabled)
// MaxParallelRemoteChecks are replaced with runtime.NumCPU().
//
// It returns the error from Database.SubscribeMetricEvents, if any; in that
// case nothing has been started. Otherwise it returns nil.
func (worker *Checker) Start() error {
	// cfg aliases the same Config value, so writes through it are visible to
	// everything else that reads worker.Config.
	cfg := worker.Config

	if cfg.MaxParallelChecks == 0 {
		cfg.MaxParallelChecks = runtime.NumCPU()
		worker.Logger.Infof("MaxParallelChecks is not configured, set it to the number of CPU - %d", cfg.MaxParallelChecks)
	}

	worker.lastData = time.Now().UTC().Unix()

	events, subscribeErr := worker.Database.SubscribeMetricEvents(&worker.tomb)
	if subscribeErr != nil {
		return subscribeErr
	}

	worker.lazyTriggerIDs = make(map[string]bool)
	worker.tomb.Go(worker.lazyTriggersWorker)
	worker.tomb.Go(worker.runNodataChecker)

	worker.remoteEnabled = worker.RemoteConfig.IsEnabled()
	if !worker.remoteEnabled {
		worker.Logger.Info("Remote checker disabled")
	} else {
		if cfg.MaxParallelRemoteChecks == 0 {
			cfg.MaxParallelRemoteChecks = runtime.NumCPU()
			worker.Logger.Infof("MaxParallelRemoteChecks is not configured, set it to the number of CPU - %d", cfg.MaxParallelRemoteChecks)
		}
		worker.tomb.Go(worker.remoteChecker)
		worker.Logger.Info("Remote checker started")
	}

	// The metrics argument is read inside each closure, i.e. when the goroutine
	// actually runs, not when the closure is created.
	consumeEvents := func() error { return worker.metricsChecker(events) }
	handleLocal := func() error { return worker.startTriggerHandler(false, worker.Metrics.MoiraMetrics) }
	handleRemote := func() error { return worker.startTriggerHandler(true, worker.Metrics.RemoteMetrics) }

	worker.Logger.Infof("Start %v parallel checker(s)", cfg.MaxParallelChecks)
	for left := cfg.MaxParallelChecks; left > 0; left-- {
		worker.tomb.Go(consumeEvents)
		worker.tomb.Go(handleLocal)
	}

	if worker.remoteEnabled {
		worker.Logger.Infof("Start %v parallel remote checker(s)", cfg.MaxParallelRemoteChecks)
		for left := cfg.MaxParallelRemoteChecks; left > 0; left-- {
			worker.tomb.Go(handleRemote)
		}
	}

	worker.Logger.Info("Checking new events started")

	// Untracked helper goroutine: logs once the tomb starts dying.
	go func() {
		<-worker.tomb.Dying()
		worker.Logger.Info("Checking for new events stopped")
	}()

	worker.tomb.Go(func() error { return worker.checkMetricEventsChannelLen(events) })
	worker.tomb.Go(worker.checkTriggersToCheckCount)

	return nil
}
// checkTriggersToCheckCount reports, every 100 ms, how many triggers are
// waiting to be checked. The local count goes to
// Metrics.MoiraMetrics.TriggersToCheckCount and, when remote checks are
// enabled, the remote count goes to Metrics.RemoteMetrics.TriggersToCheckCount.
//
// A failed database read skips that metric update for the current tick; the
// error is not reported. The loop ends when the worker's tomb starts dying,
// and the function always returns nil.
func (worker *Checker) checkTriggersToCheckCount() error {
	checkTicker := time.NewTicker(time.Millisecond * 100)
	// Stop the ticker on exit so it does not keep firing after the worker is gone.
	defer checkTicker.Stop()

	var triggersToCheckCount, remoteTriggersToCheckCount int64
	var err error
	for {
		select {
		case <-worker.tomb.Dying():
			return nil
		case <-checkTicker.C:
			triggersToCheckCount, err = worker.Database.GetTriggersToCheckCount()
			if err == nil {
				worker.Metrics.MoiraMetrics.TriggersToCheckCount.Update(triggersToCheckCount)
			}
			if worker.remoteEnabled {
				remoteTriggersToCheckCount, err = worker.Database.GetRemoteTriggersToCheckCount()
				if err == nil {
					worker.Metrics.RemoteMetrics.TriggersToCheckCount.Update(remoteTriggersToCheckCount)
				}
			}
		}
	}
}
// checkMetricEventsChannelLen reports, every 100 ms, the number of metric
// events currently buffered in ch to Metrics.MetricEventsChannelLen.
//
// The loop ends when the worker's tomb starts dying, and the function always
// returns nil.
func (worker *Checker) checkMetricEventsChannelLen(ch <-chan *moira.MetricEvent) error {
	checkTicker := time.NewTicker(time.Millisecond * 100)
	// Stop the ticker on exit so it does not keep firing after the worker is gone.
	defer checkTicker.Stop()

	for {
		select {
		case <-worker.tomb.Dying():
			return nil
		case <-checkTicker.C:
			worker.Metrics.MetricEventsChannelLen.Update(int64(len(ch)))
		}
	}
}
// Stop shuts the checker down. It first calls Database.DeregisterNodataChecker,
// then kills the tomb with a nil reason and blocks until the goroutines tracked
// by the tomb have returned. The value returned is whatever tomb.Wait reports.
func (worker *Checker) Stop() error {
	worker.Database.DeregisterNodataChecker()

	worker.tomb.Kill(nil)
	waitErr := worker.tomb.Wait()

	return waitErr
}