Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries (apache#8284)

Fix apache#8229 

### Bug Detail
Refer to the broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707

Take the first row of the extracted table as an example.

```shell
cursor.markDeletePosition = 2273599:-1
opread readPosition = 2282101:0
opread nextReadPosition = 2282101:0
cursor readPosition = 2273599:0
cursor writePosition = 2273599:4
cursor.ledger.currentLedger.lastAddConfirmed = -1
cursor.ledger.currentLedger.ledgerId = 2282101

SQL:
((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition", ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition", 
((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition",
 ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition", 
((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition", 
cursor.ledger.currentLedger.lastAddConfirmed, cursor.ledger.currentLedger.ledgerId.toString() AS
 "cursor.ledger.currentLedger.ledgerId"
```
When `ManagedCursorImpl#asyncReadEntries` is called, cursor.readPosition is **2273599:0**. The cursor passes this position when constructing the opReadEntry, `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, and op.readPosition is then derived from it by `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Because the ledger of cursor.readPosition does not exist in the managedLedger ledgers map, `startReadOperationOnLedger` returns the earliest available ledger position and op.readPosition is set to **2282101:0**, while cursor.readPosition is still **2273599:0**.

When `ManagedLedgerImpl#asyncReadEntries` is called with the constructed opReadEntry, it calls `ManagedLedgerImpl#internalReadFromLedger`. The key variables are as follows:
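
The divergence between the two positions can be reproduced with a small toy model. This is only a sketch, not Pulsar code: it assumes the lookup behaves like a ceiling lookup on a sorted map of ledger ids, which matches the "earliest available ledger position" behavior described above. The class `ReadPositionSketch` and the method `resolveStart` are hypothetical names, and positions are modeled as `{ledgerId, entryId}` arrays.

```java
import java.util.TreeMap;

// Toy model, not Pulsar code: ledger ids are keys of a sorted map,
// positions are {ledgerId, entryId} arrays.
final class ReadPositionSketch {

    /** Returns the position a read would start from (hypothetical helper). */
    static long[] resolveStart(TreeMap<Long, String> ledgers, long ledgerId, long entryId) {
        // Smallest existing ledger id that is >= the requested one, or null.
        Long existing = ledgers.ceilingKey(ledgerId);
        if (existing != null && existing != ledgerId) {
            // The requested ledger is gone: skip to the start of the next one.
            return new long[] {existing, 0L};
        }
        return new long[] {ledgerId, entryId};
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ledgers = new TreeMap<>();
        ledgers.put(2282101L, "current ledger"); // ledger 2273599 is no longer in the map

        long[] cursorRead = {2273599L, 0L};
        long[] opRead = resolveStart(ledgers, cursorRead[0], cursorRead[1]);

        // The cursor's own position is untouched; only the op moved forward.
        System.out.println("cursor.readPosition = " + cursorRead[0] + ":" + cursorRead[1]);
        System.out.println("op.readPosition     = " + opRead[0] + ":" + opRead[1]);
    }
}
```

With the values from the dump, the sketch yields 2282101:0 for the op while the cursor stays at 2273599:0, which is the mismatch the rest of this description follows.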
```shell
ledger = 2282101:-1
lastPosition = 2273599:4
ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599]
firstEntry = op.readPosition.getEntryId() = 0
lastEntryInLedger = ledger.getLastAddConfirmed = -1
```
Thus, it takes the following branch:
```java
if (firstEntry > lastEntryInLedger) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
                ledger.getId(), lastEntryInLedger, firstEntry);
    }

    if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
        // Cursor was placed past the end of one ledger, move it to the
        // beginning of the next ledger
        Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
        if (nextLedgerId != null) {
            opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
        } else {
            opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
        }
    }

    opReadEntry.checkReadCompletion();
    return;
}
```
Finally, it calls `opReadEntry.checkReadCompletion()`, which in turn calls `ManagedCursor#hasMoreEntries` to check whether there are more entries to read. If `hasMoreEntries` returns true, it sets up another read thread to read more entries.

```java
public boolean hasMoreEntries() {
    // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more
    // entries.
    // If they are on different ledgers we have 2 cases :
    // * Writer pointing to valid entry --> should return true since we have available entries
    // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader
    //   is at the last entry in the previous ledger
    PositionImpl writerPosition = ledger.getLastPosition();
    if (writerPosition.getEntryId() != -1) {
        return readPosition.compareTo(writerPosition) <= 0;
    } else {
        // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
        // are in the middle
        return getNumberOfEntries() > 0;
    }
}
```
In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4`, and `readPosition` is cursor.readPosition, which is `2273599:0`. Since `readPosition.compareTo(writerPosition) <= 0` holds and nothing on this path advances cursor.readPosition, `hasMoreEntries` always returns `true`, so the broker falls into an infinite loop and creates a large number of read threads.

The bug is that op.readPosition and cursor.readPosition are not synced immediately.
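
The loop condition can be illustrated with a small, bounded simulation. This is a sketch under assumptions, not the broker's actual scheduling code: positions are modeled as `{ledgerId, entryId}` arrays, `hasMoreEntries` mirrors only the first branch of the method quoted above (writer entry id is not -1), and `StaleReadLoopSketch` and `countReschedules` are hypothetical names. The iteration cap exists only so the example terminates.

```java
// Toy model of the stale-position loop; all names are illustrative, not Pulsar's API.
final class StaleReadLoopSketch {

    /** Orders positions {ledgerId, entryId} by ledger id first, then entry id. */
    static int compare(long[] a, long[] b) {
        return a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]);
    }

    /** Mirrors the first branch of hasMoreEntries: the writer entry id is not -1. */
    static boolean hasMoreEntries(long[] cursorRead, long[] writer) {
        return compare(cursorRead, writer) <= 0;
    }

    /** Counts how often the read would be set up again, capped at maxAttempts. */
    static int countReschedules(long[] cursorRead, long[] writer, int maxAttempts) {
        int attempts = 0;
        // Nothing inside the loop changes cursorRead, which is exactly the problem.
        while (attempts < maxAttempts && hasMoreEntries(cursorRead, writer)) {
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        long[] cursorRead = {2273599L, 0L}; // cursor.readPosition from the dump
        long[] writer = {2273599L, 4L};     // cursor.ledger.lastConfirmedEntry from the dump
        // Always hits the cap, because the check can never turn false.
        System.out.println(countReschedules(cursorRead, writer, 1000)); // prints 1000
    }
}
```

Whatever cap is chosen, the count reaches it, because the position being compared never moves.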

### Changes
1. Sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`.
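
The effect of this change on the example from the dump can be sketched as follows. This is a toy model under stated assumptions: it assumes `updateReadPosition` pushes the given position to the cursor, as the change description implies, and it simplifies the "different ledger" case to "move to ledger id + 1". `SyncFixSketch`, `afterEmptyLedgerBranch`, and the `withFix` flag are hypothetical names used only for illustration.

```java
// Toy model of the branch before and after the change; not Pulsar code.
final class SyncFixSketch {

    /** Orders positions {ledgerId, entryId} by ledger id first, then entry id. */
    static int compare(long[] a, long[] b) {
        return a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]);
    }

    /** Cursor read position after the "no more entries in this ledger" branch. */
    static long[] afterEmptyLedgerBranch(long[] cursorRead, long[] opRead,
                                         long ledgerId, long currentLedgerId, boolean withFix) {
        if (ledgerId != currentLedgerId) {
            return new long[] {ledgerId + 1, 0L}; // simplified: move to the next ledger
        } else if (withFix) {
            return opRead.clone(); // added else branch: push op.readPosition to the cursor
        }
        return cursorRead; // before the change: cursor position left untouched
    }

    /** Mirrors the first branch of hasMoreEntries: the writer entry id is not -1. */
    static boolean hasMoreEntries(long[] cursorRead, long[] writer) {
        return compare(cursorRead, writer) <= 0;
    }

    public static void main(String[] args) {
        long[] cursorRead = {2273599L, 0L}; // cursor.readPosition
        long[] opRead = {2282101L, 0L};     // op.readPosition
        long[] writer = {2273599L, 4L};     // lastConfirmedEntry
        long current = 2282101L;            // the ledger being read is the current ledger

        long[] before = afterEmptyLedgerBranch(cursorRead, opRead, current, current, false);
        long[] after = afterEmptyLedgerBranch(cursorRead, opRead, current, current, true);
        System.out.println("before: hasMoreEntries = " + hasMoreEntries(before, writer));
        System.out.println("after:  hasMoreEntries = " + hasMoreEntries(after, writer));
    }
}
```

In this model the check is true before the change and false after it, since 2282101:0 sorts after 2273599:4, so the read is no longer set up again.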


* Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries

* format code

* format code
hangc0276 authored and huangdx0726 committed Nov 13, 2020
1 parent 7fe1e2b commit 728135d
Showing 1 changed file with 2 additions and 2 deletions.
@@ -1595,11 +1595,9 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
}

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {

// Perform the read
long firstEntry = opReadEntry.readPosition.getEntryId();
long lastEntryInLedger;
final ManagedCursorImpl cursor = opReadEntry.cursor;

PositionImpl lastPosition = lastConfirmedEntry;

@@ -1627,6 +1625,8 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
} else {
opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
}
} else {
opReadEntry.updateReadPosition(opReadEntry.readPosition);
}

opReadEntry.checkReadCompletion();
