diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f47796d933c88..683bb0331e9bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -137,7 +137,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener> topics; + private final ConcurrentOpenHashMap>> topics; private final ConcurrentOpenHashMap replicationClients; @@ -154,7 +154,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener dynamicConfigurationMap = prepareDynamicConfigurationMap(); private final ConcurrentOpenHashMap> configRegisteredListeners; - private final ConcurrentLinkedQueue>> pendingTopicLoadingQueue; + private final ConcurrentLinkedQueue>>> pendingTopicLoadingQueue; private AuthorizationService authorizationService = null; private final ScheduledExecutorService statsUpdater; @@ -422,19 +422,19 @@ public void unloadNamespaceBundlesGracefully() { } public CompletableFuture> getTopicIfExists(final String topic) { - return getTopic(topic, false /* createIfMissing */ ).thenApply(t -> Optional.ofNullable(t)); + return getTopic(topic, false /* createIfMissing */ ); } public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, true /* createIfMissing */ ); + return getTopic(topic, true /* createIfMissing */ ).thenApply(Optional::get); } - private CompletableFuture getTopic(final String topic, boolean createIfMissing) { + private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { try { - CompletableFuture topicFuture = topics.get(topic); + CompletableFuture> topicFuture = topics.get(topic); if (topicFuture != null) { if (topicFuture.isCompletedExceptionally() - || (topicFuture.isDone() && topicFuture.getNow(null) == null)) { + || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { // Exceptional topics should be recreated. topics.remove(topic, topicFuture); } else { @@ -461,8 +461,8 @@ private CompletableFuture getTopic(final String topic, boolean createIfMi } } - private CompletableFuture createNonPersistentTopic(String topic) { - CompletableFuture topicFuture = new CompletableFuture(); + private CompletableFuture> createNonPersistentTopic(String topic) { + CompletableFuture> topicFuture = new CompletableFuture<>(); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -480,7 +480,7 @@ private CompletableFuture createNonPersistentTopic(String topic) { long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); - topicFuture.complete(nonPersistentTopic); + topicFuture.complete(Optional.of(nonPersistentTopic)); }); replicationFuture.exceptionally((ex) -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex); @@ -549,10 +549,10 @@ public PulsarClient getReplicationClient(String cluster) { * @return CompletableFuture * @throws RuntimeException */ - protected CompletableFuture loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws RuntimeException { + protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws RuntimeException { checkTopicNsOwnership(topic); - final CompletableFuture topicFuture = new CompletableFuture<>(); + final CompletableFuture> topicFuture = new CompletableFuture<>(); if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topic); @@ -572,7 +572,7 @@ protected CompletableFuture loadOrCreatePersistentTopic(final String topi return null; }); } else { - pendingTopicLoadingQueue.add(new ImmutablePair>(topic, topicFuture)); + pendingTopicLoadingQueue.add(new ImmutablePair>>(topic, topicFuture)); if (log.isDebugEnabled()) { log.debug("topic-loading for {} added into pending queue", topic); } @@ -580,7 +580,7 @@ protected CompletableFuture loadOrCreatePersistentTopic(final String topi return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture topicFuture) { + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); TopicName topicName = TopicName.get(topic); @@ -615,7 +615,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(persistentTopic); + topicFuture.complete(Optional.of(persistentTopic)); }).exceptionally((ex) -> { log.warn( "Replication or dedup check failed. Removing topic from topics list {}, {}", @@ -638,7 +638,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { // We were just trying to load a topic and the topic doesn't exist - topicFuture.complete(null); + topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -782,9 +782,9 @@ public void invalidateOfflineTopicStatCache(TopicName topicName) { * This method will not make the broker attempt to load the topic if it's not already. */ public Optional getTopicReference(String topic) { - CompletableFuture future = topics.get(topic); + CompletableFuture> future = topics.get(topic); if (future != null && future.isDone() && !future.isCompletedExceptionally()) { - return Optional.ofNullable(future.join()); + return future.join(); } else { return Optional.empty(); } @@ -814,27 +814,27 @@ public Semaphore getLookupRequestSemaphore() { public void checkGC(int gcIntervalInSeconds) { topics.forEach((n, t) -> { - Topic topic = t.isCompletedExceptionally() ? null : t.getNow(null); - if (topic != null) { - topic.checkGC(gcIntervalInSeconds); + Optional topic = extractTopic(t); + if (topic.isPresent()) { + topic.get().checkGC(gcIntervalInSeconds); } }); } public void checkMessageExpiry() { topics.forEach((n, t) -> { - Topic topic = t.getNow(null); - if (topic != null) { - topic.checkMessageExpiry(); + Optional topic = extractTopic(t); + if (topic.isPresent()) { + topic.get().checkMessageExpiry(); } }); } public void checkMessageDeduplicationInfo() { topics.forEach((n, t) -> { - Topic topic = t.getNow(null); - if (topic != null) { - topic.checkMessageDeduplicationInfo(); + Optional topic = extractTopic(t); + if (topic.isPresent()) { + topic.get().checkMessageDeduplicationInfo(); } }); } @@ -868,8 +868,9 @@ public boolean isBacklogExceeded(PersistentTopic topic) { public void monitorBacklogQuota() { topics.forEach((n, t) -> { try { - if (t.getNow(null) != null && t.getNow(null) instanceof PersistentTopic) { - PersistentTopic topic = (PersistentTopic) t.getNow(null); + Optional optionalTopic = extractTopic(t); + if (optionalTopic.isPresent() && optionalTopic.get() instanceof PersistentTopic) { + PersistentTopic topic = (PersistentTopic) optionalTopic.get(); if (isBacklogExceeded(topic)) { getBacklogQuotaManager().handleExceededBacklogQuota(topic); } else if (topic == null) { @@ -921,7 +922,8 @@ public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit) if (serviceUnit.includes(topicName)) { // Topic needs to be unloaded log.info("[{}] Unloading topic", topicName); - closeFutures.add(topicFuture.thenCompose(Topic::close)); + closeFutures.add(topicFuture + .thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null))); } }); CompletableFuture aggregator = FutureUtil.waitForAll(closeFutures); @@ -978,7 +980,7 @@ public int getNumberOfNamespaceBundles() { return this.numberOfNamespaceBundles; } - public ConcurrentOpenHashMap> getTopics() { + public ConcurrentOpenHashMap>> getTopics() { return topics; } @@ -996,7 +998,8 @@ public void onUpdate(String path, Policies data, Stat stat) { if (log.isDebugEnabled()) { log.debug("Notifying topic that policies have changed: {}", name); } - topic.onPoliciesUpdate(data); + + topic.ifPresent(t -> t.onPoliciesUpdate(data)); }); } }); @@ -1033,9 +1036,9 @@ public String generateUniqueProducerName() { public Map getTopicStats() { HashMap stats = new HashMap<>(); topics.forEach((name, topicFuture) -> { - Topic currentTopic = topicFuture.getNow(null); - if (currentTopic != null) { - stats.put(name, currentTopic.getStats()); + Optional topic = extractTopic(topicFuture); + if (topic.isPresent()) { + stats.put(name, topic.get().getStats()); } }); return stats; @@ -1128,18 +1131,12 @@ private void updateTopicMessageDispatchRate() { this.pulsar().getExecutor().execute(() -> { // update message-rate for each topic topics.forEach((name, topicFuture) -> { - if (topicFuture.isDone()) { - String topicName = null; - try { - if (topicFuture.get() instanceof PersistentTopic) { - PersistentTopic topic = (PersistentTopic) topicFuture.get(); - topicName = topicFuture.get().getName(); - // it first checks namespace-policy rate and if not present then applies broker-config - topic.getDispatchRateLimiter().updateDispatchRate(); - } - } catch (Exception e) { - log.warn("[{}] failed to update message-dispatch rate {}", topicName, e.getMessage()); - } + Optional topic = extractTopic(topicFuture); + + if (topic.isPresent() && topic.get() instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + // it first checks namespace-policy rate and if not present then applies broker-config + persistentTopic.getDispatchRateLimiter().updateDispatchRate(); } }); }); @@ -1149,22 +1146,19 @@ private void updateSubscriptionMessageDispatchRate() { this.pulsar().getExecutor().submit(() -> { // update message-rate for each topic subscription topics.forEach((name, topicFuture) -> { - if (topicFuture.isDone()) { - try { - topicFuture.get().getSubscriptions().forEach((subName, persistentSubscription) -> { - if (persistentSubscription - .getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { - ((PersistentDispatcherMultipleConsumers) persistentSubscription - .getDispatcher()).getDispatchRateLimiter().updateDispatchRate(); - } else if (persistentSubscription + Optional topic = extractTopic(topicFuture); + + if (topic.isPresent()) { + topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> { + if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { + ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher()) + .getDispatchRateLimiter().updateDispatchRate(); + } else if (persistentSubscription .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { - ((PersistentDispatcherSingleActiveConsumer) persistentSubscription - .getDispatcher()).getDispatchRateLimiter().updateDispatchRate(); - } - }); - } catch (Exception e) { - log.warn("Failed to get topic from future while update subscription dispatch rate ", e); - } + ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()) + .getDispatchRateLimiter().updateDispatchRate(); + } + }); } }); }); @@ -1177,11 +1171,13 @@ private void updateManagedLedgerConfig() { if (topicFuture.isDone()) { String topicName = null; try { - if (topicFuture.getNow(null) instanceof PersistentTopic) { - PersistentTopic topic = (PersistentTopic) topicFuture.get(); - topicName = topicFuture.get().getName(); + Optional topic = extractTopic(topicFuture); + + if (topic.isPresent() && topic.get() instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + topicName = persistentTopic.getName(); // update skipNonRecoverableLedger configuration - topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( pulsar.getConfiguration().isAutoSkipNonRecoverableData()); } } catch (Exception e) { @@ -1340,7 +1336,7 @@ private static ConcurrentOpenHashMap prepareDynamicConfigur * permit if it was successful to acquire it. */ private void createPendingLoadTopic() { - Pair> pendingTopic = pendingTopicLoadingQueue.poll(); + Pair>> pendingTopic = pendingTopicLoadingQueue.poll(); if (pendingTopic == null) { return; } @@ -1348,7 +1344,7 @@ private void createPendingLoadTopic() { final String topic = pendingTopic.getLeft(); try { checkTopicNsOwnership(topic); - CompletableFuture pendingFuture = pendingTopic.getRight(); + CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); createPersistentTopic(topic, true, pendingFuture); @@ -1442,25 +1438,21 @@ private void blockDispatchersWithLargeUnAckMessages() { lock.readLock().lock(); try { topics.forEach((name, topicFuture) -> { - if (topicFuture.isDone()) { - try { - topicFuture.get().getSubscriptions().forEach((subName, persistentSubscription) -> { - if (persistentSubscription - .getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { - PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription - .getDispatcher(); - int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); - if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { - log.info("[{}] Blocking dispatcher due to reached max broker limit {}", - dispatcher.getName(), dispatcher.getTotalUnackedMessages()); - dispatcher.blockDispatcherOnUnackedMsgs(); - blockedDispatchers.add(dispatcher); - } + Optional topic = extractTopic(topicFuture); + if (topic.isPresent()) { + topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> { + if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription + .getDispatcher(); + int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); + if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { + log.info("[{}] Blocking dispatcher due to reached max broker limit {}", + dispatcher.getName(), dispatcher.getTotalUnackedMessages()); + dispatcher.blockDispatcherOnUnackedMsgs(); + blockedDispatchers.add(dispatcher); } - }); - } catch (Exception e) { - log.warn("Failed to get topic from future ", e); - } + } + }); } }); } finally { @@ -1496,4 +1488,15 @@ public ConfigField(Field field) { this.field = field; } } + + /** + * Safely extract optional topic instance from a future, in a way to avoid unchecked exceptions and race conditions. + */ + public static Optional extractTopic(CompletableFuture> topicFuture) { + if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) { + return topicFuture.join(); + } else { + return Optional.empty(); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java index c2ffc534150fc..914a25c7e4517 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java @@ -19,9 +19,12 @@ package org.apache.pulsar.broker.stats; import java.util.Map; +import java.util.Optional; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.TopicName; @@ -47,8 +50,9 @@ public static Map> generate(PulsarSer private Map> generate() throws Exception { if (pulsar.getBrokerService() != null && pulsar.getBrokerService().getTopics() != null) { pulsar.getBrokerService().getTopics().forEach((name, topicFuture) -> { - PersistentTopic persistentTopic = (PersistentTopic) topicFuture.getNow(null); - if (persistentTopic != null) { + Optional topic = BrokerService.extractTopic(topicFuture); + if (topic.isPresent() && topic.get() instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic.get(); TopicName topicName = TopicName.get(persistentTopic.getName()); put(topicName, persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 01e388a33437f..a2ef39726f589 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -32,11 +32,16 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; + import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.URI; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -75,10 +80,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.google.common.collect.Lists; -import com.google.common.hash.Hashing; - public class NamespaceServiceTest extends BrokerTestBase { @BeforeMethod @@ -260,10 +261,11 @@ public void testUnloadNamespaceBundleFailure() throws Exception { final String topicName = "persistent://my-property/use/my-ns/my-topic1"; pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe(); - ConcurrentOpenHashMap> topics = pulsar.getBrokerService().getTopics(); - Topic spyTopic = spy(topics.get(topicName).get()); + ConcurrentOpenHashMap>> topics = pulsar.getBrokerService() + .getTopics(); + Topic spyTopic = spy(topics.get(topicName).get().get()); topics.clear(); - CompletableFuture topicFuture = CompletableFuture.completedFuture(spyTopic); + CompletableFuture> topicFuture = CompletableFuture.completedFuture(Optional.of(spyTopic)); // add mock topic topics.put(topicName, topicFuture); doAnswer(new Answer>() { @@ -300,10 +302,10 @@ public void testUnloadNamespaceBundleWithStuckTopic() throws Exception { final String topicName = "persistent://my-property/use/my-ns/my-topic1"; Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") .subscribe(); - ConcurrentOpenHashMap> topics = pulsar.getBrokerService().getTopics(); - Topic spyTopic = spy(topics.get(topicName).get()); + ConcurrentOpenHashMap>> topics = pulsar.getBrokerService().getTopics(); + Topic spyTopic = spy(topics.get(topicName).get().get()); topics.clear(); - CompletableFuture topicFuture = CompletableFuture.completedFuture(spyTopic); + CompletableFuture> topicFuture = CompletableFuture.completedFuture(Optional.of(spyTopic)); // add mock topic topics.put(topicName, topicFuture); // return uncompleted future as close-topic result. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 0918bc7c97171..31773bbd67565 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -746,7 +746,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, false); // try to create topic which should fail as bundle is disable - CompletableFuture futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true); + CompletableFuture> futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true); try { futureResult.get();