From 3bc81854fb28f0966304b2bec9212a8cea3401c6 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 22 Nov 2023 18:06:07 +0800 Subject: [PATCH] fix(issues460): guard read thread unsafe array Signed-off-by: Robin Han --- core/src/main/scala/kafka/server/ReplicaManager.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index edba628b47..c960bde2ee 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1279,7 +1279,9 @@ class ReplicaManager(val config: KafkaConfig, val partitionData = readPartitionInfo(partitionIndex)._2 val readCf = read(tp, partitionData, partitionData.maxBytes, minOneMessage) readCf.thenAccept(rst => { - result += (tp -> rst) + result.synchronized { + result += (tp -> rst) + } remainingBytes.getAndAdd(-rst.info.records.sizeInBytes) }) readCfArray += readCf @@ -1297,7 +1299,9 @@ class ReplicaManager(val config: KafkaConfig, val partitionData = readPartitionInfo(partitionIndex)._2 val readCf = read(tp, partitionData, 0, minOneMessage) readCf.thenAccept(rst => { - result += (tp -> rst) + result.synchronized { + result += (tp -> rst) + } remainingBytes.getAndAdd(-rst.info.records.sizeInBytes) }) remainingCfArray += readCf