Skip to content

Commit

Permalink
[Java Client] Send CloseConsumer on timeout (#16616)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8f31655)
  • Loading branch information
michaeljmarshall authored and codelipenghui committed Aug 8, 2022
1 parent 6e821e8 commit c2bb553
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception
});

// Create producer should succeed then upon closure, it should reattempt creation. The first request will
// timeout, which triggers CloseProducer. The client might send send the third Producer command before the
// time out, which triggers CloseProducer. The client might send the third Producer command before the
// below assertion, so we pass with 2 or 3.
client.newProducer().topic(topic).create();
Awaitility.await().until(() -> closeProducerCounter.get() == 1);
Expand All @@ -249,6 +249,51 @@ private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception
mockBrokerService.resetHandleCloseProducer();
}

@Test
public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
}

@Test
public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
}

private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
.operationTimeout(1, TimeUnit.SECONDS).build();
final AtomicInteger subscribeCounter = new AtomicInteger(0);
final AtomicInteger closeConsumerCounter = new AtomicInteger(0);

mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
int subscribeCount = subscribeCounter.incrementAndGet();
if (subscribeCount == 1) {
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
// Trigger reconnect
ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
} else if (subscribeCount != 2) {
// Respond to subsequent requests to prevent timeouts
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
}
// Don't respond to the second Subscribe command to ensure timeout
});

mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
closeConsumerCounter.incrementAndGet();
ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
});

// Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. The first
// request will time out, which triggers CloseConsumer. The client might send the third Subscribe command before
// the below assertion, so we pass with 2 or 3.
client.newConsumer().topic(topic).subscriptionName("test").subscribe();
Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3);
mockBrokerService.resetHandleSubscribe();
mockBrokerService.resetHandleCloseConsumer();
}

@Test
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,15 @@ public void connectionOpened(final ClientCnx cnx) {
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
subscription, cnx.channel().remoteAddress());

if (e.getCause() instanceof PulsarClientException.TimeoutException) {
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
// in case it was indeed created, otherwise it might prevent new create consumer operation,
// since we are not necessarily closing the connection.
long closeRequestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
cnx.sendRequestWithId(cmd, closeRequestId);
}

if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
Expand Down
9 changes: 9 additions & 0 deletions site2/docs/developing-binary-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ subscription is not already there, a new one will be created.

![Consumer](assets/binary-protocol-consumer.png)

If the client does not receive a response indicating consumer creation success or failure,
the client should first send a command to close the original consumer before sending a
command to re-attempt consumer creation.

#### Flow control

After the consumer is ready, the client needs to *give permission* to the
Expand Down Expand Up @@ -419,6 +423,11 @@ Parameters:

This command behaves the same as [`CloseProducer`](#command-closeproducer)

If the client does not receive a response to a `Subscribe` command within a timeout,
the client must first send a `CloseConsumer` command before sending another
`Subscribe` command. The client does not need to await a response to the `CloseConsumer`
command before sending the next `Subscribe` command.

##### Command RedeliverUnacknowledgedMessages

A consumer can ask the broker to redeliver some or all of the pending messages
Expand Down

0 comments on commit c2bb553

Please sign in to comment.