diff --git a/output/cloud/expv2/hdr.go b/output/cloud/expv2/hdr.go
new file mode 100644
index 00000000000..fc59f1e9617
--- /dev/null
+++ b/output/cloud/expv2/hdr.go
@@ -0,0 +1,178 @@
+package expv2
+
+import (
+	"math"
+	"time"
+
+	"go.k6.io/k6/output/cloud/expv2/pbcloud"
+	"google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const (
+	// lowestTrackable represents the minimum value that the histogram tracks.
+	// Essentially, it excludes negative numbers.
+	// Most of the metrics tracked by histograms are durations
+	// where we don't expect negative numbers.
+	//
+	// In the future, we may expand and include them,
+	// probably after https://github.com/grafana/k6/issues/763.
+	lowestTrackable = 0
+
+	// highestTrackable represents the maximum
+	// value that the histogram is able to track with high accuracy (0.1% of error).
+	// It should be a high enough
+	// and rational value for the k6 context; 2^30 = 1073741824
+	highestTrackable = 1 << 30
+)
+
+type histogram struct {
+	Buckets            []uint32
+	ExtraLowBucket     uint32
+	ExtraHighBucket    uint32
+	FirstNotZeroBucket uint32
+	LastNotZeroBucket  uint32
+
+	Max   float64
+	Min   float64
+	Sum   float64
+	Count uint32
+}
+
+// newHistogram creates an HDR histogram of the provided values.
+func newHistogram(values []float64) histogram {
+	h := histogram{}
+	if len(values) < 1 {
+		return h
+	}
+
+	for i := 0; i < len(values); i++ {
+		h.addToBucket(values[i])
+	}
+
+	h.trimzeros()
+	return h
+}
+
+func (h *histogram) addToBucket(v float64) {
+	if h.Count == 0 {
+		h.Max, h.Min = v, v
+	} else {
+		if v > h.Max {
+			h.Max = v
+		}
+		if v < h.Min {
+			h.Min = v
+		}
+	}
+
+	h.Count++
+	h.Sum += v
+
+	if v > highestTrackable {
+		h.ExtraHighBucket++
+		return
+	}
+	if v < lowestTrackable {
+		h.ExtraLowBucket++
+		return
+	}
+
+	index := resolveBucketIndex(v)
+	blen := uint32(len(h.Buckets))
+	if blen == 0 {
+		h.FirstNotZeroBucket = index
+		h.LastNotZeroBucket = index
+	} else {
+		if index < h.FirstNotZeroBucket {
+			h.FirstNotZeroBucket = index
+		}
+		if index > h.LastNotZeroBucket {
+			h.LastNotZeroBucket = index
+		}
+	}
+
+	if index >= blen {
+		// TODO: evaluate if a pool can improve
+		// expand with zeros up to the required index
+		h.Buckets = append(h.Buckets, make([]uint32, index-blen+1)...)
+	}
+	h.Buckets[index]++
+}
+
+// trimzeros removes the zero-valued buckets from the beginning
+// and from the end of the Buckets slice, keeping only the range
+// between the first and the last not-zero bucket.
+func (h *histogram) trimzeros() {
+	if h.Count < 1 || len(h.Buckets) < 1 {
+		return
+	}
+
+	// NOTE(review): First==Last==0 also matches a histogram whose only non-zero bucket is index 0 (a tracked value of exactly 0); that count is cleared here too — confirm intended
+	if h.FirstNotZeroBucket == 0 && h.LastNotZeroBucket == 0 {
+		h.Buckets = []uint32{}
+		return
+	}
+
+	h.Buckets = h.Buckets[h.FirstNotZeroBucket : h.LastNotZeroBucket+1]
+}
+
+// histogramAsProto converts the histogram into the equivalent Protobuf.
+func histogramAsProto(h histogram, time time.Time) *pbcloud.TrendHdrValue { + hval := &pbcloud.TrendHdrValue{ + Time: timestamppb.New(time), + MinResolution: 1.0, + SignificantDigits: 2, + LowerCounterIndex: h.FirstNotZeroBucket, + MinValue: h.Min, + MaxValue: h.Max, + Sum: h.Sum, + Count: h.Count, + Counters: h.Buckets, + } + if h.ExtraLowBucket > 0 { + hval.ExtraLowValuesCounter = &h.ExtraLowBucket + } + if h.ExtraHighBucket > 0 { + hval.ExtraHighValuesCounter = &h.ExtraHighBucket + } + return hval +} + +// resolveBucketIndex returns the relative index +// in the bucket series for the provided value. +func resolveBucketIndex(val float64) uint32 { + // the lowest trackable value is zero + // negative number are not expected + if val < 0 { + return 0 + } + + upscaled := int32(math.Ceil(val)) + bucketIdx := upscaled + + //# k is a power of 2 closest to 10^precision_points + //# i.e 2^7 = 128 ~ 100 = 10^2 + //# 2^10 = 1024 ~ 1000 = 10^3 + //# f(x) = 3*x + 1 - empiric formula that works for us + //# since f(2)=7 and f(3)=10 + const k = 7 + + // if upscaled_val >= 1<<(k+1) { + // 256 = 1<<(7+1) = 1<<(7+1) + if upscaled >= 256 { + //# Here we use some math to get simple formula + //# derivation: + //# let n = msb(u) - most significant digit position + //# i.e. 
n = floor(log(u, 2)) + //# major_bucket_index = n - k + 1 + //# sub_bucket_index = u>>(n - k) - (1<>(n-k) - (1<>(n-k) + //# + nk_diff := int32(math.Floor(math.Log2(float64(upscaled >> k)))) + bucketIdx = (nk_diff << k) + (upscaled >> nk_diff) + } + + return uint32(bucketIdx) +} diff --git a/output/cloud/expv2/hdr_test.go b/output/cloud/expv2/hdr_test.go new file mode 100644 index 00000000000..245d93f4b2c --- /dev/null +++ b/output/cloud/expv2/hdr_test.go @@ -0,0 +1,233 @@ +package expv2 + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.k6.io/k6/output/cloud/expv2/pbcloud" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestValueBacket(t *testing.T) { + t.Parallel() + + tests := []struct { + in float64 + exp uint32 + }{ + {in: -1029, exp: 0}, + {in: -12, exp: 0}, + {in: -0.82673, exp: 0}, + {in: 10, exp: 10}, + {in: 12, exp: 12}, + {in: 12.5, exp: 13}, + {in: 20, exp: 20}, + {in: 255, exp: 255}, + {in: 256, exp: 256}, + {in: 282.29, exp: 269}, + {in: 1029, exp: 512}, + {in: (1 << 30) - 1, exp: 3071}, + {in: (1 << 30), exp: 3072}, + {in: math.MaxInt32, exp: 3199}, + {in: math.MaxInt32 + 1, exp: 2147483648}, // int32 overflow + } + for _, tc := range tests { + assert.Equal(t, int(tc.exp), int(resolveBucketIndex(tc.in)), tc.in) + } +} + +func TestNewHistogramWithSimpleValue(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{100}) + + exp := histogram{ + Buckets: []uint32{1}, + FirstNotZeroBucket: 100, + LastNotZeroBucket: 100, + ExtraLowBucket: 0, + ExtraHighBucket: 0, + Max: 100, + Min: 100, + Sum: 100, + Count: 1, + } + assert.Equal(t, exp, res) +} + +func TestNewHistogramWithUntrackables(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{5, -3.14, 2 * 1e9, 1}) + + exp := histogram{ + Buckets: []uint32{1, 0, 0, 0, 1}, + FirstNotZeroBucket: 1, + LastNotZeroBucket: 5, + ExtraLowBucket: 1, + ExtraHighBucket: 1, + Max: 2 * 1e9, + Min: -3.14, + Sum: 2*1e9 + 5 + 1 - 3.14, + Count: 4, + } + 
assert.Equal(t, exp, res) +} + +func TestNewHistogramWithMultipleValues(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{51.8, 103.6, 103.6, 103.6, 103.6}) + + exp := histogram{ + FirstNotZeroBucket: 52, + LastNotZeroBucket: 104, + Max: 103.6, + Min: 51.8, + ExtraLowBucket: 0, + ExtraHighBucket: 0, + Buckets: append(append([]uint32{1}, make([]uint32, 51)...), 4), + // Buckets = {1, 0 for 51 times, 4} + Sum: 466.20000000000005, + Count: 5, + } + assert.Equal(t, exp, res) +} + +func TestNewHistogramWithNegativeNum(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{-2.42314}) + + exp := histogram{ + FirstNotZeroBucket: 0, + Max: -2.42314, + Min: -2.42314, + Buckets: nil, + ExtraLowBucket: 1, + ExtraHighBucket: 0, + Sum: -2.42314, + Count: 1, + } + assert.Equal(t, exp, res) +} + +func TestNewHistogramWithMultipleNegativeNums(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{-0.001, -0.001, -0.001}) + + exp := histogram{ + Buckets: nil, + FirstNotZeroBucket: 0, + ExtraLowBucket: 3, + ExtraHighBucket: 0, + Max: -0.001, + Min: -0.001, + Sum: -0.003, + Count: 3, + } + assert.Equal(t, exp, res) +} + +func TestNewHistoramWithNoVals(t *testing.T) { + t.Parallel() + res := newHistogram([]float64{}) + exp := histogram{ + Buckets: nil, + FirstNotZeroBucket: 0, + ExtraLowBucket: 0, + ExtraHighBucket: 0, + Max: 0, + Min: 0, + Sum: 0, + } + assert.Equal(t, exp, res) +} + +func TestHistogramTrimzeros(t *testing.T) { + t.Parallel() + + cases := []struct { + in histogram + exp []uint32 + }{ + {in: histogram{Buckets: []uint32{}}, exp: []uint32{}}, + {in: histogram{Buckets: []uint32{0}}, exp: []uint32{}}, + {in: histogram{Buckets: []uint32{0, 0, 0}}, exp: []uint32{}}, + { + in: histogram{ + Buckets: []uint32{0, 0, 0, 0, 0, 0, 1, 0}, + FirstNotZeroBucket: 6, + LastNotZeroBucket: 6, + }, + exp: []uint32{1}, + }, + { + in: histogram{ + Buckets: []uint32{0, 0, 0, 1, 9, 0, 0, 1, 0, 0, 0}, + FirstNotZeroBucket: 3, + LastNotZeroBucket: 7, + }, + exp: 
[]uint32{1, 9, 0, 0, 1}, + }, + } + + for _, tc := range cases { + h := tc.in + h.Count = 1 + h.trimzeros() + assert.Equal(t, tc.exp, h.Buckets, tc.in.Buckets) + } +} + +func TestHistogramAsProto(t *testing.T) { + t.Parallel() + + uint32ptr := func(v uint32) *uint32 { + return &v + } + + cases := []struct { + name string + in histogram + exp *pbcloud.TrendHdrValue + }{ + { + name: "empty histogram", + in: histogram{}, + exp: &pbcloud.TrendHdrValue{}, + }, + { + name: "not trackable values", + in: newHistogram([]float64{-0.23, 1<<30 + 1}), + exp: &pbcloud.TrendHdrValue{ + Count: 2, + ExtraLowValuesCounter: uint32ptr(1), + ExtraHighValuesCounter: uint32ptr(1), + Counters: nil, + LowerCounterIndex: 0, + MinValue: -0.23, + MaxValue: 1<<30 + 1, + }, + }, + { + name: "normal values", + in: newHistogram([]float64{2, 1.1, 3}), + exp: &pbcloud.TrendHdrValue{ + Count: 3, + ExtraLowValuesCounter: nil, + ExtraHighValuesCounter: nil, + Counters: []uint32{2, 1}, + LowerCounterIndex: 2, + MinValue: 1.1, + MaxValue: 3, + }, + }, + } + + for _, tc := range cases { + tc.exp.MinResolution = 1.0 + tc.exp.SignificantDigits = 2 + tc.exp.Time = ×tamppb.Timestamp{Seconds: 1} + tc.exp.Sum = tc.in.Sum + assert.Equal(t, tc.exp, histogramAsProto(tc.in, time.Unix(1, 0)), tc.name) + } +} diff --git a/output/cloud/expv2/metrics_client.go b/output/cloud/expv2/metrics_client.go index d12f99c1f75..08122d6f8b7 100644 --- a/output/cloud/expv2/metrics_client.go +++ b/output/cloud/expv2/metrics_client.go @@ -104,11 +104,17 @@ func (mc *MetricsClient) Push(ctx context.Context, referenceID string, samples * return nil } +const b100KiB = 100 * 1024 + func newRequestBody(data *pbcloud.MetricSet) ([]byte, error) { b, err := proto.Marshal(data) if err != nil { return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err) } + if len(b) > b100KiB { + return nil, fmt.Errorf("the protobuf message is too large to be handled from the cloud processor; "+ + "size: %d, limit: 1MB", len(b)) + } 
 	if snappy.MaxEncodedLen(len(b)) < 0 {
 		return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+
 			"size: %d, limit: %d", len(b), 0xffffffff)
diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go
index 8f89b77c574..50e1e8d1b40 100644
--- a/output/cloud/expv2/output.go
+++ b/output/cloud/expv2/output.go
@@ -109,7 +109,6 @@ func (o *Output) flushMetrics() {
 
 	ctx, cancel := context.WithTimeout(context.Background(), o.config.MetricPushInterval.TimeDuration())
 	defer cancel()
-
 	err := o.client.Push(ctx, o.referenceID, &pbcloud.MetricSet{Metrics: metricSet})
 	if err != nil {
 		o.logger.WithError(err).Error("failed to push metrics to the cloud")
@@ -194,11 +193,6 @@ func (as *aggregatedSamples) MapAsProto(refID string) []*pbcloud.TimeSeries {
 	}
 	pbseries := make([]*pbcloud.TimeSeries, 0, len(as.Samples))
 	for ts, samples := range as.Samples {
-		if ts.Metric.Type == metrics.Trend {
-			// skip trend metrics
-			continue
-		}
-
 		pb := pbcloud.TimeSeries{}
 		// TODO: optimize removing Map
 		// and using https://github.com/grafana/k6/issues/2764
@@ -209,7 +203,6 @@ func (as *aggregatedSamples) MapAsProto(refID string) []*pbcloud.TimeSeries {
 			pb.Labels = append(pb.Labels, &pbcloud.Label{Name: ktag, Value: vtag})
 		}
 
-		// TODO: extend with other missing types
 		switch ts.Metric.Type {
 		case metrics.Counter:
 			counterSamples := &pbcloud.CounterSamples{}
@@ -254,7 +247,18 @@ func (as *aggregatedSamples) MapAsProto(refID string) []*pbcloud.TimeSeries {
 				RateSamples: rateSamples,
 			}
 		case metrics.Trend:
-			// TODO: implement the HDR histogram mapping
+			trendSamples := &pbcloud.TrendHdrSamples{}
+			for _, trendSample := range samples {
+				hdrValue := histogramAsProto(
+					newHistogram([]float64{trendSample.Value}),
+					trendSample.Time,
+				)
+				trendSamples.Values = append(trendSamples.Values, hdrValue)
+			}
+
+			pb.Samples = &pbcloud.TimeSeries_TrendHdrSamples{
+				TrendHdrSamples: trendSamples,
+			}
 		}
 		pbseries = append(pbseries, &pb)
 	}
diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go
index ca0e69b029f..c599fa25eef 100644
--- a/output/cloud/expv2/output_test.go
+++ b/output/cloud/expv2/output_test.go
@@ -134,16 +134,49 @@ func TestAggregatedSamplesMapAsProto(t *testing.T) {
 				},
 			},
 		},
