
Improve groupBy query granularity translation with 2x query performance improvement when issued from the SQL layer #11379

Merged: 16 commits merged into apache:master on Jul 11, 2021

Conversation

@kaijianding kaijianding (Contributor) commented Jun 23, 2021

Description

For SQL like "select city_id, time_floor(__time to day), count(*) as c from table group by city_id, time_floor(__time to day)", the original translated query uses granularity=all and dimensions: [d0, d1].
The better plan is granularity=day and dimensions: [d0], which reduces the computation cost on historical/realtime nodes.
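For illustration only, a hedged sketch in Druid's Java types of the two plan shapes described above (the class name and the v0 virtual-column name are invented for this example; d0/d1 follow the description, not necessarily the exact planner output):

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;

class PlanShapeSketch
{
  // Original translation: one ALL-granularity bucket; the day value is a
  // time_floor virtual column (v0) grouped as an extra dimension (d1).
  static final Granularity BEFORE_GRANULARITY = Granularities.ALL;
  static final List<DimensionSpec> BEFORE_DIMENSIONS = ImmutableList.of(
      new DefaultDimensionSpec("city_id", "d0"),
      new DefaultDimensionSpec("v0", "d1")
  );

  // Better plan: the day bucket comes from the query granularity instead,
  // so historicals/realtime tasks group on one fewer dimension.
  static final Granularity AFTER_GRANULARITY = Granularities.DAY;
  static final List<DimensionSpec> AFTER_DIMENSIONS = ImmutableList.of(
      new DefaultDimensionSpec("city_id", "d0")
  );
}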


Example SQL:

select
 sum("count") as c,
 Floor(__time to DAY) as __time
from
 "test_table"
where
 __time >= TIMESTAMP '2021-05-19 00:00:00'
 and __time < TIMESTAMP '2021-05-20 23:59:59'
group by
 Floor(__time to DAY)
having
 c > 0

Result:
[
    {
        "c": 104236286,
        "__time": "2021-05-19T00:00:00.000+08:00"
    },
    {
        "c": 12162156,
        "__time": "2021-05-20T00:00:00.000+08:00"
    }
]

Tested with the master version and vectorizeVirtualColumns set to true/false:

before patch: true: 1.67s, false: 3.29s
after patch: true: 810ms, false: 1.55s

2x performance improvement


Key changed/added classes in this PR
  • GroupByStrategyV2
  • DefaultLimitSpec

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@clintropolis (Member)

It would probably also be worth measuring the code prior to this change with the query context parameter vectorizeVirtualColumns set to true (it is off by default currently, though this might change soon), so that the original query with ALL granularity can use the vectorized groupBy engine and expression processing.

That said, using granularity instead of expressions could very well still be faster, even if they can use the vectorized engine.

@kaijianding kaijianding (Contributor, Author) commented Jun 25, 2021

Tested with the master version:

before patch: true: 1.67s, false: 3.29s
after patch: true: 810ms, false: 1.55s

@clintropolis

@kaijianding kaijianding changed the title from "improve groupBy query granularity translation with 5x query performance improve when issued from sql layer" to "improve groupBy query granularity translation with 2x query performance improve when issued from sql layer" on Jun 25, 2021
@jihoonson jihoonson (Contributor) left a comment

Thanks @kaijianding, it's a nice idea. Most of my comments are about relocating the new code into the sql layer, so that all groupBy engines can benefit from the improved query plan.

int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
theContext = new HashMap<>(plannerContext.getQueryContext());
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, dimensionExpression.getOutputName());
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, queryGranularity);
Contributor

Why not pass queryGranularity to the constructor of GroupByQuery directly?

@kaijianding kaijianding (Contributor, Author) Jun 30, 2021

Yes, this was also my first thought.
I tried that solution of rewriting the GroupBy query's dimensions and granularity, but later I found it very difficult to correctly handle QueryMaker.remapFields() with the new dimensions (the time_floor dimension removed), and to add the time_floor dimension back when dealing with the postAggs and the nested groupBy query.

So I look at it from another perspective: the timestamp_result_field mechanism is similar to the universalTimestamp solution; it is an optimization of the groupBy process. The compute nodes (historical and realtime nodes) use the new granularity and new dimensions, and the time_floor dimension is added back when dealing with the postAggs. The sql layer is not aware of this optimization; the only help from the sql layer is translating the time_floor dimensionExpression.

This solution is much more straightforward and easier to understand.
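To make that concrete, here is a hedged, illustrative sketch (invented class and method names, not the PR's actual helper) of what the broker-side fixup conceptually does before the postAggs run:

import org.apache.druid.query.groupby.ResultRow;

/**
 * Sketch only: with the optimized plan the engine returns rows shaped like
 * [__time, otherDims..., aggs...], and before postAggs run the broker rebuilds
 * the original shape [dims..., aggs...] where the time_floor dimension gets the
 * row's bucketed timestamp. Details in the real PR code may differ.
 */
class ResultRowFixupSketch
{
  static ResultRow restoreTimestampDimension(
      ResultRow engineRow,            // [timestamp, remaining dims..., aggs...]
      int timestampResultFieldIndex,  // position of the time_floor dimension in the original shape
      int originalRowSize             // row size in the original (granularity=all) shape
  )
  {
    final ResultRow fixed = ResultRow.create(originalRowSize);
    final long timestamp = engineRow.getLong(0);
    int src = 1;                      // skip the leading __time of the engine row
    for (int dst = 0; dst < originalRowSize; dst++) {
      if (dst == timestampResultFieldIndex) {
        fixed.set(dst, timestamp);    // put the bucketed timestamp back as the dimension value
      } else {
        fixed.set(dst, engineRow.get(src++));
      }
    }
    return fixed;
  }
}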

@kaijianding kaijianding (Contributor, Author) Jun 30, 2021

This reminded me that I can parse the granularity from the virtual columns, so this part of the code can be moved to GroupByStrategyV2 and the sql layer does not need to be aware of this optimization at all. Will make this change.

Comment on lines 226 to 242
if (hasTimestampResultField) {
// sql like "group by city_id,time_floor(__time to day)",
// the original translated query is granularity=all and dimensions:[d0, d1]
// the better plan is granularity=day and dimensions:[d0]
// but the ResultRow structure is changed from [d0, d1] to [__time, d0]
// this structure should be fixed as [d0, d1] (actually it is [d0, __time]) before postAggs are called
final Granularity timestampResultFieldGranularity
= query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
dimensionSpecs =
query.getDimensions()
.stream()
.filter(dimensionSpec -> !dimensionSpec.getOutputName().equals(timestampResultField))
.collect(Collectors.toList());
granularity = timestampResultFieldGranularity;
universalTimestamp = null;
int timestampResultFieldIndexInOriginalDimensions = query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, timestampResultFieldIndexInOriginalDimensions > 0);
Contributor

This query planning would be better done in DruidQuery. Can we move it there?

@kaijianding kaijianding (Contributor, Author) Jun 30, 2021

see the previous reply

@@ -285,6 +342,34 @@ public boolean doMergeResults(final GroupByQuery query)
}
}

private void fixResultRowWithTimestampResultField(
Contributor

This remapping should be done in the sql layer. See QueryMaker.remapFields().

Contributor Author

This is needed because the time_floor dimension can be referenced by the postAggs and the nested groupBy. This is not the QueryMaker.remapFields() case; QueryMaker.remapFields() maps the outermost groupBy result to the sql-level result.

Contributor

Hmm, I see that __time cannot be referenced by the post aggregators today, but what is the issue with the nested groupBy?

Contributor Author

The outer query may reference the inner groupBy query's time_floor dimension.

Contributor

Ah I see. It's not only about nested groupBys, but nested queries in general when those queries are created by the sql planner.

Contributor

Hmm, maybe moveOrReplicateTimestampInRow() is a better name.

.collect(Collectors.toList());
String timestampField = getContextValue(CTX_TIMESTAMP_RESULT_FIELD);
if (timestampField != null) {
dimensionNames.add(timestampField);
Contributor

I don't think groupBy query engine should know about the timestamp column remapping at all. It should be able to use the __time column as it is and the sql layer should handle the column remapping.

@kaijianding kaijianding (Contributor, Author) Jun 30, 2021

The timestampField may be referenced by the postAggs, so it should be added back to the dependencies of the postAggs here; otherwise the constructor will fail.

But I think postAggs should not be applied on compute nodes, so I will remove the postAggs in GroupByStrategyV2.newQuery and remove the timestampField code from GroupByQuery.

Granularity queryGranularity = null;

if (!grouping.getDimensions().isEmpty()) {
for (DimensionExpression dimensionExpression : grouping.getDimensions()) {
Contributor

Should we exclude the dimensionExpression from the dimensionSpecs passed to the constructor of GroupByQuery if it's a time_floor function? It seems redundant since we can now pass the right granularity.

Contributor Author

See the first reply.


return Sequences.map(
mergedResults,
row -> {
final ResultRow resultRow = ResultRow.create(query.getResultRowPostAggregatorStart());
Contributor

Suggested change
final ResultRow resultRow = ResultRow.create(query.getResultRowPostAggregatorStart());
final ResultRow resultRow = ResultRow.create(query.getResultRowSizeWithoutPostAggregators());

Contributor Author

fixed.

@jihoonson jihoonson (Contributor) left a comment

@kaijianding thanks for the explanation. The idea LGTM, but I think the plan optimization should be done in the sql planner. Please see my comments for more details.

@@ -220,6 +221,15 @@ public boolean isLimited()
sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
}

if (!sortingNeeded) {
Map<String, Object> timestampFieldContext = GroupByQueryHelper.findTimestampResultField(query);
Contributor

Why not find the index in the query context?

Contributor Author

The whole timestampResult optimization no longer affects the original query context; the modification to DruidQuery has been withdrawn, and currently only GroupByStrategyV2 is actually modified.

Comment on lines 228 to 255
if (hasTimestampResultField) {
// sql like "group by city_id,time_floor(__time to day)",
// the original translated query is granularity=all and dimensions:[d0, d1]
// the better plan is granularity=day and dimensions:[d0]
// but the ResultRow structure is changed from [d0, d1] to [__time, d0]
// this structure should be fixed as [d0, d1] (actually it is [d0, __time]) before postAggs are called
final Granularity timestampResultFieldGranularity
= (Granularity) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
dimensionSpecs =
query.getDimensions()
.stream()
.filter(dimensionSpec -> !dimensionSpec.getOutputName().equals(timestampResultField))
.collect(Collectors.toList());
granularity = timestampResultFieldGranularity;
// when timestampResultField is the last dimension, should set sortByDimsFirst=true,
// otherwise the downstream is sorted by row's timestamp first which makes the final ordering not as expected
timestampResultFieldIndex = (int) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
if (!query.getContextSortByDimsFirst() && timestampResultFieldIndex == query.getDimensions().size() - 1) {
context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, true);
}
// when timestampResultField is the first dimension and sortByDimsFirst=true,
// it is actually equals to sortByDimsFirst=false
if (query.getContextSortByDimsFirst() && timestampResultFieldIndex == 0) {
context.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
}
// when hasTimestampResultField=true and timestampResultField is neither first nor last dimension,
// the DefaultLimitSpec will always do the reordering
}
Contributor

The Druid native query is the execution plan of the sql query. To me, it makes more sense to compute the granularity, dimensions, sortByDimsFirst, etc., and set them in the groupBy query in the sql planner, because the query engine should process the given query plan as it is. This way, we have one single place that optimizes the query plan, which is easier to maintain and improve. Suppose we had multiple different places that modify the given query separately; it would be very hard to track which query will actually be executed.

@kaijianding kaijianding (Contributor, Author) Jul 2, 2021

I totally agree with you that it is better to do the planning in only one place.
But a great deal of related code would have to change to be compatible with query.getDimensions() having the time_floor dimension removed: subtotalsSpec, grouping sets, post-aggregators, having clauses, and many other things.
It is much easier to treat this optimization as a groupBy-internal improvement, like the fudgeTimestamp feature, keeping the sql layer unaffected; it is also much easier to understand.

Contributor

Hmm, my apologies if my comment was not clear. What I meant is, something similar to your original approach seems better to me because the sql planner makes all decisions and the groupBy engine does whatever it is told to do. More precisely, I'm thinking of something that DruidQuery.toGroupByQuery() determines the granularity and dimensions. The granularity seems possible to be directly passed to the constructor of GroupByQuery, but for the dimensions, we can pass grouping.getDimensionSpecs() because it makes the query planning easier as you mentioned. Instead, the sql planner passes timestampResultField via queryContext, so that the groupBy engine can adjust dimensions correctly before it starts query execution. This way, all decisions can be made in the sql planner, but groupBy engine just performs one query rewriting per the decision of the planner. I think this is better because 1) we will have only one brain that is responsible for query optimization and 2) the explain plan will return the same native query as what will be actually executed. fudgeTimestamp is rather similar to this approach because the groupBy engine doesn't do anything smart, but just does whatever it is told to do.
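To summarize the proposed split, here is a hedged sketch (invented helper names; the context keys are the CTX_TIMESTAMP_RESULT_FIELD* constants shown in the diff hunks above) of the planner-side hint and the engine-side rewrite:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;

class TimestampResultFieldSketch
{
  // Planner side (DruidQuery.toGroupByQuery): only record hints in the query context;
  // the query's granularity and dimensions stay as originally planned.
  static Map<String, Object> hintContext(String timestampResultField, Granularity granularity, int indexInDimensions)
  {
    final Map<String, Object> context = new HashMap<>();
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
    context.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, indexInDimensions);
    return context;
  }

  // Engine side (GroupByStrategyV2): drop the hinted dimension before execution;
  // the granularity from the hint takes over the time bucketing.
  static List<DimensionSpec> dimensionsWithoutTimestampField(List<DimensionSpec> dimensions, String timestampResultField)
  {
    return dimensions.stream()
                     .filter(d -> !d.getOutputName().equals(timestampResultField))
                     .collect(Collectors.toList());
  }
}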

@kaijianding kaijianding (Contributor, Author) Jul 5, 2021

OK, I have rolled back some of the code.
Currently, timestampResultField is passed via the query context in DruidQuery.toGroupByQuery.
The granularity=all and sortByDimsFirst are still unchanged in DruidQuery.toGroupByQuery, because some code (such as subtotals) relies on the granularity and sortByDimsFirst; in other words, the basic idea is still to treat this optimization as a groupBy-internal improvement.
Please check the latest version of the code.

Contributor Author

@jihoonson the code is ready, please continue the review.

Contributor

The granularity=all and sortByDimsFirst are still unchanged in DruidQuery.toGroupByQuery, because some code (such as subtotals) relies on the granularity and sortByDimsFirst; in other words, the basic idea is still to treat this optimization as a groupBy-internal improvement.

In general, I don't think this should be an engine-level optimization; it should be plan-level, because the job of the Druid planner is to make an optimal native query. Making this an engine-level optimization will introduce many problems, such as: 1) the explain result doesn't match the native query actually being executed, 2) the optimization logic has to be replicated for all strategies, an overhead that grows whenever we add a new strategy, 3) the optimization code requires readers to understand the relationship between the code piece in the sql planner and the other piece in the groupBy strategy, 4) having the optimization code spread across DruidQuery and GroupByStrategyV2 can tie them together, which can make 2) and 3) harder, and so on.

However, I think I understand why you want to do this. The fundamental problem here seems to be that the timestamp column name is fixed as __time and must be the first column for merge. Can you please add a comment in the code about the details of the tradeoffs behind this design decision? The comment should explain what the tradeoffs are, such as what code relies on what assumption, and what the benefits and potential problems of this implementation could be. This explanation should be documented in groupByStrategyV2 and DruidQuery.toGroupByQuery so that other people don't have to go through the same analysis that you did for this PR.

Contributor Author

Sure, will add more comments to explain this optimization and tradeoffs in both groupByStrategyV2 and DruidQuery.toGroupByQuery.

Contributor

Thank you for adding comments!

if (!timestampFieldContext.isEmpty()) {
int timestampResultFieldIndex = (int) timestampFieldContext.get(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX);
// change the sorting order when timestampResultField is neither first nor last dimension
sortingNeeded = timestampResultFieldIndex != 0 && timestampResultFieldIndex != query.getDimensions().size() - 1;
Contributor

Should this depend on query.getContextSortByDimsFirst()? Such as

        sortingNeeded = query.getContextSortByDimsFirst()
                        ? timestampResultFieldIndex != query.getDimensions().size() - 1
                        : timestampResultFieldIndex != 0;

Contributor Author

Yes, your code is easier to understand. I will fix it.

@kaijianding kaijianding (Contributor, Author)

CI has passed; the only failure, 32167.71 (Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests, is unrelated.

}
queryGranularity = granularity;
int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
theContext = new HashMap<>();
Contributor

Why does it clear all planner context?

Contributor Author

It is going to be passed to query.withOverriddenContext(theContext), so there is no need to repeat the other context entries from query.getContext().

Contributor

Thanks, but this is quite confusing. We should not overwrite variables unless we have to.

Contributor

This line is no longer needed.

@@ -1016,6 +1017,42 @@ public GroupByQuery toGroupByQuery()
grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
// in this case, the timestampResult optimization is not neccessary
Contributor

Why is the optimization unnecessary in these cases? Or did you mean we don't optimize for these cases yet?

Contributor Author

There are several reasons that make me think this optimization is unnecessary for this case.

When query.isApplyLimitPushDown() is true, it means the limit is calculated on the compute nodes first.

select floor(__time to hour),dim1 from a_table group by floor(__time to hour),dim1 limit 10
On a compute node, the number of rows produced with the limit when granularity=ALL is different from the sum of the rows produced with the limit in each hour.
The limit should be applied to the whole result set.

select dim1,floor(__time to hour),dim2 from a_table group by dim1,floor(__time to hour),dim2 limit 10
For this even more complicated case, if we replace granularity=ALL with granularity=HOUR, we have to modify the BufferGrouper logic to adjust the time_floor dimension value (and its position in the row) so that the result row looks the same as when granularity=ALL, and not only the value but also the ordering of rows after the limit is applied. I don't think it is worth it, for either the logic complexity or the performance gain.

Another reason: when the limit is pushed down to the compute nodes, I think the whole computation should already be fast enough, so the timestampResult optimization is not going to improve the total performance that much.

And the last reason: if we don't push down the limit to the compute nodes, apply this optimization, and do the limit on the broker, I guess the total performance would be degraded.

Generally, the timestampResult optimization logic happens on the broker, so I disable this optimization when query.isApplyLimitPushDown() is true.

Contributor

Hmm sorry, I don't quite understand. I'm not sure what the limit clause has to do with this optimization.

When query.isApplyLimitPushDown() is true, it means the limit is calculated on the compute nodes first.

Yes, when isApplyLimitPushDown is true, the limit is applied in both the data nodes and the broker. In data nodes, the limit is applied when per-segment results are merged. The broker applies the limit again when it merges per-data-node results. For example, let's say you issued a groupBy query with a limit of 10, each data node would return at most 10 rows and the broker would merge those per-data-node results and apply the limit again to finally return only 10 rows.

select floor(__time to hour),dim1 from a_table group by floor(__time to hour),dim1 limit 10
On a compute node, the number of rows produced with the limit when granularity=ALL is different from the sum of the rows produced with the limit in each hour.
The limit should be applied to the whole result set.

I don't think the number of rows should be different in those cases. This optimization not just modifies the granularity but also dimensions. When granularity is set to ALL, the dimensions should have the virtual column for the timefloor function. When granularity is set to HOUR, the dimensions should not have that virtual column. In these cases, the actual keys used for grouping are effectively the same because the ALL granularity groups all rows into one bucket. As a result, the number of rows must be the same in both cases.

select dim1,floor(__time to hour),dim2 from a_table group by dim1,floor(__time to hour),dim2 limit 10
For this even more complicated case, if we replace granularity=ALL with granularity=HOUR, we have to modify the BufferGrouper logic to adjust the time_floor dimension value (and its position in the row) so that the result row looks the same as when granularity=ALL, and not only the value but also the ordering of rows after the limit is applied. I don't think it is worth it, for either the logic complexity or the performance gain.

What logic should we modify? The optimization you added is modifying the native query that is passed all the way down to the data nodes which use the BufferGrouper. Why does pushing down the limit make any difference in the result signature of the data nodes?

Another reason: when the limit is pushed down to the compute nodes, I think the whole computation should already be fast enough, so the timestampResult optimization is not going to improve the total performance that much.

There is one performance issue with LimitedBufferHashGrouper that I'm aware of that makes groupBy with limitPushDown on even slower than the same query with limitPushDown off. But even if there were no known performance issue, query performance really differs case by case, so I wouldn't say that query processing with limitPushDown is fast enough in data nodes that we need no further optimization.

And the last reason: if we don't push down the limit to the compute nodes, apply this optimization, and do the limit on the broker, I guess the total performance would be degraded.

Hmm, I'm confused. Are you saying that we should push down the limit because the performance will be worse otherwise? If so, I agree.

Generally, the timestampResult optimization logic happens on the broker, so I disable this optimization when query.isApplyLimitPushDown() is true.

What do you mean by "the timestampResult optimization logic happens on broker"? The rewritten query will be passed down to the data nodes as far as I can tell.

Just for clarification, my question is not why we don't optimize this case now, but whether your comment here is correct.

@@ -7966,7 +7971,7 @@ public void testMultipleExactCountDistinctWithGroupingAndOtherAggregators() thro
ImmutableList.of("d0", "d1"),
ImmutableList.of("d0", "d2")
))
.setContext(QUERY_CONTEXT_DEFAULT)
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, P1D))
Contributor

Suggested change
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, P1D))
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, Granularities.DAY))

@@ -10075,7 +10080,7 @@ public void testGroupByFloorTimeAndOneOtherDimensionWithOrderBy() throws Excepti
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, P1Y))
Contributor

Suggested change
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, P1Y))
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d0", 0, Granularities.YEAR))

@@ -13337,7 +13342,7 @@ public void testGroupByTimeAndOtherDimension() throws Exception
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, P1M))
Contributor

Suggested change
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, P1M))
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))

@@ -13404,7 +13409,7 @@ public void testGroupingSets() throws Exception
ImmutableList.of()
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, P1M))
Contributor

Please fix here and other places too.

@@ -114,7 +120,9 @@ public void testCorrelatedSubquery(Map<String, Object> queryContext) throws Exce
"a0",
"a0:a"
)))
.setContext(queryContext)
.setContext(
withTimestampResultContext(queryContext, "d0", P1D)
Contributor

Suggested change
withTimestampResultContext(queryContext, "d0", P1D)
withTimestampResultContext(queryContext, "d0", Granularities.DAY)

Contributor

Please fix other places as well.


if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) {
return query;
}
Map<String, Object> theContext = plannerContext.getQueryContext();
Contributor

Suggested change
Map<String, Object> theContext = plannerContext.getQueryContext();
final Map<String, Object> theContext = new HashMap<>();

@kaijianding kaijianding (Contributor, Author) commented Jul 8, 2021

I don't think the number of rows should be different in those cases. This optimization not just modifies the granularity but also dimensions. When granularity is set to ALL, the dimensions should have the virtual column for the timefloor function. When granularity is set to HOUR, the dimensions should not have that virtual column. In these cases, the actual keys used for grouping are effectively the same because the ALL granularity groups all rows into one bucket. As a result, the number of rows must be the same in both cases.

select floor(__time to hour),dim1 from a_table group by floor(__time to hour),dim1 limit 10

When granularity=ALL, the cursors contain only 1 element, and only 10 results are produced on a compute node.
When granularity=HOUR, the cursors contain 24 elements for 1 day, and 10 results are produced per cursor on a compute node. That is 24 times the cost for 1 day; if the time interval is 1 year, it is 8,760 times.
This is the basic reason I think it's a bad idea to change the granularity from ALL to HOUR.

What logic should we modify? The optimization you added is modifying the native query that is passed all the way down to the data nodes which use the BufferGrouper. Why does pushing down the limit make any difference in the result signature of the data nodes?

select dim1,floor(__time to hour),dim2 from a_table group by dim1,floor(__time to hour),dim2 limit 10

When granularity=ALL, the keys in the grouper are dim1, time_to_hour_dim, dim2, with limit=10.
When granularity=HOUR, the keys in the grouper are dim1, dim2, with limit=10, and the overall orderBy result sequence (considering the time) becomes different from that when granularity=ALL.
Something is needed to make the order correct again on the compute node, like the final sorting on the broker.

And the last reason: if we don't push down the limit to the compute nodes, apply this optimization, and do the limit on the broker, I guess the total performance would be degraded.

Hmm, I'm confused. Are you saying that we should push down the limit because the performance will be worse otherwise? If so, I agree.

Yes, I mean the limit should be pushed down, otherwise the performance will be worse.

What do you mean by "the timestampResult optimization logic happens on broker"? The rewritten query will be passed down to the data nodes as far as I can tell.

I mean the main change is to make the code on the broker compatible with the fact that the format of the result rows from the compute nodes has changed, and to leave the groupBy processing code on the compute nodes unchanged.

Based on these reasons, I think the timestampResult optimization is not necessary when query.isApplyLimitPushDown() is true; it is much more efficient to keep the granularity at ALL in this case.

@jihoonson jihoonson (Contributor) left a comment

I don't think the number of rows should be different in those cases. This optimization not just modifies the granularity but also dimensions. When granularity is set to ALL, the dimensions should have the virtual column for the timefloor function. When granularity is set to HOUR, the dimensions should not have that virtual column. In these cases, the actual keys used for grouping are effectively the same because the ALL granularity groups all rows into one bucket. As a result, the number of rows must be the same in both cases.

select floor(__time to hour),dim1 from a_table group by floor(__time to hour),dim1 limit 10

When granularity=ALL, the cursors contain only 1 element, and only 10 results are produced on a compute node.
When granularity=HOUR, the cursors contain 24 elements for 1 day, and 10 results are produced per cursor on a compute node. That is 24 times the cost for 1 day; if the time interval is 1 year, it is 8,760 times.
This is the basic reason I think it's a bad idea to change the granularity from ALL to HOUR.

I don't think this is how things work. When a groupBy query is issued, the query is sent to data nodes. Each node processes a certain number of segments to compute per-segment results, and then merges those per-segment results before sending them back to the broker. When it computes a per-segment result, it uses the cursor to iterate the rows in each segment by bucketed interval. When granularity is ALL, there will be only one cursor. When granularity is HOUR, there could be at most 24 cursors for daily segments (the actual number depends on the timestamp data). However, the query granularity has nothing to do with the total number of rows that those cursors iterate; it is only used to adjust the timestamp in the result. No matter what granularity is in use, the total number of rows iterated by all cursors must be consistent for the same segment. These per-segment results are merged using ConcurrentGrouper. When the limit is pushed down, it is applied when merging per-segment results using LimitedBufferHashGrouper inside of ConcurrentGrouper. This is how it works when applyLimitPushDown = true and applyLimitPushDownToSegment = false (applyLimitPushDownToSegment is off by default because it has quite a big overhead to initialize the buffer per segment). So, when the limit is pushed down to data nodes, it does not save either the data scan cost or the per-segment computation cost in data nodes, but it saves merge cost in the broker.

What logic should we modify? The optimization you added is modifying the native query that is passed all the way down to the data nodes which use the BufferGrouper. Why does pushing down the limit make any difference in the result signature of the data nodes?

select dim1,floor(__time to hour),dim2 from a_table group by dim1,floor(__time to hour),dim2 limit 10

When granularity=ALL, the keys in the grouper are dim1, time_to_hour_dim, dim2, with limit=10.
When granularity=HOUR, the keys in the grouper are dim1, dim2, with limit=10, and the overall orderBy result sequence (considering the time) becomes different from that when granularity=ALL.
Something is needed to make the order correct again on the compute node, like the final sorting on the broker.

The groupBy query rewritten by your optimization is not only used by the broker. The same query is also sent to all data nodes participating in query processing. In groupBy v2, the timestamp can be used as a grouping key unless the granularity is ALL. So, when granularity is ALL, the grouping key will be something like dim1, time_to_hour_dim, dim2 as you said. However, when granularity is HOUR, the grouping key must be timestamp, dim1, dim2, not dim1, dim2, because your optimization changes both the granularity and the dimensions. As a result, the grouping keys must be effectively the same no matter whether your optimization is applied or not. The only difference is the order of keys in result rows, which is adjusted by your code. Limit push down does not make any difference in this ordering.

I think your optimization makes query processing faster because data nodes will compute the timestamp using the cursor instead of expensive expressions, doesn't it? If this is the case, I think it would be worth looking into how this optimization can be used when the limit is pushed down, because why not? I'm quite sure it can be done easily, but we need to think about how things will work when forceLimitPushDown is set and the query is ordered by metrics. This can be done in a separate PR.

I'm approving this PR, but please update the comment at https://github.com/apache/druid/pull/11379/files#diff-d6917493496ad8c60618ac81bd975e8c76f8f9f88580cd90970133738b55b411R1020 to something like "We don't apply the timestamp computation optimization yet when the limit is pushed down. Maybe someday."


@kaijianding kaijianding (Contributor, Author)

The comment has been updated.
Thanks very much for your review, @jihoonson!

@jihoonson jihoonson merged commit e39ff44 into apache:master Jul 11, 2021
@kaijianding kaijianding deleted the groupby_time branch July 11, 2021 17:28
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021