In [1]:
import geopandas
import dask_geopandas

# Dissolve with dask-geopandas

Dask-geopandas offers a distributed version of the `dissolve` method, built on the `dask.dataframe` `groupby` machinery. It shares all the benefits and caveats of the vanilla `groupby`, with a few minor differences.

The geometry column is aggregated using the `unary_union` operation, which tends to be relatively costly, so it pays to keep all workers busy in every step of the operation. By default, GroupBy returns its result as a single partition, which means that the final aggregation step runs in a single thread. For geometries, this means the operation has to _loop_ through the groups of geometries coming from the other partitions and call `unary_union` on each group, one by one.

The number of output partitions can be specified with the `split_out` keyword passed to the aggregation. If we set `split_out=16`, the result has 16 partitions (if there are 16 or more unique groups), each of which is processed by a different worker (if there are 16 or more workers). The final aggregation (`unary_union`) is then parallelised.
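The two-stage aggregation described above can be illustrated with a toy, standard-library-only sketch. It is not how dask implements `groupby`: frozensets stand in for geometries, set union stands in for `unary_union`, and the helper names (`union`, `partial_dissolve`, `final_dissolve`) and the CRC32-based routing of groups to output partitions are assumptions made for illustration.

```python
import zlib
from collections import defaultdict


def union(shapes):
    """Stand-in for unary_union: merge an iterable of frozensets."""
    out = frozenset()
    for shape in shapes:
        out |= shape
    return out


def partial_dissolve(partition):
    """Stage 1: union the geometries of each group within one input partition."""
    groups = defaultdict(list)
    for key, shape in partition:
        groups[key].append(shape)
    return {key: union(shapes) for key, shapes in groups.items()}


def final_dissolve(partials, split_out=1):
    """Stage 2: send each group to one of `split_out` output partitions and
    union its partial results. Output partitions do not depend on each other,
    so they could be processed by different workers."""
    buckets = [defaultdict(list) for _ in range(split_out)]
    for partial in partials:
        for key, shape in partial.items():
            idx = zlib.crc32(key.encode()) % split_out  # deterministic routing
            buckets[idx][key].append(shape)
    return [{key: union(shapes) for key, shapes in b.items()} for b in buckets]


# Three input partitions of (group key, toy geometry) rows
partitions = [
    [("Africa", frozenset({1})), ("Asia", frozenset({2}))],
    [("Africa", frozenset({3})), ("Europe", frozenset({4}))],
    [("Asia", frozenset({5})), ("Europe", frozenset({6}))],
]

partials = [partial_dissolve(p) for p in partitions]
single = final_dissolve(partials, split_out=1)  # one partition loops over every group
split = final_dissolve(partials, split_out=3)   # groups spread over three partitions
```

With `split_out=1`, a single output partition has to union every group in turn. With `split_out=3`, each group still ends up in exactly one output partition, but the per-group unions are spread across partitions that can be computed independently.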

Dask-geopandas `dissolve` uses the same default number of output partitions (1) as `dask.dataframe`, but it is recommended to raise it to at least the number of workers to get the full benefit of parallelised computation.
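One possible way to pick a value is sketched below. `choose_split_out` is a hypothetical helper, not part of dask-geopandas, and using `os.cpu_count()` as the worker count is an assumption that only makes sense for a local scheduler. The helper caps the value at the number of unique groups, since extra output partitions beyond that have no groups to process.

```python
import os


def choose_split_out(n_workers, n_groups):
    """Hypothetical helper: one output partition per worker, but never more
    than there are unique groups and never fewer than one."""
    return max(1, min(n_workers, n_groups))


# Assumption: one worker per CPU core on the local machine
n_workers = os.cpu_count() or 1

# The example dataset used in this notebook dissolves into 8 continents
split_out = choose_split_out(n_workers, n_groups=8)

# The value would then be passed on to dissolve, for example:
# dissolved = ddf.dissolve("continent", split_out=split_out)
```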

In [12]:
df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))

ddf = dask_geopandas.from_geopandas(df, npartitions=4)
ddf

| | pop_est | continent | name | iso_a3 | gdp_md_est | geometry |
|---|---|---|---|---|---|---|
| npartitions=4 | | | | | | |
| 0 | int64 | object | object | object | float64 | geometry |
| 45 | ... | ... | ... | ... | ... | ... |
| 90 | ... | ... | ... | ... | ... | ... |
| 135 | ... | ... | ... | ... | ... | ... |
| 176 | ... | ... | ... | ... | ... | ... |


Using default settings, you get a single partition:

In [13]:
dissolved = ddf.dissolve("continent")
dissolved

| | pop_est | name | iso_a3 | gdp_md_est | geometry |
|---|---|---|---|---|---|
| npartitions=1 | | | | | |
| | int64 | object | object | float64 | geometry |
| | ... | ... | ... | ... | ... |


If you specify the number of output partitions, every step of the operation is parallelised:

In [14]:
dissolved_4parts = ddf.dissolve("continent", split_out=4)
dissolved_4parts

| | pop_est | name | iso_a3 | gdp_md_est | geometry |
|---|---|---|---|---|---|
| npartitions=4 | | | | | |
| | int64 | object | object | float64 | geometry |
| | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... |


In [15]:
dissolved_4parts.compute()

| continent | pop_est | name | iso_a3 | gdp_md_est | geometry |
|---|---|---|---|---|---|
| Europe | 142257519 | Russia | RUS | 3745000.0 | MULTIPOLYGON (((-54.26971 2.73239, -54.18173 3... |
| North America | 35623680 | Canada | CAN | 1674000.0 | MULTIPOLYGON (((-61.66000 10.36500, -61.68000 ... |
| Antarctica | 4050 | Antarctica | ATA | 810.0 | MULTIPOLYGON (((-59.86585 -80.54966, -60.15966... |
| Oceania | 920938 | Fiji | FJI | 8374.0 | MULTIPOLYGON (((173.02037 -40.91905, 173.24723... |
| Asia | 18556698 | Kazakhstan | KAZ | 460700.0 | MULTIPOLYGON (((126.95724 -8.27334, 127.33593 ... |
| Seven seas (open ocean) | 140 | Fr. S. Antarctic Lands | ATF | 16.0 | POLYGON ((68.93500 -48.62500, 69.58000 -48.940... |
| South America | 44293293 | Argentina | ARG | 879400.0 | MULTIPOLYGON (((-71.00568 -55.05383, -72.26390... |
| Africa | 53950935 | Tanzania | TZA | 150600.0 | MULTIPOLYGON (((49.86334 -16.45104, 49.77456 -... |


## Alternative solution

In some specific cases, `groupby` may not be the most performant option. If your GeoDataFrame fits fully in memory on a single machine, it may be faster to shuffle the data on the `by` column first, so that each group ends up in a single partition, and then map the geopandas `dissolve` across the partitions. In that case, `unary_union` is called only once for each geometry (with GroupBy it is called twice). However, shuffling is an expensive operation, so for larger data this alternative is not feasible.
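The idea can be sketched with the standard library alone. This is a toy model, not the dask shuffle: frozensets stand in for geometries, set union stands in for `unary_union`, and the helper names (`shuffle_by_key`, `dissolve_partition`) and the CRC32-based routing are assumptions made for illustration. A counter records how often each group is unioned.

```python
import zlib
from collections import Counter, defaultdict


def shuffle_by_key(partitions, npartitions):
    """Move every row to the partition chosen by hashing its key, so that
    all rows of a group end up in the same partition."""
    out = [[] for _ in range(npartitions)]
    for part in partitions:
        for key, shape in part:
            out[zlib.crc32(key.encode()) % npartitions].append((key, shape))
    return out


def dissolve_partition(partition, union_calls):
    """Dissolve one partition independently; counts one union per group."""
    groups = defaultdict(list)
    for key, shape in partition:
        groups[key].append(shape)
    result = {}
    for key, shapes in groups.items():
        union_calls[key] += 1
        result[key] = frozenset().union(*shapes)  # stand-in for unary_union
    return result


# Three input partitions of (group key, toy geometry) rows
partitions = [
    [("Africa", frozenset({1})), ("Asia", frozenset({2}))],
    [("Africa", frozenset({3})), ("Europe", frozenset({4}))],
    [("Asia", frozenset({5})), ("Europe", frozenset({6}))],
]

union_calls = Counter()
shuffled_parts = shuffle_by_key(partitions, npartitions=2)
results = [dissolve_partition(p, union_calls) for p in shuffled_parts]
```

Because the shuffle keeps each group within one partition, every group is unioned exactly once and no second aggregation step is needed. The cost is that all rows have to be moved between partitions first.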

In [17]:
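def dissolve_shuffle(ddf, by=None, **kwargs):
    """Shuffle rows so each group lands in one partition, then dissolve per partition."""

    # Run dissolve on the empty meta frame to obtain the output schema
    meta = ddf._meta.dissolve(by=by, as_index=False, **kwargs)

    # Move all rows sharing a `by` value into the same partition
    shuffled = ddf.shuffle(
        by, npartitions=ddf.npartitions, shuffle="tasks", ignore_index=True
    )

    # Each partition now holds complete groups, so plain geopandas dissolve suffices
    return shuffled.map_partitions(
        geopandas.GeoDataFrame.dissolve, by=by, as_index=False, meta=meta, **kwargs
    )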

In [18]:
shuffled = dissolve_shuffle(ddf, "continent")
shuffled

| | continent | geometry | pop_est | name | iso_a3 | gdp_md_est |
|---|---|---|---|---|---|---|
| npartitions=4 | | | | | | |
| | object | geometry | int64 | object | object | float64 |
| | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... |


In [19]:
shuffled.compute()