Skip to content

[Bug] keyshared will block the pulsar-io thread #17437

@leizhiyuan

Description

@leizhiyuan

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.7.2

Minimal reproduce step

no way

What did you expect to see?

no

What did you see instead?

[2] Busy(98.1%) thread(7338/0x1caa) stack of java process(5965) under user(root):

"pulsar-io-23-57" #514 prio=5 os_prio=0 tid=0x00007fe1b8072800 nid=0x1caa waiting for monitor entry [0x00007fdee5691000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:257)
	- waiting to lock <0x000000034d3e74c0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.lambda$sendMessagesToConsumers$1(PersistentStickyKeyDispatcherMultipleConsumers.java:228)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers$$Lambda$1159/1048711785.operationComplete(Unknown Source)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
	at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
	at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytesMultiple(AbstractEpollStreamChannel.java:305)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteMultiple(AbstractEpollStreamChannel.java:510)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:422)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.flush0(AbstractEpollChannel.java:557)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
	at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$0(PulsarCommandSenderImpl.java:376)
	at org.apache.pulsar.broker.service.PulsarCommandSenderImpl$$Lambda$909/1669560270.run(Unknown Source)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

the lock object

"pulsar-io-23-10" #436 prio=5 os_prio=0 tid=0x00007fe1b801d800 nid=0x1c04 runnable [0x00007fdf12706000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TreeMap.getLastEntry(TreeMap.java:2141)
	at java.util.TreeMap.pollLastEntry(TreeMap.java:685)
	at java.util.TreeSet.pollLast(TreeSet.java:462)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.lambda$items$4(ConcurrentSortedLongPairSet.java:140)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet$$Lambda$1143/2030611489.accept(Unknown Source)
	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet$Section.forEach(ConcurrentLongPairSet.java:430)
	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.forEach(ConcurrentLongPairSet.java:159)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:137)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:837)
	- locked <0x000000034d3e74c0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentStickyKeyDispatcherMultipleConsumers.java:373)
	- eliminated <0x000000034d3e74c0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:329)
	- locked <0x000000034d3e74c0> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.lambda$sendMessagesToConsumers$1(PersistentStickyKeyDispatcherMultipleConsumers.java:228)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers$$Lambda$1159/1048711785.operationComplete(Unknown Source)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
	at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
	at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytesMultiple(AbstractEpollStreamChannel.java:305)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteMultiple(AbstractEpollStreamChannel.java:510)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:422)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.flush0(AbstractEpollChannel.java:557)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
	at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$0(PulsarCommandSenderImpl.java:376)
	at org.apache.pulsar.broker.service.PulsarCommandSenderImpl$$Lambda$909/1669560270.run(Unknown Source)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Anything else?

sh_219.log

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    Staletype/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions