fix potential deadlock that can occur in addConsumer #5371

Merged

jerrypeng
Contributor

Motivation

We have observed a deadlock when consumers are added. isConsumersExceededOnTopic and isConsumersExceededOnSubscription are blocking calls, even though addConsumer is invoked asynchronously.

Modifications

Make isConsumersExceededOnTopic and isConsumersExceededOnSubscription non-blocking by reading the data only from the cache and not going to ZK. The data may occasionally be stale, but that should be rare since it is likely already in the cache. A more permanent solution would be to make those calls and addConsumer itself async, but that requires a big change. We are currently refactoring the MetadataStore interfaces, so let's hold off on those big changes until that work is done.

@jerrypeng added the type/bug label on Oct 12, 2019
@jerrypeng added this to the 2.4.2 milestone on Oct 12, 2019
@jerrypeng self-assigned this on Oct 12, 2019
@codelipenghui
Contributor

retest this please

@jerrypeng
Contributor Author

rerun java8 tests
rerun integration tests

@massakam
Contributor

Do we need to modify other dispatchers (such as PersistentDispatcherSingleActiveConsumer) in the same way?

@jerrypeng
Contributor Author

@massakam good suggestion! I will make the same change to PersistentDispatcherSingleActiveConsumer as well.

@rdhabalia
Contributor

Can you add the stack trace from where you observed the deadlock?

.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));

if (policies == null) {
policies = new Policies();
Contributor

We should just return false instead of creating an empty object.

Contributor Author

@rdhabalia I don't think that is correct because we still have to check serviceConfig.getMaxConsumersPerTopic() here:

https://github.com/apache/pulsar/pull/5371/files#diff-5cc001bf2b64aad097762e99c5b78e4aR171
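
To make the point of this exchange concrete, here is a minimal sketch of a cache-only limit check that falls back to a broker-wide setting when no policies are cached. It is not the code from this PR: `ConsumerLimitSketch`, its `Policies` class, the in-memory map, and the `brokerMaxConsumersPerTopic` field are hypothetical stand-ins, and the exact comparison is an assumption. Only the idea (read from the cache, substitute empty policies on a miss, still apply the broker-level limit) comes from the discussion above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in classes for illustration; these are not Pulsar's real types.
class ConsumerLimitSketch {
    /** Namespace-level policy; a value of 0 or less means "no namespace limit set". */
    static final class Policies {
        final int maxConsumersPerTopic;
        Policies(int maxConsumersPerTopic) { this.maxConsumersPerTopic = maxConsumersPerTopic; }
    }

    private final Map<String, Policies> policiesCache = new ConcurrentHashMap<>();
    private final int brokerMaxConsumersPerTopic; // assumed broker-wide default

    ConsumerLimitSketch(int brokerMaxConsumersPerTopic) {
        this.brokerMaxConsumersPerTopic = brokerMaxConsumersPerTopic;
    }

    void cachePolicies(String namespace, Policies policies) {
        policiesCache.put(namespace, policies);
    }

    /** Cache-only read: returns immediately and never waits on a remote store. */
    Optional<Policies> getDataIfPresent(String namespace) {
        return Optional.ofNullable(policiesCache.get(namespace));
    }

    boolean isConsumersExceededOnTopic(String namespace, int currentConsumers) {
        // On a cache miss, use empty policies rather than returning false,
        // so the broker-wide limit below is still enforced.
        Policies policies = getDataIfPresent(namespace).orElseGet(() -> new Policies(0));
        int max = policies.maxConsumersPerTopic > 0
                ? policies.maxConsumersPerTopic
                : brokerMaxConsumersPerTopic;
        return max > 0 && max <= currentConsumers;
    }

    public static void main(String[] args) {
        ConsumerLimitSketch sketch = new ConsumerLimitSketch(2);
        // Cache miss: the broker-wide limit of 2 applies.
        System.out.println(sketch.isConsumersExceededOnTopic("tenant/ns", 2)); // true
        sketch.cachePolicies("tenant/ns", new Policies(5));
        // Cache hit: the namespace limit of 5 takes precedence.
        System.out.println(sketch.isConsumersExceededOnTopic("tenant/ns", 2)); // false
    }
}
```

In this sketch, returning false on a cache miss would skip the broker-wide limit entirely, which is the concern raised in the reply above.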

@jerrypeng
Contributor Author

@rdhabalia

"ForkJoinPool.commonPool-worker-269" #1005072 daemon prio=5 os_prio=0 tid=0x00007f9c94010800 nid=0x7f12 waiting on condition [0x00007f9c602c8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007767804e8> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1695)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1775)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:95)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnSubscription(PersistentDispatcherMultipleConsumers.java:177)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:145)
        - locked <0x000000068e2dfad8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:225)
        - locked <0x000000068e2dec90> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$9(PersistentTopic.java:526)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$671/51359314.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:520)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$9(ServerCnx.java:697)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$629/738438293.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$5(BookkeeperSchemaStorage.java:188)
        at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$587/1869295206.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Many threads in the ForkJoinPool are waiting for lock 0x000000068e2dec90, which is held by the thread above.
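
The pattern visible in the trace, a thread parked in a blocking `get` while still holding a lock that other threads need, can be reproduced in isolation. The following is only an illustrative sketch, not Pulsar code: `LockedBlockingGetSketch` and its methods are hypothetical, a `ReentrantLock` stands in for the `synchronized` monitors so that contention can be probed with `tryLock()`, and the future is completed by hand to keep the demo finite.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo; class and method names are illustrative, not Pulsar code.
class LockedBlockingGetSketch {
    // Stand-in for the subscription/dispatcher monitor seen in the trace.
    private final ReentrantLock lock = new ReentrantLock();

    /** Mimics a synchronized method that waits on a metadata future while holding the lock. */
    boolean blockingCheck(CompletableFuture<Boolean> pending, CountDownLatch lockHeld) throws Exception {
        lock.lock();
        try {
            lockHeld.countDown();
            return pending.get(10, TimeUnit.SECONDS); // waits with the lock still held
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if another caller could enter the critical section right now. */
    boolean canEnter() {
        if (lock.tryLock()) {
            lock.unlock();
            return true;
        }
        return false;
    }

    /** Runs the scenario and reports {lockFreeWhileBlocked, lockFreeAfterCompletion}. */
    static boolean[] demo() throws Exception {
        LockedBlockingGetSketch sketch = new LockedBlockingGetSketch();
        CompletableFuture<Boolean> metadata = new CompletableFuture<>();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            try {
                sketch.blockingCheck(metadata, lockHeld);
            } catch (Exception e) {
                // Ignored in this demo; the future is completed normally below.
            }
        });
        holder.start();
        lockHeld.await();                            // holder now owns the lock
        boolean whileBlocked = sketch.canEnter();    // holder still holds the lock
        metadata.complete(false);                    // the "metadata read" finally finishes
        holder.join();
        boolean afterCompletion = sketch.canEnter(); // lock has been released
        return new boolean[] {whileBlocked, afterCompletion};
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = demo();
        // Prints: free while blocked: false, free after completion: true
        System.out.println("free while blocked: " + result[0] + ", free after completion: " + result[1]);
    }
}
```

If the work needed to complete the future were itself queued behind threads waiting for the same lock, none of them could make progress until the timed `get` expired, which is consistent with the TIMED_WAITING state in the trace.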

@jerrypeng
Contributor Author

Full jstack:

threads.txt

@jerrypeng
Contributor Author

rerun integration tests

@jerrypeng
Contributor Author

rerun integration tests
rerun java8 tests

@jerrypeng
Contributor Author

rerun integration tests

@jerrypeng
Contributor Author

rerun integration tests

@jerrypeng
Contributor Author

rerun integration tests

Contributor

@rdhabalia left a comment

👍

@jerrypeng merged commit 590b068 into apache:master on Oct 22, 2019
wolfstudy pushed a commit that referenced this pull request Nov 20, 2019
* fix potential deadlock that can occur in addConsumer

* add fix to PersistentDispatcherSingleActiveConsumer

* fixing tests

(cherry picked from commit 590b068)
Labels
type/bug The PR fixed a bug or issue reported a bug

5 participants