Skip to content

Commit 3ffcbfc

Browse files
shibdsrinath-ctds
authored andcommitted
[fix][client] Fix consumer leak when thread is interrupted before subscribe completes (apache#24100)
(cherry picked from commit e51a639) (cherry picked from commit 9d8e385)
1 parent 83ba1ab commit 3ffcbfc

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.pulsar.broker.BrokerTestUtil;
25+
import org.apache.pulsar.broker.service.BrokerService;
26+
import org.apache.pulsar.client.api.ProducerConsumerBase;
27+
import org.apache.pulsar.client.api.PulsarClientException;
28+
import org.apache.pulsar.common.naming.TopicName;
29+
import org.awaitility.Awaitility;
30+
import org.testng.Assert;
31+
import org.testng.annotations.AfterClass;
32+
import org.testng.annotations.BeforeClass;
33+
import org.testng.annotations.Test;
34+
35+
@Slf4j
36+
@Test(groups = "broker-api")
37+
public class ConsumerCloseTest extends ProducerConsumerBase {
38+
39+
@BeforeClass
40+
@Override
41+
protected void setup() throws Exception {
42+
super.internalSetup();
43+
super.producerBaseSetup();
44+
}
45+
46+
@AfterClass(alwaysRun = true)
47+
@Override
48+
protected void cleanup() throws Exception {
49+
super.internalCleanup();
50+
}
51+
52+
@Test
53+
public void testInterruptedWhenCreateConsumer() throws InterruptedException {
54+
55+
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
56+
String subName = "test-sub";
57+
String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName;
58+
59+
// Make create cursor delay 1s
60+
CountDownLatch topicLoadLatch = new CountDownLatch(1);
61+
for (int i = 0; i < 5; i++) {
62+
mockZooKeeper.delay(1000, (op, path) -> {
63+
if (mlCursorPath.equals(path)) {
64+
topicLoadLatch.countDown();
65+
return true;
66+
}
67+
return false;
68+
});
69+
}
70+
71+
Thread startConsumer = new Thread(() -> {
72+
try {
73+
pulsarClient.newConsumer()
74+
.topic(tpName)
75+
.subscriptionName(subName)
76+
.subscribe();
77+
Assert.fail("Should have thrown an exception");
78+
} catch (PulsarClientException e) {
79+
Assert.assertTrue(e.getCause() instanceof InterruptedException);
80+
}
81+
});
82+
startConsumer.start();
83+
topicLoadLatch.await();
84+
startConsumer.interrupt();
85+
86+
PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient;
87+
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
88+
Assert.assertEquals(clientImpl.consumersCount(), 0);
89+
});
90+
}
91+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import lombok.AccessLevel;
3232
import lombok.Getter;
3333
import lombok.NonNull;
34+
import lombok.extern.slf4j.Slf4j;
3435
import org.apache.commons.lang3.StringUtils;
3536
import org.apache.pulsar.client.admin.PulsarAdminException;
3637
import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -62,13 +63,15 @@
6263
import org.apache.pulsar.common.naming.TopicName;
6364
import org.apache.pulsar.common.util.FutureUtil;
6465

66+
@Slf4j
6567
@Getter(AccessLevel.PUBLIC)
6668
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
6769

6870
private final PulsarClientImpl client;
6971
private ConsumerConfigurationData<T> conf;
7072
private final Schema<T> schema;
7173
private List<ConsumerInterceptor<T>> interceptorList;
74+
private volatile boolean interruptedBeforeConsumerCreation;
7275

7376
private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
7477
private static final long MIN_TICK_TIME_MILLIS = 100;
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
98101

99102
@Override
100103
public Consumer<T> subscribe() throws PulsarClientException {
104+
CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
101105
try {
102-
return subscribeAsync().get();
106+
subscribeAsync().whenComplete((c, e) -> {
107+
if (e != null) {
108+
// If the subscription fails, there is no need to close the consumer here,
109+
// as it will be handled in the subscribeAsync method.
110+
future.completeExceptionally(e);
111+
return;
112+
}
113+
if (interruptedBeforeConsumerCreation) {
114+
c.closeAsync().exceptionally(closeEx -> {
115+
log.error("Failed to close consumer after interruption", closeEx.getCause());
116+
return null;
117+
});
118+
future.completeExceptionally(new PulsarClientException(
119+
"Subscription was interrupted before the consumer could be fully created"));
120+
} else {
121+
future.complete(c);
122+
}
123+
});
124+
return future.get();
125+
} catch (InterruptedException e) {
126+
interruptedBeforeConsumerCreation = true;
127+
Thread.currentThread().interrupt();
128+
throw PulsarClientException.unwrap(e);
103129
} catch (Exception e) {
104130
throw PulsarClientException.unwrap(e);
105131
}

0 commit comments

Comments
 (0)