MSQ worker: Support in-memory shuffles. #16790

Merged: 13 commits into apache:master on Jul 31, 2024

Conversation

gianm (Contributor) commented Jul 24, 2024

This patch is a follow-up to #16168, adding worker-side support for in-memory shuffles. Changes include:

  1. Worker-side code now respects the same context parameter "maxConcurrentStages"
     that was added to the controller in #16168 (MSQ controller: Support in-memory
     shuffles; towards JVM reuse). The parameter remains undocumented for now, to
     give us a chance to more fully develop and test this functionality. A sketch
     of its use appears after this list.

  2. WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener
    to improve readability.

  3. WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which
    abstracts over memory-based or file-based stage results.

  4. RunWorkOrder is updated to create in-memory stage output channels when
    instructed to.

  5. ControllerResource is updated to add /doneReadingInput/, so the controller
    can tell when workers that sort, but do not gather statistics, are done reading
    their inputs.

  6. WorkerMemoryParameters is updated to consider maxConcurrentStages.

Additionally, WorkerChatHandler is split into WorkerResource, so as to match ControllerChatHandler and ControllerResource.
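For illustration, a hypothetical sketch of how the parameter is supplied (the parameter is undocumented, and everything here other than the name "maxConcurrentStages" is an assumption):

    // Hypothetical sketch: allow up to two concurrent stages per worker by setting
    // the (currently undocumented) "maxConcurrentStages" MSQ query context parameter.
    final Map<String, Object> queryContext = ImmutableMap.of("maxConcurrentStages", 2);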

The github-actions bot added the "Area - Batch Ingestion" and "Area - MSQ" (for multi stage queries: https://github.com/apache/druid/issues/12262) labels on Jul 24, 2024.
*/
void postWorkerError(
-    String workerId,
+    @Nullable String queryId,
Contributor:

Looking at the places where this is called, we pass it the worker's task id. Should we keep the parameter name as is? Also, I was taking a look at the controller chat handler and we don't really use the worker's task id afaict. Maybe we can remove the param altogether, though that would introduce backward compatibility issues.
Also, can it be nullable given that we pass this as a URL param?

gianm (Contributor, Author):

Ah, I don't actually need to change this from workerId to queryId right now. This was related to some other stuff I was tinkering with, about supporting multiple queries running on the same worker id, but it isn't important for this PR. I'll revert it.

Fwiw, the server side (ControllerResource) does not read this parameter, so it currently doesn't matter what it is. That's probably why I didn't notice the inconsistency when testing this PR.


-  List<String> getTaskList() throws IOException;
+  List<String> getWorkerIds() throws IOException;
Contributor:

nit: let's add javadoc here, even though it wasn't present in the original code.

gianm (Contributor, Author):

Added javadoc.

/**
* Reader for {@link ReadableFrameChannel}.
*
* Because this reader returns an underlying channel directly, it must only be used when it is certain that
Contributor:

If this reader can only be used by a single consumer, why is there a requirement for synchronization on the methods? Also, we can probably guard the shared state with @GuardedBy.

gianm (Contributor, Author):

There's a single consumer, but the consumer and producer threads might be different, so synchronization is still needed between consumer and producer.

I added @GuardedBy annotations; good call.
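A minimal sketch of the locking pattern under discussion, with illustrative names (not the actual reader class) and assuming errorprone-style @GuardedBy:

    import com.google.errorprone.annotations.concurrent.GuardedBy;

    class SingleConsumerReader
    {
      @GuardedBy("this")
      private long bytesAvailable = 0;

      // Called from the producer thread when more data becomes available.
      synchronized void onDataAdded(final long numBytes)
      {
        bytesAvailable += numBytes;
      }

      // Called from the single consumer thread. Synchronization is still required,
      // because the producer and consumer may be different threads.
      synchronized long getBytesAvailable()
      {
        return bytesAvailable;
      }
    }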


public class ByteChunksInputStreamTest
{
private final List<byte[]> chunks = ImmutableList.of(
Contributor:

Should we add some empty arrays in the middle and end as well?

gianm (Contributor, Author):

There can't be empty arrays in production, because chunks.add(bytes) is only called if the length is greater than zero. I added some defensive checks to ByteChunksInputStream to verify the chunks are all non-empty.
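A minimal sketch of that defensive check (the constructor shape is assumed):

    ByteChunksInputStream(final List<byte[]> chunks)
    {
      for (final byte[] chunk : chunks) {
        if (chunk.length == 0) {
          // Production code never emits empty chunks; reject them up front.
          throw new IllegalArgumentException("Empty chunk not allowed");
        }
      }
      this.chunks = chunks;
    }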

Contributor:

Curious whether we should separate the state transitions from the actual code. The file itself is fairly easy to understand, so I am fine either way.

gianm (Contributor, Author):

Personally I think it's ok as-is, since the logic is simple.

Comment on lines 89 to 90
channelFuture.cancel(true);
channelFuture.addListener(
Contributor:

Quick question: why are we attaching a listener after canceling the future? Is it a defensive check for closing the channel if the cancel didn't register "in time"? Also, given that this is happening on a single thread, would we ever encounter a case where the listener could actually get triggered?

gianm (Contributor, Author):

Yeah, it's in case of a race where channelFuture resolved after populateChannel() but before the call to cancel. In that case the call to cancel would have no effect, and the listener would fire to clean up the channel.

I don't think this needs to be a listener, really, it could just be in-line. I edited the code to make this logic in-line, and added a comment.
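A minimal sketch of the in-lined version (illustrative; assumes channelFuture resolves to a closeable channel):

    channelFuture.cancel(true);

    if (channelFuture.isDone() && !channelFuture.isCancelled()) {
      // Race: the future resolved after populateChannel() but before cancel(), so
      // cancel() had no effect. Clean up the channel here, since no consumer will.
      try {
        channelFuture.get().close();
      }
      catch (Exception e) {
        // Best-effort cleanup; ignore.
      }
    }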

Contributor:

nice refactor!

LakshSingla (Contributor) commented Jul 29, 2024:

nit: Since a lot of fields like inputSliceReader are populated after initialization, we could use @MonotonicNonNull for readability and verification

gianm (Contributor, Author):

I added @MonotonicNonNull to various fields.
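For reference, a sketch of the annotation on one such field (the import path is assumed to be the checkerframework one):

    import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

    // Null until initialization completes; once set, never reverts to null.
    @MonotonicNonNull
    private InputSliceReader inputSliceReader;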

Comment on lines +28 to +29
* Listener for various things that may happen during execution of {@link RunWorkOrder#start()}. Listener methods are
* fired in processing threads, so they must be thread-safe, and it is important that they run quickly.
Contributor:

Isn't all of the MSQ work done in the "processing" threads? Does processing here mean the main thread that forms the MSQ's event loop?

gianm (Contributor, Author):

It's just the regular processing threads. The point in the javadocs is mainly that there is not a separate thread pool that runs these things, and if they take too long, they will block processing.

Contributor:

qq: Why is bundleCount = numWorkersInJvm + numProcessingThreadsInJvm, and not just equal to numProcessingThreadsInJvm?

LakshSingla (Contributor) commented Jul 29, 2024:

Since we use maxConcurrentStages in the memory parameters, I wonder whether it should be the maxConcurrentStages that the user passes (as in the present code), or the maximum number of stages that can actually run concurrently given the query structure. For example, if every stage in the query sorts, the maximum number of concurrent stages is 1; but if the user passes 10, we would allocate less memory per stage even though it isn't required.

Also, since this is not a user-facing change, I am debating whether we should suggest reducing maxConcurrentStages in the NotEnoughMemoryFault. We should perhaps mention it in the fault's message if maxConcurrentStages > 1.

gianm (Contributor, Author):

> qq: Why is bundleCount = numWorkersInJvm + numProcessingThreadsInJvm, and not just equal to numProcessingThreadsInJvm?

Added a comment about this:

    // One bundle per worker + one per processor. The worker bundles are used for sorting (SuperSorter) and the
    // processing bundles are used for reading input and doing per-partition processing.
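In code form, the computation under discussion amounts to this sketch (variable names assumed from the thread):

    // One bundle per worker (used for sorting via SuperSorter) plus one per
    // processing thread (used for reading input and per-partition processing).
    final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;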

gianm (Contributor, Author):

> Since we use maxConcurrentStages in the memory parameters, I wonder whether it should be the maxConcurrentStages that the user passes (as in the present code), or the maximum number of stages that can actually run concurrently given the query structure.

That would be a useful optimization for a future patch, imo.

gianm (Contributor, Author):

> Also, since this is not a user-facing change, I am debating whether we should suggest reducing maxConcurrentStages in the NotEnoughMemoryFault. We should perhaps mention it in the fault's message if maxConcurrentStages > 1.

I added a mention of reducing maxConcurrentStages if it's greater than 1.

*
* @see org.apache.druid.msq.exec.WorkerImpl#readChannel(StageId, int, long)
*/
ListenableFuture<InputStream> readRemotelyFrom(long offset);
Contributor:

Should it return ReadableFrameChannel instead, so that callers have a unified interface to call?

Contributor:

afaict both readLocally() and readRemotelyFrom() read from the local disk and send the data to the caller. From the names, I had assumed that readLocally() reads from the local channels while readRemotelyFrom() reads from somewhere outside the local disk.

As such, both are ways of exposing the same stream, just in different forms. If my understanding is correct, should we rename the methods to reflect that?

Contributor:

The javadoc should state what happens if two calls are made, at t1: readRemotelyFrom(offset1) and at t2: readRemotelyFrom(offset2), where t1 < t2 and offset1 > offset2. From the implementation of the channel reader, it would throw an error.

gianm (Contributor, Author):

> Should it return ReadableFrameChannel instead, so that callers have a unified interface to call?

The methods have different returns because they are meant for different uses: the first is for reading remotely (so it must return some bytes that can be sent over HTTP) and the second is for reading locally (so it should return a channel object). For this reason the names also make sense (to me 😄). If you have suggestions for better names LMK, otherwise I can keep them as-is.

I've added additional javadocs to StageOutputReader and also its main implementations (ChannelStageOutputReader and FileStageOutputReader) to clarify the design. Please LMK if they do help in understanding.

> The javadoc should state what happens if two calls are made, at t1: readRemotelyFrom(offset1) and at t2: readRemotelyFrom(offset2), where t1 < t2 and offset1 > offset2. From the implementation of the channel reader, it would throw an error.

I have added this:

   * It is implementation-dependent whether calls to this method must have monotonically increasing offsets.
   * In particular, {@link ChannelStageOutputReader} requires monotonically increasing offsets, but other
   * implementations do not.

I've also added details to the javadocs for ChannelStageOutputReader and FileStageOutputReader.
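Putting the two read paths side by side, a simplified sketch of the interface as described in this thread (the actual StageOutputReader carries fuller javadoc, and whether it extends Closeable is an assumption here):

    import com.google.common.util.concurrent.ListenableFuture;
    import java.io.Closeable;
    import java.io.InputStream;
    import org.apache.druid.frame.channel.ReadableFrameChannel;

    public interface StageOutputReader extends Closeable
    {
      // Remote path: stage-output bytes starting at the given offset, suitable
      // for streaming to another server over HTTP.
      ListenableFuture<InputStream> readRemotelyFrom(long offset);

      // Local path: the stage output exposed directly as a frame channel.
      ReadableFrameChannel readLocally();
    }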

}

@Override
public synchronized void close()
LakshSingla (Contributor) commented Jul 29, 2024:

This should set state = CLOSE

gianm (Contributor, Author):

Good point. Added this.

@Override
public synchronized void close()
{
// Call channel.close() unless readLocally() has been called. In that case, we expect the caller to close it.
Contributor:

Can we unify this logic, i.e., have both the local and the remote readers call close() on the reader?

gianm (Contributor, Author):

I thought about this, and looked a bit into refactoring it to accomplish this, but I think it actually does make sense the way it is. The idea is consistently that callers should own the closing of the InputStream and ReadableFrameChannel that they get from the readRemotelyFrom and readLocally methods. Refactoring away from that would make the code here more complex, and also I think would make reasoning about ownership of the channels and streams more complex.

For the ChannelStageOutputReader, the readLocally method effectively transfers ownership of the channel to the caller, which is why it makes the ChannelStageOutputReader's own close method do nothing.

I've added javadocs to StageOutputReader and ChannelStageOutputReader clarifying this.
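A minimal sketch of that ownership handoff (state names and structure are assumed, simplified from the actual ChannelStageOutputReader):

    import java.io.Closeable;
    import org.apache.druid.frame.channel.ReadableFrameChannel;

    class OwnershipSketchReader implements Closeable
    {
      private enum State { INIT, LOCAL, CLOSED }

      private final ReadableFrameChannel channel;
      private State state = State.INIT;

      OwnershipSketchReader(final ReadableFrameChannel channel)
      {
        this.channel = channel;
      }

      synchronized ReadableFrameChannel readLocally()
      {
        if (state != State.INIT) {
          throw new IllegalStateException("readLocally() may only be called once");
        }
        // Ownership transfers to the caller, who is now responsible for closing.
        state = State.LOCAL;
        return channel;
      }

      @Override
      public synchronized void close()
      {
        // Close the channel only if ownership was never handed off via readLocally().
        if (state == State.INIT) {
          channel.close();
        }
        state = State.CLOSED;
      }
    }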

LakshSingla (Contributor) commented Jul 29, 2024:

Suggestion regarding the naming of the class: "output" is an overloaded term in MSQ. For example, outputChannel = readableChannel + writableChannel.
In the context of this class, StageOutputReader only encapsulates the readable portion. Should we remove "output" from the name to avoid confusion? Something like StageResultsReader and StageResultsHolder.

gianm (Contributor, Author):

Ah, the OutputChannel class naming has come up before as being confusing. IMO, if we're going to change the name of something, it should be that class. When it has a writable + readable channel pair, it's really more like a "channel pair" or a "pipe". Maybe it'd make sense to change that to OutputPipe in a follow-up, and have the readOnly() method return List<ReadableFrameChannel>?

At any rate, I don't think we need to let the funny naming of that class affect how we name this one.

*
* @see WorkerOrLocalInputChannelFactory#openChannel(StageId, int, int)
*/
ReadableFrameChannel readLocally();
Contributor:

It should mention that this method can be called only once, while readRemotelyFrom can be called multiple times with varying offsets.

gianm (Contributor, Author):

I have added this:

   * It is implementation-dependent whether this method can be called multiple times. In particular,
   * {@link ChannelStageOutputReader#readLocally()} can only be called one time, but the implementations in
   * {@link FileStageOutputReader} and {@link NilStageOutputReader} can be called multiple times.

}

@Override
public void close() throws IOException
Contributor:

Should this method also handle closing the created randomAccessFile? ChannelStageOutputReader has a weird contract in which readLocally() will close the channel directly, while the remote one would (presumably) call close() on the OutputReader. If a similar pattern is applied to FileStageOutputReader, we might be left with unclosed RandomAccessFile references, because the caller called close() on the output reader but no one called close() on the input stream.

gianm (Contributor, Author):

The idea is the caller owns the returned InputStream, so the caller is responsible for closing it. I've updated the method documentation to clarify this:

   * Callers are responsible for closing the returned {@link InputStream}. This input stream may encapsulate
   * resources that are not closed by this class's {@link #close()} method.

Comment on lines +222 to +225
if (len > 0) {
final byte[] bytes = new byte[len];
src.get(bytes);
chunks.add(bytes);
LakshSingla (Contributor) commented Jul 29, 2024:

This makes an additional copy of the bytes. I am wondering:
a. Is there a way to avoid this additional copy?
b. Does having this copy add any meaningful overhead?
c. If there's overhead, was it present in the original code, or does the refactor introduce it?

gianm (Contributor, Author):

My thoughts:

a) I think the copy can be eliminated if we alter FrameFileWriter somehow. It accepts a WritableByteChannel that it writes the frame file to, and the WritableByteChannel method int write(ByteBuffer src) requires copying the bytes from src. (The ByteBuffer object cannot be retained, since the caller might reuse it.) So we'd need to change FrameFileWriter to have some option other than WritableByteChannel.

b) I haven't done extensive profiling, so I'm not sure. I was planning to do more extensive profiling and look into further perf enhancements after this code lands in master.

c) If there's any overhead, it wasn't present in the original code, because this entire code path is new. This code path is only used for in-memory shuffles. The original code always read from on-disk frame files, and did not generate them on-the-fly.
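For context, a sketch of the write path that forces the copy, mirroring the diff above (the surrounding class is illustrative):

    @Override
    public int write(final ByteBuffer src)
    {
      final int len = src.remaining();
      if (len > 0) {
        // The caller may reuse "src" after write() returns, so the ByteBuffer cannot
        // be retained; its bytes must be copied into a fresh array.
        final byte[] bytes = new byte[len];
        src.get(bytes);
        chunks.add(bytes);
      }
      return len;
    }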

}

@Override
public void close()
{
-  inMemoryWorkers.forEach((k, v) -> v.stopGracefully());
+  inMemoryWorkers.forEach((k, v) -> v.stop());
LakshSingla (Contributor) commented Jul 29, 2024:

Curious what prompted this modification

gianm (Contributor, Author):

I thought that stopGracefully was not really an appropriate method name to have on Worker. It's too linked to the behavior of tasks, and I wanted to make the Worker interface more easily extensible to workers that aren't tasks.

@@ -54,11 +54,12 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
@Override
public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
{
-  return priorPhase == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES;
+  return priorPhase == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES /* if globally sorting */
+         || priorPhase == READING_INPUT /* if locally sorting */;
LakshSingla (Contributor) commented Jul 29, 2024:

Why did this not get caught before this patch?

gianm (Contributor, Author):

Before this patch, stages that locally sort would not enter the PRESHUFFLE_WRITING_OUTPUT phase. They would go directly from READING_INPUT to RESULTS_READY. But now, they need to enter PRESHUFFLE_WRITING_OUTPUT so they can send doneReadingInput to the controller.
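A sketch of the two phase sequences implied by that explanation:

    // Globally sorting:
    //   READING_INPUT -> PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES
    //                 -> PRESHUFFLE_WRITING_OUTPUT -> RESULTS_READY
    //
    // Locally sorting (after this patch):
    //   READING_INPUT -> PRESHUFFLE_WRITING_OUTPUT -> RESULTS_READY
    //   (previously: READING_INPUT -> RESULTS_READY directly)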

@@ -70,7 +71,7 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
@Override
public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
{
-  return priorPhase == RESULTS_READY;
+  return priorPhase.compareTo(FINISHED) < 0;
Contributor:

Should any stage be allowed to transition to FINISHED, even if it hasn't produced any results?

gianm (Contributor, Author):

Yes, this is needed for supporting LIMIT. When a downstream stage applies a LIMIT, it's possible for the upstream to be finished if it hasn't fully generated its output. I added a comment describing this case.

// Workers run all work orders they get. There is not (currently) any limit on the number of concurrent work
// orders; we rely on the controller to avoid overloading workers.
if (kernel.getPhase() == WorkerStagePhase.NEW
&& kernelHolders.runningKernelCount() < context.maxConcurrentStages()) {
Contributor:

Workers in RESULTS_COMPLETE running in MEMORY mode would be consuming a decent portion of memory to store the result output. Does this condition avoid overloading memory on the worker because, with leapfrogging, at most one stage will be not-running but still consuming memory?

gianm (Contributor, Author):

MEMORY just means the transfer is in-memory. The results aren't completely buffered in memory: just one frame at a time per channel. I added a note about this to OutputChannelMode#MEMORY.

gianm (Contributor, Author):

New text:

  /**
   * In-memory output channels. Stage shuffle data does not hit disk. In-memory channels do not fully buffer stage
   * output. They use a blocking queue; see {@link RunWorkOrder#makeStageOutputChannelFactory()}.
   *
   * Because stage output is not fully buffered, this mode requires a consumer stage to run at the same time as its
   * corresponding producer stage. See {@link ControllerQueryKernelUtils#computeStageGroups} for the logic that
   * determines when we can use in-memory channels.
   */
  MEMORY("memory"),

LakshSingla (Contributor) left a comment:

Are there end-to-end unit tests testing out the in-memory shuffles with >1 concurrent stages?

gianm (Contributor, Author) commented Jul 30, 2024:

> Are there end-to-end unit tests testing out the in-memory shuffles with >1 concurrent stages?

There aren't end-to-end tests yet, just unit tests of some of the important pieces. I was thinking integration tests would be ideal (to ensure all the HTTP endpoints work properly as well) but have not yet created those. I was thinking a good time to do that would be after the initial work is committed, but before it's documented.

LakshSingla (Contributor):

lgtm 🚀

gianm merged commit 01f6cfc into apache:master on Jul 31, 2024; 88 checks passed.
gianm deleted the msq-memory-shuffles branch on Jul 31, 2024 at 01:41.
sreemanamala pushed a commit to sreemanamala/druid that referenced this pull request on Aug 6, 2024.
LakshSingla added a commit to LakshSingla/druid that referenced this pull request on Aug 9, 2024.
kfaraz added this to the 31.0.0 milestone on Oct 4, 2024.
Labels: Area - Batch Ingestion, Area - MSQ (for multi stage queries: https://github.com/apache/druid/issues/12262), Release Notes

4 participants