Skip to content

Commit

Permalink
Revert "Fixed hasMessageAvailable() with empty topic (#9652)"
Browse files Browse the repository at this point in the history
This reverts commit a1ef8a8.
  • Loading branch information
zymap committed Mar 4, 2021
1 parent 41a7e78 commit 58a07d0
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -73,7 +72,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
Expand Down Expand Up @@ -1511,19 +1509,12 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
long requestId = getLastMessageId.getRequestId();

Topic topic = consumer.getSubscription().getTopic();
Position lastPosition = topic.getLastPosition();
Position position = topic.getLastPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
(PositionImpl) position,
partitionIndex,
requestId,
consumer.getSubscription().getName());
Expand All @@ -1535,8 +1526,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)

private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl lastPosition,
PositionImpl markDeletePosition,
PositionImpl position,
int partitionIndex,
long requestId,
String subscriptionName) {
Expand All @@ -1545,26 +1535,19 @@ private void getLargestBatchIndexWhenPossible(
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

// If it's not pointing to a valid entry, respond messageId of the current position.
if (lastPosition.getEntryId() == -1) {
if (position.getEntryId() == -1) {
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(lastPosition.getLedgerId())
.setEntryId(lastPosition.getEntryId())
.setPartition(partitionIndex).build();
MessageIdData consumerMarkDeletePosition = null;
if (markDeletePosition != null) {
consumerMarkDeletePosition = MessageIdData.newBuilder()
.setLedgerId(markDeletePosition.getLedgerId())
.setEntryId(markDeletePosition.getEntryId()).build();
}
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId,
Optional.ofNullable(consumerMarkDeletePosition)));
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
return;
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
Expand Down Expand Up @@ -1592,22 +1575,16 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, lastPosition, partitionIndex);
topic.getName(), subscriptionName, position, partitionIndex);
}

MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(lastPosition.getLedgerId())
.setEntryId(lastPosition.getEntryId())
.setPartition(largestBatchIndex).build();
MessageIdData consumerMarkDeletePosition = null;
if (markDeletePosition != null) {
consumerMarkDeletePosition = MessageIdData.newBuilder()
.setLedgerId(markDeletePosition.getLedgerId())
.setEntryId(markDeletePosition.getEntryId()).build();
}
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex)
.setBatchIndex(largestBatchIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId,
Optional.ofNullable(consumerMarkDeletePosition)));
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.Sets;

import java.lang.reflect.Method;
import java.util.Random;
import java.util.Set;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -62,9 +61,4 @@ protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T rece
Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
}

private static final Random random = new Random();

protected String newTopicName() {
return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.Cleanup;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand Down Expand Up @@ -1082,14 +1080,14 @@ public void testHasMessageAvailableWithBatch() throws Exception {
.topic(topicName).create();

//For batch-messages with single message, the type of client messageId should be the same as that of broker
MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes());
MessageId messageId = producer.send("msg".getBytes());
assertTrue(messageId instanceof MessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
MessageId lastMsgId = reader.getConsumer().getLastMessageId();
assertTrue(lastMsgId instanceof BatchMessageIdImpl);
assertTrue(messageId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
assertEquals(lastMsgId, messageId);
reader.close();

CountDownLatch latch = new CountDownLatch(numOfMessage);
Expand Down Expand Up @@ -1127,7 +1125,7 @@ public void testHasMessageAvailableWithBatch() throws Exception {
//For non-batch message, the type of client messageId should be the same as that of broker
producer = pulsarClient.newProducer()
.enableBatching(false).topic(topicName).create();
messageId = (MessageIdImpl) producer.send("non-batch".getBytes());
messageId = producer.send("non-batch".getBytes());
assertFalse(messageId instanceof BatchMessageIdImpl);
assertTrue(messageId instanceof MessageIdImpl);
reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
Expand Down Expand Up @@ -1557,59 +1555,4 @@ public void testReaderStartInMiddleOfBatch() throws Exception {

producer.close();
}

@Test
public void testHasMessageAvailableOnEmptyTopic() throws Exception {
String topic = newTopicName();

@Cleanup
Reader<String> r1 = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();

@Cleanup
Reader<String> r2 = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.latest)
.create();

@Cleanup
Reader<String> r2Inclusive = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.latest)
.startMessageIdInclusive()
.create();

// no data write, should return false
assertFalse(r1.hasMessageAvailable());
assertFalse(r2.hasMessageAvailable());
assertFalse(r2Inclusive.hasMessageAvailable());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.send("hello-1");
assertTrue(r1.hasMessageAvailable());
assertTrue(r2.hasMessageAvailable());
assertTrue(r2Inclusive.hasMessageAvailable());

@Cleanup
Reader<String> r3 = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.latest)
.create();


assertFalse(r3.hasMessageAvailable());

producer.send("hello-2");

assertTrue(r1.hasMessageAvailable());
assertTrue(r2.hasMessageAvailable());
assertTrue(r2Inclusive.hasMessageAvailable());
assertTrue(r3.hasMessageAvailable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,9 @@ protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse suc
log.debug("{} Received success GetLastMessageId response from server: {}", ctx.channel(), success.getRequestId());
}
long requestId = success.getRequestId();
CompletableFuture<CommandGetLastMessageIdResponse> requestFuture =
(CompletableFuture<CommandGetLastMessageIdResponse>) pendingRequests.remove(requestId);
CompletableFuture<MessageIdData> requestFuture = (CompletableFuture<MessageIdData>) pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.complete(success);
requestFuture.complete(success.getLastMessageId());
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
}
Expand Down Expand Up @@ -803,7 +802,7 @@ private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMess
return future;
}

