MSQ: Add limitHint to global-sort shuffles. #16911
This allows pushing down limits into the SuperSorter.
LakshSingla
left a comment
WDYT of pushing this limit hint into the InputChannelsImpl#openSorted (perhaps in a future patch)? This will also reduce the amount of data a single worker would have to read per partition.
} else if (rowLimit == 0 && activeProcessors == 0) {
  // We had a row limit, and got it all the way down to zero.
  // Generate empty output channels for any partitions that we haven't written yet.
  superSorterProgressTracker.markTriviallyComplete();
If there was a row limit initially and we brought it down to zero, should it be called trivially completed? Trivially completed means that there wasn't any data to begin with.
Oh, yeah, you're right. I changed this to instead call addMergedBatchesForLevel to "fill in" the empty channels.
Can similar optimisation be applied for ScanQueryKit as well?
Good point. I just added it. It's only there for the case where the scan query requires a sort. In theory it could be there for the non-sort case too, but in this patch only the sorting shuffle spec has a limit hint. I think we could add limitHint to other specs in the future.
I do think that'd make sense as a followup.
 *
 * Implementations may also ignore this hint completely, or may apply a limit that is somewhat higher than this hint.
 */
default long limitHint()
I was wondering if there's any merit in separating the limit hint into a POJO, and having both limit and offset baked into it. Something like DefaultLimitSpec without the columns. It's up to the reader to calculate the combined limit and use that, instead of baking that knowledge into a long.
I was thinking of use cases when we want to pushdown this limit into other portions of MSQ's stack and want to distinguish between rows [0..offset) (thrown away) and rows [offset, offset + limit) (kept)
I don't think offset can be pushed down? Limit can be pushed down when sorting because if some row is in the top N globally, it must also be in the first N rows of whichever partition it appears in.
But with offset, if for example you have LIMIT N OFFSET M, we can't push down OFFSET M (i.e. skip M rows in each partition). Some of the first M rows in partition A may still need to appear in the final result set, because they may sort after all of the first M rows of the globally-sorted result. So the best we can do is push down LIMIT N + M.
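The argument above can be checked with a small sketch. This is a hypothetical illustration, not Druid code: the class and method names are made up, and partitions are modeled as plain lists of integers sorted in ascending order. It compares pushing down LIMIT N + M (correct) with skipping M rows in each partition (incorrect).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of limit pushdown; names are not from Druid.
final class LimitPushdownSketch
{
  // Sort one partition and keep only its first `limitHint` rows.
  static List<Integer> sortAndTruncate(List<Integer> partition, long limitHint)
  {
    List<Integer> sorted = new ArrayList<>(partition);
    Collections.sort(sorted);
    int keep = (int) Math.min(limitHint, sorted.size());
    return new ArrayList<>(sorted.subList(0, keep));
  }

  // Correct: push down LIMIT (limit + offset), then apply the offset after the global merge.
  static List<Integer> limitOffset(List<List<Integer>> partitions, int limit, int offset)
  {
    long limitHint = (long) limit + offset;
    List<Integer> merged = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      merged.addAll(sortAndTruncate(partition, limitHint));
    }
    Collections.sort(merged);
    int from = Math.min(offset, merged.size());
    int to = (int) Math.min((long) offset + limit, merged.size());
    return new ArrayList<>(merged.subList(from, to));
  }

  // Incorrect: skip `offset` rows inside every partition before merging.
  static List<Integer> skipPerPartition(List<List<Integer>> partitions, int limit, int offset)
  {
    List<Integer> merged = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      List<Integer> sorted = sortAndTruncate(partition, (long) limit + offset);
      merged.addAll(sorted.subList(Math.min(offset, sorted.size()), sorted.size()));
    }
    Collections.sort(merged);
    return new ArrayList<>(merged.subList(0, Math.min(limit, merged.size())));
  }
}
```

With partitions [7, 1, 5, 3] and [8, 2, 6, 4] and LIMIT 2 OFFSET 2, the globally sorted rows are 1 through 8, so the expected result is [3, 4]. `limitOffset` returns that, while `skipPerPartition` drops 1, 3 and 2, 4 inside the partitions and returns [5, 6].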
For my understanding, how does this work?
It's this API: https://fasterxml.github.io/jackson-annotations/javadoc/2.9/com/fasterxml/jackson/annotation/JsonInclude.Include.html#CUSTOM
Value that indicates that separate filter Object (specified by JsonInclude.valueFilter() for value itself, and/or JsonInclude.contentFilter() for contents of structured types) is to be used for determining inclusion criteria. Filter object's equals() method is called with value to serialize; if it returns true value is excluded (that is, filtered out); if false value is included.
It's kind of goofy, but it's the only tool Jackson provides us for keeping the serialized JSON clean other than "include non-null", "include non-default", and "include non-empty".
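To make the equals() trick concrete, here is a minimal plain-Java sketch of the mechanism. It does not use Jackson itself: `CustomIncludeSketch.serialize` is a hypothetical stand-in for what a serializer does with Include.CUSTOM, and the filter class name and the -1 "unlimited" sentinel are assumptions for illustration. With real Jackson, the filter class would be referenced from the property's annotation, as noted in the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical filter. With Jackson it would be attached to a property roughly as:
//   @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitHintFilterSketch.class)
// The filter is "equal" to any value that should be left out of the JSON.
final class LimitHintFilterSketch
{
  static final long UNLIMITED = -1; // assumed sentinel for "no limit"

  @Override
  public boolean equals(Object value)
  {
    return value instanceof Long && ((Long) value).longValue() == UNLIMITED;
  }

  @Override
  public int hashCode()
  {
    return 0;
  }
}

// Stand-in for the serializer side: ask the filter whether to exclude the value.
final class CustomIncludeSketch
{
  static Map<String, Object> serialize(Map<String, Object> properties, String filteredProperty, Object filter)
  {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : properties.entrySet()) {
      boolean excluded = entry.getKey().equals(filteredProperty) && filter.equals(entry.getValue());
      if (!excluded) {
        out.put(entry.getKey(), entry.getValue());
      }
    }
    return out;
  }
}
```

In this sketch a property holding the sentinel value is omitted from the output map, while any other value is written as usual, which is how the serialized form stays clean when no limit is set.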
Logical merge conflict between apache#16911 and apache#16914.
* MSQ: Add limitHint to global-sort shuffles. This allows pushing down limits into the SuperSorter.
* Test fixes.
* Add limitSpec to ScanQueryKit. Fix SuperSorter tracking.
Previously, the processor used "remainingChannels" to track the number of non-null entries of "currentFrames". Now, "remainingChannels" tracks the number of channels that are unfinished. The difference is subtle. In the previous code, when an input channel was blocked upon exiting nextFrame(), the "currentFrames" entry would be null, and therefore the "remainingChannels" variable would be decremented. After the next await and call to populateCurrentFramesAndTournamentTree(), "remainingChannels" would be incremented if the channel had become unblocked after awaiting. This means that finished(), which returned true if remainingChannels was zero, would not be reliable if called between nextFrame() and the next await + populateCurrentFramesAndTournamentTree(). This patch changes things such that finished() is always reliable. This fixes a regression introduced in PR apache#16911, which added a call to finished() that was, at that time, unsafe.
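The difference between the two counting schemes can be sketched with a simplified model. This is a hypothetical illustration and not the actual processor: the class, its methods, and the two counters are invented names, and frames are modeled as opaque objects.

```java
// Simplified, hypothetical model of the bookkeeping described above; not Druid code.
final class ChannelTrackingSketch
{
  private final Object[] currentFrames;    // null entry = no frame currently loaded for that channel
  private final boolean[] channelDone;     // true once a channel has delivered all of its frames
  private int nonNullFrames = 0;           // what the old counter effectively tracked
  private int unfinishedChannels;          // what the new counter tracks

  ChannelTrackingSketch(int numChannels)
  {
    currentFrames = new Object[numChannels];
    channelDone = new boolean[numChannels];
    unfinishedChannels = numChannels;
  }

  // A frame became available for this channel (after an await).
  void frameLoaded(int channel, Object frame)
  {
    if (currentFrames[channel] == null) {
      nonNullFrames++;
    }
    currentFrames[channel] = frame;
  }

  // The current frame was fully read; the channel is blocked but may still produce more frames.
  void frameConsumed(int channel)
  {
    if (currentFrames[channel] != null) {
      currentFrames[channel] = null;
      nonNullFrames--;
    }
  }

  // The channel has no more frames at all.
  void channelFinished(int channel)
  {
    frameConsumed(channel);
    if (!channelDone[channel]) {
      channelDone[channel] = true;
      unfinishedChannels--;
    }
  }

  boolean finishedOld()
  {
    return nonNullFrames == 0;       // transiently true while a channel is merely blocked
  }

  boolean finishedNew()
  {
    return unfinishedChannels == 0;  // true only once every channel is really done
  }
}
```

In this model, calling `frameConsumed` on the only channel makes `finishedOld()` return true even though more frames may still arrive, while `finishedNew()` stays false until `channelFinished` is called.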
…17194) Co-authored-by: Gian Merlino <gianmerlino@gmail.com>