Skip to content

Commit 75e5cb3

Browse files
lhotari authored and srinath-ctds committed
[fix][test] Fix flaky BrokerServiceChaosTest (apache#24162)
(cherry picked from commit b8a7be4)
1 parent d049244 commit 75e5cb3

File tree

3 files changed

+39
-27
lines changed

3 files changed

+39
-27
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ public void testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect(
5555
for (int i = 0; i < numberOfBookies - 1; i++){
5656
bkEnsemble.stopBK(i);
5757
}
58-
makeLocalMetadataStoreKeepReconnect();
58+
startLocalMetadataStoreConnectionTermination();
5959
for (int i = 0; i < numberOfBookies - 1; i++){
6060
bkEnsemble.startBK(i);
6161
}
6262
// Sleep 100ms to lose the notifications of ZK node create.
6363
Thread.sleep(100);
64-
stopLocalMetadataStoreAlwaysReconnect();
64+
stopLocalMetadataStoreConnectionTermination();
6565

6666
// Ensure broker still works.
6767
admin.topics().unload(topicName);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020

2121
import static org.testng.Assert.assertEquals;
2222
import java.nio.charset.StandardCharsets;
23-
import java.util.UUID;
23+
import lombok.Cleanup;
2424
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.pulsar.broker.BrokerTestUtil;
2526
import org.apache.pulsar.common.naming.TopicName;
2627
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
2728
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -57,9 +58,10 @@ public void cleanup() throws Exception {
5758
public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception {
5859
final String configMetadataStoreConnectString =
5960
WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString");
61+
@Cleanup
6062
final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null);
6163
// Set policy of auto create topic to PARTITIONED.
62-
final String ns = defaultTenant + "/ns_" + UUID.randomUUID().toString().replaceAll("-", "");
64+
final String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
6365
final TopicName topicName1 = TopicName.get("persistent://" + ns + "/tp1");
6466
final TopicName topicName2 = TopicName.get("persistent://" + ns + "/tp2");
6567
admin.namespaces().createNamespace(ns);
@@ -81,11 +83,11 @@ public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception
8183

8284
// Create the partitioned metadata by another zk client.
8385
// Make a error to make the cache could not update.
84-
makeLocalMetadataStoreKeepReconnect();
86+
startLocalMetadataStoreConnectionTermination();
8587
anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" + topicName2.getLocalName(),
8688
"{\"partitions\":3}".getBytes(StandardCharsets.UTF_8),
8789
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
88-
stopLocalMetadataStoreAlwaysReconnect();
90+
stopLocalMetadataStoreConnectionTermination();
8991

9092
// Get the partitioned metadata from cache, there is 90% chance that partitions count of metadata is 0.
9193
PartitionedTopicMetadata partitionedTopicMetadata2 =
@@ -97,9 +99,5 @@ public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception
9799
PartitionedTopicMetadata partitionedTopicMetadata3 =
98100
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get();
99101
assertEquals(partitionedTopicMetadata3.partitions, 3);
100-
101-
// cleanup.
102-
admin.topics().deletePartitionedTopic(topicName2.toString());
103-
anotherZKCli.close();
104102
}
105103
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.channels.SelectionKey;
2626
import java.util.Collections;
2727
import java.util.Optional;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import lombok.extern.slf4j.Slf4j;
3031
import org.apache.pulsar.broker.PulsarService;
@@ -65,7 +66,8 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
6566
protected PulsarClient client;
6667
protected ZooKeeper localZkOfBroker;
6768
protected Object localMetaDataStoreClientCnx;
68-
protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean();
69+
protected final AtomicBoolean connectionTerminationThreadKeepRunning = new AtomicBoolean();
70+
private volatile Thread connectionTerminationThread;
6971

7072
protected void startZKAndBK() throws Exception {
7173
// Start ZK.
@@ -95,54 +97,66 @@ protected void startBrokers() throws Exception {
9597
client = PulsarClient.builder().serviceUrl(url.toString()).build();
9698
}
9799

98-
protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
99-
if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, true)) {
100-
throw new RuntimeException("Local metadata store is already keeping reconnect");
100+
protected void startLocalMetadataStoreConnectionTermination() throws Exception {
101+
if (!connectionTerminationThreadKeepRunning.compareAndSet(false, true)) {
102+
throw new RuntimeException("Local metadata store connection is already being terminated");
101103
}
104+
CompletableFuture<Void> future = new CompletableFuture<>();
102105
if (localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO")) {
103-
makeLocalMetadataStoreKeepReconnectNIO();
106+
startNIOImplTermination(future);
104107
} else {
105108
// ClientCnxnSocketNetty.
106-
makeLocalMetadataStoreKeepReconnectNetty();
109+
startNettyImplTermination(future);
107110
}
111+
// wait until connection is closed at least once
112+
future.get();
108113
}
109114

110-
protected void makeLocalMetadataStoreKeepReconnectNIO() {
111-
new Thread(() -> {
112-
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
115+
private void startNIOImplTermination(CompletableFuture<Void> future) {
116+
connectionTerminationThread = new Thread(() -> {
117+
while (connectionTerminationThreadKeepRunning.get()) {
113118
try {
114119
SelectionKey sockKey = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
115120
if (sockKey != null) {
116121
sockKey.channel().close();
122+
future.complete(null);
117123
}
118124
// Prevents high cpu usage.
119125
Thread.sleep(5);
120126
} catch (Exception e) {
121127
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
122128
}
123129
}
124-
}).start();
130+
});
131+
connectionTerminationThread.start();
125132
}
126133

127-
protected void makeLocalMetadataStoreKeepReconnectNetty() {
128-
new Thread(() -> {
129-
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
134+
private void startNettyImplTermination(CompletableFuture<Void> future) {
135+
connectionTerminationThread = new Thread(() -> {
136+
while (connectionTerminationThreadKeepRunning.get()) {
130137
try {
131138
Channel channel = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
132139
if (channel != null) {
133140
channel.close();
141+
future.complete(null);
134142
}
135143
// Prevents high cpu usage.
136144
Thread.sleep(5);
137145
} catch (Exception e) {
138146
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
139147
}
140148
}
141-
}).start();
149+
});
150+
connectionTerminationThread.start();
142151
}
143152

144-
protected void stopLocalMetadataStoreAlwaysReconnect() {
145-
LocalMetadataStoreInReconnectFinishSignal.set(false);
153+
protected void stopLocalMetadataStoreConnectionTermination() throws InterruptedException {
154+
connectionTerminationThreadKeepRunning.set(false);
155+
if (connectionTerminationThread != null) {
156+
// Wait for the reconnect thread to finish.
157+
connectionTerminationThread.join();
158+
connectionTerminationThread = null;
159+
}
146160
}
147161

148162
protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
@@ -205,7 +219,7 @@ protected void cleanup() throws Exception {
205219
markCurrentSetupNumberCleaned();
206220
log.info("--- Shutting down ---");
207221

208-
stopLocalMetadataStoreAlwaysReconnect();
222+
stopLocalMetadataStoreConnectionTermination();
209223

210224
// Stop brokers.
211225
if (client != null) {

0 commit comments

Comments
 (0)