From da5d28dc1a536156f0bd682746a0d48e9dfe72ac Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Wed, 24 Sep 2025 14:00:02 -0700 Subject: [PATCH] Update promql-engine to latest Signed-off-by: Ahmed Hassan --- go.mod | 2 +- go.sum | 4 +- .../thanos-io/promql-engine/engine/engine.go | 9 +- .../execution/aggregate/accumulator.go | 14 +- .../logicalplan/logical_nodes.go | 8 +- .../logicalplan/merge_selects.go | 40 ++- .../promql-engine/logicalplan/projection.go | 273 ++++++++++++++++++ .../promql-engine/storage/prometheus/pool.go | 2 + .../storage/prometheus/scanners.go | 11 + vendor/modules.txt | 2 +- 10 files changed, 333 insertions(+), 32 deletions(-) create mode 100644 vendor/github.com/thanos-io/promql-engine/logicalplan/projection.go diff --git a/go.mod b/go.mod index df75cf1e5ba..92129a822c6 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/spf13/afero v1.11.0 github.com/stretchr/testify v1.10.0 github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 - github.com/thanos-io/promql-engine v0.0.0-20250726034445-91e6e32a36a7 + github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264 github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 diff --git a/go.sum b/go.sum index 7768f622c9c..24bc17a2f13 100644 --- a/go.sum +++ b/go.sum @@ -1755,8 +1755,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 h1:P301Anc27aVL7Ls88el92j+qW3PJp8zmiDl+kOUZv3A= github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0= -github.com/thanos-io/promql-engine v0.0.0-20250726034445-91e6e32a36a7 h1:lFCGOWLDH50RB4ig/xRnUXX99ECD13xUHQdNOvcAYwc= -github.com/thanos-io/promql-engine v0.0.0-20250726034445-91e6e32a36a7/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc= +github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264 h1:sOmANo4XVhem4VgvI9w05DBwqMex/qw+cDjuHW2FKWw= +github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc= github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb h1:z/ePbn3lo/D4vdHGH8hpa2kgH9M6iLq0kOFtZwuelKM= github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb/go.mod h1:gGUG3TDEoRSjTFVs/QO6QnQIILRgNF0P9l7BiiMfmHw= github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww= diff --git a/vendor/github.com/thanos-io/promql-engine/engine/engine.go b/vendor/github.com/thanos-io/promql-engine/engine/engine.go index b3782fbe324..ac0f0eecc65 100644 --- a/vendor/github.com/thanos-io/promql-engine/engine/engine.go +++ b/vendor/github.com/thanos-io/promql-engine/engine/engine.go @@ -539,9 +539,7 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { defer q.engine.activeQueryTracker.Delete(idx) ctx = warnings.NewContext(ctx) - defer func() { - ret.Warnings = ret.Warnings.Merge(warnings.FromContext(ctx)) - }() + warnings.MergeToContext(q.warns, ctx) // Handle case with strings early on as this does not need us to process samples. switch e := q.plan.Root().(type) { @@ -549,8 +547,7 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}} } ret = &promql.Result{ - Value: promql.Vector{}, - Warnings: q.warns, + Value: promql.Vector{}, } defer recoverEngine(q.engine.logger, q.plan, &ret.Err) @@ -626,6 +623,7 @@ loop: } sort.Sort(matrix) ret.Value = matrix + ret.Warnings = warnings.FromContext(ctx) if matrix.ContainsSameLabelset() { return newErrResult(ret, extlabels.ErrDuplicateLabelSet) } @@ -679,6 +677,7 @@ loop: } ret.Value = result + ret.Warnings = warnings.FromContext(ctx) return ret } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go index 390dfe946b9..f8e3dbfa43d 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go @@ -39,9 +39,10 @@ type vectorAccumulator interface { } type sumAcc struct { - value float64 - histSum *histogram.FloatHistogram - hasFloatVal bool + value float64 + compensation float64 + histSum *histogram.FloatHistogram + hasFloatVal bool } func newSumAcc() *sumAcc { @@ -50,7 +51,7 @@ func newSumAcc() *sumAcc { func (s *sumAcc) AddVector(ctx context.Context, float64s []float64, histograms []*histogram.FloatHistogram) error { if len(float64s) > 0 { - s.value += SumCompensated(float64s) + s.value, s.compensation = KahanSumInc(SumCompensated(float64s), s.value, s.compensation) s.hasFloatVal = true } @@ -64,7 +65,7 @@ func (s *sumAcc) AddVector(ctx context.Context, float64s []float64, histograms [ func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { if h == nil { s.hasFloatVal = true - s.value += v + s.value, s.compensation = KahanSumInc(v, s.value, s.compensation) return nil } if s.histSum == nil { @@ -108,7 +109,7 @@ func (s *sumAcc) Value() (float64, *histogram.FloatHistogram) { if s.histSum != nil { s.histSum.Compact(0) } - return s.value, s.histSum + return s.value + s.compensation, s.histSum } func (s *sumAcc) ValueType() ValueType { @@ -125,6 +126,7 @@ func (s *sumAcc) Reset(_ float64) { s.histSum = nil s.hasFloatVal = false s.value = 0 + s.compensation = 0 } func newMaxAcc() *maxAcc { diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go index 5c853eb7b84..f47abcb66da 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/logical_nodes.go @@ -73,7 +73,7 @@ type VectorSelector struct { Filters []*labels.Matcher BatchSize int64 SelectTimestamp bool - Projection Projection + Projection *Projection // When set, histogram iterators can return objects which only have their // CounterResetHint, Count and Sum values populated. Histogram buckets and spans // will not be used during query evaluation. @@ -87,7 +87,11 @@ func (f *VectorSelector) Clone() Node { clone.Filters = shallowCloneSlice(f.Filters) clone.LabelMatchers = shallowCloneSlice(f.LabelMatchers) - clone.Projection.Labels = shallowCloneSlice(f.Projection.Labels) + if f.Projection != nil { + clone.Projection = &Projection{} + clone.Projection.Labels = shallowCloneSlice(f.Projection.Labels) + clone.Projection.Include = f.Projection.Include + } if f.VectorSelector.Timestamp != nil { ts := *f.VectorSelector.Timestamp diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/merge_selects.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/merge_selects.go index 8688baa81d8..4cc2f7cf2a7 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/merge_selects.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/merge_selects.go @@ -37,6 +37,9 @@ func extractSelectors(selectors matcherHeap, expr Node) { if !ok { return } + if !emptyProjection(e) { + return + } for _, l := range e.LabelMatchers { if l.Name == labels.MetricName { selectors.add(l.Value, e.LabelMatchers) @@ -50,6 +53,9 @@ func replaceMatchers(selectors matcherHeap, expr *Node) { var matchers []*labels.Matcher switch e := (*node).(type) { case *VectorSelector: + if !emptyProjection(e) { + return + } matchers = e.LabelMatchers default: return @@ -69,18 +75,8 @@ func replaceMatchers(selectors matcherHeap, expr *Node) { filters := make([]*labels.Matcher, len(matchers)) copy(filters, matchers) - // All replacements are done on metrics name only, - // so we can drop the explicit metric name selector. - filters = dropMatcher(labels.MetricName, filters) - - // Drop filters which are already present as matchers in the replacement selector. - for _, s := range replacement { - for _, f := range filters { - if s.Name == f.Name && s.Value == f.Value && s.Type == f.Type { - filters = dropMatcher(f.Name, filters) - } - } - } + // Drop filters which are already present as matchers in the replacement selector including metric name selector. + filters = dropMatcher(replacement, filters) switch e := (*node).(type) { case *VectorSelector: @@ -93,12 +89,19 @@ func replaceMatchers(selectors matcherHeap, expr *Node) { }) } -func dropMatcher(matcherName string, originalMatchers []*labels.Matcher) []*labels.Matcher { - res := slices.Clone(originalMatchers) +func dropMatcher(toDrop []*labels.Matcher, original []*labels.Matcher) []*labels.Matcher { + res := slices.Clone(original) i := 0 for i < len(res) { l := res[i] - if l.Name == matcherName { + remove := false + for _, m := range toDrop { + if l.Name == m.Name && l.Type == m.Type && l.Value == m.Value { + remove = true + break + } + } + if remove { res = slices.Delete(res, i, i+1) } else { i++ @@ -163,3 +166,10 @@ func (m matcherHeap) findReplacement(metricName string, matcher []*labels.Matche return top, true } + +func emptyProjection(vs *VectorSelector) bool { + if vs.Projection == nil { + return true + } + return !vs.Projection.Include && len(vs.Projection.Labels) == 0 +} diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/projection.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/projection.go new file mode 100644 index 00000000000..cbca7c983bb --- /dev/null +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/projection.go @@ -0,0 +1,273 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package logicalplan + +import ( + "maps" + "slices" + + "github.com/thanos-io/promql-engine/query" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/util/annotations" +) + +type ProjectionOptimizer struct { + SeriesHashLabel string +} + +func (p ProjectionOptimizer) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { + p.pushProjection(&plan, nil) + return plan, nil +} + +// pushProjection recursively traverses the tree and pushes projection information down. +func (p ProjectionOptimizer) pushProjection(node *Node, projection *Projection) { + switch n := (*node).(type) { + case *VectorSelector: + if projection != nil { + n.Projection = projection + } else { + // Set dummy projection. + n.Projection = &Projection{} + } + + case *Aggregation: + // Special handling for aggregation functions that need all labels + // regardless of grouping (topk, bottomk, limitk, limit_ratio) + switch n.Op { + case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO: + // These functions need all labels, so clear any requirements + p.pushProjection(&n.Expr, nil) + return + } + + // For aggregations, we directly use the grouping labels + grouping := n.Grouping + groupingProjection := &Projection{ + Labels: grouping, + Include: !n.Without, + } + // Note that we don't push projection to Aggregation.Param as they are not + // selecting data for the aggregation. + p.pushProjection(&n.Expr, groupingProjection) + + if p.SeriesHashLabel != "" && n.Without { + n.Grouping = append(grouping, p.SeriesHashLabel) + } + + case *Binary: + var highCard, lowCard = n.LHS, n.RHS + + if n.VectorMatching == nil || (!n.VectorMatching.On && len(n.VectorMatching.MatchingLabels) == 0) { + if IsConstantExpr(lowCard) { + p.pushProjection(&highCard, projection) + } else { + p.pushProjection(&highCard, nil) + } + + if IsConstantExpr(highCard) { + p.pushProjection(&lowCard, projection) + } else { + p.pushProjection(&lowCard, nil) + } + return + } + + if n.VectorMatching.Card == parser.CardOneToOne { + proj := &Projection{ + Labels: n.VectorMatching.MatchingLabels, + Include: n.VectorMatching.On, + } + + for _, child := range n.Children() { + p.pushProjection(child, proj) + } + + if !n.VectorMatching.On && p.SeriesHashLabel != "" { + n.VectorMatching.MatchingLabels = append(n.VectorMatching.MatchingLabels, p.SeriesHashLabel) + } + return + } + + if n.VectorMatching.Card == parser.CardOneToMany { + highCard, lowCard = lowCard, highCard + } + + // Handle high card side projection. Only ignoring mode is supported. + hcProjection := &Projection{} + // Only push projection for high card side if there is an outer projection available + // to remove series hash + if projection != nil && projection.Include { + // Include labels are from low card side so we don't need to fetch + // them from high card side if include labels are not used as join keys. + hcProjection.Labels = n.VectorMatching.Include + if !n.VectorMatching.On { + hcProjection.Labels = intersect(hcProjection.Labels, n.VectorMatching.MatchingLabels) + } + } + if len(hcProjection.Labels) > 1 { + p.pushProjection(&highCard, hcProjection) + } else { + // If there is only 1 label to project then it is not worth to push projection + // down to high card side as calculating hash might be more expensive. + p.pushProjection(&highCard, nil) + } + + // Handle low card side projection. + lcProjection := extendProjection(Projection{ + Include: n.VectorMatching.On, + Labels: n.VectorMatching.MatchingLabels, + }, n.VectorMatching.Include) + p.pushProjection(&lowCard, &lcProjection) + + if !n.VectorMatching.On && p.SeriesHashLabel != "" { + n.VectorMatching.MatchingLabels = append(n.VectorMatching.MatchingLabels, p.SeriesHashLabel) + } + return + + case *FunctionCall: + // Handle function-specific label requirements. + updatedProjection := getFunctionLabelRequirements(n.Func.Name, n.Args, projection) + for _, child := range n.Children() { + p.pushProjection(child, updatedProjection) + } + + default: + // For other node types, propagate to children + for _, child := range (*node).Children() { + p.pushProjection(child, projection) + } + } +} + +func extendProjection(projection Projection, lbls []string) Projection { + var extendedLabels []string + if projection.Include { + extendedLabels = union(projection.Labels, lbls) + } else { + extendedLabels = subtract(projection.Labels, lbls) + } + return Projection{ + Include: projection.Include, + Labels: extendedLabels, + } +} + +// unwrapStepInvariantExpr recursively unwraps step invariant expressions to get to the underlying node. +func unwrapStepInvariantExpr(node Node) Node { + if stepInvariant, ok := node.(*StepInvariantExpr); ok { + return unwrapStepInvariantExpr(stepInvariant.Expr) + } + return node +} + +// getFunctionLabelRequirements returns an updated projection based on function-specific requirements. +func getFunctionLabelRequirements(funcName string, args []Node, projection *Projection) *Projection { + if projection == nil { + projection = &Projection{} + } + result := &Projection{ + Labels: make([]string, len(projection.Labels)), + Include: projection.Include, + } + copy(result.Labels, projection.Labels) + + // Add function-specific required labels + switch funcName { + case "absent_over_time", "absent", "scalar": + return &Projection{ + Labels: []string{}, + Include: true, + } + case "histogram_quantile": + // Unsafe to push projection down for histogram_quantile as it requires le label. + return nil + case "label_replace": + dstArg := unwrapStepInvariantExpr(args[1]) + if dstLit, ok := dstArg.(*StringLiteral); ok { + dstLabel := dstLit.Val + needed := slices.Contains(result.Labels, dstLabel) + needSourceLabels := (result.Include && needed) || (!result.Include && !needed) + if !needSourceLabels { + return result + } + + srcArg := unwrapStepInvariantExpr(args[3]) + if strLit, ok := srcArg.(*StringLiteral); ok { + if result.Include && needed { + result.Labels = append(result.Labels, strLit.Val) + } else { + result.Labels = slices.DeleteFunc(result.Labels, func(s string) bool { + return s == strLit.Val + }) + } + } + } + case "label_join": + dstArg := unwrapStepInvariantExpr(args[1]) + if dstLit, ok := dstArg.(*StringLiteral); ok { + dstLabel := dstLit.Val + needed := slices.Contains(result.Labels, dstLabel) + needSourceLabels := (result.Include && needed) || (!result.Include && !needed) + if !needSourceLabels { + return result + } + + // Only if the destination label is needed, we need the source labels + for i := 3; i < len(args); i++ { + srcArg := unwrapStepInvariantExpr(args[i]) + if strLit, ok := srcArg.(*StringLiteral); ok { + if result.Include && needed { + result.Labels = append(result.Labels, strLit.Val) + } else { + result.Labels = slices.DeleteFunc(result.Labels, func(s string) bool { + return s == strLit.Val + }) + } + } + } + } + } + + return result +} + +// union returns the union of two string slices. +func union(l1 []string, l2 []string) []string { + m := make(map[string]struct{}) + for _, s := range l1 { + m[s] = struct{}{} + } + for _, s := range l2 { + m[s] = struct{}{} + } + return slices.Collect(maps.Keys(m)) +} + +// subtract returns the intersection of two string slices. +func subtract(l1 []string, l2 []string) []string { + m := make(map[string]struct{}) + for _, s := range l1 { + m[s] = struct{}{} + } + for _, s := range l2 { + delete(m, s) + } + return slices.Collect(maps.Keys(m)) +} + +func intersect(l1 []string, l2 []string) []string { + m := make(map[string]struct{}) + for _, s := range l1 { + m[s] = struct{}{} + } + var result []string + for _, s := range l2 { + if _, ok := m[s]; ok { + result = append(result, s) + } + } + return result +} diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/pool.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/pool.go index e6e71a24539..9706b334081 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/pool.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/pool.go @@ -55,6 +55,8 @@ func hashMatchers(matchers []*labels.Matcher, mint, maxt int64, hints storage.Se writeString(sb, hints.Func) writeString(sb, strings.Join(hints.Grouping, ";")) writeBool(sb, hints.By) + writeString(sb, strings.Join(hints.ProjectionLabels, ";")) + writeBool(sb, hints.ProjectionInclude) key := sb.Sum64() return key diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go index 4b595442a72..0ffc9750b66 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go @@ -53,6 +53,12 @@ func (p Scanners) NewVectorSelector( hints storage.SelectHints, logicalNode logicalplan.VectorSelector, ) (model.VectorOperator, error) { + // Update hints with projection information if available + if logicalNode.Projection != nil { + hints.ProjectionLabels = logicalNode.Projection.Labels + hints.ProjectionInclude = logicalNode.Projection.Include + } + selector := p.selectors.GetFilteredSelector(hints.Start, hints.End, opts.Step.Milliseconds(), logicalNode.VectorSelector.LabelMatchers, logicalNode.Filters, hints) if logicalNode.DecodeNativeHistogramStats { selector = newHistogramStatsSelector(selector) @@ -121,6 +127,11 @@ func (p Scanners) NewMatrixSelector( } vs := logicalNode.VectorSelector + if vs.Projection != nil { + hints.ProjectionLabels = vs.Projection.Labels + hints.ProjectionInclude = vs.Projection.Include + } + selector := p.selectors.GetFilteredSelector(hints.Start, hints.End, opts.Step.Milliseconds(), vs.LabelMatchers, vs.Filters, hints) if logicalNode.VectorSelector.DecodeNativeHistogramStats { selector = newHistogramStatsSelector(selector) diff --git a/vendor/modules.txt b/vendor/modules.txt index 77166f22ef9..3e183623028 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1216,7 +1216,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing -# github.com/thanos-io/promql-engine v0.0.0-20250726034445-91e6e32a36a7 +# github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264 ## explicit; go 1.24.0 github.com/thanos-io/promql-engine/api github.com/thanos-io/promql-engine/engine