Skip to content

Commit

Permalink
output/cloud/v2: Dedicated Sink implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed May 27, 2023
1 parent ede28b0 commit eb0c005
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 71 deletions.
28 changes: 11 additions & 17 deletions output/cloud/expv2/collect.go
Expand Up @@ -12,7 +12,7 @@ type timeBucket struct {
// TODO: for performance reasons, use directly a unix time here
// so we can avoid time->unix->time
Time time.Time
Sinks map[metrics.TimeSeries]metrics.Sink
Sinks map[metrics.TimeSeries]metricValue
}

// bucketQ is a queue for buffering the aggregated metrics
Expand Down Expand Up @@ -56,15 +56,12 @@ type collector struct {
aggregationPeriod time.Duration
waitPeriod time.Duration

// We should no longer have to handle metrics that have times long in the past.
// So, instead of a map, we can probably use a simple slice (or even an array!)
// as a ring buffer to store the aggregation buckets.
// This should save us a some time, since it would make the lookups and
// WaitPeriod checks basically O(1).
// And even if for some reason there are occasional metrics with past times
// that don't fit in the chosen ring buffer size, we could just send them
// along to the buffer unaggregated.
timeBuckets map[int64]map[metrics.TimeSeries]metrics.Sink
// we should no longer have to handle metrics that have times long in the past. So instead of a
// map, we can probably use a simple slice (or even an array!) as a ring buffer to store the
// aggregation buckets. This should save us some time, since it would make the lookups and WaitPeriod
// checks basically O(1). And even if for some reason there are occasional metrics with past times that
// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
timeBuckets map[int64]map[metrics.TimeSeries]metricValue
}

func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
Expand All @@ -80,7 +77,7 @@ func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
return &collector{
bq: bucketQ{},
nowFunc: time.Now,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
aggregationPeriod: aggrPeriod,
waitPeriod: waitPeriod,
}, nil
Expand Down Expand Up @@ -111,21 +108,18 @@ func (c *collector) collectSample(s metrics.Sample) {
// Get or create a time bucket
bucket, ok := c.timeBuckets[bucketID]
if !ok {
bucket = make(map[metrics.TimeSeries]metrics.Sink)
bucket = make(map[metrics.TimeSeries]metricValue)
c.timeBuckets[bucketID] = bucket
}

// Get or create the bucket's sinks map per time series
sink, ok := bucket[s.TimeSeries]
if !ok {
sink = newSink(s.Metric.Type)
sink = newMetricValue(s.Metric.Type)
bucket[s.TimeSeries] = sink
}

// TODO: we may consider to just pass
// the single value instead of the entire
// sample and save some memory
sink.Add(s)
sink.Add(s.Value)
}

func (c *collector) expiredBuckets() []timeBucket {
Expand Down
40 changes: 21 additions & 19 deletions output/cloud/expv2/collect_test.go
Expand Up @@ -22,7 +22,7 @@ func TestCollectorCollectSample(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -83,9 +83,9 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
assert.Contains(t, c.timeBuckets, int64(3))
assert.Contains(t, c.timeBuckets, int64(4))

sink, ok := c.timeBuckets[4][ts].(*metrics.CounterSink)
sink, ok := c.timeBuckets[4][ts].(*counter)
require.True(t, ok)
assert.Equal(t, 7.0, sink.Value)
assert.Equal(t, 7.0, sink.Sum)
}

func TestDropExpiringDelay(t *testing.T) {
Expand All @@ -105,7 +105,7 @@ func TestCollectorExpiredBucketsNoExipired(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
6: {},
},
}
Expand Down Expand Up @@ -134,21 +134,23 @@ func TestCollectorExpiredBuckets(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
},
},
}
expired := c.expiredBuckets()
require.Len(t, expired, 1)

assert.NotZero(t, expired[0].Time)
assert.Equal(t, expired[0].Sinks, map[metrics.TimeSeries]metrics.Sink{
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
})

exp := map[metrics.TimeSeries]metricValue{
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
}
assert.Equal(t, exp, expired[0].Sinks)
}

