# Introduction to Optuna + RAPIDS

Optuna is a lightweight framework for hyperparameter optimization. It provides a code-by-run method which makes it easy to adapt to any already existing code that we have. Just wrapping the objective function with Optuna can help perform a parallel-distributed HPO search over a search space.

We'll explore how to use Optuna with RAPIDS and run multi-GPU HPO runs. 

Notes - 

1. Using default SVM parameters ("Rbf" kernel) with full airline data results in `cudaErrorMemoryAllocation` - out of memory when `fit` is called.
2. Even with 1/10th the data linear kernel hangs for a long time (is this a possible bug or expected behavior?)


In [1]:
import random
import time
from contextlib import contextmanager

import cudf
import cuml
import dask.array as da
import mlflow
import numpy as np
import optuna
import pandas as pd
import sklearn
from cuml.dask.common import utils as dask_utils
from cuml.ensemble import RandomForestClassifier
from cuml.metrics import accuracy_score
from cuml.preprocessing.model_selection import train_test_split
from dask.distributed import Client, wait, performance_report
from joblib import parallel_backend
from sklearn.datasets import load_iris

from dask_cuda import LocalCUDACluster



In [2]:
# Helper function for timing blocks of code.
@contextmanager
def timed(name):
    t0 = time.time()
    yield
    t1 = time.time()
    print("..%-24s:  %8.4f" % (name, t1 - t0))

In [3]:
# This will use all GPUs on the local host by default
cluster = LocalCUDACluster(threads_per_worker=1, ip="", dashboard_address="8002")
c = Client(cluster)

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization
c

0,1
Client  Scheduler: tcp://172.17.0.2:42429  Dashboard: http://172.17.0.2:8002/status,Cluster  Workers: 2  Cores: 2  Memory: 49.16 GB


## Loading the data

We'll load the airline data from the path specified by `INPUT_FILE`. The aim of the problem is to predict whether a plane will be delayed or not by the target variable `ArrDelayBinary`

In [4]:
N_TRIALS = 10

INPUT_FILE = "/home/hyperopt/hyperopt/data/air_par.parquet"
df = cudf.read_parquet(INPUT_FILE)
X, y = df.drop(["ArrDelayBinary"], axis=1), df["ArrDelayBinary"].astype('int32')

# Training and Evaluation

Here, we define `train_and_eval` function which simply fits a RandomForestClassifier (with`max_depth` and `n_estimators`) on the passed `X_param`, `y_param`. This function should look very similar for any ML workflow. We'll use this function within the Optuna `objective` function to show how easily we can fit an existing workflow into the Optuna work.

In [5]:
def train_and_eval(X_param, y_param, max_depth, n_estimators):

    classifier = RandomForestClassifier(max_depth=max_depth,
                         n_estimators=n_estimators)


    X_train, X_valid, y_train, y_valid = train_test_split(X_param, y_param, random_state=77)

    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_valid)
    score = accuracy_score(y_valid, y_pred)
    return score

For a baseline number, let's see what the default performance of RFC is. Note the defauly values for `max_depth` = 16 and `n_estimators` = 100; we pass these to the `train_and_eval` function.

In [8]:
print("Score with default parameters : ",train_and_eval(X, y, max_depth=16, n_estimators=100))

Score with default parameters :  0.837736189365387


## Objective Function

The objective function will be the one we optimize in Optuna studys. Objective funciton tries out different values for the parameters that we are tuning and saving the results in `study.trials_dataframes()`. 

Let's define the objective function for this HPO task by making use of the `train_and_eval()`. You can see that we simply choose a value for the parameters and call the `train_and_eval` method, making Optuna very easy to use in an existing workflow.

The objective remains constant over different samplers, which are built-in options in Optuna to enable the selection of different sampling algorithms that optuna provides. Some of the available ones include - GridSampler, RandomSampler, TPESampler, etc. We'll try out different samplers and compare their performances

In [9]:
def objective(trial, X_param, y_param):
    max_depth = trial.suggest_int("max_depth", 10, 15)
    n_estimators = trial.suggest_int("n_estimators", 150, 750)
    score = train_and_eval(X_param, y_param, max_depth=max_depth, n_estimators=n_estimators)
    return score

## HPO Trials and Study

