# Distributed training

First, set up your dataset and model.



In [1]:
import torch
import torch.nn as nn

# Sizes for the toy regression problem: number of samples, input width,
# hidden-layer width and output width.
num_samples, input_size, layer_size, output_size = 20, 10, 15, 5

class NeuralNetwork(nn.Module):
    """Two-layer MLP: Linear -> ReLU -> Linear.

    Layer widths default to the notebook-level ``input_size``, ``layer_size``
    and ``output_size`` constants (read when an instance is created, as in the
    original fixed-size version), but can be overridden per instance.

    Args:
        in_features: Width of the input features; defaults to ``input_size``.
        hidden_features: Width of the hidden layer; defaults to ``layer_size``.
        out_features: Width of the output; defaults to ``output_size``.
    """

    def __init__(self, in_features=None, hidden_features=None, out_features=None):
        super().__init__()
        # Resolve defaults lazily so NeuralNetwork() keeps reading the
        # module-level constants at construction time.
        if in_features is None:
            in_features = input_size
        if hidden_features is None:
            hidden_features = layer_size
        if out_features is None:
            out_features = output_size
        # Attribute names are unchanged so state_dict keys stay compatible.
        self.layer1 = nn.Linear(in_features, hidden_features)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_features, out_features)

    def forward(self, input):
        """Map a batch whose last dimension is ``in_features`` to ``out_features``."""
        return self.layer2(self.relu(self.layer1(input)))

# In this example we use a randomly generated dataset. Seed torch's global RNG
# first so the data (and subsequent random draws in this kernel) are
# reproducible across Restart & Run All.
torch.manual_seed(0)
# NOTE: `input` shadows the Python builtin; the name is kept because later
# cells reference it.
input = torch.randn(num_samples, input_size)
labels = torch.randn(num_samples, output_size)


Now define your single-worker PyTorch training function.



In [2]:

import torch.optim as optim

def train_func():
    """Train a fresh NeuralNetwork in this process on ``input``/``labels``.

    Performs three full-batch SGD updates (lr=0.1) against an MSE loss and
    prints, for each epoch, the loss computed before that epoch's update.
    """
    epochs = 3
    net = NeuralNetwork()
    criterion = nn.MSELoss()
    sgd = optim.SGD(net.parameters(), lr=0.1)

    for step in range(epochs):
        step_loss = criterion(net(input), labels)
        sgd.zero_grad()
        step_loss.backward()
        sgd.step()
        print(f"epoch: {step}, loss: {step_loss.item()}")


This training function can be executed with:

```python
train_func()
```

Now let’s convert this to a distributed multi-worker training function!

All you have to do is use the `ray.train.torch.prepare_model` and `ray.train.torch.prepare_data_loader` utility functions to set up your model and data for distributed training. `prepare_model` automatically wraps your model with `DistributedDataParallel` and places it on the right device, and `prepare_data_loader` adds a `DistributedSampler` to your DataLoaders. This example feeds tensors to the model directly rather than through a `DataLoader`, so only `prepare_model` is needed below.

In [34]:

from ray import train
import ray.train.torch

