diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 59910a806c3ed..e907197de7b1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -47,6 +47,8 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -69,8 +71,10 @@ import java.util.stream.Collectors; import lombok.Cleanup; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -1419,7 +1423,6 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message. * * - * @param batchMessageDelayMs * @throws Exception */ @Test @@ -1736,7 +1739,6 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception { * Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages * * - * @param batchMessageDelayMs * @throws Exception */ @Test @@ -3219,7 +3221,7 @@ public void testPartitionedTopicWithOnePartition() throws Exception { } @Test(dataProvider = "variationsForExpectedPos") - public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) + public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception { final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos"; final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset @@ -3230,14 +3232,31 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star .enableBatching(batching) .create(); - MessageId resetPos = null; + CountDownLatch latch = new CountDownLatch(numOfMessages); + + final AtomicReference resetPos = new AtomicReference<>(); + for (int i = 0; i < numOfMessages; i++) { - MessageId msgId = producer.send(String.format("msg num %d", i).getBytes()); - if (resetIndex == i) { - resetPos = msgId; - } + + final int j = i; + + producer.sendAsync(String.format("msg num %d", i).getBytes()) + .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId))) + .whenComplete((p, e) -> { + if (e != null) { + fail("send msg failed due to " + e.getMessage()); + } else { + log.info("send msg with id {}", p.getRight()); + if (p.getLeft() == resetIndex) { + resetPos.set(p.getRight()); + } + } + latch.countDown(); + }); } + latch.await(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() .topic(topicName); @@ -3246,7 +3265,8 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star } Consumer consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe(); - consumer.seek(resetPos); + consumer.seek(resetPos.get()); + log.info("reset cursor to {}", resetPos.get()); Set messageSet = Sets.newHashSet(); for (int i = firstMessage; i < numOfMessages; i++) { Message message = consumer.receive(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 69139b5d04d72..afc887183cbe8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -30,12 +31,16 @@ import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -699,17 +704,33 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star .enableBatching(batching) .create(); - MessageId resetPos = null; + CountDownLatch latch = new CountDownLatch(numOfMessages); + + final AtomicReference resetPos = new AtomicReference<>(); + for (int i = 0; i < numOfMessages; i++) { - MessageId msgId = producer.send(String.format("msg num %d", i).getBytes()); - if (resetIndex == i) { - resetPos = msgId; - } + + final int j = i; + + producer.sendAsync(String.format("msg num %d", i).getBytes()) + .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId))) + .whenComplete((p, e) -> { + if (e != null) { + fail("send msg failed due to " + e.getMessage()); + } else { + if (p.getLeft() == resetIndex) { + resetPos.set(p.getRight()); + } + } + latch.countDown(); + }); } + latch.await(); + ReaderBuilder readerBuilder = pulsarClient.newReader() .topic(topicName) - .startMessageId(resetPos); + .startMessageId(resetPos.get()); if (startInclusive) { readerBuilder.startMessageIdInclusive(); @@ -761,4 +782,43 @@ public void testReaderBuilderConcurrentCreate() throws Exception { producers.get(i).close(); } } + + @Test + public void testReaderStartInMiddleOfBatch() throws Exception { + final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch"; + final int numOfMessage = 100; + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(true) + .batchingMaxMessages(10) + .create(); + + CountDownLatch latch = new CountDownLatch(100); + + List allIds = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numOfMessage; i++) { + producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { + if (e != null) { + fail(); + } else { + allIds.add(mid); + } + latch.countDown(); + }); + } + + latch.await(); + + for (MessageId id : allIds) { + Reader reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).startMessageIdInclusive().create(); + MessageId idGot = reader.readNext().getMessageId(); + assertEquals(idGot, id); + reader.close(); + } + + producer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java index 2b9be7128c4d8..7a5a1bd19d2cc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java @@ -46,7 +46,7 @@ public class BatchReceivePolicy { /** * Default batch receive policy. * - *

Max number of messages: 100 + *

Max number of messages: no limit * Max number of bytes: 10MB * Timeout: 100ms

*/ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 86e7fb27da751..0f5219b0a9b95 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -464,7 +464,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message message) { } protected boolean hasEnoughMessagesForBatchReceive() { - if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) { + if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5b62248370c54..ec6d4ca43ee24 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -123,6 +124,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final SubscriptionMode subscriptionMode; private volatile BatchMessageIdImpl startMessageId; + private volatile BatchMessageIdImpl seekMessageId; + private final AtomicBoolean duringSeek; + private final BatchMessageIdImpl initialStartMessageId; private final long startMessageRollbackDurationInSec; @@ -205,6 +209,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat stats = ConsumerStatsDisabled.INSTANCE; } + duringSeek = new AtomicBoolean(false); + if (conf.getAckTimeoutMillis() != 0) { if (conf.getTickDurationMillis() > 0) { this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), @@ -667,6 +673,13 @@ private BatchMessageIdImpl clearReceiverQueue() { List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + + if (duringSeek.compareAndSet(true, false)) { + return seekMessageId; + } else if (subscriptionMode == SubscriptionMode.Durable) { + return null; + } + if (!currentMessageQueue.isEmpty()) { MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); BatchMessageIdImpl previousMessage; @@ -867,7 +880,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) { + if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) { // We need to discard entries that were prior to startMessageId if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, @@ -1018,7 +1031,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, i, batchSize); - if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) { + if (isSameEntry(messageId) && isPriorBatchIndex(i)) { // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId if (log.isDebugEnabled()) { @@ -1091,8 +1104,8 @@ private boolean isPriorBatchIndex(long idx) { return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex(); } - private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) { - return !resetIncludeHead && startMessageId != null + private boolean isSameEntry(MessageIdData messageId) { + return startMessageId != null && messageId.getLedgerId() == startMessageId.getLedgerId() && messageId.getEntryId() == startMessageId.getEntryId(); } @@ -1477,7 +1490,10 @@ public CompletableFuture seekAsync(long timestamp) { cnx.sendRequestWithId(seek, requestId).thenRun(() -> { log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp); acknowledgmentsGroupingTracker.flushAndClean(); - lastDequeuedMessage = MessageId.earliest; + + seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); + duringSeek.set(true); + incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); seekFuture.complete(null); @@ -1520,7 +1536,10 @@ public CompletableFuture seekAsync(MessageId messageId) { cnx.sendRequestWithId(seek, requestId).thenRun(() -> { log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId); acknowledgmentsGroupingTracker.flushAndClean(); - lastDequeuedMessage = messageId; + + seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); + duringSeek.set(true); + incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); seekFuture.complete(null);