Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ public Entry get() {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof ManagedLedgerException.NonRecoverableLedgerException
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the ManagedLedgerFencedException check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And from the current implementation, if return isReadable here which means the pending ack reply will be complete, is it expected behavior? https://github.com/apache/pulsar/pull/14781/files#diff-07a1d142ec4105cb65fac733494182b461214a68d735ae1c3f5104c3eb4f92dbL363

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the ManagedLedgerFencedException check?

we can't delete ManagedLedgerFencedException, that's my mistake

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And from the current implementation, if return isReadable here which means the pending ack reply will be complete, is it expected behavior? https://github.com/apache/pulsar/pull/14781/files#diff-07a1d142ec4105cb65fac733494182b461214a68d735ae1c3f5104c3eb4f92dbL363

if ManagedLedgerFencedException or CursorAlreadyClosedxception, the pending ack has been closed so we can stop the recover.

|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException
|| exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
isReadable = false;
}
log.error("MLPendingAckStore of topic [{}] stat reply fail!", managedLedger.getName(), exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,17 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle2.getStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats().state, "Ready"));
}

@Test
Expand Down