
[multistage] Introduce InMemoryMailboxService for Optimizing Join Performance #9484

Merged: walterddr merged 12 commits into apache:master from ankitsultana:join-opt-2 on Oct 19, 2022

Conversation

@ankitsultana (Contributor) commented Sep 28, 2022

This does the following:

  1. MailboxService now uses TransferableBlock; the MailboxContent serde is moved into GrpcMailbox.
  2. Adds a new InMemoryMailboxService. If the stage is local, then the TransferableBlock can be passed from sender to receiver simply using a queue.
  3. Adds a MultiplexingMailboxService which can choose either gRPC based mailbox-service or in-memory mailbox-service.

Optimizations:

  1. It avoids the MailboxContent serde if the sender/receiver stages are local. This can save ~10% latency overhead (depending on the query).
  2. Saves memory. If we use gRPC mailbox then we'll copy the buffer to create MailboxContent.
  3. [Not part of this PR] This will help avoid List<Object[]> _rows ==> DataTable ==> List<Object[]> _rows serde.

These optimizations are most helpful when the stages deal with large data (Amdahl's law, basically)
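The routing decision described above can be sketched roughly as follows. This is a minimal, hypothetical sketch, not Pinot's actual API: the interface, class names, queue capacity, and the "host:port|stage" mailbox-id convention are all invented for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the multiplexing idea: hand a block over an in-process queue when
// sender and receiver live in the same JVM, else fall back to the gRPC path.
public class MultiplexingSketch {
  public interface MailboxService<T> {
    void send(String mailboxId, T block);
  }

  public static class InMemoryMailboxService<T> implements MailboxService<T> {
    private final BlockingQueue<T> _queue = new ArrayBlockingQueue<>(16);

    @Override
    public void send(String mailboxId, T block) {
      _queue.offer(block); // no serde: the object reference is handed over directly
    }

    public T poll() {
      return _queue.poll();
    }
  }

  public static class MultiplexingMailboxService<T> implements MailboxService<T> {
    private final MailboxService<T> _inMemory;
    private final MailboxService<T> _grpc;
    private final String _localHostPort;

    public MultiplexingMailboxService(MailboxService<T> inMemory, MailboxService<T> grpc,
        String localHostPort) {
      _inMemory = inMemory;
      _grpc = grpc;
      _localHostPort = localHostPort;
    }

    @Override
    public void send(String mailboxId, T block) {
      // Assumption: the mailbox id encodes the receiver's host:port, e.g. "host:port|stage".
      String receiverHostPort = mailboxId.split("\\|")[0];
      if (_localHostPort.equals(receiverHostPort)) {
        _inMemory.send(mailboxId, block); // local stage: skip the MailboxContent serde entirely
      } else {
        _grpc.send(mailboxId, block);
      }
    }
  }
}
```

The point of the wrapper is that operators never need to know which transport they got: there is a single decision at send time and everything downstream is unchanged.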

cc: @walterddr @siddharthteotia

Contributor:

this is a GOOD IDEA!

Contributor:

+1 very clean!

@ankitsultana ankitsultana changed the title [Draft] [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance Oct 11, 2022
@ankitsultana ankitsultana marked this pull request as ready for review October 11, 2022 06:16
@walterddr left a comment:

i did a partial review. overall the approach looks good. there are some details we need to iron out and clear up

also please rebase

Comment on lines 74 to +80
  if (waitForInitialize()) {
    mailboxContent = _contentStreamObserver.poll();
    _totalMsgReceived.incrementAndGet();
  } else {
    return null;
  }
- return mailboxContent;
+ return fromMailboxContent(mailboxContent);
Suggested change:

    if (!waitForInitialize()) {
      return null;
    }
    mailboxContent = _contentStreamObserver.poll();
    _totalMsgReceived.incrementAndGet();
    return fromMailboxContent(mailboxContent);

@walterddr left a comment:

looks good to me overall
i have one suggestion on the wrapper around in-memory mailbox.

I will give another look once we address these. thank you for the contribution!

@ankitsultana (Author):

@walterddr : Removed InMemoryChannel. I agree using a Queue directly is better. I am using isEndOfStreamBlock to determine when the receiving mailbox is done receiving now. Can you take a look again?

codecov-commenter commented Oct 17, 2022

Codecov Report

Merging #9484 (e345e4b) into master (209c060) will increase coverage by 0.02%.
The diff coverage is 88.07%.

@@             Coverage Diff              @@
##             master    #9484      +/-   ##
============================================
+ Coverage     60.33%   60.35%   +0.02%     
- Complexity     5148     5161      +13     
============================================
  Files          1926     1931       +5     
  Lines        103209   103450     +241     
  Branches      15670    15691      +21     
============================================
+ Hits          62266    62442     +176     
- Misses        36271    36321      +50     
- Partials       4672     4687      +15     
Flag Coverage Δ
integration2 24.56% <0.00%> (+0.03%) ⬆️
unittests1 67.27% <88.88%> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...requesthandler/MultiStageBrokerRequestHandler.java 0.00% <0.00%> (ø)
...e/pinot/query/mailbox/StringMailboxIdentifier.java 76.66% <0.00%> (-2.65%) ⬇️
...ot/query/runtime/executor/PhysicalPlanVisitor.java 97.29% <ø> (ø)
...ot/query/runtime/executor/WorkerQueryExecutor.java 100.00% <ø> (ø)
...rg/apache/pinot/query/service/QueryDispatcher.java 81.00% <ø> (-3.00%) ⬇️
...he/pinot/query/mailbox/InMemorySendingMailbox.java 58.33% <58.33%> (ø)
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 90.90% <75.00%> (-5.10%) ⬇️
.../pinot/query/mailbox/InMemoryReceivingMailbox.java 78.57% <78.57%> (ø)
...ache/pinot/query/mailbox/GrpcReceivingMailbox.java 93.10% <92.30%> (-2.14%) ⬇️
...apache/pinot/query/mailbox/GrpcMailboxService.java 94.11% <100.00%> (ø)
... and 101 more


@agavra left a comment:

Nice! The solution is really elegant here :) My comments are mostly around concurrency and making sure the non-happy path works properly.

Comment on lines +36 to +41
private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
new ConcurrentHashMap<>();
Contributor:

it's usually safer to wrap ReceivingMailbox, SendingMailbox and ArrayBlockingQueue all in a single MailboxState class, and then maintain just a single Map<String, MailboxState> that sets all three of these at once in a single computeIfAbsent call.

I know it's unlikely that there will be a race, but it's easier to reason about when it's guaranteed there won't be!
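For illustration, the suggested shape could look roughly like this. Everything here is hypothetical: `MailboxState`, the field names, the queue capacity, and the `Object` placeholders standing in for the real sending/receiving mailbox classes are all invented for the sketch.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the suggested refactor: bundle the three per-mailbox objects into
// one holder so a single computeIfAbsent creates them all atomically.
public class MailboxStateSketch<T> {
  public static class MailboxState<T> {
    public final BlockingQueue<T> _channel = new ArrayBlockingQueue<>(5);
    // Placeholders for the real sending/receiving mailbox objects.
    public final Object _sendingMailbox = new Object();
    public final Object _receivingMailbox = new Object();
  }

  private final Map<String, MailboxState<T>> _mailboxes = new ConcurrentHashMap<>();

  // All three objects are created together under the map's per-key lock, so a
  // concurrent sender and receiver can never observe a half-initialized mailbox.
  public MailboxState<T> getOrCreate(String mailboxId) {
    return _mailboxes.computeIfAbsent(mailboxId, id -> new MailboxState<>());
  }
}
```

With three separate maps, a sender and receiver racing on the same mailbox id could each create one of the three objects; a single `computeIfAbsent` makes that interleaving impossible by construction.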

Contributor:

i think this was my original split in the grpc mailbox (it was necessary there because the channels and mailboxes are managed separately)
but in this case all 3 are guaranteed a 1-1 mapping b/c they are all in the same JVM

I am ok with both approaches since

  • grouping them makes for cleaner code: we can precisely manage them together because that is a special property of the in-mem mailbox
  • separating them makes logical sense as separation of concerns (e.g. the sender normally won't have access to the receiving-side info; this just happens to be the case for the in-mem mailbox)

}

@Override
public void shutdown() {
Contributor:

when do the maps get cleaned up? (esp. regarding a cancelled query or one that times out)

@ankitsultana (Author):

This is a known leak. @walterddr is planning to fix it with query cancellation afaik

@walterddr:

yeah we had a discussion and i created a tracker but never got to it.

but upon further thought, although the effect is not much (just a simple object reference), it would be a big problem when stuff errors out; see my comment in the new issue: #9626


@Override
public boolean isClosed() {
return _closed && _queue.size() == 0;
Contributor:

why do we check that the _queue.size() == 0? if I cancel the query or it times out and I close the mailbox I shouldn't be forced to process the queue

Contributor:

in GRPC isClosed() <=> isCompleted(); i think the wording might be misleading in this case

@ankitsultana (Author):

Ideally there shouldn't be any case where _closed is true but _queue.size() is not 0. The only case where it may happen is when the sender sends something immediately after sending the EOS block. We could also throw an exception here in that case.

To better understand this, this is how I believe some of the scenarios will be handled:

  1. If the sender has died, it would have sent an end of stream block and then terminated on its end. The receiver here would then get the end of stream block and send it upstream to MailboxReceiveOperator. The queue should be empty after _closed is marked true.
  2. If for some reason the sender was not able to send the EOS block, MailboxReceiveOperator will time out after the query timeout is reached.
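The receiving-side behavior being discussed can be sketched like this (hypothetical names; a `String` sentinel stands in for the EOS TransferableBlock). The EOS block flips the closed flag, so `isClosed()` additionally checking the queue only guards against blocks that were queued before the EOS and not yet drained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of an in-memory receiving mailbox: the thread draining the queue is
// the one that observes the EOS block and sets the closed flag.
public class InMemoryReceivingSketch {
  public static final String EOS = "<end-of-stream>"; // stand-in for the EOS block

  private final BlockingQueue<String> _queue = new ArrayBlockingQueue<>(5);
  private volatile boolean _closed = false;

  public void offer(String block) {
    _queue.offer(block);
  }

  public String receive() {
    String block = _queue.poll();
    if (EOS.equals(block)) {
      _closed = true; // set by the consuming thread upon seeing the EOS block
    }
    return block;
  }

  public boolean isClosed() {
    return _closed && _queue.isEmpty();
  }
}
```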

Contributor:

upon checking the usage of the interface method isClosed(): it is actually used in the non-threadsafe MailboxReceiveOperator.

}

@Override
public void complete() {
Contributor:

do we want to send a sentinel block here to make sure that the receiving side will terminate (instead of waiting for DEFAULT_CHANNEL_TIMEOUT_SECONDS)? or is there any other mechanism to make sure that in the non-happy path the receiver won't need to wait the whole timeout?

@walterddr (Oct 18, 2022):

it is guaranteed that the last message is the EOS message, which serves the purpose of marking it "closed/completed".

this is probably also the reason why the closed flag wasn't made volatile: the thread that processes the EOS content is also the one that sets the flag (not the other way around)
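The sending side of that contract can be sketched as well (hypothetical names, not the PR's code): `complete()` needs no separate sentinel because the caller's last `send()` is the EOS block, and dequeuing that block is what flips the receiver's closed flag.

```java
import java.util.concurrent.BlockingQueue;

// Sketch of an in-memory sending mailbox: data blocks are offered in order,
// with the EOS block sent last; completion is implied by the EOS block itself.
public class InMemorySendingSketch {
  public static final String EOS = "<end-of-stream>"; // stand-in for the EOS block

  private final BlockingQueue<String> _queue;

  public InMemorySendingSketch(BlockingQueue<String> queue) {
    _queue = queue; // shared with the receiving mailbox in the same JVM
  }

  public void send(String block) {
    _queue.offer(block); // data blocks first, then finally the EOS block
  }

  public void complete() {
    // No-op: the EOS block already delivered via send() marks completion.
  }
}
```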

Contributor:

it is guaranteed that the last message is the EOS

is that true even in the error scenarios? I was worried about cases like timeout, where we'd want to timeout quickly instead of blocking for 120s

@ankitsultana (Author):

The 120s timeout was definitely a bug. Reduced it to 1s. I have mentioned some timeout scenario handling in the other comment regarding _queue.size() == 0.

@walterddr (Oct 19, 2022):

is that true even in the error scenarios? I was worried about cases like timeout, where we'd want to timeout quickly instead of blocking for 120s

  • if the error is sent from the sender, then yes, that will be the last message;
  • if the error occurs after a normal message was delivered, then the error will originate from wherever it was thrown, and from that point the originating operator will stop returning any new data blocks

so yes, it is guaranteed at the operator level; but it is not guaranteed at the mailbox level, meaning the mailbox could still be receiving data afterwards, just no thread will dequeue from that receiving buffer. exactly the scenario i described in #9626

The 120s timeout was definitely a bug. Reduced it to 1s. I have mentioned some timeout scenario handling in the other comment regarding _queue.size() == 0.

bear in mind that the 100ms timeout for GrpcReceivingMailbox is only the timeout upon initialization; from that point there's no poll-with-timeout, it is a push model managed by gRPC

Contributor:

+1 very clean!

@walterddr left a comment:

lgtm overall, thank you!

there are several remaining items but I think we can follow up on them later
