Skip to content

Commit

Permalink
cloudv2: Create batches for single bucket flush (#3187)
Browse files Browse the repository at this point in the history
Split into multiple flushes even in the case of a single bucket whose number of series exceeds the limit, not only across buckets.
  • Loading branch information
codebien committed Jul 13, 2023
1 parent d567ac2 commit ea3a23d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 45 deletions.
83 changes: 49 additions & 34 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package expv2

import (
"context"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/cloudapi/insights"
Expand Down Expand Up @@ -42,26 +43,45 @@ func (f *metricsFlusher) flush() error {
// and group them by metric. To avoid doing too many loops and allocations,
// the metricSetBuilder is used for doing it during the traverse of the buckets.

var (
seriesCount int
batchesCount int
start = time.Now()
)

defer func() {
f.logger.
WithField("t", time.Since(start)).
WithField("series", seriesCount).
WithField("buckets", len(buckets)).
WithField("batches", batchesCount).Debug("Flush the queued buckets")
}()

msb := newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
for i := 0; i < len(buckets); i++ {
msb.addTimeBucket(buckets[i])
if len(msb.seriesIndex) < f.maxSeriesInBatch {
continue
}
for timeSeries, sink := range buckets[i].Sinks {
msb.addTimeSeries(buckets[i].Time, timeSeries, sink)
if len(msb.seriesIndex) < f.maxSeriesInBatch {
continue
}

// we hit the chunk size, let's flush
err := f.push(msb)
if err != nil {
return err
// we hit the batch size, let's flush
batchesCount++
seriesCount += len(msb.seriesIndex)
if err := f.push(msb); err != nil {
return err
}
msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
}
msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
}

if len(msb.seriesIndex) < 1 {
return nil
}

// send the last (or the unique) MetricSet chunk to the remote service
batchesCount++
seriesCount += len(msb.seriesIndex)
return f.push(msb)
}

Expand Down Expand Up @@ -125,36 +145,31 @@ func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilde
return builder
}

func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
for timeSeries, sink := range bucket.Sinks {
pbmetric, ok := msb.metrics[timeSeries.Metric]
if !ok {
pbmetric = &pbcloud.Metric{
Name: timeSeries.Metric.Name,
Type: mapMetricTypeProto(timeSeries.Metric.Type),
}
msb.metrics[timeSeries.Metric] = pbmetric
msb.MetricSet.Metrics = append(msb.MetricSet.Metrics, pbmetric)
func (msb *metricSetBuilder) addTimeSeries(timestamp int64, timeSeries metrics.TimeSeries, sink metricValue) {
pbmetric, ok := msb.metrics[timeSeries.Metric]
if !ok {
pbmetric = &pbcloud.Metric{
Name: timeSeries.Metric.Name,
Type: mapMetricTypeProto(timeSeries.Metric.Type),
}
msb.metrics[timeSeries.Metric] = pbmetric
msb.MetricSet.Metrics = append(msb.MetricSet.Metrics, pbmetric)
}

var pbTimeSeries *pbcloud.TimeSeries
ix, ok := msb.seriesIndex[timeSeries]
if !ok {
labels, discardedLabels := mapTimeSeriesLabelsProto(timeSeries.Tags)
msb.recordDiscardedLabels(discardedLabels)
var pbTimeSeries *pbcloud.TimeSeries
if ix, ok := msb.seriesIndex[timeSeries]; !ok {
labels, discardedLabels := mapTimeSeriesLabelsProto(timeSeries.Tags)
msb.recordDiscardedLabels(discardedLabels)

pbTimeSeries = &pbcloud.TimeSeries{
Labels: labels,
}
pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
} else {
pbTimeSeries = pbmetric.TimeSeries[ix]
pbTimeSeries = &pbcloud.TimeSeries{
Labels: labels,
}

addBucketToTimeSeriesProto(
pbTimeSeries, timeSeries.Metric.Type, bucket.Time, sink)
pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
} else {
pbTimeSeries = pbmetric.TimeSeries[ix]
}
addBucketToTimeSeriesProto(pbTimeSeries, timeSeries.Metric.Type, timestamp, sink)
}

func (msb *metricSetBuilder) recordDiscardedLabels(labels []string) {
Expand Down
59 changes: 48 additions & 11 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,8 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
Tags: r.RootTagSet().With("key1", "val1"),
}

tb := timeBucket{
Time: 1,
Sinks: map[metrics.TimeSeries]metricValue{
timeSeries: &counter{},
},
}
msb := newMetricSetBuilder("testrunid-123", 1)
msb.addTimeBucket(tb)
msb.addTimeSeries(1, timeSeries, &counter{})

assert.Contains(t, msb.metrics, m1)
require.Contains(t, msb.seriesIndex, timeSeries)
Expand All @@ -115,8 +109,52 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
require.Len(t, msb.MetricSet.Metrics, 1)
assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}
func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
t.Parallel()

testCases := []struct {
series int
expFlushCalls int
}{
{series: 5, expFlushCalls: 2},
{series: 2, expFlushCalls: 1},
}

r := metrics.NewRegistry()
m1 := r.MustNewMetric("metric1", metrics.Counter)
for _, tc := range testCases {
logger, _ := testutils.NewLoggerWithHook(t)

bq := &bucketQ{}
pm := &pusherMock{}
mf := metricsFlusher{
bq: bq,
client: pm,
logger: logger,
discardedLabels: make(map[string]struct{}),
maxSeriesInBatch: 3,
}

bq.buckets = make([]timeBucket, 0, tc.series)
sinks := make(map[metrics.TimeSeries]metricValue)
for i := 0; i < tc.series; i++ {
ts := metrics.TimeSeries{
Metric: m1,
Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)),
}

sinks[ts] = &counter{Sum: float64(1)}
}
require.Len(t, sinks, tc.series)

bq.Push([]timeBucket{{Time: 1, Sinks: sinks}})
err := mf.flush()
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
}
}

func TestMetricsFlusherFlushChunk(t *testing.T) {
func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
t.Parallel()

testCases := []struct {
Expand Down Expand Up @@ -223,9 +261,8 @@ func TestFlushWithReservedLabels(t *testing.T) {
assert.Equal(t, 1, len(collected))

// check that warnings are shown only once per label
require.Len(t, loglines, 2)
testutils.LogContains(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations.")
testutils.LogContains(loglines, logrus.WarnLevel, "Tag test_run_id has been discarded since it is reserved for Cloud operations.")
assert.Len(t, testutils.FilterEntries(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations."), 1)
assert.Len(t, testutils.FilterEntries(loglines, logrus.WarnLevel, "Tag test_run_id has been discarded since it is reserved for Cloud operations."), 1)

// check that flusher is not sending labels with reserved names
require.Len(t, collected[0].Metrics, 1)
Expand Down

0 comments on commit ea3a23d

Please sign in to comment.