Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement offset modifier for range vector aggregation in LogQL #3455

Merged
merged 6 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ The same rules that apply for [Prometheus Label Selectors](https://prometheus.io

**Important note:** The `=~` regex operator is fully anchored, meaning regex must match against the *entire* string, including newlines. The regex `.` character does not match newlines by default. If you want the regex dot character to match newlines you can use the single-line flag, like so: `(?s)search_term.+` matches `search_term\n`.

#### Offset modifier
The offset modifier allows changing the time offset for individual range vectors in a query.

For example, the following expression counts all the logs of the MySQL job in the five-minute window that ended five minutes ago (that is, from ten minutes ago to five minutes ago), rather than in the last five minutes. Note that the `offset` modifier always needs to follow the range vector selector immediately.
```logql
count_over_time({job="mysql"}[5m] offset 5m) // GOOD
count_over_time({job="mysql"}[5m]) offset 5m // INVALID
```

### Log Pipeline

A log pipeline can be appended to a log stream selector to further process and filter log streams. It is usually composed of one or more expressions; each expression is executed in sequence for each log line. If an expression filters out a log line, the pipeline will stop at this point and start processing the next line.
Expand Down
24 changes: 23 additions & 1 deletion pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func newUnwrapExpr(id string, operation string) *unwrapExpr {
type logRange struct {
left LogSelectorExpr
interval time.Duration
offset *offsetExpr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small suggestion: it might be easier to simply have offset as time.Duration. since offset=0 is the same as a nil offset, we could easily do

start := q.Start().Add(-rangExpr.left.interval).Add(-rangExpr.left.offset.offset)

rather than using the more clunky if statements:

if rangExpr.left.offset != nil {
					start := start.Add(-rangExpr.left.offset.offset)
					end = end.Add(-rangExpr.left.offset.offset)
				}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, will make the change


unwrap *unwrapExpr
}
Expand All @@ -511,16 +512,36 @@ func (r logRange) String() string {
sb.WriteString(r.unwrap.String())
}
sb.WriteString(fmt.Sprintf("[%v]", model.Duration(r.interval)))
if r.offset != nil {
sb.WriteString(r.offset.String())
}
return sb.String()
}

// Shardable reports whether the range can be sharded; the answer is
// delegated entirely to the wrapped log selector.
func (r *logRange) Shardable() bool {
	return r.left.Shardable()
}

func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange {
// newLogRange builds a logRange over the given selector and range interval.
// u is the optional unwrap expression and o the optional offset modifier;
// both are stored as given, so nil means "not present".
func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr, o *offsetExpr) *logRange {
	r := new(logRange)
	r.left = left
	r.interval = interval
	r.unwrap = u
	r.offset = o
	return r
}

// offsetExpr is the AST node for the `offset` modifier attached to a range
// selector.
type offsetExpr struct {
	// offset is the duration given after the offset keyword.
	offset time.Duration
}

// String renders the modifier as a leading space, the OpOffset keyword, a
// space, and the duration in time.Duration's own notation, ready to be
// appended after a range selector.
func (o *offsetExpr) String() string {
	return fmt.Sprintf(" %s %s", OpOffset, o.offset.String())
}

// newOffsetExpr wraps the given duration in a freshly allocated offsetExpr.
func newOffsetExpr(offset time.Duration) *offsetExpr {
	return &offsetExpr{offset: offset}
}

Expand Down Expand Up @@ -582,6 +603,7 @@ const (

OpPipe = "|"
OpUnwrap = "unwrap"
OpOffset = "offset"

// conversion Op
OpConvBytes = "bytes"
Expand Down
24 changes: 24 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,27 @@ func Test_SampleExpr_String(t *testing.T) {
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 10m )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"}[5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | json [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 10m))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s])) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s] offset 10m)) by (region)`,
`sum by (cluster) (count_over_time({job="mysql"}[5m]))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m])) / sum by (cluster) (count_over_time({job="postgres"}[5m])) `,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m)) / sum by (cluster) (count_over_time({job="postgres"}[5m] offset 10m)) `,
`
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
Expand All @@ -86,6 +94,8 @@ func Test_SampleExpr_String(t *testing.T) {
)`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m])`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m] offset 10m)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms|unwrap latency [5m])`,
`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap latency[5m])
Expand Down Expand Up @@ -130,6 +140,20 @@ func Test_SampleExpr_String(t *testing.T) {
`10 / (5/2)`,
`10 / (count_over_time({job="postgres"}[5m])/2)`,
`{app="foo"} | json response_status="response.status.code", first_param="request.params[0]"`,
`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseExpr(tc)
Expand Down
30 changes: 25 additions & 5 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,18 @@ func (ev *DefaultEvaluator) StepEvaluator(
// if range expression is wrapped with a vector expression
// we should send the vector expression for allowing reducing labels at the source.
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
var (
start = q.Start().Add(-rangExpr.left.interval)
end = q.End()
)
if rangExpr.left.offset != nil {
start = start.Add(-rangExpr.left.offset.offset)
end = end.Add(-rangExpr.left.offset.offset)
}
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Start: start,
End: end,
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
Expand All @@ -189,10 +197,18 @@ func (ev *DefaultEvaluator) StepEvaluator(
}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
var (
start = q.Start().Add(-e.left.interval)
end = q.End()
)
if e.left.offset != nil {
start = start.Add(-e.left.offset.offset)
end = end.Add(-e.left.offset.offset)
}
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Start: start,
End: end,
Selector: expr.String(),
Shards: q.Shards(),
},
Expand Down Expand Up @@ -412,11 +428,15 @@ func rangeAggEvaluator(
if err != nil {
return nil, err
}
var offset int64
if expr.left.offset != nil {
offset = expr.left.offset.offset.Nanoseconds()
}
iter := newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
q.Start().UnixNano(), q.End().UnixNano(), offset,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simplify this as well. There's no need to make the iterator offset aware if we just pass (start-offset, end-offset) to it.

Copy link
Contributor Author

@garrettlish garrettlish Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@owen-d thanks for your comments. I am passing the offset to iterator is because I'd like to shift the returned samples timestamp from history timestamp to current timestamp, otherwise, the returned samples timestamp are historical timestamp which is not meaningful. Any thoughts? Please advise, thanks!

For example:
Assume current time is 17:00, expression - count_over_time({job="varlogs"} [1m] offset 3h) returns 14:00 samples, but count_over_time({job="varlogs"} [1m]) returns 17:00 samples, then the expression - count_over_time({job="varlogs"} [1m])/count_over_time({job="varlogs"} [1m] offset 3h) > 1.5 cannot work since these sub-expressions are returning inconsistent timestamp.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had the same assumption as Owen, eg. you could just pass in the modified start/end.

But now with your explanation I understand why it's like this.

I think you could only offset back L148 only and that would be fine.

ts := r.current / 1e+6

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should add a test with an offset different than zero for the range vector iterator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise LGTM for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, let me add test, thanks @cyriltovena

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cyriltovena added non-zero offset test - 1b8f454, please help review again, thanks!

)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{
Expand Down
44 changes: 30 additions & 14 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
JSONExpression log.JSONExpression
JSONExpressionList []log.JSONExpression
UnwrapExpr *unwrapExpr
OffsetExpr *offsetExpr
}

%start root
Expand Down Expand Up @@ -90,6 +91,7 @@ import (
%type <JSONExpressionList> jsonExpressionList
%type <UnwrapExpr> unwrapExpr
%type <UnitFilter> unitFilter
%type <OffsetExpr> offsetExpr

%token <bytes> BYTES
%token <str> IDENTIFIER STRING NUMBER
Expand All @@ -98,7 +100,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
ABSENT_OVER_TIME LABEL_REPLACE UNPACK
ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -133,19 +135,31 @@ logExpr:
;

logRangeExpr:
Copy link
Member

@owen-d owen-d Mar 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love how we're duplicating all paths here. I know we've done this in the past, but it's getting out of hand (not your fault).

@cyriltovena we should consider refactoring this a bit, but I'm fine with this PR.

selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2 , $3) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4 , $5) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil, nil ) }
| selector RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $2, nil, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $4, nil, $5 ) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $3, nil ) }
| selector RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $4, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $5, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $6, $5 ) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2, nil ) }
| selector unwrapExpr RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $3, $2, $4 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3, nil ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $5, $3, $6 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, nil ) }
| selector pipelineExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, $4 ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, $6 ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, nil ) }
| selector pipelineExpr unwrapExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, $5 ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, $7 ) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil, nil) }
| selector RANGE offsetExpr pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, nil, $3 ) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4, nil ) }
| selector RANGE offsetExpr pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, $5, $3 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
| logRangeExpr error
;

Expand Down Expand Up @@ -363,6 +377,8 @@ rangeOp:
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;

// offsetExpr matches the OFFSET keyword followed by a DURATION token and
// builds the offset node from that duration.
offsetExpr:
    OFFSET DURATION { $$ = newOffsetExpr( $2 ) }
    ;

labels:
IDENTIFIER { $$ = []string{ $1 } }
Expand Down
Loading