Skip to content

Commit

Permalink
KAFKA-15771: fix concurrency bug in ProduceRequest#partitionSizes() (#…
Browse files Browse the repository at this point in the history
…14674)

A commit fixes a bug in ProduceRequest#partitionSizes() that may cause this method to incorrectly returning an empty or incomplete response for a thread when another thread is in the process of initialising it. 

Reviewers: Divij Vaidya <diviv@amazon.com>, hudeqi <1217150961@qq.com>, vamossagar12 <sagarmeansocean@gmail.com>

--------------------------------
Co-authored-by: fangxiaobing <fangxiaobing@kuaishou.com>
  • Loading branch information
2 people authored and divijvaidya committed Nov 7, 2023
1 parent c7eae56 commit 5829fca
Showing 1 changed file with 3 additions and 2 deletions.
Expand Up @@ -133,14 +133,15 @@ Map<TopicPartition, Integer> partitionSizes() {
// this method may be called by different thread (see the comment on data)
synchronized (this) {
if (partitionSizes == null) {
partitionSizes = new HashMap<>();
Map<TopicPartition, Integer> tmpPartitionSizes = new HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
partitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
tmpPartitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
(ignored, previousValue) ->
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue))
)
);
partitionSizes = tmpPartitionSizes;
}
}
}
Expand Down

0 comments on commit 5829fca

Please sign in to comment.