Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 157 additions & 34 deletions datafusion/physical-plan/src/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
use crate::spill::get_record_batch_memory_size;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use log::warn;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;

#[derive(Debug, Copy, Clone, Default)]
Expand Down Expand Up @@ -126,49 +129,97 @@ impl BatchBuilder {
&self.schema
}

/// Try to interleave all columns using the given index slice.
fn try_interleave_columns(
&self,
indices: &[(usize, usize)],
) -> Result<Vec<Arc<dyn arrow::array::Array>>> {
(0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
// Arrow's interleave panics on i32 offset overflow with
// `.expect("overflow")`. Catch that panic so the caller
// can retry with fewer rows.
match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
Ok(result) => Ok(result?),
Err(panic_payload) => {
if is_overflow_panic(&panic_payload) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching any panic whose message merely contains "overflow" is too broad for a recovery path in the merge operator.

This now converts unrelated bugs such as Rust arithmetic overflows ("attempt to multiply with overflow") or allocation failures like "capacity overflow" into a synthetic OffsetOverflowError, causing the stream to silently split batches instead of surfacing the real defect.
Since this code is on the hot path and intentionally swallows panics, I think we need a tighter discriminator before merging. Ideally the overflow detection should match the specific Arrow panic we expect, or be isolated behind a smaller helper/API so we are not turning arbitrary panics into data-dependent control flow.

Err(DataFusionError::ArrowError(
Box::new(ArrowError::OffsetOverflowError(0)),
None,
))
} else {
std::panic::resume_unwind(panic_payload);
}
}
}
})
.collect::<Result<Vec<_>>>()
}

/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output
/// Will then drop any batches for which all rows have been yielded to the output.
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
/// retries with progressively fewer rows until it succeeds.
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.is_empty() {
return Ok(None);
}

let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
Ok(interleave(&arrays, &self.indices)?)
})
.collect::<Result<Vec<_>>>()?;

self.indices.clear();

// New cursors are only created once the previous cursor for the stream
// is finished. This means all remaining rows from all but the last batch
// for each stream have been yielded to the newly created record batch
//
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.batches_mem_used -= get_record_batch_memory_size(batch);
// Try interleaving all indices. On offset overflow, halve and retry.
let mut end = self.indices.len();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry loop is clear, but I think end is really "rows_to_emit".
Renaming that variable or extracting a helper like build_partial_record_batch would make the control flow a bit easier to scan now that build_record_batch has to coordinate retry, draining, and delayed cleanup.

let columns = loop {
match self.try_interleave_columns(&self.indices[..end]) {
Ok(cols) => break cols,
Err(e) if is_offset_overflow(&e) => {
end /= 2;
if end == 0 {
return Err(e);
}
warn!(
"Interleave offset overflow with {} rows, retrying with {}",
self.indices.len(),
end
);
}
Err(e) => return Err(e),
}
retain
});
};

// Remove consumed indices, keeping any remaining for the next call.
self.indices.drain(..end);

// Only clean up fully-consumed batches when all indices are drained,
// because remaining indices may still reference earlier batches.
if self.indices.is_empty() {
// New cursors are only created once the previous cursor for the stream
// is finished. This means all remaining rows from all but the last batch
// for each stream have been yielded to the newly created record batch
//
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.batches_mem_used -= get_record_batch_memory_size(batch);
}
retain
});
}

// Release excess memory back to the pool, but never shrink below
// initial_reservation to maintain the anti-starvation guarantee
Expand Down Expand Up @@ -200,3 +251,75 @@ pub(crate) fn try_grow_reservation_to_at_least(
}
Ok(())
}

/// Returns true if the error wraps an Arrow `OffsetOverflowError`.
fn is_offset_overflow(e: &DataFusionError) -> bool {
    if let DataFusionError::ArrowError(inner, _) = e {
        matches!(inner.as_ref(), ArrowError::OffsetOverflowError(_))
    } else {
        false
    }
}

