Ref: https://pytorch.org/tutorials/beginner/dist_overview.html
Videos Ref:
- https://youtu.be/3XUG7cjte2U?si=KFBAQCDN8i74xLRI
- https://pytorch.org/tutorials/beginner/ddp_series_intro.html?utm_source=distr_landing&utm_medium=ddp_series_intro

###**PyTorch Distributed Training**
Instead of training a large model on large data with a single machine, GPU, or device, we use the distributed training paradigm to spread the work across multiple machines, devices, or GPUs. This gives us more compute power and also saves training time.

PyTorch distributed training has 2 paradigms:
- Data Parallel Training
    - The data is too large and needs multiple resources to process it in parallel. In this case, one model is replicated onto each resource or device. Each replica receives a different portion of the data, and the portions are processed in parallel across devices. The model replicas are then synchronized across the devices.
- Model Parallel Training
    - The model is too large to fit on a single device or machine. In this case, the model is partitioned (sharded), and each shard is placed on a different device or server.
-----------------------------
- Data Parallel Training
    - DataParallel `torch.nn.DataParallel`
        - Single-machine multi-GPUs
    - Distributed Data Parallel - DDP `torch.nn.parallel.DistributedDataParallel`(recommended)
        - Single-machine multi-GPUs
    - Distributed Data Parallel + launching script - DDP `torch.nn.parallel.DistributedDataParallel`
        - Multi-machine GPUs
    -  `torch.distributed.elastic`
    - Fully Sharded Data Parallel (FSDP)
- Distributed training / Distributed Model Parallel Training
    - Remote Procedure Call (RPC) distributed training `torch.distributed.rpc`
- Hybrid Parallel Solutions
    - Combination of both DataParallel and ModelParallel training paradigms
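
To make the difference between the two paradigms concrete, here is a toy sketch in plain Python (no PyTorch). The two-stage "model", the function names, and the use of list slots as "devices" are all hypothetical stand-ins chosen for illustration.

```python
# Toy illustration: the "model" is two stages, the "devices" are just list slots.

def stage1(x):
    return x * 2

def stage2(x):
    return x + 1

def full_model(x):
    return stage2(stage1(x))

batch = [1, 2, 3, 4]

# Data parallelism: every worker holds the full model and gets a slice of the batch.
num_workers = 2
shards = [batch[w::num_workers] for w in range(num_workers)]
data_parallel_out = [[full_model(x) for x in shard] for shard in shards]

# Model parallelism: each "device" holds one stage; the whole batch flows through both.
device0_out = [stage1(x) for x in batch]
model_parallel_out = [stage2(h) for h in device0_out]

print(shards)              # [[1, 3], [2, 4]]
print(data_parallel_out)   # [[3, 7], [5, 9]]
print(model_parallel_out)  # [3, 5, 7, 9]
```

Both approaches compute the same outputs; they differ only in what is split: the data in the first case, the model in the second.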

###**Data Parallel Training**
PyTorch provides several options for data-parallel training. For applications that gradually grow from simple to complex and from prototype to production, the common development trajectory would be:

- Use single-device training if the data and model can fit in one GPU, and training speed is not a concern.

- Use **single-machine multi-GPU** `DataParallel` to make use of multiple GPUs on a single machine to speed up training with minimal code changes.

- Use **single-machine multi-GPU** `DistributedDataParallel`, if you would like to further speed up training and are willing to write a little more code to set it up.

- Use **multi-machine** `DistributedDataParallel` (multiple machines on a cluster, each possibly with multiple GPUs) and the `launching script`, if the application needs to scale across machine boundaries.

- Use `torch.distributed.elastic` to launch distributed training if errors (e.g., out-of-memory) are expected or if resources can join and leave dynamically during training.

####**DataParallel - `torch.nn.DataParallel`**
    
The `DataParallel package` enables single-machine multi-GPU parallelism with the lowest coding hurdle. It only requires a one-line change to the application code. The tutorial of Data Parallelism shows an example https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html.

Although DataParallel is very easy to use, it usually does not offer the best performance because `it replicates the model in every forward pass`, and `its single-process multi-thread parallelism` naturally `suffers from Python's GIL Global Interpreter Lock contention`.

To get better performance, consider using `DistributedDataParallel`.

####**Distributed Data-Parallel**

`Distributed Data-Parallel` Training (DDP) is a widely adopted `single-program multiple-data training paradigm`. With DDP, `the model is replicated on every process`, and `every model replica will be fed with a different set of input data samples` with the help of `DistributedSampler`.
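
The way a distributed sampler hands each process a different set of samples can be sketched in plain Python. This is a simplified simulation, not the `DistributedSampler` source: the function name `shard_indices` is hypothetical, and Python's `random` module stands in for the real shuffling. It assumes the number of processes does not exceed the dataset size.

```python
import math
import random

def shard_indices(dataset_len, num_replicas, rank, epoch=0, seed=0, shuffle=True):
    """Return the sample indices one rank would see in one epoch."""
    indices = list(range(dataset_len))
    if shuffle:
        # Same seed on every rank, so all ranks agree on the permutation.
        random.Random(seed + epoch).shuffle(indices)
    # Pad by repeating the first indices so that every rank gets the same count.
    per_rank = math.ceil(dataset_len / num_replicas)
    total = per_rank * num_replicas
    indices += indices[: total - dataset_len]
    # Each rank takes every num_replicas-th index, starting from its own rank.
    return indices[rank:total:num_replicas]

world_size = 4
parts = [shard_indices(10, world_size, r, shuffle=False) for r in range(world_size)]
for r, p in enumerate(parts):
    print(r, p)
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]
```

Because the shuffle in this sketch is seeded with `seed + epoch`, a new epoch number is what produces a new ordering.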

DDP takes care of gradient communication to keep model replicas synchronized and overlaps it with the gradient computations to speed up training.

Compared to `DataParallel`, `DistributedDataParallel` requires one more step to set up, i.e., `calling init_process_group`. DDP uses `multi-process parallelism`, and `hence there is no GIL contention across model replicas`. In a single-machine multi-GPU setup, DDP launches one process per GPU and each process has its own local copy of the model, whereas DP launches a single process with multi-thread parallelism.

All model replicas start from the same parameters, and each process holds an identical optimizer; the only thing that differs is the input data fed to each replica. This is done with the help of `DistributedSampler`, which ensures that each process receives a different chunk of the dataset from the dataloader. These chunks are processed concurrently by the different processes, unlike single-GPU training. This is the main idea behind data parallelism.

Because each replica sees different inputs, it computes different local gradients. If every process applied only its own gradients, the replicas would drift apart and we would end up with a different model in each process. To keep the models identical, DDP synchronizes gradients during the backward pass. The synchronization happens inside the `loss.backward()` step, before `optimizer.step()` updates the parameters, using the `bucketed ring AllReduce synchronization algorithm`, which gathers the gradients of all replicas and averages them. As a result, all replicas hold the same gradients, `optimizer.step()` applies the same update in every process, and the models stay identical even though each process received different inputs.
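
The gradient averaging described above can be checked on a one-parameter model in plain Python. This is a minimal sketch: `all_reduce_mean` is a hypothetical stand-in for the real collective, and the data, learning rate and world size are made up.

```python
import math

def grad_mse(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

def all_reduce_mean(values):
    """Stand-in for an all-reduce that leaves every rank with the average."""
    avg = sum(values) / len(values)
    return [avg] * len(values)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
world_size, lr = 2, 0.01
weights = [0.5] * world_size  # identical replicas after the initial broadcast

# Each rank computes a local gradient on its own shard of the batch.
local_grads = [grad_mse(weights[r], xs[r::world_size], ys[r::world_size])
               for r in range(world_size)]
# Synchronization: all ranks end up with the same averaged gradient.
synced = all_reduce_mean(local_grads)
# Each rank applies the same update, so the replicas stay identical.
weights = [w - lr * g for w, g in zip(weights, synced)]

print(local_grads)  # [-15.0, -30.0]
print(synced)       # [-22.5, -22.5]
print(weights)
```

With equally sized shards, the averaged gradient equals the gradient of the full batch, which is why the replicas behave like a single model trained on the combined batch.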

Moreover, the model is broadcast at DDP construction time instead of in every forward pass, which also helps to speed up training. DDP is shipped with several performance optimization technologies. For a more in-depth explanation, refer to this paper (http://www.vldb.org/pvldb/vol13/p3005-li.pdf).

**DDP materials are listed below**:

- DDP notes https://pytorch.org/docs/stable/notes/ddp.html offer a starter example and some brief descriptions of its design and implementation. If this is your first time using DDP, start from this document.

- Getting Started with Distributed Data Parallel https://pytorch.org/tutorials/intermediate/ddp_tutorial.html explains some common problems with DDP training, including unbalanced workload, checkpointing, and multi-device models. Note that DDP can be easily combined with single-machine multi-device model parallelism, which is described in the Single-Machine Model Parallel Best Practices tutorial https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html.

- The Launching and configuring distributed data parallel applications document https://github.com/pytorch/examples/blob/main/distributed/ddp/README.md shows how to use the DDP launching script.

- The Shard Optimizer States With ZeroRedundancyOptimizer https://pytorch.org/tutorials/recipes/zero_redundancy_optimizer.html recipe demonstrates how ZeroRedundancyOptimizer helps to reduce optimizer memory footprint.

- The Distributed Training with Uneven Inputs Using the Join Context Manager tutorial https://pytorch.org/tutorials/advanced/generic_join.html walks through using the generic join context for distributed training with uneven inputs.

####**torch.distributed.elastic**
With the growth of the application complexity and scale, failure recovery becomes a requirement. Sometimes it is inevitable to hit errors like out-of-memory (OOM) when using DDP, but DDP itself cannot recover from those errors, and it is not possible to handle them using a standard try-except construct.

This is because DDP requires all processes to operate in a closely synchronized manner and all `AllReduce` communications launched in different processes must match. If one of the processes in the group throws an exception, it is likely to lead to desynchronization (mismatched AllReduce operations) which would then cause a crash or hang.

`torch.distributed.elastic` adds `fault tolerance` and the ability to make use of a dynamic pool of machines (elasticity).

####**Fully Sharded Data Parallel (FSDP)**
Training AI models at a large scale is a challenging task that requires a lot of compute power and resources. It also comes with considerable engineering complexity to handle the training of these very large models. PyTorch FSDP, released in PyTorch 1.11, makes this easier.

**How does FSDP work?**

In `DistributedDataParallel (DDP)` training, **each process/worker owns a replica of the model and processes a batch of data, and finally uses all-reduce to sum up gradients over different workers**. In DDP the model weights and optimizer states are replicated across all workers.

`FSDP` is a type of **data parallelism** that `shards model parameters, optimizer states and gradients across DDP ranks`.

When training with FSDP, the GPU memory footprint is smaller than when training with DDP across all workers. This makes the training of some very large models feasible by allowing larger models or batch sizes to fit on device.
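
A back-of-the-envelope calculation shows where the savings come from. The sketch below rests on assumptions that are not from the FSDP documentation: fp32 parameters and gradients (4 bytes each), an Adam-style optimizer keeping two fp32 values per parameter (8 bytes), and a perfectly even split across ranks. It ignores activations, buffers, and the temporarily gathered parameters, so real numbers will differ.

```python
def model_state_gib(num_params, world_size=1, sharded=False,
                    bytes_param=4, bytes_grad=4, bytes_optim=8):
    """Rough per-GPU memory for parameters, gradients and optimizer state."""
    per_param = bytes_param + bytes_grad + bytes_optim
    total_bytes = num_params * per_param
    if sharded:
        # Assumption: all three kinds of state are split evenly across ranks.
        total_bytes /= world_size
    return total_bytes / 2**30

num_params = 1_000_000_000  # hypothetical 1B-parameter model
ddp_gib = model_state_gib(num_params, world_size=8, sharded=False)
fsdp_gib = model_state_gib(num_params, world_size=8, sharded=True)
print(f"DDP : {ddp_gib:.2f} GiB per GPU")
print(f"FSDP: {fsdp_gib:.2f} GiB per GPU")
```

Under these assumptions the replicated state costs every GPU the full amount, while the sharded state shrinks in proportion to the number of ranks.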

This comes with the cost of increased communication volume. The communication overhead is reduced by internal optimizations like overlapping communication and computation.

https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html?utm_source=distr_landing&utm_medium=FSDP_getting_started

One way to view FSDP’s sharding is to decompose the DDP gradient all-reduce into reduce-scatter and all-gather. Specifically, during the backward pass, FSDP reduces and scatters gradients, ensuring that each rank possesses a shard of the gradients. Then it updates the corresponding shard of the parameters in the optimizer step. Finally, in the subsequent forward pass, it performs an all-gather operation to collect and combine the updated parameter shards.
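
The decomposition above can be simulated with plain Python lists, where each inner list is one rank's data. The three helper functions are hypothetical stand-ins for the real collectives, and the sketch assumes the vector length is divisible by the number of ranks.

```python
def all_reduce(per_rank):
    """Every rank ends up with the element-wise sum of all ranks' vectors."""
    summed = [sum(col) for col in zip(*per_rank)]
    return [list(summed) for _ in per_rank]

def reduce_scatter(per_rank):
    """Rank r ends up with only the r-th chunk of the element-wise sum."""
    world = len(per_rank)
    summed = [sum(col) for col in zip(*per_rank)]
    chunk = len(summed) // world  # assumes length divisible by world size
    return [summed[r * chunk:(r + 1) * chunk] for r in range(world)]

def all_gather(shards):
    """Every rank ends up with the concatenation of all ranks' shards."""
    full = [v for shard in shards for v in shard]
    return [list(full) for _ in shards]

grads = [[1, 2, 3, 4], [10, 20, 30, 40]]  # local gradients on rank 0 and rank 1
grad_shards = reduce_scatter(grads)
print(grad_shards)                                   # [[11, 22], [33, 44]]
print(all_gather(grad_shards) == all_reduce(grads))  # True

# FSDP-style step: each rank updates only its own parameter shard (lr = 1 here),
# then the updated shards are all-gathered for the next forward pass.
param_shards = [[100, 100], [100, 100]]
updated = [[p - g for p, g in zip(ps, gs)]
           for ps, gs in zip(param_shards, grad_shards)]
print(all_gather(updated)[0])                        # [89, 78, 67, 56]
```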

In [14]:
import torch
import os

os.cpu_count(), torch.cuda.device_count()
#torch.cuda.current_device(), torch.cuda.mem_get_info()

(2, 0)

In [5]:
#example to show the data creation logic below - for self reference
data = [(torch.rand(20), torch.rand(1)) for _ in range(10)]
len(data), torch.rand(20), torch.rand(1) #10 datasamples, each datasample has 20 features and 1 label

(10,
 tensor([0.0174, 0.5772, 0.4586, 0.6001, 0.4476, 0.4665, 0.4148, 0.4230, 0.4823,
         0.6494, 0.2060, 0.1267, 0.8235, 0.7276, 0.1596, 0.6896, 0.7913, 0.7325,
         0.9080, 0.6098]),
 tensor([0.7896]))

In [6]:
%load_ext autoreload
%autoreload 2

In [28]:
#Example code of usual pytorch training with single GPU
#we use this code as base for adding DDP to enable Distributed Data Parallel training

%%writefile single_gpy.py
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

import argparse


#Create Custom Dummy dataset
class MyTinyDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)] #regression dataset

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        data, label = self.data[index]
        return data, label

#Trainer class
class Trainer:
    def __init__(self,
                 model: torch.nn.Module,
                 dataloader: torch.utils.data.DataLoader,
                 optimizer: torch.optim.Optimizer,
                 save_every: int,
                 gpu_id: int):
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_dataloader = dataloader
        self.optimizer = optimizer
        self.save_every = save_every

    def _run_batch(self, x, y):
        self.optimizer.zero_grad()
        y_pred = self.model(x)
        loss = F.mse_loss(y_pred, y) #regression target, so use mean squared error (cross_entropy over a single output is always 0)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}")
        for x, y in self.train_dataloader:
            x, y = x.to(self.gpu_id), y.to(self.gpu_id)
            self._run_batch(x, y)

    def _save_checkpoint(self, epoch):
        checkpoint_path = "checkpoint.pt"
        ckp = self.model.state_dict()
        torch.save(obj=ckp, f=checkpoint_path)
        print(f"Epoch {epoch} | Training checkpoint saved at {checkpoint_path}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            #save checkpoint
            if epoch % self.save_every == 0 or epoch == max_epochs-1:
                self._save_checkpoint(epoch)


#Getting and processing data
def load_train_objs():
    model = torch.nn.Linear(in_features=20, out_features=1) #simple model with just linear layer
    dataset = MyTinyDataset(size=2048) # load your dataset
    optimizer = SGD(params=model.parameters(), lr=1e-3)
    return dataset, model, optimizer

def prepare_dataloader(dataset, batch_size):
    train_dataloader = DataLoader(dataset=dataset,
                                  batch_size=batch_size,
                                  shuffle=True,
                                  pin_memory=True)
    return train_dataloader


#Main method to launch everything
def main(device, total_epochs, save_every, batch_size):
    #Get the dataset, model and optimizer
    dataset, model, optimizer = load_train_objs()
    train_dataloader = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_dataloader, optimizer, save_every, device)
    trainer.train(total_epochs)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="simple distributed training job")
    parser.add_argument("total_epochs", type=int, help="Total epochs to train the model")
    parser.add_argument("save_every", type=int, help="How often to save a snapshot or checkpoint")
    parser.add_argument("--batch_size" , default=32, type=int, help="Input batch size on each device (default: 32)")
    args = parser.parse_args()

    if torch.cuda.is_available():
        device = 0  #shorthand for cuda:0
    else: device = "cpu"
    main(device, args.total_epochs, args.save_every, args.batch_size)


Overwriting single_gpy.py


In [30]:
#execute the above code as a Python script
!python single_gpy.py 50 10

[GPUcpu] Epoch 0 | Batchsize: 32 | Steps: 64
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPUcpu] Epoch 1 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 2 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 3 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 4 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 5 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 6 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 7 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 8 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 9 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 10 | Batchsize: 32 | Steps: 64
Epoch 10 | Training checkpoint saved at checkpoint.pt
[GPUcpu] Epoch 11 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 12 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 13 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 14 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 15 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 16 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 17 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 18 | Batchsize: 32 | Steps: 64
[GPUcpu] Epoch 19 | Batchsize

Ref: https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py

https://discuss.pytorch.org/t/use-distributed-data-parallel-correctly/82500/11

In [7]:
#Migration of code from single-GPU training to single-machine multi-GPU training, i.e. distributed training

%%writefile multigpu.py
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

#-------------NEW------------------
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
#----------------------------------

import argparse
import os

#Create Custom Dummy dataset
class MyTinyDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)] #regression dataset

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        data, label = self.data[index]
        return data, label

#-------------NEW------------------
def ddp_set_up(rank: int, world_size: int):
    """Set up environment variables and initialize the process group
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    #MASTER_ADDR: IP address of the machine that runs the rank 0 process. In a single-machine multi-GPU setup the rank 0 process runs on this same machine, so "localhost" works. It is called master because this machine coordinates the communication across all of our processes.
    os.environ["MASTER_ADDR"] = "localhost"
    #MASTER_PORT: any free port on the master machine
    os.environ["MASTER_PORT"] = "12345"
    #init_process_group groups the processes running on all our GPUs (typically one process per GPU) so that they can communicate with each other
    init_process_group(backend="nccl", #nccl: NVIDIA Collective Communications Library, used for distributed communication across CUDA GPUs; other backends include gloo and mpi
                       rank=rank, #rank: unique identifier assigned to each process, ranges from 0 to world_size-1
                       world_size=world_size) #world_size: total number of processes in the group
    torch.cuda.set_device(rank)
#-------------NEW------------------

#Trainer class
class Trainer:
    def __init__(self,
                 model: torch.nn.Module,
                 dataloader: torch.utils.data.DataLoader,
                 optimizer: torch.optim.Optimizer,
                 save_every: int,
                 gpu_id: int):
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id) #needed otherwise model reside on cpu device
        #-------------NEW------------------
        self.model = DDP(module=model, device_ids=[self.gpu_id]) #wrap the model with DDP before training, device_ids is a single list that contains gpu_id that model lives on
        #-------------NEW------------------
        self.train_dataloader = dataloader
        self.optimizer = optimizer
        self.save_every = save_every


    def _run_batch(self, x, y):
        self.optimizer.zero_grad()
        y_pred = self.model(x)
        loss = F.mse_loss(y_pred, y) #regression target, so use mean squared error (cross_entropy over a single output is always 0)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}")
        #-------------NEW------------------
        #set_epoch() lives on the sampler, not on the DataLoader. In distributed mode, call it at the beginning of each epoch, before creating the DataLoader iterator, so that shuffling differs across epochs. Otherwise, the same ordering is always used.
        self.train_dataloader.sampler.set_epoch(epoch)
        #-------------NEW------------------
        for x, y in self.train_dataloader:
            x, y = x.to(self.gpu_id), y.to(self.gpu_id)
            self._run_batch(x, y)

    def _save_checkpoint(self, epoch):
        checkpoint_path = "checkpoint.pt"
        #-------------NEW------------------
        #ckp = self.model.state_dict() #removed
        ckp = self.model.module.state_dict() #accessing the model state_dict with model.module.state_dict()
        #-------------NEW------------------
        torch.save(obj=ckp, f=checkpoint_path)
        print(f"Epoch {epoch} | Training checkpoint saved at {checkpoint_path}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            #save checkpoint
            #-------------NEW------------------
            #if epoch % self.save_every == 0 or epoch == max_epochs-1: #removed
            #DDP launches one process per GPU and each process creates its own Trainer object. Without the rank check every process would save an identical checkpoint, which is redundant, so save only from the rank 0 process.
            if self.gpu_id == 0 and (epoch % self.save_every == 0 or epoch == max_epochs-1):
            #-------------NEW------------------
                self._save_checkpoint(epoch)


#Getting and processing data
def load_train_objs():
    model = torch.nn.Linear(in_features=20, out_features=1) #simple model with just linear layer
    dataset = MyTinyDataset(size=2048) # load your dataset
    optimizer = SGD(params=model.parameters(), lr=1e-3)
    return dataset, model, optimizer

def prepare_dataloader(dataset, batch_size):
    train_dataloader = DataLoader(dataset=dataset,
                                  batch_size=batch_size,
                                  #-------------NEW------------------
                                  #shuffle=True,
                                  sampler=DistributedSampler(dataset), #DistributedSampler gives each process its own subset of the dataset; keep shuffle=False on the DataLoader because the sampler already handles shuffling
                                  shuffle=False,
                                  #-------------NEW------------------
                                  pin_memory=True)
    return train_dataloader

#update main() to initialize and destroy process groups, also takes rank and world size as args, replace device to rank
#-------------NEW------------------
#def main(device, total_epochs, save_every, batch_size):
def main(rank: int, world_size: int, total_epochs, save_every, batch_size):
    ddp_set_up(rank=rank, world_size=world_size)
#-------------NEW------------------
    dataset, model, optimizer = load_train_objs()
    train_dataloader = prepare_dataloader(dataset, batch_size)
    #-------------NEW------------------
    # trainer = Trainer(model, train_dataloader, optimizer, save_every, device)
    #trainer.train(total_epochs)
    trainer = Trainer(model, train_dataloader, optimizer, save_every, rank)
    trainer.train(total_epochs)
    destroy_process_group() #destroy the process group
    #-------------NEW------------------

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="simple distributed training job")
    parser.add_argument("total_epochs", type=int, help="Total epochs to train the model")
    parser.add_argument("save_every", type=int, help="How often to save a snapshot or checkpoint")
    parser.add_argument("--batch_size" , default=32, type=int, help="Input batch size on each device (default: 32)")
    args = parser.parse_args()

    #-------------NEW------------------
    #device = 0 if torch.cuda.is_available() else "cpu" #shorthand for cuda:0
    # main(device, args.total_epochs, args.save_every, args.batch_size)
    world_size = torch.cuda.device_count() #one process per GPU; not device agnostic, so make sure to use a GPU runtime (device_count() is 0 when no GPU is available)
    #mp.spawn() from torch.multiprocessing takes a function (here main()) and runs it in nprocs processes. It passes the process index, which we use as the rank, as the first argument to the function, followed by args, so rank is not included in args.
    mp.spawn(fn=main, args=(world_size, args.total_epochs, args.save_every, args.batch_size), nprocs=world_size)
    #-------------NEW------------------

Overwriting multigpu.py


In [27]:
torch.cuda.device_count()

1

In [8]:
!python multigpu.py 50 10

#### Fault tolerance in distributed multi GPU training using `torchrun`

In distributed training, a single process failure can disrupt the entire training job. Since the susceptibility to failure can be higher here, making your training script robust is particularly important. You might also prefer your training job to be elastic; for example, compute resources can join and leave dynamically over the course of the job.

PyTorch offers a utility called torchrun that provides fault-tolerance and elastic training. When a failure occurs, torchrun logs the errors and attempts to automatically restart all the processes from the last saved “snapshot” of the training job.

The snapshot saves more than just the model state; it can include details about the number of epochs run, optimizer states or any other stateful attribute of the training job necessary for its continuity.

**Why use torchrun?**

torchrun handles the minutiae of distributed training so that you don’t need to. For instance:

- You don’t need to set environment variables or explicitly pass the rank and world_size; torchrun assigns these along with several other environment variables.

- There is no need to call mp.spawn in your script; you only need a generic main() entry point, and you launch the script with torchrun. This way the same script can be run in non-distributed as well as single-node and multinode setups.

- Training restarts gracefully from the last saved training snapshot.
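
As a small illustration of the environment variables mentioned above, the sketch below reads `RANK`, `LOCAL_RANK` and `WORLD_SIZE` with single-process defaults, so the same code also runs when the script is started without torchrun. The helper name `get_dist_env` is hypothetical; the values arrive as strings, so they are converted to `int`.

```python
import os

def get_dist_env(environ=os.environ):
    """Read the rank variables torchrun exports, with single-process defaults."""
    return {
        "rank": int(environ.get("RANK", 0)),
        "local_rank": int(environ.get("LOCAL_RANK", 0)),
        "world_size": int(environ.get("WORLD_SIZE", 1)),
    }

# Simulated environments, passed in as plain dicts for demonstration.
print(get_dist_env({}))
# {'rank': 0, 'local_rank': 0, 'world_size': 1}
print(get_dist_env({"RANK": "3", "LOCAL_RANK": "1", "WORLD_SIZE": "4"}))
# {'rank': 3, 'local_rank': 1, 'world_size': 4}
```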

###**Graceful restarts**
For graceful restarts, you should structure your train script like:

```
def main():
  load_snapshot(snapshot_path)
  initialize()
  train()

def train():
  for batch in iter(dataset):
    train_step(batch)

    if should_checkpoint:
      save_snapshot(snapshot_path)

```

If a failure occurs, torchrun will terminate all the processes and restart them. Each process entry point first loads and initializes the last saved snapshot, and continues training from there. So at any failure, you only lose the training progress from the last saved snapshot.

In elastic training, whenever there are any membership changes (adding or removing nodes), torchrun will terminate and spawn processes on available devices. Having this structure ensures your training job can continue without manual intervention.
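
The load-then-train structure can be tried out without any GPUs. The following is a minimal sketch in plain Python: a JSON file stands in for the snapshot, a counter stands in for the model, and the `fail_at` argument is a hypothetical hook that simulates a crash. Calling `train` a second time plays the role of the restart that torchrun would perform.

```python
import json
import os
import tempfile

def load_snapshot(path):
    """Return the saved state, or a fresh state if no snapshot exists yet."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"epochs_run": 0, "weight": 0.0}

def save_snapshot(path, state):
    # Write to a temp file and rename, so a crash mid-write cannot corrupt the snapshot.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)

def train(path, max_epochs, save_every, fail_at=None):
    state = load_snapshot(path)
    for epoch in range(state["epochs_run"], max_epochs):
        if epoch == fail_at:
            raise RuntimeError(f"simulated failure in epoch {epoch}")
        state["weight"] += 1.0           # stand-in for one epoch of training
        state["epochs_run"] = epoch + 1  # number of completed epochs
        if state["epochs_run"] % save_every == 0:
            save_snapshot(path, state)
    return state

snapshot_path = os.path.join(tempfile.mkdtemp(), "snapshot.json")
try:
    train(snapshot_path, max_epochs=10, save_every=2, fail_at=5)
except RuntimeError as err:
    print(err)                           # simulated failure in epoch 5
final = train(snapshot_path, max_epochs=10, save_every=2)  # the "restart"
print(final)                             # {'epochs_run': 10, 'weight': 10.0}
```

In this run the failure hits during epoch 5, the last snapshot covers four completed epochs, and the restart repeats only the one epoch that was finished but not yet saved.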

#### **Diff for multigpu.py v/s multigpu_torchrun.py**
**Process group initialization**
torchrun assigns `RANK` and `WORLD_SIZE` automatically, among other environment variables.

In [2]:
import os
os.environ

environ{'SHELL': '/bin/bash',
        'NV_LIBCUBLAS_VERSION': '12.2.5.6-1',
        'NVIDIA_VISIBLE_DEVICES': 'all',
        'COLAB_JUPYTER_TRANSPORT': 'ipc',
        'NV_NVML_DEV_VERSION': '12.2.140-1',
        'NV_CUDNN_PACKAGE_NAME': 'libcudnn8',
        'CGROUP_MEMORY_EVENTS': '/sys/fs/cgroup/memory.events /var/colab/cgroup/jupyter-children/memory.events',
        'NV_LIBNCCL_DEV_PACKAGE': 'libnccl-dev=2.19.3-1+cuda12.2',
        'NV_LIBNCCL_DEV_PACKAGE_VERSION': '2.19.3-1',
        'VM_GCE_METADATA_HOST': '169.254.169.253',
        'HOSTNAME': 'e83b5f9a121f',
        'LANGUAGE': 'en_US',
        'TBE_RUNTIME_ADDR': '172.28.0.1:8011',
        'GCE_METADATA_TIMEOUT': '3',
        'NVIDIA_REQUIRE_CUDA': 'cuda>=12.2 brand=tesla,driver>=470,driver<471 brand=unknown,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 brand=geforce,driver>=470,driver<471 brand=geforcertx,driver>=470,driver<471 brand=quadro,driver>=470,driver<471 brand=quadror

In [14]:
%%writefile multigpu_torchrun.py
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

import argparse
import os

import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.distributed import init_process_group, destroy_process_group

#Create Custom Dummy dataset
class MyTinyDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)] #regression dataset

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        data, label = self.data[index]
        return data, label

def ddp_set_up():
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) #environment variables are strings, so convert LOCAL_RANK to an int device index

#Trainer class
class Trainer:
    def __init__(self,
                 model: torch.nn.Module,
                 dataloader: torch.utils.data.DataLoader,
                 optimizer: torch.optim.Optimizer,
                 save_every: int,
                 snapshot_path: str):
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_dataloader = dataloader
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_runs = 0 #useful for resuming
        #Snapshot check
        self.snapshot_path = snapshot_path
        if os.path.exists(self.snapshot_path):
            print(f"Snapshot found at {self.snapshot_path}... Loading snapshot")
            self._load_snapshot(self.snapshot_path)
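        self.model = DDP(self.model, device_ids=[self.gpu_id]) #wrap after loading the snapshot; the forward pass and self.model.module below rely on self.model being the DDP wrapper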

    def _load_snapshot(self, snapshot_path):
        #Loading snapshot
        device_loc = f"cuda:{self.gpu_id}"
        snapshot = torch.load(f=snapshot_path, map_location=device_loc)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_runs = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epochs: {self.epochs_runs}")

    def _run_batch(self, x, y):
        self.optimizer.zero_grad()
        y_pred = self.model(x)
        loss = F.mse_loss(y_pred, y) #regression target, so use mean squared error (cross_entropy over a single output is always 0)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}")
        self.train_dataloader.sampler.set_epoch(epoch)
        for x, y in self.train_dataloader:
            x, y = x.to(self.gpu_id), y.to(self.gpu_id)
            self._run_batch(x, y)

    def _save_snapshot(self, epoch):
        snapshot = {"MODEL_STATE": self.model.module.state_dict(),
                    "EPOCHS_RUN": epoch}
        torch.save(obj=snapshot, f=self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_runs, max_epochs):
            self._run_epoch(epoch)
            #save snapshot
            if self.gpu_id == 0 and (epoch % self.save_every == 0 or epoch == max_epochs-1):
                self._save_snapshot(epoch)

#Getting and processing data
def load_train_objs():
    model = torch.nn.Linear(in_features=20, out_features=1) #simple model with just linear layer
    dataset = MyTinyDataset(size=2048) # load your dataset
    optimizer = SGD(params=model.parameters(), lr=1e-3)
    return dataset, model, optimizer

def prepare_dataloader(dataset, batch_size):
    train_dataloader = DataLoader(dataset=dataset,
                                  batch_size=batch_size,
                                  shuffle=False,
                                  sampler=DistributedSampler(dataset),
                                  pin_memory=True)
    return train_dataloader


#Main method to launch everything
def main(total_epochs, save_every, batch_size, snapshot_path: str="snapshot.pt"):
    ddp_set_up()
    #Get the dataset, model and optimizer
    dataset, model, optimizer = load_train_objs()
    train_dataloader = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_dataloader, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="simple distributed training job with torchrun")
    parser.add_argument("total_epochs", type=int, help="Total epochs to train the model")
    parser.add_argument("save_every", type=int, help="How often to save a snapshot or checkpoint")
    parser.add_argument("--batch_size" , default=32, type=int, help="Input batch size on each device (default: 32)")
    args = parser.parse_args()

    main(args.total_epochs, args.save_every, args.batch_size)


Overwriting multigpu_torchrun.py


`--standalone` tells torchrun that this is a single-machine (single-node) setup.

`--nproc_per_node` sets how many processes (normally one per GPU) to launch on each machine/node. This is our local world size. As I have a single GPU in this machine, I set it to 1 using `--nproc_per_node 1`. We can also pass `--nproc_per_node gpu`, which tells torchrun to use all available GPUs.
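
torchrun passes the rank information to every process it launches through environment variables such as `LOCAL_RANK`, `RANK` and `WORLD_SIZE`. As a minimal sketch of how a script could read them, the helper below parses these variables into a small record. The names `TorchrunEnv` and `read_torchrun_env` are hypothetical (they are not part of PyTorch), and the defaults are an assumption that describes a plain single-process run.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TorchrunEnv:
    """Rank information for one process (hypothetical helper type)."""
    local_rank: int   # index of this process on its own machine
    rank: int         # index of this process across all machines
    world_size: int   # total number of processes in the job


def read_torchrun_env(env: Mapping[str, str] = os.environ) -> TorchrunEnv:
    """Parse the variables torchrun sets; fall back to single-process defaults."""
    return TorchrunEnv(
        local_rank=int(env.get("LOCAL_RANK", "0")),
        rank=int(env.get("RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
    )


if __name__ == "__main__":
    # Under torchrun this prints the values assigned to the current process.
    print(read_torchrun_env())
```

Note that the values arrive as strings, so they need an `int(...)` conversion before being used as a device index.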

In [15]:
!torchrun --standalone --nproc_per_node 1 multigpu_torchrun.py 50 10

Traceback (most recent call last):
  File "/content/multigpu_torchrun.py", line 123, in <module>
    main(args.total_epochs, args.save_every, args.batch_size)
  File "/content/multigpu_torchrun.py", line 108, in main
    ddp_set_up()
  File "/content/multigpu_torchrun.py", line 30, in ddp_set_up
    init_process_group(backend="nccl")
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/c10d_logger.py", line 74, in wrapper
    func_return = func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 1148, in init_process_group
    default_pg, _ = _new_process_group_helper(
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 1279, in _new_process_group_helper
    backend_class = ProcessGroupNCCL(backend_prefix_store, group_rank, group_size, pg_options)
RuntimeError: ProcessGroupNCCL is only supported with GPUs, no GPUs found!
[2023-12-26 18:49:14,895] torch.distributed.elastic.mul
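
The traceback above reports that the `nccl` backend found no GPU in this runtime. One possible workaround, sketched here under the assumption that falling back to CPU is acceptable, is to choose the backend and device from CUDA availability. `choose_backend_and_device` is a hypothetical helper, not a PyTorch function.

```python
from typing import Tuple


def choose_backend_and_device(cuda_available: bool, local_rank: int) -> Tuple[str, str]:
    """Return (process-group backend, device string) for this process.

    NCCL only works with GPUs, so fall back to Gloo on CPU-only machines.
    """
    if cuda_available:
        return "nccl", f"cuda:{local_rank}"
    return "gloo", "cpu"
```

In `ddp_set_up` this could be used as `backend, device = choose_backend_and_device(torch.cuda.is_available(), int(os.environ["LOCAL_RANK"]))` followed by `init_process_group(backend=backend)`. On CPU, the model would also have to be wrapped with `DDP(model)` without `device_ids`.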

Ref: https://pytorch.org/tutorials/intermediate/ddp_series_multinode.html

In [17]:
%%writefile multinode_torchrun.py
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import SGD
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

import argparse
import os

import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.distributed import init_process_group, destroy_process_group

#Create Custom Dummy dataset
class MyTinyDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)] #regression dataset

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        data, label = self.data[index]
        return data, label

def ddp_set_up():
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) #environment variables are strings, so convert to int

#Trainer class
class Trainer:
    def __init__(self,
                 model: torch.nn.Module,
                 dataloader: torch.utils.data.DataLoader,
                 optimizer: torch.optim.Optimizer,
                 save_every: int,
                 snapshot_path: str):
        self.local_rank = int(os.environ["LOCAL_RANK"])
        self.global_rank = int(os.environ["RANK"])
        self.model = model.to(self.local_rank)
        self.train_dataloader = dataloader
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_runs = 0 #useful for resuming
        #Snapshot check
        self.snapshot_path = snapshot_path
        if os.path.exists(self.snapshot_path):
            print(f"Snapshot found at {self.snapshot_path}... Loading snapshot")
            self._load_snapshot(self.snapshot_path)
        self.model = DDP(self.model, device_ids=[self.local_rank]) #wrap after loading so training and .module use the DDP model

    def _load_snapshot(self, snapshot_path):
        #Loading snapshot
        device_loc = f"cuda:{self.local_rank}"
        snapshot = torch.load(f=snapshot_path, map_location=device_loc)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_runs = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epochs: {self.epochs_runs}")

    def _run_batch(self, x, y):
        self.optimizer.zero_grad()
        y_pred = self.model(x)
        loss = F.mse_loss(y_pred, y) #regression targets with a single output, so use MSE rather than cross-entropy
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f"[GPU{self.local_rank}] Epoch {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}")
        self.train_dataloader.sampler.set_epoch(epoch)
        for x, y in self.train_dataloader:
            x, y = x.to(self.local_rank), y.to(self.local_rank)
            self._run_batch(x, y)

    def _save_snapshot(self, epoch):
        snapshot = {"MODEL_STATE": self.model.module.state_dict(),
                    "EPOCHS_RUN": epoch}
        torch.save(obj=snapshot, f=self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_runs, max_epochs):
            self._run_epoch(epoch)
            #save snapshot
            if self.local_rank == 0 and (epoch % self.save_every == 0 or epoch == max_epochs-1):
                self._save_snapshot(epoch)

#Getting and processing data
def load_train_objs():
    model = torch.nn.Linear(in_features=20, out_features=1) #simple model with just linear layer
    dataset = MyTinyDataset(size=2048) # load your dataset
    optimizer = SGD(params=model.parameters(), lr=1e-3)
    return dataset, model, optimizer

def prepare_dataloader(dataset, batch_size):
    train_dataloader = DataLoader(dataset=dataset,
                                  batch_size=batch_size,
                                  shuffle=False,
                                  sampler=DistributedSampler(dataset),
                                  pin_memory=True)
    return train_dataloader


#Main method to launch everything
def main(total_epochs, save_every, batch_size, snapshot_path: str="snapshot.pt"):
    ddp_set_up()
    #Get the dataset, model and optimizer
    dataset, model, optimizer = load_train_objs()
    train_dataloader = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_dataloader, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="simple distributed training job with torchrun")
    parser.add_argument("total_epochs", type=int, help="Total epochs to train the model")
    parser.add_argument("save_every", type=int, help="How often to save a snapshot or checkpoint")
    parser.add_argument("--batch_size" , default=32, type=int, help="Input batch size on each device (default: 32)")
    args = parser.parse_args()

    main(args.total_epochs, args.save_every, args.batch_size)


Overwriting multinode_torchrun.py


In [18]:
!torchrun --nproc_per_node 1 --nnodes 1 --node_rank 0 --rdzv_id 456 --rdzv_backend c10d --rdzv_endpoint=172.31.43.139:29603 multinode_torchrun.py 50 10

[E socket.cpp:922] [c10d] The client socket has timed out after 60s while trying to connect to (172.31.43.139, 29603).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py", line 155, in _create_tcp_store
    store = TCPStore(
TimeoutError: The client socket has timed out after 60s while trying to connect to (172.31.43.139, 29603).

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/torchrun", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 806, in main
    run(args)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 797, in run
    elastic_launch(
  File "/u

###**Distributed Training / Distributed Model Parallel Training**

####**RPC-Based Distributed Training**
Many training paradigms do not fit into data parallelism, e.g., the parameter server paradigm, distributed pipeline parallelism, and reinforcement learning applications with multiple observers or agents.

`torch.distributed.rpc` aims at supporting general distributed training scenarios.

`torch.distributed.rpc` has 4 main pillars:

- `RPC` supports remote execution, i.e. running an arbitrary user function or module on a remote worker. That is the main purpose of any RPC system.

- `RRef` stands for remote reference. It lets us efficiently reference and access remote data objects, and it manages the lifetime of a remote object. The reference counting protocol is presented in the RRef notes: https://pytorch.org/docs/stable/rpc/rref.html#remote-reference-protocol

- `Distributed Autograd` extends the PyTorch autograd engine beyond machine boundaries. By default, PyTorch's autograd is local to a single process; Distributed Autograd extends it across process and machine boundaries. Please refer to the Distributed Autograd Design notes for more details: https://pytorch.org/docs/stable/rpc/distributed_autograd.html#distributed-autograd-design

- `Distributed Optimizer` automatically reaches out to all participating workers to update parameters using gradients computed by the distributed autograd engine.
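
As a rough illustration of the first two pillars (RPC and RRef), the sketch below starts an RPC group with a single process that calls itself. The worker name `worker0`, the helper `find_free_port`, and the `world_size=1` setup are assumptions made to keep the demo in one process; a real job would launch several workers on different ranks.

```python
import os
import socket

import torch
import torch.distributed.rpc as rpc


def find_free_port() -> int:
    """Ask the OS for an unused TCP port (hypothetical helper for this demo)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def run_demo():
    # init_rpc uses the env:// rendezvous by default, which reads these variables.
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", str(find_free_port()))
    rpc.init_rpc("worker0", rank=0, world_size=1)
    try:
        # RPC: run torch.add on the target worker and wait for the result.
        total = rpc.rpc_sync("worker0", torch.add, args=(torch.ones(2), torch.ones(2)))
        # RRef: run torch.mul on the target worker and get back only a reference.
        rref = rpc.remote("worker0", torch.mul, args=(torch.ones(2), 3))
        # Fetch the value behind the reference when it is actually needed.
        product = rref.to_here()
    finally:
        rpc.shutdown()
    return total, product


if __name__ == "__main__":
    print(run_demo())
```

With more than one worker, the only change on the caller side would be the target name passed to `rpc_sync` and `remote`.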

**RPC Tutorials are listed below:**

- The Getting Started with Distributed RPC Framework tutorial first uses a simple Reinforcement Learning (RL) example to demonstrate RPC and RRef. Then, it applies a basic distributed model parallelism to an RNN example to show how to use distributed autograd and distributed optimizer. https://pytorch.org/tutorials/intermediate/rpc_tutorial.html

- The `Implementing a Parameter Server Using Distributed RPC Framework` tutorial borrows the spirit of HogWild! training https://people.eecs.berkeley.edu/~brecht/papers/hogwildTR.pdf and applies it to an asynchronous parameter server (PS) training application. https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html

- The `Distributed Pipeline Parallelism Using RPC` tutorial https://pytorch.org/tutorials/intermediate/dist_pipeline_parallel_tutorial.html extends the single-machine pipeline parallel example (presented in Single-Machine Model Parallel Best Practices https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html) to a distributed environment and shows how to implement it using RPC.

- The `Implementing Batch RPC Processing Using Asynchronous Executions` tutorial https://pytorch.org/tutorials/intermediate/rpc_async_execution.html demonstrates how to implement RPC batch processing using the `@rpc.functions.async_execution` decorator, which can help speed up inference and training. It uses RL and PS examples similar to those in the first two tutorials above.

- The `Combining Distributed DataParallel with Distributed RPC Framework` tutorial https://pytorch.org/tutorials/advanced/rpc_ddp_tutorial.html demonstrates how to combine DDP with RPC to train a model using distributed data parallelism combined with distributed model parallelism.