# Different Approaches to Hyperparameter Optimisation using Dask

## The approaches covered in this notebook

This table lists the covered approaches in the order in which they should be considered.
As the index increases, so does the amount of _potential_ parallelism (subject to suitability and applicability), and with it the potential performance.

| Index | Name                             | Implementation         | Underlying Complexity | Applicability                                                            | Suitability                                          | Parallelism     | Overhead | Working |
|-------|----------------------------------|------------------------|-----------------------|--------------------------------------------------------------------------|------------------------------------------------------|-----------------|----------|---------|
|   1   | Dask-ML GridSearchCV             | Dask-ML                | Low                   | Estimators with **single-threaded**<br /> `fit()` methods                | $$n_{\text{fits}} \gg n_{\text{cores}}$$             | Low             | Low      | Yes     |
|   2   | Sequential Fitting               | Dask backend           | Low                   | Estimators with **multi-threaded**<br />`fit()` (and `score()`) methods  | $$n_{\text{estimators}} \gg n_{\text{cores}}$$       | Low*            | High     | Yes     |
|   3   | `GridSearchCV` Modification      | Custom `GridSearchCV`  | Medium                | Estimators with **multi-threaded**<br /> `fit()` (and `score()`) methods | $$n_{\text{fits}} \gg n_{\text{workers}}$$           | Medium          | Medium   | Yes     |
|   4   | Custom RF Implementation         | Custom RF              | High                  | `RandomForestRegressor`                                                  | $$n_{\text{tree-fits}} \gg n_{\text{cores}}$$        | High            | High     | Yes     |
|   5   | Native scikit-learn GridSearchCV | Dask backend           | Low                   | General                                                                  | $$n_{\text{fits}} \gg n_{\text{cores}}$$             | High            | High     | No      |

\* For a single call to `fit()`, this method would merit a parallelism rating of 'High'. However, each call has to wait for the previous call to finish, so a lot of time is potentially wasted when the suitability criterion $n_{\text{estimators}} \gg n_{\text{cores}}$ is not satisfied.

### Terminology
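  - $n_{\text{estimators}}$ is the number of estimators (trees) per forest
  - $n_{\text{fits}} = n_{\text{parameters}} \times n_{\text{splits}}$, where $n_{\text{parameters}}$ is the number of parameter combinations in the grid and $n_{\text{splits}}$ is the number of cross-validation splits
  - $n_{\text{tree-fits}} = n_{\text{fits}} \times n_{\text{estimators}}$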
  - $n_{\text{workers}}$ is the number of Dask workers
  - $n_{\text{cores}}$ is the total number of cores available for all Dask workers
  
Note that $n_{\text{workers}} \le n_{\text{cores}}$, determining the ordering between 2 and 4.

### Example timing

#### Setup

 - $n$ workers (see table below)
   - LocalCluster (i.e. low worker-worker and worker-scheduler latency, and high bandwidth)
   - 5 threads per worker
 - Local scoring with 1 thread (for methods 2 & 4)
   - Method 2 does it sequentially after each fit
   - Method 4 carries out scoring asynchronously
 - `X` with shape `(int(2e5), 40)`
 - `y` with shape `(int(2e5),)`
 - The estimator used was the `RandomForestRegressor`
 - k-Fold cross-validation using 5 folds
 
The following parameters were used:
```python
parameters_RF = {
    "n_estimators": [50],
    "max_depth": [6, 9, 12],
    "min_samples_split": [2],
    "min_samples_leaf": [1, 5, 10],
}

default_param_dict = {
    "random_state": 1,
    "bootstrap": True,
    "max_features": "auto",
}
```
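
As a rough check of the terminology above, the sizes implied by this grid can be computed directly. This is only a sketch: the names `n_parameters`, `n_fits`, and `n_tree_fits` are introduced here for illustration, and the core counts assume that each of the 5 threads per worker corresponds to one core.

```python
from itertools import product

parameters_RF = {
    "n_estimators": [50],
    "max_depth": [6, 9, 12],
    "min_samples_split": [2],
    "min_samples_leaf": [1, 5, 10],
}
n_splits = 5
threads_per_worker = 5  # Assumed to equal the number of cores per worker.

# Number of parameter combinations in the grid.
n_parameters = len(list(product(*parameters_RF.values())))
n_fits = n_parameters * n_splits
# Every forest in this grid has the same number of trees.
n_estimators = parameters_RF["n_estimators"][0]
n_tree_fits = n_fits * n_estimators

print(f"n_parameters={n_parameters}, n_fits={n_fits}, n_tree_fits={n_tree_fits}")
for n_workers in (3, 5):
    n_cores = n_workers * threads_per_worker
    print(f"n_workers={n_workers}: n_cores={n_cores}, n_fits / n_cores = {n_fits / n_cores:.1f}")
```

For this grid, $n_{\text{fits}}$ is only a few times $n_{\text{cores}}$, whereas $n_{\text{tree-fits}}$ exceeds $n_{\text{cores}}$ by roughly two orders of magnitude.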

#### Timing results

| Index | Name                             | $$n_{\text{workers}}=3$$: Time (s) / % of total / Fraction of min              | $$n_{\text{workers}}=5$$: Time (s) / % of total / Fraction of min |
|-------|----------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------|
|   1   | Dask-ML GridSearchCV             | 932 / 27.6% / 1.22                                                             | 897 / 31.8% / 1.52                                          |
|   2   | Sequential Fitting               | 921 / 27.2% / 1.21                                                             | 678 / 24.0% / 1.15                                          |
|   3   | `GridSearchCV` Modification      | 765 / 22.6% / 1.00                                                             | 655 / 23.2% / 1.11                                          |
|   4   | Custom RF Implementation         | 763 / 22.6% / 1.00                                                             | 590 / 20.9% / 1.00                                          |
|   5   | Native scikit-learn GridSearchCV | N/A                                                                            | N/A                                                         |

The ordering in the first table is borne out as the cluster is scaled up: methods 2-4 become faster, while method 1 barely changes.
While the method pairs (1, 2) and (3, 4) performed roughly equally with 3 workers, methods 2 and 4 outperformed methods 1 and 3 respectively with 5 workers.
Note that these results depend on the exact workload.
The use of a `LocalCluster` also needs to be taken into account: a realistic cluster will suffer from reduced bandwidth and increased latency, favouring methods with lower overhead (less communication).
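
To make the scaling behaviour explicit, the speedup from 3 to 5 workers can be computed from the measured times. The snippet below is a small illustration that uses only the wall-clock times from the table; the dictionary name `timings` is introduced here.

```python
# Wall-clock times in seconds for (3 workers, 5 workers), copied from the timing table.
timings = {
    "Dask-ML GridSearchCV": (932, 897),
    "Sequential Fitting": (921, 678),
    "GridSearchCV Modification": (765, 655),
    "Custom RF Implementation": (763, 590),
}

# Ratio of the 3-worker time to the 5-worker time for each method.
speedups = {name: t3 / t5 for name, (t3, t5) in timings.items()}
for name, speedup in speedups.items():
    print(f"{name}: {speedup:.2f}x faster with 5 workers than with 3")
```

Under ideal scaling the ratio would be $5/3 \approx 1.67$, so this gives a sense of how far each method is from using the additional workers fully for this particular workload.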

## Initialisation

In [None]:
from itertools import product
from time import time

import numpy as np
from dask.distributed import Client, as_completed
from joblib import parallel_backend
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold, cross_val_score
from tqdm import tqdm


class Time:
    """Context manager that prints the wall-clock time taken by its block."""

    def __init__(self, name=""):
        self.name = name

    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        print("Time taken for {}: {}".format(self.name, time() - self.start))

## Create a LocalCluster for demonstration purposes

In [None]:
# Used for `score()` on the current node in the case of `fit_dask_rf_grid_search_cv()` (method 4).
local_n_jobs = 1

threads_per_worker = 5
client = Client(
    n_workers=3,
    threads_per_worker=threads_per_worker,
    # This resource specification is required by `DaskGridSearchCV`.
    resources={"threads": threads_per_worker},
)
client

## Evaluate Different Approaches

### Define Common Parameters and Data

In [None]:
# Define the common training and test data.
np.random.seed(1)
X = np.random.random((int(2e5), 40))
y = X[:, 0] + X[:, 1] + np.random.random((X.shape[0],))

# Define the number of splits.
n_splits = 5
kf = KFold(n_splits=n_splits)

# Define the parameter space.
parameters_RF = {
    "n_estimators": [50],
    "max_depth": [6, 9, 12],
    "min_samples_split": [2],
    "min_samples_leaf": [1, 5, 10],
}

default_param_dict = {
    "random_state": 1,
    "bootstrap": True,
    "max_features": "auto",  # Removed in scikit-learn 1.3; `1.0` is the equivalent for regressors.
}

rf_params_list = [
    dict(zip(parameters_RF, param_values))
    for param_values in product(*parameters_RF.values())
]

### 1. Dask-ML GridSearchCV

This works, but only allocates one thread per **forest fit**, _not per tree_, making for very slow training when $n_{\text{fits}} \lt n_{\text{cores}}$.

Use this when $n_{\text{fits}} \gg n_{\text{cores}}$, where $n_{\text{fits}} = n_{\text{parameters}} \times n_{\text{splits}}$ and $n_{\text{cores}}$ is the total number of cores available for all Dask workers, or when individual estimator `fit()` calls are single-threaded only (**unlike** `RandomForestRegressor.fit()`, which releases the GIL). In the latter case, it makes no difference which method is chosen, since the parallelism is inherently limited by the chosen estimator.
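
The effect of the suitability criterion can be illustrated with a simplified scheduling model. This is a sketch under strong assumptions: every fit is taken to occupy exactly one core and to take equally long, and `single_threaded_fit_schedule` is a hypothetical helper, not part of Dask-ML.

```python
from math import ceil


def single_threaded_fit_schedule(n_fits, n_cores):
    """Return (waves, mean core utilisation) when each fit occupies one core.

    Assumes, for illustration only, that all fits take equally long.
    """
    # Fits run in 'waves' of at most `n_cores` concurrent fits.
    waves = ceil(n_fits / n_cores)
    # Fraction of the available core time that is spent fitting.
    utilisation = n_fits / (waves * n_cores)
    return waves, utilisation


# 45 fits, as in the example grid (9 parameter combinations x 5 splits).
for n_cores in (15, 25, 45, 90):
    waves, utilisation = single_threaded_fit_schedule(45, n_cores)
    print(f"n_cores={n_cores}: {waves} wave(s), {utilisation:.0%} of core time used")
```

In this model, adding cores beyond $n_{\text{fits}}$ cannot reduce the number of waves below one, so the extra cores stay idle.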

In [None]:
from dask_ml.model_selection import GridSearchCV

gs = GridSearchCV(
    RandomForestRegressor(**default_param_dict),
    parameters_RF,
    cv=n_splits,
    return_train_score=True,
    refit=False,
)
with Time("Dask-ML GridSearchCV"):
    gs = gs.fit(X, y)

### 2. Individual fits in series

Wait for each RF fit (using the Dask backend) and its scoring to complete before starting the next one. Scoring uses the local threading backend, since `predict()` (used by `score()`) requires 'sharedmem'.

In [None]:
def fit_and_score(X, y, train_index, test_index, rf_params):
    rf = RandomForestRegressor(**rf_params)
    with parallel_backend("dask"):
        rf.fit(X[train_index], y[train_index])

    with parallel_backend("threading", n_jobs=local_n_jobs):
        test_score = rf.score(X[test_index], y[test_index])
        train_score = rf.score(X[train_index], y[train_index])

    return test_score, train_score


rf_params = default_param_dict.copy()

test_scores_list = []
train_scores_list = []

with Time("In Series"):
    for rf_grid_params in tqdm(rf_params_list, desc="Params"):
        rf_params.update(rf_grid_params)
        test_scores = []
        train_scores = []
        for train_index, test_index in kf.split(X):
            test_score, train_score = fit_and_score(
                X, y, train_index, test_index, rf_params
            )
            test_scores.append(test_score)
            train_scores.append(train_score)

        test_scores_list.append(test_scores)
        train_scores_list.append(train_scores)

### 3. Modify `GridSearchCV` to fit one forest per worker

Use this when $n_{\text{fits}} \gg n_{\text{workers}}$, where $n_{\text{fits}} = n_{\text{parameters}} \times n_{\text{splits}}$ and $n_{\text{workers}}$ is the number of Dask workers.
Individual estimator `fit()` and `score()` calls should be multithreaded.

This uses Dask `resources`. See here for further information regarding this use case:
 - https://github.com/dask/dask-jobqueue/issues/181
 - https://github.com/dask/dask-jobqueue/issues/231
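
The idea behind using `resources` can be sketched as follows. This is not the `DaskGridSearchCV` implementation, whose internals are not shown in this notebook; `submit_one_fit_per_worker` and `fit_func` are hypothetical names. The sketch assumes each worker was started with `resources={"threads": threads_per_worker}`, as in the cluster setup above, so that a task requesting all of a worker's `threads` occupies that worker on its own.

```python
def submit_one_fit_per_worker(client, fit_func, tasks, threads_per_worker):
    """Submit each task so that it claims a whole worker (hypothetical helper).

    `client` is expected to be a `dask.distributed.Client`; `tasks` is an
    iterable of argument tuples for `fit_func`.
    """
    return [
        client.submit(
            fit_func,
            *args,
            # Request every "threads" unit a worker advertises, so that at
            # most one such task runs on a given worker at a time.
            resources={"threads": threads_per_worker},
        )
        for args in tasks
    ]
```

The returned futures could then be collected with `client.gather()`.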
 
Using this approach seems to prevent work stealing from working properly, resulting in _new_ workers not being allocated existing work:
 - https://github.com/dask/distributed/issues/1851

A workaround seems to be to wait for _all_ desired workers to be registered to the scheduler before starting to submit work.
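
A minimal sketch of this workaround, using `Client.wait_for_workers()` from `dask.distributed`. The wrapper name `wait_for_all_workers` and the default timeout are assumptions made for this example.

```python
def wait_for_all_workers(client, n_workers, timeout=300):
    """Block until `n_workers` workers have registered with the scheduler.

    `client` is expected to be a `dask.distributed.Client`. The default
    timeout (in seconds) is an arbitrary choice for this sketch.
    """
    client.wait_for_workers(n_workers=n_workers, timeout=timeout)
```

For the cluster defined above, this would be called as `wait_for_all_workers(client, n_workers=3)` before submitting any work.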

In [None]:
from wildfires.dask_cx1 import DaskGridSearchCV

gs = DaskGridSearchCV(
    RandomForestRegressor(**default_param_dict),
    parameters_RF,
    cv=n_splits,
    return_train_score=True,
    refit=False,
    verbose=10,
)
with Time("Custom Single-Workers Dask GridsearchCV"):
    gs = gs.dask_fit(client, X, y)

### 4. Define our own RF implementation that submits individual trees as Dask tasks

In [None]:
from wildfires.dask_cx1 import DaskRandomForestRegressor, fit_dask_rf_grid_search_cv

with Time("Custom Dask grid search"):
    results = fit_dask_rf_grid_search_cv(
        DaskRandomForestRegressor(**default_param_dict),
        X,
        y,
        n_splits,
        parameters_RF,
        client,
        verbose=True,
        return_train_score=True,
        refit=False,
        local_n_jobs=local_n_jobs,
    )

### 5. Native scikit-learn GridSearchCV (fails, e.g. with CancelledError)

Before it fails, it is apparent that this approach does spread out the training of individual trees, which should have led to speedups when $n_{\text{fits}} \lt n_{\text{workers}}$ (or when the two are of similar magnitude).

The `CancelledError` has already been reported:
 - https://github.com/scikit-learn/scikit-learn/issues/12315
 - https://github.com/scikit-learn/scikit-learn/issues/15383
 - https://github.com/joblib/joblib/issues/959
 - https://github.com/joblib/joblib/issues/1021

In [None]:
from sklearn.model_selection import GridSearchCV

gs = GridSearchCV(
    RandomForestRegressor(**default_param_dict),
    parameters_RF,
    cv=n_splits,
    return_train_score=True,
    refit=False,
)
with Time("Scikit-learn GridSearchCV with Dask"):
    with parallel_backend("dask"):
        gs = gs.fit(X, y)