From 9e7e9a7a33fd5cefe60f8fa520a4f140d9015770 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 09:05:39 +0100 Subject: [PATCH 01/16] wip --- datafusion/datasource-parquet/src/opener.rs | 2 - .../datasource-parquet/src/row_filter.rs | 111 +++++------------- .../src/schema_rewriter.rs | 18 +-- .../physical-expr/src/expressions/cast.rs | 95 +++++++++++++++ 4 files changed, 138 insertions(+), 88 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 83235dafdaf8..926a842a2d2c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -142,7 +142,6 @@ impl FileOpener for ParquetOpener { let projected_schema = SchemaRef::from(self.logical_file_schema.project(&self.projection)?); - let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory); let schema_adapter = self .schema_adapter_factory .create(projected_schema, Arc::clone(&self.logical_file_schema)); @@ -337,7 +336,6 @@ impl FileOpener for ParquetOpener { builder.metadata(), reorder_predicates, &file_metrics, - &schema_adapter_factory, ); match row_filter { diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index de9fe181f84f..998da0c43b00 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -74,7 +74,6 @@ use parquet::file::metadata::ParquetMetaData; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::Result; -use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; @@ -106,8 +105,6 @@ pub(crate) struct DatafusionArrowPredicate { rows_matched: metrics::Count, /// how long was spent evaluating this predicate time: metrics::Time, - /// used to perform type coercion while filtering rows - schema_mapper: Arc, } impl DatafusionArrowPredicate { @@ -131,7 +128,6 @@ impl DatafusionArrowPredicate { rows_pruned, rows_matched, time, - schema_mapper: candidate.schema_mapper, }) } } @@ -142,8 +138,6 @@ impl ArrowPredicate for DatafusionArrowPredicate { } fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { - let batch = self.schema_mapper.map_batch(batch)?; - // scoped timer updates on drop let mut timer = self.time.timer(); @@ -183,12 +177,8 @@ pub(crate) struct FilterCandidate { /// Can this filter use an index (e.g. a page index) to prune rows? can_use_index: bool, /// The projection to read from the file schema to get the columns - /// required to pass through a `SchemaMapper` to the table schema - /// upon which we then evaluate the filter expression. + /// required to evaluate the filter expression. projection: Vec, - /// A `SchemaMapper` used to map batches read from the file schema to - /// the filter's projection of the table schema. - schema_mapper: Arc, /// The projected table schema that this filter references filter_schema: SchemaRef, } @@ -198,42 +188,13 @@ pub(crate) struct FilterCandidate { /// This will do several things /// 1. Determine the columns required to evaluate the expression /// 2. Calculate data required to estimate the cost of evaluating the filter -/// 3. 
Rewrite column expressions in the predicate which reference columns not -/// in the particular file schema. -/// -/// # Schema Rewrite -/// -/// When parquet files are read in the context of "schema evolution" there are -/// potentially wo schemas: -/// -/// 1. The table schema (the columns of the table that the parquet file is part of) -/// 2. The file schema (the columns actually in the parquet file) -/// -/// There are times when the table schema contains columns that are not in the -/// file schema, such as when new columns have been added in new parquet files -/// but old files do not have the columns. -/// -/// When a file is missing a column from the table schema, the value of the -/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`). -/// -/// When a predicate is pushed down to the parquet reader, the predicate is -/// evaluated in the context of the file schema. -/// For each predicate we build a filter schema which is the projection of the table -/// schema that contains only the columns that this filter references. -/// If any columns from the file schema are missing from a particular file they are -/// added by the `SchemaAdapter`, by default as `NULL`. struct FilterCandidateBuilder { expr: Arc, /// The schema of this parquet file. - /// Columns may have different types from the table schema and there may be - /// columns in the file schema that are not in the table schema or columns that - /// are in the table schema that are not in the file schema. file_schema: SchemaRef, /// The schema of the table (merged schema) -- columns may be in different /// order than in the file and have columns that are not in the file schema table_schema: SchemaRef, - /// A `SchemaAdapterFactory` used to map the file schema to the table schema. 
- schema_adapter_factory: Arc, } impl FilterCandidateBuilder { @@ -241,13 +202,11 @@ impl FilterCandidateBuilder { expr: Arc, file_schema: Arc, table_schema: Arc, - schema_adapter_factory: Arc, ) -> Self { Self { expr, file_schema, table_schema, - schema_adapter_factory, } } @@ -270,10 +229,12 @@ impl FilterCandidateBuilder { .project(&required_indices_into_table_schema)?, ); - let (schema_mapper, projection_into_file_schema) = self - .schema_adapter_factory - .create(Arc::clone(&projected_table_schema), self.table_schema) - .map_schema(&self.file_schema)?; + // Compute the projection into the file schema by matching column names + let projection_into_file_schema: Vec = projected_table_schema + .fields() + .iter() + .filter_map(|f| self.file_schema.index_of(f.name()).ok()) + .collect(); let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?; let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?; @@ -283,7 +244,6 @@ impl FilterCandidateBuilder { required_bytes, can_use_index, projection: projection_into_file_schema, - schema_mapper: Arc::clone(&schema_mapper), filter_schema: Arc::clone(&projected_table_schema), })) } @@ -429,7 +389,6 @@ pub fn build_row_filter( metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, - schema_adapter_factory: &Arc, ) -> Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -447,7 +406,6 @@ pub fn build_row_filter( Arc::clone(expr), Arc::clone(physical_file_schema), Arc::clone(predicate_file_schema), - Arc::clone(schema_adapter_factory), ) .build(metadata) }) @@ -511,9 +469,9 @@ mod test { use datafusion_common::ScalarValue; use arrow::datatypes::{Field, TimeUnit::Nanosecond}; - use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; use datafusion_physical_plan::metrics::{Count, Time}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -538,17 +496,12 @@ mod test { let expr = col("int64_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); let table_schema = Arc::new(table_schema.clone()); - let candidate = FilterCandidateBuilder::new( - expr, - table_schema.clone(), - table_schema, - schema_adapter_factory, - ) - .build(metadata) - .expect("building candidate"); + let candidate = + FilterCandidateBuilder::new(expr, table_schema.clone(), table_schema) + .build(metadata) + .expect("building candidate"); assert!(candidate.is_none()); } @@ -578,17 +531,18 @@ mod test { None, )); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); - let table_schema = Arc::new(table_schema.clone()); - let candidate = FilterCandidateBuilder::new( - expr, - file_schema.clone(), - table_schema.clone(), - schema_adapter_factory, + let expr = DefaultPhysicalExprAdapterFactory::default().create( + Arc::new(table_schema.clone()), + Arc::clone(&file_schema), ) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); + .rewrite(expr) + .expect("rewriting expression"); + let table_schema = Arc::new(table_schema.clone()); + let candidate = + FilterCandidateBuilder::new(expr, file_schema.clone(), table_schema.clone()) + .build(&metadata) 
+ .expect("building candidate") + .expect("candidate expected"); let mut row_filter = DatafusionArrowPredicate::try_new( candidate, @@ -619,16 +573,15 @@ mod test { None, )); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); - let candidate = FilterCandidateBuilder::new( - expr, - file_schema, - table_schema, - schema_adapter_factory, - ) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); + // Rewrite the expression to add CastExpr for type coercion + let expr = DefaultPhysicalExprAdapterFactory::default() + .create(table_schema.clone(), Arc::clone(&file_schema)) + .rewrite(expr) + .expect("rewriting expression"); + let candidate = FilterCandidateBuilder::new(expr, file_schema, table_schema) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); let mut row_filter = DatafusionArrowPredicate::try_new( candidate, diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 61cc97dae300..e6574c9b7eda 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -27,8 +27,9 @@ use datafusion_common::{ Result, ScalarValue, }; use datafusion_functions::core::getfield::GetFieldFunc; +use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::{ - expressions::{self, CastExpr, Column}, + expressions::{self, Column}, ScalarFunctionExpr, }; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; @@ -152,7 +153,7 @@ pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug { ) -> Arc; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct DefaultPhysicalExprAdapterFactory; impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { @@ -424,11 +425,14 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { ); } - let cast_expr = Arc::new(CastExpr::new( - Arc::new(column), - logical_field.data_type().clone(), - None, - )); + let cast_expr = Arc::new( + CastColumnExpr::new( + Arc::new(column), + Arc::new(physical_field.clone()), + Arc::new(logical_field.clone()), + None, + ) + ); Ok(Transformed::yes(cast_expr)) } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 0419161b532c..9bf242fccfc0 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -740,6 +740,101 @@ mod tests { Ok(()) } + #[test] + fn test_cast_timestamp_with_timezone_to_timestamp() -> Result<()> { + // Test casting from Timestamp(Nanosecond, Some("UTC")) to Timestamp(Nanosecond, None) + let schema = Schema::new(vec![Field::new( + "a", + Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + true, + )]); + let a = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("UTC"); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + let expression = cast_with_options( + col("a", &schema)?, + &schema, + Timestamp(TimeUnit::Nanosecond, None), + None, + )?; + + // verify that the expression's type is correct + assert_eq!( + expression.data_type(&schema)?, + Timestamp(TimeUnit::Nanosecond, None) + ); + + // compute + let result = expression + .evaluate(&batch)? 
+ .into_array(batch.num_rows()) + .expect("Failed to convert to array"); + + // verify that the array's data_type is correct + assert_eq!(*result.data_type(), Timestamp(TimeUnit::Nanosecond, None)); + + // verify that the data itself is downcastable and correct + let result = result + .as_any() + .downcast_ref::() + .expect("failed to downcast"); + + for (i, expected) in [1_i64, 2, 3, 4, 5].iter().enumerate() { + assert_eq!(result.value(i), *expected); + } + + Ok(()) + } + + #[test] + fn test_cast_timestamp_to_timestamp_with_timezone() -> Result<()> { + // Test casting from Timestamp(Nanosecond, None) to Timestamp(Nanosecond, Some("UTC")) + let schema = Schema::new(vec![Field::new( + "a", + Timestamp(TimeUnit::Nanosecond, None), + true, + )]); + let a = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + let expression = cast_with_options( + col("a", &schema)?, + &schema, + Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + None, + )?; + + // verify that the expression's type is correct + assert_eq!( + expression.data_type(&schema)?, + Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) + ); + + // compute + let result = expression + .evaluate(&batch)? + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); + + // verify that the array's data_type is correct + assert_eq!( + *result.data_type(), + Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) + ); + + // verify that the data itself is downcastable and correct + let result = result + .as_any() + .downcast_ref::() + .expect("failed to downcast"); + + for (i, expected) in [1_i64, 2, 3, 4, 5].iter().enumerate() { + assert_eq!(result.value(i), *expected); + } + + Ok(()) + } + #[test] fn invalid_cast() { // Ensure a useful error happens at plan time if invalid casts are used From b4e6a34985dc0636e487cfc131eac191bb8f059f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 09:42:55 +0100 Subject: [PATCH 02/16] Have PhysicalExprAdapterFactory handle struct columns and remove SchemaAdapterFactory from row filter --- .../core/tests/parquet/schema_adapter.rs | 32 +--- datafusion/datasource-parquet/src/opener.rs | 140 ------------------ .../datasource-parquet/src/row_filter.rs | 16 +- .../src/schema_rewriter.rs | 56 ++++--- datafusion/pruning/src/pruning_predicate.rs | 15 ++ 5 files changed, 60 insertions(+), 199 deletions(-) diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 0e76d626aac5..4f3c12e7f3e0 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -293,36 +293,8 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() { ]; assert_batches_eq!(expected, &batches); - // Test using a custom schema adapter and no explicit physical expr adapter - // This should use the custom schema adapter both for projections and predicate pushdown - let listing_table_config = - ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) - .infer_options(&ctx.state()) - .await - .unwrap() - .with_schema(table_schema.clone()) - .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory)); - let table = ListingTable::try_new(listing_table_config).unwrap(); - ctx.deregister_table("t").unwrap(); - ctx.register_table("t", Arc::new(table)).unwrap(); - let batches = ctx - .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'") - .await 
- .unwrap() - .collect() - .await - .unwrap(); - let expected = [ - "+----+----+", - "| c2 | c1 |", - "+----+----+", - "| a | 2 |", - "+----+----+", - ]; - assert_batches_eq!(expected, &batches); - - // Do the same test but with a custom physical expr adapter - // Now the default schema adapter will be used for projections, but the custom physical expr adapter will be used for predicate pushdown + // Test with a custom physical expr adapter only + // The default schema adapter will be used for projections, and the custom physical expr adapter will be used for predicate pushdown let listing_table_config = ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) .infer_options(&ctx.state()) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 926a842a2d2c..248955038df2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1287,144 +1287,4 @@ mod test { } } } - - #[tokio::test] - async fn test_custom_schema_adapter_no_rewriter() { - // Make a hardcoded schema adapter that adds a new column "b" with default value 0.0 - // and converts the first column "a" from Int32 to UInt64. - #[derive(Debug, Clone)] - struct CustomSchemaMapper; - - impl SchemaMapper for CustomSchemaMapper { - fn map_batch( - &self, - batch: arrow::array::RecordBatch, - ) -> datafusion_common::Result { - let a_column = cast(batch.column(0), &DataType::UInt64)?; - // Add in a new column "b" with default value 0.0 - let b_column = - arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]); - let columns = vec![a_column, Arc::new(b_column)]; - let new_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::UInt64, false), - Field::new("b", DataType::Float64, false), - ])); - Ok(arrow::record_batch::RecordBatch::try_new( - new_schema, columns, - )?) 
- } - - fn map_column_statistics( - &self, - file_col_statistics: &[ColumnStatistics], - ) -> datafusion_common::Result> { - Ok(vec![ - file_col_statistics[0].clone(), - ColumnStatistics::new_unknown(), - ]) - } - } - - #[derive(Debug, Clone)] - struct CustomSchemaAdapter; - - impl SchemaAdapter for CustomSchemaAdapter { - fn map_schema( - &self, - _file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> - { - let mapper = Arc::new(CustomSchemaMapper); - let projection = vec![0]; // We only need to read the first column "a" from the file - Ok((mapper, projection)) - } - - fn map_column_index( - &self, - index: usize, - file_schema: &Schema, - ) -> Option { - if index < file_schema.fields().len() { - Some(index) - } else { - None // The new column "b" is not in the original schema - } - } - } - - #[derive(Debug, Clone)] - struct CustomSchemaAdapterFactory; - - impl SchemaAdapterFactory for CustomSchemaAdapterFactory { - fn create( - &self, - _projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(CustomSchemaAdapter) - } - } - - // Test that if no expression rewriter is provided we use a schemaadapter to adapt the data to the expression - let store = Arc::new(InMemory::new()) as Arc; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - // Write out the batch to a Parquet file - let data_size = - write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; - let file = PartitionedFile::new( - "test.parquet".to_string(), - u64::try_from(data_size).unwrap(), - ); - let table_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::UInt64, false), - Field::new("b", DataType::Float64, false), - ])); - - let make_opener = |predicate| ParquetOpener { - partition_index: 0, - projection: Arc::new([0, 1]), - batch_size: 1024, - limit: None, - predicate: Some(predicate), - logical_file_schema: Arc::clone(&table_schema), - metadata_size_hint: None, - metrics: ExecutionPlanMetricsSet::new(), - parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new( - Arc::clone(&store), - )), - partition_fields: vec![], - pushdown_filters: true, - reorder_filters: false, - force_filter_selections: false, - enable_page_index: false, - enable_bloom_filter: false, - schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), - enable_row_group_stats_pruning: false, - coerce_int96: None, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties: None, - expr_adapter_factory: None, - #[cfg(feature = "parquet_encryption")] - encryption_factory: None, - max_predicate_cache_size: None, - }; - - let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); - let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); - let batches = collect_batches(stream).await; - - #[rustfmt::skip] - let expected = [ - "+---+-----+", - "| a | b |", - "+---+-----+", - "| 1 | 0.0 |", - "+---+-----+", - ]; - assert_batches_eq!(expected, &batches); - let metrics = opener.metrics.clone_inner(); - assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0); - assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2); - } } diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 998da0c43b00..d8327f26a44a 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -471,7 +471,9 @@ mod test { use arrow::datatypes::{Field, TimeUnit::Nanosecond}; 
use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; - use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; + use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory, + }; use datafusion_physical_plan::metrics::{Count, Time}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -531,12 +533,10 @@ mod test { None, )); let expr = logical2physical(&expr, &table_schema); - let expr = DefaultPhysicalExprAdapterFactory::default().create( - Arc::new(table_schema.clone()), - Arc::clone(&file_schema), - ) - .rewrite(expr) - .expect("rewriting expression"); + let expr = DefaultPhysicalExprAdapterFactory {} + .create(Arc::new(table_schema.clone()), Arc::clone(&file_schema)) + .rewrite(expr) + .expect("rewriting expression"); let table_schema = Arc::new(table_schema.clone()); let candidate = FilterCandidateBuilder::new(expr, file_schema.clone(), table_schema.clone()) @@ -574,7 +574,7 @@ mod test { )); let expr = logical2physical(&expr, &table_schema); // Rewrite the expression to add CastExpr for type coercion - let expr = DefaultPhysicalExprAdapterFactory::default() + let expr = DefaultPhysicalExprAdapterFactory {} .create(table_schema.clone(), Arc::clone(&file_schema)) .rewrite(expr) .expect("rewriting expression"); diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index e6574c9b7eda..3fe1cd8e69aa 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -153,7 +153,7 @@ pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug { ) -> Arc; } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct DefaultPhysicalExprAdapterFactory; impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { @@ -425,14 +425,12 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { ); } - let cast_expr = Arc::new( - CastColumnExpr::new( - Arc::new(column), - Arc::new(physical_field.clone()), - Arc::new(logical_field.clone()), - None, - ) - ); + let cast_expr = Arc::new(CastColumnExpr::new( + Arc::new(column), + Arc::new(physical_field.clone()), + Arc::new(logical_field.clone()), + None, + )); Ok(Transformed::yes(cast_expr)) } @@ -452,7 +450,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{assert_contains, record_batch, Result, ScalarValue}; use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, Literal}; + use datafusion_physical_expr::expressions::{col, lit, Column, Literal}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use itertools::Itertools; use std::sync::Arc; @@ -483,7 +481,7 @@ mod tests { let result = adapter.rewrite(column_expr).unwrap(); // Should be wrapped in a cast expression - assert!(result.as_any().downcast_ref::().is_some()); + assert!(result.as_any().downcast_ref::().is_some()); } #[test] @@ -514,9 +512,10 @@ mod tests { println!("Rewritten expression: {result}"); let expected = expressions::BinaryExpr::new( - Arc::new(CastExpr::new( + Arc::new(CastColumnExpr::new( Arc::new(Column::new("a", 0)), - DataType::Int64, + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("a", DataType::Int64, false)), None, )), Operator::Plus, @@ -593,15 +592,30 @@ mod tests { let result = adapter.rewrite(column_expr).unwrap(); - let expected = 
Arc::new(CastExpr::new( + let expected = Arc::new(CastColumnExpr::new( Arc::new(Column::new("data", 0)), - DataType::Struct( - vec![ - Field::new("id", DataType::Int64, false), - Field::new("name", DataType::Utf8View, true), - ] - .into(), - ), + Arc::new(Field::new( + "data", + DataType::Struct( + vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ] + .into(), + ), + false, + )), + Arc::new(Field::new( + "data", + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8View, true), + ] + .into(), + ), + false, + )), None, )) as Arc; diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index b9bbaea45a06..4110391514df 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -43,6 +43,7 @@ use datafusion_common::{ ScalarValue, }; use datafusion_expr_common::operator::Operator; +use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; @@ -1105,6 +1106,20 @@ fn rewrite_expr_to_prunable( None, )); Ok((left, op, right)) + } else if let Some(cast_col) = column_expr_any.downcast_ref::() { + // `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr + let arrow_schema = schema.as_arrow(); + let from_type = cast_col.expr().data_type(arrow_schema)?; + let to_type = cast_col.target_field().data_type(); + verify_support_type_for_prune(&from_type, to_type)?; + let (left, op, right) = + rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?; + // Predicate pruning / statistics generally don't support struct columns yet. + // In the future we may want to support pruning on nested fields, in which case we probably need to + // do something more sophisticated here. + // But for now since we don't support pruning on nested fields, we can just cast to the target type directly. 
+ let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None)); + Ok((left, op, right)) } else if let Some(try_cast) = column_expr_any.downcast_ref::() { From c3f9ef9c0c725d4511113a3683e5c0d9c881b0bb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 09:48:57 +0100 Subject: [PATCH 03/16] lints --- datafusion/datasource-parquet/src/opener.rs | 43 +++---------------- .../datasource-parquet/src/row_filter.rs | 4 ++ 2 files changed, 9 insertions(+), 38 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 248955038df2..0d00d8ac851e 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -763,21 +763,14 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use arrow::{ - compute::cast, - datatypes::{DataType, Field, Schema, SchemaRef}, - }; + use arrow::datatypes::{DataType, Field, Schema}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, - DataFusionError, ScalarValue, Statistics, + record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, + Statistics, }; use datafusion_datasource::{ - file_stream::FileOpener, - schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, - SchemaMapper, - }, + file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, }; use datafusion_expr::{col, lit}; @@ -785,7 +778,7 @@ mod test { expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, }; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; - use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -809,21 +802,6 @@ mod test { (num_batches, num_rows) } - async fn collect_batches( - mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, - >, - ) -> Vec { - let mut batches = vec![]; - while let Some(Ok(batch)) = stream.next().await { - batches.push(batch); - } - batches - } - async fn write_parquet( store: Arc, filename: &str, @@ -1276,15 +1254,4 @@ mod test { assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } - - fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { - match metrics.sum_by_name(metric_name) { - Some(v) => v.as_usize(), - _ => { - panic!( - "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}" - ); - } - } - } } diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index d8327f26a44a..237690c281c9 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -188,6 +188,10 @@ pub(crate) struct FilterCandidate { /// This will do several things /// 1. Determine the columns required to evaluate the expression /// 2. Calculate data required to estimate the cost of evaluating the filter +/// +/// Note that this does *not* handle any adaptation of the data schema to the expression schema, +/// it is assumed that the expression has already been adapted to the file schema before being passed in here, +/// generally using [`PhysicalExprAdapter`](datafusion_physical_expr_adapter::PhysicalExprAdapter). 
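+///
+/// For example (a sketch mirroring the tests in this module; it assumes the
+/// `DefaultPhysicalExprAdapterFactory` used there), the predicate is adapted to the
+/// physical file schema first and only then handed to the builder:
+///
+/// ```ignore
+/// // Rewrite the predicate against the physical file schema (inserting casts,
+/// // filling in missing columns, etc.) before building the filter candidate.
+/// let expr = DefaultPhysicalExprAdapterFactory {}
+///     .create(Arc::clone(&table_schema), Arc::clone(&file_schema))
+///     .rewrite(expr)?;
+/// let candidate = FilterCandidateBuilder::new(expr, file_schema, table_schema)
+///     .build(&metadata)?;
+/// ```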
struct FilterCandidateBuilder { expr: Arc, /// The schema of this parquet file. From 1593758e8c5a239f193c36a5b86c4cb2099aa616 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 18:00:43 +0100 Subject: [PATCH 04/16] fully removed SchemaAdapter --- datafusion/core/src/datasource/mod.rs | 122 +++---- .../src/datasource/physical_plan/parquet.rs | 2 +- .../core/tests/parquet/schema_adapter.rs | 310 +----------------- .../schema_adapter_integration_tests.rs | 152 ++++----- .../datasource-parquet/src/file_format.rs | 5 +- datafusion/datasource-parquet/src/opener.rs | 85 +++-- datafusion/datasource-parquet/src/source.rs | 91 +---- 7 files changed, 201 insertions(+), 566 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 620e389a0fb8..28faea9a6837 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -55,30 +55,35 @@ mod tests { use crate::prelude::SessionContext; use ::object_store::{path::Path, ObjectMeta}; use arrow::{ - array::{Int32Array, StringArray}, - datatypes::{DataType, Field, Schema, SchemaRef}, + array::Int32Array, + datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}, record_batch::RecordBatch, }; - use datafusion_common::{record_batch, test_util::batches_to_sort_string}; + use datafusion_common::{ + record_batch, + test_util::batches_to_sort_string, + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, ScalarValue, + }; use datafusion_datasource::{ - file::FileSource, file_scan_config::FileScanConfigBuilder, - schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, - SchemaMapper, - }, - source::DataSourceExec, + schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec, PartitionedFile, }; use datafusion_datasource_parquet::source::ParquetSource; + use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr_adapter::{ + PhysicalExprAdapter, PhysicalExprAdapterFactory, + }; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::collect; use std::{fs, sync::Arc}; use tempfile::TempDir; #[tokio::test] - async fn can_override_schema_adapter() { - // Test shows that SchemaAdapter can add a column that doesn't existing in the - // record batches returned from parquet. This can be useful for schema evolution + async fn can_override_physical_expr_adapter() { + // Test shows that PhysicalExprAdapter can add a column that doesn't exist in the + // record batches returned from parquet. This can be useful for schema evolution // where older files may not have all columns. 
use datafusion_execution::object_store::ObjectStoreUrl; @@ -124,12 +129,11 @@ mod tests { let f2 = Field::new("extra_column", DataType::Utf8, true); let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - let source = ParquetSource::new(Arc::clone(&schema)) - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})) - .unwrap(); + let source = Arc::new(ParquetSource::new(Arc::clone(&schema))); let base_conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) .with_file(partitioned_file) + .with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory))) .build(); let parquet_exec = DataSourceExec::from_data_source(base_conf); @@ -200,72 +204,54 @@ mod tests { } #[derive(Debug)] - struct TestSchemaAdapterFactory; + struct TestPhysicalExprAdapterFactory; - impl SchemaAdapterFactory for TestSchemaAdapterFactory { + impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory { fn create( &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(TestSchemaAdapter { - table_schema: projected_table_schema, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + Arc::new(TestPhysicalExprAdapter { + logical_file_schema, + physical_file_schema, }) } } - struct TestSchemaAdapter { - /// Schema for the table - table_schema: SchemaRef, + #[derive(Debug)] + struct TestPhysicalExprAdapter { + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, } - impl SchemaAdapter for TestSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if self.table_schema.fields().find(file_field.name()).is_some() { - projection.push(file_idx); + impl PhysicalExprAdapter for TestPhysicalExprAdapter { + fn rewrite(&self, expr: Arc) -> Result> { + expr.transform(|e| { + if let Some(column) = e.as_any().downcast_ref::() { + // If column is "extra_column" and missing from physical schema, inject "foo" + if column.name() == "extra_column" + && self.physical_file_schema.index_of("extra_column").is_err() + { + return Ok(Transformed::yes(Arc::new(Literal::new( + ScalarValue::Utf8(Some("foo".to_string())), + )) + as Arc)); + } } - } - - Ok((Arc::new(TestSchemaMapping {}), projection)) - } - } - - #[derive(Debug)] - struct TestSchemaMapping {} - - impl SchemaMapper for TestSchemaMapping { - fn map_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let f1 = Field::new("id", DataType::Int32, true); - let f2 = Field::new("extra_column", DataType::Utf8, true); - - let schema = Arc::new(Schema::new(vec![f1, f2])); - - let extra_column = Arc::new(StringArray::from(vec!["foo"])); - let mut new_columns = batch.columns().to_vec(); - new_columns.push(extra_column); - - Ok(RecordBatch::try_new(schema, new_columns).unwrap()) + Ok(Transformed::no(e)) + }) + .data() } - fn map_column_statistics( + fn with_partition_values( &self, - _file_col_statistics: &[datafusion_common::ColumnStatistics], - ) -> datafusion_common::Result> { - unimplemented!() + _partition_values: Vec<(FieldRef, ScalarValue)>, + ) -> Arc { + Arc::new(TestPhysicalExprAdapter { + logical_file_schema: self.logical_file_schema.clone(), + 
physical_file_schema: self.physical_file_schema.clone(), + }) } } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4d7c4a878888..cd66ae87ec4e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1272,7 +1272,7 @@ mod tests { .round_trip_to_batches(vec![batch1, batch2]) .await; assert_contains!(read.unwrap_err().to_string(), - "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8"); + "Cannot cast column 'c3' from 'Date64' (physical data type) to 'Int8' (logical data type)"); } #[tokio::test] diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 4f3c12e7f3e0..c6db98c0d1cd 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -17,8 +17,7 @@ use std::sync::Arc; -use arrow::array::{record_batch, RecordBatch, RecordBatchOptions}; -use arrow::compute::{cast_with_options, CastOptions}; +use arrow::array::{record_batch, RecordBatch}; use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::assert_batches_eq; @@ -29,14 +28,8 @@ use datafusion::datasource::listing::{ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::DataFusionError; -use datafusion_common::{ColumnStatistics, ScalarValue}; -use datafusion_datasource::file::FileSource; -use datafusion_datasource::file_scan_config::FileScanConfigBuilder; -use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, -}; +use datafusion_common::ScalarValue; use datafusion_datasource::ListingTableUrl; -use datafusion_datasource_parquet::source::ParquetSource; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::expressions::{self, Column}; use datafusion_physical_expr::PhysicalExpr; @@ -44,7 +37,6 @@ use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, }; -use itertools::Itertools; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -59,97 +51,6 @@ async fn write_parquet(batch: RecordBatch, store: Arc, path: &s store.put(&Path::from(path), data.into()).await.unwrap(); } -#[derive(Debug)] -struct CustomSchemaAdapterFactory; - -impl SchemaAdapterFactory for CustomSchemaAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(CustomSchemaAdapter { - logical_file_schema: projected_table_schema, - }) - } -} - -#[derive(Debug)] -struct CustomSchemaAdapter { - logical_file_schema: SchemaRef, -} - -impl SchemaAdapter for CustomSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - for (idx, field) in file_schema.fields().iter().enumerate() { - if field.name() == self.logical_file_schema.field(index).name() { - return Some(idx); - } - } - None - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> Result<(Arc, Vec)> { - let projection = (0..file_schema.fields().len()).collect_vec(); - Ok(( - Arc::new(CustomSchemaMapper { - logical_file_schema: Arc::clone(&self.logical_file_schema), - }), - projection, - )) - } -} - 
-#[derive(Debug)] -struct CustomSchemaMapper { - logical_file_schema: SchemaRef, -} - -impl SchemaMapper for CustomSchemaMapper { - fn map_batch(&self, batch: RecordBatch) -> Result { - let mut output_columns = - Vec::with_capacity(self.logical_file_schema.fields().len()); - for field in self.logical_file_schema.fields() { - if let Some(array) = batch.column_by_name(field.name()) { - output_columns.push(cast_with_options( - array, - field.data_type(), - &CastOptions::default(), - )?); - } else { - // Create a new array with the default value for the field type - let default_value = match field.data_type() { - DataType::Int64 => ScalarValue::Int64(Some(0)), - DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())), - _ => unimplemented!("Unsupported data type: {}", field.data_type()), - }; - output_columns - .push(default_value.to_array_of_size(batch.num_rows()).unwrap()); - } - } - let batch = RecordBatch::try_new_with_options( - Arc::clone(&self.logical_file_schema), - output_columns, - &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), - ) - .unwrap(); - Ok(batch) - } - - fn map_column_statistics( - &self, - _file_col_statistics: &[ColumnStatistics], - ) -> Result> { - Ok(vec![ - ColumnStatistics::new_unknown(); - self.logical_file_schema.fields().len() - ]) - } -} - // Implement a custom PhysicalExprAdapterFactory that fills in missing columns with the default value for the field type #[derive(Debug)] struct CustomPhysicalExprAdapterFactory; @@ -264,13 +165,13 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() { ); assert!(!ctx.state().config().collect_statistics()); + // Test with DefaultPhysicalExprAdapterFactory - missing columns are filled with NULL let listing_table_config = ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) .infer_options(&ctx.state()) .await .unwrap() .with_schema(table_schema.clone()) - .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)) .with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory)); let table = ListingTable::try_new(listing_table_config).unwrap(); @@ -293,8 +194,9 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() { ]; assert_batches_eq!(expected, &batches); - // Test with a custom physical expr adapter only - // The default schema adapter will be used for projections, and the custom physical expr adapter will be used for predicate pushdown + // Test with a custom physical expr adapter + // PhysicalExprAdapterFactory now handles both predicates AND projections + // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8 let listing_table_config = ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) .infer_options(&ctx.state()) @@ -312,208 +214,14 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() { .collect() .await .unwrap(); + // With CustomPhysicalExprAdapterFactory, missing column c2 is filled with 'b' + // in both the predicate (c2 = 'b' becomes 'b' = 'b' -> true) and the projection let expected = [ "+----+----+", "| c2 | c1 |", "+----+----+", - "| | 2 |", + "| b | 2 |", "+----+----+", ]; assert_batches_eq!(expected, &batches); - - // If we use both then the custom physical expr adapter will be used for predicate pushdown and the custom schema adapter will be used for projections - let listing_table_config = - ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) - .infer_options(&ctx.state()) - .await - .unwrap() - .with_schema(table_schema.clone()) - 
.with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory)) - .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory)); - let table = ListingTable::try_new(listing_table_config).unwrap(); - ctx.deregister_table("t").unwrap(); - ctx.register_table("t", Arc::new(table)).unwrap(); - let batches = ctx - .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'") - .await - .unwrap() - .collect() - .await - .unwrap(); - let expected = [ - "+----+----+", - "| c2 | c1 |", - "+----+----+", - "| a | 2 |", - "+----+----+", - ]; - assert_batches_eq!(expected, &batches); -} - -/// A test schema adapter factory that adds prefix to column names -#[derive(Debug)] -struct PrefixAdapterFactory { - prefix: String, -} - -impl SchemaAdapterFactory for PrefixAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(PrefixAdapter { - input_schema: projected_table_schema, - prefix: self.prefix.clone(), - }) - } -} - -/// A test schema adapter that adds prefix to column names -#[derive(Debug)] -struct PrefixAdapter { - input_schema: SchemaRef, - prefix: String, -} - -impl SchemaAdapter for PrefixAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.input_schema.field(index); - file_schema.fields.find(field.name()).map(|(i, _)| i) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - for (file_idx, file_field) in file_schema.fields().iter().enumerate() { - if self.input_schema.fields().find(file_field.name()).is_some() { - projection.push(file_idx); - } - } - - // Create a schema mapper that adds a prefix to column names - #[derive(Debug)] - struct PrefixSchemaMapping { - // Keep only the prefix field which is actually used in the implementation - prefix: String, - } - - impl SchemaMapper for PrefixSchemaMapping { - fn map_batch(&self, batch: RecordBatch) -> Result { - // Create a new schema with prefixed field names - let prefixed_fields: Vec = batch - .schema() - .fields() - .iter() - .map(|field| { - Field::new( - format!("{}{}", self.prefix, field.name()), - field.data_type().clone(), - field.is_nullable(), - ) - }) - .collect(); - let prefixed_schema = Arc::new(Schema::new(prefixed_fields)); - - // Create a new batch with the prefixed schema but the same data - let options = RecordBatchOptions::default(); - RecordBatch::try_new_with_options( - prefixed_schema, - batch.columns().to_vec(), - &options, - ) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) - } - - fn map_column_statistics( - &self, - stats: &[ColumnStatistics], - ) -> Result> { - // For testing, just return the input statistics - Ok(stats.to_vec()) - } - } - - Ok(( - Arc::new(PrefixSchemaMapping { - prefix: self.prefix.clone(), - }), - projection, - )) - } -} - -#[test] -fn test_apply_schema_adapter_with_factory() { - // Create a schema - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - ])); - - // Create a parquet source - let source = ParquetSource::new(schema.clone()); - - // Create a file scan config with source that has a schema adapter factory - let factory = Arc::new(PrefixAdapterFactory { - prefix: "test_".to_string(), - }); - - let file_source = source.clone().with_schema_adapter_factory(factory).unwrap(); - - let config = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) - 
.build(); - - // Apply schema adapter to a new source - let result_source = source.apply_schema_adapter(&config).unwrap(); - - // Verify the adapter was applied - assert!(result_source.schema_adapter_factory().is_some()); - - // Create adapter and test it produces expected schema - let adapter_factory = result_source.schema_adapter_factory().unwrap(); - let adapter = adapter_factory.create(schema.clone(), schema.clone()); - - // Create a dummy batch to test the schema mapping - let dummy_batch = RecordBatch::new_empty(schema.clone()); - - // Get the file schema (which is the same as the table schema in this test) - let (mapper, _) = adapter.map_schema(&schema).unwrap(); - - // Apply the mapping to get the output schema - let mapped_batch = mapper.map_batch(dummy_batch).unwrap(); - let output_schema = mapped_batch.schema(); - - // Check the column names have the prefix - assert_eq!(output_schema.field(0).name(), "test_id"); - assert_eq!(output_schema.field(1).name(), "test_name"); -} - -#[test] -fn test_apply_schema_adapter_without_factory() { - // Create a schema - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - ])); - - // Create a parquet source - let source = ParquetSource::new(schema.clone()); - - // Convert to Arc - let file_source: Arc = Arc::new(source.clone()); - - // Create a file scan config without a schema adapter factory - let config = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) - .build(); - - // Apply schema adapter function - should pass through the source unchanged - let result_source = source.apply_schema_adapter(&config).unwrap(); - - // Verify no adapter was applied - assert!(result_source.schema_adapter_factory().is_none()); } diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 191529816481..52e61f6eed90 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -18,17 +18,20 @@ use std::sync::Arc; use arrow::array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; +#[cfg(feature = "parquet")] +use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::physical_plan::{ - ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource, + ArrowSource, CsvSource, FileSource, JsonSource, }; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::config::CsvOptions; -use datafusion_common::ColumnStatistics; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{ColumnStatistics, ScalarValue}; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::schema_adapter::{ SchemaAdapter, SchemaAdapterFactory, SchemaMapper, @@ -36,6 +39,9 @@ use datafusion_datasource::schema_adapter::{ use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::TableSchema; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; +use 
datafusion_physical_expr_common::physical_expr::PhysicalExpr; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -156,9 +162,61 @@ impl SchemaMapper for UppercaseSchemaMapper { } } +/// A physical expression adapter factory that maps uppercase column names to lowercase +#[derive(Debug)] +struct UppercasePhysicalExprAdapterFactory; + +impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + Arc::new(UppercasePhysicalExprAdapter { + logical_file_schema, + physical_file_schema, + }) + } +} + +#[derive(Debug)] +struct UppercasePhysicalExprAdapter { + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, +} + +impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { + fn rewrite(&self, expr: Arc) -> Result> { + expr.transform(|e| { + if let Some(column) = e.as_any().downcast_ref::() { + // Map uppercase column name (from logical schema) to lowercase (in physical file) + let lowercase_name = column.name().to_lowercase(); + if let Ok(idx) = self.physical_file_schema.index_of(&lowercase_name) { + return Ok(Transformed::yes( + Arc::new(Column::new(&lowercase_name, idx)) + as Arc, + )); + } + } + Ok(Transformed::no(e)) + }) + .data() + } + + fn with_partition_values( + &self, + _partition_values: Vec<(FieldRef, ScalarValue)>, + ) -> Arc { + Arc::new(Self { + logical_file_schema: self.logical_file_schema.clone(), + physical_file_schema: self.physical_file_schema.clone(), + }) + } +} + #[cfg(feature = "parquet")] #[tokio::test] -async fn test_parquet_integration_with_schema_adapter() -> Result<()> { +async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> { // Create test data let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![ @@ -190,12 +248,13 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> { Field::new("NAME", DataType::Utf8, true), ])); - // Create a ParquetSource with the adapter factory - let file_source = ParquetSource::new(table_schema.clone()) - .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?; + // Create a ParquetSource with the table schema (uppercase columns) + let file_source = Arc::new(ParquetSource::new(table_schema.clone())); + // Use PhysicalExprAdapterFactory to map uppercase column names to lowercase let config = FileScanConfigBuilder::new(store_url, file_source) .with_file(PartitionedFile::new(path, file_size)) + .with_expr_adapter(Some(Arc::new(UppercasePhysicalExprAdapterFactory))) .build(); // Create a data source executor @@ -217,62 +276,6 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> { Ok(()) } -#[cfg(feature = "parquet")] -#[tokio::test] -async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter( -) -> Result<()> { - // Create test data - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - ])), - vec![ - Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), - Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])), - ], - )?; - - let store = Arc::new(InMemory::new()) as Arc; - let store_url = ObjectStoreUrl::parse("memory://").unwrap(); - let path = "test.parquet"; - write_parquet(batch.clone(), store.clone(), path).await; - - // Get the actual file size from the object store - let object_meta = store.head(&Path::from(path)).await?; - let file_size = 
object_meta.size; - - // Create a session context and register the object store - let ctx = SessionContext::new(); - ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); - - // Create a ParquetSource with the adapter factory - let file_source = ParquetSource::new(batch.schema()) - .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?; - - let config = FileScanConfigBuilder::new(store_url, file_source) - .with_file(PartitionedFile::new(path, file_size)) - .build(); - - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); - - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - - // There should be one batch - assert_eq!(batches.len(), 1); - - // Verify the schema has the original column names (schema adapter not applied in DataSourceExec) - let result_schema = batches[0].schema(); - assert_eq!(result_schema.field(0).name(), "id"); - assert_eq!(result_schema.field(1).name(), "name"); - - Ok(()) -} - #[tokio::test] async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // This test verifies that the same schema adapter factory can be reused @@ -306,27 +309,8 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { ); } - // Test ParquetSource - #[cfg(feature = "parquet")] - { - let schema = - Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let source = ParquetSource::new(schema); - let source_with_adapter = source - .clone() - .with_schema_adapter_factory(factory.clone()) - .unwrap(); - - let base_source: Arc = source.into(); - assert!(base_source.schema_adapter_factory().is_none()); - assert!(source_with_adapter.schema_adapter_factory().is_some()); - - let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap(); - assert_eq!( - format!("{:?}", retrieved_factory.as_ref()), - format!("{:?}", factory.as_ref()) - ); - } + // Note: ParquetSource no longer supports SchemaAdapterFactory. + // Parquet now uses PhysicalExprAdapterFactory for schema adaptation. 
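+    // Parquet coverage for expression-based schema adaptation lives in
+    // `test_parquet_integration_with_physical_expr_adapter` above.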
// Test CsvSource { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index a2ce16cd530d..9cc061cc45dc 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -483,11 +483,8 @@ impl FileFormat for ParquetFormat { source = self.set_source_encryption_factory(source, state)?; - // Apply schema adapter factory before building the new config - let file_source = source.apply_schema_adapter(&conf)?; - let conf = FileScanConfigBuilder::from(conf) - .with_source(file_source) + .with_source(Arc::new(source)) .build(); Ok(DataSourceExec::from_data_source(conf)) } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 0d00d8ac851e..27916e43569c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -25,7 +25,8 @@ use crate::{ }; use arrow::array::RecordBatch; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -97,8 +98,6 @@ pub(super) struct ParquetOpener { /// Should the bloom filter be read from parquet, if present, to skip row /// groups pub enable_bloom_filter: bool, - /// Schema adapter factory - pub schema_adapter_factory: Arc, /// Should row group pruning be applied pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit @@ -140,11 +139,7 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; - let projected_schema = - SchemaRef::from(self.logical_file_schema.project(&self.projection)?); - let schema_adapter = self - .schema_adapter_factory - .create(projected_schema, Arc::clone(&self.logical_file_schema)); + let projection = Arc::clone(&self.projection); let mut predicate = self.predicate.clone(); let logical_file_schema = Arc::clone(&self.logical_file_schema); let partition_fields = self.partition_fields.clone(); @@ -268,7 +263,7 @@ impl FileOpener for ParquetOpener { // Adapt the predicate to the physical file schema. // This evaluates missing columns and inserts any necessary casts. 
- if let Some(expr_adapter_factory) = expr_adapter_factory { + if let Some(expr_adapter_factory) = expr_adapter_factory.as_ref() { predicate = predicate .map(|p| { let partition_values = partition_fields @@ -319,13 +314,43 @@ impl FileOpener for ParquetOpener { reader_metadata, ); - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&physical_file_schema)?; + let mut projection = + ProjectionExprs::from_indices(&projection, &logical_file_schema); + if let Some(expr_adapter_factory) = expr_adapter_factory { + let adapter = expr_adapter_factory + .create( + Arc::clone(&logical_file_schema), + Arc::clone(&physical_file_schema), + ) + .with_partition_values( + partition_fields + .iter() + .cloned() + .zip(partitioned_file.partition_values.clone()) + .collect_vec(), + ); + let projection_expressions = projection + .as_ref() + .iter() + .cloned() + .map(|mut proj| { + proj.expr = adapter.rewrite(Arc::clone(&proj.expr))?; + Ok(proj) + }) + .collect::>>()?; + projection = ProjectionExprs::new(projection_expressions); + } + let indices = projection + .as_ref() + .iter() + .map(|p| collect_columns(&p.expr)) + .flatten() + .map(|c| c.index()) + .sorted_unstable() + .unique() + .collect_vec(); - let mask = ProjectionMask::roots( - builder.parquet_schema(), - adapted_projections.iter().cloned(), - ); + let mask = ProjectionMask::roots(builder.parquet_schema(), indices); // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { @@ -462,6 +487,24 @@ impl FileOpener for ParquetOpener { file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + let stream_schema = Arc::clone(stream.schema()); + + // Rebase column indices to match the narrowed stream schema. + // The projection expressions have indices based on physical_file_schema, + // but the stream only contains the columns selected by the ProjectionMask. 
+ let rebased_exprs = projection + .as_ref() + .iter() + .cloned() + .map(|mut proj| { + proj.expr = reassign_expr_columns(proj.expr, &stream_schema)?; + Ok(proj) + }) + .collect::>>()?; + let projection = ProjectionExprs::new(rebased_exprs); + + let projector = projection.make_projector(&stream_schema)?; + let stream = stream.map_err(DataFusionError::from).map(move |b| { b.and_then(|b| { copy_arrow_reader_metrics( @@ -469,7 +512,7 @@ impl FileOpener for ParquetOpener { &predicate_cache_inner_records, &predicate_cache_records, ); - schema_mapping.map_batch(b) + projector.project_batch(&b) }) }); @@ -769,10 +812,7 @@ mod test { record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, Statistics, }; - use datafusion_datasource::{ - file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, - PartitionedFile, - }; + use datafusion_datasource::{file_stream::FileOpener, PartitionedFile}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, @@ -875,7 +915,6 @@ mod test { force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, #[cfg(feature = "parquet_encryption")] @@ -949,7 +988,6 @@ mod test { force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, #[cfg(feature = "parquet_encryption")] @@ -1039,7 +1077,6 @@ mod test { force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, #[cfg(feature = "parquet_encryption")] @@ -1132,7 +1169,6 @@ mod test { force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, #[cfg(feature = "parquet_encryption")] @@ -1225,7 +1261,6 @@ mod test { force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index d84ddf599379..4ad8de8d925e 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -32,9 +32,6 @@ use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, -}; use arrow::datatypes::TimeUnit; use datafusion_common::config::TableParquetOptions; @@ -135,7 +132,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// details. /// /// * Schema evolution: read parquet files with different schemas into a unified -/// table schema. See [`SchemaAdapterFactory`] for more details. +/// table schema. See [`DefaultPhysicalExprAdapterFactory`] for more details. 
/// /// * metadata_size_hint: controls the number of bytes read from the end of the /// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a @@ -262,12 +259,13 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// [`Self::with_pushdown_filters`]). /// /// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a -/// [`SchemaAdapter`] to match the table schema. By default missing columns are -/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`]. +/// [`DefaultPhysicalExprAdapterFactory`] to match the table schema. By default missing columns are +/// filled with nulls, but this can be customized via [`PhysicalExprAdapterFactory`]. /// /// [`RecordBatch`]: arrow::record_batch::RecordBatch /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData +/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory #[derive(Clone, Debug)] pub struct ParquetSource { /// Options for reading Parquet files @@ -282,8 +280,6 @@ pub struct ParquetSource { pub(crate) predicate: Option>, /// Optional user defined parquet file reader factory pub(crate) parquet_file_reader_factory: Option>, - /// Optional user defined schema adapter - pub(crate) schema_adapter_factory: Option>, /// Batch size configuration pub(crate) batch_size: Option, /// Optional hint for the size of the parquet metadata @@ -309,7 +305,6 @@ impl ParquetSource { metrics: ExecutionPlanMetricsSet::new(), predicate: None, parquet_file_reader_factory: None, - schema_adapter_factory: None, batch_size: None, metadata_size_hint: None, #[cfg(feature = "parquet_encryption")] @@ -456,28 +451,6 @@ impl ParquetSource { self.table_parquet_options.global.max_predicate_cache_size } - /// Applies schema adapter factory from the FileScanConfig if present. - /// - /// # Arguments - /// * `conf` - FileScanConfig that may contain a schema adapter factory - /// # Returns - /// The converted FileSource with schema adapter factory applied if provided - pub fn apply_schema_adapter( - self, - conf: &FileScanConfig, - ) -> datafusion_common::Result> { - let file_source: Arc = self.into(); - - // If the FileScanConfig.file_source() has a schema adapter factory, apply it - if let Some(factory) = conf.file_source().schema_adapter_factory() { - file_source.with_schema_adapter_factory( - Arc::::clone(&factory), - ) - } else { - Ok(file_source) - } - } - #[cfg(feature = "parquet_encryption")] fn get_encryption_factory_with_config( &self, @@ -526,43 +499,10 @@ impl FileSource for ParquetSource { ) -> datafusion_common::Result> { let split_projection = self.projection.clone(); - let (expr_adapter_factory, schema_adapter_factory) = match ( - base_config.expr_adapter_factory.as_ref(), - self.schema_adapter_factory.as_ref(), - ) { - (Some(expr_adapter_factory), Some(schema_adapter_factory)) => { - // Use both the schema adapter factory and the expr adapter factory. - // This results in the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema) - // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning. 
- ( - Some(Arc::clone(expr_adapter_factory)), - Arc::clone(schema_adapter_factory), - ) - } - (Some(expr_adapter_factory), None) => { - // If no custom schema adapter factory is provided but an expr adapter factory is provided use the expr adapter factory alongside the default schema adapter factory. - // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections. - ( - Some(Arc::clone(expr_adapter_factory)), - Arc::new(DefaultSchemaAdapterFactory) as _, - ) - } - (None, Some(schema_adapter_factory)) => { - // If a custom schema adapter factory is provided but no expr adapter factory is provided use the custom SchemaAdapter for both projections and predicate pushdown. - // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API. - (None, Arc::clone(schema_adapter_factory) as _) - } - (None, None) => { - // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory. - // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema) - // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning. - // This is the default behavior with not customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API. - ( - Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - Arc::new(DefaultSchemaAdapterFactory) as _, - ) - } - }; + let expr_adapter_factory = base_config + .expr_adapter_factory + .clone() + .or_else(|| Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _)); let parquet_file_reader_factory = self.parquet_file_reader_factory.clone().unwrap_or_else(|| { @@ -604,7 +544,6 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - schema_adapter_factory, coerce_int96, #[cfg(feature = "parquet_encryption")] file_decryption_properties, @@ -780,20 +719,6 @@ impl FileSource for ParquetSource { ) .with_updated_node(source)) } - - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc, - ) -> datafusion_common::Result> { - Ok(Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - })) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } #[cfg(test)] From f5673625e0c6edcdf7a3b9fecd56b006e2b86c2b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 18:12:07 +0100 Subject: [PATCH 05/16] some tweaks, add upgrading guide --- .../core/tests/parquet/schema_adapter.rs | 94 +++++++++++++++++++ .../schema_adapter_integration_tests.rs | 18 ++++ .../src/schema_rewriter.rs | 6 +- docs/source/library-user-guide/upgrading.md | 11 ++- 4 files changed, 124 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index c6db98c0d1cd..581bb21d0cb1 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -225,3 +225,97 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() { ]; assert_batches_eq!(expected, &batches); } + +/// Test demonstrating how to implement a custom PhysicalExprAdapterFactory +/// that fills missing columns with non-null default values. +/// +/// This is the recommended migration path for users who previously used +/// SchemaAdapterFactory to fill missing columns with default values. +/// Instead of transforming batches after reading (SchemaAdapter::map_batch), +/// the PhysicalExprAdapterFactory rewrites expressions to use literals for +/// missing columns, achieving the same result more efficiently. +#[tokio::test] +async fn test_physical_expr_adapter_with_non_null_defaults() { + // File only has c1 column + let batch = record_batch!(("c1", Int32, [10, 20, 30])).unwrap(); + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + write_parquet(batch, store.clone(), "defaults_test.parquet").await; + + // Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't exist in file + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int64, false), // type differs from file (Int32 vs Int64) + Field::new("c2", DataType::Utf8, true), // missing from file + Field::new("c3", DataType::Int64, true), // missing from file + ])); + + let mut cfg = SessionConfig::new() + .with_collect_statistics(false) + .with_parquet_pruning(false); + cfg.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + // CustomPhysicalExprAdapterFactory fills: + // - missing Utf8 columns with 'b' + // - missing Int64 columns with 1 + let listing_table_config = + ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(table_schema.clone()) + .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory)); + + let table = ListingTable::try_new(listing_table_config).unwrap(); + ctx.register_table("t", Arc::new(table)).unwrap(); + + // Query all columns - missing columns should have default values + let batches = ctx + .sql("SELECT c1, c2, c3 FROM t ORDER BY c1") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // c1 is cast from Int32 to Int64, c2 defaults to 'b', c3 defaults to 1 + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| 10 | b | 1 |", + "| 20 | b | 1 |", + "| 30 | b | 1 |", + "+----+----+----+", + ]; + assert_batches_eq!(expected, &batches); + + // Verify predicates work with default values + // c3 = 1 should match all rows since default is 1 + let batches = ctx + .sql("SELECT c1 FROM t WHERE c3 = 1 ORDER BY c1") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = [ + "+----+", "| c1 |", "+----+", "| 10 |", "| 20 |", "| 30 |", "+----+", + ]; + assert_batches_eq!(expected, &batches); + + // c3 = 999 should match no rows + let batches = ctx + .sql("SELECT c1 FROM t WHERE c3 = 999") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = ["++", "++"]; + assert_batches_eq!(expected, &batches); +} diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 52e61f6eed90..b1610f5bd722 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ 
b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -273,6 +273,24 @@ async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> { assert_eq!(result_schema.field(0).name(), "ID"); assert_eq!(result_schema.field(1).name(), "NAME"); + // Verify the data was correctly read from the lowercase file columns + // This confirms the PhysicalExprAdapter successfully mapped uppercase -> lowercase + let id_array = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected Int32Array for ID column"); + assert_eq!(id_array.values(), &[1, 2, 3]); + + let name_array = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .expect("Expected StringArray for NAME column"); + assert_eq!(name_array.value(0), "a"); + assert_eq!(name_array.value(1), "b"); + assert_eq!(name_array.value(2), "c"); + Ok(()) } diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 3fe1cd8e69aa..2f587136fe1f 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -381,10 +381,8 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { column.name() ); } - // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. - // TODO: do we need to sync this with what the `SchemaAdapter` actually does? - // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! - // See https://github.com/apache/datafusion/issues/16527 + // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` used to do. + // If users want a different behavior they need to provide a custom `PhysicalExprAdapter` implementation. let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; return Ok(Transformed::yes(expressions::lit(null_value))); diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index eac1e962a0a1..37da84104265 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -352,7 +352,16 @@ let config = FileScanConfigBuilder::new(url, source) .build(); ``` -**Handling projections in `FileSource`:** +### `SchemaAdapterFactory` Fully Removed from Parquet + +Following the deprecation announced in [DataFusion 49.0.0](#deprecating-schemaadapterfactory-and-schemaadapter), `SchemaAdapterFactory` has been fully removed from Parquet scanning. This applies to both: + +- **Predicate pushdown / row filtering** (deprecated in 49.0.0) +- **Projections** (newly removed in 52.0.0) + +If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), you should now implement `PhysicalExprAdapterFactory` instead. + +See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`. 
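+
+As a rough sketch of the new wiring, the adapter factory is now attached to the
+table (or scan) configuration instead of the `ParquetSource`. In the snippet
+below, `MyExprAdapterFactory` is a placeholder for your own
+`PhysicalExprAdapterFactory` implementation, and an async context with a
+`SessionContext` named `ctx` and a `table_schema: SchemaRef` already in scope is
+assumed:
+
+```rust
+use std::sync::Arc;
+
+use datafusion::datasource::listing::{
+    ListingTable, ListingTableConfig, ListingTableUrl,
+};
+
+// Placeholder: your implementation of `PhysicalExprAdapterFactory`, e.g. one
+// that rewrites references to columns missing from the file into literal
+// default values instead of NULLs.
+let factory = Arc::new(MyExprAdapterFactory);
+
+let config = ListingTableConfig::new(ListingTableUrl::parse("memory:///")?)
+    .infer_options(&ctx.state())
+    .await?
+    .with_schema(table_schema)
+    // Previously: `ParquetSource::with_schema_adapter_factory(...)`.
+    // Now: register a `PhysicalExprAdapterFactory` on the table config.
+    .with_expr_adapter_factory(factory);
+
+let table = ListingTable::try_new(config)?;
+ctx.register_table("t", Arc::new(table))?;
+```
+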
### `PhysicalOptimizerRule::optimize` deprecated in favor of `optimize_plan` From 5a65cbd970baced71f5fb5c9617b37801b6571e3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 18:23:32 +0100 Subject: [PATCH 06/16] lint --- .../tests/schema_adapter/schema_adapter_integration_tests.rs | 3 --- datafusion/datasource-parquet/src/opener.rs | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index b1610f5bd722..769334f30395 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -327,9 +327,6 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { ); } - // Note: ParquetSource no longer supports SchemaAdapterFactory. - // Parquet now uses PhysicalExprAdapterFactory for schema adaptation. - // Test CsvSource { let schema = diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 27916e43569c..b20f7d14e367 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -343,8 +343,7 @@ impl FileOpener for ParquetOpener { let indices = projection .as_ref() .iter() - .map(|p| collect_columns(&p.expr)) - .flatten() + .flat_map(|p| collect_columns(&p.expr)) .map(|c| c.index()) .sorted_unstable() .unique() From 74256d046d1d48a498d0bae43dad2d4cef96193f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 18:27:15 +0100 Subject: [PATCH 07/16] add test, fix bug --- .../src/schema_rewriter.rs | 157 ++++++++++++++++-- 1 file changed, 145 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 2f587136fe1f..daa4e6203c2f 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -23,6 +23,7 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; use datafusion_common::{ exec_err, + nested_struct::validate_struct_compatibility, tree_node::{Transformed, TransformedResult, TreeNode}, Result, ScalarValue, }; @@ -412,15 +413,28 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123` // since that's much cheaper to evalaute. 
// See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928 - let is_compatible = - can_cast_types(physical_field.data_type(), logical_field.data_type()); - if !is_compatible { - return exec_err!( - "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", - column.name(), - physical_field.data_type(), - logical_field.data_type() - ); + // + // For struct types, use validate_struct_compatibility which handles: + // - Missing fields in source (filled with nulls) + // - Extra fields in source (ignored) + // - Recursive validation of nested structs + // For non-struct types, use Arrow's can_cast_types + match (physical_field.data_type(), logical_field.data_type()) { + (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => { + validate_struct_compatibility(physical_fields, logical_fields)?; + } + _ => { + let is_compatible = + can_cast_types(physical_field.data_type(), logical_field.data_type()); + if !is_compatible { + return exec_err!( + "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", + column.name(), + physical_field.data_type(), + logical_field.data_type() + ); + } + } } let cast_expr = Arc::new(CastColumnExpr::new( @@ -444,8 +458,11 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { #[cfg(test)] mod tests { use super::*; - use arrow::array::{RecordBatch, RecordBatchOptions}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::array::{ + BooleanArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions, + StringArray, StringViewArray, StructArray, + }; + use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion_common::{assert_contains, record_batch, Result, ScalarValue}; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{col, lit, Column, Literal}; @@ -555,7 +572,11 @@ mod tests { let column_expr = Arc::new(Column::new("data", 0)); let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string(); - assert_contains!(error_msg, "Cannot cast column 'data'"); + // validate_struct_compatibility provides more specific error about which field can't be cast + assert_contains!( + error_msg, + "Cannot cast struct field 'field1' from type Binary to type Int32" + ); } #[test] @@ -837,6 +858,118 @@ mod tests { ); } + /// Test that struct columns are properly adapted including: + /// - Type casting of subfields (Int32 -> Int64, Utf8 -> Utf8View) + /// - Missing fields in logical schema are filled with nulls + #[test] + fn test_adapt_struct_batches() { + // Physical struct: {id: Int32, name: Utf8} + let physical_struct_fields: Fields = vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ] + .into(); + + let struct_array = StructArray::new( + physical_struct_fields.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])) as _, + Arc::new(StringArray::from(vec![ + Some("alice"), + None, + Some("charlie"), + ])) as _, + ], + None, + ); + + let physical_schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::Struct(physical_struct_fields), + false, + )])); + + let physical_batch = RecordBatch::try_new( + Arc::clone(&physical_schema), + vec![Arc::new(struct_array)], + ) + .unwrap(); + + // Logical struct: {id: Int64, name: Utf8View, extra: Boolean} + // - id: cast from Int32 to Int64 + // - name: cast from Utf8 to Utf8View + // - extra: missing from physical, should be filled with nulls + let logical_struct_fields: Fields = vec![ + Field::new("id", 
DataType::Int64, false), + Field::new("name", DataType::Utf8View, true), + Field::new("extra", DataType::Boolean, true), // New field, not in physical + ] + .into(); + + let logical_schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::Struct(logical_struct_fields), + false, + )])); + + let projection = vec![col("data", &logical_schema).unwrap()]; + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = + factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema)); + + let adapted_projection = projection + .into_iter() + .map(|expr| adapter.rewrite(expr).unwrap()) + .collect_vec(); + + let adapted_schema = Arc::new(Schema::new( + adapted_projection + .iter() + .map(|expr| expr.return_field(&physical_schema).unwrap()) + .collect_vec(), + )); + + let res = batch_project( + adapted_projection, + &physical_batch, + Arc::clone(&adapted_schema), + ) + .unwrap(); + + assert_eq!(res.num_columns(), 1); + + let result_struct = res + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify id field is cast to Int64 + let id_col = result_struct.column_by_name("id").unwrap(); + assert_eq!(id_col.data_type(), &DataType::Int64); + let id_values = id_col.as_any().downcast_ref::().unwrap(); + assert_eq!( + id_values.iter().collect_vec(), + vec![Some(1), Some(2), Some(3)] + ); + + // Verify name field is cast to Utf8View + let name_col = result_struct.column_by_name("name").unwrap(); + assert_eq!(name_col.data_type(), &DataType::Utf8View); + let name_values = name_col.as_any().downcast_ref::().unwrap(); + assert_eq!( + name_values.iter().collect_vec(), + vec![Some("alice"), None, Some("charlie")] + ); + + // Verify extra field (missing from physical) is filled with nulls + let extra_col = result_struct.column_by_name("extra").unwrap(); + assert_eq!(extra_col.data_type(), &DataType::Boolean); + let extra_values = extra_col.as_any().downcast_ref::().unwrap(); + assert_eq!(extra_values.iter().collect_vec(), vec![None, None, None]); + } + #[test] fn test_try_rewrite_struct_field_access() { // Test the core logic of try_rewrite_struct_field_access From a5592af99623fc9cb0209890a6d008eabebdac37 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Nov 2025 22:18:36 +0100 Subject: [PATCH 08/16] update comments --- datafusion/core/src/datasource/physical_plan/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index cd66ae87ec4e..aefbfe95cd20 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1257,7 +1257,7 @@ mod tests { ("c3", c3.clone()), ]); - // batch2: c3(int8), c2(int64), c1(string), c4(string) + // batch2: c3(date64), c2(int64), c1(string), c4(date64) let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]); let table_schema = Schema::new(vec![ From 95a1caf556c9747e630fc5201aac8b429d47a20a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 2 Dec 2025 00:40:54 +0100 Subject: [PATCH 09/16] add tests --- .../schema_adapter_integration_tests.rs | 207 ++++++++++++++++++ 1 file changed, 207 insertions(+) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 769334f30395..691db464a569 100644 --- 
a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -214,6 +214,213 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { } } +/// Test reading a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c) +#[cfg(feature = "parquet")] +#[tokio::test] +async fn test_parquet_flipped_projection() -> Result<()> { + // Create test data + use datafusion::assert_batches_eq; + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, false), + ])), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(arrow::array::StringArray::from(vec!["x", "y", "z"])), + Arc::new(arrow::array::Float64Array::from(vec![1.1, 2.2, 3.3])), + ], + )?; + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + let path = "flipped.parquet"; + write_parquet(batch.clone(), store.clone(), path).await; + + // Get the actual file size from the object store + let object_meta = store.head(&Path::from(path)).await?; + let file_size = object_meta.size; + + // Create a session context and register the object store + let ctx = SessionContext::new(); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + // Create a table schema with flipped column order (c, b, a) + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Float64, false), + Field::new("b", DataType::Utf8, true), + Field::new("a", DataType::Int32, false), + ])); + + // Create a ParquetSource with the table schema + let file_source = Arc::new(ParquetSource::new(table_schema.clone())); + + // Use PhysicalExprAdapterFactory to map flipped columns + let config = FileScanConfigBuilder::new(store_url.clone(), file_source) + .with_file(PartitionedFile::new(path, file_size)) + .with_expr_adapter(None) + .build(); + + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+-----+---+---+", + "| c | b | a |", + "+-----+---+---+", + "| 1.1 | x | 1 |", + "| 2.2 | y | 2 |", + "| 3.3 | z | 3 |", + "+-----+---+---+", + ]; + assert_batches_eq!(expected, &batches); + + // And now with a projection applied that selects (`b`, `a`) + let projection = datafusion_physical_expr::projection::ProjectionExprs::from_indices( + &[1, 2], + &table_schema, + ); + let source = Arc::new(ParquetSource::new(table_schema.clone())) + .try_pushdown_projection(&projection) + .unwrap() + .unwrap(); + let config = FileScanConfigBuilder::new(store_url, source) + .with_file(PartitionedFile::new(path, file_size)) + .with_expr_adapter(None) + .build(); + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+---+---+", + "| b | a |", + "+---+---+", + "| x | 1 |", + "| y | 2 |", + "| z | 3 |", + "+---+---+", + ]; + 
assert_batches_eq!(expected, &batches); + + Ok(()) +} + +/// Test reading a Parquet file that is missing a column specified in the table schema, which should get filled in with nulls by default. +/// We test with the file having columns (a, c) and the table schema having (a, b, c) +#[cfg(feature = "parquet")] +#[tokio::test] +async fn test_parquet_missing_column() -> Result<()> { + // Create test data with columns (a, c) + use datafusion::assert_batches_eq; + use datafusion_physical_expr::projection::ProjectionExprs; + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("c", DataType::Float64, false), + ])), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(arrow::array::Float64Array::from(vec![1.1, 2.2, 3.3])), + ], + )?; + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + let path = "missing_column.parquet"; + write_parquet(batch.clone(), store.clone(), path).await; + + // Get the actual file size from the object store + let object_meta = store.head(&Path::from(path)).await?; + let file_size = object_meta.size; + + // Create a session context and register the object store + let ctx = SessionContext::new(); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + // Create a table schema with an extra column 'b' (a, b, c) + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, false), + ])); + + // Create a ParquetSource with the table schema + let file_source = Arc::new(ParquetSource::new(table_schema.clone())); + + // Use PhysicalExprAdapterFactory to handle missing column + let config = FileScanConfigBuilder::new(store_url.clone(), file_source) + .with_file(PartitionedFile::new(path, file_size)) + .with_expr_adapter(None) + .build(); + + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+---+---+-----+", + "| a | b | c |", + "+---+---+-----+", + "| 1 | | 1.1 |", + "| 2 | | 2.2 |", + "| 3 | | 3.3 |", + "+---+---+-----+", + ]; + assert_batches_eq!(expected, &batches); + + // And with a projection applied that selects (`c, `a`, `b`) + let projection = ProjectionExprs::from_indices(&[2, 0, 1], &table_schema); + let source = Arc::new(ParquetSource::new(table_schema.clone())) + .try_pushdown_projection(&projection) + .unwrap() + .unwrap(); + let config = FileScanConfigBuilder::new(store_url, source) + .with_file(PartitionedFile::new(path, file_size)) + .with_expr_adapter(None) + .build(); + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+-----+---+---+", + "| c | a | b |", + "+-----+---+---+", + "| 1.1 | 1 | |", + "| 2.2 | 2 | |", + "| 3.3 | 3 | |", + "+-----+---+---+", + ]; + assert_batches_eq!(expected, &batches); + + Ok(()) +} + #[cfg(feature = "parquet")] #[tokio::test] async fn 
test_parquet_integration_with_physical_expr_adapter() -> Result<()> { From b81b5de76148a10f0d729a7e2d12115107be254a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 2 Dec 2025 00:49:20 +0100 Subject: [PATCH 10/16] add assertion --- datafusion/datasource-parquet/src/row_filter.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 237690c281c9..059663c2318b 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -73,7 +73,7 @@ use parquet::file::metadata::ParquetMetaData; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; -use datafusion_common::Result; +use datafusion_common::{internal_datafusion_err, Result}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; @@ -234,11 +234,23 @@ impl FilterCandidateBuilder { ); // Compute the projection into the file schema by matching column names - let projection_into_file_schema: Vec = projected_table_schema + let mut projection_into_file_schema: Vec = projected_table_schema .fields() .iter() .filter_map(|f| self.file_schema.index_of(f.name()).ok()) .collect(); + // Sort and remove duplicates + let original_len = projection_into_file_schema.len(); + projection_into_file_schema.sort_unstable(); + projection_into_file_schema.dedup(); + if projection_into_file_schema.len() < original_len { + // This should not happen, as we built projected_table_schema from + // the table schema which should not have duplicate column names. 
+ return Err(internal_datafusion_err!( + "Duplicate column names found when building filter candidate: {:?}", + projection_into_file_schema + )); + } let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?; let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?; From 91e261bcb62c81fb25ab4a9d73eef2e8bcd3dd8a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:46:55 +0100 Subject: [PATCH 11/16] add filter tests --- .../schema_adapter_integration_tests.rs | 81 ++++++++++++++++--- 1 file changed, 70 insertions(+), 11 deletions(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 691db464a569..f89577fe4be3 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -27,8 +27,9 @@ use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::physical_plan::{ ArrowSource, CsvSource, FileSource, JsonSource, }; +use datafusion::logical_expr::{col, lit}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::SessionContext; +use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::config::CsvOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ColumnStatistics, ScalarValue}; @@ -36,12 +37,21 @@ use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::schema_adapter::{ SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; + +use datafusion::assert_batches_eq; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::TableSchema; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_expr::Expr; use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::planner::logical2physical; +use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_optimizer::{ + filter_pushdown::FilterPushdown, OptimizerContext, PhysicalOptimizerRule, +}; +use datafusion_physical_plan::filter::FilterExec; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -214,12 +224,24 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { } } -/// Test reading a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c) +fn push_down_filters( + plan: Arc, + filter: Expr, +) -> Result> { + let filter_expr = logical2physical(&filter, &plan.schema()); + let plan = Arc::new(FilterExec::try_new(filter_expr, plan)?); + let cfg = SessionConfig::new() + .set_str("datafusion.execution.parquet.pushdown_filters", "true"); + let optimizer_context = OptimizerContext::new(cfg); + let optimizer = FilterPushdown::new(); + optimizer.optimize_plan(plan, &optimizer_context) +} + +/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. 
the physical file schema (a, b, c) #[cfg(feature = "parquet")] #[tokio::test] async fn test_parquet_flipped_projection() -> Result<()> { // Create test data - use datafusion::assert_batches_eq; let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), @@ -284,10 +306,7 @@ async fn test_parquet_flipped_projection() -> Result<()> { assert_batches_eq!(expected, &batches); // And now with a projection applied that selects (`b`, `a`) - let projection = datafusion_physical_expr::projection::ProjectionExprs::from_indices( - &[1, 2], - &table_schema, - ); + let projection = ProjectionExprs::from_indices(&[1, 2], &table_schema); let source = Arc::new(ParquetSource::new(table_schema.clone())) .try_pushdown_projection(&projection) .unwrap() @@ -300,7 +319,7 @@ async fn test_parquet_flipped_projection() -> Result<()> { let exec = DataSourceExec::from_data_source(config); // Collect results let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; + let stream = exec.execute(0, task_ctx.clone())?; let batches = datafusion::physical_plan::common::collect(stream).await?; // There should be one batch assert_eq!(batches.len(), 1); @@ -316,6 +335,27 @@ async fn test_parquet_flipped_projection() -> Result<()> { ]; assert_batches_eq!(expected, &batches); + // And with a filter on `b`, `a` + // a = 1 or b != 'foo' and a = 3 -> matches [{a=1,b=x},{b=z,a=3}] + let filter = col("a") + .eq(lit(1)) + .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3)))); + let exec = push_down_filters(exec, filter).unwrap(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+---+---+", + "| b | a |", + "+---+---+", + "| x | 1 |", + "| z | 3 |", + "+---+---+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) } @@ -325,8 +365,6 @@ async fn test_parquet_flipped_projection() -> Result<()> { #[tokio::test] async fn test_parquet_missing_column() -> Result<()> { // Create test data with columns (a, c) - use datafusion::assert_batches_eq; - use datafusion_physical_expr::projection::ProjectionExprs; let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), @@ -402,7 +440,7 @@ async fn test_parquet_missing_column() -> Result<()> { let exec = DataSourceExec::from_data_source(config); // Collect results let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; + let stream = exec.execute(0, task_ctx.clone())?; let batches = datafusion::physical_plan::common::collect(stream).await?; // There should be one batch assert_eq!(batches.len(), 1); @@ -418,6 +456,27 @@ async fn test_parquet_missing_column() -> Result<()> { ]; assert_batches_eq!(expected, &batches); + // And with a filter on a, b + // a = 1 or b is null and a = 3 + let filter = col("a") + .eq(lit(1)) + .or(col("b").is_null().and(col("a").eq(lit(3)))); + let exec = push_down_filters(exec, filter).unwrap(); + let stream = exec.execute(0, task_ctx.clone())?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be one batch + assert_eq!(batches.len(), 1); + #[rustfmt::skip] + let expected = [ + "+-----+---+---+", + "| c | a | b |", + "+-----+---+---+", + "| 1.1 | 1 | |", + "| 3.3 | 3 | |", + "+-----+---+---+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) } From ba36483f74969f96b3e4e447c8cce1c8be911c69 Mon Sep 17 00:00:00 2001 From: Adrian 
Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:48:15 +0100 Subject: [PATCH 12/16] lint --- .../schema_adapter/schema_adapter_integration_tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index f89577fe4be3..190a9182e20b 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -226,7 +226,7 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { fn push_down_filters( plan: Arc, - filter: Expr, + filter: &Expr, ) -> Result> { let filter_expr = logical2physical(&filter, &plan.schema()); let plan = Arc::new(FilterExec::try_new(filter_expr, plan)?); @@ -340,7 +340,7 @@ async fn test_parquet_flipped_projection() -> Result<()> { let filter = col("a") .eq(lit(1)) .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3)))); - let exec = push_down_filters(exec, filter).unwrap(); + let exec = push_down_filters(exec, &filter).unwrap(); let stream = exec.execute(0, task_ctx)?; let batches = datafusion::physical_plan::common::collect(stream).await?; // There should be one batch @@ -461,7 +461,7 @@ async fn test_parquet_missing_column() -> Result<()> { let filter = col("a") .eq(lit(1)) .or(col("b").is_null().and(col("a").eq(lit(3)))); - let exec = push_down_filters(exec, filter).unwrap(); + let exec = push_down_filters(exec, &filter).unwrap(); let stream = exec.execute(0, task_ctx.clone())?; let batches = datafusion::physical_plan::common::collect(stream).await?; // There should be one batch From 54a8dbda2c67a8fa994b63539ad2ddf394daaafe Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:51:35 +0100 Subject: [PATCH 13/16] a bit more test --- .../schema_adapter/schema_adapter_integration_tests.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 190a9182e20b..c5144c70aaa0 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -477,6 +477,14 @@ async fn test_parquet_missing_column() -> Result<()> { ]; assert_batches_eq!(expected, &batches); + // Filter `b is not null or a = 24` doesn't match any rows + let filter = col("b").is_not_null().or(col("a").eq(lit(24))); + let exec = push_down_filters(exec, &filter).unwrap(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + // There should be zero batches + assert_eq!(batches.len(), 0); + Ok(()) } From c2eb2dcf915563a4710a5c1766ad43d0163f8791 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 3 Dec 2025 18:07:41 +0100 Subject: [PATCH 14/16] lint --- .../tests/schema_adapter/schema_adapter_integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index c5144c70aaa0..7b8f8c3daeb4 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ 
b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -228,7 +228,7 @@ fn push_down_filters( plan: Arc, filter: &Expr, ) -> Result> { - let filter_expr = logical2physical(&filter, &plan.schema()); + let filter_expr = logical2physical(filter, &plan.schema()); let plan = Arc::new(FilterExec::try_new(filter_expr, plan)?); let cfg = SessionConfig::new() .set_str("datafusion.execution.parquet.pushdown_filters", "true"); From 3cd7c84b11a94c3fd27884249b37ce4986908c20 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:41:06 +0100 Subject: [PATCH 15/16] refactor into test runner, add stats based tests --- .../schema_adapter_integration_tests.rs | 423 +++++++++++------- 1 file changed, 263 insertions(+), 160 deletions(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 7b8f8c3daeb4..f0d09d713410 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use arrow::array::RecordBatch; + use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::common::Result; +use datafusion::config::{ConfigOptions, TableParquetOptions}; use datafusion::datasource::listing::PartitionedFile; #[cfg(feature = "parquet")] use datafusion::datasource::physical_plan::ParquetSource; @@ -29,8 +31,9 @@ use datafusion::datasource::physical_plan::{ }; use datafusion::logical_expr::{col, lit}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion::prelude::SessionContext; use datafusion_common::config::CsvOptions; +use datafusion_common::record_batch; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ColumnStatistics, ScalarValue}; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; @@ -48,22 +51,33 @@ use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_optimizer::{ - filter_pushdown::FilterPushdown, OptimizerContext, PhysicalOptimizerRule, -}; -use datafusion_physical_plan::filter::FilterExec; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; async fn write_parquet(batch: RecordBatch, store: Arc, path: &str) { + write_batches_to_parquet(&[batch], store, path).await; +} + +/// Write RecordBatches to a Parquet file with each batch in its own row group. 
+async fn write_batches_to_parquet( + batches: &[RecordBatch], + store: Arc, + path: &str, +) -> usize { let mut out = BytesMut::new().writer(); { - let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); - writer.write(&batch).unwrap(); + let mut writer = + ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap(); + for batch in batches { + writer.write(batch).unwrap(); + writer.flush().unwrap(); + } writer.finish().unwrap(); } let data = out.into_inner().freeze(); + let file_size = data.len(); store.put(&Path::from(path), data.into()).await.unwrap(); + file_size } /// A schema adapter factory that transforms column names to uppercase @@ -224,49 +238,99 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter { } } -fn push_down_filters( - plan: Arc, - filter: &Expr, -) -> Result> { - let filter_expr = logical2physical(filter, &plan.schema()); - let plan = Arc::new(FilterExec::try_new(filter_expr, plan)?); - let cfg = SessionConfig::new() - .set_str("datafusion.execution.parquet.pushdown_filters", "true"); - let optimizer_context = OptimizerContext::new(cfg); - let optimizer = FilterPushdown::new(); - optimizer.optimize_plan(plan, &optimizer_context) +#[derive(Clone)] +struct ParquetTestCase { + table_schema: TableSchema, + batches: Vec, + predicate: Option, + projection: Option, + push_down_filters: bool, +} + +impl ParquetTestCase { + fn new(table_schema: TableSchema, batches: Vec) -> Self { + Self { + table_schema, + batches, + predicate: None, + projection: None, + push_down_filters: true, + } + } + + fn push_down_filters(mut self, pushdown_filters: bool) -> Self { + self.push_down_filters = pushdown_filters; + self + } + + fn with_predicate(mut self, predicate: Expr) -> Self { + self.predicate = Some(predicate); + self + } + + fn with_projection(mut self, projection: ProjectionExprs) -> Self { + self.projection = Some(projection); + self + } + + async fn execute(self) -> Result> { + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + let path = "test.parquet"; + let file_size = + write_batches_to_parquet(&self.batches, store.clone(), path).await; + + let ctx = SessionContext::new(); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + let mut table_options = TableParquetOptions::default(); + // controlled via ConfigOptions flag; ParquetSources ORs them so if either is true then pushdown is enabled + table_options.global.pushdown_filters = false; + let mut file_source = Arc::new( + ParquetSource::new(self.table_schema.table_schema().clone()) + .with_table_parquet_options(table_options), + ) as Arc; + + if let Some(projection) = self.projection { + file_source = file_source.try_pushdown_projection(&projection)?.unwrap(); + } + + if let Some(predicate) = &self.predicate { + let filter_expr = + logical2physical(predicate, self.table_schema.table_schema()); + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = self.push_down_filters; + let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?; + file_source = result.updated_node.unwrap(); + } + + let config = FileScanConfigBuilder::new(store_url.clone(), file_source) + .with_file(PartitionedFile::new(path, file_size as u64)) // size 0 for test + .with_expr_adapter(None) + .build(); + + let exec = DataSourceExec::from_data_source(config); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + datafusion::physical_plan::common::collect(stream).await 
+ } } /// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c) -#[cfg(feature = "parquet")] #[tokio::test] +#[cfg(feature = "parquet")] async fn test_parquet_flipped_projection() -> Result<()> { - // Create test data - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, false), - ])), - vec![ - Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), - Arc::new(arrow::array::StringArray::from(vec!["x", "y", "z"])), - Arc::new(arrow::array::Float64Array::from(vec![1.1, 2.2, 3.3])), - ], + // Create test data with columns (a, b, c) - the file schema + let batch1 = record_batch!( + ("a", Int32, vec![1, 2]), + ("b", Utf8, vec!["x", "y"]), + ("c", Float64, vec![1.1, 2.2]) + )?; + let batch2 = record_batch!( + ("a", Int32, vec![3]), + ("b", Utf8, vec!["z"]), + ("c", Float64, vec![3.3]) )?; - - let store = Arc::new(InMemory::new()) as Arc; - let store_url = ObjectStoreUrl::parse("memory://").unwrap(); - let path = "flipped.parquet"; - write_parquet(batch.clone(), store.clone(), path).await; - - // Get the actual file size from the object store - let object_meta = store.head(&Path::from(path)).await?; - let file_size = object_meta.size; - - // Create a session context and register the object store - let ctx = SessionContext::new(); - ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); // Create a table schema with flipped column order (c, b, a) let table_schema = Arc::new(Schema::new(vec![ @@ -274,25 +338,12 @@ async fn test_parquet_flipped_projection() -> Result<()> { Field::new("b", DataType::Utf8, true), Field::new("a", DataType::Int32, false), ])); + let table_schema = TableSchema::from_file_schema(table_schema); - // Create a ParquetSource with the table schema - let file_source = Arc::new(ParquetSource::new(table_schema.clone())); + let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]); - // Use PhysicalExprAdapterFactory to map flipped columns - let config = FileScanConfigBuilder::new(store_url.clone(), file_source) - .with_file(PartitionedFile::new(path, file_size)) - .with_expr_adapter(None) - .build(); - - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); - - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + // Test reading with flipped schema + let batches = test_case.clone().execute().await?; #[rustfmt::skip] let expected = [ "+-----+---+---+", @@ -305,24 +356,13 @@ async fn test_parquet_flipped_projection() -> Result<()> { ]; assert_batches_eq!(expected, &batches); - // And now with a projection applied that selects (`b`, `a`) - let projection = ProjectionExprs::from_indices(&[1, 2], &table_schema); - let source = Arc::new(ParquetSource::new(table_schema.clone())) - .try_pushdown_projection(&projection) - .unwrap() - .unwrap(); - let config = FileScanConfigBuilder::new(store_url, source) - .with_file(PartitionedFile::new(path, file_size)) - .with_expr_adapter(None) - .build(); - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx.clone())?; - let batches = 
datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + // Test with a projection that selects (b, a) + let projection = ProjectionExprs::from_indices(&[1, 2], table_schema.table_schema()); + let batches = test_case + .clone() + .with_projection(projection.clone()) + .execute() + .await?; #[rustfmt::skip] let expected = [ "+---+---+", @@ -335,85 +375,108 @@ async fn test_parquet_flipped_projection() -> Result<()> { ]; assert_batches_eq!(expected, &batches); - // And with a filter on `b`, `a` + // Test with a filter on b, a // a = 1 or b != 'foo' and a = 3 -> matches [{a=1,b=x},{b=z,a=3}] let filter = col("a") .eq(lit(1)) .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3)))); - let exec = push_down_filters(exec, &filter).unwrap(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + let batches = test_case + .clone() + .with_projection(projection.clone()) + .with_predicate(filter.clone()) + .execute() + .await?; + #[rustfmt::skip] + let expected = [ + "+---+---+", + "| b | a |", + "+---+---+", + "| x | 1 |", + "| z | 3 |", + "+---+---+", + ]; + assert_batches_eq!(expected, &batches); + + // Test with only statistics-based filter pushdown (no row-level filtering) + // Since we have 2 row groups and the filter matches rows in both, stats pruning alone won't filter any + let batches = test_case + .clone() + .with_projection(projection) + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; #[rustfmt::skip] let expected = [ "+---+---+", "| b | a |", "+---+---+", "| x | 1 |", + "| y | 2 |", "| z | 3 |", "+---+---+", ]; assert_batches_eq!(expected, &batches); + // Test with a filter that can prune via statistics: a > 10 (no rows match) + let filter = col("a").gt(lit(10)); + let batches = test_case + .clone() + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; + // Stats show a has max=3, so a > 10 prunes all row groups + assert_eq!(batches.len(), 0); + + // With a filter that matches only the first row group: a < 3 + let filter = col("a").lt(lit(3)); + let batches = test_case + .clone() + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; + #[rustfmt::skip] + let expected = [ + "+-----+---+---+", + "| c | b | a |", + "+-----+---+---+", + "| 1.1 | x | 1 |", + "| 2.2 | y | 2 |", + "+-----+---+---+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) } /// Test reading a Parquet file that is missing a column specified in the table schema, which should get filled in with nulls by default. 
/// We test with the file having columns (a, c) and the table schema having (a, b, c) -#[cfg(feature = "parquet")] #[tokio::test] +#[cfg(feature = "parquet")] async fn test_parquet_missing_column() -> Result<()> { - // Create test data with columns (a, c) - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("c", DataType::Float64, false), - ])), - vec![ - Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), - Arc::new(arrow::array::Float64Array::from(vec![1.1, 2.2, 3.3])), - ], - )?; - - let store = Arc::new(InMemory::new()) as Arc; - let store_url = ObjectStoreUrl::parse("memory://").unwrap(); - let path = "missing_column.parquet"; - write_parquet(batch.clone(), store.clone(), path).await; - - // Get the actual file size from the object store - let object_meta = store.head(&Path::from(path)).await?; - let file_size = object_meta.size; - - // Create a session context and register the object store - let ctx = SessionContext::new(); - ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + // Create test data with columns (a, c) as 2 batches + // | a | c | + // |---|-----| + // | 1 | 1.1 | + // | 2 | 2.2 | + // | ~ | ~~~ | + // | 3 | 3.3 | + let batch1 = record_batch!(("a", Int32, vec![1, 2]), ("c", Float64, vec![1.1, 2.2]))?; + let batch2 = record_batch!(("a", Int32, vec![3]), ("c", Float64, vec![3.3]))?; // Create a table schema with an extra column 'b' (a, b, c) - let table_schema = Arc::new(Schema::new(vec![ + let logical_file_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, false), ])); + let table_schema = TableSchema::from_file_schema(logical_file_schema.clone()); - // Create a ParquetSource with the table schema - let file_source = Arc::new(ParquetSource::new(table_schema.clone())); - - // Use PhysicalExprAdapterFactory to handle missing column - let config = FileScanConfigBuilder::new(store_url.clone(), file_source) - .with_file(PartitionedFile::new(path, file_size)) - .with_expr_adapter(None) - .build(); - - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); + let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]); - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + let batches = test_case.clone().execute().await?; #[rustfmt::skip] let expected = [ "+---+---+-----+", @@ -427,23 +490,13 @@ async fn test_parquet_missing_column() -> Result<()> { assert_batches_eq!(expected, &batches); // And with a projection applied that selects (`c, `a`, `b`) - let projection = ProjectionExprs::from_indices(&[2, 0, 1], &table_schema); - let source = Arc::new(ParquetSource::new(table_schema.clone())) - .try_pushdown_projection(&projection) - .unwrap() - .unwrap(); - let config = FileScanConfigBuilder::new(store_url, source) - .with_file(PartitionedFile::new(path, file_size)) - .with_expr_adapter(None) - .build(); - // Create a data source executor - let exec = DataSourceExec::from_data_source(config); - // Collect results - let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx.clone())?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + let projection = + 
ProjectionExprs::from_indices(&[2, 0, 1], table_schema.table_schema()); + let batches = test_case + .clone() + .with_projection(projection) + .execute() + .await?; #[rustfmt::skip] let expected = [ "+-----+---+---+", @@ -461,35 +514,85 @@ async fn test_parquet_missing_column() -> Result<()> { let filter = col("a") .eq(lit(1)) .or(col("b").is_null().and(col("a").eq(lit(3)))); - let exec = push_down_filters(exec, &filter).unwrap(); - let stream = exec.execute(0, task_ctx.clone())?; - let batches = datafusion::physical_plan::common::collect(stream).await?; - // There should be one batch - assert_eq!(batches.len(), 1); + let batches = test_case + .clone() + .with_predicate(filter.clone()) + .execute() + .await?; #[rustfmt::skip] let expected = [ - "+-----+---+---+", - "| c | a | b |", - "+-----+---+---+", - "| 1.1 | 1 | |", - "| 3.3 | 3 | |", - "+-----+---+---+", + "+---+---+-----+", + "| a | b | c |", + "+---+---+-----+", + "| 1 | | 1.1 |", + "| 3 | | 3.3 |", + "+---+---+-----+", + ]; + assert_batches_eq!(expected, &batches); + // With only statistics-based filter pushdown + let batches = test_case + .clone() + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; + #[rustfmt::skip] + let expected = [ + "+---+---+-----+", + "| a | b | c |", + "+---+---+-----+", + "| 1 | | 1.1 |", + "| 2 | | 2.2 |", + "| 3 | | 3.3 |", + "+---+---+-----+", ]; assert_batches_eq!(expected, &batches); // Filter `b is not null or a = 24` doesn't match any rows let filter = col("b").is_not_null().or(col("a").eq(lit(24))); - let exec = push_down_filters(exec, &filter).unwrap(); - let stream = exec.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; + let batches = test_case + .clone() + .with_predicate(filter.clone()) + .execute() + .await?; + // There should be zero batches + assert_eq!(batches.len(), 0); + // With only statistics-based filter pushdown + let batches = test_case + .clone() + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; + // There will be data: the filter is (null) is not null or a = 24. 
+ // Statistics pruning doesn't handle `null is not null` so it resolves to `true or a = 24` -> `true` so no row groups are pruned + #[rustfmt::skip] + let expected = [ + "+---+---+-----+", + "| a | b | c |", + "+---+---+-----+", + "| 1 | | 1.1 |", + "| 2 | | 2.2 |", + "| 3 | | 3.3 |", + "+---+---+-----+", + ]; + assert_batches_eq!(expected, &batches); + // On the other hand the filter `b = 'foo' and a = 24` should prune all data even with only statistics-based pushdown + let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24))); + let batches = test_case + .clone() + .with_predicate(filter) + .push_down_filters(false) + .execute() + .await?; // There should be zero batches assert_eq!(batches.len(), 0); Ok(()) } -#[cfg(feature = "parquet")] #[tokio::test] +#[cfg(feature = "parquet")] async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> { // Create test data let batch = RecordBatch::try_new( From 24713b6ada90cc27deaab675b39c947f49b810e3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 5 Dec 2025 01:46:52 +0100 Subject: [PATCH 16/16] Address feedback --- .../src/datasource/physical_plan/parquet.rs | 2 +- .../core/tests/parquet/schema_adapter.rs | 20 +++- datafusion/datasource-parquet/src/opener.rs | 34 +------ .../physical-expr/src/expressions/cast.rs | 96 +------------------ datafusion/physical-expr/src/projection.rs | 43 +++++++++ .../sqllogictest/test_files/timestamps.slt | 70 ++++++++++++++ 6 files changed, 138 insertions(+), 127 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index aefbfe95cd20..6ed01cde14a3 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1257,7 +1257,7 @@ mod tests { ("c3", c3.clone()), ]); - // batch2: c3(date64), c2(int64), c1(string), c4(date64) + // batch2: c3(date64), c2(int64), c1(string) let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]); let table_schema = Schema::new(vec![ diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 581bb21d0cb1..541785319cfd 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -51,7 +51,10 @@ async fn write_parquet(batch: RecordBatch, store: Arc, path: &s store.put(&Path::from(path), data.into()).await.unwrap(); } -// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with the default value for the field type +// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with +// the default value for the field type: +// - Int64 columns are filled with `1` +// - Utf8 columns are filled with `'b'` #[derive(Debug)] struct CustomPhysicalExprAdapterFactory; @@ -302,8 +305,15 @@ async fn test_physical_expr_adapter_with_non_null_defaults() { .await .unwrap(); + #[rustfmt::skip] let expected = [ - "+----+", "| c1 |", "+----+", "| 10 |", "| 20 |", "| 30 |", "+----+", + "+----+", + "| c1 |", + "+----+", + "| 10 |", + "| 20 |", + "| 30 |", + "+----+", ]; assert_batches_eq!(expected, &batches); @@ -316,6 +326,10 @@ async fn test_physical_expr_adapter_with_non_null_defaults() { .await .unwrap(); - let expected = ["++", "++"]; + #[rustfmt::skip] + let expected = [ + "++", + "++", + ]; assert_batches_eq!(expected, &batches); } diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs index b20f7d14e367..b84d895e11f9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -26,7 +26,7 @@ use crate::{ use arrow::array::RecordBatch; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; +use datafusion_physical_expr::utils::reassign_expr_columns; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -329,25 +329,9 @@ impl FileOpener for ParquetOpener { .zip(partitioned_file.partition_values.clone()) .collect_vec(), ); - let projection_expressions = projection - .as_ref() - .iter() - .cloned() - .map(|mut proj| { - proj.expr = adapter.rewrite(Arc::clone(&proj.expr))?; - Ok(proj) - }) - .collect::>>()?; - projection = ProjectionExprs::new(projection_expressions); + projection = projection.try_map_exprs(|expr| adapter.rewrite(expr))?; } - let indices = projection - .as_ref() - .iter() - .flat_map(|p| collect_columns(&p.expr)) - .map(|c| c.index()) - .sorted_unstable() - .unique() - .collect_vec(); + let indices = projection.column_indices(); let mask = ProjectionMask::roots(builder.parquet_schema(), indices); @@ -491,16 +475,8 @@ impl FileOpener for ParquetOpener { // Rebase column indices to match the narrowed stream schema. // The projection expressions have indices based on physical_file_schema, // but the stream only contains the columns selected by the ProjectionMask. - let rebased_exprs = projection - .as_ref() - .iter() - .cloned() - .map(|mut proj| { - proj.expr = reassign_expr_columns(proj.expr, &stream_schema)?; - Ok(proj) - }) - .collect::>>()?; - let projection = ProjectionExprs::new(rebased_exprs); + let projection = projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; let projector = projection.make_projector(&stream_schema)?; diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 9bf242fccfc0..a368aafbc62d 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -740,100 +740,8 @@ mod tests { Ok(()) } - #[test] - fn test_cast_timestamp_with_timezone_to_timestamp() -> Result<()> { - // Test casting from Timestamp(Nanosecond, Some("UTC")) to Timestamp(Nanosecond, None) - let schema = Schema::new(vec![Field::new( - "a", - Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), - true, - )]); - let a = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("UTC"); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; - - let expression = cast_with_options( - col("a", &schema)?, - &schema, - Timestamp(TimeUnit::Nanosecond, None), - None, - )?; - - // verify that the expression's type is correct - assert_eq!( - expression.data_type(&schema)?, - Timestamp(TimeUnit::Nanosecond, None) - ); - - // compute - let result = expression - .evaluate(&batch)? 
- .into_array(batch.num_rows()) - .expect("Failed to convert to array"); - - // verify that the array's data_type is correct - assert_eq!(*result.data_type(), Timestamp(TimeUnit::Nanosecond, None)); - - // verify that the data itself is downcastable and correct - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast"); - - for (i, expected) in [1_i64, 2, 3, 4, 5].iter().enumerate() { - assert_eq!(result.value(i), *expected); - } - - Ok(()) - } - - #[test] - fn test_cast_timestamp_to_timestamp_with_timezone() -> Result<()> { - // Test casting from Timestamp(Nanosecond, None) to Timestamp(Nanosecond, Some("UTC")) - let schema = Schema::new(vec![Field::new( - "a", - Timestamp(TimeUnit::Nanosecond, None), - true, - )]); - let a = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; - - let expression = cast_with_options( - col("a", &schema)?, - &schema, - Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), - None, - )?; - - // verify that the expression's type is correct - assert_eq!( - expression.data_type(&schema)?, - Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) - ); - - // compute - let result = expression - .evaluate(&batch)? - .into_array(batch.num_rows()) - .expect("Failed to convert to array"); - - // verify that the array's data_type is correct - assert_eq!( - *result.data_type(), - Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) - ); - - // verify that the data itself is downcastable and correct - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast"); - - for (i, expected) in [1_i64, 2, 3, 4, 5].iter().enumerate() { - assert_eq!(result.value(i), *expected); - } - - Ok(()) - } + // Tests for timestamp timezone casting have been moved to timestamps.slt + // See the "Casting between timestamp with and without timezone" section #[test] fn invalid_cast() { diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 3d6740510bec..4688ac0e1ba2 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -240,6 +240,49 @@ impl ProjectionExprs { self.exprs.iter().map(|e| Arc::clone(&e.expr)) } + /// Apply a fallible transformation to the [`PhysicalExpr`] of each projection. + /// + /// This method transforms the expression in each [`ProjectionExpr`] while preserving + /// the alias. This is useful for rewriting expressions, such as when adapting + /// expressions to a different schema. 
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use std::sync::Arc;
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    /// use datafusion_common::Result;
+    /// use datafusion_physical_expr::expressions::Column;
+    /// use datafusion_physical_expr::projection::ProjectionExprs;
+    /// use datafusion_physical_expr::PhysicalExpr;
+    ///
+    /// // Create a schema and projection
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("a", DataType::Int32, false),
+    ///     Field::new("b", DataType::Int32, false),
+    /// ]));
+    /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
+    ///
+    /// // Transform each expression (this example passes them through unchanged)
+    /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
+    /// assert_eq!(transformed.as_ref().len(), 2);
+    /// # Ok::<(), datafusion_common::DataFusionError>(())
+    /// ```
+    pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
+    where
+        F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
+    {
+        let exprs = self
+            .exprs
+            .into_iter()
+            .map(|mut proj| {
+                proj.expr = f(proj.expr)?;
+                Ok(proj)
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Self::new(exprs))
+    }
+
     /// Apply another projection on top of this projection, returning the combined projection.
     /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
     /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt
index 3a1a26257b17..3bf11c92e2fb 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -3702,6 +3702,76 @@ FROM ts_data_micros_kolkata
 2020-09-08T18:12:29.190+05:30
 2020-09-08T17:12:29.190+05:30
+
+##########
+## Casting between timestamp with and without timezone
+##########
+
+# Test casting from Timestamp(Nanosecond, Some("UTC")) to Timestamp(Nanosecond, None)
+# Verifies that the underlying nanosecond values are preserved when removing the timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))'), 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify values are preserved when casting from timestamp with timezone to timestamp without timezone
+query P rowsort
+SELECT arrow_cast(column1, 'Timestamp(Nanosecond, None)')
+FROM (VALUES
+    (arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))')),
+    (arrow_cast(2, 'Timestamp(Nanosecond, Some("UTC"))')),
+    (arrow_cast(3, 'Timestamp(Nanosecond, Some("UTC"))')),
+    (arrow_cast(4, 'Timestamp(Nanosecond, Some("UTC"))')),
+    (arrow_cast(5, 'Timestamp(Nanosecond, Some("UTC"))'))
+) t;
+----
+1970-01-01T00:00:00.000000001
+1970-01-01T00:00:00.000000002
+1970-01-01T00:00:00.000000003
+1970-01-01T00:00:00.000000004
+1970-01-01T00:00:00.000000005
+
+# Test casting from Timestamp(Nanosecond, None) to Timestamp(Nanosecond, Some("UTC"))
+# Verifies that the underlying nanosecond values are preserved when adding a timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, None)'), 'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify values are preserved when
casting from timestamp without timezone to timestamp with timezone +query P rowsort +SELECT arrow_cast(column1, 'Timestamp(Nanosecond, Some("UTC"))') +FROM (VALUES + (arrow_cast(1, 'Timestamp(Nanosecond, None)')), + (arrow_cast(2, 'Timestamp(Nanosecond, None)')), + (arrow_cast(3, 'Timestamp(Nanosecond, None)')), + (arrow_cast(4, 'Timestamp(Nanosecond, None)')), + (arrow_cast(5, 'Timestamp(Nanosecond, None)')) +) t; +---- +1970-01-01T00:00:00.000000001Z +1970-01-01T00:00:00.000000002Z +1970-01-01T00:00:00.000000003Z +1970-01-01T00:00:00.000000004Z +1970-01-01T00:00:00.000000005Z + + ########## ## Common timestamp data ##########