# Hyperparameter Tuning with PyTorch & RayTune

This notebook will walk you through the basics of using [RayTune](https://ray.readthedocs.io/en/latest/tune.html). We'll do so with a PyTorch model in this example.

We'll follow a simple process:
1. We'll first create a model and train it, just like we might on a single node.
2. We'll then make the slight modifications to turn it into a distributed hyperparameter search.
3. We'll then run it on RayTune and see the results.


Let's get started. First come our core imports. We'll be training a ConvNet model on the MNIST dataset.

In [None]:
import os 

import ray

from torchvision import datasets, transforms

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

from filelock import FileLock

We'll set two global constants that roughly cap how many samples each training epoch and each evaluation pass will use.

In [None]:
EPOCH_SIZE = 512
TEST_SIZE = 256

## Single Node PyTorch Hyperparameter Tuning

Our example will follow nearly the exact same code that you can find in the [PyTorch MNIST example here](https://github.com/pytorch/examples/blob/master/mnist/main.py).

You'll see that we create an even simpler model than the one in that example. You can swap that one in if you want to try for better predictions.

In [None]:
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)
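
The `192` in `nn.Linear(192, 10)` follows from the input size. Here is a quick sketch of the arithmetic, assuming 28x28 single-channel MNIST images and PyTorch's defaults (convolution stride 1 with no padding, and a pooling stride equal to the pooling kernel size). The helper name `out_size` is ours for illustration and is not part of PyTorch.

```python
def out_size(size, kernel, stride=1, padding=0):
    """Output length along one spatial dimension of a conv or pooling layer."""
    return (size + 2 * padding - kernel) // stride + 1


side = 28                                   # MNIST images are 28x28
side = out_size(side, kernel=3)             # conv1 with kernel_size=3 -> 26
side = out_size(side, kernel=3, stride=3)   # max_pool2d(x, 3) -> 8
channels = 3                                # conv1 produces 3 output channels
flat_features = channels * side * side
print(flat_features)                        # 192
```

If you change the kernel sizes or the number of channels, the same arithmetic tells you what to pass to `x.view` and `nn.Linear`.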

After creating that network, we can now create our data loaders for training and test data. These are just plain [PyTorch dataloaders](https://pytorch.org/docs/1.1.0/data.html?highlight=dataloader#torch.utils.data.DataLoader), except that we've added a `FileLock` to ensure that only one process downloads the data on each machine (in case we have multiple workers per machine on our Ray cluster).

Other than that, there's nothing that's changed from the [PyTorch example version](https://github.com/pytorch/examples/blob/master/mnist/main.py#L101).

In [None]:
def get_data_loaders():
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])

    # We add FileLock here because multiple workers will want to
    # download data, and this may cause overwrites since
    # DataLoader is not threadsafe.
    # This is only relevant in the distributed setting.
    with FileLock(os.path.expanduser("~/data.lock")):
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST(
                "/tmp/data",
                train=True,
                download=True,
                transform=mnist_transforms),
            batch_size=64,
            shuffle=True)

        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST("/tmp/data", train=False, transform=mnist_transforms),
            batch_size=64,
            shuffle=True)
    return train_loader, test_loader
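
As a small worked example of the preprocessing step, `transforms.Normalize` applies `(x - mean) / std` to pixel values that `ToTensor` has already scaled to the range [0, 1]. The sketch below redoes that arithmetic in plain Python for the two extreme pixel values. The function name `normalize` is ours for illustration.

```python
MEAN, STD = 0.1307, 0.3081  # the values passed to transforms.Normalize above


def normalize(pixel):
    """Apply (x - mean) / std to a pixel already scaled to [0, 1]."""
    return (pixel - MEAN) / STD


black = normalize(0.0)  # darkest possible pixel, ends up below zero
white = normalize(1.0)  # brightest possible pixel, ends up well above zero
print(round(black, 4), round(white, 4))
```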

We've defined how we're going to download, load, and preprocess the data. Now it's time to define our training and test functions. The arguments are ordered a bit differently than in the PyTorch example we've referenced, but the difference is inconsequential. The training function takes a model, an optimizer, the train loader, and a device, and then trains the model.

In [None]:
def train(model, optimizer, train_loader, device=torch.device("cpu")):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if batch_idx * len(data) > EPOCH_SIZE:
            return
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

It's the same story for our test function. We track one basic metric here: the fraction of correct predictions. We could calculate more, but we're keeping it simple.

In [None]:
def test(model, data_loader, device=torch.device("cpu")):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            if batch_idx * len(data) > TEST_SIZE:
                break
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    return correct / total
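
Both loops above stop early based on `EPOCH_SIZE` and `TEST_SIZE`. Because the check is `batch_idx * len(data) > limit`, each pass handles slightly more samples than the constant suggests. The sketch below counts the batches, assuming every batch is full (64 samples, the `batch_size` used in `get_data_loaders`). The helper `batches_processed` is ours for illustration.

```python
EPOCH_SIZE = 512
TEST_SIZE = 256
BATCH_SIZE = 64  # batch_size used in get_data_loaders


def batches_processed(limit, batch_size):
    """Count the batches handled before `batch_idx * batch_size > limit` triggers."""
    batch_idx = 0
    while batch_idx * batch_size <= limit:
        batch_idx += 1
    return batch_idx


train_batches = batches_processed(EPOCH_SIZE, BATCH_SIZE)
test_batches = batches_processed(TEST_SIZE, BATCH_SIZE)
print(train_batches, train_batches * BATCH_SIZE)  # 9 576
print(test_batches, test_batches * BATCH_SIZE)    # 5 320
```

So one "epoch" here is 9 batches of training data, and each accuracy value is computed over 5 batches of test data, which keeps every trial fast.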

Lastly, we'll create a wrapper function for this particular model. All we need to pass in is the configuration for the model we'd like to train. The function then gets the data, creates the model, and optimizes it accordingly.

In [None]:
def train_mnist(config):
    train_loader, test_loader = get_data_loaders()
    model = ConvNet()
    optimizer = optim.SGD(model.parameters(), lr=config["lr"], momentum=config['momentum'])
    for i in range(10):
        train(model, optimizer, train_loader)
        acc = test(model, test_loader)
        print(acc)

### Single-Node Hyperparameter Tuning

Now, let's show what we might have to do if we were going to perform hyperparameter tuning on a single machine. We would have to enumerate all the possibilities and either train them serially or use something like multiprocessing to train them in parallel. The parallel setup takes a bit of work, so people often opt to train serially and simply wait for the sweep to finish.

This is what that might end up looking like.

In [None]:
import itertools
conf = {
    "lr": [0.001, 0.01, 0.1],
    "momentum": [0.001, 0.01, 0.1, 0.9]
}

combinations = list(itertools.product(*conf.values()))
print(len(combinations))

In [None]:
for lr, momentum in combinations:
    train_mnist({"lr":lr, "momentum":momentum})
    break # we'll stop this after one run and just use it for illustrative purposes
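
For comparison, here is a rough sketch of the parallel route using only the standard library. It is an illustration under stated assumptions, not a recommended setup: `fake_train` is a hypothetical stand-in for `train_mnist` that returns a placeholder number rather than a real accuracy, and a thread pool is used only to keep the example short.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

conf = {
    "lr": [0.001, 0.01, 0.1],
    "momentum": [0.001, 0.01, 0.1, 0.9],
}

# Turn the grid into one config dict per combination.
configs = [dict(zip(conf.keys(), values))
           for values in itertools.product(*conf.values())]


def fake_train(config):
    """Hypothetical stand-in for train_mnist; the return value is a placeholder."""
    return config["lr"] * config["momentum"]


# Threads keep the sketch short; a real sweep would more likely use
# separate processes or machines, plus its own bookkeeping for results.
with ThreadPoolExecutor(max_workers=4) as pool:
    scores = list(pool.map(fake_train, configs))

results = list(zip(configs, scores))
print(len(results))  # 12
```

Even in this toy form we have to build the configs, manage the pool, and collect the results ourselves, and none of it extends beyond one machine.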

### RayTune: Distributed Hyperparameter Tuning

We've now seen how you might approach the problem in a single-node world. With RayTune, it becomes straightforward to move your code from a single node to multiple nodes. Let's take a look at the changes we need to make to achieve that.

First, let's import Ray and Tune. The `ray.init(address='auto')` line is commented out below; uncomment it to connect to an existing Ray cluster.

In [None]:
import ray

ray.shutdown()
# ray.init(address='auto')
from ray import tune

The first minor change is that we'll specify that we want to perform a strict `grid_search` on our hyperparameters.

In [None]:
conf = {
    "lr": tune.grid_search([0.001, 0.01, 0.1]),
    "momentum": tune.grid_search([0.001, 0.01, 0.1, 0.9])
}

Now let's take our simple training function and add a single line: `tune.track.log(mean_accuracy=acc)`.

That's all that we need to change in order for RayTune to be able to parallelize our different hyperparameter combinations. When we're executing a hyperparameter sweep, we're executing an **experiment**. Each distinct combination of our hyperparameters is a single **trial**.

In the following example, we're using the **functional API**. This makes it easy to get something up and running, but it provides less control overall than the **class API** (`tune.Trainable`).

In [None]:
def train_mnist(config):
    train_loader, test_loader = get_data_loaders()
    model = ConvNet()
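    # Pass momentum as well; otherwise the momentum grid has no effect on the trials.
    optimizer = optim.SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"])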
    for i in range(10):
        train(model, optimizer, train_loader)
        acc = test(model, test_loader)
        tune.track.log(mean_accuracy=acc)

Here's an example of the **class API**. Note that `_setup` is called **once per trial**, while the number of times `_train` is called is determined by the stopping criterion that we pass to `tune.run` in the cell after the class definition: `stop={"training_iteration": 10}`.

In [None]:
class TrainMNIST(tune.Trainable):
    def _setup(self, config):
        self.config = config
        self.train_loader, self.test_loader = get_data_loaders()
        self.model = ConvNet()
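        # Pass momentum as well; otherwise the momentum grid has no effect on the trials.
        self.optimizer = optim.SGD(
            self.model.parameters(),
            lr=self.config["lr"],
            momentum=self.config["momentum"])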
    
    def _train(self):
        train(self.model, self.optimizer, self.train_loader)
        acc = test(self.model, self.test_loader)
        return {"mean_accuracy": acc}
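
To make the "once per trial" versus "once per iteration" distinction concrete, here is a toy sketch of that lifecycle in plain Python. It is not Ray's actual implementation: `ToyTrainable` and `run_trial` are hypothetical names, and the driver loop is heavily simplified. It only mirrors the shape of `_setup`, `_train`, and the `training_iteration` stopping criterion.

```python
class ToyTrainable:
    """Hypothetical stand-in that mirrors the _setup/_train shape used above."""

    def __init__(self, config):
        self.iteration = 0
        self._setup(config)  # called exactly once per trial

    def _setup(self, config):
        self.step_size = config["step_size"]
        self.value = 0.0

    def _train(self):
        self.value += self.step_size
        return {"value": self.value}

    def train(self):
        result = self._train()  # one call is one training iteration
        self.iteration += 1
        result["training_iteration"] = self.iteration
        return result


def run_trial(trainable_cls, config, stop):
    """Simplified driver loop: call train() until the stop criterion is met."""
    trial = trainable_cls(config)
    while True:
        result = trial.train()
        if result["training_iteration"] >= stop["training_iteration"]:
            return result


final = run_trial(ToyTrainable, {"step_size": 2.0}, stop={"training_iteration": 10})
print(final)  # {'value': 20.0, 'training_iteration': 10}
```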

In [None]:
analysis = tune.run(TrainMNIST, config=conf, stop={"training_iteration": 10})
# To run using the functional API instead, use the following:
# analysis = tune.run(train_mnist, config=conf)

In [None]:
print("Best config: ", analysis.get_best_config(metric="mean_accuracy"))

In [None]:
# Get a dataframe for analyzing trial results.
df = analysis.dataframe()

In [None]:
df.sort_values('mean_accuracy', ascending=False).head()

In [None]:
analysis = tune.run(train_mnist, config=conf)

In [None]:
# Get a dataframe for analyzing trial results.
df = analysis.dataframe()

In [None]:
df.sort_values('mean_accuracy', ascending=False).head()

## Conclusion

In this example we learned how to perform distributed hyperparameter tuning with RayTune. We took a sweep that we would otherwise have run serially on one machine and ran it in a distributed fashion with only a few code changes. We also saw the two ways to define a trial: the functional API and the class API. See [the documentation for more information](https://ray.readthedocs.io/en/latest/tune.html).