Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java Client] Send CloseProducer on timeout #13161

Merged
merged 2 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -170,6 +171,7 @@ private void producerCreateFailAfterRetryTimeout(String topic) throws Exception
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
.operationTimeout(1, TimeUnit.SECONDS).build();
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger closeProducerCounter = new AtomicInteger(0);

mockBrokerService.setHandleProducer((ctx, producer) -> {
if (counter.incrementAndGet() == 2) {
Expand All @@ -182,15 +184,69 @@ private void producerCreateFailAfterRetryTimeout(String topic) throws Exception
ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg"));
});

mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
closeProducerCounter.incrementAndGet();
});

try {
client.newProducer().topic(topic).create();
fail("Should have failed");
} catch (Exception e) {
// we fail even on the retriable error
assertTrue(e instanceof PulsarClientException);
}
// There is a small race condition here because the producer's timeout both fails the producer creation
// and triggers sending CloseProducer, so we wait for the counter rather than asserting immediately.
Awaitility.await().until(() -> closeProducerCounter.get() == 1);
mockBrokerService.resetHandleProducer();
mockBrokerService.resetHandleCloseProducer();
}

/**
 * Runs the "producer created, then reconnect times out" scenario for the topic {@code t1} and expects
 * the client to send a CloseProducer command.
 */
@Test
public void testCreatedProducerSendsCloseProducerAfterTimeout() throws Exception {
    final String topicName = "persistent://prop/use/ns/t1";
    producerCreatedThenFailsRetryTimeout(topicName);
}

/**
 * Runs the "producer created, then reconnect times out" scenario for the topic {@code part-t1} and
 * expects the client to send a CloseProducer command.
 *
 * <p>NOTE(review): the test name implies the mock broker reports this topic as partitioned; that
 * mapping lives outside this method — confirm against the mock broker service.
 */
@Test
public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() throws Exception {
    final String topicName = "persistent://prop/use/ns/part-t1";
    producerCreatedThenFailsRetryTimeout(topicName);
}

/**
 * Verifies that a producer which was created successfully, and then times out while re-creating itself
 * after a broker-initiated close, sends a CloseProducer command to the broker.
 *
 * <p>Scenario driven by the mock broker handlers installed here:
 * <ol>
 *   <li>The first Producer command is answered with success, immediately followed by a CloseProducer
 *       from the broker side so that the client reconnects.</li>
 *   <li>The second Producer command is deliberately left unanswered so that the client's 1 second
 *       operation timeout fires.</li>
 *   <li>Any later Producer command is answered with success so the client does not keep timing out.</li>
 * </ol>
 *
 * <p>The mock broker handlers are always reset on exit, including when producer creation or one of the
 * awaited conditions fails, so that a failure here cannot leak custom handlers into other tests.
 *
 * @param topic the topic the producer is created on
 * @throws Exception if producer creation fails or an awaited condition is not met in time
 */
private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception {
    @Cleanup
    PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
            .operationTimeout(1, TimeUnit.SECONDS).build();
    final AtomicInteger producerCounter = new AtomicInteger(0);
    final AtomicInteger closeProducerCounter = new AtomicInteger(0);

    mockBrokerService.setHandleProducer((ctx, producer) -> {
        int producerCount = producerCounter.incrementAndGet();
        if (producerCount == 1) {
            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1",
                    SchemaVersion.Empty));
            // Trigger reconnect
            ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1));
        } else if (producerCount != 2) {
            // Respond to subsequent requests to prevent timeouts
            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1",
                    SchemaVersion.Empty));
        }
        // Don't respond to the second Producer command to ensure timeout
    });

    mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
        closeProducerCounter.incrementAndGet();
        ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
    });

    try {
        // Creating the producer should succeed; once the broker closes it, the client reattempts creation.
        // That reattempt times out, which triggers CloseProducer. The client might send the third
        // Producer command before the assertion below runs, so we pass with 2 or 3.
        client.newProducer().topic(topic).create();
        Awaitility.await().until(() -> closeProducerCounter.get() == 1);
        Awaitility.await().until(() -> {
            // Read the counter once so both comparisons see the same value.
            int observed = producerCounter.get();
            return observed == 2 || observed == 3;
        });
    } finally {
        // Always restore the default handlers, even when an await above times out.
        mockBrokerService.resetHandleProducer();
        mockBrokerService.resetHandleCloseProducer();
    }
}

@Test
Expand Down Expand Up @@ -491,7 +547,6 @@ public void testPartitionedProducerFailOnInitialization() throws Throwable {

mockBrokerService.resetHandleProducer();
mockBrokerService.resetHandleCloseProducer();
client.close();
}

// failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
Expand Down Expand Up @@ -552,7 +607,6 @@ public void testPartitionedProducerFailOnSending() throws Throwable {

mockBrokerService.resetHandleProducer();
mockBrokerService.resetHandleCloseProducer();
client.close();
}

// if a producer which doesn't connect as lazy-loading mode fails to connect while creating partitioned producer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,16 @@ public void connectionOpened(final ClientCnx cnx) {
cnx.channel().close();
return null;
}

if (cause instanceof TimeoutException) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created, otherwise it might prevent new create producer operation,
// since we are not necessarily closing the connection.
long closeRequestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId);
cnx.sendRequestWithId(cmd, closeRequestId);
}

if (cause instanceof PulsarClientException.ProducerFencedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Failed to create producer: {}",
Expand All @@ -1484,7 +1494,7 @@ public void connectionOpened(final ClientCnx cnx) {
} else {
log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
}
// Close the producer since topic does not exists.
// Close the producer since topic does not exist.
if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
Expand Down