/// Returns true if a caught panic payload is the Arrow interleave i32
/// offset overflow.
///
/// Arrow raises this panic via `.expect("overflow")`, so the payload
/// message is either exactly `"overflow"` (from `Option::expect`) or
/// starts with `"overflow: "` (from `Result::expect`, which appends the
/// error's `Debug` output). Matching on the message prefix — rather than
/// any message merely *containing* "overflow" — avoids misclassifying
/// unrelated panics such as `"attempt to multiply with overflow"` or
/// `"capacity overflow"` as recoverable offset overflows, which would
/// otherwise turn genuine bugs into silent batch splitting.
fn is_overflow_panic(payload: &Box<dyn std::any::Any + Send>) -> bool {
    // Panic payloads from `panic!`/`expect` are `&str` or `String`;
    // anything else cannot be the Arrow overflow panic.
    let msg: Option<&str> = if let Some(s) = payload.downcast_ref::<&str>() {
        Some(s)
    } else {
        payload.downcast_ref::<String>().map(String::as_str)
    };
    matches!(msg, Some(m) if m == "overflow" || m.starts_with("overflow:"))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_execution::memory_pool::{
        MemoryConsumer, MemoryPool, UnboundedMemoryPool,
    };

    /// Test that interleaving string columns whose combined byte length
    /// exceeds i32::MAX does not panic. Arrow's `interleave` panics with
    /// `.expect("overflow")` in this case; `BatchBuilder` catches the
    /// panic and retries with fewer rows until the output fits in i32
    /// offsets.
    ///
    /// NOTE(review): this allocates ~2.3 GB of string data to trigger the
    /// overflow; a lower-memory fixture (a purpose-built array helper
    /// rather than multi-GB `StringArray` copies) would make it cheaper to
    /// run in CI — TODO confirm whether such a fixture is feasible.
    #[test]
    fn test_interleave_overflow_is_caught() {
        // Each string is ~768 MB. Three rows total → ~2.3 GB > i32::MAX.
        let big_str: String = "x".repeat(768 * 1024 * 1024);

        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));

        let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
        let reservation = MemoryConsumer::new("test").register(&pool);
        let mut builder = BatchBuilder::new(
            Arc::clone(&schema),
            /* stream_count */ 3,
            /* batch_size */ 16,
            reservation,
        );

        // Push one batch per stream, each containing one large string.
        for stream_idx in 0..3 {
            let array = StringArray::from(vec![big_str.as_str()]);
            let batch =
                RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
            builder.push_batch(stream_idx, batch).unwrap();
            builder.push_row(stream_idx);
        }

        // 3 rows total; interleaving all 3 would overflow i32 offsets.
        // The retry loop should halve until it succeeds.
        let batch = builder.build_record_batch().unwrap().unwrap();
        assert!(batch.num_rows() > 0);
        assert!(batch.num_rows() < 3);

        // Drain remaining rows.
        let batch2 = builder.build_record_batch().unwrap().unwrap();
        assert!(batch2.num_rows() > 0);
        assert_eq!(batch.num_rows() + batch2.num_rows(), 3);
    }
}
29 changes: 27 additions & 2 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
/// `fetch` limit.
done: bool,

/// Whether buffered rows should be drained after `done` is set.
///
/// This is enabled when we stop because the `fetch` limit has been
/// reached, allowing partial batches left over after overflow handling to
/// be emitted on subsequent polls. It remains disabled for terminal
/// errors so the stream does not yield data after returning `Err`.
drain_in_progress_on_done: bool,

/// A loser tree that always produces the minimum cursor
///
/// Node 0 stores the top winner, Nodes 1..num_streams store
Expand Down Expand Up @@ -164,6 +172,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
streams,
metrics,
done: false,
drain_in_progress_on_done: false,
cursors: (0..stream_count).map(|_| None).collect(),
prev_cursors: (0..stream_count).map(|_| None).collect(),
round_robin_tie_breaker_mode: false,
Expand Down Expand Up @@ -208,6 +217,19 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.done {
// When `build_record_batch()` hits an i32 offset overflow (e.g.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The done branch and the normal emit path both repeat the same before = len(); build_record_batch(); produced += ... bookkeeping.

This feels like it wants a small helper on SortPreservingMergeStream or BatchBuilder so the overflow/drain behavior stays in one place.

// combined string offsets exceed 2 GB), it emits a partial batch
// and keeps the remaining rows in `self.in_progress.indices`.
// Drain those leftover rows before terminating the stream,
// otherwise they would be silently dropped.
// Repeated overflows are fine — each poll emits another partial
// batch until `in_progress` is fully drained.
if self.drain_in_progress_on_done && !self.in_progress.is_empty() {
let before = self.in_progress.len();
let result = self.in_progress.build_record_batch();
self.produced += before - self.in_progress.len();
return Poll::Ready(result.transpose());
}
return Poll::Ready(None);
}
// Once all partitions have set their corresponding cursors for the loser tree,
Expand Down Expand Up @@ -283,14 +305,17 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// stop sorting if fetch has been reached
if self.fetch_reached() {
self.done = true;
self.drain_in_progress_on_done = true;
} else if self.in_progress.len() < self.batch_size {
continue;
}
}

