ENH: Add shuffle #104

Closed
wants to merge 3 commits into from

tastatham
Contributor

Implement shuffle for Dask-GeoPandas. This allows for spatially shuffling/partitioning a Dask-GeoPandas object using one of the three methods (hilbert, morton or geohash) or a user defined column.
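
For readers unfamiliar with space-filling curves, the sketch below shows how a Morton (Z-order) code can turn 2D coordinates into a single sortable key, so that sorting by the key tends to keep nearby points together. It is an illustration only and not the code in this PR; the function names `morton_code` and `grid_cell`, the unit-square bounds, and the 16-bit grid are assumptions made for the example.

```python
def morton_code(x, y, bits=16):
    """Interleave the low `bits` bits of integers x and y.

    Bits of x land in even positions, bits of y in odd positions.
    """
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code


def grid_cell(value, lo, hi, bits=16):
    """Scale a coordinate in [lo, hi] to an integer cell in [0, 2**bits - 1]."""
    cells = (1 << bits) - 1
    return int((value - lo) / (hi - lo) * cells)


# Two points near the origin and two near the opposite corner (assumed bounds 0..1).
points = [(0.9, 0.9), (0.1, 0.1), (0.8, 0.95), (0.15, 0.05)]
ordered = sorted(
    points,
    key=lambda p: morton_code(grid_cell(p[0], 0.0, 1.0), grid_cell(p[1], 0.0, 1.0)),
)
```

After sorting, the two points near the origin come first and the two points near the opposite corner come last, which is the property a spatial partitioning scheme relies on.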

Member

@martinfleis left a comment

Thanks for looking into that.

It may also be useful to explore shuffle method of dask.dataframe (https://github.com/dask/dask/blob/4229c16cf0cc7ed5f60f313618680bfd04e381ea/dask/dataframe/shuffle.py#L301). We may be able to use that instead of set_index to avoid some overhead.

@martinfleis
Member

Just dropping a note here from @mrocklin in dask/dask#8075 (comment), for reference:

Also, as a heads-up, I'm working on a newer algorithm here. The API will
still be the same (I saw that you're doing this for geopandas) but there
may be new algorithms to learn from in the future that may perform slightly
better at large scale.

@TomAugspurger
Contributor

An alternative to a separate spatial_shuffle is to override the parent shuffle method (using the same keywords) and add a keyword such as method="hilbert". You would then call the parent method with on=self.hilbert_distance(self[on]).

In dask.dataframe.DataFrame, on is required but I think it'd be fine to set the default to None and use the primary geometry column by default.

@gjoseph92

I also recommend @TomAugspurger's approach. Calling into super().shuffle(...) seems better than fully overriding the method. Plus, there are some improvements to shuffling coming up that it would be easier to benefit from with this method.

Seems like if on is None, then you want super().shuffle(self, hash_method(self), ...), otherwise just super().shuffle(self, on, ...). (Note that shuffling also isn't quite the same as set_index.)
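
The dispatch pattern suggested above can be sketched with plain Python classes. This is a toy model, not Dask or Dask-GeoPandas code: `BaseFrame` and `GeoFrame` are hypothetical stand-ins for the parent and child dataframe classes, `spatial_key` is a placeholder for a real Hilbert or Morton distance, and the builtin `hash` stands in for whatever hashing the real shuffle uses.

```python
class BaseFrame:
    """Hypothetical stand-in for the parent dataframe class."""

    def __init__(self, rows):
        self.rows = rows

    def shuffle(self, on, npartitions=2):
        # Hash-partition rows by the per-row keys given in `on`.
        parts = [[] for _ in range(npartitions)]
        for row, key in zip(self.rows, on):
            parts[hash(key) % npartitions].append(row)
        return parts


class GeoFrame(BaseFrame):
    """Hypothetical subclass that derives a spatial key when `on` is None."""

    def spatial_key(self, method="hilbert"):
        if method not in ("hilbert", "morton"):
            raise ValueError(f"unknown method: {method!r}")
        # Placeholder key: a real implementation would compute a
        # Hilbert or Morton distance from the geometries here.
        return [x + 1000 * y for x, y in self.rows]

    def shuffle(self, on=None, method="hilbert", npartitions=2):
        if on is None:
            on = self.spatial_key(method)
        # Delegate the actual shuffle to the parent implementation.
        return super().shuffle(on, npartitions=npartitions)


gf = GeoFrame([(0, 0), (1, 0), (0, 1), (1, 1)])
default_parts = gf.shuffle(npartitions=2)
explicit_parts = gf.shuffle(on=[0, 0, 1, 1], npartitions=2)
```

Because the subclass only prepares `on` and then delegates, any later improvement to the parent's shuffle would be picked up without further changes in the subclass.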

@tastatham
Contributor Author

tastatham commented Oct 7, 2021

@martinfleis, I have made the changes based on your suggestions.

  • The function now uses Dask's shuffle method instead of set_index.
  • I have temporarily named the function spatial_shuffle instead of shuffle, but we can discuss this in our next meeting.
  • I have added partitions as an additional argument, which controls whether the spatial partitions are calculated.
  • I had to add set_geometry to ensure geometries are retained, as discussed in BUG: Computing a dask shuffle returns a pd.DataFrame, not gpd.GeoDataFrame #116.
  • I also dropped the column parameter and instead computed the partitioning information outside of the spatial_function method. I felt this was both more interactive and cleaner, but I am happy to retain this within the function.

A quick example is shown below:

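import geopandas
import dask_geopandas

p = 10  # Hilbert curve parameter passed to hilbert_distance
npartitions = 20  # number of output partitions after the shuffle

# Load a small sample dataset and wrap it in a single-partition Dask-GeoPandas frame
gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(gdf, npartitions=1)

# Compute the Hilbert distance as a column, then shuffle on that column
ddf["hilbert"] = ddf.hilbert_distance(p)
shuffled_ddf = ddf.spatial_shuffle(on="hilbert", npartitions=npartitions, partitions=True)
print(shuffled_ddf)

# Render the task graph of the shuffled frame
shuffled_ddf.visualize()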

@martinfleis
Member

That wouldn't work, see the resulting spatial partitions:

[Screenshot, 2021-10-07: plot of the resulting spatial partitions]

shuffle doesn't work in the same way as set_index. See the docs:

Uses hashing of on to map rows to output partitions. After this operation, rows with the same value of on will be in the same partition.

This means that it does not sort the data. What we would need here is to compute bins and use the bin labels as on in shuffle.
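
A small sketch of the difference, assuming integer keys that stand in for Hilbert distances. The helper names `hash_partition` and `bin_partition` are hypothetical, and the builtin `hash` is only a stand-in for the hashing Dask applies; the point is the contrast between hashing keys directly and hashing (or using) bin labels.

```python
from bisect import bisect_right


def hash_partition(keys, npartitions):
    """Assign each key to a partition by hashing it (no ordering is preserved)."""
    return [hash(k) % npartitions for k in keys]


def bin_partition(keys, npartitions):
    """Assign each key to a partition using quantile bins of the sorted keys."""
    ordered = sorted(keys)
    step = len(ordered) / npartitions
    # Lower edge of every bin except the first.
    edges = [ordered[int(step * i)] for i in range(1, npartitions)]
    return [bisect_right(edges, k) for k in keys]


keys = list(range(12))  # stand-in for sorted Hilbert distances
hash_labels = hash_partition(keys, 3)  # [0, 1, 2, 0, 1, 2, ...]: neighbours are scattered
bin_labels = bin_partition(keys, 3)    # [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]: neighbours stay together
```

Rows with the same bin label always hash to the same output partition, so passing the bin labels as on would keep each contiguous range of distances together, although the partitions themselves would still not be ordered.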

Even if it worked, my proposal was slightly different. I was suggesting that we do the calculation of e.g. hilbert distance under the hood, something along these lines:

def spatial_shuffle(self, on="hilbert", npartitions=20, partitions=True, **kwargs):
    if on == "hilbert":
        on = self.hilbert_distance()
    elif on == "morton":
        on = self.morton_distance()

    # do the actual shuffle here

@martinfleis
Member

@tastatham To get closer to the actual release, I have opened #131 to implement spatial_shuffle based on set_index there. That should free a bit of your time and you can finish that notebook with documentation to wrap up GSoC.

@martinfleis
Member

Superseded by #131
