Skip to content

Commit

Permalink
Try ledger recovery from previous entries in case of corrupt/missing …
Browse files Browse the repository at this point in the history
…footer of the chunked data
  • Loading branch information
dlg99 committed May 23, 2024
1 parent 4b73c64 commit c1ee50d
Showing 1 changed file with 72 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,48 +581,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac

// Read the last entry in the ledger
long lastEntryInLedger = lh.getLastAddConfirmed();

if (lastEntryInLedger < 0) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(createManagedLedgerException(rc1));
return;
}

LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
try {
ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data);
if (chunkSequenceFooter.numParts > 0) {
readChunkSequence(callback, lh, lastEntryInLedger, chunkSequenceFooter);
} else {
completeCursorRecovery(callback, lh, data);
}
} catch (IOException error) {
log.error("Cannot parse footer", error);
callback.operationFailed(ManagedLedgerException.getManagedLedgerException(error));
}
}, null);
recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger);
};

try {
bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
} catch (Throwable t) {
Expand All @@ -632,6 +593,66 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}
}

/**
 * Attempts to recover the cursor state from the entry {@code entryId} of the cursor metadata ledger,
 * stepping back to {@code entryId - 1} when that entry cannot be used.
 *
 * <p>Outcomes, in the order they are checked:
 * <ul>
 *   <li>{@code entryId < 0}: there is nothing (left) to read, so the cursor is initialized from
 *       {@code getRollbackPosition(info)}.</li>
 *   <li>The read fails with an error for which {@code isBkErrorNotRecoverable} is true: the cursor is
 *       initialized from {@code getRollbackPosition(info)}.</li>
 *   <li>The read fails with any other error: {@code callback.operationFailed} is invoked.</li>
 *   <li>The entry parses as a chunk-sequence footer with {@code numParts > 0}: handling is delegated to
 *       {@code readChunkSequence}, which completes or fails {@code callback} itself.</li>
 *   <li>Otherwise the entry is treated as the position info; if {@code tryCompleteCursorRecovery}
 *       reports a failure, or the footer cannot be parsed at all, recovery is retried on the
 *       previous entry.</li>
 * </ul>
 *
 * @param info     cursor info used to compute the rollback position when the ledger cannot be used
 * @param callback notified once recovery has completed or failed
 * @param lh       handle of the already opened cursor metadata ledger
 * @param entryId  id of the entry to try; a negative value means no candidate entries remain
 */
private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
                                        VoidCallback callback,
                                        LedgerHandle lh,
                                        long entryId) {
    long ledgerId = lh.getId();

    if (entryId < 0) {
        log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger",
                ledger.getName(), ledgerId, name);
        // Rewind to last cursor snapshot available
        initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
        return;
    }

    lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> {
        if (log.isDebugEnabled()) {
            // Report the entry that was actually requested: on a retry it is no longer
            // the ledger's last-add-confirmed entry.
            log.debug("[{}] readComplete rc={} entryId={}", ledger.getName(), rc1, entryId);
        }
        if (isBkErrorNotRecoverable(rc1)) {
            log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
                    ledgerId, name, BKException.getMessage(rc1));
            // Rewind to oldest entry available
            initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
            return;
        } else if (rc1 != BKException.Code.OK) {
            log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
                    ledgerId, name, BKException.getMessage(rc1));

            callback.operationFailed(createManagedLedgerException(rc1));
            return;
        }

        LedgerEntry entry = seq.nextElement();
        byte[] data = entry.getEntry();
        try {
            ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data);
            if (chunkSequenceFooter.numParts > 0) {
                readChunkSequence(callback, lh, entryId, chunkSequenceFooter);
            } else {
                Throwable res = tryCompleteCursorRecovery(lh, data);
                if (res == null) {
                    callback.operationComplete();
                } else {
                    log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}. "
                                    + "Will try recovery from previous entry.",
                            ledger.getName(), ledgerId, entryId, name, res);
                    // Try recovery from previous entry
                    recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
                }
            }
        } catch (IOException error) {
            // Logged once, at WARN: the failure is handled by falling back to the previous entry.
            log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}, cannot parse footer. "
                            + "Will try recovery from previous entry.",
                    ledger.getName(), ledgerId, entryId, name, error);
            recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
        }
    }, null);
}

private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
long footerPosition, ChunkSequenceFooter chunkSequenceFooter) {
long startPos = footerPosition - chunkSequenceFooter.numParts;
Expand All @@ -657,7 +678,12 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entri
callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException(
"Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes")));
}
completeCursorRecovery(callback, lh, result);
Throwable res = tryCompleteCursorRecovery(lh, result);
if (res == null) {
callback.operationComplete();
} else {
callback.operationFailed(new ManagedLedgerException(res));
}
}
}, null);
}
Expand All @@ -681,16 +707,15 @@ private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOExcep
return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class);
}

private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byte[] data) {
private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) {
mbean.addReadCursorLedgerSize(data.length);

try {
data = decompressDataIfNeeded(data, lh);
} catch (Throwable e) {
log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
callback.operationFailed(new ManagedLedgerException(e));
return;
return e;
}

PositionInfo positionInfo;
Expand All @@ -699,8 +724,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
callback.operationFailed(new ManagedLedgerException(e));
return;
return e;
}

Map<String, Long> recoveredProperties = Collections.emptyMap();
Expand All @@ -722,7 +746,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
return null;
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
Expand Down

0 comments on commit c1ee50d

Please sign in to comment.