Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Cannot clean up expired data after managed-ledger restart #10087

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1654,13 +1654,28 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
boolean shouldCursorMoveForward = false;
try {
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
shouldCursorMoveForward = (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
}

if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePosition, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}

lock.writeLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,8 +1398,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
final long previousEntries = currentLedgerEntries;
final long previousLedgerId = currentLedger.getId();
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
Expand All @@ -1417,14 +1415,9 @@ public void operationComplete(Void v, Stat stat) {
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
}
// Move cursor read point to new ledger
for (ManagedCursor cursor : cursors) {
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
if (markDeletedPosition.getLedgerId() == previousLedgerId && markDeletedPosition.getEntryId() + 1 >= previousEntries) {
// All entries in last ledger are marked delete, move read point to the new ledger
updateCursor((ManagedCursorImpl) cursor, PositionImpl.get(currentLedger.getId(), -1));
}
}

// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down Expand Up @@ -2168,6 +2161,39 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
this.waitingEntryCallBacks.add(cb);
}

/**
 * Moves every cursor whose mark-delete position sits on the last entry of a ledger forward to the
 * start of the next ledger.
 *
 * <p>For each cursor in {@code cursors} the method looks up the ledger of the cursor's mark-delete
 * position in {@code ledgers}. The cursor is moved to {@code (nextLedgerId, -1)} via
 * {@code updateCursor} only when all of the following hold:
 * <ul>
 *   <li>the ledger of the mark-delete position is present in {@code ledgers};</li>
 *   <li>a ledger with a higher id exists;</li>
 *   <li>the mark-delete entry id is not {@code -1} and {@code entryId + 1 >= entries} of the
 *       current ledger, i.e. every entry of that ledger has been acknowledged.</li>
 * </ul>
 * In every other case the cursor is left untouched.
 *
 * <p>An exception thrown by {@code updateCursor} is logged and not propagated, so one failing
 * cursor does not prevent the remaining cursors from being processed.
 */
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
    for (ManagedCursor cursor : cursors) {
        // Take a single snapshot of the mark-delete position; every decision below is based on it.
        PositionImpl lastAckedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
        long ackedLedgerId = lastAckedPosition.getLedgerId();

        LedgerInfo currPointedLedger = ledgers.get(ackedLedgerId);
        if (currPointedLedger == null) {
            // It is the ledger referenced by the cursor that is missing, not the cursor itself.
            log.warn("Cursor: {} points to ledger {} which does not exist in the managed-ledger.",
                    cursor, ackedLedgerId);
            continue;
        }

        LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(ackedLedgerId))
                .map(Map.Entry::getValue).orElse(null);
        if (nextPointedLedger == null) {
            log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor);
            continue;
        }

        // An entry id of -1 means nothing in this ledger has been acknowledged yet, so there is
        // nothing to skip over even if the ledger reports zero entries.
        boolean ledgerFullyAcked = lastAckedPosition.getEntryId() != -1
                && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries();
        if (!ledgerFullyAcked) {
            continue;
        }

        // Only reach updateCursor when a new position was actually computed. Re-reading the
        // cursor's mark-delete position to detect a change could otherwise push the stale
        // snapshot if the cursor moved in the meantime.
        PositionImpl newPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
        try {
            log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, newPosition);
            updateCursor((ManagedCursorImpl) cursor, newPosition);
        } catch (Exception e) {
            // Single log call: the trailing throwable is rendered as the stack trace, keeping the
            // message and its cause together in the log output.
            log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
                    cursor, cursor.getMarkDeletedPosition(), newPosition, e);
        }
    }
}

/**
 * No-argument convenience overload: delegates to
 * {@code trimConsumedLedgersInBackground(promise)} passing {@code Futures.NULL_PROMISE}.
 */
private void trimConsumedLedgersInBackground() {
// NOTE(review): NULL_PROMISE is presumably a shared placeholder promise for callers that do not
// need to observe the trim result — confirm against Futures.
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void testPropertiesClose() throws Exception {
ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down Expand Up @@ -82,6 +83,7 @@ void testPropertiesRecoveryAfterCrash() throws Exception {
ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down Expand Up @@ -113,6 +115,7 @@ void testPropertiesOnDelete() throws Exception {
ledger.addEntry("entry-1".getBytes());
Position p2 = ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
Expand Down Expand Up @@ -2914,6 +2915,48 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getTotalSize(), 0);
}

/**
 * Checks that ledgers which were completely consumed before a managed-ledger restart are removed
 * by trimming once the managed ledger has been re-opened and its cursors have been updated.
 */
@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
    final String ledgerName = "ml_restart_ledger";
    final int entryCount = 3;

    ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
    mlConfig.setRetentionTime(1, TimeUnit.SECONDS);
    mlConfig.setMaxEntriesPerLedger(2);
    mlConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
    mlConfig.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);

    ManagedLedgerImpl ledgerBeforeRestart = (ManagedLedgerImpl) factory.open(ledgerName, mlConfig);
    ManagedCursor reader = ledgerBeforeRestart.openCursor("c1");

    // Write three 1 MiB entries; with at most two entries per ledger this rolls over once.
    int written = 0;
    while (written < entryCount) {
        ledgerBeforeRestart.addEntry(new byte[1024 * 1024]);
        written++;
    }

    // Two ledgers are expected at this point: [{entries=2}, {entries=1}]
    int ledgerCountBeforeRestart = ledgerBeforeRestart.getLedgersInfoAsList().size();
    Assert.assertEquals(ledgerCountBeforeRestart, 2);

    // Consume everything: acknowledge each entry in order, then release the buffers.
    List<Entry> consumed = reader.readEntries(entryCount);
    for (Entry readEntry : consumed) {
        reader.markDelete(readEntry.getPosition());
    }
    consumed.forEach(Entry::release);

    // Restart the managed ledger by closing it and opening it again under the same name.
    ledgerBeforeRestart.close();
    ManagedLedgerImpl ledgerAfterRestart = (ManagedLedgerImpl) factory.open(ledgerName, mlConfig);

    // Initialization adds one more empty ledger,
    // so the ledgers are now [{entries=2}, {entries=1}, {entries=0}]
    int ledgerCountAfterRestart = ledgerAfterRestart.getLedgersInfoAsList().size();
    Assert.assertTrue(ledgerCountAfterRestart >= 2);

    // Update the cursors that still point at ledgers that have been consumed completely,
    // then trigger a trim and give it a moment to finish.
    ledgerAfterRestart.maybeUpdateCursorBeforeTrimmingConsumedLedger();
    ledgerAfterRestart.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
    Thread.sleep(100);

    // Only the single empty ledger should remain: [{entries=0}]
    Assert.assertEquals(ledgerAfterRestart.getLedgersInfoAsList().size(), 1);
    Assert.assertEquals(ledgerAfterRestart.getTotalSize(), 0);
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {

Expand Down