"Partition boost" the group by queries in MSQ for better splits#15474
LakshSingla merged 3 commits into apache:master
| final GroupByQuery query | ||
| ) | ||
| { | ||
| List<VirtualColumn> virtualColumns = new ArrayList<>(); |
I removed the conditionals altogether, so the code is now straightforward.
cryptoe left a comment:
Let's add some test cases. We do not want the boosted column as part of the segment.
| if (frameWriter.addSelection()) { | ||
| if (partitionBoostVirtualColumn != null) { | ||
| partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1); |
Will this cause the compare function on L161 to behave weirdly?
The compare function won't behave weirdly because:
- The boosted column is only part of the final written frame. The outputRow doesn't contain the boosted column.
- The compare function is derived from the group by query, so it compares the dims and the __time column (if needed). Since __boost isn't among them, the column is ignored altogether.
This can be confirmed by the COUNT(*) test testInsertOnExternalDataSource, which aggregates the rows with the correct result.
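The point made in this reply can be illustrated with a minimal sketch. It is not the actual GroupByPostShuffleFrameProcessor code; the class name, the row layout, and the merge method below are hypothetical. Rows are compared on the dimension only, and the boost counter is attached only when a merged row is written out, so it cannot influence the grouping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Druid code: merge pre-sorted partial aggregates.
final class PostShuffleMergeSketch
{
  // Input rows are {dim (String), partialCount (Long)}, sorted by dim.
  // Written rows are {dim, totalCount, boost}.
  static List<Object[]> merge(List<Object[]> sortedRows)
  {
    List<Object[]> written = new ArrayList<>();
    long boost = 0;          // only ever attached to written rows
    String currentDim = null;
    long currentCount = 0;

    for (Object[] row : sortedRows) {
      String dim = (String) row[0];
      long count = (Long) row[1];
      // The comparison looks at the dimension only; the boost value is not consulted.
      if (currentDim != null && currentDim.equals(dim)) {
        currentCount += count;
      } else {
        if (currentDim != null) {
          written.add(new Object[]{currentDim, currentCount, boost++});
        }
        currentDim = dim;
        currentCount = count;
      }
    }
    if (currentDim != null) {
      written.add(new Object[]{currentDim, currentCount, boost++});
    }
    return written;
  }
}
```

In this sketch the aggregated counts are the same whether or not the third field is written, which mirrors the argument that the boost column does not change aggregation results.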
...e/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
| resultClusterByWithPartitionBoostColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); | ||
| ClusterBy resultClusterByWithPartitionBoost = new ClusterBy( | ||
| resultClusterByWithPartitionBoostColumns, | ||
| resultClusterByWithoutPartitionBoost.getBucketByCount() |
I think lines 153-167 can be extracted into another method called createRowSignature.
Also, the if on line 159 should be the first branch in the control flow. Something like:
sigX = null;
if (boosted) {
  sigX = foo;
} else {
  sigX = bar;
}
Refactored into a separate function and cleaned up the main code flow.
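As a rough illustration of the shape suggested in this thread, the sketch below decides once, up front, whether the boost column is appended to the signature and to the clustering key. It uses plain lists of column names in place of Druid's RowSignature and ClusterBy types; the class and method bodies are assumptions for illustration, not the refactored code from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch using column-name lists instead of Druid's own types.
final class BoostedSignatureSketch
{
  static final String BOOST_COLUMN = "__boost";

  // Build the stage's output signature; the boost column goes last when enabled.
  static List<String> createRowSignature(List<String> resultColumns, boolean boosted)
  {
    List<String> signature = new ArrayList<>(resultColumns);
    if (boosted) {
      signature.add(BOOST_COLUMN);
    }
    return signature;
  }

  // Build the clustering key; the boost column is appended after the user's keys.
  static List<String> createClusterBy(List<String> clusterByColumns, boolean boosted)
  {
    List<String> clusterBy = new ArrayList<>(clusterByColumns);
    if (boosted) {
      clusterBy.add(BOOST_COLUMN);
    }
    return clusterBy;
  }
}
```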
It is the controller's job to remove the boosted column. Also, the pre-existing MSQInsertTests assert on the expected rows and the row signature of the segment, and the boosted column doesn't show up there.

I have also removed the conditional in the GroupByFrameProcessor and its factory. Depending on the signature passed by the query kit, the processor either adds or ignores the partition-boosted column: if the signature has the __boost column in it, the processor picks the column from the selector factory; otherwise it ignores the virtual column altogether. There is no need for an additional conditional in the processor or the factory.
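The signature-driven behaviour described here can be sketched as follows. The map of suppliers stands in for a column selector factory and the list of names stands in for the row signature; both are simplifications, and none of the names are Druid APIs. The writer only asks for columns that appear in the signature, so a registered __boost selector is simply never consulted when the signature omits it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: the signature alone decides which columns get written.
final class SignatureDrivenWriterSketch
{
  static final String BOOST_COLUMN = "__boost";

  // Write one row by reading, in order, every column named in the signature.
  static List<Object> writeRow(List<String> signature, Map<String, Supplier<Object>> selectors)
  {
    List<Object> row = new ArrayList<>();
    for (String column : signature) {
      // Columns absent from the signature are never looked up.
      row.add(selectors.get(column).get());
    }
    return row;
  }
}
```

With this shape, the writer itself needs no boosted or non-boosted branch; passing a signature with or without the boost column is enough.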
Thanks for the review @cryptoe

Description
In MSQ, each partition corresponds to a single segment (if it is the final stage) or to the unit of work assigned to a worker. If the clustering key has low cardinality, the resulting partitions can be too large. For certain stages, we can break up the resulting partitions by appending a synthetic clustering column "__boost" to the end of the clustering keys. The column should contain a unique value per row (MSQ's implementation uses incrementally increasing longs), so that the keys become unique and partition boundaries can be placed between any two rows. This is already done in ScanQueryKit, and can also be done in the GroupByPostShuffleFrameProcessor.
This PR introduces this partition boosting to the group by queries as well.
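To make the effect concrete, here is a minimal sketch of range partitioning over sorted keys, with and without a boost value. It is not MSQ's partitioning code; the class, the greedy splitting rule, and the targetRows parameter are assumptions made for illustration. A boundary can only be placed where the key changes, so a low-cardinality key without boost cannot be split, while the same rows with a unique boost value can.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not MSQ code: greedy range partitioning of sorted rows.
final class PartitionBoostSketch
{
  // Incrementally increasing longs, one per row.
  static long[] boostValues(int rowCount)
  {
    long[] boost = new long[rowCount];
    for (int i = 0; i < rowCount; i++) {
      boost[i] = i;
    }
    return boost;
  }

  // Returns the size of each partition. Pass boost == null for "no boost column".
  static List<Integer> partitionSizes(List<String> sortedValues, long[] boost, int targetRows)
  {
    List<Integer> sizes = new ArrayList<>();
    int current = 0;
    for (int i = 0; i < sortedValues.size(); i++) {
      current++;
      boolean last = i == sortedValues.size() - 1;
      // Rows with identical keys must stay in the same partition.
      boolean keyChanges = !last
          && (!sortedValues.get(i).equals(sortedValues.get(i + 1))
              || (boost != null && boost[i] != boost[i + 1]));
      if (last || (current >= targetRows && keyChanges)) {
        sizes.add(current);
        current = 0;
      }
    }
    return sizes;
  }
}
```

For six rows that all share one clustering value and a target of two rows per partition, the sketch yields a single partition of six rows without boost, and three partitions of two rows each when boostValues(6) is passed.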
For example, queries of the form
can run into large segments if "aggregated" is a very low cardinality column and (dim1, dim2) is a high cardinality pair. Before this patch, the workaround was to add more dims to the CLUSTERED BY clause to make the key more unique. After this patch, that is no longer required.
ControllerImpl already handles partition-boosted columns specially.
NOTE: Group by queries can fail during a cluster upgrade if some workers are on an older version while others are on a newer version.