From 11e768726fb25f905de880ad2f5495b0f7fba156 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Fri, 12 Apr 2024 11:46:23 -0600 Subject: [PATCH] fix(query sharding): Generalize avg -> sum/count sharding using existing binop mapper (#12599) --- pkg/logql/shardmapper.go | 38 ++++++++++++----------------------- pkg/logql/shardmapper_test.go | 24 ++++++++++++++++------ 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index fbd0dbaa83eb..df7c62a895bb 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -240,33 +240,21 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr case syntax.OpTypeAvg: // avg(x) -> sum(x)/count(x), which is parallelizable - lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ - Left: expr.Left, - Grouping: expr.Grouping, - Operation: syntax.OpTypeSum, - }, r, false) - if err != nil { - return nil, 0, err - } - - rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ - Left: expr.Left, - Grouping: expr.Grouping, - Operation: syntax.OpTypeCount, - }, r, false) - if err != nil { - return nil, 0, err + binOp := &syntax.BinOpExpr{ + SampleExpr: &syntax.VectorAggregationExpr{ + Left: expr.Left, + Grouping: expr.Grouping, + Operation: syntax.OpTypeSum, + }, + RHS: &syntax.VectorAggregationExpr{ + Left: expr.Left, + Grouping: expr.Grouping, + Operation: syntax.OpTypeCount, + }, + Op: syntax.OpTypeDiv, } - // We take the maximum bytes per shard of both sides of the operation - bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard))) - - return &syntax.BinOpExpr{ - SampleExpr: lhs, - RHS: rhs, - Op: syntax.OpTypeDiv, - }, bytesPerShard, nil - + return m.mapBinOpExpr(binOp, r, topLevel) case syntax.OpTypeCount: if syntax.ReducesLabels(expr.Left) { // skip sharding optimizations at this level. If labels are reduced, diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 355f839bac55..f81f90a13778 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -231,6 +231,18 @@ func TestMappingStrings(t *testing.T) { ) )`, }, + { + in: `avg by(foo_extracted) (quantile_over_time(0.95, {foo="baz"} | logfmt | unwrap foo_extracted | __error__="" [5m]))`, + out: `( + sum by (foo_extracted) ( + downstream++downstream + ) + / + sum by (foo_extracted) ( + downstream++downstream + ) + )`, + }, { in: `count(rate({foo="bar"} | json | keep foo [5m]))`, out: `count( @@ -283,12 +295,12 @@ func TestMappingStrings(t *testing.T) { }, { in: `sum without (a) ( - label_replace( - sum without (b) ( - rate({foo="bar"}[5m]) - ), - "baz", "buz", "foo", "(.*)" - ) + label_replace( + sum without (b) ( + rate({foo="bar"}[5m]) + ), + "baz", "buz", "foo", "(.*)" + ) )`, out: `sum without(a) ( label_replace(