Skip to content

Commit

Permalink
Copy list of subscriptions when checking for message expiration to av…
Browse files Browse the repository at this point in the history
…oid deadlocks (apache#8877)

Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time.

[threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)

The thread "ForkJoinPool.commonPool-worker-120" was locking an instance of `ManagedLedgerImpl`. And this thread seemed to be waiting for `subscriptions`, which is an instance of `ConcurrentOpenHashMap`, to be unlocked. Many other threads were blocked because the lock on the `ManagedLedgerImpl` instance was not released.

```
"ForkJoinPool.commonPool-worker-120" apache#903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
        - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```

The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1".
```
"pulsar-msg-expiry-monitor-24-1" apache#304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
```

I can't understand why "pulsar-msg-expiry-monitor-24-1" was stuck. However, it seems that this deadlock can be avoided if `subscriptions` is not locked when checking for message expiration, so I created this PR. If anyone can explain why "pulsar-msg-expiry-monitor-24-1" was stuck, please let me know.

When expiring messages for each subscription, copy the values of `subscriptions` as `List` and execute `forEach()` for that `List` instance.

(cherry picked from commit d08ac1d)
  • Loading branch information
Masahiro Sakamoto authored and eolivelli committed Dec 1, 2021
1 parent 59756f2 commit 2a893c9
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,9 @@ public void checkMessageExpiry() {
int message_ttl_in_seconds = getMessageTTL();

if (message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
subscriptions.values().forEach((sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.values().forEach((replicator)
-> ((PersistentReplicator) replicator).expireMessages(message_ttl_in_seconds));
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
Expand Down

0 comments on commit 2a893c9

Please sign in to comment.