## Scaling your workflow with Dask

This notebook walks through examples of using Dask to scale common workflows on tabular data.

Dask works either on a single machine (executing in parallel using threads) or on a cluster of many machines. The examples here will run on a cluster, just for fun, but Dask is also useful for working with larger-than-memory datasets on a single machine.
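On a single machine no cluster setup is needed at all. Here is a minimal sketch of the threaded mode described above. The array shape and chunk sizes are arbitrary choices for illustration. It computes a Dask collection with the local threaded scheduler:

```python
import dask.array as da

# A 1000 x 1000 array of ones, split into 16 chunks of 250 x 250.
ones = da.ones((1000, 1000), chunks=(250, 250))

# Nothing is computed until .compute(); scheduler="threads" runs the
# per-chunk sums in a local thread pool, without any cluster.
ones_sum = ones.sum().compute(scheduler="threads")
print(ones_sum)
```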

When using Dask on a cluster, the typical pattern is to:

1. Create a Dask cluster that talks to the resource manager (using one of our many [deployment options](https://docs.dask.org/en/latest/setup.html))
2. Connect to the cluster with a local `Client`
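The same two steps can be sketched locally, assuming `dask.distributed` is installed. `LocalCluster` stands in for a real deployment here, and the worker counts are illustrative only:

```python
from dask.distributed import Client, LocalCluster

# Step 1: create a cluster. LocalCluster starts a scheduler and workers on
# this machine; processes=False runs the workers inside this process.
local_cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)

# Step 2: connect a local Client to the cluster's scheduler.
local_client = Client(local_cluster)

# Work submitted through the client now runs on the cluster's workers.
future = local_client.submit(sum, [1, 2, 3])
local_result = future.result()
print(local_result)

# Shut down the client and the cluster when finished.
local_client.close()
local_cluster.close()
```

Swapping `LocalCluster` for any of the deployment classes listed below changes where the workers run, not how the client is used.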

In [2]:
# This example uses Coiled, a for-profit company that will
# manage Dask deployments for you. You could also do it yourself
# and use one of
# * dask_ssh.SSHCluster()
# * dask_yarn.YarnCluster()
# * dask_jobqueue.PBSCluster()
# * dask_kubernetes.KubeCluster()
# * dask_gateway.GatewayCluster()
# * dask_cloudprovider.FargateCluster()
# * dask_cloudprovider.AzureMLCluster()
# * dask_saturn.SaturnCluster()
# * ...

import coiled
cluster = coiled.Cluster(n_workers=10, account="tomaugspurger")
cluster

Once we have a cluster (Coiled, PBS, Kubernetes, or otherwise), we connect to it with a `Client`. After this, all Dask-backed operations will happen on the cluster.

In [None]:
from distributed import Client

client = Client(cluster)
client

In [None]:
import pandas as pd

dtype = {
    "payment_type": "UInt8",
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
}

df = pd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv",
    dtype=dtype,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},  # public bucket, no credentials needed
    nrows=1000,  # only the first 1,000 rows, for a quick look
)
df

In [None]:
counts = df.passenger_count.value_counts()
counts.sort_index().plot.bar(rot=0, width=1, color='k');

In [None]:
df.groupby("passenger_count").tip_amount.mean()

Dask DataFrame mimics the pandas API. This means *many* of the
APIs you're familiar with will work with Dask. There are often
some Dask-specific keywords as well, reflecting the fact that
parallel / distributed computing has its own set of concerns.

Dask's readers typically accept a list of URLs / files, or a globstring indicating a list of files to read in.

In [None]:
import dask.dataframe as dd

ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype=dtype,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    blocksize="16 MiB",
)
ddf

A few things to note:

1. We reused the `dtype` and `parse_dates` options, just like before
2. We have additional dask-specific options like `blocksize`
3. The result returned almost instantly
4. The values in the table aren't shown, just the structure (column names and dtypes)

Dask's high-level collections like `dask.dataframe` are lazy. They do just enough work to propagate metadata through operations: the type of the output, its column names and dtypes, and so on. Nothing is actually computed until you call `.compute()` (or `.persist()`).

In [None]:
dask_counts = ddf.passenger_count.value_counts()
dask_counts

In [None]:
ax = (
    dask_counts
    .compute()
    .sort_index()
    .plot.bar(rot=0, width=1, color='k')
);

In [None]:
ddf = ddf.persist()

In [None]:
dask_counts = ddf.passenger_count.value_counts()
ax = (
    dask_counts
    .compute()
    .sort_index()
    .plot.bar(rot=0, width=1, color='k')
);

In [None]:
ddf.groupby("passenger_count").tip_amount.mean().compute()

In [None]:
ddf.fare_amount.quantile([0.25, 0.5, .75]).compute()

In [None]:
(ddf.fare_amount + ddf.tip_amount).head()

In [None]:
ddf.RatecodeID.isna().mean().compute()

### Dask is familiar

<img width="40%" src="https://docs.dask.org/en/latest/_images/dask-dataframe.svg"/>

We saw earlier that `dask.dataframe` mimics the pandas API: the same keywords give the same behavior. But perhaps more importantly, Dask feels familiar because it uses pandas to do the actual dataframe operations. A Dask `Series.value_counts` is just a bunch of pandas `Series.value_counts` calls, one per partition, plus a bit of logic to combine the results. Likewise, a Dask Array is a collection of NumPy arrays with some logic for how to work with them in parallel.
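The following pandas-only sketch shows the "per-partition pandas call plus a combine step" idea. It is not Dask's actual implementation. The data and the three-way split are hypothetical stand-ins for partitions:

```python
import pandas as pd

# A hypothetical column split into three chunks, standing in for partitions.
s = pd.Series([1, 2, 1, 1, 3, 2, 1, 5, 2], name="passenger_count")
chunks = [s.iloc[0:3], s.iloc[3:6], s.iloc[6:9]]

# Per partition: an ordinary pandas value_counts on each chunk.
partial = [chunk.value_counts() for chunk in chunks]

# Combine: stack the partial counts and add them up per value.
combined = pd.concat(partial).groupby(level=0).sum()

# This matches running value_counts on the whole column at once.
direct = s.value_counts().sort_index()
print(combined)
```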

In [None]:
import geopandas

zones = geopandas.read_file("./taxi_zones")
zones.head()

In [None]:
zones.plot();

In [None]:
center = zones.geometry.centroid.to_crs(crs="EPSG:4326")
zones["lng"] = center.x
zones["lat"] = center.y
# for memory savings
zones['borough'] = zones['borough'].astype('category')

In [None]:
df[['PULocationID', 'DOLocationID']].head()

In [None]:
zones[['LocationID', 'borough', 'lat', 'lng']].rename(
    columns=lambda x: f"DO{x}"
)

In [None]:
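# Attach dropoff-zone attributes, then pickup-zone attributes,
# joining explicitly on the matching location ID column.
df2 = pd.merge(
    df,
    zones[['LocationID', 'borough', 'lat', 'lng']].rename(
        columns=lambda x: f"DO{x}"
    ),
    on="DOLocationID",
)
df3 = pd.merge(
    df2,
    zones[['LocationID', 'borough', 'lat', 'lng']].rename(
        columns=lambda x: f"PU{x}"
    ),
    on="PULocationID",
)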
df3.head()

In [None]:
import numpy as np


def gcd(lat1, lng1, lat2, lng2):
    '''
    Calculate great circle distance.
    http://www.johndcook.com/blog/python_longitude_latitude/

    Parameters
    ----------
    lat1, lng1, lat2, lng2: float or array of float

    Returns
    -------
    distance:
      distance from ``(lat1, lng1)`` to ``(lat2, lng2)`` in kilometers.
    '''
    # python2 users will have to use ascii identifiers
    ϕ1 = np.deg2rad(90 - lat1)
    ϕ2 = np.deg2rad(90 - lat2)

    θ1 = np.deg2rad(lng1)
    θ2 = np.deg2rad(lng2)

    cos = (np.sin(ϕ1) * np.sin(ϕ2) * np.cos(θ1 - θ2) +
           np.cos(ϕ1) * np.cos(ϕ2))
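    # Rounding can push the cosine slightly outside [-1, 1] (e.g. when both
    # points coincide), which would make arccos return NaN; clip it back.
    arc = np.arccos(cos.clip(-1, 1))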
    return arc * 6373
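
For reference, `gcd` uses the spherical law of cosines. Let $\phi_i = 90^\circ - \text{lat}_i$ be the colatitudes and $\theta_i$ the longitudes, both converted to radians. Let $R = 6373$ km, the Earth radius used in the code. The distance is then

$$
d = R \, \arccos\left( \sin\phi_1 \sin\phi_2 \cos(\theta_1 - \theta_2) + \cos\phi_1 \cos\phi_2 \right)
$$

Because it is built entirely from NumPy ufuncs, the same function works unchanged on floats, NumPy arrays, pandas Series, and Dask Series.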

In [None]:
gcd(df3.PUlat, df3.PUlng, df3.DOlat, df3.DOlng)

In [None]:
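# Same joins as the pandas version; the small zones table is a plain
# pandas DataFrame, which Dask merges against every partition.
ddf2 = dd.merge(
    ddf,
    zones[['LocationID', 'borough', 'lat', 'lng']].rename(
        columns=lambda x: f"DO{x}"
    ),
    on="DOLocationID",
)
ddf3 = dd.merge(
    ddf2,
    zones[['LocationID', 'borough', 'lat', 'lng']].rename(
        columns=lambda x: f"PU{x}"
    ),
    on="PULocationID",
)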
ddf3.head()

In [None]:
distance = gcd(ddf3.PUlat, ddf3.PUlng, ddf3.DOlat, ddf3.DOlng)
distance.head()

In [None]:
distance.quantile([0.1, 0.25, 0.5, 0.75, .9]).compute()

## Parallelizing Custom Code

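Not all problems fit in the big array / big dataframe model. There's often some bespoke munging that needs to be done before data can be loaded into an array or dataframe. `dask.delayed` helps here: it wraps ordinary Python functions so that calling them builds a task graph, which Dask can then run in parallel.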

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

In [None]:
import dask

In [None]:
%%time
# This runs immediately; all it does is build a graph

x = dask.delayed(inc)(1)
y = dask.delayed(inc)(2)
z = dask.delayed(add)(x, y)

In [None]:
%%time
# This actually runs the computation: the two inc calls run in
# parallel, then add runs, so it takes about two seconds, not three
z.compute()

In [None]:
z

In [None]:
z.visualize()

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

In [None]:
%%time
results = []
for x in data:
    y = dask.delayed(inc)(x)
    results.append(y)
    
total = sum(results)

In [None]:
%time total.compute()

In [None]:
total.visualize(rankdir="LR")

