Skip to content

Commit

Permalink
[fix] [client] fix reader.hasMessageAvailable return false when incom…
Browse files Browse the repository at this point in the history
…ing queue is not empty (apache#21259)

Reproduce steps:
- Create a reader.
- Reader pulls messages into `incoming queue`, do not call `reader.readNext` now.
- Trim ledger task will delete the ledgers, then there is no in the topic.
- Now, you can get messages if you call `reader.readNext`, but the method `reader.hasMessageAvailable` return `false`

Note: the similar issue of `MultiTopicsConsumerImpl` has been fixed by apache#13332, current PR only trying to fix the issue of `ConsumerImpl`.

Make `reader.hasMessageAvailable` return `true` when `incoming queue` is not empty.

(cherry picked from commit 6d82b09)
  • Loading branch information
poorbarcode committed Oct 7, 2023
1 parent f243925 commit 38c3f0c
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,34 @@
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
import java.util.UUID;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -45,7 +55,7 @@

@Test(groups = "broker-api")
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

private final AtomicInteger numFlow = new AtomicInteger(0);

Expand Down Expand Up @@ -297,4 +307,79 @@ public void testFlowCountForMultiTopics() throws Exception {

assertEquals(numFlow.get(), numPartitions);
}

private void trimLedgers(final String tpName) {
// Wait for topic loading.
org.awaitility.Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
assertNotNull(persistentTopic);
});
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
CompletableFuture<Void> trimLedgersTask = new CompletableFuture<>();
ml.trimConsumedLedgersInBackground(trimLedgersTask);
trimLedgersTask.join();
}

private void switchLedgerManually(final String tpName) throws Exception {
Method ledgerClosed =
ManagedLedgerImpl.class.getDeclaredMethod("ledgerClosed", new Class[]{LedgerHandle.class});
Method createLedgerAfterClosed =
ManagedLedgerImpl.class.getDeclaredMethod("createLedgerAfterClosed", new Class[0]);
ledgerClosed.setAccessible(true);
createLedgerAfterClosed.setAccessible(true);

// Wait for topic create.
org.awaitility.Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
assertNotNull(persistentTopic);
});

// Switch ledger.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
LedgerHandle currentLedger1 = WhiteboxImpl.getInternalState(ml, "currentLedger");
ledgerClosed.invoke(ml, new Object[]{currentLedger1});
createLedgerAfterClosed.invoke(ml, new Object[0]);
Awaitility.await().untilAsserted(() -> {
LedgerHandle currentLedger2 = WhiteboxImpl.getInternalState(ml, "currentLedger");
assertNotEquals(currentLedger1.getId(), currentLedger2.getId());
});
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
MessageIdImpl msgSent = (MessageIdImpl) producer.send("1");

// Trigger switch ledger.
// Trigger a trim ledgers task, and verify trim ledgers successful.
switchLedgerManually(topicName);
trimLedgers(topicName);

// Since there is one message in the incoming queue, so the method "reader.hasMessageAvailable" should return
// true.
boolean hasMessageAvailable = reader.hasMessageAvailable();
Message<String> msgReceived = reader.readNext(2, TimeUnit.SECONDS);
if (msgReceived == null) {
assertFalse(hasMessageAvailable);
} else {
log.info("receive msg: {}", msgReceived.getValue());
assertTrue(hasMessageAvailable);
assertEquals(msgReceived.getValue(), "1");
}

// cleanup.
reader.close();
producer.close();
admin.topics().delete(topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2318,6 +2318,10 @@ public boolean hasMessageAvailable() throws PulsarClientException {
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();

if (incomingMessages != null && !incomingMessages.isEmpty()) {
return CompletableFuture.completedFuture(true);
}

// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual last message first.
Expand Down

0 comments on commit 38c3f0c

Please sign in to comment.