Skip to content

Commit

Permalink
simplify+pointer for shard chunks
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Apr 18, 2024
1 parent 4639cfd commit a764c11
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 61 deletions.
73 changes: 15 additions & 58 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,11 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(

case DownstreamSampleExpr:
// downstream to a querier
var shards Shards
if e.shard != nil {
shards = append(shards, e.shard.Shard)
params = ParamsWithChunkOverrides{
Params: params,
StoreChunksOverride: &e.shard.chunks,
}
}
acc := NewBufferedAccumulator(1)
results, err := ev.Downstream(ctx, []DownstreamQuery{{
Params: ParamsWithShardsOverride{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: e.SampleExpr,
},
ShardsOverride: shards.Encode(),
Params: ParamsWithExpressionOverride{
Params: ParamOverridesFromShard(params, e.shard),
ExpressionOverride: e.SampleExpr,
},
}}, acc)
if err != nil {
Expand All @@ -422,16 +411,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
var queries []DownstreamQuery
for cur != nil {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr},
}
if shard := cur.DownstreamSampleExpr.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
Params: ParamsWithExpressionOverride{
Params: ParamOverridesFromShard(params, cur.DownstreamSampleExpr.shard),
ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr,
},
}
queries = append(queries, qry)
cur = cur.next
Expand Down Expand Up @@ -464,19 +447,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
for _, d := range e.quantileMergeExpr.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
Params: ParamOverridesFromShard(params, d.shard),
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries = append(queries, qry)
}
}
Expand Down Expand Up @@ -512,22 +486,11 @@ func (ev *DownstreamEvaluator) NewIterator(
switch e := expr.(type) {
case DownstreamLogSelectorExpr:
// downstream to a querier
var shards Shards
if e.shard != nil {
shards = append(shards, e.shard.Shard)
params = ParamsWithChunkOverrides{
Params: params,
StoreChunksOverride: &e.shard.chunks,
}
}
acc := NewStreamAccumulator(params)
results, err := ev.Downstream(ctx, []DownstreamQuery{{
Params: ParamsWithShardsOverride{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: e.LogSelectorExpr,
},
ShardsOverride: shards.Encode(),
Params: ParamsWithExpressionOverride{
Params: ParamOverridesFromShard(params, e.shard),
ExpressionOverride: e.LogSelectorExpr,
},
}}, acc)
if err != nil {
Expand All @@ -540,16 +503,10 @@ func (ev *DownstreamEvaluator) NewIterator(
var queries []DownstreamQuery
for cur != nil {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr},
}
if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
Params: ParamsWithExpressionOverride{
Params: ParamOverridesFromShard(params, cur.DownstreamLogSelectorExpr.shard),
ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr,
},
}
queries = append(queries, qry)
cur = cur.next
Expand Down
20 changes: 20 additions & 0 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ func (p ParamsWithChunkOverrides) GetStoreChunks() *logproto.ChunkRefGroup {
return p.StoreChunksOverride
}

func ParamOverridesFromShard(base Params, shard *ShardWithChunkRefs) (result Params) {
if shard == nil {
return base
}

result = ParamsWithShardsOverride{
Params: base,
ShardsOverride: Shards{shard.Shard}.Encode(),
}

if shard.chunks != nil {
result = ParamsWithChunkOverrides{
Params: result,
StoreChunksOverride: shard.chunks,
}
}

return result
}

// Sortable logql contain sort or sort_desc.
func Sortable(q Params) (bool, error) {
var sortable bool
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s DynamicBoundsStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, u
maxBytes = max(maxBytes, shard.Stats.Bytes)
}
if len(chunks) > 0 {
x.chunks = chunks[i]
x.chunks = &chunks[i]
}
res = append(res, x)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, uint
// and are used to precompute chunk refs for each group
type ShardWithChunkRefs struct {
Shard
chunks logproto.ChunkRefGroup
chunks *logproto.ChunkRefGroup
}

// Shard represents a shard annotation
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s Shard) Bind(chunks *logproto.ChunkRefGroup) *ShardWithChunkRefs {
Shard: s,
}
if chunks != nil {
res.chunks = *chunks
res.chunks = chunks
}
return res
}
Expand Down

0 comments on commit a764c11

Please sign in to comment.