forked from Meituan-Dianping/cat-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aggregator_metric.go
123 lines (103 loc) · 2.4 KB
/
aggregator_metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package cat
import (
"fmt"
"strconv"
"time"
"github.com/changbo218/cat-go/message"
)
type metricData struct {
name string
count int
duration time.Duration
}
type metricAggregator struct {
scheduleMixin
ch chan *metricData
dataMap map[string]*metricData
ticker *time.Ticker
}
func (p *metricAggregator) GetName() string {
return "MetricAggregator"
}
func (p *metricAggregator) afterStart() {
p.ticker = time.NewTicker(metricAggregatorInterval)
}
func (p *metricAggregator) beforeStop() {
close(p.ch)
for event := range p.ch {
p.putOrMerge(event)
}
p.collectAndSend()
p.ticker.Stop()
}
func (p *metricAggregator) process() {
select {
case sig := <-p.signals:
p.handle(sig)
case data := <-p.ch:
p.putOrMerge(data)
case <-p.ticker.C:
p.collectAndSend()
}
}
func (p *metricAggregator) collectAndSend() {
dataMap := p.dataMap
p.dataMap = make(map[string]*metricData)
p.send(dataMap)
}
func (p *metricAggregator) send(dataMap map[string]*metricData) {
if len(dataMap) == 0 {
return
}
t := message.NewTransaction(typeSystem, nameMetricAggregator, aggregator.flush)
defer t.Complete()
for _, data := range dataMap {
metric := message.NewMetric("", data.name, nil)
if data.duration > 0 {
metric.SetStatus("S,C")
duration := data.duration.Nanoseconds() / time.Millisecond.Nanoseconds()
metric.SetData(fmt.Sprintf("%d,%d", data.count, duration))
} else {
metric.SetStatus("C")
metric.SetData(strconv.Itoa(data.count))
}
t.AddChild(metric)
}
}
func (p *metricAggregator) putOrMerge(data *metricData) {
if item, ok := p.dataMap[data.name]; ok {
item.count += data.count
item.duration += data.duration
} else {
p.dataMap[data.name] = data
}
}
func newMetricAggregator() *metricAggregator {
return &metricAggregator{
scheduleMixin: makeScheduleMixedIn(signalMetricAggregatorExit),
ch: make(chan *metricData, metricAggregatorChannelCapacity),
dataMap: make(map[string]*metricData),
}
}
func (p *metricAggregator) AddDuration(name string, duration time.Duration) {
select {
case p.ch <- &metricData{
name: name,
count: 1,
duration: duration,
}:
default:
logger.Warning("Metric aggregator is full")
}
}
func (p *metricAggregator) AddCount(name string, count int) {
select {
case p.ch <- &metricData{
name: name,
count: count,
duration: 0,
}:
default:
logger.Warning("Metric aggregator is full")
}
}