Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ git submodule init
git submodule update --recursive
```

Additionally, some of the data required in the tests can be downloaded by running the following script.

```bash
python submodules/download-assets.py
```

Some crates wrap external native libraries and require system dependencies
to build. At this time the only crate that requires this is the sedona-s2geography
crate, which requires [CMake](https://cmake.org),
Expand Down
124 changes: 115 additions & 9 deletions python/sedonadb/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,48 @@ def test_read_sedona_testing(sedona_testing, name):


@pytest.mark.parametrize("name", ["water-junc", "water-point"])
def test_read_geoparquet_pruned(geoarrow_data, name):
@pytest.mark.parametrize(
"predicate",
[
"intersects",
"coveredby",
"within",
"equals",
"disjoint",
],
)
def test_read_geoparquet_prune_points(geoarrow_data, name, predicate):
# Note that this doesn't check that pruning actually occurred, just that
# for a query where we should be pruning automatically that we don't omit results.
eng = SedonaDB()
path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
skip_if_not_exists(path)

gdf = geopandas.read_parquet(path)

# Roughly a diamond around Gaspereau Lake, Nova Scotia, in UTM zone 20
wkt_filter = """
POLYGON ((
371000 4978000, 376000 4972000, 381000 4978000,
376000 4983000, 371000 4978000
))
"""

# Use a wkt_filter that will lead to non-empty results
if predicate == "equals":
wkt_filter = gdf.geometry.iloc[0].wkt

poly_filter = shapely.from_wkt(wkt_filter)

gdf = geopandas.read_parquet(path)
gdf = (
gdf[gdf.geometry.intersects(poly_filter)]
.sort_values(by="OBJECTID")
.reset_index(drop=True)
)
if predicate == "equals":
# Geopandas does not have an equals predicate, so we use the == operator
mask = gdf.geometry == poly_filter
elif predicate == "coveredby":
mask = gdf.geometry.covered_by(poly_filter)
else:
mask = getattr(gdf.geometry, predicate)(poly_filter)

gdf = gdf[mask].sort_values(by="OBJECTID").reset_index(drop=True)
gdf = gdf[["OBJECTID", "geometry"]]

# Make sure this isn't a bogus test
Expand All @@ -102,7 +122,7 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
result = eng.execute_and_collect(
f"""
SELECT "OBJECTID", geometry FROM tab
WHERE ST_Intersects(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
Expand All @@ -127,8 +147,94 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
result = eng.execute_and_collect(
f"""
SELECT * FROM tab_dataset
WHERE ST_Intersects(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
eng.assert_result(result, gdf)


@pytest.mark.parametrize(
"predicate",
[
"contains",
"covers",
"touches",
],
)
def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
# Note that this doesn't check that pruning actually occurred, just that
# for a query where we should be pruning automatically that we don't omit results.
eng = SedonaDB()
path = sedona_testing / "data" / "parquet" / "geoparquet-1.0.0.parquet"
skip_if_not_exists(path)

# A point inside of a polygon for contains / covers
wkt_filter = "POINT (33.60 -5.54)"

if predicate == "touches":
# A point on the boundary of a polygon
wkt_filter = "POINT (33.90371119710453 -0.9500000000000001)"

poly_filter = shapely.from_wkt(wkt_filter)

gdf = geopandas.read_parquet(path)
if predicate == "equals":
# Geopandas does not have an equals predicate, so we use the == operator
mask = gdf.geometry == poly_filter
elif predicate == "coveredby":
mask = gdf.geometry.covered_by(poly_filter)
else:
mask = getattr(gdf.geometry, predicate)(poly_filter)

gdf = gdf[mask].sort_values(by="pop_est").reset_index(drop=True)
gdf = gdf[["pop_est", "geometry"]]

# Make sure this isn't a bogus test
assert len(gdf) > 0

with tempfile.TemporaryDirectory() as td:
# Write using GeoPandas, which implements GeoParquet 1.1 bbox covering
# Write tiny row groups so that many bounding boxes have to be checked
tmp_parquet = Path(td) / "geoparquet-1.0.0.parquet"
geopandas.read_parquet(path).to_parquet(
tmp_parquet,
schema_version="1.1.0",
write_covering_bbox=True,
row_group_size=1024,
)

eng.create_view_parquet("tab", tmp_parquet)
result = eng.execute_and_collect(
f"""
SELECT "pop_est", geometry FROM tab
WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "pop_est";
"""
)
eng.assert_result(result, gdf)

# Write a dataset with one file per row group to check file pruning correctness
ds_dir = Path(td) / "ds"
ds_dir.mkdir()
ds_paths = []

with parquet.ParquetFile(tmp_parquet) as f:
for i in range(f.metadata.num_row_groups):
tab = f.read_row_group(i, ["pop_est", "geometry"])
df = geopandas.GeoDataFrame.from_arrow(tab)
ds_path = ds_dir / f"file{i}.parquet"
df.to_parquet(ds_path)
ds_paths.append(ds_path)

# Check a query against the same dataset without the bbox column but with file-level
# geoparquet metadata bounding boxes
eng.create_view_parquet("tab_dataset", ds_paths)
result = eng.execute_and_collect(
f"""
SELECT * FROM tab_dataset
WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "pop_est";
"""
)
eng.assert_result(result, gdf)
73 changes: 35 additions & 38 deletions rust/sedona-expr/src/spatial_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use crate::{
pub enum SpatialFilter {
/// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>, \<column\>)
Intersects(Column, BoundingBox),
/// ST_CoveredBy(\<column\>, \<literal\>) or ST_CoveredBy(\<literal\>, \<column\>)
CoveredBy(Column, BoundingBox),
/// ST_Covers(\<column\>, \<literal\>) or ST_Covers(\<literal\>, \<column\>)
Covers(Column, BoundingBox),
/// ST_HasZ(\<column\>)
HasZ(Column),
/// Logical AND
Expand All @@ -69,8 +69,8 @@ impl SpatialFilter {
SpatialFilter::Intersects(column, bounds) => {
Self::evaluate_intersects_bbox(&table_stats[column.index()], bounds)
}
SpatialFilter::CoveredBy(column, bounds) => {
Self::evaluate_covered_by_bbox(&table_stats[column.index()], bounds)
SpatialFilter::Covers(column, bounds) => {
Self::evaluate_covers_bbox(&table_stats[column.index()], bounds)
}
SpatialFilter::HasZ(column) => Self::evaluate_has_z(&table_stats[column.index()]),
SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs, table_stats),
Expand All @@ -88,9 +88,9 @@ impl SpatialFilter {
}
}

fn evaluate_covered_by_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool {
fn evaluate_covers_bbox(column_stats: &GeoStatistics, bounds: &BoundingBox) -> bool {
if let Some(bbox) = column_stats.bbox() {
bounds.contains(bbox)
bbox.contains(bounds)
} else {
true
}
Expand Down Expand Up @@ -203,19 +203,19 @@ impl SpatialFilter {

match (&args[0], &args[1]) {
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
// column within/covered_by literal -> CoveredBy filter
// column within/covered_by literal -> Intersects filter
match literal_bounds(literal) {
Ok(literal_bounds) => {
Ok(Some(Self::CoveredBy(column.clone(), literal_bounds)))
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
}
Err(e) => Err(DataFusionError::External(Box::new(e))),
}
}
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
// literal within/covered_by column -> Intersects filter
// literal within/covered_by column -> Covers filter
match literal_bounds(literal) {
Ok(literal_bounds) => {
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
Ok(Some(Self::Covers(column.clone(), literal_bounds)))
}
Err(e) => Err(DataFusionError::External(Box::new(e))),
}
Expand All @@ -231,21 +231,21 @@ impl SpatialFilter {

match (&args[0], &args[1]) {
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
// column contains/covers literal -> Intersects filter
// (column must potentially intersect literal to contain it)
// column contains/covers literal -> Covers filter
// (column's bbox must fully cover literal's bbox)
match literal_bounds(literal) {
Ok(literal_bounds) => {
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
Ok(Some(Self::Covers(column.clone(), literal_bounds)))
}
Err(e) => Err(DataFusionError::External(Box::new(e))),
}
}
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
// literal contains/covers column -> CoveredBy filter
// (equivalent to st_within(column, literal))
// literal contains/covers column -> Intersects filter
// (if literal contains column, they must at least intersect)
match literal_bounds(literal) {
Ok(literal_bounds) => {
Ok(Some(Self::CoveredBy(column.clone(), literal_bounds)))
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
}
Err(e) => Err(DataFusionError::External(Box::new(e))),
}
Expand Down Expand Up @@ -424,7 +424,7 @@ mod test {
}

#[test]
fn predicate_covered_by() {
fn predicate_covers() {
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
let literal = Literal::new_with_metadata(
create_scalar(Some("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))"), &WKB_GEOMETRY),
Expand All @@ -433,20 +433,17 @@ mod test {
let bounds = literal_bounds(&literal).unwrap();

let stats_no_info = [GeoStatistics::unspecified()];
let stats_covered = [
GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((1.0, 1.0), (2.0, 2.0))))
];
let stats_covered =
[GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 4), (0, 4))))];
let stats_not_covered = [
GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 3.0), (5.0, 5.0))))
];
let col0 = Column::new("col0", 0);

// CoveredBy should return true when column bbox is fully contained in literal bounds
assert!(SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_no_info));
assert!(SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_covered));
assert!(
!SpatialFilter::CoveredBy(col0.clone(), bounds.clone()).evaluate(&stats_not_covered)
);
// Covers should return true when column bbox is fully contained in literal bounds
assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_no_info));
assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_covered));
assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_not_covered));
}

#[test]
Expand Down Expand Up @@ -607,8 +604,8 @@ mod test {
));
let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
assert!(
matches!(predicate, SpatialFilter::CoveredBy(_, _)),
"Function {func_name} should produce CoveredBy filter"
matches!(predicate, SpatialFilter::Intersects(_, _)),
"Function {func_name} should produce Intersects filter"
);

// Test reversed argument order: should be converted to Intersects filter
Expand All @@ -620,8 +617,8 @@ mod test {
));
let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
assert!(
matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
"Function {func_name} with reversed args should produce Intersects filter"
matches!(predicate_reversed, SpatialFilter::Covers(_, _)),
"Function {func_name} with reversed args should produce Covers filter"
);
}

Expand All @@ -636,8 +633,8 @@ mod test {
Some(storage_field.metadata().into()),
));

// Test functions that should result in Intersects filter when column is first arg
// (column contains/covers literal -> column must intersect literal)
// Test functions that should result in CoveredBy filter when column is first arg
// (column contains/covers literal -> column's bbox must fully contain literal's bbox)
let func = create_dummy_spatial_function(func_name, 2);
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
func_name,
Expand All @@ -647,12 +644,12 @@ mod test {
));
let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
assert!(
matches!(predicate, SpatialFilter::Intersects(_, _)),
"Function {func_name} should produce Intersects filter"
matches!(predicate, SpatialFilter::Covers(_, _)),
"Function {func_name} should produce CoveredBy filter"
);

// Test reversed argument order: should be converted to CoveredBy filter
// (literal contains/covers column -> equivalent to st_within(column, literal))
// Test reversed argument order: should be converted to Intersects filter
// (literal contains/covers column -> they must at least intersect)
let expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
func_name,
Arc::new(func),
Expand All @@ -661,8 +658,8 @@ mod test {
));
let predicate_reversed = SpatialFilter::try_from_expr(&expr_reversed).unwrap();
assert!(
matches!(predicate_reversed, SpatialFilter::CoveredBy(_, _)),
"Function {func_name} with reversed args should produce CoveredBy filter"
matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
"Function {func_name} with reversed args should produce Intersects filter"
);
}

Expand Down