diff --git a/CHANGELOG.md b/CHANGELOG.md index 91b8becb2c..ce58bf0038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index c1a5d6f3a7..762296b6e1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" "github.com/grafana/mimir/pkg/util/limiter" @@ -170,7 +171,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor if cfg.EnablePromQLEngineFallback { prometheusEngine := promql.NewEngine(opts) - eng = streamingpromql.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger) + eng = compat.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger) } else { eng = streamingEngine } diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 4277730a0f..c94781ee7b 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -107,13 +107,20 @@ func TestCases(metricSizes []int) []BenchCase { // Expr: "-a_X", //}, //// Binary operators. - //{ - // Expr: "a_X - b_X", - //}, - //{ - // Expr: "a_X - b_X", - // Steps: 10000, - //}, + { + Expr: "a_X - b_X", + }, + { + Expr: "a_X - b_X", + Steps: 10000, + }, + // Test the case where one side of a binary operation has many more series than the other. + { + Expr: `a_100{l=~"[13579]."} - b_100`, + }, + { + Expr: `a_2000{l=~"1..."} - b_2000`, + }, //{ // Expr: "a_X and b_X{l=~'.*[0-4]$'}", //}, @@ -163,9 +170,12 @@ func TestCases(metricSizes []int) []BenchCase { // Expr: "topk(5, a_X)", //}, //// Combinations. - //{ - // Expr: "rate(a_X[1m]) + rate(b_X[1m])", - //}, + { + Expr: "rate(a_X[1m]) + rate(b_X[1m])", + }, + { + Expr: "sum(a_X + b_X)", + }, { Expr: "sum by (le)(rate(h_X[1m]))", }, diff --git a/pkg/streamingpromql/errors.go b/pkg/streamingpromql/compat/errors.go similarity index 94% rename from pkg/streamingpromql/errors.go rename to pkg/streamingpromql/compat/errors.go index 9cebc16704..14d1000d14 100644 --- a/pkg/streamingpromql/errors.go +++ b/pkg/streamingpromql/compat/errors.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package streamingpromql +package compat import ( "errors" diff --git a/pkg/streamingpromql/fallback_engine.go b/pkg/streamingpromql/compat/fallback_engine.go similarity index 99% rename from pkg/streamingpromql/fallback_engine.go rename to pkg/streamingpromql/compat/fallback_engine.go index 28b9957a47..237714ed8b 100644 --- a/pkg/streamingpromql/fallback_engine.go +++ b/pkg/streamingpromql/compat/fallback_engine.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package streamingpromql +package compat import ( "context" diff --git a/pkg/streamingpromql/fallback_engine_test.go b/pkg/streamingpromql/compat/fallback_engine_test.go similarity index 99% rename from pkg/streamingpromql/fallback_engine_test.go rename to pkg/streamingpromql/compat/fallback_engine_test.go index ad49747b98..0d18f86634 100644 --- a/pkg/streamingpromql/fallback_engine_test.go +++ b/pkg/streamingpromql/compat/fallback_engine_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package streamingpromql +package compat import ( "context" diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 6b310b789b..9b3f342313 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -15,6 +15,8 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/promqltest" "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/streamingpromql/compat" ) func TestUnsupportedPromQLFeatures(t *testing.T) { @@ -26,29 +28,34 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "a + b": "PromQL expression type *parser.BinaryExpr", - "1 + 2": "scalar value as top-level expression", - "metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr", - "1": "scalar value as top-level expression", - "metric{} offset 2h": "instant vector selector with 'offset'", - "avg(metric{})": "'avg' aggregation", - "sum without(l) (metric{})": "grouping with 'without'", - "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr", - "avg_over_time(metric{}[5m])": "'avg_over_time' function", - "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", + "1 + 2": "scalar value as top-level expression", + "1 + metric{}": "binary expression with scalars", + "metric{} + 1": "binary expression with scalars", + "metric{} < other_metric{}": "binary expression with '<'", + "metric{} or other_metric{}": "binary expression with many-to-many matching", + "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", + "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", + "1": "scalar value as top-level expression", + "metric{} offset 2h": "instant vector selector with 'offset'", + "avg(metric{})": "'avg' aggregation", + "sum without(l) (metric{})": "grouping with 'without'", + "rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'", + "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr", + "avg_over_time(metric{}[5m])": "'avg_over_time' function", + "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", } for expression, expectedError := range unsupportedExpressions { t.Run(expression, func(t *testing.T) { qry, err := engine.NewRangeQuery(ctx, nil, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute) require.Error(t, err) - require.ErrorIs(t, err, NotSupportedError{}) + require.ErrorIs(t, err, compat.NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) qry, err = engine.NewInstantQuery(ctx, nil, nil, expression, time.Now()) require.Error(t, err) - require.ErrorIs(t, err, NotSupportedError{}) + require.ErrorIs(t, err, compat.NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) }) @@ -65,7 +72,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { t.Run(expression, func(t *testing.T) { qry, err := engine.NewInstantQuery(ctx, nil, nil, expression, time.Now()) require.Error(t, err) - require.ErrorIs(t, err, NotSupportedError{}) + require.ErrorIs(t, err, compat.NotSupportedError{}) require.EqualError(t, err, "not supported by streaming engine: "+expectedError) require.Nil(t, qry) }) diff --git a/pkg/streamingpromql/operator/aggregation_test.go b/pkg/streamingpromql/operator/aggregation_test.go index 331fb7e672..44ad8c9683 100644 --- a/pkg/streamingpromql/operator/aggregation_test.go +++ b/pkg/streamingpromql/operator/aggregation_test.go @@ -97,19 +97,3 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []SeriesMetadata { return m } - -type testOperator struct { - series []labels.Labels -} - -func (t *testOperator) SeriesMetadata(_ context.Context) ([]SeriesMetadata, error) { - return labelsToSeriesMetadata(t.series), nil -} - -func (t *testOperator) NextSeries(_ context.Context) (InstantVectorSeriesData, error) { - panic("NextSeries() not supported") -} - -func (t *testOperator) Close() { - panic("Close() not supported") -} diff --git a/pkg/streamingpromql/operator/binary_operation.go b/pkg/streamingpromql/operator/binary_operation.go new file mode 100644 index 0000000000..bf3a027295 --- /dev/null +++ b/pkg/streamingpromql/operator/binary_operation.go @@ -0,0 +1,626 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package operator + +import ( + "context" + "fmt" + "math" + "slices" + "sort" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + + "github.com/grafana/mimir/pkg/streamingpromql/compat" +) + +// BinaryOperation represents a binary operation between instant vectors such as " + " or " - ". +type BinaryOperation struct { + Left InstantVectorOperator + Right InstantVectorOperator + Op parser.ItemType + + VectorMatching parser.VectorMatching + + // We need to retain these so that NextSeries() can return an error message with the series labels when + // multiple points match on a single side. + // Note that we don't retain the output series metadata: if we need to return an error message, we can compute + // the output series labels from these again. + leftMetadata []SeriesMetadata + rightMetadata []SeriesMetadata + + remainingSeries []*binaryOperationOutputSeries + leftBuffer *binaryOperationSeriesBuffer + rightBuffer *binaryOperationSeriesBuffer + opFunc binaryOperationFunc +} + +var _ InstantVectorOperator = &BinaryOperation{} + +type binaryOperationOutputSeries struct { + leftSeriesIndices []int + rightSeriesIndices []int +} + +// latestLeftSeries returns the index of the last series from the left source needed for this output series. +// +// It assumes that leftSeriesIndices is sorted in ascending order. +func (s binaryOperationOutputSeries) latestLeftSeries() int { + return s.leftSeriesIndices[len(s.leftSeriesIndices)-1] +} + +// latestRightSeries returns the index of the last series from the right source needed for this output series. +// +// It assumes that rightSeriesIndices is sorted in ascending order. +func (s binaryOperationOutputSeries) latestRightSeries() int { + return s.rightSeriesIndices[len(s.rightSeriesIndices)-1] +} + +func NewBinaryOperation(left InstantVectorOperator, right InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType) (*BinaryOperation, error) { + opFunc := arithmeticOperationFuncs[op] + if opFunc == nil { + return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op)) + } + + return &BinaryOperation{ + Left: left, + Right: right, + VectorMatching: vectorMatching, + Op: op, + + opFunc: opFunc, + }, nil +} + +// SeriesMetadata returns the series expected to be produced by this operator. +// +// Note that it is possible that this method returns a series which will not have any points, as the +// list of possible output series is generated based solely on the series labels, not their data. +// +// For example, if this operator is for a range query with the expression "left_metric + right_metric", but +// left_metric has points at T=0 and T=1 in the query range, and right_metric has points at T=2 and T=3 in the +// query range, then SeriesMetadata will return a series, but NextSeries will return no points for that series. +// +// If this affects many series in the query, this may cause consuming operators to be less efficient, but in +// practice this rarely happens. +// +// (The alternative would be to compute the entire result here in SeriesMetadata and only return the series that +// contain points, but that would mean we'd need to hold the entire result in memory at once, which we want to +// avoid.) +func (b *BinaryOperation) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) { + if canProduceAnySeries, err := b.loadSeriesMetadata(ctx); err != nil { + return nil, err + } else if !canProduceAnySeries { + return nil, nil + } + + allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed := b.computeOutputSeries() + b.sortSeries(allMetadata, allSeries) + b.remainingSeries = allSeries + + b.leftBuffer = newBinaryOperationSeriesBuffer(b.Left, leftSeriesUsed) + b.rightBuffer = newBinaryOperationSeriesBuffer(b.Right, rightSeriesUsed) + + return allMetadata, nil +} + +// loadSeriesMetadata loads series metadata from both sides of this operation. +// It returns false if one side returned no series and that means there is no way for this operation to return any series. +// (eg. if doing A + B and either A or B have no series, then there is no way for this operation to produce any series) +func (b *BinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error) { + // We retain the series labels for later so we can use them to generate error messages. + // We'll return them to the pool in Close(). + + var err error + b.leftMetadata, err = b.Left.SeriesMetadata(ctx) + if err != nil { + return false, err + } + + if len(b.leftMetadata) == 0 { + // FIXME: this is incorrect for 'or' + // No series on left-hand side, we'll never have any output series. + return false, nil + } + + b.rightMetadata, err = b.Right.SeriesMetadata(ctx) + if err != nil { + return false, err + } + + if len(b.rightMetadata) == 0 { + // FIXME: this is incorrect for 'or' and 'unless' + // No series on right-hand side, we'll never have any output series. + return false, nil + } + + return true, nil +} + +// computeOutputSeries determines the possible output series from this operator. +// It assumes leftMetadata and rightMetadata have already been populated. +// +// It returns: +// - a list of all possible series this operator could return +// - a corresponding list of the source series for each output series +// - a list indicating which series from the left side are needed to compute the output +// - a list indicating which series from the right side are needed to compute the output +func (b *BinaryOperation) computeOutputSeries() ([]SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool) { + labelsFunc := b.labelsFunc() + outputSeriesMap := map[string]*binaryOperationOutputSeries{} + + // TODO: is it better to use whichever side has fewer series for this first loop? Should result in a smaller map and therefore less work later on + // Would need to be careful about 'or' and 'unless' cases + for idx, s := range b.leftMetadata { + groupLabels := labelsFunc(s.Labels).String() + series, exists := outputSeriesMap[groupLabels] + + if !exists { + series = &binaryOperationOutputSeries{} + outputSeriesMap[groupLabels] = series + } + + series.leftSeriesIndices = append(series.leftSeriesIndices, idx) + } + + for idx, s := range b.rightMetadata { + groupLabels := labelsFunc(s.Labels).String() + + if series, exists := outputSeriesMap[groupLabels]; exists { + series.rightSeriesIndices = append(series.rightSeriesIndices, idx) + } + + // FIXME: if this is an 'or' operation, then we need to create the right side even if the left doesn't exist + } + + // Remove series that cannot produce samples. + for seriesLabels, outputSeries := range outputSeriesMap { + if len(outputSeries.leftSeriesIndices) == 0 || len(outputSeries.rightSeriesIndices) == 0 { + // FIXME: this is incorrect for 'or' and 'unless' + // No matching series on at least one side for this output series, so output series will have no samples. Remove it. + delete(outputSeriesMap, seriesLabels) + } + } + + allMetadata := make([]SeriesMetadata, 0, len(outputSeriesMap)) + allSeries := make([]*binaryOperationOutputSeries, 0, len(outputSeriesMap)) + leftSeriesUsed := GetBoolSlice(len(b.leftMetadata))[:len(b.leftMetadata)] + rightSeriesUsed := GetBoolSlice(len(b.rightMetadata))[:len(b.rightMetadata)] + + for _, outputSeries := range outputSeriesMap { + firstSeriesLabels := b.leftMetadata[outputSeries.leftSeriesIndices[0]].Labels + allMetadata = append(allMetadata, SeriesMetadata{Labels: labelsFunc(firstSeriesLabels)}) + allSeries = append(allSeries, outputSeries) + + for _, leftSeriesIndex := range outputSeries.leftSeriesIndices { + leftSeriesUsed[leftSeriesIndex] = true + } + + for _, rightSeriesIndex := range outputSeries.rightSeriesIndices { + rightSeriesUsed[rightSeriesIndex] = true + } + } + + return allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed +} + +// sortSeries sorts metadata and series in place to try to minimise the number of input series we'll need to buffer in memory. +// +// This is critical for minimising the memory consumption of this operator: if we choose a poor ordering of series, +// we'll need to buffer many input series in memory. +// +// At present, sortSeries uses a very basic heuristic to guess the best way to sort the output series, but we could make +// this more sophisticated in the future. +func (b *BinaryOperation) sortSeries(metadata []SeriesMetadata, series []*binaryOperationOutputSeries) { + // For one-to-one matching, we assume that each output series takes one series from each side of the operator. + // If this is true, then the best order is the one in which we read from the highest cardinality side in order. + // If we do this, then in the worst case, we'll have to buffer the whole of the lower cardinality side. + // (Compare this with sorting so that we read the lowest cardinality side in order: in the worst case, we'll have + // to buffer the whole of the higher cardinality side.) + // + // FIXME: this is reasonable for one-to-one matching, but likely not for one-to-many / many-to-one. + // For one-to-many / many-to-one, it would likely be best to buffer the side used for multiple output series (the "one" side), + // as we'll need to retain these series for multiple output series anyway. + + var sortInterface sort.Interface + + if len(b.leftMetadata) < len(b.rightMetadata) { + sortInterface = newFavourRightSideSorter(metadata, series) + } else { + sortInterface = newFavourLeftSideSorter(metadata, series) + } + + sort.Sort(sortInterface) +} + +type binaryOperationOutputSorter struct { + metadata []SeriesMetadata + series []*binaryOperationOutputSeries +} + +type favourLeftSideSorter struct { + binaryOperationOutputSorter +} + +func newFavourLeftSideSorter(metadata []SeriesMetadata, series []*binaryOperationOutputSeries) favourLeftSideSorter { + return favourLeftSideSorter{binaryOperationOutputSorter{metadata, series}} +} + +type favourRightSideSorter struct { + binaryOperationOutputSorter +} + +func newFavourRightSideSorter(metadata []SeriesMetadata, series []*binaryOperationOutputSeries) favourRightSideSorter { + return favourRightSideSorter{binaryOperationOutputSorter{metadata, series}} +} + +func (g binaryOperationOutputSorter) Len() int { + return len(g.metadata) +} + +func (g binaryOperationOutputSorter) Swap(i, j int) { + g.metadata[i], g.metadata[j] = g.metadata[j], g.metadata[i] + g.series[i], g.series[j] = g.series[j], g.series[i] +} + +func (g favourLeftSideSorter) Less(i, j int) bool { + iLeft := g.series[i].latestLeftSeries() + jLeft := g.series[j].latestLeftSeries() + if iLeft != jLeft { + return iLeft < jLeft + } + + return g.series[i].latestRightSeries() < g.series[j].latestRightSeries() +} + +func (g favourRightSideSorter) Less(i, j int) bool { + iRight := g.series[i].latestRightSeries() + jRight := g.series[j].latestRightSeries() + if iRight != jRight { + return iRight < jRight + } + + return g.series[i].latestLeftSeries() < g.series[j].latestLeftSeries() +} + +// labelsFunc returns a function that computes the labels of the output group this series belongs to. +func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels { + lb := labels.NewBuilder(labels.EmptyLabels()) + + if b.VectorMatching.On { + return func(l labels.Labels) labels.Labels { + lb.Reset(l) + lb.Keep(b.VectorMatching.MatchingLabels...) + return lb.Labels() + } + } + + return func(l labels.Labels) labels.Labels { + lb.Reset(l) + lb.Del(b.VectorMatching.MatchingLabels...) + lb.Del(labels.MetricName) + return lb.Labels() + } +} + +func (b *BinaryOperation) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) { + if len(b.remainingSeries) == 0 { + return InstantVectorSeriesData{}, EOS + } + + thisSeries := b.remainingSeries[0] + b.remainingSeries = b.remainingSeries[1:] + + allLeftSeries, err := b.leftBuffer.getSeries(ctx, thisSeries.leftSeriesIndices) + if err != nil { + return InstantVectorSeriesData{}, err + } + + mergedLeftSide, err := b.mergeOneSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") + if err != nil { + return InstantVectorSeriesData{}, err + } + + allRightSeries, err := b.rightBuffer.getSeries(ctx, thisSeries.rightSeriesIndices) + if err != nil { + return InstantVectorSeriesData{}, err + } + + mergedRightSide, err := b.mergeOneSide(allRightSeries, thisSeries.rightSeriesIndices, b.rightMetadata, "right") + if err != nil { + return InstantVectorSeriesData{}, err + } + + return b.computeResult(mergedLeftSide, mergedRightSide), nil +} + +// mergeOneSide exists to handle the case where one side of an output series has different source series at different time steps. +// +// For example, consider the query "left_side + on (env) right_side" with the following source data: +// +// left_side{env="test", pod="a"} 1 2 _ +// left_side{env="test", pod="b"} _ _ 3 +// right_side{env="test"} 100 200 300 +// +// mergeOneSide will take in both series for left_side and return a single series with the points [1, 2, 3]. +// +// mergeOneSide is optimised for the case where there is only one source series, or the source series do not overlap, as in the example above. +// +// FIXME: for many-to-one / one-to-many matching, we could avoid re-merging each time for the side used multiple times +func (b *BinaryOperation) mergeOneSide(data []InstantVectorSeriesData, sourceSeriesIndices []int, sourceSeriesMetadata []SeriesMetadata, side string) (InstantVectorSeriesData, error) { + if len(data) == 1 { + // Fast path: if there's only one series on this side, there's no merging required. + return data[0], nil + } + + if len(data) == 0 { + return InstantVectorSeriesData{}, nil + } + + slices.SortFunc(data, func(a, b InstantVectorSeriesData) int { + return int(a.Floats[0].T - b.Floats[0].T) + }) + + mergedSize := len(data[0].Floats) + haveOverlaps := false + + // We're going to create a new slice, so return this one to the pool. + // We'll return the other slices in the for loop below. + // We must defer here, rather than at the end, as the merge loop below reslices Floats. + // FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge) + defer PutFPointSlice(data[0].Floats) + + for i := 0; i < len(data)-1; i++ { + first := data[i] + second := data[i+1] + mergedSize += len(second.Floats) + + // We're going to create a new slice, so return this one to the pool. + // We must defer here, rather than at the end, as the merge loop below reslices Floats. + // FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge) + defer PutFPointSlice(second.Floats) + + // Check if first overlaps with second. + // InstantVectorSeriesData.Floats is required to be sorted in timestamp order, so if the last point + // of the first series is before the first point of the second series, it cannot overlap. + if first.Floats[len(first.Floats)-1].T >= second.Floats[0].T { + haveOverlaps = true + } + } + + output := GetFPointSlice(mergedSize) + + if !haveOverlaps { + // Fast path: no overlaps, so we can just concatenate the slices together, and there's no + // need to check for conflicts either. + for _, d := range data { + output = append(output, d.Floats...) + } + + return InstantVectorSeriesData{Floats: output}, nil + } + + // Slow path: there are overlaps, so we need to merge slices together and check for conflicts as we go. + // We don't expect to have many series here, so something like a loser tree is likely unnecessary. + remainingSeries := len(data) + + for { + if remainingSeries == 1 { + // Only one series left, just copy remaining points. + for _, d := range data { + if len(d.Floats) > 0 { + output = append(output, d.Floats...) + return InstantVectorSeriesData{Floats: output}, nil + } + } + } + + nextT := int64(math.MaxInt64) + sourceSeriesIndexInData := -1 + + for seriesIndexInData, d := range data { + if len(d.Floats) == 0 { + continue + } + + nextPointInSeries := d.Floats[0] + if nextPointInSeries.T == nextT { + // Another series has a point with the same timestamp. We have a conflict. + firstConflictingSeriesLabels := sourceSeriesMetadata[sourceSeriesIndices[sourceSeriesIndexInData]].Labels + secondConflictingSeriesLabels := sourceSeriesMetadata[sourceSeriesIndices[seriesIndexInData]].Labels + groupLabels := b.labelsFunc()(firstConflictingSeriesLabels) + + return InstantVectorSeriesData{}, fmt.Errorf("found duplicate series for the match group %s on the %s side of the operation at timestamp %s: %s and %s", groupLabels, side, timestamp.Time(nextT).Format(time.RFC3339Nano), firstConflictingSeriesLabels, secondConflictingSeriesLabels) + } + + if d.Floats[0].T < nextT { + nextT = d.Floats[0].T + sourceSeriesIndexInData = seriesIndexInData + } + } + + output = append(output, data[sourceSeriesIndexInData].Floats[0]) + data[sourceSeriesIndexInData].Floats = data[sourceSeriesIndexInData].Floats[1:] + + if len(data[sourceSeriesIndexInData].Floats) == 0 { + remainingSeries-- + } + } +} + +func (b *BinaryOperation) computeResult(left InstantVectorSeriesData, right InstantVectorSeriesData) InstantVectorSeriesData { + var output []promql.FPoint + + // For one-to-one matching for arithmetic operators, reuse one of the input slices to avoid allocating another slice. + // We'll never produce more points than the smaller input side, so use that as our output slice. + // + // FIXME: this is not safe to do for one-to-many, many-to-one or many-to-many matching, as we may need the input series for later output series. + if len(left.Floats) < len(right.Floats) { + output = left.Floats[:0] + defer PutFPointSlice(right.Floats) + } else { + output = right.Floats[:0] + defer PutFPointSlice(left.Floats) + } + + nextRightIndex := 0 + + for _, leftPoint := range left.Floats { + for nextRightIndex < len(right.Floats) && right.Floats[nextRightIndex].T < leftPoint.T { + nextRightIndex++ + } + + if nextRightIndex == len(right.Floats) { + // No more points on right side. We are done. + break + } + + if leftPoint.T == right.Floats[nextRightIndex].T { + // We have matching points on both sides, compute the result. + output = append(output, promql.FPoint{ + F: b.opFunc(leftPoint.F, right.Floats[nextRightIndex].F), + T: leftPoint.T, + }) + } + } + + return InstantVectorSeriesData{ + Floats: output, + } +} + +func (b *BinaryOperation) Close() { + b.Left.Close() + b.Right.Close() + + if b.leftMetadata != nil { + PutSeriesMetadataSlice(b.leftMetadata) + } + + if b.rightMetadata != nil { + PutSeriesMetadataSlice(b.rightMetadata) + } + + if b.leftBuffer != nil { + b.leftBuffer.close() + } + + if b.rightBuffer != nil { + b.rightBuffer.close() + } +} + +// binaryOperationSeriesBuffer buffers series data until it is needed by BinaryOperation. +// +// For example, if the source operator produces series in order A, B, C, but their corresponding output series from the +// binary operation are in order B, A, C, binaryOperationSeriesBuffer will buffer the data for series A while series B is +// produced, then return series A when needed. +type binaryOperationSeriesBuffer struct { + source InstantVectorOperator + nextIndexToRead int + + // If seriesUsed[i] == true, then the series at index i is needed for this operation and should be buffered if not used immediately. + // If seriesUsed[i] == false, then the series at index i is never used and can be immediately discarded. + // FIXME: could use a bitmap here to save some memory + seriesUsed []bool + + // Stores series read but required for later series. + buffer map[int]InstantVectorSeriesData + + // Reused to avoid allocating on every call to getSeries. + output []InstantVectorSeriesData +} + +func newBinaryOperationSeriesBuffer(source InstantVectorOperator, seriesUsed []bool) *binaryOperationSeriesBuffer { + return &binaryOperationSeriesBuffer{ + source: source, + seriesUsed: seriesUsed, + buffer: map[int]InstantVectorSeriesData{}, + } +} + +// getSeries returns the data for the series in seriesIndices. +// The returned slice is only safe to use until getSeries is called again. +// seriesIndices should be sorted in ascending order to avoid unnecessary buffering. +func (b *binaryOperationSeriesBuffer) getSeries(ctx context.Context, seriesIndices []int) ([]InstantVectorSeriesData, error) { + if cap(b.output) < len(seriesIndices) { + b.output = make([]InstantVectorSeriesData, len(seriesIndices)) + } + + b.output = b.output[:len(seriesIndices)] + + for i, seriesIndex := range seriesIndices { + d, err := b.getSingleSeries(ctx, seriesIndex) + + if err != nil { + return nil, err + } + + b.output[i] = d + } + + return b.output, nil +} + +func (b *binaryOperationSeriesBuffer) getSingleSeries(ctx context.Context, seriesIndex int) (InstantVectorSeriesData, error) { + for seriesIndex > b.nextIndexToRead { + d, err := b.source.NextSeries(ctx) + if err != nil { + return InstantVectorSeriesData{}, err + } + + if b.seriesUsed[b.nextIndexToRead] { + // We need this series later, but not right now. Store it for later. + b.buffer[b.nextIndexToRead] = d + } else { + // We don't need this series at all, return the slice to the pool now. + PutFPointSlice(d.Floats) + } + + b.nextIndexToRead++ + } + + if seriesIndex == b.nextIndexToRead { + // Don't bother buffering data if we can return it directly. + b.nextIndexToRead++ + return b.source.NextSeries(ctx) + } + + d := b.buffer[seriesIndex] + delete(b.buffer, seriesIndex) + + return d, nil +} + +func (b *binaryOperationSeriesBuffer) close() { + if b.seriesUsed != nil { + PutBoolSlice(b.seriesUsed) + } +} + +type binaryOperationFunc func(left, right float64) float64 + +var arithmeticOperationFuncs = map[parser.ItemType]binaryOperationFunc{ + parser.ADD: func(left, right float64) float64 { + return left + right + }, + parser.SUB: func(left, right float64) float64 { + return left - right + }, + parser.MUL: func(left, right float64) float64 { + return left * right + }, + parser.DIV: func(left, right float64) float64 { + return left / right + }, + parser.MOD: math.Mod, + parser.POW: math.Pow, + parser.ATAN2: math.Atan2, +} diff --git a/pkg/streamingpromql/operator/binary_operation_test.go b/pkg/streamingpromql/operator/binary_operation_test.go new file mode 100644 index 0000000000..c329554ea4 --- /dev/null +++ b/pkg/streamingpromql/operator/binary_operation_test.go @@ -0,0 +1,556 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operator + +import ( + "context" + "slices" + "sort" + "strconv" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" +) + +// Most of the functionality of the binary operation operator is tested through the test scripts in +// pkg/streamingpromql/testdata. +// +// The merging behaviour has many edge cases, so it's easier to test it here. +func TestBinaryOperation_SeriesMerging(t *testing.T) { + testCases := map[string]struct { + input []InstantVectorSeriesData + sourceSeriesIndices []int + sourceSeriesMetadata []SeriesMetadata + + expectedOutput InstantVectorSeriesData + expectedError string + }{ + "no input series": { + input: []InstantVectorSeriesData{}, + expectedOutput: InstantVectorSeriesData{}, + }, + "single input series": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + }, + }, + }, + "two input series with no overlap, series in time order": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + }, + "two input series with no overlap, series not in time order": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + }, + "three input series with no overlap": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 7, F: 70}, + {T: 8, F: 80}, + {T: 9, F: 90}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + {T: 7, F: 70}, + {T: 8, F: 80}, + {T: 9, F: 90}, + }, + }, + }, + "two input series with overlap": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 3, F: 30}, + {T: 5, F: 50}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 2, F: 20}, + {T: 4, F: 40}, + {T: 6, F: 60}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + }, + "three input series with overlap": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 4, F: 40}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 2, F: 20}, + {T: 5, F: 50}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 3, F: 30}, + {T: 6, F: 60}, + }, + }, + }, + expectedOutput: InstantVectorSeriesData{ + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 3, F: 30}, + {T: 4, F: 40}, + {T: 5, F: 50}, + {T: 6, F: 60}, + }, + }, + }, + "input series with conflict": { + input: []InstantVectorSeriesData{ + { + Floats: []promql.FPoint{ + {T: 1, F: 10}, + {T: 2, F: 20}, + {T: 5, F: 50}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 6, F: 60}, + }, + }, + { + Floats: []promql.FPoint{ + {T: 2, F: 20}, + {T: 4, F: 40}, + }, + }, + }, + sourceSeriesIndices: []int{6, 9, 4}, + sourceSeriesMetadata: []SeriesMetadata{ + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "a")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "b")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "c")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "d")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "e")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "f")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "g")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "h")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "i")}, + {labels.FromStrings("__name__", "right_side", "env", "test", "pod", "j")}, + }, + expectedError: `found duplicate series for the match group {env="test"} on the right side of the operation at timestamp 1970-01-01T00:00:00.002Z: {__name__="right_side", env="test", pod="g"} and {__name__="right_side", env="test", pod="j"}`, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + o := &BinaryOperation{ + // Simulate an expression with "on (env)". + // This is used to generate error messages. + VectorMatching: parser.VectorMatching{ + On: true, + MatchingLabels: []string{"env"}, + }, + } + + result, err := o.mergeOneSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right") + + if testCase.expectedError == "" { + require.NoError(t, err) + require.Equal(t, testCase.expectedOutput, result) + } else { + require.EqualError(t, err, testCase.expectedError) + } + }) + } +} + +func TestBinaryOperation_Sorting(t *testing.T) { + testCases := map[string]struct { + series []*binaryOperationOutputSeries + + expectedOrderFavouringLeftSide []int + expectedOrderFavouringRightSide []int + }{ + "no output series": { + series: []*binaryOperationOutputSeries{}, + + expectedOrderFavouringLeftSide: []int{}, + expectedOrderFavouringRightSide: []int{}, + }, + "single output series": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{4}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0}, + expectedOrderFavouringRightSide: []int{0}, + }, + "two output series, both with one input series, read from both sides in same order and already sorted correctly": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{1}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{2}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0, 1}, + expectedOrderFavouringRightSide: []int{0, 1}, + }, + "two output series, both with one input series, read from both sides in same order but sorted incorrectly": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{1, 0}, + expectedOrderFavouringRightSide: []int{1, 0}, + }, + "two output series, both with one input series, read from both sides in different order": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0, 1}, + expectedOrderFavouringRightSide: []int{1, 0}, + }, + "two output series, both with multiple input series": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1, 2}, + rightSeriesIndices: []int{0, 3}, + }, + { + leftSeriesIndices: []int{0, 3}, + rightSeriesIndices: []int{1, 2}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0, 1}, + expectedOrderFavouringRightSide: []int{1, 0}, + }, + "multiple output series, both with one input series, read from both sides in same order and already sorted correctly": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{1}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{3}, + rightSeriesIndices: []int{3}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0, 1, 2}, + expectedOrderFavouringRightSide: []int{0, 1, 2}, + }, + "multiple output series, both with one input series, read from both sides in same order but sorted incorrectly": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{3}, + rightSeriesIndices: []int{3}, + }, + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{2, 0, 1}, + expectedOrderFavouringRightSide: []int{2, 0, 1}, + }, + "multiple output series, both with one input series, read from both sides in different order": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{3}, + rightSeriesIndices: []int{3}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{0, 2, 1}, + expectedOrderFavouringRightSide: []int{2, 0, 1}, + }, + "multiple output series, with multiple input series each": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{4, 5, 10}, + rightSeriesIndices: []int{2, 20}, + }, + { + leftSeriesIndices: []int{2, 4, 15}, + rightSeriesIndices: []int{3, 5, 50}, + }, + { + leftSeriesIndices: []int{3, 1}, + rightSeriesIndices: []int{1, 40}, + }, + }, + + expectedOrderFavouringLeftSide: []int{2, 0, 1}, + expectedOrderFavouringRightSide: []int{0, 2, 1}, + }, + "multiple output series which depend on the same input series": { + series: []*binaryOperationOutputSeries{ + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{1}, + rightSeriesIndices: []int{1}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{2}, + }, + { + leftSeriesIndices: []int{2}, + rightSeriesIndices: []int{1}, + }, + }, + + expectedOrderFavouringLeftSide: []int{1, 0, 3, 2}, + expectedOrderFavouringRightSide: []int{1, 3, 0, 2}, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + require.Len(t, testCase.expectedOrderFavouringLeftSide, len(testCase.series), "invalid test case: should have same number of input and output series for order favouring left side") + require.Len(t, testCase.expectedOrderFavouringRightSide, len(testCase.series), "invalid test case: should have same number of input and output series for order favouring right side") + + metadata := make([]SeriesMetadata, len(testCase.series)) + for i := range testCase.series { + metadata[i] = SeriesMetadata{labels.FromStrings("series", strconv.Itoa(i))} + } + + test := func(t *testing.T, series []*binaryOperationOutputSeries, metadata []SeriesMetadata, sorter sort.Interface, expectedOrder []int) { + expectedSeriesOrder := make([]*binaryOperationOutputSeries, len(series)) + expectedMetadataOrder := make([]SeriesMetadata, len(metadata)) + + for outputIndex, inputIndex := range expectedOrder { + expectedSeriesOrder[outputIndex] = series[inputIndex] + expectedMetadataOrder[outputIndex] = metadata[inputIndex] + } + + sort.Sort(sorter) + + require.Equal(t, expectedSeriesOrder, series) + require.Equal(t, expectedMetadataOrder, metadata) + } + + t.Run("sorting favouring left side", func(t *testing.T) { + series := slices.Clone(testCase.series) + metadata := slices.Clone(metadata) + sorter := newFavourLeftSideSorter(metadata, series) + test(t, series, metadata, sorter, testCase.expectedOrderFavouringLeftSide) + }) + + t.Run("sorting favouring right side", func(t *testing.T) { + series := slices.Clone(testCase.series) + metadata := slices.Clone(metadata) + sorter := newFavourRightSideSorter(metadata, series) + test(t, series, metadata, sorter, testCase.expectedOrderFavouringRightSide) + }) + }) + } +} + +func TestBinaryOperationSeriesBuffer(t *testing.T) { + series0Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 0}}} + series2Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 2}}} + series3Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 3}}} + series4Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 4}}} + series5Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 5}}} + series6Data := InstantVectorSeriesData{Floats: []promql.FPoint{{T: 0, F: 6}}} + + inner := &testOperator{ + series: []labels.Labels{ + labels.FromStrings("series", "0"), + labels.FromStrings("series", "1"), + labels.FromStrings("series", "2"), + labels.FromStrings("series", "3"), + labels.FromStrings("series", "4"), + labels.FromStrings("series", "5"), + labels.FromStrings("series", "6"), + }, + data: []InstantVectorSeriesData{ + series0Data, + {Floats: []promql.FPoint{{T: 0, F: 1}}}, + series2Data, + series3Data, + series4Data, + series5Data, + series6Data, + }, + } + + seriesUsed := []bool{true, false, true, true, true} + buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed) + ctx := context.Background() + + // Read first series. + series, err := buffer.getSeries(ctx, []int{0}) + require.NoError(t, err) + require.Equal(t, []InstantVectorSeriesData{series0Data}, series) + require.Empty(t, buffer.buffer) // Should not buffer series that was immediately returned. + + // Read next desired series, skipping over series that won't be used. + series, err = buffer.getSeries(ctx, []int{2}) + require.NoError(t, err) + require.Equal(t, []InstantVectorSeriesData{series2Data}, series) + require.Empty(t, buffer.buffer) // Should not buffer series at index 1 that won't be used. + + // Read another desired series, skipping over a series that will be used later. + series, err = buffer.getSeries(ctx, []int{4}) + require.NoError(t, err) + require.Equal(t, []InstantVectorSeriesData{series4Data}, series) + require.Len(t, buffer.buffer, 1) // Should only have buffered a single series (index 3). + + // Read the series we just read past from the buffer. + series, err = buffer.getSeries(ctx, []int{3}) + require.NoError(t, err) + require.Equal(t, []InstantVectorSeriesData{series3Data}, series) + require.Empty(t, buffer.buffer) // Series that has been returned should be removed from buffer once it's returned. + + // Read multiple series. + series, err = buffer.getSeries(ctx, []int{5, 6}) + require.NoError(t, err) + require.Equal(t, []InstantVectorSeriesData{series5Data, series6Data}, series) +} diff --git a/pkg/streamingpromql/operator/operator.go b/pkg/streamingpromql/operator/operator.go index 6e81a81bf5..443df8b8e1 100644 --- a/pkg/streamingpromql/operator/operator.go +++ b/pkg/streamingpromql/operator/operator.go @@ -21,6 +21,7 @@ type Operator interface { // Close frees all resources associated with this operator. // Calling SeriesMetadata or NextSeries after calling Close may result in unpredictable behaviour, corruption or crashes. + // It must be safe to call Close at any time, including if SeriesMetadata or NextSeries have returned an error. Close() } @@ -70,7 +71,14 @@ type SeriesMetadata struct { } type InstantVectorSeriesData struct { - Floats []promql.FPoint + // Floats contains floating point samples for this series. + // Samples must be sorted in timestamp order, earliest timestamps first. + // Samples must not have duplicate timestamps. + Floats []promql.FPoint + + // Histograms contains histogram samples for this series. + // Samples must be sorted in timestamp order, earliest timestamps first. + // Samples must not have duplicate timestamps. Histograms []promql.HPoint } diff --git a/pkg/streamingpromql/operator/operator_test.go b/pkg/streamingpromql/operator/operator_test.go new file mode 100644 index 0000000000..bf071af3e7 --- /dev/null +++ b/pkg/streamingpromql/operator/operator_test.go @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operator + +import ( + "context" + + "github.com/prometheus/prometheus/model/labels" +) + +// Operator used only in tests. +type testOperator struct { + series []labels.Labels + data []InstantVectorSeriesData +} + +func (t *testOperator) SeriesMetadata(_ context.Context) ([]SeriesMetadata, error) { + return labelsToSeriesMetadata(t.series), nil +} + +func (t *testOperator) NextSeries(_ context.Context) (InstantVectorSeriesData, error) { + if len(t.data) == 0 { + return InstantVectorSeriesData{}, EOS + } + + d := t.data[0] + t.data = t.data[1:] + + return d, nil +} + +func (t *testOperator) Close() { + panic("Close() not supported") +} diff --git a/pkg/streamingpromql/operator/pool.go b/pkg/streamingpromql/operator/pool.go index d7b93e99f1..adb259dada 100644 --- a/pkg/streamingpromql/operator/pool.go +++ b/pkg/streamingpromql/operator/pool.go @@ -9,33 +9,35 @@ import ( ) const ( - maxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days. + maxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days. + pointsPerSeriesBucketFactor = 2.0 - maxExpectedSeriesPerResult = 10_000_000 // Likewise, there's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs. + maxExpectedSeriesPerResult = 10_000_000 // Likewise, there's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs. + seriesPerResultBucketFactor = 2.0 ) var ( - fPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(size int) []promql.FPoint { + fPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }) - matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, 10, func(size int) promql.Matrix { + matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) promql.Matrix { return make(promql.Matrix, 0, size) }) - vectorPool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(size int) promql.Vector { + vectorPool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) promql.Vector { return make(promql.Vector, 0, size) }) - seriesMetadataSlicePool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, 10, func(size int) []SeriesMetadata { + seriesMetadataSlicePool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) []SeriesMetadata { return make([]SeriesMetadata, 0, size) }) - floatSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(_ int) []float64 { + floatSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(_ int) []float64 { // Don't allocate a new slice now - we'll allocate one in GetFloatSlice if we need it, so we can differentiate between reused and new slices. return nil }) - boolSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(_ int) []bool { + boolSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(_ int) []bool { // Don't allocate a new slice now - we'll allocate one in GetBoolSlice if we need it, so we can differentiate between reused and new slices. return nil }) diff --git a/pkg/streamingpromql/operator/ring_buffer.go b/pkg/streamingpromql/operator/ring_buffer.go index 8e6869ba75..f48db09aac 100644 --- a/pkg/streamingpromql/operator/ring_buffer.go +++ b/pkg/streamingpromql/operator/ring_buffer.go @@ -10,6 +10,12 @@ type RingBuffer struct { size int // Number of points in this buffer. } +var ( + // Overrides used only during tests. + getFPointSliceForRingBuffer = GetFPointSlice + putFPointSliceForRingBuffer = PutFPointSlice +) + // DiscardPointsBefore discards all points in this buffer with timestamp less than t. func (b *RingBuffer) DiscardPointsBefore(t int64) { for b.size > 0 && b.points[b.firstIndex].T < t { @@ -31,7 +37,7 @@ func (b *RingBuffer) DiscardPointsBefore(t int64) { // Callers must not modify the values in the returned slices or return them to a pool. // Calling UnsafePoints is more efficient than calling CopyPoints, as CopyPoints will create a new slice and copy all // points into the slice, whereas UnsafePoints returns a view into the internal state of this buffer. -// The returned slices are no longer valid if this buffer is modified (eg. a point is added, or the buffer is reset). +// The returned slices are no longer valid if this buffer is modified (eg. a point is added, or the buffer is reset or closed). // // FIXME: the fact we have to expose this is a bit gross, but the overhead of calling a function with ForEach is terrible. // Perhaps we can use range-over function iterators (https://go.dev/wiki/RangefuncExperiment) once this is not experimental? @@ -65,7 +71,7 @@ func (b *RingBuffer) CopyPoints(maxT int64) []promql.FPoint { } head, tail := b.UnsafePoints(maxT) - combined := GetFPointSlice(len(head) + len(tail)) + combined := getFPointSliceForRingBuffer(len(head) + len(tail)) combined = append(combined, head...) combined = append(combined, tail...) @@ -109,13 +115,13 @@ func (b *RingBuffer) Append(p promql.FPoint) { newSize = 2 } - newSlice := GetFPointSlice(newSize) + newSlice := getFPointSliceForRingBuffer(newSize) newSlice = newSlice[:cap(newSlice)] pointsAtEnd := b.size - b.firstIndex copy(newSlice, b.points[b.firstIndex:]) copy(newSlice[pointsAtEnd:], b.points[:b.firstIndex]) - PutFPointSlice(b.points) + putFPointSliceForRingBuffer(b.points) b.points = newSlice b.firstIndex = 0 } @@ -134,7 +140,7 @@ func (b *RingBuffer) Reset() { // Close releases any resources associated with this buffer. func (b *RingBuffer) Close() { b.Reset() - PutFPointSlice(b.points) + putFPointSliceForRingBuffer(b.points) b.points = nil } diff --git a/pkg/streamingpromql/operator/ring_buffer_test.go b/pkg/streamingpromql/operator/ring_buffer_test.go index 40327965d0..c22abd5804 100644 --- a/pkg/streamingpromql/operator/ring_buffer_test.go +++ b/pkg/streamingpromql/operator/ring_buffer_test.go @@ -11,6 +11,8 @@ import ( ) func TestRingBuffer(t *testing.T) { + setupRingBufferPoolFunctionsForTesting(t) + buf := &RingBuffer{} shouldHaveNoPoints(t, buf) @@ -39,19 +41,13 @@ func TestRingBuffer(t *testing.T) { buf.Append(promql.FPoint{T: 5, F: 500}) shouldHavePoints(t, buf, promql.FPoint{T: 4, F: 400}, promql.FPoint{T: 5, F: 500}) - // Trigger expansion of buffer (we resize in powers of two, but the underlying slice comes from a pool that uses a factor of 10). + // Trigger expansion of buffer (we resize in powers of two, and the underlying slice comes from a pool that uses a factor of 2 as well). // Ideally we wouldn't reach into the internals here, but this helps ensure the test is testing the correct scenario. - require.Len(t, buf.points, 10, "expected underlying slice to have length 10, if this assertion fails, the test setup is not as expected") + require.Len(t, buf.points, 2, "expected underlying slice to have length 2, if this assertion fails, the test setup is not as expected") + require.Equal(t, 2, cap(buf.points), "expected underlying slice to have capacity 2, if this assertion fails, the test setup is not as expected") buf.Append(promql.FPoint{T: 6, F: 600}) buf.Append(promql.FPoint{T: 7, F: 700}) - buf.Append(promql.FPoint{T: 8, F: 800}) - buf.Append(promql.FPoint{T: 9, F: 900}) - buf.Append(promql.FPoint{T: 10, F: 1000}) - buf.Append(promql.FPoint{T: 11, F: 1100}) - buf.Append(promql.FPoint{T: 12, F: 1200}) - buf.Append(promql.FPoint{T: 13, F: 1300}) - buf.Append(promql.FPoint{T: 14, F: 1400}) - require.Greater(t, len(buf.points), 10, "expected underlying slice to be expanded, if this assertion fails, the test setup is not as expected") + require.Greater(t, cap(buf.points), 2, "expected underlying slice to be expanded, if this assertion fails, the test setup is not as expected") shouldHavePoints(t, buf, @@ -59,13 +55,6 @@ func TestRingBuffer(t *testing.T) { promql.FPoint{T: 5, F: 500}, promql.FPoint{T: 6, F: 600}, promql.FPoint{T: 7, F: 700}, - promql.FPoint{T: 8, F: 800}, - promql.FPoint{T: 9, F: 900}, - promql.FPoint{T: 10, F: 1000}, - promql.FPoint{T: 11, F: 1100}, - promql.FPoint{T: 12, F: 1200}, - promql.FPoint{T: 13, F: 1300}, - promql.FPoint{T: 14, F: 1400}, ) buf.Reset() @@ -76,49 +65,43 @@ func TestRingBuffer(t *testing.T) { } func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) { + setupRingBufferPoolFunctionsForTesting(t) + // Set up the buffer so that the first point is part-way through the underlying slice. - // We resize in powers of two, but the underlying slice comes from a pool that uses a factor of 10. + // We resize in powers of two, and the underlying slice comes from a pool that uses a factor of 2 as well. buf := &RingBuffer{} buf.Append(promql.FPoint{T: 1, F: 100}) buf.Append(promql.FPoint{T: 2, F: 200}) buf.Append(promql.FPoint{T: 3, F: 300}) buf.Append(promql.FPoint{T: 4, F: 400}) - buf.Append(promql.FPoint{T: 5, F: 500}) - buf.Append(promql.FPoint{T: 6, F: 600}) - buf.Append(promql.FPoint{T: 7, F: 700}) - buf.Append(promql.FPoint{T: 8, F: 800}) - buf.Append(promql.FPoint{T: 9, F: 900}) - buf.Append(promql.FPoint{T: 10, F: 1000}) // Ideally we wouldn't reach into the internals here, but this helps ensure the test is testing the correct scenario. - require.Len(t, buf.points, 10, "expected underlying slice to have length 10, if this assertion fails, the test setup is not as expected") - buf.DiscardPointsBefore(8) - buf.Append(promql.FPoint{T: 11, F: 1100}) - buf.Append(promql.FPoint{T: 12, F: 1200}) - buf.Append(promql.FPoint{T: 13, F: 1300}) + require.Len(t, buf.points, 4, "expected underlying slice to have length 4, if this assertion fails, the test setup is not as expected") + require.Equal(t, 4, cap(buf.points), "expected underlying slice to have capacity 4, if this assertion fails, the test setup is not as expected") + buf.DiscardPointsBefore(3) + buf.Append(promql.FPoint{T: 5, F: 500}) + buf.Append(promql.FPoint{T: 6, F: 600}) // Should not have expanded slice. - require.Len(t, buf.points, 10, "expected underlying slice to have length 10, if this assertion fails, the test setup is not as expected") + require.Len(t, buf.points, 4, "expected underlying slice to have length 4") + require.Equal(t, 4, cap(buf.points), "expected underlying slice to have capacity 4") // Discard before end of underlying slice. - buf.DiscardPointsBefore(9) + buf.DiscardPointsBefore(4) shouldHavePoints(t, buf, - promql.FPoint{T: 9, F: 900}, - promql.FPoint{T: 10, F: 1000}, - promql.FPoint{T: 11, F: 1100}, - promql.FPoint{T: 12, F: 1200}, - promql.FPoint{T: 13, F: 1300}, + promql.FPoint{T: 4, F: 400}, + promql.FPoint{T: 5, F: 500}, + promql.FPoint{T: 6, F: 600}, ) - require.Equal(t, 8, buf.firstIndex, "expected first point to be in middle of underlying slice, if this assertion fails, the test setup is not as expected") + require.Equal(t, 3, buf.firstIndex, "expected first point to be in middle of underlying slice, if this assertion fails, the test setup is not as expected") // Discard after wraparound. - buf.DiscardPointsBefore(12) + buf.DiscardPointsBefore(6) shouldHavePoints(t, buf, - promql.FPoint{T: 12, F: 1200}, - promql.FPoint{T: 13, F: 1300}, + promql.FPoint{T: 6, F: 600}, ) } @@ -177,3 +160,28 @@ func shouldHavePointsAtOrBeforeTime(t *testing.T, buf *RingBuffer, ts int64, exp require.Equal(t, expected[len(expected)-1], end) } } + +// setupRingBufferPoolFunctionsForTesting replaces the global FPoint slice pool used by RingBuffer +// with a fake for testing. +// +// This helps ensure that the tests behave as expected: the default global pool does not guarantee that +// slices returned have exactly the capacity requested. Instead, it only guarantees that slices have +// capacity at least as large as requested. This makes it difficult to consistently test scenarios like +// wraparound. +func setupRingBufferPoolFunctionsForTesting(t *testing.T) { + originalGet := getFPointSliceForRingBuffer + originalPut := putFPointSliceForRingBuffer + + getFPointSliceForRingBuffer = func(size int) []promql.FPoint { + return make([]promql.FPoint, 0, size) + } + + putFPointSliceForRingBuffer = func(_ []promql.FPoint) { + // Drop slice on the floor - we don't need it. + } + + t.Cleanup(func() { + getFPointSliceForRingBuffer = originalGet + putFPointSliceForRingBuffer = originalPut + }) +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index cae4ace5c6..d74cae7efd 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/util/stats" "golang.org/x/exp/slices" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/operator" ) @@ -77,7 +78,7 @@ func newQuery(queryable storage.Queryable, opts promql.QueryOpts, qs string, sta return nil, err } default: - return nil, NewNotSupportedError(fmt.Sprintf("%s value as top-level expression", parser.DocumentedType(expr.Type()))) + return nil, compat.NewNotSupportedError(fmt.Sprintf("%s value as top-level expression", parser.DocumentedType(expr.Type()))) } return q, nil @@ -102,7 +103,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta } if e.OriginalOffset != 0 || e.Offset != 0 { - return nil, NewNotSupportedError("instant vector selector with 'offset'") + return nil, compat.NewNotSupportedError("instant vector selector with 'offset'") } return &operator.InstantVectorSelector{ @@ -118,7 +119,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta }, nil case *parser.AggregateExpr: if e.Op != parser.SUM { - return nil, NewNotSupportedError(fmt.Sprintf("'%s' aggregation", e.Op)) + return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' aggregation", e.Op)) } if e.Param != nil { @@ -127,7 +128,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta } if e.Without { - return nil, NewNotSupportedError("grouping with 'without'") + return nil, compat.NewNotSupportedError("grouping with 'without'") } slices.Sort(e.Grouping) @@ -146,7 +147,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta }, nil case *parser.Call: if e.Func.Name != "rate" { - return nil, NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) + return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) } if len(e.Args) != 1 { @@ -162,13 +163,33 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.Insta return &operator.RangeVectorFunction{ Inner: inner, }, nil + case *parser.BinaryExpr: + if e.LHS.Type() != parser.ValueTypeVector || e.RHS.Type() != parser.ValueTypeVector { + return nil, compat.NewNotSupportedError("binary expression with scalars") + } + + if e.VectorMatching.Card != parser.CardOneToOne { + return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching", e.VectorMatching.Card)) + } + + lhs, err := q.convertToInstantVectorOperator(e.LHS) + if err != nil { + return nil, err + } + + rhs, err := q.convertToInstantVectorOperator(e.RHS) + if err != nil { + return nil, err + } + + return operator.NewBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op) case *parser.StepInvariantExpr: // One day, we'll do something smarter here. return q.convertToInstantVectorOperator(e.Expr) case *parser.ParenExpr: return q.convertToInstantVectorOperator(e.Expr) default: - return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) + return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) } } @@ -182,7 +203,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (operator.RangeVe vectorSelector := e.VectorSelector.(*parser.VectorSelector) if vectorSelector.OriginalOffset != 0 || vectorSelector.Offset != 0 { - return nil, NewNotSupportedError("range vector selector with 'offset'") + return nil, compat.NewNotSupportedError("range vector selector with 'offset'") } interval := q.statement.Interval @@ -208,7 +229,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (operator.RangeVe case *parser.ParenExpr: return q.convertToRangeVectorOperator(e.Expr) default: - return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) + return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) } } @@ -251,7 +272,7 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { } default: // This should be caught in newQuery above. - return &promql.Result{Err: NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} + return &promql.Result{Err: compat.NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} } return q.result diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test new file mode 100644 index 0000000000..08de3d1ee3 --- /dev/null +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -0,0 +1,144 @@ +# SPDX-License-Identifier: AGPL-3.0-only + +# Most cases for aggregation operators are covered already in the upstream test cases. +# These test cases cover scenarios not covered by the upstream test cases, such as range queries, or edge cases that are uniquely likely to cause issues in the streaming engine. + +# Throughout this file, we use a 6m step to avoid the default 5m lookback window. + +# Basic arithmetic operations, and atan2 +load 6m + left_side 11 21 32 42 + right_side 1 2 3 4 + +eval range from 0 to 24m step 6m left_side + right_side + {} 12 23 35 46 + +eval range from 0 to 24m step 6m left_side - right_side + {} 10 19 29 38 + +eval range from 0 to 24m step 6m left_side * right_side + {} 11 42 96 168 + +eval range from 0 to 24m step 6m left_side / right_side + {} 11 10.5 10.66666 10.5 + +eval range from 0 to 24m step 6m left_side % right_side + {} 0 1 2 2 + +eval range from 0 to 24m step 6m left_side ^ right_side + {} 11 441 32768 3111696 + +eval range from 0 to 24m step 6m left_side atan2 right_side + {} 1.4801364395941514 1.4758446204521403 1.477319545636307 1.4758446204521403 + +clear + +# One-to-one matching with all labels +load 6m + left_side{env="prod", pod="pod-abc123"} 1 2 3 4 + left_side{env="dev", pod="pod-abc123"} 10 20 30 40 + left_side{env="dev", pod="pod-xyz456"} 9 9 9 9 + right_side{env="prod", pod="pod-abc123"} 100 200 300 400 + right_side{env="dev", pod="pod-abc123"} 1000 2000 3000 4000 + right_side{env="dev", pod="pod-mno789"} 5 5 5 5 + +# Matches on both sides: returns results for matching series, ignores non-matching series +eval range from 0 to 24m step 6m left_side + right_side + {env="dev", pod="pod-abc123"} 1010 2020 3030 4040 + {env="prod", pod="pod-abc123"} 101 202 303 404 + +# No series on either side: returns no results +eval range from 0 to 24m step 6m left_side_that_doesnt_exist + right_side_that_doesnt_exist + +# No series on left side: returns no results +eval range from 0 to 24m step 6m left_side_that_doesnt_exist + right_side + +# No series on right side: returns no results +eval range from 0 to 24m step 6m left_side + right_side_that_doesnt_exist + +clear + +# Series match on both sides, but points don't align +load 6m + partial_left_side 1 2 _ _ + partial_right_side _ _ 3 4 + +eval range from 0 to 24m step 6m partial_left_side + partial_right_side + +clear + +# One-to-one matching with "on" and "ignoring" +load 6m + left_side{env="test", pod="a"} 1 2 3 + left_side{env="prod", pod="b"} 4 5 6 + right_side{env="prod", pod="a"} 10 20 30 + right_side{env="test", pod="b"} 40 50 60 + +eval range from 0 to 24m step 6m left_side + on(env) right_side + {env="prod"} 14 25 36 + {env="test"} 41 52 63 + +eval range from 0 to 24m step 6m left_side + ignoring(pod) right_side + {env="prod"} 14 25 36 + {env="test"} 41 52 63 + +clear + +# One-to-one matching with "on" and "ignoring" with multiple labels. +load 6m + left_side{env="test", pod="a", group="foo"} 1 2 3 + left_side{env="test", pod="b", group="bar"} 4 5 6 + left_side{env="prod", pod="a", group="baz"} 7 8 9 + right_side{env="test", pod="a", group="bar"} 10 20 30 + right_side{env="test", pod="b", group="baz"} 40 50 60 + right_side{env="prod", pod="a", group="foo"} 70 80 90 + +eval range from 0 to 24m step 6m left_side + on(env, pod) right_side + {env="prod", pod="a"} 77 88 99 + {env="test", pod="a"} 11 22 33 + {env="test", pod="b"} 44 55 66 + +eval range from 0 to 24m step 6m left_side + ignoring(env, pod) right_side + {group="baz"} 47 58 69 + {group="bar"} 14 25 36 + {group="foo"} 71 82 93 + +clear + +# One-to-one matching, but different series match at different time steps, or not at all +load 6m + left_side{env="test", bar="a"} 1 _ 3 _ _ 6 _ + left_side{env="test", bar="b"} _ 2 _ 4 _ _ _ + right_side{env="test", foo="0"} 2 2 _ _ _ _ 2 + right_side{env="test", foo="1"} _ _ 3 3 _ _ _ + +eval range from 0 to 42m step 6m left_side * on (env) right_side + {env="test"} 2 4 9 12 _ _ _ + +clear + +# One-to-one matching with multiple matches on left side +load 6m + left_side{env="test", pod="a"} 1 2 3 + left_side{env="test", pod="b"} 4 5 6 + left_side{env="test", pod="c"} 7 8 9 + left_side{env="test", pod="d"} _ 10 11 + right_side{env="test"} 100 200 300 + +eval_fail range from 0 to 42m step 6m left_side * on (env) right_side + # TODO: expected_message multiple matches for labels: many-to-one matching must be explicit (group_left/group_right) + +clear + +# One-to-one matching with multiple matches on right side +load 6m + left_side{env="test"} 100 200 300 + right_side{env="test", pod="a"} 1 2 3 + right_side{env="test", pod="b"} 4 5 6 + right_side{env="test", pod="c"} 7 8 9 + right_side{env="test", pod="d"} _ 10 11 + +eval_fail range from 0 to 42m step 6m left_side * on (env) right_side + # TODO: expected_message found duplicate series for the match group {env="test"} on the right hand-side of the operation: [{__name__="right_side", env="test", pod="b"}, {__name__="right_side", env="test", pod="a"}];many-to-many matching not allowed: matching labels must be unique on one side + +clear diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test new file mode 100644 index 0000000000..b4f461eb93 --- /dev/null +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -0,0 +1,576 @@ +# SPDX-License-Identifier: AGPL-3.0-only +# Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/testdata/operators.test +# Provenance-includes-license: Apache-2.0 +# Provenance-includes-copyright: The Prometheus Authors + +load 5m + http_requests{job="api-server", instance="0", group="production"} 0+10x10 + http_requests{job="api-server", instance="1", group="production"} 0+20x10 + http_requests{job="api-server", instance="0", group="canary"} 0+30x10 + http_requests{job="api-server", instance="1", group="canary"} 0+40x10 + http_requests{job="app-server", instance="0", group="production"} 0+50x10 + http_requests{job="app-server", instance="1", group="production"} 0+60x10 + http_requests{job="app-server", instance="0", group="canary"} 0+70x10 + http_requests{job="app-server", instance="1", group="canary"} 0+80x10 + +load 5m + vector_matching_a{l="x"} 0+1x100 + vector_matching_a{l="y"} 0+2x50 + vector_matching_b{l="x"} 0+4x25 + + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job) +# {job="api-server"} 996 +# {job="app-server"} 2596 + +# Unsupported by streaming engine. +# eval instant at 50m 2 - SUM(http_requests) BY (job) +# {job="api-server"} -998 +# {job="app-server"} -2598 + +# Unsupported by streaming engine. +# eval instant at 50m -http_requests{job="api-server",instance="0",group="production"} +# {job="api-server",instance="0",group="production"} -100 + +# Unsupported by streaming engine. +# eval instant at 50m +http_requests{job="api-server",instance="0",group="production"} +# http_requests{job="api-server",instance="0",group="production"} 100 + +# Unsupported by streaming engine. +# eval instant at 50m - - - SUM(http_requests) BY (job) +# {job="api-server"} -1000 +# {job="app-server"} -2600 + +# Unsupported by streaming engine. +# eval instant at 50m - - - 1 +# -1 + +# Unsupported by streaming engine. +# eval instant at 50m -2^---1*3 +# -1.5 + +# Unsupported by streaming engine. +# eval instant at 50m 2/-2^---1*3+2 +# -10 + +# Unsupported by streaming engine. +# eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1 +# {job="api-server"} 1 +# {job="app-server"} 0.38461538461538464 + +# Unsupported by streaming engine. +# eval instant at 50m 1000 / SUM(http_requests) BY (job) +# {job="api-server"} 1 +# {job="app-server"} 0.38461538461538464 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) - 2 +# {job="api-server"} 998 +# {job="app-server"} 2598 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 3 +# {job="api-server"} 1 +# {job="app-server"} 2 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 0.3 +# {job="api-server"} 0.1 +# {job="app-server"} 0.2 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) ^ 2 +# {job="api-server"} 1000000 +# {job="app-server"} 6760000 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2 +# {job="api-server"} 1 +# {job="app-server"} 8 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2) +# {job="api-server"} 488 +# {job="app-server"} 40 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 +# {job="api-server"} 488 +# {job="app-server"} 40 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2 +# {job="api-server"} 1000 +# {job="app-server"} 2600 + +# Unsupported by streaming engine. +# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job) +# {job="api-server"} 256 +# {job="app-server"} 256 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) / 0 +# {job="api-server"} +Inf +# {job="app-server"} +Inf + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary", instance="0", job="api-server"} / 0 +# {group="canary", instance="0", job="api-server"} +Inf + +# Unsupported by streaming engine. +# eval instant at 50m -1 * http_requests{group="canary", instance="0", job="api-server"} / 0 +# {group="canary", instance="0", job="api-server"} -Inf + +# Unsupported by streaming engine. +# eval instant at 50m 0 * http_requests{group="canary", instance="0", job="api-server"} / 0 +# {group="canary", instance="0", job="api-server"} NaN + +# Unsupported by streaming engine. +# eval instant at 50m 0 * http_requests{group="canary", instance="0", job="api-server"} % 0 +# {group="canary", instance="0", job="api-server"} NaN + +eval instant at 50m SUM(http_requests) BY (job) + SUM(http_requests) BY (job) + {job="api-server"} 2000 + {job="app-server"} 5200 + +eval instant at 50m (SUM((http_requests)) BY (job)) + SUM(http_requests) BY (job) + {job="api-server"} 2000 + {job="app-server"} 5200 + +eval instant at 50m http_requests{job="api-server", group="canary"} + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="1", job="api-server"} 400 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{job="api-server", group="canary"} + rate(http_requests{job="api-server"}[5m]) * 5 * 60 +# {group="canary", instance="0", job="api-server"} 330 +# {group="canary", instance="1", job="api-server"} 440 + +# Unsupported by streaming engine. +# eval instant at 50m rate(http_requests[25m]) * 25 * 60 +# {group="canary", instance="0", job="api-server"} 150 +# {group="canary", instance="0", job="app-server"} 350 +# {group="canary", instance="1", job="api-server"} 200 +# {group="canary", instance="1", job="app-server"} 400 +# {group="production", instance="0", job="api-server"} 50 +# {group="production", instance="0", job="app-server"} 249.99999999999997 +# {group="production", instance="1", job="api-server"} 100 +# {group="production", instance="1", job="app-server"} 300 + +# Unsupported by streaming engine. +# eval instant at 50m (rate((http_requests[25m])) * 25) * 60 +# {group="canary", instance="0", job="api-server"} 150 +# {group="canary", instance="0", job="app-server"} 350 +# {group="canary", instance="1", job="api-server"} 200 +# {group="canary", instance="1", job="app-server"} 400 +# {group="production", instance="0", job="api-server"} 50 +# {group="production", instance="0", job="app-server"} 249.99999999999997 +# {group="production", instance="1", job="api-server"} 100 +# {group="production", instance="1", job="app-server"} 300 + + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} +# http_requests{group="canary", instance="0", job="api-server"} 300 +# http_requests{group="canary", instance="0", job="app-server"} 700 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) and http_requests{instance="0"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) and on(instance, job) http_requests{instance="0", group="production"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) and on(instance) http_requests{instance="0", group="production"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group) http_requests{instance="0", group="production"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job) http_requests{instance="0", group="production"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} or http_requests{group="production"} +# http_requests{group="canary", instance="0", job="api-server"} 300 +# http_requests{group="canary", instance="0", job="app-server"} 700 +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 +# http_requests{group="production", instance="0", job="api-server"} 100 +# http_requests{group="production", instance="0", job="app-server"} 500 +# http_requests{group="production", instance="1", job="api-server"} 200 +# http_requests{group="production", instance="1", job="app-server"} 600 + +# On overlap the rhs samples must be dropped. +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"} +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 +# {group="canary", instance="1", job="api-server"} 401 +# {group="canary", instance="1", job="app-server"} 801 +# http_requests{group="production", instance="1", job="api-server"} 200 +# http_requests{group="production", instance="1", job="app-server"} 600 + + +# Matching only on instance excludes everything that has instance=0/1 but includes +# entries without the instance label. +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a) +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 +# {group="canary", instance="1", job="api-server"} 401 +# {group="canary", instance="1", job="app-server"} 801 +# vector_matching_a{l="x"} 10 +# vector_matching_a{l="y"} 20 + +# Unsupported by streaming engine. +# eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) +# {group="canary", instance="0", job="api-server"} 301 +# {group="canary", instance="0", job="app-server"} 701 +# {group="canary", instance="1", job="api-server"} 401 +# {group="canary", instance="1", job="app-server"} 801 +# vector_matching_a{l="x"} 10 +# vector_matching_a{l="y"} 20 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"} + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"} +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 + +eval instant at 50m http_requests{group="canary"} / on(instance,job) http_requests{group="production"} + {instance="0", job="api-server"} 3 + {instance="0", job="app-server"} 1.4 + {instance="1", job="api-server"} 2 + {instance="1", job="app-server"} 1.3333333333333333 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"} + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"} +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 + +eval instant at 50m http_requests{group="canary"} / ignoring(group) http_requests{group="production"} + {instance="0", job="api-server"} 3 + {instance="0", job="app-server"} 1.4 + {instance="1", job="api-server"} 2 + {instance="1", job="app-server"} 1.3333333333333333 + +# https://github.com/prometheus/prometheus/issues/1489 +# Unsupported by streaming engine. +# eval instant at 50m http_requests AND ON (dummy) vector(1) +# http_requests{group="canary", instance="0", job="api-server"} 300 +# http_requests{group="canary", instance="0", job="app-server"} 700 +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 +# http_requests{group="production", instance="0", job="api-server"} 100 +# http_requests{group="production", instance="0", job="app-server"} 500 +# http_requests{group="production", instance="1", job="api-server"} 200 +# http_requests{group="production", instance="1", job="app-server"} 600 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) +# http_requests{group="canary", instance="0", job="api-server"} 300 +# http_requests{group="canary", instance="0", job="app-server"} 700 +# http_requests{group="canary", instance="1", job="api-server"} 400 +# http_requests{group="canary", instance="1", job="app-server"} 800 +# http_requests{group="production", instance="0", job="api-server"} 100 +# http_requests{group="production", instance="0", job="app-server"} 500 +# http_requests{group="production", instance="1", job="api-server"} 200 +# http_requests{group="production", instance="1", job="app-server"} 600 + + +# Comparisons. +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) > 1000 +# {job="app-server"} 2600 + +# Unsupported by streaming engine. +# eval instant at 50m 1000 < SUM(http_requests) BY (job) +# {job="app-server"} 2600 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) <= 1000 +# {job="api-server"} 1000 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) != 1000 +# {job="app-server"} 2600 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) == 1000 +# {job="api-server"} 1000 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) == bool 1000 +# {job="api-server"} 1 +# {job="app-server"} 0 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) == bool SUM(http_requests) BY (job) +# {job="api-server"} 1 +# {job="app-server"} 1 + +# Unsupported by streaming engine. +# eval instant at 50m SUM(http_requests) BY (job) != bool SUM(http_requests) BY (job) +# {job="api-server"} 0 +# {job="app-server"} 0 + +# Unsupported by streaming engine. +# eval instant at 50m 0 == bool 1 +# 0 + +# Unsupported by streaming engine. +# eval instant at 50m 1 == bool 1 +# 1 + +# Unsupported by streaming engine. +# eval instant at 50m http_requests{job="api-server", instance="0", group="production"} == bool 100 +# {job="api-server", instance="0", group="production"} 1 + +# group_left/group_right. + +clear + +load 5m + node_var{instance="abc",job="node"} 2 + node_role{instance="abc",job="node",role="prometheus"} 1 + +load 5m + node_cpu{instance="abc",job="node",mode="idle"} 3 + node_cpu{instance="abc",job="node",mode="user"} 1 + node_cpu{instance="def",job="node",mode="idle"} 8 + node_cpu{instance="def",job="node",mode="user"} 2 + +load 5m + random{foo="bar"} 1 + +load 5m + threshold{instance="abc",job="node",target="a@b.com"} 0 + +# Copy machine role to node variable. +# Unsupported by streaming engine. +# eval instant at 5m node_role * on (instance) group_right (role) node_var +# {instance="abc",job="node",role="prometheus"} 2 + +# Unsupported by streaming engine. +# eval instant at 5m node_var * on (instance) group_left (role) node_role +# {instance="abc",job="node",role="prometheus"} 2 + +# Unsupported by streaming engine. +# eval instant at 5m node_var * ignoring (role) group_left (role) node_role +# {instance="abc",job="node",role="prometheus"} 2 + +# Unsupported by streaming engine. +# eval instant at 5m node_role * ignoring (role) group_right (role) node_var +# {instance="abc",job="node",role="prometheus"} 2 + +# Copy machine role to node variable with instrumentation labels. +# Unsupported by streaming engine. +# eval instant at 5m node_cpu * ignoring (role, mode) group_left (role) node_role +# {instance="abc",job="node",mode="idle",role="prometheus"} 3 +# {instance="abc",job="node",mode="user",role="prometheus"} 1 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu * on (instance) group_left (role) node_role +# {instance="abc",job="node",mode="idle",role="prometheus"} 3 +# {instance="abc",job="node",mode="user",role="prometheus"} 1 + + +# Ratio of total. +# Unsupported by streaming engine. +# eval instant at 5m node_cpu / on (instance) group_left sum by (instance,job)(node_cpu) +# {instance="abc",job="node",mode="idle"} .75 +# {instance="abc",job="node",mode="user"} .25 +# {instance="def",job="node",mode="idle"} .80 +# {instance="def",job="node",mode="user"} .20 + +# Unsupported by streaming engine. +# eval instant at 5m sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu) +# {job="node",mode="idle"} 0.7857142857142857 +# {job="node",mode="user"} 0.21428571428571427 + +# Unsupported by streaming engine. +# eval instant at 5m sum(sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu)) +# {} 1.0 + + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu / ignoring (mode) group_left sum without (mode)(node_cpu) +# {instance="abc",job="node",mode="idle"} .75 +# {instance="abc",job="node",mode="user"} .25 +# {instance="def",job="node",mode="idle"} .80 +# {instance="def",job="node",mode="user"} .20 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu / ignoring (mode) group_left(dummy) sum without (mode)(node_cpu) +# {instance="abc",job="node",mode="idle"} .75 +# {instance="abc",job="node",mode="user"} .25 +# {instance="def",job="node",mode="idle"} .80 +# {instance="def",job="node",mode="user"} .20 + +# Unsupported by streaming engine. +# eval instant at 5m sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu) +# {job="node",mode="idle"} 0.7857142857142857 +# {job="node",mode="user"} 0.21428571428571427 + +# Unsupported by streaming engine. +# eval instant at 5m sum(sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu)) +# {} 1.0 + + +# Copy over label from metric with no matching labels, without having to list cross-job target labels ('job' here). +# Unsupported by streaming engine. +# eval instant at 5m node_cpu + on(dummy) group_left(foo) random*0 +# {instance="abc",job="node",mode="idle",foo="bar"} 3 +# {instance="abc",job="node",mode="user",foo="bar"} 1 +# {instance="def",job="node",mode="idle",foo="bar"} 8 +# {instance="def",job="node",mode="user",foo="bar"} 2 + + +# Use threshold from metric, and copy over target. +# Unsupported by streaming engine. +# eval instant at 5m node_cpu > on(job, instance) group_left(target) threshold +# node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 +# node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 + +# Use threshold from metric, and a default (1) if it's not present. +# Unsupported by streaming engine. +# eval instant at 5m node_cpu > on(job, instance) group_left(target) (threshold or on (job, instance) (sum by (job, instance)(node_cpu) * 0 + 1)) +# node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 +# node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 +# node_cpu{instance="def",job="node",mode="idle"} 8 +# node_cpu{instance="def",job="node",mode="user"} 2 + + +# Check that binops drop the metric name. +# Unsupported by streaming engine. +# eval instant at 5m node_cpu + 2 +# {instance="abc",job="node",mode="idle"} 5 +# {instance="abc",job="node",mode="user"} 3 +# {instance="def",job="node",mode="idle"} 10 +# {instance="def",job="node",mode="user"} 4 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu - 2 +# {instance="abc",job="node",mode="idle"} 1 +# {instance="abc",job="node",mode="user"} -1 +# {instance="def",job="node",mode="idle"} 6 +# {instance="def",job="node",mode="user"} 0 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu / 2 +# {instance="abc",job="node",mode="idle"} 1.5 +# {instance="abc",job="node",mode="user"} 0.5 +# {instance="def",job="node",mode="idle"} 4 +# {instance="def",job="node",mode="user"} 1 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu * 2 +# {instance="abc",job="node",mode="idle"} 6 +# {instance="abc",job="node",mode="user"} 2 +# {instance="def",job="node",mode="idle"} 16 +# {instance="def",job="node",mode="user"} 4 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu ^ 2 +# {instance="abc",job="node",mode="idle"} 9 +# {instance="abc",job="node",mode="user"} 1 +# {instance="def",job="node",mode="idle"} 64 +# {instance="def",job="node",mode="user"} 4 + +# Unsupported by streaming engine. +# eval instant at 5m node_cpu % 2 +# {instance="abc",job="node",mode="idle"} 1 +# {instance="abc",job="node",mode="user"} 1 +# {instance="def",job="node",mode="idle"} 0 +# {instance="def",job="node",mode="user"} 0 + + +clear + +load 5m + random{foo="bar"} 2 + metricA{baz="meh"} 3 + metricB{baz="meh"} 4 + +# On with no labels, for metrics with no common labels. +eval instant at 5m random + on() metricA + {} 5 + +# Ignoring with no labels is the same as no ignoring. +eval instant at 5m metricA + ignoring() metricB + {baz="meh"} 7 + +eval instant at 5m metricA + metricB + {baz="meh"} 7 + +clear + +# Test duplicate labelset in promql output. +load 5m + testmetric1{src="a",dst="b"} 0 + testmetric2{src="a",dst="b"} 1 + +# Unsupported by streaming engine. +# eval_fail instant at 0m -{__name__=~'testmetric1|testmetric2'} + +clear + +load 5m + test_total{instance="localhost"} 50 + test_smaller{instance="localhost"} 10 + +# Unsupported by streaming engine. +# eval instant at 5m test_total > bool test_smaller +# {instance="localhost"} 1 + +# Unsupported by streaming engine. +# eval instant at 5m test_total > test_smaller +# test_total{instance="localhost"} 50 + +# Unsupported by streaming engine. +# eval instant at 5m test_total < bool test_smaller +# {instance="localhost"} 0 + +# Unsupported by streaming engine. +# eval instant at 5m test_total < test_smaller + +clear + +# Testing atan2. +load 5m + trigy{} 10 + trigx{} 20 + trigNaN{} NaN + +eval instant at 5m trigy atan2 trigx + {} 0.4636476090008061 + +eval instant at 5m trigy atan2 trigNaN + {} NaN + +# eval instant at 5m 10 atan2 20 +# 0.4636476090008061 + +# eval instant at 5m 10 atan2 NaN +# NaN diff --git a/pkg/streamingpromql/testdata/upstream/operators.test.disabled b/pkg/streamingpromql/testdata/upstream/operators.test.disabled deleted file mode 100644 index 14bf0b103d..0000000000 --- a/pkg/streamingpromql/testdata/upstream/operators.test.disabled +++ /dev/null @@ -1,494 +0,0 @@ -# SPDX-License-Identifier: AGPL-3.0-only -# Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/testdata/operators.test -# Provenance-includes-license: Apache-2.0 -# Provenance-includes-copyright: The Prometheus Authors - -load 5m - http_requests{job="api-server", instance="0", group="production"} 0+10x10 - http_requests{job="api-server", instance="1", group="production"} 0+20x10 - http_requests{job="api-server", instance="0", group="canary"} 0+30x10 - http_requests{job="api-server", instance="1", group="canary"} 0+40x10 - http_requests{job="app-server", instance="0", group="production"} 0+50x10 - http_requests{job="app-server", instance="1", group="production"} 0+60x10 - http_requests{job="app-server", instance="0", group="canary"} 0+70x10 - http_requests{job="app-server", instance="1", group="canary"} 0+80x10 - -load 5m - vector_matching_a{l="x"} 0+1x100 - vector_matching_a{l="y"} 0+2x50 - vector_matching_b{l="x"} 0+4x25 - - -eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job) - {job="api-server"} 996 - {job="app-server"} 2596 - -eval instant at 50m 2 - SUM(http_requests) BY (job) - {job="api-server"} -998 - {job="app-server"} -2598 - -eval instant at 50m -http_requests{job="api-server",instance="0",group="production"} - {job="api-server",instance="0",group="production"} -100 - -eval instant at 50m +http_requests{job="api-server",instance="0",group="production"} - http_requests{job="api-server",instance="0",group="production"} 100 - -eval instant at 50m - - - SUM(http_requests) BY (job) - {job="api-server"} -1000 - {job="app-server"} -2600 - -eval instant at 50m - - - 1 - -1 - -eval instant at 50m -2^---1*3 - -1.5 - -eval instant at 50m 2/-2^---1*3+2 - -10 - -eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1 - {job="api-server"} 1 - {job="app-server"} 0.38461538461538464 - -eval instant at 50m 1000 / SUM(http_requests) BY (job) - {job="api-server"} 1 - {job="app-server"} 0.38461538461538464 - -eval instant at 50m SUM(http_requests) BY (job) - 2 - {job="api-server"} 998 - {job="app-server"} 2598 - -eval instant at 50m SUM(http_requests) BY (job) % 3 - {job="api-server"} 1 - {job="app-server"} 2 - -eval instant at 50m SUM(http_requests) BY (job) % 0.3 - {job="api-server"} 0.1 - {job="app-server"} 0.2 - -eval instant at 50m SUM(http_requests) BY (job) ^ 2 - {job="api-server"} 1000000 - {job="app-server"} 6760000 - -eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2 - {job="api-server"} 1 - {job="app-server"} 8 - -eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2) - {job="api-server"} 488 - {job="app-server"} 40 - -eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 - {job="api-server"} 488 - {job="app-server"} 40 - -eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2 - {job="api-server"} 1000 - {job="app-server"} 2600 - -eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job) - {job="api-server"} 256 - {job="app-server"} 256 - -eval instant at 50m SUM(http_requests) BY (job) / 0 - {job="api-server"} +Inf - {job="app-server"} +Inf - -eval instant at 50m http_requests{group="canary", instance="0", job="api-server"} / 0 - {group="canary", instance="0", job="api-server"} +Inf - -eval instant at 50m -1 * http_requests{group="canary", instance="0", job="api-server"} / 0 - {group="canary", instance="0", job="api-server"} -Inf - -eval instant at 50m 0 * http_requests{group="canary", instance="0", job="api-server"} / 0 - {group="canary", instance="0", job="api-server"} NaN - -eval instant at 50m 0 * http_requests{group="canary", instance="0", job="api-server"} % 0 - {group="canary", instance="0", job="api-server"} NaN - -eval instant at 50m SUM(http_requests) BY (job) + SUM(http_requests) BY (job) - {job="api-server"} 2000 - {job="app-server"} 5200 - -eval instant at 50m (SUM((http_requests)) BY (job)) + SUM(http_requests) BY (job) - {job="api-server"} 2000 - {job="app-server"} 5200 - -eval instant at 50m http_requests{job="api-server", group="canary"} - http_requests{group="canary", instance="0", job="api-server"} 300 - http_requests{group="canary", instance="1", job="api-server"} 400 - -eval instant at 50m http_requests{job="api-server", group="canary"} + rate(http_requests{job="api-server"}[5m]) * 5 * 60 - {group="canary", instance="0", job="api-server"} 330 - {group="canary", instance="1", job="api-server"} 440 - -eval instant at 50m rate(http_requests[25m]) * 25 * 60 - {group="canary", instance="0", job="api-server"} 150 - {group="canary", instance="0", job="app-server"} 350 - {group="canary", instance="1", job="api-server"} 200 - {group="canary", instance="1", job="app-server"} 400 - {group="production", instance="0", job="api-server"} 50 - {group="production", instance="0", job="app-server"} 249.99999999999997 - {group="production", instance="1", job="api-server"} 100 - {group="production", instance="1", job="app-server"} 300 - -eval instant at 50m (rate((http_requests[25m])) * 25) * 60 - {group="canary", instance="0", job="api-server"} 150 - {group="canary", instance="0", job="app-server"} 350 - {group="canary", instance="1", job="api-server"} 200 - {group="canary", instance="1", job="app-server"} 400 - {group="production", instance="0", job="api-server"} 50 - {group="production", instance="0", job="app-server"} 249.99999999999997 - {group="production", instance="1", job="api-server"} 100 - {group="production", instance="1", job="app-server"} 300 - - -eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} - http_requests{group="canary", instance="0", job="api-server"} 300 - http_requests{group="canary", instance="0", job="app-server"} 700 - -eval instant at 50m (http_requests{group="canary"} + 1) and http_requests{instance="0"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - -eval instant at 50m (http_requests{group="canary"} + 1) and on(instance, job) http_requests{instance="0", group="production"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - -eval instant at 50m (http_requests{group="canary"} + 1) and on(instance) http_requests{instance="0", group="production"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - -eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group) http_requests{instance="0", group="production"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - -eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job) http_requests{instance="0", group="production"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - -eval instant at 50m http_requests{group="canary"} or http_requests{group="production"} - http_requests{group="canary", instance="0", job="api-server"} 300 - http_requests{group="canary", instance="0", job="app-server"} 700 - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - http_requests{group="production", instance="0", job="api-server"} 100 - http_requests{group="production", instance="0", job="app-server"} 500 - http_requests{group="production", instance="1", job="api-server"} 200 - http_requests{group="production", instance="1", job="app-server"} 600 - -# On overlap the rhs samples must be dropped. -eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"} - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - {group="canary", instance="1", job="api-server"} 401 - {group="canary", instance="1", job="app-server"} 801 - http_requests{group="production", instance="1", job="api-server"} 200 - http_requests{group="production", instance="1", job="app-server"} 600 - - -# Matching only on instance excludes everything that has instance=0/1 but includes -# entries without the instance label. -eval instant at 50m (http_requests{group="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a) - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - {group="canary", instance="1", job="api-server"} 401 - {group="canary", instance="1", job="app-server"} 801 - vector_matching_a{l="x"} 10 - vector_matching_a{l="y"} 20 - -eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) - {group="canary", instance="0", job="api-server"} 301 - {group="canary", instance="0", job="app-server"} 701 - {group="canary", instance="1", job="api-server"} 401 - {group="canary", instance="1", job="app-server"} 801 - vector_matching_a{l="x"} 10 - vector_matching_a{l="y"} 20 - -eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - -eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"} - -eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"} - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - -eval instant at 50m http_requests{group="canary"} / on(instance,job) http_requests{group="production"} - {instance="0", job="api-server"} 3 - {instance="0", job="app-server"} 1.4 - {instance="1", job="api-server"} 2 - {instance="1", job="app-server"} 1.3333333333333333 - -eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"} - -eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"} - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - -eval instant at 50m http_requests{group="canary"} / ignoring(group) http_requests{group="production"} - {instance="0", job="api-server"} 3 - {instance="0", job="app-server"} 1.4 - {instance="1", job="api-server"} 2 - {instance="1", job="app-server"} 1.3333333333333333 - -# https://github.com/prometheus/prometheus/issues/1489 -eval instant at 50m http_requests AND ON (dummy) vector(1) - http_requests{group="canary", instance="0", job="api-server"} 300 - http_requests{group="canary", instance="0", job="app-server"} 700 - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - http_requests{group="production", instance="0", job="api-server"} 100 - http_requests{group="production", instance="0", job="app-server"} 500 - http_requests{group="production", instance="1", job="api-server"} 200 - http_requests{group="production", instance="1", job="app-server"} 600 - -eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) - http_requests{group="canary", instance="0", job="api-server"} 300 - http_requests{group="canary", instance="0", job="app-server"} 700 - http_requests{group="canary", instance="1", job="api-server"} 400 - http_requests{group="canary", instance="1", job="app-server"} 800 - http_requests{group="production", instance="0", job="api-server"} 100 - http_requests{group="production", instance="0", job="app-server"} 500 - http_requests{group="production", instance="1", job="api-server"} 200 - http_requests{group="production", instance="1", job="app-server"} 600 - - -# Comparisons. -eval instant at 50m SUM(http_requests) BY (job) > 1000 - {job="app-server"} 2600 - -eval instant at 50m 1000 < SUM(http_requests) BY (job) - {job="app-server"} 2600 - -eval instant at 50m SUM(http_requests) BY (job) <= 1000 - {job="api-server"} 1000 - -eval instant at 50m SUM(http_requests) BY (job) != 1000 - {job="app-server"} 2600 - -eval instant at 50m SUM(http_requests) BY (job) == 1000 - {job="api-server"} 1000 - -eval instant at 50m SUM(http_requests) BY (job) == bool 1000 - {job="api-server"} 1 - {job="app-server"} 0 - -eval instant at 50m SUM(http_requests) BY (job) == bool SUM(http_requests) BY (job) - {job="api-server"} 1 - {job="app-server"} 1 - -eval instant at 50m SUM(http_requests) BY (job) != bool SUM(http_requests) BY (job) - {job="api-server"} 0 - {job="app-server"} 0 - -eval instant at 50m 0 == bool 1 - 0 - -eval instant at 50m 1 == bool 1 - 1 - -eval instant at 50m http_requests{job="api-server", instance="0", group="production"} == bool 100 - {job="api-server", instance="0", group="production"} 1 - -# group_left/group_right. - -clear - -load 5m - node_var{instance="abc",job="node"} 2 - node_role{instance="abc",job="node",role="prometheus"} 1 - -load 5m - node_cpu{instance="abc",job="node",mode="idle"} 3 - node_cpu{instance="abc",job="node",mode="user"} 1 - node_cpu{instance="def",job="node",mode="idle"} 8 - node_cpu{instance="def",job="node",mode="user"} 2 - -load 5m - random{foo="bar"} 1 - -load 5m - threshold{instance="abc",job="node",target="a@b.com"} 0 - -# Copy machine role to node variable. -eval instant at 5m node_role * on (instance) group_right (role) node_var - {instance="abc",job="node",role="prometheus"} 2 - -eval instant at 5m node_var * on (instance) group_left (role) node_role - {instance="abc",job="node",role="prometheus"} 2 - -eval instant at 5m node_var * ignoring (role) group_left (role) node_role - {instance="abc",job="node",role="prometheus"} 2 - -eval instant at 5m node_role * ignoring (role) group_right (role) node_var - {instance="abc",job="node",role="prometheus"} 2 - -# Copy machine role to node variable with instrumentation labels. -eval instant at 5m node_cpu * ignoring (role, mode) group_left (role) node_role - {instance="abc",job="node",mode="idle",role="prometheus"} 3 - {instance="abc",job="node",mode="user",role="prometheus"} 1 - -eval instant at 5m node_cpu * on (instance) group_left (role) node_role - {instance="abc",job="node",mode="idle",role="prometheus"} 3 - {instance="abc",job="node",mode="user",role="prometheus"} 1 - - -# Ratio of total. -eval instant at 5m node_cpu / on (instance) group_left sum by (instance,job)(node_cpu) - {instance="abc",job="node",mode="idle"} .75 - {instance="abc",job="node",mode="user"} .25 - {instance="def",job="node",mode="idle"} .80 - {instance="def",job="node",mode="user"} .20 - -eval instant at 5m sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu) - {job="node",mode="idle"} 0.7857142857142857 - {job="node",mode="user"} 0.21428571428571427 - -eval instant at 5m sum(sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu)) - {} 1.0 - - -eval instant at 5m node_cpu / ignoring (mode) group_left sum without (mode)(node_cpu) - {instance="abc",job="node",mode="idle"} .75 - {instance="abc",job="node",mode="user"} .25 - {instance="def",job="node",mode="idle"} .80 - {instance="def",job="node",mode="user"} .20 - -eval instant at 5m node_cpu / ignoring (mode) group_left(dummy) sum without (mode)(node_cpu) - {instance="abc",job="node",mode="idle"} .75 - {instance="abc",job="node",mode="user"} .25 - {instance="def",job="node",mode="idle"} .80 - {instance="def",job="node",mode="user"} .20 - -eval instant at 5m sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu) - {job="node",mode="idle"} 0.7857142857142857 - {job="node",mode="user"} 0.21428571428571427 - -eval instant at 5m sum(sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu)) - {} 1.0 - - -# Copy over label from metric with no matching labels, without having to list cross-job target labels ('job' here). -eval instant at 5m node_cpu + on(dummy) group_left(foo) random*0 - {instance="abc",job="node",mode="idle",foo="bar"} 3 - {instance="abc",job="node",mode="user",foo="bar"} 1 - {instance="def",job="node",mode="idle",foo="bar"} 8 - {instance="def",job="node",mode="user",foo="bar"} 2 - - -# Use threshold from metric, and copy over target. -eval instant at 5m node_cpu > on(job, instance) group_left(target) threshold - node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 - node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 - -# Use threshold from metric, and a default (1) if it's not present. -eval instant at 5m node_cpu > on(job, instance) group_left(target) (threshold or on (job, instance) (sum by (job, instance)(node_cpu) * 0 + 1)) - node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 - node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 - node_cpu{instance="def",job="node",mode="idle"} 8 - node_cpu{instance="def",job="node",mode="user"} 2 - - -# Check that binops drop the metric name. -eval instant at 5m node_cpu + 2 - {instance="abc",job="node",mode="idle"} 5 - {instance="abc",job="node",mode="user"} 3 - {instance="def",job="node",mode="idle"} 10 - {instance="def",job="node",mode="user"} 4 - -eval instant at 5m node_cpu - 2 - {instance="abc",job="node",mode="idle"} 1 - {instance="abc",job="node",mode="user"} -1 - {instance="def",job="node",mode="idle"} 6 - {instance="def",job="node",mode="user"} 0 - -eval instant at 5m node_cpu / 2 - {instance="abc",job="node",mode="idle"} 1.5 - {instance="abc",job="node",mode="user"} 0.5 - {instance="def",job="node",mode="idle"} 4 - {instance="def",job="node",mode="user"} 1 - -eval instant at 5m node_cpu * 2 - {instance="abc",job="node",mode="idle"} 6 - {instance="abc",job="node",mode="user"} 2 - {instance="def",job="node",mode="idle"} 16 - {instance="def",job="node",mode="user"} 4 - -eval instant at 5m node_cpu ^ 2 - {instance="abc",job="node",mode="idle"} 9 - {instance="abc",job="node",mode="user"} 1 - {instance="def",job="node",mode="idle"} 64 - {instance="def",job="node",mode="user"} 4 - -eval instant at 5m node_cpu % 2 - {instance="abc",job="node",mode="idle"} 1 - {instance="abc",job="node",mode="user"} 1 - {instance="def",job="node",mode="idle"} 0 - {instance="def",job="node",mode="user"} 0 - - -clear - -load 5m - random{foo="bar"} 2 - metricA{baz="meh"} 3 - metricB{baz="meh"} 4 - -# On with no labels, for metrics with no common labels. -eval instant at 5m random + on() metricA - {} 5 - -# Ignoring with no labels is the same as no ignoring. -eval instant at 5m metricA + ignoring() metricB - {baz="meh"} 7 - -eval instant at 5m metricA + metricB - {baz="meh"} 7 - -clear - -# Test duplicate labelset in promql output. -load 5m - testmetric1{src="a",dst="b"} 0 - testmetric2{src="a",dst="b"} 1 - -eval_fail instant at 0m -{__name__=~'testmetric1|testmetric2'} - -clear - -load 5m - test_total{instance="localhost"} 50 - test_smaller{instance="localhost"} 10 - -eval instant at 5m test_total > bool test_smaller - {instance="localhost"} 1 - -eval instant at 5m test_total > test_smaller - test_total{instance="localhost"} 50 - -eval instant at 5m test_total < bool test_smaller - {instance="localhost"} 0 - -eval instant at 5m test_total < test_smaller - -clear - -# Testing atan2. -load 5m - trigy{} 10 - trigx{} 20 - trigNaN{} NaN - -eval instant at 5m trigy atan2 trigx - {} 0.4636476090008061 - -eval instant at 5m trigy atan2 trigNaN - {} NaN - -eval instant at 5m 10 atan2 20 - 0.4636476090008061 - -eval instant at 5m 10 atan2 NaN - NaN diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index 36fd14826c..35c51dec88 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -3,6 +3,7 @@ package streamingpromql import ( + "math" "time" "github.com/prometheus/prometheus/promql" @@ -12,7 +13,7 @@ func NewTestEngineOpts() promql.EngineOpts { return promql.EngineOpts{ Logger: nil, Reg: nil, - MaxSamples: 50000000, + MaxSamples: math.MaxInt, Timeout: 100 * time.Second, EnableAtModifier: true, EnableNegativeOffset: true, diff --git a/tools/benchmark-query-engine/main.go b/tools/benchmark-query-engine/main.go index 2ef52290fa..f9ab6b7a6b 100644 --- a/tools/benchmark-query-engine/main.go +++ b/tools/benchmark-query-engine/main.go @@ -44,6 +44,8 @@ type app struct { testFilter string listTests bool justRunIngester bool + cpuProfilePath string + memProfilePath string } func (a *app) run() error { @@ -62,6 +64,16 @@ func (a *app) run() error { return nil } + if a.cpuProfilePath != "" || a.memProfilePath != "" { + if a.count != 1 { + return fmt.Errorf("must run exactly one iteration when emitting profile, but have -count=%d", a.count) + } + + if len(filteredNames) != 1 { + return fmt.Errorf("must select exactly one benchmark with -bench when emitting profile, but have %v benchmarks selected", len(filteredNames)) + } + } + if err := a.findBenchmarkPackageDir(); err != nil { return fmt.Errorf("could not find engine package directory: %w", err) } @@ -134,6 +146,8 @@ func (a *app) parseArgs() error { flag.BoolVar(&a.listTests, "list", false, "list known benchmarks and exit") flag.BoolVar(&a.justRunIngester, "start-ingester", false, "start ingester and wait, run no benchmarks") flag.StringVar(&a.ingesterAddress, "use-existing-ingester", "", "use existing ingester rather than creating a new one") + flag.StringVar(&a.cpuProfilePath, "cpuprofile", "", "write CPU profile to file, only supported when running a single iteration of one benchmark") + flag.StringVar(&a.memProfilePath, "memprofile", "", "write memory profile to file, only supported when running a single iteration of one benchmark") if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil { fmt.Printf("%v\n", err) @@ -287,7 +301,19 @@ func (a *app) filteredTestCaseNames() ([]string, error) { } func (a *app) runTestCase(name string, printBenchmarkHeader bool) error { - cmd := exec.Command(a.binaryPath, "-test.bench="+regexp.QuoteMeta(name), "-test.run=NoTestsWillMatchThisPattern", "-test.benchmem") + args := []string{ + "-test.bench=" + regexp.QuoteMeta(name), "-test.run=NoTestsWillMatchThisPattern", "-test.benchmem", + } + + if a.cpuProfilePath != "" { + args = append(args, "-test.cpuprofile="+a.cpuProfilePath) + } + + if a.memProfilePath != "" { + args = append(args, "-test.memprofile="+a.memProfilePath) + } + + cmd := exec.Command(a.binaryPath, args...) buf := &bytes.Buffer{} cmd.Stdout = buf cmd.Stderr = os.Stderr @@ -295,6 +321,7 @@ func (a *app) runTestCase(name string, printBenchmarkHeader bool) error { cmd.Env = append(cmd.Env, "STREAMING_PROMQL_ENGINE_BENCHMARK_SKIP_COMPARE_RESULTS=true") if err := cmd.Run(); err != nil { + slog.Warn("output from failed command", "output", buf.String()) return fmt.Errorf("executing command failed: %w", err) }