Skip to content

Commit

Permalink
fixing exception flow in HoodieExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored and xushiyan committed May 19, 2023
1 parent fc942ae commit f5753cd
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 12 deletions.
Expand Up @@ -281,16 +281,6 @@ public Integer finish() {
@Timeout(value = 60)
public void testException() throws Exception {
final int numRecords = 1000;
final int numProducers = 40;

final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
"BLOCKING_WAIT", numProducers, new Runnable() {
@Override
public void run() {
// do nothing.
}
});

List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);

Expand All @@ -307,8 +297,7 @@ public void run() {
}));
}
}



HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {

Expand Down
Expand Up @@ -215,6 +215,11 @@ public E execute() {
// to be interrupted as well
Thread.currentThread().interrupt();
}
// throw if we have any other exception seen already. There is a chance that cancellation/closing of producers with CompeletableFuture wins before the actual exception
// is thrown.
if (this.queue.getThrowable() != null) {
throw new HoodieException(queue.getThrowable());
}

throw new HoodieException(e);
}
Expand Down
Expand Up @@ -270,6 +270,11 @@ public void markAsFailed(Throwable e) {
this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
}

@Override
public Throwable getThrowable() {
return this.hasFailed.get();
}

@Override
public boolean isEmpty() {
return this.queue.size() == 0;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
Expand All @@ -47,6 +48,7 @@ public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
private final Disruptor<HoodieDisruptorEvent> queue;
private final Function<I, O> transformFunction;
private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
private AtomicReference<Throwable> throwable = new AtomicReference<>(null);

private boolean isShutdown = false;
private boolean isStarted = false;
Expand Down Expand Up @@ -89,9 +91,15 @@ public Option<O> readNextRecord() {

@Override
public void markAsFailed(Throwable e) {
this.throwable.compareAndSet(null, e);
// no-op
}

@Override
public Throwable getThrowable() {
return this.throwable.get();
}

@Override
public boolean isEmpty() {
return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity();
Expand Down
Expand Up @@ -50,6 +50,8 @@ public interface HoodieMessageQueue<I, O> extends Closeable {
*/
void markAsFailed(Throwable e);

Throwable getThrowable();

boolean isEmpty();

/**
Expand Down

0 comments on commit f5753cd

Please sign in to comment.