self.produced += self.in_progress.len();
let before = self.in_progress.len();
let result = self.in_progress.build_record_batch();
self.produced += before - self.in_progress.len();

return Poll::Ready(self.in_progress.build_record_batch().transpose());
return Poll::Ready(result.transpose());
}
}

Expand Down
98 changes: 98 additions & 0 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1539,4 +1539,102 @@ mod tests {
Err(_) => exec_err!("SortPreservingMerge caused a deadlock"),
}
}

/// Regression test: once the merge stream has surfaced an `Err`, the next
/// poll must yield `None` instead of draining rows still buffered in the
/// merge. This pins the `drain_in_progress_on_done` behavior for the
/// error path, where the flag stays disabled.
#[tokio::test]
async fn test_sort_merge_stops_after_error_with_buffered_rows() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let sort: LexOrdering = [PhysicalSortExpr::new_default(Arc::new(Column::new(
        "i", 0,
    ))
        as Arc<dyn PhysicalExpr>)]
    .into();

    // Stream 0: one good single-row batch, then a terminal error.
    let mut stream0 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 2);
    let tx0 = stream0.tx();
    let schema0 = Arc::clone(&schema);
    stream0.spawn(async move {
        let batch =
            RecordBatch::try_new(schema0, vec![Arc::new(Int32Array::from(vec![1]))])?;
        tx0.send(Ok(batch)).await.unwrap();
        tx0.send(exec_err!("stream failure")).await.unwrap();
        Ok(())
    });

    // Stream 1: a single good batch and a clean end-of-stream, so the
    // merge may still hold buffered rows when stream 0 errors.
    let mut stream1 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 1);
    let tx1 = stream1.tx();
    let schema1 = Arc::clone(&schema);
    stream1.spawn(async move {
        let batch =
            RecordBatch::try_new(schema1, vec![Arc::new(Int32Array::from(vec![2]))])?;
        tx1.send(Ok(batch)).await.unwrap();
        Ok(())
    });

    let metrics = ExecutionPlanMetricsSet::new();
    let reservation =
        MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool);

    let mut merge_stream = StreamingMergeBuilder::new()
        .with_streams(vec![stream0.build(), stream1.build()])
        .with_schema(Arc::clone(&schema))
        .with_expressions(&sort)
        .with_metrics(BaselineMetrics::new(&metrics, 0))
        .with_batch_size(task_ctx.session_config().batch_size())
        .with_fetch(None)
        .with_reservation(reservation)
        .build()?;

    // The error must be surfaced to the caller...
    let first = merge_stream.next().await.unwrap();
    assert!(first.is_err(), "expected merge stream to surface the error");
    // ...and the stream must then terminate without emitting buffered rows.
    assert!(
        merge_stream.next().await.is_none(),
        "merge stream yielded data after returning an error"
    );

    Ok(())
}

/// SortPreservingMerge with a FETCH limit must emit every fetched row even
/// when an interleave offset overflow forces partial batches.
#[tokio::test]
async fn test_sort_merge_fetch_interleave_overflow() -> Result<()> {
    // ~768 MB per value; the three values together exceed i32::MAX bytes
    // of string data, which forces `build_record_batch` to emit a partial
    // batch.
    let payload: String = "x".repeat(768 * 1024 * 1024);

    let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));

    // Three single-row partitions, each holding one oversized string.
    let mut partitions = Vec::with_capacity(3);
    for _ in 0..3 {
        let column: ArrayRef = Arc::new(StringArray::from(vec![payload.as_str()]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column])?;
        partitions.push(vec![batch]);
    }

    let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;

    let ordering: LexOrdering = [PhysicalSortExpr::new_default(
        Arc::new(Column::new("s", 0)) as Arc<dyn PhysicalExpr>,
    )]
    .into();

    let plan = SortPreservingMergeExec::new(ordering, input).with_fetch(Some(3));

    let ctx = Arc::new(TaskContext::default());
    let batches = collect(Arc::new(plan), ctx).await?;

    // Count rows across however many (possibly partial) batches came out.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(
        total_rows, 3,
        "Expected all 3 rows to be emitted despite interleave overflow, got {total_rows}"
    );

    Ok(())
}
}
Loading