Skip to content

Commit 5ecdcaa

Browse files
3paccccccsrinath-ctds
authored andcommitted
[fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (apache#24858)
(cherry picked from commit 1ca1797) (cherry picked from commit ec89455)
1 parent 16e8ae4 commit 5ecdcaa

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4082,14 +4082,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
40824082
// 1) Test receiveAsync is interrupted
40834083
CountDownLatch countDownLatch = new CountDownLatch(1);
40844084
new Thread(() -> {
4085+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
40854086
try {
40864087
new Thread(() -> {
40874088
try {
4089+
subCountDownLatch.await();
40884090
consumer.close();
4089-
} catch (PulsarClientException ignore) {
4091+
} catch (PulsarClientException | InterruptedException ignore) {
40904092
}
40914093
}).start();
4092-
consumer.receiveAsync().get();
4094+
CompletableFuture<Message<String>> futhre = consumer.receiveAsync();
4095+
subCountDownLatch.countDown();
4096+
futhre.get();
40934097
Assert.fail("should be interrupted");
40944098
} catch (Exception e) {
40954099
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4106,13 +4110,17 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
41064110
.batchReceivePolicy(batchReceivePolicy).subscribe();
41074111
new Thread(() -> {
41084112
try {
4113+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
41094114
new Thread(() -> {
41104115
try {
4116+
subCountDownLatch.await();
41114117
consumer2.close();
4112-
} catch (PulsarClientException ignore) {
4118+
} catch (PulsarClientException | InterruptedException ignore) {
41134119
}
41144120
}).start();
4115-
consumer2.batchReceiveAsync().get();
4121+
CompletableFuture<Messages<String>> future = consumer2.batchReceiveAsync();
4122+
subCountDownLatch.countDown();
4123+
future.get();
41164124
Assert.fail("should be interrupted");
41174125
} catch (Exception e) {
41184126
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4129,13 +4137,18 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
41294137
.batchReceivePolicy(batchReceivePolicy).subscribe();
41304138
new Thread(() -> {
41314139
try {
4140+
CountDownLatch subCountDownLatch = new CountDownLatch(1);
41324141
new Thread(() -> {
41334142
try {
4143+
subCountDownLatch.await();
41344144
partitionedTopicConsumer.close();
4135-
} catch (PulsarClientException ignore) {
4145+
} catch (PulsarClientException | InterruptedException ignore) {
41364146
}
41374147
}).start();
4138-
partitionedTopicConsumer.batchReceiveAsync().get();
4148+
CompletableFuture<Messages<String>> future =
4149+
partitionedTopicConsumer.batchReceiveAsync();
4150+
subCountDownLatch.countDown();
4151+
future.get();
41394152
Assert.fail("should be interrupted");
41404153
} catch (Exception e) {
41414154
Assert.assertTrue(e.getMessage().contains(errorMsg));

0 commit comments

Comments
 (0)