This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
poster.go
111 lines (94 loc) · 3.16 KB
/
poster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"log"
"sync"
"time"
influx "github.com/heroku/lumbermill/Godeps/_workspace/src/github.com/influxdb/influxdb-go"
metrics "github.com/heroku/lumbermill/Godeps/_workspace/src/github.com/rcrowley/go-metrics"
)
var deliverySizeHistogram = metrics.GetOrRegisterHistogram("lumbermill.poster.deliver.sizes", metrics.DefaultRegistry, metrics.NewUniformSample(100))
type poster struct {
destination *destination
name string
influxClient *influx.Client
pointsSuccessCounter metrics.Counter
pointsSuccessTime metrics.Timer
pointsFailureCounter metrics.Counter
pointsFailureTime metrics.Timer
}
func newPoster(clientConfig influx.ClientConfig, name string, destination *destination, waitGroup *sync.WaitGroup) *poster {
influxClient, err := influx.NewClient(&clientConfig)
if err != nil {
panic(err)
}
return &poster{
destination: destination,
name: name,
influxClient: influxClient,
pointsSuccessCounter: metrics.GetOrRegisterCounter("lumbermill.poster.deliver.points."+name, metrics.DefaultRegistry),
pointsSuccessTime: metrics.GetOrRegisterTimer("lumbermill.poster.success.time."+name, metrics.DefaultRegistry),
pointsFailureCounter: metrics.GetOrRegisterCounter("lumbermill.poster.error.points."+name, metrics.DefaultRegistry),
pointsFailureTime: metrics.GetOrRegisterTimer("lumbermill.poster.error.time."+name, metrics.DefaultRegistry),
}
}
func makeSeries(p point) *influx.Series {
series := &influx.Series{Points: make([][]interface{}, 0)}
series.Name = p.SeriesName()
series.Columns = seriesColumns[p.Type]
return series
}
func (p *poster) Run() {
var last bool
var delivery map[string]*influx.Series
timeout := time.NewTicker(time.Second)
defer func() { timeout.Stop() }()
for !last {
delivery, last = p.nextDelivery(timeout)
p.deliver(delivery)
}
}
func (p *poster) nextDelivery(timeout *time.Ticker) (delivery map[string]*influx.Series, last bool) {
delivery = make(map[string]*influx.Series)
for {
select {
case point, open := <-p.destination.points:
if open {
seriesName := point.SeriesName()
series, found := delivery[seriesName]
if !found {
series = makeSeries(point)
}
series.Points = append(series.Points, point.Points)
delivery[seriesName] = series
} else {
return delivery, true
}
case <-timeout.C:
return delivery, false
}
}
}
func (p *poster) deliver(allSeries map[string]*influx.Series) {
pointCount := 0
seriesGroup := make([]*influx.Series, 0, len(allSeries))
for _, s := range allSeries {
pointCount += len(s.Points)
seriesGroup = append(seriesGroup, s)
}
if pointCount == 0 {
return
}
start := time.Now()
err := p.influxClient.WriteSeriesWithTimePrecision(seriesGroup, influx.Microsecond)
if err != nil {
// TODO: Ugh. These could be timeout errors, or an internal error.
// Should probably attempt to figure out which...
p.pointsFailureCounter.Inc(1)
p.pointsFailureTime.UpdateSince(start)
log.Printf("Error posting points: %s\n", err)
} else {
p.pointsSuccessCounter.Inc(1)
p.pointsSuccessTime.UpdateSince(start)
deliverySizeHistogram.Update(int64(pointCount))
}
}