Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix Reader.hasMessageAvailable might return true after seeking to latest #22201

Merged
Expand Up @@ -68,6 +68,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -813,4 +814,27 @@ public void testReaderReconnectedFromNextEntry() throws Exception {
producer.close();
admin.topics().delete(topic, false);
}

@DataProvider
public static Object[][] initializeLastMessageIdInBroker() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "initializeLastMessageIdInBroker")
public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception {
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek";
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(MessageId.earliest).create();

@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producer.send("msg");

if (initializeLastMessageIdInBroker) {
assertTrue(reader.hasMessageAvailable());
} // else: lastMessageIdInBroker is earliest

reader.seek(MessageId.latest);
// lastMessageIdInBroker is the last message ID, while startMessageId is still earliest
assertFalse(reader.hasMessageAvailable());
}
}
Expand Up @@ -167,7 +167,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile MessageIdAdv startMessageId;

private volatile MessageIdAdv seekMessageId;
private final AtomicBoolean duringSeek;
@VisibleForTesting
final AtomicReference<SeekStatus> seekStatus;
private volatile CompletableFuture<Void> seekFuture;

private final MessageIdAdv initialStartMessageId;

Expand Down Expand Up @@ -304,7 +306,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
stats = ConsumerStatsDisabled.INSTANCE;
}

duringSeek = new AtomicBoolean(false);
seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);

// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
Expand Down Expand Up @@ -781,15 +783,15 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
clearReceiverQueue(false);
return CompletableFuture.completedFuture(null);
}

log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
topic, subscription, cnx.ctx().channel(), consumerId);

long requestId = client.newRequestId();
if (duringSeek.get()) {
if (seekStatus.get() != SeekStatus.NOT_STARTED) {
acknowledgmentsGroupingTracker.flushAndClean();
}

Expand All @@ -800,7 +802,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
int currentSize;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
setClientCnx(cnx);
clearReceiverQueue(true);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
Expand Down Expand Up @@ -838,7 +841,6 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
Expand Down Expand Up @@ -943,15 +945,23 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application.
*/
private MessageIdAdv clearReceiverQueue() {
private void clearReceiverQueue(boolean updateStartMessageId) {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
resetIncomingMessageSize();

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return startMessageId;
CompletableFuture<Void> seekFuture = this.seekFuture;
MessageIdAdv seekMessageId = this.seekMessageId;

if (updateStartMessageId && seekStatus.get() != SeekStatus.NOT_STARTED) {
startMessageId = seekMessageId;
}
if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) {
internalPinnedExecutor.execute(() -> seekFuture.complete(null));
return;
}
if (subscriptionMode == SubscriptionMode.Durable) {
return;
}

if (!currentMessageQueue.isEmpty()) {
Expand All @@ -968,15 +978,14 @@ private MessageIdAdv clearReceiverQueue() {
}
// release messages if they are pooled messages
currentMessageQueue.forEach(Message::release);
return previousMessage;
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
if (updateStartMessageId) {
startMessageId = previousMessage;
}
} else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return startMessageId;
}
startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId
}

/**
Expand Down Expand Up @@ -2249,25 +2258,23 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();

CompletableFuture<Void> seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
return seekFuture;
}

private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
final Backoff backoff, final AtomicLong remainingTime,
CompletableFuture<Void> seekFuture) {
final Backoff backoff, final AtomicLong remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return;
}
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
Expand All @@ -2279,14 +2286,25 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
lastDequeuedMessageId = MessageId.earliest;

clearIncomingMessages();
seekFuture.complete(null);
CompletableFuture<Void> future = null;
synchronized (this) {
if (cnx() == null) {
// It's during reconnection, complete the seek future after connection is established
seekStatus.set(SeekStatus.COMPLETED);
} else {
future = seekFuture;
startMessageId = seekMessageId;
seekStatus.set(SeekStatus.NOT_STARTED);
}
}
if (future != null) {
future.complete(null);
}
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
failSeek(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
Expand All @@ -2295,7 +2313,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
seekFuture.completeExceptionally(
failSeek(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not seek "
+ "withing configured timeout", subscription, topicName.toString())));
Expand All @@ -2306,11 +2324,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}

private void failSeek(Throwable throwable) {
CompletableFuture<Void> seekFuture = this.seekFuture;
if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) {
seekFuture.completeExceptionally(throwable);
}
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
Expand Down Expand Up @@ -2968,4 +2993,10 @@ boolean isAckReceiptEnabled() {

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

@VisibleForTesting
enum SeekStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED
}
}
Expand Up @@ -29,6 +29,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -283,15 +285,17 @@

consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);
consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED);

// when
CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
CompletableFuture<Void> secondResult = consumer.seekAsync(1L);

clientReq.complete(null);

// then
assertTrue(firstResult.isDone());
// The seek future will be completed in connectionOpened after receiving the seek response
assertFalse(firstResult.isDone());

Check failure on line 297 in pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Pulsar Client

ConsumerImplTest.testSeekAsyncInternal

expected [false] but found [true]
assertEquals(consumer.seekStatus.get(), ConsumerImpl.SeekStatus.COMPLETED);
assertTrue(secondResult.isCompletedExceptionally());
verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
}
Expand Down