MinAccumulator over List<T> drops inner-Field metadata for non-null row groups, causing concat / comparison failures #21981

@adriangb

Describe the bug

MinAccumulator (and MaxAccumulator) over a List<Utf8> column whose inner item Field carries metadata produces ScalarValue::List results whose inner-Field metadata is path-dependent: preserved for all-null row groups but stripped for row groups that contain at least one non-null value. Concatenating those per-row-group results via ScalarValue::iter_to_array then fails with a type mismatch, and a SQL MIN/MAX over such a column with multiple partitions surfaces the same divergence as Internal error: cannot compare values of different or incompatible types.

The same shape holds for LargeList and FixedSizeList. Struct / Map / Union / RunEndEncoded are unaffected because their try_from_array paths preserve the source array's full DataType.

Root cause

The non-null path goes through ScalarValue::try_from_array:

DataType::List(field) => {
    let list_array = array.as_list::<i32>();
    let nested_array = list_array.value(index);
    SingleRowListArrayBuilder::new(nested_array)
        .with_field(field)
        .build_list_scalar()
}

But SingleRowListArrayBuilder::with_field only copies name and nullable:

pub fn with_field(self, field: &Field) -> Self {
    self.with_field_name(Some(field.name().to_owned()))
        .with_nullable(field.is_nullable())
}

Then into_field_and_arr builds a fresh Field::new(name, data_type, nullable) with empty metadata.

The all-null path in min_max_batch_generic goes through ScalarValue::try_from(array.data_type()) → try_new_null:

DataType::List(field_ref) => ScalarValue::List(Arc::new(
    GenericListArray::new_null(Arc::clone(field_ref), 1),
)),

Arc::clone(field_ref) preserves the full Field, so metadata survives. The two paths return ScalarValues with different inner-Field metadata for the same logical column.
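
The divergence can be illustrated without arrow at all. The following is a minimal std-only sketch: the `Field` struct and the helper names `rebuild_from_name_and_nullable` and `share_field` are hypothetical stand-ins invented for illustration, not arrow or DataFusion APIs. They only model the two behaviors described above (copying name and nullability into a fresh field versus cloning the `Arc`).

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for arrow's `Field` (hypothetical; only the parts
/// relevant to this issue are modelled, and the data type is omitted).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
    metadata: HashMap<String, String>,
}

impl Field {
    /// Like a fresh field construction: metadata starts out empty.
    fn new(name: &str, nullable: bool) -> Self {
        Field { name: name.to_string(), nullable, metadata: HashMap::new() }
    }

    fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
        self.metadata = metadata;
        self
    }
}

/// Models the non-null path: only name and nullability are copied into a
/// freshly built field, so the source metadata is lost.
fn rebuild_from_name_and_nullable(source: &Field) -> Arc<Field> {
    Arc::new(Field::new(&source.name, source.nullable))
}

/// Models the all-null path: the original field is shared as-is, so the
/// metadata survives.
fn share_field(source: &Arc<Field>) -> Arc<Field> {
    Arc::clone(source)
}

/// Builds a metadata-carrying inner field similar to the reproducer's.
fn inner_with_metadata() -> Arc<Field> {
    let mut md = HashMap::new();
    md.insert("ARROW:extension:name".to_string(), "arrow.json".to_string());
    Arc::new(Field::new("item", true).with_metadata(md))
}
```

In this model the two results agree on name and nullability but compare unequal as soon as the source field carries any metadata, which is the same kind of mismatch that the type comparison in the concat step trips over.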

To Reproduce

Cargo.toml:

[dependencies]
datafusion = "53.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

src/main.rs:

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::arrow::array::{Array, ArrayRef, ListArray, RecordBatch, StringArray};
use datafusion::arrow::buffer::OffsetBuffer;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::min_max::MinAccumulator;
use datafusion::logical_expr::Accumulator;
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;

fn inner_with_metadata() -> Arc<Field> {
    let mut md = HashMap::new();
    md.insert("ARROW:extension:name".to_string(), "arrow.json".to_string());
    md.insert("ARROW:extension:metadata".to_string(), "{}".to_string());
    Arc::new(Field::new("item", DataType::Utf8, true).with_metadata(md))
}

fn dump(label: &str, sv: &ScalarValue) {
    if let ScalarValue::List(arr) = sv {
        if let DataType::List(inner) = arr.data_type() {
            println!("  {label}: inner_field metadata={:?}", inner.metadata());
            return;
        }
    }
    println!("  {label}: {sv:?}");
}

fn unit_repro() {
    println!("== Direct MinAccumulator reproducer ==");
    let inner = inner_with_metadata();
    let list_dt = DataType::List(Arc::clone(&inner));

    // Row group 1: all NULL → metadata preserved
    let mut acc1 = MinAccumulator::try_new(&list_dt).unwrap();
    let null_arr: ArrayRef = Arc::new(ListArray::new_null(Arc::clone(&inner), 2));
    acc1.update_batch(&[null_arr]).unwrap();
    let sv1 = acc1.evaluate().unwrap();
    dump("row group 1 (all-null)", &sv1);

    // Row group 2: has values → metadata stripped
    let mut acc2 = MinAccumulator::try_new(&list_dt).unwrap();
    let values: ArrayRef = Arc::new(StringArray::from(vec!["alpha", "beta"]));
    let offsets = OffsetBuffer::new(vec![0_i32, 1, 2].into());
    let arr_vals: ArrayRef = Arc::new(ListArray::new(Arc::clone(&inner), offsets, values, None));
    acc2.update_batch(&[arr_vals]).unwrap();
    let sv2 = acc2.evaluate().unwrap();
    dump("row group 2 (values)", &sv2);

    println!(
        "  iter_to_array: {:?}",
        ScalarValue::iter_to_array(vec![sv1, sv2]).map(|a| a.data_type().clone())
    );
}

async fn sql_repro() -> datafusion::error::Result<()> {
    println!();
    println!("== SQL MIN() reproducer ==");
    let inner = inner_with_metadata();
    let list_dt = DataType::List(Arc::clone(&inner));
    let schema = Arc::new(Schema::new(vec![Field::new("col", list_dt, true)]));

    // Two partitions: one all-null row, one non-null row.
    let null_arr: ArrayRef = Arc::new(ListArray::new_null(Arc::clone(&inner), 1));
    let batch_a = RecordBatch::try_new(schema.clone(), vec![null_arr])?;

    let values: ArrayRef = Arc::new(StringArray::from(vec!["alpha"]));
    let offsets = OffsetBuffer::new(vec![0_i32, 1].into());
    let list_arr: ArrayRef = Arc::new(ListArray::new(Arc::clone(&inner), offsets, values, None));
    let batch_b = RecordBatch::try_new(schema.clone(), vec![list_arr])?;

    let table = MemTable::try_new(schema, vec![vec![batch_a], vec![batch_b]])?;
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;

    let df = ctx.sql("SELECT MIN(col) AS m FROM t").await?;
    match df.collect().await {
        Ok(batches) => println!("  result: {batches:?}"),
        Err(e) => println!("  SQL FAILED: {e}"),
    }
    Ok(())
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    unit_repro();
    sql_repro().await
}

Expected behavior

MinAccumulator over a List<Utf8> column should produce ScalarValue::List values with consistent inner-Field metadata across row groups, regardless of whether a particular row group is all-null. ScalarValue::iter_to_array over those values should succeed, and SELECT MIN(col) over a multi-partition table should not error with "cannot compare values of different or incompatible types".

Actual output

== Direct MinAccumulator reproducer ==
  row group 1 (all-null): inner_field metadata={"ARROW:extension:name": "arrow.json", "ARROW:extension:metadata": "{}"}
  row group 2 (values): inner_field metadata={}
  iter_to_array: Err(ArrowError(InvalidArgumentError("It is not possible to concatenate arrays of different data types (List(Utf8, metadata: {\"ARROW:extension:metadata\": \"{}\", \"ARROW:extension:name\": \"arrow.json\"}), List(Utf8)).")))

== SQL MIN() reproducer ==
  SQL FAILED: Internal error: cannot compare values of different or incompatible types: List() vs List([alpha]).

Suggested fix

Extend SingleRowListArrayBuilder to carry a full Arc<Field> (not just field_name + nullable) and use it directly in into_field_and_arr rather than constructing a fresh Field. The existing with_field callers in try_from_array for List / LargeList / FixedSizeList / ListView / LargeListView (and the try_new_null arms for FixedSizeList / ListView / LargeListView that use with_field) would then preserve the inner-Field metadata.

Struct, Map, Union, and RunEndEncoded arms of try_from_array already preserve metadata (via array.slice or explicit Arc::clone), so the fix is scoped to the SingleRowListArrayBuilder path.
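
A rough sketch of the shape this fix could take, again as a std-only model rather than the real code: `Field` and `SingleRowListBuilderModel` are hypothetical stand-ins, and the default field name "item" is an assumption made for the model. The point is only that the builder stores a whole `Arc<Field>` and hands it back unchanged, while `with_field` keeps its `&Field` signature.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for arrow's `Field` (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
    metadata: HashMap<String, String>,
}

impl Field {
    fn new(name: &str, nullable: bool) -> Self {
        Field { name: name.to_string(), nullable, metadata: HashMap::new() }
    }

    fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
        self.metadata = metadata;
        self
    }
}

/// Hypothetical model of a single-row list builder that carries the full
/// field instead of separate `field_name` / `nullable` values.
struct SingleRowListBuilderModel {
    field: Arc<Field>,
}

impl SingleRowListBuilderModel {
    /// Starts from a plain nullable field named "item" (assumed default).
    fn new() -> Self {
        Self { field: Arc::new(Field::new("item", true)) }
    }

    /// Same signature shape as before, but keeps the whole field,
    /// metadata included.
    fn with_field(mut self, field: &Field) -> Self {
        self.field = Arc::new(field.clone());
        self
    }

    /// Overrides only the name; other attributes are left untouched.
    fn with_field_name(mut self, name: &str) -> Self {
        Arc::make_mut(&mut self.field).name = name.to_string();
        self
    }

    /// Overrides only nullability; other attributes are left untouched.
    fn with_nullable(mut self, nullable: bool) -> Self {
        Arc::make_mut(&mut self.field).nullable = nullable;
        self
    }

    /// Returns the stored field directly instead of building a fresh one.
    fn into_field(self) -> Arc<Field> {
        self.field
    }
}
```

One design choice in this sketch is that the name and nullability setters edit the stored field through `Arc::make_mut`, so calling them after `with_field` does not discard the metadata. Whether the real builder should behave that way is an open question for the actual patch.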

Additional context

  • Surfaced in production while writing zone-map-style stats files: per-row-group MinAccumulator::evaluate() results are concatenated via ScalarValue::iter_to_array to form a stats batch. Mixed metadata across row groups makes the concat fail at write time. This is what got us to track down the underlying mechanism.
  • Bisected: the bug exists in 53.1.0 (the latest release as of filing) and on main, in with_field at lines 449-452.
  • DataFusion version tested: datafusion = "53.1" from crates.io.
  • Rust version: stable.

API breaking changes

No (the proposed fix adds a field to SingleRowListArrayBuilder and changes with_field's body; signature is unchanged).

Component(s)

common, functions-aggregate
