-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return the last position of the compacted data while the original data been deleted #12161
Merged
codelipenghui
merged 1 commit into
apache:master
from
codelipenghui:penghui/fix-get-last-messages
Sep 24, 2021
Merged
Return the last position of the compacted data while the original data been deleted #12161
codelipenghui
merged 1 commit into
apache:master
from
codelipenghui:penghui/fix-get-last-messages
Sep 24, 2021
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…a been deleted. Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted. ``` 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely 09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999 ``` After the rollover task, the topic internal stats will be: ``` { "entriesAddedCounter": 0, "numberOfEntries": 0, "totalSize": 0, "currentLedgerEntries": 0, "currentLedgerSize": 0, "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z", "waitingCursorsCount": 29, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "44946:0", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 66425, "entries": 0, "size": 0, "offloaded": false, "underReplicated": false } ], "cursors": { "__compaction": { "markDeletePosition": "44946:0", "readPosition": "44946:1", "waitingReadOp": false, "pendingReadOps": 0, "messagesConsumedCounter": 0, "cursorLedger": -1, "cursorLedgerLastEntry": -1, "individuallyDeletedMessages": "[]", "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z", "state": "NoLedger", "numberOfEntriesSinceFirstNotAckedMessage": 1, "totalNonContiguousDeletedMessagesRange": 0, "subscriptionHavePendingRead": false, "subscriptionHavePendingReplayRead": false, "properties": { "CompactedTopicLedger": 64365 } } }, "schemaLedgers": [], "compactedLedger": { "ledgerId": 64365, "entries": 1, "size": 4024, "offloaded": false, "underReplicated": false } } ``` At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted. ``` 12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy 12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server ``` The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client. Added the test for the new changes.
codelipenghui
requested review from
eolivelli,
merlimat,
hangc0276,
315157973,
gaoran10 and
sijie
September 23, 2021 13:47
merlimat
approved these changes
Sep 23, 2021
codelipenghui
added a commit
that referenced
this pull request
Sep 24, 2021
…a been deleted. (#12161) Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted. ``` 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely 09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999 ``` After the rollover task, the topic internal stats will be: ``` { "entriesAddedCounter": 0, "numberOfEntries": 0, "totalSize": 0, "currentLedgerEntries": 0, "currentLedgerSize": 0, "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z", "waitingCursorsCount": 29, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "44946:0", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 66425, "entries": 0, "size": 0, "offloaded": false, "underReplicated": false } ], "cursors": { "__compaction": { "markDeletePosition": "44946:0", "readPosition": "44946:1", "waitingReadOp": false, "pendingReadOps": 0, "messagesConsumedCounter": 0, "cursorLedger": -1, "cursorLedgerLastEntry": -1, "individuallyDeletedMessages": "[]", "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z", "state": "NoLedger", "numberOfEntriesSinceFirstNotAckedMessage": 1, "totalNonContiguousDeletedMessagesRange": 0, "subscriptionHavePendingRead": false, "subscriptionHavePendingReplayRead": false, "properties": { "CompactedTopicLedger": 64365 } } }, "schemaLedgers": [], "compactedLedger": { "ledgerId": 64365, "entries": 1, "size": 4024, "offloaded": false, "underReplicated": false } } ``` At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted. ``` 12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy 12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server ``` The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client. Added the test for the new changes. (cherry picked from commit 86e720f)
codelipenghui
added a commit
to codelipenghui/incubator-pulsar
that referenced
this pull request
Oct 5, 2021
… with negative entry ID. Which recover a ManagedLedger, if the ManagedLedgers does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details to see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for compacted topic, all the data might be compacted and move to the compacted Ledger, In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, the consumer will treat the negative entry ID as no data in this topic, so hasMoreMessages will return false, but there is compacted data in the topic. The fix is as apache#12161 does to return the last message ID from the compacted Ledger if `lastConfirmedEntry` of the ManagedLedger with negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes.
merlimat
pushed a commit
that referenced
this pull request
Oct 6, 2021
… with negative entry ID. (#12277) Which recover a ManagedLedger, if the ManagedLedgers does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details to see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for compacted topic, all the data might be compacted and move to the compacted Ledger, In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, the consumer will treat the negative entry ID as no data in this topic, so hasMoreMessages will return false, but there is compacted data in the topic. The fix is as #12161 does to return the last message ID from the compacted Ledger if `lastConfirmedEntry` of the ManagedLedger with negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes.
codelipenghui
added a commit
that referenced
this pull request
Oct 6, 2021
… with negative entry ID. (#12277) Which recover a ManagedLedger, if the ManagedLedgers does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details to see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for compacted topic, all the data might be compacted and move to the compacted Ledger, In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, the consumer will treat the negative entry ID as no data in this topic, so hasMoreMessages will return false, but there is compacted data in the topic. The fix is as #12161 does to return the last message ID from the compacted Ledger if `lastConfirmedEntry` of the ManagedLedger with negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes. (cherry picked from commit 65fb6c6)
eolivelli
pushed a commit
to eolivelli/pulsar
that referenced
this pull request
Feb 25, 2022
…a been deleted. (apache#12161) Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted. ``` 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely 09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999 ``` After the rollover task, the topic internal stats will be: ``` { "entriesAddedCounter": 0, "numberOfEntries": 0, "totalSize": 0, "currentLedgerEntries": 0, "currentLedgerSize": 0, "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z", "waitingCursorsCount": 29, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "44946:0", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 66425, "entries": 0, "size": 0, "offloaded": false, "underReplicated": false } ], "cursors": { "__compaction": { "markDeletePosition": "44946:0", "readPosition": "44946:1", "waitingReadOp": false, "pendingReadOps": 0, "messagesConsumedCounter": 0, "cursorLedger": -1, "cursorLedgerLastEntry": -1, "individuallyDeletedMessages": "[]", "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z", "state": "NoLedger", "numberOfEntriesSinceFirstNotAckedMessage": 1, "totalNonContiguousDeletedMessagesRange": 0, "subscriptionHavePendingRead": false, "subscriptionHavePendingReplayRead": false, "properties": { "CompactedTopicLedger": 64365 } } }, "schemaLedgers": [], "compactedLedger": { "ledgerId": 64365, "entries": 1, "size": 4024, "offloaded": false, "underReplicated": false } } ``` At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted. ``` 12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy 12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server ``` The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client. Added the test for the new changes. (cherry picked from commit 86e720f) (cherry picked from commit b3e7be9)
eolivelli
pushed a commit
to eolivelli/pulsar
that referenced
this pull request
Feb 25, 2022
… with negative entry ID. (apache#12277) Which recover a ManagedLedger, if the ManagedLedgers does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details to see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for compacted topic, all the data might be compacted and move to the compacted Ledger, In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, the consumer will treat the negative entry ID as no data in this topic, so hasMoreMessages will return false, but there is compacted data in the topic. The fix is as apache#12161 does to return the last message ID from the compacted Ledger if `lastConfirmedEntry` of the ManagedLedger with negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes. (cherry picked from commit 65fb6c6) (cherry picked from commit 2e7088d)
bharanic-dev
pushed a commit
to bharanic-dev/pulsar
that referenced
this pull request
Mar 18, 2022
…a been deleted. (apache#12161) Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted. ``` 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely 09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999 ``` After the rollover task, the topic internal stats will be: ``` { "entriesAddedCounter": 0, "numberOfEntries": 0, "totalSize": 0, "currentLedgerEntries": 0, "currentLedgerSize": 0, "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z", "waitingCursorsCount": 29, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "44946:0", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 66425, "entries": 0, "size": 0, "offloaded": false, "underReplicated": false } ], "cursors": { "__compaction": { "markDeletePosition": "44946:0", "readPosition": "44946:1", "waitingReadOp": false, "pendingReadOps": 0, "messagesConsumedCounter": 0, "cursorLedger": -1, "cursorLedgerLastEntry": -1, "individuallyDeletedMessages": "[]", "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z", "state": "NoLedger", "numberOfEntriesSinceFirstNotAckedMessage": 1, "totalNonContiguousDeletedMessagesRange": 0, "subscriptionHavePendingRead": false, "subscriptionHavePendingReplayRead": false, "properties": { "CompactedTopicLedger": 64365 } } }, "schemaLedgers": [], "compactedLedger": { "ledgerId": 64365, "entries": 1, "size": 4024, "offloaded": false, "underReplicated": false } } ``` At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted. ``` 12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy 12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server ``` The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client. Added the test for the new changes.
bharanic-dev
pushed a commit
to bharanic-dev/pulsar
that referenced
this pull request
Mar 18, 2022
… with negative entry ID. (apache#12277) Which recover a ManagedLedger, if the ManagedLedgers does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details to see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for compacted topic, all the data might be compacted and move to the compacted Ledger, In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, the consumer will treat the negative entry ID as no data in this topic, so hasMoreMessages will return false, but there is compacted data in the topic. The fix is as apache#12161 does to return the last message ID from the compacted Ledger if `lastConfirmedEntry` of the ManagedLedger with negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
area/compaction
cherry-picked/branch-2.7
Archived: 2.7 is end of life
cherry-picked/branch-2.8
Archived: 2.8 is end of life
doc-not-needed
Your PR changes do not impact docs
release/2.7.5
release/2.8.2
release/2.9.0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted.
After the rollover task, the topic internal stats will be:
At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted.
The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client.
Added the test for the new changes.