Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data is not deleted after expiration due to connected readers #5621

Merged
merged 6 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ private ByteBuf copyEntry(EntryImpl entry) {
public void invalidateEntries(final PositionImpl lastPosition) {
final PositionImpl firstPosition = PositionImpl.get(-1, 0);

if (firstPosition.compareTo(lastPosition) > 0) {
log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}",
firstPosition, lastPosition);
return;
}

Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false);
int entriesRemoved = removed.getLeft();
long sizeRemoved = removed.getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* care about ledgers to be deleted.
*
*/
class ManagedCursorContainer implements Iterable<ManagedCursor> {
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

private static class Item {
final ManagedCursor cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class ManagedCursorImpl implements ManagedCursor {

protected volatile PositionImpl markDeletePosition;
protected volatile PositionImpl readPosition;
private volatile MarkDeleteEntry lastMarkDeleteEntry;
protected volatile MarkDeleteEntry lastMarkDeleteEntry;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
Expand Down Expand Up @@ -178,7 +178,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
}
}

private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,12 +863,12 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
}

@Override
public Iterable<ManagedCursor> getCursors() {
public ManagedCursorContainer getCursors() {
return cursors;
}

@Override
public Iterable<ManagedCursor> getActiveCursors() {
public ManagedCursorContainer getActiveCursors() {
return activeCursors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
} else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
Expand Down Expand Up @@ -83,6 +83,12 @@ void recover(final VoidCallback callback) {
protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
// Bypass persistence of mark-delete position and individually deleted messages info

MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
lastMarkDeleteEntry = mdEntry;
// it is important to advance cursor so the retention can kick in as expected.
ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition);

callback.markDeleteComplete(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -610,6 +611,50 @@ void testCursorWithNameIsCachable() throws Exception {
ledger.close();
}

@Test
public void testGetSlowestConsumer() throws Exception {
final String mlName = "test-get-slowest-consumer-ml";
final String c1 = "cursor1";
final String nc1 = "non-durable-cursor1";
final String ncEarliest = "non-durable-cursor-earliest";

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig());
Position p1 = ledger.addEntry(c1.getBytes(UTF_8));
log.info("write entry 1 : pos = {}", p1);
Position p2 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 2 : pos = {}", p2);
Position p3 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 3 : pos = {}", p3);

ManagedCursor cursor1 = ledger.openCursor(c1);
cursor1.seek(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());

ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

PositionImpl earliestPos = new PositionImpl(-1, -2);

ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest);
PositionImpl expectedPos = new PositionImpl(((PositionImpl) p1).getLedgerId(), -1);
assertEquals(expectedPos, ledger.getCursors().getSlowestReaderPosition());

// move non-durable cursor should update the slowest reader position
nonCursorEarliest.markDelete(p1);
assertEquals(p1, ledger.getCursors().getSlowestReaderPosition());

nonCursorEarliest.markDelete(p2);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

nonCursorEarliest.markDelete(p3);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

nonCursor1.markDelete(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());

ledger.close();
}

@Test(expectedExceptions = NullPointerException.class)
void testCursorWithNameIsNotNull() throws Exception {
final String p1CursorName = "entry-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri

long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (msgId instanceof BatchMessageIdImpl) {
if (ledgerId >= 0
&& msgId instanceof BatchMessageIdImpl
&& ((BatchMessageIdImpl) msgId).getBatchIndex() >= 0) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
Expand Down