# Data Parallel Training with PyTorch DDP

In this notebook we will learn how to engineer a data parallel training job using PyTorch Distributed Data Parallel (`DDP`). While SageMaker doesn’t support PyTorch DDP natively, it’s possible to run DDP training jobs on SageMaker.

As a training task, we will fine-tune a pretrained `Resnet18` model to classify ants and bees. We will use the open-source **Hymenoptera dataset**. We will distribute our training across two `ml.p3.2xlarge` instances with a single GPU device each. Feel free to modify the number and/or type of instances in the training cluster and observe how this changes training performance. Note that this is a small-scale training job and will not be indicative of training efficiency on real-life tasks.

We start with necessary imports and basic SageMaker training configuration.

In [None]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()
role = get_execution_role() # replace it with role ARN if you are not using SageMaker Notebook or Studio environments.

bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker/pytorch-distribution-options'
print('Bucket:\n{}'.format(bucket))

Next, we download and unzip the dataset, and then upload it to an Amazon S3 bucket. Note that these operations may take several minutes to complete.

In [None]:
# Downloading dataset and unzipping it locally
! wget https://download.pytorch.org/tutorial/hymenoptera_data.zip
! unzip hymenoptera_data.zip

In [4]:
data_url = sagemaker_session.upload_data(path="./hymenoptera_data", key_prefix="hymenoptera_data")
print(f"S3 location of dataset {data_url}")

### Launching Distributed Training Processes

Amazon SageMaker has no out-of-the-box support for PyTorch DDP training. Specifically, SageMaker doesn't support starting multiple distributed DDP processes in a training cluster. To compensate, we need to develop a launcher utility to perform this function. This utility is quite simple and can be reused for any other DDP-based training job. The launcher script is located here: `2_sources/launcher.py`.

In the launcher script we use the `torch.distributed.run` module, which simplifies spawning training processes in a cluster. As part of the launcher script, we need to collect information about the training world: the number of compute nodes and GPU devices in the cluster, as well as the node that will act as the master coordinator. Then `torch.distributed.run` will spawn multiple training processes according to this configuration.

Let’s review how this setup is implemented in the launcher script.

**1. Collecting training cluster configuration.**

First, we need to collect information about the SageMaker training cluster. For this, we use the environment variables `SM_HOSTS` (list of compute hosts in the training cluster), `SM_CURRENT_HOST` (hostname where the given process is running), and `SM_NUM_GPUS` (number of GPU devices available on the compute node). These variables are set by SageMaker automatically at the start of your training job.

```python
    nodes = json.loads(os.getenv("SM_HOSTS"))
    nnodes = len(nodes)
    node_rank = nodes.index(os.getenv("SM_CURRENT_HOST"))
    nproc_per_node = os.getenv("SM_NUM_GPUS", 1)
```
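
To make the shape of these variables concrete, the following sketch parses a simulated environment for a two-node cluster. The host names `algo-1` and `algo-2`, the `fake_env` dictionary, and the helper `read_cluster_config` are illustrative assumptions; on SageMaker the values come from the real process environment.

```python
import json
import os


def read_cluster_config(env=os.environ):
    """Derive node count, node rank, and processes per node from SageMaker variables."""
    nodes = json.loads(env["SM_HOSTS"])  # JSON-encoded list of host names
    node_rank = nodes.index(env["SM_CURRENT_HOST"])
    # Environment values are strings, so convert explicitly; default to one process.
    nproc_per_node = int(env.get("SM_NUM_GPUS", 1))
    return len(nodes), node_rank, nproc_per_node


# Simulated environment (assumption) for the second host of a two-node, single-GPU cluster.
fake_env = {
    "SM_HOSTS": '["algo-1", "algo-2"]',
    "SM_CURRENT_HOST": "algo-2",
    "SM_NUM_GPUS": "1",
}
nnodes, node_rank, nproc_per_node = read_cluster_config(fake_env)
print(nnodes, node_rank, nproc_per_node)
```

Passing the environment as a parameter keeps the parsing logic testable outside of a training job.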

**2. Starting Training Processes.**

Next, we need to form the command line for `torch.distributed.run` with instructions on how it should spawn training processes in the training cluster. See the code snippet below with inline comments on specific parameters. Note that below we are using `torch.distributed.run` as a Python module. Alternatively, you can use its script version, `torchrun`. Find more details on `torchrun` in the PyTorch [documentation](https://pytorch.org/docs/stable/elastic/run.html).

```python
    cmd = [
        sys.executable,
        "-m",
        "torch.distributed.run",
        f"--nproc_per_node={nproc_per_node}", # how many processes per compute node to start
        f"--nnodes={str(nnodes)}", # how many compute nodes in training cluster
        f"--node_rank={node_rank}", # rank of current compute node in training cluster
        f"--rdzv_id={os.getenv('SAGEMAKER_JOB_NAME')}", # a unique job id shared by all nodes in cluster
        "--rdzv_backend=c10d", # rendezvous backend used by nodes to discover each other
        f"--rdzv_endpoint={nodes[0]}:{RDZV_PORT}", # master node
        distr_args.train_script, # training script which will be executed in all training processes
    ]
    # Adding training hyperparameters which will be then passed in training script
    cmd.extend(training_hyperparameters)
```
Note that we append the training hyperparameters “as is” to the end of the command line. These arguments are not handled by the launcher; they are passed through to the training script, which uses them to configure training.
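
One way to separate launcher arguments from training hyperparameters is `argparse`'s `parse_known_args()`, which returns unrecognized arguments untouched. The sketch below assumes a single launcher argument, `--train-script`, matching the hyperparameter used later in this notebook. The helper name `split_args` is hypothetical, and the actual launcher may be organized differently.

```python
import argparse


def split_args(argv):
    """Split launcher-specific arguments from those meant for the training script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-script", type=str, required=True)
    # parse_known_args returns (namespace, list of unrecognized arguments).
    distr_args, training_hyperparameters = parser.parse_known_args(argv)
    return distr_args, training_hyperparameters


# Hyperparameters reach the entry point as command-line arguments.
distr_args, training_hyperparameters = split_args(
    ["--train-script", "train_ddp.py", "--epochs", "25"]
)
print(distr_args.train_script)
print(training_hyperparameters)
```

The unrecognized arguments keep their original order, so they can be appended to the `torch.distributed.run` command unchanged.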

To actually execute the launch command, we use Python's `subprocess.Popen()`:

```python
    process = subprocess.Popen(cmd, env=os.environ)
    process.wait()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd)
```

Note that we pass the current environment variables to the subprocess to preserve all SageMaker variables. If the spawned process returns a non-zero code (an indication of an error), we raise an exception to propagate the error code to the SageMaker control plane.
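
The error propagation pattern can be tried locally without a training cluster. In this sketch, a child Python process that deliberately exits with code 3 stands in for a failed training run. The wrapper name `run_and_propagate` is a hypothetical helper introduced for illustration.

```python
import os
import subprocess
import sys


def run_and_propagate(cmd):
    """Run a command with the parent's environment; raise if it exits with an error."""
    process = subprocess.Popen(cmd, env=os.environ)
    process.wait()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd)
    return process.returncode


# Stand-in for a failing training process (assumption for illustration).
failing_cmd = [sys.executable, "-c", "import sys; sys.exit(3)"]
try:
    run_and_propagate(failing_cmd)
    caught_code = None
except subprocess.CalledProcessError as err:
    caught_code = err.returncode
print(caught_code)
```

Because the exception is left uncaught in a real launcher, the launcher itself exits with an error, which is what marks the training job as failed.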

To summarize, our launcher utility is responsible for collecting the training cluster configuration and then starting `torch.distributed.run` on each node, which in turn takes care of starting multiple training processes. Run the cell below to review the full listing of the launcher utility.

In [None]:
!pygmentize 2_sources/launcher.py

### Adapting Training Script For DDP

To use DDP, we need to make minimal changes to our training script. First of all, we initialize the training process and add it to the DDP process group:

```python
dist.init_process_group(
    backend="nccl",
    rank=int(os.getenv("RANK", 0)),
    world_size=int(os.getenv("WORLD_SIZE", 1)),
)
```

Since we have GPU-based instances, we use the `NCCL` communication backend. We also rely on environment variables set by the `torch.distributed.run` module: `WORLD_SIZE` (world size) and `RANK` (global rank).

Next, we need to identify which GPU device will store the model and run computations. We use the `LOCAL_RANK` variable set by `torch.distributed.run` when it spawns the process.

```python
# Environment variables are strings, so convert LOCAL_RANK to an integer device index
torch.cuda.set_device(int(os.getenv("LOCAL_RANK", 0)))
device = torch.device("cuda")
model = model.to(device)
```

We then wrap our regular PyTorch model with the DDP implementation. This wrapper allows us to work with the model as if it were a regular, locally stored model. Under the hood, DDP implements gradient synchronization between the training processes in the process group.

```python
model = DDP(model)
```
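
Conceptually, gradient synchronization amounts to averaging each gradient across all processes before the optimizer step, so that every model replica applies the same update. The pure-Python sketch below imitates this arithmetic with plain lists. It is an illustration only, not how DDP is implemented (DDP performs the reduction through the communication backend), and the function name `allreduce_mean` is hypothetical.

```python
def allreduce_mean(per_process_grads):
    """Average gradients element-wise across processes; every process gets the same result."""
    world_size = len(per_process_grads)
    averaged = [sum(values) / world_size for values in zip(*per_process_grads)]
    # After the all-reduce, each process holds an identical copy of the averaged gradients.
    return [list(averaged) for _ in range(world_size)]


# Two processes computed different gradients on their own data slices (made-up values).
local_grads = [[0.5, -1.0, 2.0], [1.5, 1.0, 0.0]]
synced = allreduce_mean(local_grads)
print(synced)
```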

As the last step, we need to modify the training data loader so that each training process gets a unique slice of data during each training step. For this, we use `DistributedSampler`, which samples data records based on the total number of processes (`world_size` variable) and the global rank (`rank` variable) of the given training process:

```python
    # Note that we are passing the global rank to the sampler to get a unique data slice
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        image_datasets["train"], num_replicas=args.world_size, rank=args.rank
    )
    train_loader = torch.utils.data.DataLoader(
        image_datasets["train"],
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler,
    ) 
```
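
To make the slicing concrete, here is a pure-Python sketch of how indices can be partitioned when shuffling is disabled: the index list is padded to a multiple of the number of processes, and each process then takes every `num_replicas`-th index starting at its rank. The function `shard_indices` is a hypothetical stand-in for illustration, not the PyTorch class itself. Note that `DistributedSampler` shuffles by default, in which case the same split is applied to a permuted index list.

```python
import math


def shard_indices(dataset_len, num_replicas, rank):
    """Return the dataset indices assigned to one process (no shuffling)."""
    indices = list(range(dataset_len))
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    # Pad by repeating indices from the start so every process gets the same count.
    padding = total_size - len(indices)
    indices += indices[:padding]
    # Interleaved split: rank 0 takes positions 0, 2, 4, ...; rank 1 takes 1, 3, 5, ...
    return indices[rank:total_size:num_replicas]


# Seven records split across two processes (made-up sizes).
shards = [shard_indices(dataset_len=7, num_replicas=2, rank=r) for r in range(2)]
print(shards)
```

With seven records and two processes, one index is repeated so that both processes run the same number of steps per epoch.
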
You can review the full listing of the training script by running the cell below.


In [None]:
!pygmentize 2_sources/train_ddp.py

## Running Training Job

Once we have the launcher utility and training script ready, we can start our distributed training on SageMaker. Note that since training must be started via the launcher utility, we set the `entry_point` parameter to the launcher script. We pass the actual training script as part of the `hyperparameters` object.

In [None]:
from sagemaker.pytorch import PyTorch

ps_instance_type = 'ml.p3.2xlarge'
ps_instance_count = 2

hyperparameters = {
  'train-script': 'train_ddp.py',
  'epochs': 25,
  }

estimator_ms = PyTorch(
                       source_dir='2_sources',
                       entry_point='launcher.py', 
                       role=role,
                       framework_version='1.9',
                       py_version='py38',
                       disable_profiler=True,
                       debugger_hook_config=False,
                       hyperparameters=hyperparameters,
                       instance_count=ps_instance_count, 
                       instance_type=ps_instance_type,
                       )

In [None]:
estimator_ms.fit(inputs={"train":f"{data_url}/train", "val":f"{data_url}/val"})

The training job should complete within 8–9 minutes. Feel free to review the debug messages in the training job logs. Additionally, you can experiment with other parameters such as the instance type and size, the number of epochs, the batch size, and more.

## Summary

In this example, we learned how to engineer data parallel distributed training using PyTorch DDP, PyTorch's native Allreduce-based implementation of data parallelism.