diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bef63fc7f5b0a..4497c977036fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -46,7 +46,11 @@ import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.StringUtils; @@ -59,6 +63,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.web.RestException; @@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) Topic topic = consumer.getSubscription().getTopic(); Position position = topic.getLastMessageId(); int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), consumer.getSubscription().getName(), position, partitionIndex); - } - MessageIdData messageId = MessageIdData.newBuilder() - .setLedgerId(((PositionImpl)position).getLedgerId()) - .setEntryId(((PositionImpl)position).getEntryId()) - .setPartition(partitionIndex) - .build(); - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) position, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + } else { ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); } } + private void getLargestBatchIndexWhenPossible( + Topic topic, + PositionImpl position, + int partitionIndex, + long requestId, + String subscriptionName) { + + PersistentTopic persistentTopic = (PersistentTopic) topic; + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + + // If it's not pointing to a valid entry, respond messageId of the current position. + if (position.getEntryId() == -1) { + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } + + // For a valid position, we read the entry out and parse the batch size from its metadata. + CompletableFuture entryFuture = new CompletableFuture<>(); + ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + entryFuture.complete(entry); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + entryFuture.completeExceptionally(exception); + } + }, null); + + CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + int batchSize = metadata.getNumMessagesInBatch(); + entry.release(); + return batchSize; + }); + + batchSizeFuture.whenComplete((batchSize, e) -> { + if (e != null) { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage())); + } else { + int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1; + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, + topic.getName(), subscriptionName, position, partitionIndex); + } + + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(position.getLedgerId()) + .setEntryId(position.getEntryId()) + .setPartition(partitionIndex) + .setBatchIndex(largestBatchIndex).build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } + }); + } + @Override protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { final long requestId = commandGetTopicsOfNamespace.getRequestId(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 06afe9310433f..95f6f1ad7f5ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.RawBatchConverter; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -95,7 +96,10 @@ private CompletableFuture phaseOne(RawReader reader) { } else { log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId); - phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey, + // Each entry is processed as a whole, discard the batchIndex part deliberately. + MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; + MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex()); + phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey, loopPromise); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 7eda4465f7ff0..552f69db57609 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -91,6 +91,17 @@ public static Object[][] variationsForResetOnLatestMsg() { }; } + @DataProvider + public static Object[][] variationsForHasMessageAvailable() { + return new Object[][] { + // batching / start-inclusive + {true, true}, + {true, false}, + {false, true}, + {false, false}, + }; + } + @Test public void testSimpleReader() throws Exception { Reader reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader") @@ -531,6 +542,68 @@ public void testMessageAvailableAfterRestart() throws Exception { } + @Test(dataProvider = "variationsForHasMessageAvailable") + public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception { + final String topicName = "persistent://my-property/my-ns/HasMessageAvailable"; + final int numOfMessage = 100; + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic(topicName); + + if (enableBatch) { + producerBuilder + .enableBatching(true) + .batchingMaxMessages(10); + } else { + producerBuilder + .enableBatching(false); + } + + Producer producer = producerBuilder.create(); + + CountDownLatch latch = new CountDownLatch(numOfMessage); + + List allIds = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < numOfMessage; i++) { + producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> { + if (e != null) { + Assert.fail(); + } else { + allIds.add(mid); + } + latch.countDown(); + }); + } + + latch.await(); + + allIds.sort(null); // make sure the largest mid appears at last. + + for (MessageId id : allIds) { + Reader reader; + + if (startInclusive) { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).startMessageIdInclusive().create(); + } else { + reader = pulsarClient.newReader().topic(topicName) + .startMessageId(id).create(); + } + + if (startInclusive) { + assertTrue(reader.hasMessageAvailable()); + } else if (id != allIds.get(allIds.size() - 1)) { + assertTrue(reader.hasMessageAvailable()); + } else { + assertFalse(reader.hasMessageAvailable()); + } + reader.close(); + } + + producer.close(); + } + @Test public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { final int numOfMessage = 10; @@ -794,7 +867,7 @@ public void testReaderStartInMiddleOfBatch() throws Exception { .batchingMaxMessages(10) .create(); - CountDownLatch latch = new CountDownLatch(100); + CountDownLatch latch = new CountDownLatch(numOfMessage); List allIds = Collections.synchronizedList(new ArrayList<>()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 5f97a5888389f..b75cfcee0b9d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -145,7 +145,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { while (reader.hasMessageAvailable()) { Assert.assertTrue(keys.remove(reader.readNext().getKey())); } - Assert.assertTrue(keys.isEmpty()); + // start from latest with start message inclusive should only read the last message in batch + Assert.assertTrue(keys.size() == 9); + Assert.assertFalse(keys.contains("key9")); Assert.assertFalse(reader.hasMessageAvailable()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9431496688944..ce4ee1ebc8ea8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -104,7 +104,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle @SuppressWarnings("unused") private volatile int availablePermits = 0; - protected volatile MessageId lastDequeuedMessage = MessageId.earliest; + protected volatile MessageId lastDequeuedMessageId = MessageId.earliest; private volatile MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; @@ -188,7 +188,6 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; - this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId; this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0); @@ -677,7 +676,7 @@ private BatchMessageIdImpl clearReceiverQueue() { if (duringSeek.compareAndSet(true, false)) { return seekMessageId; } else if (subscriptionMode == SubscriptionMode.Durable) { - return null; + return startMessageId; } if (!currentMessageQueue.isEmpty()) { @@ -695,10 +694,10 @@ private BatchMessageIdImpl clearReceiverQueue() { } return previousMessage; - } else if (!lastDequeuedMessage.equals(MessageId.earliest)) { + } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage); + return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); } else { // No message was received or dequeued by this consumer. Next message would still be the startMessageId return startMessageId; @@ -1118,7 +1117,7 @@ private boolean isSameEntry(MessageIdData messageId) { protected synchronized void messageProcessed(Message msg) { ClientCnx currentCnx = cnx(); ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); - lastDequeuedMessage = msg.getMessageId(); + lastDequeuedMessageId = msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. @@ -1493,6 +1492,7 @@ public CompletableFuture seekAsync(long timestamp) { seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); @@ -1539,6 +1539,7 @@ public CompletableFuture seekAsync(MessageId messageId) { seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); duringSeek.set(true); + lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); @@ -1555,17 +1556,7 @@ public CompletableFuture seekAsync(MessageId messageId) { } public boolean hasMessageAvailable() throws PulsarClientException { - // we need to seek to the last position then the last message can be received when the resetIncludeHead - // specified. - if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) { - lastDequeuedMessage = getLastMessageId(); - seek(lastDequeuedMessage); - } try { - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - return true; - } - return hasMessageAvailableAsync().get(); } catch (Exception e) { throw PulsarClientException.unwrap(e); @@ -1575,12 +1566,56 @@ public boolean hasMessageAvailable() throws PulsarClientException { public CompletableFuture hasMessageAvailableAsync() { final CompletableFuture booleanFuture = new CompletableFuture<>(); - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { - booleanFuture.complete(true); + // we haven't read yet. use startMessageId for comparison + if (lastDequeuedMessageId == MessageId.earliest) { + // if we are starting from latest, we should seek to the actual last message first. + // allow the last one to be read when read head inclusively. + if (startMessageId.getLedgerId() == Long.MAX_VALUE && + startMessageId.getEntryId() == Long.MAX_VALUE && + startMessageId.partitionIndex == -1) { + + getLastMessageIdAsync() + .thenCompose(this::seekAsync) + .whenComplete((ignore, e) -> { + if (e != null) { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + } else { + booleanFuture.complete(resetIncludeHead); + } + }); + + return booleanFuture; + } + + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + return booleanFuture; + } + + getLastMessageIdAsync().thenAccept(messageId -> { + lastMessageIdInBroker = messageId; + if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) { + booleanFuture.complete(true); + } else { + booleanFuture.complete(false); + } + }).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); + } else { + // read before, use lastDequeueMessage for comparison + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { + booleanFuture.complete(true); + return booleanFuture; + } + getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; - if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) { + if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { booleanFuture.complete(true); } else { booleanFuture.complete(false); @@ -1591,18 +1626,22 @@ public CompletableFuture hasMessageAvailableAsync() { return null; }); } + return booleanFuture; } - private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) { - if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && + private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) { + if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 && ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { return true; - } else { - // Make sure batching message can be read completely. - return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 - && incomingMessages.size() > 0; } + + if (!inclusive && lastMessageIdInBroker.compareTo(messageId) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; + } + + return false; } @Override @@ -1647,8 +1686,13 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> { log.info("[{}][{}] Successfully getLastMessageId {}:{}", topic, subscription, result.getLedgerId(), result.getEntryId()); - future.complete(new MessageIdImpl(result.getLedgerId(), - result.getEntryId(), result.getPartition())); + if (result.getBatchIndex() < 0) { + future.complete(new MessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition())); + } else { + future.complete(new BatchMessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition(), result.getBatchIndex())); + } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 94c8dd3c56ea4..a0de0704c2d9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -98,7 +98,7 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { } do { message = incomingMessages.take(); - lastDequeuedMessage = message.getMessageId(); + lastDequeuedMessageId = message.getMessageId(); ClientCnx msgCnx = ((MessageImpl) message).getCnx(); // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (this) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java index 335bf967bf099..371f47f4054fd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java @@ -162,4 +162,7 @@ public boolean hasBase64EncodedKey() { return msgMetadata.get().getPartitionKeyB64Encoded(); } + public int getBatchSize() { + return msgMetadata.get().getNumMessagesInBatch(); + } }