diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index ce977b0706fd..9435de1cc448 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -754,19 +754,18 @@ impl TopKHeap { return Ok((None, topk_rows)); } - // Indices for each row within its respective RecordBatch - let indices: Vec<_> = topk_rows - .iter() - .enumerate() - .map(|(i, k)| (i, k.index)) - .collect(); + // Collect the batches into a vec and store the "batch_id -> array_pos" mapping, to then + // build the `indices` vec below. This is needed since the batch ids are not continuous. + let mut record_batches = Vec::new(); + let mut batch_id_array_pos = HashMap::new(); + for (array_pos, (batch_id, batch)) in self.store.batches.iter().enumerate() { + record_batches.push(&batch.batch); + batch_id_array_pos.insert(*batch_id, array_pos); + } - let record_batches: Vec<_> = topk_rows + let indices: Vec<_> = topk_rows .iter() - .map(|k| { - let entry = self.store.get(k.batch_id).expect("invalid stored batch id"); - &entry.batch - }) + .map(|k| (batch_id_array_pos[&k.batch_id], k.index)) .collect(); // At this point `indices` contains indexes within the