From 86b513cf137072db54c6e73a746b5ef60c45f8e5 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 17 Feb 2020 23:55:10 +0800 Subject: [PATCH 1/5] test case to reproduce the bug first --- .../pulsar/client/api/TopicReaderTest.java | 41 +++++++++++++++++++ .../pulsar/client/api/BatchReceivePolicy.java | 2 +- .../pulsar/client/impl/ConsumerBase.java | 2 +- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 69139b5d04d72..25b2f55242b4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -30,6 +31,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -761,4 +763,43 @@ public void testReaderBuilderConcurrentCreate() throws Exception { producers.get(i).close(); } } + + @Test + public void testReaderStartInMiddleOfBatch() throws Exception { + final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch"; + final int numOfMessage = 100; + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(true) + .batchingMaxMessages(10) + .create(); + + CountDownLatch latch = new CountDownLatch(100); + + List allIds = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numOfMessage; i++) { + producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { + if (e != null) { + fail(); + } else { + allIds.add(mid); + } + latch.countDown(); + }); + } + + latch.await(); + + for (MessageId id : allIds) { + Reader reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).startMessageIdInclusive().create(); + MessageId idGot = reader.readNext().getMessageId(); + assertEquals(idGot, id); + reader.close(); + } + + producer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java index 2b9be7128c4d8..7a5a1bd19d2cc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java @@ -46,7 +46,7 @@ public class BatchReceivePolicy { /** * Default batch receive policy. * - *

Max number of messages: 100 + *

Max number of messages: no limit * Max number of bytes: 10MB * Timeout: 100ms

*/ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 86e7fb27da751..0f5219b0a9b95 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -464,7 +464,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message message) { } protected boolean hasEnoughMessagesForBatchReceive() { - if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) { + if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) From a17c1cd8faedd2ef884d6b61c107d1245efc6210 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 18 Feb 2020 12:12:32 +0800 Subject: [PATCH 2/5] fix consumer impl skip --- .../pulsar/client/api/TopicReaderTest.java | 55 +++++++++++++------ .../pulsar/client/impl/ConsumerImpl.java | 8 +-- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 25b2f55242b4d..f782b1443eaa5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -33,11 +33,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,15 +70,15 @@ protected void cleanup() throws Exception { public static Object[][] variationsForExpectedPos() { return new Object[][] { // batching / start-inclusive / num-of-messages - {true, true, 10 }, - {true, false, 10 }, - {false, true, 10 }, - {false, false, 10 }, - - {true, true, 100 }, - {true, false, 100 }, - {false, true, 100 }, - {false, false, 100 }, + {true, true, 10, "1"}, + {true, true, 100, "2"}, + {true, false, 10, "3"}, + {true, false, 100, "4"}, + + {false, true, 10, "5"}, + {false, true, 100, "6"}, + {false, false, 10, "7"}, + {false, false, 100, "8"}, }; } @@ -690,9 +693,9 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { } @Test(dataProvider = "variationsForExpectedPos") - public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) + public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix) throws Exception { - final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos"; + final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos" + suffix; final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset @@ -701,17 +704,33 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star .enableBatching(batching) .create(); - MessageId resetPos = null; + CountDownLatch latch = new CountDownLatch(numOfMessages); + + final AtomicReference resetPos = new AtomicReference<>(); + for (int i = 0; i < numOfMessages; i++) { - MessageId msgId = producer.send(String.format("msg num %d", i).getBytes()); - if (resetIndex == i) { - resetPos = msgId; - } + + final int j = i; + + producer.sendAsync(String.format("msg num %d", i).getBytes()) + .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId))) + .whenComplete((p, e) -> { + if (e != null) { + fail("send msg failed due to " + e.getMessage()); + } else { + if (p.getLeft() == resetIndex) { + resetPos.set(p.getRight()); + } + } + latch.countDown(); + }); } + latch.await(); + ReaderBuilder readerBuilder = pulsarClient.newReader() .topic(topicName) - .startMessageId(resetPos); + .startMessageId(resetPos.get()); if (startInclusive) { readerBuilder.startMessageIdInclusive(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 919a280238676..819b4ac75c5e9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -867,7 +867,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) { + if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) { // We need to discard entries that were prior to startMessageId if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, @@ -1018,7 +1018,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, i, batchSize); - if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) { + if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorBatchIndex(i)) { // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId if (log.isDebugEnabled()) { @@ -1091,8 +1091,8 @@ private boolean isPriorBatchIndex(long idx) { return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex(); } - private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) { - return !resetIncludeHead && startMessageId != null + private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) { + return subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null && messageId.getLedgerId() == startMessageId.getLedgerId() && messageId.getEntryId() == startMessageId.getEntryId(); } From 60af2b326663a82abfa7ec70973a568d08bd2b73 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 18 Feb 2020 14:54:06 +0800 Subject: [PATCH 3/5] fix consumer test --- .../api/SimpleProducerConsumerTest.java | 56 ++++++++++++------- .../pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 59910a806c3ed..dd8e69099e5d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -47,6 +47,8 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -69,8 +71,10 @@ import java.util.stream.Collectors; import lombok.Cleanup; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -113,15 +117,15 @@ protected void setup() throws Exception { public static Object[][] variationsForExpectedPos() { return new Object[][] { // batching / start-inclusive / num-of-messages - {true, true, 10 }, - {true, false, 10 }, - {false, true, 10 }, - {false, false, 10 }, - - {true, true, 100 }, - {true, false, 100 }, - {false, true, 100 }, - {false, false, 100 }, + {true, true, 10, "1"}, + {true, true, 100, "2"}, + {true, false, 10, "3"}, + {true, false, 100, "4"}, + + {false, true, 10, "5"}, + {false, true, 100, "6"}, + {false, false, 10, "7"}, + {false, false, 100, "8"}, }; } @@ -1419,7 +1423,6 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message. * * - * @param batchMessageDelayMs * @throws Exception */ @Test @@ -1736,7 +1739,6 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception { * Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages * * - * @param batchMessageDelayMs * @throws Exception */ @Test @@ -3219,9 +3221,9 @@ public void testPartitionedTopicWithOnePartition() throws Exception { } @Test(dataProvider = "variationsForExpectedPos") - public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) + public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix) throws Exception { - final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos"; + final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos" + suffix; final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset @@ -3230,14 +3232,30 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star .enableBatching(batching) .create(); - MessageId resetPos = null; + CountDownLatch latch = new CountDownLatch(numOfMessages); + + final AtomicReference resetPos = new AtomicReference<>(); + for (int i = 0; i < numOfMessages; i++) { - MessageId msgId = producer.send(String.format("msg num %d", i).getBytes()); - if (resetIndex == i) { - resetPos = msgId; - } + + final int j = i; + + producer.sendAsync(String.format("msg num %d", i).getBytes()) + .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId))) + .whenComplete((p, e) -> { + if (e != null) { + fail("send msg failed due to " + e.getMessage()); + } else { + if (p.getLeft() == resetIndex) { + resetPos.set(p.getRight()); + } + } + latch.countDown(); + }); } + latch.await(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() .topic(topicName); @@ -3246,7 +3264,7 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star } Consumer consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe(); - consumer.seek(resetPos); + consumer.seek(resetPos.get()); Set messageSet = Sets.newHashSet(); for (int i = firstMessage; i < numOfMessages; i++) { Message message = consumer.receive(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 819b4ac75c5e9..ada18db3c2d77 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1092,7 +1092,7 @@ private boolean isPriorBatchIndex(long idx) { } private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) { - return subscriptionMode == SubscriptionMode.NonDurable && startMessageId != null + return startMessageId != null && messageId.getLedgerId() == startMessageId.getLedgerId() && messageId.getEntryId() == startMessageId.getEntryId(); } From c2ab1b4ca66cffdd9516e7dde90a07c96ca90ba1 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 18 Feb 2020 15:57:34 +0800 Subject: [PATCH 4/5] rename method --- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ada18db3c2d77..5ef28ec128b5f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -867,7 +867,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) { + if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) { // We need to discard entries that were prior to startMessageId if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, @@ -1018,7 +1018,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, i, batchSize); - if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorBatchIndex(i)) { + if (isSameEntry(messageId) && isPriorBatchIndex(i)) { // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId if (log.isDebugEnabled()) { @@ -1091,7 +1091,7 @@ private boolean isPriorBatchIndex(long idx) { return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex(); } - private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) { + private boolean isSameEntry(MessageIdData messageId) { return startMessageId != null && messageId.getLedgerId() == startMessageId.getLedgerId() && messageId.getEntryId() == startMessageId.getEntryId(); From 91f75d86246d80591cc14bd5fdf273ad58bc2724 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 20 Feb 2020 20:16:33 +0800 Subject: [PATCH 5/5] fix seek logic --- .../api/SimpleProducerConsumerTest.java | 24 ++++++++++--------- .../pulsar/client/api/TopicReaderTest.java | 22 ++++++++--------- .../pulsar/client/impl/ConsumerImpl.java | 23 ++++++++++++++++-- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index dd8e69099e5d8..e907197de7b1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -117,15 +117,15 @@ protected void setup() throws Exception { public static Object[][] variationsForExpectedPos() { return new Object[][] { // batching / start-inclusive / num-of-messages - {true, true, 10, "1"}, - {true, true, 100, "2"}, - {true, false, 10, "3"}, - {true, false, 100, "4"}, - - {false, true, 10, "5"}, - {false, true, 100, "6"}, - {false, false, 10, "7"}, - {false, false, 100, "8"}, + {true, true, 10 }, + {true, false, 10 }, + {false, true, 10 }, + {false, false, 10 }, + + {true, true, 100 }, + {true, false, 100 }, + {false, true, 100 }, + {false, false, 100 }, }; } @@ -3221,9 +3221,9 @@ public void testPartitionedTopicWithOnePartition() throws Exception { } @Test(dataProvider = "variationsForExpectedPos") - public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix) + public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception { - final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos" + suffix; + final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos"; final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset @@ -3246,6 +3246,7 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star if (e != null) { fail("send msg failed due to " + e.getMessage()); } else { + log.info("send msg with id {}", p.getRight()); if (p.getLeft() == resetIndex) { resetPos.set(p.getRight()); } @@ -3265,6 +3266,7 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star Consumer consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe(); consumer.seek(resetPos.get()); + log.info("reset cursor to {}", resetPos.get()); Set messageSet = Sets.newHashSet(); for (int i = firstMessage; i < numOfMessages; i++) { Message message = consumer.receive(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index f782b1443eaa5..afc887183cbe8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -70,15 +70,15 @@ protected void cleanup() throws Exception { public static Object[][] variationsForExpectedPos() { return new Object[][] { // batching / start-inclusive / num-of-messages - {true, true, 10, "1"}, - {true, true, 100, "2"}, - {true, false, 10, "3"}, - {true, false, 100, "4"}, - - {false, true, 10, "5"}, - {false, true, 100, "6"}, - {false, false, 10, "7"}, - {false, false, 100, "8"}, + {true, true, 10 }, + {true, false, 10 }, + {false, true, 10 }, + {false, false, 10 }, + + {true, true, 100 }, + {true, false, 100 }, + {false, true, 100 }, + {false, false, 100 }, }; } @@ -693,9 +693,9 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { } @Test(dataProvider = "variationsForExpectedPos") - public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix) + public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception { - final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos" + suffix; + final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos"; final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5ef28ec128b5f..7b437ce1484c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -122,6 +123,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final SubscriptionMode subscriptionMode; private volatile BatchMessageIdImpl startMessageId; + private volatile BatchMessageIdImpl seekMessageId; + private final AtomicBoolean duringSeek; + private final BatchMessageIdImpl initialStartMessageId; private final long startMessageRollbackDurationInSec; @@ -205,6 +209,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat stats = ConsumerStatsDisabled.INSTANCE; } + duringSeek = new AtomicBoolean(false); + if (conf.getAckTimeoutMillis() != 0) { if (conf.getTickDurationMillis() > 0) { this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), @@ -667,6 +673,13 @@ private BatchMessageIdImpl clearReceiverQueue() { List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + + if (duringSeek.compareAndSet(true, false)) { + return seekMessageId; + } else if (subscriptionMode == SubscriptionMode.Durable) { + return null; + } + if (!currentMessageQueue.isEmpty()) { MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); BatchMessageIdImpl previousMessage; @@ -1477,7 +1490,10 @@ public CompletableFuture seekAsync(long timestamp) { cnx.sendRequestWithId(seek, requestId).thenRun(() -> { log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp); acknowledgmentsGroupingTracker.flushAndClean(); - lastDequeuedMessage = MessageId.earliest; + + seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); + duringSeek.set(true); + incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); seekFuture.complete(null); @@ -1520,7 +1536,10 @@ public CompletableFuture seekAsync(MessageId messageId) { cnx.sendRequestWithId(seek, requestId).thenRun(() -> { log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId); acknowledgmentsGroupingTracker.flushAndClean(); - lastDequeuedMessage = messageId; + + seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); + duringSeek.set(true); + incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); seekFuture.complete(null);