From 423d08ab57111388b173c2e5b05a1762be93dd8e Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Fri, 14 Nov 2025 14:14:57 +0000 Subject: [PATCH 1/3] fix: SelectSeries not correct for single reports The query-backend will only call the aggregate functions, when there is more than one report for particular type. In the time series case the aggregate function is also the one that ensures order/correctness of the shape of data. I considered doing this inside every query, but eventually it felt better to do this when we would normally "aggregate" --- pkg/querybackend/block_reader_test.go | 49 +++++- pkg/querybackend/query.go | 3 +- pkg/querybackend/query_label_names.go | 1 + pkg/querybackend/query_label_values.go | 1 + pkg/querybackend/query_pprof.go | 1 + pkg/querybackend/query_series_labels.go | 1 + pkg/querybackend/query_time_series.go | 1 + pkg/querybackend/query_tree.go | 1 + pkg/querybackend/report_aggregator.go | 35 +++- pkg/querybackend/report_aggregator_test.go | 18 +- .../testdata/fixtures/time_series.json | 158 ++++++++++++++++++ .../fixtures/time_series_first_block.json | 56 +++++++ 12 files changed, 304 insertions(+), 21 deletions(-) create mode 100644 pkg/querybackend/testdata/fixtures/time_series.json create mode 100644 pkg/querybackend/testdata/fixtures/time_series_first_block.json diff --git a/pkg/querybackend/block_reader_test.go b/pkg/querybackend/block_reader_test.go index b1abdb7d07..1f76614ca6 100644 --- a/pkg/querybackend/block_reader_test.go +++ b/pkg/querybackend/block_reader_test.go @@ -239,18 +239,20 @@ func (s *testSuite) Test_SeriesLabels() { s.Assert().JSONEq(string(expected), string(actual)) } +var startTime = time.Unix(1739263329, 0) + func (s *testSuite) Test_QueryTimeSeries() { query := &queryv1.Query{ QueryType: queryv1.QueryType_QUERY_TIME_SERIES, TimeSeries: &queryv1.TimeSeriesQuery{ GroupBy: []string{"service_name"}, - Step: 1.0, // 1 second step + Step: 30.0, }, } req := &queryv1.InvokeRequest{ - StartTime: time.Now().Add(-1 * time.Hour).UnixMilli(), - EndTime: time.Now().UnixMilli(), + StartTime: startTime.UnixMilli(), + EndTime: startTime.Add(time.Hour).UnixMilli(), Query: []*queryv1.Query{query}, QueryPlan: s.plan, LabelSelector: "{}", @@ -262,6 +264,47 @@ func (s *testSuite) Test_QueryTimeSeries() { s.Require().NotNil(resp) s.Require().Len(resp.Reports, 1) s.Require().NotNil(resp.Reports[0].TimeSeries) + + actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries) + expected, err := os.ReadFile("testdata/fixtures/time_series.json") + s.Require().NoError(err) + s.Assert().JSONEq(string(expected), string(actual)) +} + +// When there is only one report we don't run the aggregate method. This check ensures that the timeseries, is still correctly formatted. +func (s *testSuite) Test_QueryTimeSeriesOneReport() { + query := &queryv1.Query{ + QueryType: queryv1.QueryType_QUERY_TIME_SERIES, + TimeSeries: &queryv1.TimeSeriesQuery{ + GroupBy: []string{"service_name"}, + Step: 30.0, + }, + } + + // shorten plan so there is only one report + shorterPlan := s.plan.CloneVT() + shorterPlan.Root = s.plan.Root.CloneVT() + shorterPlan.Root.Blocks = s.plan.Root.Blocks[:1] + + req := &queryv1.InvokeRequest{ + StartTime: startTime.UnixMilli(), + EndTime: startTime.Add(time.Hour).UnixMilli(), + Query: []*queryv1.Query{query}, + QueryPlan: shorterPlan, + LabelSelector: "{}", + Tenant: s.tenant, + } + + resp, err := s.reader.Invoke(s.ctx, req) + s.Require().NoError(err) + s.Require().NotNil(resp) + s.Require().Len(resp.Reports, 1) + s.Require().NotNil(resp.Reports[0].TimeSeries) + + actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries) + expected, err := os.ReadFile("testdata/fixtures/time_series_first_block.json") + s.Require().NoError(err) + s.Assert().JSONEq(string(expected), string(actual)) } func (s *testSuite) Test_QueryTree_All_Tenant_Isolation() { diff --git a/pkg/querybackend/query.go b/pkg/querybackend/query.go index dc9fb3c65e..87c86bdc98 100644 --- a/pkg/querybackend/query.go +++ b/pkg/querybackend/query.go @@ -67,12 +67,13 @@ func registerQueryType( rt queryv1.ReportType, q queryHandler, a aggregatorProvider, + alwaysAggregate bool, // this option will always call the aggregate method for this report type, so it will also run when there is only one report deps ...block.Section, ) { registerQueryReportType(qt, rt) registerQueryHandler(qt, q) registerQueryDependencies(qt, deps...) - registerAggregator(rt, a) + registerAggregator(rt, a, alwaysAggregate) } type blockContext struct { diff --git a/pkg/querybackend/query_label_names.go b/pkg/querybackend/query_label_names.go index 447d359307..c6fab33b76 100644 --- a/pkg/querybackend/query_label_names.go +++ b/pkg/querybackend/query_label_names.go @@ -18,6 +18,7 @@ func init() { queryv1.ReportType_REPORT_LABEL_NAMES, queryLabelNames, newLabelNameAggregator, + false, []block.Section{block.SectionTSDB}..., ) } diff --git a/pkg/querybackend/query_label_values.go b/pkg/querybackend/query_label_values.go index dc7552f723..3c38958ca6 100644 --- a/pkg/querybackend/query_label_values.go +++ b/pkg/querybackend/query_label_values.go @@ -20,6 +20,7 @@ func init() { queryv1.ReportType_REPORT_LABEL_VALUES, queryLabelValues, newLabelValueAggregator, + false, []block.Section{block.SectionTSDB}..., ) } diff --git a/pkg/querybackend/query_pprof.go b/pkg/querybackend/query_pprof.go index 5fb16cf22c..e1526f3b96 100644 --- a/pkg/querybackend/query_pprof.go +++ b/pkg/querybackend/query_pprof.go @@ -21,6 +21,7 @@ func init() { queryv1.ReportType_REPORT_PPROF, queryPprof, newPprofAggregator, + false, []block.Section{ block.SectionTSDB, block.SectionProfiles, diff --git a/pkg/querybackend/query_series_labels.go b/pkg/querybackend/query_series_labels.go index ab0ad94249..ead718aa6f 100644 --- a/pkg/querybackend/query_series_labels.go +++ b/pkg/querybackend/query_series_labels.go @@ -21,6 +21,7 @@ func init() { queryv1.ReportType_REPORT_SERIES_LABELS, querySeriesLabels, newSeriesLabelsAggregator, + false, []block.Section{block.SectionTSDB}..., ) } diff --git a/pkg/querybackend/query_time_series.go b/pkg/querybackend/query_time_series.go index faf8dc9c32..c2b10d1370 100644 --- a/pkg/querybackend/query_time_series.go +++ b/pkg/querybackend/query_time_series.go @@ -22,6 +22,7 @@ func init() { queryv1.ReportType_REPORT_TIME_SERIES, queryTimeSeries, newTimeSeriesAggregator, + true, []block.Section{ block.SectionTSDB, block.SectionProfiles, diff --git a/pkg/querybackend/query_tree.go b/pkg/querybackend/query_tree.go index ede1f300be..d311f32941 100644 --- a/pkg/querybackend/query_tree.go +++ b/pkg/querybackend/query_tree.go @@ -19,6 +19,7 @@ func init() { queryv1.ReportType_REPORT_TREE, queryTree, newTreeAggregator, + false, []block.Section{ block.SectionTSDB, block.SectionProfiles, diff --git a/pkg/querybackend/report_aggregator.go b/pkg/querybackend/report_aggregator.go index dd818ab1e0..491de350e6 100644 --- a/pkg/querybackend/report_aggregator.go +++ b/pkg/querybackend/report_aggregator.go @@ -10,6 +10,7 @@ import ( var ( aggregatorMutex = new(sync.RWMutex) aggregators = map[queryv1.ReportType]aggregatorProvider{} + alwaysAggregate = map[queryv1.ReportType]struct{}{} queryReportType = map[queryv1.QueryType]queryv1.ReportType{} ) @@ -23,7 +24,7 @@ type aggregator interface { build() *queryv1.Report } -func registerAggregator(t queryv1.ReportType, ap aggregatorProvider) { +func registerAggregator(t queryv1.ReportType, ap aggregatorProvider, always bool) { aggregatorMutex.Lock() defer aggregatorMutex.Unlock() _, ok := aggregators[t] @@ -31,6 +32,21 @@ func registerAggregator(t queryv1.ReportType, ap aggregatorProvider) { panic(fmt.Sprintf("%s: aggregator already registered", t)) } aggregators[t] = ap + + if always { + _, ok := alwaysAggregate[t] + if ok { + panic(fmt.Sprintf("%s: aggregator already registered to always aggregat", t)) + } + alwaysAggregate[t] = struct{}{} + } +} + +func isAlwaysAggregate(t queryv1.ReportType) bool { + aggregatorMutex.RLock() + defer aggregatorMutex.RUnlock() + _, result := alwaysAggregate[t] + return result } func getAggregator(r *queryv1.InvokeRequest, x *queryv1.Report) (aggregator, error) { @@ -97,13 +113,16 @@ func (ra *reportAggregator) aggregateReport(r *queryv1.Report) (err error) { ra.sm.Lock() v, found := ra.staged[r.ReportType] if !found { - // We delay aggregation until we have at least two - // reports of the same type. Otherwise, we just store - // the report and will return it as is, if it is the - // only one. - ra.staged[r.ReportType] = r - ra.sm.Unlock() - return nil + // For most ReportTypes we delay aggregation until we have at least two + // reports of the same type. In case there is only one we will + // return it as is. + // Some ReportTypes need to call the aggregator for correctness, in that case we call it right away. + if !isAlwaysAggregate(r.ReportType) { + ra.staged[r.ReportType] = r + ra.sm.Unlock() + return nil + } + ra.staged[r.ReportType] = nil } // Found a staged report of the same type. if v != nil { diff --git a/pkg/querybackend/report_aggregator_test.go b/pkg/querybackend/report_aggregator_test.go index 9fb696e31d..73ea9bade6 100644 --- a/pkg/querybackend/report_aggregator_test.go +++ b/pkg/querybackend/report_aggregator_test.go @@ -50,7 +50,7 @@ func mockAggregatorProvider(req *queryv1.InvokeRequest) aggregator { func TestReportAggregator_SingleReport(t *testing.T) { reportType := queryv1.ReportType(999) // use a high number that won't conflict with other registrations - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, reportType) @@ -78,7 +78,7 @@ func TestReportAggregator_SingleReport(t *testing.T) { func TestReportAggregator_TwoReports(t *testing.T) { reportType := queryv1.ReportType(999) - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, reportType) @@ -115,8 +115,8 @@ func TestReportAggregator_MultipleTypes(t *testing.T) { type1 := queryv1.ReportType(999) type2 := queryv1.ReportType(998) - registerAggregator(type1, mockAggregatorProvider) - registerAggregator(type2, mockAggregatorProvider) + registerAggregator(type1, mockAggregatorProvider, false) + registerAggregator(type2, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, type1) @@ -167,7 +167,7 @@ func TestReportAggregator_NilReport(t *testing.T) { func TestReportAggregator_AggregateResponse(t *testing.T) { reportType := queryv1.ReportType(999) - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, reportType) @@ -194,7 +194,7 @@ func TestReportAggregator_AggregateResponse(t *testing.T) { func TestReportAggregator_ConcurrentAccess(t *testing.T) { reportType := queryv1.ReportType(999) - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, reportType) @@ -230,7 +230,7 @@ func TestReportAggregator_ConcurrentAccess(t *testing.T) { func TestGetAggregator(t *testing.T) { reportType := queryv1.ReportType(999) - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) defer func() { aggregatorMutex.Lock() delete(aggregators, reportType) @@ -256,9 +256,9 @@ func TestGetAggregator_UnknownReportType(t *testing.T) { func TestRegisterAggregator_Duplicate(t *testing.T) { reportType := queryv1.ReportType(999) - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) assert.Panics(t, func() { - registerAggregator(reportType, mockAggregatorProvider) + registerAggregator(reportType, mockAggregatorProvider, false) }) aggregatorMutex.Lock() diff --git a/pkg/querybackend/testdata/fixtures/time_series.json b/pkg/querybackend/testdata/fixtures/time_series.json new file mode 100644 index 0000000000..7912de611e --- /dev/null +++ b/pkg/querybackend/testdata/fixtures/time_series.json @@ -0,0 +1,158 @@ +[ + { + "labels": [ + { + "name": "service_name", + "value": "pyroscope" + } + ], + "points": [ + { + "value": 635460859340, + "timestamp": 1739263359000 + }, + { + "value": 3596227770441, + "timestamp": 1739263389000 + }, + { + "value": 3292115355100, + "timestamp": 1739263419000 + }, + { + "value": 3117194222351, + "timestamp": 1739263449000 + }, + { + "value": 3334880121438, + "timestamp": 1739263479000 + }, + { + "value": 4004142845972, + "timestamp": 1739263509000 + }, + { + "value": 3784707490611, + "timestamp": 1739263539000 + }, + { + "value": 3948517814712, + "timestamp": 1739263569000 + }, + { + "value": 3855556351880, + "timestamp": 1739263599000 + }, + { + "value": 3783064769056, + "timestamp": 1739263629000 + }, + { + "value": 2118809970223, + "timestamp": 1739263659000 + }, + { + "value": 1200000120, + "timestamp": 1739264019000 + }, + { + "value": 4561295991965, + "timestamp": 1739264049000 + }, + { + "value": 3767958932068, + "timestamp": 1739264079000 + }, + { + "value": 3152613326092, + "timestamp": 1739264109000 + }, + { + "value": 4331520617166, + "timestamp": 1739264139000 + }, + { + "value": 3848956851919, + "timestamp": 1739264169000 + }, + { + "value": 3648049166680, + "timestamp": 1739264199000 + } + ] + }, + { + "labels": [ + { + "name": "service_name", + "value": "test-app" + } + ], + "points": [ + { + "value": 349200034920, + "timestamp": 1739263509000 + }, + { + "value": 436500043650, + "timestamp": 1739263539000 + }, + { + "value": 261900026190, + "timestamp": 1739263569000 + }, + { + "value": 261900026190, + "timestamp": 1739263599000 + }, + { + "value": 261900026190, + "timestamp": 1739263629000 + }, + { + "value": 261900026190, + "timestamp": 1739263659000 + }, + { + "value": 261900026190, + "timestamp": 1739263689000 + }, + { + "value": 261900026190, + "timestamp": 1739263719000 + }, + { + "value": 261900026190, + "timestamp": 1739263749000 + }, + { + "value": 261900026190, + "timestamp": 1739263779000 + }, + { + "value": 261900026190, + "timestamp": 1739263809000 + }, + { + "value": 261900026190, + "timestamp": 1739263839000 + }, + { + "value": 261900026190, + "timestamp": 1739263869000 + }, + { + "value": 174600017460, + "timestamp": 1739263899000 + }, + { + "value": 8730000873, + "timestamp": 1739264709000 + }, + { + "value": 17460001746, + "timestamp": 1739264739000 + } + ] + } +] diff --git a/pkg/querybackend/testdata/fixtures/time_series_first_block.json b/pkg/querybackend/testdata/fixtures/time_series_first_block.json new file mode 100644 index 0000000000..8ca003ce2a --- /dev/null +++ b/pkg/querybackend/testdata/fixtures/time_series_first_block.json @@ -0,0 +1,56 @@ +[ + { + "labels": [ + { + "name": "service_name", + "value": "pyroscope" + } + ], + "points": [ + { + "value": 635460859340, + "timestamp": 1739263359000 + }, + { + "value": 3596227770441, + "timestamp": 1739263389000 + }, + { + "value": 3292115355100, + "timestamp": 1739263419000 + }, + { + "value": 3117194222351, + "timestamp": 1739263449000 + }, + { + "value": 3334880121438, + "timestamp": 1739263479000 + }, + { + "value": 4004142845972, + "timestamp": 1739263509000 + }, + { + "value": 3784707490611, + "timestamp": 1739263539000 + }, + { + "value": 3948517814712, + "timestamp": 1739263569000 + }, + { + "value": 3855556351880, + "timestamp": 1739263599000 + }, + { + "value": 3783064769056, + "timestamp": 1739263629000 + }, + { + "value": 2118809970223, + "timestamp": 1739263659000 + } + ] + } +] From d41f77e11eb71f436b808d86a72042a929a7c177 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Mon, 17 Nov 2025 12:40:48 +0000 Subject: [PATCH 2/3] Ensure we are holding the lock when initializing the aggreagator. --- pkg/querybackend/report_aggregator.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/querybackend/report_aggregator.go b/pkg/querybackend/report_aggregator.go index 491de350e6..3dda8787d7 100644 --- a/pkg/querybackend/report_aggregator.go +++ b/pkg/querybackend/report_aggregator.go @@ -116,13 +116,19 @@ func (ra *reportAggregator) aggregateReport(r *queryv1.Report) (err error) { // For most ReportTypes we delay aggregation until we have at least two // reports of the same type. In case there is only one we will // return it as is. - // Some ReportTypes need to call the aggregator for correctness, in that case we call it right away. if !isAlwaysAggregate(r.ReportType) { ra.staged[r.ReportType] = r ra.sm.Unlock() return nil } + + // Some ReportTypes need to call the aggregator for correctness even when + // there is only single instance, in that case call the aggregator right + // away and mark the report type appropriately in the staged map. + err = ra.aggregateReportNoCheck(r) ra.staged[r.ReportType] = nil + ra.sm.Unlock() + return nil } // Found a staged report of the same type. if v != nil { From bfa5e4f954f07dee151457d5aef4b5ec141f8253 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Mon, 17 Nov 2025 12:52:51 +0000 Subject: [PATCH 3/3] Use error --- pkg/querybackend/report_aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querybackend/report_aggregator.go b/pkg/querybackend/report_aggregator.go index 3dda8787d7..7eab4c9dee 100644 --- a/pkg/querybackend/report_aggregator.go +++ b/pkg/querybackend/report_aggregator.go @@ -128,7 +128,7 @@ func (ra *reportAggregator) aggregateReport(r *queryv1.Report) (err error) { err = ra.aggregateReportNoCheck(r) ra.staged[r.ReportType] = nil ra.sm.Unlock() - return nil + return err } // Found a staged report of the same type. if v != nil {