public CompletableFuture<CommandGetLastMessageIdResponse> sendGetLastMessageId(ByteBuf request, long requestId) {
public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.pulsar.common.protocol.Commands.readChecksum;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -2036,42 +2035,25 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
if (startMessageId.equals(MessageId.latest)) {

CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we need to seek to the last message
if (resetIncludeHead) {
future = future.thenCompose((lastMessageIdResponse) ->
seekAsync(lastMessageIdResponse.lastMessageId)
.thenApply((ignore) -> lastMessageIdResponse));
}

future.thenAccept(response -> {
MessageIdImpl lastMessageId = MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
MessageIdImpl markDeletePosition = MessageIdImpl
.convertToMessageIdImpl(response.markDeletePosition);

if (markDeletePosition != null) {
// we only care about comparing ledger ids and entry ids as mark delete position doesn't have other ids such as batch index
int result = ComparisonChain.start()
.compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
.compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
.result();
if (lastMessageId.getEntryId() < 0) {
booleanFuture.complete(false);
} else {
booleanFuture.complete(resetIncludeHead ? result <= 0 : result < 0);
}
} else if (lastMessageId == null || lastMessageId.getEntryId() < 0) {
booleanFuture.complete(false);
} else {
booleanFuture.complete(resetIncludeHead);
}
}).exceptionally(ex -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex);
booleanFuture.completeExceptionally(ex.getCause());
return null;
});
if (startMessageId.getLedgerId() == Long.MAX_VALUE &&
startMessageId.getEntryId() == Long.MAX_VALUE &&
startMessageId.partitionIndex == -1) {

getLastMessageIdAsync()
.thenCompose((msgId) -> seekAsync(msgId).thenApply((ignore) -> msgId))
.whenComplete((msgId, e) -> {
if (e != null) {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
return;
}
MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(msgId);
if (messageId == null || messageId.getEntryId() < 0) {
booleanFuture.complete(false);
} else {
booleanFuture.complete(resetIncludeHead);
}
});

return booleanFuture;
}
Expand Down Expand Up @@ -2132,22 +2114,8 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messa
return false;
}

private static final class GetLastMessageIdResponse {
final MessageId lastMessageId;
final MessageId markDeletePosition;

GetLastMessageIdResponse(MessageId lastMessageId, MessageId markDeletePosition) {
this.lastMessageId = lastMessageId;
this.markDeletePosition = markDeletePosition;
}
}

@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
}

public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
Expand All @@ -2162,15 +2130,15 @@ public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();

CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture = new CompletableFuture<>();
CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();

internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
return getLastMessageIdFuture;
}

private void internalGetLastMessageIdAsync(final Backoff backoff,
final AtomicLong remainingTime,
CompletableFuture<GetLastMessageIdResponse> future) {
CompletableFuture<MessageId> future) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
Expand All @@ -2185,23 +2153,16 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId);
log.info("[{}][{}] Get topic last message Id", topic, subscription);

cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> {
MessageIdData lastMessageId = cmd.getLastMessageId();
MessageIdImpl markDeletePosition = null;
if (cmd.hasConsumerMarkDeletePosition()) {
markDeletePosition = new MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(),
cmd.getConsumerMarkDeletePosition().getEntryId(), -1);
}
cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> {
log.info("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());

MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ?
new MessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition(), lastMessageId.getBatchIndex());

future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition));
topic, subscription, result.getLedgerId(), result.getEntryId());
if (result.getBatchIndex() < 0) {
future.complete(new MessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition()));
} else {
future.complete(new BatchMessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition(), result.getBatchIndex()));
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
Expand Down
Loading

0 comments on commit 58a07d0

Please sign in to comment.