def train_func_distributed():
    """Training loop that Ray Train runs on every worker.

    Each worker builds its own ``NeuralNetwork`` and passes it to
    ``train.torch.prepare_model``, which places it on the worker's device and
    wraps it for distributed training. The worker then runs full-batch SGD
    (lr=0.1, 5 epochs) on the notebook-level ``input``/``labels`` tensors. It
    prints the loss each epoch and reports it with ``train.report``.
    """
    num_epochs = 5
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)
    # The data tensors are created on the CPU, but prepare_model may put the
    # model on a GPU (e.g. Trainer(..., use_gpu=True)). Move the data to
    # wherever the model's parameters live to avoid a device mismatch; this is
    # a no-op on CPU.
    device = next(model.parameters()).device
    features = input.to(device)
    targets = labels.to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    # NOTE(review): every worker trains on the full dataset; nothing shards it
    # per worker because no DataLoader/DistributedSampler is used here.
    for epoch in range(num_epochs):
        output = model(features)
        loss = loss_fn(output, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")
        train.report(loss=loss.item())


Next, set up the remote cluster we'll use for distributed training:

```bash
~/kubectl apply -f ray-cluster.yaml
```

This starts a cluster with 2 workers and autoscaling enabled.

To verify that the cluster is actually running:

In [6]:
# List RayCluster resources; example-cluster should show STATUS Running.
!~/kubectl get rayclusters

NAME              STATUS    RESTARTS   AGE
example-cluster   Running   0          30m


Then connect to the cluster and set up the runtime environment for the workers:

In [7]:
# Import ray explicitly instead of relying on `import ray.train.torch` in an
# earlier cell having bound the name.
import ray

# Ray Client (ray://) address of the cluster's head service.
url = "ray://example-cluster-ray-head:10001"

# Runtime environment for the cluster workers: pip-install the packages listed
# in requirements-env.txt.
env = {
    "pip": "requirements-env.txt"
}

context = ray.init(url, runtime_env=env)

Then instantiate a `Trainer` that uses the "torch" backend with 3 workers, and use it to run the new training function!



In [35]:
from ray.train.callbacks import PrintCallback, TBXLoggerCallback
from ray.train import Trainer

# Run the distributed training function on 3 workers with the torch backend.
trainer = Trainer(backend="torch", num_workers=3)

# For GPU Training, set `use_gpu` to True.
# trainer = Trainer(backend="torch", num_workers=4, use_gpu=True)

trainer.start()
try:
    # PrintCallback echoes each round of per-worker reports; TBXLoggerCallback
    # logs them for TensorBoard under the trainer's run directory.
    results = trainer.run(
        train_func_distributed,
        callbacks=[PrintCallback(), TBXLoggerCallback()],
    )
finally:
    # Release the workers even if training raises.
    trainer.shutdown()


2022-07-06 18:09:58,847	INFO trainer.py:243 -- Trainer logs will be logged in: /home/jovyan/ray_results/train_2022-07-06_18-09-58
[2m[36m(BaseWorkerMixin pid=802, ip=10.10.125.125)[0m 2022-07-06 16:10:05,717	INFO torch.py:347 -- Setting up process group for: env:// [rank=1, world_size=3]
[2m[36m(BaseWorkerMixin pid=803, ip=10.10.125.125)[0m 2022-07-06 16:10:05,719	INFO torch.py:347 -- Setting up process group for: env:// [rank=2, world_size=3]
2022-07-06 18:10:05,751	INFO trainer.py:249 -- Run results will be logged in: /home/jovyan/ray_results/train_2022-07-06_18-09-58/run_001
[2m[36m(BaseWorkerMixin pid=351, ip=10.10.86.36)[0m 2022-07-06 16:10:05,698	INFO torch.py:347 -- Setting up process group for: env:// [rank=0, world_size=3]
[2m[36m(BaseWorkerMixin pid=351, ip=10.10.86.36)[0m 2022-07-06 16:10:05,784	INFO torch.py:98 -- Moving model to device: cpu
[2m[36m(BaseWorkerMixin pid=351, ip=10.10.86.36)[0m 2022-07-06 16:10:05,785	INFO torch.py:132 -- Wrapping provided mode

[
    {
        "loss": 1.4390658140182495,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.015300273895263672,
        "_training_iteration": 1
    },
    {
        "loss": 1.4390658140182495,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.012506723403930664,
        "_training_iteration": 1
    },
    {
        "loss": 1.4390658140182495,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.009943485260009766,
        "_training_iteration": 1
    }
]
[
    {
        "loss": 1.3922909498214722,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.003665447235107422,
        "_training_iteration": 2
    },
    {
        "loss": 1.3922909498214722,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.0036590099334716797,
        "_training_iteration": 2
    },
    {
        "loss": 1.3922909498214722,
        "_timestamp": 1657149005,
        "_time_this_iter_s": 0.0033502578735351562,
        "_training_iteration": 

We can start TensorBoard to monitor training progress:

In [21]:
%load_ext tensorboard

In [36]:
# Point TensorBoard at the logs of the most recent Trainer run.
%tensorboard --logdir  {str(trainer.latest_run_dir)}

# Parallel hyperparameter tuning

Let's first modify our training function to take a `config` object that supplies its hyperparameters (here, the learning rate):

In [40]:

def train_func_distributed_tunable(config):
    """Per-worker training loop whose hyperparameters come from ``config``.

    The worker builds a ``NeuralNetwork``, prepares it for distributed training
    with ``train.torch.prepare_model`` and runs full-batch SGD on the
    notebook-level ``input``/``labels`` tensors. It prints the loss each epoch
    and reports it with ``train.report`` so Tune can compare trials on
    ``"loss"``.

    Args:
        config: Mapping with a required ``"lr"`` key (SGD learning rate) and an
            optional ``"num_epochs"`` key (defaults to 10).
    """
    num_epochs = config.get("num_epochs", 10)
    lr = config["lr"]
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)
    # The data tensors live on the CPU, but prepare_model may have moved the
    # model to a GPU; move the data alongside the model's parameters (no-op on
    # CPU) to avoid a device mismatch.
    device = next(model.parameters()).device
    features = input.to(device)
    targets = labels.to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        output = model(features)
        loss = loss_fn(output, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")
        train.report(loss=loss.item())

Then start tuning:

In [None]:
from ray import tune
import numpy as np

# Turn the config-driven training function into something Tune can launch as
# trials (presumably reusing this trainer's backend/worker settings — confirm).
trainable = trainer.to_tune_trainable(train_func_distributed_tunable)

# Try 3 learning rates sampled uniformly from [0.01, 0.05], optimising for the
# lowest reported "loss".
analysis = tune.run(
    trainable,
    metric="loss",
    mode="min",
    num_samples=3,
    config={"lr": tune.uniform(0.01, 0.05)},
)

print(analysis.get_best_config(mode="min", metric="loss"))