Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
shouldRewindBeforeReadingOrReplaying = true;
} else {
cursor.rewind();
clearDelayedMessages();
shouldRewindBeforeReadingOrReplaying = false;
}
redeliveryMessages.clear();
Expand Down Expand Up @@ -480,6 +481,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
// All consumers got disconnected before the completion of the read operation
entries.forEach(Entry::release);
cursor.rewind();
clearDelayedMessages();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If clearDelayedMessages() binds with cursor.rewind(), it's better to wrap them into one method.

shouldRewindBeforeReadingOrReplaying = false;
readMoreEntries();
return;
Expand Down Expand Up @@ -527,6 +529,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
clearDelayedMessages();
return;
}

Expand Down Expand Up @@ -646,6 +649,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
if (shouldRewindBeforeReadingOrReplaying) {
shouldRewindBeforeReadingOrReplaying = false;
cursor.rewind();
clearDelayedMessages();
}

if (readType == ReadType.Normal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
msgOutFromRemovedConsumer.add(stats.msgOutCounter);

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
// clear delay message avoid duplicate.
dispatcher.clearDelayedMessages();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other places will call cursor.rewind(); in the PersistentDispatcherMultipleConsumers. The cursor rewind will cause to add the duplicated delayed message index, and reset the cursor will also introduce this problem, could you please check them?

deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;

import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -492,4 +494,52 @@ public void testClearDelayedMessagesWhenClearBacklog() throws PulsarClientExcept
admin.topics().skipAllMessages(topic, subName);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
}

@Test
public void testDelayedDeliveryWithConsumerCloseAndRecreate()
throws Exception {
String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedDelivery");

@Cleanup
Consumer<String> consumer = buildConsumer(topic);

// Simulate race condition with high frequency of calls to dispatcher.readMoreEntries()
PersistentTopic persistentTopic = (PersistentTopic) pulsar
.getBrokerService().getTopicReference(topic).get();
PersistentSubscription subscription = persistentTopic.getSubscription("shared-sub");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.newMessage()
.value("msg-delay")
.deliverAfter(30, TimeUnit.SECONDS)
.sendAsync();

producer.flush();

producer.close();

// create and close 10 times
for (int i = 0; i < 10; i++) {
consumer.closeAsync();
consumer = buildConsumer(topic);
}
// wait for InMemoryDelayedDeliveryTracker was invoked
TimeUnit.SECONDS.sleep(5);

// expect only once
assertEquals(subscription.dispatcher.getNumberOfDelayedMessages(), 1);
}

private Consumer<String> buildConsumer(String topic) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(1) // Use small prefecthing to simulate the multiple read batches
.subscribe();
}
}