From eb35966617cc83b554eb63c2c4114bfe043d12a8 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Fri, 16 Oct 2020 23:46:14 +0200 Subject: [PATCH 1/2] ENH: extract spatial partitioning information from partitioned Parquet dataset --- dask_geopandas/io/parquet.py | 52 +++++++++++++++++++++++++++++++++++- tests/io/test_parquet.py | 4 ++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/dask_geopandas/io/parquet.py b/dask_geopandas/io/parquet.py index eedeb0ac..0be65232 100644 --- a/dask_geopandas/io/parquet.py +++ b/dask_geopandas/io/parquet.py @@ -1,9 +1,11 @@ from functools import partial +import json from typing import TYPE_CHECKING import pandas as pd import geopandas +import shapely.geometry import dask.dataframe as dd @@ -17,6 +19,40 @@ import pyarrow +def _get_partition_bounds(part): + """ + Based on the part information gathered by dask, get the partition bounds + if available. + + """ + from pyarrow.parquet import read_metadata + + # read the metadata from the actual file (this is again file IO, but + # we can't rely on the schema metadata, because this is only the + # metadata of the first piece) + pq_metadata = None + if "piece" in part: + path = part["piece"][0] + if isinstance(path, str): + pq_metadata = read_metadata(path) + if pq_metadata is None: + return None + + metadata_str = pq_metadata.metadata.get(b"geo", None) + if metadata_str is None: + return None + + metadata = json.loads(metadata_str.decode("utf-8")) + + # for now only check the primary column (TODO generalize this to follow + # the logic of geopandas to fallback to other geometry columns) + geometry = metadata["primary_column"] + bbox = metadata["columns"][geometry].get("bbox", None) + if bbox is None: + return None + return shapely.geometry.box(*bbox) + + class GeoArrowEngine(ArrowEngine): @classmethod def read_metadata(cls, *args, **kwargs): @@ -27,6 +63,12 @@ def read_metadata(cls, *args, **kwargs): # for a default "geometry" column) meta = geopandas.GeoDataFrame(meta) + # get spatial partitions if available + regions = geopandas.GeoSeries([_get_partition_bounds(part) for part in parts]) + if regions.notna().all(): + # a bit hacky, but this allows us to get this passed through + meta.attrs["spatial_partitions"] = regions + return (meta, stats, parts, index) @classmethod @@ -138,5 +180,13 @@ def write_partition( to_parquet = partial(dd.to_parquet, engine=GeoArrowEngine) to_parquet.__doc__ = dd.to_parquet.__doc__ -read_parquet = partial(dd.read_parquet, engine=GeoArrowEngine) + +def read_parquet(*args, **kwargs): + result = dd.read_parquet(*args, engine=GeoArrowEngine, **kwargs) + # check if spatial partitioning information was stored + spatial_partitions = result._meta.attrs.get("spatial_partitions", None) + result.spatial_partitions = spatial_partitions + return result + + read_parquet.__doc__ = dd.read_parquet.__doc__ diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py index 571e2e2f..ce60474f 100644 --- a/tests/io/test_parquet.py +++ b/tests/io/test_parquet.py @@ -28,8 +28,10 @@ def test_parquet_roundtrip(tmp_path): # reading back gives identical GeoDataFrame result = dask_geopandas.read_parquet(basedir) - assert ddf.npartitions == 4 + assert result.npartitions == 4 assert_geodataframe_equal(result.compute(), df) + # reading back also populates the spatial partitioning property + assert result.spatial_partitions is not None # the written dataset is also readable by plain geopandas result_gpd = geopandas.read_parquet(basedir) From 02297a23452cbe96b048eb837082cd40b7f5f172 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 5 Jul 2021 22:41:14 +0200 Subject: [PATCH 2/2] update test --- tests/io/test_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py index ce60474f..738fbbe3 100644 --- a/tests/io/test_parquet.py +++ b/tests/io/test_parquet.py @@ -56,6 +56,8 @@ def test_column_selection_push_down(tmp_path): # selecting columns including geometry column still gives GeoDataFrame ddf_subset = ddf[["pop_est", "geometry"]] assert type(ddf_subset) is dask_geopandas.GeoDataFrame + # and also preserves the spatial partitioning information + assert ddf_subset.spatial_partitions is not None # selecting a single non-geometry column on the dataframe should work s = ddf["pop_est"]