Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistage] Implement ordering for SortExchange #10408

Merged
merged 1 commit into from Mar 16, 2023

Conversation

somandal
Copy link
Contributor

@somandal somandal commented Mar 10, 2023

This PR adds support for ordering on the collation keys when a SortExchange is present in the relational tree. To do this the following changes have been made:

  • Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange. This is needed because later LogicalSortExchange is broken into a MailboxSendNode and MailboxReceiveNode. Information on where to perform ordering (sender, receiver or both) needs to be passed down to these nodes and this is not possible via the LogicalSortExchange class alone.
    • We had discussed the potential to make Exchange hintable, but based on the hint semantics it should only be applied at the table or SELECT level which doesn't apply to Exchange nodes. The alternative was to extend the SortExchange with our own implementation.
  • Add support for ordering based on collation keys in the MailboxReceiveOperator when PinotLogicalSortExchange is used.
    • MailboxSendOperator will be modified later to add sort support. Once the sender can sort, the receiver needs to be modified to do a k-way merge instead of PriorityQueue based sort if the input is sorted.
  • Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit

We will also integrate with SqlHint support to decide where to sort (sender, receiver, or both) once the path forward has been figured out on how to get hint support for some of the Apache Calcite nodes such as Window, Sort, Filter, etc. SqlHint support has not been added to this PR.

This PR is a pre-requisite to adding support for Window Functions with ORDER BY.

cc @walterddr @siddharthteotia @vvivekiyer @ankitsultana

@codecov-commenter
Copy link

codecov-commenter commented Mar 10, 2023

Codecov Report

Merging #10408 (15272ec) into master (0b111a1) will decrease coverage by 10.86%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master   #10408       +/-   ##
=============================================
- Coverage     35.19%   24.33%   -10.86%     
+ Complexity      282       49      -233     
=============================================
  Files          2053     2029       -24     
  Lines        111397   110704      -693     
  Branches      16939    16863       -76     
=============================================
- Hits          39203    26939    -12264     
- Misses        68781    80946    +12165     
+ Partials       3413     2819      -594     
Flag Coverage Δ
integration1 24.33% <0.00%> (-0.15%) ⬇️
integration2 ?
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ache/pinot/query/planner/logical/StagePlanner.java 0.00% <0.00%> (ø)
.../pinot/query/planner/stage/MailboxReceiveNode.java 0.00% <0.00%> (ø)
...che/pinot/query/planner/stage/MailboxSendNode.java 0.00% <0.00%> (ø)
...va/org/apache/pinot/query/runtime/QueryRunner.java 0.00% <0.00%> (ø)
...query/runtime/operator/MailboxReceiveOperator.java 0.00% <0.00%> (ø)
...ot/query/runtime/operator/MailboxSendOperator.java 0.00% <0.00%> (ø)
...che/pinot/query/runtime/operator/SortOperator.java 0.00% <0.00%> (ø)
.../pinot/query/runtime/operator/utils/SortUtils.java 0.00% <0.00%> (ø)
.../pinot/query/runtime/plan/PhysicalPlanVisitor.java 0.00% <0.00%> (ø)
.../pinot/query/service/dispatch/QueryDispatcher.java 0.00% <0.00%> (ø)

