-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
417 lines (377 loc) · 8.98 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
package main
import (
"bufio"
"flag"
"html/template"
"io"
"log"
"net"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/paulhammond/tai64"
"aqwari.net/io/tailpipe"
)
// Tunable in-memory limits.
const (
	// MetricsBufferMax is the capacity of the outgoing metrics channel;
	// when it fills up, sample discards buffered metrics.
	MetricsBufferMax = 100
	// QueryTrackingLimit caps how many in-flight queries are tracked for
	// latency measurement (see parseLine), bounding memory use.
	QueryTrackingLimit = 100000
)
// Command-line flags.
var (
	// -i: sampling window width / spacing of emitted data points.
	interval = flag.Duration("i", time.Minute, "time between data points")
	// -m: template for metric names; rendered against a metric value,
	// which provides .Metric, .Time, .Value and .Hostname.
	nameTemplate = flag.String("m",
		"servers.{{.Hostname}}.dnscache.{{.Metric}}",
		"naming scheme for metrics")
	// -f: follow this file (tailpipe) instead of reading stdin.
	inputFile = flag.String("f", "", "tail this file instead of stdin")
	// -v: enable diagnostic logging on stderr (see verbosef).
	verbose = flag.Bool("v", false, "print detailed information to stderr")
	// -tai64: input lines begin with tai64n timestamps (daemontools-style
	// logs); enables replaying historical data at its original times.
	timestampedLog = flag.Bool("tai64", false, "log input contains tai64n timestamps")
)
var Hostname, Service string
// Most of these heuristics assume daemontools
func guessServiceName() string {
pwd, _ := os.Getwd()
// Example: /service/dnscache/log/run
if path.Base(pwd) == "log" {
return path.Base(path.Dir(pwd))
}
// Example: /var/log/dnscache
dir, el := path.Split(pwd)
for len(el) > 0 {
if strings.Contains(el, "dnscache") {
return el
}
dir, el = path.Split(dir)
}
// If we were to return "" here, we would be
// creating invalid metric names.
return "main"
}
// A ticker that aligns returned times on regular multiples
// of interval. Leaks a goroutine, so do not use in a library
func alignedTicker(d time.Duration) <-chan time.Time {
aligned := make(chan time.Time)
go func() {
for t := range time.Tick(d) {
aligned <- t.Truncate(d)
}
}()
return aligned
}
// verbosef logs through the standard logger, but only when the -v flag
// was given. (Parameter renamed from "fmt", which shadowed the stdlib
// package name.)
func verbosef(format string, args ...interface{}) {
	if !*verbose {
		return
	}
	log.Printf(format, args...)
}
// metric is one named data point, ready to be rendered through the
// metric-name template.
type metric struct {
	Metric string    // short metric name, e.g. "queries"
	Time   time.Time // timestamp of the sample
	Value  int       // sampled value
}
// For metric name templates.
// Hostname exposes the package-level Hostname so templates can reference
// {{.Hostname}} on a metric value.
func (m metric) Hostname() string {
	return Hostname
}
// stats accumulates counters parsed from dnscache log lines over one
// sampling interval. It is embedded in logReader and reset by sample
// (except the queries map, which is carried over).
type stats struct {
	cacheHits  int                  // "cached" lines seen
	dropped    int                  // "drop" lines seen
	queries    map[string]time.Time // in-flight query id -> time first seen
	nqueries   int                  // "query" lines counted (only while under QueryTrackingLimit)
	latency    time.Duration        // summed query->sent latency this interval
	maxLatency time.Duration        // largest single query->sent latency this interval
	servfail   int                  // "servfail" lines seen
	outgoing   int                  // "tx" lines seen
	activeUDP  int                  // high-water mark reported by "stats" lines
	activeTCP  int                  // high-water mark reported by "stats" lines
	// first and last cache-motion values from "stats" lines; sample
	// reports end-start as the interval's cache motion.
	cacheMotion struct{ start, end int }
}
// logReader parses a dnscache log stream from src and periodically emits
// aggregated metrics on dst.
type logReader struct {
	src         io.Reader     // log input
	dst         chan metric   // aggregated output; closed when input ends
	interval    time.Duration // sampling interval
	timestamped bool          // input lines carry tai64n timestamps
	done        chan struct{} // closed by run to stop sampleRun (non-timestamped mode)
	mu          sync.Mutex    // protects following members
	stats
}
// There are two main run modes; from timestamped
// input and non-timestamped input. Timestamped is
// useful because it allows you to convert old data to
// fill in gaps.
//
// run consumes l.src line by line until EOF. In timestamped mode each
// line's tai64n stamp drives sampling, so old logs replay at their
// original times; otherwise sampleRun emits samples on a wall-clock tick.
func (l *logReader) run() {
	var lastTime, now time.Time
	var err error
	if !l.timestamped {
		// Wall-clock mode: this loop blocks on input, so sampling is
		// done by a separate goroutine (see sampleRun's comment).
		go l.sampleRun()
	}
	scanner := bufio.NewScanner(l.src)
	for scanner.Scan() {
		line := strings.Split(scanner.Text(), " ")
		if len(line) < 1 {
			continue
		}
		if l.timestamped {
			// First field is the tai64n timestamp; the rest is the event.
			now, err = tai64.ParseTai64n(line[0])
			if err != nil {
				verbosef("bad timestamp %q: %v", line[0], err)
				continue
			}
			line = line[1:]
			if lastTime.IsZero() {
				lastTime = now.Truncate(l.interval)
			} else if now.Sub(lastTime) >= l.interval {
				// Do this instead of lastTime = now so we get
				// evenly spaced data points
				lastTime = now.Truncate(l.interval)
				l.sample(lastTime)
			}
		} else {
			now = time.Now()
		}
		l.parseLine(now, line)
	}
	if scanner.Err() != nil {
		verbosef("finished processing log: %v", scanner.Err())
	}
	if l.timestamped {
		// This goroutine owns l.dst in timestamped mode; closing it lets
		// the output goroutine drain and exit.
		close(l.dst)
	} else {
		// sampleRun owns l.dst; signal it to stop (it closes l.dst).
		close(l.done)
	}
}
// sampleRun emits a sample on every aligned tick until l.done is closed,
// then closes l.dst. The run() method blocks on reading input and cannot
// send data at regular intervals itself; rather than implement read
// timeouts, non-timestamped input is sampled from this goroutine.
func (l *logReader) sampleRun() {
	tick := alignedTicker(l.interval)
	for {
		select {
		case ts := <-tick:
			l.sample(ts)
		case <-l.done:
			close(l.dst)
			return
		}
	}
}
// parseLine updates l.stats from one space-separated log line. ts is the
// line's timestamp (parsed from the line in tai64 mode, wall-clock time
// otherwise). Event names match dnscache's log output; lines with fewer
// than three fields are ignored.
func (l *logReader) parseLine(ts time.Time, line []string) {
	if len(line) < 3 {
		return
	}
	event, args := line[0], line[1:]
	l.mu.Lock()
	defer l.mu.Unlock()
	switch event {
	case "cached":
		// Answer served from cache.
		l.cacheHits++
	case "drop":
		// Query abandoned; stop tracking it for latency.
		l.dropped++
		delete(l.queries, args[0])
	case "query":
		// Start tracking a new query, keyed by args[0] (the query id).
		// The map is bounded so a flood of unanswered queries cannot
		// grow memory without limit; note nqueries is only incremented
		// while under the limit.
		if len(l.queries) < QueryTrackingLimit {
			l.queries[args[0]] = ts
			l.nqueries++
		}
	case "servfail":
		l.servfail++
	case "sent":
		// Response sent; record latency if we saw the matching "query".
		if v, ok := l.queries[args[0]]; ok {
			latency := ts.Sub(v)
			if latency > l.maxLatency {
				l.maxLatency = latency
			}
			l.latency += latency
			delete(l.queries, args[0])
		}
	case "stats":
		// Periodic dnscache stats line. NOTE(review): field meanings
		// (args[1] cache motion, args[2]/args[3] active UDP/TCP) are
		// inferred from how sample names them — confirm against the
		// dnscache log format documentation.
		if len(args) < 4 {
			break
		}
		if v, err := strconv.Atoi(args[1]); err == nil {
			// Keep first and last values so sample can report the delta.
			if l.cacheMotion.start == 0 {
				l.cacheMotion.start = v
			}
			l.cacheMotion.end = v
		}
		if activeUDP, err := strconv.Atoi(args[2]); err == nil {
			// Track the high-water mark for the interval.
			if activeUDP > l.activeUDP {
				l.activeUDP = activeUDP
			}
		}
		if activeTCP, err := strconv.Atoi(args[3]); err == nil {
			if activeTCP > l.activeTCP {
				l.activeTCP = activeTCP
			}
		}
	case "tx":
		// Outgoing query to an upstream server.
		l.outgoing++
	}
}
// sample snapshots the counters accumulated since the previous sample,
// converts them into metrics stamped with ts, resets the counters, and
// forwards the metrics to l.dst.
//
// Fixes vs. the original: (1) verbosef was called with a "%d" verb but no
// argument (go vet printf error, printed "%!d(MISSING)"); (2) when the
// buffer was full, the select received one old metric to make room but
// never retried sending the current one, so both the oldest and the
// newest metric were lost. We now retry the send after dropping.
func (l *logReader) sample(ts time.Time) {
	var metrics []metric
	l.mu.Lock()
	{
		metrics = []metric{
			{"queries", ts, l.nqueries},
			{"cache_motion", ts, l.cacheMotion.end - l.cacheMotion.start},
			{"udp_active", ts, l.activeUDP},
			{"tcp_active", ts, l.activeTCP},
			{"cache_hits", ts, l.cacheHits},
			{"drop", ts, l.dropped},
			{"query_max_us", ts, int(l.maxLatency / time.Microsecond)},
			{"servfail", ts, l.servfail},
			{"tx", ts, l.outgoing},
			{"query_avg_us", ts, 0},
		}
		// Average latency in microseconds, only when queries were seen.
		if l.nqueries > 0 {
			metrics[len(metrics)-1].Value =
				int((l.latency / time.Duration(l.nqueries)) / time.Microsecond)
		}
		// Reset every counter; the in-flight query map carries over.
		l.stats = stats{queries: l.queries}
		// This is a precaution against leaking memory. We do not expect
		// that dnscache won't acknowledge finished queries, but there are
		// other ways to miss the notification (I/O errors, file rotation,
		// bugs in this program, ...)
		for k, v := range l.queries {
			if v.Before(ts) {
				delete(l.queries, k)
			}
		}
	}
	l.mu.Unlock()
	var dropCount int
	for _, m := range metrics {
		// NOTE(droyo) if we're buffering because of a bad connection, we always
		// want to drop the older metrics first, because that will give a more accurate
		// picture of *when* our connection became slow, and in general, newer metrics
		// are more valuable than older metrics overall, especially if you are using them
		// to generate alerts.
		for sent := false; !sent; {
			select {
			case l.dst <- m:
				sent = true
			case <-l.dst:
				// Buffer full: discard the oldest buffered metric, then
				// loop to retry sending m.
				dropCount++
				if dropCount == 100 {
					verbosef("dropped %d metrics due to buffer full", dropCount)
					dropCount = 0
				}
			}
		}
	}
}
// toFile opens filename for appending (creating it if needed) and returns
// a buffered channel; each metric sent on it is rendered through out and
// appended to the file. The file is closed when the channel is closed.
//
// NOTE(review): wg is passed by value, so the Add/Done below operate on a
// copy — the caller's Wait() will not actually wait for this goroutine
// (go vet's copylocks check flags this). The parameter should be
// *sync.WaitGroup, which also requires updating callers.
func toFile(filename string, out *template.Template, wg sync.WaitGroup) (chan metric, error) {
	c := make(chan metric, MetricsBufferMax)
	flags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
	file, err := os.OpenFile(filename, flags, 0666)
	if err != nil {
		return nil, err
	}
	wg.Add(1)
	go func() {
		for m := range c {
			// Render failures are logged (verbose mode) but do not stop
			// the writer.
			if err := out.Execute(file, m); err != nil {
				verbosef("to file: %v", err)
			}
		}
		file.Close()
		wg.Done()
	}()
	return c, nil
}
// toGraphite dials hostport over TCP (appending graphite's default
// plaintext port 2003 if none is given) and returns a buffered channel;
// each metric sent on it is rendered through out and written to the
// connection. The connection is closed when the channel is closed.
//
// NOTE(review): wg is passed by value, so the Add/Done below operate on a
// copy — the caller's Wait() will not actually wait for this goroutine
// (go vet's copylocks check flags this). The parameter should be
// *sync.WaitGroup, which also requires updating callers.
// NOTE(review): the out.Execute error is silently discarded here, unlike
// toFile which logs it via verbosef.
func toGraphite(hostport string, out *template.Template, wg sync.WaitGroup) (chan metric, error) {
	c := make(chan metric, MetricsBufferMax)
	if !strings.Contains(hostport, ":") {
		hostport += ":2003"
	}
	conn, err := net.Dial("tcp", hostport)
	if err != nil {
		return nil, err
	}
	wg.Add(1)
	go func() {
		// Buffer per-metric writes; flush after each so data points are
		// not delayed waiting for the buffer to fill.
		bw := bufio.NewWriter(conn)
		for m := range c {
			out.Execute(bw, m)
			bw.Flush()
		}
		conn.Close()
		wg.Done()
	}()
	return c, nil
}
// main wires the pipeline together: parse flags, build the metric-name
// template, select an input (stdin, or a followed file with -f) and an
// output (append-to-file or graphite, chosen by the destination argument),
// then stream log lines through a logReader until EOF.
func main() {
	var src io.Reader
	var err error
	var c chan metric
	var wg sync.WaitGroup
	flag.Parse()
	if flag.NArg() != 1 {
		flag.Usage()
		os.Exit(2)
	}
	// Register the helper functions BEFORE parsing any template text:
	// template functions must be known at parse time, so the previous
	// order (parse the -m template, then call Funcs) made
	// lstrip/rstrip/join/split unusable in user-supplied name templates.
	tmpl := template.New("metric").Funcs(template.FuncMap{
		"lstrip": strings.TrimPrefix,
		"rstrip": strings.TrimSuffix,
		"join":   strings.Join,
		"split":  strings.Split,
	})
	// NOTE(review): this file uses html/template, which HTML-escapes
	// rendered output; graphite's plaintext protocol almost certainly
	// wants text/template instead — confirm before changing.
	template.Must(tmpl.Parse(
		"{{template \"name\" .}} {{.Value}} {{.Time.Unix}}\n"))
	if _, err = tmpl.New("name").Parse(*nameTemplate); err != nil {
		log.Fatal(err)
	}
	// Check the os.Hostname error before using the value (the previous
	// code split the hostname before looking at err).
	Hostname, err = os.Hostname()
	if err != nil {
		log.Fatal(err)
	}
	// Keep only the short host name for metric names.
	Hostname = strings.SplitN(Hostname, ".", 2)[0]
	Service = guessServiceName()
	if len(*inputFile) > 0 {
		file, err := tailpipe.Open(*inputFile)
		if err != nil {
			log.Fatal(err)
		}
		src = file
		verbosef("following file %s", file.Name())
	} else {
		verbosef("reading from stdin")
		src = os.Stdin
	}
	// A leading "./" or "/" selects file output; anything else is treated
	// as a graphite host[:port].
	dest := flag.Arg(0)
	if strings.HasPrefix(dest, "./") || strings.HasPrefix(dest, "/") {
		c, err = toFile(dest, tmpl, wg)
		verbosef("appending metrics to file %s", dest)
	} else {
		c, err = toGraphite(dest, tmpl, wg)
		verbosef("sending metrics to graphite server %s", dest)
	}
	if err != nil {
		log.Fatal(err)
	}
	// Duplicating the log on stdout makes this utility usable
	// as a daemontools log service that feeds multilog/s6-log.
	// For this reason we should not buffer writes to stdout
	rd, wr := io.Pipe()
	lr := &logReader{
		src:         rd,
		dst:         c,
		interval:    *interval,
		timestamped: *timestampedLog,
		done:        make(chan struct{}),
		stats: stats{
			queries: make(map[string]time.Time, 10000),
		},
	}
	go lr.run()
	_, err = io.Copy(io.MultiWriter(wr, os.Stdout), src)
	wr.Close()
	// Wait for metrics to finish getting to disk/graphite.
	// NOTE(review): toFile/toGraphite take wg by value, so Add/Done act
	// on a copy and this Wait returns immediately; the parameter there
	// should be *sync.WaitGroup.
	// TODO(droyo) might want to have a deadline here.
	wg.Wait()
	if err != nil && err != io.EOF {
		verbosef("ending: %v", err)
		os.Exit(1)
	}
}