Skip to content

Commit

Permalink
[HUDI-5488] Make sure Disrupt queue start first, then insert records (a…
Browse files Browse the repository at this point in the history
…pache#7582)

We must to make sure to set up Disruptor's queue first, then producer can insert records to the queue. But currently we have no idea which thread start first, so this pr tries to fix it.
  • Loading branch information
boneanxs authored and jian.feng committed Jan 31, 2023
1 parent 05bd0d5 commit 5e9a931
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public Integer finish() {
};

DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(Option.of(1024),
producers, Option.of(consumer), getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
producers, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());

final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ protected void doProduce(HoodieMessageQueue<I, O> queue, HoodieProducer<I> produ

protected abstract void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer);

protected void setUp() {}

/**
* Start producing
*/
Expand Down Expand Up @@ -165,6 +167,7 @@ public boolean isRunning() {
public E execute() {
try {
checkState(this.consumer.isPresent());
setUp();
// Start consuming/producing asynchronously
CompletableFuture<Void> consuming = startConsumingAsync();
CompletableFuture<Void> producing = startProducingAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,24 @@ public class DisruptorExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I,

public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> inputItr,
HoodieConsumer<O, E> consumer, Function<I, O> transformFunction, Option<String> waitStrategy, Runnable preExecuteRunnable) {
this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), Option.of(consumer),
this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), consumer,
transformFunction, waitStrategy, preExecuteRunnable);
}

public DisruptorExecutor(final Option<Integer> bufferSize, List<HoodieProducer<I>> producers,
Option<HoodieConsumer<O, E>> consumer, final Function<I, O> transformFunction,
HoodieConsumer<O, E> consumer, final Function<I, O> transformFunction,
final Option<String> waitStrategy, Runnable preExecuteRunnable) {
super(producers, consumer, new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable), preExecuteRunnable);
super(producers, Option.of(consumer), new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable), preExecuteRunnable);
}

@Override
protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer) {
protected void setUp() {
DisruptorMessageQueue<I, O> disruptorQueue = (DisruptorMessageQueue<I, O>) queue;
// Before we start producing, we need to set up Disruptor's queue
disruptorQueue.setHandlers(consumer);
disruptorQueue.setHandlers(consumer.get());
disruptorQueue.start();
}

@Override
protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
private final RingBuffer<HoodieDisruptorEvent> ringBuffer;

private boolean isShutdown = false;
private boolean isStarted = false;

public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O> transformFunction, Option<String> waitStrategyName, int totalProducers, Runnable preExecuteRunnable) {
WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
Expand All @@ -60,6 +61,10 @@ public long size() {

@Override
public void insertRecord(I value) throws Exception {
if (!isStarted) {
throw new HoodieException("Can't insert into the queue since the queue is not started yet");
}

if (isShutdown) {
throw new HoodieException("Can't insert into the queue after it had already been closed");
}
Expand Down Expand Up @@ -92,6 +97,7 @@ public void close() {
synchronized (this) {
if (!isShutdown) {
isShutdown = true;
isStarted = false;
queue.shutdown();
}
}
Expand All @@ -104,7 +110,12 @@ protected void setHandlers(HoodieConsumer consumer) {
}

protected void start() {
queue.start();
synchronized (this) {
if (!isStarted) {
queue.start();
isStarted = true;
}
}
}

/**
Expand Down

0 comments on commit 5e9a931

Please sign in to comment.