Force push down for nested queries #6410
As nested query execution is implemented today, the innermost query is executed on the historical nodes, while the outer queries are executed on the broker node. This gets problematic when the inner query groups on a high cardinality dimension, returning too many records for the broker node to handle. One of the options we have been internally testing and exploring is the capability to push the complete nested query down to the historical nodes. Each historical node then executes the nested query against the segments it owns for that query. Because the number of records returned by each historical node would potentially be much smaller in this case, the final merge and aggregation would be less intensive for the broker. The broker also won't need to perform any further dimension or segment level filtering, since that is taken care of on the historical nodes themselves.

Note that distributing the aggregation to the historical nodes this way doesn't always return the same results as performing the final aggregation on the broker node. However, there is a good set of cases (for example, aggregating on dimensions that are used for hashing during ingestion) where this kind of push down logic will return the right results. I can go into more detail, but the general idea is to leave the onus on the user to figure out whether their data layout allows for this kind of push down.
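For concreteness, here is a minimal sketch of the kind of nested groupBy this targets: the inner query groups on a hypothetical high cardinality dimension `user_id`, and the outer query simply counts the distinct values. The datasource, dimension names, and intervals are made up; the context flag anticipates the next paragraph and follows this PR's naming.

```json
{
  "queryType": "groupBy",
  "dataSource": {
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": "events",
      "granularity": "all",
      "intervals": ["2018-01-01/2018-02-01"],
      "dimensions": ["user_id"],
      "aggregations": [{ "type": "count", "name": "event_count" }]
    }
  },
  "granularity": "all",
  "intervals": ["2018-01-01/2018-02-01"],
  "dimensions": [],
  "aggregations": [{ "type": "count", "name": "num_users" }],
  "context": { "forcePushDownNestedQuery": true }
}
```

With push down forced, each historical evaluates both the inner and outer query over its own segments, and the broker only merges the per-node results.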
This implementation provides the user with a way of forcing nested query push down through a query context variable. The next cut will focus on doing this automatically under the following conditions (credit to @gianm for clearly articulating them):
1) The groupBy query granularity is equal to, or finer than, the segment granularity;
and either:
2a) A time chunk uses HashBasedNumberedShardSpec, partitionDimensions is nonempty, the grouping dimension set contains all of the shard partitionDimensions, and there are no "extension" partitions (partitions with partitionNum >= partitions, which are created by ingest tasks that append data); a matching ingestion spec is sketched after this list;
or:
2b) A time chunk uses SingleDimensionShardSpec and the grouping dimension set contains the shard dimension.
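As an illustration of condition 2a, a Hadoop batch ingestion tuningConfig fragment along these lines (a sketch only; the dimension name is hypothetical) produces time chunks with HashBasedNumberedShardSpec partitioned on `user_id`, so a nested query whose inner grouping includes `user_id` would qualify for push down:

```json
{
  "tuningConfig": {
    "type": "hadoop",
    "partitionsSpec": {
      "type": "hashed",
      "targetPartitionSize": 5000000,
      "partitionDimensions": ["user_id"]
    }
  }
}
```

Condition 2b is the analogous situation for data range-partitioned on a single dimension (SingleDimensionShardSpec).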
If Druid detects this, it should push down the query automatically for that time chunk. There will be situations where the query can be pushed down for some time chunks but not others (for example: a data pipeline that loads data unpartitioned in realtime, and later runs a reindexing job to partition historical data by some useful dimension). In this case, ideally the broker should be capable of pushing down the query for the time chunks where it can be correctly pushed down, and not pushing it down for the others.
TODO: support "subtotalsSpec" for pushed down nested queries.
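For reference, subtotalsSpec is groupBy's grouping-sets feature: a list of dimension subsets to aggregate by in one query. A sketch of the shape that would need to work under push down (datasource and dimension names hypothetical):

```json
{
  "queryType": "groupBy",
  "dataSource": "events",
  "granularity": "all",
  "intervals": ["2018-01-01/2018-02-01"],
  "dimensions": ["country", "device"],
  "aggregations": [{ "type": "count", "name": "rows" }],
  "subtotalsSpec": [["country", "device"], ["country"], []]
}
```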