Skip to content

Commit

Permalink
fix(query sharding): Generalize avg -> sum/count sharding using exist…
Browse files Browse the repository at this point in the history
…ing binop mapper (#12599)
  • Loading branch information
MasslessParticle committed Apr 12, 2024
1 parent 91dab51 commit 11e7687
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
38 changes: 13 additions & 25 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,33 +240,21 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr

case syntax.OpTypeAvg:
// avg(x) -> sum(x)/count(x), which is parallelizable
lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
return nil, 0, err
}

rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeCount,
}, r, false)
if err != nil {
return nil, 0, err
binOp := &syntax.BinOpExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
},
RHS: &syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeCount,
},
Op: syntax.OpTypeDiv,
}

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard)))

return &syntax.BinOpExpr{
SampleExpr: lhs,
RHS: rhs,
Op: syntax.OpTypeDiv,
}, bytesPerShard, nil

return m.mapBinOpExpr(binOp, r, topLevel)
case syntax.OpTypeCount:
if syntax.ReducesLabels(expr.Left) {
// skip sharding optimizations at this level. If labels are reduced,
Expand Down
24 changes: 18 additions & 6 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ func TestMappingStrings(t *testing.T) {
)
)`,
},
{
in: `avg by(foo_extracted) (quantile_over_time(0.95, {foo="baz"} | logfmt | unwrap foo_extracted | __error__="" [5m]))`,
out: `(
sum by (foo_extracted) (
downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
/
sum by (foo_extracted) (
downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
)`,
},
{
in: `count(rate({foo="bar"} | json | keep foo [5m]))`,
out: `count(
Expand Down Expand Up @@ -283,12 +295,12 @@ func TestMappingStrings(t *testing.T) {
},
{
in: `sum without (a) (
label_replace(
sum without (b) (
rate({foo="bar"}[5m])
),
"baz", "buz", "foo", "(.*)"
)
label_replace(
sum without (b) (
rate({foo="bar"}[5m])
),
"baz", "buz", "foo", "(.*)"
)
)`,
out: `sum without(a) (
label_replace(
Expand Down

0 comments on commit 11e7687

Please sign in to comment.