From 434fbcd58bec86cd1c7a34fba4a7ae8e54413dd7 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 3 Nov 2021 21:02:36 +0800 Subject: [PATCH] Cleanup already deleted namespace topics. (#12597) Cherry pick from #7473. #7473 has fix the `Cleanup already deleted namespace topics` issue, but with #8129 involved, changes have been changed back. ### Motivation We are having frequent issues when user removes cluster from the global namespace where broker from removed-cluster fails to unload topic and namespace bundle still loaded with the broker. It happens when broker from removed-cluster receives below error ``` 17:38:52.199 [pulsar-io-22-28] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://prop/global/ns/tp1][east -> west] Failed to close dispatch rate limiter: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection : 17:38:52.199 [pulsar-io-22-28] WARN org.apache.pulsar.broker.service.AbstractReplicator - [persistent://prop/global/ns/tp1][east -> west]] Exception: 'org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection' occured while trying to close the producer. retrying again in 0.1 s : 17:38:52.351 [pulsar-io-22-37] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://prop/global/ns/tp1] Error closing topic java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?] ``` ### Modification Source Broker should return explicit error-code when producer is already closed and dest-broker from removed-cluster should handle this error and clean up the replicator and topic gracefully. (cherry picked from commit 6b3fb4193857c324c748ea53ae5e6028137b2e35) --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 02a06ec93edf9..e18f51eb181cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1570,9 +1570,8 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { CompletableFuture producerFuture = producers.get(producerId); if (producerFuture == null) { - log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId); - commandSender.sendErrorResponse(requestId, ServerError.UnknownError, - "Producer was not registered on the connection"); + log.info("[{}] Producer {} was not registered on the connection", remoteAddress, producerId); + ctx.writeAndFlush(Commands.newSuccess(requestId)); return; } @@ -1617,8 +1616,8 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { CompletableFuture consumerFuture = consumers.get(consumerId); if (consumerFuture == null) { - log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId); - commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found"); + log.info("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress); + ctx.writeAndFlush(Commands.newSuccess(requestId)); return; }