Skip to content

Panic in FULL OUTER JOIN with schema of (u32, FSL) when number of rows and size of FSL exceed u32::MAX and target_partition=1 #20673

@valkum

Description

@valkum

Describe the bug

FULL OUTER JOIN panics on tables with >23M rows containing a FixedSizeListArray column (e.g. embeddings) when you use a single partition.

Root cause: u32 overflow in take_value_indices_from_fixed_size_list (arrow-select take.rs). DataFusion's HashJoinExec concatenates all build-side batches into a single RecordBatch via concat_batches.
When the FixedSizeList column has more than u32::MAX / value_length rows, take() overflows internally and panics.

I am not sure what the correct outcome should be here. If overflowing at this scale is intended behavior, it is currently underdocumented, and an internal panic is not a graceful way to surface the limit — a proper error would be preferable.

To Reproduce

main.rs:

//! Reproducer: FULL OUTER JOIN panics on tables with >23M rows containing a
//! `FixedSizeListArray` column (e.g. embeddings).
//!
//! Root cause: u32 overflow in `take_value_indices_from_fixed_size_list`
//! (arrow-select take.rs). DataFusion's `HashJoinExec` concatenates all
//! build-side batches into a single `RecordBatch` via `concat_batches`.
//! When the `FixedSizeList` column has more than `u32::MAX / value_length`
//! rows, `take()` overflows internally and panics with:
//!
//!     assertion failed: (offset + length) <= self.len()
//!
//! For value_length=184, the threshold is 23,342,213 rows.
//!
//! Uses FixedSizeList(Boolean, 184) to keep memory at ~600 MB. The same bug
//! triggers with Float32 embeddings but would require ~17 GB.
//!
//! Run:  cargo run --release          (requires ~1.5 GB RAM, panics)
//!       cargo run --release -- 5000  (5k rows, passes)
//!       cargo run --release -- 25000000 2  (25M rows, 2 partitions, passes)

use arrow_array::{BooleanArray, FixedSizeListArray, RecordBatch, UInt32Array};
use arrow_buffer::{BooleanBuffer, MutableBuffer};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

const DIM: i32 = 184;
const NUM_ROWS: usize = 24_000_000;
const CHUNK: usize = 1_000_000;

/// Reproducer entry point.
///
/// Reads an optional row count (arg 1, default `NUM_ROWS`) and partition
/// count (arg 2, default 1), builds a self-joinable in-memory table with a
/// `FixedSizeList(Boolean, DIM)` column, and runs a FULL OUTER JOIN on it.
/// With partitions=1 and enough rows, the build side concatenation pushes
/// the FSL child array past `u32::MAX` values and `take()` panics.
#[tokio::main]
async fn main() {
    // Consume CLI args once through a single iterator; malformed or missing
    // values silently fall back to the defaults.
    let mut args = std::env::args().skip(1);
    let n: usize = args.next().and_then(|a| a.parse().ok()).unwrap_or(NUM_ROWS);
    let partitions: usize = args.next().and_then(|a| a.parse().ok()).unwrap_or(1);

    // Row count at which DIM values per row overflow a u32 value index.
    let threshold = u64::from(u32::MAX) / (DIM as u64);
    eprintln!("rows={n}, dim={DIM}, overflow threshold={threshold}");

    // target_partitions=1 ensures all build-side batches are concatenated into
    // a single RecordBatch, matching what happens in practice (e.g. lance's
    // merge_insert v1 path sets target_partitions(1) for the FULL OUTER JOIN).
    let ctx = SessionContext::new_with_config(
        SessionConfig::default().with_target_partitions(partitions),
    );

    eprintln!("creating {n}-row table...");
    let mem_table = MemTable::try_new(schema(), vec![make_batches(n)]).unwrap();
    ctx.register_table("t", Arc::new(mem_table)).unwrap();

    // Self-join: both sides have 24M rows with FixedSizeList(Boolean, 184).
    // Whichever side becomes the HashJoin build side will trigger the overflow.
    eprintln!("running FULL OUTER JOIN...");
    let query = "SELECT * FROM t AS a FULL OUTER JOIN t AS b ON a.id = b.id";
    let df = ctx.sql(query).await.unwrap();
    let _results = df.collect().await.unwrap();

    eprintln!("join succeeded (bug not triggered at this scale)");
}

/// Builds the shared two-column schema: a non-nullable `UInt32` `id` and a
/// nullable `FixedSizeList(Boolean, DIM)` `vec` column (the embedding stand-in).
fn schema() -> Arc<Schema> {
    let item = Arc::new(Field::new("item", DataType::Boolean, true));
    let id = Field::new("id", DataType::UInt32, false);
    let vec_col = Field::new("vec", DataType::FixedSizeList(item, DIM), true);
    Arc::new(Schema::new(vec![id, vec_col]))
}

/// Builds one `RecordBatch` of `count` rows starting at id `start`.
///
/// The `id` column is `start..start+count` (as u32; the reproducer stays
/// below u32::MAX ids) and the `vec` column is an all-false
/// `FixedSizeList(Boolean, DIM)` with no nulls, kept as a Boolean child to
/// minimize memory while still growing the flat child length past u32::MAX.
fn make_batch(start: usize, count: usize) -> RecordBatch {
    let ids: Vec<u32> = (start..start + count).map(|i| i as u32).collect();
    // Total number of boolean values in the flat FSL child array.
    let n = count * DIM as usize;
    // Booleans are bit-packed: div_ceil(8) gives the byte count, rounding up.
    let buf: arrow_buffer::Buffer = MutableBuffer::from_len_zeroed(n.div_ceil(8)).into();
    let bools = BooleanArray::new(BooleanBuffer::new(buf, 0, n), None);
    let fsl = FixedSizeListArray::new(
        Arc::new(Field::new("item", DataType::Boolean, true)),
        DIM,
        Arc::new(bools),
        None, // no null rows
    );
    RecordBatch::try_new(
        schema(),
        vec![Arc::new(UInt32Array::from(ids)), Arc::new(fsl)],
    )
    .unwrap()
}

/// Splits `total` rows into batches of at most `CHUNK` rows each; the final
/// batch carries whatever remainder is left. Returns an empty vec for total=0.
fn make_batches(total: usize) -> Vec<RecordBatch> {
    (0..total)
        .step_by(CHUNK)
        .map(|start| make_batch(start, CHUNK.min(total - start)))
        .collect()
}

Cargo.toml

[workspace]

[package]
name = "arrow-take-fsl-overflow"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow-array = "58"
arrow-buffer = "58"
arrow-schema = "58"
datafusion = "52"
tokio = { version = "1", features = ["full"] }

Expected behavior

No response

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions