-
Notifications
You must be signed in to change notification settings - Fork 3
/
multi_header_merger.go
115 lines (101 loc) · 3.13 KB
/
multi_header_merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package steps
import (
"fmt"
"sort"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
log "github.com/sirupsen/logrus"
)
// MultiHeaderMerger can tolerate multiple headers and fills missing data up with default values.
// It buffers every incoming sample instead of forwarding it. When closed, it emits all buffered
// samples under one merged header: the sorted union of all field names seen. Values for fields
// that were absent from a sample's own header are filled with zero.
type MultiHeaderMerger struct {
	bitflow.NoopProcessor
	// NOTE(review): header is never read or written in this file; possibly a leftover - confirm before removing.
	header *bitflow.Header
	// metrics maps each field name to its column of values, with one entry per buffered sample,
	// index-aligned with samples.
	metrics map[string][]bitflow.Value
	// samples holds the metadata of every received sample, in arrival order.
	samples []*bitflow.SampleMetadata
}
// NewMultiHeaderMerger creates a merger with an empty metric table, ready to receive samples.
func NewMultiHeaderMerger() *MultiHeaderMerger {
	merger := new(MultiHeaderMerger)
	merger.metrics = make(map[string][]bitflow.Value)
	return merger
}
// RegisterMergeHeaders registers the "merge_headers" step with b.
// The step appends a new MultiHeaderMerger to the pipeline and ignores any parameters passed to it.
func RegisterMergeHeaders(b reg.ProcessorRegistry) {
	const description = "Accept any number of changing headers and merge them into one output header when flushing the results"
	create := func(p *bitflow.SamplePipeline, _ map[string]interface{}) error {
		p.Add(NewMultiHeaderMerger())
		return nil
	}
	b.RegisterStep("merge_headers", create, description)
}
// Sample buffers the incoming sample for later reconstruction in Close; nothing is forwarded here.
// It returns an error if the sample carries fewer values than its header has fields, because the
// trailing fields could not be given a value (previously this panicked with an index out of range).
func (p *MultiHeaderMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error {
	if len(sample.Values) < len(header.Fields) {
		return fmt.Errorf("%v: sample has %v values, but its header has %v fields",
			p.String(), len(sample.Values), len(header.Fields))
	}
	p.addSample(sample, header)
	return nil
}

// addSample appends the values of incomingSample to the per-field columns in p.metrics.
// Fields seen for the first time are back-filled with zeroes for all previously buffered samples.
// Known fields that are missing from this header get a zero for the new sample. As a result, every
// column holds exactly len(p.samples) entries once the sample's metadata is appended.
// If the header lists a field more than once, only its first occurrence is used. Appending twice
// would break the column-length invariant that reconstructSample checks.
// The caller must ensure len(incomingSample.Values) >= len(header.Fields).
func (p *MultiHeaderMerger) addSample(incomingSample *bitflow.Sample, header *bitflow.Header) {
	if p.metrics == nil {
		// Zero-value struct (not built by NewMultiHeaderMerger), or a sample arriving after Close:
		// avoid panicking on a write to a nil map.
		p.metrics = make(map[string][]bitflow.Value)
	}
	handledMetrics := make(map[string]bool, len(header.Fields))
	for i, field := range header.Fields {
		if handledMetrics[field] {
			continue // duplicate field name in this header
		}
		handledMetrics[field] = true
		metrics, ok := p.metrics[field]
		if !ok {
			metrics = make([]bitflow.Value, len(p.samples)) // Filled up with zeroes
		}
		p.metrics[field] = append(metrics, incomingSample.Values[i])
	}
	// Updating existing keys while ranging over a map is permitted by the Go spec.
	for field, metrics := range p.metrics {
		if !handledMetrics[field] {
			p.metrics[field] = append(metrics, 0) // Filled up with zeroes
		}
	}
	p.samples = append(p.samples, incomingSample.Metadata())
}
// Close flushes all buffered samples downstream under a single merged header. It then drops the
// buffers and closes the outgoing sink. If no samples were received, it only logs a warning.
// Forwarding stops at the first downstream error, which is logged and reported via p.Error.
func (p *MultiHeaderMerger) Close() {
	defer func() {
		// Release the buffers for garbage collection first, then close the sink.
		p.metrics = nil
		p.samples = nil
		p.CloseSink()
	}()

	numSamples := len(p.samples)
	if numSamples == 0 {
		log.Warnln(p.String(), "has no samples stored")
		return
	}
	log.Println(p, "reconstructing and flushing", numSamples, "samples with", len(p.metrics), "metrics")

	merged := p.reconstructHeader()
	for i := 0; i < numSamples; i++ {
		out := p.reconstructSample(i, merged)
		err := p.NoopProcessor.Sample(out, merged)
		if err == nil {
			continue
		}
		err = fmt.Errorf("Error flushing reconstructed samples: %v", err)
		log.Errorln(err)
		p.Error(err)
		return
	}
}
// OutputSampleSize returns the larger of the upstream sampleSize and the number of distinct
// metrics currently buffered.
func (p *MultiHeaderMerger) OutputSampleSize(sampleSize int) int {
	if buffered := len(p.metrics); buffered > sampleSize {
		return buffered
	}
	return sampleSize
}
// reconstructHeader returns a new header listing every buffered metric name. The names are sorted
// so that the output field order is deterministic despite Go's random map iteration order.
func (p *MultiHeaderMerger) reconstructHeader() *bitflow.Header {
	names := make([]string, len(p.metrics))
	i := 0
	for name := range p.metrics {
		names[i] = name
		i++
	}
	sort.Strings(names)
	return &bitflow.Header{Fields: names}
}
// reconstructSample builds the output sample at buffer position num, with one value per field of
// header, in header order, attached to the metadata stored for that sample. header is expected to
// name only fields present in p.metrics, as produced by reconstructHeader.
// The values slice is sized by the header it is indexed by, not by p.metrics. Otherwise a header
// that differs from the metric table would cause an index panic or a sample/header size mismatch.
// It panics if a field's column does not hold exactly one value per buffered sample. That can only
// happen through a bug in how samples are buffered.
func (p *MultiHeaderMerger) reconstructSample(num int, header *bitflow.Header) *bitflow.Sample {
	values := make([]bitflow.Value, len(header.Fields))
	for i, field := range header.Fields {
		slice := p.metrics[field]
		if len(slice) != len(p.samples) {
			// Should never happen
			panic(fmt.Sprintf("Have %v values for field %v, should be %v", len(slice), field, len(p.samples)))
		}
		values[i] = slice[num]
	}
	return p.samples[num].NewSample(values)
}
// String returns the fixed display name of this processor, used in log messages.
func (p *MultiHeaderMerger) String() string {
	const name = "MultiHeaderMerger"
	return name
}