diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3131a7649cce4..193867a626944 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1359,7 +1359,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { CompletableFuture producerFuture = producers.get(producerId); if (producerFuture == null) { log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AlreadyClosed, "Producer was not registered on the connection")); return; } @@ -2002,4 +2002,8 @@ public String getAuthRole() { public String getAuthMethod() { return authMethod; } + + public ConcurrentLongHashMap> getProducers() { + return producers; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 6503df3fa3fdc..cff6b2ae20fb3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -54,6 +54,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -760,11 +761,13 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) final CompletableFuture future = new CompletableFuture<>(); super.disconnect(failIfHasBacklog).thenRun(() -> { - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - future.complete(null); + cleanAfterDisconnect(future); }).exceptionally(ex -> { Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); - if (t instanceof TopicBusyException == false) { + if (t instanceof AlreadyClosedException) { + cleanAfterDisconnect(future); + return null; + } else if (t instanceof TopicBusyException == false) { log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster, remoteCluster, ex.getMessage()); } @@ -775,6 +778,11 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return future; } + private void cleanAfterDisconnect(CompletableFuture future) { + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + future.complete(null); + } + @Override public boolean isConnected() { ProducerImpl producer = this.producer; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 4919b24eb5598..12a813e574914 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -51,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -925,6 +926,54 @@ public void testUpdateGlobalTopicPartition() throws Exception { client1.close(); client2.close(); } + + /** + * It validates that closing replicator producer will handle AlreadyClosedException and allow topic to close + * gracefully. + * + * @throws Exception + */ + @Test + public void testCleanReplicatorProducer() throws Exception { + log.info("--- Starting ReplicatorTest::testCleanReplicatorProducer ---"); + + final String cluster1 = pulsar1.getConfig().getClusterName(); + final String cluster2 = pulsar2.getConfig().getClusterName(); + final String namespace = "pulsar/global/ns-" + System.nanoTime(); + final String topicName = "persistent://" + namespace + "/cleanup"; + + final String subscriberName = "sub1"; + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subscriberName) + .subscribe(); + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscriberName) + .subscribe(); + + assertEquals(pulsar1.getNamespaceService().getOwnedServiceUnits().size(), 2); + ((PersistentTopic) pulsar2.getBrokerService().getTopicIfExists(topicName).get().get()).producers + .forEach((name, prod) -> { + prod.getCnx().getProducers().clear(); + }); + + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster2)); + + MockedPulsarServiceBaseTest.retryStrategically( + (test) -> (pulsar1.getNamespaceService().getOwnedServiceUnits().size() == 1), 5, 100); + assertEquals(pulsar1.getNamespaceService().getOwnedServiceUnits().size(), 1); + + consumer1.close(); + consumer2.close(); + + client1.close(); + client2.close(); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index d63cbc13c55f3..290ec2b88a2aa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1003,6 +1003,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String return new PulsarClientException.TopicDoesNotExistException(errorMsg); case ConsumerAssignError: return new PulsarClientException.ConsumerAssignException(errorMsg); + case AlreadyClosed: + return new PulsarClientException.AlreadyClosedException(errorMsg); case UnknownError: default: return new PulsarClientException(errorMsg); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 02fed41a0b337..04fd556a59911 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -82,6 +82,7 @@ public enum ServerError ConsumerAssignError(19, 19), TransactionCoordinatorNotFound(20, 20), InvalidTxnStatus(21, 21), + AlreadyClosed(22, 22), ; public static final int UnknownError_VALUE = 0; @@ -106,6 +107,7 @@ public enum ServerError public static final int ConsumerAssignError_VALUE = 19; public static final int TransactionCoordinatorNotFound_VALUE = 20; public static final int InvalidTxnStatus_VALUE = 21; + public static final int AlreadyClosed_VALUE = 22; public final int getNumber() { return value; } @@ -134,6 +136,7 @@ public static ServerError valueOf(int value) { case 19: return ConsumerAssignError; case 20: return TransactionCoordinatorNotFound; case 21: return InvalidTxnStatus; + case 22: return AlreadyClosed; default: return null; } }