New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries #8284
Conversation
@merlimat @rdhabalia Please help take a look, thanks. |
@@ -1618,6 +1616,8 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) | |||
ledger.getId(), lastEntryInLedger, firstEntry); | |||
} | |||
|
|||
opReadEntry.updateReadPosition(opReadEntry.readPosition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it's better to move it to the following if ... else ...
if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
...
} else {
opReadEntry.updateReadPosition(opReadEntry.readPosition);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elegant!
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
…ies (#8284) Fix #8229 ### Bug Detail refer to broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707 We take the first row in the extracted table for example. ```shell cursor.markDeletePosition = 2273599:-1 opread readPosition = 2282101:0 opread nextReadPosition = 2282101:0 cursor readPosition = 2273599:0 cursor writePosition = 2273599:4 cursor.ledger.currentLedger.lastAddConfirmed = -1 cursor.ledger.currentLedger.ledgerId = 2282101 SQL: ((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition", ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition", ((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition", ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition", ((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition", cursor.ledger.currentLedger.lastAddConfirmed, cursor.ledger.currentLedger.ledgerId.toString() AS "cursor.ledger.currentLedger.ledgerId" ``` When call `ManagedCursorImpl#asyncReadEntries`, cursor.readPosisition is **2273599:0**, however, when using cursor.readPostition to construct opReadEntry `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, it use the cursor.readPosition to construct op.readPostition `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Due to cursor.readPosition not exist in managedLedger ledgers map, `startReadOperationOnLedger` return the earliest available ledger position, and set op.readPosition to **2282101:0**, but the cursor.readPosition still **2273599:0**. When call `ManagedLedgerImpl#asyncReadEntries` according to the constructed opReadEntry, it call `ManagedLedgerImpl#internalReadFromLedger`. The key variables as follow ```shell ledger = 2282101:-1 lastPosition = 2273599:4 ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599] firstEntry = op.readPosition.getEntryId() = 0 lastEntryInLedger = ledger.getLastAddConfirmed = -1 ``` Thus, it will go into the following branch ```java if (firstEntry > lastEntryInLedger) { if (log.isDebugEnabled()) { log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name, ledger.getId(), lastEntryInLedger, firstEntry); } if (currentLedger == null || ledger.getId() != currentLedger.getId()) { // Cursor was placed past the end of one ledger, move it to the // beginning of the next ledger Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1); if (nextLedgerId != null) { opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0)); } else { opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0)); } } opReadEntry.checkReadCompletion(); return; } ``` Finally, it call `opReadEntry.checkReadCompletion()`, and then call `ManagedCursor#hasMoreEntries` to check whether has more entries to read. If `hasMoreEntries` returns true, it will setup another read thread to read more entries. ```java public boolean hasMoreEntries() { // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more // entries. // If they are on different ledgers we have 2 cases : // * Writer pointing to valid entry --> should return true since we have available entries // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader // is // at the last entry in the previous ledger PositionImpl writerPosition = ledger.getLastPosition(); if (writerPosition.getEntryId() != -1) { return readPosition.compareTo(writerPosition) <= 0; } else { // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers // are in the middle return getNumberOfEntries() > 0; } } ``` In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4` and `readPosition` is cursor.readPosition, which is `2273599:0`, thus, `hasMoreEntries` always return `true` and will fall into infinite loop and create a lot of read thread. The bug is op.readPosition not sync immediatly with cursor.readPosition. ### Changes 1. sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`. * Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries * format code * format code (cherry picked from commit 5df23b5)
…ies (apache#8284) Fix apache#8229 ### Bug Detail refer to broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707 We take the first row in the extracted table for example. ```shell cursor.markDeletePosition = 2273599:-1 opread readPosition = 2282101:0 opread nextReadPosition = 2282101:0 cursor readPosition = 2273599:0 cursor writePosition = 2273599:4 cursor.ledger.currentLedger.lastAddConfirmed = -1 cursor.ledger.currentLedger.ledgerId = 2282101 SQL: ((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition", ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition", ((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition", ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition", ((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition", cursor.ledger.currentLedger.lastAddConfirmed, cursor.ledger.currentLedger.ledgerId.toString() AS "cursor.ledger.currentLedger.ledgerId" ``` When call `ManagedCursorImpl#asyncReadEntries`, cursor.readPosisition is **2273599:0**, however, when using cursor.readPostition to construct opReadEntry `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, it use the cursor.readPosition to construct op.readPostition `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Due to cursor.readPosition not exist in managedLedger ledgers map, `startReadOperationOnLedger` return the earliest available ledger position, and set op.readPosition to **2282101:0**, but the cursor.readPosition still **2273599:0**. When call `ManagedLedgerImpl#asyncReadEntries` according to the constructed opReadEntry, it call `ManagedLedgerImpl#internalReadFromLedger`. The key variables as follow ```shell ledger = 2282101:-1 lastPosition = 2273599:4 ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599] firstEntry = op.readPosition.getEntryId() = 0 lastEntryInLedger = ledger.getLastAddConfirmed = -1 ``` Thus, it will go into the following branch ```java if (firstEntry > lastEntryInLedger) { if (log.isDebugEnabled()) { log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name, ledger.getId(), lastEntryInLedger, firstEntry); } if (currentLedger == null || ledger.getId() != currentLedger.getId()) { // Cursor was placed past the end of one ledger, move it to the // beginning of the next ledger Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1); if (nextLedgerId != null) { opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0)); } else { opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0)); } } opReadEntry.checkReadCompletion(); return; } ``` Finally, it call `opReadEntry.checkReadCompletion()`, and then call `ManagedCursor#hasMoreEntries` to check whether has more entries to read. If `hasMoreEntries` returns true, it will setup another read thread to read more entries. ```java public boolean hasMoreEntries() { // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more // entries. // If they are on different ledgers we have 2 cases : // * Writer pointing to valid entry --> should return true since we have available entries // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader // is // at the last entry in the previous ledger PositionImpl writerPosition = ledger.getLastPosition(); if (writerPosition.getEntryId() != -1) { return readPosition.compareTo(writerPosition) <= 0; } else { // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers // are in the middle return getNumberOfEntries() > 0; } } ``` In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4` and `readPosition` is cursor.readPosition, which is `2273599:0`, thus, `hasMoreEntries` always return `true` and will fall into infinite loop and create a lot of read thread. The bug is op.readPosition not sync immediatly with cursor.readPosition. ### Changes 1. sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`. * Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries * format code * format code
…ies (apache#8284) Fix apache#8229 ### Bug Detail refer to broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707 We take the first row in the extracted table for example. ```shell cursor.markDeletePosition = 2273599:-1 opread readPosition = 2282101:0 opread nextReadPosition = 2282101:0 cursor readPosition = 2273599:0 cursor writePosition = 2273599:4 cursor.ledger.currentLedger.lastAddConfirmed = -1 cursor.ledger.currentLedger.ledgerId = 2282101 SQL: ((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition", ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition", ((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition", ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition", ((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition", cursor.ledger.currentLedger.lastAddConfirmed, cursor.ledger.currentLedger.ledgerId.toString() AS "cursor.ledger.currentLedger.ledgerId" ``` When call `ManagedCursorImpl#asyncReadEntries`, cursor.readPosisition is **2273599:0**, however, when using cursor.readPostition to construct opReadEntry `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, it use the cursor.readPosition to construct op.readPostition `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Due to cursor.readPosition not exist in managedLedger ledgers map, `startReadOperationOnLedger` return the earliest available ledger position, and set op.readPosition to **2282101:0**, but the cursor.readPosition still **2273599:0**. When call `ManagedLedgerImpl#asyncReadEntries` according to the constructed opReadEntry, it call `ManagedLedgerImpl#internalReadFromLedger`. The key variables as follow ```shell ledger = 2282101:-1 lastPosition = 2273599:4 ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599] firstEntry = op.readPosition.getEntryId() = 0 lastEntryInLedger = ledger.getLastAddConfirmed = -1 ``` Thus, it will go into the following branch ```java if (firstEntry > lastEntryInLedger) { if (log.isDebugEnabled()) { log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name, ledger.getId(), lastEntryInLedger, firstEntry); } if (currentLedger == null || ledger.getId() != currentLedger.getId()) { // Cursor was placed past the end of one ledger, move it to the // beginning of the next ledger Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1); if (nextLedgerId != null) { opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0)); } else { opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0)); } } opReadEntry.checkReadCompletion(); return; } ``` Finally, it call `opReadEntry.checkReadCompletion()`, and then call `ManagedCursor#hasMoreEntries` to check whether has more entries to read. If `hasMoreEntries` returns true, it will setup another read thread to read more entries. ```java public boolean hasMoreEntries() { // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more // entries. // If they are on different ledgers we have 2 cases : // * Writer pointing to valid entry --> should return true since we have available entries // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader // is // at the last entry in the previous ledger PositionImpl writerPosition = ledger.getLastPosition(); if (writerPosition.getEntryId() != -1) { return readPosition.compareTo(writerPosition) <= 0; } else { // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers // are in the middle return getNumberOfEntries() > 0; } } ``` In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4` and `readPosition` is cursor.readPosition, which is `2273599:0`, thus, `hasMoreEntries` always return `true` and will fall into infinite loop and create a lot of read thread. The bug is op.readPosition not sync immediatly with cursor.readPosition. ### Changes 1. sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`. * Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries * format code * format code
Fix #8229
Bug Detail
refer to broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707
We take the first row in the extracted table for example.
When call
ManagedCursorImpl#asyncReadEntries
, cursor.readPosisition is 2273599:0, however, when using cursor.readPostition to construct opReadEntryOpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);
, it use the cursor.readPosition to construct op.readPostitionop.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
. Due to cursor.readPosition not exist in managedLedger ledgers map,startReadOperationOnLedger
return the earliest available ledger position, and set op.readPosition to 2282101:0, but the cursor.readPosition still 2273599:0.When call
ManagedLedgerImpl#asyncReadEntries
according to the constructed opReadEntry, it callManagedLedgerImpl#internalReadFromLedger
. The key variables as followThus, it will go into the following branch
Finally, it call
opReadEntry.checkReadCompletion()
, and then callManagedCursor#hasMoreEntries
to check whether has more entries to read. IfhasMoreEntries
returns true, it will setup another read thread to read more entries.In
hasMoreEntries
, the key variables arewriterPosition
andreadPosition
.writerPosition
is cursor.ledger.lastConfirmedEntry, which is2273599:4
andreadPosition
is cursor.readPosition, which is2273599:0
, thus,hasMoreEntries
always returntrue
and will fall into infinite loop and create a lot of read thread.The bug is op.readPosition not sync immediatly with cursor.readPosition.
Changes
checkReadCompletion
.@sijie @jiazhai @codelipenghui @lhotari Please help take a look, we may discuss in this pr, thanks.