
[Bug][broker] cursor will read in dead loop when do tailing-read with enableTransaction #22943

Status: Open
Labels: type/bug
TakaHiR07 (Contributor) opened this issue on Jun 19, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

client: pulsar-3.0.5
broker: pulsar-3.0.5

Minimal reproduce step

Run a transactional produce and a normal consume on a 200-partition topic with pulsar-perf. The throughput is 10 MB/s, the batch size is 10, and the subscription type is Exclusive. This is a tailing read: the consumer keeps consuming the latest messages.

produce config is: -txn -nmt 1000 -time 0 -s 1024 -i 60 -bm 10 -b 1000 -bb 4194304 -r 10000 -mk random -threads 3

consume config is: -time 0 -i 60 -s sub_test_txn_p200 -ss sub_test_txn_p200 -sp Latest -ioThreads 1 -n 1
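For reference, the two option lists above can be assembled into full pulsar-perf invocations roughly as follows. The topic name and working directory are placeholders (not given in the report); adjust them to your environment:

```shell
# Hypothetical full commands; the topic name is a placeholder.
# Producer: transactional, 3 threads, 1 KB messages, batching enabled.
bin/pulsar-perf produce persistent://public/default/test-txn-p200 \
    -txn -nmt 1000 -time 0 -s 1024 -i 60 -bm 10 -b 1000 -bb 4194304 \
    -r 10000 -mk random -threads 3

# Consumer: exclusive subscription, tailing from Latest.
bin/pulsar-perf consume persistent://public/default/test-txn-p200 \
    -time 0 -i 60 -s sub_test_txn_p200 -ss sub_test_txn_p200 \
    -sp Latest -ioThreads 1 -n 1
```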

What did you expect to see?

CPU load is low.

What did you see instead?

The broker has little throughput but high CPU load.

[screenshots: broker throughput and CPU usage]

Anything else?

This issue has been reported before, but it still exists on the master branch. It is serious: it results in transactions being effectively unavailable.

The root cause is:

In ManagedCursorImpl#asyncReadEntriesWithSkipOrWait, hasMoreEntries() only compares readPosition with lastConfirmedEntry. However, when enableTransaction is set, maxReadPosition also determines whether an entry may be read.

Currently, if readPosition < lastConfirmedEntry && readPosition > maxReadPosition, the cursor reads immediately. But when it enters internalReadFromLedger(), readPosition exceeds opReadEntry.maxPosition, so it goes into opReadEntry.checkReadCompletion(), which triggers callback.readEntriesComplete() with zero entries.

The dispatcher then immediately issues the next read, so the cursor keeps reading in a dead loop even though there is actually nothing to read.
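The mismatch between the two checks can be modeled with a minimal sketch. This is illustrative only: Pos, hasMoreEntries, and canActuallyRead here are simplified stand-ins for Pulsar's PositionImpl and the broker's real logic, not the actual code:

```java
/** Minimal model (illustrative only) of the cursor's read gating. */
public class DeadLoopSketch {
    // Simplified (ledgerId, entryId) position; hypothetical, not Pulsar's PositionImpl.
    public static final class Pos implements Comparable<Pos> {
        final long ledger, entry;
        public Pos(long ledger, long entry) { this.ledger = ledger; this.entry = entry; }
        @Override public int compareTo(Pos o) {
            int c = Long.compare(ledger, o.ledger);
            return c != 0 ? c : Long.compare(entry, o.entry);
        }
    }

    // Mirrors what hasMoreEntries() checks: only readPosition vs lastConfirmedEntry.
    // maxReadPosition is not consulted at all.
    public static boolean hasMoreEntries(Pos readPosition, Pos lastConfirmedEntry) {
        return readPosition.compareTo(lastConfirmedEntry) <= 0;
    }

    // What the transaction buffer additionally requires before an entry may be read.
    public static boolean canActuallyRead(Pos readPosition, Pos maxReadPosition) {
        return readPosition.compareTo(maxReadPosition) <= 0;
    }

    public static void main(String[] args) {
        Pos read = new Pos(1, 5);      // cursor's next read position
        Pos lac = new Pos(1, 10);      // lastConfirmedEntry
        Pos maxRead = new Pos(1, 3);   // maxReadPosition held back by an open txn

        // hasMoreEntries() says "read now", but the txn buffer forbids the read:
        // the read completes with zero entries, the dispatcher re-issues it,
        // and the broker spins in a dead loop.
        System.out.println(hasMoreEntries(read, lac));      // true
        System.out.println(canActuallyRead(read, maxRead)); // false
    }
}
```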

public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                                           Object ctx, Position maxPosition,
                                           Predicate<Position> skipCondition) {
    checkArgument(maxEntries > 0);
    if (isClosed()) {
        callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
        return;
    }
    int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
    if (hasMoreEntries()) {
        // If we have available entries, we can read them immediately
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
        }
        asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                maxPosition, skipCondition);
    } else {
        // Skip deleted entries.
        skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
        OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                ctx, maxPosition, skipCondition);
        if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
            op.recycle();
            callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
        }
        // Check again for new entries after the configured time, then if still no entries are available register
        // to be notified
        if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
            ledger.getScheduledExecutor()
                    .schedule(() -> checkForNewEntries(op, callback, ctx),
                            getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
        } else {
            // If there's no delay, check directly from the same thread
            checkForNewEntries(op, callback, ctx);
        }
    }
}

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
    if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
        opReadEntry.checkReadCompletion();
        return;
    }
    // ... (rest of the method omitted)

void checkReadCompletion() {
    // op readPosition is smaller or equals maxPosition then can read again
    if (entries.size() < count && cursor.hasMoreEntries()
            && maxPosition.compareTo(readPosition) > 0) {
        // We still have more entries to read from the next ledger, schedule a new async operation
        cursor.ledger.getExecutor().execute(() -> {
            readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
            cursor.ledger.asyncReadEntries(OpReadEntry.this);
        });
    } else {
        // The reading was already completed, release resources and trigger callback
        try {
            cursor.readOperationCompleted();
        } finally {
            cursor.ledger.getExecutor().execute(() -> {
                callback.readEntriesComplete(entries, ctx);
                recycle();
            });
        }
    }
}
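One possible fix direction (a sketch under assumptions, not the merged fix) is to gate the immediate-read branch in asyncReadEntriesWithSkipOrWait on maxPosition as well, so that a cursor whose readPosition has caught up to maxReadPosition parks in the waiting-read path instead of looping. The helper below is hypothetical; positions are modeled as {ledgerId, entryId} arrays for brevity:

```java
public class GatedReadSketch {
    // Compare two positions given as {ledgerId, entryId}.
    static int cmp(long[] a, long[] b) {
        int c = Long.compare(a[0], b[0]);
        return c != 0 ? c : Long.compare(a[1], b[1]);
    }

    // Hypothetical replacement for the bare hasMoreEntries() check:
    // read immediately only when there are confirmed entries AND the
    // next entry is still within maxPosition (the txn-buffer bound).
    static boolean shouldReadImmediately(long[] readPos, long[] lastConfirmed, long[] maxPos) {
        return cmp(readPos, lastConfirmed) <= 0 && cmp(readPos, maxPos) <= 0;
    }

    public static void main(String[] args) {
        long[] read = {1, 5}, lac = {1, 10};

        // The broken case from the report: maxReadPosition lags behind
        // readPosition, so the cursor should park and wait, not spin.
        System.out.println(shouldReadImmediately(read, lac, new long[]{1, 3}));  // false

        // Once the transaction commits and maxReadPosition advances,
        // the immediate read is allowed again.
        System.out.println(shouldReadImmediately(read, lac, new long[]{1, 10})); // true
    }
}
```

With this guard, the zero-entry read-complete/re-read cycle described above never starts, because the cursor registers a waiting read and is only woken when new entries (or an advanced maxReadPosition) make progress possible.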

Are you willing to submit a PR?

  • I'm willing to submit a PR!