Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 16802: fix Repeated messages of shared dispatcher #16812

Merged
merged 10 commits into from Jul 28, 2022

Conversation

eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Jul 27, 2022

Changes: sendMessagesToConsumer must block reads of new entries otherwise we will end up in reading duplicates

Fixes #16802 #16693

Motivation

As described in #16802 it may happen that multiple calls to readMoreEntries are served before the execution of sendMessagesToConsumer
Unfortunately there is no control over the execution of readMoreEntries, it may happen in many different threadpools

Modifications

Add a flag to prevent concurrent execution of sendMessagesToConsumer with readMoreEntries.
Revert aa change to a test modified in #16802 to make the test pass

Verifying this change

This change is already covered by existing tests

  • doc-not-needed

Changes: sendMessagesToConsumer must block reads of new entries otherwise we will end up in reading duplicates
@eolivelli eolivelli self-assigned this Jul 27, 2022
@eolivelli eolivelli added this to the 2.11.0 milestone Jul 27, 2022
@eolivelli eolivelli added the release/blocker Indicate the PR or issue that should block the release until it gets resolved label Jul 27, 2022
break;
sendInProgress = true;
try {
if (needTrimAckedMessages()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this try block to another method to avoid so many code changes? For example,

    private void trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
        /* ... */
    }

    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
        sendInProgress = true;
        try {
            trySendMessagesToConsumers(readType, entries);
        } finally {
            sendInProgress = false;
        }
        readMoreEntries();
    }

In addition, the following code in your original code can be removed because in the finally block sendInProgress would be false, then readMoreEntries will be called after that.

            if (entriesToDispatch == 0) {
                sendInProgress = false;
                readMoreEntries();
                return;
            }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for your quick fix!


protected synchronized void trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PersistentStickyKeyDispatcherMultipleConsumers should also override this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, thanks for the heads up
I will also add a comment for future people who will put their hands here

@@ -278,7 +278,6 @@ public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
for (int i = 0; i < N; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
consumer.acknowledge(msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to remove this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that line in the PR that created this problem.

this line made the test pass,
with this "fix" now the test still passes

this test is very interesting because it runs many readMoreEntries() in a separate thread, simulating the mess that happens inside the broker

it is important to revert this change to the test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

@eolivelli
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@codelipenghui
Copy link
Contributor

@eolivelli

Please check the failed test

Error:  Tests run: 9, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 20.618 s <<< FAILURE! - in org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest
  Error:  testMessageRedelivery(org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest)  Time elapsed: 7.488 s  <<< FAILURE!
  java.lang.StackOverflowError
  	at org.mockito.internal.util.MockUtil.getMockHandler(MockUtil.java:73)
  	at org.mockito.internal.util.MockUtil.getMockName(MockUtil.java:128)
  	at org.mockito.internal.stubbing.defaultanswers.ReturnsEmptyValues.answer(ReturnsEmptyValues.java:75)
  	at org.mockito.internal.stubbing.defaultanswers.GloballyConfiguredAnswer.answer(GloballyConfiguredAnswer.java:25)
  	at org.mockito.Answers.answer(Answers.java:99)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.toString(PersistentSubscription.java:481)
  	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:136)
  	at java.base/java.util.Optional.orElseGet(Optional.java:364)
  	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:136)
  	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:105)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:276)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:552)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:[841](https://github.com/apache/pulsar/runs/7549605041?check_suite_focus=true#step:10:842))
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest.lambda$testMessageRedelivery$8(PersistentStickyKeyDispatcherMultipleConsumersTest.java:404)
  	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:42)
  	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
  	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
  	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
  	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
  	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
  	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:841)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:305)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:556)
  	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:545)
  	at

@eolivelli
Copy link
Contributor Author

@codelipenghui I am looking into the failed test.
it is mocking so many things, also the ServiceConfiguration object!

@eolivelli
Copy link
Contributor Author

@codelipenghui I have fixed the test, there was a problem in the KEY_SHARED dispatcher

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried to run some performance test on Shared and Key_Shared subscription. Seems to have no noticeable performance regression

@eolivelli eolivelli merged commit 825b68d into apache:master Jul 28, 2022
@eolivelli eolivelli deleted the fix/dispatch-shared-duplicates branch July 28, 2022 19:03
@eolivelli
Copy link
Contributor Author

@codelipenghui @BewareMyPower I have merged the PR
thank you very much for your help !

@@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;

protected boolean sendInProgress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flag should be volatile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Itbis guarded by synchronized blocks. No need to makenit volatile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misunderstood something, you're right.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker cherry-picked/branch-2.10 cherry-picked/branch-2.11 doc-not-needed Your PR changes do not impact docs release/blocker Indicate the PR or issue that should block the release until it gets resolved release/2.10.5 release/2.11.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Repeated messages of shared dispatcher
6 participants