Optuna uses [study](https://optuna.readthedocs.io/en/stable/reference/study.html) and [trials](https://optuna.readthedocs.io/en/stable/reference/trial.html) to keep track of the HPO experiments. 

We'll make use of a helper function `run_study` to help us run one multi-GPU study with a dask backend.

In [15]:
def run_study(sampler=optuna.samplers.TPESampler(), study_name="Optuna-MultiGPU", callbacks=None):
    
    with timed(study_name):
        study = optuna.create_study(sampler=sampler,
                                    study_name=study_name,
                                    storage="sqlite:///_"+study_name+".db",
                                    direction="maximize",
                                    load_if_exists=True)
        
        with parallel_backend("dask", n_jobs=n_workers, client=c, scatter=[X,y]):
            study.optimize(lambda trial: objective(trial, X, y), n_trials=N_TRIALS, n_jobs=n_workers,
                          callbacks=callbacks)
    print("Number of finished trials: ", len(study.trials))
    print("Best trial:")
    trial = study.best_trial
    print("  Value: ", trial.value)
    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))
    return study

In [11]:
with performance_report(filename="dask-report.html"):
    study_tpe = run_study(optuna.samplers.TPESampler(),study_name="Optuna-MultiGPU-TPE")

[I 2020-07-02 01:54:19,555] A new study created with name: Optuna-MultiGPU-TPE


..multi-gpu               :  206.6237
Number of finished trials:  10
Best trial:
  Value:  0.8342416286468506
  Params: 
    max_depth: 10
    n_estimators: 605


In [13]:
with timed("no-dask"):
    study = optuna.create_study(sampler=optuna.samplers.TPESampler(),
                                study_name="Optuna-TPE-w",
                                storage="sqlite:///optuna_simple.db",
                                direction="maximize",
                                load_if_exists=True)
    with parallel_backend("loky", n_jobs=n_workers):
        study.optimize(lambda trial: objective(trial, X, y), n_trials=N_TRIALS, n_jobs=n_workers)

[I 2020-07-02 01:58:00,980] Using an existing study with name 'Optuna-TPE-w' instead of creating a new one.


..no-dask                 :  338.0006


In [14]:
with timed("no-dask-no-joblib"):
    study = optuna.create_study(sampler=optuna.samplers.TPESampler(),
                                study_name="Optuna-TPE",
                                storage="sqlite:///optuna_no_joblib.db",
                                direction="maximize",
                                load_if_exists=True)
    study.optimize(lambda trial: objective(trial, X, y), n_trials=N_TRIALS)

