[ClientAPI]Fix hasMessageAvailable() #6362

Merged · 12 commits · Mar 3, 2020
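In brief, this change touches both sides: the broker's GetLastMessageId handler now reads the last entry of the managed ledger and includes the largest batch index in its response, and the client's hasMessageAvailable()/hasMessageAvailableAsync() compare against the start message id (honouring startMessageIdInclusive) until the first message has been dequeued. A minimal usage sketch of the behaviour being fixed is below; the service URL and topic name are placeholders, not taken from this PR.

```java
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class HasMessageAvailableExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder
                .build();

        // Reader positioned at the end of the topic, reading the start message inclusively.
        Reader<byte[]> reader = client.newReader()
                .topic("persistent://my-property/my-ns/HasMessageAvailable") // placeholder
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create();

        // With this fix, hasMessageAvailable() returns true exactly when the last published
        // message (including the last message of a batch) can still be read.
        if (reader.hasMessageAvailable()) {
            System.out.println(new String(reader.readNext().getData()));
        }

        reader.close();
        client.close();
    }
}
```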
@@ -46,7 +46,11 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +63,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
@@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
}
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) position,
partitionIndex,
requestId,
consumer.getSubscription().getName());

} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}

private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl position,
int partitionIndex,
long requestId,
String subscriptionName) {

PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

// If the position does not point to a valid entry, respond with the message id of the current position.
if (position.getEntryId() == -1) {
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
Contributor review comment: Ideally we should be avoiding this read by just recording the number of messages in the last batch, along with the current position.

@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null);

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return batchSize;
});

batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
} else {
int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, position, partitionIndex);
}

MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex)
.setBatchIndex(largestBatchIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
}
});
}
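The contributor comment above suggests avoiding the extra asyncReadEntry by recording the batch size of the last published entry next to the current position. A rough sketch of that idea follows; it is not part of this PR, and every name below (LastEntryInfo, recordLastEntry, cachedLargestBatchIndex) is illustrative only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cache: remember the batch size of the most recently published entry so
// handleGetLastMessageId() does not have to read that entry back from the managed ledger.
public final class LastEntryInfo {
    final long ledgerId;
    final long entryId;
    final int batchSize; // numMessagesInBatch of the entry at (ledgerId, entryId)

    LastEntryInfo(long ledgerId, long entryId, int batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchSize = batchSize;
    }

    private static final AtomicReference<LastEntryInfo> LAST = new AtomicReference<>();

    // Called on the publish path, where the batch size is already known.
    static void recordLastEntry(long ledgerId, long entryId, int batchSize) {
        LAST.set(new LastEntryInfo(ledgerId, entryId, batchSize));
    }

    // Called from handleGetLastMessageId(): return the cached largest batch index, or -1
    // if the cache does not cover the requested position (then fall back to reading the entry).
    static int cachedLargestBatchIndex(long ledgerId, long entryId) {
        LastEntryInfo info = LAST.get();
        if (info != null && info.ledgerId == ledgerId && info.entryId == entryId) {
            return info.batchSize > 1 ? info.batchSize - 1 : -1;
        }
        return -1;
    }
}
```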

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
@@ -91,6 +91,17 @@ public static Object[][] variationsForResetOnLatestMsg() {
};
}

@DataProvider
public static Object[][] variationsForHasMessageAvailable() {
return new Object[][] {
// batching / start-inclusive
{true, true},
{true, false},
{false, true},
{false, false},
};
}

@Test
public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
@@ -531,6 +542,68 @@ public void testMessageAvailableAfterRestart() throws Exception {

}

@Test(dataProvider = "variationsForHasMessageAvailable")
public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
final int numOfMessage = 100;

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(topicName);

if (enableBatch) {
producerBuilder
.enableBatching(true)
.batchingMaxMessages(10);
} else {
producerBuilder
.enableBatching(false);
}

Producer<byte[]> producer = producerBuilder.create();

CountDownLatch latch = new CountDownLatch(100);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
Assert.fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

allIds.sort(null); // make sure the largest message id comes last

for (MessageId id : allIds) {
Reader<byte[]> reader;

if (startInclusive) {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
} else {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).create();
}

if (startInclusive) {
assertTrue(reader.hasMessageAvailable());
} else if (id != allIds.get(allIds.size() - 1)) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
reader.close();
}

producer.close();
}

@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
@@ -145,7 +145,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
}
Assert.assertTrue(keys.isEmpty());
// starting from latest with startMessageIdInclusive should read only the last message in the batch
Assert.assertTrue(keys.size() == 9);
Assert.assertFalse(keys.contains("key9"));
Assert.assertFalse(reader.hasMessageAvailable());
}

@@ -104,7 +104,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@SuppressWarnings("unused")
private volatile int availablePermits = 0;

protected volatile MessageId lastDequeuedMessage = MessageId.earliest;
protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
private volatile MessageId lastMessageIdInBroker = MessageId.earliest;

private long subscribeTimeout;
@@ -188,7 +188,6 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.consumerId = client.newConsumerId();
this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId;
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
@@ -695,10 +694,10 @@ private BatchMessageIdImpl clearReceiverQueue() {
}

return previousMessage;
} else if (!lastDequeuedMessage.equals(MessageId.earliest)) {
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage);
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return startMessageId;
@@ -1118,7 +1117,7 @@ private boolean isSameEntry(MessageIdData messageId) {
protected synchronized void messageProcessed(Message<?> msg) {
ClientCnx currentCnx = cnx();
ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
lastDequeuedMessage = msg.getMessageId();
lastDequeuedMessageId = msg.getMessageId();

if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was cleared after reconnection.
@@ -1555,17 +1554,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
}

public boolean hasMessageAvailable() throws PulsarClientException {
// we need to seek to the last position then the last message can be received when the resetIncludeHead
// specified.
if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) {
lastDequeuedMessage = getLastMessageId();
seek(lastDequeuedMessage);
}
try {
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
return true;
}

return hasMessageAvailableAsync().get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
@@ -1575,12 +1564,52 @@ public boolean hasMessageAvailable() {
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();

if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
booleanFuture.complete(true);
// we haven't read anything yet; use startMessageId for the comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual last message first,
// so that the last message can still be read when resetIncludeHead is set.
if (startMessageId.getLedgerId() == Long.MAX_VALUE &&
startMessageId.getEntryId() == Long.MAX_VALUE &&
startMessageId.partitionIndex == -1) {

getLastMessageIdAsync().thenApply(messageId -> seekAsync(messageId)).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
return null;
});

booleanFuture.complete(resetIncludeHead);
return booleanFuture;
}

if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
booleanFuture.complete(true);
return booleanFuture;
}

getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
booleanFuture.complete(true);
} else {
booleanFuture.complete(false);
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
return null;
});

} else {
// messages have been read before; use lastDequeuedMessageId for the comparison
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) {
booleanFuture.complete(true);
return booleanFuture;
}

getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) {
booleanFuture.complete(true);
} else {
booleanFuture.complete(false);
@@ -1591,18 +1620,22 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
return null;
});
}

return booleanFuture;
}

private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastDequeuedMessage) {
if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) {
if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0 &&
((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
return true;
} else {
// Make sure batching message can be read completely.
return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
&& incomingMessages.size() > 0;
}

if (!inclusive && lastMessageIdInBroker.compareTo(messageId) > 0 &&
((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
return true;
}

return false;
}
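A small sketch of the comparison the reworked hasMoreMessages() performs; the message ids below are made-up values, not taken from this PR.

```java
import org.apache.pulsar.client.impl.MessageIdImpl;

public class HasMoreMessagesExample {
    public static void main(String[] args) {
        MessageIdImpl lastInBroker = new MessageIdImpl(10L, 5L, -1); // last persisted message
        MessageIdImpl startId      = new MessageIdImpl(10L, 5L, -1); // reader starts exactly there

        // inclusive == true (startMessageIdInclusive): the message at startId itself still counts.
        System.out.println(lastInBroker.compareTo(startId) >= 0); // true  -> messages available

        // inclusive == false (default): only messages strictly after startId count.
        System.out.println(lastInBroker.compareTo(startId) > 0);  // false -> no messages available
    }
}
```

In both branches the extra getEntryId() != -1 guard keeps an empty topic, whose last position has entry id -1, from being reported as having messages.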

@Override
@@ -1647,8 +1680,13 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> {
log.info("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, result.getLedgerId(), result.getEntryId());
future.complete(new MessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition()));
if (result.getBatchIndex() < 0) {
future.complete(new MessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition()));
} else {
future.complete(new BatchMessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition(), result.getBatchIndex()));
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
@@ -98,7 +98,7 @@ private Message<T> fetchSingleMessageFromBroker() throws PulsarClientException {
}
do {
message = incomingMessages.take();
lastDequeuedMessage = message.getMessageId();
lastDequeuedMessageId = message.getMessageId();
ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
// synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
synchronized (this) {
@@ -162,4 +162,7 @@ public boolean hasBase64EncodedKey() {
return msgMetadata.get().getPartitionKeyB64Encoded();
}

public int getBatchSize() {
return msgMetadata.get().getNumMessagesInBatch();
}
}
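For reference, the batch-index convention that the new getBatchSize() accessor feeds into on both sides of this change: a batch of N messages exposes batch indexes 0..N-1, while a non-batched entry is reported with batch index -1 (a plain MessageIdImpl instead of a BatchMessageIdImpl). A self-contained illustration, not part of the PR:

```java
public final class BatchIndexExample {
    // Mirrors the computation in ServerCnx.getLargestBatchIndexWhenPossible().
    static int largestBatchIndex(int numMessagesInBatch) {
        return numMessagesInBatch > 1 ? numMessagesInBatch - 1 : -1;
    }

    public static void main(String[] args) {
        System.out.println(largestBatchIndex(10)); // 9  -> BatchMessageIdImpl(ledger, entry, partition, 9)
        System.out.println(largestBatchIndex(1));  // -1 -> plain MessageIdImpl(ledger, entry, partition)
    }
}
```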