From 2d9184b51b8a23da2899075cc849dd770a5dad56 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 11 Sep 2025 10:00:09 +0800 Subject: [PATCH 1/2] GeoParquet respecting pruning opt --- rust/sedona-geoparquet/src/file_opener.rs | 48 +++++++++++++---------- rust/sedona-geoparquet/src/format.rs | 1 + 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs index 077669c0..d507f2f1 100644 --- a/rust/sedona-geoparquet/src/file_opener.rs +++ b/rust/sedona-geoparquet/src/file_opener.rs @@ -46,6 +46,7 @@ pub struct GeoParquetFileOpener { metadata_size_hint: Option, predicate: Arc, file_schema: SchemaRef, + enable_pruning: bool, } impl GeoParquetFileOpener { @@ -56,6 +57,7 @@ impl GeoParquetFileOpener { metadata_size_hint: Option, predicate: Arc, file_schema: SchemaRef, + enable_pruning: bool, ) -> Self { Self { inner, @@ -63,12 +65,13 @@ impl GeoParquetFileOpener { metadata_size_hint, predicate, file_schema, + enable_pruning, } } } impl FileOpener for GeoParquetFileOpener { - fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result { + fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result { let self_clone = self.clone(); Ok(Box::pin(async move { @@ -81,25 +84,28 @@ impl FileOpener for GeoParquetFileOpener { .await?; let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups()); - let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?; - - if let Some(geoparquet_metadata) = - GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)? - { - filter_access_plan_using_geoparquet_file_metadata( - &self_clone.file_schema, - &mut access_plan, - &spatial_filter, - &geoparquet_metadata, - )?; - - filter_access_plan_using_geoparquet_covering( - &self_clone.file_schema, - &mut access_plan, - &spatial_filter, - &geoparquet_metadata, - &parquet_metadata, - )?; + + if self_clone.enable_pruning { + let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?; + + if let Some(geoparquet_metadata) = + GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)? + { + filter_access_plan_using_geoparquet_file_metadata( + &self_clone.file_schema, + &mut access_plan, + &spatial_filter, + &geoparquet_metadata, + )?; + + filter_access_plan_using_geoparquet_covering( + &self_clone.file_schema, + &mut access_plan, + &spatial_filter, + &geoparquet_metadata, + &parquet_metadata, + )?; + } } // When we have built-in GEOMETRY/GEOGRAPHY types, we can filter the access plan @@ -115,7 +121,7 @@ impl FileOpener for GeoParquetFileOpener { metadata_size_hint: self_clone.metadata_size_hint, }; - self_clone.inner.open(file_meta, _file)?.await + self_clone.inner.open(file_meta, file)?.await })) } } diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs index 93346d26..a417cd30 100644 --- a/rust/sedona-geoparquet/src/format.rs +++ b/rust/sedona-geoparquet/src/format.rs @@ -438,6 +438,7 @@ impl FileSource for GeoParquetFileSource { self.metadata_size_hint, self.predicate.clone().unwrap(), base_config.file_schema.clone(), + self.inner.table_parquet_options().global.pruning, )) } From 75fa29ef89025c28df3b35575bce70fb82193730 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 11 Sep 2025 12:51:04 +0800 Subject: [PATCH 2/2] Add unit test --- rust/sedona-geoparquet/src/format.rs | 53 +++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs index a417cd30..02904e5b 100644 --- a/rust/sedona-geoparquet/src/format.rs +++ b/rust/sedona-geoparquet/src/format.rs @@ -679,7 +679,7 @@ mod test { #[rstest] #[tokio::test] - async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_within")] udf_name: &str) { + async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_contains")] udf_name: &str) { let data_dir = geoarrow_data_dir().unwrap(); let ctx = setup_context(); @@ -691,10 +691,8 @@ mod test { ) .into(); - let definitely_non_intersecting_scalar = create_scalar( - Some("POLYGON ((100 200), (100 300), (200 300), (100 200))"), - &WKB_GEOMETRY, - ); + let definitely_non_intersecting_scalar = + create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY); let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap(); let df = ctx @@ -713,10 +711,7 @@ mod test { let batches_out = df.collect().await.unwrap(); assert!(batches_out.is_empty()); - let definitely_intersecting_scalar = create_scalar( - Some("POLYGON ((30 10), (30 20), (40 20), (40 10), (30 10))"), - &WKB_GEOMETRY, - ); + let definitely_intersecting_scalar = create_scalar(Some("POINT (30 10)"), &WKB_GEOMETRY); let df = ctx .table(format!("{data_dir}/example/files/*_geo.parquet")) .await @@ -734,6 +729,46 @@ mod test { assert!(!batches_out.is_empty()); } + #[tokio::test] + async fn should_not_prune_geoparquet_metadata_after_disabling_pruning() { + let data_dir = geoarrow_data_dir().unwrap(); + let ctx = setup_context(); + ctx.sql("SET datafusion.execution.parquet.pruning TO false") + .await + .expect("Disabling parquet pruning failed"); + + let udf: ScalarUDF = SimpleScalarUDF::new_with_signature( + "st_intersects", + Signature::any(2, Volatility::Immutable), + DataType::Boolean, + Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())), + ) + .into(); + + let definitely_non_intersecting_scalar = + create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY); + let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap(); + + let df = ctx + .table(format!("{data_dir}/example/files/*_geo.parquet")) + .await + .unwrap() + .filter(udf.call(vec![ + col("geometry"), + Expr::Literal( + definitely_non_intersecting_scalar, + Some(storage_field.metadata().into()), + ), + ])) + .unwrap(); + + // Even if the query window does not intersect with the data, we should not prune + // any files because pruning has been disabled. We can retrieve the data here + // because the dummy UDF always returns true. + let batches_out = df.collect().await.unwrap(); + assert!(!batches_out.is_empty()); + } + #[tokio::test] async fn geoparquet_format_factory() { let ctx = SessionContext::new();