
[fix] [broker] Fix reader can not get any messages but hasMessageAvailable always return true #18808

Closed
poorbarcode wants to merge 4 commits into apache:master from poorbarcode:reproduce/tb_trim_ledger_3

poorbarcode commented Dec 7, 2022

Motivation

Context

  • After PIP-14, a consumer with read-compacted enabled reads messages from the compacted topic once task-compaction has finished, and from the original topic while task-compaction has not finished.
  • If the data of the last message sent to a topic with key k is null, the compactor marks all messages for that key as deleted.
  • The logic of consumer.getLastMessageId:
    • If the topic is not compacted, return managedLedger.lastConfirmPosition.
    • If the topic is compacted, return the last message that was not deleted by compaction; this may be less than managedLedger.lastConfirmPosition.
  • The logic of reader.hasMessageAvailable:
    1. Call the id of the last received message lastReceiveMessageId.
    2. Get the last message id and cache it as lastMessageIdInBroker. It may be equal to managedLedger.lastConfirmPosition or less than it.
    3. Return lastReceiveMessageId < lastMessageIdInBroker.
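
The comparison in step 3 can be illustrated with a minimal sketch. This is not the Pulsar client code: SketchMessageId and HasMessageAvailableSketch are hypothetical names, the partition index is omitted, and the cached id is assumed to be fetched once and never refreshed.

```java
/** Hypothetical, simplified message id (ledgerId:entryId:batchIndex). */
final class SketchMessageId implements Comparable<SketchMessageId> {
    final long ledgerId;
    final long entryId;
    final int batchIndex; // -1 for a non-batched message

    SketchMessageId(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    @Override
    public int compareTo(SketchMessageId other) {
        // Order by ledger first, then entry, then position inside the batch.
        if (ledgerId != other.ledgerId) {
            return Long.compare(ledgerId, other.ledgerId);
        }
        if (entryId != other.entryId) {
            return Long.compare(entryId, other.entryId);
        }
        return Integer.compare(batchIndex, other.batchIndex);
    }
}

/** Hypothetical model of the reader-side check described above. */
final class HasMessageAvailableSketch {
    private final SketchMessageId lastMessageIdInBroker; // assumption: cached once
    private SketchMessageId lastReceiveMessageId;         // null until the first receive

    HasMessageAvailableSketch(SketchMessageId lastMessageIdInBroker) {
        this.lastMessageIdInBroker = lastMessageIdInBroker;
    }

    void onReceive(SketchMessageId id) {
        lastReceiveMessageId = id;
    }

    boolean hasMessageAvailable() {
        if (lastReceiveMessageId == null) {
            return true; // assumption: the topic is not empty
        }
        return lastReceiveMessageId.compareTo(lastMessageIdInBroker) < 0;
    }
}
```

With lastMessageIdInBroker cached as 3:2 and only 3:0 ever delivered, hasMessageAvailable() keeps returning true in this model, which is the symptom the scenarios below describe.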

Possible issue scenarios.

Based on the above context, there are two scenarios in which a reader cannot get any messages but reader.hasMessageAvailable always returns true:

  • Scenario-1: The last message is deleted by compaction
    1. Send messages: [{k1, v1}, {k2, v2}, {k2, null}]
    2. There are three entries in the topic: [{3:0}, {3:1}, {3:2}]
    3. Calling consumer.getLastMessageId returns {3:2}, which is cached as lastMessageIdInBroker.
    4. Compaction deletes the messages {3:1} and {3:2}, because the data of the last message sent with key k2 is null.
    5. Receive the message {3:0} and record its id {3:0} as lastReceiveMessageId.
    6. Calling reader.hasMessageAvailable returns true, because lastReceiveMessageId < lastMessageIdInBroker.
    7. Calling receive gets stuck, because there are no more messages in the compacted topic.

You can reproduce scenario-1 by the test CompactionTest.testRaceConditionByCompactionAndGetLastMessageId with @DataProvider-6.
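
To make step 4 of scenario-1 concrete, here is a small simulation of the compaction rule from the context section (keep only the latest message per key, and drop the key entirely when that latest message is null). CompactionSketch is a hypothetical name and the logic is an assumption-level model, not the broker's compactor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of key-based compaction with null values as deletes. */
final class CompactionSketch {

    /** Returns the entry indexes that remain readable after compaction. */
    static List<Integer> compact(String[] keys, String[] values) {
        // Pass 1: remember the index of the latest message for every key.
        Map<String, Integer> latestIndexByKey = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            latestIndexByKey.put(keys[i], i);
        }
        // Pass 2: keep a message only if it is the latest for its key and is not null.
        List<Integer> retained = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            if (latestIndexByKey.get(keys[i]) == i && values[i] != null) {
                retained.add(i);
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k2", "k2"};
        String[] values = {"v1", "v2", null};
        System.out.println(compact(keys, values)); // prints [0]
    }
}
```

Only entry 0 (position 3:0) survives, while the id cached in step 3 still points at 3:2.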


  • Scenario-2: The last message in the batch is deleted by compaction
    1. Send messages using the batch feature: [{k1, v1}, {k2, v2}, {k2, null}]
    2. There is one entry in the topic: {position=3:0, batchSize=3}
    3. Calling consumer.getLastMessageId returns {3:0:-1:2}, which is cached as lastMessageIdInBroker.
    4. Compaction marks the messages {3:0:-1:1} and {3:0:-1:2} as compactedOut, because the data of the last message sent with key k2 is null.
    5. Receive the entry {3:0}. When the consumer unpacks the batch, the two messages {3:0:-1:1} and {3:0:-1:2} are dropped because they have been marked compactedOut. The id {3:0:-1:0} is recorded as lastReceiveMessageId.
    6. Calling reader.hasMessageAvailable returns true, because lastReceiveMessageId < lastMessageIdInBroker.
    7. Calling receive gets stuck, because there are no more messages in the compacted topic.

You can reproduce scenario-2 by the test CompactionTest.testRaceConditionByCompactionAndGetLastBatchMessageId2 with @DataProvider-5
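
Step 5 of scenario-2 can be sketched as follows. BatchUnpackSketch is a hypothetical name, and the boolean array stands in for the per-message compactedOut flag; the real client unpacking code is more involved.

```java
/** Hypothetical model of client-side batch unpacking with compactedOut flags. */
final class BatchUnpackSketch {

    /**
     * Returns the batch index of the last message that is actually delivered,
     * or -1 if every message in the batch has been marked compactedOut.
     */
    static int lastDeliveredBatchIndex(boolean[] compactedOut) {
        int lastDelivered = -1;
        for (int i = 0; i < compactedOut.length; i++) {
            if (!compactedOut[i]) {
                lastDelivered = i; // message i is handed to the application
            }
            // Messages flagged compactedOut are skipped silently.
        }
        return lastDelivered;
    }

    public static void main(String[] args) {
        // Batch [{k1, v1}, {k2, v2}, {k2, null}] after compaction.
        boolean[] compactedOut = {false, true, true};
        int lastBatchIndexInBroker = compactedOut.length - 1; // 2, as cached in step 3
        int lastReceived = lastDeliveredBatchIndex(compactedOut);
        System.out.println(lastReceived < lastBatchIndexInBroker); // prints true
    }
}
```

The last delivered batch index is 0, which stays below the cached index 2, so the check in step 6 never becomes false.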


(Highlight) We should discuss whether the behavior in step 4 of scenario-3 is correct.

  • Scenario-3: The last message in the batch is deleted by compaction (getLastMessageId is called after compaction)
    1. Send messages using the batch feature: [{k1, v1}, {k2, v2}, {k2, null}]
    2. There is one entry in the topic: {position=3:0, batchSize=3}
    3. Compaction marks the messages {3:0:-1:1} and {3:0:-1:2} as compactedOut, because the data of the last message sent with key k2 is null.
    4. (Highlight) Calling consumer.getLastMessageId returns {3:0:-1:2}, which is cached as lastMessageIdInBroker. Now that task-compaction has finished, should it return {3:0:-1:0} instead? You can reproduce this with the test GetLastMessageIdCompactedTest.testGetLastMessageIdAfterCompactionEndWithNullMsg2 with @DataProvider-true.
    5. Receive the entry {3:0}. When the consumer unpacks the batch, the two messages {3:0:-1:1} and {3:0:-1:2} are dropped because they have been marked compactedOut. The id {3:0:-1:0} is recorded as lastReceiveMessageId.
    6. Calling reader.hasMessageAvailable returns true, because lastReceiveMessageId < lastMessageIdInBroker.
    7. Calling receive gets stuck, because there are no more messages in the compacted topic.

You can reproduce scenario-3 by the test CompactionTest.testReadMessageAfterCompactionWithBatchFuture2 with @DataProvider-6


Modifications

1. Fix issue

In the following two scenarios, the consumer reads messages from the original topic even if read-compacted is enabled and task-compaction has finished. Before reading, it moves cursor.markDeletedPosition to compactionHorizon - 1.

  • After task-compaction, all messages have been marked deleted by the compactor.
  • The consumer is reading the last message of the compacted topic.

2. Add tests

  • Add tests for the changes above.
  • Add tests for getLastMessageId with read-compacted enabled.
  • Add tests to ensure that the last non-empty ledger is not deleted by task-trimLedgers, even if a new ledger has been created but no data has been written to it yet. This matters because, after this PR, the original topic may be read even after task-compaction has finished.
    • Even if the topic is a system topic.
    • Even if there are only non-durable cursors besides the compaction cursor.

The rejected plans

Plan 1

If task-compaction is not done, return the last non-empty message instead of managedLedger.lastConfirmPosition when calling getLastMessageId.

Why rejected: For messages sent as below, we cannot easily determine which message is the last one that has not been deleted.

send {key = "k1", value = "v1"}
send {key = "k1", value = "v2"}
send {key = "k1", value = null}

The last non-empty message is {key = "k1", value = "v2"}, but it is deleted by compaction because of the third message.
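
The mismatch behind this rejection can be shown with a short sketch that compares "last non-null message" with "last message that survives compaction". Plan1Sketch and both method names are hypothetical, and the compaction rule is the simplified one from the context section.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical comparison of two candidate answers for getLastMessageId. */
final class Plan1Sketch {

    /** Index of the last message whose value is not null, or -1 if there is none. */
    static int lastNonNullIndex(String[] values) {
        for (int i = values.length - 1; i >= 0; i--) {
            if (values[i] != null) {
                return i;
            }
        }
        return -1;
    }

    /** Index of the last message still readable after compaction, or -1 if there is none. */
    static int lastRetainedIndex(String[] keys, String[] values) {
        Map<String, Integer> latestIndexByKey = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            latestIndexByKey.put(keys[i], i);
        }
        for (int i = keys.length - 1; i >= 0; i--) {
            // A message survives only if it is the latest for its key and is not a null delete.
            if (latestIndexByKey.get(keys[i]) == i && values[i] != null) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k1", "k1"};
        String[] values = {"v1", "v2", null};
        System.out.println(lastNonNullIndex(values));        // prints 1
        System.out.println(lastRetainedIndex(keys, values)); // prints -1
    }
}
```

Finding the second value requires knowing the latest message for every key, which is the part that is not cheap to compute when getLastMessageId is called.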


Plan 2

Difference from the current changes: read messages from the original topic only if the last message is null.

Why rejected: If the last message is a batch message, the broker cannot easily tell whether the last message in the batch is empty, because batches are unpacked by the client rather than by the broker.


Plan 3

An improvement on plan 2: if the last message is a batch message, do not execute rebatchMessage for it, so that the messages in the last entry are not marked as compactedOut and the client does not lose the empty messages.

Why rejected: While task-compaction is executing, it is impossible to easily determine which message is the last one to keep, as in the example below:

get message `{position = 1:1, key = "k1", value = "v1"}`, mark the last message as `1:1`
get message `{position = 1:2, key = "k2", value = "v2"}`, mark the last message as `1:2`
get message `{position = 1:3, key = "k3", value = "v3"}`, mark the last message as `1:3`
get message `{position = 1:4, key = "k3", value = null}`, we cannot determine the last message now


github-actions bot added the doc-not-needed label Dec 7, 2022
poorbarcode commented

I will push another PR to solve this issue

poorbarcode closed this Dec 9, 2022