Skip to content

Commit

Permalink
merge: #8617
Browse files Browse the repository at this point in the history
8617: [Backport stable/1.2] fix(log/appender): yield thread when experiencing backpressure r=romansmirnov a=github-actions[bot]

# Description
Backport of #8582 to `stable/1.2`.

relates to #8540

Co-authored-by: Roman <roman.smirnov@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov committed Jan 20, 2022
2 parents 80735be + 7029a31 commit 552cdf2
Showing 1 changed file with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ private AppendLimiter initNoBackpressure(final int partition) {
return new NoopAppendLimiter();
}

private void appendBlock(final BlockPeek blockPeek) {
/**
* Appends the passed block to the {@link LogStorage}.
*
* @param blockPeek block to append
* @return true when the block could be appended to log storage, otherwise, false is returned
*/
private boolean appendBlock(final BlockPeek blockPeek) {
final ByteBuffer rawBuffer = blockPeek.getRawBuffer();
final int bytes = rawBuffer.remaining();
final ByteBuffer copiedBuffer = ByteBuffer.allocate(bytes).put(rawBuffer).flip();
Expand All @@ -126,13 +132,15 @@ private void appendBlock(final BlockPeek blockPeek) {
logStorage.append(positions.getLeft(), positions.getRight(), copiedBuffer, listener);

blockPeek.markCompleted();
return true;
} else {
appendBackpressureMetrics.deferred();
LOG.trace(
"Backpressure happens: in flight {} limit {}",
appendEntryLimiter.getInflight(),
appendEntryLimiter.getLimit());
// we will be called later again
return false;
}
}

Expand Down Expand Up @@ -178,10 +186,17 @@ public void onActorFailed() {
}

private void onWriteBufferAvailable() {
final BlockPeek blockPeek = new BlockPeek();
if (writeBufferSubscription.peekBlock(blockPeek, maxAppendBlockSize, true) > 0) {
appendBlock(blockPeek);
} else {
final var blockPeek = new BlockPeek();
final var readBytes = writeBufferSubscription.peekBlock(blockPeek, maxAppendBlockSize, true);

final var canAppend = readBytes > 0;
var appendBlockSucceeded = false;

if (canAppend) {
appendBlockSucceeded = appendBlock(blockPeek);
}

if (!canAppend || !appendBlockSucceeded) {
actor.yieldThread();
}
}
Expand Down

0 comments on commit 552cdf2

Please sign in to comment.