
[multistage][POC] Move mailbox instance from global map to request context #9836

Closed
61yao wants to merge 6 commits into apache:master from 61yao:control_flow

Conversation

61yao (Contributor) commented Nov 20, 2022

This PR moves the sending mailbox map from the global map (MailboxService) to the per-request context (PlanRequestContext).

The old design has two issues:

  1. Memory leak: once a request finishes, its sending mailbox no longer needs to stay alive, yet it remains in the global map (the same is true for the receiving mailbox).
  2. High contention on the concurrent hash map.

The new implementation moves the mailbox instance and the exchange into the per-request context (a minimal sketch follows the list of follow-ups below).

  1. This fixes the memory leak and the contention issue of the old design.
  2. This opens the door to better resource cleanup and error handling in general.

Follow-up PRs would:

  1. Properly close the mailbox channel and propagate the error when one occurs.
  2. Clean up the same leak for the receiving mailbox.
  3. Introduce a better abstraction for the request context (it should be a runtime context or an OpChain context).
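
To illustrate the direction (this is a simplified sketch, not the actual diff; the class bodies and method names here are assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: the request context owns its sending mailboxes, so they
// die with the request instead of lingering in a server-wide map.
class SendingMailbox implements AutoCloseable {
  private final String _mailboxId;

  SendingMailbox(String mailboxId) {
    _mailboxId = mailboxId;
  }

  void send(Object block) {
    // write the block to the per-request channel for _mailboxId
  }

  @Override
  public void close() {
    // release the per-request stream
  }
}

class PlanRequestContext implements AutoCloseable {
  // Owned by a single request: no global ConcurrentHashMap lookup, no leak.
  private final Map<String, SendingMailbox> _sendingMailboxes = new HashMap<>();

  SendingMailbox getOrCreateSendingMailbox(String mailboxId) {
    return _sendingMailboxes.computeIfAbsent(mailboxId, SendingMailbox::new);
  }

  @Override
  public void close() {
    // Single exit point: everything the request opened is released here,
    // whether the request succeeded, failed, or timed out.
    _sendingMailboxes.values().forEach(SendingMailbox::close);
    _sendingMailboxes.clear();
  }
}
```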

61yao (Contributor, Author) commented Nov 20, 2022

I haven't figured out a good way to test this. Will follow up with a test.

agavra (Contributor) commented Nov 21, 2022

Thanks @61yao! I think you've identified the right problem to solve, but I think the abstractions in this PR aren't what we need.

From a code ownership perspective, I don't think it makes sense for an operator chain to "own" the underlying physical resources it uses to send/receive data. That'll make it much more difficult in the future to share those resources, and it makes management of those resources happen via various callbacks/asynchronous actions - that in turn makes it harder to debug and harder to reason about.

Instead, I think the current abstraction is pretty good (there's a centralized MailboxService that maintains the mailboxes, and there's only one of them for the lifetime of a Pinot server). The problem you've identified is that we need some way to let that centralized service clean up resources that are no longer needed. There might be various triggers for that:

  1. an operator finished (either successfully or with an error), as you've identified
  2. some kind of timeout
  3. some kind of admin operation that forces it to clean up those mailboxes

The design here (decentralizing the mailbox ownership) would make (2) and (3) much more difficult, and it leaks information into places that don't need that information.

As an aside, I don't think contention for the concurrent hashmap is a problem - so long as there isn't key contention (which there should almost never be) it will perform extremely fast (and access to it is almost certainly not a bottleneck compared to all the other things a query needs to do).
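
To make the centralized shape concrete, a rough sketch (all names here are illustrative, not Pinot's actual API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: one MailboxService per server, with an explicit
// cleanup entry point for each trigger listed above.
class MailboxService {
  interface Mailbox extends AutoCloseable {
    @Override
    void close(); // narrowed so callers need no try/catch
  }

  private final ConcurrentMap<String, Mailbox> _mailboxes = new ConcurrentHashMap<>();

  Mailbox getOrCreate(String mailboxId) {
    return _mailboxes.computeIfAbsent(mailboxId, id -> new Mailbox() {
      @Override
      public void close() {
        // release channel resources for this mailbox
      }
    });
  }

  // Trigger 1: an operator chain finished, successfully or with an error.
  void releaseForRequest(String requestIdPrefix) {
    _mailboxes.entrySet().removeIf(e -> {
      if (e.getKey().startsWith(requestIdPrefix)) {
        e.getValue().close();
        return true;
      }
      return false;
    });
  }

  // Trigger 3: an admin operation forcing a full cleanup. Trigger 2, a
  // timeout sweep, would walk the entries the same way using timestamps.
  void releaseAll() {
    _mailboxes.values().forEach(Mailbox::close);
    _mailboxes.clear();
  }
}
```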

61yao (Contributor, Author) commented Nov 21, 2022

> Thanks @61yao! I think you've identified the right problem to solve, but I think the abstractions in this PR aren't what we need. […]

I agree centralized resources should live in a central place, but resources created per request should be cleaned up per request instead of leaving that to centralized request management.

For example, a shared channel between servers is genuinely shared and should be managed globally. However, a streaming channel or instance opened per request should be cleaned up per request rather than left in a central map, especially the sending mailbox.

I agree it should not live inside the OpChain, but I haven't found a good place to hold the request context yet.

It doesn't make sense to put a per-request resource in a central place and clean it up later; that makes it much easier to leak.

Ideally, timeout and error should go through the same exit point where we clean up per-request resources.

The actual physical resource, say the data block, can live globally, but the mailbox instance and the streaming channel should still be managed per request, matching their purpose.

61yao marked this pull request as draft on November 21, 2022 at 22:20
61yao (Contributor, Author) commented Nov 21, 2022

Discussed with @agavra offline. We will put more thought into this and figure out the right next step.

61yao changed the title from "[multistage][bugfix] Move sending mailbox instance from global map to request context" to "[multistage][POC] Move mailbox instance from global map to request context" on Nov 22, 2022
61yao (Contributor, Author) commented Nov 22, 2022

I took another look at the fairness scheduling. This PR has nothing to do with that, because it only deals with the sending mailbox, not the receiving one. I agree the receiving side needs more thought.

We want different connections for different requests for isolation; having one single connection between servers doesn't seem like a good idea. If one request crashes or hits an error on the channel, all following requests will fail.

agavra (Contributor) left a review

After digesting this for a bit, I actually really like the approach. I think adding a close to Operator makes a lot of sense, and I think it'll integrate well with the scheduler with only a little added work.

Mostly minor comments; the only major one is that I'd prefer we avoid exposing BlockExchange (see the two inline comments). Perhaps we can add Operator#close(@Nullable Throwable e) and allow the MailboxSendOperator to send an error message if it's closed with an error?
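
A sketch of what that suggestion could look like (a hypothetical shape based on the comment above, not code from the PR; @Nullable is javax.annotation's JSR-305 annotation):

```java
import javax.annotation.Nullable;

// Sketch of the suggested shape: only MailboxSendOperator touches the
// exchange; everyone else just calls close(error) on the root operator.
interface BlockExchange {
  void send(Object block); // stand-in for send(TransferableBlock)

  void close();
}

interface Operator<T> {
  T nextBlock();

  // A non-null error means the chain failed; the default is a no-op.
  default void close(@Nullable Throwable error) {
  }
}

class MailboxSendOperator implements Operator<Object> {
  private final BlockExchange _exchange;

  MailboxSendOperator(BlockExchange exchange) {
    _exchange = exchange;
  }

  @Override
  public Object nextBlock() {
    return null; // pull from the child operator and send via _exchange
  }

  @Override
  public void close(@Nullable Throwable error) {
    if (error != null) {
      _exchange.send(error); // stand-in for an error TransferableBlock
    }
    _exchange.close();
  }
}
```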

}

@Override
public void close() {
agavra:

should we clear the queue here?

Comment on lines +133 to +134
if(!_isCompleted.get()){
_isCompleted.set(true);
agavra:

there's a race condition here, it should be if (!isCompleted.compareAndSet(false, true))
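
For reference, a minimal illustration of that fix, assuming _isCompleted is an AtomicBoolean:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CompletionGuard {
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);

  // get() followed by set(true) is two separate steps, so two threads can
  // both observe false and both run the completion logic. compareAndSet
  // makes the check-and-update atomic: exactly one caller wins.
  void complete() {
    if (!_isCompleted.compareAndSet(false, true)) {
      return; // another thread already completed
    }
    // one-time completion logic goes here
  }
}
```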


@Override
public void onCompleted() {
finishLatch.countDown();
agavra:

nit: maybe we move this and _isCompleted.set(true) to shutdown so both onError and onCompleted have the same behavior.

register(operatorChain);
} else {
LOGGER.info("Execution time: " + timer.getThreadTimeNs());
operatorChain.getRoot().close();
agavra:

nit: let's keep both log statements (though looks like I forgot to change this one to debug!)

}
} catch (Exception e) {
LOGGER.error("Failed to execute query!", e);
operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));
agavra:

this breaks some abstraction boundaries - this scheduler service should know nothing about the exchange or sending blocks; instead we should consider adding this to MailboxSendOperator (which is always the root operator for these chains). FWIW, I think that's already the case.

@Override
-  public void runJob() {
+  public void runJob()
+      throws InterruptedException {
agavra:

note: this should never throw as the worker pool threads will just die and we'll be left with a dangling worker pool (these issues are really tough to debug). Instead let's catch any exceptions and handle them
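
A sketch of that defensive pattern (a hypothetical worker job; the queue is a stand-in for whatever blocking call runJob actually makes):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch: catch everything inside the job so an escaping
// exception never kills the worker-pool thread.
class OpChainJob implements Runnable {
  private final BlockingQueue<Runnable> _work = new LinkedBlockingQueue<>();

  @Override
  public void run() {
    try {
      Runnable task = _work.take(); // blocking call that may be interrupted
      task.run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      // mark the op chain as failed and exit cleanly
    } catch (Exception e) {
      // route the error into the op chain's error handling instead of
      // letting the thread die and silently shrink the pool
    }
  }
}
```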

public OpChain(Operator<TransferableBlock> root) {
_root = root;
// TODO: refactor this into OpChainContext
public PlanRequestContext _context;
agavra:

nit: private final

protected final int _port;
protected final Map<Integer, StageMetadata> _metadataMap;
// TODO: Add exchange map if multiple exchanges are needed.
BlockExchange _exchange;
agavra:

I think it breaks some abstraction barriers to allow any piece of code that has access to the PlanRequestContext to exchange blocks via a BlockExchange. Only the MailboxSendOperator should be able to send blocks IMO - otherwise it can be difficult to debug the ordering of events that are sent.

new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},

// Hybrid table
// new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
agavra:

(reminder) I know this is a draft, but let's make sure these pass and uncomment them (or delete them if we don't want them anymore)

}

@Override
public void close()
agavra:

(suggestion) maybe this is the default implementation in BaseOperator? (will make the review a bit easier)

61yao (Contributor, Author) commented Dec 9, 2022

This PR has too many merge conflicts now. I'll just write a new one.

61yao closed this on Dec 9, 2022