Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2484,9 +2484,9 @@ void rollbackOrProcessStateUpdates(
Throwable throwable,
List<PersisterBatch> persisterBatches
) {
lock.writeLock().lock();
try {
if (throwable != null) {
if (throwable != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I prefer to return a chained CompletableFuture rather than manually completing an explicitly created one, as calling complete() under a lock is highly error-prone. For example, rollbackOrProcessStateUpdates could be significantly streamlined with this chaining approach.

    CompletableFuture<Void> rollbackOrProcessStateUpdates(
        Throwable throwable,
        List<PersisterBatch> persisterBatches
    ) {
        if (throwable != null) {
            lock.writeLock().lock();
            try {
              ...
                });
            } finally {
                lock.writeLock().unlock();
            }
            return CompletableFuture.failedFuture(throwable);
        }

        if (persisterBatches.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        ...
        return writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList())
            .whenComplete((result, exception) -> {

Another benefit is that it naturally propagates any exceptions thrown inside the whenComplete block.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Valid suggestion. I wanted to keep the changes minimal for the blocker. Shall I do the chaining only in trunk so we have limited scope of testing in 4.2.1 and 4.3.0?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created following ticket and assigned to myself. It will be benfecial and can be well tested in trunk prior 4.4.0 release as it's code improvement: https://issues.apache.org/jira/browse/KAFKA-20521. Please let me know if you think it's fine to have separately.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, you mentioned already that this can be a follow-up then I shall merge this commit and cherry-pick. Then shall open follow-up.

lock.writeLock().lock();
try {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any changed state"
+ " for the share partition: {}-{}", groupId, topicIdPartition);
Expand All @@ -2503,16 +2503,16 @@ void rollbackOrProcessStateUpdates(
deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
}
});
future.completeExceptionally(throwable);
return;
} finally {
lock.writeLock().unlock();
}
future.completeExceptionally(throwable);
return;
}

if (persisterBatches.isEmpty()) {
future.complete(null);
return;
}
} finally {
lock.writeLock().unlock();
if (persisterBatches.isEmpty()) {
future.complete(null);
return;
}

writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList())
Expand Down Expand Up @@ -2541,7 +2541,6 @@ void rollbackOrProcessStateUpdates(
deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
}
});
future.completeExceptionally(exception);
return;
}

Expand Down Expand Up @@ -2639,9 +2638,13 @@ void rollbackOrProcessStateUpdates(
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
} finally {
lock.writeLock().unlock();
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(null);
}
// Maybe complete the delayed share fetch request if the state has been changed in cache
// which might have moved start offset ahead. Hence, the pending delayed share fetch
// request can be completed. The call should be made outside the lock to avoid deadlock.
Expand Down
Loading