diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index e6266b2c088d7..9b010810afdf8 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -26,6 +26,8 @@ //! select * from data limit 10; //! ``` +use std::sync::Arc; + use arrow::compute::concat_batches; use arrow::record_batch::RecordBatch; use datafusion::physical_plan::collect; @@ -617,6 +619,91 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { } } +/// Verifies that `struct_col IS NOT NULL` is pushed into the parquet scan +/// and produces correct results, including when the struct is non-null +/// but all leaf fields are null. +#[tokio::test] +async fn struct_is_not_null_pushdown() { + use arrow::array::{Int32Array, StructArray}; + use arrow::datatypes::{DataType, Field, Fields, Schema}; + + let fields = Fields::from(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ]); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("s", DataType::Struct(fields.clone()), true), + ])); + + // Row 0: s={a:1, b:10, c:100} → struct NOT null, leaves NOT null + // Row 1: s={a:NULL, b:NULL, c:NULL} → struct NOT null, ALL leaves null + // Row 2: s=NULL → struct IS null + // Row 3: s={a:3, b:NULL, c:300} → struct NOT null, mixed leaves + let ids = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); + let a = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(3)])); + let b = Arc::new(Int32Array::from(vec![Some(10), None, None, None])); + let c = Arc::new(Int32Array::from(vec![Some(100), None, None, Some(300)])); + let struct_array = StructArray::try_new( + fields, + vec![a, b, c], + Some(vec![true, true, false, true].into()), + ) + .unwrap(); + let batch = + RecordBatch::try_new(schema.clone(), vec![ids, Arc::new(struct_array)]).unwrap(); + + let tempdir = 
TempDir::new().unwrap(); + let props = WriterProperties::builder().build(); + let test_file = TestParquetFile::try_new( + tempdir.path().join("struct.parquet"), + props, + vec![batch], + ) + .unwrap(); + + let scan_options = ParquetScanOptions { + pushdown_filters: true, + enable_page_index: false, + reorder_filters: false, + }; + let ctx = SessionContext::new_with_config(scan_options.config()); + let filter = col("s").is_not_null(); + let exec = test_file.create_scan(&ctx, Some(filter)).await.unwrap(); + let result = collect(exec.clone(), ctx.task_ctx()).await.unwrap(); + + // Verify correct rows: 1, 2, 4 (row 2 has non-null struct with null leaves) + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 3, + "Expected 3 rows (struct non-null), got {total_rows}" + ); + + let batch = concat_batches(&test_file.schema(), &result).unwrap(); + let id_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids: Vec = (0..id_col.len()).map(|i| id_col.value(i)).collect(); + assert_eq!( + ids, + vec![1, 2, 4], + "Row 2 (null-leaves, non-null struct) must be included" + ); + + // Verify pushdown metrics: filter was evaluated at the row level + let metrics = TestParquetFile::parquet_metrics(&exec).expect("found metrics"); + let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned"); + let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched"); + assert_eq!( + pushdown_rows_matched, 3, + "Expected 3 rows matched by pushdown" + ); + assert_eq!(pushdown_rows_pruned, 1, "Expected 1 row pruned by pushdown"); +} + #[tokio::test] async fn predicate_cache_default() -> datafusion_common::Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs index b52408d4222d8..26bd627766785 100644 --- 
a/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs +++ b/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs @@ -349,5 +349,224 @@ fn build_struct_batch( )?) } -criterion_group!(benches, parquet_struct_filter_pushdown); +// --------------------------------------------------------------------------- +// Benchmark: struct IS NOT NULL pushdown +// +// Uses a separate dataset with a NULLABLE struct containing many leaf fields. +// Compares scanning with vs without row-level pushdown for `s IS NOT NULL`. +// +// The key metric: with pushdown, only 1 leaf column is read for the null +// check (via definition levels); without pushdown, ALL leaf columns are +// decoded to materialize the struct and then check nullability post-scan. +// --------------------------------------------------------------------------- + +const NULLABLE_STRUCT_COLUMN_NAME: &str = "s"; +/// Number of leaf fields inside the nullable struct. +/// More fields = bigger difference between pushdown and no-pushdown. +const NUM_STRUCT_FIELDS: usize = 12; +/// Fraction of rows where the struct is null (~10%). 
+const NULL_FRACTION: usize = 10; + +struct NullableBenchmarkDataset { + _tempdir: TempDir, + file_path: PathBuf, +} + +impl NullableBenchmarkDataset { + fn path(&self) -> &Path { + &self.file_path + } +} + +static NULLABLE_DATASET: LazyLock = LazyLock::new(|| { + create_nullable_dataset() + .expect("failed to prepare nullable struct benchmark dataset") +}); + +fn nullable_struct_schema() -> SchemaRef { + let struct_fields: Vec = (0..NUM_STRUCT_FIELDS) + .map(|i| Field::new(format!("f{i}"), DataType::Utf8, true)) + .collect(); + Arc::new(Schema::new(vec![ + Field::new(ID_COLUMN_NAME, DataType::Int32, false), + Field::new( + NULLABLE_STRUCT_COLUMN_NAME, + DataType::Struct(Fields::from(struct_fields)), + true, + ), + ])) +} + +fn create_nullable_dataset() -> datafusion_common::Result { + let tempdir = TempDir::new()?; + let file_path = tempdir.path().join("struct_nullable_filter.parquet"); + + let schema = nullable_struct_schema(); + let writer_props = WriterProperties::builder() + .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT)) + .build(); + + let mut writer = ArrowWriter::try_new( + std::fs::File::create(&file_path)?, + Arc::clone(&schema), + Some(writer_props), + )?; + + for rg_idx in 0..TOTAL_ROW_GROUPS { + let batch = build_nullable_struct_batch(&schema, rg_idx, ROW_GROUP_ROW_COUNT)?; + writer.write(&batch)?; + } + + writer.close()?; + Ok(NullableBenchmarkDataset { + _tempdir: tempdir, + file_path, + }) +} + +fn build_nullable_struct_batch( + schema: &SchemaRef, + _rg_idx: usize, + len: usize, +) -> datafusion_common::Result { + use arrow::array::NullBufferBuilder; + + let large_string: String = "x".repeat(LARGE_STRING_LEN); + let id_array = Arc::new(Int32Array::from_iter_values(0..len as i32)); + + // Build struct fields — each leaf is a large string column + let fields: Vec<(Arc, Arc)> = (0..NUM_STRUCT_FIELDS) + .map(|i| { + let mut builder = StringBuilder::new(); + for _ in 0..len { + builder.append_value(&large_string); + } + ( + 
Arc::new(Field::new(format!("f{i}"), DataType::Utf8, true)), + Arc::new(builder.finish()) as Arc, + ) + }) + .collect(); + + // ~10% of rows have null struct + let mut null_buffer = NullBufferBuilder::new(len); + for row in 0..len { + null_buffer.append(row % NULL_FRACTION != 0); + } + let struct_array = StructArray::try_new( + Fields::from( + fields + .iter() + .map(|(f, _)| Arc::clone(f)) + .collect::>(), + ), + fields.into_iter().map(|(_, a)| a).collect(), + null_buffer.finish(), + )?; + + Ok(RecordBatch::try_new( + Arc::clone(schema), + vec![id_array, Arc::new(struct_array)], + )?) +} + +/// `s IS NOT NULL` +fn struct_is_not_null_expr() -> Expr { + col(NULLABLE_STRUCT_COLUMN_NAME).is_not_null() +} + +/// `s IS NULL` +fn struct_is_null_expr() -> Expr { + col(NULLABLE_STRUCT_COLUMN_NAME).is_null() +} + +fn expected_non_null_rows() -> usize { + // rows where row % NULL_FRACTION != 0 + TOTAL_ROWS - TOTAL_ROWS / NULL_FRACTION +} + +fn expected_null_rows() -> usize { + TOTAL_ROWS / NULL_FRACTION +} + +fn parquet_struct_null_check_pushdown(c: &mut Criterion) { + let dataset_path = NULLABLE_DATASET.path().to_owned(); + let mut group = c.benchmark_group("parquet_struct_null_check_pushdown"); + group.throughput(Throughput::Elements(TOTAL_ROWS as u64)); + + // Scenario 1: SELECT * FROM t WHERE s IS NOT NULL — no pushdown + // Without pushdown, ALL 12 leaf columns of the struct are decoded + // to materialize the struct, then IS NOT NULL is checked post-scan. + group.bench_function("select_star/no_pushdown", |b| { + let file_schema = setup_reader(&dataset_path); + let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema); + b.iter(|| { + let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all()) + .expect("scan succeeded"); + assert_eq!(matched, expected_non_null_rows()); + }); + }); + + // Scenario 2: SELECT * FROM t WHERE s IS NOT NULL — with pushdown + // With pushdown, only 1 leaf column is read for the null check. 
+ // Remaining leaves are read only for matched rows. + group.bench_function("select_star/with_pushdown", |b| { + let file_schema = setup_reader(&dataset_path); + let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema); + b.iter(|| { + let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all()) + .expect("scan succeeded"); + assert_eq!(matched, expected_non_null_rows()); + }); + }); + + // Scenario 3: SELECT id FROM t WHERE s IS NOT NULL — no pushdown + // Without pushdown we must read all columns to materialize the struct + // for post-scan IS NOT NULL evaluation, so ProjectionMask::all() is + // correct here even though the query only needs `id` in the output. + group.bench_function("select_id/no_pushdown", |b| { + let file_schema = setup_reader(&dataset_path); + let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema); + b.iter(|| { + let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all()) + .expect("scan succeeded"); + assert_eq!(matched, expected_non_null_rows()); + }); + }); + + // Scenario 4: SELECT id FROM t WHERE s IS NOT NULL — with pushdown + // Best case: pushdown reads 1 leaf for null check, output reads only `id`. + // The 12 struct leaves are never decoded at all. + group.bench_function("select_id/with_pushdown", |b| { + let file_schema = setup_reader(&dataset_path); + let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema); + let id_only = id_projection(&dataset_path); + b.iter(|| { + let matched = scan(&dataset_path, &predicate, true, id_only.clone()) + .expect("scan succeeded"); + assert_eq!(matched, expected_non_null_rows()); + }); + }); + + // Scenario 5: SELECT id FROM t WHERE s IS NULL — with pushdown + // Verify IS NULL pushdown works symmetrically with IS NOT NULL. 
+ group.bench_function("select_id_is_null/with_pushdown", |b| { + let file_schema = setup_reader(&dataset_path); + let predicate = logical2physical(&struct_is_null_expr(), &file_schema); + let id_only = id_projection(&dataset_path); + b.iter(|| { + let matched = scan(&dataset_path, &predicate, true, id_only.clone()) + .expect("scan succeeded"); + assert_eq!(matched, expected_null_rows()); + }); + }); + + group.finish(); +} + +criterion_group!( + benches, + parquet_struct_filter_pushdown, + parquet_struct_null_check_pushdown +); criterion_main!(benches); diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 67b65321d9bff..f5944f20bdbb0 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -57,13 +57,16 @@ //! List-aware predicates (for example, `array_has`, `array_has_all`, and //! `array_has_any`) can be evaluated directly during Parquet decoding. //! Struct field access via `get_field` is also supported when the accessed -//! leaf is a primitive type. Filters that reference entire struct columns -//! rather than individual fields cannot be pushed down and are instead +//! leaf is a primitive type. IS NULL and IS NOT NULL checks on whole struct +//! columns are also pushed down, reading only a single leaf column to +//! reconstruct the struct's null bitmap from definition levels. Other filters +//! that reference entire struct columns cannot be pushed down and are instead //! evaluated after the full batches are materialized. //! //! For example, given a struct column `s {name: Utf8, value: Int32}`: //! - `WHERE s['value'] > 5` — pushed down (accesses a primitive leaf) -//! - `WHERE s IS NOT NULL` — not pushed down (references the whole struct) +//! - `WHERE s IS NOT NULL` — pushed down (reads one leaf for null bitmap) +//! 
- `WHERE s = ROW(...)` — not pushed down (references the whole struct) use std::collections::BTreeSet; use std::sync::Arc; @@ -82,7 +85,7 @@ use datafusion_common::Result; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr::expressions::{Column, Literal}; +use datafusion_physical_expr::expressions::{Column, IsNotNullExpr, IsNullExpr, Literal}; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; @@ -265,6 +268,10 @@ struct PushdownChecker<'schema> { required_columns: Vec, /// Struct field accesses via `get_field`. struct_field_accesses: Vec, + /// Root column indices of struct columns used in IS NULL / IS NOT NULL. + /// These need only a single leaf column to evaluate (the struct's null bitmap + /// is derived from definition levels of any leaf). + struct_null_checks: Vec, /// Whether nested list columns are supported by the predicate semantics. allow_list_columns: bool, /// The Arrow schema of the parquet file. @@ -278,6 +285,7 @@ impl<'schema> PushdownChecker<'schema> { projected_columns: false, required_columns: Vec::new(), struct_field_accesses: Vec::new(), + struct_null_checks: Vec::new(), allow_list_columns, file_schema, } @@ -370,6 +378,33 @@ impl<'schema> PushdownChecker<'schema> { self.allow_list_columns && is_list } + /// If `node` is `IS NULL(Column)` or `IS NOT NULL(Column)` where the column + /// is a struct type, record it as a struct null check and return `Jump`. + /// Otherwise return `None` so normal traversal continues. 
+    fn try_handle_struct_null_check(
+        &mut self,
+        node: &Arc<dyn PhysicalExpr>,
+    ) -> Option<TreeNodeRecursion> {
+        let column = if let Some(expr) = node.as_any().downcast_ref::<IsNullExpr>() {
+            expr.arg().as_any().downcast_ref::<Column>()
+        } else if let Some(expr) = node.as_any().downcast_ref::<IsNotNullExpr>() {
+            expr.arg().as_any().downcast_ref::<Column>()
+        } else {
+            None
+        }?;
+
+        // If the column is missing from the file schema, return None and let
+        // normal traversal handle it — check_single_column will set
+        // projected_columns = true, correctly blocking pushdown.
+        let idx = self.file_schema.index_of(column.name()).ok()?;
+        if !matches!(self.file_schema.field(idx).data_type(), DataType::Struct(_)) {
+            return None;
+        }
+
+        self.struct_null_checks.push(idx);
+        Some(TreeNodeRecursion::Jump)
+    }
+
     #[inline]
     fn prevents_pushdown(&self) -> bool {
         self.non_primitive_columns || self.projected_columns
@@ -384,9 +419,12 @@
     fn into_sorted_columns(mut self) -> PushdownColumns {
         self.required_columns.sort_unstable();
         self.required_columns.dedup();
+        self.struct_null_checks.sort_unstable();
+        self.struct_null_checks.dedup();
         PushdownColumns {
             required_columns: self.required_columns,
             struct_field_accesses: self.struct_field_accesses,
+            struct_null_checks: self.struct_null_checks,
         }
     }
 }
