Skip to content

Commit

Permalink
[fix][broker] Avoid expired unclosed ledgers when checking expired me…
Browse files Browse the repository at this point in the history
…ssages by ledger closure time (#22335)
  • Loading branch information
coderzc committed Mar 27, 2024
1 parent be0a9d9 commit f77fe5f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTL
managedLedger.getLedgersInfo().lastKey(), true);
MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
ledgerInfo.getTimestamp())) {
if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
break;
}
info = ledgerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -46,7 +44,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -59,6 +59,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand All @@ -72,11 +73,10 @@
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;

@Test(groups = "broker")
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {

Expand Down Expand Up @@ -463,6 +463,45 @@ public void testIncorrectClientClock() throws Exception {
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

@Test
public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable {
final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
int maxTTLSeconds = 1;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(5);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
// set client clock to 10 days later
long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10);
for (int i = 0; i < 7; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
}
assertEquals(ledger.getLedgersInfoAsList().size(), 2);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
(AsyncCallbacks.MarkDeleteCallback) spy(
FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true));
FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true);

AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
Mockito.doAnswer(invocation -> {
ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class);
throwableAtomicReference.set(argument);
return invocation.callRealMethod();
}).when(markDeleteCallback).markDeleteFailed(any(), any());

PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
c1.markDelete(position);
Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
monitor.expireMessages(maxTTLSeconds);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);

Assert.assertNull(throwableAtomicReference.get());
}

@Test
void testMessageExpiryWithPosition() throws Exception {
final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
Expand Down

0 comments on commit f77fe5f

Please sign in to comment.