Skip to content

Commit

Permalink
refine offset expr
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettlish committed Mar 11, 2021
1 parent 82a7e84 commit 05e14f9
Show file tree
Hide file tree
Showing 5 changed files with 682 additions and 516 deletions.
20 changes: 6 additions & 14 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func newUnwrapExpr(id string, operation string) *unwrapExpr {
type logRange struct {
left LogSelectorExpr
interval time.Duration
offset time.Duration
offset *offsetExpr

unwrap *unwrapExpr
}
Expand All @@ -512,16 +512,20 @@ func (r logRange) String() string {
sb.WriteString(r.unwrap.String())
}
sb.WriteString(fmt.Sprintf("[%v]", model.Duration(r.interval)))
if r.offset != nil {
sb.WriteString(r.offset.String())
}
return sb.String()
}

func (r *logRange) Shardable() bool { return r.left.Shardable() }

func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange {
func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr, o *offsetExpr) *logRange {
return &logRange{
left: left,
interval: interval,
unwrap: u,
offset: o,
}
}

Expand Down Expand Up @@ -641,16 +645,11 @@ type rangeAggregationExpr struct {
operation string

params *float64
offset *offsetExpr
grouping *grouping
implicit
}

func newRangeAggregationExpr(left *logRange, operation string, gr *grouping, stringParams *string) SampleExpr {
return newRangeAggregationExprWithOffset(left, operation, nil, gr, stringParams)
}

func newRangeAggregationExprWithOffset(left *logRange, operation string, offset *offsetExpr, gr *grouping, stringParams *string) SampleExpr {
var params *float64
if stringParams != nil {
if operation != OpRangeTypeQuantile {
Expand All @@ -668,13 +667,9 @@ func newRangeAggregationExprWithOffset(left *logRange, operation string, offset
panic(newParseError(fmt.Sprintf("parameter required for operation %s", operation), 0, 0))
}
}
if offset != nil {
left.offset = offset.offset
}
e := &rangeAggregationExpr{
left: left,
operation: operation,
offset: offset,
grouping: gr,
params: params,
}
Expand Down Expand Up @@ -722,9 +717,6 @@ func (e *rangeAggregationExpr) String() string {
sb.WriteString(",")
}
sb.WriteString(e.left.String())
if e.offset != nil {
sb.WriteString(e.offset.String())
}
sb.WriteString(")")
if e.grouping != nil {
sb.WriteString(e.grouping.String())
Expand Down
28 changes: 22 additions & 6 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,18 @@ func (ev *DefaultEvaluator) StepEvaluator(
// if range expression is wrapped with a vector expression
// we should send the vector expression for allowing reducing labels at the source.
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
var (
start = q.Start().Add(-rangExpr.left.interval)
end = q.End()
)
if rangExpr.left.offset != nil {
start = start.Add(-rangExpr.left.offset.offset)
end = end.Add(-rangExpr.left.offset.offset)
}
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval).Add(-rangExpr.left.offset),
End: q.End().Add(-rangExpr.left.offset),
Start: start,
End: end,
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
Expand All @@ -189,10 +197,18 @@ func (ev *DefaultEvaluator) StepEvaluator(
}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
var (
start = q.Start().Add(-e.left.interval)
end = q.End()
)
if e.left.offset != nil {
start = start.Add(-e.left.offset.offset)
end = end.Add(-e.left.offset.offset)
}
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.left.interval).Add(-e.left.offset),
End: q.End().Add(-e.left.offset),
Start: start,
End: end,
Selector: expr.String(),
Shards: q.Shards(),
},
Expand Down Expand Up @@ -413,8 +429,8 @@ func rangeAggEvaluator(
return nil, err
}
var offset int64
if expr.offset != nil {
offset = expr.offset.offset.Nanoseconds()
if expr.left.offset != nil {
offset = expr.left.offset.offset.Nanoseconds()
}
iter := newRangeVectorIterator(
it,
Expand Down
42 changes: 25 additions & 17 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,31 @@ logExpr:
;

logRangeExpr:
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2 , $3) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4 , $5) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil, nil ) }
| selector RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $2, nil, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $4, nil, $5 ) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $3, nil ) }
| selector RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $4, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $5, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $6, $5 ) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2, nil ) }
| selector unwrapExpr RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $3, $2, $4 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3, nil ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $5, $3, $6 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, nil ) }
| selector pipelineExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, $4 ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, $6 ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, nil ) }
| selector pipelineExpr unwrapExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, $5 ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, $7 ) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil, nil) }
| selector RANGE offsetExpr pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, nil, $3 ) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4, nil ) }
| selector RANGE offsetExpr pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, $5, $3 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
| logRangeExpr error
;

Expand All @@ -168,10 +180,6 @@ rangeAggregationExpr:
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExpr($5, $1, nil, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExpr($3, $1, $5, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExpr($5, $1, $7, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr offsetExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExprWithOffset($3, $1, $4, nil, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr offsetExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExprWithOffset($5, $1, $6, nil, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr offsetExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExprWithOffset($3, $1, $4, $6, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr offsetExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExprWithOffset($5, $1, $6, $8, &$3) }
;

vectorAggregationExpr:
Expand Down
Loading

0 comments on commit 05e14f9

Please sign in to comment.