In [1]:
import dask.dataframe as dd
import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.wkt import loads

# change for different ref location for geometries csv
filepath = '../example_geometries.csv'

## 1. Load the data set

In [2]:
# set this so that you have ~2-4 partitions per worker
# e.g. 60 machines each with 4 workers should have ~500 partitions
nparts = 4
head_count = 500 # set to a # if you want to just do a subset of the total for faster ops, else None

# get original csv as pandas dataframe
pdf = pd.read_csv(filepath)[['id', 'geometry']].reset_index(drop=True)

# convert to geopandas dataframe
geometries = gpd.GeoSeries(pdf['geometry'].map(lambda x: loads(x)))
crs = {'init': 'epsg:32154'},
gdf = gpd.GeoDataFrame(data=pdf[['id']], crs=crs, geometry=geometries)

# trim if desired
if head_count is not None:
    gdf = gdf.head(head_count)
print('Working with a dataframe of length {}.'.format(len(gdf)))

# clean the ids column
gdf = gdf.drop('id', axis=1)
gdf['id'] = gdf.index
gdf['id'] = gdf['id'].astype(int)

# we need some generic column to perform the many-to-many join on
gdf = gdf.assign(tmp_key=0)

# then convert into a dask dataframe
gdf1 = gdf.copy()
ddf = dd.from_pandas(gdf1, name='ddf', npartitions=nparts)

Working with a dataframe of length 500.


## 2. Calculate distance matrix the old way

In [3]:
def calc_distances(grouped_result):
    # we just need one geometry from the left side because
    first_row = grouped_result.iloc[0]
    from_geom = first_row['geometry_from'] # a shapely object

    # and then convert to a GeoSeries
    to_geoms = gpd.GeoSeries(grouped_result['geometry_to'])

    # get an array of distances from the GeoSeries comparison
    distances = to_geoms.distance(from_geom)
    return distances.values

In [4]:
%%time

# use dask to calculate distance matrix with geopandas
tall_list = (dd.merge(ddf, gdf, on='tmp_key', suffixes=('_from', '_to'), npartitions=nparts).drop('tmp_key', axis=1))
distances = (tall_list.groupby('id_from').apply(calc_distances, meta=pd.Series()))
computed = distances.compute()

Wall time: 43 s


In [5]:
# show results
pd.Series(computed[0][:5])

0        0.000000
1    17777.332006
2    35648.613342
3    40575.208147
4    33844.863950
dtype: float64

## 3. Calculate distance matrix a new way

Calculate polygons' centroid distances instead of perimeter distances. If we don't need to know the edge-to-edge distance between our polygons, this is superior as it gives us a spatially-representative point for each polygon *and* most importantly allows us to vectorize our distance matrix computation.

Use a vectorized euclidean distance calculator (euclidean works fine if geometries are projected, as they are in your example data -- if they're *not* projected, use a vectorized great circle distance calculator like I wrote for OSMnx).

In [6]:
gdf2 = gdf.copy()

In [7]:
%%time

# convert polygons into xy centroids
centroids = gdf2.centroid
gdf2['x'] = centroids.map(lambda coords: coords.x)
gdf2['y'] = centroids.map(lambda coords: coords.y)
gdf2.drop('geometry', axis='columns', inplace=True) # makes merge faster and more memory efficient

# create OD pairs by a many-to-many merge and index by from/to keys
gdf_od = pd.merge(gdf2, gdf2, on='tmp_key', suffixes=('_from', '_to')).drop('tmp_key', axis=1)
gdf_od = gdf_od.set_index(['id_from', 'id_to'])

# calculate euclidean distance matrix, vectorized
x1 = gdf_od['x_from']
x2 = gdf_od['x_to']
y1 = gdf_od['x_from']
y2 = gdf_od['x_to']
dist_matrix = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

Wall time: 148 ms


In [8]:
print(head_count ** 2)
print(len(dist_matrix))
dist_matrix.head()

250000
250000


id_from  id_to
0        0            0.000000
         1        15616.178292
         2        47755.558683
         3        47299.455346
         4        47182.117362
dtype: float64

#### Results:

This dropped our compute time from ~24 seconds to ~71 ms. And we've got a nice multiindex for storing this distance matrix efficiently.

## 4. Daskify the new technique

No longer is this an embarassingly parallel problem: given part 3 above, we can vectorize the calculation. At this point, the only reason to use dask or other big data solutions is because the data can't fit in memory, not because we need to divide it up among many workers. If we can fit the data in memory, it is almost surely faster to do this like part 3 above than experience the dask overhead. Also, dask doesn't support multiindexing, so we can't do the nice index like in part 3.

In [9]:
gdf3 = gdf.copy()

In [10]:
%%time

# convert polygons into xy centroids
centroids = gdf3.centroid
gdf3['x'] = centroids.map(lambda coords: coords.x)
gdf3['y'] = centroids.map(lambda coords: coords.y)
gdf3.drop('geometry', axis='columns', inplace=True) # makes merge faster and more memory efficient

# create a dask dataframe of OD pairs
ddf = dd.from_pandas(gdf3, name='ddf', npartitions=nparts)
ddf_od = dd.merge(ddf, gdf3, on='tmp_key', suffixes=('_from', '_to'), npartitions=nparts).drop('tmp_key', axis=1)

# calculate euclidean distance matrix, vectorized and with dask series
x1 = ddf_od['x_from']
x2 = ddf_od['x_to']
y1 = ddf_od['x_from']
y2 = ddf_od['x_to']
euclidean_distances = ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
dist_matrix = euclidean_distances.compute()

Wall time: 144 ms


In [11]:
print(head_count ** 2)
print(len(dist_matrix))
dist_matrix.head()

250000
250000


0        0.000000
1    15616.178292
2    47755.558683
3    47299.455346
4    47182.117362
dtype: float64

#### Results:

Using dask increases our compute time from ~71 ms to ~108 ms, but allows for dividing the work among multiple machines if memory usage becomes the bottleneck. But, we don't have that nice multiindex for quick pairwise lookups.