@@ -395,6 +433,16 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     type Node = Arc<dyn PhysicalExpr>;
 
     fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
+        // Handle IS NULL / IS NOT NULL on struct columns.
+        //
+        // These predicates only need the struct's null bitmap, which arrow-rs
+        // populates from definition levels of any single leaf column. We
+        // intercept here so the visitor never reaches the raw Column node
+        // (which would trigger `check_single_column` and reject the struct).
+        if let Some(recursion) = self.try_handle_struct_null_check(node) {
+            return Ok(recursion);
+        }
+
         // Handle struct field access like `s['foo']['bar'] > 10`.
// // DataFusion represents nested field access as `get_field(Column("s"), "foo")` @@ -505,6 +553,9 @@ struct PushdownColumns { /// Struct field accesses via `get_field`. Each entry records the root struct /// column index and the field path being accessed. struct_field_accesses: Vec, + /// Root indices of struct columns used only in IS NULL / IS NOT NULL checks. + /// Only a single leaf column is needed per struct to reconstruct its null bitmap. + struct_null_checks: Vec, } /// Records a struct field access via `get_field(struct_col, 'field1', 'field2', ...)`. @@ -572,6 +623,14 @@ pub(crate) fn build_parquet_read_plan( schema_descr, ); leaf_indices.extend_from_slice(&struct_leaf_indices); + + // For struct null checks, resolve to only the first leaf of each struct + let null_check_leaf_indices = resolve_struct_null_check_leaves( + &required_columns.struct_null_checks, + schema_descr, + ); + leaf_indices.extend_from_slice(&null_check_leaf_indices); + leaf_indices.sort_unstable(); leaf_indices.dedup(); @@ -584,6 +643,8 @@ pub(crate) fn build_parquet_read_plan( file_schema, root_indices, &required_columns.struct_field_accesses, + &required_columns.struct_null_checks, + schema_descr, ); Ok(Some(( @@ -716,8 +777,13 @@ pub(crate) fn build_projection_read_plan( let projection_mask = ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied()); - let projected_schema = - build_filter_schema(file_schema, &all_root_indices, &all_struct_accesses); + let projected_schema = build_filter_schema( + file_schema, + &all_root_indices, + &all_struct_accesses, + &[], + schema_descr, + ); ParquetReadPlan { projection_mask, @@ -788,6 +854,52 @@ fn resolve_struct_field_leaves( leaf_indices } +/// Resolves struct null-check root indices to a single Parquet leaf each. +/// +/// For IS NULL / IS NOT NULL on a struct column, we only need one leaf to +/// reconstruct the struct's null bitmap from definition levels. 
We pick the
+/// first leaf (in Parquet schema order) whose root is the target struct.
+fn resolve_struct_null_check_leaves(
+    root_indices: &[usize],
+    schema_descr: &SchemaDescriptor,
+) -> Vec<usize> {
+    let root_set: BTreeSet<usize> = root_indices.iter().copied().collect();
+    let mut found: BTreeSet<usize> = BTreeSet::new();
+    let mut result = Vec::new();
+
+    for leaf_idx in 0..schema_descr.num_columns() {
+        let root = schema_descr.get_column_root_idx(leaf_idx);
+        if root_set.contains(&root) && !found.contains(&root) {
+            found.insert(root);
+            result.push(leaf_idx);
+            if found.len() == root_set.len() {
+                break;
+            }
+        }
+    }
+
+    result
+}
+
+/// Returns the field path (relative to the struct root) for the first leaf column
+/// of a struct. Used to build a pruned schema matching the single-leaf projection
+/// mask for struct null checks.
+fn first_leaf_field_path(
+    root_index: usize,
+    schema_descr: &SchemaDescriptor,
+) -> Vec<String> {
+    let first_leaf_idx = (0..schema_descr.num_columns())
+        .find(|&leaf_idx| schema_descr.get_column_root_idx(leaf_idx) == root_index)
+        .expect("struct must have at least one leaf column");
+
+    let leaf_col = schema_descr.column(first_leaf_idx);
+    // parts()[0] is the struct root name, parts()[1..] is the field path
+    leaf_col.path().parts()[1..]
+        .iter()
+        .map(|s| s.to_string())
+        .collect()
+}
+
 /// Builds a filter schema that includes only the fields actually accessed by the
 /// filter expression.
 ///
