# Data Parallelism

This notebook is organized as follows:
- Overview of Distributed Data Parallel (DDP)
- Implementation of DDP workflow (Steps 1–6)
- Issues about `dist.barrier()`

## Overview of PyTorch Distributed Data Parallel

Terms used in distributed training:

- **master node**: the main GPU responsible for synchronization, making copies, loading models, and writing checkpoints and logs;
- **process group**: if you want to train/test the model over K GPUs, then the K processes form a group, which is supported by a backend (PyTorch manages that for you; according to the [documentation](https://pytorch.org/docs/1.9.0/generated/torch.nn.parallel.DistributedDataParallel.html?highlight=distributeddataparallel#torch.nn.parallel.DistributedDataParallel), NCCL is the most recommended one);
- **rank**: within the process group, each process is identified by its rank, from 0 to K-1;
- **world size**: the number of processes in the group.

PyTorch provides two settings for distributed training: `torch.nn.DataParallel` (DP) and `torch.nn.parallel.DistributedDataParallel` (DDP), where the latter is officially recommended. In short, DDP is faster and more flexible than DP. **The fundamental thing DDP does is to copy the model to multiple GPUs, gather the gradients from them, average the gradients to update the model, then synchronize the model over all K processes**.

We can also gather/scatter tensors/objects other than gradients with `torch.distributed.gather`, `torch.distributed.scatter`, and `torch.distributed.reduce`.

In case the model can fit on one GPU (it can be trained on one GPU with batch_size = 1) and we want to train/test it on K GPUs, the best practice of DDP is to copy the model onto the K GPUs (the DDP class automatically does this for you) and split the dataloader into K non-overlapping groups to feed into the K models respectively.

To make your model leverage multiple GPUs, follow these steps:

1. set up the process group, which is three lines of code and needs no modification;
2. split the dataloader to each process in the group, which can be easily achieved by `torch.utils.data.DistributedSampler` or any customized sampler;
3. wrap our model with DDP, which is one line of code and barely needs modification;
4. train/test our model, which is the same as on 1 GPU;
5. clean up the process group, which is one line of code;
6. optional: gather extra data among processes (possibly needed for distributed testing), which is basically one line of code.

The example program in this notebook uses the [`torch.nn.parallel.DistributedDataParallel`](https://pytorch.org/docs/stable/nn.html#distributeddataparallel) class for training models in a _data parallel_ fashion: multiple workers train the same global model by processing different portions of a large dataset, computing local gradients (aka _sub_-gradients) independently and then collectively synchronizing gradients using the AllReduce primitive. In HPC terminology, this model of execution is called _Single Program Multiple Data_ or SPMD, since the same application runs on all workers but each one operates on a different portion of the training dataset.
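
To make the averaging step concrete, here is a pure-Python sketch that needs neither GPUs nor PyTorch. It is only an illustration: `all_reduce_mean` is a hypothetical stand-in for the AllReduce collective, and the one-parameter model `y = w * x` and the toy data are assumptions made for this example.

```python
def local_gradient(w, shard):
    """Mean gradient of the loss 0.5 * (w * x - y) ** 2 over one worker's shard."""
    return sum((w * x - y) * x for x, y in shard) / len(shard)


def all_reduce_mean(values):
    """Hypothetical stand-in for AllReduce: every worker receives the same average."""
    mean = sum(values) / len(values)
    return [mean for _ in values]


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
world_size = 2
w = 0.0  # every worker starts from the same model

# Each worker (rank) sees a disjoint, equally sized portion of the data.
shards = [data[rank::world_size] for rank in range(world_size)]

# Workers compute their local gradients independently ...
local_grads = [local_gradient(w, shard) for shard in shards]
# ... and then agree on the average, so all replicas apply the same update.
synced_grads = all_reduce_mean(local_grads)

full_batch_grad = local_gradient(w, data)
print(local_grads)      # [-10.0, -20.0]
print(synced_grads)     # [-15.0, -15.0]
print(full_batch_grad)  # -15.0
```

With equally sized shards, the averaged local gradients match the gradient computed over the whole batch, which is why every replica stays in sync after the optimizer step.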

## Setup the process group

```python
import os

import torch.distributed as dist

def setup(rank, world_size):
    # address and port of the rank-0 process, used for the initial rendezvous
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
```

## Split the dataloader

We can easily split our dataloader by `torch.utils.data.distributed.DistributedSampler`. The sampler returns an iterator over indices, which are fed into the dataloader to form batches.

The DistributedSampler splits the total indices of the dataset into `world_size` parts, and evenly distributes them to the dataloader in each process without duplication.

```python
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def prepare(rank, world_size, batch_size=32, pin_memory=False, num_workers=0):
    dataset = Your_Dataset()
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False, drop_last=False)
    dataloader = DataLoader(dataset, batch_size=batch_size, pin_memory=pin_memory, num_workers=num_workers, 
                            drop_last=False, shuffle=False, sampler=sampler)
    return dataloader
```

Suppose K=3, and the length of the dataset is 10. We must understand that `DistributedSampler` imposes an even partition of indices.

If we set `drop_last=False` when defining `DistributedSampler`, it will automatically pad. For example, it splits indices `[0,1,2,3,4,5,6,7,8,9]` to `[0,3,6,9]` when `rank=0`, `[1,4,7,0]` when `rank=1`, and `[2,5,8,1]` when `rank=2`. As you can see, such padding may cause issues because the padded indices 0 and 1 are real data records that are now processed twice. Otherwise (`drop_last=True`), it will strip off the trailing elements. For example, it splits the indices to `[0,3,6]` at `rank=0`, `[1,4,7]` at `rank=1`, and `[2,5,8]` at `rank=2`. In this case, it drops 9 to make the number of indices divisible by `world_size`.
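
The partitioning can be reproduced without PyTorch. The function below is a simplified re-implementation of the index arithmetic for the `shuffle=False` case, written for illustration only; `partition_indices` is a hypothetical name and not part of the PyTorch API.

```python
import math


def partition_indices(dataset_len, world_size, rank, drop_last=False):
    """Indices one rank would receive (no shuffling), as a simplified sketch."""
    indices = list(range(dataset_len))
    if drop_last and dataset_len % world_size != 0:
        # drop the tail so that every rank gets the same number of samples
        num_samples = math.ceil((dataset_len - world_size) / world_size)
    else:
        num_samples = math.ceil(dataset_len / world_size)
    total_size = num_samples * world_size

    if drop_last:
        indices = indices[:total_size]
    else:
        # pad by repeating indices from the start of the dataset
        padding = total_size - len(indices)
        indices += (indices * math.ceil(padding / len(indices)))[:padding]

    # each rank takes every world_size-th index, starting at its own rank
    return indices[rank:total_size:world_size]


for rank in range(3):
    print(rank, partition_indices(10, 3, rank), partition_indices(10, 3, rank, drop_last=True))
# 0 [0, 3, 6, 9] [0, 3, 6]
# 1 [1, 4, 7, 0] [1, 4, 7]
# 2 [2, 5, 8, 1] [2, 5, 8]
```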

It is very simple to customize our `Sampler`. We only need to create a class, then define its `__iter__()` and `__len__()` methods. Refer to the [official documentation](https://pytorch.org/docs/stable/data.html?highlight=distributedsampler#torch.utils.data.distributed.DistributedSampler) for more details.
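
As an illustration, here is a minimal sketch of a custom sampler that gives each rank every `num_replicas`-th index without padding or dropping anything. The class name `UnpaddedShardSampler` and its constructor arguments are hypothetical. Because the shards can differ in length, a sampler like this is better suited to evaluation than to training, where DDP expects every rank to run the same number of steps.

```python
from torch.utils.data import Sampler


class UnpaddedShardSampler(Sampler):
    """Hypothetical sampler: rank r gets indices r, r + K, r + 2K, ... with no padding."""

    def __init__(self, dataset_len, num_replicas, rank):
        self.indices = list(range(rank, dataset_len, num_replicas))

    def __iter__(self):
        return iter(self.indices)

    def __len__(self):
        return len(self.indices)


# With 10 samples and 3 ranks, the shards have sizes 4, 3 and 3.
shards = [list(UnpaddedShardSampler(10, 3, rank)) for rank in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

An instance can be passed to `DataLoader` through its `sampler` argument, in the same way as `DistributedSampler` in the `prepare` function.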

## Wrap the model with DDP

We should first move our model to the specific GPU (recall that one model replica resides in one GPU), then we wrap it with DDP class. The following function takes in an argument rank, which we will introduce soon. For now, we just keep in mind rank equals the GPU id.

```python
from torch.nn.parallel import DistributedDataParallel as DDP

def main(rank, world_size):
    # setup the process groups
    setup(rank, world_size)    
    
    # prepare the dataloader
    dataloader = prepare(rank, world_size)
    
    # instantiate the model (it's your own model) and move it to the right device
    model = Model().to(rank)
    
    # wrap the model with DDP
    # device_ids tells DDP which device the model lives on
    # output_device tells DDP where to place the output; in our case, it is rank
    # find_unused_parameters=True lets DDP detect parameters that did not contribute to the output of forward()
    model = DDP(model, device_ids=[rank], output_device=rank, find_unused_parameters=True)
```

There are a few tricky things here:

- When we want to access some customized attributes of the DDP wrapped model, we must reference `model.module`. That is to say, our model instance is saved as a module attribute of the DDP model. If we assign some attributes `xxx` other than built-in properties or functions, we must access them by `model.module.xxx`.
- When we save the DDP model, its `state_dict` adds a `module.` prefix to all parameter names.
- Consequently, if we want to load a DDP-saved checkpoint into a non-DDP model, we have to manually strip the extra prefix. I provide my code below.

In case we load a DDP model checkpoint into a non-DDP model:

```python
from collections import OrderedDict

# state_dict: the checkpoint loaded from a DDP-saved model
model_dict = OrderedDict()
prefix = "module."

for k, v in state_dict.items():
    # strip only a leading "module." so the rest of the key stays intact
    if k.startswith(prefix):
        model_dict[k[len(prefix):]] = v
    else:
        model_dict[k] = v

model.load_state_dict(model_dict)
```
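
PyTorch also ships a helper for this prefix handling, `torch.nn.modules.utils.consume_prefix_in_state_dict_if_present`, which strips a prefix from the keys in place. The sketch below is only an illustration: the small `nn.Linear` stands in for your model, and the hand-built dictionary stands in for a checkpoint saved from a DDP-wrapped model.

```python
from collections import OrderedDict

import torch.nn as nn
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present

# stand-in for your (non-DDP) model
model = nn.Linear(4, 2)

# stand-in for a DDP checkpoint: same tensors, keys prefixed with "module."
ddp_style_state = OrderedDict(
    ("module." + name, tensor.clone()) for name, tensor in model.state_dict().items()
)

# strips the prefix in place; keys without the prefix are left untouched
consume_prefix_in_state_dict_if_present(ddp_style_state, "module.")

missing, unexpected = model.load_state_dict(ddp_style_state)
print(sorted(ddp_style_state.keys()))  # ['bias', 'weight']
```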

## Train/test our model

This part is the key to implementing DDP. This notebook uses multi-processing to spawn the K processes (all child processes together with the parent process run the same code), but other options are possible, e.g. the use of a distributed launcher such as `python -m torch.distributed.launch ..`.

In PyTorch, `torch.multiprocessing` provides convenient ways to create parallel processes. As the official documentation says,

> The spawn function below addresses these concerns and takes care of error propagation, out of order termination, and will actively terminate processes upon detecting an error in one of them.

So, using `spawn` is a good choice.

In our script, we should define a train/test function before spawning it to parallel processes:

```python
def main(rank, world_size):
    # setup the process groups
    setup(rank, world_size)
    # prepare the dataloader
    dataloader = prepare(rank, world_size)
    
    # instantiate the model (it's your own model) and move it to the right device
    model = Your_Model().to(rank)
    
    # wrap the model with DDP
    # device_ids tells DDP which device the model lives on
    # output_device tells DDP where to place the output; in our case, it is rank
    # find_unused_parameters=True lets DDP detect parameters that did not contribute to the output of forward()
    
    model = DDP(model, device_ids=[rank], output_device=rank, find_unused_parameters=True)
    
    #################### The above is defined previously
   
    optimizer = Your_Optimizer(model.parameters())
    loss_fn = Your_Loss()
    for epoch in range(num_epochs):  # num_epochs: the number of epochs you choose
        # if we are using DistributedSampler, we have to tell it which epoch this is
        dataloader.sampler.set_epoch(epoch)       
        
        for step, x in enumerate(dataloader):
            optimizer.zero_grad(set_to_none=True)
            
            pred = model(x)
            label = x['label']
            
            loss = loss_fn(pred, label)
            loss.backward()
            optimizer.step()    
    cleanup()
```

This `main` function is run in every parallel process. We now need to call it with the `spawn` method. In our `.py` script, we write:

```python
import torch.multiprocessing as mp

if __name__ == "__main__":
    world_size = 2
    mp.spawn(
        main,
        args=(world_size,),  # must be a tuple; rank is prepended automatically
        nprocs=world_size
    )
```

Remember the first argument of `main` is `rank`? It is automatically passed to each process by `mp.spawn`, so we don't need to pass it explicitly. `rank=0` is the master node by default. The `rank` ranges from `0` to `K-1` (1 in our case).

## Clean up the process groups

The last line of the `main` function calls the cleanup function, which is:

```python
def cleanup():
    dist.destroy_process_group()
```

## Optional: Gather extra data among processes

Sometimes we need to collect some data from all processes, such as the testing result. We can easily gather tensors by `dist.all_gather` and objects by `dist.all_gather_object`.

Without loss of generality, I assume we want to collect Python objects. The only constraint is that the object must be picklable, which covers most Python objects. One should always call `torch.cuda.set_device(rank)` before using `all_gather_xxx`. And, if we want to store a tensor in the object, it must reside on the `output_device`.

```python
import torch
import torch.distributed as dist

def main(rank, world_size):
    # assumes the process group is already initialized, e.g. by setup(rank, world_size)
    torch.cuda.set_device(rank)
    data = {
        'tensor': torch.ones(3, device=rank) + rank,
        'list': [i + rank for i in (1, 2, 3)],  # a list cannot be added to an int directly
        'dict': {'rank': rank}
    }
    
    # we have to create enough room to store the collected objects
    outputs = [None for _ in range(world_size)]
    
    # the first argument is the list that receives the collected objects,
    # the second argument is the data unique to each process
    dist.all_gather_object(outputs, data)
    
    # we only want to operate on the collected objects at master node
    if rank == 0:
        print(outputs)
```
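
For distributed testing, the gathered list usually has to be merged back into dataset order, and any samples duplicated by the sampler's padding have to be dropped. The sketch below shows one way to do that in plain Python. It assumes, purely for illustration, that each rank stores the dataset indices it processed next to its predictions; `merge_gathered` and the `indices`/`preds` keys are hypothetical names.

```python
def merge_gathered(outputs, dataset_len):
    """Merge per-rank results into dataset order, keeping the first copy of each index."""
    merged = {}
    for per_rank in outputs:
        for idx, pred in zip(per_rank["indices"], per_rank["preds"]):
            # padded samples appear on two ranks; keep only the first occurrence
            merged.setdefault(idx, pred)
    return [merged[i] for i in range(dataset_len)]


# What rank 0 might see after all_gather_object with K=3 and 10 samples
# (indices 0 and 1 are duplicated by padding).
outputs = [
    {"indices": [0, 3, 6, 9], "preds": ["p0", "p3", "p6", "p9"]},
    {"indices": [1, 4, 7, 0], "preds": ["p1", "p4", "p7", "p0"]},
    {"indices": [2, 5, 8, 1], "preds": ["p2", "p5", "p8", "p1"]},
]
predictions = merge_gathered(outputs, dataset_len=10)
print(predictions)
# ['p0', 'p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8', 'p9']
```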

## Issues about dist.barrier()

As the documentation says, `dist.barrier()` synchronizes processes. In other words, it blocks processes until all of them reach the same line of code: `dist.barrier()`. I summarize its usage as follows:

- we do not need it when training, since DDP automatically does it for us (in `loss.backward()`);
- we do not need it when gathering data, since `dist.all_gather_object` does it for us;
- we need it when enforcing an execution order, [say one process loads the model that another process saves](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) (I can hardly imagine this scenario being needed).
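
The ordering guarantee in the last point can be illustrated without GPUs. The sketch below is an analogy, not DDP code: Python threads play the role of the processes, and `threading.Barrier` plays the role of `dist.barrier()`. The `run_workers` function and the `"checkpoint"` entry are hypothetical names chosen for this example.

```python
import threading


def run_workers(world_size):
    barrier = threading.Barrier(world_size)  # stand-in for dist.barrier()
    shared = {}                              # stand-in for a file on disk
    seen = [None] * world_size

    def worker(rank):
        if rank == 0:
            # only rank 0 "saves the checkpoint"
            shared["checkpoint"] = "weights-v1"
        # nobody proceeds until every rank has reached this line
        barrier.wait()
        # safe for all ranks: the write happened before the barrier was released
        seen[rank] = shared["checkpoint"]

    threads = [threading.Thread(target=worker, args=(rank,)) for rank in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


print(run_workers(4))  # ['weights-v1', 'weights-v1', 'weights-v1', 'weights-v1']
```

Without the barrier, a rank other than 0 could try to read the checkpoint before rank 0 has written it.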

# Preparing and launching a DDP application
Independent of how a DDP application is launched (through multi-processing or distributed launcher), each process needs a mechanism to know its global and local ranks. Once this is known, all processes create a `ProcessGroup` that enables them to participate in collective communication operations such as `AllReduce`.
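
As a rough sketch of how a process can learn its ranks: `torchrun` exports the environment variables `RANK`, `LOCAL_RANK` and `WORLD_SIZE`, and when every node runs the same number of processes the global rank follows from the node rank and the local rank. The helper names `global_rank` and `read_launcher_env`, and the single-process defaults, are assumptions made for this example.

```python
import os


def global_rank(node_rank, nproc_per_node, local_rank):
    """Global rank, assuming every node runs nproc_per_node processes."""
    return node_rank * nproc_per_node + local_rank


def read_launcher_env(env=None):
    """Read the ranks exported by the launcher, defaulting to a single process."""
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("RANK", 0)),
        "local_rank": int(env.get("LOCAL_RANK", 0)),
        "world_size": int(env.get("WORLD_SIZE", 1)),
    }


# Third process (local rank 2) on the second node (node rank 1), 4 GPUs per node:
print(global_rank(node_rank=1, nproc_per_node=4, local_rank=2))  # 6
print(read_launcher_env({"RANK": "6", "LOCAL_RANK": "2", "WORLD_SIZE": "8"}))
# {'rank': 6, 'local_rank': 2, 'world_size': 8}
```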

PyTorch has a relatively simple interface for distributed training: the training script is either launched with `torch.distributed.launch` or `torchrun`, or the code spawns multiple processes itself.

This set of examples presents simple implementations of distributed training or message passing:
1. CIFAR-10 classification using DistributedDataParallel wrapped ResNet models
2. ToyModel using multiprocessing interface

## Example 1 (Image classification using ResNet-18 over CIFAR-10)


<table>
<tbody>
<tr>

<td align='center'>
<img src='./figs/ResNet-18-Architecture.png' width=100% /><br/>
<b>ResNet-18 architecture</b>
</td>

</tr>
</tbody>
</table>

```python
"""code/ddp.py"""
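import argparse
import os
import time

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# set_random_seeds() and evaluate() are helpers defined elsewhere in code/ddp.py (not shown)
def main():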

    num_epochs_default = 5
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
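    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    # older torch.distributed.launch passes --local_rank on the command line; torchrun exports LOCAL_RANK instead
    parser.add_argument("--local_rank", type=int, default=None, help="Local rank passed by torch.distributed.launch.")
    argv = parser.parse_args()

    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume
    local_rank = argv.local_rank

    if local_rank is None:
        local_rank = int(os.environ["LOCAL_RANK"])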
    
    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model from the process that uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # Data should be prefetched before launching
    # Ideally download is set to False, because downloading is not multiprocess safe;
    # with the files already on disk, download=True only verifies them
    train_set = torchvision.datasets.CIFAR10(root="/workspace/data/cifar-10", train=True, download=True, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="/workspace/data/cifar-10", train=False, download=True, transform=transform)

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    for epoch in range(num_epochs):

        t0 = time.time()
        # Save and evaluate model routinely
        if epoch % 1 == 0:
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        ddp_model.train()
        
        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        print("Time {} seconds".format(round(time.time() - t0, 2)))
```
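
The listing calls two helpers, `set_random_seeds` and `evaluate`, whose definitions are not shown. The following is a minimal sketch of what such helpers could look like, not necessarily what `code/ddp.py` contains: it assumes that seeding Python's and PyTorch's generators is sufficient, and that accuracy is the fraction of correctly classified test samples.

```python
import random

import torch


def set_random_seeds(random_seed=0):
    """Seed the generators so every process initializes the same model weights."""
    random.seed(random_seed)
    torch.manual_seed(random_seed)
    # trade some speed for reproducible cuDNN behavior
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def evaluate(model, device, test_loader):
    """Return the classification accuracy of model over test_loader."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total
```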

### Caveats

- Set random seed to make sure that the models initialized in different processes are the same. 
- Use `DistributedDataParallel` to wrap the model for distributed training.
- Use `DistributedSampler` for the training data loader.
- To save models, each node would save a copy of the checkpoint file in the local hard drive.
- Downloading dataset and making directories should be avoided in the distributed training program as they are not multi-process safe, unless we use some sort of barriers, such as `torch.distributed.barrier`.
- The node communication bandwidth is extremely important for multi-node distributed training. Instead of randomly picking two computers in the network, try to use nodes from a specialized computing cluster, since the communication between those nodes is highly optimized.

### Launch command 1

In [5]:
! CUDA_VISIBLE_DEVICES=0,1 python -W ignore -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 --node_rank=0 --master_addr="127.0.0.1" --master_port=1234 code/ddp.py --num_epochs 5 --model_dir data

Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Files already downloaded and verifiedFiles already downloaded and verified

Files already downloaded and verifiedFiles already downloaded and verified

---------------------------------------------------------------------------
Epoch: 0, Accuracy: 0.0
---------------------------------------------------------------------------
Local Rank: 0, Epoch: 0, Training ...
Time 7.86 seconds
Local Rank: 1, Epoch: 0, Training ...
Time 7.86 seconds
---------------------------------------------------------------------------
Epoch: 1, Accuracy: 0.3269
---------------------------------------------------------------------------
Local Rank: 0, Epoch: 1, Training ...
Time 6.0 seconds
Local Rank: 1, Epoch: 1, Training ...
Time 6.01 seconds
------------------

### Launch command 2

In [2]:
! CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --standalone --nnodes=1 --nproc_per_node=2 code/ddp.py --num_epochs 5

Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Files already downloaded and verifiedFiles already downloaded and verified

Files already downloaded and verifiedFiles already downloaded and verified

---------------------------------------------------------------------------
Epoch: 0, Accuracy: 0.0
---------------------------------------------------------------------------
Local Rank: 0, Epoch: 0, Training ...
Time 7.96 seconds
Local Rank: 1, Epoch: 0, Training ...
Time 7.96 seconds
---------------------------------------------------------------------------
Epoch: 1, Accuracy: 0.3269
---------------------------------------------------------------------------
Local Rank: 0, Epoch: 1, Training ...
Time 5.68 seconds
Local Rank: 1, Epoch: 1, Training ...
Time 5.69 seconds
-----------------

## Example 2 (ToyModel using multiprocessing)

```python
"""mp.py"""
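import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

# setup(), cleanup() and ToyModel are defined elsewhere in mp.py (not shown)
def demo_basic(rank, world_size):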
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)
```
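
`ToyModel` is not shown in the excerpt. A minimal sketch that is consistent with the shapes used in `demo_basic` (inputs of size 10, labels of size 5) could look like the following; the hidden layer size and the layer names are assumptions.

```python
import torch
import torch.nn as nn


class ToyModel(nn.Module):
    """Small two-layer network: 10 input features, 5 outputs."""

    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


# Shape check on CPU, mirroring the batch used in demo_basic
outputs = ToyModel()(torch.randn(20, 10))
print(tuple(outputs.shape))  # (20, 5)
```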

### Launch command

In [6]:
! CUDA_VISIBLE_DEVICES=1,2 python -W ignore code/mp.py 

Running basic DDP example on rank 1.
Running basic DDP example on rank 0.
Running DDP with model parallel example on rank 0.
Running DDP with model parallel example on rank 1.


**What happens with the following command?**

In [7]:
! CUDA_VISIBLE_DEVICES=1,2 torchrun --standalone --nnodes=1 --nproc_per_node=2 code/mp.py

Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Running basic DDP example on rank 1.
Running basic DDP example on rank 0.
Running basic DDP example on rank 0.
Running basic DDP example on rank 1.
Running DDP with model parallel example on rank 0.
Running DDP with model parallel example on rank 1.
Running DDP with model parallel example on rank 1.
Running DDP with model parallel example on rank 0.

Each message is now printed twice as often: `torchrun` starts two copies of `mp.py`, and each copy calls `mp.spawn` to create its own set of worker processes. The two launch mechanisms should therefore not be combined.


## Credits
- [https://medium.com/codex/a-comprehensive-tutorial-to-pytorch-distributeddataparallel-1f4b42bb1b51](https://medium.com/codex/a-comprehensive-tutorial-to-pytorch-distributeddataparallel-1f4b42bb1b51)
- [https://pytorch.org/tutorials/intermediate/dist_tuto.html](https://pytorch.org/tutorials/intermediate/dist_tuto.html)
- [https://leimao.github.io/blog/PyTorch-Distributed-Training/](https://leimao.github.io/blog/PyTorch-Distributed-Training/)
- [https://pytorch.org/tutorials/intermediate/ddp_tutorial.html](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)
- [https://pytorch.org/docs/stable/notes/cuda.html](https://pytorch.org/docs/stable/notes/cuda.html)
- [https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html](https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html)
- [https://huggingface.co/docs/transformers/performance](https://huggingface.co/docs/transformers/performance)
- [https://github.com/pytorch/tutorials/blob/master/intermediate_source/dist_tuto.rst](https://github.com/pytorch/tutorials/blob/master/intermediate_source/dist_tuto.rst)