From 65ff5242c7472deed9482123fd5ab9a76349f241 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 20 Apr 2026 15:07:13 +0800 Subject: [PATCH 01/10] refactor: Simplify NLJ re-scans with `ReplayableStreamSource` --- .../src/joins/nested_loop_join.rs | 118 ++---- datafusion/physical-plan/src/spill/mod.rs | 1 + .../src/spill/replayable_spill_input.rs | 372 ++++++++++++++++++ 3 files changed, 408 insertions(+), 83 deletions(-) create mode 100644 datafusion/physical-plan/src/spill/replayable_spill_input.rs diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 030fa5bbf94ff..8a19c8b3198d9 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -79,7 +79,7 @@ use log::debug; use parking_lot::Mutex; use crate::metrics::SpillMetrics; -use crate::spill::in_progress_spill_file::InProgressSpillFile; +use crate::spill::replayable_spill_input::ReplayableStreamSource; use crate::spill::spill_manager::SpillManager; #[expect(rustdoc::private_intra_doc_links)] @@ -931,16 +931,8 @@ pub(crate) struct SpillStateActive { reservation: MemoryReservation, /// Accumulated left batches for the current chunk pending_batches: Vec, - /// SpillManager for right-side spilling - right_spill_manager: SpillManager, - /// In-progress spill file for writing right batches during first pass - right_spill_in_progress: Option, - /// Completed right-side spill file (available after first pass) - right_spill_file: Option, - /// Max right batch memory size (for read_spill_as_stream) - right_max_batch_memory: usize, - /// Whether this is the first right-side pass (need to spill while reading) - is_first_right_pass: bool, + /// Right input that spills on the first pass and replays from spill later. + right_input: ReplayableStreamSource, } pub(crate) struct NestedLoopJoinStream { @@ -960,7 +952,7 @@ pub(crate) struct NestedLoopJoinStream { /// type of the join pub(crate) join_type: JoinType, /// the probe-side(right) table data of the nested loop join - pub(crate) right_data: SendableRecordBatchStream, + pub(crate) right_data: Option, /// the build-side table data of the nested loop join pub(crate) left_data: OnceFut, /// Projection to construct the output schema from the left and right tables. @@ -1258,7 +1250,7 @@ impl NestedLoopJoinStream { output_schema: Arc::clone(&schema), join_filter: filter, join_type, - right_data, + right_data: Some(right_data), column_indices, left_data, metrics, @@ -1356,7 +1348,15 @@ impl NestedLoopJoinStream { .register(context.memory_pool()); // Create SpillManager for right-side spilling - let right_schema = self.right_data.schema(); + let right_schema = self + .right_data + .as_ref() + .expect("right_data must be present before fallback") + .schema(); + let right_data = self + .right_data + .take() + .expect("right_data must be present before fallback"); let right_spill_manager = SpillManager::new( context.runtime_env(), self.metrics.spill_metrics.clone(), @@ -1370,11 +1370,11 @@ impl NestedLoopJoinStream { left_schema: None, reservation, pending_batches: Vec::new(), - right_spill_manager, - right_spill_in_progress: None, - right_spill_file: None, - right_max_batch_memory: 0, - is_first_right_pass: true, + right_input: ReplayableStreamSource::new( + right_data, + right_spill_manager, + "NestedLoopJoin right spill", + ), })); // State stays BufferingLeft — next poll will enter @@ -1571,33 +1571,12 @@ impl NestedLoopJoinStream { self.buffered_left_data = Some(Arc::new(left_data)); - // Set up right-side stream for this pass - if !active.is_first_right_pass { - if let Some(file) = active.right_spill_file.as_ref() { - match active.right_spill_manager.read_spill_as_stream( - file.clone(), - Some(active.right_max_batch_memory), - ) { - Ok(stream) => { - self.right_data = stream; - } - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } - } + match active.right_input.open_pass() { + Ok(stream) => { + self.right_data = Some(stream); } - } else { - // First pass: create InProgressSpillFile for right side - match active - .right_spill_manager - .create_in_progress_file("NestedLoopJoin right spill") - { - Ok(file) => { - active.right_spill_in_progress = Some(file); - } - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } + Err(e) => { + return ControlFlow::Break(Poll::Ready(Some(Err(e)))); } } @@ -1613,7 +1592,12 @@ impl NestedLoopJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> ControlFlow>>> { - match self.right_data.poll_next_unpin(cx) { + match self + .right_data + .as_mut() + .expect("right_data must be present while fetching right") + .poll_next_unpin(cx) + { Poll::Ready(result) => match result { Some(Ok(right_batch)) => { // Update metrics @@ -1626,19 +1610,6 @@ impl NestedLoopJoinStream { return ControlFlow::Continue(()); } - // In memory-limited mode, spill right batch to disk on first pass - if let SpillState::Active(ref mut active) = self.spill_state - && active.is_first_right_pass - && let Some(ref mut spill_file) = active.right_spill_in_progress - { - if let Err(e) = spill_file.append_batch(&right_batch) { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } - active.right_max_batch_memory = active - .right_max_batch_memory - .max(right_batch.get_array_memory_size()); - } - self.current_right_batch = Some(right_batch); // Prepare right bitmap @@ -1654,29 +1625,6 @@ impl NestedLoopJoinStream { } Some(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), None => { - // Right stream exhausted. - // In memory-limited mode, finalize the spill file after first pass. - if let SpillState::Active(ref mut active) = self.spill_state - && active.is_first_right_pass - { - if let Some(mut spill_in_progress) = - active.right_spill_in_progress.take() - { - match spill_in_progress.finish() { - Ok(Some(file)) => { - active.right_spill_file = Some(file); - } - Ok(None) => { - // No data was spilled (right side was empty) - } - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } - } - } - active.is_first_right_pass = false; - } - self.state = NLJState::EmitLeftUnmatched; ControlFlow::Continue(()) } @@ -2257,7 +2205,11 @@ impl NestedLoopJoinStream { } let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None); - let right_schema = self.right_data.schema(); + let right_schema = self + .right_data + .as_ref() + .expect("right_data must be present when building unmatched batch") + .schema(); build_unmatched_batch( &self.output_schema, &left_batch_sliced, diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 1c900b7579f73..904dd5801be03 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -18,6 +18,7 @@ //! Defines the spilling functions pub(crate) mod in_progress_spill_file; +pub(crate) mod replayable_spill_input; pub(crate) mod spill_manager; pub mod spill_pool; diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs new file mode 100644 index 0000000000000..9bc3eff77e359 --- /dev/null +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility for replaying a one-shot input `RecourdBatchStream` through spill. +//! +//! See comments in [`ReplayableStreamSource`] for details. + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Result, internal_err}; +use datafusion_execution::RecordBatchStream; +use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::disk_manager::RefCountedTempFile; +use futures::Stream; +use parking_lot::Mutex; + +use crate::EmptyRecordBatchStream; +use crate::spill::in_progress_spill_file::InProgressSpillFile; +use crate::spill::spill_manager::SpillManager; + +const FIRST_PASS_ACTIVE_EPOCH: u32 = 1; +const POISONED_EPOCH: u32 = u32::MAX; + +/// Spill-backed replayable stream source. +/// +/// [`ReplayableStreamSource`] is constructed from an input stream, usually produced +/// by executing an input `ExecutionPlan`. +/// +/// - On the first pass, it evaluates the input stream, produces `RecordBatch`es, +/// caches those batches to a local spill file, and also forwards them to the +/// output. +/// - On subsequent passes, it reads directly from the spill file. +/// +/// ```text +/// first pass: +/// +/// RecordBatch stream +/// | +/// v +/// [batch] -> output +/// | +/// +----> spill file +/// +/// +/// later passes: +/// +/// spill file +/// | +/// v +/// [batch] -> output +/// ``` +/// +/// This is useful when an input stream must be replayed and: +/// - Re-evaluation is expensive because the input stream may come from a long +/// and complex pipeline. +/// - The parent operator is under memory pressure and cannot cache the input in +/// memory for replay. +pub(crate) struct ReplayableStreamSource { + schema: SchemaRef, + input: Option, + spill_manager: SpillManager, + request_description: String, + /// 0 = unopened, 1 = first pass active, 2 = replayable/empty, MAX = poisoned + /// on execution errors. + epoch: Arc, + spill_file: Arc>>, +} + +impl ReplayableStreamSource { + /// Creates a replayable stream producer over a one-shot input stream. + /// + /// It caches the input into a local spill file on the first pass, then + /// reads directly from that spill file on subsequent passes. + pub(crate) fn new( + input: SendableRecordBatchStream, + spill_manager: SpillManager, + request_description: impl Into, + ) -> Self { + let schema = input.schema(); + Self { + schema, + input: Some(input), + spill_manager, + request_description: request_description.into(), + epoch: Arc::new(AtomicU32::new(0)), + spill_file: Arc::new(Mutex::new(None)), + } + } + + /// Opens the next pass over this input. + /// + /// The first call returns a stream that forwards upstream batches while + /// caching them to spill. Later calls return streams that read directly + /// from the completed spill file. + /// + /// # Note + /// Subsequent passes MUST be opened only after the initial pass is fully + /// consumed; otherwise, an error is returned. + pub(crate) fn open_pass(&mut self) -> Result { + match self.epoch.load(Ordering::Relaxed) { + 0 => { + let Some(input) = self.input.take() else { + return internal_err!( + "ReplayableStreamSource missing first-pass input" + ); + }; + let spill_file = self + .spill_manager + .create_in_progress_file(&self.request_description)?; + *self.spill_file.lock() = None; + self.epoch.store(FIRST_PASS_ACTIVE_EPOCH, Ordering::Relaxed); + + Ok(Box::pin(SpillCachingStream::new( + Arc::clone(&self.schema), + input, + Arc::clone(&self.epoch), + Arc::clone(&self.spill_file), + FIRST_PASS_ACTIVE_EPOCH, + spill_file, + ))) + } + FIRST_PASS_ACTIVE_EPOCH => { + internal_err!("ReplayableStreamSource first pass is still active") + } + POISONED_EPOCH => { + internal_err!( + "ReplayableStreamSource first pass did not complete successfully" + ) + } + _ => { + let spill_file = self.spill_file.lock(); + if let Some(file) = spill_file.as_ref() { + self.spill_manager.read_spill_as_stream(file.clone(), None) + } else { + Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))) + } + } + } + } +} + +/// Evaluates and forwards the `inner` stream output while caching it to a spill file +/// for future replays. +struct SpillCachingStream { + schema: SchemaRef, + epoch_state: Arc, + spill_file_state: Arc>>, + epoch: u32, + spill_file: Option, + inner: SendableRecordBatchStream, +} + +impl SpillCachingStream { + fn new( + schema: SchemaRef, + inner: SendableRecordBatchStream, + epoch_state: Arc, + spill_file_state: Arc>>, + epoch: u32, + spill_file: InProgressSpillFile, + ) -> Self { + Self { + schema, + epoch_state, + spill_file_state, + epoch, + spill_file: Some(spill_file), + inner, + } + } + + fn publish_result( + epoch_state: &Arc, + spill_file_state: &Arc>>, + epoch: u32, + spill_file: Option, + ) { + if epoch_state.load(Ordering::Relaxed) == epoch { + *spill_file_state.lock() = spill_file; + epoch_state.store(epoch.saturating_add(1), Ordering::Relaxed); + } + } + + fn poison( + epoch_state: &Arc, + spill_file_state: &Arc>>, + epoch: u32, + ) { + if epoch_state.load(Ordering::Relaxed) == epoch { + *spill_file_state.lock() = None; + epoch_state.store(POISONED_EPOCH, Ordering::Relaxed); + } + } +} + +impl Stream for SpillCachingStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => { + if batch.num_rows() > 0 + && let Some(spill_file) = this.spill_file.as_mut() + && let Err(e) = spill_file.append_batch(&batch) + { + this.spill_file.take(); + Self::poison(&this.epoch_state, &this.spill_file_state, this.epoch); + return Poll::Ready(Some(Err(e))); + } + + Poll::Ready(Some(Ok(batch))) + } + Poll::Ready(Some(Err(e))) => { + this.spill_file.take(); + Self::poison(&this.epoch_state, &this.spill_file_state, this.epoch); + Poll::Ready(Some(Err(e))) + } + Poll::Ready(None) => { + let result = match this.spill_file.as_mut() { + Some(spill_file) => spill_file.finish(), + None => Ok(None), + }; + + match result { + Ok(file) => { + this.spill_file.take(); + Self::publish_result( + &this.epoch_state, + &this.spill_file_state, + this.epoch, + file, + ); + Poll::Ready(None) + } + Err(e) => { + this.spill_file.take(); + Self::poison( + &this.epoch_state, + &this.spill_file_state, + this.epoch, + ); + Poll::Ready(Some(Err(e))) + } + } + } + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for SpillCachingStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl Drop for SpillCachingStream { + fn drop(&mut self) { + if self.spill_file.is_some() { + Self::poison(&self.epoch_state, &self.spill_file_state, self.epoch); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int64Array; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_physical_expr_common::metrics::{ + ExecutionPlanMetricsSet, SpillMetrics, + }; + use futures::{StreamExt, TryStreamExt}; + + use crate::stream::RecordBatchStreamAdapter; + + fn build_spill_manager(schema: SchemaRef) -> Result { + let runtime = Arc::new(RuntimeEnvBuilder::new().build()?); + let metrics_set = ExecutionPlanMetricsSet::new(); + let spill_metrics = SpillMetrics::new(&metrics_set, 0); + Ok(SpillManager::new(runtime, spill_metrics, schema)) + } + + fn build_batch(schema: SchemaRef, values: Vec) -> Result { + RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(values))]) + .map_err(Into::into) + } + + #[tokio::test] + async fn test_replayable_spill_input_replays_completed_first_pass() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch1 = build_batch(Arc::clone(&schema), vec![1, 2])?; + let batch2 = build_batch(Arc::clone(&schema), vec![3, 4])?; + + let input = Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + futures::stream::iter(vec![Ok(batch1.clone()), Ok(batch2.clone())]), + )); + let spill_manager = build_spill_manager(Arc::clone(&schema))?; + let mut replayable = + ReplayableStreamSource::new(input, spill_manager, "test replayable spill"); + + let pass1 = replayable.open_pass()?; + let pass1_batches = pass1.try_collect::>().await?; + assert_eq!(pass1_batches, vec![batch1.clone(), batch2.clone()]); + + let pass2 = replayable.open_pass()?; + let pass2_batches = pass2.try_collect::>().await?; + assert_eq!(pass2_batches, vec![batch1, batch2]); + + Ok(()) + } + + #[tokio::test] + async fn test_replayable_spill_input_poisoned_when_first_pass_dropped() -> Result<()> + { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch1 = build_batch(Arc::clone(&schema), vec![1, 2])?; + let batch2 = build_batch(Arc::clone(&schema), vec![3, 4])?; + + let input = Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + futures::stream::iter(vec![Ok(batch1), Ok(batch2)]), + )); + let spill_manager = build_spill_manager(Arc::clone(&schema))?; + let mut replayable = + ReplayableStreamSource::new(input, spill_manager, "test replayable spill"); + + let mut pass1 = replayable.open_pass()?; + let first = pass1.next().await.transpose()?; + assert!(first.is_some()); + drop(pass1); + // The first pass has not finished, so the spill file is only partially + // written and cannot be used to open subsequent replay passes. + + let err = match replayable.open_pass() { + Ok(_) => panic!("expected first pass to poison replayable spill input"), + Err(err) => err.strip_backtrace(), + }; + assert!( + err.to_string().contains( + "ReplayableStreamSource first pass did not complete successfully" + ) + ); + + Ok(()) + } +} From a8f702d1dbf58ff4995fbc609bfe813ca14c161e Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 20 Apr 2026 15:15:32 +0800 Subject: [PATCH 02/10] more comments --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 8a19c8b3198d9..24fd03fb4196d 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -952,6 +952,7 @@ pub(crate) struct NestedLoopJoinStream { /// type of the join pub(crate) join_type: JoinType, /// the probe-side(right) table data of the nested loop join + /// `Option` is used becuase memory-limited path requires resetting it. pub(crate) right_data: Option, /// the build-side table data of the nested loop join pub(crate) left_data: OnceFut, From f97f5bcfc0560371f280a3fe25f109f002f3d4f4 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 20 Apr 2026 15:42:02 +0800 Subject: [PATCH 03/10] fix typo Co-authored-by: Yongting You <2010youy01@gmail.com> --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 24fd03fb4196d..c686d9d5a05ff 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -952,7 +952,7 @@ pub(crate) struct NestedLoopJoinStream { /// type of the join pub(crate) join_type: JoinType, /// the probe-side(right) table data of the nested loop join - /// `Option` is used becuase memory-limited path requires resetting it. + /// `Option` is used because memory-limited path requires resetting it. pub(crate) right_data: Option, /// the build-side table data of the nested loop join pub(crate) left_data: OnceFut, From 19494c245d8921967df57f9dadf309b07e4c65a0 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 13:51:40 +0800 Subject: [PATCH 04/10] review: refactor to make shared state management more clear --- .../src/spill/replayable_spill_input.rs | 253 +++++++++++------- 1 file changed, 159 insertions(+), 94 deletions(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 9bc3eff77e359..01ce77bae9903 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -21,7 +21,6 @@ use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::{AtomicU32, Ordering}; use std::task::{Context, Poll}; use arrow::datatypes::SchemaRef; @@ -37,9 +36,6 @@ use crate::EmptyRecordBatchStream; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::SpillManager; -const FIRST_PASS_ACTIVE_EPOCH: u32 = 1; -const POISONED_EPOCH: u32 = u32::MAX; - /// Spill-backed replayable stream source. /// /// [`ReplayableStreamSource`] is constructed from an input stream, usually produced @@ -74,15 +70,29 @@ const POISONED_EPOCH: u32 = u32::MAX; /// and complex pipeline. /// - The parent operator is under memory pressure and cannot cache the input in /// memory for replay. +/// +/// # Concurrency assumption +/// Passes must be opened and consumed sequentially. +/// Opening another pass before exhausting the current one returns an error. pub(crate) struct ReplayableStreamSource { schema: SchemaRef, input: Option, spill_manager: SpillManager, request_description: String, - /// 0 = unopened, 1 = first pass active, 2 = replayable/empty, MAX = poisoned - /// on execution errors. - epoch: Arc, - spill_file: Arc>>, + /// Inner state is owned by either the source or one active stream to ensure + /// sequential access; see struct docs for the concurrency contract. + /// + /// Ownership model: + /// - No active stream: source owns the state (`source.state = Some(state)`). + /// - Active stream: the stream owns the state (`source.state = None`). + state: Arc>>, +} + +/// Inner state exclusively owned by either [`ReplayableStreamSource`] or one [`ReplayablePassStream`] +enum StateInner { + Unopened, + Replayable(Option), + Poisoned, } impl ReplayableStreamSource { @@ -101,11 +111,14 @@ impl ReplayableStreamSource { input: Some(input), spill_manager, request_description: request_description.into(), - epoch: Arc::new(AtomicU32::new(0)), - spill_file: Arc::new(Mutex::new(None)), + state: Arc::new(Mutex::new(Some(StateInner::Unopened))), } } + fn restore_state(&self, state: StateInner) { + *self.state.lock() = Some(state); + } + /// Opens the next pass over this input. /// /// The first call returns a stream that forwards upstream batches while @@ -113,47 +126,60 @@ impl ReplayableStreamSource { /// from the completed spill file. /// /// # Note - /// Subsequent passes MUST be opened only after the initial pass is fully + /// Subsequent passes MUST be opened only after the previous pass is fully /// consumed; otherwise, an error is returned. pub(crate) fn open_pass(&mut self) -> Result { - match self.epoch.load(Ordering::Relaxed) { - 0 => { + let state = self.state.lock().take(); + let Some(state) = state else { + return internal_err!("ReplayableStreamSource pass is still active"); + }; + + match state { + StateInner::Unopened => { let Some(input) = self.input.take() else { + self.restore_state(StateInner::Poisoned); return internal_err!( "ReplayableStreamSource missing first-pass input" ); }; - let spill_file = self + let spill_file = match self .spill_manager - .create_in_progress_file(&self.request_description)?; - *self.spill_file.lock() = None; - self.epoch.store(FIRST_PASS_ACTIVE_EPOCH, Ordering::Relaxed); + .create_in_progress_file(&self.request_description) + { + Ok(spill_file) => spill_file, + Err(e) => { + self.input = Some(input); + self.restore_state(StateInner::Unopened); + return Err(e); + } + }; - Ok(Box::pin(SpillCachingStream::new( + Ok(Box::pin(ReplayablePassStream::new_first( Arc::clone(&self.schema), input, - Arc::clone(&self.epoch), - Arc::clone(&self.spill_file), - FIRST_PASS_ACTIVE_EPOCH, + Arc::clone(&self.state), spill_file, ))) } - FIRST_PASS_ACTIVE_EPOCH => { - internal_err!("ReplayableStreamSource first pass is still active") - } - POISONED_EPOCH => { + StateInner::Poisoned => { + self.restore_state(StateInner::Poisoned); internal_err!( "ReplayableStreamSource first pass did not complete successfully" ) } - _ => { - let spill_file = self.spill_file.lock(); - if let Some(file) = spill_file.as_ref() { - self.spill_manager.read_spill_as_stream(file.clone(), None) - } else { - Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))) + StateInner::Replayable(spill_file) => { + let replay_state = spill_file.clone(); + match ReplayablePassStream::new_replay( + Arc::clone(&self.schema), + &self.spill_manager, + Arc::clone(&self.state), + spill_file, + ) { + Ok(stream) => Ok(Box::pin(stream)), + Err(e) => { + self.restore_state(StateInner::Replayable(replay_state)); + Err(e) + } } } } @@ -162,59 +188,71 @@ impl ReplayableStreamSource { /// Evaluates and forwards the `inner` stream output while caching it to a spill file /// for future replays. -struct SpillCachingStream { +struct ReplayablePassStream { schema: SchemaRef, - epoch_state: Arc, - spill_file_state: Arc>>, - epoch: u32, + shared_state: Arc>>, + held_state: Option, spill_file: Option, inner: SendableRecordBatchStream, } -impl SpillCachingStream { - fn new( +impl ReplayablePassStream { + fn new_first( schema: SchemaRef, inner: SendableRecordBatchStream, - epoch_state: Arc, - spill_file_state: Arc>>, - epoch: u32, + shared_state: Arc>>, spill_file: InProgressSpillFile, ) -> Self { Self { schema, - epoch_state, - spill_file_state, - epoch, + shared_state, + held_state: Some(StateInner::Unopened), spill_file: Some(spill_file), inner, } } - fn publish_result( - epoch_state: &Arc, - spill_file_state: &Arc>>, - epoch: u32, + fn new_replay( + schema: SchemaRef, + spill_manager: &SpillManager, + shared_state: Arc>>, spill_file: Option, - ) { - if epoch_state.load(Ordering::Relaxed) == epoch { - *spill_file_state.lock() = spill_file; - epoch_state.store(epoch.saturating_add(1), Ordering::Relaxed); + ) -> Result { + let inner = if let Some(file) = spill_file.as_ref() { + spill_manager.read_spill_as_stream(file.clone(), None)? + } else { + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema))) + }; + + Ok(Self { + schema, + shared_state, + held_state: Some(StateInner::Replayable(spill_file)), + spill_file: None, + inner, + }) + } + + fn restore_held_state(&mut self) { + if let Some(state) = self.held_state.take() { + *self.shared_state.lock() = Some(state); } } - fn poison( - epoch_state: &Arc, - spill_file_state: &Arc>>, - epoch: u32, - ) { - if epoch_state.load(Ordering::Relaxed) == epoch { - *spill_file_state.lock() = None; - epoch_state.store(POISONED_EPOCH, Ordering::Relaxed); + fn restore_state(&mut self, state: StateInner) { + if self.held_state.take().is_some() { + *self.shared_state.lock() = Some(state); + } + } + + fn poison(&mut self) { + if self.held_state.take().is_some() { + *self.shared_state.lock() = Some(StateInner::Poisoned); } } } -impl Stream for SpillCachingStream { +impl Stream for ReplayablePassStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -227,7 +265,7 @@ impl Stream for SpillCachingStream { && let Err(e) = spill_file.append_batch(&batch) { this.spill_file.take(); - Self::poison(&this.epoch_state, &this.spill_file_state, this.epoch); + this.poison(); return Poll::Ready(Some(Err(e))); } @@ -235,35 +273,27 @@ impl Stream for SpillCachingStream { } Poll::Ready(Some(Err(e))) => { this.spill_file.take(); - Self::poison(&this.epoch_state, &this.spill_file_state, this.epoch); + this.poison(); Poll::Ready(Some(Err(e))) } + // The stream is exhausted, give the inner state ownership back to `ReplayableStreamSource` Poll::Ready(None) => { - let result = match this.spill_file.as_mut() { - Some(spill_file) => spill_file.finish(), - None => Ok(None), - }; - - match result { - Ok(file) => { - this.spill_file.take(); - Self::publish_result( - &this.epoch_state, - &this.spill_file_state, - this.epoch, - file, - ); - Poll::Ready(None) - } - Err(e) => { - this.spill_file.take(); - Self::poison( - &this.epoch_state, - &this.spill_file_state, - this.epoch, - ); - Poll::Ready(Some(Err(e))) + if let Some(spill_file) = this.spill_file.as_mut() { + match spill_file.finish() { + Ok(file) => { + this.spill_file.take(); + this.restore_state(StateInner::Replayable(file)); + Poll::Ready(None) + } + Err(e) => { + this.spill_file.take(); + this.poison(); + Poll::Ready(Some(Err(e))) + } } + } else { + this.restore_held_state(); + Poll::Ready(None) } } Poll::Pending => Poll::Pending, @@ -271,16 +301,17 @@ impl Stream for SpillCachingStream { } } -impl RecordBatchStream for SpillCachingStream { +impl RecordBatchStream for ReplayablePassStream { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } } -impl Drop for SpillCachingStream { +impl Drop for ReplayablePassStream { fn drop(&mut self) { - if self.spill_file.is_some() { - Self::poison(&self.epoch_state, &self.spill_file_state, self.epoch); + if self.held_state.is_some() { + self.spill_file.take(); + self.poison(); } } } @@ -335,6 +366,8 @@ mod tests { Ok(()) } + // Try to open a new pass, when the first pass has not finished. + // The spill file is only partially written, so an error will be returned. #[tokio::test] async fn test_replayable_spill_input_poisoned_when_first_pass_dropped() -> Result<()> { @@ -354,8 +387,6 @@ mod tests { let first = pass1.next().await.transpose()?; assert!(first.is_some()); drop(pass1); - // The first pass has not finished, so the spill file is only partially - // written and cannot be used to open subsequent replay passes. let err = match replayable.open_pass() { Ok(_) => panic!("expected first pass to poison replayable spill input"), @@ -369,4 +400,38 @@ mod tests { Ok(()) } + + // Open a new pass, when the previous pass from spill is still in progress. + // An error is expected, since it requires sequential access. + #[tokio::test] + async fn test_replayable_spill_input_errors_when_replay_pass_in_progress() + -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch1 = build_batch(Arc::clone(&schema), vec![1, 2])?; + let batch2 = build_batch(Arc::clone(&schema), vec![3, 4])?; + + let input = Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + futures::stream::iter(vec![Ok(batch1.clone()), Ok(batch2.clone())]), + )); + let spill_manager = build_spill_manager(Arc::clone(&schema))?; + let mut replayable = + ReplayableStreamSource::new(input, spill_manager, "test replayable spill"); + + let pass1 = replayable.open_pass()?; + let _ = pass1.try_collect::>().await?; + + let pass2 = replayable.open_pass()?; + let err = match replayable.open_pass() { + Ok(_) => panic!("expected open_pass to fail while replay pass is active"), + Err(err) => err.strip_backtrace(), + }; + assert!( + err.to_string() + .contains("ReplayableStreamSource pass is still active") + ); + drop(pass2); + + Ok(()) + } } From cf8a5d293bcf8c0f59e83d8158d71e4dc0f535c2 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 16:03:02 +0800 Subject: [PATCH 05/10] fix lint --- ...verrides@explain_plan_environment_overrides.snap | 13 +++++++------ .../src/spill/replayable_spill_input.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap b/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap index 1359cefbe71c7..8129cc14a2acb 100644 --- a/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap +++ b/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap @@ -1,5 +1,6 @@ --- source: datafusion-cli/tests/cli_integration.rs +assertion_line: 188 info: program: datafusion-cli args: @@ -18,19 +19,19 @@ exit_code: 0 | logical_plan | [ | | | { | | | "Plan": { | -| | "Expressions": [ | -| | "Int64(123)" | -| | ], | | | "Node Type": "Projection", | -| | "Output": [ | +| | "Expressions": [ | | | "Int64(123)" | | | ], | | | "Plans": [ | | | { | | | "Node Type": "EmptyRelation", | -| | "Output": [], | -| | "Plans": [] | +| | "Plans": [], | +| | "Output": [] | | | } | +| | ], | +| | "Output": [ | +| | "Int64(123)" | | | ] | | | } | | | } | diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 01ce77bae9903..e0c95eea31fda 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -70,7 +70,7 @@ use crate::spill::spill_manager::SpillManager; /// and complex pipeline. /// - The parent operator is under memory pressure and cannot cache the input in /// memory for replay. -/// +/// /// # Concurrency assumption /// Passes must be opened and consumed sequentially. /// Opening another pass before exhausting the current one returns an error. From ecac3b84becd4423df917e0d58fe98998b726cda Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 16:10:44 +0800 Subject: [PATCH 06/10] Update datafusion/physical-plan/src/spill/replayable_spill_input.rs Co-authored-by: Martin Grigorov --- datafusion/physical-plan/src/spill/replayable_spill_input.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index e0c95eea31fda..28e9ed048a114 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Utility for replaying a one-shot input `RecourdBatchStream` through spill. +//! Utility for replaying a one-shot input `RecordBatchStream` through spill. //! //! See comments in [`ReplayableStreamSource`] for details. From b08d2e24dcb5fbb1dbdbdbe1fa8e5e4a2cf8a23f Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 16:19:06 +0800 Subject: [PATCH 07/10] fix ci --- ...verrides@explain_plan_environment_overrides.snap | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap b/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap index 8129cc14a2acb..1359cefbe71c7 100644 --- a/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap +++ b/datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap @@ -1,6 +1,5 @@ --- source: datafusion-cli/tests/cli_integration.rs -assertion_line: 188 info: program: datafusion-cli args: @@ -19,19 +18,19 @@ exit_code: 0 | logical_plan | [ | | | { | | | "Plan": { | -| | "Node Type": "Projection", | | | "Expressions": [ | | | "Int64(123)" | | | ], | +| | "Node Type": "Projection", | +| | "Output": [ | +| | "Int64(123)" | +| | ], | | | "Plans": [ | | | { | | | "Node Type": "EmptyRelation", | -| | "Plans": [], | -| | "Output": [] | +| | "Output": [], | +| | "Plans": [] | | | } | -| | ], | -| | "Output": [ | -| | "Int64(123)" | | | ] | | | } | | | } | From 8403dae7ef6fe76d2ca54483afe46a37f235cdcf Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 17:34:05 +0800 Subject: [PATCH 08/10] review: cleanup --- .../src/spill/replayable_spill_input.rs | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 28e9ed048a114..d9d93c81178fd 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -115,7 +115,7 @@ impl ReplayableStreamSource { } } - fn restore_state(&self, state: StateInner) { + fn set_state(&self, state: StateInner) { *self.state.lock() = Some(state); } @@ -137,7 +137,7 @@ impl ReplayableStreamSource { match state { StateInner::Unopened => { let Some(input) = self.input.take() else { - self.restore_state(StateInner::Poisoned); + self.set_state(StateInner::Poisoned); return internal_err!( "ReplayableStreamSource missing first-pass input" ); @@ -149,7 +149,7 @@ impl ReplayableStreamSource { Ok(spill_file) => spill_file, Err(e) => { self.input = Some(input); - self.restore_state(StateInner::Unopened); + self.set_state(StateInner::Unopened); return Err(e); } }; @@ -162,7 +162,6 @@ impl ReplayableStreamSource { ))) } StateInner::Poisoned => { - self.restore_state(StateInner::Poisoned); internal_err!( "ReplayableStreamSource first pass did not complete successfully" ) @@ -177,7 +176,7 @@ impl ReplayableStreamSource { ) { Ok(stream) => Ok(Box::pin(stream)), Err(e) => { - self.restore_state(StateInner::Replayable(replay_state)); + self.set_state(StateInner::Replayable(replay_state)); Err(e) } } @@ -239,16 +238,14 @@ impl ReplayablePassStream { } } - fn restore_state(&mut self, state: StateInner) { + fn set_state(&mut self, state: StateInner) { if self.held_state.take().is_some() { *self.shared_state.lock() = Some(state); } } fn poison(&mut self) { - if self.held_state.take().is_some() { - *self.shared_state.lock() = Some(StateInner::Poisoned); - } + self.set_state(StateInner::Poisoned); } } @@ -282,7 +279,7 @@ impl Stream for ReplayablePassStream { match spill_file.finish() { Ok(file) => { this.spill_file.take(); - this.restore_state(StateInner::Replayable(file)); + this.set_state(StateInner::Replayable(file)); Poll::Ready(None) } Err(e) => { @@ -308,9 +305,13 @@ impl RecordBatchStream for ReplayablePassStream { } impl Drop for ReplayablePassStream { + /// If a stream is dropped before it finishes, poison the state so later + /// replay attempts fail. + /// + /// A partial first pass leaves the spill file incomplete, so replaying it + /// would be unsafe. fn drop(&mut self) { if self.held_state.is_some() { - self.spill_file.take(); self.poison(); } } From d7e81d905286db77bed4ef34079dd702495ab004 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 17:42:58 +0800 Subject: [PATCH 09/10] rename struct --- .../src/spill/replayable_spill_input.rs | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index d9d93c81178fd..55d57b73fdd4d 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -154,7 +154,7 @@ impl ReplayableStreamSource { } }; - Ok(Box::pin(ReplayablePassStream::new_first( + Ok(Box::pin(ReplayableSpillStream::new_first( Arc::clone(&self.schema), input, Arc::clone(&self.state), @@ -168,7 +168,7 @@ impl ReplayableStreamSource { } StateInner::Replayable(spill_file) => { let replay_state = spill_file.clone(); - match ReplayablePassStream::new_replay( + match ReplayableSpillStream::new_replay( Arc::clone(&self.schema), &self.spill_manager, Arc::clone(&self.state), @@ -185,9 +185,16 @@ impl ReplayableStreamSource { } } -/// Evaluates and forwards the `inner` stream output while caching it to a spill file -/// for future replays. -struct ReplayablePassStream { +/// Makes a one-shot stream replayable using spill caching, keeping replays fast +/// and memory efficient. +/// +/// On the first pass, it evaluates and forwards output from `inner` while +/// caching it to a spill file for future replays. +/// +/// On later passes, it replays directly from the cached spill file. +/// +/// See also [`ReplayableStreamSource`] for details. +struct ReplayableSpillStream { schema: SchemaRef, shared_state: Arc>>, held_state: Option, @@ -195,7 +202,7 @@ struct ReplayablePassStream { inner: SendableRecordBatchStream, } -impl ReplayablePassStream { +impl ReplayableSpillStream { fn new_first( schema: SchemaRef, inner: SendableRecordBatchStream, @@ -249,7 +256,7 @@ impl ReplayablePassStream { } } -impl Stream for ReplayablePassStream { +impl Stream for ReplayableSpillStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -298,13 +305,13 @@ impl Stream for ReplayablePassStream { } } -impl RecordBatchStream for ReplayablePassStream { +impl RecordBatchStream for ReplayableSpillStream { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } } -impl Drop for ReplayablePassStream { +impl Drop for ReplayableSpillStream { /// If a stream is dropped before it finishes, poison the state so later /// replay attempts fail. /// From 3eba1744266cbe50971f88a6edce43844c109d82 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 21 Apr 2026 17:50:14 +0800 Subject: [PATCH 10/10] ci fix --- datafusion/physical-plan/src/spill/replayable_spill_input.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 55d57b73fdd4d..ddef15a639183 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -88,7 +88,7 @@ pub(crate) struct ReplayableStreamSource { state: Arc>>, } -/// Inner state exclusively owned by either [`ReplayableStreamSource`] or one [`ReplayablePassStream`] +/// Inner state exclusively owned by either [`ReplayableStreamSource`] or one [`ReplayableSpillStream`] enum StateInner { Unopened, Replayable(Option),