Flaky-test: MessageDispatchThrottlingTest.testBacklogConsumerCacheReads #16979

Closed
codelipenghui opened this issue Aug 8, 2022 · 11 comments · Fixed by #17105

Comments

@codelipenghui
Contributor

example failure

  Error:  Tests run: 64, Failures: 16, Errors: 0, Skipped: 48, Time elapsed: 604.061 s <<< FAILURE! - in org.apache.pulsar.client.api.MessageDispatchThrottlingTest
  Error:  testBacklogConsumerCacheReads(org.apache.pulsar.client.api.MessageDispatchThrottlingTest)  Time elapsed: 300.011 s  <<< FAILURE!
  org.testng.internal.thread.ThreadTimeoutException: Method org.apache.pulsar.client.api.MessageDispatchThrottlingTest.testBacklogConsumerCacheReads() didn't finish within the time-out 300000
  	at org.testng.internal.MethodInvocationHelper.invokeWithTimeoutWithNewExecutor(MethodInvocationHelper.java:371)
  	at org.testng.internal.MethodInvocationHelper.invokeWithTimeout(MethodInvocationHelper.java:282)
  	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:605)
  	at org.testng.internal.TestInvoker.retryFailed(TestInvoker.java:214)
  	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:58)
  	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
  	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
  	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
  	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
  	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
  	at org.testng.TestRunner.privateRun(TestRunner.java:764)
  	at org.testng.TestRunner.run(TestRunner.java:585)
  	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
  	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
  	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
  	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
  	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
  	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
  	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
  	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
  	at org.testng.TestNG.runSuites(TestNG.java:1069)
  	at org.testng.TestNG.run(TestNG.java:1037)
  	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:135)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeLazy(TestNGDirectoryTestSuite.java:123)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:90)
  	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:146)
  	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
  	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
  	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
  	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
@leizhiyuan
Contributor

leizhiyuan commented Aug 8, 2022

Sometimes I can reproduce this on my local machine.

image

It should be 200*10

I printed some logs in the MessageListener. It seems there is a bug in a recent change.

2022-08-08T16:57:10,898 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:587) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:558) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:548) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-08T16:57:10,919 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(42) + length(4) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 42, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:267) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
	at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:657) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:558) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:548) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

@leizhiyuan
Contributor

org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl#insert

image

image

Maybe this should return true, because if the entry is already in the cache, we should release the entry.

After changing this, I can no longer reproduce the issue.

@codelipenghui
Contributor Author

@leizhiyuan Do you want to create a PR to fix the issue?

@leizhiyuan
Contributor

> @leizhiyuan Do you want to create a PR to fix the issue?

ok

michaeljmarshall pushed a commit that referenced this issue Aug 9, 2022
* feat:return true if entry exists in cache

* feat:return true if entry exists in cache

* chore: add comment

Co-authored-by: zhiyuanlei <zhiyuanlei@tencent.com>

Master Issue: #16979

### Motivation

PR #12258 made changes to OpAddEntry which causes a memory leak when the entry is already in the cache.

### Modifications

Modifications: Revert PR #12258 changes to OpAddEntry

### Documentation

- [x] `doc-not-needed`
michaeljmarshall pushed a commit that referenced this issue Aug 9, 2022

(cherry picked from commit 374b3a1)
michaeljmarshall pushed a commit to datastax/pulsar that referenced this issue Aug 9, 2022

(cherry picked from commit 374b3a1)
@mattisonchao
Member

Looks like this problem was fixed by PR #16996

Apache Pulsar / Flaky Tests automation moved this from To do to Done Aug 10, 2022
@michaeljmarshall
Member

I just ran into the following error while running testBacklogConsumerCacheReads on my MacOS on a branch that includes #16996. Looks like there might be a separate issue, too. I'm going to start looking into what could be wrong.

java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1874) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:593) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:567) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:553) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T16:40:03,155 - INFO  - [pulsar-io-51-2:ServerCnx@1006] - [/127.0.0.1:49767] Subscribing on topic persistent://my-property/my-ns/cache-read / sub-4
2022-08-12T16:40:03,155 - ERROR - [broker-topic-workers-OrderedExecutor-11-0:Commands@1860] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-3}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(39) + length(8) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 39, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:260) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1874) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
	at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:663) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:567) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:553) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

@michaeljmarshall
Member

michaeljmarshall commented Aug 12, 2022

Reopening this ticket because I am consistently getting this test to fail on my local machine.

I also just got this variant:

2022-08-12T16:51:21,672 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-2}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:555) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T16:51:21,682 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-2}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
	at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:669) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:555) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

Seems like we have a data race. Still digging.

Apache Pulsar / Flaky Tests automation moved this from Done to To do Aug 12, 2022
@michaeljmarshall
Member

Seems like this might be related to #10924.

@michaeljmarshall
Member

#10924 is not related. Just before calling the parseFrom in the stack trace, we call skipBrokerEntryMetadataIfExist.

public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
    // initially reader-index may point to start of broker entry metadata :
    // increment reader-index to start_of_headAndPayload to parse metadata
    skipBrokerEntryMetadataIfExist(buffer);
    // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
    // to parse metadata
    skipChecksumIfPresent(buffer);
    int metadataSize = (int) buffer.readUnsignedInt();
    msgMetadata.parseFrom(buffer, metadataSize);
}
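
The tag types 3, 6 and 7 reported in the traces above are what one might expect if `parseFrom` starts at the wrong offset and treats arbitrary bytes as field tags. Below is a minimal sketch of that effect, assuming the standard protobuf tag layout `(fieldNumber << 3) | wireType`. The class `WireTypeSketch` and its helpers are hypothetical and are not part of LightProto or Pulsar; the set of skippable wire types is an assumption of this sketch.

```java
final class WireTypeSketch {
    // Standard protobuf tag layout: the low three bits carry the wire type.
    static int wireType(int tag) {
        return tag & 0x7;
    }

    // The remaining high bits carry the field number.
    static int fieldNumber(int tag) {
        return tag >>> 3;
    }

    // Assumption for this sketch: only varint (0), 64-bit (1),
    // length-delimited (2) and 32-bit (5) fields can be skipped.
    static boolean isSkippable(int wireType) {
        return wireType == 0 || wireType == 1 || wireType == 2 || wireType == 5;
    }

    public static void main(String[] args) {
        // 0x0A is a well-formed tag; the others stand for bytes that are
        // misread as tags when parsing starts at the wrong position.
        int[] candidateTags = {0x0A, 0x03, 0x0E, 0x07};
        for (int tag : candidateTags) {
            int type = wireType(tag);
            System.out.println("tag=" + tag
                    + " field=" + fieldNumber(tag)
                    + " wireType=" + type
                    + " skippable=" + isSkippable(type));
        }
    }
}
```

Seeing several different invalid wire types for the same topic, as in the logs, fits a parser that begins reading in the middle of the payload rather than a single corrupted message.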

@michaeljmarshall
Member

It's clear to me now that the issue is a data race with the entries array. After #16603, entries are published to another thread. I believe they are "safely published" because the calling thread is within a synchronized block and the receiving thread passes the entries array to a synchronized method:

dispatchMessagesThread.execute(safeRun(() -> {
    if (sendMessagesToConsumers(readType, entries)) {

The fundamental race comes from the caching. When an entry is expired from the cache, we call release(), which deallocates the object.
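
To make the hazard described here concrete, the following is a minimal single-threaded sketch of reference counting, assuming a cache that holds exactly one reference to each entry. `RefCountedEntry` is a hypothetical stand-in and not Pulsar's `EntryImpl` or Netty's API. It only shows why a reader that does not take its own reference can end up holding a deallocated entry once the cache releases it.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountSketch {
    /** Hypothetical stand-in for a reference-counted cache entry. */
    static final class RefCountedEntry {
        // The creator (here: the cache) holds the initial reference.
        private final AtomicInteger refCnt = new AtomicInteger(1);
        private volatile byte[] data;

        RefCountedEntry(byte[] data) {
            this.data = data.clone();
        }

        /** Takes an extra reference; fails if the entry is already deallocated. */
        RefCountedEntry retain() {
            int current;
            do {
                current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("entry already deallocated");
                }
            } while (!refCnt.compareAndSet(current, current + 1));
            return this;
        }

        /** Drops one reference; the last release frees the payload. */
        void release() {
            if (refCnt.decrementAndGet() == 0) {
                data = null;
            }
        }

        boolean isDeallocated() {
            return data == null;
        }
    }

    /** Returns true if the reader's entry is still usable after the cache expires it. */
    static boolean readerSurvivesEviction(boolean readerRetains) {
        RefCountedEntry cached = new RefCountedEntry(new byte[] {1, 2, 3});
        RefCountedEntry handedOut = readerRetains ? cached.retain() : cached;
        cached.release(); // the cache expires the entry
        boolean usable = !handedOut.isDeallocated();
        if (readerRetains) {
            handedOut.release(); // the reader drops its own reference
        }
        return usable;
    }

    public static void main(String[] args) {
        System.out.println("reader without retain: usable = " + readerSurvivesEviction(false));
        System.out.println("reader with retain:    usable = " + readerSurvivesEviction(true));
    }
}
```

In a real broker the eviction and the read run on different threads, so the failure would be intermittent rather than deterministic as it is in this sketch.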

Here is another stack trace I found from running the test. It looks like we already found similar stack traces in #14436. My analysis aligns with @lhotari's. I'll have a PR up soon.

2022-08-12T23:52:24,841 - ERROR - [broker-topic-workers-OrderedExecutor-3-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T23:52:24,874 - ERROR - [broker-topic-workers-OrderedExecutor-3-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(44) + length(1) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 44, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
	at org.apache.pulsar.common.api.proto.LightProtoCodec.readVarInt(LightProtoCodec.java:140) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1234) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
	at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:669) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T23:52:24,865 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-0}] [-1] Failed to parse message metadata
java.lang.IllegalStateException: Some required fields are missing
	at org.apache.pulsar.common.api.proto.MessageMetadata.checkRequiredFields(MessageMetadata.java:1378) ~[classes/:?]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1373) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

@michaeljmarshall
Member

I wrote michaeljmarshall@8f48cf2 thinking that it would solve the issue. However, I am still getting the same exceptions, so there must be something I am missing. I'll continue on this tomorrow or Monday.

Apache Pulsar / Flaky Tests automation moved this from To do to Done Aug 16, 2022
michaeljmarshall added a commit that referenced this issue Aug 16, 2022
…17105)

Fixes #16979 

### Motivation

#12258 introduced caching for backlogged consumers. When caching the entry, it is important to duplicate the `ByteBuf` so that the reader index is not shared. The current code has a race condition where the `ByteBuf` reference in the cache is shared with the dispatcher. When another consumer reads from the cache, the cache calls `duplicate()` on the shared `ByteBuf`, which copies the current reader index. That index might not be 0 if the original dispatcher has already read data from the buffer.
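
The shared-index problem described in this paragraph can be reproduced in miniature with the JDK alone. The sketch below uses `java.nio.ByteBuffer` as a stand-in for the Netty buffer, on the assumption that the relevant behavior is the same: `duplicate()` shares the content and starts the new view at the source buffer's current position. The frame layout and the names `SharedIndexSketch`, `newFrame` and `positionSeenBySecondReader` are hypothetical and chosen for illustration only.

```java
import java.nio.ByteBuffer;

final class SharedIndexSketch {
    // Hypothetical frame: a 4-byte metadata size followed by the metadata bytes.
    static ByteBuffer newFrame() {
        byte[] metadata = {10, 20, 30};
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + metadata.length);
        buf.putInt(metadata.length).put(metadata);
        buf.flip(); // position back to 0, ready for reading
        return buf;
    }

    // Reading the size advances the position of whichever view is passed in.
    static int readMetadataSize(ByteBuffer buf) {
        return buf.getInt();
    }

    /**
     * Returns the position a second reader starts from when it duplicates the
     * cached buffer after a first reader has already consumed the size field.
     */
    static int positionSeenBySecondReader(boolean firstReaderGetsOwnDuplicate) {
        ByteBuffer cached = newFrame();
        ByteBuffer firstView = firstReaderGetsOwnDuplicate ? cached.duplicate() : cached;
        readMetadataSize(firstView);
        // duplicate() copies the current position of the cached buffer.
        return cached.duplicate().position();
    }

    public static void main(String[] args) {
        System.out.println("shared instance: second reader starts at "
                + positionSeenBySecondReader(false));
        System.out.println("own duplicate:   second reader starts at "
                + positionSeenBySecondReader(true));
    }
}
```

When the first reader works on the cached instance itself, the second reader starts past the size field and would interpret metadata bytes as a length, which matches the kind of parse failures reported earlier in this thread. Handing every reader its own view keeps the cached buffer's position at 0.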

Note: it seems like the caching `insert` method creates (or recycles) more `EntryImpl` instances than is really necessary. Changing that is outside this PR's scope, so I am going to leave it as is.

### Modifications

* Create a new `Entry` before inserting it into the cache.
* Add a new test to the `EntryCacheTest`. The test fails before this change and passes after it.
* Update the `EntryCacheTest` mocking so that it returns unique entries when mocking reads from the bookkeeper. Before, all returned `LedgerEntry` objects had ledgerId 0 and entryId 0, which messed with the caching for the new test.

### Verifying this change

This change includes a test that failed before the PR and passes after it.

### Documentation

- [x] `doc-not-needed`
michaeljmarshall added a commit that referenced this issue Aug 16, 2022

(cherry picked from commit 76f4195)
michaeljmarshall added a commit to datastax/pulsar that referenced this issue Aug 16, 2022

(cherry picked from commit 76f4195)
Technoboy- pushed a commit to merlimat/pulsar that referenced this issue Aug 16, 2022
Technoboy- pushed a commit to merlimat/pulsar that referenced this issue Aug 16, 2022

4 participants