# Distributed Hyperparameter Tuning using Ray

This tutorial shows an example of using Ray Tune to perform a simple training task.

The task consists of training a CNN model on the MNIST dataset using PyTorch. In this hands-on exercise we will integrate Ray Tune into a PyTorch training workflow, which is fairly straightforward. Besides building the model using standard PyTorch, all we need to do is:

1. wrap data loading and training in functions

2. make some network parameters configurable

3. add checkpointing (optional and not covered here)

4. define the search space for the model tuning

See references for more information.

#### References:

[Ray RLlib Documentation](https://docs.ray.io/en/master/rllib.html)

[Ray Tune Documentation](https://docs.ray.io/en/master/tune/index.html)

*This tutorial is adapted from the documentation for Ray version 1.12.*

In [None]:
import os
import ray
import torch

import numpy as np
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from ray.tune.schedulers import ASHAScheduler
from ray import tune

Let's begin by using the standard code for connecting to Ray (identical to what we did in Ray-Lab00).

In [None]:
if not ray.is_initialized():
    print("Connecting to Ray cluster...")
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    ray.init(f"ray://{service_host}:{service_port}")

In [None]:
ray.is_initialized()

Next, we define a very basic CNN with one convolutional layer, one max-pooling layer, and one fully connected layer.

In [None]:
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)
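The hard-coded 192 in `nn.Linear(192, 10)` and `x.view(-1, 192)` is the flattened size of the feature map after the convolution and pooling. A quick stdlib-only way to double-check it, assuming 28x28 MNIST inputs, no padding, stride 1 for the convolution, and `F.max_pool2d`'s default stride (equal to the kernel size). The helper names are hypothetical.

```python
def conv_out(size, kernel, stride=1, padding=0):
    # Output length of a convolution along one spatial dimension
    return (size + 2 * padding - kernel) // stride + 1

def pool_out(size, kernel, stride=None):
    # F.max_pool2d defaults the stride to the kernel size (floor rounding)
    stride = kernel if stride is None else stride
    return (size - kernel) // stride + 1

def flattened_features(image_size=28, channels=3, conv_kernel=3, pool_kernel=3):
    # 28 -> 26 after the 3x3 convolution, 26 -> 8 after 3x3 max pooling
    side = pool_out(conv_out(image_size, conv_kernel), pool_kernel)
    return channels * side * side

print(flattened_features())  # 3 * 8 * 8 = 192
```

If you change the kernel sizes or the number of output channels in `ConvNet`, recompute this value and update both `nn.Linear` and `x.view` accordingly.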


Here things get a bit tricky. Because Ray uses a distributed architecture, we can no longer store the training data in our local file system (i.e. in the workspace we are currently using). All Ray cluster nodes must have access to the data in order for Ray to parallelize the execution.

Luckily, Domino provides [Domino Datasets](https://docs.dominodatalab.com/en/latest/user_guide/0a8d11/domino-datasets/), a high-performance, versioned, and structured filesystem. A Dataset is a collection of files that is available in user executions (e.g. workspaces and jobs) as a filesystem directory. Every time we start a workspace (with or without an attached cluster), Domino instantiates a local Dataset for the execution, which is available in */domino/datasets/local/\<name_of_your_project\>*.

Because the name of your project forms part of the absolute dataset path, we can't hard-code it in the notebook. Instead, we'll use an environment variable containing your project name, which Domino conveniently sets for us in each execution.

In [None]:
domino_project = os.environ['DOMINO_PROJECT_NAME']
data_path = os.path.join("/domino/datasets/local", domino_project)
print(data_path)

### Task 1

* Inspect the contents of *data_path*. You can open a new terminal by selecting File -> New -> Terminal in JupyterLab.

Now let's get MNIST into the shared filesystem.

In [None]:
# Download the dataset first
datasets.MNIST(data_path, train=True, download=True)

* Inspect *data_path* again. You should now have an MNIST directory created with the raw MNIST subsets (training, test) present.

Next, we set up our training loop. This is implemented in the *train_mnist* function.

Notice that *train_mnist* receives a config dictionary with the training hyperparameters. This is identical to how we were performing the training in HyperOpt. Inspect the code below and make sure you understand what the training loop is doing.

In [None]:
def train_mnist(config):
    
    # Convert the images to PyTorch tensors and normalise them with the MNIST mean and std
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])
    
    # Load the training set
    train_loader = DataLoader(
        datasets.MNIST(data_path, train=True, download=True, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    
    # Load the test set
    test_loader = DataLoader(
        datasets.MNIST(data_path, train=False, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)

    # GPUs available?
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Create an instance of the CNN model
    model = ConvNet()
    model.to(device)

    # We use a fixed optimiser - SGD
    # Notice that we get the learning rate and momentum from the config variable
    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])
    
    # Run 10 training iterations
    for i in range(10):
        train(model, optimizer, train_loader)
        acc = test(model, test_loader)

        # Send the current training result back to Tune
        tune.report(mean_accuracy=acc)

        if i % 5 == 0:
            # This saves the model to the trial directory
            torch.save(model.state_dict(), "./model.pth")


You may have noticed that the training loop calls two helper functions, *train* and *test*, which update the model parameters and compute the accuracy that is reported back to Tune. Here is their implementation.

Notice that we are again using a relatively small sample of the MNIST data (controlled by *EPOCH_SIZE* and *TEST_SIZE*); feel free to tweak these values if you'd like your training to run quicker or slower.

In [None]:
# Change these values if you want the training to run quicker or slower.
EPOCH_SIZE = 1024
TEST_SIZE = 256

def train(model, optimizer, train_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # We set this just for the example to run quickly.
        if batch_idx * len(data) > EPOCH_SIZE:
            return
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

def test(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            # We set this just for the example to run quickly.
            if batch_idx * len(data) > TEST_SIZE:
                break
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    return correct / total
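Because the cutoff check `batch_idx * len(data) > EPOCH_SIZE` runs before a batch is processed, each "epoch" actually uses slightly more than *EPOCH_SIZE* samples (and the test slightly more than *TEST_SIZE*). A small pure-Python sketch that mirrors the loop and counts what is consumed, assuming every batch is full with `batch_size=64` (true here, since MNIST has far more images than these limits). The function name is hypothetical.

```python
def samples_used(limit, batch_size=64):
    """Count batches/samples processed by the early-exit loop in train()/test()."""
    batches = 0
    batch_idx = 0
    # Mirror the loop: stop as soon as batch_idx * batch_size exceeds the limit
    while not (batch_idx * batch_size > limit):
        batches += 1
        batch_idx += 1
    return batches, batches * batch_size

print(samples_used(1024))  # (17, 1088) for EPOCH_SIZE
print(samples_used(256))   # (5, 320) for TEST_SIZE
```

So the reported accuracy is computed over 320 test images rather than 256. Switching the comparison to `>=` would cap each loop at exactly *EPOCH_SIZE* / *TEST_SIZE* samples when they are multiples of the batch size; for this tutorial the difference is harmless.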

### Task 2

Time to set up our search space.

Tune provides a number of functions for sampling ranges in the search space. Some of the widely used sampling routines are:

* *tune.uniform(a, b)* - Sample a float uniformly between a and b
* *tune.randn(mean, sd)* - Sample from a Gaussian defined by its mean and standard deviation
* *tune.sample_from(func)* - Sample from a custom function, e.g. *lambda spec: ...*
* *tune.grid_search([a, b, c])* - Perform a grid search over [a, b, c]
* *tune.randint(a, b)* - Sample a random integer from [a, b), i.e. the upper bound is excluded
* *tune.choice([a, b, c])* - Sample uniformly from the list of values [a, b, c]

For a comprehensive description of the search space API, see the [Ray Tune documentation](https://docs.ray.io/en/releases-1.12.0/tune/api_docs/search_space.html).
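To make the semantics of these samplers concrete, here is a rough stdlib-only analogue of what each one draws. This is not Tune's implementation (Tune wraps these in its own domain objects and resolves them per trial); it only mirrors the documented behaviour, including the exclusive upper bound of `tune.randint`. All function names below are hypothetical.

```python
import random

rng = random.Random(0)  # fixed seed for reproducibility

def uniform(a, b):            # ~ tune.uniform(a, b): float between a and b
    return rng.uniform(a, b)

def randn(mean, sd):          # ~ tune.randn(mean, sd): Gaussian sample
    return rng.gauss(mean, sd)

def randint(a, b):            # ~ tune.randint(a, b): integer in [a, b), b excluded
    return rng.randrange(a, b)

def choice(values):           # ~ tune.choice(values): uniform pick from a list
    return rng.choice(values)

def sample_from(func, spec):  # ~ tune.sample_from(func): Tune calls func with the trial spec
    return func(spec)

draws = [randint(1, 4) for _ in range(1000)]
print(sorted(set(draws)))  # 4 never appears
```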

Now let's define a *search_space* dictionary with two hyperparameters: learning rate and momentum.

In [None]:
search_space = {
    "lr": tune.uniform(0.001, 0.1),
    "momentum": tune.choice([0.1, 0.2, 0.5, 0.8, 0.9])
}

Next, we can run a single experiment to validate that our training functions and hyperparameter space are set up correctly.

Ray Tune provides a function *tune.run* that executes experiments. Some of the key arguments of the function are:

* *run_or_experiment* - the training function (*train_mnist* in our case)
* *config* - the hyperparameter search space. Each trial receives one sample from this dictionary, passed as the argument to the training function
* *resources_per_trial* - machine resources to allocate per trial, e.g. {"cpu": 64, "gpu": 8}
* *checkpoint_freq* - how many training iterations between checkpoints
* *scheduler* - the TrialScheduler object used to execute the experiment, for example FIFO (default), MedianStopping, AsyncHyperBand, HyperBand etc.
* *num_samples* - the number of times to sample from the hyperparameter space
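One detail worth spelling out: each trial receives a single concrete sample of *config*, and when the space contains *tune.grid_search*, Tune repeats the whole grid *num_samples* times, so the trial count is *num_samples* times the grid size. A stdlib sketch of that expansion, using a hypothetical `Grid` marker for grid-searched keys and plain callables for randomly sampled ones (neither is Tune's API):

```python
import itertools
import random

class Grid:
    """Hypothetical stand-in for tune.grid_search(values)."""
    def __init__(self, values):
        self.values = list(values)

def expand_trials(space, num_samples, seed=0):
    rng = random.Random(seed)
    grid_keys = [k for k, v in space.items() if isinstance(v, Grid)]
    grid_values = [space[k].values for k in grid_keys]
    trials = []
    for _ in range(num_samples):
        # Every repetition walks the full cross product of grid values
        for combo in itertools.product(*grid_values):
            config = dict(zip(grid_keys, combo))
            # Non-grid keys get a fresh random draw for each trial
            for k, v in space.items():
                if not isinstance(v, Grid):
                    config[k] = v(rng)
            trials.append(config)
    return trials

space = {
    "lr": lambda rng: rng.uniform(0.001, 0.1),
    "momentum": Grid([0.5, 0.9]),
}
trials = expand_trials(space, num_samples=3)
print(len(trials))  # 3 repetitions x 2 grid points = 6 trials
```

Without any grid-searched key, `itertools.product()` yields a single empty combination, so the sketch produces exactly *num_samples* trials, which is the case for the *search_space* defined above.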

*tune.run* returns an *ExperimentAnalysis* instance for analyzing the results from a Tune experiment.

Call *tune.run* to execute a single experiment with the current setup, and capture the result in a variable called *analysis*.

In [None]:
analysis = tune.run(run_or_experiment = train_mnist, config = search_space, num_samples = 1)

Let's look at how the accuracy changes as training progresses. We can get this information from the *analysis* object. We'll implement the plotting functionality in a function, as we'll need it again later.

In [None]:
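def plot_results(analysis, figsize=(8, 6)):

    plt.figure(figsize=figsize)

    # One DataFrame per trial, with one row per tune.report() call
    dfs = analysis.trial_dataframes

    ax = None  # reusing the same axes plots all trials on one figure

    # Assumes both dicts list the trials in the same order
    trial_ids = analysis.results.keys()
    for trial_id, df in zip(trial_ids, dfs.values()):
        ax = df.mean_accuracy.plot(ax=ax, legend=True, label=trial_id)

    plt.xlabel("Training iteration")
    plt.ylabel("Mean accuracy")

plot_results(analysis)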

The legend in the plot above shows the trial ID. We can see how the accuracy changes over the 10 epochs (remember the *i* loop in *train_mnist*).

Now let's call *tune.run* again, but this time run 10 experiments. Note that Ray Tune first generates the lr/momentum combinations that it will try and then executes the experiments two at a time (because our Ray cluster is currently limited to two single-CPU nodes).

In [None]:
analysis = tune.run(run_or_experiment = train_mnist, config = search_space, num_samples = 10)

Let's plot the results.

In [None]:
plot_results(analysis)

*ExperimentAnalysis* provides various methods for processing the experiment data. For example, *get_best_config* can show the configuration that provided the best result according to a specified criterion. In our case we care about the maximal accuracy so the call should look like this:

In [None]:
analysis.get_best_config("mean_accuracy", "max")
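Note that *get_best_config* compares each trial's *last* reported result by default (`scope="last"`); passing `scope="all"` compares the best value each trial reported at any iteration instead. The difference matters when accuracy drops towards the end of training. A pure-Python illustration with two hypothetical accuracy histories (toy numbers, not results from this notebook; `best_trial` is a hypothetical helper, not Tune's code):

```python
# Hypothetical per-iteration mean_accuracy values for two trials
histories = {
    "trial_a": [0.60, 0.85, 0.80],  # peaks early, then drops
    "trial_b": [0.55, 0.70, 0.82],  # keeps improving
}

def best_trial(histories, scope="last"):
    # scope="last": rank by the final value; scope="all": rank by the best value seen
    if scope == "last":
        score = {t: h[-1] for t, h in histories.items()}
    elif scope == "all":
        score = {t: max(h) for t, h in histories.items()}
    else:
        raise ValueError(f"unsupported scope: {scope}")
    return max(score, key=score.get)

print(best_trial(histories, "last"))  # trial_b (0.82 > 0.80)
print(best_trial(histories, "all"))   # trial_a (0.85 > 0.82)
```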

We can also pull the per-trial results into pandas data frames for further analysis:

In [None]:
dfs = analysis.trial_dataframes

We can also look at the detailed per-epoch data for an individual experiment.

In [None]:
list(dfs.values())[0]

## Early stopping

One issue with the search we've just conducted is that it indiscriminately trains every sampled hyperparameter combination for the full, fixed number of training iterations (epochs). This is wasteful, especially if we can tell early on that certain combinations are less promising. An obvious improvement is an early stopping criterion that terminates poorly performing combinations and focuses resources on the more promising ones. Ray Tune provides this functionality through its ASHA (Asynchronous Successive Halving Algorithm) scheduler.

The intuition behind the successive halving algorithm is the following. 

* We begin with all candidate configurations in a base rung.
* Uniformly allocate a budget to a set of candidate hyperparameter configurations in a given rung.
* Evaluate the performance of all candidate configurations.
* Promote the top half of candidate configurations to the next rung.
* Double the budget per configuration for the next rung and repeat until one configuration remains.
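The steps above can be sketched in a few lines of plain Python. This is synchronous successive halving with a reduction factor of 2, exactly as described in the list; ASHA differs in that it promotes configurations asynchronously, as soon as a configuration is in the top fraction of results recorded at a rung, rather than waiting for the whole rung to finish. The `evaluate` function is a hypothetical stand-in for partially training a model with a given budget.

```python
def successive_halving(configs, evaluate, min_budget=1):
    """Synchronous successive halving (reduction factor 2), a sketch."""
    rung = list(configs)
    budget = min_budget
    history = []  # (budget per config, number of configs) for each rung
    while len(rung) > 1:
        scores = {cfg: evaluate(cfg, budget) for cfg in rung}
        history.append((budget, len(rung)))
        # Promote the top half of the configurations to the next rung
        rung = sorted(rung, key=scores.get, reverse=True)[: len(rung) // 2]
        budget *= 2  # double the per-configuration budget
    return rung[0], history

# Toy setup: each config is a "quality" value, and more budget scales the score
configs = [0.1, 0.5, 0.3, 0.9, 0.7, 0.2, 0.8, 0.4]
best, history = successive_halving(configs, lambda q, b: q * b)
print(best)     # 0.9
print(history)  # [(1, 8), (2, 4), (4, 2)]
```

In this toy run the total budget spent is 1×8 + 2×4 + 4×2 = 24 units, compared with 8×4 = 32 units to train all eight configurations to the final budget of 4.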

For more details on how ASHA works, please see [Massively Parallel Hyperparameter Optimization](https://blog.ml.cmu.edu/2018/12/12/massively-parallel-hyperparameter-optimization/)

Let's see how we can do the hyperparameter space exploration using ASHA. First, we create the scheduler and set the objective.

In [None]:
asha = ASHAScheduler(metric="mean_accuracy", mode="max")
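Because *ASHAScheduler* is created here with only *metric* and *mode*, it uses Tune's defaults for everything else; in Ray 1.12 these are, as far as we know, `grace_period=1`, `reduction_factor=4` and `max_t=100` (check the API reference for your version). Trials can only be stopped at rung milestones, which grow geometrically from `grace_period` by `reduction_factor`. A stdlib sketch of the milestone computation, assuming the rung formula as we read it in the Tune source (the helper name is hypothetical):

```python
import math

def asha_milestones(grace_period=1, max_t=100, reduction_factor=4):
    # Number of rungs that fit between grace_period and max_t
    num_rungs = int(math.log(max_t / grace_period) / math.log(reduction_factor) + 1)
    return [grace_period * reduction_factor ** k for k in range(num_rungs)]

milestones = asha_milestones()
print(milestones)  # [1, 4, 16, 64]

# train_mnist reports 10 iterations, so only the first two rungs are ever reached
print([m for m in milestones if m <= 10])  # [1, 4]
```

Under these assumptions, trials that ASHA stops should end at iteration 1 or 4 in the *iter* column, while the surviving ones run to 10.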

Now let's call *tune.run* again. We can also increase the number of samples to 20, as we'll be abandoning some of them anyway.
Pass the scheduler to the *tune.run* call and keep an eye on the *iter* column in the trials table that Ray Tune periodically prints.

In [None]:
analysis = tune.run(
    run_or_experiment = train_mnist, 
    config = search_space, 
    num_samples = 20,
    scheduler = asha
)

Despite doubling the number of experiments, you should see that the total execution time increased by only about 20%.

Now let's see the changes in accuracy.

In [None]:
plot_results(analysis, figsize=(10,8))

You should see that most of the trials were stopped early, with only a handful considered promising enough to continue training. Let's look at the best-performing configuration. Use the *get_best_config* method on *analysis* to see this information. Does it match the result in the final trials table above?

In [None]:
analysis.get_best_config("mean_accuracy", "max")

## HyperOpt

So far we've looked at how Ray Tune can use various scheduling strategies to optimise the hyperparameter search. Tune's flexibility doesn't end there, however: it also allows us to plug in external frameworks that perform the **selection** of hyperparameters for the individual experiments. HyperOpt is one of the external frameworks supported by Tune, so we can use Bayesian optimisation to explore the space. This way the search takes the results of previous experiments into account and actively explores the more promising regions of the space.

Ray Tune also supports many other search algorithms, including:

* [Ax](https://ax.dev/)
* [Bayesian Optimization](https://github.com/fmfn/BayesianOptimization)
* [HpBandSter](https://github.com/automl/HpBandSter)
* [FLAML](https://github.com/microsoft/FLAML)
* [scikit-optimize](https://scikit-optimize.github.io/stable/)
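*HyperOptSearch* uses HyperOpt's Tree-structured Parzen Estimator (TPE) by default. The core idea can be sketched in one dimension: split past trials into a "good" group (the best fraction `gamma`) and a "bad" group, fit a simple kernel density to each, and propose the candidate that maximises the ratio l(x)/g(x) of good-to-bad density. This is a heavily simplified illustration with fixed-bandwidth Gaussian kernels, not HyperOpt's actual implementation; `objective` is a hypothetical toy loss and all helper names are hypothetical.

```python
import math
import random

def gaussian_kde(points, bandwidth):
    # Mixture of Gaussians centred on each observed point (fixed bandwidth)
    def density(x):
        norm = 1.0 / (len(points) * bandwidth * math.sqrt(2 * math.pi))
        return norm * sum(math.exp(-0.5 * ((x - p) / bandwidth) ** 2) for p in points)
    return density

def tpe_suggest(history, low, high, rng, gamma=0.25, n_candidates=24, bandwidth=0.05):
    """Propose the next x from (x, loss) history; lower loss is better."""
    ranked = sorted(history, key=lambda h: h[1])
    n_good = max(1, math.ceil(gamma * len(ranked)))
    good = [x for x, _ in ranked[:n_good]]
    bad = [x for x, _ in ranked[n_good:]] or good  # avoid an empty group
    l, g = gaussian_kde(good, bandwidth), gaussian_kde(bad, bandwidth)
    # Draw candidates near the good points (clipped to bounds), keep the best l(x)/g(x)
    candidates = [min(high, max(low, rng.gauss(rng.choice(good), bandwidth)))
                  for _ in range(n_candidates)]
    return max(candidates, key=lambda x: l(x) / (g(x) + 1e-12))

def objective(x):
    return (x - 0.3) ** 2  # hypothetical toy loss, minimised at x = 0.3

rng = random.Random(1234)
history = []
for _ in range(5):  # random start-up phase before the model-based proposals
    x = rng.uniform(0.0, 1.0)
    history.append((x, objective(x)))
for _ in range(25):
    x = tpe_suggest(history, 0.0, 1.0, rng)
    history.append((x, objective(x)))

best_x, best_loss = min(history, key=lambda h: h[1])
print(f"best x = {best_x:.3f}, loss = {best_loss:.5f}")
```

The random start-up phase matters in practice: *HyperOptSearch* also begins with a number of random suggestions (its `n_initial_points` argument, 20 by default in Ray 1.12 if we read the API reference correctly), so a run with only 20 samples may spend all of them in that phase.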

Let's see the HyperOpt/Ray Tune integration in action. First we need a couple of extra imports (this requires the HyperOpt library to be installed).

In [None]:
from hyperopt import hp
from ray.tune.suggest.hyperopt import HyperOptSearch

Define the search space (note that we are using *hp.* functions for the hyperparameter sampling).

In [None]:
space = {
    "lr": hp.uniform("learning_rate", 0.0001, 0.1),
    "momentum": hp.uniform("momentum", 0.1, 0.9)
}

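Now let's wrap HyperOpt in the *HyperOptSearch* object provided by Tune. The arguments are the search space, the metric to optimise, whether to maximise or minimise it (*mode*), and a random seed for reproducibility.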

In [None]:
hyperopt_search = HyperOptSearch(space, metric="mean_accuracy", mode="max", random_state_seed=1234)

This time, the only arguments we need to provide are the training function and *search_alg*. There is no need to pass the search space via *config*, as it has already been provided to the *HyperOptSearch* object and is handled externally.

Now set the search algorithm in the *tune.run* call and run the search.

**Optional**: If you are feeling adventurous, switch to the ASHA scheduler and increase the number of experiments to 20. Do you observe a difference in the behaviour of Ray Tune when scheduling experiments? How does it differ from the previous ASHA run? Why does it behave this way?

In [None]:
analysis = tune.run(
    run_or_experiment = train_mnist, 
    search_alg = hyperopt_search,
    num_samples = 20,
    scheduler=asha
)

Now let's see the accuracy results.

In [None]:
plot_results(analysis, figsize=(10,8))