Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions pkg/querybackend/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,20 @@ func (s *testSuite) Test_SeriesLabels() {
s.Assert().JSONEq(string(expected), string(actual))
}

var startTime = time.Unix(1739263329, 0)

func (s *testSuite) Test_QueryTimeSeries() {
query := &queryv1.Query{
QueryType: queryv1.QueryType_QUERY_TIME_SERIES,
TimeSeries: &queryv1.TimeSeriesQuery{
GroupBy: []string{"service_name"},
Step: 1.0, // 1 second step
Step: 30.0,
},
}

req := &queryv1.InvokeRequest{
StartTime: time.Now().Add(-1 * time.Hour).UnixMilli(),
EndTime: time.Now().UnixMilli(),
StartTime: startTime.UnixMilli(),
EndTime: startTime.Add(time.Hour).UnixMilli(),
Query: []*queryv1.Query{query},
QueryPlan: s.plan,
LabelSelector: "{}",
Expand All @@ -262,6 +264,47 @@ func (s *testSuite) Test_QueryTimeSeries() {
s.Require().NotNil(resp)
s.Require().Len(resp.Reports, 1)
s.Require().NotNil(resp.Reports[0].TimeSeries)

actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries)
expected, err := os.ReadFile("testdata/fixtures/time_series.json")
s.Require().NoError(err)
s.Assert().JSONEq(string(expected), string(actual))
}

// When there is only one report we don't run the aggregate method. This check ensures that the timeseries, is still correctly formatted.
func (s *testSuite) Test_QueryTimeSeriesOneReport() {
query := &queryv1.Query{
QueryType: queryv1.QueryType_QUERY_TIME_SERIES,
TimeSeries: &queryv1.TimeSeriesQuery{
GroupBy: []string{"service_name"},
Step: 30.0,
},
}

// shorten plan so there is only one report
shorterPlan := s.plan.CloneVT()
shorterPlan.Root = s.plan.Root.CloneVT()
shorterPlan.Root.Blocks = s.plan.Root.Blocks[:1]

req := &queryv1.InvokeRequest{
StartTime: startTime.UnixMilli(),
EndTime: startTime.Add(time.Hour).UnixMilli(),
Query: []*queryv1.Query{query},
QueryPlan: shorterPlan,
LabelSelector: "{}",
Tenant: s.tenant,
}

resp, err := s.reader.Invoke(s.ctx, req)
s.Require().NoError(err)
s.Require().NotNil(resp)
s.Require().Len(resp.Reports, 1)
s.Require().NotNil(resp.Reports[0].TimeSeries)

actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries)
expected, err := os.ReadFile("testdata/fixtures/time_series_first_block.json")
s.Require().NoError(err)
s.Assert().JSONEq(string(expected), string(actual))
}

func (s *testSuite) Test_QueryTree_All_Tenant_Isolation() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querybackend/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func registerQueryType(
rt queryv1.ReportType,
q queryHandler,
a aggregatorProvider,
alwaysAggregate bool, // this option will always call the aggregate method for this report type, so it will also run when there is only one report
deps ...block.Section,
) {
registerQueryReportType(qt, rt)
registerQueryHandler(qt, q)
registerQueryDependencies(qt, deps...)
registerAggregator(rt, a)
registerAggregator(rt, a, alwaysAggregate)
}

