Skip to content

Commit

Permalink
add error-codes
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jul 8, 2020
1 parent 570132d commit 972ae3e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AlreadyClosed,
"Producer was not registered on the connection"));
return;
}
Expand Down Expand Up @@ -2002,4 +2002,8 @@ public String getAuthRole() {
public String getAuthMethod() {
return authMethod;
}

public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
return producers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -760,11 +761,13 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
final CompletableFuture<Void> future = new CompletableFuture<>();

super.disconnect(failIfHasBacklog).thenRun(() -> {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
future.complete(null);
cleanAfterDisconnect(future);
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
if (t instanceof TopicBusyException == false) {
if (t instanceof AlreadyClosedException) {
cleanAfterDisconnect(future);
return null;
} else if (t instanceof TopicBusyException == false) {
log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster,
remoteCluster, ex.getMessage());
}
Expand All @@ -775,6 +778,11 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return future;
}

private void cleanAfterDisconnect(CompletableFuture<Void> future) {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
future.complete(null);
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -925,6 +926,54 @@ public void testUpdateGlobalTopicPartition() throws Exception {
client1.close();
client2.close();
}

/**
* It validates that closing replicator producer will handle AlreadyClosedException and allow topic to close
* gracefully.
*
* @throws Exception
*/
@Test
public void testCleanReplicatorProducer() throws Exception {
log.info("--- Starting ReplicatorTest::testCleanReplicatorProducer ---");

final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
final String namespace = "pulsar/global/ns-" + System.nanoTime();
final String topicName = "persistent://" + namespace + "/cleanup";

final String subscriberName = "sub1";
admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));

PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subscriberName)
.subscribe();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscriberName)
.subscribe();

assertEquals(pulsar1.getNamespaceService().getOwnedServiceUnits().size(), 2);
((PersistentTopic) pulsar2.getBrokerService().getTopicIfExists(topicName).get().get()).producers
.forEach((name, prod) -> {
prod.getCnx().getProducers().clear();
});

admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster2));

MockedPulsarServiceBaseTest.retryStrategically(
(test) -> (pulsar1.getNamespaceService().getOwnedServiceUnits().size() == 1), 5, 100);
assertEquals(pulsar1.getNamespaceService().getOwnedServiceUnits().size(), 1);

consumer1.close();
consumer2.close();

client1.close();
client2.close();
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
case AlreadyClosed:
return new PulsarClientException.AlreadyClosedException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 972ae3e

Please sign in to comment.