From ec44ef73933fec57e1006e42f9ecd10546081263 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 10 Oct 2025 13:51:59 -0500 Subject: [PATCH 1/8] initial implementation of spilling RepartitionExec --- .../physical-plan/src/repartition/mod.rs | 338 ++++++++++++++---- 1 file changed, 265 insertions(+), 73 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index a5bf68a63c38..42b5d61dd517 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -32,12 +32,13 @@ use super::{ }; use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::hash_utils::create_hashes; -use crate::metrics::BaselineMetrics; +use crate::metrics::{BaselineMetrics, SpillMetrics}; use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; use crate::repartition::distributor_channels::{ channels, partition_aware_channels, DistributionReceiver, DistributionSender, }; use crate::sorts::streaming_merge::StreamingMergeBuilder; +use crate::spill::spill_manager::SpillManager; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; @@ -50,6 +51,7 @@ use datafusion_common::utils::transpose; use datafusion_common::{internal_err, ColumnStatistics, HashMap}; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; @@ -66,22 +68,42 @@ use parking_lot::Mutex; mod distributor_channels; -type MaybeBatch = Option>; +/// A batch in the repartition queue - either in memory or spilled to disk +#[derive(Debug)] +enum RepartitionBatch { + /// Batch held in memory (counts against memory reservation) + Memory(RecordBatch), + /// Batch spilled to disk (one file per batch for queue semantics) + /// File automatically deleted when dropped via reference counting + /// The size field stores the original batch size for validation when reading back + Spilled { + spill_file: RefCountedTempFile, + size: usize, + }, +} + +type MaybeBatch = Option>; type InputPartitionsToCurrentPartitionSender = Vec>; type InputPartitionsToCurrentPartitionReceiver = Vec>; +/// Channels and resources for a single output partition +#[derive(Debug)] +struct PartitionChannels { + /// Senders for each input partition to send data to this output partition + tx: InputPartitionsToCurrentPartitionSender, + /// Receivers for each input partition sending data to this output partition + rx: InputPartitionsToCurrentPartitionReceiver, + /// Memory reservation for this output partition + reservation: SharedMemoryReservation, + /// Spill manager for handling disk spills for this output partition + spill_manager: Arc, +} + #[derive(Debug)] struct ConsumingInputStreamsState { /// Channels for sending batches from input partitions to output partitions. /// Key is the partition number. - channels: HashMap< - usize, - ( - InputPartitionsToCurrentPartitionSender, - InputPartitionsToCurrentPartitionReceiver, - SharedMemoryReservation, - ), - >, + channels: HashMap, /// Helper that ensures that that background job is killed once it is no longer needed. 
abort_helper: Arc>>, @@ -161,8 +183,8 @@ impl RepartitionExecState { let streams_and_metrics = match self { RepartitionExecState::NotInitialized => { self.ensure_input_streams_initialized( - input, - metrics, + Arc::clone(&input), + metrics.clone(), partitioning.partition_count(), Arc::clone(&context), )?; @@ -205,9 +227,24 @@ impl RepartitionExecState { for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { let reservation = Arc::new(Mutex::new( MemoryConsumer::new(format!("{name}[{partition}]")) + .with_can_spill(true) .register(context.memory_pool()), )); - channels.insert(partition, (tx, rx, reservation)); + let spill_metrics = SpillMetrics::new(&metrics, partition); + let spill_manager = Arc::new(SpillManager::new( + Arc::clone(&context.runtime_env()), + spill_metrics, + input.schema(), + )); + channels.insert( + partition, + PartitionChannels { + tx, + rx, + reservation, + spill_manager, + }, + ); } // launch one async task per *input* partition @@ -217,8 +254,15 @@ impl RepartitionExecState { { let txs: HashMap<_, _> = channels .iter() - .map(|(partition, (tx, _rx, reservation))| { - (*partition, (tx[i].clone(), Arc::clone(reservation))) + .map(|(partition, channels)| { + ( + *partition, + ( + channels.tx[i].clone(), + Arc::clone(&channels.reservation), + Arc::clone(&channels.spill_manager), + ), + ) }) .collect(); @@ -234,7 +278,9 @@ impl RepartitionExecState { let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task( input_task, txs.into_iter() - .map(|(partition, (tx, _reservation))| (partition, tx)) + .map(|(partition, (tx, _reservation, _spill_manager))| { + (partition, tx) + }) .collect(), )); spawned_tasks.push(wait_for_task); @@ -675,7 +721,7 @@ impl ExecutionPlan for RepartitionExec { let num_input_partitions = input.output_partitioning().partition_count(); // lock scope - let (mut rx, reservation, abort_helper) = { + let (mut rx, reservation, spill_manager, abort_helper) = { // lock mutexes let mut state = state.lock(); let state = state.consume_input_streams( @@ -689,12 +735,22 @@ impl ExecutionPlan for RepartitionExec { // now return stream for the specified *output* partition which will // read from the channel - let (_tx, rx, reservation) = state + let PartitionChannels { + rx, + reservation, + spill_manager, + .. 
+ } = state .channels .remove(&partition) .expect("partition not used yet"); - (rx, reservation, Arc::clone(&state.abort_helper)) + ( + rx, + reservation, + spill_manager, + Arc::clone(&state.abort_helper), + ) }; trace!( @@ -711,6 +767,8 @@ impl ExecutionPlan for RepartitionExec { receiver, _drop_helper: Arc::clone(&abort_helper), reservation: Arc::clone(&reservation), + spill_manager: Arc::clone(&spill_manager), + state: RepartitionStreamState::ReceivingFromChannel, }) as SendableRecordBatchStream }) .collect::>(); @@ -739,6 +797,8 @@ impl ExecutionPlan for RepartitionExec { input: rx.swap_remove(0), _drop_helper: abort_helper, reservation, + spill_manager, + state: RepartitionStreamState::ReceivingFromChannel, }) as SendableRecordBatchStream) } }) @@ -979,7 +1039,11 @@ impl RepartitionExec { mut stream: SendableRecordBatchStream, mut output_channels: HashMap< usize, - (DistributionSender, SharedMemoryReservation), + ( + DistributionSender, + SharedMemoryReservation, + Arc, + ), >, partitioning: Partitioning, metrics: RepartitionMetrics, @@ -1007,12 +1071,38 @@ impl RepartitionExec { let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it - if let Some((tx, reservation)) = output_channels.get_mut(&partition) { - reservation.lock().try_grow(size)?; - - if tx.send(Some(Ok(batch))).await.is_err() { + if let Some((tx, reservation, spill_manager)) = + output_channels.get_mut(&partition) + { + let (batch_to_send, is_memory_batch) = + match reservation.lock().try_grow(size) { + Ok(_) => { + // Memory available - send in-memory batch + (RepartitionBatch::Memory(batch), true) + } + Err(_) => { + // We're memory limited - spill this single batch to its own file + let spill_file = spill_manager + .spill_record_batch_and_finish( + &[batch], + &format!( + "RepartitionExec spill partition {}", + partition + ), + )? + .expect("non-empty batch should produce spill file"); + + // Store size for validation when reading back + (RepartitionBatch::Spilled { spill_file, size }, false) + } + }; + + if tx.send(Some(Ok(batch_to_send))).await.is_err() { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) - reservation.lock().shrink(size); + // Only shrink memory if it was a memory batch + if is_memory_batch { + reservation.lock().shrink(size); + } output_channels.remove(&partition); } } @@ -1093,6 +1183,13 @@ impl RepartitionExec { } } +enum RepartitionStreamState { + /// Waiting for next item from channel + ReceivingFromChannel, + /// Reading a spilled batch from disk (stream reads via tokio::fs) + ReadingSpilledBatch(SendableRecordBatchStream), +} + struct RepartitionStream { /// Number of input partitions that will be sending batches to this output channel num_input_partitions: usize, @@ -1111,6 +1208,12 @@ struct RepartitionStream { /// Memory reservation. 
reservation: SharedMemoryReservation, + + /// Spill manager for reading spilled batches + spill_manager: Arc, + + /// Current state of the stream + state: RepartitionStreamState, } impl Stream for RepartitionStream { @@ -1121,33 +1224,73 @@ impl Stream for RepartitionStream { cx: &mut Context<'_>, ) -> Poll> { loop { - match self.input.recv().poll_unpin(cx) { - Poll::Ready(Some(Some(v))) => { - if let Ok(batch) = &v { - self.reservation - .lock() - .shrink(batch.get_array_memory_size()); + match &mut self.state { + RepartitionStreamState::ReceivingFromChannel => { + match self.input.recv().poll_unpin(cx) { + Poll::Ready(Some(Some(v))) => { + match v { + Ok(RepartitionBatch::Memory(batch)) => { + // Release memory and return + self.reservation + .lock() + .shrink(batch.get_array_memory_size()); + return Poll::Ready(Some(Ok(batch))); + } + Ok(RepartitionBatch::Spilled { spill_file, size }) => { + // Read from disk - SpillReaderStream uses tokio::fs internally + // Pass the original size for validation + let stream = self + .spill_manager + .read_spill_as_stream(spill_file, Some(size))?; + self.state = + RepartitionStreamState::ReadingSpilledBatch( + stream, + ); + // Continue loop to poll the stream immediately + } + Err(e) => { + return Poll::Ready(Some(Err(e))); + } + } + } + Poll::Ready(Some(None)) => { + self.num_input_partitions_processed += 1; + + if self.num_input_partitions + == self.num_input_partitions_processed + { + // all input partitions have finished sending batches + return Poll::Ready(None); + } else { + // other partitions still have data to send + continue; + } + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => { + return Poll::Pending; + } } - - return Poll::Ready(Some(v)); } - Poll::Ready(Some(None)) => { - self.num_input_partitions_processed += 1; - - if self.num_input_partitions == self.num_input_partitions_processed { - // all input partitions have finished sending batches - return Poll::Ready(None); - } else { - // other partitions still have data to send - continue; + RepartitionStreamState::ReadingSpilledBatch(stream) => { + match futures::ready!(stream.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + // Return batch and stay in ReadingSpilledBatch state to read more batches + return Poll::Ready(Some(Ok(batch))); + } + Some(Err(e)) => { + self.state = RepartitionStreamState::ReceivingFromChannel; + return Poll::Ready(Some(Err(e))); + } + None => { + // Spill stream ended - go back to receiving from channel + self.state = RepartitionStreamState::ReceivingFromChannel; + continue; + } } } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => { - return Poll::Pending; - } } } } @@ -1174,6 +1317,12 @@ struct PerPartitionStream { /// Memory reservation. 
reservation: SharedMemoryReservation, + + /// Spill manager for reading spilled batches + spill_manager: Arc, + + /// Current state of the stream + state: RepartitionStreamState, } impl Stream for PerPartitionStream { @@ -1183,21 +1332,63 @@ impl Stream for PerPartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.receiver.recv().poll_unpin(cx) { - Poll::Ready(Some(Some(v))) => { - if let Ok(batch) = &v { - self.reservation - .lock() - .shrink(batch.get_array_memory_size()); + loop { + match &mut self.state { + RepartitionStreamState::ReceivingFromChannel => { + match self.receiver.recv().poll_unpin(cx) { + Poll::Ready(Some(Some(v))) => { + match v { + Ok(RepartitionBatch::Memory(batch)) => { + // Release memory and return + self.reservation + .lock() + .shrink(batch.get_array_memory_size()); + return Poll::Ready(Some(Ok(batch))); + } + Ok(RepartitionBatch::Spilled { spill_file, size }) => { + // Read from disk - SpillReaderStream uses tokio::fs internally + // Pass the original size for validation + let stream = self + .spill_manager + .read_spill_as_stream(spill_file, Some(size))?; + self.state = + RepartitionStreamState::ReadingSpilledBatch( + stream, + ); + // Continue loop to poll the stream immediately + } + Err(e) => { + return Poll::Ready(Some(Err(e))); + } + } + } + Poll::Ready(Some(None)) => { + // Input partition has finished sending batches + return Poll::Ready(None); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + + RepartitionStreamState::ReadingSpilledBatch(stream) => { + match futures::ready!(stream.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + // Return batch and stay in ReadingSpilledBatch state to read more batches + return Poll::Ready(Some(Ok(batch))); + } + Some(Err(e)) => { + self.state = RepartitionStreamState::ReceivingFromChannel; + return Poll::Ready(Some(Err(e))); + } + None => { + // Spill stream ended - go back to receiving from channel + self.state = RepartitionStreamState::ReceivingFromChannel; + continue; + } + } } - Poll::Ready(Some(v)) - } - Poll::Ready(Some(None)) => { - // Input partition has finished sending batches - Poll::Ready(None) } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, } } } @@ -1229,8 +1420,8 @@ mod tests { use arrow::array::{ArrayRef, StringArray, UInt32Array}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_string_array; + use datafusion_common::exec_err; use datafusion_common::test_util::batches_to_sort_string; - use datafusion_common::{arrow_datafusion_err, exec_err}; use datafusion_common_runtime::JoinSet; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use insta::assert_snapshot; @@ -1712,14 +1903,14 @@ mod tests { } #[tokio::test] - async fn oom() -> Result<()> { - // define input partitions + async fn repartition_with_spilling() -> Result<()> { + // Test that repartition successfully spills to disk when memory is constrained let schema = test_schema(); let partition = create_vec_batches(50); let input_partitions = vec![partition]; let partitioning = Partitioning::RoundRobinBatch(4); - // setup up context + // Set up context with very tight memory limit to force spilling let runtime = RuntimeEnvBuilder::default() .with_memory_limit(1, 1.0) .build_arc()?; @@ -1732,18 +1923,19 @@ mod tests { TestMemoryExec::try_new_exec(&input_partitions, Arc::clone(&schema), None)?; let exec = RepartitionExec::try_new(exec, partitioning)?; - // pull partitions + // Collect all 
partitions - should succeed by spilling to disk + let mut total_rows = 0; for i in 0..exec.partitioning().partition_count() { let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; - let err = - arrow_datafusion_err!(stream.next().await.unwrap().unwrap_err().into()); - let err = err.find_root(); - assert!( - matches!(err, DataFusionError::ResourcesExhausted(_)), - "Wrong error type: {err}", - ); + while let Some(result) = stream.next().await { + let batch = result?; + total_rows += batch.num_rows(); + } } + // Verify we got all the data (50 batches * 8 rows each) + assert_eq!(total_rows, 50 * 8); + Ok(()) } From 248fe511d0964161b0607b4524426604e9296aef Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 10 Oct 2025 14:09:02 -0500 Subject: [PATCH 2/8] add spilling to RepartitionExec --- .../physical-plan/src/repartition/mod.rs | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 42b5d61dd517..a3c168f9d0a8 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1936,6 +1936,161 @@ mod tests { // Verify we got all the data (50 batches * 8 rows each) assert_eq!(total_rows, 50 * 8); + // Verify spilling metrics to confirm spilling actually happened + let metrics = exec.metrics().unwrap(); + assert!( + metrics.spill_count().unwrap() > 0, + "Expected spill_count > 0, but got {:?}", + metrics.spill_count() + ); + println!("Spilled {} times", metrics.spill_count().unwrap()); + assert!( + metrics.spilled_bytes().unwrap() > 0, + "Expected spilled_bytes > 0, but got {:?}", + metrics.spilled_bytes() + ); + println!( + "Spilled {} bytes in {} spills", + metrics.spilled_bytes().unwrap(), + metrics.spill_count().unwrap() + ); + assert!( + metrics.spilled_rows().unwrap() > 0, + "Expected spilled_rows > 0, but got {:?}", + metrics.spilled_rows() + ); + println!("Spilled {} rows", metrics.spilled_rows().unwrap()); + + Ok(()) + } + + #[tokio::test] + async fn repartition_with_partial_spilling() -> Result<()> { + // Test that repartition can handle partial spilling (some batches in memory, some spilled) + let schema = test_schema(); + let partition = create_vec_batches(50); + let input_partitions = vec![partition]; + let partitioning = Partitioning::RoundRobinBatch(4); + + // Set up context with moderate memory limit to force partial spilling + // 2KB should allow some batches in memory but force others to spill + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(2 * 1024, 1.0) + .build_arc()?; + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + + // create physical plan + let exec = + TestMemoryExec::try_new_exec(&input_partitions, Arc::clone(&schema), None)?; + let exec = RepartitionExec::try_new(exec, partitioning)?; + + // Collect all partitions - should succeed with partial spilling + let mut total_rows = 0; + for i in 0..exec.partitioning().partition_count() { + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; + while let Some(result) = stream.next().await { + let batch = result?; + total_rows += batch.num_rows(); + } + } + + // Verify we got all the data (50 batches * 8 rows each) + assert_eq!(total_rows, 50 * 8); + + // Verify partial spilling metrics + let metrics = exec.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + let spilled_rows = 
metrics.spilled_rows().unwrap(); + let spilled_bytes = metrics.spilled_bytes().unwrap(); + + assert!( + spill_count > 0, + "Expected some spilling to occur, but got spill_count={}", + spill_count + ); + assert!( + spilled_rows > 0 && spilled_rows < total_rows, + "Expected partial spilling (0 < spilled_rows < {}), but got spilled_rows={}", + total_rows, + spilled_rows + ); + assert!( + spilled_bytes > 0, + "Expected some bytes to be spilled, but got spilled_bytes={}", + spilled_bytes + ); + + println!( + "Partial spilling: spilled {} out of {} rows ({:.1}%) in {} spills, {} bytes", + spilled_rows, + total_rows, + (spilled_rows as f64 / total_rows as f64) * 100.0, + spill_count, + spilled_bytes + ); + + Ok(()) + } + + #[tokio::test] + async fn repartition_without_spilling() -> Result<()> { + // Test that repartition does not spill when there's ample memory + let schema = test_schema(); + let partition = create_vec_batches(50); + let input_partitions = vec![partition]; + let partitioning = Partitioning::RoundRobinBatch(4); + + // Set up context with generous memory limit - no spilling should occur + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(10 * 1024 * 1024, 1.0) // 10MB + .build_arc()?; + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + + // create physical plan + let exec = + TestMemoryExec::try_new_exec(&input_partitions, Arc::clone(&schema), None)?; + let exec = RepartitionExec::try_new(exec, partitioning)?; + + // Collect all partitions - should succeed without spilling + let mut total_rows = 0; + for i in 0..exec.partitioning().partition_count() { + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; + while let Some(result) = stream.next().await { + let batch = result?; + total_rows += batch.num_rows(); + } + } + + // Verify we got all the data (50 batches * 8 rows each) + assert_eq!(total_rows, 50 * 8); + + // Verify no spilling occurred + let metrics = exec.metrics().unwrap(); + assert_eq!( + metrics.spill_count(), + Some(0), + "Expected no spilling, but got spill_count={:?}", + metrics.spill_count() + ); + assert_eq!( + metrics.spilled_bytes(), + Some(0), + "Expected no bytes spilled, but got spilled_bytes={:?}", + metrics.spilled_bytes() + ); + assert_eq!( + metrics.spilled_rows(), + Some(0), + "Expected no rows spilled, but got spilled_rows={:?}", + metrics.spilled_rows() + ); + + println!("No spilling occurred - all data processed in memory"); + Ok(()) } From 33e14d7146ea5f949ace6b9e009f66dd7a5a7e0f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 10 Oct 2025 14:24:46 -0500 Subject: [PATCH 3/8] lint --- datafusion/physical-plan/src/repartition/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index a3c168f9d0a8..dbbbef33b953 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1086,8 +1086,7 @@ impl RepartitionExec { .spill_record_batch_and_finish( &[batch], &format!( - "RepartitionExec spill partition {}", - partition + "RepartitionExec spill partition {partition}" ), )? 
.expect("non-empty batch should produce spill file"); From 307db53fce79b7ffb6b9e127712fecf785d4f2de Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 10 Oct 2025 15:07:56 -0500 Subject: [PATCH 4/8] fix --- datafusion/physical-plan/src/repartition/mod.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index dbbbef33b953..12d7b101bc91 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -2006,19 +2006,15 @@ mod tests { assert!( spill_count > 0, - "Expected some spilling to occur, but got spill_count={}", - spill_count + "Expected some spilling to occur, but got spill_count={spill_count}" ); assert!( spilled_rows > 0 && spilled_rows < total_rows, - "Expected partial spilling (0 < spilled_rows < {}), but got spilled_rows={}", - total_rows, - spilled_rows + "Expected partial spilling (0 < spilled_rows < {total_rows}), but got spilled_rows={spilled_rows}" ); assert!( spilled_bytes > 0, - "Expected some bytes to be spilled, but got spilled_bytes={}", - spilled_bytes + "Expected some bytes to be spilled, but got spilled_bytes={spilled_bytes}" ); println!( From 4cfc2f5e725774a41b3a9e64c3a3534d5799f7f5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 11 Oct 2025 07:34:54 -0500 Subject: [PATCH 5/8] add e2e test --- datafusion/core/tests/memory_limit/mod.rs | 1 + .../memory_limit/repartition_mem_limit.rs | 117 ++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 datafusion/core/tests/memory_limit/repartition_mem_limit.rs diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 89bc48b1e634..5d8a1d24181c 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -23,6 +23,7 @@ use std::sync::{Arc, LazyLock}; #[cfg(feature = "extended_tests")] mod memory_limit_validation; +mod repartition_mem_limit; use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray}; use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; diff --git a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs new file mode 100644 index 000000000000..c012f405bb20 --- /dev/null +++ b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatch}; +use datafusion::{ + assert_batches_sorted_eq, + prelude::{SessionConfig, SessionContext}, +}; +use datafusion_catalog::MemTable; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; +use datafusion_physical_plan::repartition::RepartitionExec; +use futures::TryStreamExt; +use itertools::Itertools; + +/// End to end test for spilling in RepartitionExec. +/// The idea is to make a real world query with a relatively low memory limit and +/// then drive one partition at a time, simulating dissimlar execution speed in partitions. +/// Just as some examples of real world scenarios where this can happen consider +/// lopsided groups in a group by especially if one partitions spills and others dont', +/// or in distributed systems if one upstream node is slower than others. +#[tokio::test] +async fn test_repartition_memory_limit() { + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(1024 * 1024, 1.0) + .build() + .unwrap(); + let config = SessionConfig::new() + .with_batch_size(32) + .with_target_partitions(2); + let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime)); + let batches = vec![RecordBatch::try_from_iter(vec![( + "c1", + Arc::new(Int32Array::from_iter_values( + (0..10).cycle().take(1_000_000), + )) as ArrayRef, + )]) + .unwrap()]; + let table = Arc::new(MemTable::try_new(batches[0].schema(), vec![batches]).unwrap()); + ctx.register_table("t", table).unwrap(); + let plan = ctx + .state() + .create_logical_plan("SELECT c1, count(*) as c FROM t GROUP BY c1;") + .await + .unwrap(); + let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); + // Execute partition 0, this should cause items going into the rest of the partitions to queue up and because + // of the low memory limit should spill to disk. 
+ let batches0 = Arc::clone(&plan) + .execute(0, ctx.task_ctx()) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let mut metrics = None; + Arc::clone(&plan) + .transform_down(|node| { + if node.as_any().is::() { + metrics = node.metrics(); + } + Ok(Transformed::no(node)) + }) + .unwrap(); + + let metrics = metrics.unwrap(); + assert!(metrics.spilled_bytes().unwrap() > 0); + assert!(metrics.spilled_rows().unwrap() > 0); + assert!(metrics.spill_count().unwrap() > 0); + + // Execute the other partition + let batches1 = Arc::clone(&plan) + .execute(1, ctx.task_ctx()) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let all_batches = batches0 + .into_iter() + .chain(batches1.into_iter()) + .collect_vec(); + #[rustfmt::skip] + let expected = &[ + "+----+--------+", + "| c1 | c |", + "+----+--------+", + "| 0 | 100000 |", + "| 1 | 100000 |", + "| 2 | 100000 |", + "| 3 | 100000 |", + "| 4 | 100000 |", + "| 5 | 100000 |", + "| 6 | 100000 |", + "| 7 | 100000 |", + "| 8 | 100000 |", + "| 9 | 100000 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &all_batches); +} From 446a614aeb0dfb71629b37314653efef33248c3e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 11 Oct 2025 07:39:02 -0500 Subject: [PATCH 6/8] fix typo --- datafusion/core/tests/memory_limit/repartition_mem_limit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs index c012f405bb20..e7ea3aec8062 100644 --- a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs +++ b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs @@ -31,7 +31,7 @@ use itertools::Itertools; /// End to end test for spilling in RepartitionExec. /// The idea is to make a real world query with a relatively low memory limit and -/// then drive one partition at a time, simulating dissimlar execution speed in partitions. +/// then drive one partition at a time, simulating dissimilar execution speed in partitions. /// Just as some examples of real world scenarios where this can happen consider /// lopsided groups in a group by especially if one partitions spills and others dont', /// or in distributed systems if one upstream node is slower than others. From 4c116baab601b44132a93b8a537a40f203837318 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 11 Oct 2025 10:46:17 -0500 Subject: [PATCH 7/8] Update datafusion/core/tests/memory_limit/repartition_mem_limit.rs Co-authored-by: Bruce Ritchie --- datafusion/core/tests/memory_limit/repartition_mem_limit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs index e7ea3aec8062..335bdd3c45c2 100644 --- a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs +++ b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs @@ -33,7 +33,7 @@ use itertools::Itertools; /// The idea is to make a real world query with a relatively low memory limit and /// then drive one partition at a time, simulating dissimilar execution speed in partitions. 
/// Just as some examples of real world scenarios where this can happen consider -/// lopsided groups in a group by especially if one partitions spills and others dont', +/// lopsided groups in a group by especially if one partitions spills and others don't, /// or in distributed systems if one upstream node is slower than others. #[tokio::test] async fn test_repartition_memory_limit() { From bcfd64a476b822eb29ec3bef0ee196fb3dcc69e9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 13 Oct 2025 10:43:31 -0500 Subject: [PATCH 8/8] handle empty batches, add note about expect --- datafusion/physical-plan/src/repartition/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 12d7b101bc91..8c9f91573e08 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1065,6 +1065,11 @@ impl RepartitionExec { None => break, }; + // Handle empty batch + if batch.num_rows() == 0 { + continue; + } + for res in partitioner.partition_iter(batch)? { let (partition, batch) = res?; let size = batch.get_array_memory_size(); @@ -1089,6 +1094,7 @@ impl RepartitionExec { "RepartitionExec spill partition {partition}" ), )? + // Note that we handled empty batch above, so this is safe .expect("non-empty batch should produce spill file"); // Store size for validation when reading back
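
The mechanism introduced by this series, in short: when a batch is routed to an
output partition, RepartitionExec first tries to charge its size to that
partition's memory reservation; if that fails, the batch is written to its own
temporary spill file and only a handle (plus the original size, used for
validation on read-back) is sent through the channel, so repartitioning no
longer fails with ResourcesExhausted when the pool is exhausted (the old `oom`
test). The receiving stream restores spilled batches from disk, and each temp
file is deleted automatically once its RefCountedTempFile handle is dropped.

Below is a dependency-free sketch of that queueing pattern, not DataFusion
code: SpillingQueue, Queued and the temp-file naming are illustrative
stand-ins for the MemoryReservation, SpillManager and RefCountedTempFile
machinery used in the hunks above.

    use std::collections::VecDeque;
    use std::fs;
    use std::path::PathBuf;

    /// A queued item is either held in memory (charged against the budget)
    /// or spilled to its own file, mirroring RepartitionBatch above.
    enum Queued {
        Memory(Vec<u8>),
        Spilled { path: PathBuf, size: usize },
    }

    struct SpillingQueue {
        budget: usize,    // bytes the queue may keep in memory
        in_memory: usize, // bytes currently held in memory
        items: VecDeque<Queued>,
    }

    impl SpillingQueue {
        fn new(budget: usize) -> Self {
            Self { budget, in_memory: 0, items: VecDeque::new() }
        }

        /// Keep the batch in memory if the budget allows, otherwise spill it
        /// to its own temp file (one file per batch, preserving queue order).
        fn push(&mut self, batch: Vec<u8>) -> std::io::Result<()> {
            let size = batch.len();
            if self.in_memory + size <= self.budget {
                self.in_memory += size;
                self.items.push_back(Queued::Memory(batch));
            } else {
                let path = std::env::temp_dir().join(format!(
                    "repartition-spill-{}-{}.bin",
                    std::process::id(),
                    self.items.len()
                ));
                fs::write(&path, &batch)?;
                self.items.push_back(Queued::Spilled { path, size });
            }
            Ok(())
        }

        /// Dequeue the next batch, releasing memory or reading the spill file
        /// back from disk and removing it.
        fn pop(&mut self) -> std::io::Result<Option<Vec<u8>>> {
            match self.items.pop_front() {
                None => Ok(None),
                Some(Queued::Memory(batch)) => {
                    self.in_memory -= batch.len();
                    Ok(Some(batch))
                }
                Some(Queued::Spilled { path, size }) => {
                    let batch = fs::read(&path)?;
                    debug_assert_eq!(batch.len(), size); // size recorded at spill time
                    fs::remove_file(&path)?;
                    Ok(Some(batch))
                }
            }
        }
    }

    fn main() -> std::io::Result<()> {
        // 1 KiB budget: only the first 16 of 100 64-byte batches stay in memory,
        // the rest round-trip through disk, and nothing is lost or reordered.
        let mut queue = SpillingQueue::new(1024);
        for i in 0..100u8 {
            queue.push(vec![i; 64])?;
        }
        let mut total = 0;
        while let Some(batch) = queue.pop()? {
            total += batch.len();
        }
        assert_eq!(total, 100 * 64);
        Ok(())
    }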