type blockContext struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_label_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func init() {
queryv1.ReportType_REPORT_LABEL_NAMES,
queryLabelNames,
newLabelNameAggregator,
false,
[]block.Section{block.SectionTSDB}...,
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_label_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func init() {
queryv1.ReportType_REPORT_LABEL_VALUES,
queryLabelValues,
newLabelValueAggregator,
false,
[]block.Section{block.SectionTSDB}...,
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func init() {
queryv1.ReportType_REPORT_PPROF,
queryPprof,
newPprofAggregator,
false,
[]block.Section{
block.SectionTSDB,
block.SectionProfiles,
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_series_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func init() {
queryv1.ReportType_REPORT_SERIES_LABELS,
querySeriesLabels,
newSeriesLabelsAggregator,
false,
[]block.Section{block.SectionTSDB}...,
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_time_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func init() {
queryv1.ReportType_REPORT_TIME_SERIES,
queryTimeSeries,
newTimeSeriesAggregator,
true,
[]block.Section{
block.SectionTSDB,
block.SectionProfiles,
Expand Down
1 change: 1 addition & 0 deletions pkg/querybackend/query_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func init() {
queryv1.ReportType_REPORT_TREE,
queryTree,
newTreeAggregator,
false,
[]block.Section{
block.SectionTSDB,
block.SectionProfiles,
Expand Down
39 changes: 32 additions & 7 deletions pkg/querybackend/report_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
var (
aggregatorMutex = new(sync.RWMutex)
aggregators = map[queryv1.ReportType]aggregatorProvider{}
alwaysAggregate = map[queryv1.ReportType]struct{}{}
queryReportType = map[queryv1.QueryType]queryv1.ReportType{}
)

Expand All @@ -23,14 +24,29 @@ type aggregator interface {
build() *queryv1.Report
}

func registerAggregator(t queryv1.ReportType, ap aggregatorProvider) {
func registerAggregator(t queryv1.ReportType, ap aggregatorProvider, always bool) {
aggregatorMutex.Lock()
defer aggregatorMutex.Unlock()
_, ok := aggregators[t]
if ok {
panic(fmt.Sprintf("%s: aggregator already registered", t))
}
aggregators[t] = ap

if always {
_, ok := alwaysAggregate[t]
if ok {
panic(fmt.Sprintf("%s: aggregator already registered to always aggregat", t))
}
alwaysAggregate[t] = struct{}{}
}
}

func isAlwaysAggregate(t queryv1.ReportType) bool {
aggregatorMutex.RLock()
defer aggregatorMutex.RUnlock()
_, result := alwaysAggregate[t]
return result
}

func getAggregator(r *queryv1.InvokeRequest, x *queryv1.Report) (aggregator, error) {
Expand Down Expand Up @@ -97,13 +113,22 @@ func (ra *reportAggregator) aggregateReport(r *queryv1.Report) (err error) {
ra.sm.Lock()
v, found := ra.staged[r.ReportType]
if !found {
// We delay aggregation until we have at least two
// reports of the same type. Otherwise, we just store
// the report and will return it as is, if it is the
// only one.
ra.staged[r.ReportType] = r
// For most ReportTypes we delay aggregation until we have at least two
// reports of the same type. In case there is only one we will
// return it as is.
if !isAlwaysAggregate(r.ReportType) {
ra.staged[r.ReportType] = r
ra.sm.Unlock()
return nil
}

// Some ReportTypes need to call the aggregator for correctness even when
// there is only single instance, in that case call the aggregator right
// away and mark the report type appropriately in the staged map.
err = ra.aggregateReportNoCheck(r)
ra.staged[r.ReportType] = nil
ra.sm.Unlock()
return nil
return err
}
// Found a staged report of the same type.
if v != nil {
Expand Down
18 changes: 9 additions & 9 deletions pkg/querybackend/report_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func mockAggregatorProvider(req *queryv1.InvokeRequest) aggregator {

func TestReportAggregator_SingleReport(t *testing.T) {
reportType := queryv1.ReportType(999) // use a high number that won't conflict with other registrations
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, reportType)
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestReportAggregator_SingleReport(t *testing.T) {

func TestReportAggregator_TwoReports(t *testing.T) {
reportType := queryv1.ReportType(999)
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, reportType)
Expand Down Expand Up @@ -115,8 +115,8 @@ func TestReportAggregator_MultipleTypes(t *testing.T) {
type1 := queryv1.ReportType(999)
type2 := queryv1.ReportType(998)

registerAggregator(type1, mockAggregatorProvider)
registerAggregator(type2, mockAggregatorProvider)
registerAggregator(type1, mockAggregatorProvider, false)
registerAggregator(type2, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, type1)
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestReportAggregator_NilReport(t *testing.T) {

func TestReportAggregator_AggregateResponse(t *testing.T) {
reportType := queryv1.ReportType(999)
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, reportType)
Expand All @@ -194,7 +194,7 @@ func TestReportAggregator_AggregateResponse(t *testing.T) {

func TestReportAggregator_ConcurrentAccess(t *testing.T) {
reportType := queryv1.ReportType(999)
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, reportType)
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestReportAggregator_ConcurrentAccess(t *testing.T) {

func TestGetAggregator(t *testing.T) {
reportType := queryv1.ReportType(999)
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
defer func() {
aggregatorMutex.Lock()
delete(aggregators, reportType)
Expand All @@ -256,9 +256,9 @@ func TestGetAggregator_UnknownReportType(t *testing.T) {
func TestRegisterAggregator_Duplicate(t *testing.T) {
reportType := queryv1.ReportType(999)

registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
assert.Panics(t, func() {
registerAggregator(reportType, mockAggregatorProvider)
registerAggregator(reportType, mockAggregatorProvider, false)
})

aggregatorMutex.Lock()
Expand Down
Loading
Loading