Skip to content

Commit

Permalink
[pulsar-client] Fix multi topic reader has message available behavior (
Browse files Browse the repository at this point in the history
…#13332)

### Motivation

When we use a multi-topic reader, the `hasMessageAvailable` method might have the wrong behavior, since the multi-topics consumer receives all messages from the single-topic consumer, the single-topic consumer `hasMessageAvailable` might always be `false` (The lastDequeuedMessageId reach to the end of the queue, all message enqueue to multi-topic consumer's `incomingMessages` queue).

We should check the multi-topics consumer  `incomingMessages` size > 0 when calling `hasMessageAvailable `.

### Modifications

1. Add a check of `incomingMessages` size > 0
2. Add units test `testHasMessageAvailableAsync` to verify the behavior.

(cherry picked from commit 6c7dcc0)
  • Loading branch information
Demogorgon314 authored and zymap committed Dec 23, 2021
1 parent 05d88f1 commit d0eac9a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -43,6 +47,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -61,6 +66,7 @@
import org.testng.annotations.Test;

@Test(groups = "flaky")
@Slf4j
public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {

private static final String subscription = "reader-multi-topics-sub";
Expand Down Expand Up @@ -121,6 +127,67 @@ public void testReadMessageWithBatching() throws Exception {
testReadMessages(topic, true);
}

@Test(timeOut = 10000)
public void testHasMessageAvailableAsync() throws Exception {
String topic = "persistent://my-property/my-ns/testHasMessageAvailableAsync";
String content = "my-message-";
int msgNum = 10;
admin.topics().createPartitionedTopic(topic, 2);
// stop retention from cleaning up
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true)
.startMessageId(MessageId.earliest).create()) {
Assert.assertFalse(reader.hasMessageAvailable());
Assert.assertFalse(reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS));
}

try (Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic).startMessageId(MessageId.earliest).create()) {
try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
for (int i = 0; i < msgNum; i++) {
producer.newMessage().key(content + i)
.value((content + i).getBytes(StandardCharsets.UTF_8)).send();
}
}
// Should have message available
Assert.assertTrue(reader.hasMessageAvailableAsync().get());
try {
// Should have message available too
Assert.assertTrue(reader.hasMessageAvailable());
} catch (PulsarClientException e) {
fail("Expect success but failed.", e);
}
List<Message<byte[]>> msgs = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
readMessageUseAsync(reader, msgs, latch);
latch.await();
Assert.assertEquals(msgs.size(), msgNum);
}
}

private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
if (hasMessageAvailable) {
try {
Message<T> msg = reader.readNext();
msgs.add(msg);
} catch (PulsarClientException e) {
log.error("Read message failed.", e);
latch.countDown();
return;
}
readMessageUseAsync(reader, msgs, latch);
} else {
latch.countDown();
}
}).exceptionally(throwable -> {
log.error("Read message failed.", throwable);
latch.countDown();
return null;
});
}

@Test(timeOut = 10000)
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@ public boolean hasMessageAvailable() throws PulsarClientException {
}

public CompletableFuture<Boolean> hasMessageAvailableAsync() {
if (numMessagesInQueue() > 0) {
return CompletableFuture.completedFuture(true);
}
List<CompletableFuture<Void>> futureList = new ArrayList<>();
final AtomicBoolean hasMessageAvailable = new AtomicBoolean(false);
for (ConsumerImpl<T> consumer : consumers.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public boolean hasReachedEndOfTopic() {

@Override
public boolean hasMessageAvailable() throws PulsarClientException {
return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
return multiTopicsConsumer.hasMessageAvailable();
}

@Override
Expand Down

0 comments on commit d0eac9a

Please sign in to comment.