Description
Describe the bug
Pulsar can get stuck on a single unreadable entry in bookkeeper
To Reproduce
Increase the max message size from the default 5M to, e.g., 10M.
Write a ledger/stream with some entries under 5M, then an entry over 5M, and then more entries under 5M.
Reduce the max message size back to 5M.
Try to process the ledger.
Pulsar gets stuck on the entry > 5M, and autoSkipNonRecoverableData does not help.
Pulsar logs
org.apache.bookkeeper.mledger.impl.OpReadEntry - ... read failed from ledger at position:X:Y : Bookie handle is not available
Expected behavior
autoSkipNonRecoverableData should allow skipping such an entry.
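To make the reported and expected behavior concrete, here is a toy model in Java. It is not Pulsar or BookKeeper code: the class `SkipOversizedEntryDemo`, the `BookieHandleNotAvailable` exception and the `readEntry`/`readAll` helpers are hypothetical, and the oversized entry is simply modeled as a read that always fails. Without skipping, the cursor keeps failing on the same position and never advances; with skipping enabled, it logs the failure and moves on to the next position, which is what one would expect from autoSkipNonRecoverableData.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Pulsar code): a cursor reads entries of known
 * sizes through a client that cannot handle entries above maxFrameSize.
 */
public class SkipOversizedEntryDemo {

    /** Hypothetical stand-in for the "Bookie handle is not available" failure. */
    static class BookieHandleNotAvailable extends Exception {
        BookieHandleNotAvailable(String msg) {
            super(msg);
        }
    }

    /** Simulated read: fails every time for an entry larger than maxFrameSize. */
    static int readEntry(int[] entrySizes, int position, int maxFrameSize) throws BookieHandleNotAvailable {
        if (entrySizes[position] > maxFrameSize) {
            throw new BookieHandleNotAvailable("entry " + position + " exceeds max frame size");
        }
        return position;
    }

    /**
     * Reads entries in order and returns the positions read successfully.
     * Without autoSkip, gives up after maxAttempts consecutive failures on the
     * same position (the "stuck" case); with autoSkip, skips the failing entry.
     */
    static List<Integer> readAll(int[] entrySizes, int maxFrameSize, boolean autoSkip, int maxAttempts) {
        List<Integer> read = new ArrayList<>();
        int position = 0;
        int failures = 0;
        while (position < entrySizes.length) {
            try {
                read.add(readEntry(entrySizes, position, maxFrameSize));
                position++;
                failures = 0;
            } catch (BookieHandleNotAvailable e) {
                if (autoSkip) {
                    // Expected behavior: log and move to the next position.
                    System.out.println("skipping position " + position + ": " + e.getMessage());
                    position++;
                } else if (++failures >= maxAttempts) {
                    // Reported behavior: the read position never advances.
                    System.out.println("stuck at position " + position);
                    break;
                }
            }
        }
        return read;
    }

    public static void main(String[] args) {
        int mb = 1024 * 1024;
        int[] sizes = {1 * mb, 4 * mb, 8 * mb, 2 * mb};
        int maxFrame = 5 * mb;
        System.out.println(readAll(sizes, maxFrame, false, 3)); // [0, 1]
        System.out.println(readAll(sizes, maxFrame, true, 3));  // [0, 1, 3]
    }
}
```

Skipping trades data loss (the oversized entry is never delivered) for forward progress, which is the same trade-off autoSkipNonRecoverableData already makes for other non-recoverable read errors.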
Additional context
This is not a problem for me right now (I worked around it) and I will not spend more time on it; this is mostly an FYI in case anyone else hits it.
Below is a largely untested diff in case anyone needs it. Handling this properly would require unit tests that reproduce such situations and/or similar tests in BookKeeper (plus, possibly, better handling of such entries on the bookie side).
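The key condition in the diff is that the read failure's cause is a BookKeeper `BKBookieHandleNotAvailableException`. The sketch below uses hypothetical stand-in classes (`LedgerReadException`, `BookieHandleNotAvailableException`), not the real `ManagedLedgerException` and `BKException` types. It shows that checking only `getCause()`, as the diff does, matches when the BookKeeper exception is the direct cause, but not when it is nested one level deeper; walking the cause chain is one possible, untested alternative if the wrapping ever differs.

```java
/**
 * Hypothetical stand-ins for ManagedLedgerException and
 * BKException.BKBookieHandleNotAvailableException (not the real classes).
 */
public class CauseCheckSketch {

    static class LedgerReadException extends Exception {
        LedgerReadException(Throwable cause) {
            super(cause);
        }
    }

    static class BookieHandleNotAvailableException extends Exception {
        BookieHandleNotAvailableException() {
            super("Bookie handle is not available");
        }
    }

    /** Mirrors the diff: only the direct cause is inspected. */
    static boolean directCauseMatches(Throwable e) {
        return e.getCause() instanceof BookieHandleNotAvailableException;
    }

    /** Alternative: inspect every cause in the chain. */
    static boolean anyCauseMatches(Throwable e) {
        for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
            if (t instanceof BookieHandleNotAvailableException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable direct = new LedgerReadException(new BookieHandleNotAvailableException());
        Throwable nested = new LedgerReadException(
                new RuntimeException(new BookieHandleNotAvailableException()));
        System.out.println(directCauseMatches(direct)); // true
        System.out.println(directCauseMatches(nested)); // false
        System.out.println(anyCauseMatches(nested));    // true
    }
}
```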
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 91a6e26f567..fd0b0519280 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -23,6 +23,8 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
+
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -97,6 +99,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
+ } else if (cursor.config.isAutoSkipNonRecoverableData()
+ && exception.getCause() instanceof BKException.BKBookieHandleNotAvailableException) {
+ // It is possible to end up in a situation where the bookie client cannot read a valid, existing entry.
+ // Specifically: write a large entry and then reduce the max message size.
+ // The bookie client disconnects when it attempts to read such an entry,
+ // and the read fails with BKBookieHandleNotAvailableException.
+ log.warn("[{}][{}] read failed from ledger at position:{} : {}; will skip the entry",
+ cursor.ledger.getName(),
+ cursor.getName(),
+ readPosition,
+ exception.getMessage(),
+ exception);
+ // Move to next valid position, skipping this one entry
+ final Position nextReadPosition = readPosition.getNext();
+ updateReadPosition(nextReadPosition);
+ checkReadCompletion();
} else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());