[Java Reader Client] Start reader inside batch result in read first message in batch. #6345

Merged
merged 8 commits into from
Feb 24, 2020
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -69,8 +71,10 @@
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1419,7 +1423,6 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
@@ -1736,7 +1739,6 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
@@ -3219,7 +3221,7 @@ public void testPartitionedTopicWithOnePartition() throws Exception {
}

@Test(dataProvider = "variationsForExpectedPos")
- public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
+ public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
throws Exception {
final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
@@ -3230,14 +3232,31 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

- MessageId resetPos = null;
+ CountDownLatch latch = new CountDownLatch(numOfMessages);

+ final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
- MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
- if (resetIndex == i) {
- resetPos = msgId;
- }

+ final int j = i;

+ producer.sendAsync(String.format("msg num %d", i).getBytes())
+ .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+ .whenComplete((p, e) -> {
+ if (e != null) {
+ fail("send msg failed due to " + e.getMessage());
+ } else {
+ log.info("send msg with id {}", p.getRight());
+ if (p.getLeft() == resetIndex) {
+ resetPos.set(p.getRight());
+ }
+ }
+ latch.countDown();
+ });
}

+ latch.await();

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic(topicName);

@@ -3246,7 +3265,8 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
}

Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
- consumer.seek(resetPos);
+ consumer.seek(resetPos.get());
+ log.info("reset cursor to {}", resetPos.get());
Set<String> messageSet = Sets.newHashSet();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = consumer.receive();
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -30,12 +31,16 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -699,17 +704,33 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

- MessageId resetPos = null;
+ CountDownLatch latch = new CountDownLatch(numOfMessages);

+ final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
- MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
- if (resetIndex == i) {
- resetPos = msgId;
- }

+ final int j = i;

+ producer.sendAsync(String.format("msg num %d", i).getBytes())
+ .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+ .whenComplete((p, e) -> {
+ if (e != null) {
+ fail("send msg failed due to " + e.getMessage());
+ } else {
+ if (p.getLeft() == resetIndex) {
+ resetPos.set(p.getRight());
+ }
+ }
+ latch.countDown();
+ });
}

+ latch.await();

ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
.topic(topicName)
- .startMessageId(resetPos);
+ .startMessageId(resetPos.get());

if (startInclusive) {
readerBuilder.startMessageIdInclusive();
@@ -761,4 +782,43 @@ public void testReaderBuilderConcurrentCreate() throws Exception {
producers.get(i).close();
}
}

@Test
public void testReaderStartInMiddleOfBatch() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
final int numOfMessage = 100;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(true)
.batchingMaxMessages(10)
.create();

CountDownLatch latch = new CountDownLatch(100);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

for (MessageId id : allIds) {
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
MessageId idGot = reader.readNext().getMessageId();
assertEquals(idGot, id);
reader.close();
}

producer.close();
}
}
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ public class BatchReceivePolicy {
/**
* Default batch receive policy.
*
- * <p>Max number of messages: 100
+ * <p>Max number of messages: no limit
* Max number of bytes: 10MB
* Timeout: 100ms<p/>
*/
Original file line number Diff line number Diff line change
@@ -464,7 +464,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
}

protected boolean hasEnoughMessagesForBatchReceive() {
- if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) {
+ if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
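The effect of the corrected guard can be tried out in a stand-alone sketch. The class `BatchReceiveCheck` below is hypothetical and is not Pulsar code. It assumes that a non-positive limit means "no limit", and, because the byte-limit half of the return expression is cut off in the hunk above, it assumes that half mirrors the message-count clause.

```java
// Hypothetical stand-alone model; the field names echo the BatchReceivePolicy getters,
// but this is not the Pulsar implementation.
final class BatchReceiveCheck {
    private final int maxNumMessages; // assumption: <= 0 means no limit
    private final long maxNumBytes;   // assumption: <= 0 means no limit

    BatchReceiveCheck(int maxNumMessages, long maxNumBytes) {
        this.maxNumMessages = maxNumMessages;
        this.maxNumBytes = maxNumBytes;
    }

    boolean hasEnoughMessages(int queuedMessages, long queuedBytes) {
        // With neither limit set, only a timeout can complete a batch receive.
        if (maxNumMessages <= 0 && maxNumBytes <= 0) {
            return false;
        }
        // Assumption: the byte-limit clause mirrors the message-count clause.
        return (maxNumMessages > 0 && queuedMessages >= maxNumMessages)
                || (maxNumBytes > 0 && queuedBytes >= maxNumBytes);
    }
}
```

With the earlier guard, which tested `getMaxNumMessages()` twice, a policy with no message limit returned `false` before the byte limit was ever consulted. In this model, `new BatchReceiveCheck(-1, 10 * 1024 * 1024)` reports a full batch once 10 MB are queued.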
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -123,6 +124,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionMode subscriptionMode;
private volatile BatchMessageIdImpl startMessageId;

private volatile BatchMessageIdImpl seekMessageId;
private final AtomicBoolean duringSeek;
Contributor

Is it possible to implement the seek logic by adding only seekMessageId?

Member Author

I don't think so. Without the flag, we cannot tell whether seekMessageId or lastDequeuedMessage should be used as the start message id.
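The point made in this reply can be illustrated with a minimal stand-alone model. Everything in it is an assumption made for illustration: the class `StartPositionModel`, its method names, and the use of plain strings in place of message ids. It is not the `ConsumerImpl` code and it leaves out the durable-subscription and receiver-queue cases handled there.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model; ids are plain strings rather than Pulsar MessageId objects.
final class StartPositionModel {
    private volatile String seekMessageId;
    private volatile String lastDequeuedMessage;
    private final AtomicBoolean duringSeek = new AtomicBoolean(false);

    // Called once a seek request has been acknowledged.
    void onSeekCompleted(String target) {
        seekMessageId = target;
        duringSeek.set(true);
    }

    // Called whenever the application dequeues a message.
    void onMessageDequeued(String id) {
        lastDequeuedMessage = id;
    }

    // Called on reconnect to choose the position to resume from.
    String startPositionOnReconnect() {
        // compareAndSet consumes the flag, so only the first reconnect
        // after a seek resumes from the seek target.
        if (duringSeek.compareAndSet(true, false)) {
            return seekMessageId;
        }
        return lastDequeuedMessage;
    }
}
```

Without the flag, a stored `seekMessageId` alone would not say whether it is still pending or has already been superseded by later dequeues.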


private final BatchMessageIdImpl initialStartMessageId;
private final long startMessageRollbackDurationInSec;

@@ -205,6 +209,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
stats = ConsumerStatsDisabled.INSTANCE;
}

duringSeek = new AtomicBoolean(false);

if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
@@ -667,6 +673,13 @@ private BatchMessageIdImpl clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return null;
}

if (!currentMessageQueue.isEmpty()) {
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
BatchMessageIdImpl previousMessage;
@@ -867,7 +880,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {

- if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+ if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
@@ -1018,7 +1031,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);

- if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) {
+ if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
@@ -1091,8 +1104,8 @@ private boolean isPriorBatchIndex(long idx) {
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}

- private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) {
- return !resetIncludeHead && startMessageId != null
+ private boolean isSameEntry(MessageIdData messageId) {
+ return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
}
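How the two predicates combine can be seen in a small stand-alone sketch. `BatchStartFilter` is a hypothetical class written for illustration, not the `ConsumerImpl` code: `startInclusive` plays the role of `resetIncludeHead`, and `deliveredIndexes` is an invented helper that lists which batch indexes would survive the filter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model of the start-position filter; not the real ConsumerImpl.
final class BatchStartFilter {
    private final long startLedgerId;
    private final long startEntryId;
    private final int startBatchIndex;
    private final boolean startInclusive; // plays the role of resetIncludeHead

    BatchStartFilter(long startLedgerId, long startEntryId, int startBatchIndex, boolean startInclusive) {
        this.startLedgerId = startLedgerId;
        this.startEntryId = startEntryId;
        this.startBatchIndex = startBatchIndex;
        this.startInclusive = startInclusive;
    }

    // True when the incoming entry is the one that contains the start position.
    boolean isSameEntry(long ledgerId, long entryId) {
        return ledgerId == startLedgerId && entryId == startEntryId;
    }

    // An inclusive start keeps the start index itself; an exclusive start drops it.
    boolean isPriorBatchIndex(int idx) {
        return startInclusive ? idx < startBatchIndex : idx <= startBatchIndex;
    }

    // Returns the batch indexes that would be delivered for a batch of the given size.
    List<Integer> deliveredIndexes(long ledgerId, long entryId, int batchSize) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            if (isSameEntry(ledgerId, entryId) && isPriorBatchIndex(i)) {
                continue; // discard messages before the start position
            }
            delivered.add(i);
        }
        return delivered;
    }
}
```

For a start position at batch index 3 in a batch of 5, the inclusive filter delivers indexes 3 and 4, and the exclusive filter delivers only index 4. Under the earlier condition shown in this hunk, the same-entry check was false whenever `resetIncludeHead` was set, so nothing was discarded and reading began at index 0, which matches the symptom named in the PR title.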
@@ -1477,7 +1490,10 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessage = MessageId.earliest;

+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
+ duringSeek.set(true);

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);
@@ -1520,7 +1536,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessage = messageId;

+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
+ duringSeek.set(true);
Comment on lines +1540 to +1541
Contributor

@Lanayx Lanayx Mar 18, 2020

Hi! This commit looks like a breaking change: it broke my integration test for Reader. The test logic is as follows:

  1. Read 10 messages
  2. Seek back to the first
  3. Read 10 messages again

Previously, hasMessageAvailableAsync returned true on this line because lastDequeuedMessage was reset, but now it returns false, which leads to getLastMessageIdAsync being executed. Because seek drops the connection, getLastMessageIdAsync fails for me with the server error "Error: MetadataError. Message: Consumer not found", so the test fails.

Member Author

The logic without this PR is problematic because lastDequeuedMessage alone is insufficient to express seek, startMessageId, and normal dequeue behavior, which led to the bug this PR aims to fix.

Therefore, if it breaks your test, please reconsider your code logic using seekMessageId or lastDequeuedMessage, since the master branch is in a healthy state.

Contributor

@Lanayx Lanayx Mar 18, 2020

Yes, I like this improvement. I just want to add that additional effort is needed here to make it work properly, since Seek+Read is a very common reader pattern. If I put a small delay between steps 2 and 3, everything works properly, because the connection state changes to Connecting and getLastMessageIdAsync is retried; but if hasMessageAvailableAsync is called before the state change, I get the error.
To be clear: I am not running the code from the branch but porting the changes to the .NET client library, which is how I found the issue. It looks like this issue logically exists for the Java client as well.

Member Author

Does the test fail if the reader is used concurrently by multiple threads? Also, the link to the line in the first comment is broken.

Contributor

Sorry, fixed the link. No, the reader is used by a single thread; it waits for the seek result and then immediately calls hasMessageAvailableAsync.

Contributor

@Lanayx Lanayx Mar 18, 2020

There is another issue with the same test when batching is enabled: as long as lastDequeuedMessage is not reset, it stays on the last BatchMessageIdImpl, while getLastMessageIdAsync always returns a MessageIdImpl. This check therefore returns false because the message id types differ, and I get 0 messages instead of 10 on the 3rd step.

Contributor

FYI, adding this line fixes both issues for me at the moment
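The first symptom reported in this thread (no messages after read, seek, read) can be reproduced in a small stand-alone model. The class `SeekThenReadModel` is hypothetical: positions are reduced to (ledgerId, entryId) pairs, and the `resetOnSeek` switch is an assumption introduced only to contrast a stale position with a reset one. It is not the line referred to in the comment above, and it does not model the id type mismatch or the dropped connection.

```java
// Hypothetical model; positions are (ledgerId, entryId) pairs rather than Pulsar MessageId objects.
final class SeekThenReadModel {
    private long lastLedgerId = -1L; // nothing dequeued yet
    private long lastEntryId = -1L;

    void onDequeue(long ledgerId, long entryId) {
        lastLedgerId = ledgerId;
        lastEntryId = entryId;
    }

    // resetOnSeek is an assumed switch that exists only to contrast the two behaviours.
    void onSeek(long ledgerId, long entryId, boolean resetOnSeek) {
        if (resetOnSeek) {
            lastLedgerId = ledgerId;
            lastEntryId = entryId;
        }
    }

    // True when the broker's last position is beyond the locally tracked one.
    boolean hasMessageAvailable(long brokerLedgerId, long brokerEntryId) {
        int byLedger = Long.compare(brokerLedgerId, lastLedgerId);
        int cmp = byLedger != 0 ? byLedger : Long.compare(brokerEntryId, lastEntryId);
        return cmp > 0;
    }
}
```

After dequeuing entries 0 to 9 and seeking back to entry 0, the model reports no available message when the tracked position is left untouched, and reports one when the position is moved to the seek target.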


incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);