-
Notifications
You must be signed in to change notification settings - Fork 3
/
http.go
96 lines (82 loc) · 2.43 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package plot
import (
"fmt"
"sort"
"sync"
"github.com/antongulenko/golib"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
"github.com/bitflow-stream/go-bitflow/steps"
)
// RegisterHttpPlotter registers the "http" processing step in the given registry.
//
// The step declares a required string parameter "endpoint" and two optional
// parameters: "window" (int, default 100) and "local-static" (bool, default
// false). When the step is instantiated, an HttpPlotter built from these
// parameters is added to the pipeline.
func RegisterHttpPlotter(b reg.ProcessorRegistry) {
	create := func(p *bitflow.SamplePipeline, params map[string]interface{}) error {
		// Checked type assertions: a missing or wrongly typed parameter is
		// reported through the error return instead of panicking.
		endpoint, ok := params["endpoint"].(string)
		if !ok {
			return fmt.Errorf("parameter %q must be a string, got %T", "endpoint", params["endpoint"])
		}
		windowSize, ok := params["window"].(int)
		if !ok {
			return fmt.Errorf("parameter %q must be an int, got %T", "window", params["window"])
		}
		static, ok := params["local-static"].(bool)
		if !ok {
			return fmt.Errorf("parameter %q must be a bool, got %T", "local-static", params["local-static"])
		}
		p.Add(NewHttpPlotter(endpoint, windowSize, static))
		return nil
	}
	b.RegisterStep("http", create,
		"Serve HTTP-based plots about processed metrics values to the given HTTP endpoint").
		Required("endpoint", reg.String()).
		Optional("window", reg.Int(), 100).
		Optional("local-static", reg.Bool(), false)
}
// NewHttpPlotter creates an HttpPlotter for the given endpoint, window size
// and static-content flag. The per-metric window map starts out empty; the
// list of metric names is left at its zero value.
func NewHttpPlotter(endpoint string, windowSize int, useLocalStatic bool) *HttpPlotter {
	plotter := new(HttpPlotter)
	plotter.Endpoint = endpoint
	plotter.WindowSize = windowSize
	plotter.UseLocalStatic = useLocalStatic
	// The map must be allocated up front: logSample writes into it.
	plotter.data = make(map[string]*steps.MetricWindow)
	return plotter
}
// HttpPlotter is a processing step that records the values of every metric
// it receives, one steps.MetricWindow per header field, and then hands each
// sample to the embedded NoopProcessor. Its Start method runs p.serve() in a
// background goroutine.
type HttpPlotter struct {
bitflow.NoopProcessor
// Endpoint is the value of the "endpoint" step parameter; it is included
// in the output of String().
Endpoint string
// WindowSize is passed to steps.NewMetricWindow for every newly seen metric.
WindowSize int
// UseLocalStatic is set from the "local-static" step parameter. It is not
// read in the code visible here.
// NOTE(review): presumably consumed by serve() to choose where static
// content comes from - confirm.
UseLocalStatic bool
// data maps a header field name to the window holding its recorded values.
data map[string]*steps.MetricWindow
// names lists the keys of data; logSample keeps it sorted.
names []string
}
// Start launches p.serve() in a background goroutine and then delegates to
// the embedded NoopProcessor's Start, whose StopChan is returned. A non-nil
// error returned by serve() is reported through p.Error.
func (p *HttpPlotter) Start(wg *sync.WaitGroup) golib.StopChan {
	// This routine cannot be interrupted gracefully
	serveInBackground := func() {
		err := p.serve()
		if err != nil {
			p.Error(err)
		}
	}
	go serveInBackground()
	return p.NoopProcessor.Start(wg)
}
// String returns a short description of the plotter containing its endpoint
// and window size.
func (p *HttpPlotter) String() string {
	endpoint, window := p.Endpoint, p.WindowSize
	description := fmt.Sprintf("HTTP plotter on %v (window size %v)", endpoint, window)
	return description
}
// Sample records the values of the incoming sample via logSample and then
// passes the sample and header to the embedded NoopProcessor, returning
// whatever error that call produces.
func (p *HttpPlotter) Sample(sample *bitflow.Sample, header *bitflow.Header) error {
	// Record first, then hand the sample on.
	p.logSample(sample, header)
	err := p.NoopProcessor.Sample(sample, header)
	return err
}
// logSample pushes every value of the sample into the window of the
// corresponding header field. A window of size p.WindowSize is created the
// first time a field name is seen, and the name is added to p.names.
//
// Values are matched to fields by index, so sample.Values must have at least
// as many entries as header.Fields. This method performs no locking itself.
func (p *HttpPlotter) logSample(sample *bitflow.Sample, header *bitflow.Header) {
	added := false
	for i, field := range header.Fields {
		window, ok := p.data[field]
		if !ok {
			window = steps.NewMetricWindow(p.WindowSize)
			p.data[field] = window
			p.names = append(p.names, field)
			added = true
		}
		window.Push(sample.Values[i])
	}
	// Sort once per sample instead of once per new field: a header that
	// introduces many fields would otherwise re-sort the slice each time.
	if added {
		sort.Strings(p.names)
	}
}
// metricNames returns the names of all metrics recorded so far. The returned
// slice is the plotter's own slice, not a copy.
func (p *HttpPlotter) metricNames() []string {
	names := p.names
	return names
}
// metricData returns the Data() of the window recorded for the given metric.
// For a metric that has not been recorded it returns an empty, non-nil slice.
func (p *HttpPlotter) metricData(metric string) []bitflow.Value {
	window, known := p.data[metric]
	if !known {
		// Deliberately an empty non-nil slice rather than nil.
		return []bitflow.Value{}
	}
	return window.Data()
}
// allMetricData returns a newly allocated map from every recorded metric name
// to the Data() of its window.
func (p *HttpPlotter) allMetricData() map[string][]bitflow.Value {
	all := make(map[string][]bitflow.Value, len(p.data))
	for metric, window := range p.data {
		all[metric] = window.Data()
	}
	return all
}