diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 80764de70892..2e3dbfd014a1 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -394,22 +394,11 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( case DownstreamSampleExpr: // downstream to a querier - var shards Shards - if e.shard != nil { - shards = append(shards, e.shard.Shard) - params = ParamsWithChunkOverrides{ - Params: params, - StoreChunksOverride: &e.shard.chunks, - } - } acc := NewBufferedAccumulator(1) results, err := ev.Downstream(ctx, []DownstreamQuery{{ - Params: ParamsWithShardsOverride{ - Params: ParamsWithExpressionOverride{ - Params: params, - ExpressionOverride: e.SampleExpr, - }, - ShardsOverride: shards.Encode(), + Params: ParamsWithExpressionOverride{ + Params: ParamOverridesFromShard(params, e.shard), + ExpressionOverride: e.SampleExpr, }, }}, acc) if err != nil { @@ -422,16 +411,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( var queries []DownstreamQuery for cur != nil { qry := DownstreamQuery{ - Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr}, - } - if shard := cur.DownstreamSampleExpr.shard; shard != nil { - qry.Params = ParamsWithShardsOverride{ - Params: ParamsWithChunkOverrides{ - Params: qry.Params, - StoreChunksOverride: &shard.chunks, - }, - ShardsOverride: Shards{shard.Shard}.Encode(), - } + Params: ParamsWithExpressionOverride{ + Params: ParamOverridesFromShard(params, cur.DownstreamSampleExpr.shard), + ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr, + }, } queries = append(queries, qry) cur = cur.next @@ -464,19 +447,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( for _, d := range e.quantileMergeExpr.downstreams { qry := DownstreamQuery{ Params: ParamsWithExpressionOverride{ - Params: params, + Params: ParamOverridesFromShard(params, d.shard), ExpressionOverride: d.SampleExpr, }, } - if shard := d.shard; shard != nil { - qry.Params = ParamsWithShardsOverride{ - Params: ParamsWithChunkOverrides{ - Params: qry.Params, - StoreChunksOverride: &shard.chunks, - }, - ShardsOverride: Shards{shard.Shard}.Encode(), - } - } queries = append(queries, qry) } } @@ -512,22 +486,11 @@ func (ev *DownstreamEvaluator) NewIterator( switch e := expr.(type) { case DownstreamLogSelectorExpr: // downstream to a querier - var shards Shards - if e.shard != nil { - shards = append(shards, e.shard.Shard) - params = ParamsWithChunkOverrides{ - Params: params, - StoreChunksOverride: &e.shard.chunks, - } - } acc := NewStreamAccumulator(params) results, err := ev.Downstream(ctx, []DownstreamQuery{{ - Params: ParamsWithShardsOverride{ - Params: ParamsWithExpressionOverride{ - Params: params, - ExpressionOverride: e.LogSelectorExpr, - }, - ShardsOverride: shards.Encode(), + Params: ParamsWithExpressionOverride{ + Params: ParamOverridesFromShard(params, e.shard), + ExpressionOverride: e.LogSelectorExpr, }, }}, acc) if err != nil { @@ -540,16 +503,10 @@ func (ev *DownstreamEvaluator) NewIterator( var queries []DownstreamQuery for cur != nil { qry := DownstreamQuery{ - Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr}, - } - if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil { - qry.Params = ParamsWithShardsOverride{ - Params: ParamsWithChunkOverrides{ - Params: qry.Params, - StoreChunksOverride: &shard.chunks, - }, - ShardsOverride: Shards{shard.Shard}.Encode(), - } + Params: ParamsWithExpressionOverride{ + Params: ParamOverridesFromShard(params, cur.DownstreamLogSelectorExpr.shard), + ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr, + }, } queries = append(queries, qry) cur = cur.next diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 9370c614b38c..7c1f45021731 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -157,6 +157,26 @@ func (p ParamsWithChunkOverrides) GetStoreChunks() *logproto.ChunkRefGroup { return p.StoreChunksOverride } +func ParamOverridesFromShard(base Params, shard *ShardWithChunkRefs) (result Params) { + if shard == nil { + return base + } + + result = ParamsWithShardsOverride{ + Params: base, + ShardsOverride: Shards{shard.Shard}.Encode(), + } + + if shard.chunks != nil { + result = ParamsWithChunkOverrides{ + Params: result, + StoreChunksOverride: shard.chunks, + } + } + + return result +} + // Sortable logql contain sort or sort_desc. func Sortable(q Params) (bool, error) { var sortable bool diff --git a/pkg/logql/shards.go b/pkg/logql/shards.go index d280777c0f60..4ada33587ae2 100644 --- a/pkg/logql/shards.go +++ b/pkg/logql/shards.go @@ -163,7 +163,7 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, uint // and are used to precompute chunk refs for each group type ShardWithChunkRefs struct { Shard - chunks logproto.ChunkRefGroup + chunks *logproto.ChunkRefGroup } // Shard represents a shard annotation @@ -208,7 +208,7 @@ func (s Shard) Bind(chunks *logproto.ChunkRefGroup) *ShardWithChunkRefs { Shard: s, } if chunks != nil { - res.chunks = *chunks + res.chunks = chunks } return res }