Skip to content

Commit

Permalink
Fix unload namespaces bundle hangs. (#9116)
Browse files Browse the repository at this point in the history
### Motivation

Fix namespace bundle unloads hangs. In the BrokerService, we maintained a ConcurrentOpenHashMap for storing all topic references. In #8968 cleanup the topics when unloading namespace bundles, see https://github.com/apache/pulsar/pull/8968/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8bbbbc704c6ed6e317f5R1572-R1579

Since StampedLock is not a reentrant and the method `foreach` of the ConcurrentOpenHashMap also acquire read lock, this might block the namespace unloading, here is the thread dump:

```
"pulsar-io-16-7" #132 prio=5 os_prio=31 tid=0x00007ff370ae2800 nid=0x1f603 waiting on condition [0x00007000121d0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000780a0be18> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.remove(ConcurrentOpenHashMap.java:306)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.access$200(ConcurrentOpenHashMap.java:180)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.remove(ConcurrentOpenHashMap.java:135)
	at org.apache.pulsar.broker.service.BrokerService.removeTopicFromCache(BrokerService.java:1658)
	at org.apache.pulsar.broker.service.BrokerService.lambda$cleanUnloadedTopicFromCache$61(BrokerService.java:1611)
	at org.apache.pulsar.broker.service.BrokerService$$Lambda$1003/2064147704.accept(Unknown Source)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
	at org.apache.pulsar.broker.service.BrokerService.cleanUnloadedTopicFromCache(BrokerService.java:1607)
	at org.apache.pulsar.broker.namespace.OwnedBundle.lambda$handleUnloadRequest$1(OwnedBundle.java:140)
	at org.apache.pulsar.broker.namespace.OwnedBundle$$Lambda$999/503902413.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$null$18(NonPersistentTopic.java:442)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic$$Lambda$994/682846231.run(Unknown Source)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
This also makes the current ci unstable while shutdown the mock broker after the tests.

### Modifications

Use `keys()` method of the `ConcurrentOpenHashMap` to get a new keys array list.
  • Loading branch information
codelipenghui committed Jan 4, 2021
1 parent 68be899 commit 752319e
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1597,12 +1597,12 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}

public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
topics.forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
for (String topic : topics.keys()) {
TopicName topicName = TopicName.get(topic);
if (serviceUnit.includes(topicName)) {
pulsar.getBrokerService().removeTopicFromCache(topicName.toString());
}
});
}
}

public AuthorizationService getAuthorizationService() {
Expand Down

0 comments on commit 752319e

Please sign in to comment.