@@ -799,8 +911,11 @@ fn build_filter_schema(
     file_schema: &Schema,
     regular_indices: &[usize],
     struct_field_accesses: &[StructFieldAccess],
+    struct_null_checks: &[usize],
+    schema_descr: &SchemaDescriptor,
 ) -> SchemaRef {
     let regular_set: BTreeSet<usize> = regular_indices.iter().copied().collect();
+    let null_check_set: BTreeSet<usize> = struct_null_checks.iter().copied().collect();
 
     let all_indices = regular_indices
         .iter()
@@ -810,6 +925,7 @@ fn build_filter_schema(
         .iter()
         .map(|&StructFieldAccess { root_index, .. 
}| root_index), ) + .chain(struct_null_checks.iter().copied()) .collect::>(); let fields = all_indices @@ -821,13 +937,13 @@ fn build_filter_schema( // keep the full type // // Pruning is only valid when the column is accessed exclusively - // through struct field accesses + // through struct field accesses or null checks if regular_set.contains(&idx) { return Arc::new(field.clone()); } // collect all field paths that access this root struct column - let field_paths = struct_field_accesses + let mut field_paths = struct_field_accesses .iter() .filter_map( |&StructFieldAccess { @@ -839,6 +955,19 @@ fn build_filter_schema( ) .collect::>(); + // When this struct is used in a null check, include the first + // leaf's field path so the pruned schema matches the projection + // mask. This handles both the null-check-only case and the + // combined case (e.g., `s IS NOT NULL OR s['value'] > 5`). + let null_check_path = if null_check_set.contains(&idx) { + Some(first_leaf_field_path(idx, schema_descr)) + } else { + None + }; + if let Some(ref path) = null_check_path { + field_paths.push(path.as_slice()); + } + if field_paths.is_empty() { return Arc::new(field.clone()); } @@ -1240,7 +1369,7 @@ mod test { } #[test] - fn struct_data_structures_prevent_pushdown() { + fn struct_is_not_null_allows_pushdown() { let table_schema = Arc::new(Schema::new(vec![Field::new( "struct_col", DataType::Struct( @@ -1249,17 +1378,56 @@ mod test { true, )])); + // IS NOT NULL on a struct column should be pushable — only the struct's + // null bitmap is needed, reconstructed from a single leaf's definition levels. 
let expr = col("struct_col").is_not_null(); let expr = logical2physical(&expr, &table_schema); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn struct_is_null_allows_pushdown() { + let table_schema = Arc::new(Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + )])); + + let expr = col("struct_col").is_null(); + let expr = logical2physical(&expr, &table_schema); + + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn struct_bare_reference_prevents_pushdown() { + // A bare struct column reference (not wrapped in IS NULL / IS NOT NULL) + // should still block pushdown. + let table_schema = Arc::new(Schema::new(vec![ + Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + ), + Field::new("int_col", DataType::Int32, false), + ])); + + // Expression: struct_col = int_col — not a null check, should block + let expr = col("struct_col").eq(col("int_col")); + let expr = logical2physical(&expr, &table_schema); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } #[test] - fn mixed_primitive_and_struct_prevents_pushdown() { - // Even when a predicate contains both primitive and unsupported nested columns, - // the entire predicate should not be pushed down because the struct column - // cannot be evaluated during Parquet decoding. + fn mixed_struct_null_check_and_primitive_allows_pushdown() { + // When the struct column is used only in IS NOT NULL and the rest is + // primitive, the combined expression should be pushable. 
let table_schema = Arc::new(Schema::new(vec![ Field::new( "struct_col", @@ -1272,17 +1440,14 @@ mod test { ])); // Expression: (struct_col IS NOT NULL) AND (int_col = 5) - // Even though int_col is primitive, the presence of struct_col in the - // conjunction should prevent pushdown of the entire expression. let expr = col("struct_col") .is_not_null() .and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None))); let expr = logical2physical(&expr, &table_schema); - // The entire expression should not be pushed down - assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); - // However, just the int_col predicate alone should be pushable + // Just the int_col predicate alone should also be pushable let expr_int_only = col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)); let expr_int_only = logical2physical(&expr_int_only, &table_schema); @@ -1292,6 +1457,147 @@ mod test { )); } + #[test] + fn not_is_null_struct_allows_pushdown() { + // NOT(IS NULL(struct_col)) — the NOT wrapping should not block pushdown. + // f_down visits NOT first (no-op), then IsNullExpr which is intercepted. + let table_schema = Arc::new(Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + )])); + + let expr = Expr::Not(Box::new(col("struct_col").is_null())); + let expr = logical2physical(&expr, &table_schema); + + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn struct_null_check_combined_with_field_access() { + // s IS NOT NULL OR get_field(s, 'value') > 5 + // Both struct_null_checks and struct_field_accesses populated for same root. + // Verify the combined expression is accepted for pushdown and that + // the projected schema includes both the null-check's first leaf and + // the field-access leaf (no schema/mask mismatch). 
+ let table_schema = Arc::new(Schema::new(vec![Field::new( + "s", + DataType::Struct( + vec![ + Arc::new(Field::new("a", DataType::Int32, true)), + Arc::new(Field::new("value", DataType::Int32, true)), + ] + .into(), + ), + true, + )])); + + let get_field_expr = get_field().call(vec![ + col("s"), + Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None), + ]); + let expr = col("s") + .is_not_null() + .or(get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None))); + let expr = logical2physical(&expr, &table_schema); + + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + /// Verify IS NOT NULL pushdown works correctly on a nested struct + /// (outer { inner { leaf: Int32 } }). + #[test] + fn nested_struct_is_not_null_pushdown_correctness() { + let inner_fields = Fields::from(vec![Field::new("leaf", DataType::Int32, true)]); + let outer_fields = Fields::from(vec![Field::new( + "inner", + DataType::Struct(inner_fields.clone()), + true, + )]); + let schema = Arc::new(Schema::new(vec![Field::new( + "outer", + DataType::Struct(outer_fields.clone()), + true, + )])); + + // Row 0: outer={inner={leaf:1}} → outer NOT null + // Row 1: outer={inner=NULL} → outer NOT null, inner null + // Row 2: outer=NULL → outer IS null + let leaf_values = Arc::new(Int32Array::from(vec![Some(1), None, None])); + let inner_array = StructArray::try_new( + inner_fields, + vec![leaf_values], + Some(vec![true, false, false].into()), // inner null bitmap + ) + .unwrap(); + let outer_array = StructArray::try_new( + outer_fields, + vec![Arc::new(inner_array)], + Some(vec![true, true, false].into()), // outer null bitmap + ) + .unwrap(); + + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(outer_array)]) + .unwrap(); + + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None) + .expect("writer"); + writer.write(&batch).expect("write batch"); + 
writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = builder.metadata().clone(); + let file_schema = builder.schema().clone(); + + let expr = col("outer").is_not_null(); + let expr = logical2physical(&expr, &file_schema); + + let candidate = FilterCandidateBuilder::new(expr, file_schema.clone()) + .build(&metadata) + .expect("building candidate") + .expect("nested struct IS NOT NULL should be pushable"); + + // Should project only 1 leaf + let expected_mask = + ProjectionMask::leaves(metadata.file_metadata().schema_descr(), [0]); + assert_eq!(candidate.read_plan.projection_mask, expected_mask); + + // Evaluate the filter + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + Count::new(), + Count::new(), + Time::new(), + ) + .expect("creating filter predicate"); + + let mut parquet_reader = + ParquetRecordBatchReaderBuilder::try_new(file.reopen().expect("reopen")) + .expect("reader builder") + .with_projection(row_filter.projection().clone()) + .build() + .expect("building reader"); + + let rb = parquet_reader.next().unwrap().unwrap(); + let result = row_filter.evaluate(rb).expect("evaluating filter"); + + // Row 0: outer non-null → true + // Row 1: outer non-null (inner null) → true + // Row 2: outer null → false + assert_eq!( + result, + BooleanArray::from(vec![true, true, false]), + "outer IS NOT NULL must reflect outer nullability only" + ); + } + #[test] fn nested_lists_allow_pushdown_checks() { let table_schema = Arc::new(get_lists_table_schema()); @@ -1660,6 +1966,121 @@ mod test { assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } + /// Verifies that `struct_col IS NOT NULL` pushed down as a row filter + /// produces correct results when the struct is non-null but its leaf + /// fields are null. 
This is the critical correctness test: the struct's + /// null bitmap must come from parquet definition levels, not from any + /// leaf's null bitmap. + #[test] + fn struct_is_not_null_pushdown_correctness() { + // Schema: struct_col { a: Int32 nullable, b: Int32 nullable } nullable + let fields = Fields::from(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ]); + let schema = Arc::new(Schema::new(vec![Field::new( + "struct_col", + DataType::Struct(fields.clone()), + true, + )])); + + // Build a struct array with 4 rows: + // Row 0: struct={a:1, b:10} → struct NOT null, leaves NOT null + // Row 1: struct={a:NULL, b:NULL} → struct NOT null, leaves ARE null + // Row 2: struct=NULL → struct IS null + // Row 3: struct={a:3, b:NULL} → struct NOT null, mixed leaves + let a_values = Arc::new(Int32Array::from(vec![ + Some(1), + None, // leaf null, struct non-null + None, // struct null + Some(3), + ])); + let b_values = Arc::new(Int32Array::from(vec![ + Some(10), + None, // leaf null, struct non-null + None, // struct null + None, // leaf null, struct non-null + ])); + + let struct_array = StructArray::try_new( + fields, + vec![a_values, b_values], + Some(vec![true, true, false, true].into()), // struct null bitmap + ) + .unwrap(); + + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(struct_array)]) + .unwrap(); + + // Write to parquet + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None) + .expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + // Read back and build filter + let reader_file = file.reopen().expect("reopen file"); + let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = builder.metadata().clone(); + let file_schema = builder.schema().clone(); + + // Build filter: struct_col IS NOT NULL + 
let expr = col("struct_col").is_not_null(); + let expr = logical2physical(&expr, &file_schema); + + let candidate = FilterCandidateBuilder::new(expr, file_schema.clone()) + .build(&metadata) + .expect("building candidate") + .expect("struct IS NOT NULL should be pushable"); + + // Verify only 1 leaf is projected (not both a and b) + let expected_mask = ProjectionMask::leaves( + metadata.file_metadata().schema_descr(), + [0], // only first leaf + ); + assert_eq!( + candidate.read_plan.projection_mask, expected_mask, + "should project only the first leaf of the struct" + ); + + // Now actually evaluate the filter to verify correctness + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + Count::new(), + Count::new(), + Time::new(), + ) + .expect("creating filter predicate"); + + let mut parquet_reader = + ParquetRecordBatchReaderBuilder::try_new(file.reopen().expect("reopen")) + .expect("reader builder") + .with_projection(row_filter.projection().clone()) + .build() + .expect("building reader"); + + let rb = parquet_reader + .next() + .expect("expected record batch") + .expect("expected no error"); + + let result = row_filter.evaluate(rb).expect("evaluating filter"); + + // Row 0: struct non-null → true + // Row 1: struct non-null (leaves null) → true ← critical case + // Row 2: struct null → false + // Row 3: struct non-null → true + assert_eq!( + result, + BooleanArray::from(vec![true, true, false, true]), + "struct IS NOT NULL must reflect struct nullability, not leaf nullability" + ); + } + /// get_field returning a list inside a struct should allow pushdown when /// wrapped in a supported list predicate like `array_has_any`. /// e.g. `array_has_any(get_field(s, 'items'), make_array('x'))`