Streaming PromQL engine: binary arithmetic operations with one-to-one matching #8096

Merged
merged 45 commits into from
May 14, 2024
Commits
dfbd2c6
Add some test cases
charleskorn May 6, 2024
74f17a9
Enable upstream test cases for binary operators with one-to-one match…
charleskorn May 6, 2024
f909d55
Fix invalid test case
charleskorn May 8, 2024
3cba572
Make it clear that points are expected to be in timestamp order with …
charleskorn May 8, 2024
9ed7961
Enable relevant benchmarks
charleskorn May 8, 2024
3ef9607
Enable test cases for atan2
charleskorn May 8, 2024
67414c5
Initial (WIP) version of one-to-one matching
charleskorn May 8, 2024
81fd3e9
Update comment
charleskorn May 8, 2024
a24f632
Fix issue where `on` and `ignoring` are ignored
charleskorn May 8, 2024
a0c966e
Use correct terminology
charleskorn May 8, 2024
8183253
Fix issue where errors are lost if result is a matrix
charleskorn May 8, 2024
e2430bc
Add support for case where multiple different series contribute to on…
charleskorn May 8, 2024
c8438af
Add comment explaining when slices are returned to the pool
charleskorn May 8, 2024
1152fb7
Add further comment
charleskorn May 8, 2024
b9c6770
Remove redundant checks
charleskorn May 9, 2024
29698cd
Log output from benchmark binary if it fails
charleskorn May 9, 2024
dedf3e4
Don't limit the number of samples loaded in a query
charleskorn May 9, 2024
84b3a8e
Return point slices to the pool once we're done with them
charleskorn May 9, 2024
7465322
Split `SeriesMetadata` into smaller methods
charleskorn May 9, 2024
55fd653
Clarify comment
charleskorn May 9, 2024
c2d4946
Add extra benchmark
charleskorn May 9, 2024
ead5557
Add comment explaining tradeoff
charleskorn May 9, 2024
5162a1d
Rename `binaryOperationSeriesPair` to `binaryOperationOutputSeries`
charleskorn May 9, 2024
236f161
Add benchmark for the case where one side of a binary operation has m…
charleskorn May 9, 2024
7834b9e
Further simplify `SeriesMetadata` by moving more logic into `computeO…
charleskorn May 9, 2024
5555bb7
Sort output series to minimise the number of input series we need to …
charleskorn May 9, 2024
dc1457e
Add support for capturing CPU and memory profiles while running bench…
charleskorn May 9, 2024
9d4bc14
Reuse slices when computing result.
charleskorn May 9, 2024
825b4dc
Use bucketed pools with factor 2 rather than factor 10.
charleskorn May 10, 2024
957eb35
Merge branch 'main' into charleskorn/binary-operators
charleskorn May 10, 2024
e07463c
Add changelog entry
charleskorn May 10, 2024
bc557db
Fix flaky test.
charleskorn May 10, 2024
a23be64
Remove outdated comments
charleskorn May 10, 2024
7241678
Don't bother buffering series that won't be used.
charleskorn May 10, 2024
3c971f4
Clarify expected behaviour of Close
charleskorn May 10, 2024
053f3d3
Move `FallbackEngine` and `NotSupportedError` to their own package.
charleskorn May 10, 2024
3f1aa3a
Consolidate check for supported operator into one place
charleskorn May 10, 2024
20344f5
Use string of group labels as map key when constructing output series
charleskorn May 14, 2024
7751e87
Merge branch 'main' into charleskorn/binary-operators
charleskorn May 14, 2024
dac8815
Fix indentation
charleskorn May 14, 2024
a737a98
Add tests for sorting behaviour
charleskorn May 14, 2024
108ce4e
Reduce duplication in `favourRightSideSorter` and `favourRightSideSor…
charleskorn May 14, 2024
98c8f60
Fix linting
charleskorn May 14, 2024
5818cd1
Add tests for `binaryOperationSeriesBuffer`.
charleskorn May 14, 2024
61b59a1
Add test cases for `on` and `ignoring` with multiple labels
charleskorn May 14, 2024
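
Two of the commits above ("Fix issue where `on` and `ignoring` are ignored" and "Use string of group labels as map key when constructing output series") touch on how series from each side are paired up. The following is a minimal sketch of that idea, not the PR's implementation: `Labels`, `groupKey`, `Pair` and `matchOneToOne` are hypothetical names, labels are modelled as a plain map rather than Prometheus' `labels.Labels`, and the sketch rejects duplicate group keys outright instead of handling the case where multiple different series contribute to the same group.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Labels is a simplified, hypothetical stand-in for a series' label set.
type Labels map[string]string

// groupKey builds a string from the labels that take part in matching.
// With on=true only the listed labels are used. With on=false (ignoring),
// every label except the listed ones and __name__ is used.
func groupKey(lbls Labels, on bool, names []string) string {
	listed := make(map[string]bool, len(names))
	for _, n := range names {
		listed[n] = true
	}

	keys := make([]string, 0, len(lbls))
	for name := range lbls {
		if name == "__name__" && !on {
			continue
		}
		if listed[name] == on {
			keys = append(keys, name)
		}
	}
	sort.Strings(keys) // Map iteration order is random, so sort for a stable key.

	var b strings.Builder
	for _, k := range keys {
		b.WriteString(k)
		b.WriteByte(0xff) // 0xff never appears in valid UTF-8, so it is a safe separator.
		b.WriteString(lbls[k])
		b.WriteByte(0xff)
	}
	return b.String()
}

// Pair holds the indices of a left and a right series that share a group key.
type Pair struct{ Left, Right int }

// matchOneToOne pairs each left series with the right series that has the same
// group key. Left series without a partner are dropped.
func matchOneToOne(left, right []Labels, on bool, names []string) ([]Pair, error) {
	rightByKey := make(map[string]int, len(right))
	for i, r := range right {
		k := groupKey(r, on, names)
		if _, dup := rightByKey[k]; dup {
			return nil, fmt.Errorf("multiple right-side series for group %q", k)
		}
		rightByKey[k] = i
	}

	seenLeft := make(map[string]bool, len(left))
	var pairs []Pair
	for i, l := range left {
		k := groupKey(l, on, names)
		if seenLeft[k] {
			return nil, fmt.Errorf("multiple left-side series for group %q", k)
		}
		seenLeft[k] = true
		if j, ok := rightByKey[k]; ok {
			pairs = append(pairs, Pair{Left: i, Right: j})
		}
	}
	return pairs, nil
}

func main() {
	left := []Labels{
		{"__name__": "a", "env": "prod", "pod": "1"},
		{"__name__": "a", "env": "test", "pod": "2"},
	}
	right := []Labels{
		{"__name__": "b", "env": "test"},
		{"__name__": "b", "env": "prod"},
	}
	// Equivalent in spirit to: a + on(env) b
	pairs, err := matchOneToOne(left, right, true, []string{"env"})
	fmt.Println(pairs, err)
}
```

Using a string as the map key keeps the lookup to a single hash per series; the cost is building the string, which is one of the tradeoffs a real engine would need to measure.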
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -14,7 +14,7 @@
 * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
 * [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096
 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
3 changes: 2 additions & 1 deletion pkg/querier/querier.go
@@ -30,6 +30,7 @@ import (
 	"github.com/grafana/mimir/pkg/storage/chunk"
 	"github.com/grafana/mimir/pkg/storage/lazyquery"
 	"github.com/grafana/mimir/pkg/streamingpromql"
+	"github.com/grafana/mimir/pkg/streamingpromql/compat"
 	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/activitytracker"
 	"github.com/grafana/mimir/pkg/util/limiter"
@@ -170,7 +171,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor

 		if cfg.EnablePromQLEngineFallback {
 			prometheusEngine := promql.NewEngine(opts)
-			eng = streamingpromql.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger)
+			eng = compat.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger)
 		} else {
 			eng = streamingEngine
 		}
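
The hunk above only shows where the fallback engine is constructed, not how it behaves. As a rough sketch of the general pattern, assuming the wrapper retries with the second engine only when the first one reports that a query is unsupported: `errNotSupported`, `queryFunc`, `withFallback`, `streaming` and `prometheus` below are all hypothetical and much simpler than the real `compat` types.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotSupported is a hypothetical sentinel standing in for the real
// compat.NotSupportedError type.
var errNotSupported = errors.New("not supported by streaming engine")

// queryFunc is a hypothetical, heavily simplified stand-in for an engine:
// it takes an expression and returns a description of the result.
type queryFunc func(expr string) (string, error)

// withFallback returns a queryFunc that tries preferred first and only
// retries with fallback when preferred reports errNotSupported.
// Any other error is returned to the caller unchanged.
func withFallback(preferred, fallback queryFunc) queryFunc {
	return func(expr string) (string, error) {
		res, err := preferred(expr)
		if errors.Is(err, errNotSupported) {
			return fallback(expr)
		}
		return res, err
	}
}

// streaming pretends that 'avg' aggregation is the only unsupported feature.
func streaming(expr string) (string, error) {
	if expr == "avg(metric{})" {
		return "", fmt.Errorf("%w: 'avg' aggregation", errNotSupported)
	}
	return "streaming:" + expr, nil
}

// prometheus pretends to support every expression.
func prometheus(expr string) (string, error) {
	return "prometheus:" + expr, nil
}

func main() {
	eng := withFallback(streaming, prometheus)
	for _, expr := range []string{"a + b", "avg(metric{})"} {
		res, err := eng(expr)
		fmt.Println(res, err)
	}
}
```

Checking for one specific error, rather than falling back on any failure, keeps genuine query errors visible instead of silently masking them with a second attempt.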
30 changes: 20 additions & 10 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -107,13 +107,20 @@ func TestCases(metricSizes []int) []BenchCase {
 		//	Expr: "-a_X",
 		//},
 		//// Binary operators.
-		//{
-		//	Expr: "a_X - b_X",
-		//},
-		//{
-		//	Expr: "a_X - b_X",
-		//	Steps: 10000,
-		//},
+		{
+			Expr: "a_X - b_X",
+		},
+		{
+			Expr:  "a_X - b_X",
+			Steps: 10000,
+		},
+		// Test the case where one side of a binary operation has many more series than the other.
+		{
+			Expr: `a_100{l=~"[13579]."} - b_100`,
+		},
+		{
+			Expr: `a_2000{l=~"1..."} - b_2000`,
+		},
 		//{
 		//	Expr: "a_X and b_X{l=~'.*[0-4]$'}",
 		//},
@@ -163,9 +170,12 @@ func TestCases(metricSizes []int) []BenchCase {
 		//	Expr: "topk(5, a_X)",
 		//},
 		//// Combinations.
-		//{
-		//	Expr: "rate(a_X[1m]) + rate(b_X[1m])",
-		//},
+		{
+			Expr: "rate(a_X[1m]) + rate(b_X[1m])",
+		},
+		{
+			Expr: "sum(a_X + b_X)",
+		},
 		{
 			Expr: "sum by (le)(rate(h_X[1m]))",
 		},
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: AGPL-3.0-only

-package streamingpromql
+package compat

 import (
 	"errors"
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: AGPL-3.0-only

-package streamingpromql
+package compat

 import (
 	"context"
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: AGPL-3.0-only

-package streamingpromql
+package compat

 import (
 	"context"
33 changes: 20 additions & 13 deletions pkg/streamingpromql/engine_test.go
@@ -15,6 +15,8 @@ import (
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/mimir/pkg/streamingpromql/compat"
 )

 func TestUnsupportedPromQLFeatures(t *testing.T) {
@@ -26,29 +28,34 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 	// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
 	// different cases and make sure we produce a reasonable error message when these cases are encountered.
 	unsupportedExpressions := map[string]string{
-		"a + b": "PromQL expression type *parser.BinaryExpr",
-		"1 + 2": "scalar value as top-level expression",
-		"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
-		"1": "scalar value as top-level expression",
-		"metric{} offset 2h": "instant vector selector with 'offset'",
-		"avg(metric{})": "'avg' aggregation",
-		"sum without(l) (metric{})": "grouping with 'without'",
-		"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
-		"avg_over_time(metric{}[5m])": "'avg_over_time' function",
-		"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
+		"1 + 2": "scalar value as top-level expression",
+		"1 + metric{}": "binary expression with scalars",
+		"metric{} + 1": "binary expression with scalars",
+		"metric{} < other_metric{}": "binary expression with '<'",
+		"metric{} or other_metric{}": "binary expression with many-to-many matching",
+		"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
+		"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
+		"1": "scalar value as top-level expression",
+		"metric{} offset 2h": "instant vector selector with 'offset'",
+		"avg(metric{})": "'avg' aggregation",
+		"sum without(l) (metric{})": "grouping with 'without'",
+		"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
+		"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
+		"avg_over_time(metric{}[5m])": "'avg_over_time' function",
+		"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
 	}

 	for expression, expectedError := range unsupportedExpressions {
 		t.Run(expression, func(t *testing.T) {
 			qry, err := engine.NewRangeQuery(ctx, nil, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute)
 			require.Error(t, err)
-			require.ErrorIs(t, err, NotSupportedError{})
+			require.ErrorIs(t, err, compat.NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)

 			qry, err = engine.NewInstantQuery(ctx, nil, nil, expression, time.Now())
 			require.Error(t, err)
-			require.ErrorIs(t, err, NotSupportedError{})
+			require.ErrorIs(t, err, compat.NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)
 		})
@@ -65,7 +72,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 		t.Run(expression, func(t *testing.T) {
 			qry, err := engine.NewInstantQuery(ctx, nil, nil, expression, time.Now())
 			require.Error(t, err)
-			require.ErrorIs(t, err, NotSupportedError{})
+			require.ErrorIs(t, err, compat.NotSupportedError{})
 			require.EqualError(t, err, "not supported by streaming engine: "+expectedError)
 			require.Nil(t, qry)
 		})
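
The tests above assert both `require.ErrorIs(t, err, compat.NotSupportedError{})` with an empty target and an exact message that includes a reason. One way an error type can satisfy both is to implement its own `Is` method; the sketch below illustrates that technique. It is not taken from the `compat` package: the `Reason` field, the `Is` method and the `isNotSupported` helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// NotSupportedError is a hypothetical stand-in for compat.NotSupportedError.
// The Reason field and the Is method are assumptions made for this sketch.
type NotSupportedError struct {
	Reason string
}

func (e NotSupportedError) Error() string {
	return "not supported by streaming engine: " + e.Reason
}

// Is reports a match for any NotSupportedError target, whatever its reason.
// Without it, errors.Is would compare the structs with == and an empty
// target would only match an error with an empty reason.
func (e NotSupportedError) Is(target error) bool {
	_, ok := target.(NotSupportedError)
	return ok
}

// isNotSupported is a small hypothetical helper wrapping the errors.Is check.
func isNotSupported(err error) bool {
	return errors.Is(err, NotSupportedError{})
}

func main() {
	wrapped := fmt.Errorf("planning query: %w", NotSupportedError{Reason: "'avg' aggregation"})
	fmt.Println(isNotSupported(wrapped))
	fmt.Println(isNotSupported(errors.New("some other failure")))
}
```

With this shape, callers can test for the category of error with a zero-value target while the message still carries the specific reason.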
16 changes: 0 additions & 16 deletions pkg/streamingpromql/operator/aggregation_test.go
@@ -97,19 +97,3 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []SeriesMetadata {

 	return m
 }
-
-type testOperator struct {
-	series []labels.Labels
-}
-
-func (t *testOperator) SeriesMetadata(_ context.Context) ([]SeriesMetadata, error) {
-	return labelsToSeriesMetadata(t.series), nil
-}
-
-func (t *testOperator) NextSeries(_ context.Context) (InstantVectorSeriesData, error) {
-	panic("NextSeries() not supported")
-}
-
-func (t *testOperator) Close() {
-	panic("Close() not supported")
-}