Skip to content

Commit

Permalink
make KafkaSourceRecord ack() async to avoid deadlock (apache#11435) (#18
Browse files Browse the repository at this point in the history
)

The `ack()` method of the `AbstractKafkaSourceRecord` should be non-blocking. Otherwise there'll be deadlock for pulsar-client-io thread and the main `public/default/debezium-mongodb-source-0` thread. And further blocks the whole debezium connector to work correctly.

1. remove the blocking `future.get()` call from `ack()`
2. move the commit logic into callbacks

Co-authored-by: Neng Lu <nlu@streamnative.io>
  • Loading branch information
dlg99 and nlu90 committed Jan 11, 2022
1 parent 6efdde5 commit 041ce94
Showing 1 changed file with 2 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void close() {

private static Map<String, String> PROPERTIES = Collections.emptyMap();
private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
private static long FLUSH_TIMEOUT_MS = 2000;
private static long FLUSH_TIMEOUT_MS = 60000;

public abstract class AbstractKafkaSourceRecord<T> implements Record {
@Getter
Expand Down Expand Up @@ -247,9 +247,8 @@ private void completedFlushOffset(Throwable error, Void result) {
flushFuture.complete(null);
} catch (InterruptedException exception) {
log.warn("Flush of {} offsets interrupted, cancelling", this);
Thread.currentThread().interrupt();
offsetWriter.cancelFlush();
flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
flushFuture.completeExceptionally(new Exception("Failed to commit offsets"));
} catch (Throwable t) {
// SourceTask can throw unchecked ConnectException/KafkaException.
// Make sure the future is cancelled in that case
Expand Down Expand Up @@ -281,20 +280,6 @@ public void ack() {
flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
return;
}

// Wait until the offsets are flushed
try {
doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("Flush of {} offsets interrupted, cancelling", this);
offsetWriter.cancelFlush();
} catch (ExecutionException e) {
log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
offsetWriter.cancelFlush();
} catch (TimeoutException e) {
log.error("Timed out waiting to flush {} offsets to storage", this);
offsetWriter.cancelFlush();
}
}
}

Expand Down

0 comments on commit 041ce94

Please sign in to comment.