Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions rust/sedona-geoparquet/src/file_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct GeoParquetFileOpener {
metadata_size_hint: Option<usize>,
predicate: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
enable_pruning: bool,
}

impl GeoParquetFileOpener {
Expand All @@ -56,19 +57,21 @@ impl GeoParquetFileOpener {
metadata_size_hint: Option<usize>,
predicate: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
enable_pruning: bool,
) -> Self {
Self {
inner,
object_store,
metadata_size_hint,
predicate,
file_schema,
enable_pruning,
}
}
}

impl FileOpener for GeoParquetFileOpener {
fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
let self_clone = self.clone();

Ok(Box::pin(async move {
Expand All @@ -81,25 +84,28 @@ impl FileOpener for GeoParquetFileOpener {
.await?;

let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups());
let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;

if let Some(geoparquet_metadata) =
GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
{
filter_access_plan_using_geoparquet_file_metadata(
&self_clone.file_schema,
&mut access_plan,
&spatial_filter,
&geoparquet_metadata,
)?;

filter_access_plan_using_geoparquet_covering(
&self_clone.file_schema,
&mut access_plan,
&spatial_filter,
&geoparquet_metadata,
&parquet_metadata,
)?;

if self_clone.enable_pruning {
let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;

if let Some(geoparquet_metadata) =
GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
{
filter_access_plan_using_geoparquet_file_metadata(
&self_clone.file_schema,
&mut access_plan,
&spatial_filter,
&geoparquet_metadata,
)?;

filter_access_plan_using_geoparquet_covering(
&self_clone.file_schema,
&mut access_plan,
&spatial_filter,
&geoparquet_metadata,
&parquet_metadata,
)?;
}
}

// When we have built-in GEOMETRY/GEOGRAPHY types, we can filter the access plan
Expand All @@ -115,7 +121,7 @@ impl FileOpener for GeoParquetFileOpener {
metadata_size_hint: self_clone.metadata_size_hint,
};

self_clone.inner.open(file_meta, _file)?.await
self_clone.inner.open(file_meta, file)?.await
}))
}
}
Expand Down
54 changes: 45 additions & 9 deletions rust/sedona-geoparquet/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ impl FileSource for GeoParquetFileSource {
self.metadata_size_hint,
self.predicate.clone().unwrap(),
base_config.file_schema.clone(),
self.inner.table_parquet_options().global.pruning,
))
}

Expand Down Expand Up @@ -678,7 +679,7 @@ mod test {

#[rstest]
#[tokio::test]
async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_within")] udf_name: &str) {
async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_contains")] udf_name: &str) {
let data_dir = geoarrow_data_dir().unwrap();
let ctx = setup_context();

Expand All @@ -690,10 +691,8 @@ mod test {
)
.into();

let definitely_non_intersecting_scalar = create_scalar(
Some("POLYGON ((100 200), (100 300), (200 300), (100 200))"),
&WKB_GEOMETRY,
);
let definitely_non_intersecting_scalar =
create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY);
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();

let df = ctx
Expand All @@ -712,10 +711,7 @@ mod test {
let batches_out = df.collect().await.unwrap();
assert!(batches_out.is_empty());

let definitely_intersecting_scalar = create_scalar(
Some("POLYGON ((30 10), (30 20), (40 20), (40 10), (30 10))"),
&WKB_GEOMETRY,
);
let definitely_intersecting_scalar = create_scalar(Some("POINT (30 10)"), &WKB_GEOMETRY);
let df = ctx
.table(format!("{data_dir}/example/files/*_geo.parquet"))
.await
Expand All @@ -733,6 +729,46 @@ mod test {
assert!(!batches_out.is_empty());
}

#[tokio::test]
async fn should_not_prune_geoparquet_metadata_after_disabling_pruning() {
let data_dir = geoarrow_data_dir().unwrap();
let ctx = setup_context();
ctx.sql("SET datafusion.execution.parquet.pruning TO false")
.await
.expect("Disabling parquet pruning failed");

let udf: ScalarUDF = SimpleScalarUDF::new_with_signature(
"st_intersects",
Signature::any(2, Volatility::Immutable),
DataType::Boolean,
Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())),
)
.into();

let definitely_non_intersecting_scalar =
create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY);
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();

let df = ctx
.table(format!("{data_dir}/example/files/*_geo.parquet"))
.await
.unwrap()
.filter(udf.call(vec![
col("geometry"),
Expr::Literal(
definitely_non_intersecting_scalar,
Some(storage_field.metadata().into()),
),
]))
.unwrap();

// Even if the query window does not intersect with the data, we should not prune
// any files because pruning has been disabled. We can retrieve the data here
// because the dummy UDF always returns true.
let batches_out = df.collect().await.unwrap();
assert!(!batches_out.is_empty());
}

#[tokio::test]
async fn geoparquet_format_factory() {
let ctx = SessionContext::new();
Expand Down