[HUDI-5023] Cleaning up QueueBasedExecutor impls #7238
Ack. Will finish my review this week.
6241a82 to eb19e94
 * Start all producers at once.
 */
@Override
public CompletableFuture<Void> startProducers() {
This method (w/ modifications) has been moved into the base class to be shared across impls.
// Consumer
protected final Option<HoodieConsumer<O, E>> consumer;

public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> producers,
Ideas underpinning this class were
- Abstract common logic across impls
- Make sure all concurrency is handled exclusively in the base class (impl should just write sync code of how to produce/consume to/from the queue)
- Make sure all resource lifecycle handling happens in the base class
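The split described above can be sketched roughly as follows. This is a minimal illustration, not Hudi's actual API: `SketchQueueExecutor`, `SummingExecutor`, the `Supplier`-based producers, the `DONE` sentinel and the unbounded queue are all assumptions made to keep the example self-contained. The base class owns the thread pools and the queue; the implementation only supplies synchronous `doConsume` and `finish` logic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the base class; not Hudi's actual API.
abstract class SketchQueueExecutor<I, E> {
  private static final Object DONE = new Object(); // sentinel: all producers finished
  // Unbounded here for simplicity (assumption); a real executor would bound the queue.
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  private final List<Supplier<List<I>>> producers;

  SketchQueueExecutor(List<Supplier<List<I>>> producers) {
    this.producers = producers;
  }

  // Implementations only supply synchronous logic.
  protected abstract void doConsume(I record);

  protected abstract E finish();

  // All threading and executor lifecycle handling stays in the base class.
  public final E execute() {
    ExecutorService producerPool = Executors.newFixedThreadPool(Math.max(1, producers.size()));
    ExecutorService consumerPool = Executors.newSingleThreadExecutor();
    try {
      CompletableFuture<?>[] started = producers.stream()
          .map(p -> CompletableFuture.runAsync(() -> p.get().forEach(queue::add), producerPool))
          .toArray(CompletableFuture[]::new);
      CompletableFuture<Void> producing = CompletableFuture.allOf(started)
          // Whether producing succeeded or failed, let the consumer exit
          .whenComplete((ignored, error) -> queue.add(DONE));
      CompletableFuture<Void> consuming = CompletableFuture.runAsync(this::drain, consumerPool);
      return producing.thenCombine(consuming, (a, b) -> (Void) null)
          .thenApply(ignored -> finish())
          .join(); // block until both sides are done
    } finally {
      producerPool.shutdownNow();
      consumerPool.shutdownNow();
    }
  }

  @SuppressWarnings("unchecked")
  private void drain() {
    try {
      for (Object item = queue.take(); item != DONE; item = queue.take()) {
        doConsume((I) item);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("consumer interrupted", e);
    }
  }
}

// Example implementation: purely synchronous, no threading code.
class SummingExecutor extends SketchQueueExecutor<Integer, Integer> {
  private int sum = 0;

  SummingExecutor(List<Supplier<List<Integer>>> producers) {
    super(producers);
  }

  @Override
  protected void doConsume(Integer record) {
    sum += record;
  }

  @Override
  protected Integer finish() {
    return sum;
  }

  static int demo() {
    Supplier<List<Integer>> first = () -> Arrays.asList(1, 2, 3);
    Supplier<List<Integer>> second = () -> Arrays.asList(10, 20);
    return new SummingExecutor(Arrays.asList(first, second)).execute();
  }
}
```

Calling `SummingExecutor.demo()` returns 36. Note that the subclass contains no executor, future, or shutdown code at all.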
Nice work here!
 * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer.
 * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService.
 */
public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I, O, E> {
This class has been replaced by BaseHoodieQueueBasedExecutor
/**
 * IteratorBasedHoodieMessageQueue implements HoodieMessageQueue with Iterable
 */
public abstract class HoodieIterableMessageQueue<I, O> implements HoodieMessageQueue<I, O>, Iterable<O> {
This is not needed anymore
/**
 * Consume entries from queue and execute callback function.
 */
public abstract class IteratorBasedQueueConsumer<I, O> implements HoodieConsumer<I, O> {
This is not needed anymore
@@ -208,19 +205,33 @@ public BoundedMemoryRecords(
    Schema logSchema,
    Configuration hadoopConf,
    org.apache.flink.configuration.Configuration flinkConf) {
  HoodieUnMergedLogRecordScanner.Builder scannerBuilder = HoodieUnMergedLogRecordScanner.newBuilder()
This refactoring is necessary to avoid exposing internals of the executor (queue), instead relying on the provided interfaces to configure it
@hudi-bot run azure
eb19e94 to e17b127
Nice Work!
Just left a few minor comments
/**
 * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer.
 * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService.
 */
public abstract class BaseHoodieQueueBasedExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
nit: HoodieExecutorBase => BaseHoodieQueueBasedExecutor
public final CompletableFuture<Void> startProducingAsync() {
  return allOf(producers.stream()
      .map(producer -> CompletableFuture.supplyAsync(() -> {
        doProduce(queue, producer);
        return (Void) null;
      }, producerExecutorService))
      .collect(Collectors.toList())
  )
      .thenApply(ignored -> (Void) null)
      .whenComplete((result, throwable) -> {
        // Regardless of how producing has completed, we have to close producers
        // to make sure resources are properly cleaned up
        producers.forEach(HoodieProducer::close);
        // Mark production as done so that consumer will be able to exit
        queue.seal();
      });
}
How about using
public final CompletableFuture<Void> startProducingAsync() {
return CompletableFuture.allOf(producers.stream().map(producer -> {
return CompletableFuture.supplyAsync(() -> {
doProduce(queue, producer);
return (Void) null;
}, producerExecutorService);
}).toArray(CompletableFuture[]::new)).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
// Regardless of how producing has completed, we have to close producers
// to make sure resources are properly cleaned up
producers.forEach(HoodieProducer::close);
// Mark production as done so that consumer will be able to exit
queue.seal();
}
});
}
There may be no need for .thenApply(ignored -> (Void) null).
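The point of the suggestion can be checked in isolation. The JDK's `CompletableFuture.allOf(CompletableFuture<?>...)` already returns a `CompletableFuture<Void>`, so nothing has to be mapped back to `Void`, and a `whenComplete` callback runs whether the tasks succeed or fail. The list-taking `allOf` in the quoted code is presumably a project helper and is not reproduced here. `AllOfCleanupSketch` and its `Runnable` tasks are hypothetical stand-ins for the producers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the tasks stand in for producers.
class AllOfCleanupSketch {
  // Returns how many times the cleanup callback ran.
  static int runAll(List<Runnable> tasks) {
    AtomicInteger cleanups = new AtomicInteger();
    CompletableFuture<Void> all = CompletableFuture.allOf(
            tasks.stream()
                .map(task -> CompletableFuture.runAsync(task))
                .toArray(CompletableFuture[]::new))
        // Already a CompletableFuture<Void>: no thenApply needed before this
        .whenComplete((ignored, error) -> cleanups.incrementAndGet());
    try {
      all.join();
    } catch (CompletionException e) {
      // A task failed; the cleanup callback has still run
    }
    return cleanups.get();
  }

  static int demoSuccess() {
    return runAll(Arrays.<Runnable>asList(() -> { }, () -> { }));
  }

  static int demoFailure() {
    return runAll(Arrays.<Runnable>asList(
        () -> { },
        () -> { throw new IllegalStateException("boom"); }));
  }
}
```

Both `demoSuccess()` and `demoFailure()` return 1, i.e. the cleanup runs exactly once in either outcome.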
In #7245, I'm actually going to replace this w/ an allOf instance that has slightly different semantics (allowing cancellations to cascade upon the first encountered failure).
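One way such fail-fast semantics could look is sketched below. This is only a guess at the general idea, not the implementation planned for #7245; `FailFastAllOf` and `allOfFailFast` are hypothetical names. Unlike the JDK's `allOf`, which waits for every future before reporting a failure, this variant fails as soon as any input fails and cancels the rest. Keep in mind that `CompletableFuture.cancel(true)` does not interrupt a running task; it only completes the future with a `CancellationException`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; not the implementation from #7245.
class FailFastAllOf {
  static <T> CompletableFuture<Void> allOfFailFast(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    // Success path: complete once every input has completed normally
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .whenComplete((ignored, error) -> {
          if (error == null) {
            result.complete(null);
          }
        });
    // Failure path: the first failure wins and cancels the remaining inputs
    for (CompletableFuture<T> future : futures) {
      future.whenComplete((value, error) -> {
        if (error != null && result.completeExceptionally(error)) {
          futures.forEach(other -> other.cancel(true));
        }
      });
    }
    return result;
  }

  static boolean demo() {
    CompletableFuture<String> slow = new CompletableFuture<>(); // never completes by itself
    CompletableFuture<String> failing = new CompletableFuture<>();
    CompletableFuture<Void> all = allOfFailFast(Arrays.asList(slow, failing));
    failing.completeExceptionally(new IllegalStateException("boom"));
    return all.isCompletedExceptionally() && slow.isCancelled();
  }
}
```

`FailFastAllOf.demo()` returns true: the combined future fails immediately and the still-pending input is cancelled.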
  return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)
      .whenComplete((ignored, throwable) -> {
        // Close the queue to release the resources
        queue.close();
      })
      .thenApply(ignored -> consumer.get().finish())
      // Block until producing and consuming both finish
      .join();
} catch (Exception e) {
  throw new HoodieException(e);
}
It seems that TestBoundedInMemoryExecutorInSpark#testInterruptExecutor is blocked here, which causes the UT to time out.
Maybe we can revisit how the interrupt exception is handled here.
Yeah, join actually doesn't throw InterruptedException. Addressed.
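The difference can be reproduced with the JDK alone. `CompletableFuture.get()` declares `InterruptedException` and returns promptly when the waiting thread is interrupted, while `join()` declares no checked exceptions and, in the OpenJDK implementation as far as I can tell, keeps waiting until the future completes. The sketch below is independent of how the PR actually addressed the issue; `JoinVsGetSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; uses only JDK APIs.
class JoinVsGetSketch {
  // Interrupts a thread waiting in get(); returns the exception type it saw.
  static String interruptDuringGet() {
    CompletableFuture<String> never = new CompletableFuture<>();
    AtomicReference<String> seen = new AtomicReference<>("none");
    Thread waiter = new Thread(() -> {
      try {
        never.get();
      } catch (InterruptedException | ExecutionException e) {
        seen.set(e.getClass().getSimpleName());
      }
    });
    waiter.start();
    waiter.interrupt();
    awaitExit(waiter, 0);
    return seen.get();
  }

  // Interrupts a thread waiting in join(); it stays blocked until completion.
  static String interruptDuringJoin() {
    CompletableFuture<String> pending = new CompletableFuture<>();
    AtomicReference<String> seen = new AtomicReference<>("none");
    Thread waiter = new Thread(() -> seen.set(pending.join()));
    waiter.start();
    waiter.interrupt();
    awaitExit(waiter, 200); // the interrupt alone does not release the waiter
    boolean stillBlocked = waiter.isAlive();
    pending.complete("done");
    awaitExit(waiter, 0);
    return stillBlocked + ":" + seen.get();
  }

  private static void awaitExit(Thread thread, long millis) {
    try {
      thread.join(millis); // 0 means wait without a timeout
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

A test that interrupts a thread parked in `join()` will therefore hang unless something else completes or fails the future, which matches the timeout reported above.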
@Override
public void shutdownNow() {
  producerExecutorService.shutdownNow();
  consumerExecutorService.shutdownNow();
Maybe we need to close the current queue in this shutdownNow function.
We actually close the queue w/in the execute method even in the presence of exceptions (which will cascade from shutdownNow if invoked). The idea here is that shutdownNow should only be used as a last-ditch effort to (quickly!) shut down the spun-up threads.
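The cascade described here can be illustrated with plain JDK classes. In the sketch below, which is hypothetical and not the Hudi code, `shutdownNow()` interrupts the worker thread, the task fails, and the `whenComplete` callback attached to its future still runs. The `queueClosed` flag stands in for closing the queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; `queueClosed` stands in for queue.close().
class ShutdownNowSketch {
  static boolean cleanupRunsAfterShutdownNow() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicBoolean queueClosed = new AtomicBoolean(false);
    CountDownLatch started = new CountDownLatch(1);

    CompletableFuture<Void> producing = CompletableFuture.runAsync(() -> {
      started.countDown();
      try {
        Thread.sleep(60_000); // stand-in for a long-running producer
      } catch (InterruptedException e) {
        throw new IllegalStateException("producer interrupted", e);
      }
    }, pool);

    // Cleanup is attached to the future, not to shutdownNow()
    CompletableFuture<Void> guarded =
        producing.whenComplete((ignored, error) -> queueClosed.set(true));

    try {
      started.await(); // make sure the task is running before shutting down
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    pool.shutdownNow(); // interrupts the worker; the task then fails
    try {
      guarded.join();
    } catch (CompletionException expected) {
      // The failure cascaded through the future chain
    }
    return queueClosed.get();
  }
}
```

`cleanupRunsAfterShutdownNow()` returns true. The latch matters: a task that has not started yet when `shutdownNow()` is called is simply dropped, and its future would never complete.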
That makes sense!
@@ -31,11 +32,18 @@

public class QueueBasedExecutorFactory {
Maybe we can also rename QueueBasedExecutorFactory to ExecutorFactory, because we will support SimpleExecutor here, which has no inner queue.
Agreed. Let's take this up in the PR introducing SimpleExecutor
5e9080b to 3532ca4
0c62553 to a76c39f
@hudi-bot run azure
@alexeykudinkin @zhangyue19921010 Do you plan to re-run the same benchmark as in #5416 for this PR as well?
@@ -149,6 +153,7 @@ public void close() throws IOException {

@Override
public float getProgress() throws IOException {
  return Math.min(parquetReader.getProgress(), logRecordScanner.getProgress());
  // TODO fix to reflect scanner progress
Why would we need that for "unmerged" record reader?
I think this is just tracking progress in terms of how many records were processed from the base-file vs logs
Synced up offline. This will be addressed in followup PR.
    })
    .build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
maybe assign scanner progress here, which can be used in L157.
Will take this up in #7245
Yep, I ran this benchmark several times based on this PR and the results are the same as before. Thanks for the reminder!
LGTM! Thanks for this GREAT work!
Once the UTs are green, maybe we can land it. Maybe we can rebase onto master (#7346) and have another try.
Cleaned up `HoodieIterableMessageQueue`
…tind down the queue prematurely)
Fixed handling of the case when there's no consumer provide closing the queue prematurely
…e of DQ returning prematurely
a76c39f to e4652c0
- Further cleaning up some of the historically inherited artifacts
- Replacing remaining usages of the BIMQ w/ QueueBasedExecutorFactory
- Adding missing deps to bundles
- Lifted up all concurrency management to `BaseHoodieQueueBasedExecutor`
- Fixed handling of the case when there's no consumer provide closing the queue prematurely
Change Logs
This is a follow-up PR for #5416 that is
QueueBasedExecutorFactory
This change is a precursor for both #7245 and #7174
Impact
No impact
Risk level (write none, low medium or high below)
Low
Documentation Update
N/A
Contributor's checklist