Skip to content

Commit

Permalink
Merge pull request #392 from go-faster/feat/absent-over-time
Browse files Browse the repository at this point in the history
feat(logqlmetric): implement `absent_over_time`
  • Loading branch information
tdakkota committed Apr 26, 2024
2 parents c3cf8aa + e970d71 commit 9397a8a
Show file tree
Hide file tree
Showing 55 changed files with 431 additions and 365 deletions.
16 changes: 16 additions & 0 deletions dev/local/ch-logql-compliance/logql-test-queries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,29 @@ test_cases:
{job="varlogs"} [{{ .range }}]
)
variant_args: ["simpleRangeAggOp", "range"]
# Absent over time range aggregation.
# NOTE(tdakkota): We do a separate test because `absent_over_time` returns non-empty result
# only if there is NO samples in step and vice versa.
- query: |-
absent_over_time(
{job="varlogs"} [{{ .range }}]
)
variant_args: ["range"]
should_be_empty: true
- query: |-
absent_over_time(
{job="varlogs"} |= "no way line would contain this message" [{{ .range }}]
)
variant_args: ["range"]
# Unwrap sampler.
- query: |-
{{ .unwrapRangeAggOp }}(
{job="varlogs"} | json | unwrap status
[{{ .range }}]
)
variant_args: ["unwrapRangeAggOp", "range"]
skip_comparison: true # It seems, there is some issues with unwrap.
# Vector aggregation.
- query: |-
{{ .simpleVecAggOp }} by (filename) (
Expand Down
3 changes: 2 additions & 1 deletion internal/chstorage/querier_logs_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logstorage"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalPar
}