[I 2020-07-02 02:03:39,133] A new study created with name: Optuna-TPE
[I 2020-07-02 02:03:53,735] Finished trial#0 with value: 0.8308449983596802 with parameters: {'max_depth': 12, 'n_estimators': 234}. Best is trial#0 with value: 0.8308449983596802.
[I 2020-07-02 02:04:12,032] Finished trial#1 with value: 0.8341159820556641 with parameters: {'max_depth': 10, 'n_estimators': 388}. Best is trial#1 with value: 0.8341159820556641.
[I 2020-07-02 02:04:22,824] Finished trial#2 with value: 0.8310717940330505 with parameters: {'max_depth': 11, 'n_estimators': 192}. Best is trial#1 with value: 0.8341159820556641.
[I 2020-07-02 02:04:37,358] Finished trial#3 with value: 0.8308327794075012 with parameters: {'max_depth': 12, 'n_estimators': 235}. Best is trial#1 with value: 0.8341159820556641.
[I 2020-07-02 02:05:06,128] Finished trial#4 with value: 0.8307821750640869 with parameters: {'max_depth': 13, 'n_estimators': 409}. Best is trial#1 with value: 0.8341159820556641.
[I 2020-07-02 02:05:35,38

..no-dask-no-joblib       :  312.3613


In [None]:
# study_cmae = run_study(optuna.samplers.CmaEsSampler(), study_name="Optuna-MultiGPU-CMAE")

# Sequential calls without Optuna

For a comparison let's try sequential calls without Optuna and it's parallel-processing support. We can cleared see that it takes more time to do this. We'll pick the same parameters as Optuna for a fair comparison - these parameters were selected by the sampling algorithm used by Optuna and is available in the `study.trials_dataframe()` for us to pick out.

In [16]:
df = study_tpe.trials_dataframe()
params_max_depth, params_n_estimators = df['params_max_depth'], df['params_n_estimators']

### Sequential call function 

For a cleaner look, let's use a function to perform sequential calls. The function basically sets the parameters to what was passed and trains and evaluates the model and returns the details of the run which can later be used to find the best performing model.

In [18]:
def seq_call(X, y, max_depth, n_estimators):
    
    score = train_and_eval(X, y, max_depth=max_depth, n_estimators = n_estimators)
    
    return score, max_depth, n_estimators

In [19]:
from joblib import Parallel, delayed
with timed("no-optuna-call"):
    with parallel_backend("dask", n_jobs=n_workers, client=c, scatter=[X,y]):
        results = Parallel()(delayed(seq_call)(X, y, max_depth=params_max_depth[i],
                     n_estimators=params_n_estimators[i]) for i in range(N_TRIALS))
    print(results)

[(0.8308145999908447, 14, 523), (0.830810010433197, 14, 292), (0.8342475891113281, 10, 603), (0.8342695832252502, 10, 605), (0.8308290243148804, 14, 709), (0.830955982208252, 11, 268), (0.8307737708091736, 13, 540), (0.830833375453949, 14, 300), (0.8308601975440979, 12, 543), (0.8307647705078125, 13, 324)]
..no-optuna-call          :  244.6982


Note: Running this without a dask backend is actually faster - takes about 65 seconds to finish by just making N_TRIALS sequential calls. Dask backend makes most sense when used with multi-GPU estimators as we see later in the notebook.

In [21]:
with timed("no-optuna-no-dask"):
    for i in range(N_TRIALS):
        results = seq_call(X, y, max_depth=params_max_depth[i],
                     n_estimators=params_n_estimators[i])
    print(results)

(0.8307656049728394, 13, 324)
..no-optuna-no-dask       :  309.5257


# MLflow callback

Optuna supports the integration of various libraries. One of them is a tracking library MLflow, this is used to keep track of the different Hyperopt runs. We can simply add it by adding a callback to a study as shown. 

In [22]:
def mlflow_callback(study, trial):
    trial_value = trial.value if trial.value is not None else float("nan")
    with mlflow.start_run(run_name=study.study_name):
        print(trial.params)
        mlflow.set_tracking_uri("http://127.0.0.1:5000")
        mlflow.log_params(trial.params)
        mlflow.log_metrics({"accuracy": trial_value})

In [25]:
# study = run_study(optuna.samplers.TPESampler(),study_name="Optuna-MultiGPU-MLflow", callbacks=[mlflow_callback])

# Multi-GPU estimators

We also have estimators that can run on multiple GPUs. `cuml.dask` has a set of multi-GPU estimators that can run incredibly fast. Let's try that out. In order to do this, we need to used `dask_cudf` dataframes and we will redefine the objective function from earlier to do just that. 

`objective_mg` converts our split data into dask_cudf dataframes and persists them across all available dask workers. By doing this, we can now run the multi-GPU RandomForestClassifier. Notice that we import the `cuml.dask.ensemble.RandomForestClassifier` for this cell.

In [28]:
from cuml.dask.ensemble import RandomForestClassifier as dask_RF

def objective_mg(trial):
    max_depth = trial.suggest_int("max_depth", 10, 15)
    n_estimators = trial.suggest_int("n_estimators", 150, 750)

    import dask_cudf 
    
    classifier = dask_RF(max_depth=max_depth,
                         n_estimators=n_estimators)

    X_train, X_valid, y_train, y_valid = train_test_split(X, y)

    X_train_dask = dask_cudf.from_cudf(X_train, npartitions=2)
    X_valid_dask = dask_cudf.from_cudf(X_valid, npartitions=2)
    
    y_train_dask = dask_cudf.from_cudf(y_train, npartitions=2)
    y_valid_dask = dask_cudf.from_cudf(y_valid, npartitions=2)
    
    X_train_dask, X_valid_dask, y_train_dask, y_valid_dask = dask_utils.persist_across_workers(c,
                                                                                               [X_train_dask, 
                                                                                                X_valid_dask,
                                                                                                y_train_dask,
                                                                                                y_valid_dask],
                                                                                               workers=workers)
    
    classifier.fit(X_train_dask, y_train_dask)
    y_pred = classifier.predict(X_valid_dask)
    score = accuracy_score(y_valid, y_pred.compute())
    return score


In [29]:
with timed("multi-GPU-estimators"):
    with performance_report(filename="dask-report-mnmg.html"):
        study = optuna.create_study(sampler= optuna.samplers.TPESampler(),
                                    study_name="Multi-GPU",
                                    direction="maximize",
                                    storage="sqlite:///mnmg.db")
        study.optimize(objective_mg, n_trials=N_TRIALS)

[I 2020-07-02 02:22:40,812] A new study created with name: Multi-GPU
[I 2020-07-02 02:22:49,475] Finished trial#0 with value: 0.8308113813400269 with parameters: {'max_depth': 11, 'n_estimators': 389}. Best is trial#0 with value: 0.8308113813400269.
[I 2020-07-02 02:23:57,777] Finished trial#1 with value: 0.8306707739830017 with parameters: {'max_depth': 15, 'n_estimators': 644}. Best is trial#0 with value: 0.8308113813400269.
[I 2020-07-02 02:24:20,013] Finished trial#2 with value: 0.8308956027030945 with parameters: {'max_depth': 15, 'n_estimators': 218}. Best is trial#2 with value: 0.8308956027030945.
[I 2020-07-02 02:24:53,807] Finished trial#3 with value: 0.8309198021888733 with parameters: {'max_depth': 14, 'n_estimators': 689}. Best is trial#3 with value: 0.8309198021888733.
[I 2020-07-02 02:25:14,744] Finished trial#4 with value: 0.8307151794433594 with parameters: {'max_depth': 15, 'n_estimators': 209}. Best is trial#3 with value: 0.8309198021888733.
[I 2020-07-02 02:25:30,107

..multi-GPU-estimators    :  256.6885


In [None]:
print_results(study)

## Summarizing the timing results

| Study name | Runtime |   
|---|---|
| Optuna-Multi-GPU-TPE | 91.3055 |
| Optuna-Multi-GPU-CMAE | 88.3086 |
| No-Optuna-Call | 89.2947 |
| Optuna-MLflow-callback | 88.1154 |
| Multi-GPU-Estimator | 35.9608 |

We noteice that with 2 GPUS, we were able to run the multi-GPU estimator more than twice as fast as the other options

In [None]:
pool.close()

In [None]:
# # CPU with 750 estimators max does not finish running after hours.
# def objective_cpu(trial):
    
#     max_depth = trial.suggest_int("max_depth", 5, 15)
#     n_estimators = trial.suggest_int("n_estimators", 100, 750)

#     classifier = sklearn.ensemble.RandomForestRegressor(max_depth=max_depth,
#                                        n_estimators=n_estimators)

#     X_train, X_valid, y_train, y_valid = sklearn.model_selection.train_test_split(X_, y_)
    
#     classifier.fit(X_train, y_train)
#     y_pred = classifier.predict(X_valid)
    
#     score = accuracy_score(y_valid, y_pred)
#     return score

In [None]:
# with timed("cpu-etl"):
#     df_pd = pd.read_parquet(INPUT_FILE)
#     X_, y_ = df_pd.drop(["ArrDelayBinary"], axis=1), df_pd["ArrDelayBinary"].astype('int32')
    
# with timed("cpu-hpo"):
#     study = optuna.create_study(direction="maximize") # Equivalent to an experiment, a set of trials
#     study.optimize(objective_cpu, n_trials=N_TRIALS, n_jobs=-1)

In [None]:
# def download_higgs(compressed_filepath, decompressed_filepath):
#     higgs_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz'
#     if not os.path.isfile(compressed_filepath):
#         urlretrieve(higgs_url, compressed_filepath)
#     if not os.path.isfile(decompressed_filepath):
#         cf = gzip.GzipFile(compressed_filepath)
#         with open(decompressed_filepath, 'wb') as df:
#             df.write(cf.read())
# import os
# from urllib.request import urlretrieve
# import gzip

# data_dir = '/home/hyperopt/data/'
# if not os.path.exists(data_dir):
#     print('creating data directory')
#     os.system('mkdir /home/data/')
    
# compressed_filepath = os.path.join(data_dir, 'HIGGS.csv.gz') # Set this as path for gzipped Higgs data file, if you already have
# decompressed_filepath = os.path.join(data_dir, 'HIGGS.csv') # Set this as path for decompressed Higgs data file, if you already have

# # Uncomment this line to download the dataset.
# # download_higgs(compressed_filepath, decompressed_filepath)

# col_names = ['label'] + ["col-{}".format(i) for i in range(2, 30)] # Assign column names
# dtypes_ls = ['int32'] + ['float32' for _ in range(2, 30)] # Assign dtypes to each column
# input_data = cudf.read_csv(decompressed_filepath, names=col_names, dtype=dtypes_ls)

# labels = input_data.label.reset_index().drop(['index'], axis=1)
# for col in labels.columns:
#     labels[col] = labels[col].astype('float32')
# data = input_data.drop(['label'], axis=1)

# N_ROWS = int(len(data) * data_fraction)

# X = data[:N_ROWS]
# y = labels[:N_ROWS]