### Distributed Data Parallel on a Simple Computer Vision Task

This tutorial shows an example of using Ray Tune to run a simple training task: training a model on the MNIST dataset with PyTorch and Distributed Data Parallel (DDP). Ray provides a wrapper, `DistributedTrainableCreator`, that turns a PyTorch training function into a Tune trainable that runs with DDP across several workers.

The MNIST (Modified National Institute of Standards and Technology) database is a large set of images of handwritten digits. It is commonly used to train image processing systems (Wikipedia). Here we train a simple image classifier with the function `train_mnist` and use the distributed data parallel (DDP) paradigm. Each worker holds a replica of the model and trains on its own portion of the data. After every backward pass the workers' gradients are averaged, so all replicas stay in sync. DDP can spread this work across multiple GPUs and nodes. The PyTorch documentation recommends it over the older `DataParallel` for multi-GPU training because it is typically faster. DDP is part of PyTorch and was developed at Meta (then called Facebook). See the references below for more information.
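You can illustrate the gradient-averaging idea behind DDP without PyTorch. The sketch below is a toy, pure-Python simulation, not how PyTorch implements DDP. Two hypothetical workers each compute the mean-squared-error gradient of a one-variable linear model on their own half of a batch. Averaging those gradients stands in for DDP's all-reduce step, and it gives the same gradient as a single process working on the whole batch.

```python
# Toy simulation of data-parallel gradient averaging (not PyTorch's DDP code).
# Each "worker" computes a gradient on its shard; averaging the shard
# gradients (the all-reduce stand-in) reproduces the full-batch gradient.

def mse_grad(w, b, xs, ys):
    """Gradient of mean squared error for the model y_hat = w * x + b."""
    n = len(xs)
    dw = sum(2 * (w * x + b - y) * x for x, y in zip(xs, ys)) / n
    db = sum(2 * (w * x + b - y) for x, y in zip(xs, ys)) / n
    return dw, db


def all_reduce_mean(grads):
    """Average each gradient component over all workers."""
    k = len(grads)
    return tuple(sum(g[i] for g in grads) / k for i in range(len(grads[0])))


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [3.0, 5.0, 7.0, 9.0, 11.0, 13.0, 15.0, 17.0]  # y = 2x + 1
w, b = 0.0, 0.0
num_workers = 2

# Split the batch into equal-sized shards, one per worker.
shard = len(xs) // num_workers
shards = [(xs[i * shard:(i + 1) * shard], ys[i * shard:(i + 1) * shard])
          for i in range(num_workers)]

local_grads = [mse_grad(w, b, sx, sy) for sx, sy in shards]
ddp_grad = all_reduce_mean(local_grads)
full_grad = mse_grad(w, b, xs, ys)

print(ddp_grad, full_grad)  # (-111.0, -20.0) (-111.0, -20.0)
```

The equality depends on the shards being the same size. PyTorch's DDP also averages gradients across processes. It is usually paired with `torch.utils.data.distributed.DistributedSampler` so that each process receives an equally sized share of the data.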

#### References:

[PyTorch Distributed Overview](https://pytorch.org/tutorials/beginner/dist_overview.html)

[Distributed Data Parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)

[Ray RLlib Documentation](https://docs.ray.io/en/master/rllib.html)

[Ray Tune Documentation](https://docs.ray.io/en/master/tune/index.html)

*This tutorial is adapted from the documentation for Ray version 1.9.*

#### Check the Ray version, connect to the Ray cluster, and list its nodes

It is helpful to check your version of Ray to make sure the code matches it. Ray is updated frequently, and at the time of writing the latest stable version is 1.9. You can adapt this tutorial to work with other versions of Ray.
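You can also check the version from Python. The sketch below uses two hypothetical helpers, `version_tuple` and `matches_tutorial`, to compare a version string with the 1.9 release this tutorial targets. In the notebook you would pass `ray.__version__`; a literal is used here so the example runs without Ray. It assumes plain `major.minor.patch` strings and does not handle pre-release tags.

```python
# Hypothetical helpers: compare an installed version string (for example
# ray.__version__) with the release this tutorial was written for.

def version_tuple(version: str) -> tuple:
    """Turn '1.9.0' into (1, 9, 0) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def matches_tutorial(installed: str, expected: str = "1.9") -> bool:
    """True if `installed` has the same major.minor release as `expected`."""
    want = version_tuple(expected)
    return version_tuple(installed)[:len(want)] == want


print(matches_tutorial("1.9.0"))   # True
print(matches_tutorial("1.13.0"))  # False
```

Comparing integer tuples avoids a classic pitfall of comparing version strings: as text, `"1.13.0"` sorts before `"1.9.0"`.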

In [1]:
!ray --version

ray, version 1.9.0

In [2]:
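import os

import ray

# Connect to the Domino-hosted Ray cluster through Ray Client,
# unless this notebook session is already connected.
if not ray.is_initialized():
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    # Scratch location on a Domino dataset; ray.util.connect() does not use it.
    _temp_dir = '/domino/datasets/local/{}/'.format(os.environ['DOMINO_PROJECT_NAME'])
    ray.util.connect(f"{service_host}:{service_port}")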
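If the workspace is not attached to a Ray cluster, the cell above fails with a bare `KeyError`. The sketch below wraps the address lookup in a hypothetical helper, `ray_client_address`, which raises a clearer error. It can also add the `ray://` prefix that `ray.init()` expects when you use Ray Client in place of `ray.util.connect()`. The host and port in the example are placeholder values, not the real cluster settings.

```python
import os
from typing import Mapping


def ray_client_address(env: Mapping[str, str] = os.environ,
                       with_scheme: bool = False) -> str:
    """Build the Ray head address from the Domino environment variables."""
    required = ("RAY_HEAD_SERVICE_HOST", "RAY_HEAD_SERVICE_PORT")
    missing = [key for key in required if key not in env]
    if missing:
        raise RuntimeError(f"Ray cluster not found; missing {', '.join(missing)}")
    address = f"{env['RAY_HEAD_SERVICE_HOST']}:{env['RAY_HEAD_SERVICE_PORT']}"
    # ray.util.connect() takes "host:port"; ray.init() takes "ray://host:port".
    return f"ray://{address}" if with_scheme else address


# Placeholder values standing in for os.environ.
fake_env = {"RAY_HEAD_SERVICE_HOST": "10.0.40.69", "RAY_HEAD_SERVICE_PORT": "10001"}
print(ray_client_address(fake_env))                    # 10.0.40.69:10001
print(ray_client_address(fake_env, with_scheme=True))  # ray://10.0.40.69:10001
```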

In [3]:
ray.nodes()

[{'NodeID': '22b8f92ba357331f114bc8096d97e23e9fda4d0e9340a3025b6be54f',
  'Alive': True,
  'NodeManagerAddress': '10.0.46.158',
  'NodeManagerHostname': 'ray-61bce103fa3de41ad2934902-ray-worker-3',
  'NodeManagerPort': 2385,
  'ObjectManagerPort': 2384,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-12-17_19-12-18_498860_1/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-12-17_19-12-18_498860_1/sockets/raylet',
  'MetricsExportPort': 63824,
  'alive': True,
  'Resources': {'accelerator_type:V100': 1.0,
   'memory': 41296992666.0,
   'GPU': 1.0,
   'CPU': 7.0,
   'node:10.0.46.158': 1.0,
   'object_store_memory': 17698711142.0}},
 {'NodeID': '34613c8b93a2c46e37a14440d9ed5fcde848e096707257d55868b0f1',
  'Alive': True,
  'NodeManagerAddress': '10.0.40.69',
  'NodeManagerHostname': 'ray-61bce103fa3de41ad2934902-ray-head-0',
  'NodeManagerPort': 2385,
  'ObjectManagerPort': 2384,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-12-17_19-12-18_498860_1/sockets/plasma_s
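`ray.nodes()` returns one dictionary per node, as the (truncated) output above shows. A hypothetical helper like the one below condenses that list into a count of live nodes and their total CPUs and GPUs. `ray.cluster_resources()` reports a similar cluster-wide total directly. The first sample entry copies the values for `ray-worker-3` from the output above. The second entry is a made-up dead node, included to show the filtering.

```python
def summarize_nodes(nodes):
    """Count live nodes and total CPU/GPU capacity from ray.nodes()-style dicts."""
    alive = [n for n in nodes if n.get("Alive")]
    cpus = sum(n.get("Resources", {}).get("CPU", 0.0) for n in alive)
    gpus = sum(n.get("Resources", {}).get("GPU", 0.0) for n in alive)
    return {"alive_nodes": len(alive), "CPU": cpus, "GPU": gpus}


sample = [
    {"NodeManagerHostname": "ray-61bce103fa3de41ad2934902-ray-worker-3",
     "Alive": True, "Resources": {"CPU": 7.0, "GPU": 1.0}},
    # Hypothetical dead node; it is excluded from the totals.
    {"NodeManagerHostname": "hypothetical-dead-node",
     "Alive": False, "Resources": {"CPU": 7.0, "GPU": 1.0}},
]
print(summarize_nodes(sample))  # {'alive_nodes': 1, 'CPU': 7.0, 'GPU': 1.0}
```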

#### Setting Up Tune, the Distributed Data Parallel Wrapper, and the Trainable Class

To use Tune, you need a *trainable*: a function or class that Tune can run and that reports results back to it. Our training function is `train_mnist`. It builds the model, wraps it in `DistributedDataParallel`, trains it, saves a checkpoint every third epoch, and reports the test accuracy with `tune.report`. The function `run_ddp_tune` passes `train_mnist` to `DistributedTrainableCreator`, which returns a trainable class that runs the function on several DDP workers. It then hands that class to `tune.run`. A later cell calls `run_ddp_tune` to start training. `train_mnist` loops over up to 40 epochs, but the stopping condition `stop={"training_iteration": 1}` ends the trial after the first reported result. Change this condition if you want to train for longer. Try running the code and watching the classifier train. Checkpoints are written to the trial's result directory, which Tune prints as `Result logdir` in the run output.
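The toy loop below shows how `tune.report` and the `stop` argument interact. It is not Ray code: `toy_train` and `run_until_stopped` are hypothetical stand-ins. The sketch assumes Tune's behaviour for a dictionary `stop` criterion. After each reported result, which Tune numbers with `training_iteration` starting at 1, the trial ends as soon as any listed metric reaches its threshold.

```python
# Toy model (not Ray Tune) of dict-based stopping: each yielded result plays
# the role of one tune.report() call.

def toy_train(max_epochs=40):
    """Hypothetical stand-in for train_mnist: one result per epoch."""
    for epoch in range(max_epochs):
        yield {"mean_accuracy": 0.5 + 0.01 * epoch}


def run_until_stopped(results, stop):
    """Consume results until any metric in `stop` reaches its threshold."""
    history = []
    for iteration, result in enumerate(results, start=1):
        result = dict(result, training_iteration=iteration)
        history.append(result)
        if any(result.get(key, float("-inf")) >= value for key, value in stop.items()):
            break
    return history


history = run_until_stopped(toy_train(), stop={"training_iteration": 1})
print(len(history))  # 1: only the first epoch's report is consumed
```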

In [4]:
import logging
import os
import torch
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel

import ray
from ray import tune
from ray.tune.examples.mnist_pytorch import (train, test, get_data_loaders,
                                             ConvNet)
from ray.tune.integration.torch import (DistributedTrainableCreator,
                                        distributed_checkpoint_dir)


In [7]:
logger = logging.getLogger(__name__)


def train_mnist(config, checkpoint_dir=None):
    """Training function run by every DDP worker (config is unused here)."""
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    train_loader, test_loader = get_data_loaders()
    model = ConvNet().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    # Resume from a checkpoint if Tune passes one in. torch.load needs the
    # file path (or a binary file handle), not a text-mode file object.
    if checkpoint_dir:
        model_state, optimizer_state = torch.load(
            os.path.join(checkpoint_dir, "checkpoint"), map_location=device)
        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    # Wrap the model so gradients are averaged across all workers.
    model = DistributedDataParallel(model)

    for epoch in range(40):
        train(model, optimizer, train_loader, device)
        acc = test(model, test_loader, device)

        # Checkpoint every third epoch. Save the unwrapped weights
        # (model.module) so their keys match ConvNet when resuming above.
        if epoch % 3 == 0:
            with distributed_checkpoint_dir(step=epoch) as ckpt_dir:
                path = os.path.join(ckpt_dir, "checkpoint")
                torch.save((model.module.state_dict(), optimizer.state_dict()), path)
        tune.report(mean_accuracy=acc)


def run_ddp_tune(num_workers, num_gpus_per_worker, workers_per_node=None):
    # Turn train_mnist into a Tune trainable that runs as num_workers DDP processes.
    trainable_cls = DistributedTrainableCreator(
        train_mnist,
        num_workers=num_workers,
        num_gpus_per_worker=num_gpus_per_worker,
        num_workers_per_host=workers_per_node)

    analysis = tune.run(
        trainable_cls,
        num_samples=1,
        stop={"training_iteration": 1},  # stop after the first tune.report()
        metric="mean_accuracy",
        mode="max",
        sync_config=tune.SyncConfig(syncer=None))  # disable result syncing
    return analysis
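`DistributedDataParallel` stores the original network in its `.module` attribute. Calling `state_dict()` on the wrapper therefore yields keys prefixed with `module.`. Loading such a checkpoint into an unwrapped `ConvNet`, as the resume branch does before wrapping, fails with missing and unexpected keys. One fix is to save `model.module.state_dict()`. Another is to strip the prefix when loading, sketched below as a hypothetical helper. Recent PyTorch releases also provide `torch.nn.modules.utils.consume_prefix_in_state_dict_if_present` for this; check that your version includes it. The example key names and placeholder values are illustrative, not taken from a real checkpoint.

```python
from collections import OrderedDict


def strip_ddp_prefix(state_dict, prefix="module."):
    """Return a copy of state_dict with the DDP 'module.' prefix removed."""
    return OrderedDict(
        (key[len(prefix):] if key.startswith(prefix) else key, value)
        for key, value in state_dict.items()
    )


# Hypothetical keys as saved from a DDP wrapper; ints stand in for tensors.
saved = OrderedDict([("module.conv1.weight", 0), ("module.fc.bias", 1)])
print(list(strip_ddp_prefix(saved)))  # ['conv1.weight', 'fc.bias']
```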

#### Training the Classifier

Here we train the classifier with syncing disabled (`tune.SyncConfig(syncer=None)`), so Tune does not sync the experiment's logs and checkpoints to cloud storage. If you want to sync to cloud storage, see the instructions here: [Sync to Cloud Storage](https://docs.ray.io/en/latest/tune/user-guide.html#tune-checkpoint-syncing). During the run, the trial passes through three statuses: PENDING, RUNNING, and TERMINATED. TERMINATED indicates that the trial finished successfully. After training, you can pick the best checkpoint, load it, and, voilà, use the model to make predictions on new data.
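Picking the best checkpoint comes down to comparing the metric recorded for each candidate in the direction given by `mode`. The sketch below shows that selection logic with a hypothetical helper and made-up accuracies; it is not the Ray API. The `ExperimentAnalysis` object returned by `tune.run` offers helpers for this, such as `get_best_checkpoint`; check the API reference for your Ray version. Note that with `stop={"training_iteration": 1}`, this tutorial only writes the epoch-0 checkpoint.

```python
def best_checkpoint(checkpoints, metric="mean_accuracy", mode="max"):
    """Pick the checkpoint whose recorded metric is best.

    `checkpoints` is a list of (name, metrics) pairs; this shape is
    hypothetical and used only for illustration.
    """
    if mode not in ("max", "min"):
        raise ValueError("mode must be 'max' or 'min'")
    chooser = max if mode == "max" else min
    return chooser(checkpoints, key=lambda item: item[1][metric])[0]


# Made-up results for checkpoints written every third epoch.
candidates = [
    ("epoch_0", {"mean_accuracy": 0.57}),
    ("epoch_3", {"mean_accuracy": 0.81}),
    ("epoch_6", {"mean_accuracy": 0.78}),
]
print(best_checkpoint(candidates))  # epoch_3
```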

In [8]:
# Two DDP workers with one GPU each, one worker per node.
run_ddp_tune(
    num_workers=2,
    num_gpus_per_worker=1,
    workers_per_node=1)

[2m[36m(run pid=222)[0m == Status ==
[2m[36m(run pid=222)[0m Current time: 2021-12-17 19:14:15 (running for 00:00:04.31)
[2m[36m(run pid=222)[0m Memory usage on this node: 3.3/60.0 GiB
[2m[36m(run pid=222)[0m Using FIFO scheduling algorithm.
[2m[36m(run pid=222)[0m Resources requested: 2.0/35 CPUs, 2.0/5 GPUs, 0.0/186.81 GiB heap, 0.0/82.42 GiB objects (0.0/5.0 accelerator_type:V100)
[2m[36m(run pid=222)[0m Result logdir: /home/ubuntu/ray_results/WrappedDistributedTorchTrainable_2021-12-17_19-14-11
[2m[36m(run pid=222)[0m Number of trials: 1/1 (1 RUNNING)
[2m[36m(run pid=222)[0m +----------------------------------------------+----------+----------------+
[2m[36m(run pid=222)[0m | Trial name                                   | status   | loc            |
[2m[36m(run pid=222)[0m |----------------------------------------------+----------+----------------|
[2m[36m(run pid=222)[0m | WrappedDistributedTorchTrainable_83e1d_00000 | RUNNING  | 10.0.38.75:112 |


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Extracting /home/ubuntu/data/MNIST/raw/train-images-idx3-ubyte.gz to /home/ubuntu/data/MNIST/raw
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m 
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to /home/ubuntu/data/MNIST/raw/train-labels-idx1-ubyte.gz
[2m[36m(run pid=222)[0m == Status ==
[2m[36m(run pid=222)[0m Current time: 2021-12-17 19:14:16 (running for 00:00:05.31)
[2m[36m(run pid=222)[0m Memory usage on this node: 3.3/60.0 GiB
[2m[36m(run pid=222)[0m Using FIFO scheduling algorithm.
[2m[36m(run pid=222)[0m Resources requested: 2.0/35 CPUs, 2.0/5 GPUs, 0.0/186.81 GiB heap, 0.0/82.42 GiB objects (0.0/5.0 accelerator_type:V100)
[2m[36m(run pid=222)[0m Result logdir: /home/ubuntu/ray_results/WrappedDistributedTorch

[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m   0%|          | 0/28881 [00:00<?, ?it/s]29696it [00:00, 1044758.77it/s]          


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Extracting /home/ubuntu/data/MNIST/raw/train-labels-idx1-ubyte.gz to /home/ubuntu/data/MNIST/raw
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m 
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to /home/ubuntu/data/MNIST/raw/t10k-images-idx3-ubyte.gz


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m   0%|          | 0/1648877 [00:00<?, ?it/s]
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m  23%|██▎       | 371712/1648877 [00:00<00:00, 3684323.64it/s]1649664it [00:00, 10175297.78it/s]                           


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Extracting /home/ubuntu/data/MNIST/raw/t10k-images-idx3-ubyte.gz to /home/ubuntu/data/MNIST/raw
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m 
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to /home/ubuntu/data/MNIST/raw/t10k-labels-idx1-ubyte.gz


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m   0%|          | 0/4542 [00:00<?, ?it/s]5120it [00:00, 23572817.21it/s]         


[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m Extracting /home/ubuntu/data/MNIST/raw/t10k-labels-idx1-ubyte.gz to /home/ubuntu/data/MNIST/raw
[2m[36m(ImplicitFunc pid=105, ip=10.0.46.158)[0m 
[2m[36m(run pid=222)[0m Result for WrappedDistributedTorchTrainable_83e1d_00000:
[2m[36m(run pid=222)[0m   date: 2021-12-17_19-14-21
[2m[36m(run pid=222)[0m   done: false
[2m[36m(run pid=222)[0m   experiment_id: 843829f8718046e6978ce91e10548d3b
[2m[36m(run pid=222)[0m   hostname: ray-61bce103fa3de41ad2934902-ray-worker-2
[2m[36m(run pid=222)[0m   iterations_since_restore: 1
[2m[36m(run pid=222)[0m   mean_accuracy: 0.575
[2m[36m(run pid=222)[0m   node_ip: 10.0.38.75
[2m[36m(run pid=222)[0m   pid: 112
[2m[36m(run pid=222)[0m   should_checkpoint: true
[2m[36m(run pid=222)[0m   time_since_restore: 5.652081727981567
[2m[36m(run pid=222)[0m   time_this_iter_s: 5.652081727981567
[2m[36m(run pid=222)[0m   time_total_s: 5.652081727981567
[2m[36m(run pid=222

[2m[36m(run pid=222)[0m 2021-12-17 19:14:21,606	INFO tune.py:626 -- Total run time: 10.58 seconds (10.46 seconds for the tuning loop).


#### What's next?

*Try our Reinforcement Learning use case or our Beginner's Tutorial*