func entryMapper(r logstorage.Record) (logqlengine.Entry, error) {
set := logqlengine.NewLabelSet()
set := logqlabels.NewLabelSet()
e := logqlengine.Entry{
Timestamp: r.Timestamp,
Line: r.Body,
Expand Down
3 changes: 2 additions & 1 deletion internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (v *SampleQuery) Eval(ctx context.Context, q *Querier) (_ logqlengine.Sampl
result = append(result, logqlmetric.SampledEntry{
Timestamp: pcommon.NewTimestampFromTime(timestamp),
Sample: sample,
Set: logqlengine.AggregatedLabelsFromMap(labels),
Set: logqlabels.AggregatedLabelsFromMap(labels),
})
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"regexp"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand All @@ -20,6 +21,6 @@ const ansiPattern = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\
var ansiRegex = regexp.MustCompile(ansiPattern)

// Process implements Processor.
func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (string, bool) {
func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (string, bool) {
return ansiRegex.ReplaceAllString(line, ""), true
}
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/decolorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
)

func TestDecolorize(t *testing.T) {
Expand All @@ -26,7 +27,7 @@ func TestDecolorize(t *testing.T) {
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
set := NewLabelSet()
set := logqlabels.NewLabelSet()

f, err := buildDecolorize(&logql.DecolorizeExpr{})
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logqlengine

import (
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand All @@ -24,7 +25,7 @@ func buildDistinctFilter(stage *logql.DistinctFilter) (Processor, error) {
}

// Process implements Processor.
func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
for _, label := range d.labels {
val, ok := set.GetString(label)
if !ok {
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -36,7 +37,7 @@ func buildDropLabels(stage *logql.DropLabelsExpr) (Processor, error) {
}

// Process implements Processor.
func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
set.Range(func(label logql.Label, val pcommon.Value) {
if k.dropPair(label, val) {
set.Delete(label)
Expand Down
3 changes: 1 addition & 2 deletions internal/logql/logqlengine/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func TestDropLabels(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set.labels = tt.input
set := newLabelSet(tt.input)
newLine, ok := e.Process(0, ``, set)
// Ensure that processor does not change the line.
require.Equal(t, ``, newLine)
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/engine_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric"
"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -38,7 +39,7 @@ const (
type Entry struct {
Timestamp otelstorage.Timestamp
Line string
Set LabelSet
Set logqlabels.LabelSet
}

type (
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (n *mockPipelineNode) EvalPipeline(ctx context.Context, params EvalParams)
}
}

set := NewLabelSet()
set := logqlabels.NewLabelSet()
set.SetAttrs(
otelstorage.Attrs(attrs),
otelstorage.Attrs(scopeAttrs),
Expand Down
9 changes: 5 additions & 4 deletions internal/logql/logqlengine/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/jsonexpr"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -52,7 +53,7 @@ func buildJSONExtractor(stage *logql.JSONExpressionParser) (Processor, error) {
}

// Process implements Processor.
func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
var err error
switch {
case len(e.paths) != 0:
Expand All @@ -68,7 +69,7 @@ func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelS
return line, true
}

func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet) error {
func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set logqlabels.LabelSet) error {
// TODO(tdakkota): allocates buffer for each line.
d := decodeStr(line)
return jsonexpr.Extract(
Expand All @@ -80,7 +81,7 @@ func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet
)
}

func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) error {
func extractSome(labels map[logql.Label]struct{}, line string, set logqlabels.LabelSet) error {
d := decodeStr(line)
return d.ObjBytes(func(d *jx.Decoder, key []byte) error {
if _, ok := labels[logql.Label(key)]; !ok {
Expand All @@ -101,7 +102,7 @@ func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) err
})
}

func extractAll(line string, set LabelSet) error {
func extractAll(line string, set logqlabels.LabelSet) error {
d := decodeStr(line)
return d.Obj(func(d *jx.Decoder, key string) error {
value, ok, err := parseValue(d)
Expand Down
9 changes: 5 additions & 4 deletions internal/logql/logqlengine/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
)

func TestJSONExtractor(t *testing.T) {
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestJSONExtractor(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set := logqlabels.NewLabelSet()
newLine, ok := e.Process(0, tt.input, set)
// Ensure that extractor does not change the line.
require.Equal(t, tt.input, newLine)
Expand All @@ -104,7 +105,7 @@ func TestJSONExtractor(t *testing.T) {
errMsg, ok := set.GetError()
require.False(t, ok, "got error: %s", errMsg)

require.Len(t, set.labels, len(tt.expectLabels))
require.Equal(t, len(tt.expectLabels), set.Len())
for k, expect := range tt.expectLabels {
got, ok := set.Get(k)
require.Truef(t, ok, "key %q", k)
Expand Down Expand Up @@ -172,7 +173,7 @@ func BenchmarkJSONExtractor(b *testing.B) {
p, err := buildJSONExtractor(bb.expr)
require.NoError(b, err)

set := NewLabelSet()
set := logqlabels.NewLabelSet()
var (
line string
ok bool
Expand All @@ -182,7 +183,7 @@ func BenchmarkJSONExtractor(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
set.reset()
set.Reset()
line, ok = p.Process(10, benchdata, set)
}

Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/keep.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -36,7 +37,7 @@ func buildKeepLabels(stage *logql.KeepLabelsExpr) (Processor, error) {
}

// Process implements Processor.
func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
set.Range(func(label logql.Label, val pcommon.Value) {
if !k.keepPair(label, val) {
set.Delete(label)
Expand Down
3 changes: 1 addition & 2 deletions internal/logql/logqlengine/keep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ func TestKeepLabels(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set.labels = tt.input
set := newLabelSet(tt.input)
newLine, ok := e.Process(0, ``, set)
// Ensure that processor does not change the line.
require.Equal(t, ``, newLine)
Expand Down
15 changes: 8 additions & 7 deletions internal/logql/logqlengine/label_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-faster/errors"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ type AndLabelMatcher struct {
}

// Process implements Processor.
func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
line, keep = m.Left.Process(ts, line, set)
if !keep {
return line, keep
Expand All @@ -82,7 +83,7 @@ type OrLabelMatcher struct {
}

// Process implements Processor.
func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
line, keep = m.Left.Process(ts, line, set)
if keep {
return line, keep
Expand All @@ -109,7 +110,7 @@ func buildLabelMatcher(pred logql.LabelMatcher) (Processor, error) {
}

// Process implements Processor.
func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
labelValue, _ := set.GetString(lf.name)
keep = lf.matcher.Match(labelValue)
return line, keep
Expand Down Expand Up @@ -160,7 +161,7 @@ func buildDurationLabelFilter(pred *logql.DurationFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down Expand Up @@ -222,7 +223,7 @@ func buildBytesLabelFilter(pred *logql.BytesFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down Expand Up @@ -284,7 +285,7 @@ func buildNumberLabelFilter(pred *logql.NumberFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
switch val, ok, err := set.GetFloat(lf.name); {
case err != nil:
// Keep the line, but set error label.
Expand Down Expand Up @@ -314,7 +315,7 @@ func buildIPLabelFilter(pred *logql.IPFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down
Loading

0 comments on commit 9397a8a

Please sign in to comment.