# Dask - GeoPandas Example

*Rob Knapen, Wageningen Environmental Research*
<br>

A notebook for trying out the Dask framework (as an alternative to PySpark) together with GeoPandas. This could be useful for processing large datasets of species observations.

### Import Python packages

In [1]:
import os
# make GeoPandas use Shapely rather than PyGEOS; must be set before importing geopandas
os.environ['USE_PYGEOS'] = '0'

import pandas as pd
import numpy as np

import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.distributed import Client

import geopandas as gp
import dask_geopandas as dgp

### Start a Dask client
Create a client for a local Dask 'cluster' running on this machine. The client summary includes the URL of the dashboard.

In [2]:
dask_client = Client()
dask_client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 50825 instead
2023-05-03 20:08:18,345 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-5ahw3yqj', purging
2023-05-03 20:08:18,345 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-3i8tz3wr', purging
2023-05-03 20:08:18,345 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-oatcw_u3', purging
2023-05-03 20:08:18,345 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-27cz5dic', purging
2023-05-03 20:08:18,345 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/w

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:50825/status
Workers: 5
Total threads: 10
Total memory: 32.00 GiB
Status: running
Using processes: True
Scheduler comm: tcp://127.0.0.1:50826
Started: Just now

Comm,Dashboard,Nanny,Total threads,Memory,Local directory
tcp://127.0.0.1:50839,http://127.0.0.1:50841/status,tcp://127.0.0.1:50829,2,6.40 GiB,/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-axn6r5qj
tcp://127.0.0.1:50846,http://127.0.0.1:50850/status,tcp://127.0.0.1:50830,2,6.40 GiB,/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-olodqxck
tcp://127.0.0.1:50845,http://127.0.0.1:50848/status,tcp://127.0.0.1:50831,2,6.40 GiB,/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-_gmtewez
tcp://127.0.0.1:50840,http://127.0.0.1:50843/status,tcp://127.0.0.1:50832,2,6.40 GiB,/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-61du8qce
tcp://127.0.0.1:50847,http://127.0.0.1:50852/status,tcp://127.0.0.1:50833,2,6.40 GiB,/var/folders/71/dq62wtb15f9ckfvyn853m3d80000gn/T/dask-worker-space/worker-dqjmme1t


### Load sample species observation data
As an example we use a dataset from the Dutch 'Nationale Databank Flora en Fauna' (ndff.nl). We hope it will become available as open data soon, but it is not yet. However, we have permission to use it for the FAIRiCUBE EU project.

In [3]:
# the NDFF datafile to process
species_filename = "../../../local/data/ndff/broedvogels_2016.csv"

# the columns to drop right away
always_drop_cols_from_source = None

In [4]:
# load the data into a regular pandas dataframe
species_df = pd.read_csv(species_filename, header='infer', sep=';', on_bad_lines='warn')

# remove columns that are not needed
if always_drop_cols_from_source:
    species_df.drop(columns=always_drop_cols_from_source, inplace=True)

# remove the crs prefix from the wkt data
species_df['wkt_excl_crs'] = species_df['wkt'].map(lambda x: x.split(';')[1], na_action='ignore')
species_df.drop(columns=['wkt'], inplace=True)
species_df.rename(columns={'wkt_excl_crs': 'wkt'}, inplace=True)

species_df.head(5)

Unnamed: 0,nl_name,sci_name,jaar,countsubject,orig_abundance,straal,wkt
0,Krakeend,Anas strepera,2016,territorium,1,71,"POLYGON((247282 584478,247280.535533906 584474..."
1,Zwartkop,Sylvia atricapilla,2016,territorium,1,283,"POLYGON((65405.294923655 436165.134083641,6540..."
2,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON((188642.918478918 344195.927912087,188..."
3,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON((169116.484375 446596.0625,169113.5554..."
4,Spreeuw,Sturnus vulgaris,2016,nest,1,283,"POLYGON((127692 427328,127689.071067812 427320..."
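
The prefix-stripping step above can be sketched in isolation. The snippet below is a minimal illustration, assuming the `wkt` column holds strings of the form `<crs part>;<geometry>`. The `SRID=28992` prefix in the sample and the helper name `strip_crs_prefix` are hypothetical, chosen only for the example. Unlike `x.split(';')[1]`, this variant leaves values without a prefix unchanged instead of raising an `IndexError`.

```python
def strip_crs_prefix(value):
    """Return the geometry part of a '<crs>;<wkt>' string (hypothetical helper)."""
    # pass missing or non-string values through untouched
    if not isinstance(value, str):
        return value
    # split at the first ';' only; sep is '' when no ';' is present
    prefix, sep, geometry = value.partition(';')
    return geometry if sep else value


# made-up sample values, not taken from the NDFF dataset
sample = "SRID=28992;POLYGON((0 0,10 0,10 10,0 10,0 0))"
cleaned = strip_crs_prefix(sample)
no_prefix = strip_crs_prefix("POINT(1 2)")
missing = strip_crs_prefix(None)
```

A function like this could be passed to `Series.map` in place of the lambda.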


In [5]:
# load the data into a Dask DataFrame
species_dd = dd.read_csv(
    species_filename,
    header='infer',
    sep=';',
    on_bad_lines='warn',
    dtype={ 'orig_abundance': 'object'} # because of '*' used as abundance value
)

# remove columns that are not needed; Dask does not support inplace=True, so reassign the result
if always_drop_cols_from_source:
    species_dd = species_dd.drop(columns=always_drop_cols_from_source)

# remove the crs prefix from the wkt data
# note that map with a custom function needs additional meta info
species_dd['wkt_excl_crs'] = species_dd['wkt'].map(
    lambda x: x.split(';')[1],
    na_action='ignore',
    meta=pd.Series(dtype='str'))

species_dd = species_dd.drop(columns=['wkt'])
species_dd = species_dd.rename(columns={'wkt_excl_crs': 'wkt'})

species_dd.head(5)

Unnamed: 0,nl_name,sci_name,jaar,countsubject,orig_abundance,straal,wkt
0,Krakeend,Anas strepera,2016,territorium,1,71,"POLYGON((247282 584478,247280.535533906 584474..."
1,Zwartkop,Sylvia atricapilla,2016,territorium,1,283,"POLYGON((65405.294923655 436165.134083641,6540..."
2,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON((188642.918478918 344195.927912087,188..."
3,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON((169116.484375 446596.0625,169113.5554..."
4,Spreeuw,Sturnus vulgaris,2016,nest,1,283,"POLYGON((127692 427328,127689.071067812 427320..."


In [6]:
# Dask is lazy, need to call compute to get the result from a task graph
graph = species_dd['sci_name'].value_counts(sort=True, dropna=True)
graph.compute()

sci_name
Sylvia atricapilla         22475
Troglodytes troglodytes    18088
Phylloscopus collybita     16349
Sylvia communis            14712
Phylloscopus trochilus     14189
                           ...  
Pernis apivorus               78
Spinus spinus                 77
Serinus serinus               53
Ardea purpurea                42
Corvus corax                  34
Name: count, Length: 129, dtype: int64
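
Conceptually, a lazy `value_counts` over a partitioned DataFrame is a map-reduce: each partition is counted on its own and the partial counts are merged once `compute()` is called. The sketch below illustrates that idea with plain Python and `collections.Counter`. It is not Dask's actual implementation, and the partitions, the function names, and the sample species lists are made up for the example.

```python
from collections import Counter
from functools import reduce

# made-up sample data, split into three 'partitions'
partitions = [
    ["Sylvia atricapilla", "Turdus philomelos", "Sylvia atricapilla"],
    ["Turdus philomelos", "Sturnus vulgaris"],
    ["Sylvia atricapilla", None],  # None stands in for a missing value
]


def count_partition(names):
    """Map step: count the non-missing names of one partition."""
    return Counter(name for name in names if name is not None)


def lazy_value_counts(parts):
    """Return a zero-argument function; nothing is counted until it is called."""
    def compute():
        partial_counts = map(count_partition, parts)
        # reduce step: merge the per-partition counters
        total = reduce(lambda a, b: a + b, partial_counts, Counter())
        # sort from most to least frequent
        return total.most_common()
    return compute


graph = lazy_value_counts(partitions)  # only describes the work
result = graph()                       # performs the work
```

Building `graph` is cheap because no counting has happened yet. The cost is paid when it is called, which mirrors the role of `compute()` in the cell above.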

### Create a GeoPandas DataFrame
The observations have spatial attributes, so lift them into a GeoPandas DataFrame to be able to process them.

Note that there is a dask-geopandas package that bridges Dask with GeoPandas.

In [7]:
# construct a GeoDataFrame, with the data using the Dutch RD coordinate reference system

# note that this uses the regular pandas DataFrame, not the Dask one
gs = gp.GeoSeries.from_wkt(species_df['wkt'])
species_gdf = gp.GeoDataFrame(species_df, geometry=gs, crs='EPSG:28992')

# transform the dataset to the more common WGS84 (unprojected) CRS
species_gdf.to_crs(crs="EPSG:4326", inplace=True)
species_gdf.drop(columns=['wkt'], inplace=True)

species_gdf.head(5)

Unnamed: 0,nl_name,sci_name,jaar,countsubject,orig_abundance,straal,geometry
0,Krakeend,Anas strepera,2016,territorium,1,71,"POLYGON ((6.76944 53.23891, 6.76942 53.23888, ..."
1,Zwartkop,Sylvia atricapilla,2016,territorium,1,283,"POLYGON ((4.08519 51.90677, 4.08514 51.90670, ..."
2,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON ((5.86735 51.08631, 5.86731 51.08625, ..."
3,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON ((5.59280 52.00755, 5.59276 52.00749, ..."
4,Spreeuw,Sturnus vulgaris,2016,nest,1,283,"POLYGON ((4.99101 51.83388, 4.99097 51.83382, ..."


### Create a Dask GeoDataFrame
Turn a regular GeoDataFrame into a Dask GeoDataFrame that supports lazy task graphs computed on a cluster.

In [8]:
%%capture --no-display
# (hides warning about sending large graph)

# create a dask geodataframe
species_gdd = dgp.from_geopandas(species_gdf, npartitions=4)
species_gdd.compute()

Unnamed: 0,nl_name,sci_name,jaar,countsubject,orig_abundance,straal,geometry
0,Krakeend,Anas strepera,2016,territorium,1,71,"POLYGON ((6.76944 53.23891, 6.76942 53.23888, ..."
1,Zwartkop,Sylvia atricapilla,2016,territorium,1,283,"POLYGON ((4.08519 51.90677, 4.08514 51.90670, ..."
2,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON ((5.86735 51.08631, 5.86731 51.08625, ..."
3,Zanglijster,Turdus philomelos,2016,territorium,1,283,"POLYGON ((5.59280 52.00755, 5.59276 52.00749, ..."
4,Spreeuw,Sturnus vulgaris,2016,nest,1,283,"POLYGON ((4.99101 51.83388, 4.99097 51.83382, ..."
...,...,...,...,...,...,...,...
454447,Kuifmees,Lophophanes cristatus,2016,levend exemplaar,1,283,"POLYGON ((5.30305 52.10910, 5.30301 52.10903, ..."
454448,Groene specht,Picus viridis,2016,territorium,1,283,"POLYGON ((5.84707 51.34738, 5.84703 51.34732, ..."
454449,Goudhaan,Regulus regulus,2016,levend exemplaar,1,283,"POLYGON ((5.23365 52.18130, 5.23361 52.18124, ..."
454450,Appelvink,Coccothraustes coccothraustes,2016,territorium,1,71,"POLYGON ((6.03809 52.41482, 6.03807 52.41479, ..."


In [9]:
%%capture --no-display
# (hides warning about sending large graph)

species_gdd['sci_name'].value_counts(sort=True, dropna=True).compute()

sci_name
Sylvia atricapilla         22475
Troglodytes troglodytes    18088
Phylloscopus collybita     16349
Sylvia communis            14712
Phylloscopus trochilus     14189
                           ...  
Pernis apivorus               78
Spinus spinus                 77
Serinus serinus               53
Ardea purpurea                42
Corvus corax                  34
Name: count, Length: 129, dtype: int64

In [10]:
%%capture --no-display
# (hides warning about area calculation on non-projected data; the result is in square degrees, not square metres)

species_gdd.geometry.area.compute()




0         9.515338e-09
1         3.693998e-08
2         3.628487e-08
3         3.702603e-08
4         3.688382e-08
              ...     
454447    3.710962e-08
454448    3.649180e-08
454449    3.716920e-08
454450    9.340626e-09
454451    3.732582e-10
Length: 454452, dtype: float64
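
The values above are tiny because they are expressed in square degrees of the WGS84 CRS. For areas in square metres the geometry has to be in a projected CRS, such as the Dutch RD system (EPSG:28992) used by the source data. As a rough illustration, the sketch below computes a polygon area with the shoelace formula in plain Python. The octagon, its centre coordinates, its 10 m radius, and the function names are hypothetical; they only stand in for a small observation polygon in RD coordinates.

```python
import math


def shoelace_area(vertices):
    """Area of a simple polygon from its (x, y) vertices, in squared coordinate units."""
    total = 0.0
    n = len(vertices)
    for i in range(n):
        x1, y1 = vertices[i]
        x2, y2 = vertices[(i + 1) % n]  # wrap around to close the ring
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0


def regular_polygon(cx, cy, radius, n_vertices=8):
    """Vertices of a regular polygon around (cx, cy); hypothetical helper."""
    return [
        (cx + radius * math.cos(2 * math.pi * k / n_vertices),
         cy + radius * math.sin(2 * math.pi * k / n_vertices))
        for k in range(n_vertices)
    ]


# assumed example: an octagon with a 10 m radius, coordinates in metres (RD-like)
octagon = regular_polygon(127682.0, 427328.0, radius=10.0)
area_m2 = shoelace_area(octagon)
```

With GeoPandas the equivalent would normally be to call `.area` while the data is still in EPSG:28992, or after converting it back with `to_crs`.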




In [11]:
dask_client.close()