forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
138 lines (124 loc) · 3.03 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Package stats provides the stats service, which collects the exported stats and submits them to
// the Kapacitor stream under the configured database and retention policy.
//
// If you want to persist the data to InfluxDB just add a task like so:
//
// Example:
// stream
// .from()
// .influxDBOut()
//
// Assuming the default database and retention policy are in use, run:
// `kapacitor define -name _stats -type stream -tick path/to/above/script.tick -dbrp _kapacitor.default`
//
// If you do create a task to send the data to InfluxDB make sure not to subscribe to that data in InfluxDB.
//
// Example:
//
// [influxdb]
// ...
// [influxdb.excluded-subscriptions]
// _kapacitor = [ "default" ]
//
package stats
import (
"errors"
"log"
"sync"
"time"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/timer"
)
// Service sends internal stats back into the Kapacitor stream.
// Internal stats come from running tasks and other
// services running within Kapacitor.
type Service struct {
// TaskMaster supplies the stream collector; Open requests the stream
// named "stats" from it. NewService does not set this field and Open
// calls it without a nil check, so it must be assigned before Open.
TaskMaster interface {
Stream(name string) (kapacitor.StreamCollector, error)
}
// stream is the collector obtained in Open. reportStats writes points to
// it and Close closes it.
stream kapacitor.StreamCollector
// interval is the period of the ticker that drives reportStats.
interval time.Duration
// db and rp are the database and retention policy set on every point.
db string
rp string
// timingSampleRate and timingMovingAvgSize are passed to timer.New by NewTimer.
timingSampleRate float64
timingMovingAvgSize int
// open records whether the service is currently open; it is set by Open
// and cleared by Close.
open bool
// closing is created in Open and closed in Close to stop sendStats.
closing chan struct{}
// mu is held for the duration of Open and Close.
mu sync.Mutex
// wg tracks the sendStats goroutine so Close can wait for it to exit.
wg sync.WaitGroup
// logger receives the service's informational and error messages.
logger *log.Logger
}
// NewService builds a stats Service from the given configuration and logger.
// The returned service is not yet open, and its TaskMaster field is left
// unset for the caller to assign.
func NewService(c Config, l *log.Logger) *Service {
	svc := new(Service)
	svc.logger = l

	// Where the points are written and how often.
	svc.interval = time.Duration(c.StatsInterval)
	svc.db, svc.rp = c.Database, c.RetentionPolicy

	// Parameters handed to timer.New by NewTimer.
	svc.timingSampleRate = c.TimingSampleRate
	svc.timingMovingAvgSize = c.TimingMovingAverageSize

	return svc
}
// Open starts the stats service. It obtains the "stats" stream from the
// TaskMaster and launches the background goroutine that reports stats once
// per interval.
//
// It returns an error if the service is already open, if the configured
// interval is not positive, or if the stream cannot be obtained. In every
// error case no goroutine is started and the service stays closed.
func (s *Service) Open() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Opening twice would overwrite s.closing and s.stream while the first
	// goroutine is still running, orphaning the first stream.
	if s.open {
		return errors.New("error opening stats service: service already open")
	}
	// time.NewTicker panics on a non-positive duration. Reject it here so a
	// bad configuration surfaces as an error instead of crashing the process
	// from inside the background goroutine.
	if s.interval <= 0 {
		return errors.New("error opening stats service: stats interval must be positive")
	}

	stream, err := s.TaskMaster.Stream("stats")
	if err != nil {
		return err
	}

	s.stream = stream
	s.closing = make(chan struct{})
	s.open = true

	// Register with the wait group before starting the goroutine so Close
	// always observes it.
	s.wg.Add(1)
	go s.sendStats()

	s.logger.Println("I! opened service")
	return nil
}
// Close shuts the service down. It signals the reporting goroutine to stop,
// waits for it to exit, and then closes the stats stream.
//
// An error is returned if the service is not currently open.
func (s *Service) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.open {
		return errors.New("error closing stats service: service not open")
	}

	// Mark the service closed, then stop the goroutine before closing the
	// stream so nothing is collected on a closed stream.
	s.open = false
	close(s.closing)
	s.wg.Wait()
	s.stream.Close()

	s.logger.Println("I! closed service")
	return nil
}
// sendStats is the body of the background goroutine started by Open. It
// calls reportStats on every tick of a ticker running at s.interval and
// returns once s.closing is closed, marking the wait group done on exit.
func (s *Service) sendStats() {
	defer s.wg.Done()

	t := time.NewTicker(s.interval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			s.reportStats()
		case <-s.closing:
			return
		}
	}
}
// reportStats reads one snapshot of the current stats and forwards every
// entry to the stats stream as a point. All points from one snapshot share a
// single UTC timestamp taken before the snapshot is read. If the snapshot
// cannot be read, the error is logged and nothing is sent.
func (s *Service) reportStats() {
	timestamp := time.Now().UTC()

	snapshot, err := kapacitor.GetStatsData()
	if err != nil {
		s.logger.Println("E! error getting stats data:", err)
		return
	}

	for _, entry := range snapshot {
		s.stream.CollectPoint(models.Point{
			Database:        s.db,
			RetentionPolicy: s.rp,
			Name:            entry.Name,
			Group:           models.NilGroup,
			Tags:            models.Tags(entry.Tags),
			Time:            timestamp,
			Fields:          models.Fields(entry.Values),
		})
	}
}
// NewTimer returns a timer.Timer created with the service's configured
// sample rate and moving-average size, reporting through avgVar.
func (s *Service) NewTimer(avgVar timer.Setter) timer.Timer {
	sampleRate, windowSize := s.timingSampleRate, s.timingMovingAvgSize
	return timer.New(sampleRate, windowSize, avgVar)
}