Skip to content

Commit

Permalink
Fix marking individual deletes as dirty (#9732)
Browse files Browse the repository at this point in the history
* Fix marking individual deletes as dirty

When we mark cursors as dirty, we aren't marking when individual acks
cause a dirty cursor.

This results in cursors not being flushed and causing redelivery.

This one line fix will ensure we mark the cursor as dirty in this
situation as well

* add a test

* improve tests to not use sleep

* make the polling rate be slower
  • Loading branch information
addisonj committed Mar 1, 2021
1 parent 74db656 commit 34ca893
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
Expand Down Expand Up @@ -2931,6 +2932,9 @@ void flush() {
asyncMarkDelete(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Flushed dirty mark-delete position", ledger.getName(), name);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
Expand Down Expand Up @@ -3387,18 +3388,88 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {

assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));

// Give chance to the flush to be automatically triggered.
Thread.sleep(3000);
Awaitility.await()
// Give chance to the flush to be automatically triggered.
// NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
.pollDelay(Duration.ofMillis(2000))
.untilAsserted(() -> {
// Abruptly re-open the managed ledger without graceful close
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
try {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

// Abruptly re-open the managed ledger without graceful close
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
} finally {
factory2.shutdown();
}
});

factory1.shutdown();
factory2.shutdown();
}

@Test
public void testFlushCursorAfterIndividualDeleteInactivity() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setThrottleMarkDelete(1.0);

ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
factoryConfig.setCursorPositionFlushSeconds(1);
ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig);
ManagedLedger ledger1 = factory1.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor c1 = ledger1.openCursor("c");
List<Position> positions = new ArrayList<Position>();

for (int i = 0; i < 20; i++) {
positions.add(ledger1.addEntry(new byte[1024]));
}

CountDownLatch latch = new CountDownLatch(positions.size());

positions.forEach(p -> c1.asyncDelete(p, new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
throw new RuntimeException(exception);
}
}, null));

latch.await();

assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));

// reopen the cursor and we should see entries not be flushed
ManagedLedgerFactory dirtyFactory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedger ledgerDirty = dirtyFactory.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor dirtyCursor = ledgerDirty.openCursor("c");

assertNotEquals(dirtyCursor.getMarkDeletedPosition(), positions.get(positions.size() - 1));

Awaitility.await()
// Give chance to the flush to be automatically triggered.
// NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
.pollDelay(Duration.ofMillis(2000))
.untilAsserted(() -> {
// Abruptly re-open the managed ledger without graceful close
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
try {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));

} finally {
factory2.shutdown();
}
});

factory1.shutdown();
dirtyFactory.shutdown();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
Expand Down

0 comments on commit 34ca893

Please sign in to comment.