From 3da2ff8a3f09de63287e5e1fe4943c7968494fd5 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 13 Mar 2026 17:14:26 +0800 Subject: [PATCH 1/6] Fix sort merge interleave overflow --- datafusion/physical-plan/src/sorts/builder.rs | 65 +++++++++++++++++-- datafusion/physical-plan/src/sorts/merge.rs | 6 +- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index a462b832056bd..b30975879dcfc 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -18,8 +18,9 @@ use crate::spill::get_record_batch_memory_size; use arrow::compute::interleave; use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use std::sync::Arc; @@ -126,9 +127,28 @@ impl BatchBuilder { &self.schema } + /// Try to interleave all columns using the given index slice. + fn try_interleave_columns( + &self, + indices: &[(usize, usize)], + ) -> Result>> { + (0..self.schema.fields.len()) + .map(|column_idx| { + let arrays: Vec<_> = self + .batches + .iter() + .map(|(_, batch)| batch.column(column_idx).as_ref()) + .collect(); + Ok(interleave(&arrays, indices)?) + }) + .collect::>>() + } + /// Drains the in_progress row indexes, and builds a new RecordBatch from them /// - /// Will then drop any batches for which all rows have been yielded to the output + /// Will then drop any batches for which all rows have been yielded to the output. + /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX), + /// retries with progressively fewer rows until it succeeds. /// /// Returns `None` if no pending rows pub fn build_record_batch(&mut self) -> Result> { @@ -167,8 +187,35 @@ impl BatchBuilder { } else { self.batches_mem_used -= get_record_batch_memory_size(batch); } - retain - }); + }; + + // Remove consumed indices, keeping any remaining for the next call. + self.indices.drain(..end); + + // Only clean up fully-consumed batches when all indices are drained, + // because remaining indices may still reference earlier batches. + if self.indices.is_empty() { + // New cursors are only created once the previous cursor for the stream + // is finished. This means all remaining rows from all but the last batch + // for each stream have been yielded to the newly created record batch + // + // We can therefore drop all but the last batch for each stream + let mut batch_idx = 0; + let mut retained = 0; + self.batches.retain(|(stream_idx, batch)| { + let stream_cursor = &mut self.cursors[*stream_idx]; + let retain = stream_cursor.batch_idx == batch_idx; + batch_idx += 1; + + if retain { + stream_cursor.batch_idx = retained; + retained += 1; + } else { + self.reservation.shrink(get_record_batch_memory_size(batch)); + } + retain + }); + } // Release excess memory back to the pool, but never shrink below // initial_reservation to maintain the anti-starvation guarantee @@ -185,6 +232,7 @@ impl BatchBuilder { } } +<<<<<<< HEAD /// Try to grow `reservation` so it covers at least `needed` bytes. /// /// When a reservation has been pre-loaded with bytes (e.g. via @@ -199,4 +247,13 @@ pub(crate) fn try_grow_reservation_to_at_least( reservation.try_grow(needed - reservation.size())?; } Ok(()) +======= +/// Returns `true` if the error is an Arrow offset overflow error. +fn is_offset_overflow(e: &DataFusionError) -> bool { + matches!( + e, + DataFusionError::ArrowError(err, _) + if matches!(err.as_ref(), ArrowError::OffsetOverflowError(_)) + ) +>>>>>>> 967cf0a65 (Fix sort merge interleave overflow) } diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 272816251daf9..80e062f434b16 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -288,9 +288,11 @@ impl SortPreservingMergeStream { } } - self.produced += self.in_progress.len(); + let before = self.in_progress.len(); + let result = self.in_progress.build_record_batch(); + self.produced += before - self.in_progress.len(); - return Poll::Ready(self.in_progress.build_record_batch().transpose()); + return Poll::Ready(result.transpose()); } } From 61455f69b471b3ef7f98791e6a7d2d0ae723a58a Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 19 Mar 2026 15:30:41 +0800 Subject: [PATCH 2/6] add tests --- datafusion/physical-plan/src/sorts/builder.rs | 84 ++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index b30975879dcfc..1895e7ad07896 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -22,6 +22,7 @@ use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; +use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::Arc; #[derive(Debug, Copy, Clone, Default)] @@ -139,7 +140,22 @@ impl BatchBuilder { .iter() .map(|(_, batch)| batch.column(column_idx).as_ref()) .collect(); - Ok(interleave(&arrays, indices)?) + // Arrow's interleave panics on i32 offset overflow with + // `.expect("overflow")`. Catch that panic so the caller + // can retry with fewer rows. + match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) { + Ok(result) => Ok(result?), + Err(panic_payload) => { + if is_overflow_panic(&panic_payload) { + Err(DataFusionError::ArrowError( + Box::new(ArrowError::OffsetOverflowError(0)), + None, + )) + } else { + std::panic::resume_unwind(panic_payload); + } + } + } }) .collect::>>() } @@ -257,3 +273,69 @@ fn is_offset_overflow(e: &DataFusionError) -> bool { ) >>>>>>> 967cf0a65 (Fix sort merge interleave overflow) } + +/// Returns true if a caught panic payload looks like an Arrow offset overflow. +fn is_overflow_panic(payload: &Box) -> bool { + if let Some(msg) = payload.downcast_ref::<&str>() { + return msg.contains("overflow"); + } + if let Some(msg) = payload.downcast_ref::() { + return msg.contains("overflow"); + } + false +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::StringArray; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool}; + + /// Test that interleaving string columns whose combined byte length + /// exceeds i32::MAX does not panic. Arrow's `interleave` panics with + /// `.expect("overflow")` in this case; `BatchBuilder` catches the + /// panic and retries with fewer rows until the output fits in i32 + /// offsets. + #[test] + fn test_interleave_overflow_is_caught() { + // Each string is ~768 MB. Three rows total → ~2.3 GB > i32::MAX. + let big_str: String = "x".repeat(768 * 1024 * 1024); + + let schema = Arc::new(Schema::new(vec![Field::new( + "s", + DataType::Utf8, + false, + )])); + + let pool: Arc = Arc::new(UnboundedMemoryPool::default()); + let reservation = MemoryConsumer::new("test").register(&pool); + let mut builder = BatchBuilder::new( + Arc::clone(&schema), + /* stream_count */ 3, + /* batch_size */ 16, + reservation, + ); + + // Push one batch per stream, each containing one large string. + for stream_idx in 0..3 { + let array = StringArray::from(vec![big_str.as_str()]); + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]) + .unwrap(); + builder.push_batch(stream_idx, batch).unwrap(); + builder.push_row(stream_idx); + } + + // 3 rows total; interleaving all 3 would overflow i32 offsets. + // The retry loop should halve until it succeeds. + let batch = builder.build_record_batch().unwrap().unwrap(); + assert!(batch.num_rows() > 0); + assert!(batch.num_rows() < 3); + + // Drain remaining rows. + let batch2 = builder.build_record_batch().unwrap().unwrap(); + assert!(batch2.num_rows() > 0); + assert_eq!(batch.num_rows() + batch2.num_rows(), 3); + } +} From 714a5a97024c86c25d2fb029916dc321b38f7abe Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 19 Mar 2026 15:37:38 +0800 Subject: [PATCH 3/6] add log --- datafusion/physical-plan/src/sorts/builder.rs | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 1895e7ad07896..4a10cfaf88d76 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -22,7 +22,8 @@ use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; -use std::panic::{catch_unwind, AssertUnwindSafe}; +use log::warn; +use std::panic::{AssertUnwindSafe, catch_unwind}; use std::sync::Arc; #[derive(Debug, Copy, Clone, Default)] @@ -248,6 +249,7 @@ impl BatchBuilder { } } +<<<<<<< HEAD <<<<<<< HEAD /// Try to grow `reservation` so it covers at least `needed` bytes. /// @@ -265,11 +267,14 @@ pub(crate) fn try_grow_reservation_to_at_least( Ok(()) ======= /// Returns `true` if the error is an Arrow offset overflow error. +======= +/// Returns true if the error is an Arrow offset overflow. +>>>>>>> a53942d48 (add log) fn is_offset_overflow(e: &DataFusionError) -> bool { matches!( e, - DataFusionError::ArrowError(err, _) - if matches!(err.as_ref(), ArrowError::OffsetOverflowError(_)) + DataFusionError::ArrowError(boxed, _) + if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_)) ) >>>>>>> 967cf0a65 (Fix sort merge interleave overflow) } @@ -290,7 +295,9 @@ mod tests { use super::*; use arrow::array::StringArray; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool}; + use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, UnboundedMemoryPool, + }; /// Test that interleaving string columns whose combined byte length /// exceeds i32::MAX does not panic. Arrow's `interleave` panics with @@ -302,11 +309,7 @@ mod tests { // Each string is ~768 MB. Three rows total → ~2.3 GB > i32::MAX. let big_str: String = "x".repeat(768 * 1024 * 1024); - let schema = Arc::new(Schema::new(vec![Field::new( - "s", - DataType::Utf8, - false, - )])); + let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)])); let pool: Arc = Arc::new(UnboundedMemoryPool::default()); let reservation = MemoryConsumer::new("test").register(&pool); @@ -321,8 +324,7 @@ mod tests { for stream_idx in 0..3 { let array = StringArray::from(vec![big_str.as_str()]); let batch = - RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]) - .unwrap(); + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap(); builder.push_batch(stream_idx, batch).unwrap(); builder.push_row(stream_idx); } From 57d512095848220b657fbe56fd5e81dca3cb6dd9 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 19 Mar 2026 16:35:47 +0800 Subject: [PATCH 4/6] fix --- datafusion/physical-plan/src/sorts/merge.rs | 13 ++++++ .../src/sorts/sort_preserving_merge.rs | 43 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 80e062f434b16..89cb40ebcbb85 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -208,6 +208,19 @@ impl SortPreservingMergeStream { cx: &mut Context<'_>, ) -> Poll>> { if self.done { + // When `build_record_batch()` hits an i32 offset overflow (e.g. + // combined string offsets exceed 2 GB), it emits a partial batch + // and keeps the remaining rows in `self.in_progress.indices`. + // Drain those leftover rows before terminating the stream, + // otherwise they would be silently dropped. + // Repeated overflows are fine — each poll emits another partial + // batch until `in_progress` is fully drained. + if !self.in_progress.is_empty() { + let before = self.in_progress.len(); + let result = self.in_progress.build_record_batch(); + self.produced += before - self.in_progress.len(); + return Poll::Ready(result.transpose()); + } return Poll::Ready(None); } // Once all partitions have set their corresponding cursors for the loser tree, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b1ee5b4d5e8da..0b60dba8896da 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -1539,4 +1539,47 @@ mod tests { Err(_) => exec_err!("SortPreservingMerge caused a deadlock"), } } + + /// Test that SortPreservingMerge with FETCH does not silently drop rows + /// when an interleave offset overflow forces a partial batch. + #[tokio::test] + async fn test_sort_merge_fetch_interleave_overflow() -> Result<()> { + // Each string is ~768 MB. Three rows total → ~2.3 GB > i32::MAX, + // which forces `build_record_batch` to emit a partial batch. + let big_str: String = "x".repeat(768 * 1024 * 1024); + + let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)])); + + // Create 3 single-row partitions, each with one large string. + let mut partitions = Vec::new(); + for _ in 0..3 { + let array = StringArray::from(vec![big_str.as_str()]); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(array) as ArrayRef], + )?; + partitions.push(vec![batch]); + } + + let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?; + + let sort_exprs: LexOrdering = [PhysicalSortExpr::new_default(Arc::new( + Column::new("s", 0), + ) + as Arc)] + .into(); + + let spm = SortPreservingMergeExec::new(sort_exprs, input).with_fetch(Some(3)); + + let task_ctx = Arc::new(TaskContext::default()); + let batches = collect(Arc::new(spm), task_ctx).await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 3, + "Expected all 3 rows to be emitted despite interleave overflow, got {total_rows}" + ); + + Ok(()) + } } From 5ab467621267d3ee1b312b5ee8f7660a75641894 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 19 Mar 2026 17:07:25 +0800 Subject: [PATCH 5/6] resolve conflicts --- datafusion/physical-plan/src/sorts/builder.rs | 58 +++++++------------ 1 file changed, 20 insertions(+), 38 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 4a10cfaf88d76..440762c1a57c1 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -173,36 +173,23 @@ impl BatchBuilder { return Ok(None); } - let columns = (0..self.schema.fields.len()) - .map(|column_idx| { - let arrays: Vec<_> = self - .batches - .iter() - .map(|(_, batch)| batch.column(column_idx).as_ref()) - .collect(); - Ok(interleave(&arrays, &self.indices)?) - }) - .collect::>>()?; - - self.indices.clear(); - - // New cursors are only created once the previous cursor for the stream - // is finished. This means all remaining rows from all but the last batch - // for each stream have been yielded to the newly created record batch - // - // We can therefore drop all but the last batch for each stream - let mut batch_idx = 0; - let mut retained = 0; - self.batches.retain(|(stream_idx, batch)| { - let stream_cursor = &mut self.cursors[*stream_idx]; - let retain = stream_cursor.batch_idx == batch_idx; - batch_idx += 1; - - if retain { - stream_cursor.batch_idx = retained; - retained += 1; - } else { - self.batches_mem_used -= get_record_batch_memory_size(batch); + // Try interleaving all indices. On offset overflow, halve and retry. + let mut end = self.indices.len(); + let columns = loop { + match self.try_interleave_columns(&self.indices[..end]) { + Ok(cols) => break cols, + Err(e) if is_offset_overflow(&e) => { + end /= 2; + if end == 0 { + return Err(e); + } + warn!( + "Interleave offset overflow with {} rows, retrying with {}", + self.indices.len(), + end + ); + } + Err(e) => return Err(e), } }; @@ -228,7 +215,7 @@ impl BatchBuilder { stream_cursor.batch_idx = retained; retained += 1; } else { - self.reservation.shrink(get_record_batch_memory_size(batch)); + self.batches_mem_used -= get_record_batch_memory_size(batch); } retain }); @@ -249,8 +236,6 @@ impl BatchBuilder { } } -<<<<<<< HEAD -<<<<<<< HEAD /// Try to grow `reservation` so it covers at least `needed` bytes. /// /// When a reservation has been pre-loaded with bytes (e.g. via @@ -265,18 +250,15 @@ pub(crate) fn try_grow_reservation_to_at_least( reservation.try_grow(needed - reservation.size())?; } Ok(()) -======= -/// Returns `true` if the error is an Arrow offset overflow error. -======= +} + /// Returns true if the error is an Arrow offset overflow. ->>>>>>> a53942d48 (add log) fn is_offset_overflow(e: &DataFusionError) -> bool { matches!( e, DataFusionError::ArrowError(boxed, _) if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_)) ) ->>>>>>> 967cf0a65 (Fix sort merge interleave overflow) } /// Returns true if a caught panic payload looks like an Arrow offset overflow. From f62bdee3dd3da7cba930044a58b53c503da7b3a7 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 19 Mar 2026 17:35:03 +0800 Subject: [PATCH 6/6] fix --- datafusion/physical-plan/src/sorts/merge.rs | 12 +++- .../src/sorts/sort_preserving_merge.rs | 55 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 89cb40ebcbb85..c13d8ab7e6383 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -53,6 +53,14 @@ pub(crate) struct SortPreservingMergeStream { /// `fetch` limit. done: bool, + /// Whether buffered rows should be drained after `done` is set. + /// + /// This is enabled when we stop because the `fetch` limit has been + /// reached, allowing partial batches left over after overflow handling to + /// be emitted on subsequent polls. It remains disabled for terminal + /// errors so the stream does not yield data after returning `Err`. + drain_in_progress_on_done: bool, + /// A loser tree that always produces the minimum cursor /// /// Node 0 stores the top winner, Nodes 1..num_streams store @@ -164,6 +172,7 @@ impl SortPreservingMergeStream { streams, metrics, done: false, + drain_in_progress_on_done: false, cursors: (0..stream_count).map(|_| None).collect(), prev_cursors: (0..stream_count).map(|_| None).collect(), round_robin_tie_breaker_mode: false, @@ -215,7 +224,7 @@ impl SortPreservingMergeStream { // otherwise they would be silently dropped. // Repeated overflows are fine — each poll emits another partial // batch until `in_progress` is fully drained. - if !self.in_progress.is_empty() { + if self.drain_in_progress_on_done && !self.in_progress.is_empty() { let before = self.in_progress.len(); let result = self.in_progress.build_record_batch(); self.produced += before - self.in_progress.len(); @@ -296,6 +305,7 @@ impl SortPreservingMergeStream { // stop sorting if fetch has been reached if self.fetch_reached() { self.done = true; + self.drain_in_progress_on_done = true; } else if self.in_progress.len() < self.batch_size { continue; } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 0b60dba8896da..3a5dd8c776670 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -1540,6 +1540,61 @@ mod tests { } } + #[tokio::test] + async fn test_sort_merge_stops_after_error_with_buffered_rows() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let sort: LexOrdering = [PhysicalSortExpr::new_default(Arc::new(Column::new( + "i", 0, + )) + as Arc)] + .into(); + + let mut stream0 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 2); + let tx0 = stream0.tx(); + let schema0 = Arc::clone(&schema); + stream0.spawn(async move { + let batch = + RecordBatch::try_new(schema0, vec![Arc::new(Int32Array::from(vec![1]))])?; + tx0.send(Ok(batch)).await.unwrap(); + tx0.send(exec_err!("stream failure")).await.unwrap(); + Ok(()) + }); + + let mut stream1 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 1); + let tx1 = stream1.tx(); + let schema1 = Arc::clone(&schema); + stream1.spawn(async move { + let batch = + RecordBatch::try_new(schema1, vec![Arc::new(Int32Array::from(vec![2]))])?; + tx1.send(Ok(batch)).await.unwrap(); + Ok(()) + }); + + let metrics = ExecutionPlanMetricsSet::new(); + let reservation = + MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); + + let mut merge_stream = StreamingMergeBuilder::new() + .with_streams(vec![stream0.build(), stream1.build()]) + .with_schema(Arc::clone(&schema)) + .with_expressions(&sort) + .with_metrics(BaselineMetrics::new(&metrics, 0)) + .with_batch_size(task_ctx.session_config().batch_size()) + .with_fetch(None) + .with_reservation(reservation) + .build()?; + + let first = merge_stream.next().await.unwrap(); + assert!(first.is_err(), "expected merge stream to surface the error"); + assert!( + merge_stream.next().await.is_none(), + "merge stream yielded data after returning an error" + ); + + Ok(()) + } + /// Test that SortPreservingMerge with FETCH does not silently drop rows /// when an interleave offset overflow forces a partial batch. #[tokio::test]