Key_Shared mode may deadlock and leave many CLOSE_WAIT connections #11964

@baomingyu

Describe the bug
In Key_Shared consumer mode, the broker sometimes deadlocks, and a large number of connections on the broker are left in the CLOSE_WAIT state.

To Reproduce
Steps to reproduce the behavior:

  1. Create one topic with 10 partitions.
  2. Start 10 producers and 100 consumers.
  3. While the workload is running, consumers reconnect.
  4. The broker shows many connections in the CLOSE_WAIT state, and a thread dump of the broker reports a deadlock.

Additional context

Found one Java-level deadlock:

"pulsar-io-23-128":
  waiting to lock monitor 0x00007f1bcc016b28 (object 0x0000000448635570, a org.apache.pulsar.broker.service.persistent.PersistentSubscription),
  which is held by "pulsar-io-23-120"
"pulsar-io-23-120":
  waiting to lock monitor 0x00007f1ff0018868 (object 0x0000000448805cc0, a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers),
  which is held by "BookKeeperClientWorker-OrderedExecutor-26-0"
"BookKeeperClientWorker-OrderedExecutor-26-0":
  waiting for ownable synchronizer 0x00000004486515b8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "pulsar-io-23-55"
"pulsar-io-23-55":
  waiting to lock monitor 0x00007f1ff0018868 (object 0x0000000448805cc0, a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers),
  which is held by "BookKeeperClientWorker-OrderedExecutor-26-0"
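
Read as a wait-for graph, the summary above has one edge per thread (waiter to current holder). The following minimal Java sketch encodes exactly those four reported edges and follows them until a thread repeats. The class and method names (`WaitForGraph`, `reportedEdges`, `findCycle`) are illustrative only and are not part of Pulsar or BookKeeper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WaitForGraph {
    // Edges copied from the deadlock summary: each thread waits on one lock,
    // so there is exactly one "waiter -> holder" edge per thread.
    public static Map<String, String> reportedEdges() {
        Map<String, String> waitsFor = new LinkedHashMap<>();
        waitsFor.put("pulsar-io-23-128", "pulsar-io-23-120");
        waitsFor.put("pulsar-io-23-120", "BookKeeperClientWorker-OrderedExecutor-26-0");
        waitsFor.put("BookKeeperClientWorker-OrderedExecutor-26-0", "pulsar-io-23-55");
        waitsFor.put("pulsar-io-23-55", "BookKeeperClientWorker-OrderedExecutor-26-0");
        return waitsFor;
    }

    // Follows edges from `start` until a thread is seen twice and returns the
    // threads that form the cycle. Returns an empty list if the chain ends
    // at a thread that is not waiting on anything.
    public static List<String> findCycle(Map<String, String> waitsFor, String start) {
        List<String> path = new ArrayList<>();
        String current = start;
        while (current != null && !path.contains(current)) {
            path.add(current);
            current = waitsFor.get(current);
        }
        if (current == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(path.subList(path.indexOf(current), path.size()));
    }

    public static void main(String[] args) {
        // Prints [BookKeeperClientWorker-OrderedExecutor-26-0, pulsar-io-23-55]
        System.out.println(findCycle(reportedEdges(), "pulsar-io-23-128"));
    }
}
```

Under this reading, the actual cycle is between "BookKeeperClientWorker-OrderedExecutor-26-0" and "pulsar-io-23-55"; "pulsar-io-23-128" and "pulsar-io-23-120" are not on the cycle but are blocked behind it.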

Java stack information for the threads listed above:

"pulsar-io-23-128":
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:180)
	- waiting to lock <0x0000000448635570> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:172)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:641)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$644/453555360.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:637)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:929)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$585/676684412.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:905)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$582/1632106175.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:855)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
"pulsar-io-23-120":
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.addConsumer(PersistentStickyKeyDispatcherMultipleConsumers.java:107)
	- waiting to lock <0x0000000448805cc0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:243)
	- locked <0x0000000448635570> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:172)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:641)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$644/453555360.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:637)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:929)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$585/676684412.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:905)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$582/1632106175.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:855)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
"BookKeeperClientWorker-OrderedExecutor-26-0":
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000004486515b8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1171)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:349)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:312)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.markDeletePositionMoveForward(PersistentStickyKeyDispatcherMultipleConsumers.java:359)
	- locked <0x0000000448805cc0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.notifyTheMarkDeletePositionMoveForwardIfNeeded(PersistentSubscription.java:536)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.access$100(PersistentSubscription.java:77)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$2.deleteComplete(PersistentSubscription.java:523)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$19.markDeleteComplete(ManagedCursorImpl.java:1936)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$17.operationComplete(ManagedCursorImpl.java:1721)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$persistPositionToLedger$21(ManagedCursorImpl.java:2494)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$$Lambda$775/632662446.addComplete(Unknown Source)
	at org.apache.bookkeeper.client.AsyncCallback$AddCallback.addCompleteWithLatency(AsyncCallback.java:92)
	at org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:431)
	at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1823)
	at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:415)
	at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:409)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.writeComplete(PerChannelBookieClient.java:2123)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleResponse(PerChannelBookieClient.java:2180)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleV2Response(PerChannelBookieClient.java:2159)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ReadV2ResponseCallback.safeRun(PerChannelBookieClient.java:1354)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
"pulsar-io-23-55":
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.markDeletePositionMoveForward(PersistentStickyKeyDispatcherMultipleConsumers.java:355)
	- waiting to lock <0x0000000448805cc0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.notifyTheMarkDeletePositionMoveForwardIfNeeded(PersistentSubscription.java:536)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.access$100(PersistentSubscription.java:77)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$2.deleteComplete(PersistentSubscription.java:523)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncDelete(ManagedCursorImpl.java:1886)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:362)
	at org.apache.pulsar.broker.service.Consumer.messageAcked(Consumer.java:360)
	at org.apache.pulsar.broker.service.ServerCnx.handleAck(ServerCnx.java:1272)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:155)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
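
The dump suggests a lock-ordering inversion: the ack path ("pulsar-io-23-55") appears to hold the cursor's `ReentrantReadWriteLock` and then wants the dispatcher monitor, while the BookKeeper callback path holds the dispatcher monitor and then wants the read lock. The sketch below reproduces that shape with plain JDK classes and confirms it with `ThreadMXBean.findDeadlockedThreads()`. It is a simplified model, not Pulsar code: `dispatcher`, `cursorLock`, the thread names, and the assumption that the ack path holds the write lock are stand-ins inferred from the dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderSketch {
    // Provokes the two-lock inversion and returns how many threads the JVM
    // reports as deadlocked (0 if none is detected within about 5 seconds).
    public static int provokeAndCount() {
        final Object dispatcher = new Object(); // stand-in for the dispatcher monitor
        final ReentrantReadWriteLock cursorLock = new ReentrantReadWriteLock(); // stand-in for the cursor lock
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        Thread ackPath = new Thread(() -> {
            cursorLock.writeLock().lock();          // first lock: cursor (assumed write lock)
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (dispatcher) { }       // second lock: dispatcher monitor
            } finally {
                cursorLock.writeLock().unlock();
            }
        }, "ack-path");

        Thread callbackPath = new Thread(() -> {
            synchronized (dispatcher) {             // first lock: dispatcher monitor
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                cursorLock.readLock().lock();       // second lock: cursor read lock
                cursorLock.readLock().unlock();
            }
        }, "callback-path");

        // Daemon threads so the stuck pair does not keep the JVM alive.
        ackPath.setDaemon(true);
        callbackPath.setDaemon(true);
        ackPath.start();
        callbackPath.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            // Covers both object monitors and ownable synchronizers.
            long[] ids = threads.findDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + provokeAndCount());
    }
}
```

In this model, acquiring the two locks in the same order on both paths, or releasing the dispatcher monitor before touching the cursor lock, removes the cycle. Whether either option fits the broker code is a separate question that this sketch does not answer.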

Labels: type/bug