This repository has been archived by the owner on Nov 17, 2021. It is now read-only.
/
message_aggregator.go
88 lines (72 loc) · 1.93 KB
/
message_aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package v1
import (
"crypto/sha1"
"fmt"
"io"
"sort"
"sync"
"time"
"github.com/cloudfoundry/sonde-go/events"
)
// MaxTTL is an exported package-level duration, initialized to one minute.
// NOTE(review): nothing in this file reads it; presumably it is consumed
// elsewhere in the package or by importers — confirm before changing it.
var MaxTTL = time.Minute
// MessageAggregator forwards envelopes to an EnvelopeWriter. For counter
// events it also keeps a running total per counter so that events which
// carry only a delta can be given a cumulative total before being forwarded.
type MessageAggregator struct {
// mu is held while counterTotals is read or written.
mu sync.Mutex
// counterTotals holds the most recent total for each counter identity.
counterTotals map[counterID]uint64
// outputWriter receives every envelope passed to Write.
outputWriter EnvelopeWriter
}
// NewAggregator builds a MessageAggregator that forwards to outputWriter and
// starts with no recorded counter totals.
func NewAggregator(outputWriter EnvelopeWriter) *MessageAggregator {
	aggregator := new(MessageAggregator)
	aggregator.counterTotals = map[counterID]uint64{}
	aggregator.outputWriter = outputWriter
	return aggregator
}
// Write hands envelope to the output writer. Envelopes whose event type is
// CounterEvent are first run through handleCounter, and the envelope it
// returns is the one that gets written.
func (m *MessageAggregator) Write(envelope *events.Envelope) {
	isCounter := envelope.GetEventType() == events.Envelope_CounterEvent
	if !isCounter {
		m.outputWriter.Write(envelope)
		return
	}

	m.outputWriter.Write(m.handleCounter(envelope))
}
// handleCounter updates the running total for the counter described by
// envelope and returns the same envelope.
//
// A counter is identified by its name, the envelope origin and a hash of the
// envelope tags. If the event already carries a non-zero total, that value
// replaces the stored total and the envelope is returned untouched. Otherwise
// the event's delta is added to the stored total and the sum is written into
// the event's Total field.
func (m *MessageAggregator) handleCounter(envelope *events.Envelope) *events.Envelope {
	counter := envelope.GetCounterEvent()
	if counter == nil {
		// An envelope can be typed as a counter event without carrying a
		// counter payload (assumes the generated getter returns nil in that
		// case — confirm against sonde-go). There is nothing to aggregate and
		// assigning Total below would dereference nil, so pass it through.
		return envelope
	}

	// Computed before taking the lock so hashing does not extend the
	// critical section.
	countID := counterID{
		name:     counter.GetName(),
		origin:   envelope.GetOrigin(),
		tagsHash: hashTags(envelope.Tags),
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if total := counter.GetTotal(); total != 0 {
		// The emitter supplied its own total: remember it and leave the
		// envelope as is.
		m.counterTotals[countID] = total
		return envelope
	}

	newVal := m.counterTotals[countID] + counter.GetDelta()
	m.counterTotals[countID] = newVal
	counter.Total = &newVal
	return envelope
}
// hashTags returns a deterministic fingerprint of tags. For each entry, in
// ascending key order, it appends the hex-encoded SHA-1 of the key followed
// by the hex-encoded SHA-1 of the value. A nil or empty map yields "".
func hashTags(tags map[string]string) string {
	// Map iteration order is not stable, so copy the entries out and sort
	// them to make the result independent of iteration order.
	elements := make([]mapElement, 0, len(tags))
	for k, v := range tags {
		elements = append(elements, mapElement{k, v})
	}
	sort.Sort(byKey(elements))

	// Every entry contributes two digests of sha1.Size bytes at two hex
	// characters per byte, so the output size is known up front. Appending
	// into one buffer avoids re-copying the accumulated string each round.
	hash := make([]byte, 0, len(elements)*4*sha1.Size)
	for _, element := range elements {
		kHash, vHash := sha1.New(), sha1.New()
		// Writing to a hash.Hash never returns an error, so the results are
		// deliberately not checked.
		io.WriteString(kHash, element.k)
		io.WriteString(vHash, element.v)
		hash = append(hash, fmt.Sprintf("%x%x", kHash.Sum(nil), vHash.Sum(nil))...)
	}
	return string(hash)
}

// byKey implements sort.Interface, ordering mapElements by ascending key.
type byKey []mapElement

func (a byKey) Len() int           { return len(a) }
func (a byKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].k < a[j].k }

// mapElement is one key/value pair copied out of a tags map.
type mapElement struct {
	k, v string
}
// counterID identifies a counter for aggregation purposes and is the key type
// of MessageAggregator.counterTotals. Two events refer to the same counter
// when all three fields match.
type counterID struct {
// origin is the envelope's origin.
origin string
// name is the counter event's name.
name string
// tagsHash is the hashTags fingerprint of the envelope's tags.
tagsHash string
}