Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL Metrics] histogram_over_time #3644

Merged
merged 27 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
50a6984
Add lang/parser support for quantile_over_time, fix missing stringify…
mdisibio Apr 24, 2024
7307d4c
First working draft of quantile_over_time implementation
mdisibio Apr 24, 2024
589d83f
Validate the query in the frontend
mdisibio Apr 25, 2024
6abee43
Histogram accumulate jobs by bucket as they come in instead of at the…
mdisibio Apr 25, 2024
923d7d8
Fix language definition to allow both floats or ints for quantiles
mdisibio Apr 25, 2024
2ccc75f
Remove layer of proto->seriesset conversion. Fix roundtrip of __bucke…
mdisibio Apr 25, 2024
e320bbb
Rename to SimpleAdditionCombiner, slight interval calc cleanup
mdisibio Apr 25, 2024
b845743
Fix p0 returning 1 instead of minimum value, comments cleanup
mdisibio Apr 25, 2024
ee0e5bc
Rename frontend param and fix handling of ints
mdisibio Apr 26, 2024
4c1db95
Fix pre-existing bug in metrics optimization when asserting multiple …
mdisibio Apr 26, 2024
656f525
Fix to support 3 flavors of the metrics pipeline: query-frontend, acr…
mdisibio Apr 26, 2024
81deb1b
Merge branch 'main' into quantile-engine
mdisibio Apr 29, 2024
cb31ed0
Update query_range frontend test for new behavior
mdisibio Apr 29, 2024
b49d994
Consolidate histogram code between traceql and traceqlmetrics. quanti…
mdisibio Apr 30, 2024
05c3af4
lint
mdisibio May 2, 2024
dbe0a18
changelog
mdisibio May 2, 2024
eaf1a5c
histogram_over_time
mdisibio May 2, 2024
5c79dc2
Redo histograms to set __bucket label to the actual value instead of …
mdisibio May 3, 2024
3c8507f
Revert all changes to traceqlmetrics package, was getting too noisy
mdisibio May 3, 2024
6f873a2
Merge branch 'quantile-engine' into histogram_over_time
mdisibio May 6, 2024
cbdadd1
Fix after merge
mdisibio May 6, 2024
eb5d548
lint
mdisibio May 6, 2024
402f4cb
Merge branch 'main' into histogram_over_time
mdisibio May 6, 2024
85e7db1
changelog
mdisibio May 6, 2024
77040db
Merge branch 'main' into histogram_over_time
mdisibio May 7, 2024
48020da
Add test for histogram_over_time
mdisibio May 7, 2024
cb03471
Change quantile_over_time test to use more exported methods
mdisibio May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [ENHANCEMENT] Surface new labels for uninstrumented services and systems [#3543](https://github.com/grafana/tempo/pull/3543) (@t00mas)
* [FEATURE] Add TLS support for Memcached Client [#3585](https://github.com/grafana/tempo/pull/3585) (@sonisr)
* [FEATURE] TraceQL metrics queries: add quantile_over_time [#3605](https://github.com/grafana/tempo/pull/3605) [#3633](https://github.com/grafana/tempo/pull/3633) (@mdisibio)
* [FEATURE] TraceQL metrics queries: add histogram_over_time [#3644](https://github.com/grafana/tempo/pull/3644) (@mdisibio)
* [ENHANCEMENT] Add querier metrics for requests executed [#3524](https://github.com/grafana/tempo/pull/3524) (@electron0zero)
* [FEATURE] Added gRPC streaming endpoints for Tempo APIs.
* Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
Expand Down
60 changes: 58 additions & 2 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,11 +784,19 @@ func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attr
}
}

// newMetricsAggregateHistogramOverTime returns a MetricsAggregate configured
// to evaluate histogram_over_time over the given attribute, optionally grouped
// by the provided attributes.
func newMetricsAggregateHistogramOverTime(attr Attribute, by []Attribute) *MetricsAggregate {
	agg := &MetricsAggregate{
		op:   metricsAggregateHistogramOverTime,
		attr: attr,
		by:   by,
	}
	return agg
}

func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
switch a.op {
case metricsAggregateRate, metricsAggregateCountOverTime:
// No extra conditions, start time is already enough
case metricsAggregateQuantileOverTime:
case metricsAggregateQuantileOverTime, metricsAggregateHistogramOverTime:
if !request.HasAttribute(a.attr) {
request.SecondPassConditions = append(request.SecondPassConditions, Condition{
Attribute: a.attr,
Expand Down Expand Up @@ -829,6 +837,48 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }

case metricsAggregateHistogramOverTime:
// Histograms are implemented as count_over_time() by(2^log2(attr)) for now
// This is very similar to quantile_over_time except the bucket values are the true
// underlying value in scale, i.e. a duration of 500ms will be in __bucket==0.512s
// The difference is that quantile_over_time has to calculate the final quantiles
// so in that case the log2 bucket number is more useful. We can clean it up later
// when updating quantiles to be smarter and more customizable range of buckets.
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
byFuncLabel = internalLabelBucket
switch a.attr {
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
byFunc = func(s Span) (Static, bool) {
d := s.DurationNanos()
if d < 2 {
return Static{}, false
}
// Bucket is log2(nanos) converted to float seconds
return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
}
default:
// Basic implementation for all other attributes
byFunc = func(s Span) (Static, bool) {
v, ok := s.AttributeFor(a.attr)
if !ok {
return Static{}, false
}

// TODO(mdisibio) - Add support for floats, we need to map them into buckets.
// Because of the range of floats, we need a native histogram approach.
if v.Type != TypeInt {
return Static{}, false
}

if v.N < 2 {
return Static{}, false
}
// Bucket is the value rounded up to the nearest power of 2
return NewStaticFloat(Log2Bucketize(uint64(v.N))), true
}
}

case metricsAggregateQuantileOverTime:
// Quantiles are implemented as count_over_time() by(log2(attr)) for now
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
Expand All @@ -852,7 +902,8 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
return Static{}, false
}

// TODO(mdisibio) - Add quantile support for floats
// TODO(mdisibio) - Add support for floats, we need to map them into buckets.
// Because of the range of floats, we need a native histogram approach.
if v.Type != TypeInt {
return Static{}, false
}
Expand Down Expand Up @@ -908,6 +959,11 @@ func (a *MetricsAggregate) validate() error {
switch a.op {
case metricsAggregateCountOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {
// We reserve a spot for the bucket so quantile has 1 less group by
return newUnsupportedError(fmt.Sprintf("metrics group by %v values", len(a.by)))
}
case metricsAggregateQuantileOverTime:
if len(a.by) >= maxGroupBys {
// We reserve a spot for the bucket so quantile has 1 less group by
Expand Down
124 changes: 109 additions & 15 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,11 @@ func TestQuantileOverTime(t *testing.T) {
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | quantile_over_time(duration, 0, 0.5, 1) by (span.foo)",
}

var (
attr = IntrinsicDurationAttribute
qs = []float64{0, 0.5, 1}
by = []Attribute{NewScopedAttribute(AttributeScopeSpan, false, "foo")}
e = NewEngine()
_128ns = 0.000000128
_256ns = 0.000000256
_512ns = 0.000000512
Expand Down Expand Up @@ -385,33 +384,33 @@ func TestQuantileOverTime(t *testing.T) {
}

// 3 layers of processing matches: query-frontend -> queriers -> generators -> blocks
layer1 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer1.init(req, AggregateModeRaw)
layer1, err := e.CompileMetricsQueryRange(req, false, 0, false)
require.NoError(t, err)

layer2 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer2.init(req, AggregateModeSum)
layer2, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeSum)
require.NoError(t, err)

layer3 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer3.init(req, AggregateModeFinal)
layer3, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeFinal)
require.NoError(t, err)

// Pass spans to layer 1
for _, s := range in {
layer1.observe(s)
layer1.metricsPipeline.observe(s)
}

// Pass layer 1 to layer 2
// These are partial counts over time by bucket
res := layer1.result()
layer2.observeSeries(res.ToProto(req))
res := layer1.Results()
layer2.metricsPipeline.observeSeries(res.ToProto(req))

// Pass layer 2 to layer 3
// These are summed counts over time by bucket
res = layer2.result()
layer3.observeSeries(res.ToProto(req))
res = layer2.Results()
layer3.ObserveSeries(res.ToProto(req))

// Layer 3 final results
// The quantiles
final := layer3.result()
final := layer3.Results()
require.Equal(t, out, final)
}

Expand All @@ -422,3 +421,98 @@ func percentileHelper(q float64, values ...float64) float64 {
}
return Log2Quantile(q, h.Buckets)
}

// TestHistogramOverTime exercises histogram_over_time through all three layers
// of the metrics pipeline (raw -> sum -> final), feeding mock spans into layer 1
// and asserting the final SeriesSet of per-bucket counts.
func TestHistogramOverTime(t *testing.T) {
	req := &tempopb.QueryRangeRequest{
		Start: uint64(1 * time.Second),
		End:   uint64(3 * time.Second),
		Step:  uint64(1 * time.Second),
		Query: "{ } | histogram_over_time(duration) by (span.foo)",
	}

	var (
		e = NewEngine()
		// Expected __bucket label values: durations in float seconds
		// (128/256/512 nanosecond buckets).
		_128ns = NewStaticFloat(0.000000128)
		_256ns = NewStaticFloat(0.000000256)
		_512ns = NewStaticFloat(0.000000512)
	)

	// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
	in := []Span{
		// t=1s, series span.foo=bar: one span in each of the 128/256/512ns buckets
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),

		// t=2s, series span.foo=bar: four spans in the 256ns bucket
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),

		// t=3s, series span.foo=baz: three spans in the 512ns bucket
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
	}

	// Output series with buckets per foo
	// Prom labels are sorted alphabetically, traceql labels maintain original order.
	// Each Values slice has one count per step (t=1s, 2s, 3s).
	out := SeriesSet{
		`{` + internalLabelBucket + `="` + _128ns.EncodeToString(true) + `", span.foo="bar"}`: TimeSeries{
			Labels: []Label{
				{Name: "span.foo", Value: NewStaticString("bar")},
				{Name: internalLabelBucket, Value: _128ns},
			},
			Values: []float64{1, 0, 0},
		},
		`{` + internalLabelBucket + `="` + _256ns.EncodeToString(true) + `", span.foo="bar"}`: TimeSeries{
			Labels: []Label{
				{Name: "span.foo", Value: NewStaticString("bar")},
				{Name: internalLabelBucket, Value: _256ns},
			},
			Values: []float64{1, 4, 0},
		},
		`{` + internalLabelBucket + `="` + _512ns.EncodeToString(true) + `", span.foo="bar"}`: TimeSeries{
			Labels: []Label{
				{Name: "span.foo", Value: NewStaticString("bar")},
				{Name: internalLabelBucket, Value: _512ns},
			},
			Values: []float64{1, 0, 0},
		},
		`{` + internalLabelBucket + `="` + _512ns.EncodeToString(true) + `", span.foo="baz"}`: TimeSeries{
			Labels: []Label{
				{Name: "span.foo", Value: NewStaticString("baz")},
				{Name: internalLabelBucket, Value: _512ns},
			},
			Values: []float64{0, 0, 3},
		},
	}

	// 3 layers of processing matches: query-frontend -> queriers -> generators -> blocks
	layer1, err := e.CompileMetricsQueryRange(req, false, 0, false)
	require.NoError(t, err)

	layer2, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeSum)
	require.NoError(t, err)

	layer3, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeFinal)
	require.NoError(t, err)

	// Pass spans to layer 1
	for _, s := range in {
		layer1.metricsPipeline.observe(s)
	}

	// Pass layer 1 to layer 2
	// These are partial counts over time by bucket
	res := layer1.Results()
	layer2.metricsPipeline.observeSeries(res.ToProto(req))

	// Pass layer 2 to layer 3
	// These are summed counts over time by bucket
	res = layer2.Results()
	layer3.ObserveSeries(res.ToProto(req))

	// Layer 3 final results
	// The histogram bucket counts per series
	final := layer3.Results()
	require.Equal(t, out, final)
}
3 changes: 3 additions & 0 deletions pkg/traceql/enum_aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
metricsAggregateRate MetricsAggregateOp = iota
metricsAggregateCountOverTime
metricsAggregateQuantileOverTime
metricsAggregateHistogramOverTime
)

