From 857095414809223844782ceeecd7a8bee93e16c7 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 6 Dec 2021 23:43:28 -0600 Subject: [PATCH 1/2] [Java Client] Send CloseProducer on timeout --- .../pulsar/client/api/ClientErrorsTest.java | 48 ++++++++++++++++++- .../pulsar/client/impl/ProducerImpl.java | 12 ++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index f66b19808f564..a5a12f32b8331 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -170,6 +171,7 @@ private void producerCreateFailAfterRetryTimeout(String topic) throws Exception PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) .operationTimeout(1, TimeUnit.SECONDS).build(); final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { @@ -182,6 +184,10 @@ private void producerCreateFailAfterRetryTimeout(String topic) throws Exception ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg")); }); + mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { + closeProducerCounter.incrementAndGet(); + }); + try { client.newProducer().topic(topic).create(); fail("Should have failed"); @@ -189,7 +195,47 @@ private void 
producerCreateFailAfterRetryTimeout(String topic) throws Exception // we fail even on the retriable error assertTrue(e instanceof PulsarClientException); } + // There is a small race condition here because the producer's timeout both fails the client creation + // and triggers sending CloseProducer. + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + mockBrokerService.resetHandleProducer(); + } + + @Test + public void testCreatedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1"); + } + + @Test + public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1"); + } + + private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(1, TimeUnit.SECONDS).build(); + final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); + + mockBrokerService.setHandleProducer((ctx, producer) -> { + if (counter.incrementAndGet() == 1) { + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); + // Trigger reconnect + ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1)); + } + // Don't respond to other producer commands to ensure timeout + }); + + mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { + closeProducerCounter.incrementAndGet(); + }); + // Create producer should succeed then upon closure, it should reattempt creation. That will always timeout. 
+ client.newProducer().topic(topic).create(); + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + Awaitility.await().until(() -> counter.get() == 2); mockBrokerService.resetHandleProducer(); } @@ -491,7 +537,6 @@ public void testPartitionedProducerFailOnInitialization() throws Throwable { mockBrokerService.resetHandleProducer(); mockBrokerService.resetHandleCloseProducer(); - client.close(); } // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode @@ -552,7 +597,6 @@ public void testPartitionedProducerFailOnSending() throws Throwable { mockBrokerService.resetHandleProducer(); mockBrokerService.resetHandleCloseProducer(); - client.close(); } // if a producer which doesn't connect as lazy-loading mode fails to connect while creating partitioned producer, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 0bcd21528db1b..13ec01649d319 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1476,6 +1476,16 @@ public void connectionOpened(final ClientCnx cnx) { cnx.channel().close(); return null; } + + if (cause instanceof TimeoutException) { + // Creating the producer has timed out. We need to ensure the broker closes the producer + // in case it was indeed created, otherwise it might prevent new create producer operation, + // since we are not necessarily closing the connection. 
+ long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + if (cause instanceof PulsarClientException.ProducerFencedException) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Failed to create producer: {}", @@ -1484,7 +1494,7 @@ public void connectionOpened(final ClientCnx cnx) { } else { log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); } - // Close the producer since topic does not exists. + // Close the producer since topic does not exist. if (cause instanceof PulsarClientException.TopicDoesNotExistException) { closeAsync().whenComplete((v, ex) -> { if (ex != null) { From c55f62ccb1e1bacd857ca78cf51a602a655f435a Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 7 Dec 2021 13:06:18 -0600 Subject: [PATCH 2/2] Fix failed test from ClientErrorsTest --- .../pulsar/client/api/ClientErrorsTest.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index a5a12f32b8331..d7507d2b47cb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -199,6 +199,7 @@ private void producerCreateFailAfterRetryTimeout(String topic) throws Exception // and triggers sending CloseProducer. 
Awaitility.await().until(() -> closeProducerCounter.get() == 1); mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); } @Test @@ -215,28 +216,37 @@ private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) .operationTimeout(1, TimeUnit.SECONDS).build(); - final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger producerCounter = new AtomicInteger(0); final AtomicInteger closeProducerCounter = new AtomicInteger(0); mockBrokerService.setHandleProducer((ctx, producer) -> { - if (counter.incrementAndGet() == 1) { + int producerCount = producerCounter.incrementAndGet(); + if (producerCount == 1) { ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", SchemaVersion.Empty)); // Trigger reconnect ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1)); + } else if (producerCount != 2) { + // Respond to subsequent requests to prevent timeouts + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); } - // Don't respond to other producer commands to ensure timeout + // Don't respond to the second Producer command to ensure timeout }); mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { closeProducerCounter.incrementAndGet(); + ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId())); }); - // Create producer should succeed then upon closure, it should reattempt creation. That will always timeout. + // Create producer should succeed then upon closure, it should reattempt creation. The first request will + // time out, which triggers CloseProducer. The client might send the third Producer command before the + // below assertion, so we pass with 2 or 3.
client.newProducer().topic(topic).create(); Awaitility.await().until(() -> closeProducerCounter.get() == 1); - Awaitility.await().until(() -> counter.get() == 2); + Awaitility.await().until(() -> producerCounter.get() == 2 || producerCounter.get() == 3); mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); } @Test