... and 507 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@somandal somandal changed the title [multistage] Implement ordering for LogicalSortExchange [multistage] Implement ordering for SortExchange Mar 10, 2023
@@ -43,11 +57,25 @@ public MailboxReceiveNode(int stageId) {

public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isInputSorted, boolean sortOnReceiver,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a related PR by @KKcorps where we are switching to a operator context instead of individual args which may help simplify this. https://github.com/apache/pinot/pull/10413/files#diff-7ccb4a6b83288d4c1e1547c71a602c36dfbc90a8a283e9835c6d867cc2eeb2f3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 but i dont think this PR would be related since the refactoring only deal with visitor invariant, shared info across the stage such as partition, stageID, etc.

but i would anticipate a rebase issue. thank you for calling it out @ankitsultana

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this will only help with future shared info. The fields I've added as part of this PR aren't shared.

super(stageId, dataSchema);
_receiverStageId = receiverStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
if (!CollectionUtils.isEmpty(fieldCollations)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a TODO or Preconditions check to indicate that the sorting is not supported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the design approach look good to me overall. will take a look at the implementation details next. thank you for the contribution @somandal

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me mostly.

i have some concerns regarding how the mailbox (1) handles received block; (2) what should be returned when normal block or error block is being received. please kindly take a look

Comment on lines 196 to 216
if (!block.isEndOfStreamBlock()) {
return block;
if (_priorityQueue != null) {
List<Object[]> container = block.getContainer();
_priorityQueue.addAll(container);
} else {
return block;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's an issue here with this approach. I am not sure if we should return a no-op block here or continue to grab more. is it possible to box this into a util. say.

Suggested change
if (!block.isEndOfStreamBlock()) {
return block;
if (_priorityQueue != null) {
List<Object[]> container = block.getContainer();
_priorityQueue.addAll(container);
} else {
return block;
}
if (!block.isEndOfStreamBlock()) {
return processReceivedBlock(block); // box the logic in processReceivedBlock(block)

Copy link
Contributor Author

@somandal somandal Mar 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the code here does return a no-op block (after exiting out of the loop, i.e processing all the mailboxes in the _sendingMailbox list). Are you recommending instead to return a no-op block for each mailbox? (at least it seems so from the recommended change)

I've created the util as recommended and it explicitly returns a no-op block after adding all the rows to the PriorityQueue if the data needs to be ordered for that given mailbox.

Any reason why we should not keep processing all mailboxes and instead return a no-op for each mailbox in the _sendingMailbox list as recommended?

Comment on lines +217 to +241
if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
&& (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
// Some data is present in the PriorityQueue, these need to be sent upstream
LinkedList<Object[]> rows = new LinkedList<>();
while (_priorityQueue.size() > 0) {
Object[] row = _priorityQueue.poll();
rows.addFirst(row);
}
_isSortedBlockConstructed = true;
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on this logic. as long as there's a _priorityQueue and size is not zero you should return the priority queue results.
when any mailbox receives an error. it should also dropped any _priorityQueue content and stopped the consumption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I've modified the code to drop the contents of the _priorityQueue on receiving an error block. Keeping this block as is. Let me know if you intended for a different change here.

@somandal
Copy link
Contributor Author

i have some concerns regarding how the mailbox (1) handles received block; (2) what should be returned when normal block or error block is being received. please kindly take a look

Hey @walterddr can you check if I addressed your comments in this area correctly? We can discuss this some more if required.

Basically made changes to:

  1. Clear the _priorityQueue when an error block is received / returned.
  2. Always return a no-op when a block is received and the contents are added to the _priorityQueue. Earlier I was waiting for all the mailboxes to get processed in the loop before returning a no-op, now I return a no-op immediately for each mailbox.

@walterddr
Copy link
Contributor

walterddr commented Mar 15, 2023

i have some concerns regarding how the mailbox (1) handles received block; (2) what should be returned when normal block or error block is being received. please kindly take a look

Hey @walterddr can you check if I addressed your comments in this area correctly? We can discuss this some more if required.

Basically made changes to:

  1. Clear the _priorityQueue when an error block is received / returned.
  2. Always return a no-op when a block is received and the contents are added to the _priorityQueue. Earlier I was waiting for all the mailboxes to get processed in the loop before returning a no-op, now I return a no-op immediately for each mailbox.

Yeah when I thought about it more last night I think we shouldn't return a no-op block to every mailbox received.
The problem I have was to achieve 2 goals:

  1. collaborative multi-threading. when some mailboxes (or all mailboxes) are empty, they should give up the thread-worker, so another query stage can run.

    • This is achieved via the return of the no-op block.
    • Some operator doesn't follow this. for example, AggregateOperator will hold onto the thread-worker until all inbound blocks are consumed and created an indexed map.
    • this can be problematic as when multiple stages are running on the same server, it could deadlock awaiting inbound messages; but the inbound stage cannot be scheduled due to thread starvation. (this will not occur as long as mailbox guarantee to return no-op block collaboratively)
  2. fairness when grabbing from all mailboxes.

    • this is more related to the K-loser algorithm we will implement when the sender side is sorting. this way we will maintain some sort of state internally in the sender/receive operator.
    • however, we will actually return some blocks as long as those blocks contain data smaller (or larger when ORDER DESC) than any top row from all mailbox blocks.

for now the implementation looks good. i will create an issue to systematically address both points above

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

…e as it will be sender and receiver aware

Add support for ordering based on collation keys in the MailboxReceiveOperator when PinotLogicalSortExchange is used.
MailboxSendOperator will be modified later to add sort support.
Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
@somandal somandal requested a review from walterddr March 16, 2023 19:45
@walterddr walterddr merged commit bc9aa75 into apache:master Mar 16, 2023
11 checks passed
@somandal somandal deleted the logicalsortexchange branch March 16, 2023 21:32
walterddr pushed a commit that referenced this pull request Apr 7, 2023
…versions (#10570)

* Split MailboxReceiveOperator into sorted and non-sorted versions
  - sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes.
  - non-sorted mailbox receiver will return immediately when data arrives (behavior identical to prior to #10408 
* Fix tests - ORDER BY with only limit should set isSortOnReceiver to false
  - when there's fetch/offset in SortNode but no Collation, skip sorting thus speed up performance
abhioncbr pushed a commit to abhioncbr/pinot that referenced this pull request Apr 14, 2023
…versions (apache#10570)

* Split MailboxReceiveOperator into sorted and non-sorted versions
  - sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes.
  - non-sorted mailbox receiver will return immediately when data arrives (behavior identical to prior to apache#10408 
* Fix tests - ORDER BY with only limit should set isSortOnReceiver to false
  - when there's fetch/offset in SortNode but no Collation, skip sorting thus speed up performance
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants