Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker] Fix deadlock when adding consumer #7841

Merged
merged 2 commits into from
Aug 27, 2020
Merged

Conversation

massakam
Copy link
Contributor

@massakam massakam commented Aug 18, 2020

Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:

"prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)

prometheus-stats-36-1 was trying to lock PersistentSubscription (and PersistentDispatcherMultipleConsumers) after locking PersistentTopic#subscriptions, an instance of ConcurrentOpenHashMap.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, ForkJoinPool.commonPool-worker-104 was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

PersistentSubscription#getNumberOfEntriesDelayed() is no longer used in the master code, but it seems that this deadlock has not yet been resolved.

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);

public SubscriptionStats getStats(Boolean getPreciseBacklog) {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStats consumerStats = consumer.getStats();
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.chuckedMessageRate += consumerStats.chuckedMessageRate;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
});
}
subStats.type = getType();
if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
if (activeConsumer != null) {
subStats.activeConsumerName = activeConsumer.consumerName();
}
}
if (Subscription.isIndividualAckMode(subStats.type)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
subStats.unackedMessages = d.getTotalUnackedMessages();
subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
subStats.msgDelayed = d.getNumberOfDelayedMessages();

Modifications

Moved the isConsumersExceededOnTopic() method to check the number of connected consumers when adding a consumer to the topic from the Dispatcher classes to the AbstractTopic class. Avoid the deadlock mentioned above by calling PersistentTopic#getNumberOfConsumers() before locking the PersistentSubscription instance.

@massakam massakam self-assigned this Aug 18, 2020
@massakam massakam added area/broker type/bug The PR fixed a bug or issue reported a bug labels Aug 18, 2020
@massakam massakam added this to the 2.7.0 milestone Aug 18, 2020
@massakam massakam force-pushed the fix-deadlock branch 2 times, most recently from 53c3d08 to 8962717 Compare August 20, 2020 15:57
if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log an error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie Added warning log here.

@massakam
Copy link
Contributor Author

PTAL

@sijie sijie merged commit 23dc5c7 into apache:master Aug 27, 2020
@massakam massakam deleted the fix-deadlock branch August 28, 2020 01:09
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" apache#410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" apache#953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" apache#410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" apache#953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" apache#410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" apache#953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.
wolfstudy pushed a commit that referenced this pull request Oct 30, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.

(cherry picked from commit 23dc5c7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker release/2.6.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants