diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs
index 38060e370bfa..08f47d7a75c4 100644
--- a/datafusion/common/src/nested_struct.rs
+++ b/datafusion/common/src/nested_struct.rs
@@ -17,11 +17,13 @@
 use crate::error::{Result, _plan_err};
 use arrow::{
-    array::{new_null_array, Array, ArrayRef, StructArray},
+    array::{
+        new_null_array, Array, ArrayRef, AsArray as _, BinaryViewBuilder, StructArray,
+    },
     compute::{cast_with_options, CastOptions},
-    datatypes::{DataType::Struct, Field, FieldRef},
+    datatypes::{DataType, Field, FieldRef},
 };
-use std::sync::Arc;
+use std::{str, sync::Arc};
 
 /// Cast a struct column to match target struct fields, handling nested structs recursively.
 ///
@@ -151,9 +153,30 @@ pub fn cast_column(
     cast_options: &CastOptions,
 ) -> Result<ArrayRef> {
     match target_field.data_type() {
-        Struct(target_fields) => {
+        DataType::Struct(target_fields) => {
             cast_struct_column(source_col, target_fields, cast_options)
         }
+        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
+            let mut options = cast_options.clone();
+            let mut source: ArrayRef = Arc::clone(source_col);
+
+            if matches!(
+                source_col.data_type(),
+                DataType::Binary | DataType::LargeBinary | DataType::BinaryView
+            ) {
+                options.safe = true;
+
+                if matches!(source_col.data_type(), DataType::BinaryView) {
+                    source = sanitize_binary_array_for_utf8(source);
+                }
+            }
+
+            Ok(cast_with_options(
+                &source,
+                target_field.data_type(),
+                &options,
+            )?)
+        }
         _ => Ok(cast_with_options(
             source_col,
             target_field.data_type(),
@@ -162,6 +185,46 @@ pub fn cast_column(
     }
 }
 
+/// Sanitizes a `BinaryView` array so that any element containing invalid UTF-8
+/// is converted to null before casting to a UTF-8 string array.
+///
+/// This only transforms the array's values (not any external statistics). Other
+/// binary array representations are returned unchanged because Arrow's safe
+/// casts already convert invalid UTF-8 sequences to null for those types.
+pub fn sanitize_binary_array_for_utf8(array: ArrayRef) -> ArrayRef {
+    match array.data_type() {
+        DataType::BinaryView => {
+            let binary_view = array.as_binary_view();
+
+            // Check if all bytes are already valid UTF-8.
+            // Manual validation is required for BinaryView because Arrow's safe cast
+            // doesn't handle invalid UTF-8 sequences properly for this array type.
+            // See: https://github.com/apache/arrow-rs/issues/8403
+            let has_invalid_bytes = binary_view.iter().any(
+                |value| matches!(value, Some(bytes) if str::from_utf8(bytes).is_err()),
+            );
+
+            if !has_invalid_bytes {
+                return array;
+            }
+
+            let mut builder = BinaryViewBuilder::with_capacity(binary_view.len());
+
+            for value in binary_view.iter() {
+                match value {
+                    Some(bytes) if str::from_utf8(bytes).is_ok() => {
+                        builder.append_value(bytes)
+                    }
+                    _ => builder.append_null(),
+                }
+            }
+
+            Arc::new(builder.finish()) as ArrayRef
+        }
+        _ => array,
+    }
+}
+
 /// Validates compatibility between source and target struct fields for casting operations.
 ///
 /// This function implements comprehensive struct compatibility checking by examining:
@@ -220,7 +283,7 @@ pub fn validate_struct_compatibility(
             // Check if the matching field types are compatible
             match (source_field.data_type(), target_field.data_type()) {
                 // Recursively validate nested structs
-                (Struct(source_nested), Struct(target_nested)) => {
+                (DataType::Struct(source_nested), DataType::Struct(target_nested)) => {
                     validate_struct_compatibility(source_nested, target_nested)?;
                 }
                 // For non-struct types, use the existing castability check
@@ -284,7 +347,7 @@ mod tests {
     }
 
     fn struct_type(fields: Vec<Field>) -> DataType {
-        Struct(fields.into())
+        DataType::Struct(fields.into())
     }
 
     fn struct_field(name: &str, fields: Vec<Field>) -> Field {
diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs
index 5e92dbe227fd..30d9811fdf78 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -19,29 +19,28 @@
 //! based on statistics (e.g. Parquet Row Groups)
 //!
 //! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
-use std::collections::HashSet;
-use std::sync::Arc;
-
-use arrow::array::AsArray;
 use arrow::{
-    array::{new_null_array, ArrayRef, BooleanArray},
+    array::{new_null_array, Array, ArrayRef, AsArray, BooleanArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::{RecordBatch, RecordBatchOptions},
 };
+use std::{collections::HashSet, sync::Arc};
 
 // pub use for backwards compatibility
 pub use datafusion_common::pruning::PruningStatistics;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_plan::metrics::Count;
 use log::{debug, trace};
-use datafusion_common::error::{DataFusionError, Result};
-use datafusion_common::tree_node::TransformedResult;
+#[cfg(test)]
+use datafusion_common::nested_struct::sanitize_binary_array_for_utf8;
 use datafusion_common::{
+    cast_column,
+    error::{DataFusionError, Result},
+    format::DEFAULT_CAST_OPTIONS,
     internal_err, plan_datafusion_err, plan_err,
-    tree_node::{Transformed, TreeNode},
-    ScalarValue,
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Column, DFSchema, ScalarValue,
 };
-use datafusion_common::{Column, DFSchema};
 use datafusion_expr_common::operator::Operator;
 use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
@@ -929,7 +928,22 @@ fn build_statistics_record_batch(
 
         // cast statistics array to required data type (e.g. parquet
        // provides timestamp statistics as "Int64")
-        let array = arrow::compute::cast(&array, data_type)?;
+        let is_string = matches!(
+            data_type,
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        );
+        let is_binary = matches!(
+            array.data_type(),
+            DataType::Binary | DataType::LargeBinary | DataType::BinaryView
+        );
+        let mut cast_options = DEFAULT_CAST_OPTIONS;
+        if is_string && is_binary {
+            // Use safe casting so that any invalid UTF-8 bytes are converted to NULL
+            // rather than producing unchecked string values.
+            cast_options.safe = true;
+        }
+
+        let array = cast_column(&array, stat_field, &cast_options)?;
         arrays.push(array);
     }
 
@@ -1867,13 +1881,16 @@ mod tests {
     use std::ops::{Not, Rem};
 
     use super::*;
-    use datafusion_common::test_util::batches_to_string;
+    use datafusion_common::{assert_contains, test_util::batches_to_string};
     use datafusion_expr::{and, col, lit, or};
     use insta::assert_snapshot;
 
-    use arrow::array::Decimal128Array;
     use arrow::{
-        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
+        array::{
+            Array, ArrayRef, BinaryArray, BinaryViewArray, BinaryViewBuilder,
+            BooleanArray, Decimal128Array, Int32Array, Int64Array, LargeBinaryArray,
+            StringArray, StringViewArray, StructArray, UInt64Array,
+        },
         datatypes::TimeUnit,
     };
     use datafusion_expr::expr::InList;
@@ -2259,6 +2276,31 @@ mod tests {
         }
     }
 
+    fn make_struct_required_cols(
+        column_name: &str,
+        column_index: usize,
+        fields: Vec<(StatisticsType, Field)>,
+        min_values: Option<ArrayRef>,
+        max_values: Option<ArrayRef>,
+        num_containers: usize,
+    ) -> (RequiredColumns, OneContainerStats) {
+        let column = phys_expr::Column::new(column_name, column_index);
+        let required_columns = RequiredColumns::from(
+            fields
+                .into_iter()
+                .map(|(statistics_type, field)| (column.clone(), statistics_type, field))
+                .collect::<Vec<_>>(),
+        );
+
+        let statistics = OneContainerStats {
+            min_values,
+            max_values,
+            num_containers,
+        };
+
+        (required_columns, statistics)
+    }
+
     /// Row count should only be referenced once in the pruning expression, even if we need the row count
     /// for multiple columns.
     #[test]
@@ -2496,6 +2538,265 @@ mod tests {
         ");
     }
 
+    #[test]
+    fn test_build_statistics_large_binary_to_large_utf8_invalid_bytes() {
+        // When casting binary statistics to Utf8/LargeUtf8, invalid
+        // UTF-8 sequences should be converted to nulls instead of causing
+        // an error. This test supplies a LargeBinaryArray containing an
+        // invalid byte sequence and ensures the corresponding LargeUtf8
+        // output column contains a NULL for that entry while preserving
+        // valid strings.
+        let required_columns = RequiredColumns::from(vec![(
+            phys_expr::Column::new("bin", 0),
+            StatisticsType::Min,
+            Field::new("bin_min", DataType::LargeUtf8, true),
+        )]);
+
+        let statistics = TestStatistics::new().with(
+            "bin",
+            ContainerStats::new().with_min(Arc::new(LargeBinaryArray::from(vec![
+                Some("alpha".as_bytes()),
+                Some(&[0xf0, 0x28, 0x8c, 0x28]),
+                Some("omega".as_bytes()),
+            ])) as ArrayRef),
+        );
+
+        let batch =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap();
+
+        assert!(!batch.column(0).is_null(0));
+        assert!(batch.column(0).is_null(1));
+        assert!(!batch.column(0).is_null(2));
+
+        assert_snapshot!(batches_to_string(&[batch]), @r"
+        +---------+
+        | bin_min |
+        +---------+
+        | alpha   |
+        |         |
+        | omega   |
+        +---------+
+        ");
+    }
+
+    #[test]
+    fn test_build_statistics_binary_view_to_utf8_view_invalid_bytes() {
+        // Similar to `test_build_statistics_large_binary_to_large_utf8_invalid_bytes`,
+        // but exercises the BinaryView -> Utf8View conversion path. Invalid
+        // UTF-8 in the source should produce NULLs in the Utf8View output.
+        let required_columns = RequiredColumns::from(vec![(
+            phys_expr::Column::new("bin_view", 0),
+            StatisticsType::Min,
+            Field::new("bin_view_min", DataType::Utf8View, true),
+        )]);
+
+        let statistics = TestStatistics::new().with(
+            "bin_view",
+            ContainerStats::new().with_min(Arc::new(BinaryViewArray::from(vec![
+                Some("alpha".as_bytes()),
+                Some(&[0xf0, 0x28, 0x8c, 0x28]),
+                Some("omega".as_bytes()),
+            ])) as ArrayRef),
+        );
+
+        let batch =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap();
+
+        assert!(!batch.column(0).is_null(0));
+        assert!(batch.column(0).is_null(1));
+        assert!(!batch.column(0).is_null(2));
+
+        assert_snapshot!(batches_to_string(&[batch]), @r"
+        +--------------+
+        | bin_view_min |
+        +--------------+
+        | alpha        |
+        |              |
+        | omega        |
+        +--------------+
+        ");
+    }
+
+    #[test]
+    fn test_build_statistics_binary_view_to_utf8_view_valid_bytes() {
+        // If the underlying BinaryViewArray already contains valid UTF-8
+        // bytes, we should avoid unnecessary cloning or conversion. This test
+        // verifies that `sanitize_binary_array_for_utf8` returns the same
+        // array pointer when the bytes are already valid UTF-8 and that
+        // the resulting record batch contains the expected string values.
+        let required_columns = RequiredColumns::from(vec![(
+            phys_expr::Column::new("bin_view", 0),
+            StatisticsType::Min,
+            Field::new("bin_view_min", DataType::Utf8View, true),
+        )]);
+
+        let input_strings = ["alpha", "beta", "gamma"];
+        let input_bytes = input_strings
+            .iter()
+            .map(|value| Some(value.as_bytes()))
+            .collect::<Vec<_>>();
+
+        let binary_view_array: ArrayRef =
+            Arc::new(BinaryViewArray::from(input_bytes)) as ArrayRef;
+        let sanitized = sanitize_binary_array_for_utf8(Arc::clone(&binary_view_array));
+        assert!(Arc::ptr_eq(&binary_view_array, &sanitized));
+
+        let statistics = TestStatistics::new().with(
+            "bin_view",
+            ContainerStats::new().with_min(binary_view_array),
+        );
+
+        let batch =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap();
+
+        let utf8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .expect("StringViewArray");
+
+        for (idx, expected) in input_strings.iter().enumerate() {
+            assert!(!utf8_array.is_null(idx));
+            assert_eq!(utf8_array.value(idx), *expected);
+        }
+    }
+
+    #[test]
+    fn test_build_statistics_struct_column() {
+        // Verify that struct-typed statistics are preserved and reconstructed
+        // correctly into StructArray columns in the record batch. The test
+        // constructs nested struct min/max arrays and ensures the produced
+        // columns keep the nested layout and values.
+        let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true));
+        let label_field = Arc::new(Field::new("label", DataType::Utf8, true));
+        let nested_struct_fields =
+            vec![flag_field.as_ref().clone(), label_field.as_ref().clone()];
+        let nested_field = Arc::new(Field::new(
+            "nested",
+            DataType::Struct(nested_struct_fields.clone().into()),
+            true,
+        ));
+        let id_field = Arc::new(Field::new("id", DataType::Int32, true));
+        let struct_type = DataType::Struct(
+            vec![id_field.as_ref().clone(), nested_field.as_ref().clone()].into(),
+        );
+
+        let nested_min = Arc::new(StructArray::from(vec![
+            (
+                flag_field.clone(),
+                Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as ArrayRef,
+            ),
+            (
+                label_field.clone(),
+                Arc::new(StringArray::from(vec![Some("alpha"), None])) as ArrayRef,
+            ),
+        ]));
+
+        let nested_max = Arc::new(StructArray::from(vec![
+            (
+                flag_field.clone(),
+                Arc::new(BooleanArray::from(vec![Some(true), Some(true)])) as ArrayRef,
+            ),
+            (
+                label_field.clone(),
+                Arc::new(StringArray::from(vec![Some("omega"), Some("middle")]))
+                    as ArrayRef,
+            ),
+        ]));
+
+        let struct_min = Arc::new(StructArray::from(vec![
+            (
+                id_field.clone(),
+                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef,
+            ),
+            (nested_field.clone(), nested_min.clone() as ArrayRef),
+        ]));
+
+        let struct_max = Arc::new(StructArray::from(vec![
+            (
+                id_field.clone(),
+                Arc::new(Int32Array::from(vec![Some(10), Some(20)])) as ArrayRef,
+            ),
+            (nested_field.clone(), nested_max.clone() as ArrayRef),
+        ]));
+
+        let expected_min_array: ArrayRef = struct_min.clone();
+        let expected_max_array: ArrayRef = struct_max.clone();
+        let num_containers = expected_min_array.len();
+
+        let (required_columns, statistics) = make_struct_required_cols(
+            "struct_col",
+            0,
+            vec![
+                (
+                    StatisticsType::Min,
+                    Field::new("struct_col_min", struct_type.clone(), true),
+                ),
+                (
+                    StatisticsType::Max,
+                    Field::new("struct_col_max", struct_type.clone(), true),
+                ),
+            ],
+            Some(expected_min_array.clone()),
+            Some(expected_max_array.clone()),
+            num_containers,
+        );
+        let batch =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap();
+
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 2);
+
+        let min_column = batch.column(0).clone();
+        let max_column = batch.column(1).clone();
+
+        assert_eq!(min_column.data_type(), &struct_type);
+        assert_eq!(max_column.data_type(), &struct_type);
+        let min_struct = min_column
+            .as_ref()
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let max_struct = max_column
+            .as_ref()
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+
+        let min_nested = min_struct.column(1).clone();
+        let max_nested = max_struct.column(1).clone();
+
+        assert_eq!(min_nested.data_type(), nested_field.data_type());
+        assert_eq!(max_nested.data_type(), nested_field.data_type());
+
+        let expected_nested_min_array: ArrayRef = nested_min.clone();
+        let expected_nested_max_array: ArrayRef = nested_max.clone();
+
+        for row in 0..batch.num_rows() {
+            let actual_min = ScalarValue::try_from_array(&min_column, row).unwrap();
+            let expected_min =
+                ScalarValue::try_from_array(&expected_min_array, row).unwrap();
+            assert_eq!(actual_min, expected_min);
+
+            let actual_max = ScalarValue::try_from_array(&max_column, row).unwrap();
+            let expected_max =
+                ScalarValue::try_from_array(&expected_max_array, row).unwrap();
+            assert_eq!(actual_max, expected_max);
+
+            let actual_nested_min =
+                ScalarValue::try_from_array(&min_nested, row).unwrap();
+            let expected_nested_min =
+                ScalarValue::try_from_array(&expected_nested_min_array, row).unwrap();
+            assert_eq!(actual_nested_min, expected_nested_min);
+
+            let actual_nested_max =
+                ScalarValue::try_from_array(&max_nested, row).unwrap();
+            let expected_nested_max =
+                ScalarValue::try_from_array(&expected_nested_max_array, row).unwrap();
+            assert_eq!(actual_nested_max, expected_nested_max);
+        }
+    }
+
     #[test]
     fn test_build_statistics_no_required_stats() {
         let required_columns = RequiredColumns::new();
@@ -2512,32 +2813,130 @@ mod tests {
     }
 
     #[test]
-    fn test_build_statistics_inconsistent_types() {
-        // Test requesting a Utf8 column when the stats return some other type
-
-        // Request a record batch with of s1_min as a timestamp
+    fn test_build_statistics_invalid_utf8_safe_cast() {
+        // When casting binary statistics to Utf8 with 'safe' semantics, invalid
+        // UTF-8 byte sequences should be converted to NULL values rather than
+        // causing an error. This test feeds an invalid single-byte sequence
+        // and asserts the produced Utf8 column contains a NULL.
         let required_columns = RequiredColumns::from(vec![(
             phys_expr::Column::new("s3", 3),
             StatisticsType::Min,
             Field::new("s1_min", DataType::Utf8, true),
         )]);
 
-        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
+        // Binary statistics contain an invalid UTF-8 sequence
        let statistics = OneContainerStats {
             min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
             max_values: None,
             num_containers: 1,
         };
 
-        let batch =
+        // With safe casting, invalid UTF-8 should be converted to null rather than erroring
+        let result =
             build_statistics_record_batch(&statistics, &required_columns).unwrap();
-        assert_snapshot!(batches_to_string(&[batch]), @r"
-        +--------+
-        | s1_min |
-        +--------+
-        |        |
-        +--------+
-        ");
+
+        // Verify we got a record batch with a null value for the invalid UTF-8 data
+        assert_eq!(result.num_rows(), 1);
+        assert_eq!(result.num_columns(), 1);
+        let column = result.column(0);
+        assert!(
+            column.is_null(0),
+            "Invalid UTF-8 should be converted to null with safe casting"
+        );
     }
+
+    #[test]
+    fn test_build_statistics_nested_invalid_utf8_safe_cast() {
+        // Ensure nested struct statistics that contain binary fields with
+        // invalid UTF-8 are cast safely to Utf8, converting invalid
+        // entries to NULL while preserving valid entries. This exercises
+        // nested struct casting and the safe-cast behavior for Binary and
+        // BinaryView arrays.
+        fn run_case(binary_array: ArrayRef, binary_data_type: DataType, case_name: &str) {
+            let num_containers = binary_array.len();
+
+            let label_source_field =
+                Arc::new(Field::new("label", binary_data_type, true));
+            let label_target_field = Arc::new(Field::new("label", DataType::Utf8, true));
+            let nested_source_field = Arc::new(Field::new(
+                "nested",
+                DataType::Struct(vec![label_source_field.as_ref().clone()].into()),
+                true,
+            ));
+            let nested_target_field = Arc::new(Field::new(
+                "nested",
+                DataType::Struct(vec![label_target_field.as_ref().clone()].into()),
+                true,
+            ));
+            let target_struct_type =
+                DataType::Struct(vec![nested_target_field.as_ref().clone()].into());
+
+            let nested_values: ArrayRef = Arc::new(StructArray::from(vec![(
+                Arc::clone(&label_source_field),
+                Arc::clone(&binary_array),
+            )]));
+            let struct_min: ArrayRef = Arc::new(StructArray::from(vec![(
+                Arc::clone(&nested_source_field),
+                Arc::clone(&nested_values),
+            )]));
+
+            let required_columns = RequiredColumns::from(vec![(
+                phys_expr::Column::new("struct_col", 0),
+                StatisticsType::Min,
+                Field::new("struct_col_min", target_struct_type.clone(), true),
+            )]);
+
+            let statistics = OneContainerStats {
+                min_values: Some(struct_min),
+                max_values: None,
+                num_containers,
+            };
+
+            let batch =
+                build_statistics_record_batch(&statistics, &required_columns).unwrap();
+
+            assert_eq!(batch.num_columns(), 1, "{case_name}: expected one column");
+            assert_eq!(batch.num_rows(), num_containers);
+
+            let struct_array = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("StructArray");
+            assert_eq!(struct_array.data_type(), &target_struct_type);
+
+            let nested_array = struct_array
+                .column(0)
+                .as_ref()
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("nested StructArray");
+            let label_array = nested_array
+                .column(0)
+                .as_ref()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("StringArray");
+
+            assert!(
+                label_array.is_null(0),
+                "{case_name}: invalid UTF-8 should cast to NULL",
+            );
+            assert!(label_array.is_valid(1));
+            assert_eq!(label_array.value(1), "valid");
+        }
+
+        let binary_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+            Some(&[0xFFu8][..]),
+            Some(b"valid".as_slice()),
+        ]));
+        run_case(binary_array, DataType::Binary, "BinaryArray");
+
+        let mut builder = BinaryViewBuilder::with_capacity(2);
+        builder.append_value([0xFF]);
+        builder.append_value(b"valid");
+        let binary_view_array: ArrayRef = Arc::new(builder.finish());
+        run_case(binary_view_array, DataType::BinaryView, "BinaryViewArray");
+    }
 
     #[test]
@@ -2558,12 +2957,115 @@ mod tests {
         let result =
             build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
 
-        assert!(
-            result
-                .to_string()
-                .contains("mismatched statistics length. Expected 3, got 1"),
-            "{}",
-            result
+        assert_contains!(
+            result.to_string(),
+            "mismatched statistics length. Expected 3, got 1"
         );
     }
+
+    #[test]
+    fn test_build_statistics_struct_incompatible_layout_error() {
+        // Request struct statistics but provide a source struct layout that
+        // doesn't match the requested nested layout. This should produce
+        // an explanatory cast error indicating the incompatible nested
+        // field types.
+        let requested_field = Field::new(
+            "struct_col_min",
+            DataType::Struct(
+                vec![Field::new(
+                    "nested",
+                    DataType::Struct(
+                        vec![Field::new("value", DataType::Int32, true)].into(),
+                    ),
+                    true,
+                )]
+                .into(),
+            ),
+            true,
+        );
+
+        let min_values: ArrayRef = Arc::new(StructArray::from(vec![(
+            Arc::new(Field::new("nested", DataType::Int32, true)),
+            Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef,
+        )]));
+
+        let (required_columns, statistics) = make_struct_required_cols(
+            "struct_col",
+            3,
+            vec![(StatisticsType::Min, requested_field)],
+            Some(min_values),
+            None,
+            1,
+        );
+
+        let result =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
+        assert_contains!(
+            result.to_string(),
+            "Cannot cast struct field 'nested' from type Int32 to type Struct"
+        );
+    }
+
+    #[test]
+    fn test_build_statistics_struct_non_struct_source_error() {
+        // Request struct statistics but provide a non-struct (primitive)
+        // statistics array as the underlying data. This should error out
+        // because a primitive array cannot be cast to a struct layout.
+        let requested_field = Field::new(
+            "struct_col_min",
+            DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()),
+            true,
+        );
+
+        let min_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
+
+        let (required_columns, statistics) = make_struct_required_cols(
+            "struct_col",
+            3,
+            vec![(StatisticsType::Min, requested_field)],
+            Some(min_values),
+            None,
+            1,
+        );
+
+        let result =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
+        assert_contains!(
+            result.to_string(),
+            "Cannot cast column of type Int32 to struct type"
+        );
+    }
+
+    #[test]
+    fn test_build_statistics_struct_inconsistent_length() {
+        // Verify that struct statistics arrays with mismatched lengths (the
+        // `num_containers` value vs the actual struct array length) result
+        // in a clear error message indicating the mismatch.
+        let requested_field = Field::new(
+            "struct_col_min",
+            DataType::Struct(vec![Field::new("value", DataType::Int32, true)].into()),
+            true,
+        );
+
+        let min_values: ArrayRef = Arc::new(StructArray::from(vec![(
+            Arc::new(Field::new("value", DataType::Int32, true)),
+            Arc::new(Int32Array::from(vec![Some(10)])) as ArrayRef,
+        )]));
+
+        let (required_columns, statistics) = make_struct_required_cols(
+            "struct_col",
+            3,
+            vec![(StatisticsType::Min, requested_field)],
+            Some(min_values),
+            None,
+            2,
+        );
+
+        let result =
+            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
+        assert_contains!(
+            result.to_string(),
+            "mismatched statistics length. Expected 2, got 1"
+        );
+    }
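
Reviewer note: below is a minimal, standalone sketch (not part of the patch) of how the new `sanitize_binary_array_for_utf8` helper composes with Arrow's safe cast on the `BinaryView` path. It assumes a build of `datafusion-common` from this branch; the `arrow` APIs used are the same ones the diff itself relies on.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryViewBuilder};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
// New helper added by this PR in datafusion/common/src/nested_struct.rs
use datafusion_common::nested_struct::sanitize_binary_array_for_utf8;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a BinaryView array with one valid string and one invalid UTF-8 value.
    let mut builder = BinaryViewBuilder::with_capacity(2);
    builder.append_value(b"valid");
    builder.append_value([0xFFu8]);
    let array: ArrayRef = Arc::new(builder.finish());

    // Null out invalid UTF-8 before casting. Per the helper's docs, Arrow's
    // safe cast alone does not do this for BinaryView (arrow-rs#8403),
    // although it does for Binary/LargeBinary sources.
    let sanitized = sanitize_binary_array_for_utf8(array);

    let options = CastOptions {
        safe: true, // invalid values become NULL instead of returning an error
        format_options: Default::default(),
    };
    let strings = cast_with_options(&sanitized, &DataType::Utf8View, &options)?;

    assert!(!strings.is_null(0)); // "valid" casts through unchanged
    assert!(strings.is_null(1)); // the invalid bytes are now NULL
    Ok(())
}
```

This mirrors what `cast_column` now does internally for binary-to-string casts in `build_statistics_record_batch`: pre-sanitize only the `BinaryView` case, then rely on the safe cast for the rest.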