# PyTorch MNIST Example: Ray + Horovod

----

We are going to run a simple PyTorch MNIST example, training the model in a distributed fashion with Horovod on a Ray cluster. The code follows this example from the Ray documentation: https://docs.ray.io/en/latest/train/examples/horovod/horovod_example.html


## Setup

Execute this command in a terminal/shell to install the Jupyter kernel in your user environment:
```bash
source setup.sh
```

Then select the kernel that uses the `nersc/pytorch:ngc-22.09-v0` Shifter image.



## Step 1: Start the Ray cluster in the background


**[Dev note]**: Currently this is an `sbatch` submission on shared-cpu in JupyterHub; it might be better to run it in the Jupyter QOS with `srun` instead.


Open a terminal and execute:
```bash
source scripts/submit_ray_cluster.sh
```

Edit the parameters in `scripts/submit_ray_cluster.sh` to change the job configuration.


In [1]:
# Check the Ray cluster job log(s) in logs/ (presumably written by the job
# submitted via scripts/submit_ray_cluster.sh -- confirm in that script).
!cat logs/*.log


cat: 'logs/*.log': No such file or directory


## Step 2: Connect to the Cluster

In [1]:
import ray

from utility import get_ray_cluster_address, cluster_summary

# Project helper from utility.py; presumably returns the address of the Ray
# head node started in Step 1 -- TODO confirm against utility.py.
cluster_address = get_ray_cluster_address()

In [2]:
# Attach this notebook as a driver to the already-running cluster at
# `cluster_address` (instead of starting a new local Ray instance).
ray.init(cluster_address)

0,1
Python version:,3.8.13
Ray version:,2.0.0


In [3]:
# Print the connected cluster's node count and total CPU / GPU / RAM.
cluster_summary()

Cluster Summary
---------------
Nodes: 2
CPU:   256
GPU:   8
RAM:   315.08 GB


## Step 3: Setup Model

In [4]:
import argparse
import os

import horovod.torch as hvd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from torchvision import datasets, transforms

import ray
from ray.air import session
from ray.train.horovod import HorovodTrainer
from ray.air.config import ScalingConfig

In [5]:
def metric_average(val, name):
    """Reduce a scalar metric across all Horovod workers.

    ``val`` is wrapped in a tensor and passed to ``hvd.allreduce`` with its
    default reduction (an average in Horovod). ``name`` labels the collective
    so the same metric is matched up across ranks.

    Returns:
        The reduced value as a plain Python number.
    """
    return hvd.allreduce(torch.tensor(val), name=name).item()


class Net(nn.Module):
    """Small LeNet-style CNN classifier for single-channel MNIST digits.

    Two convolution + max-pool stages feed two fully connected layers. The
    forward pass returns per-class log-probabilities over 10 classes, which is
    the input format ``F.nll_loss`` in the training loop expects.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return log-probabilities of shape ``(batch, 10)``.

        ``x`` is assumed to be ``(batch, 1, 28, 28)``. After two 5x5 convs and
        two 2x2 max-pools, that leaves 20 x 4 x 4 = 320 features per sample,
        which matches the ``view(-1, 320)`` below and ``fc1``'s input size.
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Pass the class dimension explicitly. The implicit-dim form is
        # deprecated and warns on every call; dim=1 is what it resolved to here.
        return F.log_softmax(x, dim=1)

In [6]:
def setup(config):
    """Initialise Horovod and build the per-worker training objects.

    Called inside every Horovod worker. Reads these optional keys from
    ``config`` (defaults in brackets): ``data_dir`` [``"~/data"``],
    ``seed`` [42], ``batch_size`` [64], ``use_adasum`` [False], ``lr`` [0.01],
    ``momentum`` [0.5], ``use_cuda`` [False].

    Returns:
        ``(model, optimizer, train_loader, train_sampler)``:

        - ``optimizer`` is an SGD optimizer wrapped in ``hvd.DistributedOptimizer``.
        - ``train_sampler`` is the ``DistributedSampler`` that partitions the
          MNIST training set across ``hvd.size()`` workers.
    """
    data_dir = config.get("data_dir", None)
    seed = config.get("seed", 42)
    batch_size = config.get("batch_size", 64)
    use_adasum = config.get("use_adasum", False)
    lr = config.get("lr", 0.01)
    momentum = config.get("momentum", 0.5)
    use_cuda = config.get("use_cuda", False)

    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(seed)

    if use_cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(seed)

    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(1)

    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
    data_dir = data_dir or "~/data"
    # Serialise the (possibly downloading) dataset construction so that
    # concurrent workers do not race on the same files.
    with FileLock(os.path.expanduser("~/.horovod_lock")):
        train_dataset = datasets.MNIST(
            data_dir,
            train=True,
            download=True,
            transform=transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
            ),
        )
    # Horovod: use DistributedSampler to partition the training data.
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs
    )

    model = Net()

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not use_adasum else 1

    if use_cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: scale learning rate by lr_scaler.
    optimizer = optim.SGD(model.parameters(), lr=lr * lr_scaler, momentum=momentum)

    # Horovod: broadcast the initial parameters and optimizer state from rank 0.
    # Every worker then starts from identical weights, instead of relying on
    # each worker's RNG happening to produce the same initialisation.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Adasum if use_adasum else hvd.Average,
    )

    return model, optimizer, train_loader, train_sampler

In [7]:
def train_epoch(
    model, optimizer, train_sampler, train_loader, epoch, log_interval, use_cuda
):
    """Run one training epoch over this worker's partition of the data.

    Args:
        model: network to train; it is put into training mode here.
        optimizer: optimizer that steps ``model``'s parameters.
        train_sampler: sampler behind ``train_loader``. It must provide
            ``set_epoch`` and ``__len__``, e.g. a ``DistributedSampler``.
        train_loader: iterable of ``(data, target)`` batches.
        epoch: epoch index, used to reshuffle the sampler and in log lines.
        log_interval: print a progress line every ``log_interval`` batches.
        use_cuda: when True, move each batch to the current CUDA device.

    Returns:
        The loss of the last batch as a Python float, or ``None`` if the
        loader produced no batches.
    """
    loss = None
    model.train()
    # Horovod: set epoch to sampler for shuffling.
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        if use_cuda:
            data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            # Horovod: use train_sampler to determine the number of
            # examples in this worker's partition.
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_sampler),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
    # Compare against None explicitly. A bare `if loss` evaluates the tensor's
    # value, so an exact 0.0 loss would wrongly be reported as None.
    return loss.item() if loss is not None else None


In [8]:
def train_func(config):
    """Per-worker training loop handed to ``HorovodTrainer``.

    Builds the model, optimizer and data pipeline with ``setup(config)``, then
    trains for ``config["num_epochs"]`` epochs (default 10). After each epoch
    it reports the last-batch loss to Ray via ``session.report``.
    ``log_interval`` (default 10) and ``use_cuda`` (default False) are also
    read from ``config``.
    """
    epochs = config.get("num_epochs", 10)
    interval = config.get("log_interval", 10)
    cuda = config.get("use_cuda", False)

    model, optimizer, loader, sampler = setup(config)

    for ep in range(epochs):
        epoch_loss = train_epoch(
            model, optimizer, sampler, loader, ep, interval, cuda
        )
        session.report({"loss": epoch_loss})


## Step 4: Train Model

In [19]:
import os

# Store MNIST under $SCRATCH when it is set. Otherwise pass None so that
# setup() falls back to its "~/data" default, instead of os.path.join
# raising TypeError on a None path.
P_SCRATCH = os.getenv('SCRATCH')

# One Horovod worker per GPU: with use_gpu=True each worker requests one GPU.
num_workers = 8
use_cuda = True

kwargs = {
    "data_dir": os.path.join(P_SCRATCH, 'MNIST') if P_SCRATCH else None,
    "seed": 42,
    "use_cuda": use_cuda,
    "batch_size": 64,
    "use_adasum": False,
    "lr": 0.01,
    "momentum": 0.5,
    "num_epochs": 50,
    "log_interval": 20,
}

In [20]:
# Launch `num_workers` Horovod workers. Each one runs train_func with `kwargs`
# as its config, and fit() returns once training has finished.
scaling = ScalingConfig(num_workers=num_workers, use_gpu=use_cuda)
trainer = HorovodTrainer(
    train_func, train_loop_config=kwargs, scaling_config=scaling
)
results = trainer.fit()
print(results.metrics)



[2m[36m(TunerInternal pid=55624)[0m == Status ==
[2m[36m(TunerInternal pid=55624)[0m Current time: 2023-02-03 23:54:04 (running for 00:00:04.20)
[2m[36m(TunerInternal pid=55624)[0m Memory usage on this node: 36.0/251.3 GiB
[2m[36m(TunerInternal pid=55624)[0m Using FIFO scheduling algorithm.
[2m[36m(TunerInternal pid=55624)[0m Resources requested: 1.0/256 CPUs, 8.0/8 GPUs, 0.0/315.08 GiB heap, 0.0/139.03 GiB objects (0.0/2.0 accelerator_type:A100)
[2m[36m(TunerInternal pid=55624)[0m Result logdir: /global/homes/a/asnaylor/ray_results/HorovodTrainer_2023-02-03_23-54-00
[2m[36m(TunerInternal pid=55624)[0m Number of trials: 1/1 (1 RUNNING)
[2m[36m(TunerInternal pid=55624)[0m +----------------------------+----------+------------------+
[2m[36m(TunerInternal pid=55624)[0m | Trial name                 | status   | loc              |
[2m[36m(TunerInternal pid=55624)[0m |----------------------------+----------+------------------|
[2m[36m(TunerInternal pid=55624)



[2m[36m(TunerInternal pid=55624)[0m Result for HorovodTrainer_078cb_00000:
[2m[36m(TunerInternal pid=55624)[0m   _time_this_iter_s: 7.078815460205078
[2m[36m(TunerInternal pid=55624)[0m   _timestamp: 1675468458
[2m[36m(TunerInternal pid=55624)[0m   _training_iteration: 1
[2m[36m(TunerInternal pid=55624)[0m   date: 2023-02-03_23-54-18
[2m[36m(TunerInternal pid=55624)[0m   done: false
[2m[36m(TunerInternal pid=55624)[0m   experiment_id: 6ede253bc04341c0b40ed61f6b7b2d6e
[2m[36m(TunerInternal pid=55624)[0m   hostname: nid001033
[2m[36m(TunerInternal pid=55624)[0m   iterations_since_restore: 1
[2m[36m(TunerInternal pid=55624)[0m   loss: 0.6732971668243408
[2m[36m(TunerInternal pid=55624)[0m   node_ip: 10.249.4.75
[2m[36m(TunerInternal pid=55624)[0m   pid: 9294
[2m[36m(TunerInternal pid=55624)[0m   time_since_restore: 13.968108654022217
[2m[36m(TunerInternal pid=55624)[0m   time_this_iter_s: 13.968108654022217
[2m[36m(TunerInternal pid=55624)[0m  

[2m[36m(RayTrainWorker pid=11276, ip=128.55.64.255)[0m [2023-02-03 23:55:12.557471: E /tmp/pip-install-422qdyel/horovod_c00d8dc7772b41e1ac75987ace25e1c6/horovod/common/operations.cc:697] [2]: Horovod background loop uncaught exception: [/tmp/pip-install-422qdyel/horovod_c00d8dc7772b41e1ac75987ace25e1c6/third_party/gloo/gloo/transport/tcp/pair.cc:598] Connection closed by peer [10.249.4.75]:52107


[2m[36m(TunerInternal pid=55624)[0m Result for HorovodTrainer_078cb_00000:
[2m[36m(TunerInternal pid=55624)[0m   _time_this_iter_s: 1.106130599975586
[2m[36m(TunerInternal pid=55624)[0m   _timestamp: 1675468512
[2m[36m(TunerInternal pid=55624)[0m   _training_iteration: 50
[2m[36m(TunerInternal pid=55624)[0m   date: 2023-02-03_23-55-12
[2m[36m(TunerInternal pid=55624)[0m   done: true
[2m[36m(TunerInternal pid=55624)[0m   experiment_id: 6ede253bc04341c0b40ed61f6b7b2d6e
[2m[36m(TunerInternal pid=55624)[0m   experiment_tag: '0'
[2m[36m(TunerInternal pid=55624)[0m   hostname: nid001033
[2m[36m(TunerInternal pid=55624)[0m   iterations_since_restore: 50
[2m[36m(TunerInternal pid=55624)[0m   loss: 0.019091863185167313
[2m[36m(TunerInternal pid=55624)[0m   node_ip: 10.249.4.75
[2m[36m(TunerInternal pid=55624)[0m   pid: 9294
[2m[36m(TunerInternal pid=55624)[0m   time_since_restore: 68.00352334976196
[2m[36m(TunerInternal pid=55624)[0m   time_this_iter

[2m[36m(TunerInternal pid=55624)[0m 2023-02-03 23:55:13,993	INFO tune.py:758 -- Total run time: 73.90 seconds (73.78 seconds for the tuning loop).


In [23]:
# Show the Result returned by trainer.fit(): final metrics, error and log_dir.
results

Result(metrics={'loss': 0.019091863185167313, '_timestamp': 1675468512, '_time_this_iter_s': 1.106130599975586, '_training_iteration': 50, 'done': True, 'trial_id': '078cb_00000', 'experiment_tag': '0'}, error=None, log_dir=PosixPath('/global/homes/a/asnaylor/ray_results/HorovodTrainer_2023-02-03_23-54-00/HorovodTrainer_078cb_00000_0_2023-02-03_23-54-00'))

## Step 5: Explore Training in Tensorboard

In [24]:
# NERSC helper for reaching TensorBoard from JupyterHub (its tb_address() is
# used further down), then load the TensorBoard notebook extension.
import nersc_tensorboard_helper
%load_ext tensorboard

In [34]:
# Start TensorBoard on the trial's log directory; --port 0 lets it choose a free port.
%tensorboard --logdir $results.log_dir --port 0

In [35]:
# Presumably shows the address at which the TensorBoard instance started above
# can be opened through NERSC JupyterHub -- confirm in nersc_tensorboard_helper.
nersc_tensorboard_helper.tb_address()

### Stop cluster connection and job

In [22]:
# Disconnect this notebook's driver from Ray. The cluster job itself keeps
# running until it is cancelled (see the scancel command below).
ray.shutdown()

Execute this in a terminal:
```bash
scancel -u $USER  # note: this cancels ALL of your current jobs, not just the Ray cluster
```