func TestCollectorExpiredBucketsCutoff(t *testing.T) {
Expand All @@ -160,7 +162,7 @@ func TestCollectorExpiredBucketsCutoff(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {},
6: {},
9: {},
Expand Down Expand Up @@ -249,9 +251,9 @@ func TestBucketQPopAll(t *testing.T) {
func TestBucketQPushPopConcurrency(t *testing.T) {
t.Parallel()
var (
counter = 0
bq = bucketQ{}
sink = metrics.NewSink(metrics.Counter)
count = 0
bq = bucketQ{}
sink = &counter{}

stop = time.After(100 * time.Millisecond)
pop = make(chan struct{}, 10)
Expand All @@ -278,17 +280,17 @@ func TestBucketQPushPopConcurrency(t *testing.T) {
close(done)
return
default:
counter++
count++
bq.Push([]timeBucket{
{
Time: now,
Sinks: map[metrics.TimeSeries]metrics.Sink{
Sinks: map[metrics.TimeSeries]metricValue{
{}: sink,
},
},
})

if counter%5 == 0 { // a fixed-arbitrary flush rate
if count%5 == 0 { // a fixed-arbitrary flush rate
pop <- struct{}{}
}
}
Expand Down
4 changes: 2 additions & 2 deletions output/cloud/expv2/flush_test.go
Expand Up @@ -25,8 +25,8 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {

tb := timeBucket{
Time: time.Unix(1, 0),
Sinks: map[metrics.TimeSeries]metrics.Sink{
timeSeries: &metrics.CounterSink{},
Sinks: map[metrics.TimeSeries]metricValue{
timeSeries: &counter{},
},
}
msb := newMetricSetBuilder("testrunid-123", 1)
Expand Down
13 changes: 2 additions & 11 deletions output/cloud/expv2/hdr.go
Expand Up @@ -5,7 +5,6 @@ import (
"math/bits"
"time"

"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -260,14 +259,6 @@ func resolveBucketIndex(val float64) uint32 {
return (nkdiff << k) + (upscaled >> nkdiff)
}

func (h *histogram) IsEmpty() bool {
return h.Count == 0
}

func (h *histogram) Add(s metrics.Sample) {
h.addToBucket(s.Value)
}

func (h *histogram) Format(time.Duration) map[string]float64 {
panic("output/cloud/expv2/histogram.Format is not expected to be called")
// Add implements the metricValue interface for the histogram,
// recording v by delegating to addToBucket, which resolves the
// destination bucket for the value and updates the counts.
func (h *histogram) Add(v float64) {
	h.addToBucket(v)
}
4 changes: 3 additions & 1 deletion output/cloud/expv2/integration/testdata/metricset.json
Expand Up @@ -57,7 +57,9 @@
"time": "2023-05-01T02:00:01.500Z",
"last": 3.14,
"min": 3.14,
"max": 3.14
"max": 3.14,
"avg": 3.14,
"count": 1
}
]
}
Expand Down
25 changes: 12 additions & 13 deletions output/cloud/expv2/mapping.go
Expand Up @@ -52,39 +52,38 @@ func addBucketToTimeSeriesProto(
timeSeries *pbcloud.TimeSeries,
mt metrics.MetricType,
time time.Time,
sink metrics.Sink,
sink metricValue,
) {
if timeSeries.Samples == nil {
initTimeSeriesSamples(timeSeries, mt)
}

switch mt {
case metrics.Counter:
c := sink.(*metrics.CounterSink).Value //nolint: forcetypeassert
c := sink.(*counter).Sum //nolint: forcetypeassert
samples := timeSeries.GetCounterSamples()
samples.Values = append(samples.Values, &pbcloud.CounterValue{
Time: timestamppb.New(time),
Value: c,
})
case metrics.Gauge:
g := sink.(*metrics.GaugeSink) //nolint: forcetypeassert
g := sink.(*gauge) //nolint: forcetypeassert
samples := timeSeries.GetGaugeSamples()
samples.Values = append(samples.Values, &pbcloud.GaugeValue{
Time: timestamppb.New(time),
Last: g.Value,
Min: g.Max,
Max: g.Min,
// TODO: implement the custom gauge for track them
Avg: 0,
Count: 0,
Time: timestamppb.New(time),
Last: g.Last,
Min: g.Max,
Max: g.Min,
Avg: g.Avg,
Count: g.Count,
})
case metrics.Rate:
r := sink.(*metrics.RateSink) //nolint: forcetypeassert
r := sink.(*rate) //nolint: forcetypeassert
samples := timeSeries.GetRateSamples()
samples.Values = append(samples.Values, &pbcloud.RateValue{
Time: timestamppb.New(time),
NonzeroCount: uint32(r.Trues),
TotalCount: uint32(r.Total),
NonzeroCount: r.Trues,
TotalCount: r.Total,
})
case metrics.Trend:
h := sink.(*histogram) //nolint: forcetypeassert
Expand Down
4 changes: 2 additions & 2 deletions output/cloud/expv2/output_test.go
Expand Up @@ -110,9 +110,9 @@ func TestOutputCollectSamples(t *testing.T) {
require.Contains(t, buckets[0].Sinks, ts)
require.Len(t, buckets[0].Sinks, 1)

counter, ok := buckets[0].Sinks[ts].(*metrics.CounterSink)
counter, ok := buckets[0].Sinks[ts].(*counter)
require.True(t, ok)
assert.Equal(t, 5.0, counter.Value)
assert.Equal(t, 5.0, counter.Sum)

expTime := time.Date(2023, time.May, 1, 1, 1, 15, 0, time.UTC)
assert.Equal(t, expTime, buckets[0].Time)
Expand Down
73 changes: 69 additions & 4 deletions output/cloud/expv2/sink.go
@@ -1,13 +1,78 @@
package expv2

import (
"fmt"

"go.k6.io/k6/metrics"
)

func newSink(mt metrics.MetricType) metrics.Sink {
if mt == metrics.Trend {
return &histogram{}
// metricValue is the aggregated value for a single time series
// within a time bucket. Add folds one sample's value into the
// running aggregate.
type metricValue interface {
	Add(v float64)
}

// newMetricValue returns the dedicated aggregation value
// for the provided metric type.
//
// It panics on an unknown metric type: constructing an invalid
// type should only be possible from specific and controlled tests.
func newMetricValue(mt metrics.MetricType) metricValue {
	switch mt {
	case metrics.Counter:
		return &counter{}
	case metrics.Gauge:
		return &gauge{}
	case metrics.Rate:
		return &rate{}
	case metrics.Trend:
		return &histogram{}
	default:
		panic(fmt.Sprintf("MetricType %q is not supported", mt))
	}
}

// TODO: add unit tests for the Add methods

// counter aggregates a Counter time series as a running sum.
type counter struct {
	// Sum is the total of all values added so far.
	Sum float64
}

// Add folds v into the running sum.
func (c *counter) Add(v float64) {
	c.Sum = c.Sum + v
}

// gauge aggregates a Gauge time series, tracking the last seen
// value plus min/max/avg statistics over the bucket.
type gauge struct {
	Last     float64 // most recently added value
	Sum      float64 // running sum, used to derive Avg
	Min, Max float64
	Avg      float64
	Count    uint32 // number of values added

	// minSet reports whether Min has been initialized; without it
	// the zero value 0 would incorrectly win against all-positive
	// streams.
	minSet bool
}

// Add folds v into the gauge's aggregate statistics.
func (g *gauge) Add(v float64) {
	g.Last = v
	g.Count++
	g.Sum += v
	g.Avg = g.Sum / float64(g.Count)

	// The first sample must initialize Max unconditionally:
	// otherwise, for an all-negative stream, the zero value 0
	// would be reported as the maximum.
	if g.Count == 1 || v > g.Max {
		g.Max = v
	}
	if !g.minSet || v < g.Min {
		g.Min = v
		g.minSet = true
	}
}

return metrics.NewSink(mt)
// rate aggregates a Rate time series: the count of non-zero
// values (Trues) out of the total number of observed values.
type rate struct {
	Trues uint32
	Total uint32
}

// Add counts v, marking it as a "true" when it is non-zero.
func (r *rate) Add(v float64) {
	r.Total++
	if v == 0 {
		return
	}
	r.Trues++
}
6 changes: 4 additions & 2 deletions output/cloud/expv2/sink_test.go
Expand Up @@ -13,10 +13,12 @@ func TestNewSink(t *testing.T) {
mt metrics.MetricType
exp any
}{
{metrics.Counter, &metrics.CounterSink{}},
{metrics.Counter, &counter{}},
{metrics.Gauge, &gauge{}},
{metrics.Rate, &rate{}},
{metrics.Trend, &histogram{}},
}
for _, tc := range tests {
assert.Equal(t, tc.exp, newSink(tc.mt))
assert.Equal(t, tc.exp, newMetricValue(tc.mt))
}
}

0 comments on commit eb0c005

Please sign in to comment.