From c210b31bbc98137005e7beb6bc997533c0821ea8 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 19 Feb 2020 15:37:22 +0800 Subject: [PATCH 01/11] fix hasMessageAvailable for reader --- .../pulsar/broker/service/ServerCnx.java | 57 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 14 +++-- .../pulsar/common/api/raw/RawMessageImpl.java | 3 + 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bef63fc7f5b0a..18f49a9e3e542 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -34,11 +34,14 @@ import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslHandler; +import java.io.IOException; import java.net.SocketAddress; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -46,7 +49,11 @@ import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.StringUtils; @@ -59,6 +66,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.web.RestException; @@ -68,6 +76,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn; +import org.apache.pulsar.common.api.raw.MessageParser; +import org.apache.pulsar.common.api.raw.RawMessageImpl; import org.apache.pulsar.common.protocol.CommandUtils; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; @@ -1396,6 +1406,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) Topic topic = consumer.getSubscription().getTopic(); Position position = topic.getLastMessageId(); int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + int largestBatchIndex = getLargestBatchIndex(topic, (PositionImpl) position, requestId); if (log.isDebugEnabled()) { log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, topic.getName(), consumer.getSubscription().getName(), position, partitionIndex); @@ -1404,6 +1415,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) .setLedgerId(((PositionImpl)position).getLedgerId()) .setEntryId(((PositionImpl)position).getEntryId()) .setPartition(partitionIndex) + .setBatchIndex(largestBatchIndex) .build(); ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); @@ -1412,6 +1424,51 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) } } + private int getLargestBatchIndex(Topic topic, PositionImpl position, long requestId) { + PersistentTopic persistentTopic = (PersistentTopic) topic; + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + + CompletableFuture entryFuture = new CompletableFuture<>(); + ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + entryFuture.complete(entry); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + entryFuture.completeExceptionally(exception); + } + }, null); + + CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { + int[] sizeHolder = new int[1]; + try { + MessageParser.parseMessage(TopicName.get(topic.getName()), entry.getLedgerId(), entry.getEntryId(), + entry.getDataBuffer(), (message) -> { + sizeHolder[0] = ((RawMessageImpl) message).getBatchSize(); + message.release(); + }, Commands.DEFAULT_MAX_MESSAGE_SIZE); + entry.release(); + } catch (IOException e) { + throw new CompletionException(e); + } + return sizeHolder[0]; + }); + + try { + int batchSize = batchSizeFuture.get(); + if (batchSize > 0) { + return batchSize - 1; + } else { + return -1; + } + } catch (InterruptedException | ExecutionException e) { + ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Failed to open entry for batch size check"));; + } + return -1; + } + @Override protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { final long requestId = commandGetTopicsOfNamespace.getRequestId(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5b62248370c54..1bf1c4be79703 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1581,8 +1581,9 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastD return true; } else { // Make sure batching message can be read completely. - return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 - && incomingMessages.size() > 0; + return resetIncludeHead ? + lastMessageIdInBroker.compareTo(lastDequeuedMessage) >= 0: + lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0; } } @@ -1628,8 +1629,13 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> { log.info("[{}][{}] Successfully getLastMessageId {}:{}", topic, subscription, result.getLedgerId(), result.getEntryId()); - future.complete(new MessageIdImpl(result.getLedgerId(), - result.getEntryId(), result.getPartition())); + if (result.getBatchIndex() < 0) { + future.complete(new MessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition())); + } else { + future.complete(new BatchMessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition(), result.getBatchIndex())); + } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java index 335bf967bf099..371f47f4054fd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java @@ -162,4 +162,7 @@ public boolean hasBase64EncodedKey() { return msgMetadata.get().getPartitionKeyB64Encoded(); } + public int getBatchSize() { + return msgMetadata.get().getNumMessagesInBatch(); + } } From 095f932eb55ec247ee6d1a878e7130137dfc8fd2 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 19 Feb 2020 17:46:50 +0800 Subject: [PATCH 02/11] add test case --- .../pulsar/broker/service/ServerCnx.java | 2 +- .../pulsar/client/api/TopicReaderTest.java | 72 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 18f49a9e3e542..ec10bfa258f4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1458,7 +1458,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { try { int batchSize = batchSizeFuture.get(); - if (batchSize > 0) { + if (batchSize > 1) { return batchSize - 1; } else { return -1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 69139b5d04d72..44c42302a9188 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -30,6 +30,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -86,6 +87,17 @@ public static Object[][] variationsForResetOnLatestMsg() { }; } + @DataProvider + public static Object[][] variationsForHasMessageAvailable() { + return new Object[][] { + // batching / start-inclusive + {true, true}, + {true, false}, + {false, true}, + {false, false}, + }; + } + @Test public void testSimpleReader() throws Exception { Reader reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader") @@ -526,6 +538,66 @@ public void testMessageAvailableAfterRestart() throws Exception { } + @Test(dataProvider = "variationsForHasMessageAvailable") + public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception { + final String topicName = "persistent://my-property/my-ns/HasMessageAvailable"; + final int numOfMessage = 100; + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic(topicName); + + if (enableBatch) { + producerBuilder + .enableBatching(true) + .batchingMaxMessages(10); + } else { + producerBuilder + .enableBatching(false); + } + + Producer producer = producerBuilder.create(); + + CountDownLatch latch = new CountDownLatch(100); + + List allIds = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numOfMessage; i++) { + producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { + if (e != null) { + Assert.fail(); + } else { + allIds.add(mid); + } + latch.countDown(); + }); + } + + latch.await(); + + for (MessageId id : allIds) { + Reader reader; + + if (startInclusive) { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).startMessageIdInclusive().create(); + } else { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).create(); + } + + if (startInclusive) { + assertTrue(reader.hasMessageAvailable()); + } else if (id != allIds.get(allIds.size() - 1)) { + assertTrue(reader.hasMessageAvailable()); + } else { + assertFalse(reader.hasMessageAvailable()); + } + reader.close(); + } + + producer.close(); + } + @Test public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { final int numOfMessage = 10; From 55fd071c8e1420aa70df4d78ad02ef3da95c71dc Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 19 Feb 2020 17:52:10 +0800 Subject: [PATCH 03/11] sort id to make test deterministic --- .../test/java/org/apache/pulsar/client/api/TopicReaderTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 44c42302a9188..2fd1d112f8ca6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -574,6 +574,8 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) latch.await(); + allIds.sort(null); // make sure the largest mid appears at last. + for (MessageId id : allIds) { Reader reader; From 11102e0abefb2393f381fbfe97ecce1c2b588418 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 20 Feb 2020 00:08:03 +0800 Subject: [PATCH 04/11] fix comments --- .../apache/pulsar/broker/service/ServerCnx.java | 16 ++++------------ .../apache/pulsar/client/impl/ConsumerImpl.java | 9 ++++++--- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ec10bfa258f4e..c9bc1e770be50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1442,18 +1442,10 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { }, null); CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { - int[] sizeHolder = new int[1]; - try { - MessageParser.parseMessage(TopicName.get(topic.getName()), entry.getLedgerId(), entry.getEntryId(), - entry.getDataBuffer(), (message) -> { - sizeHolder[0] = ((RawMessageImpl) message).getBatchSize(); - message.release(); - }, Commands.DEFAULT_MAX_MESSAGE_SIZE); - entry.release(); - } catch (IOException e) { - throw new CompletionException(e); - } - return sizeHolder[0]; + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + int batchSize = metadata.getNumMessagesInBatch(); + entry.release(); + return batchSize; }); try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1bf1c4be79703..28f52bb24b6d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1576,9 +1576,12 @@ public CompletableFuture hasMessageAvailableAsync() { } private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) { - if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && - ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { - return true; + if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0) { + if (((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; + } else { + return false; + } } else { // Make sure batching message can be read completely. return resetIncludeHead ? From 9b6e600f28d3fa5faf16543a2e02b855705b511a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 20 Feb 2020 12:10:55 +0800 Subject: [PATCH 05/11] get batch size async --- .../pulsar/broker/service/ServerCnx.java | 71 ++++++++++++------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c9bc1e770be50..4497c977036fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -34,14 +34,11 @@ import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslHandler; -import java.io.IOException; import java.net.SocketAddress; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -76,8 +73,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn; -import org.apache.pulsar.common.api.raw.MessageParser; -import org.apache.pulsar.common.api.raw.RawMessageImpl; import org.apache.pulsar.common.protocol.CommandUtils; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; @@ -1406,28 +1401,40 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) Topic topic = consumer.getSubscription().getTopic(); Position position = topic.getLastMessageId(); int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - int largestBatchIndex = getLargestBatchIndex(topic, (PositionImpl) position, requestId); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), consumer.getSubscription().getName(), position, partitionIndex); - } - MessageIdData messageId = MessageIdData.newBuilder() - .setLedgerId(((PositionImpl)position).getLedgerId()) - .setEntryId(((PositionImpl)position).getEntryId()) - .setPartition(partitionIndex) - .setBatchIndex(largestBatchIndex) - .build(); - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) position, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + } else { ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); } } - private int getLargestBatchIndex(Topic topic, PositionImpl position, long requestId) { + private void getLargestBatchIndexWhenPossible( + Topic topic, + PositionImpl position, + int partitionIndex, + long requestId, + String subscriptionName) { + PersistentTopic persistentTopic = (PersistentTopic) topic; ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + // If it's not pointing to a valid entry, respond messageId of the current position. + if (position.getEntryId() == -1) { + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } + + // For a valid position, we read the entry out and parse the batch size from its metadata. CompletableFuture entryFuture = new CompletableFuture<>(); ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override @@ -1448,17 +1455,27 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return batchSize; }); - try { - int batchSize = batchSizeFuture.get(); - if (batchSize > 1) { - return batchSize - 1; + batchSizeFuture.whenComplete((batchSize, e) -> { + if (e != null) { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage())); } else { - return -1; + int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1; + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, + topic.getName(), subscriptionName, position, partitionIndex); + } + + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex) + .setBatchIndex(largestBatchIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); } - } catch (InterruptedException | ExecutionException e) { - ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Failed to open entry for batch size check"));; - } - return -1; + }); } @Override From 2278a9b9cf4d4a7aaaae7fa110170095e5622afa Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 25 Feb 2020 13:36:40 +0800 Subject: [PATCH 06/11] move logic to async --- .../apache/pulsar/client/impl/ReaderTest.java | 4 +- .../pulsar/client/impl/ConsumerImpl.java | 89 ++++++++++++------- .../client/impl/ZeroQueueConsumerImpl.java | 2 +- 3 files changed, 63 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 5f97a5888389f..b75cfcee0b9d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -145,7 +145,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { while (reader.hasMessageAvailable()) { Assert.assertTrue(keys.remove(reader.readNext().getKey())); } - Assert.assertTrue(keys.isEmpty()); + // start from latest with start message inclusive should only read the last message in batch + Assert.assertTrue(keys.size() == 9); + Assert.assertFalse(keys.contains("key9")); Assert.assertFalse(reader.hasMessageAvailable()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a99608be4bb78..f1331c7438f0a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -104,7 +104,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle @SuppressWarnings("unused") private volatile int availablePermits = 0; - protected volatile MessageId lastDequeuedMessage = MessageId.earliest; + protected volatile MessageId lastDequeuedMessageId = MessageId.earliest; private volatile MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; @@ -188,7 +188,6 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; - this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId; this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0); @@ -695,10 +694,10 @@ private BatchMessageIdImpl clearReceiverQueue() { } return previousMessage; - } else if (!lastDequeuedMessage.equals(MessageId.earliest)) { + } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage); + return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); } else { // No message was received or dequeued by this consumer. Next message would still be the startMessageId return startMessageId; @@ -1118,7 +1117,7 @@ private boolean isSameEntry(MessageIdData messageId) { protected synchronized void messageProcessed(Message msg) { ClientCnx currentCnx = cnx(); ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); - lastDequeuedMessage = msg.getMessageId(); + lastDequeuedMessageId = msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. @@ -1555,17 +1554,7 @@ public CompletableFuture seekAsync(MessageId messageId) { } public boolean hasMessageAvailable() throws PulsarClientException { - // we need to seek to the last position then the last message can be received when the resetIncludeHead - // specified. - if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) { - lastDequeuedMessage = getLastMessageId(); - seek(lastDequeuedMessage); - } try { - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - return true; - } - return hasMessageAvailableAsync().get(); } catch (Exception e) { throw PulsarClientException.unwrap(e); @@ -1575,12 +1564,52 @@ public boolean hasMessageAvailable() throws PulsarClientException { public CompletableFuture hasMessageAvailableAsync() { final CompletableFuture booleanFuture = new CompletableFuture<>(); - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - booleanFuture.complete(true); + // we haven't read yet. use startMessageId for comparison + if (lastDequeuedMessageId == MessageId.earliest) { + // if we are starting from latest, we should seek to the actual last message first. + // allow the last one to be read when read head inclusively. + if (startMessageId.getLedgerId() == Long.MAX_VALUE && + startMessageId.getEntryId() == Long.MAX_VALUE && + startMessageId.partitionIndex == -1) { + + getLastMessageIdAsync().thenApply(messageId -> seekAsync(messageId)).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); + + booleanFuture.complete(resetIncludeHead); + return booleanFuture; + } + + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + return booleanFuture; + } + + getLastMessageIdAsync().thenAccept(messageId -> { + lastMessageIdInBroker = messageId; + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + } else { + booleanFuture.complete(false); + } + }).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); + } else { + // read before, use lastDequeueMessage for comparison + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { + booleanFuture.complete(true); + return booleanFuture; + } + getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { booleanFuture.complete(true); } else { booleanFuture.complete(false); @@ -1591,22 +1620,22 @@ public CompletableFuture hasMessageAvailableAsync() { return null; }); } + return booleanFuture; } - private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) { - if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0) { - if (((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { - return true; - } else { - return false; - } - } else { - // Make sure batching message can be read completely. - return resetIncludeHead ? - lastMessageIdInBroker.compareTo(lastDequeuedMessage) >= 0: - lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0; + private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) { + if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; + } + + if (!inclusive && lastMessageIdInBroker.compareTo(messageId) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; } + + return false; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 94c8dd3c56ea4..a0de0704c2d9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -98,7 +98,7 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { } do { message = incomingMessages.take(); - lastDequeuedMessage = message.getMessageId(); + lastDequeuedMessageId = message.getMessageId(); ClientCnx msgCnx = ((MessageImpl) message).getCnx(); // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (this) { From 8f7d4d5a3b003cc11423b35484fc1c7c01a4e2eb Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 25 Feb 2020 13:54:31 +0800 Subject: [PATCH 07/11] reset lastDequeue to earliest when seek --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f1331c7438f0a..8193a46591533 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1492,6 +1492,7 @@ public CompletableFuture seekAsync(long timestamp) { seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); @@ -1538,6 +1539,7 @@ public CompletableFuture seekAsync(MessageId messageId) { seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); From 8c895f17677fa7cff68b499efdd154754cee2d1b Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 25 Feb 2020 14:21:28 +0800 Subject: [PATCH 08/11] resolve comments --- .../apache/pulsar/client/api/TopicReaderTest.java | 4 ++-- .../apache/pulsar/client/impl/ConsumerImpl.java | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 0afb1ae1b90f3..552f69db57609 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -561,7 +561,7 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) Producer producer = producerBuilder.create(); - CountDownLatch latch = new CountDownLatch(100); + CountDownLatch latch = new CountDownLatch(numOfMessage); List allIds = Collections.synchronizedList(new ArrayList<>()); @@ -867,7 +867,7 @@ public void testReaderStartInMiddleOfBatch() throws Exception { .batchingMaxMessages(10) .create(); - CountDownLatch latch = new CountDownLatch(100); + CountDownLatch latch = new CountDownLatch(numOfMessage); List allIds = Collections.synchronizedList(new ArrayList<>()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8193a46591533..113bb8530a728 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1574,13 +1574,15 @@ public CompletableFuture hasMessageAvailableAsync() { startMessageId.getEntryId() == Long.MAX_VALUE && startMessageId.partitionIndex == -1) { - getLastMessageIdAsync().thenApply(messageId -> seekAsync(messageId)).exceptionally(e -> { - log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); - booleanFuture.completeExceptionally(e.getCause()); - return null; - }); + getLastMessageIdAsync() + .thenCompose(this::seekAsync) + .thenApply(ignore -> booleanFuture.complete(resetIncludeHead)) + .exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); - booleanFuture.complete(resetIncludeHead); return booleanFuture; } From 3724b449bbf0295c14fa38026bd5a2e7e7460211 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 26 Feb 2020 09:48:12 +0800 Subject: [PATCH 09/11] fix compactor test --- .../org/apache/pulsar/compaction/TwoPhaseCompactor.java | 6 +++++- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 06afe9310433f..51ee32a869dda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.RawBatchConverter; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -95,7 +96,10 @@ private CompletableFuture phaseOne(RawReader reader) { } else { log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId); - phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey, + // Each entry is processed as a whole, discard the batchIndex part deliberately + MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; + MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex()); + phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey, loopPromise); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 113bb8530a728..14f835727561b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -676,7 +676,7 @@ private BatchMessageIdImpl clearReceiverQueue() { if (duringSeek.compareAndSet(true, false)) { return seekMessageId; } else if (subscriptionMode == SubscriptionMode.Durable) { - return null; + return startMessageId; } if (!currentMessageQueue.isEmpty()) { From 5af7a1252ab272bee50f257d5830486aa84f99a1 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 26 Feb 2020 11:01:31 +0800 Subject: [PATCH 10/11] retrigger --- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 14f835727561b..ce4ee1ebc8ea8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1576,11 +1576,13 @@ public CompletableFuture hasMessageAvailableAsync() { getLastMessageIdAsync() .thenCompose(this::seekAsync) - .thenApply(ignore -> booleanFuture.complete(resetIncludeHead)) - .exceptionally(e -> { - log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); - booleanFuture.completeExceptionally(e.getCause()); - return null; + .whenComplete((ignore, e) -> { + if (e != null) { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + } else { + booleanFuture.complete(resetIncludeHead); + } }); return booleanFuture; From c3a94e2d831e40f524ddc0d32e47e0c5b6f50a1b Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 26 Feb 2020 11:48:32 +0800 Subject: [PATCH 11/11] rerun tests --- .../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 51ee32a869dda..95f6f1ad7f5ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -96,7 +96,7 @@ private CompletableFuture phaseOne(RawReader reader) { } else { log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId); - // Each entry is processed as a whole, discard the batchIndex part deliberately + // Each entry is processed as a whole, discard the batchIndex part deliberately. MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex()); phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,