From 406722ba7bfc832834de86983c2a7d47300e7c24 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 22 Nov 2025 20:45:22 +0800 Subject: [PATCH 1/2] wip struct field pushdown parquet need to figure out what to do with SchemaAdapter; there's a couple failing tests Should we do this transformation before or after ExprAdapter? --- Cargo.lock | 1 + datafusion/core/src/datasource/mod.rs | 165 +------------ datafusion/datasource-parquet/Cargo.toml | 1 + datafusion/datasource-parquet/src/opener.rs | 253 +++++++++++++++++--- datafusion/datasource-parquet/src/source.rs | 26 +- 5 files changed, 235 insertions(+), 211 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e2827580ea5..ab81d1beab3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,6 +2156,7 @@ dependencies = [ "datafusion-datasource", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 620e389a0fb8..6f230f461f23 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -51,101 +51,14 @@ pub use datafusion_physical_expr::create_ordering; #[cfg(all(test, feature = "parquet"))] mod tests { - - use crate::prelude::SessionContext; - use ::object_store::{path::Path, ObjectMeta}; use arrow::{ - array::{Int32Array, StringArray}, - datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, + datatypes::{DataType, Field, Schema}, }; - use datafusion_common::{record_batch, test_util::batches_to_sort_string}; + use datafusion_common::record_batch; use datafusion_datasource::{ - file::FileSource, - file_scan_config::FileScanConfigBuilder, - schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, - SchemaMapper, - }, - source::DataSourceExec, - PartitionedFile, + schema_adapter::DefaultSchemaAdapterFactory, }; - use datafusion_datasource_parquet::source::ParquetSource; - use datafusion_physical_plan::collect; - use std::{fs, sync::Arc}; - use tempfile::TempDir; - - #[tokio::test] - async fn can_override_schema_adapter() { - // Test shows that SchemaAdapter can add a column that doesn't existing in the - // record batches returned from parquet. This can be useful for schema evolution - // where older files may not have all columns. 
- - use datafusion_execution::object_store::ObjectStoreUrl; - let tmp_dir = TempDir::new().unwrap(); - let table_dir = tmp_dir.path().join("parquet_test"); - fs::DirBuilder::new().create(table_dir.as_path()).unwrap(); - let f1 = Field::new("id", DataType::Int32, true); - - let file_schema = Arc::new(Schema::new(vec![f1.clone()])); - let filename = "part.parquet".to_string(); - let path = table_dir.as_path().join(filename.clone()); - let file = fs::File::create(path.clone()).unwrap(); - let mut writer = - parquet::arrow::ArrowWriter::try_new(file, file_schema.clone(), None) - .unwrap(); - - let ids = Arc::new(Int32Array::from(vec![1i32])); - let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap(); - - writer.write(&rec_batch).unwrap(); - writer.close().unwrap(); - - let location = Path::parse(path.to_str().unwrap()).unwrap(); - let metadata = fs::metadata(path.as_path()).expect("Local file metadata"); - let meta = ObjectMeta { - location, - last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), - size: metadata.len(), - e_tag: None, - version: None, - }; - - let partitioned_file = PartitionedFile { - object_meta: meta, - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - metadata_size_hint: None, - }; - - let f1 = Field::new("id", DataType::Int32, true); - let f2 = Field::new("extra_column", DataType::Utf8, true); - - let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - let source = ParquetSource::new(Arc::clone(&schema)) - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})) - .unwrap(); - let base_conf = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) - .with_file(partitioned_file) - .build(); - - let parquet_exec = DataSourceExec::from_data_source(base_conf); - - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let read = collect(parquet_exec, task_ctx).await.unwrap(); - - insta::assert_snapshot!(batches_to_sort_string(&read),@r###" - +----+--------------+ - | id | extra_column | - +----+--------------+ - | 1 | foo | - +----+--------------+ - "###); - } + use std::sync::Arc; #[test] fn default_schema_adapter() { @@ -198,74 +111,4 @@ mod tests { let err = mapper.map_batch(file_batch).unwrap_err().to_string(); assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}"); } - - #[derive(Debug)] - struct TestSchemaAdapterFactory; - - impl SchemaAdapterFactory for TestSchemaAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(TestSchemaAdapter { - table_schema: projected_table_schema, - }) - } - } - - struct TestSchemaAdapter { - /// Schema for the table - table_schema: SchemaRef, - } - - impl SchemaAdapter for TestSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if self.table_schema.fields().find(file_field.name()).is_some() { - projection.push(file_idx); - } - } - - Ok((Arc::new(TestSchemaMapping {}), projection)) - } - } - - #[derive(Debug)] - struct TestSchemaMapping {} - - impl SchemaMapper for TestSchemaMapping { 
- fn map_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let f1 = Field::new("id", DataType::Int32, true); - let f2 = Field::new("extra_column", DataType::Utf8, true); - - let schema = Arc::new(Schema::new(vec![f1, f2])); - - let extra_column = Arc::new(StringArray::from(vec!["foo"])); - let mut new_columns = batch.columns().to_vec(); - new_columns.push(extra_column); - - Ok(RecordBatch::try_new(schema, new_columns).unwrap()) - } - - fn map_column_statistics( - &self, - _file_col_statistics: &[datafusion_common::ColumnStatistics], - ) -> datafusion_common::Result> { - unimplemented!() - } - } } diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5f6f56ac6f3..5f816c17acf8 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -39,6 +39,7 @@ datafusion-common-runtime = { workspace = true } datafusion-datasource = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c905d950a96..35371be2b17d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,16 +24,28 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, }; use arrow::array::RecordBatch; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_physical_expr::expressions::{Column, Literal}; +use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; +use datafusion_physical_expr::ScalarFunctionExpr; +use parquet::schema::types::{ColumnPath, SchemaDescriptor}; +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit}; +use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; -use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_common::{ + exec_err, internal_datafusion_err, DataFusionError, Result, ScalarValue, +}; use datafusion_datasource::PartitionedFile; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -62,8 +74,12 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; pub(super) struct ParquetOpener { /// Execution partition index pub partition_index: usize, - /// Column indexes in `table_schema` needed by the query - pub projection: Arc<[usize]>, + /// Projection to apply to the logical file schema. 
+ /// + /// Note that this projection may not be applied directly: + /// - Casts may be optimized away by reading the column into the required data type directly + /// - Struct access will be pushed down + pub projection: ProjectionExprs, /// Target number of rows in each output RecordBatch pub batch_size: usize, /// Optional limit on the number of rows to read @@ -136,13 +152,9 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; - let projected_schema = - SchemaRef::from(self.logical_file_schema.project(&self.projection)?); let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory); - let schema_adapter = self - .schema_adapter_factory - .create(projected_schema, Arc::clone(&self.logical_file_schema)); let mut predicate = self.predicate.clone(); + let mut projections = self.projection.clone(); let logical_file_schema = Arc::clone(&self.logical_file_schema); let partition_fields = self.partition_fields.clone(); let reorder_predicates = self.reorder_filters; @@ -265,19 +277,19 @@ impl FileOpener for ParquetOpener { // Adapt the predicate to the physical file schema. // This evaluates missing columns and inserts any necessary casts. if let Some(expr_adapter_factory) = expr_adapter_factory { + let partition_values = partition_fields + .iter() + .cloned() + .zip(partitioned_file.partition_values.clone()) + .collect_vec(); predicate = predicate .map(|p| { - let partition_values = partition_fields - .iter() - .cloned() - .zip(partitioned_file.partition_values.clone()) - .collect_vec(); let expr = expr_adapter_factory .create( Arc::clone(&logical_file_schema), Arc::clone(&physical_file_schema), ) - .with_partition_values(partition_values) + .with_partition_values(partition_values.clone()) .rewrite(p)?; // After rewriting to the file schema, further simplifications may be possible. // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` @@ -286,6 +298,24 @@ impl FileOpener for ParquetOpener { }) .transpose()?; predicate_file_schema = Arc::clone(&physical_file_schema); + // Now transform projections + let mut new_projections = Vec::new(); + for projection in projections { + let mut expr = expr_adapter_factory + .create( + Arc::clone(&logical_file_schema), + Arc::clone(&physical_file_schema), + ) + .with_partition_values(partition_values.clone()) + .rewrite(Arc::clone(&projection.expr))?; + // After rewriting to the file schema, further simplifications may be possible. + // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` + // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). 
+ expr = PhysicalExprSimplifier::new(&physical_file_schema) + .simplify(expr)?; + new_projections.push(ProjectionExpr::new(expr, projection.alias)); + } + projections = ProjectionExprs::new(new_projections); } // Build predicates for this specific file @@ -308,6 +338,9 @@ impl FileOpener for ParquetOpener { .await?; } + let mask = + build_projection_mask(&projections, reader_metadata.parquet_schema())?; + metadata_timer.stop(); let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( @@ -315,14 +348,6 @@ impl FileOpener for ParquetOpener { reader_metadata, ); - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&physical_file_schema)?; - - let mask = ProjectionMask::roots( - builder.parquet_schema(), - adapted_projections.iter().cloned(), - ); - // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( @@ -443,7 +468,7 @@ impl FileOpener for ParquetOpener { let arrow_reader_metrics = ArrowReaderMetrics::enabled(); let stream = builder - .with_projection(mask) + .with_projection(mask.mask) .with_batch_size(batch_size) .with_row_groups(row_group_indexes) .with_metrics(arrow_reader_metrics.clone()) @@ -455,6 +480,8 @@ impl FileOpener for ParquetOpener { file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + let projector = mask.projection.make_projector(stream.schema())?; + let stream = stream.map_err(DataFusionError::from).map(move |b| { b.and_then(|b| { copy_arrow_reader_metrics( @@ -462,7 +489,7 @@ impl FileOpener for ParquetOpener { &predicate_cache_inner_records, &predicate_cache_records, ); - schema_mapping.map_batch(b) + projector.project_batch(&b) }) }); @@ -496,6 +523,165 @@ fn copy_arrow_reader_metrics( } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct ColumnPathPlaceholder { + path: ColumnPath, +} + +impl std::fmt::Display for ColumnPathPlaceholder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let col = self.path.parts().join("."); + f.write_str(&col) + } +} + +impl PhysicalExpr for ColumnPathPlaceholder { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let col = self.path.parts().join("."); + f.write_str(&col) + } + + fn evaluate(&self, _batch: &RecordBatch) -> Result { + unimplemented!() + } +} + +struct ParquetProjection { + mask: ProjectionMask, + projection: ProjectionExprs, +} + +/// Iterate through all of the expressions in this [`ProjectionExprs`] and: +/// 1. For each column reference or struct field access find the appropriate leaf column in the Parquet schema. +/// 2. Rewrite the column reference or struct field access to match the index that will be read out of the Parquet stream. +/// 3. Add this column to the Parquet ProjectionMask. +fn build_projection_mask( + projections: &ProjectionExprs, + schema: &SchemaDescriptor, +) -> Result { + let mut projections = projections.iter().cloned().collect_vec(); + // First pass: transform the expressions *bottoms up* by replacing any columns with a column path placeholder + // and flattening any nested struct field access into column path placeholders. 
+ for projection in projections.iter_mut() { + projection.expr = Arc::clone(&projection.expr) + .transform_up(|e| { + if let Some(col) = e.as_any().downcast_ref::() { + let path = ColumnPath::new(vec![col.name().to_string()]); + Ok(Transformed::yes(Arc::new(ColumnPathPlaceholder { + path: path, + }))) + } else if let Some(func) = e.as_any().downcast_ref::() + { + if func.name() == "get_field" { + let args = func.args(); + if args.len() == 2 { + let input = Arc::clone(&args[0]); + let field = Arc::clone(&args[1]); + if let Some(lit) = field.as_any().downcast_ref::() { + if let ScalarValue::Utf8(Some(field_name)) + | ScalarValue::Utf8View(Some(field_name)) + | ScalarValue::LargeUtf8(Some(field_name)) = + lit.value() + { + if let Some(path) = input + .as_any() + .downcast_ref::( + ) { + // Merge the paths, replace the entire expression + let mut new_path = path.path.clone(); + new_path.append(vec![field_name.to_string()]); + Ok(Transformed::yes(Arc::new( + ColumnPathPlaceholder { path: new_path }, + ))) + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + }) + .data()?; + } + + // Now do another pass to collect the ColumnPath's that persisted + let mut column_paths = HashSet::new(); + for projection in projections.iter() { + let expr = Arc::clone(&projection.expr); + expr.apply(|e| { + if let Some(path) = e.as_any().downcast_ref::() { + column_paths.insert(path.path.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + } + + // Build the Parquet ProjectionMask and new columns that point to it + let mut path_to_expr = HashMap::new(); + let mut leaves = Vec::new(); + for (idx, column) in schema.columns().iter().enumerate() { + if column_paths.contains(column.path()) { + let expr = Arc::new(Column::new(&column.path().string(), leaves.len())) + as Arc; + path_to_expr.insert(column.path(), expr); + leaves.push(idx); + } + } + let mask = ProjectionMask::leaves(schema, leaves); + + // Final pass: replace all of the placeholders with new column references + for projection in projections.iter_mut() { + projection.expr = Arc::clone(&projection.expr) + .transform_up(|e| { + if let Some(path) = e.as_any().downcast_ref::() { + let expr = + Arc::clone(path_to_expr.get(&path.path).ok_or_else(|| { + internal_datafusion_err!( + "Column path {} not found in file", + path.path.string() + ) + })?); + Ok(Transformed::yes(expr)) + } else { + Ok(Transformed::no(e)) + } + }) + .data()?; + } + let projection = ProjectionExprs::new(projections); + Ok(ParquetProjection { mask, projection }) +} + /// Wraps an inner RecordBatchStream and a [`FilePruner`] /// /// This can terminate the scan early when some dynamic filters is updated after @@ -653,7 +839,7 @@ impl ParquetOpener { /// Note: file_name is only used for error messages fn create_initial_plan( file_name: &str, - extensions: Option>, + extensions: Option>, row_group_count: usize, ) -> Result { if let Some(extensions) = extensions { @@ -775,7 +961,8 @@ mod test { }; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ - expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, + expressions::DynamicFilterPhysicalExpr, planner::logical2physical, + projection::ProjectionExprs, PhysicalExpr, }; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; 
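
A minimal sketch (not part of the diff) of the leaf-selection step that build_projection_mask performs above: given a file's Parquet schema and the set of ColumnPaths referenced by the projection, keep only the matching leaves and hand their indices to ProjectionMask::leaves. The message schema, column names, and standalone main are invented for illustration; only parquet-rs APIs already referenced by the patch (parse_message_type, SchemaDescriptor, ColumnPath, ProjectionMask::leaves) are used.

use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::{ColumnPath, SchemaDescriptor};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file schema: a struct column `s { a, b }` plus a top-level `c`.
    let message = "
        message example {
            optional group s {
                optional int64 a;
                optional int64 b;
            }
            optional binary c (UTF8);
        }";
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    // Paths referenced by the (already rewritten) projection: `s.a` and `c`.
    let wanted = vec![
        ColumnPath::new(vec!["s".to_string(), "a".to_string()]),
        ColumnPath::new(vec!["c".to_string()]),
    ];

    // Same leaf-matching loop as build_projection_mask: keep only leaves whose
    // full dotted path is referenced by the projection.
    let leaves: Vec<usize> = schema
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, col)| wanted.contains(col.path()))
        .map(|(idx, _)| idx)
        .collect();
    assert_eq!(leaves, vec![0, 2]); // s.a and c; s.b (leaf 1) is never selected

    let _mask = ProjectionMask::leaves(&schema, leaves);
    Ok(())
}

Selecting leaves rather than roots (the removed code built ProjectionMask::roots from the schema adapter's projection) is what allows sibling struct fields such as s.b to be skipped entirely, which is the point of this change.
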
@@ -874,7 +1061,7 @@ mod test { let make_opener = |predicate| { ParquetOpener { partition_index: 0, - projection: Arc::new([0, 1]), + projection: ProjectionExprs::from_indices(&[0, 1], &schema), batch_size: 1024, limit: None, predicate: Some(predicate), @@ -943,7 +1130,7 @@ mod test { let make_opener = |predicate| { ParquetOpener { partition_index: 0, - projection: Arc::new([0]), + projection: ProjectionExprs::from_indices(&[0], &file_schema), batch_size: 1024, limit: None, predicate: Some(predicate), @@ -1032,7 +1219,7 @@ mod test { let make_opener = |predicate| { ParquetOpener { partition_index: 0, - projection: Arc::new([0]), + projection: ProjectionExprs::from_indices(&[0], &file_schema), batch_size: 1024, limit: None, predicate: Some(predicate), @@ -1124,7 +1311,7 @@ mod test { let make_opener = |predicate| { ParquetOpener { partition_index: 0, - projection: Arc::new([0]), + projection: ProjectionExprs::from_indices(&[0], &file_schema), batch_size: 1024, limit: None, predicate: Some(predicate), @@ -1216,7 +1403,7 @@ mod test { let make_opener = |predicate| { ParquetOpener { partition_index: 0, - projection: Arc::new([0]), + projection: ProjectionExprs::from_indices(&[0], &file_schema), batch_size: 1024, limit: None, predicate: Some(predicate), @@ -1370,7 +1557,7 @@ mod test { let make_opener = |predicate| ParquetOpener { partition_index: 0, - projection: Arc::new([0, 1]), + projection: ProjectionExprs::from_indices(&[0, 1], &table_schema), batch_size: 1024, limit: None, predicate: Some(predicate), diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 5ed74ecfd98f..89fac48e1e95 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -31,7 +31,6 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; @@ -289,7 +288,7 @@ pub struct ParquetSource { /// Optional hint for the size of the parquet metadata pub(crate) metadata_size_hint: Option, /// Projection information for column pushdown - pub(crate) projection: SplitProjection, + pub(crate) projection: ProjectionExprs, #[cfg(feature = "parquet_encryption")] pub(crate) encryption_factory: Option>, } @@ -303,7 +302,10 @@ impl ParquetSource { pub fn new(table_schema: impl Into) -> Self { let table_schema = table_schema.into(); Self { - projection: SplitProjection::unprojected(&table_schema), + projection: ProjectionExprs::from_indices( + &(0..table_schema.table_schema().fields().len()).collect_vec(), + table_schema.table_schema(), + ), table_schema, table_parquet_options: TableParquetOptions::default(), metrics: ExecutionPlanMetricsSet::new(), @@ -519,8 +521,6 @@ impl FileSource for ParquetSource { base_config: &FileScanConfig, partition: usize, ) -> datafusion_common::Result> { - let split_projection = self.projection.clone(); - let (expr_adapter_factory, schema_adapter_factory) = match ( base_config.expr_adapter_factory.as_ref(), self.schema_adapter_factory.as_ref(), @@ -580,9 +580,9 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); - let mut opener = Arc::new(ParquetOpener { + let opener = Arc::new(ParquetOpener { partition_index: partition, - 
projection: Arc::from(split_projection.file_indices.clone()), + projection: self.projection.clone(), batch_size: self .batch_size .expect("Batch size must set before creating ParquetOpener"), @@ -607,11 +607,6 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), }) as Arc; - opener = ProjectionOpener::try_new( - split_projection.clone(), - Arc::clone(&opener), - self.table_schema.file_schema(), - )?; Ok(opener) } @@ -638,15 +633,12 @@ impl FileSource for ParquetSource { projection: &ProjectionExprs, ) -> datafusion_common::Result>> { let mut source = self.clone(); - let new_projection = self.projection.source.try_merge(projection)?; - let split_projection = - SplitProjection::new(self.table_schema.file_schema(), &new_projection); - source.projection = split_projection; + source.projection = projection.clone(); Ok(Some(Arc::new(source))) } fn projection(&self) -> Option<&ProjectionExprs> { - Some(&self.projection.source) + Some(&self.projection) } fn metrics(&self) -> &ExecutionPlanMetricsSet { From 45c712d56faabcd6c7d49e07bbde5fcf9679d11d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 27 Nov 2025 14:18:11 +0530 Subject: [PATCH 2/2] refactor --- datafusion/datasource-parquet/src/opener.rs | 63 ++++++++++----------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 35371be2b17d..ca62a5d27a95 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -570,6 +570,32 @@ struct ParquetProjection { projection: ProjectionExprs, } +/// Try to extract a merged ColumnPath from a get_field(ColumnPathPlaceholder, "field_name") expression. +fn try_merge_get_field_path(func: &ScalarFunctionExpr) -> Option { + if func.name() != "get_field" { + return None; + } + let args = func.args(); + if args.len() != 2 { + return None; + } + let input = &args[0]; + let field = &args[1]; + + let lit = field.as_any().downcast_ref::()?; + let field_name = match lit.value() { + ScalarValue::Utf8(Some(s)) + | ScalarValue::Utf8View(Some(s)) + | ScalarValue::LargeUtf8(Some(s)) => s, + _ => return None, + }; + + let path = input.as_any().downcast_ref::()?; + let mut new_path = path.path.clone(); + new_path.append(vec![field_name.to_string()]); + Some(new_path) +} + /// Iterate through all of the expressions in this [`ProjectionExprs`] and: /// 1. For each column reference or struct field access find the appropriate leaf column in the Parquet schema. /// 2. Rewrite the column reference or struct field access to match the index that will be read out of the Parquet stream. 
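
For reference (not part of the diff), the path merging that try_merge_get_field_path performs can be seen with the ColumnPath API on its own: a chain of get_field calls over a column collapses into a single dotted leaf path. The column and field names below are invented for the example.

use parquet::schema::types::ColumnPath;

fn main() {
    // col("info") -> get_field(.., "location") -> get_field(.., "lat")
    let mut path = ColumnPath::new(vec!["info".to_string()]);
    path.append(vec!["location".to_string()]);
    path.append(vec!["lat".to_string()]);
    assert_eq!(path.string(), "info.location.lat");
}
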
@@ -591,39 +617,10 @@ fn build_projection_mask( }))) } else if let Some(func) = e.as_any().downcast_ref::() { - if func.name() == "get_field" { - let args = func.args(); - if args.len() == 2 { - let input = Arc::clone(&args[0]); - let field = Arc::clone(&args[1]); - if let Some(lit) = field.as_any().downcast_ref::() { - if let ScalarValue::Utf8(Some(field_name)) - | ScalarValue::Utf8View(Some(field_name)) - | ScalarValue::LargeUtf8(Some(field_name)) = - lit.value() - { - if let Some(path) = input - .as_any() - .downcast_ref::( - ) { - // Merge the paths, replace the entire expression - let mut new_path = path.path.clone(); - new_path.append(vec![field_name.to_string()]); - Ok(Transformed::yes(Arc::new( - ColumnPathPlaceholder { path: new_path }, - ))) - } else { - Ok(Transformed::no(e)) - } - } else { - Ok(Transformed::no(e)) - } - } else { - Ok(Transformed::no(e)) - } - } else { - Ok(Transformed::no(e)) - } + if let Some(new_path) = try_merge_get_field_path(func) { + Ok(Transformed::yes(Arc::new(ColumnPathPlaceholder { + path: new_path, + }))) } else { Ok(Transformed::no(e)) }
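
As background for the projection change (not part of the diff): a standalone sketch of what a leaf-level ProjectionMask buys at read time. A file with a struct column s { a, b } is read back with a mask selecting only the s.a leaf, so s.b is never decoded and the returned batch contains s with just the a child. The file path, field names, and synchronous reader are assumptions made for the example; the opener itself uses the async ParquetRecordBatchStreamBuilder.

use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::{ArrowWriter, ProjectionMask};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a single batch with one struct column: s { a: Int64, b: Int64 }.
    let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let b: ArrayRef = Arc::new(Int64Array::from(vec![10, 20]));
    let s = StructArray::from(vec![
        (Arc::new(Field::new("a", DataType::Int64, true)), a),
        (Arc::new(Field::new("b", DataType::Int64, true)), b),
    ]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s",
        DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int64, true),
        ])),
        true,
    )]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(s) as ArrayRef])?;

    let path = "/tmp/struct_pushdown_example.parquet";
    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read back, selecting only the `s.a` leaf (leaf 0); `s.b` is leaf 1.
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?;
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut reader = builder.with_projection(mask).build()?;
    let out = reader.next().unwrap()?;

    // The nesting is preserved, but only the projected child is present:
    // the output schema is `s: Struct<a: Int64>`.
    println!("{:?}", out.schema());
    Ok(())
}
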