forked from go-kit/kit
/
emitter.go
159 lines (137 loc) · 4.3 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package graphite
import (
"bufio"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/util/conn"
)
// Emitter is a struct to manage connections and orchestrate the emission of
// metrics to a Graphite system.
//
// Instances are built by NewEmitter or NewEmitterDial; NewEmitterDial also
// starts the background goroutine that flushes periodically.
type Emitter struct {
// mtx is locked by NewCounter, NewGauge, NewHistogram and Flush.
mtx sync.Mutex
// prefix is handed to every metric's flush call.
prefix string
// mgr supplies the connection that Flush writes to (Take) and is told the
// outcome of each write afterwards (Put).
mgr *conn.Manager
// counters, histograms and gauges are the registered metrics, in
// registration order; flush walks them in exactly this order.
counters []*counter
histograms []*windowedHistogram
gauges []*gauge
// logger receives flush errors and is passed on to new histograms.
logger log.Logger
// quitc carries the acknowledgement channel from Stop to loop; loop flushes
// one last time, closes the received channel and returns.
quitc chan chan struct{}
}
// NewEmitter builds an Emitter that reaches Graphite through the standard
// net.Dial. It is a convenience wrapper: every argument is forwarded
// unchanged to NewEmitterDial, which documents the remaining behaviour.
//
// metricsPrefix is prepended to metric names when they are written, and
// flushInterval is the period between automatic flushes.
func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
	dial := net.Dial
	return NewEmitterDial(dial, network, address, metricsPrefix, flushInterval, logger)
}
// NewEmitterDial behaves like NewEmitter but lets the caller supply the
// Dialer used to open the connection, which is mainly handy in tests.
//
// The returned Emitter already has its background goroutine running: it calls
// Flush every flushInterval until Stop is invoked.
func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
	manager := conn.NewManager(dialer, network, address, time.After, logger)

	emitter := &Emitter{
		quitc:  make(chan chan struct{}),
		logger: logger,
		mgr:    manager,
		prefix: metricsPrefix,
	}

	// The loop owns periodic flushing and the shutdown handshake with Stop.
	go emitter.loop(flushInterval)

	return emitter
}
// NewCounter creates a Counter with the given name and registers it with the
// Emitter, so that its value is written out on every subsequent flush in a
// Graphite-compatible format. Fields are ignored.
func (e *Emitter) NewCounter(name string) metrics.Counter {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	registered := newCounter(name)
	e.counters = append(e.counters, registered)

	return registered
}
// NewHistogram is taken from http://github.com/codahale/metrics. It returns a
// windowed HDR histogram which drops data older than five minutes.
//
// The histogram exposes metrics for each passed quantile as gauges. Quantiles
// must be integers in the range 1..99; any value outside that range makes the
// call fail with an error and registers nothing on the Emitter. A quantile
// that is passed more than once is registered only once. The gauge names are
// assigned by using the passed name as a prefix and appending "_pNN", e.g.
// "_p50".
//
// name, minValue, maxValue and sigfigs are forwarded unchanged to the
// underlying windowed histogram.
//
// The values of this histogram will be periodically emitted in a
// Graphite-compatible format by the Emitter. Fields are ignored.
func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
	// Validate everything up front so that a bad quantile cannot leave
	// half of the gauges registered on the Emitter.
	for _, quantile := range quantiles {
		if quantile <= 0 || quantile >= 100 {
			return nil, fmt.Errorf("invalid quantile %d", quantile)
		}
	}

	// e.gauge appends to e.gauges without locking, so the lock must already
	// be held here; Flush and NewGauge touch the same slice under e.mtx.
	e.mtx.Lock()
	defer e.mtx.Unlock()

	gauges := make(map[int]metrics.Gauge, len(quantiles))
	for _, quantile := range quantiles {
		if _, dup := gauges[quantile]; dup {
			// A second gauge with the same name would never be updated.
			continue
		}
		gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
	}

	h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
	e.histograms = append(e.histograms, h)
	return h, nil
}
// NewGauge creates a Gauge with the given name and registers it with the
// Emitter, so that its value is written out on every subsequent flush in a
// Graphite-compatible format. Fields are ignored.
func (e *Emitter) NewGauge(name string) metrics.Gauge {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	// Registration itself lives in the unexported helper, which expects the
	// lock to be managed by its caller.
	registered := e.gauge(name)
	return registered
}
// gauge creates a gauge with the given name and an initial value of 0,
// appends it to e.gauges and returns it. It does not lock e.mtx itself;
// synchronisation is left to the caller.
func (e *Emitter) gauge(name string) metrics.Gauge {
	created := &gauge{name, 0}
	e.gauges = append(e.gauges, created)
	return created
}
// loop is the Emitter's background goroutine. It calls Flush every d until a
// channel arrives on e.quitc (sent by Stop); it then flushes one final time,
// closes the received channel to acknowledge, and returns.
//
// time.NewTicker panics when d is not positive, which would take the whole
// process down from this goroutine. In that case periodic flushing is
// disabled instead: the problem is logged once and the loop only serves the
// shutdown handshake. Explicit calls to Flush keep working.
func (e *Emitter) loop(d time.Duration) {
	// Receiving from a nil channel blocks forever, so leaving tick nil
	// simply removes the periodic case from the select below.
	var tick <-chan time.Time
	if d > 0 {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		tick = ticker.C
	} else {
		e.logger.Log("during", "loop", "err", "non-positive flush interval, periodic flush disabled")
	}

	for {
		select {
		case <-tick:
			e.Flush()
		case q := <-e.quitc:
			e.Flush()
			close(q)
			return
		}
	}
}
// Stop shuts down the background loop and blocks until it has acknowledged.
// The loop performs one last Flush before acknowledging, so the current
// metrics have been handed to Flush by the time Stop returns. Stop does not
// itself close the connection.
//
// Calling Stop more than once is a programmer error: once the loop has
// exited nothing receives from the quit channel, so a second call would
// block.
func (e *Emitter) Stop() {
	done := make(chan struct{})
	e.quitc <- done // hand the acknowledgement channel to the loop
	<-done          // closed by the loop after its final flush
}
// Flush writes the current value of every registered metric to the Emitter's
// connection in the Graphite plaintext protocol.
//
// If no connection is available the flush is skipped and the condition is
// logged. Otherwise the result of the write, nil or an error, is reported
// back to the connection manager, and an error is also logged.
func (e *Emitter) Flush() {
	e.mtx.Lock() // one flush at a time
	defer e.mtx.Unlock()

	c := e.mgr.Take()
	if c == nil {
		e.logger.Log("during", "flush", "err", "connection unavailable")
		return
	}

	writeErr := e.flush(c)
	if writeErr != nil {
		e.logger.Log("during", "flush", "err", writeErr)
	}

	// Put is called on success as well, with a nil error.
	e.mgr.Put(writeErr)
}
// flush writes all counters, then all histograms, then all gauges to w
// through a single buffered writer, passing e.prefix to each metric. It
// returns the error, if any, from flushing that buffer to w. It does not
// lock e.mtx; Flush calls it with the lock held.
func (e *Emitter) flush(w io.Writer) error {
	buffered := bufio.NewWriter(w)

	for i := range e.counters {
		e.counters[i].flush(buffered, e.prefix)
	}
	for i := range e.histograms {
		e.histograms[i].flush(buffered, e.prefix)
	}
	for i := range e.gauges {
		e.gauges[i].flush(buffered, e.prefix)
	}

	return buffered.Flush()
}