Skip to content

Commit

Permalink
[Java Reader Client] Start reader inside batch result in read first m…
Browse files Browse the repository at this point in the history
…essage in batch. (apache#6345)

Fixes apache#6344 
Fixes apache#6350

The bug was brought in apache#5622 by changing the skip logic wrongly.

(cherry picked from commit 63ccd43)
  • Loading branch information
yjshen authored and tuteng committed Mar 21, 2020
1 parent 08196be commit ac59e09
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -69,8 +71,10 @@
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand Down Expand Up @@ -1419,7 +1423,6 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -1736,7 +1739,6 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -3219,7 +3221,7 @@ public void testPartitionedTopicWithOnePartition() throws Exception {
}

@Test(dataProvider = "variationsForExpectedPos")
public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
throws Exception {
final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
Expand All @@ -3230,14 +3232,31 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

MessageId resetPos = null;
CountDownLatch latch = new CountDownLatch(numOfMessages);

final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
if (resetIndex == i) {
resetPos = msgId;
}

final int j = i;

producer.sendAsync(String.format("msg num %d", i).getBytes())
.thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
.whenComplete((p, e) -> {
if (e != null) {
fail("send msg failed due to " + e.getMessage());
} else {
log.info("send msg with id {}", p.getRight());
if (p.getLeft() == resetIndex) {
resetPos.set(p.getRight());
}
}
latch.countDown();
});
}

latch.await();

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic(topicName);

Expand All @@ -3246,7 +3265,8 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
}

Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
consumer.seek(resetPos);
consumer.seek(resetPos.get());
log.info("reset cursor to {}", resetPos.get());
Set<String> messageSet = Sets.newHashSet();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = consumer.receive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -30,12 +31,16 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -699,17 +704,33 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

MessageId resetPos = null;
CountDownLatch latch = new CountDownLatch(numOfMessages);

final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
if (resetIndex == i) {
resetPos = msgId;
}

final int j = i;

producer.sendAsync(String.format("msg num %d", i).getBytes())
.thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
.whenComplete((p, e) -> {
if (e != null) {
fail("send msg failed due to " + e.getMessage());
} else {
if (p.getLeft() == resetIndex) {
resetPos.set(p.getRight());
}
}
latch.countDown();
});
}

latch.await();

ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
.topic(topicName)
.startMessageId(resetPos);
.startMessageId(resetPos.get());

if (startInclusive) {
readerBuilder.startMessageIdInclusive();
Expand Down Expand Up @@ -761,4 +782,43 @@ public void testReaderBuilderConcurrentCreate() throws Exception {
producers.get(i).close();
}
}

@Test
public void testReaderStartInMiddleOfBatch() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
final int numOfMessage = 100;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(true)
.batchingMaxMessages(10)
.create();

CountDownLatch latch = new CountDownLatch(100);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

for (MessageId id : allIds) {
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
MessageId idGot = reader.readNext().getMessageId();
assertEquals(idGot, id);
reader.close();
}

producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BatchReceivePolicy {
/**
* Default batch receive policy.
*
* <p>Max number of messages: 100
* <p>Max number of messages: no limit
* Max number of bytes: 10MB
* Timeout: 100ms<p/>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
}

protected boolean hasEnoughMessagesForBatchReceive() {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -123,6 +124,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionMode subscriptionMode;
private volatile BatchMessageIdImpl startMessageId;

private volatile BatchMessageIdImpl seekMessageId;
private final AtomicBoolean duringSeek;

private final BatchMessageIdImpl initialStartMessageId;
private final long startMessageRollbackDurationInSec;

Expand Down Expand Up @@ -205,6 +209,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
stats = ConsumerStatsDisabled.INSTANCE;
}

duringSeek = new AtomicBoolean(false);

if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
Expand Down Expand Up @@ -667,6 +673,13 @@ private BatchMessageIdImpl clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return null;
}

if (!currentMessageQueue.isEmpty()) {
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
BatchMessageIdImpl previousMessage;
Expand Down Expand Up @@ -867,7 +880,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {

if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
Expand Down Expand Up @@ -1018,7 +1031,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);

if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) {
if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1091,8 +1104,8 @@ private boolean isPriorBatchIndex(long idx) {
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}

private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) {
return !resetIncludeHead && startMessageId != null
private boolean isSameEntry(MessageIdData messageId) {
return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
}
Expand Down Expand Up @@ -1477,7 +1490,10 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
acknowledgmentsGroupingTracker.flushAndClean();
lastDequeuedMessage = MessageId.earliest;

seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
duringSeek.set(true);

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);
Expand Down Expand Up @@ -1520,7 +1536,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
acknowledgmentsGroupingTracker.flushAndClean();
lastDequeuedMessage = messageId;

seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
duringSeek.set(true);

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);
Expand Down

0 comments on commit ac59e09

Please sign in to comment.