-		// {mtyp: metrics.Trend},
+		{
+			mtyp: metrics.Trend,
+			expmap: &pbcloud.TimeSeries{
+				Labels: append([]*pbcloud.Label{{Name: "__name__", Value: "metrictrend"}}, expLabels...),
+				Samples: &pbcloud.TimeSeries_TrendHdrSamples{
+					TrendHdrSamples: &pbcloud.TrendHdrSamples{
+						Values: []*pbcloud.TrendHdrValue{
+							{
+								Time:              timestamppb.New(now),
+								MaxValue:          42.0,
+								MinValue:          42.0,
+								Counters:          []uint32{1},
+								MinResolution:     float64(1),
+								LowerCounterIndex: 42,
+								SignificantDigits: 2.0,
+								Sum:               42.0,
+								Count:             1,
+							},
+							{
+								Time:              timestamppb.New(now),
+								MaxValue:          42.0,
+								MinValue:          42.0,
+								Counters:          []uint32{1},
+								MinResolution:     float64(1),
+								LowerCounterIndex: 42,
+								SignificantDigits: 2.0,
+								Sum:               42.0,
+								Count:             1,
+							},
+						},
+					},
+				},
+			},
+		},
 	}
 
-	r := metrics.NewRegistry()
-
 	for _, tc := range tests {
 		tc := tc
 		t.Run(tc.mtyp.String(), func(t *testing.T) {
 			t.Parallel()
+
+			r := metrics.NewRegistry()
+
 			s1 := metrics.Sample{
 				TimeSeries: metrics.TimeSeries{
 					Metric: r.MustNewMetric(fmt.Sprintf("metric%s", tc.mtyp.String()), tc.mtyp),
@@ -158,8 +191,10 @@ func TestAggregatedSamplesMapAsProto(t *testing.T) {
 					s1.TimeSeries: {&s1, &s1},
 				},
 			}
+
 			pbsamples := aggSamples.MapAsProto("fake-test-id")
 			require.Len(t, pbsamples, 1)
+
 			assert.Equal(t, tc.expmap.Labels, pbsamples[0].Labels)
 			assert.Equal(t, tc.expmap.Samples, pbsamples[0].Samples)
 		})