Skip to content

Commit ffd8a0c

Browse files
zhouyifan279BewareMyPowerlhotari
authored andcommitted
[fix][broker] First entry will be skipped if opening NonDurableCursor while trimmed ledger is adding first entry. (apache#24738)
Co-authored-by: Yunze Xu <xyzinfernity@163.com> Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 7a120f3) (cherry picked from commit f427b97)
1 parent cf11b88 commit ffd8a0c

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
4545
// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
4646
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
4747
// both ledgerId and entryId to be Long.max()
48-
if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
48+
Pair<Position, Long> lastPositionCounter = ledger.getLastPositionAndCounter();
49+
if (startCursorPosition == null || startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) {
4950
// Start from last entry
5051
switch (initialPosition) {
5152
case Latest:
52-
initializeCursorPosition(ledger.getLastPositionAndCounter());
53+
initializeCursorPosition(lastPositionCounter);
5354
break;
5455
case Earliest:
5556
initializeCursorPosition(ledger.getFirstPositionAndCounter());

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
2223
import static org.testng.Assert.assertEquals;
2324
import static org.testng.Assert.assertFalse;
2425
import static org.testng.Assert.assertNotEquals;
@@ -50,6 +51,11 @@
5051
import org.apache.bookkeeper.mledger.Position;
5152
import org.apache.bookkeeper.mledger.PositionFactory;
5253
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
54+
import org.apache.commons.lang3.tuple.Pair;
55+
import org.apache.pulsar.common.api.proto.CommandSubscribe;
56+
import org.awaitility.Awaitility;
57+
import org.mockito.Mockito;
58+
import org.mockito.stubbing.Answer;
5359
import org.slf4j.Logger;
5460
import org.slf4j.LoggerFactory;
5561
import org.testng.Assert;
@@ -103,6 +109,58 @@ void testOpenNonDurableCursorAtNonExistentMessageId() throws Exception {
103109
ledger.close();
104110
}
105111

112+
@Test(timeOut = 20000)
113+
void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed() throws Exception {
114+
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
115+
.setRetentionTime(0, TimeUnit.MILLISECONDS);
116+
config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
117+
@Cleanup
118+
ManagedLedgerImpl ledgerSpy =
119+
Mockito.spy((ManagedLedgerImpl) factory.open("non_durable_cursor_while_ledger_trimmed", config));
120+
121+
ledgerSpy.addEntry("message1".getBytes());
122+
123+
ledgerSpy.rollCurrentLedgerIfFull();
124+
Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
125+
ledgerSpy.getLedgersInfoAsList().size() > 1
126+
);
127+
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
128+
ledgerSpy.trimConsumedLedgersInBackground(trimFuture);
129+
trimFuture.join();
130+
131+
// Use (currentLedgerId, -1) as startCursorPosition after ledger was trimmed
132+
Position startCursorPosition = PositionFactory.create(ledgerSpy.getCurrentLedger().getId(), -1);
133+
assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry) > 0);
134+
135+
CountDownLatch getLastPositionLatch = new CountDownLatch(1);
136+
CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1);
137+
Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position, Long>>) invocation -> {
138+
newNonDurableCursorLatch.countDown();
139+
getLastPositionLatch.await();
140+
return Pair.of(ledgerSpy.lastConfirmedEntry, ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy));
141+
});
142+
143+
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<ManagedCursor>()
144+
.completeAsync(() ->
145+
new NonDurableCursorImpl(bkc, ledgerSpy, "my_test_cursor",
146+
startCursorPosition, CommandSubscribe.InitialPosition.Latest, false)
147+
);
148+
Position oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry;
149+
150+
// Wait until NonDurableCursorImpl constructor invokes ManagedLedgerImpl.getLastPositionAndCounter
151+
newNonDurableCursorLatch.await();
152+
// Add first entry after ledger was trimmed
153+
ledgerSpy.addEntry("message2".getBytes());
154+
assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0);
155+
156+
// Unblock NonDurableCursorImpl constructor
157+
getLastPositionLatch.countDown();
158+
159+
// cursor should read from lastConfirmedEntry
160+
ManagedCursor cursor = cursorFuture.join();
161+
assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry);
162+
}
163+
106164
@Test(timeOut = 20000)
107165
void testZNodeBypassed() throws Exception {
108166
ManagedLedger ledger = factory.open("my_test_ledger");

0 commit comments

Comments
 (0)