forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
140 lines (125 loc) · 2.91 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// The stats service collects the exported stats and submits them to
// the Kapacitor stream under the configured database and retention policy.
//
// If you want to persist the data to InfluxDB just add a task like so:
//
// Example:
// stream
// .from()
// .influxDBOut()
//
// Assuming use of the default database and retention policy, run:
// `kapacitor define -name _stats -type stream -tick path/to/above/script.tick -dbrp _kapacitor.default`
//
// If you do create a task to send the data to InfluxDB make sure not to subscribe to that data in InfluxDB.
//
// Example:
//
// [influxdb]
// ...
// [influxdb.excluded-subscriptions]
// _kapacitor = [ "default" ]
//
package stats
import (
"sync"
"time"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/server/vars"
"github.com/influxdata/kapacitor/timer"
)
// Diagnostic is the minimal diagnostics sink the stats service needs.
// Error reports a failure message together with its underlying error.
type Diagnostic interface {
	Error(msg string, err error)
}
// Sends internal stats back into the Kapacitor stream.
// Internal stats come from running tasks and other
// services running within Kapacitor.
type Service struct {
	// TaskMaster provides named streams; only Stream is required,
	// so the dependency is declared as an inline interface.
	TaskMaster interface {
		Stream(name string) (kapacitor.StreamCollector, error)
	}

	stream kapacitor.StreamCollector // collector for the "stats" stream; set in Open

	interval time.Duration // how often reportStats emits a snapshot
	db       string        // database name stamped on emitted points
	rp       string        // retention policy stamped on emitted points

	// Timer configuration passed through to NewTimer.
	timingSampleRate    float64
	timingMovingAvgSize int

	open    bool          // guarded by mu; true between Open and Close
	closing chan struct{} // closed in Close to stop the sendStats goroutine

	mu sync.Mutex     // protects open/closing/stream across Open and Close
	wg sync.WaitGroup // tracks the sendStats goroutine

	diag Diagnostic // sink for reporting errors
}
// NewService builds a stats Service from the given configuration and
// diagnostic sink. The returned service is inert until Open is called.
func NewService(c Config, d Diagnostic) *Service {
	s := &Service{
		diag:                d,
		db:                  c.Database,
		rp:                  c.RetentionPolicy,
		interval:            time.Duration(c.StatsInterval),
		timingSampleRate:    c.TimingSampleRate,
		timingMovingAvgSize: c.TimingMovingAverageSize,
	}
	return s
}
// Open subscribes to the Kapacitor "stats" stream and starts the
// background goroutine that periodically reports internal stats.
// Opening an already open service is a no-op.
func (s *Service) Open() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Guard against double-open: without this a second call would
	// start another sendStats goroutine and orphan the previous
	// closing channel, leaking the first goroutine. Mirrors the
	// corresponding guard in Close.
	if s.open {
		return nil
	}
	s.stream, err = s.TaskMaster.Stream("stats")
	if err != nil {
		return
	}
	s.open = true
	s.closing = make(chan struct{})
	s.wg.Add(1)
	go s.sendStats()
	return
}
// Close stops the background reporting goroutine and closes the
// underlying stream. Calling Close on a service that is not open
// (never opened, or already closed) is a no-op.
func (s *Service) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.open {
		return nil
	}
	s.open = false
	// Order matters: signal shutdown first, then wait for sendStats
	// to exit so no CollectPoint call can race with closing the stream.
	close(s.closing)
	s.wg.Wait()
	s.stream.Close()
	return nil
}
// sendStats is the background reporting loop started by Open. It emits
// one stats snapshot per interval until the closing channel is closed,
// then marks the WaitGroup done so Close can finish.
func (s *Service) sendStats() {
	defer s.wg.Done()
	tick := time.NewTicker(s.interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			s.reportStats()
		case <-s.closing:
			return
		}
	}
}
// reportStats snapshots the exported stats and writes each one as a
// point into the Kapacitor stats stream. All points from a single
// snapshot share one UTC timestamp so they align in the stream.
func (s *Service) reportStats() {
	now := time.Now().UTC()
	data, err := vars.GetStatsData()
	if err != nil {
		s.diag.Error("error getting stats data", err)
		return
	}
	for _, stat := range data {
		p := edge.NewPointMessage(
			stat.Name,
			s.db,
			s.rp,
			models.Dimensions{},
			models.Fields(stat.Values),
			models.Tags(stat.Tags),
			now,
		)
		// The collection error was previously dropped silently;
		// surface it so failed stat writes show up in diagnostics.
		if err := s.stream.CollectPoint(p); err != nil {
			s.diag.Error("error collecting stats point", err)
		}
	}
}
// NewTimer returns a timer that samples at the service-configured rate
// and records a moving average of the configured window size into avgVar.
func (s *Service) NewTimer(avgVar timer.Setter) timer.Timer {
	t := timer.New(s.timingSampleRate, s.timingMovingAvgSize, avgVar)
	return t
}