Skip to content

Commit 9c83d13

Browse files
Technoboy-srinath-ctds
authored andcommitted
[fix][ml] Fix getNumberOfEntries may point to deleted ledger (apache#24852)
(cherry picked from commit a85a064)
1 parent 6c5c797 commit 9c83d13

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3716,11 +3716,17 @@ public long getNumberOfEntries(Range<Position> range) {
37163716
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
37173717

37183718
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
3719-
// If the 2 positions are in the same ledger
3720-
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3721-
count += fromIncluded ? 1 : 0;
3722-
count += toIncluded ? 1 : 0;
3723-
return count;
3719+
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
3720+
if (li != null) {
3721+
// If the 2 positions are in the same ledger
3722+
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3723+
count += fromIncluded ? 1 : 0;
3724+
count += toIncluded ? 1 : 0;
3725+
return count;
3726+
} else {
3727+
// if the ledgerId is not in the ledgers, it means it has been deleted
3728+
return 0;
3729+
}
37243730
} else {
37253731
long count = 0;
37263732
// If the from & to are pointing to different ledgers, then we need to :

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.testng.Assert.assertSame;
4343
import static org.testng.Assert.assertTrue;
4444
import static org.testng.Assert.fail;
45+
import com.google.common.collect.Range;
4546
import com.google.common.collect.Sets;
4647
import io.netty.buffer.ByteBuf;
4748
import io.netty.buffer.ByteBufAllocator;
@@ -2667,6 +2668,37 @@ public void testGetNumberOfEntriesInStorage() throws Exception {
26672668
assertEquals(length, numberOfEntries);
26682669
}
26692670

2671+
@Test
2672+
public void testGetNumberOfEntries() throws Exception {
2673+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
2674+
initManagedLedgerConfig(managedLedgerConfig);
2675+
managedLedgerConfig.setMaxEntriesPerLedger(5);
2676+
ManagedLedgerImpl managedLedger =
2677+
(ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig);
2678+
// open cursor to prevent ledger to be deleted when ledger rollover
2679+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
2680+
int numberOfEntries = 10;
2681+
List<Position> positions = new ArrayList<>(numberOfEntries);
2682+
for (int i = 0; i < numberOfEntries; i++) {
2683+
positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding)));
2684+
}
2685+
Position mdPos = positions.get(numberOfEntries - 1);
2686+
Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1);
2687+
managedCursor.delete(positions);
2688+
// trigger ledger rollover and wait for the new ledger created
2689+
Awaitility.await().untilAsserted(() -> {
2690+
assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
2691+
});
2692+
managedLedger.rollCurrentLedgerIfFull();
2693+
Awaitility.await().untilAsserted(() -> {
2694+
assertEquals(managedLedger.getLedgersInfo().size(), 1);
2695+
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
2696+
});
2697+
2698+
long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos));
2699+
assertEquals(length, 0);
2700+
}
2701+
26702702
@Test
26712703
public void testEstimatedBacklogSize() throws Exception {
26722704
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2297,7 +2297,7 @@ public void testAcknowledgeWithReconnection() throws Exception {
22972297

22982298
Awaitility.await().untilAsserted(() ->
22992299
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2300-
5));
2300+
0));
23012301

23022302
// Make consumer reconnect to broker
23032303
admin.topics().unload(topicName);

0 commit comments

Comments
 (0)