From 09d32e916383a6fa05aecf394192bb82723974f9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 13:21:08 -0400 Subject: [PATCH 1/6] Plumb Parquet virtual columns (e.g., row_number) through TableSchema and ParquetOpener, gated behind a tested-only extension-type allowlist, to unblock Comet's native-DataFusion support for Spark's _tmp_metadata_row_index. --- Cargo.lock | 1 + datafusion/datasource-parquet/Cargo.toml | 1 + datafusion/datasource-parquet/src/opener.rs | 511 +++++++++++++++++++- datafusion/datasource/src/table_schema.rs | 138 +++++- 4 files changed, 623 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de1da5205d891..d0d07a2a2e830 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2003,6 +2003,7 @@ name = "datafusion-datasource-parquet" version = "53.1.0" dependencies = [ "arrow", + "arrow-schema", "async-trait", "bytes", "chrono", diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5855af17a536..8aa6ca1f97721 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -32,6 +32,7 @@ all-features = true [dependencies] arrow = { workspace = true } +arrow-schema = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } datafusion-common = { workspace = true, features = ["object_store", "parquet"] } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..fc4c4ae2297aa 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -38,7 +38,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; +use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit}; +use arrow_schema::extension::ExtensionType; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use 
datafusion_common::{ @@ -66,6 +67,7 @@ use futures::{ use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::RowNumber; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, @@ -274,6 +276,10 @@ struct PreparedParquetOpen { output_schema: SchemaRef, projection: ProjectionExprs, predicate: Option>, + /// Virtual columns (e.g. parquet `row_number`) to be produced by the reader + /// in addition to the file's own columns. Empty when no virtual columns + /// were requested by the caller. + virtual_columns: Vec, reorder_predicates: bool, pushdown_filters: bool, force_filter_selections: bool, @@ -535,6 +541,7 @@ impl ParquetMorselizer { &self, partitioned_file: PartitionedFile, ) -> Result { + validate_supported_virtual_columns(self.table_schema.virtual_columns())?; let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); let file_name = partitioned_file.object_meta.location.to_string(); @@ -643,6 +650,7 @@ impl ParquetMorselizer { output_schema, projection, predicate, + virtual_columns: self.table_schema.virtual_columns().clone(), reorder_predicates: self.reorder_filters, pushdown_filters: self.pushdown_filters, force_filter_selections: self.force_filter_selections, @@ -758,22 +766,26 @@ impl MetadataLoadedParquetOpen { // - The logical file schema: this is the table schema minus any hive partition columns and projections. // This is what the physical file schema is coerced to. // - The physical file schema: this is the schema that the arrow-rs - // parquet reader will actually produce. + // parquet reader will actually produce for the file's columns. Any + // virtual columns (see [`crate::TableSchema::virtual_columns`]) are + // produced separately by the reader and are not part of this schema. 
+ let has_virtual_cols = !prepared.virtual_columns.is_empty(); + // The initial `load` did not request virtual columns, so + // `reader_metadata.schema()` currently matches the file's own columns + // only. let mut physical_file_schema = Arc::clone(reader_metadata.schema()); // The schema loaded from the file may not be the same as the // desired schema (for example if we want to instruct the parquet // reader to read strings using Utf8View instead). Update if necessary + let mut metadata_dirty = false; if let Some(merged) = apply_file_schema_type_coercions( &prepared.logical_file_schema, &physical_file_schema, ) { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); - reader_metadata = ArrowReaderMetadata::try_new( - Arc::clone(reader_metadata.metadata()), - options.clone(), - )?; + metadata_dirty = true; } if let Some(ref coerce) = prepared.coerce_int96 @@ -785,6 +797,23 @@ impl MetadataLoadedParquetOpen { { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); + metadata_dirty = true; + } + + // If the caller requested virtual columns (e.g. parquet `row_number`), + // register them on the reader options so the decoder produces them + // alongside the projected file columns. Any schema coercion above has + // already been applied via `options.with_schema` with only the file's + // own columns; arrow-rs appends virtual columns to that schema + // internally, so we must not include them here. 
+ if has_virtual_cols { + options = options + .with_virtual_columns(prepared.virtual_columns.clone()) + .map_err(|e| DataFusionError::External(format!("{e}").into()))?; + metadata_dirty = true; + } + + if metadata_dirty { reader_metadata = ArrowReaderMetadata::try_new( Arc::clone(reader_metadata.metadata()), options.clone(), @@ -807,11 +836,45 @@ impl MetadataLoadedParquetOpen { let needs_rewrite = prepared.predicate.is_some() || prepared.logical_file_schema != physical_file_schema; if needs_rewrite { + // When virtual columns are requested, augment the logical and + // physical schemas passed to the rewriter/simplifier with those + // fields. The rewriter identity-rewrites references found in both + // schemas, keeping virtual-column references as `Column` rather + // than replacing them with null literals; the simplifier needs + // them present so it can resolve their data types while walking + // expression trees. We keep `physical_file_schema` itself as the + // pure file schema so downstream predicate pushdown, pruning, and + // row filter construction stay unaffected. 
+ let (logical_for_rewrite, physical_for_rewrite) = if has_virtual_cols { + let logical_augmented = Schema::new( + prepared + .logical_file_schema + .fields() + .iter() + .cloned() + .chain(prepared.virtual_columns.iter().cloned()) + .collect::>(), + ); + let physical_augmented = Schema::new( + physical_file_schema + .fields() + .iter() + .cloned() + .chain(prepared.virtual_columns.iter().cloned()) + .collect::>(), + ); + (Arc::new(logical_augmented), Arc::new(physical_augmented)) + } else { + ( + Arc::clone(&prepared.logical_file_schema), + Arc::clone(&physical_file_schema), + ) + }; let rewriter = prepared.expr_adapter_factory.create( - Arc::clone(&prepared.logical_file_schema), - Arc::clone(&physical_file_schema), + Arc::clone(&logical_for_rewrite), + Arc::clone(&physical_for_rewrite), )?; - let simplifier = PhysicalExprSimplifier::new(&physical_file_schema); + let simplifier = PhysicalExprSimplifier::new(&physical_for_rewrite); prepared.predicate = prepared .predicate .map(|p| simplifier.simplify(rewriter.rewrite(p)?)) @@ -1133,8 +1196,29 @@ impl RowGroupsPrunedParquetOpen { } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + // Virtual columns are produced by the reader separately from the + // projection mask, so strip them from the expressions we feed into + // `build_projection_read_plan`. We substitute each virtual column + // reference with a null literal; that leaves the remaining Column + // refs (into `physical_file_schema`) intact for + // `ProjectionMask::roots`, which only understands file columns. + let projection_for_read_plan = if prepared.virtual_columns.is_empty() { + prepared.projection.clone() + } else { + let null_replacements = prepared + .virtual_columns + .iter() + .map(|f| { + ScalarValue::try_from(f.data_type()).map(|v| (f.name().clone(), v)) + }) + .collect::>>()?; + prepared.projection.clone().try_map_exprs(|expr| { + replace_columns_with_literals(expr, &null_replacements) + })? 
+ }; + let read_plan = build_projection_read_plan( - prepared.projection.expr_iter(), + projection_for_read_plan.expr_iter(), &prepared.physical_file_schema, reader_metadata.parquet_schema(), ); @@ -1174,7 +1258,21 @@ impl RowGroupsPrunedParquetOpen { // Check if we need to replace the schema to handle things like differing nullability or metadata. // See note below about file vs. output schema. - let stream_schema = read_plan.projected_schema; + // The reader produces projected file columns followed by any virtual + // columns (`ArrowReaderOptions::with_virtual_columns` appends them to + // each decoded batch). + let stream_schema = if prepared.virtual_columns.is_empty() { + read_plan.projected_schema + } else { + let fields = read_plan + .projected_schema + .fields() + .iter() + .cloned() + .chain(prepared.virtual_columns.iter().cloned()) + .collect::>(); + Arc::new(Schema::new(fields)) + }; let replace_schema = stream_schema != prepared.output_schema; // Rebase column indices to match the narrowed stream schema. @@ -1323,6 +1421,38 @@ impl PushDecoderStreamState { type ConstantColumns = HashMap; +/// Allowlist of Arrow extension types for Parquet virtual columns that this +/// opener is tested against. Only add entries here when a corresponding +/// end-to-end test exists in `mod test::virtual_columns`; arrow-rs may expose +/// additional virtual extension types over time, and silently forwarding +/// untested ones risks producing columns that don't round-trip through +/// DataFusion's predicate and projection paths as expected. 
+const SUPPORTED_VIRTUAL_EXTENSION_TYPES: &[&str] = &[RowNumber::NAME]; + +fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<()> { + for field in virtual_columns { + let name = field.extension_type_name().ok_or_else(|| { + DataFusionError::Configuration(format!( + "Virtual column '{}' is missing an Arrow extension type; \ + virtual columns must carry one of: {:?}", + field.name(), + SUPPORTED_VIRTUAL_EXTENSION_TYPES, + )) + })?; + if !SUPPORTED_VIRTUAL_EXTENSION_TYPES.contains(&name) { + return Err(DataFusionError::NotImplemented(format!( + "Virtual column '{}' uses unsupported Arrow extension type '{}'; \ + supported types: {:?}. Add the extension type to \ + SUPPORTED_VIRTUAL_EXTENSION_TYPES together with a test covering it.", + field.name(), + name, + SUPPORTED_VIRTUAL_EXTENSION_TYPES, + ))); + } + } + Ok(()) +} + /// Extract constant column values from statistics, keyed by column name in the logical file schema. fn constant_columns_from_stats( statistics: Option<&Statistics>, @@ -1723,12 +1853,28 @@ mod test { self } - /// Set projection by column indices (convenience method for common case). + /// Set projection by column indices. + /// + /// The indices are resolved against the **file schema**, not the full + /// table schema. Callers that need to project partition columns or + /// virtual columns must use [`Self::with_projection`] and construct a + /// [`ProjectionExprs`] against [`TableSchema::table_schema`]. fn with_projection_indices(mut self, indices: &[usize]) -> Self { self.projection_indices = Some(indices.to_vec()); self } + /// Set an explicit projection. + /// + /// Prefer this over [`Self::with_projection_indices`] whenever the + /// projection must reference partition or virtual columns, since + /// `with_projection_indices` resolves its indices against the file + /// schema only. 
+ fn with_projection(mut self, projection: ProjectionExprs) -> Self { + self.projection = Some(projection); + self + } + /// Set the predicate. fn with_predicate(mut self, predicate: Arc) -> Self { self.predicate = Some(predicate); @@ -1977,7 +2123,7 @@ mod test { async fn write_parquet( store: Arc, filename: &str, - batch: arrow::record_batch::RecordBatch, + batch: RecordBatch, ) -> usize { write_parquet_batches(store, filename, vec![batch], None).await } @@ -1986,7 +2132,7 @@ mod test { async fn write_parquet_batches( store: Arc, filename: &str, - batches: Vec, + batches: Vec, props: Option, ) -> usize { let mut out = BytesMut::new().writer(); @@ -2720,4 +2866,341 @@ mod test { "without page index all rows are returned" ); } + + /// Helpers for tests that exercise parquet virtual columns + /// (e.g. `row_number`) plumbed through `TableSchema`/`ParquetOpener`. + mod virtual_columns { + use super::*; + use arrow::array::{Array, Int64Array}; + use arrow::datatypes::FieldRef; + use parquet::arrow::RowNumber; + + /// Build a parquet `row_number` virtual column field. Spark's + /// `_tmp_metadata_row_index` is declared nullable, so the default + /// matches that contract; tests that need `nullable=false` can + /// override via `with_nullable`. + fn row_number_field(name: &str, nullable: bool) -> FieldRef { + Arc::new( + Field::new(name, DataType::Int64, nullable) + .with_extension_type(RowNumber), + ) + } + + /// Collect every `Int64` value from the given column in every batch + /// of a stream. Used to verify the `row_number` column end to end. 
+ async fn collect_int64_values( + mut stream: BoxStream<'static, Result>, + column: usize, + ) -> Vec { + let mut out = vec![]; + while let Some(batch) = stream.next().await { + let batch = batch.unwrap(); + let array = batch + .column(column) + .as_any() + .downcast_ref::() + .expect("expected Int64 column"); + for i in 0..array.len() { + assert!( + !array.is_null(i), + "row_number values produced by the reader must not be null" + ); + out.push(array.value(i)); + } + } + out + } + + /// Write a parquet file containing `num_row_groups` groups of + /// `rows_per_group` rows with a single `value` Int64 column. + /// Values are `0..num_row_groups*rows_per_group`. + async fn write_grouped_file( + store: &Arc, + path: &str, + num_row_groups: usize, + rows_per_group: usize, + ) -> (SchemaRef, usize) { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + let mut batches = Vec::with_capacity(num_row_groups); + for g in 0..num_row_groups { + let start = (g * rows_per_group) as i64; + let values: Vec = (start..start + rows_per_group as i64).collect(); + batches.push( + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(values))], + ) + .unwrap(), + ); + } + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(rows_per_group)) + .build(); + let data_size = + write_parquet_batches(Arc::clone(store), path, batches, Some(props)) + .await; + (schema, data_size) + } + + #[tokio::test] + async fn test_row_index_basic() { + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "basic.parquet", 1, 5).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + // Project [value, row_number] — indices in table_schema are + // [0 file:value, 1 virtual:row_number]. 
+ let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + + let file = PartitionedFile::new( + "basic.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let stream = open_file(&morselizer, file).await.unwrap(); + let row_numbers = collect_int64_values(stream, 1).await; + assert_eq!(row_numbers, vec![0, 1, 2, 3, 4]); + } + + #[tokio::test] + async fn test_row_index_projection_only() { + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "proj_only.parquet", 1, 4).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + // Project only the virtual column (index 1). + let projection = + ProjectionExprs::from_indices(&[1], table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + + let file = PartitionedFile::new( + "proj_only.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let stream = open_file(&morselizer, file).await.unwrap(); + let row_numbers = collect_int64_values(stream, 0).await; + assert_eq!(row_numbers, vec![0, 1, 2, 3]); + } + + #[tokio::test] + async fn test_row_index_multi_row_group() { + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "multi_rg.parquet", 3, 100).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], 
table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + + let file = PartitionedFile::new( + "multi_rg.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let stream = open_file(&morselizer, file).await.unwrap(); + let row_numbers = collect_int64_values(stream, 1).await; + let expected: Vec = (0..300).collect(); + assert_eq!(row_numbers, expected); + } + + #[tokio::test] + async fn test_row_index_with_row_group_skip() { + // 3 row groups of 100 rows. A predicate that excludes the middle + // row group (values 100..200) must leave absolute row numbers + // 0..100 and 200..300 intact — not 0..200. This guards against + // the arrow-rs bug fixed in apache/arrow-rs#8863. + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "rg_skip.parquet", 3, 100).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + // `value < 100 OR value >= 200` prunes the middle row group via + // min/max statistics. 
+ let expr = col("value") + .lt(lit(100i64)) + .or(col("value").gt_eq(lit(200i64))); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .with_predicate(predicate) + .with_row_group_stats_pruning(true) + .build(); + + let file = PartitionedFile::new( + "rg_skip.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let stream = open_file(&morselizer, file).await.unwrap(); + let row_numbers = collect_int64_values(stream, 1).await; + let expected: Vec = (0..100).chain(200..300).collect(); + assert_eq!(row_numbers, expected); + } + + #[tokio::test] + async fn test_row_index_with_partition_cols() { + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "part=5/data.parquet", 1, 3).await; + + let rn_field = row_number_field("row_number", false); + let partition_col = Arc::new(Field::new("part", DataType::Int32, false)); + let table_schema = TableSchema::new( + Arc::clone(&file_schema), + vec![Arc::clone(&partition_col)], + ) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + // table_schema layout: [value(0), part(1), row_number(2)]. 
+ let projection = + ProjectionExprs::from_indices(&[0, 1, 2], table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + + let mut file = PartitionedFile::new( + "part=5/data.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + file.partition_values = vec![ScalarValue::Int32(Some(5))]; + + let stream = open_file(&morselizer, file).await.unwrap(); + let mut stream = stream; + let batch = stream.next().await.unwrap().unwrap(); + assert!(stream.next().await.is_none()); + + assert_eq!(batch.num_columns(), 3); + assert_eq!(batch.schema().field(0).name(), "value"); + assert_eq!(batch.schema().field(1).name(), "part"); + assert_eq!(batch.schema().field(2).name(), "row_number"); + + let part = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(part.iter().all(|v| v == Some(5))); + + let rn = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + let rn_values: Vec = (0..rn.len()).map(|i| rn.value(i)).collect(); + assert_eq!(rn_values, vec![0, 1, 2]); + } + + #[tokio::test] + async fn test_row_index_nullable_int64() { + // Spark declares `_tmp_metadata_row_index` nullable. Verify the + // nullability flag flows through unchanged. 
+ let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "nullable.parquet", 1, 3).await; + + let rn_field = row_number_field("_tmp_metadata_row_index", true); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + + let file = PartitionedFile::new( + "nullable.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let mut stream = open_file(&morselizer, file).await.unwrap(); + let batch = stream.next().await.unwrap().unwrap(); + + let schema_field = batch.schema().field(1).clone(); + assert_eq!(schema_field.name(), "_tmp_metadata_row_index"); + assert_eq!(schema_field.data_type(), &DataType::Int64); + assert!( + schema_field.is_nullable(), + "nullable flag should be preserved for Spark's row index field" + ); + } + + #[tokio::test] + async fn test_unsupported_virtual_extension_type_rejected() { + // Guard: opener must reject virtual columns carrying extension + // types outside the tested allowlist, rather than silently + // forwarding them to arrow-rs (where they would produce columns + // we have not validated against DataFusion's projection and + // predicate paths). + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "unsupported.parquet", 1, 1).await; + + // RowGroupIndex is a real arrow-rs virtual type but is not in + // SUPPORTED_VIRTUAL_EXTENSION_TYPES until a test is added for it. 
+ let rg_field = Arc::new( + Field::new("row_group_index", DataType::Int64, false) + .with_extension_type(parquet::arrow::RowGroupIndex), + ); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![rg_field]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .build(); + let file = PartitionedFile::new( + "unsupported.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let err = morselizer.plan_file(file).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("parquet.virtual.row_group_index"), + "error should name the unsupported extension type, got: {msg}" + ); + } + } } diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 5b7fc4727df05..955870a734620 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -23,10 +23,17 @@ use std::sync::Arc; /// The overall schema for potentially partitioned data sources. /// /// When reading partitioned data (such as Hive-style partitioning), a [`TableSchema`] -/// consists of two parts: +/// consists of up to three parts: /// 1. **File schema**: The schema of the actual data files on disk /// 2. **Partition columns**: Columns whose values are encoded in the directory structure, /// but not stored in the files themselves +/// 3. **Virtual columns**: Columns produced by the file reader (e.g. Parquet +/// `row_number`) that are not stored in the files +/// +/// The full table schema is composed in that order: file columns, then +/// partition columns, then virtual columns. Consumers that need a different +/// output ordering should use a projection on top of +/// [`TableSchema::table_schema`]. 
/// /// # Example: Partitioned Table /// @@ -63,6 +70,15 @@ pub struct TableSchema { /// this field holds that schema. file_schema: SchemaRef, + /// Virtual columns that are generated by the reader rather than read from + /// the data files or the directory structure. + /// + /// For example, a Parquet reader may inject a `row_number` column whose + /// values are produced per file by the reader. Virtual column fields must + /// carry an arrow extension type (e.g. `RowNumber`, `RowGroupIndex`) so the + /// file reader can recognize them. + virtual_columns: Arc>, + /// Columns that are derived from the directory structure (partitioning scheme). /// /// For Hive-style partitioning like `/date=2025-10-10/region=us-west/`, @@ -72,10 +88,11 @@ pub struct TableSchema { /// row during query execution based on the file's location. table_partition_cols: Arc>, - /// The complete table schema: file_schema columns followed by partition columns. + /// The complete table schema: file_schema columns, followed by partition + /// columns, followed by virtual columns. /// - /// This is pre-computed during construction by concatenating `file_schema` - /// and `table_partition_cols`, so it can be returned as a cheap reference. + /// This is pre-computed during construction, so it can be returned as a + /// cheap reference. 
table_schema: SchemaRef, } @@ -117,12 +134,18 @@ impl TableSchema { /// assert_eq!(table_schema.table_schema().fields().len(), 4); /// ``` pub fn new(file_schema: SchemaRef, table_partition_cols: Vec) -> Self { - let mut builder = SchemaBuilder::from(file_schema.as_ref()); - builder.extend(table_partition_cols.iter().cloned()); + let table_partition_cols = Arc::new(table_partition_cols); + let virtual_columns: Arc> = Arc::new(vec![]); + let table_schema = build_table_schema( + &file_schema, + virtual_columns.as_ref(), + table_partition_cols.as_ref(), + ); Self { file_schema, - table_partition_cols: Arc::new(table_partition_cols), - table_schema: Arc::new(builder.finish()), + virtual_columns, + table_partition_cols, + table_schema, } } @@ -149,9 +172,38 @@ impl TableSchema { ); table_partition_cols.extend(partition_cols); } - let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); - builder.extend(self.table_partition_cols.iter().cloned()); - self.table_schema = Arc::new(builder.finish()); + self.table_schema = build_table_schema( + &self.file_schema, + self.virtual_columns.as_ref(), + self.table_partition_cols.as_ref(), + ); + self + } + + /// Add virtual columns to an existing TableSchema, returning a new instance. + /// + /// Virtual columns are produced by the file reader (e.g. a Parquet + /// `row_number` column) rather than being stored in the files or derived + /// from partition paths. Each field must carry an arrow virtual extension + /// type so the reader can recognize it; `ParquetOpener` forwards these + /// fields to [`parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`]. + /// + /// Virtual columns are appended at the end of the table schema, after any + /// partition columns. 
+ pub fn with_virtual_columns(mut self, virtual_columns: Vec) -> Self { + if self.virtual_columns.is_empty() { + self.virtual_columns = Arc::new(virtual_columns); + } else { + let existing = Arc::get_mut(&mut self.virtual_columns).expect( + "Expected to be the sole owner of virtual_columns since this function accepts mut self", + ); + existing.extend(virtual_columns); + } + self.table_schema = build_table_schema( + &self.file_schema, + self.virtual_columns.as_ref(), + self.table_partition_cols.as_ref(), + ); self } @@ -170,15 +222,35 @@ impl TableSchema { &self.table_partition_cols } - /// Get the full table schema (file schema + partition columns). + /// Get the virtual columns. /// - /// This is the complete schema that will be seen by queries, combining - /// both the columns from the files and the partition columns. + /// Virtual columns are produced by the file reader (e.g. Parquet + /// `row_number`) and are not stored in the data files or derived from + /// partition paths. + pub fn virtual_columns(&self) -> &Vec { + &self.virtual_columns + } + + /// Get the full table schema (file schema + partition columns + virtual columns). + /// + /// This is the complete schema that will be seen by queries. Fields appear + /// in the order: file columns, partition columns, virtual columns. 
pub fn table_schema(&self) -> &SchemaRef { &self.table_schema } } +fn build_table_schema( + file_schema: &SchemaRef, + virtual_columns: &[FieldRef], + table_partition_cols: &[FieldRef], +) -> SchemaRef { + let mut builder = SchemaBuilder::from(file_schema.as_ref()); + builder.extend(table_partition_cols.iter().cloned()); + builder.extend(virtual_columns.iter().cloned()); + Arc::new(builder.finish()) +} + impl From for TableSchema { fn from(schema: SchemaRef) -> Self { Self::from_file_schema(schema) @@ -276,4 +348,42 @@ mod tests { &expected_schema ); } + + #[test] + fn test_with_virtual_columns_layout() { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + ])); + + let virtual_cols = + vec![Arc::new(Field::new("row_number", DataType::Int64, true))]; + + let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))]; + + // Apply virtual columns and partition columns in either order; the + // resulting table schema should always be [file, partition, virtual]. 
+ let built_virtual_first = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(virtual_cols.clone()) + .with_table_partition_cols(partition_cols.clone()); + + let built_partition_first = + TableSchema::new(Arc::clone(&file_schema), partition_cols.clone()) + .with_virtual_columns(virtual_cols.clone()); + + let expected = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + Field::new("date", DataType::Utf8, false), + Field::new("row_number", DataType::Int64, true), + ]); + + for ts in [built_virtual_first, built_partition_first] { + assert_eq!(ts.table_schema().as_ref(), &expected); + assert_eq!(ts.virtual_columns().len(), 1); + assert_eq!(ts.virtual_columns()[0].name(), "row_number"); + assert_eq!(ts.table_partition_cols().len(), 1); + assert_eq!(ts.file_schema().fields().len(), 2); + } + } } From f54b003888543d0f517feb9b9ab1baadd109e89e Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 13:38:12 -0400 Subject: [PATCH 2/6] Cleanup. --- datafusion/datasource-parquet/src/opener.rs | 93 ++++++++------------- datafusion/datasource/src/table_schema.rs | 8 +- 2 files changed, 38 insertions(+), 63 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index fc4c4ae2297aa..c2f4881ded080 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -44,6 +44,7 @@ use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, + not_impl_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; @@ -769,10 +770,6 @@ impl MetadataLoadedParquetOpen { // parquet reader will actually produce for the file's columns. 
Any // virtual columns (see [`crate::TableSchema::virtual_columns`]) are // produced separately by the reader and are not part of this schema. - let has_virtual_cols = !prepared.virtual_columns.is_empty(); - // The initial `load` did not request virtual columns, so - // `reader_metadata.schema()` currently matches the file's own columns - // only. let mut physical_file_schema = Arc::clone(reader_metadata.schema()); // The schema loaded from the file may not be the same as the @@ -800,16 +797,10 @@ impl MetadataLoadedParquetOpen { metadata_dirty = true; } - // If the caller requested virtual columns (e.g. parquet `row_number`), - // register them on the reader options so the decoder produces them - // alongside the projected file columns. Any schema coercion above has - // already been applied via `options.with_schema` with only the file's - // own columns; arrow-rs appends virtual columns to that schema - // internally, so we must not include them here. - if has_virtual_cols { - options = options - .with_virtual_columns(prepared.virtual_columns.clone()) - .map_err(|e| DataFusionError::External(format!("{e}").into()))?; + // Arrow-rs appends virtual columns to the supplied schema internally, + // so any `with_schema` coercion above must stay limited to file columns. + if !prepared.virtual_columns.is_empty() { + options = options.with_virtual_columns(prepared.virtual_columns.clone())?; metadata_dirty = true; } @@ -845,31 +836,10 @@ impl MetadataLoadedParquetOpen { // expression trees. We keep `physical_file_schema` itself as the // pure file schema so downstream predicate pushdown, pruning, and // row filter construction stay unaffected. 
- let (logical_for_rewrite, physical_for_rewrite) = if has_virtual_cols { - let logical_augmented = Schema::new( - prepared - .logical_file_schema - .fields() - .iter() - .cloned() - .chain(prepared.virtual_columns.iter().cloned()) - .collect::>(), - ); - let physical_augmented = Schema::new( - physical_file_schema - .fields() - .iter() - .cloned() - .chain(prepared.virtual_columns.iter().cloned()) - .collect::>(), - ); - (Arc::new(logical_augmented), Arc::new(physical_augmented)) - } else { - ( - Arc::clone(&prepared.logical_file_schema), - Arc::clone(&physical_file_schema), - ) - }; + let logical_for_rewrite = + append_fields(&prepared.logical_file_schema, &prepared.virtual_columns); + let physical_for_rewrite = + append_fields(&physical_file_schema, &prepared.virtual_columns); let rewriter = prepared.expr_adapter_factory.create( Arc::clone(&logical_for_rewrite), Arc::clone(&physical_for_rewrite), @@ -1261,18 +1231,8 @@ impl RowGroupsPrunedParquetOpen { // The reader produces projected file columns followed by any virtual // columns (`ArrowReaderOptions::with_virtual_columns` appends them to // each decoded batch). - let stream_schema = if prepared.virtual_columns.is_empty() { - read_plan.projected_schema - } else { - let fields = read_plan - .projected_schema - .fields() - .iter() - .cloned() - .chain(prepared.virtual_columns.iter().cloned()) - .collect::>(); - Arc::new(Schema::new(fields)) - }; + let stream_schema = + append_fields(&read_plan.projected_schema, &prepared.virtual_columns); let replace_schema = stream_schema != prepared.output_schema; // Rebase column indices to match the narrowed stream schema. @@ -1429,25 +1389,40 @@ type ConstantColumns = HashMap; /// DataFusion's predicate and projection paths as expected. const SUPPORTED_VIRTUAL_EXTENSION_TYPES: &[&str] = &[RowNumber::NAME]; +/// Return `base` unchanged when `extra` is empty; otherwise build a new schema +/// with `extra` appended to `base`'s fields. 
+fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef { + if extra.is_empty() { + return Arc::clone(base); + } + let fields = base + .fields() + .iter() + .cloned() + .chain(extra.iter().cloned()) + .collect::<Vec<_>>(); + Arc::new(Schema::new(fields)) +} + fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<()> { for field in virtual_columns { - let name = field.extension_type_name().ok_or_else(|| { - DataFusionError::Configuration(format!( + let Some(name) = field.extension_type_name() else { + return not_impl_err!( "Virtual column '{}' is missing an Arrow extension type; \ virtual columns must carry one of: {:?}", field.name(), - SUPPORTED_VIRTUAL_EXTENSION_TYPES, - )) - })?; + SUPPORTED_VIRTUAL_EXTENSION_TYPES + ); + }; if !SUPPORTED_VIRTUAL_EXTENSION_TYPES.contains(&name) { - return Err(DataFusionError::NotImplemented(format!( + return not_impl_err!( "Virtual column '{}' uses unsupported Arrow extension type '{}'; \ supported types: {:?}. Add the extension type to \ SUPPORTED_VIRTUAL_EXTENSION_TYPES together with a test covering it.", field.name(), name, - SUPPORTED_VIRTUAL_EXTENSION_TYPES, - ))); + SUPPORTED_VIRTUAL_EXTENSION_TYPES + ); } } Ok(()) diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 4dd642c940c4a..51f5204d7b7a7 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -138,8 +138,8 @@ impl TableSchema { let virtual_columns: Arc<Vec<FieldRef>> = Arc::new(vec![]); let table_schema = build_table_schema( &file_schema, - virtual_columns.as_ref(), table_partition_cols.as_ref(), + virtual_columns.as_ref(), ); Self { file_schema, @@ -174,8 +174,8 @@ impl TableSchema { } self.table_schema = build_table_schema( &self.file_schema, - self.virtual_columns.as_ref(), self.table_partition_cols.as_ref(), + self.virtual_columns.as_ref(), ); self } @@ -201,8 +201,8 @@ impl TableSchema { } self.table_schema = build_table_schema( 
&self.file_schema, - self.virtual_columns.as_ref(), self.table_partition_cols.as_ref(), + self.virtual_columns.as_ref(), ); self } @@ -242,8 +242,8 @@ impl TableSchema { fn build_table_schema( file_schema: &SchemaRef, - virtual_columns: &[FieldRef], table_partition_cols: &[FieldRef], + virtual_columns: &[FieldRef], ) -> SchemaRef { let mut builder = SchemaBuilder::from(file_schema.as_ref()); builder.extend(table_partition_cols.iter().cloned()); From 59fc97f723e0e89afaee2c33aabcf564c38681f8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 14:15:39 -0400 Subject: [PATCH 3/6] Fix cargo docs. --- datafusion/datasource/src/table_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 9e2a5b49d973f..4dd642c940c4a 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -186,7 +186,7 @@ impl TableSchema { /// `row_number` column) rather than being stored in the files or derived /// from partition paths. Each field must carry an arrow virtual extension /// type so the reader can recognize it; `ParquetOpener` forwards these - /// fields to [`parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`]. + /// fields to `parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`. /// /// Virtual columns are appended at the end of the table schema, after any /// partition columns. From 8d455c5516f11e5f06dbeece650214215df2ee1f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 16:24:57 -0400 Subject: [PATCH 4/6] Address PR feedback. 
--- datafusion/datasource-parquet/src/mod.rs | 2 + datafusion/datasource-parquet/src/opener.rs | 216 +++++++++++++++++--- datafusion/datasource-parquet/src/source.rs | 73 ++++++- datafusion/datasource/src/table_schema.rs | 25 +++ 4 files changed, 285 insertions(+), 31 deletions(-) diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..ecfae78cf1a0f 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -36,6 +36,7 @@ mod row_group_filter; mod sort; pub mod source; mod supported_predicates; +mod virtual_column; mod writer; pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; @@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it pub use row_filter::build_row_filter; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; +pub use virtual_column::ParquetVirtualColumn; pub use writer::plan_to_parquet; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index c2f4881ded080..37c5b8c9bad20 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -22,7 +22,8 @@ use crate::row_filter::build_projection_read_plan; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, + ParquetVirtualColumn, apply_file_schema_type_coercions, coerce_int96_to_resolution, + row_filter, }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; @@ -39,14 +40,15 @@ use std::sync::Arc; use std::task::{Context, Poll}; use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit}; -use arrow_schema::extension::ExtensionType; use datafusion_common::encryption::FileDecryptionProperties; use 
datafusion_common::stats::Precision; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, not_impl_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ @@ -68,7 +70,6 @@ use futures::{ use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; -use parquet::arrow::RowNumber; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, @@ -543,6 +544,14 @@ impl ParquetMorselizer { partitioned_file: PartitionedFile, ) -> Result { validate_supported_virtual_columns(self.table_schema.virtual_columns())?; + if self.pushdown_filters + && let Some(predicate) = self.predicate.as_ref() + { + validate_predicate_does_not_reference_virtual_columns( + predicate, + self.table_schema.virtual_columns(), + )?; + } let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); let file_name = partitioned_file.object_meta.location.to_string(); @@ -1381,14 +1390,6 @@ impl PushDecoderStreamState { type ConstantColumns = HashMap; -/// Allowlist of Arrow extension types for Parquet virtual columns that this -/// opener is tested against. Only add entries here when a corresponding -/// end-to-end test exists in `mod test::virtual_columns`; arrow-rs may expose -/// additional virtual extension types over time, and silently forwarding -/// untested ones risks producing columns that don't round-trip through -/// DataFusion's predicate and projection paths as expected. 
-const SUPPORTED_VIRTUAL_EXTENSION_TYPES: &[&str] = &[RowNumber::NAME]; - /// Return `base` unchanged when `extra` is empty; otherwise build a new schema /// with `extra` appended to `base`'s fields. fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef { @@ -1404,26 +1405,65 @@ fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef { Arc::new(Schema::new(fields)) } +/// Validate that each field is a DataFusion-supported parquet virtual column +/// by round-tripping it through [`ParquetVirtualColumn::try_from`]. Adding a +/// new supported extension type means adding a variant there — not editing a +/// stringly-typed allowlist here. fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<()> { for field in virtual_columns { - let Some(name) = field.extension_type_name() else { - return not_impl_err!( - "Virtual column '{}' is missing an Arrow extension type; \ - virtual columns must carry one of: {:?}", - field.name(), - SUPPORTED_VIRTUAL_EXTENSION_TYPES - ); - }; - if !SUPPORTED_VIRTUAL_EXTENSION_TYPES.contains(&name) { - return not_impl_err!( - "Virtual column '{}' uses unsupported Arrow extension type '{}'; \ - supported types: {:?}. Add the extension type to \ - SUPPORTED_VIRTUAL_EXTENSION_TYPES together with a test covering it.", - field.name(), - name, - SUPPORTED_VIRTUAL_EXTENSION_TYPES - ); - } + ParquetVirtualColumn::try_from(Arc::clone(field))?; + } + Ok(()) +} + +/// Reject predicates that reference a virtual column when filter pushdown is +/// enabled. +/// +/// arrow-rs's `RowFilter` evaluates predicates against a `ProjectionMask` that +/// addresses parquet leaves only; virtual columns (e.g. `row_number`) are +/// synthesized by the reader *after* filter evaluation and cannot be referenced +/// inside a row filter. Silently dropping such a predicate would produce wrong +/// results, so we fail loudly here. 
+/// +/// Callers constructing a `ParquetSource` should rely on +/// `FileSource::try_pushdown_filters` (invoked by the `FilterPushdown` physical +/// optimizer rule) to classify filters correctly: filters referencing virtual +/// columns are reported as `PushedDown::No` and stay in the enclosing +/// `FilterExec`, while the scan emits the virtual columns for the filter to +/// consume. Callers building plans manually must keep the `FilterExec` above +/// the `DataSourceExec` themselves rather than setting the predicate on +/// `ParquetSource` with pushdown enabled. +fn validate_predicate_does_not_reference_virtual_columns( + predicate: &Arc<dyn PhysicalExpr>, + virtual_columns: &[FieldRef], +) -> Result<()> { + if virtual_columns.is_empty() { + return Ok(()); + } + let mut offender: Option<String> = None; + predicate.apply(|node: &Arc<dyn PhysicalExpr>| { + if let Some(column) = node.downcast_ref::<Column>() + && virtual_columns + .iter() + .any(|f| f.name().as_str() == column.name()) + { + offender = Some(column.name().to_string()); + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + })?; + if let Some(name) = offender { + return not_impl_err!( + "Predicate references virtual column '{name}' while \ + pushdown_filters=true. DataFusion cannot push predicates on \ + virtual columns into the Parquet reader: arrow-rs's RowFilter \ + operates on a ProjectionMask over file leaves, and virtual \ + columns (e.g. row_number) are synthesized by the reader after \ + filter evaluation. Either leave the filter in a FilterExec above \ + the scan (this is what `ParquetSource::try_pushdown_filters` / \ + the FilterPushdown optimizer rule will do automatically), or \ + disable pushdown via `with_pushdown_filters(false)`." + ); } Ok(()) } @@ -3177,5 +3217,123 @@ mod test { "error should name the unsupported extension type, got: {msg}" ); } + + #[tokio::test] + async fn test_row_index_predicate_pushdown_mixed_or_errors() { + // Mixed `row_number = 2 OR value = 4` with pushdown_filters=true. 
+ // Silently dropping this in the scan would return all 5 rows; we + // instead surface a guidance-rich error so callers realize the + // predicate needs to stay above the scan. + let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "pushdown_mixed.parquet", 1, 5).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let expr = col("row_number") + .eq(lit(2i64)) + .or(col("value").eq(lit(4i64))); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .with_predicate(predicate) + .with_pushdown_filters(true) + .build(); + + let file = PartitionedFile::new( + "pushdown_mixed.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let err = morselizer.plan_file(file).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("row_number"), + "error should name the offending virtual column, got: {msg}" + ); + assert!( + msg.contains("arrow-rs") && msg.contains("RowFilter"), + "error should explain the arrow-rs limitation, got: {msg}" + ); + assert!( + msg.contains("FilterExec") + || msg.contains("FilterPushdown") + || msg.contains("try_pushdown_filters"), + "error should point at the correct API path, got: {msg}" + ); + } + + #[tokio::test] + async fn test_row_index_predicate_pushdown_virtual_only_errors() { + // Predicate referencing only a virtual column with pushdown_filters=true. 
+ let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "pushdown_virtual_only.parquet", 1, 5).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let expr = col("row_number").eq(lit(2i64)); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .with_predicate(predicate) + .with_pushdown_filters(true) + .build(); + + let file = PartitionedFile::new( + "pushdown_virtual_only.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + let err = morselizer.plan_file(file).unwrap_err(); + assert!(err.to_string().contains("row_number")); + } + + #[tokio::test] + async fn test_row_index_predicate_allowed_when_pushdown_disabled() { + // Same predicate, but pushdown_filters=false. The predicate is only + // used for stats pruning (which is a no-op for row_number) and must + // NOT error. 
+ let store = Arc::new(InMemory::new()) as Arc; + let (file_schema, data_size) = + write_grouped_file(&store, "pushdown_off.parquet", 1, 5).await; + + let rn_field = row_number_field("row_number", false); + let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) + .with_virtual_columns(vec![Arc::clone(&rn_field)]); + let projection = + ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); + + let expr = col("row_number").eq(lit(2i64)); + let predicate = logical2physical(&expr, table_schema.table_schema()); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_table_schema(table_schema) + .with_projection(projection) + .with_predicate(predicate) + .build(); // pushdown_filters defaults to false + + let file = PartitionedFile::new( + "pushdown_off.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + // Should open cleanly and return all rows (no row filter applied). + let stream = open_file(&morselizer, file).await.unwrap(); + let (_batches, rows) = count_batches_and_rows(stream).await; + assert_eq!(rows, 5); + } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..a68befabbc83b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -678,7 +678,12 @@ impl FileSource for ParquetSource { filters: Vec>, config: &ConfigOptions, ) -> datafusion_common::Result>> { - let table_schema = self.table_schema.table_schema(); + // Use the schema excluding virtual columns: virtual columns (e.g. + // Parquet `row_number`) are produced by the reader itself and cannot + // be referenced inside a RowFilter, so predicates that reference them + // must not be marked as pushed down — otherwise the scan would + // silently drop them and produce wrong results. 
+ let pushable_schema = self.table_schema.schema_without_virtual_columns(); // Determine if based on configs we should push filters down. // If either the table / scan itself or the config has pushdown enabled, // we will push down the filters. @@ -694,7 +699,7 @@ impl FileSource for ParquetSource { let filters: Vec = filters .into_iter() .map(|filter| { - if can_expr_be_pushed_down_with_schemas(&filter, table_schema) { + if can_expr_be_pushed_down_with_schemas(&filter, &pushable_schema) { PushedDownPredicate::supported(filter) } else { PushedDownPredicate::unsupported(filter) @@ -946,4 +951,68 @@ mod tests { assert!(source.reverse_row_groups()); assert!(source.filter().is_some()); } + + #[test] + fn test_try_pushdown_filters_rejects_virtual_column_refs() { + // Virtual columns are produced by the reader and cannot be referenced + // inside a RowFilter. `try_pushdown_filters` must report such filters + // as `PushedDown::No` so the FilterExec above the scan stays in + // place — otherwise the scan would silently drop the predicate and + // produce wrong results. + use arrow::datatypes::{DataType, Field, FieldRef, Schema}; + use datafusion_common::config::ConfigOptions; + use datafusion_datasource::TableSchema; + use datafusion_expr::{col, lit as logical_lit}; + use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_plan::filter_pushdown::PushedDown; + use parquet::arrow::RowNumber; + + let file_schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + let row_number_field: FieldRef = Arc::new( + Field::new("row_number", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let table_schema = TableSchema::from_file_schema(file_schema) + .with_virtual_columns(vec![row_number_field]); + + let source = ParquetSource::new(table_schema).with_pushdown_filters(true); + + // Three filters: pure file-col (pushable), pure virtual (not pushable), + // and a mixed OR conjunct (not pushable). 
+ let full_schema = source.table_schema.table_schema(); + + let pushable = logical2physical(&col("value").eq(logical_lit(1i64)), full_schema); + let virtual_only = + logical2physical(&col("row_number").eq(logical_lit(2i64)), full_schema); + let mixed = logical2physical( + &col("row_number") + .eq(logical_lit(2i64)) + .or(col("value").eq(logical_lit(4i64))), + full_schema, + ); + + let config = ConfigOptions::default(); + let prop = source + .try_pushdown_filters(vec![pushable, virtual_only, mixed], &config) + .expect("try_pushdown_filters must not error"); + + assert_eq!(prop.filters.len(), 3); + assert!( + matches!(prop.filters[0], PushedDown::Yes), + "file-column filter should be pushable" + ); + assert!( + matches!(prop.filters[1], PushedDown::No), + "filter referencing only a virtual column must not be pushed down" + ); + assert!( + matches!(prop.filters[2], PushedDown::No), + "filter mixing a virtual column with a file column must not be \ + pushed down (row filter would silently drop it)" + ); + } } diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 4dd642c940c4a..51f5204d7b7a7 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -77,6 +77,10 @@ pub struct TableSchema { /// values are produced per file by the reader. Virtual column fields must /// carry an arrow extension type (e.g. `RowNumber`, `RowGroupIndex`) so the /// file reader can recognize them. + /// + /// Virtual columns are appended at the end of the table schema, after the + /// file columns and any partition columns (layout: `[file, partition, + /// virtual]`). virtual_columns: Arc>, /// Columns that are derived from the directory structure (partitioning scheme). 
@@ -238,6 +242,27 @@ impl TableSchema { pub fn table_schema(&self) -> &SchemaRef { &self.table_schema } + + /// Schema of columns that can be referenced by predicates pushed into the + /// file reader: file columns plus partition columns, excluding virtual + /// columns. + /// + /// Virtual columns are produced by the reader itself (e.g. Parquet + /// `row_number`) and cannot be referenced inside the reader's row filter, + /// so predicates that reference them must stay above the scan. Callers + /// deciding which filters to push down should check against this schema + /// rather than [`Self::table_schema`]. + /// + /// When there are no virtual columns this returns the same schema as + /// [`Self::table_schema`]. + pub fn schema_without_virtual_columns(&self) -> SchemaRef { + if self.virtual_columns.is_empty() { + return Arc::clone(&self.table_schema); + } + let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); + builder.extend(self.table_partition_cols.iter().cloned()); + Arc::new(builder.finish()) + } } fn build_table_schema( From dbb8f3b21d771414832dc46a0ac899cd74ca25f4 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 16:28:11 -0400 Subject: [PATCH 5/6] Address PR feedback. --- .../datasource-parquet/src/virtual_column.rs | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 datafusion/datasource-parquet/src/virtual_column.rs diff --git a/datafusion/datasource-parquet/src/virtual_column.rs b/datafusion/datasource-parquet/src/virtual_column.rs new file mode 100644 index 0000000000000..8fcf10142ab6c --- /dev/null +++ b/datafusion/datasource-parquet/src/virtual_column.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Typed wrapper for parquet virtual columns. +//! +//! arrow-rs identifies virtual columns (produced by the reader, not read from +//! the file) via arrow extension types carried on the `FieldRef`. This module +//! lifts that contract into the type system: [`ParquetVirtualColumn`] is the +//! set of virtual columns DataFusion currently supports, and its +//! [`TryFrom`] impl validates the extension type at the boundary so +//! downstream code can pattern-match on variants rather than string-compare. + +use arrow::datatypes::FieldRef; +use arrow_schema::extension::ExtensionType; +use datafusion_common::{DataFusionError, Result, not_impl_err}; +use parquet::arrow::RowNumber; + +/// A parquet virtual column validated to have a supported arrow extension +/// type. +/// +/// Virtual columns are synthesized by the parquet reader (e.g. +/// [`RowNumber`] produces the absolute row number within the file) rather +/// than being read from column data. Because arrow-rs identifies them via +/// extension types on `FieldRef`, it would be easy to pass an unsupported or +/// misspelled extension type and have the error surface deep in the reader +/// pipeline; this enum forces the check to the construction boundary. +/// +/// Construct via [`TryFrom`]; add a new variant (and update the +/// `TryFrom` impl) when DataFusion gains support for another arrow-rs virtual +/// extension type. 
+#[derive(Debug, Clone)] +pub enum ParquetVirtualColumn { + /// Absolute row number within the parquet file. Backed by arrow-rs's + /// [`RowNumber`] extension type. + RowNumber(FieldRef), +} + +impl ParquetVirtualColumn { + /// The underlying arrow field, with its extension type and metadata + /// preserved. Suitable for passing to + /// `ArrowReaderOptions::with_virtual_columns`. + pub fn field(&self) -> &FieldRef { + match self { + Self::RowNumber(field) => field, + } + } +} + +impl From<ParquetVirtualColumn> for FieldRef { + fn from(col: ParquetVirtualColumn) -> Self { + match col { + ParquetVirtualColumn::RowNumber(field) => field, + } + } +} + +impl TryFrom<FieldRef> for ParquetVirtualColumn { + type Error = DataFusionError; + + fn try_from(field: FieldRef) -> Result<Self> { + let Some(name) = field.extension_type_name() else { + return not_impl_err!( + "Virtual column '{}' is missing an Arrow extension type; \ + supported extension types: [{}]", + field.name(), + RowNumber::NAME + ); + }; + match name { + n if n == RowNumber::NAME => Ok(Self::RowNumber(field)), + other => not_impl_err!( + "Virtual column '{}' uses unsupported Arrow extension type '{}'; \ + supported types: [{}]. 
Add a ParquetVirtualColumn variant and \ + a test for this type before wiring it through.", + field.name(), + other, + RowNumber::NAME + ), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field}; + use std::sync::Arc; + + #[test] + fn row_number_field_converts() { + let field: FieldRef = Arc::new( + Field::new("row_number", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let col = + ParquetVirtualColumn::try_from(Arc::clone(&field)).expect("valid row_number"); + assert!(matches!(col, ParquetVirtualColumn::RowNumber(_))); + assert_eq!(col.field().name(), "row_number"); + } + + #[test] + fn missing_extension_type_rejected() { + let field: FieldRef = Arc::new(Field::new("plain", DataType::Int64, false)); + let err = ParquetVirtualColumn::try_from(field).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("missing an Arrow extension type"), + "got: {msg}" + ); + } + + #[test] + fn unsupported_extension_type_rejected() { + // RowGroupIndex is a real arrow-rs virtual type not yet in our enum. + let field: FieldRef = Arc::new( + Field::new("row_group_index", DataType::Int64, false) + .with_extension_type(parquet::arrow::RowGroupIndex), + ); + let err = ParquetVirtualColumn::try_from(field).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("parquet.virtual.row_group_index"), + "error should name the offending extension type, got: {msg}" + ); + } +} From bd513ecc759e4fc8ec7086b796364dd62241f332 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 16:45:43 -0400 Subject: [PATCH 6/6] Address PR feedback. 
--- datafusion/datasource-parquet/src/opener.rs | 161 +++++++----------- datafusion/datasource-parquet/src/source.rs | 2 - .../datasource-parquet/src/virtual_column.rs | 47 ++--- datafusion/datasource/src/table_schema.rs | 4 +- 4 files changed, 79 insertions(+), 135 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 37c5b8c9bad20..a35f1c23615f9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -44,8 +44,8 @@ use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{ - ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, - not_impl_err, + ColumnStatistics, DataFusionError, HashSet, Result, ScalarValue, Statistics, + exec_err, not_impl_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::expressions::Column; @@ -1411,7 +1411,7 @@ fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef { /// stringly-typed allowlist here. fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<()> { for field in virtual_columns { - ParquetVirtualColumn::try_from(Arc::clone(field))?; + ParquetVirtualColumn::try_from(field)?; } Ok(()) } @@ -1425,14 +1425,11 @@ fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<() /// inside a row filter. Silently dropping such a predicate would produce wrong /// results, so we fail loudly here. 
/// -/// Callers constructing a `ParquetSource` should rely on -/// `FileSource::try_pushdown_filters` (invoked by the `FilterPushdown` physical -/// optimizer rule) to classify filters correctly: filters referencing virtual -/// columns are reported as `PushedDown::No` and stay in the enclosing -/// `FilterExec`, while the scan emits the virtual columns for the filter to -/// consume. Callers building plans manually must keep the `FilterExec` above -/// the `DataSourceExec` themselves rather than setting the predicate on -/// `ParquetSource` with pushdown enabled. +/// `ParquetSource::try_pushdown_filters` already classifies virtual-column +/// filters as `PushedDown::No` so the `FilterPushdown` optimizer leaves them +/// above the scan; this check is defense-in-depth for callers that build plans +/// manually and set `with_pushdown_filters(true)` alongside a predicate +/// referencing virtual columns. fn validate_predicate_does_not_reference_virtual_columns( predicate: &Arc, virtual_columns: &[FieldRef], @@ -1440,12 +1437,12 @@ fn validate_predicate_does_not_reference_virtual_columns( if virtual_columns.is_empty() { return Ok(()); } + let virtual_names: HashSet<&str> = + virtual_columns.iter().map(|f| f.name().as_str()).collect(); let mut offender: Option = None; predicate.apply(|node: &Arc| { if let Some(column) = node.downcast_ref::() - && virtual_columns - .iter() - .any(|f| f.name().as_str() == column.name()) + && virtual_names.contains(column.name()) { offender = Some(column.name().to_string()); return Ok(TreeNodeRecursion::Stop); @@ -1455,14 +1452,9 @@ fn validate_predicate_does_not_reference_virtual_columns( if let Some(name) = offender { return not_impl_err!( "Predicate references virtual column '{name}' while \ - pushdown_filters=true. DataFusion cannot push predicates on \ - virtual columns into the Parquet reader: arrow-rs's RowFilter \ - operates on a ProjectionMask over file leaves, and virtual \ - columns (e.g. 
row_number) are synthesized by the reader after \ - filter evaluation. Either leave the filter in a FilterExec above \ - the scan (this is what `ParquetSource::try_pushdown_filters` / \ - the FilterPushdown optimizer rule will do automatically), or \ - disable pushdown via `with_pushdown_filters(false)`." + pushdown_filters=true; predicates on virtual columns must be \ + evaluated above the scan. Disable filter pushdown or leave the \ + filter above the scan." ); } Ok(()) @@ -3218,39 +3210,50 @@ mod test { ); } - #[tokio::test] - async fn test_row_index_predicate_pushdown_mixed_or_errors() { - // Mixed `row_number = 2 OR value = 4` with pushdown_filters=true. - // Silently dropping this in the scan would return all 5 rows; we - // instead surface a guidance-rich error so callers realize the - // predicate needs to stay above the scan. - let store = Arc::new(InMemory::new()) as Arc; - let (file_schema, data_size) = - write_grouped_file(&store, "pushdown_mixed.parquet", 1, 5).await; - + /// Build a morselizer + file for a 5-row single-row-group parquet at + /// `path`, with a single `row_number` virtual column and the given + /// physical predicate applied to + /// `table_schema = [value(0), row_number(1)]`. 
+ async fn build_pushdown_morselizer( + store: &Arc, + path: &str, + predicate_expr: datafusion_expr::Expr, + pushdown_filters: bool, + ) -> (ParquetMorselizer, PartitionedFile) { + let (file_schema, data_size) = write_grouped_file(store, path, 1, 5).await; let rn_field = row_number_field("row_number", false); let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) .with_virtual_columns(vec![Arc::clone(&rn_field)]); let projection = ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); - - let expr = col("row_number") - .eq(lit(2i64)) - .or(col("value").eq(lit(4i64))); - let predicate = logical2physical(&expr, table_schema.table_schema()); + let predicate = + logical2physical(&predicate_expr, table_schema.table_schema()); let morselizer = ParquetMorselizerBuilder::new() - .with_store(Arc::clone(&store)) + .with_store(Arc::clone(store)) .with_table_schema(table_schema) .with_projection(projection) .with_predicate(predicate) - .with_pushdown_filters(true) + .with_pushdown_filters(pushdown_filters) .build(); - let file = PartitionedFile::new( - "pushdown_mixed.parquet".to_string(), - u64::try_from(data_size).unwrap(), - ); + let file = + PartitionedFile::new(path.to_string(), u64::try_from(data_size).unwrap()); + (morselizer, file) + } + + #[tokio::test] + async fn test_row_index_predicate_pushdown_mixed_or_errors() { + // Silent drop in the scan would return all 5 rows; we want a loud + // error instead. 
+ let store = Arc::new(InMemory::new()) as Arc; + let expr = col("row_number") + .eq(lit(2i64)) + .or(col("value").eq(lit(4i64))); + let (morselizer, file) = + build_pushdown_morselizer(&store, "pushdown_mixed.parquet", expr, true) + .await; + let err = morselizer.plan_file(file).unwrap_err(); let msg = err.to_string(); assert!( @@ -3258,79 +3261,39 @@ mod test { "error should name the offending virtual column, got: {msg}" ); assert!( - msg.contains("arrow-rs") && msg.contains("RowFilter"), - "error should explain the arrow-rs limitation, got: {msg}" - ); - assert!( - msg.contains("FilterExec") - || msg.contains("FilterPushdown") - || msg.contains("try_pushdown_filters"), - "error should point at the correct API path, got: {msg}" + msg.contains("virtual column") && msg.contains("pushdown"), + "error should explain the virtual-column + pushdown context, \ + got: {msg}" ); } #[tokio::test] async fn test_row_index_predicate_pushdown_virtual_only_errors() { - // Predicate referencing only a virtual column with pushdown_filters=true. 
let store = Arc::new(InMemory::new()) as Arc; - let (file_schema, data_size) = - write_grouped_file(&store, "pushdown_virtual_only.parquet", 1, 5).await; - - let rn_field = row_number_field("row_number", false); - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) - .with_virtual_columns(vec![Arc::clone(&rn_field)]); - let projection = - ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); - let expr = col("row_number").eq(lit(2i64)); - let predicate = logical2physical(&expr, table_schema.table_schema()); - - let morselizer = ParquetMorselizerBuilder::new() - .with_store(Arc::clone(&store)) - .with_table_schema(table_schema) - .with_projection(projection) - .with_predicate(predicate) - .with_pushdown_filters(true) - .build(); + let (morselizer, file) = build_pushdown_morselizer( + &store, + "pushdown_virtual_only.parquet", + expr, + true, + ) + .await; - let file = PartitionedFile::new( - "pushdown_virtual_only.parquet".to_string(), - u64::try_from(data_size).unwrap(), - ); let err = morselizer.plan_file(file).unwrap_err(); assert!(err.to_string().contains("row_number")); } #[tokio::test] async fn test_row_index_predicate_allowed_when_pushdown_disabled() { - // Same predicate, but pushdown_filters=false. The predicate is only - // used for stats pruning (which is a no-op for row_number) and must - // NOT error. + // Guards the `pushdown_filters=false` path: the predicate is only + // used for stats pruning (a no-op for row_number) and must not + // trip the pushdown-guard error. 
let store = Arc::new(InMemory::new()) as Arc; - let (file_schema, data_size) = - write_grouped_file(&store, "pushdown_off.parquet", 1, 5).await; - - let rn_field = row_number_field("row_number", false); - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)) - .with_virtual_columns(vec![Arc::clone(&rn_field)]); - let projection = - ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema()); - let expr = col("row_number").eq(lit(2i64)); - let predicate = logical2physical(&expr, table_schema.table_schema()); - - let morselizer = ParquetMorselizerBuilder::new() - .with_store(Arc::clone(&store)) - .with_table_schema(table_schema) - .with_projection(projection) - .with_predicate(predicate) - .build(); // pushdown_filters defaults to false + let (morselizer, file) = + build_pushdown_morselizer(&store, "pushdown_off.parquet", expr, false) + .await; - let file = PartitionedFile::new( - "pushdown_off.parquet".to_string(), - u64::try_from(data_size).unwrap(), - ); - // Should open cleanly and return all rows (no row filter applied). let stream = open_file(&morselizer, file).await.unwrap(); let (_batches, rows) = count_batches_and_rows(stream).await; assert_eq!(rows, 5); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a68befabbc83b..d08b8a03729e1 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -981,8 +981,6 @@ mod tests { let source = ParquetSource::new(table_schema).with_pushdown_filters(true); - // Three filters: pure file-col (pushable), pure virtual (not pushable), - // and a mixed OR conjunct (not pushable). 
let full_schema = source.table_schema.table_schema(); let pushable = logical2physical(&col("value").eq(logical_lit(1i64)), full_schema); diff --git a/datafusion/datasource-parquet/src/virtual_column.rs b/datafusion/datasource-parquet/src/virtual_column.rs index 8fcf10142ab6c..2290ad2aeab9d 100644 --- a/datafusion/datasource-parquet/src/virtual_column.rs +++ b/datafusion/datasource-parquet/src/virtual_column.rs @@ -17,29 +17,21 @@ //! Typed wrapper for parquet virtual columns. //! -//! arrow-rs identifies virtual columns (produced by the reader, not read from -//! the file) via arrow extension types carried on the `FieldRef`. This module -//! lifts that contract into the type system: [`ParquetVirtualColumn`] is the -//! set of virtual columns DataFusion currently supports, and its -//! [`TryFrom`] impl validates the extension type at the boundary so -//! downstream code can pattern-match on variants rather than string-compare. +//! arrow-rs identifies virtual columns via arrow extension types carried on +//! the `FieldRef`. [`ParquetVirtualColumn`] lifts that contract into the type +//! system so callers validate at the boundary (via `TryFrom<&FieldRef>`) +//! rather than string-comparing extension-type names deep inside the reader. use arrow::datatypes::FieldRef; use arrow_schema::extension::ExtensionType; use datafusion_common::{DataFusionError, Result, not_impl_err}; use parquet::arrow::RowNumber; +use std::sync::Arc; /// A parquet virtual column validated to have a supported arrow extension /// type. /// -/// Virtual columns are synthesized by the parquet reader (e.g. -/// [`RowNumber`] produces the absolute row number within the file) rather -/// than being read from column data. Because arrow-rs identifies them via -/// extension types on `FieldRef`, it would be easy to pass an unsupported or -/// misspelled extension type and have the error surface deep in the reader -/// pipeline; this enum forces the check to the construction boundary. 
-/// -/// Construct via [`TryFrom`]; add a new variant (and update the +/// Construct via [`TryFrom<&FieldRef>`]; add a new variant (and update the /// `TryFrom` impl) when DataFusion gains support for another arrow-rs virtual /// extension type. #[derive(Debug, Clone)] @@ -50,9 +42,6 @@ pub enum ParquetVirtualColumn { } impl ParquetVirtualColumn { - /// The underlying arrow field, with its extension type and metadata - /// preserved. Suitable for passing to - /// `ArrowReaderOptions::with_virtual_columns`. pub fn field(&self) -> &FieldRef { match self { Self::RowNumber(field) => field, @@ -68,10 +57,10 @@ impl From for FieldRef { } } -impl TryFrom for ParquetVirtualColumn { +impl TryFrom<&FieldRef> for ParquetVirtualColumn { type Error = DataFusionError; - fn try_from(field: FieldRef) -> Result { + fn try_from(field: &FieldRef) -> Result { let Some(name) = field.extension_type_name() else { return not_impl_err!( "Virtual column '{}' is missing an Arrow extension type; \ @@ -81,7 +70,7 @@ impl TryFrom for ParquetVirtualColumn { ); }; match name { - n if n == RowNumber::NAME => Ok(Self::RowNumber(field)), + n if n == RowNumber::NAME => Ok(Self::RowNumber(Arc::clone(field))), other => not_impl_err!( "Virtual column '{}' uses unsupported Arrow extension type '{}'; \ supported types: [{}]. 
Add a ParquetVirtualColumn variant and \ @@ -98,7 +87,6 @@ impl TryFrom for ParquetVirtualColumn { mod tests { use super::*; use arrow::datatypes::{DataType, Field}; - use std::sync::Arc; #[test] fn row_number_field_converts() { @@ -106,8 +94,7 @@ mod tests { Field::new("row_number", DataType::Int64, false) .with_extension_type(RowNumber), ); - let col = - ParquetVirtualColumn::try_from(Arc::clone(&field)).expect("valid row_number"); + let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number"); assert!(matches!(col, ParquetVirtualColumn::RowNumber(_))); assert_eq!(col.field().name(), "row_number"); } @@ -115,11 +102,10 @@ mod tests { #[test] fn missing_extension_type_rejected() { let field: FieldRef = Arc::new(Field::new("plain", DataType::Int64, false)); - let err = ParquetVirtualColumn::try_from(field).unwrap_err(); - let msg = err.to_string(); + let err = ParquetVirtualColumn::try_from(&field).unwrap_err(); assert!( - msg.contains("missing an Arrow extension type"), - "got: {msg}" + err.to_string().contains("missing an Arrow extension type"), + "got: {err}" ); } @@ -130,11 +116,10 @@ mod tests { Field::new("row_group_index", DataType::Int64, false) .with_extension_type(parquet::arrow::RowGroupIndex), ); - let err = ParquetVirtualColumn::try_from(field).unwrap_err(); - let msg = err.to_string(); + let err = ParquetVirtualColumn::try_from(&field).unwrap_err(); assert!( - msg.contains("parquet.virtual.row_group_index"), - "error should name the offending extension type, got: {msg}" + err.to_string().contains("parquet.virtual.row_group_index"), + "error should name the offending extension type, got: {err}" ); } } diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index 51f5204d7b7a7..efa46618f1691 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -259,9 +259,7 @@ impl TableSchema { if self.virtual_columns.is_empty() { return 
Arc::clone(&self.table_schema); } - let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); - builder.extend(self.table_partition_cols.iter().cloned()); - Arc::new(builder.finish()) + build_table_schema(&self.file_schema, &self.table_partition_cols, &[]) } }