I can directly capture the panic and then fix it.
kosiew left a comment:

👋 @xudong963, thanks for working on this.
```rust
match catch_unwind(AssertUnwindSafe(|| interleave(&arrays, indices))) {
    Ok(result) => Ok(result?),
    Err(panic_payload) => {
        if is_overflow_panic(&panic_payload) {
```
Catching any panic whose message merely contains "overflow" is too broad for a recovery path in the merge operator.
This now converts unrelated bugs such as Rust arithmetic overflows ("attempt to multiply with overflow") or allocation failures like "capacity overflow" into a synthetic OffsetOverflowError, causing the stream to silently split batches instead of surfacing the real defect.
Since this code is on the hot path and intentionally swallows panics, I think we need a tighter discriminator before merging. Ideally the overflow detection should match the specific Arrow panic we expect, or be isolated behind a smaller helper/API so we are not turning arbitrary panics into data-dependent control flow.
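As a sketch of what a tighter discriminator could look like: since `.expect("overflow")` panics with the payload `"overflow"` exactly, the helper can downcast the payload and compare for equality instead of substring containment. The helper name below is hypothetical, not the PR's actual API:

```rust
use std::any::Any;
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical tighter discriminator: treat a caught panic as Arrow's
/// offset overflow only when the payload is *exactly* the message that
/// `.expect("overflow")` produces, not any message containing "overflow".
fn is_arrow_offset_overflow(payload: &(dyn Any + Send)) -> bool {
    let msg = if let Some(s) = payload.downcast_ref::<&str>() {
        *s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.as_str()
    } else {
        return false;
    };
    // Exact match keeps unrelated panics ("attempt to multiply with
    // overflow", "capacity overflow") out of the recovery path.
    msg == "overflow"
}

fn main() {
    // Silence the default hook so expected panics do not spam stderr.
    std::panic::set_hook(Box::new(|_| {}));

    // `Option::expect("overflow")` panics with exactly "overflow".
    let overflow = catch_unwind(AssertUnwindSafe(|| {
        Option::<i32>::None.expect("overflow");
    }))
    .unwrap_err();
    assert!(is_arrow_offset_overflow(overflow.as_ref()));

    // An unrelated panic that merely *contains* "overflow" is rejected.
    let unrelated = catch_unwind(AssertUnwindSafe(|| {
        panic!("capacity overflow");
    }))
    .unwrap_err();
    assert!(!is_arrow_offset_overflow(unrelated.as_ref()));
    println!("ok");
}
```

This still couples recovery to a panic-message string, so isolating it behind one small helper (as suggested above) keeps that fragility in a single place.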
```rust
/// panic and retries with fewer rows until the output fits in i32
/// offsets.
#[test]
fn test_interleave_overflow_is_caught() {
```
This test and `test_sort_merge_fetch_interleave_overflow` allocate enormous strings (768 * 1024 * 1024 bytes each) and then materialize them into multiple `StringArray`s.
In practice that means several gigabytes of heap allocation per test, which is likely to make CI flaky or OOM outright.
The coverage is important, but I think these tests would be better served by a lower-memory reproduction, for example by constructing the overflow condition with a purpose-built array fixture/helper instead of copying multi-GB payloads into `StringArray`s.
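To illustrate the low-memory direction: the overflow condition depends only on the cumulative value *lengths*, so a fixture can predict where `i32` offsets would overflow without allocating any payload. This is a minimal sketch under that assumption; the function name is invented and the real test would still need a small arrow fixture around it:

```rust
/// Hypothetical fixture helper: given the logical byte length of each
/// row's value, return the first row index at which the running i32
/// offset would overflow, without allocating the values themselves.
fn first_overflowing_row(value_lens: impl IntoIterator<Item = usize>) -> Option<usize> {
    let mut offset: i32 = 0;
    for (row, len) in value_lens.into_iter().enumerate() {
        // Overflow either because `len` itself exceeds i32::MAX or
        // because the running offset would.
        offset = match i32::try_from(len).ok().and_then(|l| offset.checked_add(l)) {
            Some(next) => next,
            None => return Some(row),
        };
    }
    None
}

fn main() {
    // Three "rows" of 768 MiB each (matching the current tests):
    // offsets reach 768 MiB, then 1536 MiB, then overflow on row 2,
    // all without allocating a single byte of string payload.
    let big = 768 * 1024 * 1024;
    assert_eq!(first_overflowing_row([big, big, big]), Some(2));
    assert_eq!(first_overflowing_row([16, 32]), None);
    println!("ok");
}
```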
```rust
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    if self.done {
        // When `build_record_batch()` hits an i32 offset overflow (e.g.
```
The `done` branch and the normal emit path both repeat the same `before = len(); build_record_batch(); produced += ...` bookkeeping.
This feels like it wants a small helper on `SortPreservingMergeStream` or `BatchBuilder` so the overflow/drain behavior stays in one place.
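A minimal sketch of the consolidation idea, with the stream/builder reduced to a stub (the struct, fields, and method name here are all illustrative, not the PR's types):

```rust
/// Stub standing in for the builder: the point is that the
/// `before = len(); build; produced += ...` bookkeeping lives in
/// exactly one method that both the done branch and the normal emit
/// path would call.
struct Builder {
    indices: Vec<usize>,
    produced: usize,
}

impl Builder {
    /// Emit up to `rows` pending entries and account for them in one place.
    fn emit_tracking_produced(&mut self, rows: usize) -> Vec<usize> {
        let before = self.indices.len();
        let batch: Vec<usize> = self.indices.drain(..rows.min(before)).collect();
        self.produced += before - self.indices.len();
        batch
    }
}

fn main() {
    let mut b = Builder { indices: (0..5).collect(), produced: 0 };
    // Normal emit path: take a prefix.
    assert_eq!(b.emit_tracking_produced(3), vec![0, 1, 2]);
    assert_eq!(b.produced, 3);
    // Drain-on-done path: same helper, oversized request just drains.
    assert_eq!(b.emit_tracking_produced(10), vec![3, 4]);
    assert_eq!(b.produced, 5);
    println!("ok");
}
```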
```rust
} else {
    self.batches_mem_used -= get_record_batch_memory_size(batch);
    // Try interleaving all indices. On offset overflow, halve and retry.
    let mut end = self.indices.len();
```
The retry loop is clear, but I think `end` is really "rows_to_emit".
Renaming that variable or extracting a helper like `build_partial_record_batch` would make the control flow a bit easier to scan now that `build_record_batch` has to coordinate retry, draining, and delayed cleanup.
Which issue does this PR close?
Rationale for this change
When `SortPreservingMergeStream` merges batches containing large string/binary columns whose combined offsets exceed `i32::MAX`, Arrow's `interleave` panics with `.expect("overflow")`. This PR catches that panic and retries with progressively fewer rows, producing smaller output batches that fit within `i32` offset limits.
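The halve-and-retry behavior described above can be sketched as follows. This is a stand-in, not the actual implementation: `fake_interleave` simulates Arrow's `interleave` by panicking with `"overflow"` when the combined byte length of the requested rows exceeds `i32::MAX`, and all other names are illustrative:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Stand-in for Arrow's interleave: panics with "overflow" when the
/// selected rows' combined byte length would not fit in i32 offsets.
fn fake_interleave(row_len: usize, rows: usize) -> Vec<usize> {
    let total = row_len.checked_mul(rows).unwrap_or(usize::MAX);
    if total > i32::MAX as usize {
        panic!("overflow");
    }
    vec![row_len; rows]
}

/// Halve-and-retry: on an overflow panic, retry with half the rows
/// until the output fits, then continue with the remainder. Returns
/// the emitted batch sizes.
fn merge_in_batches(row_len: usize, mut remaining: usize) -> Vec<usize> {
    let mut batch_sizes = Vec::new();
    while remaining > 0 {
        let mut rows = remaining;
        loop {
            match catch_unwind(AssertUnwindSafe(|| fake_interleave(row_len, rows))) {
                Ok(batch) => {
                    batch_sizes.push(batch.len());
                    break;
                }
                // Offset overflow: halve the attempted row count and retry.
                Err(_) => rows = (rows / 2).max(1),
            }
        }
        remaining -= batch_sizes.last().copied().unwrap();
    }
    batch_sizes
}

fn main() {
    // Silence the default hook so the expected panics stay quiet.
    std::panic::set_hook(Box::new(|_| {}));
    // 4096 rows of 1 MiB each: 4 GiB total overflows i32 offsets, so
    // the merge splits into smaller batches that each fit.
    let sizes = merge_in_batches(1 << 20, 4096);
    assert!(sizes.iter().all(|&n| n * (1 << 20) <= i32::MAX as usize));
    assert_eq!(sizes.iter().sum::<usize>(), 4096);
    println!("batches: {sizes:?}");
}
```

Every row is still emitted exactly once; only the batch boundaries change, which matches the "smaller output batches" framing above.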
What changes are included in this PR?
Are these changes tested?
Yes, unit tests.
Are there any user-facing changes?