diff --git a/docs/development.md b/docs/development.md index 6b67ead0..58f7178f 100644 --- a/docs/development.md +++ b/docs/development.md @@ -34,6 +34,12 @@ git submodule init git submodule update --recursive ``` +Additionally, some of the data required in the tests can be downloaded by running the following script. + +```bash +python submodules/download-assets.py +``` + Some crates wrap external native libraries and require system dependencies to build. At this time the only crate that requires this is the sedona-s2geography crate, which requires [CMake](https://cmake.org), diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py index 2af35771..42fcaf55 100644 --- a/python/sedonadb/tests/io/test_parquet.py +++ b/python/sedonadb/tests/io/test_parquet.py @@ -60,13 +60,25 @@ def test_read_sedona_testing(sedona_testing, name): @pytest.mark.parametrize("name", ["water-junc", "water-point"]) -def test_read_geoparquet_pruned(geoarrow_data, name): +@pytest.mark.parametrize( + "predicate", + [ + "intersects", + "coveredby", + "within", + "equals", + "disjoint", + ], +) +def test_read_geoparquet_prune_points(geoarrow_data, name, predicate): # Note that this doesn't check that pruning actually occurred, just that # for a query where we should be pruning automatically that we don't omit results. eng = SedonaDB() path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet" skip_if_not_exists(path) + gdf = geopandas.read_parquet(path) + # Roughly a diamond around Gaspereau Lake, Nova Scotia, in UTM zone 20 wkt_filter = """ POLYGON (( @@ -74,14 +86,22 @@ def test_read_geoparquet_pruned(geoarrow_data, name): 376000 4983000, 371000 4978000 )) """ + + # Use a wkt_filter that will lead to non-empty results + if predicate == "equals": + wkt_filter = gdf.geometry.iloc[0].wkt + poly_filter = shapely.from_wkt(wkt_filter) - gdf = geopandas.read_parquet(path) - gdf = ( - gdf[gdf.geometry.intersects(poly_filter)] - .sort_values(by="OBJECTID") - .reset_index(drop=True) - ) + if predicate == "equals": + # Geopandas does not have an equals predicate, so we use the == operator + mask = gdf.geometry == poly_filter + elif predicate == "coveredby": + mask = gdf.geometry.covered_by(poly_filter) + else: + mask = getattr(gdf.geometry, predicate)(poly_filter) + + gdf = gdf[mask].sort_values(by="OBJECTID").reset_index(drop=True) gdf = gdf[["OBJECTID", "geometry"]] # Make sure this isn't a bogus test @@ -102,7 +122,7 @@ def test_read_geoparquet_pruned(geoarrow_data, name): result = eng.execute_and_collect( f""" SELECT "OBJECTID", geometry FROM tab - WHERE ST_Intersects(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) + WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) ORDER BY "OBJECTID"; """ ) @@ -127,8 +147,94 @@ def test_read_geoparquet_pruned(geoarrow_data, name): result = eng.execute_and_collect( f""" SELECT * FROM tab_dataset - WHERE ST_Intersects(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) + WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) ORDER BY "OBJECTID"; """ ) eng.assert_result(result, gdf) + + +@pytest.mark.parametrize( + "predicate", + [ + "contains", + "covers", + "touches", + ], +) +def test_read_geoparquet_prune_polygons(sedona_testing, predicate): + # Note that this doesn't check that pruning actually occurred, just that + # for a query where we should be pruning automatically that we don't omit results. + eng = SedonaDB() + path = sedona_testing / "data" / "parquet" / "geoparquet-1.0.0.parquet" + skip_if_not_exists(path) + + # A point inside of a polygon for contains / covers + wkt_filter = "POINT (33.60 -5.54)" + + if predicate == "touches": + # A point on the boundary of a polygon + wkt_filter = "POINT (33.90371119710453 -0.9500000000000001)" + + poly_filter = shapely.from_wkt(wkt_filter) + + gdf = geopandas.read_parquet(path) + if predicate == "equals": + # Geopandas does not have an equals predicate, so we use the == operator + mask = gdf.geometry == poly_filter + elif predicate == "coveredby": + mask = gdf.geometry.covered_by(poly_filter) + else: + mask = getattr(gdf.geometry, predicate)(poly_filter) + + gdf = gdf[mask].sort_values(by="pop_est").reset_index(drop=True) + gdf = gdf[["pop_est", "geometry"]] + + # Make sure this isn't a bogus test + assert len(gdf) > 0 + + with tempfile.TemporaryDirectory() as td: + # Write using GeoPandas, which implements GeoParquet 1.1 bbox covering + # Write tiny row groups so that many bounding boxes have to be checked + tmp_parquet = Path(td) / "geoparquet-1.0.0.parquet" + geopandas.read_parquet(path).to_parquet( + tmp_parquet, + schema_version="1.1.0", + write_covering_bbox=True, + row_group_size=1024, + ) + + eng.create_view_parquet("tab", tmp_parquet) + result = eng.execute_and_collect( + f""" + SELECT "pop_est", geometry FROM tab + WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) + ORDER BY "pop_est"; + """ + ) + eng.assert_result(result, gdf) + + # Write a dataset with one file per row group to check file pruning correctness + ds_dir = Path(td) / "ds" + ds_dir.mkdir() + ds_paths = [] + + with parquet.ParquetFile(tmp_parquet) as f: + for i in range(f.metadata.num_row_groups): + tab = f.read_row_group(i, ["pop_est", "geometry"]) + df = geopandas.GeoDataFrame.from_arrow(tab) + ds_path = ds_dir / f"file{i}.parquet" + df.to_parquet(ds_path) + ds_paths.append(ds_path) + + # Check a query against the same dataset without the bbox column but with file-level + # geoparquet metadata bounding boxes + eng.create_view_parquet("tab_dataset", ds_paths) + result = eng.execute_and_collect( + f""" + SELECT * FROM tab_dataset + WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}')) + ORDER BY "pop_est"; + """ + ) + eng.assert_result(result, gdf) diff --git a/rust/sedona-expr/src/spatial_filter.rs b/rust/sedona-expr/src/spatial_filter.rs index 348a4c4c..64feda29 100644 --- a/rust/sedona-expr/src/spatial_filter.rs +++ b/rust/sedona-expr/src/spatial_filter.rs @@ -45,8 +45,8 @@ use crate::{ pub enum SpatialFilter { /// ST_Intersects(\, \) or ST_Intersects(\, \) Intersects(Column, BoundingBox), - /// ST_CoveredBy(\, \) or ST_CoveredBy(\, \) - CoveredBy(Column, BoundingBox), + /// ST_Covers(\, \) or ST_Covers(\, \) + Covers(Column, BoundingBox), /// ST_HasZ(\) HasZ(Column), /// Logical AND @@ -69,8 +69,8 @@ impl SpatialFilter { SpatialFilter::Intersects(column, bounds) => { Self::evaluate_intersects_bbox(&table_stats[column.index()], bounds) } - SpatialFilter::CoveredBy(column, bounds) => { - Self::evaluate_covered_by_bbox(&table_stats[column.index()], bounds) + SpatialFilter::Covers(column, bounds) => { + Self::evaluate_covers_bbox(&table_stats[column.index()], bounds) } SpatialFilter::HasZ(column) => Self::evaluate_has_z(&table_stats[column.index()]), SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs, table_stats), @@ -88,9 +88,9 @@ impl SpatialFilter { } } - fn evaluate_covered_by_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool { + fn evaluate_covers_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool { if let Some(bbox) = column_stats.bbox() { - bounds.contains(bbox) + bbox.contains(bounds) } else { true } @@ -203,19 +203,19 @@ impl SpatialFilter { match (&args[0], &args[1]) { (ArgRef::Col(column), ArgRef::Lit(literal)) => { - // column within/covered_by literal -> CoveredBy filter + // column within/covered_by literal -> Intersects filter match literal_bounds(literal) { Ok(literal_bounds) => { - Ok(Some(Self::CoveredBy(column.clone(), literal_bounds))) + Ok(Some(Self::Intersects(column.clone(), literal_bounds))) } Err(e) => Err(DataFusionError::External(Box::new(e))), } } (ArgRef::Lit(literal), ArgRef::Col(column)) => { - // literal within/covered_by column -> Intersects filter + // literal within/covered_by column -> Covers filter match literal_bounds(literal) { Ok(literal_bounds) => { - Ok(Some(Self::Intersects(column.clone(), literal_bounds))) + Ok(Some(Self::Covers(column.clone(), literal_bounds))) } Err(e) => Err(DataFusionError::External(Box::new(e))), } @@ -231,21 +231,21 @@ impl SpatialFilter { match (&args[0], &args[1]) { (ArgRef::Col(column), ArgRef::Lit(literal)) => { - // column contains/covers literal -> Intersects filter - // (column must potentially intersect literal to contain it) + // column contains/covers literal -> Covers filter + // (column's bbox must fully cover literal's bbox) match literal_bounds(literal) { Ok(literal_bounds) => { - Ok(Some(Self::Intersects(column.clone(), literal_bounds))) + Ok(Some(Self::Covers(column.clone(), literal_bounds))) } Err(e) => Err(DataFusionError::External(Box::new(e))), } } (ArgRef::Lit(literal), ArgRef::Col(column)) => { - // literal contains/covers column -> CoveredBy filter - // (equivalent to st_within(column, literal)) + // literal contains/covers column -> Intersects filter + // (if literal contains column, they must at least intersect) match literal_bounds(literal) { Ok(literal_bounds) => { - Ok(Some(Self::CoveredBy(column.clone(), literal_bounds))) + Ok(Some(Self::Intersects(column.clone(), literal_bounds))) } Err(e) => Err(DataFusionError::External(Box::new(e))), } @@ -424,7 +424,7 @@ mod test { } #[test] - fn predicate_covered_by() { + fn predicate_covers() { let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap(); let literal = Literal::new_with_metadata( create_scalar(Some("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))"), &WKB_GEOMETRY), @@ -433,20 +433,17 @@ mod test { let bounds = literal_bounds(&literal).unwrap(); let stats_no_info = [GeoStatistics::unspecified()]; - let stats_covered = [ - GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((1.0, 1.0), (2.0, 2.0)))) - ]; + let stats_covered = + [GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 4), (0, 4))))]; let stats_not_covered = [ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 3.0), (5.0, 5.0)))) ]; let col0 = Column::new("col0", 0); - // CoveredBy should return true when column bbox is fully contained in literal bounds - assert!(SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_no_info)); - assert!(SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_covered)); - assert!( - !SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_not_covered) - ); + // Covers should return true when column bbox is fully contained in literal bounds + assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_no_info)); + assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_covered)); + assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_not_covered)); } #[test] @@ -607,8 +604,8 @@ mod test { )); let predicate = SpatialFilter::try_from_expr(&expr).unwrap(); assert!( - matches!(predicate, SpatialFilter::CoveredBy(_, _)), - "Function {func_name} should produce CoveredBy filter" + matches!(predicate, SpatialFilter::Intersects(_, _)), + "Function {func_name} should produce Intersects filter" ); // Test reversed argument order: should be converted to Intersects filter @@ -620,8 +617,8 @@ mod test { )); let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap(); assert!( - matches!(predicate_reversed, SpatialFilter::Intersects(_, _)), - "Function {func_name} with reversed args should produce Intersects filter" + matches!(predicate_reversed, SpatialFilter::Covers(_, _)), + "Function {func_name} with reversed args should produce Covers filter" ); } @@ -636,8 +633,8 @@ mod test { Some(storage_field.metadata().into()), )); - // Test functions that should result in Intersects filter when column is first arg - // (column contains/covers literal -> column must intersect literal) + // Test functions that should result in CoveredBy filter when column is first arg + // (column contains/covers literal -> column's bbox must fully contain literal's bbox) let func = create_dummy_spatial_function(func_name, 2); let expr: Arc = Arc::new(ScalarFunctionExpr::new( func_name, @@ -647,12 +644,12 @@ mod test { )); let predicate = SpatialFilter::try_from_expr(&expr).unwrap(); assert!( - matches!(predicate, SpatialFilter::Intersects(_, _)), - "Function {func_name} should produce Intersects filter" + matches!(predicate, SpatialFilter::Covers(_, _)), + "Function {func_name} should produce CoveredBy filter" ); - // Test reversed argument order: should be converted to CoveredBy filter - // (literal contains/covers column -> equivalent to st_within(column, literal)) + // Test reversed argument order: should be converted to Intersects filter + // (literal contains/covers column -> they must at least intersect) let expr_reversed: Arc = Arc::new(ScalarFunctionExpr::new( func_name, Arc::new(func), @@ -661,8 +658,8 @@ mod test { )); let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap(); assert!( - matches!(predicate_reversed, SpatialFilter::CoveredBy(_, _)), - "Function {func_name} with reversed args should produce CoveredBy filter" + matches!(predicate_reversed, SpatialFilter::Intersects(_, _)), + "Function {func_name} with reversed args should produce Intersects filter" ); }