Skip to content

Commit

Permalink
output/cloudv2: Optimized metric sinks (#3085)
Browse files Browse the repository at this point in the history
Dedicated Sink implementations to reduce at minimum the performance costs on aggregation.
  • Loading branch information
codebien committed May 31, 2023
1 parent 7db2dbf commit 811d5d6
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 101 deletions.
28 changes: 11 additions & 17 deletions output/cloud/expv2/collect.go
Expand Up @@ -12,7 +12,7 @@ type timeBucket struct {
// TODO: for performance reasons, use directly a unix time here
// so we can avoid time->unix->time
Time time.Time
Sinks map[metrics.TimeSeries]metrics.Sink
Sinks map[metrics.TimeSeries]metricValue
}

// bucketQ is a queue for buffering the aggregated metrics
Expand Down Expand Up @@ -56,15 +56,12 @@ type collector struct {
aggregationPeriod time.Duration
waitPeriod time.Duration

// We should no longer have to handle metrics that have times long in the past.
// So, instead of a map, we can probably use a simple slice (or even an array!)
// as a ring buffer to store the aggregation buckets.
// This should save us a some time, since it would make the lookups and
// WaitPeriod checks basically O(1).
// And even if for some reason there are occasional metrics with past times
// that don't fit in the chosen ring buffer size, we could just send them
// along to the buffer unaggregated.
timeBuckets map[int64]map[metrics.TimeSeries]metrics.Sink
// we should no longer have to handle metrics that have times long in the past. So instead of a
// map, we can probably use a simple slice (or even an array!) as a ring buffer to store the
// aggregation buckets. This should save us a some time, since it would make the lookups and WaitPeriod
// checks basically O(1). And even if for some reason there are occasional metrics with past times that
// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
timeBuckets map[int64]map[metrics.TimeSeries]metricValue
}

func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
Expand All @@ -80,7 +77,7 @@ func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
return &collector{
bq: bucketQ{},
nowFunc: time.Now,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
aggregationPeriod: aggrPeriod,
waitPeriod: waitPeriod,
}, nil
Expand Down Expand Up @@ -111,21 +108,18 @@ func (c *collector) collectSample(s metrics.Sample) {
// Get or create a time bucket
bucket, ok := c.timeBuckets[bucketID]
if !ok {
bucket = make(map[metrics.TimeSeries]metrics.Sink)
bucket = make(map[metrics.TimeSeries]metricValue)
c.timeBuckets[bucketID] = bucket
}

// Get or create the bucket's sinks map per time series
sink, ok := bucket[s.TimeSeries]
if !ok {
sink = newSink(s.Metric.Type)
sink = newMetricValue(s.Metric.Type)
bucket[s.TimeSeries] = sink
}

// TODO: we may consider to just pass
// the single value instead of the entire
// sample and save some memory
sink.Add(s)
sink.Add(s.Value)
}

func (c *collector) expiredBuckets() []timeBucket {
Expand Down
40 changes: 21 additions & 19 deletions output/cloud/expv2/collect_test.go
Expand Up @@ -22,7 +22,7 @@ func TestCollectorCollectSample(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
c := collector{
aggregationPeriod: 3 * time.Second,
waitPeriod: 1 * time.Second,
timeBuckets: make(map[int64]map[metrics.TimeSeries]metrics.Sink),
timeBuckets: make(map[int64]map[metrics.TimeSeries]metricValue),
nowFunc: func() time.Time {
return time.Unix(31, 0)
},
Expand Down Expand Up @@ -83,9 +83,9 @@ func TestCollectorCollectSampleAggregateNumbers(t *testing.T) {
assert.Contains(t, c.timeBuckets, int64(3))
assert.Contains(t, c.timeBuckets, int64(4))

sink, ok := c.timeBuckets[4][ts].(*metrics.CounterSink)
sink, ok := c.timeBuckets[4][ts].(*counter)
require.True(t, ok)
assert.Equal(t, 7.0, sink.Value)
assert.Equal(t, 7.0, sink.Sum)
}

func TestDropExpiringDelay(t *testing.T) {
Expand All @@ -105,7 +105,7 @@ func TestCollectorExpiredBucketsNoExipired(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
6: {},
},
}
Expand Down Expand Up @@ -134,21 +134,23 @@ func TestCollectorExpiredBuckets(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
},
},
}
expired := c.expiredBuckets()
require.Len(t, expired, 1)

assert.NotZero(t, expired[0].Time)
assert.Equal(t, expired[0].Sinks, map[metrics.TimeSeries]metrics.Sink{
ts1: &metrics.CounterSink{Value: 10},
ts2: &metrics.CounterSink{Value: 4},
})

exp := map[metrics.TimeSeries]metricValue{
ts1: &counter{Sum: 10},
ts2: &counter{Sum: 4},
}
assert.Equal(t, exp, expired[0].Sinks)
}

func TestCollectorExpiredBucketsCutoff(t *testing.T) {
Expand All @@ -160,7 +162,7 @@ func TestCollectorExpiredBucketsCutoff(t *testing.T) {
nowFunc: func() time.Time {
return time.Unix(10, 0)
},
timeBuckets: map[int64]map[metrics.TimeSeries]metrics.Sink{
timeBuckets: map[int64]map[metrics.TimeSeries]metricValue{
3: {},
6: {},
9: {},
Expand Down Expand Up @@ -249,9 +251,9 @@ func TestBucketQPopAll(t *testing.T) {
func TestBucketQPushPopConcurrency(t *testing.T) {
t.Parallel()
var (
counter = 0
bq = bucketQ{}
sink = metrics.NewSink(metrics.Counter)
count = 0
bq = bucketQ{}
sink = &counter{}

stop = time.After(100 * time.Millisecond)
pop = make(chan struct{}, 10)
Expand All @@ -278,17 +280,17 @@ func TestBucketQPushPopConcurrency(t *testing.T) {
close(done)
return
default:
counter++
count++
bq.Push([]timeBucket{
{
Time: now,
Sinks: map[metrics.TimeSeries]metrics.Sink{
Sinks: map[metrics.TimeSeries]metricValue{
{}: sink,
},
},
})

if counter%5 == 0 { // a fixed-arbitrary flush rate
if count%5 == 0 { // a fixed-arbitrary flush rate
pop <- struct{}{}
}
}
Expand Down
4 changes: 2 additions & 2 deletions output/cloud/expv2/flush_test.go
Expand Up @@ -25,8 +25,8 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {

tb := timeBucket{
Time: time.Unix(1, 0),
Sinks: map[metrics.TimeSeries]metrics.Sink{
timeSeries: &metrics.CounterSink{},
Sinks: map[metrics.TimeSeries]metricValue{
timeSeries: &counter{},
},
}
msb := newMetricSetBuilder("testrunid-123", 1)
Expand Down
18 changes: 2 additions & 16 deletions output/cloud/expv2/hdr.go
Expand Up @@ -5,7 +5,6 @@ import (
"math/bits"
"time"

"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -80,11 +79,6 @@ type histogram struct {
Count uint32
}

// newHistogram creates an histogram of the provided values.
func newHistogram() histogram {
return histogram{}
}

// addToBucket increments the counter of the bucket of the provided value.
// If the value is lower or higher than the trackable limits
// then it is counted into specific buckets. All the stats are also updated accordingly.
Expand Down Expand Up @@ -265,14 +259,6 @@ func resolveBucketIndex(val float64) uint32 {
return (nkdiff << k) + (upscaled >> nkdiff)
}

func (h *histogram) IsEmpty() bool {
return h.Count == 0
}

func (h *histogram) Add(s metrics.Sample) {
h.addToBucket(s.Value)
}

func (h *histogram) Format(time.Duration) map[string]float64 {
panic("output/cloud/expv2/histogram.Format is not expected to be called")
func (h *histogram) Add(v float64) {
h.addToBucket(v)
}
32 changes: 15 additions & 17 deletions output/cloud/expv2/hdr_test.go
Expand Up @@ -42,7 +42,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
t.Parallel()

// Zero as value
res := newHistogram()
res := histogram{}
res.addToBucket(0)
exp := histogram{
Buckets: []uint32{1},
Expand All @@ -58,7 +58,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Add a lower bucket index within slice capacity
res = newHistogram()
res = histogram{}
res.addToBucket(8)
res.addToBucket(5)

Expand All @@ -76,7 +76,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Add a higher bucket index within slice capacity
res = newHistogram()
res = histogram{}
res.addToBucket(100)
res.addToBucket(101)

Expand All @@ -94,7 +94,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
require.Equal(t, exp, res)

// Same case but reversed test check
res = newHistogram()
res = histogram{}
res.addToBucket(101)
res.addToBucket(100)

Expand All @@ -112,7 +112,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
assert.Equal(t, exp, res)

// One more complex case with lower index and more than two indexes
res = newHistogram()
res = histogram{}
res.addToBucket(8)
res.addToBucket(9)
res.addToBucket(10)
Expand All @@ -136,7 +136,7 @@ func TestNewHistogramWithSimpleValue(t *testing.T) {
func TestNewHistogramWithUntrackables(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
for _, v := range []float64{5, -3.14, 2 * 1e9, 1} {
res.addToBucket(v)
}
Expand All @@ -158,7 +158,7 @@ func TestNewHistogramWithUntrackables(t *testing.T) {
func TestNewHistogramWithMultipleValues(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
for _, v := range []float64{51.8, 103.6, 103.6, 103.6, 103.6} {
res.addToBucket(v)
}
Expand All @@ -181,7 +181,7 @@ func TestNewHistogramWithMultipleValues(t *testing.T) {
func TestNewHistogramWithNegativeNum(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
res.addToBucket(-2.42314)

exp := histogram{
Expand All @@ -199,7 +199,7 @@ func TestNewHistogramWithNegativeNum(t *testing.T) {

func TestNewHistogramWithMultipleNegativeNums(t *testing.T) {
t.Parallel()
res := newHistogram()
res := histogram{}
for _, v := range []float64{-0.001, -0.001, -0.001} {
res.addToBucket(v)
}
Expand All @@ -220,7 +220,7 @@ func TestNewHistogramWithMultipleNegativeNums(t *testing.T) {
func TestNewHistoramWithNoVals(t *testing.T) {
t.Parallel()

res := newHistogram()
res := histogram{}
exp := histogram{
Buckets: nil,
FirstNotZeroBucket: 0,
Expand Down Expand Up @@ -267,17 +267,14 @@ func TestHistogramAsProto(t *testing.T) {
cases := []struct {
name string
vals []float64
in histogram
exp *pbcloud.TrendHdrValue
}{
{
name: "empty histogram",
in: histogram{},
exp: &pbcloud.TrendHdrValue{},
},
{
name: "not trackable values",
in: newHistogram(),
vals: []float64{-0.23, 1<<30 + 1},
exp: &pbcloud.TrendHdrValue{
Count: 2,
Expand All @@ -287,11 +284,11 @@ func TestHistogramAsProto(t *testing.T) {
LowerCounterIndex: 0,
MinValue: -0.23,
MaxValue: 1<<30 + 1,
Sum: (1 << 30) + 1 - 0.23,
},
},
{
name: "normal values",
in: newHistogram(),
vals: []float64{2, 1.1, 3},
exp: &pbcloud.TrendHdrValue{
Count: 3,
Expand All @@ -301,18 +298,19 @@ func TestHistogramAsProto(t *testing.T) {
LowerCounterIndex: 2,
MinValue: 1.1,
MaxValue: 3,
Sum: 6.1,
},
},
}

for _, tc := range cases {
h := histogram{}
for _, v := range tc.vals {
tc.in.addToBucket(v)
h.addToBucket(v)
}
tc.exp.MinResolution = 1.0
tc.exp.SignificantDigits = 2
tc.exp.Time = &timestamppb.Timestamp{Seconds: 1}
tc.exp.Sum = tc.in.Sum
assert.Equal(t, tc.exp, histogramAsProto(&tc.in, time.Unix(1, 0)), tc.name)
assert.Equal(t, tc.exp, histogramAsProto(&h, time.Unix(1, 0)), tc.name)
}
}
4 changes: 3 additions & 1 deletion output/cloud/expv2/integration/testdata/metricset.json
Expand Up @@ -57,7 +57,9 @@
"time": "2023-05-01T02:00:00Z",
"last": 3.14,
"min": 3.14,
"max": 3.14
"max": 3.14,
"avg": 3.14,
"count": 1
}
]
}
Expand Down

0 comments on commit 811d5d6

Please sign in to comment.