# High Performance Jupyter

## Scale out with Dask

|<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="400" /> | <img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="400" /> | <img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="400" />|
| -- | -- | -- |

We will do the same analysis as [dask.ipynb](dask.ipynb) except now on a cluster!

AWS EC2 instances: 5 r5.xlarge (2 CPU, 16GB RAM)

We are running in [Saturn Cloud](https://www.saturncloud.io/) so we are using a `SaturnCluster`, but Dask supports many other cluster deployment tools such as [YARN](https://yarn.dask.org/en/latest/) or [Kubernetes](https://docs.dask.org/en/latest/setup/kubernetes.html).
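For readers without a Saturn Cloud account, a rough local stand-in is `dask.distributed.LocalCluster`, which runs the scheduler and workers on a single machine. The sketch below is only an assumption about how one might mirror the sizing used in this notebook. The helper names `local_cluster_kwargs` and `start_local_client` are hypothetical, and the Dask import is deferred so the sizing helper works even where Dask is not installed.

```python
def local_cluster_kwargs(n_workers: int = 5, nthreads: int = 2) -> dict:
    """Map SaturnCluster-style sizing onto LocalCluster keyword arguments."""
    return {"n_workers": n_workers, "threads_per_worker": nthreads}


def start_local_client(n_workers: int = 5, nthreads: int = 2):
    """Start a LocalCluster on this machine and return a connected Client."""
    # Imported lazily: this needs the `distributed` package.
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(**local_cluster_kwargs(n_workers, nthreads))
    return Client(cluster)


# Only the sizing is shown here; call start_local_client() to launch workers.
print(local_cluster_kwargs())
```

A `client` created this way could take the place of the `SaturnCluster`-backed one, though a single machine will not have the cluster's combined memory.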

As of September 2020, Saturn doesn't yet support the Dask JupyterLab extension for monitoring a `SaturnCluster` (it's on the roadmap). For now you can see the same visualizations in a separate window via the Dashboard link. 

In [1]:
from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster(
    n_workers=5,
    worker_size='xlarge', 
    scheduler_size='xlarge',
    nthreads=2,
    # it's useful to set nthreads to the number of CPU cores on each machine
)
client = Client(cluster)
cluster

[2020-09-26 20:39:46] INFO - dask-saturn | Cluster is ready



The scheduler might be ready before all the workers are. We'll wait until all the workers are up.

In [2]:
client.wait_for_workers(5)

In [3]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import datetime
import s3fs
import warnings
warnings.simplefilter("ignore")

data_path = 's3://nyc-tlc/trip data'
seed = 42

# Load and explore data

Load the data for all of 2019. Note that when working with a Dask cluster each worker is a separate machine, so they do not share filesystems. This is not a problem for our case because we're already loading the data from S3.

In [4]:
%%time

taxi = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv',
    assume_missing=True,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
)

CPU times: user 121 ms, sys: 24.2 ms, total: 145 ms
Wall time: 242 ms
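The call above returns in a fraction of a second because `dd.read_csv` is lazy: it sets up the work rather than reading all the files up front. The wildcard in the path selects the 2019 files. Here is a small sketch of that matching, using the standard library's `fnmatch` as a stand-in for the S3 glob. It assumes one file per month named `yellow_tripdata_YYYY-MM.csv`, and the candidate list is made up for illustration.

```python
from fnmatch import fnmatchcase

pattern = "yellow_tripdata_2019-*.csv"

# Hypothetical bucket listing: one file per month for two years.
candidates = [
    f"yellow_tripdata_{year}-{month:02d}.csv"
    for year in (2018, 2019)
    for month in range(1, 13)
]

# Keep only the names that the wildcard pattern matches.
matched = [name for name in candidates if fnmatchcase(name, pattern)]

print(len(matched), "files matched")
print(matched[0], "...", matched[-1])
```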


In [5]:
%%time
print(f"Row count: {len(taxi)}")
print(f"Size in GB: {taxi.memory_usage(deep=True).sum().compute() / 1e9}")

Row count: 84399019
Size in GB: 16.367014316
CPU times: user 135 ms, sys: 6.54 ms, total: 142 ms
Wall time: 1min 10s
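Using the figures above, a quick back-of-the-envelope check shows why this dataset suits the cluster better than a single machine. The `headroom` fraction is a hypothetical rule of thumb introduced here, not a Dask setting. It leaves room for intermediate results.

```python
def cluster_memory_gb(n_workers: int, ram_per_worker_gb: float) -> float:
    """Total RAM across all workers."""
    return n_workers * ram_per_worker_gb


def fits_in_memory(data_gb: float, total_gb: float, headroom: float = 0.5) -> bool:
    """True if the data uses no more than `headroom` of the available RAM."""
    return data_gb <= total_gb * headroom


data_gb = 16.367014316                # size reported for the 2019 data
cluster_gb = cluster_memory_gb(5, 16)  # 5 workers with 16 GB each
single_gb = cluster_memory_gb(1, 16)   # one machine of the same size

print(f"cluster RAM: {cluster_gb} GB, fits: {fits_in_memory(data_gb, cluster_gb)}")
print(f"single machine RAM: {single_gb} GB, fits: {fits_in_memory(data_gb, single_gb)}")
```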


In [6]:
%%time
np.round(taxi.describe().compute(), 3).T

CPU times: user 3.61 s, sys: 43.9 ms, total: 3.66 s
Wall time: 48.7 s


| | count | mean | std | min | 25% | 50% | 75% | max |
| -- | -- | -- | -- | -- | -- | -- | -- | -- |
| VendorID | 84152418.0 | 1.645 | 0.498 | 1.0 | 1.0 | 2.0 | 2.0 | 4.0 |
| passenger_count | 84152418.0 | 1.563 | 1.208 | 0.0 | 1.0 | 1.0 | 2.0 | 9.0 |
| trip_distance | 84399019.0 | 3.001 | 8.091 | -37264.53 | 1.07 | 1.93 | 8.82 | 45977.22 |
| RatecodeID | 84152418.0 | 1.061 | 0.76 | 1.0 | 1.0 | 1.0 | 1.0 | 99.0 |
| PULocationID | 84399019.0 | 163.158 | 66.016 | 1.0 | 132.0 | 162.0 | 234.0 | 265.0 |
| DOLocationID | 84399019.0 | 161.353 | 70.251 | 1.0 | 116.0 | 163.0 | 236.0 | 265.0 |
| payment_type | 84152418.0 | 1.289 | 0.479 | 1.0 | 1.0 | 1.0 | 2.0 | 5.0 |
| fare_amount | 84399019.0 | 13.344 | 174.375 | -1856.0 | 7.0 | 11.0 | 32.04 | 943274.8 |
| extra | 84399019.0 | 1.087 | 1.249 | -60.0 | 0.0 | 1.0 | 3.0 | 535.38 |
| mta_tax | 84399019.0 | 0.495 | 0.067 | -0.5 | 0.5 | 0.5 | 0.5 | 212.42 |


# Feature engineering

This is the same feature engineering as in [laptop.ipynb](laptop.ipynb), using the same code!

In [7]:
numeric_feat = [
    'pickup_weekday', 
    'pickup_hour', 
    'pickup_week_hour', 
    'pickup_minute', 
    'passenger_count',
]
categorical_feat = [
    'PULocationID', 
    'DOLocationID',
]
features = numeric_feat + categorical_feat
y_col = 'high_tip'

In [8]:
def prep_df(df: dd.DataFrame) -> dd.DataFrame:
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = df[df.fare_amount > 0]  # avoid divide-by-zero
    df['tip_fraction'] = df.tip_amount / df.fare_amount
    df['high_tip'] = (df['tip_fraction'] > 0.2) # class label
    
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
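    # note: .dt.weekofyear is deprecated since pandas 1.1 and removed in 2.0;
    # this column is not in `features`, so it is dropped by the selection below
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.weekofyear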
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [y_col]].astype(float).fillna(-1)
    
    return df
    
taxi = prep_df(taxi)

In [9]:
taxi.head()

| | pickup_weekday | pickup_hour | pickup_week_hour | pickup_minute | passenger_count | PULocationID | DOLocationID | high_tip |
| -- | -- | -- | -- | -- | -- | -- | -- | -- |
| 0 | 1.0 | 0.0 | 24.0 | 46.0 | 1.0 | 151.0 | 239.0 | 1.0 |
| 1 | 1.0 | 0.0 | 24.0 | 59.0 | 1.0 | 239.0 | 246.0 | 0.0 |
| 2 | 4.0 | 13.0 | 109.0 | 48.0 | 3.0 | 236.0 | 236.0 | 0.0 |
| 3 | 2.0 | 15.0 | 63.0 | 52.0 | 5.0 | 193.0 | 193.0 | 0.0 |
| 4 | 2.0 | 15.0 | 63.0 | 56.0 | 5.0 | 193.0 | 193.0 | 0.0 |
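To make the derived columns concrete, here is a plain-Python sketch of the same arithmetic for a single trip. The timestamp and the tip and fare amounts are illustrative values chosen here, and the helper names `time_features` and `is_high_tip` are hypothetical. They mirror `prep_df` but are not part of it.

```python
from datetime import datetime


def time_features(pickup: datetime) -> dict:
    """Compute the time-based features for one pickup timestamp."""
    weekday = pickup.weekday()  # Monday=0 ... Sunday=6, as in pandas .dt.weekday
    hour = pickup.hour
    return {
        "pickup_weekday": weekday,
        "pickup_hour": hour,
        "pickup_week_hour": weekday * 24 + hour,  # hour of the week, 0 to 167
        "pickup_minute": pickup.minute,
    }


def is_high_tip(tip_amount: float, fare_amount: float, threshold: float = 0.2) -> bool:
    """Label a trip as high tip when tip / fare exceeds the threshold."""
    if fare_amount <= 0:
        raise ValueError("fare_amount must be positive")
    return tip_amount / fare_amount > threshold


# 2019-01-01 was a Tuesday, so weekday is 1 and the week hour is 1 * 24 + 0.
example = time_features(datetime(2019, 1, 1, 0, 46))
print(example)
print(is_high_tip(tip_amount=1.65, fare_amount=7.0))
```

The time features for this timestamp line up with the first row of the output above: weekday 1, hour 0, week hour 24, minute 46.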



Since we're using a cluster with lots of RAM, we can call `persist()` on the dataframe to avoid repeated CSV loading in downstream processing. This tells Dask to execute the task graph that exists up to this point and hold the results in memory. 


`persist()` returns right away with a dataframe backed by [futures](https://docs.dask.org/en/latest/futures.html), and the work continues in the background. To block until execution is complete, we call `wait()`.

In [10]:
%%time
from dask.distributed import wait

taxi = taxi.persist()
_ = wait(taxi)

CPU times: user 172 ms, sys: 3.69 ms, total: 176 ms
Wall time: 40.5 s
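The submit-then-block pattern behind `persist()` and `wait()` can be seen in miniature with the standard library's `concurrent.futures`, whose interface Dask's futures extend. This is an analogy on local threads, not Dask code, and `slow_square` is a hypothetical stand-in for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_square(x: int) -> int:
    """Pretend to do some expensive work."""
    time.sleep(0.2)
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns immediately; the work runs in the background.
    futures = [pool.submit(slow_square, x) for x in range(4)]

    # wait() blocks until every future has finished.
    done, not_done = wait(futures)
    results = sorted(f.result() for f in done)

print(results)
```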


Now that the dataframe is held in cluster memory, the same commands run much faster!

In [11]:
%%time
len(taxi)

CPU times: user 34.9 ms, sys: 3.96 ms, total: 38.8 ms
Wall time: 108 ms


84194625

In [12]:
%%time
np.round(taxi.describe().compute(), 3).T

CPU times: user 1.87 s, sys: 23.8 ms, total: 1.9 s
Wall time: 7.98 s


| | count | mean | std | min | 25% | 50% | 75% | max |
| -- | -- | -- | -- | -- | -- | -- | -- | -- |
| pickup_weekday | 84194625.0 | 2.977 | 1.933 | 0.0 | 2.0 | 4.0 | 6.0 | 6.0 |
| pickup_hour | 84194625.0 | 13.89 | 6.021 | 0.0 | 12.0 | 16.0 | 22.0 | 23.0 |
| pickup_week_hour | 84194625.0 | 85.35 | 46.356 | 0.0 | 62.0 | 111.0 | 166.0 | 167.0 |
| pickup_minute | 84194625.0 | 29.564 | 17.34 | 0.0 | 15.0 | 30.0 | 45.0 | 59.0 |
| passenger_count | 84194625.0 | 1.555 | 1.214 | -1.0 | 1.0 | 1.0 | 2.0 | 9.0 |
| PULocationID | 84194625.0 | 163.161 | 66.011 | 1.0 | 132.0 | 162.0 | 234.0 | 265.0 |
| DOLocationID | 84194625.0 | 161.342 | 70.245 | 1.0 | 116.0 | 163.0 | 236.0 | 265.0 |
| high_tip | 84194625.0 | 0.541 | 0.498 | 0.0 | 0.0 | 1.0 | 1.0 | 1.0 |


# Hyperparameter tuning

Use a similarly sized sample as [laptop.ipynb](laptop.ipynb) for comparison purposes.

In [13]:
taxi_sample = taxi.sample(frac=0.0045, replace=False, random_state=seed)
taxi_sample = taxi_sample.persist()
_ = wait(taxi_sample)

len(taxi_sample)

378878
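The sample size is close to, but not exactly, `frac` times the row count. One plausible reason, stated here as an assumption about Dask's behaviour rather than something shown in this notebook, is that the fraction is applied to each partition separately, so rounding happens per partition. The partition sizes in the sketch are hypothetical.

```python
def expected_sample_size(n_rows: int, frac: float) -> float:
    """Sample size if the fraction were applied to the whole dataset at once."""
    return n_rows * frac


def per_partition_sample_size(partition_sizes, frac: float) -> int:
    """Sample size if each partition is sampled and rounded on its own."""
    return sum(round(n * frac) for n in partition_sizes)


frac = 0.0045
expected = expected_sample_size(84_194_625, frac)  # rows after prep_df
observed = 378_878                                 # len(taxi_sample) above
print(f"expected about {expected:,.1f}, observed {observed:,}")

# Hypothetical partitions: rounding three times gives a different total
# than rounding once over the combined rows.
sizes = [7023, 7023, 7023]
per_partition_total = per_partition_sample_size(sizes, frac)
global_total = round(sum(sizes) * frac)
print(per_partition_total, global_total)
```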

In [14]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, DummyEncoder, Categorizer
from dask_ml.model_selection import GridSearchCV

lr = LogisticRegression(
    solver='saga',
    penalty='elasticnet', 
    l1_ratio=0.5,
    max_iter=100, 
    random_state=seed,
)
pipeline = Pipeline(steps=[
    ('categorize', Categorizer(columns=categorical_feat)),
    ('onehot', DummyEncoder(columns=categorical_feat)),
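    # note: ColumnTransformer defaults to remainder='drop', so only the scaled numeric
    # columns are passed on to 'clf'; the one-hot columns from 'onehot' are dropped here
    ('scale', ColumnTransformer(transformers=[('num', StandardScaler(), numeric_feat)])),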
    ('clf', lr),
])

params = {
    'clf__l1_ratio': [0.2, 0.3, 0.5, 0.7, 0.9],
}

grid_search = GridSearchCV(
    pipeline, 
    params,
    cv=3, 
    scoring='accuracy',
)
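The key `clf__l1_ratio` follows scikit-learn's `<step>__<parameter>` convention, so it sets `l1_ratio` on the `clf` step of the pipeline. The grid search then fits each candidate once per fold. Below is a minimal sketch of that bookkeeping in plain Python. The helper names `expand_grid` and `split_param_name` are hypothetical, not part of scikit-learn or Dask-ML.

```python
from itertools import product


def expand_grid(params: dict) -> list:
    """List every combination of parameter values in the grid."""
    keys = sorted(params)
    return [
        dict(zip(keys, values))
        for values in product(*(params[k] for k in keys))
    ]


def split_param_name(name: str) -> tuple:
    """Split 'step__param' into the pipeline step and the parameter name."""
    step, _, param = name.partition("__")
    return step, param


params = {"clf__l1_ratio": [0.2, 0.3, 0.5, 0.7, 0.9]}
cv = 3

candidates = expand_grid(params)
n_fits = len(candidates) * cv  # one fit per candidate per fold

print(split_param_name("clf__l1_ratio"))
print(f"{len(candidates)} candidates x {cv} folds = {n_fits} fits")
```

With five values and three folds this comes to 15 cross-validation fits, which the cluster can spread across its workers.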

In [15]:
%%time
_ = grid_search.fit(taxi_sample[features], taxi_sample[y_col])
grid_search.best_score_

CPU times: user 122 ms, sys: 4.35 ms, total: 126 ms
Wall time: 14.6 s


0.5367321406890873

# Wait, there's more!

We're at a great place now. By using Dask across a cluster of machines we can analyze large amounts of data in a reasonable amount of time.

Everything we've been doing so far has been processing on CPUs, whether on a laptop or across a cluster. GPU computing brings crazy acceleration to these same workloads, as illustrated in the [rapids.ipynb](rapids.ipynb) notebook.