// flush.go (forked from grafana/k6)
package expv2

import (
	"context"

	"go.k6.io/k6/metrics"
	"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

// pusher is the dependency used for sending an aggregated MetricSet
// for the given reference ID to the remote service.
type pusher interface {
	push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error
}
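
// For illustration only (not part of the original file): a minimal sketch of a
// pusher implementation that discards the MetricSet, e.g. for wiring a
// metricsFlusher in tests. The name discardPusher is hypothetical.
type discardPusher struct{}

// push implements pusher by dropping the payload and reporting success.
func (discardPusher) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error {
	return nil
}

// Compile-time check that discardPusher satisfies pusher.
var _ pusher = discardPusher{}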

// metricsFlusher drains the bucket queue and pushes the aggregated
// samples to the remote service.
type metricsFlusher struct {
	referenceID                string
	bq                         *bucketQ
	client                     pusher
	aggregationPeriodInSeconds uint32
}

func (f *metricsFlusher) Flush(ctx context.Context) error {
	// Drain the buffer.
	buckets := f.bq.PopAll()
	if len(buckets) < 1 {
		return nil
	}

	// Pivot the data structure from a slice of time buckets
	// to a metric set of time series, where each series has nested samples.
	//
	// The protobuf payload has the metric at the top level, while the
	// slice of buckets only knows about the metric at a deeply nested
	// level. So we have to walk the buckets and group them by metric.
	// To avoid extra loops and allocations, metricSetBuilder does the
	// grouping during a single traversal of the buckets.
	msb := newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds)
	for i := 0; i < len(buckets); i++ {
		msb.addTimeBucket(&buckets[i])
	}

	// Send the MetricSet to the remote service.
	return f.client.push(ctx, f.referenceID, msb.MetricSet)
}
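
// To illustrate the pivot above on hypothetical data (assuming two buckets
// that both carry a sink for the same http_req_duration series):
//
//	buckets (input):    [{Time: t0, Sinks: {series: sink0}},
//	                     {Time: t1, Sinks: {series: sink1}}]
//	MetricSet (output): Metrics: [{Name: "http_req_duration",
//	                     TimeSeries: [{Labels: series labels,
//	                      aggregated samples for t0 and t1}]}]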

type metricSetBuilder struct {
	MetricSet                  *pbcloud.MetricSet
	TestRunID                  string
	AggregationPeriodInSeconds uint32

	// TODO: if we introduce a metricID, we could use it directly as the
	// map's key (map[uint64]pbcloud.Metric), which would be faster.
	// Or, once we have a better vision around the dynamic tracking
	// of metrics (https://github.com/grafana/k6/issues/1321), we could
	// consider whether an array with a length equal to the number of
	// registered metrics could eventually work.
	//
	// TODO: we may evaluate replacing this with
	// map[*metrics.Metric][]*pbcloud.TimeSeries
	// and using a sync.Pool for the series slices.
	// We need dedicated benchmarks before doing that.
	//
	// metrics tracks the conversion of each metric
	// into its protobuf structure.
	metrics map[*metrics.Metric]*pbcloud.Metric

	// seriesIndex tracks, for a time series XYZ, the index of its
	// pbcloud.TimeSeries in the metrics[XYZ.Metric].TimeSeries slice.
	// It supports the iterative process of appending
	// the aggregated measurements for each time series.
	seriesIndex map[metrics.TimeSeries]uint
}

func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilder {
	return metricSetBuilder{
		TestRunID:                  testRunID,
		MetricSet:                  &pbcloud.MetricSet{},
		AggregationPeriodInSeconds: aggrPeriodSec,
		// TODO: evaluate whether removing the pointer from pbcloud.Metric
		// is a better trade-off
		metrics:     make(map[*metrics.Metric]*pbcloud.Metric),
		seriesIndex: make(map[metrics.TimeSeries]uint),
	}
}
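
// A usage sketch (illustrative, not part of the original file) of how Flush
// drives the builder: feed it every drained bucket, then read the grouped
// result from MetricSet. buildMetricSet is a hypothetical helper.
func buildMetricSet(testRunID string, aggrPeriodSec uint32, buckets []timeBucket) *pbcloud.MetricSet {
	msb := newMetricSetBuilder(testRunID, aggrPeriodSec)
	for i := 0; i < len(buckets); i++ {
		msb.addTimeBucket(&buckets[i])
	}
	// One pbcloud.Metric per distinct metric,
	// one pbcloud.TimeSeries per distinct time series.
	return msb.MetricSet
}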

func (msb *metricSetBuilder) addTimeBucket(bucket *timeBucket) {
	for timeSeries, sink := range bucket.Sinks {
		pbmetric, ok := msb.metrics[timeSeries.Metric]
		if !ok {
			// First sample seen for this metric: create its protobuf
			// counterpart and register it in the metric set.
			pbmetric = &pbcloud.Metric{
				Name: timeSeries.Metric.Name,
				Type: mapMetricTypeProto(timeSeries.Metric.Type),
			}
			msb.metrics[timeSeries.Metric] = pbmetric
			msb.MetricSet.Metrics = append(msb.MetricSet.Metrics, pbmetric)
		}

		var pbTimeSeries *pbcloud.TimeSeries
		ix, ok := msb.seriesIndex[timeSeries]
		if !ok {
			// First sample seen for this time series: append a new series
			// to the metric and remember its index for later buckets.
			pbTimeSeries = &pbcloud.TimeSeries{
				AggregationPeriod: msb.AggregationPeriodInSeconds,
				Labels:            mapTimeSeriesLabelsProto(timeSeries, msb.TestRunID),
			}
			pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
			msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
		} else {
			pbTimeSeries = pbmetric.TimeSeries[ix]
		}

		addBucketToTimeSeriesProto(
			pbTimeSeries, timeSeries.Metric.Type, bucket.Time, sink)
	}
}