From 9d40c16b6f806d58bbdade936c8549b95178bded Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 17:56:27 +0800 Subject: [PATCH 01/33] fix: replace direct cast with cast_column utility for better type handling in statistics --- datafusion/pruning/src/pruning_predicate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 5e92dbe227fd..1699fa9b8d0a 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -35,13 +35,13 @@ use datafusion_physical_plan::metrics::Count; use log::{debug, trace}; use datafusion_common::error::{DataFusionError, Result}; +use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, + cast_column, internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, - ScalarValue, + Column, DFSchema, ScalarValue, }; -use datafusion_common::{Column, DFSchema}; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; @@ -929,7 +929,7 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. 
parquet // provides timestamp statistics as "Int64") - let array = arrow::compute::cast(&array, data_type)?; + let array = cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)?; arrays.push(array); } From 43778453cd64f94267aa524cb638fc7ff3cad383 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 18:00:22 +0800 Subject: [PATCH 02/33] fix: enhance type casting for Utf8 data in statistics to ensure safe conversions --- datafusion/pruning/src/pruning_predicate.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 1699fa9b8d0a..be59578e38f3 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -929,7 +929,14 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)?; + let array = + if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { + let mut cast_options = DEFAULT_CAST_OPTIONS; + cast_options.safe = true; + arrow::compute::cast_with_options(&array, data_type, &cast_options)? + } else { + cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? 
+ }; arrays.push(array); } From 5a73f7da4e4337b635593283e72b15a77a67a2ff Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 18:04:43 +0800 Subject: [PATCH 03/33] test: add unit test for building statistics with struct columns --- datafusion/pruning/src/pruning_predicate.rs | 139 +++++++++++++++++++- 1 file changed, 137 insertions(+), 2 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index be59578e38f3..f123baea0f11 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1878,9 +1878,12 @@ mod tests { use datafusion_expr::{and, col, lit, or}; use insta::assert_snapshot; - use arrow::array::Decimal128Array; + use arrow::array::{Array, Decimal128Array}; use arrow::{ - array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array}, + array::{ + ArrayRef, BinaryArray, BooleanArray, Int32Array, Int64Array, StringArray, + StructArray, UInt64Array, + }, datatypes::TimeUnit, }; use datafusion_expr::expr::InList; @@ -2503,6 +2506,138 @@ mod tests { "); } + #[test] + fn test_build_statistics_struct_column() { + let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); + let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); + let nested_struct_fields = + vec![flag_field.as_ref().clone(), label_field.as_ref().clone()]; + let nested_field = Arc::new(Field::new( + "nested", + DataType::Struct(nested_struct_fields.clone().into()), + true, + )); + let id_field = Arc::new(Field::new("id", DataType::Int32, true)); + let struct_type = DataType::Struct( + vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(), + ); + + let nested_min = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef, + ), + ( + label_field.clone(), + Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef, + ), + ])); + + let 
nested_max = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef, + ), + ( + label_field.clone(), + Arc::new(StringArray::from(vec![Some("omega"), Some("middle")])) + as ArrayRef, + ), + ])); + + let struct_min = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, + ), + (nested_field.clone(), nested_min.clone() as ArrayRef), + ])); + + let struct_max = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef, + ), + (nested_field.clone(), nested_max.clone() as ArrayRef), + ])); + + let container_stats = ContainerStats::new() + .with_min(struct_min.clone() as ArrayRef) + .with_max(struct_max.clone() as ArrayRef); + + let statistics = TestStatistics::new().with("struct_col", container_stats); + + let required_columns = RequiredColumns::from(vec![ + ( + phys_expr::Column::new("struct_col", 0), + StatisticsType::Min, + Field::new("struct_col_min", struct_type.clone(), true), + ), + ( + phys_expr::Column::new("struct_col", 0), + StatisticsType::Max, + Field::new("struct_col_max", struct_type.clone(), true), + ), + ]); + + let batch = + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 2); + + let min_column = batch.column(0).clone(); + let max_column = batch.column(1).clone(); + + assert_eq!(min_column.data_type(), &struct_type); + assert_eq!(max_column.data_type(), &struct_type); + let min_struct = min_column + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); + let max_struct = max_column + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); + + let min_nested = min_struct.column(1).clone(); + let max_nested = max_struct.column(1).clone(); + + assert_eq!(min_nested.data_type(), nested_field.data_type()); + assert_eq!(max_nested.data_type(), 
nested_field.data_type()); + + let expected_min_array: ArrayRef = struct_min.clone(); + let expected_max_array: ArrayRef = struct_max.clone(); + let expected_nested_min_array: ArrayRef = nested_min.clone(); + let expected_nested_max_array: ArrayRef = nested_max.clone(); + + for row in 0..batch.num_rows() { + let actual_min = ScalarValue::try_from_array(&min_column, row).unwrap(); + let expected_min = + ScalarValue::try_from_array(&expected_min_array, row).unwrap(); + assert_eq!(actual_min, expected_min); + + let actual_max = ScalarValue::try_from_array(&max_column, row).unwrap(); + let expected_max = + ScalarValue::try_from_array(&expected_max_array, row).unwrap(); + assert_eq!(actual_max, expected_max); + + let actual_nested_min = + ScalarValue::try_from_array(&min_nested, row).unwrap(); + let expected_nested_min = + ScalarValue::try_from_array(&expected_nested_min_array, row).unwrap(); + assert_eq!(actual_nested_min, expected_nested_min); + + let actual_nested_max = + ScalarValue::try_from_array(&max_nested, row).unwrap(); + let expected_nested_max = + ScalarValue::try_from_array(&expected_nested_max_array, row).unwrap(); + assert_eq!(actual_nested_max, expected_nested_max); + } + } + #[test] fn test_build_statistics_no_required_stats() { let required_columns = RequiredColumns::new(); From 0fbaeae596be842107e54aec871a3c4b6383bf01 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 18:28:15 +0800 Subject: [PATCH 04/33] test: add unit tests for handling struct statistics errors in build_statistics_record_batch --- datafusion/pruning/src/pruning_predicate.rs | 105 ++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index f123baea0f11..a5b4843fca4d 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2709,6 +2709,111 @@ mod tests { ); } + #[test] + fn 
test_build_statistics_struct_incompatible_layout_error() { + // Request struct statistics but provide an incompatible nested layout + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("struct_col", 3), + StatisticsType::Min, + Field::new( + "struct_col_min", + DataType::Struct( + vec![Field::new( + "nested", + DataType::Struct( + vec![Field::new("value", DataType::Int32, true)].into(), + ), + true, + )] + .into(), + ), + true, + ), + )]); + + let statistics = OneContainerStats { + min_values: Some(Arc::new(StructArray::from(vec![( + Arc::new(Field::new("nested", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef, + )]))), + max_values: None, + num_containers: 1, + }; + + let result = + build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); + assert!( + result.to_string().contains( + "Cannot cast struct field 'nested' from type Int32 to type Struct" + ), + "{}", + result + ); + } + + #[test] + fn test_build_statistics_struct_non_struct_source_error() { + // Request struct statistics but provide a non-struct statistics array + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("struct_col", 3), + StatisticsType::Min, + Field::new( + "struct_col_min", + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), + true, + ), + )]); + + let statistics = OneContainerStats { + min_values: Some(Arc::new(Int32Array::from(vec![Some(1)]))), + max_values: None, + num_containers: 1, + }; + + let result = + build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); + assert!( + result + .to_string() + .contains("Cannot cast column of type Int32 to struct type"), + "{}", + result + ); + } + + #[test] + fn test_build_statistics_struct_inconsistent_length() { + // Ensure mismatched statistics lengths for struct arrays still error out + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("struct_col", 3), + 
StatisticsType::Min, + Field::new( + "struct_col_min", + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), + true, + ), + )]); + + let statistics = OneContainerStats { + min_values: Some(Arc::new(StructArray::from(vec![( + Arc::new(Field::new("value", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(10)])) as ArrayRef, + )]))), + max_values: None, + num_containers: 2, + }; + + let result = + build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); + assert!( + result + .to_string() + .contains("mismatched statistics length. Expected 2, got 1"), + "{}", + result + ); + } + #[test] fn row_group_predicate_eq() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); From e6ea9e1e9f27357ad158a9a1a5f988041fbf516f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 18:52:55 +0800 Subject: [PATCH 05/33] fix: update casting options to use safe semantics for binary statistics --- datafusion/pruning/src/pruning_predicate.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index a5b4843fca4d..811d8fdb15f5 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, + compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; @@ -929,14 +930,14 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = - if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { - let mut cast_options = DEFAULT_CAST_OPTIONS; - cast_options.safe = true; - arrow::compute::cast_with_options(&array, data_type, &cast_options)? 
- } else { - cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? - }; + // Cast using "safe" semantics so invalid binary statistics for string fields + // produce `NULL` values instead of errors. + let cast_options = CastOptions { + safe: true, + ..DEFAULT_CAST_OPTIONS + }; + + let array = cast_column(&array, stat_field, &cast_options)?; arrays.push(array); } From fe257006c83e55bb1c16c9509c80614c56efd8ba Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 18:59:23 +0800 Subject: [PATCH 06/33] fix: use safe casting for binary statistics to prevent invalid UTF-8 errors --- datafusion/pruning/src/pruning_predicate.rs | 23 +++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 811d8fdb15f5..6b87d9d71a33 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -25,7 +25,6 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, - compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; @@ -930,14 +929,20 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - // Cast using "safe" semantics so invalid binary statistics for string fields - // produce `NULL` values instead of errors. - let cast_options = CastOptions { - safe: true, - ..DEFAULT_CAST_OPTIONS - }; - - let array = cast_column(&array, stat_field, &cast_options)?; + let array = + if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { + // Statistics coming from Parquet can store string columns in binary form and + // the bytes are not guaranteed to be valid UTF-8. 
`cast_column` always uses + // `DEFAULT_CAST_OPTIONS` (with `safe = false`) which would forward those raw + // bytes into a Utf8Array. We instead call Arrow's cast kernel directly with + // `safe = true` so Arrow validates each value and converts invalid sequences + // into nulls (see https://github.com/apache/arrow-rs/issues/3691). + let mut cast_options = DEFAULT_CAST_OPTIONS; + cast_options.safe = true; + arrow::compute::cast_with_options(&array, data_type, &cast_options)? + } else { + cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? + }; arrays.push(array); } From 7cac892935c1e62fa3e6ff641358493c32a42ad1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 19:12:35 +0800 Subject: [PATCH 07/33] feat: add support for nested struct column statistics in pruning logic --- datafusion/pruning/src/pruning_predicate.rs | 268 ++++++++++++-------- 1 file changed, 163 insertions(+), 105 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 6b87d9d71a33..2c19c0ac70fc 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2275,6 +2275,138 @@ mod tests { } } + struct StructColumnStats { + struct_type: DataType, + nested_field: Arc, + struct_min: Arc, + struct_max: Arc, + nested_min: Arc, + nested_max: Arc, + statistics: TestStatistics, + } + + /// Builds statistics for a nested struct column used in the struct column + /// tests. + /// + /// The produced layout mirrors the `struct_col` column in the tests: + /// + /// ```text + /// struct_col: Struct< + /// id: Int32, + /// nested: Struct< + /// flag: Boolean, + /// label: Utf8, + /// >, + /// > + /// ``` + /// + /// The returned [`StructColumnStats`] provides the min/max arrays for both + /// the outer struct and its nested struct values as well as the + /// [`TestStatistics`] wrapper that exposes them to the pruning code. 
+ fn make_struct_stats() -> StructColumnStats { + let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); + let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); + let nested_struct_fields = + vec![flag_field.as_ref().clone(), label_field.as_ref().clone()]; + let nested_field = Arc::new(Field::new( + "nested", + DataType::Struct(nested_struct_fields.clone().into()), + true, + )); + let id_field = Arc::new(Field::new("id", DataType::Int32, true)); + let struct_type = DataType::Struct( + vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(), + ); + + let nested_min = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef, + ), + ( + label_field.clone(), + Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef, + ), + ])); + + let nested_max = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef, + ), + ( + label_field.clone(), + Arc::new(StringArray::from(vec![Some("omega"), Some("middle")])) + as ArrayRef, + ), + ])); + + let struct_min = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, + ), + (nested_field.clone(), nested_min.clone() as ArrayRef), + ])); + + let struct_max = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef, + ), + (nested_field.clone(), nested_max.clone() as ArrayRef), + ])); + + let container_stats = ContainerStats::new() + .with_min(struct_min.clone() as ArrayRef) + .with_max(struct_max.clone() as ArrayRef); + + let statistics = TestStatistics::new().with("struct_col", container_stats); + + StructColumnStats { + struct_type, + nested_field, + struct_min, + struct_max, + nested_min, + nested_max, + statistics, + } + } + + /// Creates [`RequiredColumns`] entries for the `struct_col` 
statistics using + /// the provided struct layout and statistics kinds. + /// + /// When paired with [`make_struct_stats`], `struct_type` describes the + /// nested layout `Struct>`. + fn make_struct_required_columns( + struct_type: &DataType, + column_index: usize, + stat_types: &[StatisticsType], + ) -> RequiredColumns { + let column = phys_expr::Column::new("struct_col", column_index); + let required: Vec<_> = stat_types + .iter() + .map(|stat_type| { + let suffix = match stat_type { + StatisticsType::Min => "min", + StatisticsType::Max => "max", + StatisticsType::NullCount => "null_count", + StatisticsType::RowCount => "row_count", + }; + let field_name = format!("struct_col_{suffix}"); + ( + column.clone(), + *stat_type, + Field::new(&field_name, struct_type.clone(), true), + ) + }) + .collect(); + + RequiredColumns::from(required) + } + /// Row count should only be referenced once in the pruning expression, even if we need the row count /// for multiple columns. #[test] @@ -2514,78 +2646,22 @@ mod tests { #[test] fn test_build_statistics_struct_column() { - let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); - let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); - let nested_struct_fields = - vec![flag_field.as_ref().clone(), label_field.as_ref().clone()]; - let nested_field = Arc::new(Field::new( - "nested", - DataType::Struct(nested_struct_fields.clone().into()), - true, - )); - let id_field = Arc::new(Field::new("id", DataType::Int32, true)); - let struct_type = DataType::Struct( - vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(), + let StructColumnStats { + struct_type, + nested_field, + struct_min, + struct_max, + nested_min, + nested_max, + statistics, + } = make_struct_stats(); + + let required_columns = make_struct_required_columns( + &struct_type, + 0, + &[StatisticsType::Min, StatisticsType::Max], ); - let nested_min = Arc::new(StructArray::from(vec![ - ( - flag_field.clone(), - 
Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef, - ), - ( - label_field.clone(), - Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef, - ), - ])); - - let nested_max = Arc::new(StructArray::from(vec![ - ( - flag_field.clone(), - Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef, - ), - ( - label_field.clone(), - Arc::new(StringArray::from(vec![Some("omega"), Some("middle")])) - as ArrayRef, - ), - ])); - - let struct_min = Arc::new(StructArray::from(vec![ - ( - id_field.clone(), - Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, - ), - (nested_field.clone(), nested_min.clone() as ArrayRef), - ])); - - let struct_max = Arc::new(StructArray::from(vec![ - ( - id_field.clone(), - Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef, - ), - (nested_field.clone(), nested_max.clone() as ArrayRef), - ])); - - let container_stats = ContainerStats::new() - .with_min(struct_min.clone() as ArrayRef) - .with_max(struct_max.clone() as ArrayRef); - - let statistics = TestStatistics::new().with("struct_col", container_stats); - - let required_columns = RequiredColumns::from(vec![ - ( - phys_expr::Column::new("struct_col", 0), - StatisticsType::Min, - Field::new("struct_col_min", struct_type.clone(), true), - ), - ( - phys_expr::Column::new("struct_col", 0), - StatisticsType::Max, - Field::new("struct_col_max", struct_type.clone(), true), - ), - ]); - let batch = build_statistics_record_batch(&statistics, &required_columns).unwrap(); @@ -2718,24 +2794,16 @@ mod tests { #[test] fn test_build_statistics_struct_incompatible_layout_error() { // Request struct statistics but provide an incompatible nested layout - let required_columns = RequiredColumns::from(vec![( - phys_expr::Column::new("struct_col", 3), - StatisticsType::Min, - Field::new( - "struct_col_min", - DataType::Struct( - vec![Field::new( - "nested", - DataType::Struct( - vec![Field::new("value", DataType::Int32, true)].into(), - ), - 
true, - )] - .into(), - ), + let struct_type = DataType::Struct( + vec![Field::new( + "nested", + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), true, - ), - )]); + )] + .into(), + ); + let required_columns = + make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); let statistics = OneContainerStats { min_values: Some(Arc::new(StructArray::from(vec![( @@ -2760,15 +2828,10 @@ mod tests { #[test] fn test_build_statistics_struct_non_struct_source_error() { // Request struct statistics but provide a non-struct statistics array - let required_columns = RequiredColumns::from(vec![( - phys_expr::Column::new("struct_col", 3), - StatisticsType::Min, - Field::new( - "struct_col_min", - DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), - true, - ), - )]); + let struct_type = + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()); + let required_columns = + make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); let statistics = OneContainerStats { min_values: Some(Arc::new(Int32Array::from(vec![Some(1)]))), @@ -2790,15 +2853,10 @@ mod tests { #[test] fn test_build_statistics_struct_inconsistent_length() { // Ensure mismatched statistics lengths for struct arrays still error out - let required_columns = RequiredColumns::from(vec![( - phys_expr::Column::new("struct_col", 3), - StatisticsType::Min, - Field::new( - "struct_col_min", - DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), - true, - ), - )]); + let struct_type = + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()); + let required_columns = + make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); let statistics = OneContainerStats { min_values: Some(Arc::new(StructArray::from(vec![( From af03adae7409f062092baa96408849bd0046e1b8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 19:36:16 +0800 Subject: [PATCH 08/33] fix: use safe casting 
options for binary statistics to handle invalid UTF-8 sequences --- datafusion/pruning/src/pruning_predicate.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 2c19c0ac70fc..df1163b61a07 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, + compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; @@ -929,17 +930,14 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = - if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { - // Statistics coming from Parquet can store string columns in binary form and - // the bytes are not guaranteed to be valid UTF-8. `cast_column` always uses - // `DEFAULT_CAST_OPTIONS` (with `safe = false`) which would forward those raw - // bytes into a Utf8Array. We instead call Arrow's cast kernel directly with - // `safe = true` so Arrow validates each value and converts invalid sequences - // into nulls (see https://github.com/apache/arrow-rs/issues/3691). - let mut cast_options = DEFAULT_CAST_OPTIONS; - cast_options.safe = true; - arrow::compute::cast_with_options(&array, data_type, &cast_options)? + let array = if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { + // Statistics from Parquet may store string columns as binary bytes that are not + // guaranteed valid UTF-8. Use safe cast options so invalid sequences become nulls. + let cast_options = CastOptions { + safe: true, + ..DEFAULT_CAST_OPTIONS + }; + cast_column(&array, stat_field, &cast_options)? 
} else { cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? }; From 835e3627d79a4e77e379a42343fa33c8cd506885 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 19:41:42 +0800 Subject: [PATCH 09/33] fix: adjust indentation for clarity in statistics record batch casting logic --- datafusion/pruning/src/pruning_predicate.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index df1163b61a07..95daf8e7bc75 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -930,7 +930,8 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { + let array = + if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { // Statistics from Parquet may store string columns as binary bytes that are not // guaranteed valid UTF-8. Use safe cast options so invalid sequences become nulls. let cast_options = CastOptions { From 06cbcc8c2610287a560520dfa06b0a937ee5cd59 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 19:45:00 +0800 Subject: [PATCH 10/33] fix: enhance UTF-8 validation for binary statistics in pruning logic --- datafusion/pruning/src/pruning_predicate.rs | 78 ++++++++++++++------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 95daf8e7bc75..83de193b67c1 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -20,11 +20,12 @@ //! //! 
[`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html use std::collections::HashSet; +use std::str; use std::sync::Arc; use arrow::array::AsArray; use arrow::{ - array::{new_null_array, ArrayRef, BooleanArray}, + array::{new_null_array, ArrayRef, BinaryArray, BooleanArray}, compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, @@ -39,7 +40,7 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ - cast_column, internal_err, plan_datafusion_err, plan_err, + cast_column, internal_datafusion_err, internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, Column, DFSchema, ScalarValue, }; @@ -908,6 +909,7 @@ fn build_statistics_record_batch( // For each needed statistics column: for (column, statistics_type, stat_field) in required_columns.iter() { let column = Column::from_name(column.name()); + let column_name = column.name().to_string(); let data_type = stat_field.data_type(); let num_containers = statistics.num_containers(); @@ -930,18 +932,48 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = - if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { - // Statistics from Parquet may store string columns as binary bytes that are not - // guaranteed valid UTF-8. Use safe cast options so invalid sequences become nulls. - let cast_options = CastOptions { - safe: true, - ..DEFAULT_CAST_OPTIONS - }; - cast_column(&array, stat_field, &cast_options)? - } else { - cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? 
+ let array = if data_type == &DataType::Utf8 + && array.data_type() == &DataType::Binary + { + // Statistics from Parquet may store string columns as binary bytes that are not + // guaranteed valid UTF-8. First validate that all bytes are valid UTF-8. + let binary_array = + array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + internal_datafusion_err!( + "Statistics column '{}' expected BinaryArray but found {:?}", + column_name.clone(), + array.data_type() + ) + })?; + + // Check for invalid UTF-8 sequences and return a clear error + if let Some((row, err)) = + binary_array.iter().enumerate().find_map(|(row, value)| { + value.and_then(|bytes| { + str::from_utf8(bytes).err().map(|err| (row, err)) + }) + }) + { + return plan_err!( + "Statistics for column '{}' contains invalid UTF-8 data at row {}: {}", + column_name, + row, + err + ); + } + + // Use cast_column with safe casting to handle edge cases consistently + let cast_options = CastOptions { + safe: true, + ..DEFAULT_CAST_OPTIONS }; + cast_column(&array, stat_field, &cast_options)? + } else { + cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? 
+ }; arrays.push(array); } @@ -2735,32 +2767,24 @@ mod tests { } #[test] - fn test_build_statistics_inconsistent_types() { - // Test requesting a Utf8 column when the stats return some other type - - // Request a record batch with of s1_min as a timestamp + fn test_build_statistics_invalid_utf8_error() { + // Request a record batch for a Utf8 column but provide invalid binary statistics let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, Field::new("s1_min", DataType::Utf8, true), )]); - // Note the statistics return an invalid UTF-8 sequence which will be converted to null + // Binary statistics contain an invalid UTF-8 sequence let statistics = OneContainerStats { min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))), max_values: None, num_containers: 1, }; - let batch = - build_statistics_record_batch(&statistics, &required_columns).unwrap(); - assert_snapshot!(batches_to_string(&[batch]), @r" - +--------+ - | s1_min | - +--------+ - | | - +--------+ - "); + let result = + build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); + assert!(result.to_string().contains("invalid UTF-8"), "{}", result); } #[test] From 7102a14375d9fd5a8e89914b830dd2c54594957b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 19:51:12 +0800 Subject: [PATCH 11/33] refactor: simplify struct column statistics handling and improve required columns creation --- datafusion/pruning/src/pruning_predicate.rs | 332 +++++++++----------- 1 file changed, 151 insertions(+), 181 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 83de193b67c1..2aa2c4b44fe6 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2306,136 +2306,29 @@ mod tests { } } - struct StructColumnStats { - struct_type: DataType, - nested_field: Arc, - struct_min: Arc, - struct_max: Arc, - nested_min: 
Arc, - nested_max: Arc, - statistics: TestStatistics, - } - - /// Builds statistics for a nested struct column used in the struct column - /// tests. - /// - /// The produced layout mirrors the `struct_col` column in the tests: - /// - /// ```text - /// struct_col: Struct< - /// id: Int32, - /// nested: Struct< - /// flag: Boolean, - /// label: Utf8, - /// >, - /// > - /// ``` - /// - /// The returned [`StructColumnStats`] provides the min/max arrays for both - /// the outer struct and its nested struct values as well as the - /// [`TestStatistics`] wrapper that exposes them to the pruning code. - fn make_struct_stats() -> StructColumnStats { - let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); - let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); - let nested_struct_fields = - vec![flag_field.as_ref().clone(), label_field.as_ref().clone()]; - let nested_field = Arc::new(Field::new( - "nested", - DataType::Struct(nested_struct_fields.clone().into()), - true, - )); - let id_field = Arc::new(Field::new("id", DataType::Int32, true)); - let struct_type = DataType::Struct( - vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(), + fn make_struct_required_cols( + column_name: &str, + column_index: usize, + fields: Vec<(StatisticsType, Field)>, + min_values: Option, + max_values: Option, + num_containers: usize, + ) -> (RequiredColumns, OneContainerStats) { + let column = phys_expr::Column::new(column_name, column_index); + let required_columns = RequiredColumns::from( + fields + .into_iter() + .map(|(statistics_type, field)| (column.clone(), statistics_type, field)) + .collect::>(), ); - let nested_min = Arc::new(StructArray::from(vec![ - ( - flag_field.clone(), - Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef, - ), - ( - label_field.clone(), - Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef, - ), - ])); - - let nested_max = Arc::new(StructArray::from(vec![ - ( - 
flag_field.clone(), - Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef, - ), - ( - label_field.clone(), - Arc::new(StringArray::from(vec![Some("omega"), Some("middle")])) - as ArrayRef, - ), - ])); - - let struct_min = Arc::new(StructArray::from(vec![ - ( - id_field.clone(), - Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, - ), - (nested_field.clone(), nested_min.clone() as ArrayRef), - ])); - - let struct_max = Arc::new(StructArray::from(vec![ - ( - id_field.clone(), - Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef, - ), - (nested_field.clone(), nested_max.clone() as ArrayRef), - ])); - - let container_stats = ContainerStats::new() - .with_min(struct_min.clone() as ArrayRef) - .with_max(struct_max.clone() as ArrayRef); - - let statistics = TestStatistics::new().with("struct_col", container_stats); - - StructColumnStats { - struct_type, - nested_field, - struct_min, - struct_max, - nested_min, - nested_max, - statistics, - } - } - - /// Creates [`RequiredColumns`] entries for the `struct_col` statistics using - /// the provided struct layout and statistics kinds. - /// - /// When paired with [`make_struct_stats`], `struct_type` describes the - /// nested layout `Struct>`. 
- fn make_struct_required_columns( - struct_type: &DataType, - column_index: usize, - stat_types: &[StatisticsType], - ) -> RequiredColumns { - let column = phys_expr::Column::new("struct_col", column_index); - let required: Vec<_> = stat_types - .iter() - .map(|stat_type| { - let suffix = match stat_type { - StatisticsType::Min => "min", - StatisticsType::Max => "max", - StatisticsType::NullCount => "null_count", - StatisticsType::RowCount => "row_count", - }; - let field_name = format!("struct_col_{suffix}"); - ( - column.clone(), - *stat_type, - Field::new(&field_name, struct_type.clone(), true), - ) - }) - .collect(); + let statistics = OneContainerStats { + min_values, + max_values, + num_containers, + }; - RequiredColumns::from(required) + (required_columns, statistics) } /// Row count should only be referenced once in the pruning expression, even if we need the row count @@ -2677,22 +2570,80 @@ mod tests { #[test] fn test_build_statistics_struct_column() { - let StructColumnStats { - struct_type, - nested_field, - struct_min, - struct_max, - nested_min, - nested_max, - statistics, - } = make_struct_stats(); - - let required_columns = make_struct_required_columns( - &struct_type, - 0, - &[StatisticsType::Min, StatisticsType::Max], + let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); + let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); + let nested_struct_fields = + vec![flag_field.as_ref().clone(), label_field.as_ref().clone()]; + let nested_field = Arc::new(Field::new( + "nested", + DataType::Struct(nested_struct_fields.clone().into()), + true, + )); + let id_field = Arc::new(Field::new("id", DataType::Int32, true)); + let struct_type = DataType::Struct( + vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(), ); + let nested_min = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef, + ), + ( + label_field.clone(), + 
Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef, + ), + ])); + + let nested_max = Arc::new(StructArray::from(vec![ + ( + flag_field.clone(), + Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef, + ), + ( + label_field.clone(), + Arc::new(StringArray::from(vec![Some("omega"), Some("middle")])) + as ArrayRef, + ), + ])); + + let struct_min = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, + ), + (nested_field.clone(), nested_min.clone() as ArrayRef), + ])); + + let struct_max = Arc::new(StructArray::from(vec![ + ( + id_field.clone(), + Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef, + ), + (nested_field.clone(), nested_max.clone() as ArrayRef), + ])); + + let expected_min_array: ArrayRef = struct_min.clone(); + let expected_max_array: ArrayRef = struct_max.clone(); + let num_containers = expected_min_array.len(); + + let (required_columns, statistics) = make_struct_required_cols( + "struct_col", + 0, + vec![ + ( + StatisticsType::Min, + Field::new("struct_col_min", struct_type.clone(), true), + ), + ( + StatisticsType::Max, + Field::new("struct_col_max", struct_type.clone(), true), + ), + ], + Some(expected_min_array.clone()), + Some(expected_max_array.clone()), + num_containers, + ); let batch = build_statistics_record_batch(&statistics, &required_columns).unwrap(); @@ -2721,8 +2672,6 @@ mod tests { assert_eq!(min_nested.data_type(), nested_field.data_type()); assert_eq!(max_nested.data_type(), nested_field.data_type()); - let expected_min_array: ArrayRef = struct_min.clone(); - let expected_max_array: ArrayRef = struct_max.clone(); let expected_nested_min_array: ArrayRef = nested_min.clone(); let expected_nested_max_array: ArrayRef = nested_max.clone(); @@ -2817,25 +2766,34 @@ mod tests { #[test] fn test_build_statistics_struct_incompatible_layout_error() { // Request struct statistics but provide an incompatible nested layout - let 
struct_type = DataType::Struct( - vec![Field::new( - "nested", - DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), - true, - )] - .into(), + let requested_field = Field::new( + "struct_col_min", + DataType::Struct( + vec![Field::new( + "nested", + DataType::Struct( + vec![Field::new("value", DataType::Int32, true)].into(), + ), + true, + )] + .into(), + ), + true, ); - let required_columns = - make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); - let statistics = OneContainerStats { - min_values: Some(Arc::new(StructArray::from(vec![( - Arc::new(Field::new("nested", DataType::Int32, true)), - Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef, - )]))), - max_values: None, - num_containers: 1, - }; + let min_values: ArrayRef = Arc::new(StructArray::from(vec![( + Arc::new(Field::new("nested", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef, + )])); + + let (required_columns, statistics) = make_struct_required_cols( + "struct_col", + 3, + vec![(StatisticsType::Min, requested_field)], + Some(min_values), + None, + 1, + ); let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); @@ -2851,16 +2809,22 @@ mod tests { #[test] fn test_build_statistics_struct_non_struct_source_error() { // Request struct statistics but provide a non-struct statistics array - let struct_type = - DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()); - let required_columns = - make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); + let requested_field = Field::new( + "struct_col_min", + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), + true, + ); - let statistics = OneContainerStats { - min_values: Some(Arc::new(Int32Array::from(vec![Some(1)]))), - max_values: None, - num_containers: 1, - }; + let min_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)])); + + let (required_columns, statistics) = 
make_struct_required_cols( + "struct_col", + 3, + vec![(StatisticsType::Min, requested_field)], + Some(min_values), + None, + 1, + ); let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); @@ -2876,19 +2840,25 @@ mod tests { #[test] fn test_build_statistics_struct_inconsistent_length() { // Ensure mismatched statistics lengths for struct arrays still error out - let struct_type = - DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()); - let required_columns = - make_struct_required_columns(&struct_type, 3, &[StatisticsType::Min]); + let requested_field = Field::new( + "struct_col_min", + DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), + true, + ); - let statistics = OneContainerStats { - min_values: Some(Arc::new(StructArray::from(vec![( - Arc::new(Field::new("value", DataType::Int32, true)), - Arc::new(Int32Array::from(vec![Some(10)])) as ArrayRef, - )]))), - max_values: None, - num_containers: 2, - }; + let min_values: ArrayRef = Arc::new(StructArray::from(vec![( + Arc::new(Field::new("value", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(10)])) as ArrayRef, + )])); + + let (required_columns, statistics) = make_struct_required_cols( + "struct_col", + 3, + vec![(StatisticsType::Min, requested_field)], + Some(min_values), + None, + 2, + ); let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); From 4c3e992377fd6368628cb8b23dedd93b55cc25d0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 21:41:17 +0800 Subject: [PATCH 12/33] fix: implement safe casting for invalid UTF-8 in statistics handling --- datafusion/pruning/src/pruning_predicate.rs | 68 +++++++-------------- 1 file changed, 21 insertions(+), 47 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 2aa2c4b44fe6..5a45d383b229 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ 
b/datafusion/pruning/src/pruning_predicate.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ - array::{new_null_array, ArrayRef, BinaryArray, BooleanArray}, + array::{new_null_array, ArrayRef, BooleanArray}, compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, @@ -40,7 +40,7 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ - cast_column, internal_datafusion_err, internal_err, plan_datafusion_err, plan_err, + cast_column, internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, Column, DFSchema, ScalarValue, }; @@ -909,7 +909,7 @@ fn build_statistics_record_batch( // For each needed statistics column: for (column, statistics_type, stat_field) in required_columns.iter() { let column = Column::from_name(column.name()); - let column_name = column.name().to_string(); + let _column_name = column.name().to_string(); let data_type = stat_field.data_type(); let num_containers = statistics.num_containers(); @@ -932,48 +932,16 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = if data_type == &DataType::Utf8 - && array.data_type() == &DataType::Binary - { - // Statistics from Parquet may store string columns as binary bytes that are not - // guaranteed valid UTF-8. First validate that all bytes are valid UTF-8. 
- let binary_array = - array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - internal_datafusion_err!( - "Statistics column '{}' expected BinaryArray but found {:?}", - column_name.clone(), - array.data_type() - ) - })?; - - // Check for invalid UTF-8 sequences and return a clear error - if let Some((row, err)) = - binary_array.iter().enumerate().find_map(|(row, value)| { - value.and_then(|bytes| { - str::from_utf8(bytes).err().map(|err| (row, err)) - }) - }) - { - return plan_err!( - "Statistics for column '{}' contains invalid UTF-8 data at row {}: {}", - column_name, - row, - err - ); - } - - // Use cast_column with safe casting to handle edge cases consistently - let cast_options = CastOptions { - safe: true, - ..DEFAULT_CAST_OPTIONS + let array = + if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { + let cast_opts = CastOptions { + safe: true, + ..DEFAULT_CAST_OPTIONS + }; + cast_column(&array, stat_field, &cast_opts)? + } else { + cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? }; - cast_column(&array, stat_field, &cast_options)? - } else { - cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? 
- }; arrays.push(array); } @@ -2716,7 +2684,7 @@ mod tests { } #[test] - fn test_build_statistics_invalid_utf8_error() { + fn test_build_statistics_invalid_utf8_safe_cast() { // Request a record batch for a Utf8 column but provide invalid binary statistics let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), @@ -2731,9 +2699,15 @@ mod tests { num_containers: 1, }; + // With safe casting, invalid UTF-8 should be converted to null rather than erroring let result = - build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); - assert!(result.to_string().contains("invalid UTF-8"), "{}", result); + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + // Verify we got a record batch with null value for the invalid UTF-8 data + assert_eq!(result.num_rows(), 1); + assert_eq!(result.num_columns(), 1); + let column = result.column(0); + assert!(column.is_null(0), "Invalid UTF-8 should be converted to null with safe casting"); } #[test] From ae87b25028445fae01d730d7914a708ced36d97b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 21:41:47 +0800 Subject: [PATCH 13/33] fix: ensure invalid UTF-8 is converted to null in statistics record batch --- datafusion/pruning/src/pruning_predicate.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 5a45d383b229..08df4d622ed3 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2702,12 +2702,15 @@ mod tests { // With safe casting, invalid UTF-8 should be converted to null rather than erroring let result = build_statistics_record_batch(&statistics, &required_columns).unwrap(); - + // Verify we got a record batch with null value for the invalid UTF-8 data assert_eq!(result.num_rows(), 1); assert_eq!(result.num_columns(), 1); let column = result.column(0); - 
assert!(column.is_null(0), "Invalid UTF-8 should be converted to null with safe casting"); + assert!( + column.is_null(0), + "Invalid UTF-8 should be converted to null with safe casting" + ); } #[test] From 904e44829c6009bd00e4b36ff72222c01836b97a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 21:44:33 +0800 Subject: [PATCH 14/33] fix: enhance casting logic for statistics handling of binary and large UTF-8 types --- datafusion/pruning/src/pruning_predicate.rs | 57 ++++++++++++++++----- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 08df4d622ed3..3c521685ce1b 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, - compute::CastOptions, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; @@ -932,16 +931,17 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. parquet // provides timestamp statistics as "Int64") - let array = - if data_type == &DataType::Utf8 && array.data_type() == &DataType::Binary { - let cast_opts = CastOptions { - safe: true, - ..DEFAULT_CAST_OPTIONS - }; - cast_column(&array, stat_field, &cast_opts)? - } else { - cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? - }; + let is_string = matches!(data_type, DataType::Utf8 | DataType::LargeUtf8); + let is_binary = + matches!(array.data_type(), DataType::Binary | DataType::LargeBinary); + + let array = if is_string && is_binary { + let mut cast_options = DEFAULT_CAST_OPTIONS; + cast_options.safe = true; + cast_column(&array, stat_field, &cast_options)? + } else { + cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? 
+ }; arrays.push(array); } @@ -1886,8 +1886,8 @@ mod tests { use arrow::array::{Array, Decimal128Array}; use arrow::{ array::{ - ArrayRef, BinaryArray, BooleanArray, Int32Array, Int64Array, StringArray, - StructArray, UInt64Array, + ArrayRef, BinaryArray, BooleanArray, Int32Array, Int64Array, + LargeBinaryArray, StringArray, StructArray, UInt64Array, }, datatypes::TimeUnit, }; @@ -2536,6 +2536,37 @@ mod tests { "); } + #[test] + fn test_build_statistics_large_binary_to_large_utf8() { + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("bin", 0), + StatisticsType::Min, + Field::new("bin_min", DataType::LargeUtf8, true), + )]); + + let statistics = TestStatistics::new().with( + "bin", + ContainerStats::new().with_min(Arc::new(LargeBinaryArray::from(vec![ + Some("alpha".as_bytes()), + None, + Some("omega".as_bytes()), + ])) as ArrayRef), + ); + + let batch = + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + assert_snapshot!(batches_to_string(&[batch]), @r" + +---------+ + | bin_min | + +---------+ + | alpha | + | | + | omega | + +---------+ + "); + } + #[test] fn test_build_statistics_struct_column() { let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); From edb83b3eebfb558e4d10a205369a6ecedabd92d3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 21:46:31 +0800 Subject: [PATCH 15/33] fix: implement safe casting for invalid UTF-8 bytes in statistics record batch --- datafusion/pruning/src/pruning_predicate.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 3c521685ce1b..8bb0c44770a4 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -936,6 +936,8 @@ fn build_statistics_record_batch( matches!(array.data_type(), DataType::Binary | DataType::LargeBinary); let array = if is_string && is_binary { + // Use safe casting so that 
any invalid UTF8 bytes are converted to NULL + // rather than producing unchecked string values. let mut cast_options = DEFAULT_CAST_OPTIONS; cast_options.safe = true; cast_column(&array, stat_field, &cast_options)? From d1d1c0b72e1f1346b2c7310dc6ce6afba68b03f3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 21:48:37 +0800 Subject: [PATCH 16/33] fix: replace assertions with assert_contains for better error message validation in tests --- datafusion/pruning/src/pruning_predicate.rs | 29 +++++++-------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 8bb0c44770a4..f91cda9fa055 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1881,7 +1881,7 @@ mod tests { use std::ops::{Not, Rem}; use super::*; - use datafusion_common::test_util::batches_to_string; + use datafusion_common::{assert_contains, test_util::batches_to_string}; use datafusion_expr::{and, col, lit, or}; use insta::assert_snapshot; @@ -2807,12 +2807,9 @@ mod tests { let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); - assert!( - result.to_string().contains( - "Cannot cast struct field 'nested' from type Int32 to type Struct" - ), - "{}", - result + assert_contains!( + result.to_string(), + "Cannot cast struct field 'nested' from type Int32 to type Struct" ); } @@ -2838,12 +2835,9 @@ mod tests { let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); - assert!( - result - .to_string() - .contains("Cannot cast column of type Int32 to struct type"), - "{}", - result + assert_contains!( + result.to_string(), + "Cannot cast column of type Int32 to struct type" ); } @@ -2872,12 +2866,9 @@ mod tests { let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); - assert!( - result - .to_string() - .contains("mismatched 
statistics length. Expected 2, got 1"), - "{}", - result + assert_contains!( + result.to_string(), + "mismatched statistics length. Expected 2, got 1" ); } From d9c0beff51c0abfb28b3c3696ecb59a3759fe2b9 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 22:45:19 +0800 Subject: [PATCH 17/33] fix: enhance statistics handling for binary and UTF-8 types, adding safe casting and sanitization for invalid bytes --- datafusion/pruning/src/pruning_predicate.rs | 97 ++++++++++++++++++--- 1 file changed, 83 insertions(+), 14 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index f91cda9fa055..4aa28dc1a9bc 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -23,7 +23,7 @@ use std::collections::HashSet; use std::str; use std::sync::Arc; -use arrow::array::AsArray; +use arrow::array::{Array, AsArray, BinaryViewBuilder}; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -919,7 +919,8 @@ fn build_statistics_record_batch( StatisticsType::NullCount => statistics.null_counts(&column), StatisticsType::RowCount => statistics.row_counts(&column), }; - let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); + let mut array = + array.unwrap_or_else(|| new_null_array(data_type, num_containers)); if num_containers != array.len() { return internal_err!( @@ -931,19 +932,27 @@ fn build_statistics_record_batch( // cast statistics array to required data type (e.g. 
parquet // provides timestamp statistics as "Int64") - let is_string = matches!(data_type, DataType::Utf8 | DataType::LargeUtf8); - let is_binary = - matches!(array.data_type(), DataType::Binary | DataType::LargeBinary); + let is_string = matches!( + data_type, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View + ); + let is_binary = matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView + ); - let array = if is_string && is_binary { + if is_string && is_binary { + array = sanitize_binary_array_for_utf8(array); + } + + let mut cast_options = DEFAULT_CAST_OPTIONS; + if is_string && is_binary { // Use safe casting so that any invalid UTF8 bytes are converted to NULL // rather than producing unchecked string values. - let mut cast_options = DEFAULT_CAST_OPTIONS; cast_options.safe = true; - cast_column(&array, stat_field, &cast_options)? - } else { - cast_column(&array, stat_field, &DEFAULT_CAST_OPTIONS)? - }; + } + + let array = cast_column(&array, stat_field, &cast_options)?; arrays.push(array); } @@ -960,6 +969,27 @@ fn build_statistics_record_batch( }) } +fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { + match array.data_type() { + DataType::BinaryView => { + let binary_view = array.as_binary_view(); + let mut builder = BinaryViewBuilder::with_capacity(binary_view.len()); + + for value in binary_view.iter() { + match value { + Some(bytes) if str::from_utf8(bytes).is_ok() => { + builder.append_value(bytes) + } + _ => builder.append_null(), + } + } + + Arc::new(builder.finish()) as ArrayRef + } + _ => array, + } +} + struct PruningExpressionBuilder<'a> { column: phys_expr::Column, column_expr: Arc, @@ -1888,7 +1918,7 @@ mod tests { use arrow::array::{Array, Decimal128Array}; use arrow::{ array::{ - ArrayRef, BinaryArray, BooleanArray, Int32Array, Int64Array, + ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Int32Array, Int64Array, LargeBinaryArray, StringArray, StructArray, UInt64Array, }, 
datatypes::TimeUnit, @@ -2539,7 +2569,7 @@ mod tests { } #[test] - fn test_build_statistics_large_binary_to_large_utf8() { + fn test_build_statistics_large_binary_to_large_utf8_invalid_bytes() { let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("bin", 0), StatisticsType::Min, @@ -2550,7 +2580,7 @@ mod tests { "bin", ContainerStats::new().with_min(Arc::new(LargeBinaryArray::from(vec![ Some("alpha".as_bytes()), - None, + Some(&[0xf0, 0x28, 0x8c, 0x28]), Some("omega".as_bytes()), ])) as ArrayRef), ); @@ -2558,6 +2588,10 @@ mod tests { let batch = build_statistics_record_batch(&statistics, &required_columns).unwrap(); + assert!(!batch.column(0).is_null(0)); + assert!(batch.column(0).is_null(1)); + assert!(!batch.column(0).is_null(2)); + assert_snapshot!(batches_to_string(&[batch]), @r" +---------+ | bin_min | @@ -2569,6 +2603,41 @@ mod tests { "); } + #[test] + fn test_build_statistics_binary_view_to_utf8_view_invalid_bytes() { + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("bin_view", 0), + StatisticsType::Min, + Field::new("bin_view_min", DataType::Utf8View, true), + )]); + + let statistics = TestStatistics::new().with( + "bin_view", + ContainerStats::new().with_min(Arc::new(BinaryViewArray::from(vec![ + Some("alpha".as_bytes()), + Some(&[0xf0, 0x28, 0x8c, 0x28]), + Some("omega".as_bytes()), + ])) as ArrayRef), + ); + + let batch = + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + assert!(!batch.column(0).is_null(0)); + assert!(batch.column(0).is_null(1)); + assert!(!batch.column(0).is_null(2)); + + assert_snapshot!(batches_to_string(&[batch]), @r" + +--------------+ + | bin_view_min | + +--------------+ + | alpha | + | | + | omega | + +--------------+ + "); + } + #[test] fn test_build_statistics_struct_column() { let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); From 0d365d5405f8409145ba835869f824fbdf6c13cf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 
16 Sep 2025 22:45:55 +0800 Subject: [PATCH 18/33] fix: replace assert with assert_contains for improved error message clarity in statistics tests --- datafusion/pruning/src/pruning_predicate.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 4aa28dc1a9bc..9b1b2c5f19a6 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2833,12 +2833,9 @@ mod tests { let result = build_statistics_record_batch(&statistics, &required_columns).unwrap_err(); - assert!( - result - .to_string() - .contains("mismatched statistics length. Expected 3, got 1"), - "{}", - result + assert_contains!( + result.to_string(), + "mismatched statistics length. Expected 3, got 1" ); } From d9def1f603c9b2fdbe5bb5289bd89f4c58f459cf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 22:47:42 +0800 Subject: [PATCH 19/33] fix: remove unused variable in build_statistics_record_batch function --- datafusion/pruning/src/pruning_predicate.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 9b1b2c5f19a6..1db787bfd50f 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -908,7 +908,6 @@ fn build_statistics_record_batch( // For each needed statistics column: for (column, statistics_type, stat_field) in required_columns.iter() { let column = Column::from_name(column.name()); - let _column_name = column.name().to_string(); let data_type = stat_field.data_type(); let num_containers = statistics.num_containers(); From f74b7b2a84d979a8984893b19d7651d88cc93103 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 22:48:08 +0800 Subject: [PATCH 20/33] fix: remove unused imports in pruning_predicate.rs for cleaner code --- 
datafusion/pruning/src/pruning_predicate.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 1db787bfd50f..a88ca6328b4e 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -20,7 +20,6 @@ //! //! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html use std::collections::HashSet; -use std::str; use std::sync::Arc; use arrow::array::{Array, AsArray, BinaryViewBuilder}; @@ -37,7 +36,6 @@ use log::{debug, trace}; use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; -use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ cast_column, internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, From f0877331123d5ba29e6b15419768a2a397f8caef Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 16 Sep 2025 22:50:09 +0800 Subject: [PATCH 21/33] fix: add TransformedResult to datafusion_common tree_node imports for enhanced functionality --- datafusion/pruning/src/pruning_predicate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index a88ca6328b4e..6816f29ff736 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -38,7 +38,7 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::{ cast_column, internal_err, plan_datafusion_err, plan_err, - tree_node::{Transformed, TreeNode}, + tree_node::{Transformed, TransformedResult, TreeNode}, Column, DFSchema, ScalarValue, }; use datafusion_expr_common::operator::Operator; From 8a36e518eb1492470cb602f7cb367256a330fbad Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:05:06 +0800 Subject: 
[PATCH 22/33] fix: remove TransformedResult import for cleaner code in pruning_predicate.rs --- datafusion/pruning/src/pruning_predicate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 6816f29ff736..a88ca6328b4e 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -38,7 +38,7 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::{ cast_column, internal_err, plan_datafusion_err, plan_err, - tree_node::{Transformed, TransformedResult, TreeNode}, + tree_node::{Transformed, TreeNode}, Column, DFSchema, ScalarValue, }; use datafusion_expr_common::operator::Operator; From f465e8735a36893d91e8e15468cc75e638c2b0b4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:08:31 +0800 Subject: [PATCH 23/33] fix: add TransformedResult import in pruning_predicate.rs for improved functionality --- datafusion/pruning/src/pruning_predicate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index a88ca6328b4e..6816f29ff736 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -38,7 +38,7 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::format::DEFAULT_CAST_OPTIONS; use datafusion_common::{ cast_column, internal_err, plan_datafusion_err, plan_err, - tree_node::{Transformed, TreeNode}, + tree_node::{Transformed, TransformedResult, TreeNode}, Column, DFSchema, ScalarValue, }; use datafusion_expr_common::operator::Operator; From d85c342b3f2afa77e21c599d8149738aadcf9545 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:08:53 +0800 Subject: [PATCH 24/33] feat: add sanitize_binary_array_for_utf8 function to 
preprocess BinaryViewArray statistics --- datafusion/pruning/src/pruning_predicate.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 6816f29ff736..bca299b1d95a 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -966,6 +966,9 @@ fn build_statistics_record_batch( }) } +/// Preprocesses `BinaryViewArray` statistics so that invalid UTF-8 sequences are +/// converted to nulls before the array is cast to UTF-8 strings. +/// Binary arrays with other storage representations are returned unchanged. fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { match array.data_type() { DataType::BinaryView => { From 27f8004a677cc924dea67eee7a7d6d707d5fc254 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:09:19 +0800 Subject: [PATCH 25/33] fix: update condition to sanitize binary arrays for UTF-8 compatibility --- datafusion/pruning/src/pruning_predicate.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index bca299b1d95a..44c6f32ac7a7 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -937,8 +937,9 @@ fn build_statistics_record_batch( array.data_type(), DataType::Binary | DataType::LargeBinary | DataType::BinaryView ); + let is_binary_view = matches!(array.data_type(), DataType::BinaryView); - if is_string && is_binary { + if is_string && is_binary_view { array = sanitize_binary_array_for_utf8(array); } From 5be80d5b34d13e6dd47918ac9ee48df6cb642686 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:18:51 +0800 Subject: [PATCH 26/33] test: add unit test for building statistics from BinaryViewArray to Utf8View --- datafusion/pruning/src/pruning_predicate.rs | 38 ++++++++++++++++++++- 1 file changed, 37 
insertions(+), 1 deletion(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 44c6f32ac7a7..8d02b4e2df13 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1920,7 +1920,7 @@ mod tests { use arrow::{ array::{ ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Int32Array, Int64Array, - LargeBinaryArray, StringArray, StructArray, UInt64Array, + LargeBinaryArray, StringArray, StringViewArray, StructArray, UInt64Array, }, datatypes::TimeUnit, }; @@ -2639,6 +2639,42 @@ mod tests { "); } + #[test] + fn test_build_statistics_binary_view_to_utf8_view_valid_bytes() { + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("bin_view", 0), + StatisticsType::Min, + Field::new("bin_view_min", DataType::Utf8View, true), + )]); + + let input_strings = vec!["alpha", "beta", "gamma"]; + let input_bytes = input_strings + .iter() + .map(|value| Some(value.as_bytes())) + .collect::<Vec<_>>(); + + let statistics = TestStatistics::new().with( + "bin_view", + ContainerStats::new().with_min( + Arc::new(BinaryViewArray::from(input_bytes.clone())) as ArrayRef, + ), + ); + + let batch = + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + let utf8_array = batch + .column(0) + .as_any() + .downcast_ref::<StringViewArray>() + .expect("StringViewArray"); + + for (idx, expected) in input_strings.iter().enumerate() { + assert!(!utf8_array.is_null(idx)); + assert_eq!(utf8_array.value(idx), *expected); + } + } + #[test] fn test_build_statistics_struct_column() { let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); From f3afa052e9fbfdc6dc9c047776c5701b8c7eca2d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 10:35:01 +0800 Subject: [PATCH 27/33] fix: enhance sanitize_binary_array_for_utf8 to check for invalid UTF-8 bytes in BinaryViewArray --- datafusion/pruning/src/pruning_predicate.rs | 22 ++++++++++++++++----- 1 file
changed, 17 insertions(+), 5 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 8d02b4e2df13..e587aadf1e7a 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -22,7 +22,7 @@ use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{Array, AsArray, BinaryViewBuilder}; +use arrow::array::{Array, AsArray, BinaryViewArray, BinaryViewBuilder}; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -973,7 +973,16 @@ fn build_statistics_record_batch( fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { match array.data_type() { DataType::BinaryView => { - let binary_view = array.as_binary_view(); + let binary_view: &BinaryViewArray = array.as_binary_view(); + + let has_invalid_bytes = binary_view.iter().any( + |value| matches!(value, Some(bytes) if str::from_utf8(bytes).is_err()), + ); + + if !has_invalid_bytes { + return array; + } + let mut builder = BinaryViewBuilder::with_capacity(binary_view.len()); for value in binary_view.iter() { @@ -2653,11 +2662,14 @@ mod tests { .map(|value| Some(value.as_bytes())) .collect::<Vec<_>>(); + let binary_view_array: ArrayRef = + Arc::new(BinaryViewArray::from(input_bytes)) as ArrayRef; + let sanitized = sanitize_binary_array_for_utf8(Arc::clone(&binary_view_array)); + assert!(Arc::ptr_eq(&binary_view_array, &sanitized)); + let statistics = TestStatistics::new().with( "bin_view", - ContainerStats::new().with_min( - Arc::new(BinaryViewArray::from(input_bytes.clone())) as ArrayRef, - ), + ContainerStats::new().with_min(binary_view_array), ); let batch = From d0c62275060ade4503202f1a162697fc06544a07 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 11:39:22 +0800 Subject: [PATCH 28/33] feat: enhance struct casting to support safe UTF-8 conversion from BinaryViewArray --- datafusion/common/src/nested_struct.rs | 71
+++++++++- datafusion/pruning/src/pruning_predicate.rs | 141 ++++++++++++++------ 2 files changed, 162 insertions(+), 50 deletions(-) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 6e8a380df921..a4a6c5bbd38a 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -17,11 +17,13 @@ use crate::error::{Result, _plan_err}; use arrow::{ - array::{new_null_array, Array, ArrayRef, StructArray}, + array::{ + new_null_array, Array, ArrayRef, AsArray as _, BinaryViewBuilder, StructArray, + }, compute::{cast_with_options, CastOptions}, - datatypes::{DataType::Struct, Field, FieldRef}, + datatypes::{DataType, Field, FieldRef}, }; -use std::sync::Arc; +use std::{str, sync::Arc}; /// Cast a struct column to match target struct fields, handling nested structs recursively. /// @@ -151,9 +153,30 @@ pub fn cast_column( cast_options: &CastOptions, ) -> Result { match target_field.data_type() { - Struct(target_fields) => { + DataType::Struct(target_fields) => { cast_struct_column(source_col, target_fields, cast_options) } + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + let mut options = cast_options.clone(); + let mut source: ArrayRef = Arc::clone(source_col); + + if matches!( + source_col.data_type(), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView + ) { + options.safe = true; + + if matches!(source_col.data_type(), DataType::BinaryView) { + source = sanitize_binary_array_for_utf8(source); + } + } + + Ok(cast_with_options( + &source, + target_field.data_type(), + &options, + )?) + } _ => Ok(cast_with_options( source_col, target_field.data_type(), @@ -162,6 +185,42 @@ pub fn cast_column( } } +/// Preprocesses `BinaryViewArray` statistics so that invalid UTF-8 sequences are +/// converted to nulls before casting to a UTF-8 string array. 
+/// +/// Other binary array representations are returned unchanged as Arrow's safe +/// casts already convert invalid UTF-8 sequences to null. +pub fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { + match array.data_type() { + DataType::BinaryView => { + let binary_view = array.as_binary_view(); + + // Check if all bytes are already valid UTF-8 + let has_invalid_bytes = binary_view.iter().any( + |value| matches!(value, Some(bytes) if str::from_utf8(bytes).is_err()), + ); + + if !has_invalid_bytes { + return array; + } + + let mut builder = BinaryViewBuilder::with_capacity(binary_view.len()); + + for value in binary_view.iter() { + match value { + Some(bytes) if str::from_utf8(bytes).is_ok() => { + builder.append_value(bytes) + } + _ => builder.append_null(), + } + } + + Arc::new(builder.finish()) as ArrayRef + } + _ => array, + } +} + /// Validates compatibility between source and target struct fields for casting operations. /// /// This function implements comprehensive struct compatibility checking by examining: @@ -220,7 +279,7 @@ pub fn validate_struct_compatibility( // Check if the matching field types are compatible match (source_field.data_type(), target_field.data_type()) { // Recursively validate nested structs - (Struct(source_nested), Struct(target_nested)) => { + (DataType::Struct(source_nested), DataType::Struct(target_nested)) => { validate_struct_compatibility(source_nested, target_nested)?; } // For non-struct types, use the existing castability check @@ -284,7 +343,7 @@ mod tests { } fn struct_type(fields: Vec<Field>) -> DataType { - Struct(fields.into()) + DataType::Struct(fields.into()) } fn struct_field(name: &str, fields: Vec<Field>) -> Field { diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index e587aadf1e7a..964b673296e2 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -22,7 +22,7 @@ use std::collections::HashSet; use
std::sync::Arc; -use arrow::array::{Array, AsArray, BinaryViewArray, BinaryViewBuilder}; +use arrow::array::{Array, AsArray}; use arrow::{ array::{new_null_array, ArrayRef, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -41,6 +41,9 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, Column, DFSchema, ScalarValue, }; + +#[cfg(test)] +use datafusion_common::nested_struct::sanitize_binary_array_for_utf8; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; @@ -916,8 +919,7 @@ fn build_statistics_record_batch( StatisticsType::NullCount => statistics.null_counts(&column), StatisticsType::RowCount => statistics.row_counts(&column), }; - let mut array = - array.unwrap_or_else(|| new_null_array(data_type, num_containers)); + let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); if num_containers != array.len() { return internal_err!( @@ -937,12 +939,6 @@ fn build_statistics_record_batch( array.data_type(), DataType::Binary | DataType::LargeBinary | DataType::BinaryView ); - let is_binary_view = matches!(array.data_type(), DataType::BinaryView); - - if is_string && is_binary_view { - array = sanitize_binary_array_for_utf8(array); - } - let mut cast_options = DEFAULT_CAST_OPTIONS; if is_string && is_binary { // Use safe casting so that any invalid UTF8 bytes are converted to NULL @@ -967,39 +963,6 @@ fn build_statistics_record_batch( }) } -/// Preprocesses `BinaryViewArray` statistics so that invalid UTF-8 sequences are -/// converted to nulls before the array is cast to UTF-8 strings. -/// Binary arrays with other storage representations are returned unchanged. 
-fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { - match array.data_type() { - DataType::BinaryView => { - let binary_view: &BinaryViewArray = array.as_binary_view(); - - let has_invalid_bytes = binary_view.iter().any( - |value| matches!(value, Some(bytes) if str::from_utf8(bytes).is_err()), - ); - - if !has_invalid_bytes { - return array; - } - - let mut builder = BinaryViewBuilder::with_capacity(binary_view.len()); - - for value in binary_view.iter() { - match value { - Some(bytes) if str::from_utf8(bytes).is_ok() => { - builder.append_value(bytes) - } - _ => builder.append_null(), - } - } - - Arc::new(builder.finish()) as ArrayRef - } - _ => array, - } -} - struct PruningExpressionBuilder<'a> { column: phys_expr::Column, column_expr: Arc, @@ -1928,8 +1891,9 @@ mod tests { use arrow::array::{Array, Decimal128Array}; use arrow::{ array::{ - ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Int32Array, Int64Array, - LargeBinaryArray, StringArray, StringViewArray, StructArray, UInt64Array, + ArrayRef, BinaryArray, BinaryViewArray, BinaryViewBuilder, BooleanArray, + Int32Array, Int64Array, LargeBinaryArray, StringArray, StringViewArray, + StructArray, UInt64Array, }, datatypes::TimeUnit, }; @@ -2864,6 +2828,95 @@ mod tests { ); } + #[test] + fn test_build_statistics_nested_invalid_utf8_safe_cast() { + fn run_case(binary_array: ArrayRef, binary_data_type: DataType, case_name: &str) { + let num_containers = binary_array.len(); + + let label_source_field = + Arc::new(Field::new("label", binary_data_type, true)); + let label_target_field = Arc::new(Field::new("label", DataType::Utf8, true)); + let nested_source_field = Arc::new(Field::new( + "nested", + DataType::Struct(vec![label_source_field.as_ref().clone()].into()), + true, + )); + let nested_target_field = Arc::new(Field::new( + "nested", + DataType::Struct(vec![label_target_field.as_ref().clone()].into()), + true, + )); + let target_struct_type = + 
DataType::Struct(vec![nested_target_field.as_ref().clone()].into()); + + let nested_values: ArrayRef = Arc::new(StructArray::from(vec![( + Arc::clone(&label_source_field), + Arc::clone(&binary_array), + )])); + let struct_min: ArrayRef = Arc::new(StructArray::from(vec![( + Arc::clone(&nested_source_field), + Arc::clone(&nested_values), + )])); + + let required_columns = RequiredColumns::from(vec![( + phys_expr::Column::new("struct_col", 0), + StatisticsType::Min, + Field::new("struct_col_min", target_struct_type.clone(), true), + )]); + + let statistics = OneContainerStats { + min_values: Some(struct_min), + max_values: None, + num_containers, + }; + + let batch = + build_statistics_record_batch(&statistics, &required_columns).unwrap(); + + assert_eq!(batch.num_columns(), 1, "{case_name}: expected one column"); + assert_eq!(batch.num_rows(), num_containers); + + let struct_array = batch + .column(0) + .as_any() + .downcast_ref::<StructArray>() + .expect("StructArray"); + assert_eq!(struct_array.data_type(), &target_struct_type); + + let nested_array = struct_array + .column(0) + .as_ref() + .as_any() + .downcast_ref::<StructArray>() + .expect("nested StructArray"); + let label_array = nested_array + .column(0) + .as_ref() + .as_any() + .downcast_ref::<StringArray>() + .expect("StringArray"); + + assert!( + label_array.is_null(0), + "{case_name}: invalid UTF-8 should cast to NULL", + ); + assert!(label_array.is_valid(1)); + assert_eq!(label_array.value(1), "valid"); + } + + let binary_array: ArrayRef = Arc::new(BinaryArray::from(vec![ + Some(&[0xFFu8][..]), + Some(b"valid".as_slice()), + ])); + run_case(binary_array, DataType::Binary, "BinaryArray"); + + let mut builder = BinaryViewBuilder::with_capacity(2); + builder.append_value(&[0xFF]); + builder.append_value(b"valid"); + let binary_view_array: ArrayRef = Arc::new(builder.finish()); + run_case(binary_view_array, DataType::BinaryView, "BinaryViewArray"); + } + #[test] fn test_build_statistics_inconsistent_length() { // return an inconsistent length to
the actual statistics arrays From 7c81a5195be5622193fe56e9e68cc73d22deea01 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 12:35:25 +0800 Subject: [PATCH 29/33] refactor: reorganize imports in pruning_predicate.rs for clarity --- datafusion/pruning/src/pruning_predicate.rs | 26 +++++++++------------ 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 964b673296e2..1b550cb0652b 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -19,31 +19,28 @@ //! based on statistics (e.g. Parquet Row Groups) //! //! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html -use std::collections::HashSet; -use std::sync::Arc; - -use arrow::array::{Array, AsArray}; use arrow::{ - array::{new_null_array, ArrayRef, BooleanArray}, + array::{new_null_array, Array, ArrayRef, AsArray, BooleanArray}, datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; +use std::{collections::HashSet, sync::Arc}; // pub use for backwards compatibility pub use datafusion_common::pruning::PruningStatistics; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_plan::metrics::Count; use log::{debug, trace}; -use datafusion_common::error::{DataFusionError, Result}; -use datafusion_common::format::DEFAULT_CAST_OPTIONS; +#[cfg(test)] +use datafusion_common::nested_struct::sanitize_binary_array_for_utf8; use datafusion_common::{ - cast_column, internal_err, plan_datafusion_err, plan_err, + cast_column, + error::{DataFusionError, Result}, + format::DEFAULT_CAST_OPTIONS, + internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TransformedResult, TreeNode}, Column, DFSchema, ScalarValue, }; - -#[cfg(test)] -use datafusion_common::nested_struct::sanitize_binary_array_for_utf8; use 
datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; @@ -1888,12 +1885,11 @@ mod tests { use datafusion_expr::{and, col, lit, or}; use insta::assert_snapshot; - use arrow::array::{Array, Decimal128Array}; use arrow::{ array::{ - ArrayRef, BinaryArray, BinaryViewArray, BinaryViewBuilder, BooleanArray, - Int32Array, Int64Array, LargeBinaryArray, StringArray, StringViewArray, - StructArray, UInt64Array, + Array, ArrayRef, BinaryArray, BinaryViewArray, BinaryViewBuilder, + BooleanArray, Decimal128Array, Int32Array, Int64Array, LargeBinaryArray, + StringArray, StringViewArray, StructArray, UInt64Array, }, datatypes::TimeUnit, }; From bf499f5f30dcea0167c7f6bdb1c7d3b042f1bc12 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 13:58:19 +0800 Subject: [PATCH 30/33] refactor: simplify array initialization in tests for clarity --- datafusion/pruning/src/pruning_predicate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 1b550cb0652b..44a54615d719 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2616,7 +2616,7 @@ mod tests { Field::new("bin_view_min", DataType::Utf8View, true), )]); - let input_strings = vec!["alpha", "beta", "gamma"]; + let input_strings = ["alpha", "beta", "gamma"]; let input_bytes = input_strings .iter() .map(|value| Some(value.as_bytes())) @@ -2907,7 +2907,7 @@ mod tests { run_case(binary_array, DataType::Binary, "BinaryArray"); let mut builder = BinaryViewBuilder::with_capacity(2); - builder.append_value(&[0xFF]); + builder.append_value([0xFF]); builder.append_value(b"valid"); let binary_view_array: ArrayRef = Arc::new(builder.finish()); run_case(binary_view_array, DataType::BinaryView, "BinaryViewArray"); From 
1751bcf91a45ebae8f2ac8f6eeae7746bee1f7e1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 14:19:24 +0800 Subject: [PATCH 31/33] refactor: improve documentation for sanitize_binary_array_for_utf8 to clarify behavior with invalid UTF-8 --- datafusion/common/src/nested_struct.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 2b0f04e39159..14b3c498d29d 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -185,11 +185,12 @@ pub fn cast_column( } } -/// Preprocesses `BinaryViewArray` statistics so that invalid UTF-8 sequences are -/// converted to nulls before casting to a UTF-8 string array. +/// Sanitizes a `BinaryView` array so that any element containing invalid UTF-8 +/// is converted to null before casting to a UTF-8 string array. /// -/// Other binary array representations are returned unchanged as Arrow's safe -/// casts already convert invalid UTF-8 sequences to null. +/// This only transforms the array's values (not any external statistics). Other +/// binary array representations are returned unchanged because Arrow's safe +/// casts already convert invalid UTF-8 sequences to null for those types. 
pub fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { match array.data_type() { DataType::BinaryView => { From 5cb940b02a8206f38ad48a1c924864c3e48f2244 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 18 Sep 2025 14:34:55 +0800 Subject: [PATCH 32/33] refactor: enhance test documentation for statistics casting and handling invalid UTF-8 --- datafusion/pruning/src/pruning_predicate.rs | 41 +++++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 44a54615d719..30d9811fdf78 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -2540,6 +2540,12 @@ mod tests { #[test] fn test_build_statistics_large_binary_to_large_utf8_invalid_bytes() { + // When casting binary/byte statistics to Utf8/large-utf8, invalid + // UTF-8 sequences should be converted to nulls instead of causing + // an error. This test supplies a LargeBinaryArray containing an + // invalid byte sequence and ensures the corresponding Utf8 + // output column contains a NULL for that entry while preserving + // valid strings. let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("bin", 0), StatisticsType::Min, @@ -2575,6 +2581,9 @@ mod tests { #[test] fn test_build_statistics_binary_view_to_utf8_view_invalid_bytes() { + // Similar to `test_build_statistics_large_binary_to_large_utf8_invalid_bytes`, + // but exercises the BinaryView -> Utf8View conversion path. Invalid + // UTF-8 in the source should produce NULLs in the Utf8View output. let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("bin_view", 0), StatisticsType::Min, @@ -2610,6 +2619,11 @@ mod tests { #[test] fn test_build_statistics_binary_view_to_utf8_view_valid_bytes() { + // If the underlying BinaryViewArray already contains valid UTF-8 + // bytes we should avoid unnecessary cloning or conversion. 
This test + // verifies that `sanitize_binary_array_for_utf8` returns the same + // array pointer when the bytes are already valid UTF-8 and that + // the resulting record batch contains the expected string values. let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("bin_view", 0), StatisticsType::Min, @@ -2649,6 +2663,10 @@ mod tests { #[test] fn test_build_statistics_struct_column() { + // Verify that struct-typed statistics are preserved and reconstructed + // correctly into StructArray columns in the record batch. The test + // constructs nested struct min/max arrays and ensures the produced + // columns keep the nested layout and values. let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true)); let label_field = Arc::new(Field::new("label", DataType::Utf8, true)); let nested_struct_fields = @@ -2796,7 +2814,10 @@ mod tests { #[test] fn test_build_statistics_invalid_utf8_safe_cast() { - // Request a record batch for a Utf8 column but provide invalid binary statistics + // When casting binary statistics to Utf8 with 'safe' semantics, invalid + // UTF-8 byte sequences should be converted to NULL values rather than + // causing an error. This test feeds an invalid single-byte sequence + // and asserts the produced Utf8 column contains a NULL. let required_columns = RequiredColumns::from(vec![( phys_expr::Column::new("s3", 3), StatisticsType::Min, @@ -2826,6 +2847,11 @@ mod tests { #[test] fn test_build_statistics_nested_invalid_utf8_safe_cast() { + // Ensure nested struct statistics that contain binary fields with + // invalid UTF-8 are cast safely to Utf8, converting invalid + // entries to NULL while preserving valid entries. This exercises + // nested struct casting and the safe-cast behavior for Binary and + // BinaryView arrays. 
fn run_case(binary_array: ArrayRef, binary_data_type: DataType, case_name: &str) { let num_containers = binary_array.len(); @@ -2939,7 +2965,10 @@ mod tests { #[test] fn test_build_statistics_struct_incompatible_layout_error() { - // Request struct statistics but provide an incompatible nested layout + // Request struct statistics but provide a source struct layout that + // doesn't match the requested nested layout. This should produce + // an explanatory cast error indicating the incompatible nested + // field types. let requested_field = Field::new( "struct_col_min", DataType::Struct( @@ -2979,7 +3008,9 @@ mod tests { #[test] fn test_build_statistics_struct_non_struct_source_error() { - // Request struct statistics but provide a non-struct statistics array + // Request struct statistics but provide a non-struct (primitive) + // statistics array as the underlying data. This should error out + // because a primitive array cannot be cast to a struct layout. let requested_field = Field::new( "struct_col_min", DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), @@ -3007,7 +3038,9 @@ mod tests { #[test] fn test_build_statistics_struct_inconsistent_length() { - // Ensure mismatched statistics lengths for struct arrays still error out + // Verify that struct statistics arrays with mismatched lengths (the + // `num_containers` value vs the actual struct array length) result + // in a clear error message indicating the mismatch. 
let requested_field = Field::new( "struct_col_min", DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()), From e1b53d2463e6d09ff99a7285c91a17680d920e38 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Sep 2025 18:39:21 +0800 Subject: [PATCH 33/33] refactor: add comments to clarify manual UTF-8 validation in sanitize_binary_array_for_utf8 --- datafusion/common/src/nested_struct.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 14b3c498d29d..08f47d7a75c4 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -197,6 +197,9 @@ pub fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef { let binary_view = array.as_binary_view(); // Check if all bytes are already valid UTF-8 + // Manual validation is required for BinaryView because Arrow's safe cast + // doesn't handle invalid UTF-8 sequences properly for this array type + // See: https://github.com/apache/arrow-rs/issues/8403 let has_invalid_bytes = binary_view.iter().any( |value| matches!(value, Some(bytes) if str::from_utf8(bytes).is_err()), );