diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c960bde2ee..e60503926c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1270,7 +1270,7 @@ class ReplicaManager(val config: KafkaConfig, val remainingBytes = new AtomicInteger(limitBytes) var partitionIndex = 0; while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) { - val readCfArray = new ArrayBuffer[CompletableFuture[LogReadResult]] + val readCfArray = new ArrayBuffer[CompletableFuture[Void]] var assignedBytes = 0 val availableBytes = remainingBytes.get() @@ -1278,13 +1278,12 @@ class ReplicaManager(val config: KafkaConfig, val tp = readPartitionInfo(partitionIndex)._1 val partitionData = readPartitionInfo(partitionIndex)._2 val readCf = read(tp, partitionData, partitionData.maxBytes, minOneMessage) - readCf.thenAccept(rst => { + readCfArray += readCf.thenAccept(rst => { result.synchronized { result += (tp -> rst) } remainingBytes.getAndAdd(-rst.info.records.sizeInBytes) }) - readCfArray += readCf assignedBytes += partitionData.maxBytes partitionIndex += 1 } @@ -1293,18 +1292,17 @@ class ReplicaManager(val config: KafkaConfig, // The remaining partitions still need to be read, but we limit byte size to 0. // The corresponding futures are completed immediately with empty LogReadResult. - val remainingCfArray = new ArrayBuffer[CompletableFuture[LogReadResult]] + val remainingCfArray = new ArrayBuffer[CompletableFuture[Void]] while (partitionIndex < readPartitionInfo.size) { val tp = readPartitionInfo(partitionIndex)._1 val partitionData = readPartitionInfo(partitionIndex)._2 val readCf = read(tp, partitionData, 0, minOneMessage) - readCf.thenAccept(rst => { + remainingCfArray += readCf.thenAccept(rst => { result.synchronized { result += (tp -> rst) } remainingBytes.getAndAdd(-rst.info.records.sizeInBytes) }) - remainingCfArray += readCf partitionIndex += 1 } CompletableFuture.allOf(remainingCfArray.toArray: _*).get()