[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/DoranLyong/Awesome-Tensor-Architecture/blob/main/pytorch_reference/simple_reference/06_PyTorch_Acceleration_and_Optimization/02_MultiGPU_Distributed.ipynb)

# PyTorch on Multiple GPUs (Single Machine)
1. data parallel processing 
2. model parallel processing 

In [2]:
import torch 
import torch.nn as nn 
import torch.optim as optim 
import torch.nn.functional as F 
from torchvision.models import vgg16

In [3]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
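model = vgg16(weights=None)  # randomly initialized; `weights=None` replaces the deprecated `pretrained=False` (torchvision >= 0.13)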

In [4]:
from torchvision import transforms as T 
from torch.utils.data import DataLoader 
from torchvision.datasets import CIFAR10


"""
    Dataset class returns a dataset object that includes : 
        - data and 
        - information about the data 
"""

train_T = T.Compose([ T.RandomCrop(32, padding=4),
                      T.RandomHorizontalFlip(), 
                      T.ToTensor(), 
                      T.Normalize( mean= (0.4914, 0.4822, 0.4465),
                                   std=(0.2023, 0.1994, 0.2010)),
                    ])

train_data = CIFAR10(   root= "./train", 
                        train=True, 
                        download=True, 
                        transform=train_T, # set the transform parameter when creating the dataset 
                    )

trainloader = DataLoader( train_data,  # dataset object 
                          batch_size=16, 
                          shuffle=True, 
                          num_workers=3,
                        )                    

Files already downloaded and verified


## Data Parallel Processing (p.160)
* ```multi-threaded``` approach using ```nn.DataParallel```
* ```multi-process``` approach using DDP (preferred) ★★★


### The multithreaded approach using nn.DataParallel 

In [5]:
# (ref) https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html
if torch.cuda.device_count() > 1: 
    print("This machine has", torch.cuda.device_count(), "GPUs available." )

    model =  nn.DataParallel(model)

model.to(device)

This machine has 2 GPUs available.


DataParallel(
  (module): VGG(
    (features): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): ReLU(inplace=True)
      (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (6): ReLU(inplace=True)
      (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): ReLU(inplace=True)
      (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (13): ReLU(inplace=True)
      (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (15): ReLU(inplace=True)
      

This multithreaded approach is the simplest way to run on multiple GPUs because it needs only one extra line; however, <br/>
the ```multiprocess``` approach usually performs better, even on a single machine.
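
To make the data-parallel split concrete: ```nn.DataParallel``` chunks each input batch along dimension 0 and hands one chunk to each GPU replica. The sketch below imitates only that split, on the CPU, using ```torch.chunk```. The name `n_gpus` is an assumed value chosen to match the two-GPU machine above; no GPU is needed to run it.

```python
import torch

# One CIFAR-10-sized batch with the same batch size as `trainloader` (16).
batch = torch.randn(16, 3, 32, 32)

# Assumed device count for illustration; on a real machine this would be
# torch.cuda.device_count().
n_gpus = 2

# Split along the batch dimension, the way DataParallel distributes its input.
chunks = torch.chunk(batch, n_gpus, dim=0)
chunk_sizes = [c.shape[0] for c in chunks]

print(chunk_sizes)  # [8, 8]
```

Each replica therefore sees only 8 of the 16 samples per step, so the batch size set on the `DataLoader` is the total across all GPUs.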

### The multiprocess approach using DDP (preferred, use this one ★★★) - ([ref](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html))
* DDP stands for ```DistributedDataParallel```
* it can be used with ```multiple processes``` on a ```single machine``` or across ```multiple machines```. 


Four steps:
1. Initialize a process group using ```torch.distributed```.
2. Create a local model and move it to its GPU using ```nn.Module.to()```.
3. Wrap the model with DDP using ```torch.nn.parallel.DistributedDataParallel```.
4. Spawn processes using ```torch.multiprocessing```.

In [6]:
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

In [7]:
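import os

# Distributed training loop: mp.spawn runs one copy per process and passes `rank` as the first argument.
def dist_training_loop(rank, world_size, dataloader, model, loss_fn, optimizer):

    # The default env:// rendezvous reads these two variables.
    # Assumed values for a single machine; any free port works.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")

    # Initialize the process group ("nccl" is the usual backend for GPU training).
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    model = model.to(rank)  # move the model to the GPU whose ID is `rank`
    ddp_model = DDP(model, device_ids=[rank])  # wrap the model in DDP

    opt = optimizer(ddp_model.parameters(), lr=1e-3)  # `optimizer` is a class, e.g. optim.SGD

    n_epochs = 10
    for epoch in range(n_epochs):
        for inputs, labels in dataloader:

            inputs = inputs.to(rank)
            labels = labels.to(rank)  # move inputs and labels to the GPU whose ID is `rank`

            outputs = ddp_model(inputs)  # call the DDP model for the forward pass

            loss = loss_fn(outputs, labels)

            opt.zero_grad()
            loss.backward()  # DDP averages gradients across processes during backward
            opt.step()

    dist.destroy_process_group()  # cleanup
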


In [None]:
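# === run main === #
# mp.spawn passes the process index (rank) as the first argument and `args` after it,
# so `args` must supply every remaining parameter of dist_training_loop.
# Spawned workers must be able to import the target function, so this usually needs
# to run from a script (under `if __name__ == "__main__":`) rather than a notebook cell.
world_size = 2

mp.spawn(   dist_training_loop,
            # vgg16() is a fresh, unwrapped model; the global `model` was wrapped in nn.DataParallel above
            args=(world_size, trainloader, vgg16(), nn.CrossEntropyLoss(), optim.SGD),
            nprocs=world_size,
            join=True)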
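
One detail the loop above leaves open is how the data is divided: if every process iterates over the same `DataLoader`, every process sees every sample. A common way to give each rank its own shard is ```torch.utils.data.distributed.DistributedSampler```. The sketch below is a minimal illustration on a toy dataset (`toy_data` and `shards` are hypothetical names, not part of the notebook); passing `num_replicas` and `rank` explicitly lets it run without an initialized process group or any GPU.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy dataset of the integers 0..9, standing in for CIFAR-10.
toy_data = TensorDataset(torch.arange(10))
world_size = 2

shards = []
for rank in range(world_size):
    # Explicit num_replicas/rank: no dist.init_process_group() is required here.
    sampler = DistributedSampler(
        toy_data, num_replicas=world_size, rank=rank, shuffle=False
    )
    # `sampler` replaces `shuffle=True`; the two options are mutually exclusive.
    loader = DataLoader(toy_data, batch_size=5, sampler=sampler)
    shards.append([i for (batch,) in loader for i in batch.tolist()])

print(shards)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

Inside a real DDP worker the sampler would be built with that worker's own `rank`, and with `shuffle=True` you would call `sampler.set_epoch(epoch)` at the start of each epoch so the shuffling order changes between epochs.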

If your model does not fit into a single GPU or you are using smaller batch sizes, <br/> 
you may consider using model parallel processing instead of data parallel processing. 

## Model Parallel Processing (p.164) - ([ref](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html))

In [8]:
from torchvision.models.resnet import ResNet, Bottleneck

In [9]:
num_classes = 10  # CIFAR-10


class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__( Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential( 
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))
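
The class above follows a general pattern: place each stage of the network on its own device and move the activations between devices inside `forward`. The sketch below shows the same pattern in miniature. `TwoStageNet` and `pick_devices` are hypothetical names introduced for illustration, and the fallback to the CPU is an assumption added so the example also runs on machines with fewer than two GPUs.

```python
import torch
import torch.nn as nn


def pick_devices():
    """Return two devices: cuda:0/cuda:1 if available, otherwise CPU twice."""
    if torch.cuda.device_count() >= 2:
        return torch.device("cuda:0"), torch.device("cuda:1")
    cpu = torch.device("cpu")
    return cpu, cpu


class TwoStageNet(nn.Module):
    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = dev0, dev1
        # Stage 1 lives on the first device, stage 2 on the second.
        self.stage1 = nn.Sequential(nn.Linear(8, 16), nn.ReLU()).to(dev0)
        self.stage2 = nn.Linear(16, 4).to(dev1)

    def forward(self, x):
        h = self.stage1(x.to(self.dev0))
        # Copy the intermediate activations to the second device.
        return self.stage2(h.to(self.dev1))


dev0, dev1 = pick_devices()
toy_net = TwoStageNet(dev0, dev1)
toy_out = toy_net(torch.randn(5, 8))

print(toy_out.shape, toy_out.device)
```

Because the output lives on the second device, the labels must be moved there too before computing the loss, which is why the training loop sends `labels` to `cuda:1`.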

In [10]:
model = ModelParallelResNet50() 
loss_fn = nn.CrossEntropyLoss()  
optimizer = optim.SGD(model.parameters(), lr=1e-3)


n_epochs = 3
for epoch in range(n_epochs):

    model.train()
    batch_loss = 0.0 

    for input, labels in trainloader: 
        input = input.to("cuda:0")
        labels = labels.to("cuda:1") 

        outputs = model(input) 

        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # === accumulate batch loss 
        batch_loss += loss.item()         

    print(f" Epoch: {epoch}, Avg_loss: {batch_loss / len(trainloader)}")


 Epoch: 0, Avg_loss: 2.26372942527771
 Epoch: 1, Avg_loss: 2.058728328475952
 Epoch: 2, Avg_loss: 1.9407443102645874


***
## Combined DDP and Model Parallel Processing (p.167)