
ENH: spatial partitioning of the GeoDataFrame #8

Open
jorisvandenbossche opened this issue Jun 5, 2020 · 22 comments
Labels
enhancement New feature or request

@jorisvandenbossche
Member

For making spatial joins or overlays, spatial predicates, reading from spatially partitioned datasets, etc. more efficient, we can have spatially partitioned dataframes: the bounds of each partition are known, and thus it can be checked based on those bounds whether an operation needs to involve that partition or not.
And then geodataframes can also be re-partitioned to optimize the bounds (minimize the overlap) as much as possible (an initially costly shuffle operation, but one that can pay off later).

This complicates the implementation (we need to keep track of the spatial partitioning, the partitions can change during spatial operations, ..), but I think it will also be critical for improving performance on large datasets.


How can we add this?

In the previous iteration at https://github.com/mrocklin/dask-geopandas, the dataframes had an additional _regions attribute, which was a geopandas.GeoSeries with the "regions" of each partition (so len(regions) == npartitions).

See https://github.com/mrocklin/dask-geopandas/blob/8133969bf03d158f51faf85d020641e86c9a7e28/dask_geopandas/core.py#L50

I think one advantage of using a GeoSeries is that this makes it easy to work with (eg it is easy to check which partitions would intersect with a given geometry).
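To make that concrete, a minimal sketch of that kind of check (illustrative names only, not the actual dask-geopandas API):

```python
import geopandas
from shapely.geometry import box, Point

# one (bounding) polygon per partition, so len(regions) == npartitions
regions = geopandas.GeoSeries([box(0, 0, 10, 10), box(8, 0, 20, 10)])

query_geom = Point(15, 5).buffer(1)

# boolean mask telling us which partitions an operation needs to touch
mask = regions.intersects(query_geom)
relevant_partitions = list(mask[mask].index)  # -> [1]
```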

In spatialpandas (https://github.com/holoviz/spatialpandas), there is a combo of partition_bounds and partition_sindex.
The partition_bounds is basically the total_bounds of each partition (so you could see it as the _regions but limited to a rectangular box and stored as the 4 (minx, miny, maxx, maxy) numbers). And then partition_sindex is a spatial index built on the partition_bounds.

See https://github.com/holoviz/spatialpandas/blob/master/spatialpandas/dask.py
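A hedged sketch of that spatialpandas-style bookkeeping, using shapely's STRtree as a stand-in for whatever index implementation is chosen (the variable names are illustrative, not spatialpandas' internals):

```python
import numpy as np
from shapely.geometry import box
from shapely.strtree import STRtree

# partition_bounds: (npartitions, 4) array of (minx, miny, maxx, maxy)
partition_bounds = np.array([
    [0.0, 0.0, 10.0, 10.0],   # partition 0
    [8.0, 0.0, 20.0, 10.0],   # partition 1
])

# partition_sindex: a spatial index built on the partition bounding boxes
partition_sindex = STRtree([box(*b) for b in partition_bounds])

# which partition boxes could intersect a given query box?
hits = partition_sindex.query(box(12, 2, 14, 4))
```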


I suppose starting with a basic "partition bounds" should be fine, and allows us to later expand it with a spatial index or with more fine-grained shapes.

@borao

borao commented Jun 6, 2020

What do you think about potentially partitioning by attribute?

For example, if I have a set of points (maybe cities) in the US with the state as a feature, and I want to spatially join it with a set of counties (polygons) in the US that also has the state as a feature, the computation could be significantly optimized by not having to worry about overlap/other geometric nuances in the borders, and most computation would be within the partition.

@mrocklin
Member

mrocklin commented Jun 6, 2020

FWIW I found that spatial partitioning with a GeoPandas series felt pretty clean. Many of the operations like spatial joins were relatively straightforward to write. Ideally some of the implementation in the previous version could be reused.

Things do become odd when you start considering geometries that can cross partition boundaries though. (This will be a problem regardless of which partitioning scheme you use). I think that there are ways to handle it, but it requires some thinking. For point-wise geometries though everything is pretty straightforward.

@jorisvandenbossche
Member Author

What do you think about potentially partitioning by attribute?

I think that's something that is already supported by dask (but @mrocklin can correct me if I am wrong). See eg the doc page on shuffling at https://docs.dask.org/en/latest/dataframe-groupby.html
You can for example do a set_index(..) for a certain attribute column, which will repartition the dataframe based on that attribute, making joins on this new index more efficient.
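For example, a small illustration of that existing dask mechanism (with made-up data):

```python
import pandas as pd
import dask.dataframe as dd

cities = dd.from_pandas(
    pd.DataFrame({"state": ["NY", "CA", "NY", "TX"], "pop": [1, 2, 3, 4]}),
    npartitions=2,
)
# shuffle so each partition holds a contiguous range of states;
# joins / groupbys on the new index can then stay within partitions
cities_by_state = cities.set_index("state")
```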

The spatial partitioning will then mostly be useful for cases where you don't have such an attribute that can be used as the index (or where the regional extent of the attribute is not exactly the same across datasets).

Of course, if you know you have an attribute that describes certain regions, we could make it possible to specify this as input to the "spatial re-partitioning" function as a basis for how to repartition (instead of using some other logic to divide the space into regions).
(this would be similar to doing a set_index() and then calculating the bounds for the partitions as they are)

@jorisvandenbossche jorisvandenbossche added the enhancement New feature or request label Jun 11, 2020
@jsignell
Member

I think copying Matt's original implementation of using a geoseries to represent the divisions makes sense. Is there a cost associated with that approach that makes the spatialpandas path more compelling?

@jorisvandenbossche
Member Author

Using only boxes is in principle a bit simpler, but on the other hand, storing it as a geoseries might make working with it easier. It is also more general (and actually gives a spatial index on those partition bounds directly as well, since the geoseries has one).
We can also always simplify the regions of the partitions in the geoseries to boxes / envelopes to avoid the partition bounds being too detailed (a plain union of all geometries inside the partition might give a very detailed polygon).

So certainly on board with starting with such an approach.

Regarding naming, I actually like "partition_bounds", except that "bounds" typically refers to a bounding box.

@jsignell
Member

Can we just call them divisions like dask does?

@jorisvandenbossche
Member Author

I am not sure that will play nicely with dask? Eg does dask expect it to be a tuple?

Also, these are not divisions for the index, but a different concept, which might be confusing?

@caspervdw

Using only boxes is in principle a bit simpler, but on the other hand, storing it as a geoseries might make working with it easier. It is also more general (and actually gives a spatial index on those partition bounds directly as well, since the geoseries has one).
We can also always simplify the regions of the partitions in the geoseries to boxes / envelopes to avoid the partition bounds being too detailed (a plain union of all geometries inside the partition might give a very detailed polygon).

So certainly on board with starting with such an approach.

Regarding naming, I actually like "partition_bounds", except that "bounds" typically refers to a bounding box.

I might be looking at this issue too much from one side, but I am really drawn towards the bounding box-only approach. Such a partition looks more like an array chunk. For vector-raster analyses this match will potentially allow more optimizations.

Maybe this would also allow borrowing some (battle-tested) logic from dask.array?

@jorisvandenbossche
Member Author

In the analogy with dask.array, I think some problems are that 1) we don't have a regular grid with rectangles (in dask, the chunk size can vary in each dimension, but the chunks are still all rectangles in a grid, I think), and 2) we will typically have overlapping bounding boxes / spatial partitions (with polygons of all shapes, and each polygon having to fit completely in one of the bounding boxes, I think it is unavoidable that the boxes will typically overlap slightly).

(that's not to say that a simpler bounding-box-only scheme couldn't be interesting, just pointing out it's quite a bit more complicated than dask.array's chunking scheme)

@caspervdw

  1. we don't have a regular grid with rectangles (in dask, the chunk size can vary in each dimension, but the chunks are still all rectangles in a grid, I think)

You are correct that dask.array works with rectangular grids. I'd propose to make that the default option also for the spatial partitioning of geometries. There are roughly two options: 1) a partition consists of all geometries that are intersecting with it, or 2) a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it. The first will give duplicates, but the second doesn't allow pruning partitions in some spatial operations like rasterizing or overlay. It depends on your typical geometry size whether the duplicates are acceptable; in many cases they are, and you still get a big performance increase because of the parallelization.

So I'd propose the first one (intersections, with duplicates) but keep the door open for developing more advanced schemes (carrying the convex hull of the partition along, quadtree, space-filling curves, ...). Advantages of this approach (in my personal experience) are:

  • The partitioning does not depend on the actual data, so you don't have to go through and pay the cost of IO / CPU before even starting to build the dask collection.
  • The partitions can be defined with only 6 numbers (x0, y0, dx, dy, nx, ny) as opposed to N geometries (see the sketch after this list). This will allow non-materialized layers (as in high level graphs) and efficient high level optimizations. This is especially useful if you are working with lots of partitions.
  • With the correct tiling, the output could go into a tile grid, which is what many services (like vector tiles) expect.
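A hypothetical sketch of such a grid partitioner defined by those six numbers (the function is illustrative, not an existing dask-geopandas API):

```python
from shapely.geometry import box

def grid_partitions(x0, y0, dx, dy, nx, ny):
    """Yield the box of each cell in an nx-by-ny rectangular grid."""
    for j in range(ny):
        for i in range(nx):
            yield box(x0 + i * dx, y0 + j * dy,
                      x0 + (i + 1) * dx, y0 + (j + 1) * dy)

# option 1 above: assign to each cell all geometries intersecting it
# (a geometry crossing a cell edge ends up in more than one partition)
cells = list(grid_partitions(0, 0, 10, 10, 4, 4))
```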

@jorisvandenbossche
Member Author

Interesting! That seems to point to a fundamental difference in thinking about "what is a spatial partition (bounds)". Because in my mind, it's still something else than the two options you list.

In your two options, you seem to start from the spatial extent, and then determine which geometries belong to that spatial partition (either all geometries intersecting with it, or having a representative point lying within it).

In my mind, and thinking about how a dask.dataframe works, another option (and what is currently done in the dummy implementation) is to start from the actual physical data: a dask dataframe is split into different partitions (sub-dataframes), and so the spatial partitioning is determined by the spatial extent (total bounding box for simplicity, or a more complex polygon) of the geometries that are stored in a certain partition. So the spatial bounds of a partition form a bounding box (or polygon) that at least fully covers all the geometries of that partition.
(and then it of course becomes a matter of ensuring that the geometries are sorted over the different partitions in such a way that those spatial extents are efficient and not overlapping too much -> that's a whole separate topic of how to smartly "re"partition)

In that mindset, for non-point geometries and when working with simple bounding boxes, you will by definition have overlapping bounding boxes of the spatial partitions (unless all the polygon geometries are rectangular as well, but let's not consider such a special case for the general design ;))
For point geometries, it's of course possible to have more efficient, non-overlapping bounding boxes (like the resulting rectangles from a quadtree spatial index).
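A minimal sketch of that data-driven view, assuming `ddf` is a dask collection with one geopandas.GeoDataFrame per partition (computed eagerly here for simplicity):

```python
import dask
import geopandas
from shapely.geometry import box

def partition_envelope(gdf):
    # bounding box of the geometries physically stored in this partition
    return box(*gdf.total_bounds)

parts = ddf.to_delayed()  # one delayed geopandas.GeoDataFrame per partition
spatial_partitions = geopandas.GeoSeries(
    list(dask.compute(*[dask.delayed(partition_envelope)(p) for p in parts]))
)
```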


I want to get back to the two options you listed above as well, and discuss some potential problems (or to explain "properties" that I think are important for a spatial partitioning system):

1) a partition consists of all geometries that are intersecting with it.

As you mention, this will give duplicates. However, I am not sure it is actually practical with dask.dataframe to have duplicates. Either you need to accept that things like len(df) or df["attribute"].mean().compute() will give wrong answers (which I don't think is acceptable), or such simple calculations like the length or the mean of a column will each time have to "deduplicate" the rows first (which I think is not that easy, will impact performance for non-spatial operations, and will also lose a lot of the benefits of using dask.dataframe).

2) a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it.

For this one, you mention that it doesn't allow pruning partitions in some spatial operations. And I think that is indeed an essential property of the spatial partitioning: the bounding box (or polygon) representing a given partition should fully cover the actual geometries (and not only contain a representative point, or only intersect with the polygon).
For example, if I want to check if a certain polygon intersects with any of the polygons in my dataframe (a gdf.intersects(polygon) type of operation), I think we need to be able to rely on the spatial partitioning information to know which partitions to check, based on whether the given polygon intersects with any of the bounding boxes of the partitions. If the bounding boxes are not guaranteed to fully cover their geometries, we don't have reliable information about potential intersections with the given polygon, and that means we would basically always need to check the intersects operation against all partitions, not being able to skip any partition, and thus losing the benefit of having spatial partitions in the first place.
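In code, the pruning this property enables could look roughly like this (a sketch only, assuming a dask-geopandas-like object `dgdf` with the spatial_partitions GeoSeries discussed in this thread and a query geometry `polygon`):

```python
# covering polygons: one per partition, guaranteed to fully cover its geometries
covering = dgdf.spatial_partitions

# only these partitions can possibly contain geometries intersecting `polygon`
candidates = covering.index[covering.intersects(polygon)]

# restrict the expensive row-level predicate to the candidate partitions
result = dgdf.partitions[list(candidates)].geometry.intersects(polygon)
```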

@caspervdw

caspervdw commented May 29, 2021

Interesting discussion indeed!

In your two options, you seem to start from the spatial extent, and then determine which geometries belong to that spatial partition (either all geometries intersecting with it, or having a representative point lying within it).

In my mind, and thinking about how a dask.dataframe works, another option (and what is currently done in the dummy implementation) is to start from the actual physical data: a dask dataframe is split into different partitions (sub-dataframes), and so the spatial partitioning is determined by the spatial extent (total bounding box for simplicity, or a more complex polygon) of the geometries that are stored in a certain partition. So the spatial bounds of a partition form a bounding box (or polygon) that at least fully covers all the geometries of that partition.
(and then it of course becomes a matter of ensuring that the geometries are sorted over the different partitions in such a way that those spatial extents are efficient and not overlapping too much -> that's a whole separate topic of how to smartly "re"partition)

I agree that there is this difference of having (or not having) the data determine the partitions. Having to duplicate all of the input vector data into a partitioned form before an analysis can be done can be quite a show-stopper. IO is expensive, especially if the data is accessed over a network (let's say an SQL database or web API). At the same time, data that is accessible over the network is ideally suited for distributed computations. In my ideal world, you would be able to construct a computation on your laptop and send it to a cluster for computation without ever having all the IO flow through your laptop.

I think this is a really important point to decide on: does dask-geopandas want to aim for this kind of usage?

  1. a partition consists of all geometries that are intersecting with it.

As you mention, this will give duplicates. However, I am not sure it is actually practical with dask.dataframe to have duplicates. Either you need to accept that things like len(df) or df["attribute"].mean().compute() will give wrong answers (which I don't think is acceptable), or such simple calculations like the length or the mean of a column will each time have to "deduplicate" the rows first (which I think is not that easy, will impact performance for non-spatial operations, and will also lose a lot of the benefits of using dask.dataframe).

So this is a real problem indeed. Maybe dask-geopandas can keep track of which features have their representative point inside the partition? In that way it is very efficient to do aggregations like len or mean over unique features. And it also directly solves #40 by accepting that any partitioned vector dataset has overlaps.

  2. a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it.

For this one, you mention that it doesn't allow pruning partitions in some spatial operations. And I think that is indeed an essential property of the spatial partitioning: the bounding box (or polygon) representing a given partition should fully cover the actual geometries (and not only contain a representative point, or only intersect with the polygon).

This approach is indeed not suitable for spatial operations. But it does have use cases: for example, when you want to do a polygon-raster operation in which the raster is averaged inside each polygon. Both datasets are partitioned. You don't want duplicates in the polygons, but you do want the geometries in a partition to be spatially close so that you can prune the raster partitions.


Summarizing: we could define non-overlapping partitions (e.g. a rectangular grid). Each partition contains at least all geometries that intersect with it, with a flag indicating whether each geometry is in or out of bounds. Each partition also has 2 bounding boxes as metadata: the partition box and the box that contains all geometries (including the out-of-bounds ones).
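A loose sketch of the per-partition metadata this proposal implies (all names hypothetical):

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass
class PartitionMeta:
    # the non-overlapping grid cell this partition is responsible for
    partition_box: Tuple[float, float, float, float]   # (minx, miny, maxx, maxy)
    # bounds of *all* geometries stored in the partition, incl. out-of-bounds ones
    covering_box: Tuple[float, float, float, float]
    # the per-row "in/out of bounds" flag would live as a boolean column in the
    # partition's dataframe itself, e.g. marking rows whose representative point
    # falls inside partition_box
```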

One more thought that popped up while writing this: I think the problem here is not so much in the fact that we are working in 2D, but mainly in the fact that the partitioning column consists of "range" objects instead of scalar objects. When partitioning a dask.dataframe, you take a column of scalars (typically, time) and divide it into blocks. For geometries, this can't be done easily because e.g. a polygon covers ranges of x and y. With dask.dataframes, you might have this issue too if your data consists of "events" with a start and end time. How would you partition that by time? Are there examples out there? Maybe somebody has already dealt with the problem that partitions of events overlap in time?

@brendan-ward
Member

One approach that looks promising, at least in theory, is bounding interval hierarchy. A bit more on the approach here and here.

It uses a recursive splitting algorithm like a kd-tree (for points), but extends this to ranges and explicitly handles geometries that overlap the split line - so that once complete, those geometries are still assigned uniquely to a partition. It looks like it might take a bit of figuring to work out how to handle range queries against this data structure, since most of the examples are for ray casting.

@caspervdw

Two options I encountered:

@martinfleis
Member

Spatial partitioning in Apache Sedona seems to be based on KDB-Tree, Quad-Tree and R-Tree.

https://sedona.apache.org/tutorial/core-python/#use-spatial-partitioning

Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.

@mrocklin
Member

I'll jump in here again and promote using polygons. Writing the code the first time around felt really natural. Just to be clear, they don't necessarily have to fully partition the data; as someone said above, you don't want to force a model where data must be organized, because in many cases this won't be the case, and it's expensive to implement. Instead, you want possibly overlapping bounding polygons for each partition, with the infinity polygon being the default option.

In the original dask-geopandas I intentionally tried to solve some of the harder problems to make sure that the approach would work well. Spatial joins and whatever-the-groupby-thing-is called (please forgive my memory) were both interesting to write, but also both clean in the end. It felt very much like the right abstraction. We got to reuse a lot of the logic in geopandas. A parallel spatial join between two parallel geodataframes turned into a spatial join of the high level regions of each, followed by mapping spatial-join across those intersections. We got to leverage the power of the already written geopandas code. It was great.
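A rough sketch of that two-level structure (not the original code; `left` and `right` are assumed to be dask-geopandas-like objects with a spatial_partitions GeoSeries):

```python
import dask
import geopandas

# 1. spatial join of the high-level regions: which partition pairs can interact?
left_regions = geopandas.GeoDataFrame(geometry=left.spatial_partitions)
right_regions = geopandas.GeoDataFrame(geometry=right.spatial_partitions)
pairs = geopandas.sjoin(left_regions, right_regions)

# 2. map geopandas.sjoin over only those partition pairs
left_parts, right_parts = left.to_delayed(), right.to_delayed()
joined = [
    dask.delayed(geopandas.sjoin)(left_parts[i], right_parts[j])
    for i, j in zip(pairs.index, pairs["index_right"])
]
```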

Trees are also good, but I personally would start with a tree of depth 1, and only expand that depth once computation/indexing proved troublesome. I wouldn't expect this to happen until you were well beyond thousands of partitions, which I think already covers the vast majority of use cases.

Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.

Operations like this are already written in the original version. If people haven't already I encourage them to look through it.

@jorisvandenbossche
Member Author

Thanks Matthew for chiming in. I agree with having the spatial partitions as a GeoSeries, it makes writing code very natural (I updated your original spatial join implementation last week -> #54, mostly an update for dask API changes for now).

I also want to emphasize again that it is really not straightforward to have "duplicated" rows (to support non-overlapping spatial partitions). That will certainly be an interesting model for certain applications, but that also means that you basically have to implement that model with dask from scratch, instead of basing it on dask.dataframe (as we currently do). I personally think that for our current efforts in this repo, reusing the logic of dask.dataframe is the best option.

It might still be useful to have readers that are using a spatial intersection to query data from some source, but then the "deduplication" step (eg based on containment of representative point) will need to happen during IO, so that once the data is read and materialized as a geopandas.GeoDataFrame inside a partition, it can be used as is.
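For example, a hedged sketch of such a reader, where read_bbox is a hypothetical IO function that may also return features overlapping neighbouring tiles:

```python
from shapely.geometry import box

def read_tile(tile_bounds):
    gdf = read_bbox(tile_bounds)  # hypothetical bbox-filtered read -> geopandas.GeoDataFrame
    tile = box(*tile_bounds)
    # keep each feature in exactly one partition: the one containing its representative point
    keep = gdf.geometry.representative_point().within(tile)
    return gdf[keep]
```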

I wouldn't expect this to happen until you were well beyond thousands of partitions, which I think already covers the vast majority of use cases.

Indeed, for a number of partitions in the thousands, doing a plain gdf.spatial_partitions.intersects(..) predicate (or spatial join with the partitions of another dataframe) with geopandas is not going to be significantly slower (and may even be faster) than a spatial index query.

Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.

Operations like this are already written in the original version. If people haven't already I encourage them to look through it.

Yes, we indeed should. As mentioned above, I already ported the sjoin algorithm, and for the demo notebook (https://gist.github.com/jorisvandenbossche/20d3bd91b2d0db2522148e5c2c84d206) I also needed the "repartition given known regions", so I need to open a PR with the ported version of that as well. And we should see if we can reuse a bunch of the tests as well.

@jorisvandenbossche
Member Author

We are speaking about "spatial indexing / partitioning" a lot here, but I think there are two distinct aspects:

  1. How to store the "spatial partitioning" information, and how to query this information to retrieve a subset of the partitions you need for a certain operation.
    Currently this is the spatial_partitions attribute, which stores a polygon for each partition (and this polygon is expected to cover all geometries in the partition) as a geopandas.GeoSeries on the dask_geopandas.GeoDataFrame. But it could be discussed whether we want to use some spatial index at this level as well.
  2. How to repartition a dataframe into efficient spatial partitions (or how to determine optimal spatial partitions before reading in the actual data, eg by first only reading in bounding boxes).
    For this repartition step, there are some "easy" options (rectangular grid, based on known regions or an attribute), or this can be done with more advanced spatial indexing algorithms to find a good structure in the space of geometries.

I think several mentions of spatial indexing methods above are rather about the second aspect, but I would propose to open a separate issue for the repartitioning goal, and to focus in this issue on the actual spatial partitioning concept and mechanism of the GeoDataFrame (the first aspect).

@caspervdw

I also want to emphasize again that it is really not straightforward to have "duplicated" rows (to support non-overlapping spatial partitions). That will certainly be an interesting model for certain applications, but that also means that you basically have to implement that model with dask from scratch, instead of basing it on dask.dataframe (as we currently do). I personally think that for our current efforts in this repo, reusing the logic of dask.dataframe is the best option.

Agreed. I really liked the idea at first, but you're right. I'll stop talking about this.

As @mrocklin said, partitioning data will be expensive for many data sources. We could allow some Partitioner object to get partitions from another data storage, or compute partitions on the spot. Either way, some work has to be done before someone can start building a dask-geopandas computation, or you can't take advantage of spatial pruning. Isn't there a way around this? Can we make the object representing the layer some sort of lazy object that maybe only executes once the computation is scheduled?

There seems to be a lot of progress in this area (e.g. https://coiled.io/dask-under-the-hood-scheduler-refactor/) and I wonder if we can make this such that the partitions evaluate lazily as an iterator. I think this is a question that is in scope here, because choosing e.g. a GeoSeries to implement the partitions may limit our options.

Another question that someone else may be able to answer: does the scheduler have access to the same packages (GEOS, PyGEOS, geopandas) as the workers?

Coming back to the discussion about bbox vs. polygons. Initially, I had the feeling that packing partitions inside a GeoSeries (which contains an ndarray of objects, each with a pointer to an opaque GEOS object) seemed like a sluggish way to do it, if you could also do it with an (N, 4) ndarray of floats. But I did some timings, and pygeos is actually 'only' 35% slower than intersecting bounding boxes in pure numpy, which is pretty neat.
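For reference, the pure-numpy bounding box intersection test used in that comparison would be something like:

```python
import numpy as np

def bbox_intersects(bounds, query):
    """bounds: (N, 4) array of (minx, miny, maxx, maxy); query: length-4 sequence."""
    qminx, qminy, qmaxx, qmaxy = query
    return (
        (bounds[:, 0] <= qmaxx) & (bounds[:, 2] >= qminx)
        & (bounds[:, 1] <= qmaxy) & (bounds[:, 3] >= qminy)
    )
```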

I am still slightly worried about the polygons though. I have encountered some cases of severe "oversampling" in source data, e.g. an arc that gets a point every centimeter, giving single polygons of ~100 megabytes. When computing the convex hull of that, these terrible things might end up in the partitions. Using bounding boxes, you just can't have that issue.

So we could leave this option open and require the partitions to be an object exposing a part of the GeoSeries interface. Then the implementation advantages @mrocklin describes still hold. I wonder what part of the GeoSeries interface was required for implementing dask-geopandas?

@jorisvandenbossche
Member Author

Yes, I think it will indeed be interesting to investigate how we can leverage the work around high-level graphs to optimize this.

Another question that someone else may be able to answer: does the scheduler have access to the same packages (GEOS, PyGEOS, geopandas) as the workers?

We can assume that, yes, but it is something that the user (or the dev ops providing the client/scheduler environments) needs to ensure.

I wonder what part of the GeoSeries interface was required for implementing dask-geopandas?

At the moment just the intersects method, and the fact that it can be turned into a GeoDataFrame to pass to sjoin (and I also used the plot method to inspect it).
But I think this is still something that can rather easily be changed/optimized later in case we want to make a specialized object instead of reusing GeoSeries, if we run into issues with it.

@TomAugspurger
Contributor

What's the plan for dask_geopandas.GeoDataFrame.set_index("<geometry_column>")?

dask.dataframe's set_index differs from pandas since it sorts / shuffles the data by the column being assigned to the index and then partitions the output dataframe. This divergence from pandas enables fast random access to index labels. Does geopandas want something similar for set_index? (I believe that currently set_index raises while trying to compute the min / max to do the partitioning).
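For a sortable column, a small illustration of the dask behaviour described above (for a geometry column, it is this sort/min/max step that currently fails):

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"t": [3, 1, 4, 2], "v": [10, 20, 30, 40]}), npartitions=2
)
ddf2 = ddf.set_index("t")   # shuffles/sorts by "t" and records the divisions
print(ddf2.divisions)       # e.g. (1, 3, 4) -> enables fast label-based access
```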

@martinfleis
Member

What's the plan for dask_geopandas.GeoDataFrame.set_index("<geometry_column>")?

I don't think there are any. GeometryArray currently doesn't support sorting. I'd say that the best solution, for now, would be to raise NotImplementedError.
