# Hyperparameter Optimization with Dask and Coiled

This example will walk through the following:

* **Getting and processing the data.**
* **Defining a model and parameters.**
* **Finding the best parameters,** and some details on why we're using the chosen search algorithm.
* **Scoring** and deploying.

All of these tasks will be performed on the New York City Taxi Cab dataset.

## Setup cluster

In [None]:
# Create cluster with Coiled
import coiled

cluster = coiled.Cluster(
    n_workers=20,
    software="examples/hyperband-optimization",
)

In [None]:
# Connect Dask to the cluster
import dask.distributed

client = dask.distributed.Client(cluster)
client

#### ☝️ Don’t forget to click the "Dashboard" link above to view the cluster dashboard!

## Get and pre-process data

This example will mirror the Kaggle "[NYC Taxi Trip Duration][1]" example with different data.

These data have records on 84 million taxi rides.

[1]:https://www.kaggle.com/c/nyc-taxi-trip-duration/

In [None]:
import dask.dataframe as dd

features = ["passenger_count", "trip_distance", "fare_amount"]
categorical_features = ["RatecodeID", "payment_type"]
output = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv", 
    parse_dates=output,
    usecols=features + categorical_features + output,
    dtype={
        "passenger_count": "UInt8",
        "RatecodeID": "category",
        "payment_type": "category",
    },
    blocksize="16 MiB",
)

df = df.repartition(partition_size="10 MiB").persist()

# one hot encode the categorical columns
df = df.categorize(categorical_features)
df = dd.get_dummies(df, columns=categorical_features)

# persist so only download once
df = df.persist()

data = df[[c for c in df.columns if c not in output]]
data = data.fillna(0)

In [None]:
durations = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60  # minutes

In [None]:
from dask_ml.model_selection import train_test_split
import dask

X = data.to_dask_array(lengths=True).astype("float32")
y = durations.to_dask_array(lengths=True).astype("float32")
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=2, shuffle=True)

# persist the data so it's not re-computed
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)

## Define model and hyperparameters

Let's use a simple neural network from [PyTorch] using [Skorch], a simple wrapper that provides a Scikit-Learn API for PyTorch.

This network is only small for demonstration. If desired, we could use much larger networks on GPUs.

[PyTorch]:https://pytorch.org/
[skorch]:https://skorch.readthedocs.io/en/stable/

In [None]:
# Import our HiddenLayerNet pytorch model from a local torch_model.py module
from torch_model import HiddenLayerNet
# Send module with HiddenLayerNet to workers on cluster
client.upload_file("torch_model.py")

In [None]:
# Print contents of torch_model.py module
!cat torch_model.py

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
from skorch import NeuralNetRegressor

niceties = {
    "callbacks": False,
    "warm_start": True,
    "train_split": None,
    "max_epochs": 1,
}

class NonNanLossRegressor(NeuralNetRegressor):
    def get_loss(self, y_pred, y_true, X=None, training=False):
        if torch.abs(y_true - y_pred).abs().mean() > 1e6:
            return torch.tensor([0.0], requires_grad=True)
        return super().get_loss(y_pred, y_true, X=X, training=training)

model = NonNanLossRegressor(
    module=HiddenLayerNet,
    module__n_features=X_train.shape[1],
    optimizer=optim.SGD,
    criterion=nn.MSELoss,
    lr=0.0001,
    **niceties,
)

In [None]:
from scipy.stats import loguniform, uniform

params = {
    "module__activation": ["relu", "elu", "softsign", "leaky_relu", "rrelu"],
    "batch_size": [32, 64, 128, 256],
    "optimizer__lr": loguniform(1e-4, 1e-3),
    "optimizer__weight_decay": loguniform(1e-6, 1e-3),
    "optimizer__momentum": uniform(0, 1),
    "optimizer__nesterov": [True],
}

All of these parameters control model architecture, execpt for two basic optimizatino parameters, `batch_size` and `learning_rate_init`. They control finding the best model of a particular architecture.

## Find the best hyperparameters

Our search is "computationally-constrained" because (hypothetically) it requires GPUs and has a pretty complicated search space (in reality it has neither of those features). And obviously it's "memory-constrained" because the dataset doesn't fit in memory.

[Dask-ML's documentation on hyperparameter searches][2] indicates that we should use `HyperbandSearchCV`.

[2]:https://ml.dask.org/hyper-parameter-search.html

In [None]:
from dask_ml.model_selection import HyperbandSearchCV
search = HyperbandSearchCV(model, params, random_state=2, verbose=True, max_iter=9)

By default, `HyperbandSearchCV` will call `partial_fit` on each chunk of the Dask Array. `HyperbandSearchCV`'s rule of thumb specifies how to train for longer or sample more parameters.

In [None]:
y_train2 = y_train.reshape(-1, 1).persist()
search.fit(X_train, y_train2)

## Score

`HyperbandSearchCV` and the like mirror the Scikit-Learn model selection interface, so all attributes of Scikit-Learn's [RandomizedSearchCV][rscv] are available:

[rscv]:https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html

In [None]:
search.best_score_

In [None]:
search.best_params_

In [None]:
search.best_estimator_

This means we can deploy the best model and score on the testing dataset:

In [None]:
from dask_ml.wrappers import ParallelPostFit
deployed_model = ParallelPostFit(search.best_estimator_)
deployed_model.score(X_test, y_test)