Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1270,21 +1270,20 @@ class ReplicaManager(val config: KafkaConfig,
val remainingBytes = new AtomicInteger(limitBytes)
var partitionIndex = 0;
while (remainingBytes.get() > 0 && partitionIndex < readPartitionInfo.size) {
val readCfArray = new ArrayBuffer[CompletableFuture[LogReadResult]]
val readCfArray = new ArrayBuffer[CompletableFuture[Void]]
var assignedBytes = 0
val availableBytes = remainingBytes.get()

while (assignedBytes < availableBytes && partitionIndex < readPartitionInfo.size) {
val tp = readPartitionInfo(partitionIndex)._1
val partitionData = readPartitionInfo(partitionIndex)._2
val readCf = read(tp, partitionData, partitionData.maxBytes, minOneMessage)
readCf.thenAccept(rst => {
readCfArray += readCf.thenAccept(rst => {
result.synchronized {
result += (tp -> rst)
}
remainingBytes.getAndAdd(-rst.info.records.sizeInBytes)
})
readCfArray += readCf
assignedBytes += partitionData.maxBytes
partitionIndex += 1
}
Expand All @@ -1293,18 +1292,17 @@ class ReplicaManager(val config: KafkaConfig,

// The remaining partitions still need to be read, but we limit byte size to 0.
// The corresponding futures are completed immediately with empty LogReadResult.
val remainingCfArray = new ArrayBuffer[CompletableFuture[LogReadResult]]
val remainingCfArray = new ArrayBuffer[CompletableFuture[Void]]
while (partitionIndex < readPartitionInfo.size) {
val tp = readPartitionInfo(partitionIndex)._1
val partitionData = readPartitionInfo(partitionIndex)._2
val readCf = read(tp, partitionData, 0, minOneMessage)
readCf.thenAccept(rst => {
remainingCfArray += readCf.thenAccept(rst => {
result.synchronized {
result += (tp -> rst)
}
remainingBytes.getAndAdd(-rst.info.records.sizeInBytes)
})
remainingCfArray += readCf
partitionIndex += 1
}
CompletableFuture.allOf(remainingCfArray.toArray: _*).get()
Expand Down