Replicated Subscription fail with IllegalStateException #10097

Closed
lhotari opened this issue Mar 31, 2021 · 1 comment · Fixed by #10098
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@lhotari
Member

lhotari commented Mar 31, 2021

Problem

This exception appears repeatedly in the log when using replicated subscriptions:

07:17:59.770 [bookkeeper-ml-workers-OrderedExecutor-4-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://georep/default/t1][cluster-a -> cluster-b] Unexpected exception: Field 'replicated_from' is not set
java.lang.IllegalStateException: Field 'replicated_from' is not set
at org.apache.pulsar.common.api.proto.MessageMetadata.getReplicatedFrom(MessageMetadata.java:151) ~[org.apache.pulsar-pulsar-common-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentReplicator.checkReplicatedSubscriptionMarker(PersistentReplicator.java:763) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:366) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.13.0.jar:4.13.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.13.0.jar:4.13.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

To reproduce

  1. Create a 2-cluster k8s deployment (sample script/helm to do so)
  2. Create a consumer with a replicated subscription for topic georep/default/t1 in cluster-a and close it
  3. Create a consumer with a replicated subscription for topic georep/default/t1 in cluster-b and close it
  4. Create a producer for topic georep/default/t1 in cluster-a and publish messages to the topic
  5. Create a consumer with a replicated subscription for topic georep/default/t1 in cluster-a, consume 1 message and close it
  6. Create a consumer with a replicated subscription for topic georep/default/t1 in cluster-b, consume 1 message and close it

It might be possible to reproduce the issue with fewer steps.

Observations

It seems that this code location broke after the switch to LightProto (#9046).
The fix is easy for the exception above. The concern is the lack of test coverage for replicated subscriptions.

@lhotari
Member Author

lhotari commented Mar 31, 2021

After fixing the exception in the description with the #10098 changes, a new issue appears:

11:38:04.969 [broker-topic-workers-OrderedScheduler-3-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1147) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250) ~[org.apache.pulsar-pulsar-common-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.common.api.proto.ClusterMessageId.getCluster(ClusterMessageId.java:19) ~[org.apache.pulsar-pulsar-common-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.localSubscriptionUpdated(ReplicatedSubscriptionsController.java:116) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$acknowledgeMessage$8(PersistentSubscription.java:358) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_282]
at org.apache.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:358) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:114) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:200) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:146) ~[org.apache.pulsar-pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.13.0.jar:4.13.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_282]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_282]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
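The trace shows a use-after-release. `ClusterMessageId.getCluster()` reads its string through `LightProtoCodec.readString`, which reads from a Netty `ByteBuf`, and by that point the buffer's reference count has already dropped to 0. This means the string is decoded from the buffer on access, not when the message is parsed.

The plain-Java sketch below models that hazard. `RefCountedBuffer` and `LazyClusterId` are hypothetical stand-ins, not Netty or LightProto classes. Reading the value while the buffer is still live is one possible remedy, not necessarily what the eventual fix did.

```java
import java.nio.charset.StandardCharsets;

public class LazyDecodeHazard {

    // Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
    static final class RefCountedBuffer {
        private final byte[] data;
        private int refCnt = 1;

        RefCountedBuffer(byte[] data) {
            this.data = data;
        }

        void release() {
            refCnt--;
        }

        String readString(int offset, int length) {
            if (refCnt <= 0) {
                // Models io.netty.util.IllegalReferenceCountException ("refCnt: 0").
                throw new IllegalStateException("refCnt: " + refCnt);
            }
            return new String(data, offset, length, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical lazily decoded field: it stores only a position in the buffer
    // and decodes (then caches) the string on first access.
    static final class LazyClusterId {
        private final RefCountedBuffer buf;
        private final int offset;
        private final int length;
        private String cluster;

        LazyClusterId(RefCountedBuffer buf, int offset, int length) {
            this.buf = buf;
            this.offset = offset;
            this.length = length;
        }

        String getCluster() {
            if (cluster == null) {
                cluster = buf.readString(offset, length);
            }
            return cluster;
        }
    }

    public static void main(String[] args) {
        byte[] bytes = "cluster-a".getBytes(StandardCharsets.UTF_8);

        // Unsafe: the buffer is released before the field is first read.
        RefCountedBuffer buf1 = new RefCountedBuffer(bytes);
        LazyClusterId late = new LazyClusterId(buf1, 0, bytes.length);
        buf1.release();
        try {
            late.getCluster();
        } catch (IllegalStateException e) {
            System.out.println("late read failed: " + e.getMessage());
        }

        // Safer: read (and cache) the value while the buffer is still live.
        RefCountedBuffer buf2 = new RefCountedBuffer(bytes);
        LazyClusterId early = new LazyClusterId(buf2, 0, bytes.length);
        early.getCluster();
        buf2.release();
        System.out.println("early read: " + early.getCluster());
    }
}
```

In the sketch, the failure depends only on ordering. The same lazily decoded field works if it is read before the release and fails if it is read after.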

sijie pushed a commit that referenced this issue Apr 9, 2021
Fixes: #10097 

### Motivation

See #10097 for the issue. It seems that the code broke when the switch was made to LightProto in #9046.

### Modifications

Check `msg.getMessageBuilder().hasReplicatedFrom()` first, and call `msg.getMessageBuilder().getReplicatedFrom()` only when `hasReplicatedFrom()` returns true.
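Below is a minimal, self-contained sketch of this guard. `Metadata` is a hypothetical stand-in for the LightProto-generated `MessageMetadata`. It assumes, as the first stack trace shows, that the getter throws `IllegalStateException` when the optional field is unset. The `replicatedFrom` helper is illustrative only and is not the broker's actual code.

```java
import java.util.Optional;

public class ReplicatedFromGuard {

    // Hypothetical stand-in for a LightProto-generated message. Assumption: the getter
    // throws when the optional field is unset, matching the stack trace in this issue.
    static final class Metadata {
        private String replicatedFrom; // null means "not set"

        Metadata setReplicatedFrom(String value) {
            this.replicatedFrom = value;
            return this;
        }

        boolean hasReplicatedFrom() {
            return replicatedFrom != null;
        }

        String getReplicatedFrom() {
            if (replicatedFrom == null) {
                throw new IllegalStateException("Field 'replicated_from' is not set");
            }
            return replicatedFrom;
        }
    }

    // Hypothetical helper: only calls the getter after hasReplicatedFrom() returns true,
    // so messages without the field yield Optional.empty() instead of throwing.
    static Optional<String> replicatedFrom(Metadata md) {
        return md.hasReplicatedFrom() ? Optional.of(md.getReplicatedFrom()) : Optional.empty();
    }

    public static void main(String[] args) {
        Metadata local = new Metadata();
        Metadata remote = new Metadata().setReplicatedFrom("cluster-b");

        System.out.println(replicatedFrom(local));  // Optional.empty
        System.out.println(replicatedFrom(remote)); // Optional[cluster-b]

        try {
            local.getReplicatedFrom(); // unguarded call, as in the failing code path
        } catch (IllegalStateException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
    }
}
```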