func (a MetricsAggregateOp) String() string {
Expand All @@ -65,6 +66,8 @@ func (a MetricsAggregateOp) String() string {
return "count_over_time"
case metricsAggregateQuantileOverTime:
return "quantile_over_time"
case metricsAggregateHistogramOverTime:
return "histogram_over_time"
}

return fmt.Sprintf("aggregate(%d)", a)
Expand Down
14 changes: 8 additions & 6 deletions pkg/traceql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ import (
COUNT AVG MAX MIN SUM
BY COALESCE SELECT
END_ATTRIBUTE
RATE COUNT_OVER_TIME QUANTILE_OVER_TIME
RATE COUNT_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME
WITH

// Operators are listed with increasing precedence.
Expand Down Expand Up @@ -292,12 +292,14 @@ aggregate:
// Metrics
// **********************
metricsAggregation:
RATE OPEN_PARENS CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateRate, nil) }
| RATE OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateRate, $6) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, nil) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, $6) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, nil) }
RATE OPEN_PARENS CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateRate, nil) }
| RATE OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateRate, $6) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, nil) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, $6) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, nil) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, $9) }
| HISTOGRAM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateHistogramOverTime($3, nil) }
| HISTOGRAM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateHistogramOverTime($3, $7) }
;

// **********************
Expand Down
Loading
Loading