# Data Parallelism in PyTorch

This notebook experiments with PyTorch’s `DataParallel` module, which implements synchronous SGD across multiple GPUs on a single server. It trains a ResNet-18 model on the CIFAR10 dataset using 1, 2, and 4 GPUs and analyzes performance and scalability.

## Overview
The key steps involve setting up the DataLoader with CIFAR10 dataset transformations, training the model on different GPUs, measuring training time, analyzing scalability, and calculating communication bandwidth utilization.

## Procedure
- **Data Loading and Transformations**: Load CIFAR10 dataset with random cropping, horizontal flipping, and normalization transformations.
- **Training Setup**: Implement ResNet-18 model training using DataParallel on multiple GPUs.
- **Single GPU Training**: Measure training time for various batch sizes on a single GPU.
- **Multi-GPU Training**: Measure training time and speedup on 2 and 4 GPUs, and analyze the type of scaling.
- **Computation and Communication Time**: Calculate and report the time spent in computation and communication for multi-GPU setups.
- **Bandwidth Utilization**: Calculate communication bandwidth utilization using the all-reduce algorithm.

In [1]:
'''ResNet in PyTorch.

For Pre-activation ResNet, see 'preact_resnet.py'.

Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
'''
import time
import warnings

import torch
import torch.nn as nn
import torch.nn.functional as F
from IPython.display import clear_output
from tqdm import tqdm

warnings.filterwarnings("ignore")


In [2]:


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion *
                               planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])


def ResNet34():
    return ResNet(BasicBlock, [3, 4, 6, 3])


def ResNet50():
    return ResNet(Bottleneck, [3, 4, 6, 3])


def ResNet101():
    return ResNet(Bottleneck, [3, 4, 23, 3])


def ResNet152():
    return ResNet(Bottleneck, [3, 8, 36, 3])


def test():
    net = ResNet18()
    y = net(torch.randn(1, 3, 32, 32))
    print(y.size())

# test()


We are going to experiment with PyTorch’s DataParallel module, which is PyTorch’s synchronous SGD
implementation across a number of GPUs on the same server. In particular, we will train the ResNet-18 implementation from https://github.com/kuangliu/pytorch-cifar with `num_workers=2`, running on up to 4 GPUs
with the DataParallel (DP) module. Use the SGD optimizer with a learning rate of 0.1, momentum 0.9, and
weight decay 5e-4. For this question, you need to run experiments with multiple GPUs on the same server.
You may need to execute this on the NYU Greene Cluster.

Create a PyTorch program with a DataLoader that loads the images and the related labels from the torchvision CIFAR10 dataset. Import the CIFAR10 dataset from the torchvision package, with the following sequence of transformations:
- Random cropping, with size 32x32 and padding 4
- Random horizontal flipping with a probability of 0.5
- Normalize each image’s RGB channels with mean (0.4914, 0.4822, 0.4465) and variance (0.2023, 0.1994, 0.2010)

The DataLoader for the training set uses a minibatch size of 128 and 3 IO processes (i.e., `num_workers=2`).
The DataLoader for the testing set uses a minibatch size of 100 and 3 IO processes (i.e., `num_workers=2`).
Create a main function that creates the DataLoaders for the training set and the neural network.
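
As a small illustration of the normalization step, the sketch below applies the per-channel formula (x - mean) / std to a single pixel in plain Python. It assumes, as the text above states, that the second tuple holds variances, so the standard deviation is its square root. `normalize_pixel` is a hypothetical helper written for this note, not a torchvision function.

```python
import math

# Per-channel statistics quoted in the text above.
MEAN = (0.4914, 0.4822, 0.4465)
VARIANCE = (0.2023, 0.1994, 0.2010)  # treated as variance, per the text (assumption)


def normalize_pixel(rgb, mean=MEAN, variance=VARIANCE):
    """Return (x - mean) / sqrt(variance) for each channel of one RGB pixel in [0, 1]."""
    return tuple((x - m) / math.sqrt(v) for x, m, v in zip(rgb, mean, variance))


# A pixel equal to the channel means maps to zero in every channel.
print(normalize_pixel(MEAN))
```

If the second tuple were instead meant as standard deviations, the `math.sqrt` call would be dropped.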

In [3]:
from torchvision import transforms, datasets, models, utils
from torch.utils.data import DataLoader
from torch.optim import SGD
import numpy as np
import torch
import time

num_workers = 2
momentum = 0.9
weight_decay = 5e-4

transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(p=.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=np.sqrt([0.2023, 0.1994, .2010]))
])

trainset = datasets.CIFAR10(root='./data',
                            train=True,
                            download=True,
                            transform=transform)

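# Evaluation data should not be randomly cropped or flipped, so the test set
# gets its own transform with only tensor conversion and normalization.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=np.sqrt([0.2023, 0.1994, .2010]))
])

testset = datasets.CIFAR10(root='./data',
                           train=False,
                           download=True,
                           transform=test_transform)


def load_cifar10():
    """Build the training and test DataLoaders described above."""
    trainloader = DataLoader(trainset,
                             batch_size=128,
                             shuffle=True,
                             num_workers=num_workers)

    testloader = DataLoader(testset,
                            batch_size=100,
                            shuffle=False,
                            num_workers=num_workers)

    return trainloader, testloader


trainloader, testloader = load_cifar10()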


Files already downloaded and verified
Files already downloaded and verified


 1. Measure how long it takes to complete 1 epoch of training using different batch sizes on a single GPU.
 Start from batch size 32, increase by 4-fold for each measurement (i.e., 32, 128, 512 ...) until single
 GPU memory cannot hold the batch size. For each run, run 2 epochs, the first epoch is used to warm
 up the CPU/GPU cache; and you should report the training time (excluding data I/O; but including
 data movement from CPU to GPU, gradients calculation and weights update) based on the 2nd epoch
 training. (5)
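
One way to structure the "warm up, then time the second epoch" measurement described above is sketched here. `time_second_epoch` is a hypothetical helper, not part of PyTorch. It takes the epoch as a callable and an optional `sync` callable, because CUDA kernels are launched asynchronously and the clock should only stop once queued GPU work has finished. Note that it times the whole epoch, including data loading; excluding I/O would require timing inside the training loop instead.

```python
import time


def time_second_epoch(run_epoch, sync=None):
    """Run two epochs and return the wall-clock seconds taken by the second one.

    run_epoch: zero-argument callable that performs one full training epoch.
    sync: optional zero-argument callable that blocks until queued device
          work has finished (for example torch.cuda.synchronize on CUDA).
    """
    run_epoch()                  # warm-up epoch, not timed
    if sync is not None:
        sync()                   # make sure warm-up work has finished
    start = time.perf_counter()
    run_epoch()                  # measured epoch
    if sync is not None:
        sync()                   # wait for asynchronous kernels before stopping the clock
    return time.perf_counter() - start
```

With a training function such as the notebook's `train_one_epoch`, a call could look like `time_second_epoch(lambda: train_one_epoch(trainloader), sync=torch.cuda.synchronize)`.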

In [4]:

from tqdm import tqdm
import warnings 
warnings.filterwarnings("ignore")
from IPython.display import clear_output

model = models.resnet18(num_classes=10)

device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
model = model.to(device)


criterion = torch.nn.CrossEntropyLoss()
optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)


def train_one_epoch(loader):
    model.train()
    start_time = time.time()
    for i, (inputs, labels) in enumerate(tqdm(loader), 0):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    end_time = time.time()
    return end_time - start_time


batch_size = 32
results = []

while True:
    try:
        trainloader = DataLoader(trainset,
                                batch_size=batch_size,
                                shuffle=True,
                                num_workers=num_workers)
        time_taken = train_one_epoch(trainloader)
        results.append((batch_size, time_taken))
        batch_size *= 4
    except Exception:
        # Stop at the first batch size that fails (e.g. CUDA out of memory).
        clear_output(wait=True)
        print(f"Max GPU Memory reached at batch size {batch_size}")
        break

print("Results:")
print(f"Max GPU Memory reached at batch size {batch_size}")
for batch_size, time_taken in results:
    # time_taken is a float, so it needs a float format spec (":4s" raises ValueError).
    print(f"Batch size: {batch_size:<6d}, Time taken: {time_taken:<5.2f} seconds")



100%|██████████| 1563/1563 [00:13<00:00, 113.45it/s]
100%|██████████| 391/391 [00:07<00:00, 54.70it/s]
100%|██████████| 98/98 [00:07<00:00, 13.95it/s]
100%|██████████| 25/25 [00:07<00:00,  3.49it/s]
100%|██████████| 7/7 [00:07<00:00,  1.04s/it]
100%|██████████| 2/2 [00:10<00:00,  5.03s/it]
100%|██████████| 1/1 [00:15<00:00, 15.27s/it]
100%|██████████| 1/1 [00:14<00:00, 14.62s/it]
100%|██████████| 1/1 [00:14<00:00, 14.59s/it]
100%|██████████| 1/1 [00:14<00:00, 14.64s/it]
100%|██████████| 1/1 [00:14<00:00, 14.76s/it]
100%|██████████| 1/1 [00:15<00:00, 15.27s/it]
100%|██████████| 1/1 [00:17<00:00, 17.20s/it]
100%|██████████| 1/1 [00:26<00:00, 26.34s/it]
100%|██████████| 1/1 [01:00<00:00, 60.39s/it]
100%|██████████| 1/1 [03:16<00:00, 196.56s/it]
  0%|          | 0/1 [00:00<?, ?it/s]

# 4.1
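**Note:** I ran the above code on a single NVIDIA A100 GPU with 40 GB of memory. GPU memory was never exhausted: peak utilization was 83.3%. The CIFAR10 training set has 50,000 images, so any batch size above 50,000 is effectively capped at one batch containing the whole training set. The GPU had enough memory to hold that entire epoch at once, which is why the largest batch size in the table below (131072) still ran.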

```text
Batch size: 32    , Time taken: 13.55 seconds
Batch size: 128   , Time taken: 7.62  seconds
Batch size: 512   , Time taken: 7.33  seconds
Batch size: 2048  , Time taken: 7.57  seconds
Batch size: 8192  , Time taken: 7.97  seconds
Batch size: 32768 , Time taken: 10.38 seconds
Batch size: 131072, Time taken: 15.27 seconds
```
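
The batch sizes above can be related to the number of optimizer steps per epoch, which is what the progress bars in the cell output count. The sketch below assumes the standard 50,000-image CIFAR10 training split and a DataLoader that keeps the last partial batch. `iterations_per_epoch` is a hypothetical helper.

```python
import math

TRAIN_SET_SIZE = 50_000  # number of images in the CIFAR10 training split


def iterations_per_epoch(batch_size, dataset_size=TRAIN_SET_SIZE):
    """Number of batches per epoch when the last partial batch is kept."""
    return math.ceil(dataset_size / batch_size)


for bs in (32, 128, 512, 2048, 8192, 32768, 131072):
    print(f"batch size {bs:>6}: {iterations_per_epoch(bs)} iterations")
```

Once the batch size exceeds the dataset size, the count stays at one iteration, which is why increasing it further no longer changes the amount of data held on the GPU.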

 2. Measure running time with batch size per GPU you used in part 1 (i.e., 32, 128, ...) on 2 GPUs and
 4 GPUs and calculate speedup for each setup. Again, for each setup, run 2 epochs, and only measure
 the 2nd epoch. When measuring speedup, one should include all the training components (e.g., data
 loading, cpu-gpu time, compute time). (5).
 Expected Answer: Table 1 records the training time and speedup for different batch sizes up to 4 GPUs.
 Comment on which type of scaling we are measuring: weak-scaling or strong-scaling? Comment on if
 the other type of scaling was used to speed up the number will be better or worse than what you are
 measuring.

In [6]:

def train_one_epoch(loader, model, criterion, optimizer):
    model.train()
    start_time = time.time()
    for i, (inputs, labels) in enumerate(tqdm(loader), 0):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    end_time = time.time()
    return end_time - start_time

def measure_speedup(num_gpus, batch_sizes):
    results = []
    for batch_size in batch_sizes:
        model = models.resnet18(num_classes=10)
        if num_gpus > 1:
            model = torch.nn.DataParallel(model, device_ids=list(range(num_gpus)))
        model = model.to(device)

        criterion = torch.nn.CrossEntropyLoss()
        optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)

        trainloader = DataLoader(trainset,
                                 batch_size=batch_size,
                                 shuffle=True,
                                 num_workers=num_workers)

        train_one_epoch(trainloader, model, criterion, optimizer)

        time_taken = train_one_epoch(trainloader, model, criterion, optimizer)
        results.append((batch_size, time_taken))

    return results



# Measure speedup for 2 and 4 GPUs
batch_sizes = [32, 128, 512, 2048, 8192] 

speedup_2_gpus = measure_speedup(num_gpus=2, batch_sizes=batch_sizes)
speedup_4_gpus = measure_speedup(num_gpus=4, batch_sizes=batch_sizes)


print("Speedup results for 2 GPUs:")
for batch_size, time_taken in speedup_2_gpus:
    print(f"Batch size: {str(batch_size)+',':<5s} Time taken: {str(round(time_taken, 2)):>4s} seconds")

print("\nSpeedup results for 4 GPUs:")
for batch_size, time_taken in speedup_4_gpus:
    print(f"Batch size: {str(batch_size)+',':<5s} Time taken: {str(round(time_taken, 2)):>4s} seconds")

100%|██████████| 1563/1563 [01:08<00:00, 22.67it/s]
100%|██████████| 1563/1563 [01:08<00:00, 22.85it/s]
100%|██████████| 391/391 [00:17<00:00, 22.19it/s]
100%|██████████| 391/391 [00:17<00:00, 22.22it/s]
100%|██████████| 98/98 [00:07<00:00, 13.21it/s]
100%|██████████| 98/98 [00:07<00:00, 13.36it/s]
100%|██████████| 25/25 [00:07<00:00,  3.24it/s]
100%|██████████| 25/25 [00:07<00:00,  3.30it/s]
100%|██████████| 7/7 [00:07<00:00,  1.12s/it]
100%|██████████| 7/7 [00:07<00:00,  1.13s/it]
100%|██████████| 1563/1563 [01:25<00:00, 18.30it/s]
100%|██████████| 1563/1563 [01:26<00:00, 18.04it/s]
100%|██████████| 391/391 [00:21<00:00, 17.87it/s]
100%|██████████| 391/391 [00:22<00:00, 17.75it/s]
100%|██████████| 98/98 [00:07<00:00, 12.92it/s]
100%|██████████| 98/98 [00:07<00:00, 13.18it/s]
100%|██████████| 25/25 [00:07<00:00,  3.31it/s]
100%|██████████| 25/25 [00:07<00:00,  3.34it/s]
100%|██████████| 7/7 [00:07<00:00,  1.11s/it]
100%|██████████| 7/7 [00:07<00:00,  1.12s/it]

Speedup results for 2 GPUs:
Batch size: 32,   Time taken: 68.39 seconds
Batch size: 128,  Time taken: 17.6 seconds
Batch size: 512,  Time taken: 7.34 seconds
Batch size: 2048, Time taken: 7.58 seconds
Batch size: 8192, Time taken:  7.9 seconds

Speedup results for 4 GPUs:
Batch size: 32,   Time taken: 86.65 seconds
Batch size: 128,  Time taken: 22.03 seconds
Batch size: 512,  Time taken: 7.44 seconds
Batch size: 2048, Time taken: 7.49 seconds
Batch size: 8192, Time taken: 7.83 seconds





# 4.2
```text 
2 GPUs:
Batch size: 32,   Time taken: 68.39 seconds
Batch size: 128,  Time taken: 17.6  seconds
Batch size: 512,  Time taken: 7.34  seconds
Batch size: 2048, Time taken: 7.58  seconds
Batch size: 8192, Time taken: 7.9   seconds
4 GPUs:
Batch size: 32,   Time taken: 86.65 seconds
Batch size: 128,  Time taken: 22.03 seconds
Batch size: 512,  Time taken: 7.44  seconds
Batch size: 2048, Time taken: 7.49  seconds
Batch size: 8192, Time taken: 7.83  seconds
```
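Speedup Analysis:

- No speedup was observed. Taking speedup as single-GPU time divided by multi-GPU time (single-GPU times from 4.1), batch sizes 32 and 128 ran slower on multiple GPUs (about 0.20x and 0.43x on 2 GPUs, 0.16x and 0.35x on 4 GPUs), while batch sizes 512 and above took essentially the same time as on one GPU (about 1.0x).
- At small batch sizes, the per-iteration communication and synchronization overhead of DataParallel outweighs the small amount of computation per step. At larger batch sizes there are far fewer iterations per epoch, so this overhead becomes negligible, but adding GPUs still did not reduce the epoch time.
- This is an example of strong scaling: the batch size passed to the DataLoader stays fixed and DataParallel splits each batch across the GPUs, so the problem size remains constant while GPUs are added.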
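
The speedup figures can be recomputed from the measured epoch times with a few lines of Python. This is a minimal sketch: the dictionaries simply copy the times reported in 4.1 and 4.2, and `speedup` is a hypothetical helper using the usual definition (single-GPU time divided by multi-GPU time).

```python
# Epoch times in seconds, copied from the tables in sections 4.1 and 4.2.
t1 = {32: 13.55, 128: 7.62, 512: 7.33, 2048: 7.57, 8192: 7.97}
t2 = {32: 68.39, 128: 17.6, 512: 7.34, 2048: 7.58, 8192: 7.9}
t4 = {32: 86.65, 128: 22.03, 512: 7.44, 2048: 7.49, 8192: 7.83}


def speedup(single_gpu_time, multi_gpu_time):
    """Speedup relative to one GPU; values below 1 mean a slowdown."""
    return single_gpu_time / multi_gpu_time


for bs in t1:
    s2 = speedup(t1[bs], t2[bs])
    s4 = speedup(t1[bs], t4[bs])
    print(f"batch size {bs:>4}: 2 GPUs {s2:.2f}x, 4 GPUs {s4:.2f}x")
```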


# 4.3

To calculate:
- Compute Time = Training time on 1 GPU
- Communication Time = Training time on N GPUs - Compute Time

Results (all times in seconds):

| Batch Size | 2 GPU Compute (s) | 2 GPU Comm (s) | 4 GPU Compute (s) | 4 GPU Comm (s) |
|------------|-------------------|----------------|-------------------|----------------|
| 32         | 13.55             | 54.84          | 13.55             | 73.10          |
| 128        | 7.62              | 9.98           | 7.62              | 14.41          |
| 512        | 7.33              | 0.01           | 7.33              | 0.11           |
| 2048       | 7.57              | 0.01           | 7.57              | -0.08          |
| 8192       | 7.97              | -0.07          | 7.97              | -0.14          |

The small negative communication times are measurement noise: at these batch sizes the multi-GPU epoch time is within about 0.15 s of the single-GPU time.
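
The table can be reproduced from the measured epoch times. The sketch below applies the two definitions given above; `split_time` is a hypothetical helper, and the input times are copied from 4.1 and 4.2.

```python
# Epoch times in seconds, copied from sections 4.1 and 4.2.
single = {32: 13.55, 128: 7.62, 512: 7.33, 2048: 7.57, 8192: 7.97}
multi = {
    2: {32: 68.39, 128: 17.6, 512: 7.34, 2048: 7.58, 8192: 7.9},
    4: {32: 86.65, 128: 22.03, 512: 7.44, 2048: 7.49, 8192: 7.83},
}


def split_time(single_gpu_time, multi_gpu_time):
    """Return (compute, communication) using the definitions above."""
    compute = single_gpu_time                  # compute time = 1-GPU training time
    communication = multi_gpu_time - compute   # remainder is attributed to communication
    return compute, communication


for n_gpus, times in multi.items():
    for bs, t in times.items():
        compute, comm = split_time(single[bs], t)
        print(f"{n_gpus} GPUs, batch size {bs:>4}: compute {compute:.2f} s, comm {comm:.2f} s")
```

This attribution treats the single-GPU epoch time as pure computation, so any run-to-run variation shows up in the communication column.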


In [7]:
from tabulate import tabulate

# Constants
num_parameters = 11689512  
param_size_bytes = 4  

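# All-reduce traffic in GB (2**30 bytes): 2 * (N - 1) * K parameters * 4 bytes each.
# Note: these values are not divided by N; the per-GPU cost is 2 * (N - 1) * K / N.
allreduce_cost_2_gpu_gb = 2 * (2 - 1) * num_parameters * param_size_bytes / 2**30
allreduce_cost_4_gpu_gb = 2 * (4 - 1) * num_parameters * param_size_bytes / 2**30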


exec_times_2_gpu = {
    32: 68.39,
    128: 17.6,
    512: 7.34,
    2048: 7.58,
    8192: 7.9
}
exec_times_4_gpu = {
    32: 86.65,
    128: 22.03,
    512: 7.44,
    2048: 7.49,
    8192: 7.83
}


def calculate_bandwidth_utilization(allreduce_cost_gb, exec_time_sec):
    return allreduce_cost_gb / exec_time_sec


batch_sizes = [32, 128, 512, 2048, 8192]
table_data = []
for batch_size in batch_sizes:
    bw_util_2_gpu = calculate_bandwidth_utilization(allreduce_cost_2_gpu_gb, exec_times_2_gpu[batch_size])
    bw_util_4_gpu = calculate_bandwidth_utilization(allreduce_cost_4_gpu_gb, exec_times_4_gpu[batch_size])
    table_data.append([batch_size, bw_util_2_gpu, bw_util_4_gpu])


headers = ["Batch Size per GPU", "2-GPU Bandwidth Utilization GB/s", "4-GPU Bandwidth Utilization GB/s"]
print(tabulate(table_data, headers=headers, floatfmt=".8f"))


  Batch Size per GPU    2-GPU Bandwidth Utilization GB/s    4-GPU Bandwidth Utilization GB/s
--------------------  ----------------------------------  ----------------------------------
                  32                          0.00127349                          0.00301536
                 128                          0.00494850                          0.01186023
                 512                          0.01186562                          0.03511841
                2048                          0.01148993                          0.03488397
                8192                          0.01102451                          0.03336922


# 4.4 


| Formula | Description |
|---------|-------------|
| 2(N - 1) * K / N | All-reduce cost, where K is the number of model parameters and N is the number of GPUs |
| All-reduce Cost / Communication Time | Bandwidth Utilization |

Given:
- Number of parameters in ResNet18 (K) = 11,689,512
- 2 GPU All-reduce Cost = 2(2 - 1) * 11,689,512 / 2 = 11,689,512
- 4 GPU All-reduce Cost = 2(4 - 1) * 11,689,512 / 4 = 17,534,268

Converting All-reduce Costs to GB:
- 2 GPU All-reduce Cost = 11,689,512 * 4 bytes / 2^30 = 0.044 GB
- 4 GPU All-reduce Cost = 17,534,268 * 4 bytes / 2^30 = 0.065 GB

The table below is the output of the bandwidth cell above, which divides 2(N - 1) * K * 4 bytes (without the division by N) by the total epoch time from 4.2. Its entries are therefore N times what the per-GPU cost above would give for the same time.


```text
  Batch Size per GPU    2-GPU Bandwidth Utilization GB/s    4-GPU Bandwidth Utilization GB/s
--------------------  ----------------------------------  -----------------------------------
                  32                          0.00127349                           0.00301536
                 128                          0.00494850                           0.01186023
                 512                          0.01186562                           0.03511841
                2048                          0.01148993                           0.03488397
                8192                          0.01102451                           0.03336922
```
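
As a sketch of how the formula 2(N - 1) * K / N could be turned into a bandwidth estimate, the code below assumes one all-reduce of all gradients per optimizer step, multiplies the per-step volume by the number of steps in an epoch (assuming 50,000 training images), and divides by a communication time such as those in 4.3. The function names are hypothetical, and the ring all-reduce formula is used purely as the cost model named in the text.

```python
import math

NUM_PARAMETERS = 11_689_512   # K, ResNet-18 parameter count from the text
BYTES_PER_PARAM = 4           # float32
TRAIN_SET_SIZE = 50_000       # CIFAR10 training images (assumption for step count)


def allreduce_gb_per_step(num_gpus, num_params=NUM_PARAMETERS):
    """Per-GPU all-reduce volume for one step, 2(N - 1) * K / N, in GB (2**30 bytes)."""
    elements = 2 * (num_gpus - 1) * num_params / num_gpus
    return elements * BYTES_PER_PARAM / 2**30


def bandwidth_gb_per_s(num_gpus, batch_size, comm_time_s):
    """Epoch all-reduce volume divided by communication time, or None if time <= 0."""
    if comm_time_s <= 0:
        return None  # measured communication time is within noise; no estimate
    steps = math.ceil(TRAIN_SET_SIZE / batch_size)
    return steps * allreduce_gb_per_step(num_gpus) / comm_time_s


print(allreduce_gb_per_step(2), allreduce_gb_per_step(4))
print(bandwidth_gb_per_s(2, 32, 54.84))
```

For rows where the measured communication time is zero or negative, the helper returns `None` rather than a misleading number.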

In [4]:
allreduce_cost_2_gpu_gb/2, allreduce_cost_4_gpu_gb/4

(0.043546825647354126, 0.06532023847103119)