From 7913fe5614fed853f91199f658fe2584f10e38b9 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:00:35 +0800 Subject: [PATCH] [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051) --- .../pulsar/client/impl/ClientCnxTest.java | 100 ++++++++++++++++++ .../apache/pulsar/client/impl/ClientCnx.java | 10 +- 2 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java new file mode 100644 index 0000000000000..7f2d11fd3adf5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.collect.Sets; +import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class ClientCnxTest extends MockedPulsarServiceBaseTest { + + public static final String CLUSTER_NAME = "test"; + public static final String TENANT = "tnx"; + public static final String NAMESPACE = TENANT + "/ns1"; + public static String persistentTopic = "persistent://" + NAMESPACE + "/test"; + ExecutorService executorService = Executors.newFixedThreadPool(20); + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder() + .serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant(TENANT, + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NAMESPACE); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + this.executorService.shutdown(); + } + + @Test + public void testRemoveAndHandlePendingRequestInCnx() throws Exception { + + String subName = "sub"; + int operationTimes = 5000; + CountDownLatch countDownLatch = new CountDownLatch(operationTimes); + + Consumer consumer = pulsarClient.newConsumer() + .topic(persistentTopic) + .subscriptionName(subName) + .subscribe(); + + new Thread(() -> { + for (int i = 0; i < operationTimes; i++) { + executorService.submit(() -> { + consumer.getLastMessageIdAsync().whenComplete((ignore, exception) -> { + countDownLatch.countDown(); + }); + }); + } + }).start(); + + for (int i = 0; i < operationTimes; i++) { + ClientCnx cnx = ((ConsumerImpl) consumer).getClientCnx(); + if (cnx != null) { + ChannelHandlerContext context = cnx.ctx(); + if (context != null) { + cnx.ctx().close(); + } + } + } + + Awaitility.await().until(() -> { + countDownLatch.await(); + return true; + }); + + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 9e1b34c974c3b..eb39fe53f1ab6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -290,7 +290,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { "Disconnected from server at " + ctx.channel().remoteAddress()); // Fail out all the pending ops - pendingRequests.forEach((key, future) -> future.completeExceptionally(e)); + pendingRequests.forEach((key, future) -> { + if (pendingRequests.remove(key, future) && !future.isDone()) { + future.completeExceptionally(e); + } + }); waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e)); // Notify all attached producers/consumers so they have a chance to reconnect @@ -299,7 +303,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this)); topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this)); - pendingRequests.clear(); waitingLookupRequests.clear(); producers.clear(); @@ -900,8 +903,7 @@ private void sendRequestAndHandleTimeout(ByteBuf requestMessage, long reques if (flush) { ctx.writeAndFlush(requestMessage).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { - CompletableFuture newFuture = pendingRequests.remove(requestId); - if (newFuture != null && !newFuture.isDone()) { + if (pendingRequests.remove(requestId, future) && !future.isDone()) { log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage()); future.completeExceptionally(writeFuture.cause());