From 6cdf3786b115bbaa96cc37b818fb5f16b23ebaf1 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sun, 26 Jan 2025 10:09:39 +0800 Subject: [PATCH 1/5] [fix][broker] Closed topics won't be removed from the cache --- .../pulsar/broker/service/AbstractTopic.java | 4 + .../pulsar/broker/service/BrokerService.java | 39 +---- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 4 +- .../buffer/impl/TopicTransactionBuffer.java | 27 +++- .../service/BrokerBkEnsemblesTests.java | 4 +- .../impl/TransactionPersistentTopicTest.java | 148 ++++++++++++++++++ 7 files changed, 181 insertions(+), 49 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 69a38bc50de9d..b71f77a1f1e67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -48,6 +48,7 @@ import java.util.function.ToLongFunction; import javax.annotation.Nonnull; import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -95,6 +96,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; protected final String topic; + @Getter + @Setter + protected volatile CompletableFuture> createFuture; // Producers currently connected to this topic protected final ConcurrentHashMap producers; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 79e6fb2b02e31..2529fd6b1a852 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1326,6 +1326,7 @@ private CompletableFuture> createNonPersistentTopic(String topic NonPersistentTopic nonPersistentTopic; try { nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + nonPersistentTopic.setCreateFuture(topicFuture); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); topicFuture.completeExceptionally(e); @@ -1800,6 +1801,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); + persistentTopic.setCreateFuture(topicFuture); persistentTopic .initialize() .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded()) @@ -2409,42 +2411,7 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFromCache(Topic topic) { - Optional>> createTopicFuture = findTopicFutureInCache(topic); - if (createTopicFuture.isEmpty()){ - return CompletableFuture.completedFuture(null); - } - return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); - } - - private Optional>> findTopicFutureInCache(Topic topic){ - if (topic == null){ - return Optional.empty(); - } - final CompletableFuture> createTopicFuture = topics.get(topic.getName()); - // If not exists in cache, do nothing. - if (createTopicFuture == null){ - return Optional.empty(); - } - // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic. - if (!createTopicFuture.isDone()){ - return Optional.empty(); - } - // If the future in cache has exception complete, - // the topic instance in the cache is not the same with the topic. - if (createTopicFuture.isCompletedExceptionally()){ - return Optional.empty(); - } - Optional optionalTopic = createTopicFuture.join(); - Topic topicInCache = optionalTopic.orElse(null); - if (topicInCache == null || topicInCache != topic){ - return Optional.empty(); - } else { - return Optional.of(createTopicFuture); - } - } - - private CompletableFuture removeTopicFutureFromCache(String topic, + public CompletableFuture removeTopicFutureFromCache(String topic, CompletableFuture> createTopicFuture) { TopicName topicName = TopicName.get(topic); return pulsar.getNamespaceService().getBundleAsync(topicName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 7cdc8cc11a482..917471e1cf381 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -454,7 +454,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(NonPersistentTopic.this); + brokerService.removeTopicFutureFromCache(topic, createFuture); unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -555,7 +555,7 @@ public CompletableFuture close( brokerService.executor().execute(() -> { if (disconnectClients) { - brokerService.removeTopicFromCache(NonPersistentTopic.this); + brokerService.removeTopicFutureFromCache(topic, createFuture); unregisterTopicPolicyListener(); } closeFuture.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2325c8286a1be..3792d3170acc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1530,7 +1530,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); + brokerService.removeTopicFutureFromCache(topic, createFuture); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1807,7 +1807,7 @@ private boolean isClosed() { } private void disposeTopic(CompletableFuture closeFuture) { - brokerService.removeTopicFromCache(PersistentTopic.this) + brokerService.removeTopicFutureFromCache(topic, createFuture) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 41977e6b61d88..c43f0ed7fb9c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; + private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { + return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() + ? new SnapshotSegmentAbortedTxnProcessorImpl(topic) + : new SingleSnapshotAbortedTxnProcessorImpl(topic); + } + + private static AbortedTxnProcessor.SnapshotType determineSnapshotType(PersistentTopic topic) { + return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() + ? AbortedTxnProcessor.SnapshotType.Segment + : AbortedTxnProcessor.SnapshotType.Single; + } + public TopicTransactionBuffer(PersistentTopic topic) { + this(topic, createSnapshotProcessor(topic), determineSnapshotType(topic)); + } + + @VisibleForTesting + TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor, + AbortedTxnProcessor.SnapshotType snapshotType) { super(State.None); this.topic = topic; this.timer = topic.getBrokerService().getPulsar().getTransactionTimer(); @@ -118,13 +136,8 @@ public TopicTransactionBuffer(PersistentTopic topic) { this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry(); - if (topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) { - snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(topic); - snapshotType = AbortedTxnProcessor.SnapshotType.Segment; - } else { - snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic); - snapshotType = AbortedTxnProcessor.SnapshotType.Single; - } + this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor; + this.snapshotType = snapshotType; this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack(); this.recover(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 68a52c4b4c381..8141b97f3ca21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -126,7 +126,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { // (3) remove topic and managed-ledger from broker which means topic is not closed gracefully consumer.close(); producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic); + pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture()); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); @@ -249,7 +249,7 @@ public void testSkipCorruptDataLedger() throws Exception { // clean managed-ledger and recreate topic to clean any data from the cache producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic); + pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture()); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java new file mode 100644 index 0000000000000..d686e2a1cfa77 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicFactory; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class TransactionPersistentTopicTest extends ProducerConsumerBase { + + private static CountDownLatch topicInitSuccessSignal = new CountDownLatch(1); + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + // Intercept when the `topicFuture` is about to complete and wait until the topic close operation finishes. + conf.setTopicFactoryClassName(MyTopicFactory.class.getName()); + conf.setTransactionCoordinatorEnabled(true); + conf.setBrokerDeduplicationEnabled(false); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testNoOrphanClosedTopicIfTxnInternalFailed() { + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + BrokerService brokerService = pulsar.getBrokerService(); + + // 2. Mock close topic when create transactionBuffer + TransactionBufferProvider mockTransactionBufferProvider = originTopic -> { + AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class); + doAnswer(invocation -> { + topicInitSuccessSignal.await(); + return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed")); + }).when(abortedTxnProcessor).recoverFromSnapshot(); + when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + return new TopicTransactionBuffer( + (PersistentTopic) originTopic, abortedTxnProcessor, AbortedTxnProcessor.SnapshotType.Single); + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionBufferProvider(mockTransactionBufferProvider); + + // 3. Trigger create topic and assert topic load success. + CompletableFuture> firstLoad = brokerService.getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertTrue(firstLoad.isDone()); + assertFalse(firstLoad.isCompletedExceptionally()); + }); + + // 4. Assert topic removed from cache + Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(brokerService.getTopics().containsKey(tpName)); + }); + + // 5. Set txn provider to back + pulsar.setTransactionBufferProvider(originalTransactionBufferProvider); + } + + public static class MyTopicFactory implements TopicFactory { + @Override + public T create(String topic, ManagedLedger ledger, BrokerService brokerService, + Class topicClazz) { + try { + if (topicClazz == NonPersistentTopic.class) { + return (T) new NonPersistentTopic(topic, brokerService); + } else { + return (T) new MyPersistentTopic(topic, ledger, brokerService); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + public void close() throws IOException { + // No-op + } + } + + public static class MyPersistentTopic extends PersistentTopic { + + public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + super(topic, ledger, brokerService); + } + + @SneakyThrows + @Override + public CompletableFuture checkDeduplicationStatus() { + topicInitSuccessSignal.countDown(); + // Sleep 1s pending txn buffer recover failed and close topic + Thread.sleep(1000); + return CompletableFuture.completedFuture(null); + } + } + +} From 255aa719517a60bae703e470554e000b400bf1a2 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sun, 26 Jan 2025 19:18:14 +0800 Subject: [PATCH 2/5] change annotation --- .../buffer/impl/TransactionPersistentTopicTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java index d686e2a1cfa77..508423adce4d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java @@ -74,7 +74,7 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() { BrokerService brokerService = pulsar.getBrokerService(); - // 2. Mock close topic when create transactionBuffer + // 1. Mock close topic when create transactionBuffer TransactionBufferProvider mockTransactionBufferProvider = originTopic -> { AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class); doAnswer(invocation -> { @@ -88,7 +88,7 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() { TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); pulsar.setTransactionBufferProvider(mockTransactionBufferProvider); - // 3. Trigger create topic and assert topic load success. + // 2. Trigger create topic and assert topic load success. CompletableFuture> firstLoad = brokerService.getTopic(tpName, true); Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) .pollInterval(200, TimeUnit.MILLISECONDS) @@ -97,14 +97,14 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() { assertFalse(firstLoad.isCompletedExceptionally()); }); - // 4. Assert topic removed from cache + // 3. Assert topic removed from cache Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { assertFalse(brokerService.getTopics().containsKey(tpName)); }); - // 5. Set txn provider to back + // 4. Set txn provider to back pulsar.setTransactionBufferProvider(originalTransactionBufferProvider); } From 175272167a5ecbac68f2dc4ccc30d44e6ea724af Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 4 Feb 2025 17:35:05 +0800 Subject: [PATCH 3/5] Address code review --- .../pulsar/broker/service/AbstractTopic.java | 2 ++ .../pulsar/broker/service/BrokerService.java | 14 ++++++++++---- .../service/nonpersistent/NonPersistentTopic.java | 4 ++-- .../broker/service/persistent/PersistentTopic.java | 4 ++-- .../broker/service/BrokerBkEnsemblesTests.java | 4 ++-- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index b71f77a1f1e67..753ea026fc48d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -96,6 +96,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; protected final String topic; + + // createFuture associated with the topic to ensure safe removal from the cache of brokerService @Getter @Setter protected volatile CompletableFuture> createFuture; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2529fd6b1a852..ddd436b085493 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2411,12 +2411,18 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFutureFromCache(String topic, - CompletableFuture> createTopicFuture) { - TopicName topicName = TopicName.get(topic); + /** + * Removes the topic from the cache only if the topicName and associated createFuture match exactly. + * The TopicEvent.UNLOAD event will be triggered before and after removal. + * + * @param topic The topic to be removed. + * @return A CompletableFuture that completes when the operation is done. + */ + public CompletableFuture removeTopicFromCache(AbstractTopic topic) { + TopicName topicName = TopicName.get(topic.getName()); return pulsar.getNamespaceService().getBundleAsync(topicName) .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle, createTopicFuture); + removeTopicFromCache(topic.getName(), namespaceBundle, topic.getCreateFuture()); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 917471e1cf381..8ff3dadbe6da1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -454,7 +454,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFutureFromCache(topic, createFuture); + brokerService.removeTopicFromCache(this); unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -555,7 +555,7 @@ public CompletableFuture close( brokerService.executor().execute(() -> { if (disconnectClients) { - brokerService.removeTopicFutureFromCache(topic, createFuture); + brokerService.removeTopicFromCache(this); unregisterTopicPolicyListener(); } closeFuture.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3792d3170acc9..0dc87998511d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1530,7 +1530,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFutureFromCache(topic, createFuture); + brokerService.removeTopicFromCache(PersistentTopic.this); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1807,7 +1807,7 @@ private boolean isClosed() { } private void disposeTopic(CompletableFuture closeFuture) { - brokerService.removeTopicFutureFromCache(topic, createFuture) + brokerService.removeTopicFromCache(this) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 8141b97f3ca21..68a52c4b4c381 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -126,7 +126,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { // (3) remove topic and managed-ledger from broker which means topic is not closed gracefully consumer.close(); producer.close(); - pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture()); + pulsar.getBrokerService().removeTopicFromCache(topic); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); @@ -249,7 +249,7 @@ public void testSkipCorruptDataLedger() throws Exception { // clean managed-ledger and recreate topic to clean any data from the cache producer.close(); - pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture()); + pulsar.getBrokerService().removeTopicFromCache(topic); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); From c61c192411d5ed9838c3c3c1f39f0d9378de60e0 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 5 Feb 2025 18:45:37 +0800 Subject: [PATCH 4/5] Remove useless change --- .../broker/service/nonpersistent/NonPersistentTopic.java | 4 ++-- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8ff3dadbe6da1..7cdc8cc11a482 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -454,7 +454,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(this); + brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -555,7 +555,7 @@ public CompletableFuture close( brokerService.executor().execute(() -> { if (disconnectClients) { - brokerService.removeTopicFromCache(this); + brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); } closeFuture.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0dc87998511d6..2325c8286a1be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1807,7 +1807,7 @@ private boolean isClosed() { } private void disposeTopic(CompletableFuture closeFuture) { - brokerService.removeTopicFromCache(this) + brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); From 3401270ffbd19f201d1406a8fde267156c5f6914 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 5 Feb 2025 22:14:50 +0800 Subject: [PATCH 5/5] Address comment --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 753ea026fc48d..9a115e6d1ca4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -97,7 +97,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected final String topic; - // createFuture associated with the topic to ensure safe removal from the cache of brokerService + // Reference to the CompletableFuture returned when creating this topic in BrokerService. + // Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact + // topic instance that was created. @Getter @Setter protected volatile CompletableFuture> createFuture;