/
metricAggregator.go
73 lines (59 loc) · 1.88 KB
/
metricAggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package aggregator
import (
"bytes"
"time"
metricspb "github.com/DataDog/agent-payload/v5/gogen"
"github.com/DataDog/datadog-agent/test/fakeintake/api"
)
//nolint:revive // TODO(APL) Fix revive linter
type MetricSeries struct {
// embed proto Metric Series struct
metricspb.MetricPayload_MetricSeries
collectedTime time.Time
}
func (mp *MetricSeries) name() string {
return mp.Metric
}
// GetTags return the tags from a payload
func (mp *MetricSeries) GetTags() []string {
return mp.Tags
}
// GetCollectedTime return the time when the payload has been collected by the fakeintake server
func (mp *MetricSeries) GetCollectedTime() time.Time {
return mp.collectedTime
}
// ParseMetricSeries return the parsed metrics from payload
func ParseMetricSeries(payload api.Payload) (metrics []*MetricSeries, err error) {
if bytes.Equal(payload.Data, []byte("{}")) {
// metrics can submit empty JSON object
return []*MetricSeries{}, nil
}
enflated, err := enflate(payload.Data, payload.Encoding)
if err != nil {
return nil, err
}
metricsPayload := new(metricspb.MetricPayload)
err = metricsPayload.Unmarshal(enflated)
if err != nil {
return nil, err
}
metrics = []*MetricSeries{}
for _, serie := range metricsPayload.Series {
metrics = append(metrics, &MetricSeries{MetricPayload_MetricSeries: *serie, collectedTime: payload.Timestamp})
}
return metrics, err
}
//nolint:revive // TODO(APL) Fix revive linter
type MetricAggregator struct {
Aggregator[*MetricSeries]
}
//nolint:revive // TODO(APL) Fix revive linter
func NewMetricAggregator() MetricAggregator {
return MetricAggregator{
Aggregator: newAggregator(ParseMetricSeries),
}
}