diff --git a/output/cloud/expv2/collect.go b/output/cloud/expv2/collect.go index 7b5a2403a2c0..5a7a7e715eb5 100644 --- a/output/cloud/expv2/collect.go +++ b/output/cloud/expv2/collect.go @@ -12,7 +12,7 @@ type timeBucket struct { // TODO: for performance reasons, use directly a unix time here // so we can avoid time->unix->time Time time.Time - Sinks map[metrics.TimeSeries]metrics.Sink + Sinks map[metrics.TimeSeries]metricValue } // bucketQ is a queue for buffering the aggregated metrics @@ -56,15 +56,12 @@ type collector struct { aggregationPeriod time.Duration waitPeriod time.Duration - // We should no longer have to handle metrics that have times long in the past. - // So, instead of a map, we can probably use a simple slice (or even an array!) - // as a ring buffer to store the aggregation buckets. - // This should save us a some time, since it would make the lookups and - // WaitPeriod checks basically O(1). - // And even if for some reason there are occasional metrics with past times - // that don't fit in the chosen ring buffer size, we could just send them - // along to the buffer unaggregated. - timeBuckets map[int64]map[metrics.TimeSeries]metrics.Sink + // we should no longer have to handle metrics that have times long in the past. So instead of a + // map, we can probably use a simple slice (or even an array!) as a ring buffer to store the + // aggregation buckets. This should save us a some time, since it would make the lookups and WaitPeriod + // checks basically O(1). 
And even if for some reason there are occasional metrics with past times that + // don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated + timeBuckets map[int64]map[metrics.TimeSeries]metricValue } func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) { @@ -80,7 +77,7 @@ func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) { return &collector{ bq: bucketQ{}, nowFunc: time.Now, - timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink), + timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue), aggregationPeriod: aggrPeriod, waitPeriod: waitPeriod, }, nil @@ -111,21 +108,18 @@ func (c *collector) collectSample(s metrics.Sample) { // Get or create a time bucket bucket, ok := c.timeBuckets[bucketID] if !ok { - bucket = make(map[metrics.TimeSeries]metrics.Sink) + bucket = make(map[metrics.TimeSeries]metricValue) c.timeBuckets[bucketID] = bucket } // Get or create the bucket's sinks map per time series sink, ok := bucket[s.TimeSeries] if !ok { - sink = newSink(s.Metric.Type) + sink = newMetricValue(s.Metric.Type) bucket[s.TimeSeries] = sink } - // TODO: we may consider to just pass - // the single value instead of the entire - // sample and save some memory - sink.Add(s) + sink.Add(s.Value) } func (c *collector) expiredBuckets() []timeBucket { diff --git a/output/cloud/expv2/collect_test.go b/output/cloud/expv2/collect_test.go index c1cd4d4d2944..9d4af801d997 100644 --- a/output/cloud/expv2/collect_test.go +++ b/output/cloud/expv2/collect_test.go @@ -22,7 +22,7 @@ func TestCollectorCollectSample(t *testing.T) { c := collector{ aggregationPeriod: 3 * time.Second, waitPeriod: 1 * time.Second, - timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink), + timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue), nowFunc: func() time.Time { return time.Unix(31, 0) }, @@ -55,7 +55,7 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) { c := 
collector{ aggregationPeriod: 3 * time.Second, waitPeriod: 1 * time.Second, - timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink), + timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue), nowFunc: func() time.Time { return time.Unix(31, 0) }, @@ -83,9 +83,9 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) { assert.Contains(t, c.timeBuckets, int64(3)) assert.Contains(t, c.timeBuckets, int64(4)) - sink, ok := c.timeBuckets[4][ts].(*metrics.CounterSink) + sink, ok := c.timeBuckets[4][ts].(*counter) require.True(t, ok) - assert.Equal(t, 7.0, sink.Value) + assert.Equal(t, 7.0, sink.Sum) } func TestDropExpiringDelay(t *testing.T) { @@ -105,7 +105,7 @@ func TestCollectorExpiredBucketsNoExipired(t *testing.T) { nowFunc: func() time.Time { return time.Unix(10, 0) }, - timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{ + timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{ 6: {}, }, } @@ -134,10 +134,10 @@ func TestCollectorExpiredBuckets(t *testing.T) { nowFunc: func() time.Time { return time.Unix(10, 0) }, - timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{ + timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{ 3: { - ts1: &metrics.CounterSink{Value: 10}, - ts2: &metrics.CounterSink{Value: 4}, + ts1: &counter{Sum: 10}, + ts2: &counter{Sum: 4}, }, }, } @@ -145,10 +145,12 @@ func TestCollectorExpiredBuckets(t *testing.T) { require.Len(t, expired, 1) assert.NotZero(t, expired[0].Time) - assert.Equal(t, expired[0].Sinks, map[metrics.TimeSeries]metrics.Sink{ - ts1: &metrics.CounterSink{Value: 10}, - ts2: &metrics.CounterSink{Value: 4}, - }) + + exp := map[metrics.TimeSeries]metricValue{ + ts1: &counter{Sum: 10}, + ts2: &counter{Sum: 4}, + } + assert.Equal(t, exp, expired[0].Sinks) } func TestCollectorExpiredBucketsCutoff(t *testing.T) { @@ -160,7 +162,7 @@ func TestCollectorExpiredBucketsCutoff(t *testing.T) { nowFunc: func() time.Time { return time.Unix(10, 0) }, - timeBuckets: 
map[int64]map[metrics.TimeSeries]metrics.Sink{ + timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{ 3: {}, 6: {}, 9: {}, @@ -249,9 +251,9 @@ func TestBucketQPopAll(t *testing.T) { func TestBucketQPushPopConcurrency(t *testing.T) { t.Parallel() var ( - counter = 0 - bq = bucketQ{} - sink = metrics.NewSink(metrics.Counter) + count = 0 + bq = bucketQ{} + sink = &counter{} stop = time.After(100 * time.Millisecond) pop = make(chan struct{}, 10) @@ -278,17 +280,17 @@ func TestBucketQPushPopConcurrency(t *testing.T) { close(done) return default: - counter++ + count++ bq.Push([]timeBucket{ { Time: now, - Sinks: map[metrics.TimeSeries]metrics.Sink{ + Sinks: map[metrics.TimeSeries]metricValue{ {}: sink, }, }, }) - if counter%5 == 0 { // a fixed-arbitrary flush rate + if count%5 == 0 { // a fixed-arbitrary flush rate pop <- struct{}{} } } diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go index b22443daf53d..aff37c5f1811 100644 --- a/output/cloud/expv2/flush_test.go +++ b/output/cloud/expv2/flush_test.go @@ -25,8 +25,8 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) { tb := timeBucket{ Time: time.Unix(1, 0), - Sinks: map[metrics.TimeSeries]metrics.Sink{ - timeSeries: &metrics.CounterSink{}, + Sinks: map[metrics.TimeSeries]metricValue{ + timeSeries: &counter{}, }, } msb := newMetricSetBuilder("testrunid-123", 1) diff --git a/output/cloud/expv2/hdr.go b/output/cloud/expv2/hdr.go index ca94fefc791e..eada2d4a4ea5 100644 --- a/output/cloud/expv2/hdr.go +++ b/output/cloud/expv2/hdr.go @@ -5,7 +5,6 @@ import ( "math/bits" "time" - "go.k6.io/k6/metrics" "go.k6.io/k6/output/cloud/expv2/pbcloud" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -260,14 +259,6 @@ func resolveBucketIndex(val float64) uint32 { return (nkdiff << k) + (upscaled >> nkdiff) } -func (h *histogram) IsEmpty() bool { - return h.Count == 0 -} - -func (h *histogram) Add(s metrics.Sample) { - h.addToBucket(s.Value) -} - -func (h *histogram) Format(time.Duration) 
map[string]float64 { - panic("output/cloud/expv2/histogram.Format is not expected to be called") +func (h *histogram) Add(v float64) { + h.addToBucket(v) } diff --git a/output/cloud/expv2/integration/testdata/metricset.json b/output/cloud/expv2/integration/testdata/metricset.json index 52ce1b3d41f6..8390132ba52f 100644 --- a/output/cloud/expv2/integration/testdata/metricset.json +++ b/output/cloud/expv2/integration/testdata/metricset.json @@ -57,7 +57,9 @@ "time": "2023-05-01T02:00:01.500Z", "last": 3.14, "min": 3.14, - "max": 3.14 + "max": 3.14, + "avg": 3.14, + "count": 1 } ] } diff --git a/output/cloud/expv2/mapping.go b/output/cloud/expv2/mapping.go index 1f541f4ac400..7e75d7b9dd35 100644 --- a/output/cloud/expv2/mapping.go +++ b/output/cloud/expv2/mapping.go @@ -52,7 +52,7 @@ func addBucketToTimeSeriesProto( timeSeries *pbcloud.TimeSeries, mt metrics.MetricType, time time.Time, - sink metrics.Sink, + sink metricValue, ) { if timeSeries.Samples == nil { initTimeSeriesSamples(timeSeries, mt) @@ -60,31 +60,30 @@ switch mt { case metrics.Counter: - c := sink.(*metrics.CounterSink).Value //nolint: forcetypeassert + c := sink.(*counter).Sum //nolint: forcetypeassert samples := timeSeries.GetCounterSamples() samples.Values = append(samples.Values, &pbcloud.CounterValue{ Time: timestamppb.New(time), Value: c, }) case metrics.Gauge: - g := sink.(*metrics.GaugeSink) //nolint: forcetypeassert + g := sink.(*gauge) //nolint: forcetypeassert samples := timeSeries.GetGaugeSamples() samples.Values = append(samples.Values, &pbcloud.GaugeValue{ - Time: timestamppb.New(time), - Last: g.Value, - Min: g.Max, - Max: g.Min, - // TODO: implement the custom gauge for track them - Avg: 0, - Count: 0, + Time: timestamppb.New(time), + Last: g.Last, + Min: g.Min, + Max: g.Max, + Avg: g.Avg, + Count: g.Count, }) case metrics.Rate: - r := sink.(*metrics.RateSink) //nolint: forcetypeassert + r := sink.(*rate) //nolint: forcetypeassert samples := 
timeSeries.GetRateSamples() samples.Values = append(samples.Values, &pbcloud.RateValue{ Time: timestamppb.New(time), - NonzeroCount: uint32(r.Trues), - TotalCount: uint32(r.Total), + NonzeroCount: r.Trues, + TotalCount: r.Total, }) case metrics.Trend: h := sink.(*histogram) //nolint: forcetypeassert diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index 40264f8011a6..0358d46ce5fe 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -110,9 +110,9 @@ func TestOutputCollectSamples(t *testing.T) { require.Contains(t, buckets[0].Sinks, ts) require.Len(t, buckets[0].Sinks, 1) - counter, ok := buckets[0].Sinks[ts].(*metrics.CounterSink) + counter, ok := buckets[0].Sinks[ts].(*counter) require.True(t, ok) - assert.Equal(t, 5.0, counter.Value) + assert.Equal(t, 5.0, counter.Sum) expTime := time.Date(2023, time.May, 1, 1, 1, 15, 0, time.UTC) assert.Equal(t, expTime, buckets[0].Time) diff --git a/output/cloud/expv2/sink.go b/output/cloud/expv2/sink.go index 9be8d689a77a..a0f7d9b2e8d0 100644 --- a/output/cloud/expv2/sink.go +++ b/output/cloud/expv2/sink.go @@ -1,13 +1,78 @@ package expv2 import ( + "fmt" + "go.k6.io/k6/metrics" ) -func newSink(mt metrics.MetricType) metrics.Sink { - if mt == metrics.Trend { - return &histogram{} +type metricValue interface { + Add(v float64) +} + +func newMetricValue(mt metrics.MetricType) metricValue { + var am metricValue + switch mt { + case metrics.Counter: + am = &counter{} + case metrics.Gauge: + am = &gauge{} + case metrics.Rate: + am = &rate{} + case metrics.Trend: + am = &histogram{} + default: + // Should not be possible to create + // an invalid metric type except for specific + // and controlled tests + panic(fmt.Sprintf("MetricType %q is not supported", mt)) + } + return am +} + +// TODO: add unit tests for the Add methods + +type counter struct { + Sum float64 +} + +func (c *counter) Add(v float64) { + c.Sum += v +} + +type gauge struct { + Last float64 + Sum 
float64 + Min, Max float64 + Avg float64 + Count uint32 + + minSet bool +} + +func (g *gauge) Add(v float64) { + g.Last = v + g.Count++ + g.Sum += v + g.Avg = g.Sum / float64(g.Count) + + if v > g.Max { + g.Max = v } + if !g.minSet || v < g.Min { + g.Min = v + g.minSet = true + } +} - return metrics.NewSink(mt) +type rate struct { + Trues uint32 + Total uint32 +} + +func (r *rate) Add(v float64) { + r.Total++ + if v != 0 { + r.Trues++ + } } diff --git a/output/cloud/expv2/sink_test.go b/output/cloud/expv2/sink_test.go index 8d22def078e1..6ffcb18926c2 100644 --- a/output/cloud/expv2/sink_test.go +++ b/output/cloud/expv2/sink_test.go @@ -13,10 +13,12 @@ func TestNewSink(t *testing.T) { mt metrics.MetricType exp any }{ - {metrics.Counter, &metrics.CounterSink{}}, + {metrics.Counter, &counter{}}, + {metrics.Gauge, &gauge{}}, + {metrics.Rate, &rate{}}, {metrics.Trend, &histogram{}}, } for _, tc := range tests { - assert.Equal(t, tc.exp, newSink(tc.mt)) + assert.Equal(t, tc.exp, newMetricValue(tc.mt)) } }