@poorbarcode poorbarcode commented Aug 31, 2022

Motivation

(Image: Screenshot 2022-08-30 18 29 31)

The "clear consumed ledgers" task is triggered by mark-delete events and by the retention policy.

(Highlight) Because no durable cursor is holding the ledger (the __compaction cursor has already acknowledged everything), the asynchronous ledger-cleanup task removes the ledger even if the reader is still reading from it.

admin.topics().triggerCompaction(topic);
Awaitility.await().untilAsserted(() -> {
    PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
    Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
    Assert.assertEquals(stats.compactedLedger.entries, numMessages);
    Assert.assertEquals(admin.topics().getStats(topic)
            .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
    Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
});
// Unload the topic to make sure the original ledger has been deleted.
admin.topics().unload(topic);
// Produce more messages to the original topic.
for (int i = 0; i < numMessages; ++i) {
    lastMessage = producer.newMessage()
            .key(i + numMessages + "")
            .value(String.format("msg [%d]", i + numMessages))
            .sendAsync();
}
producer.flush();
lastMessage.join();
// At this point the topic has 1000 messages in the compacted ledger and 1000 messages in the original topic.
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
        .topic(topic)
        .startMessageIdInclusive()
        .startMessageId(MessageId.earliest)
        .readCompacted(true)
        .create();


The failing execution flow is as follows:

producer:

  • send 1000 messages (ledger-1)
  • unload the topic
  • send 1000 messages (ledger-2)

reader:

  • read 500 messages
  • acknowledge them automatically
  • calculate the next read position:
    • getNextValidPosition(position): by this time ledger-1 has already been deleted, so the call returns (originalLedger+1:0)
  • read (ledger-2:0), at which point the error occurs
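
The position jump in the flow above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: the class NextValidPositionSketch, its method, and the map of ledger id to entry count are hypothetical and are not the actual managed-ledger code. It only shows why a missing ledger makes the next position land on the first entry of the following ledger.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model; NOT the real ManagedLedgerImpl implementation.
final class NextValidPositionSketch {

    /**
     * Returns {ledgerId, entryId} of the position that follows (ledgerId:entryId).
     * ledgers maps each existing ledger id to the number of entries it holds.
     */
    static long[] nextValidPosition(NavigableMap<Long, Long> ledgers, long ledgerId, long entryId) {
        long nextLedger = ledgerId;
        long nextEntry = entryId + 1;
        Long entries = ledgers.get(nextLedger);
        // A deleted ledger (entries == null) or a position past the last entry
        // is not readable, so move on to entry 0 of the next existing ledger.
        while (entries == null || nextEntry >= entries) {
            Long higher = ledgers.higherKey(nextLedger);
            if (higher == null) {
                // No later ledger exists; return the position after the last entry.
                return new long[] {nextLedger, nextEntry};
            }
            nextLedger = higher;
            nextEntry = 0;
            entries = ledgers.get(nextLedger);
        }
        return new long[] {nextLedger, nextEntry};
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 1000L); // ledger-1: first 1000 messages
        ledgers.put(2L, 1000L); // ledger-2: messages produced after the unload

        // The reader has consumed 500 messages, so its last position is (1:499).
        long[] ok = nextValidPosition(ledgers, 1, 499);
        System.out.println(ok[0] + ":" + ok[1]); // prints 1:500

        // The cleanup task removes ledger-1 while the reader is still on it.
        ledgers.remove(1L);
        long[] skipped = nextValidPosition(ledgers, 1, 499);
        System.out.println(skipped[0] + ":" + skipped[1]); // prints 2:0
    }
}
```

In this model the reader silently skips the remaining entries of ledger-1 once that ledger is gone, which matches the jump to (ledger-2:0) described in the list.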

Modifications

  • Create a consumer so that a durable cursor exists, which prevents the ledger from being deleted before the reader has received its messages.
  • Create the reader before calling triggerCompaction.
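
Why a durable cursor protects the ledger can be sketched with a simplified trimming model. Everything here is an assumption made for illustration: the class LedgerTrimSketch, its method, and the representation of a mark-delete position as a {ledgerId, entryId} pair are hypothetical, and retention settings are ignored. The model treats a closed ledger as deletable only when every durable cursor has marked its last entry as deleted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of consumed-ledger trimming; retention is ignored.
final class LedgerTrimSketch {

    /**
     * Deletes every ledger older than currentLedgerId that all durable cursors
     * have fully consumed, and returns the ids of the deleted ledgers.
     * Each element of durableMarkDeletes is a {ledgerId, entryId} mark-delete position.
     */
    static List<Long> trimConsumedLedgers(NavigableMap<Long, Long> ledgers,
                                          Collection<long[]> durableMarkDeletes,
                                          long currentLedgerId) {
        List<Long> deleted = new ArrayList<>();
        for (Map.Entry<Long, Long> e : ledgers.headMap(currentLedgerId, false).entrySet()) {
            long ledgerId = e.getKey();
            long lastEntry = e.getValue() - 1;
            boolean consumedByAll = true;
            for (long[] md : durableMarkDeletes) {
                // A cursor has passed the ledger if its mark-delete position is at
                // the ledger's last entry or somewhere in a later ledger.
                boolean passed = md[0] > ledgerId || (md[0] == ledgerId && md[1] >= lastEntry);
                consumedByAll &= passed;
            }
            if (consumedByAll) {
                deleted.add(ledgerId);
            }
        }
        ledgers.keySet().removeAll(deleted);
        return deleted;
    }

    private static NavigableMap<Long, Long> twoLedgers() {
        NavigableMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 1000L);
        ledgers.put(2L, 1000L);
        return ledgers;
    }

    public static void main(String[] args) {
        // Only the compaction cursor exists and it has acknowledged all of ledger-1.
        List<long[]> onlyCompaction = new ArrayList<>();
        onlyCompaction.add(new long[] {1, 999});
        System.out.println(trimConsumedLedgers(twoLedgers(), onlyCompaction, 2)); // prints [1]

        // An extra durable cursor that has acknowledged nothing keeps ledger-1 alive.
        List<long[]> withConsumer = new ArrayList<>(onlyCompaction);
        withConsumer.add(new long[] {1, -1});
        System.out.println(trimConsumedLedgers(twoLedgers(), withConsumer, 2)); // prints []
    }
}
```

In the first case the non-durable reader has no influence on trimming, so ledger-1 disappears under it; in the second case the consumer's durable cursor holds the ledger until its messages are acknowledged.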

Documentation

  • doc-required
  • doc-not-needed
  • doc
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 31, 2022
@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode force-pushed the flaky/testReadCompleteMessagesDuringTopicUnloading branch from 6fa6c5e to 1f92855 Compare September 14, 2022 06:18
@poorbarcode
Contributor Author

rebase master

@lhotari lhotari merged commit 4af8bf0 into apache:master Sep 20, 2022
@lhotari
Member

lhotari commented Sep 20, 2022

Thank you @poorbarcode

@poorbarcode poorbarcode deleted the flaky/testReadCompleteMessagesDuringTopicUnloading branch September 20, 2022 12:35
Labels

doc-not-needed Your PR changes do not impact docs
