Skip to content

Commit

Permalink
[improve] [broker] Do not call cursor.isCursorDataFullyPersistable if…
Browse files Browse the repository at this point in the history
… disabled dispatcherPauseOnAckStatePersistentEnabled (apache#22729)
  • Loading branch information
poorbarcode authored May 17, 2024
1 parent fd5916c commit 23d5e12
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,15 @@ public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
@Override
public boolean checkAndResumeIfPaused() {
boolean paused = blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE;
// Calling "cursor.isCursorDataFullyPersistable()" will loop the collection "individualDeletedMessages". It is
// not a light method.
// If never enabled "dispatcherPauseOnAckStatePersistentEnabled", skip the following checks to improve
// performance.
if (!paused && !topic.isDispatcherPauseOnAckStatePersistentEnabled()){
// "true" means no need to pause.
return true;
}
// Enabled "dispatcherPauseOnAckStatePersistentEnabled" before.
boolean shouldPauseNow = !cursor.isCursorDataFullyPersistable()
&& topic.isDispatcherPauseOnAckStatePersistentEnabled();
// No need to change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
Expand All @@ -38,6 +42,7 @@
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -549,4 +554,49 @@ public void testMultiConsumersPauseOnAckStatPersistNotAffectReplayRead(Subscript
c4.close();
admin.topics().delete(tpName, false);
}

@Test(dataProvider = "multiConsumerSubscriptionTypes")
public void testNeverCallCursorIsCursorDataFullyPersistableIfDisabledTheFeature(SubscriptionType subscriptionType)
throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String mlName = TopicName.get(tpName).getPersistenceNamingEncoding();
final String subscription = "s1";
final int msgSendCount = 100;
// Inject a injection to record the counter of calling "cursor.isCursorDataFullyPersistable".
final ManagedLedgerImpl ml = (ManagedLedgerImpl) pulsar.getBrokerService().getManagedLedgerFactory().open(mlName);
final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(subscription);
final ManagedCursorImpl spyCursor = Mockito.spy(cursor);
AtomicInteger callingIsCursorDataFullyPersistableCounter = new AtomicInteger();
Mockito.doAnswer(invocation -> {
callingIsCursorDataFullyPersistableCounter.incrementAndGet();
return invocation.callRealMethod();
}).when(spyCursor).isCursorDataFullyPersistable();
final ManagedCursorContainer cursors = WhiteboxImpl.getInternalState(ml, "cursors");
final ManagedCursorContainer activeCursors = WhiteboxImpl.getInternalState(ml, "activeCursors");
cursors.removeCursor(cursor.getName());
activeCursors.removeCursor(cursor.getName());
cursors.add(spyCursor, null);
activeCursors.add(spyCursor, null);

// Pub & Sub.
Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription)
.isAckReceiptEnabled(true).subscriptionType(subscriptionType).subscribe();
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create();
for (int i = 0; i < msgSendCount; i++) {
p1.send(Integer.valueOf(i).toString());
}
for (int i = 0; i < msgSendCount; i++) {
Message<String> m = c1.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(m);
c1.acknowledge(m);
}
// Verify: the counter of calling "cursor.isCursorDataFullyPersistable".
// In expected the counter should be "0", to avoid flaky, verify it is less than 5.
Assert.assertTrue(callingIsCursorDataFullyPersistableCounter.get() < 5);

// cleanup.
p1.close();
c1.close();
admin.topics().delete(tpName, false);
}
}

0 comments on commit 23d5e12

Please sign in to comment.