# Problem 2 - Data Parallelism in Pytorch

## 2.1

In [None]:
!pip install torch

In [7]:
# Import torch library
import torch

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# torch.cuda.device_count() returns the number of CUDA-enabled GPUs detected
print("Number of GPUs =", torch.cuda.device_count())

Number of GPUs = 4


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PreActBlock(nn.Module):
    ''' Two-convolution residual block in pre-activation order (BN -> ReLU -> conv). '''
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Each 3x3 convolution is preceded by its own BatchNorm;
        # the ReLU between them is applied functionally in forward()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)

        # A 1x1 projection is registered only when the skip path would not
        # match the main path's shape; otherwise no `shortcut` attribute exists
        shapes_match = stride == 1 and in_planes == out_planes
        if not shapes_match:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        # Pre-activate the input once; the projection (if any) consumes the
        # activated tensor, while the plain identity path uses the raw input
        activated = F.relu(self.bn1(x))
        if hasattr(self, 'shortcut'):
            skip = self.shortcut(activated)
        else:
            skip = x

        out = self.conv1(activated)
        out = F.relu(self.bn2(out))
        out = self.conv2(out)
        out += skip  # Merge the skip path into the main path
        return out


class PreActBottleneck(nn.Module):
    ''' Pre-activation bottleneck: 1x1 -> 3x3 -> 1x1 convolutions, each preceded by BN + ReLU. '''
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # 1x1 conv to `planes` channels, 3x3 conv carrying the stride,
        # then 1x1 conv expanding to `expansion * planes` channels
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)

        # Only register a projection when the skip path needs reshaping;
        # forward() checks for the attribute's presence
        shapes_match = stride == 1 and in_planes == out_planes
        if not shapes_match:
            projection = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        # The projection (if any) takes the pre-activated input; the plain
        # identity path takes the raw input
        activated = F.relu(self.bn1(x))
        if hasattr(self, 'shortcut'):
            skip = self.shortcut(activated)
        else:
            skip = x

        out = self.conv1(activated)
        out = self.conv2(F.relu(self.bn2(out)))
        out = self.conv3(F.relu(self.bn3(out)))
        out += skip  # Merge the skip path into the main path
        return out


class PreActResNet(nn.Module):
    ''' The overall Pre-activation ResNet architecture.

    Args:
        block: Callable building one residual block as
            ``block(in_planes, planes, stride)``; it must expose an
            ``expansion`` attribute (output channels = ``planes * expansion``).
        num_blocks: Sequence whose first four entries give the number of
            blocks in each of the four stages.
        num_classes: Size of the final linear layer's output (default 10).
    '''
    def __init__(self, block, num_blocks, num_classes=10):
        super(PreActResNet, self).__init__()
        # Channel count fed to the next block; advanced by _make_layer
        self.in_planes = 64

        # The initial convolutional layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)

        # Stacking layers of blocks; stages 2-4 start with a stride-2 block
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # The final fully connected layer
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        ''' Build one stage: the first block uses `stride`, the rest use stride 1. '''
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        # Instantiate the container; returning the bare `nn.Sequential` class
        # would leave the stage unregistered and uncallable as a layer
        return nn.Sequential(*layers)

    def forward(self, x):
        ''' Run the stem, the four stages, pooling and the classifier. '''
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # The fixed 4x4 pooling window matches ResNet.forward in this file;
        # with 32x32 inputs the stage-4 feature map is 4x4
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)  # Flatten
        out = self.linear(out)
        return out

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BasicBlock(nn.Module):
    ''' Two-layer residual block (3x3 conv + BN, twice) for ResNet-18 and ResNet-34 '''
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main path: the first convolution carries the stride, the second keeps resolution
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Skip path: an empty Sequential passes the input through unchanged;
        # otherwise a 1x1 conv + BN reshapes the input to the main path's shape
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

    def forward(self, x):
        # conv -> BN -> ReLU -> conv -> BN, add the skip path, then the final ReLU
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        residual += self.shortcut(x)
        return F.relu(residual)


class Bottleneck(nn.Module):
    ''' Three-layer bottleneck block (1x1 -> 3x3 -> 1x1) for larger ResNets (e.g., ResNet-50, 101, 152) '''
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main path: 1x1 conv to `planes`, 3x3 conv carrying the stride,
        # 1x1 conv expanding to `expansion * planes`
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Skip path: empty Sequential (pass-through) unless the shape changes
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

    def forward(self, x):
        # Two conv -> BN -> ReLU steps, a final conv -> BN, then add the skip path and ReLU
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = F.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))
        residual += self.shortcut(x)
        return F.relu(residual)


class ResNet(nn.Module):
    ''' ResNet: a 3x3 conv stem, four stages of residual blocks, and a linear classifier. '''
    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        # Channel count fed to the next block; advanced by _make_layer
        self.in_planes = 64

        # Stem: 3 input channels -> 64 feature maps at full resolution
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Four stages; stages 2-4 start with a stride-2 block
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # Classifier head
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        # The first block of a stage uses `stride`; the remaining ones use stride 1
        block_strides = [stride]
        block_strides.extend([1] * (num_blocks - 1))

        stage = []
        for block_stride in block_strides:
            stage.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        # Stem, then the four stages in order
        out = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)

        pooled = F.avg_pool2d(out, 4)  # Average pooling
        flat = pooled.view(pooled.size(0), -1)  # Flatten
        return self.linear(flat)  # Fully connected layer


def ResNet18():
    ''' Build a ResNet from BasicBlock with two blocks in each of the four stages. '''
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage)

def test():
    ''' Smoke test: push one random 3x32x32 input through ResNet18 and print the output shape. '''
    model = ResNet18()
    sample = torch.randn(1, 3, 32, 32)
    logits = model(sample)
    print(logits.size()) # Print the output size

# Run the smoke test when the cell executes
test()

torch.Size([1, 10])


In [10]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import DataLoader
from torch.nn.parallel import DataParallel
import time


def ResNet18():
    ''' Build a ResNet from BasicBlock with two blocks in each of the four stages. '''
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage)

In [11]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Per-channel mean and standard deviation used to normalize the images
# (shared by the training and the evaluation pipelines)
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Training-time transformations: random augmentation followed by normalization
transform = transforms.Compose([
    # Randomly crop the image to 32x32 pixels, adding padding of 4 pixels on each side
    transforms.RandomCrop(32, padding=4),
    # Randomly flip the image horizontally with a probability of 0.5
    transforms.RandomHorizontalFlip(p=0.5),
    # Convert image to a PyTorch Tensor
    transforms.ToTensor(),
    # Normalize the image with mean and standard deviation for each color channel
    # This helps in faster convergence during training
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD)
])

# Evaluation-time transformations: no random crop/flip, so that test results
# are deterministic and measured on the unmodified images
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD)
])

# Load the CIFAR10 dataset
# The dataset consists of 60,000 32x32 color images in 10 classes, with 6,000 images per class
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

# Create DataLoader for the training set
# DataLoader provides an iterable over the given dataset, with batch processing and optional shuffling
trainloader = DataLoader(trainset, batch_size=128, shuffle=True, num_workers=2)

# Create DataLoader for the test set
# Here, shuffling is not necessary as it's used only for evaluating the model
testloader = DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


In [12]:
import time

def one_epoch(model, train_loader, criterion, optimizer, device):
    """
    Train the model for one epoch

    Args:
    model: Module to train; switched to training mode by this call
    train_loader: Iterable yielding (inputs, labels) batches
    criterion: Callable computing the loss from (outputs, labels)
    optimizer: Optimizer holding the model's parameters
    device: Device (string or torch.device) the batches are moved to

    Returns:
    float: The time taken to complete the epoch
    """
    target = torch.device(device)
    # CUDA kernels are queued asynchronously, so the host clock only reflects
    # finished GPU work if the device is synchronized before each reading
    on_cuda = target.type == 'cuda'

    # Set the model to training mode
    model.train()

    # Record the start time
    if on_cuda:
        torch.cuda.synchronize(target)
    start_time = time.perf_counter()

    # Iterate over the training data
    for inputs, labels in train_loader:
        # Move the data to the specified device (GPU or CPU)
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear the gradients before backward pass
        optimizer.zero_grad()

        # Forward pass: compute the model output
        outputs = model(inputs)

        # Compute the loss
        loss = criterion(outputs, labels)

        # Backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()

        # Perform a single optimization step (parameter update)
        optimizer.step()

    # Record the end time once all queued GPU work has finished
    if on_cuda:
        torch.cuda.synchronize(target)
    end_time = time.perf_counter()

    # Return the time taken for this epoch
    return end_time - start_time

In [13]:
import torch

# Report the name and memory of GPU 0, or say so when CUDA is unavailable
if not torch.cuda.is_available():
    # No CUDA device is usable from this process
    print("No GPU available.")
else:
    # Name of the first GPU
    gpu_name = torch.cuda.get_device_name(0)
    print(f"GPU Name: {gpu_name}")

    # Properties of the first GPU (total_memory is in bytes)
    gpu_properties = torch.cuda.get_device_properties(0)

    # Total memory, converted from bytes to GB (2**30 bytes)
    total_memory_gb = gpu_properties.total_memory / 2 ** 30
    print(f"Total GPU Memory: {total_memory_gb:.2f} GB")

    # Memory currently allocated on the first GPU, converted from bytes to GB
    allocated_memory_gb = torch.cuda.memory_allocated(0) / 2 ** 30
    print(f"Allocated GPU Memory: {allocated_memory_gb:.2f} GB")

GPU Name: Quadro RTX 8000
Total GPU Memory: 44.48 GB
Allocated GPU Memory: 0.00 GB


In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import time

def main():
    """
    Time one training epoch of ResNet18 at geometrically growing batch sizes

    Starting at batch size 32, the batch size is multiplied by 4 after every
    successful measurement. The sweep ends at the first out-of-memory error,
    or once a single batch already covers the whole training set.
    """
    # Set the device to GPU if available, otherwise use CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize the model, loss function, and optimizer
    model = ResNet18().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)

    # Starting batch size
    batch_size = 32
    dataset_size = len(trainset)

    # Loop to incrementally increase batch size and train the model
    while True:
        try:
            # Initialize DataLoader with the current batch size
            trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

            # Warm-up epoch: Run one epoch without timing it, for warming up the GPU
            one_epoch(model, trainloader, criterion, optimizer, device)

            # Timed epoch: one_epoch already measures and returns its own duration
            elapsed = one_epoch(model, trainloader, criterion, optimizer, device)

            print(f"Training time for batch size {batch_size}: {elapsed:.2f} seconds")

        except RuntimeError as e:
            if 'out of memory' not in str(e):
                # Re-raise any other runtime errors with the original traceback
                raise
            # If a CUDA out-of-memory error occurs, report and exit the loop
            print(f"Out of memory with batch size {batch_size}.")
            break

        if batch_size >= dataset_size:
            # Any larger batch size yields the same single full-dataset batch,
            # so without this guard the sweep would never end when no
            # out-of-memory error is raised
            print(f"Batch size {batch_size} covers the whole dataset; stopping.")
            break

        # Increase the batch size by 4 times for the next iteration
        batch_size *= 4

# Run the main function if the script is executed directly
if __name__ == "__main__":
    main()

Training time for batch size 32: 23.52 seconds
Training time for batch size 128: 17.53 seconds
Training time for batch size 512: 18.47 seconds
Training time for batch size 2048: 19.52 seconds
Training time for batch size 8192: 21.18 seconds
Out of memory with batch size 32768.


**Answer:**

The output shows the training time per epoch for different batch sizes while training a neural network on a single GPU. Going from batch size 32 to 128 reduces the epoch time (23.52 s to 17.53 s), showing efficiency gains from larger batches. Beyond batch size 128, however, the training time gradually increases again (from 18.47 s at batch size 512 to 21.18 s at batch size 8192), possibly due to the GPU's limitations in processing larger batches efficiently. The out-of-memory error at batch size 32768 indicates that the GPU's memory capacity was exceeded, preventing further training.

## 2.2

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn import DataParallel
import time

def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """
    Train the model for one epoch and measure the elapsed time

    Args:
    model: Module to train; switched to training mode by this call
    train_loader: Iterable yielding (inputs, labels) batches
    criterion: Callable computing the loss from (outputs, labels)
    optimizer: Optimizer holding the model's parameters
    device: Device (string or torch.device) the batches are moved to

    Returns:
    float: Time taken for one epoch
    """
    target = torch.device(device)
    # CUDA kernels are queued asynchronously, so the host clock only reflects
    # finished GPU work if the device is synchronized before each reading
    on_cuda = target.type == 'cuda'

    model.train()

    if on_cuda:
        torch.cuda.synchronize(target)
    start_time = time.perf_counter()

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Wait for the last queued GPU work before stopping the clock
    if on_cuda:
        torch.cuda.synchronize(target)
    end_time = time.perf_counter()
    return end_time - start_time

def main():
    """
    Time one ResNet18 training epoch for several GPU counts and batch sizes

    For every batch size (starting at 32 and multiplied by 4 each round) a
    fresh model is trained on 1, 2 and 4 GPUs, skipping GPU counts the machine
    does not have. The sweep ends at the first out-of-memory error, or once a
    single batch already covers the whole training set.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()

    # GPU configurations: keep the single-device run unconditionally (it also
    # serves as the CPU fallback) and drop counts exceeding the visible GPUs
    available_gpus = torch.cuda.device_count()
    gpu_configs = [n for n in (1, 2, 4) if n == 1 or n <= available_gpus]

    batch_size = 32
    dataset_size = len(trainset)

    while True:
        try:
            for num_gpus in gpu_configs:
                # Initialize the model
                model = ResNet18().to(device)
                if num_gpus > 1:
                    model = DataParallel(model, device_ids=list(range(num_gpus)))

                # Set up the optimizer
                optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)

                # Initialize DataLoader
                # NOTE: batch_size is the global batch; DataParallel splits each
                # batch across device_ids, so every GPU processes roughly
                # batch_size / num_gpus samples per step
                trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

                # Warm-up epoch (not timed)
                train_one_epoch(model, trainloader, criterion, optimizer, device)

                # Timed training epoch
                training_time = train_one_epoch(model, trainloader, criterion, optimizer, device)
                print(f"Training time for batch size {batch_size} on {num_gpus} GPUs: {training_time:.2f} seconds")

        except RuntimeError as e:
            if 'out of memory' not in str(e):
                # Re-raise any other runtime errors with the original traceback
                raise
            print(f"Out of memory with batch size {batch_size} on {num_gpus} GPUs.")
            break

        if batch_size >= dataset_size:
            # Any larger batch size yields the same single full-dataset batch,
            # so without this guard the sweep would never end when no
            # out-of-memory error is raised
            print(f"Batch size {batch_size} covers the whole dataset; stopping.")
            break

        # Increase batch size for next iteration
        batch_size *= 4

if __name__ == "__main__":
    main()

Training time for batch size 32 on 1 GPUs: 23.44 seconds
Training time for batch size 32 on 2 GPUs: 60.17 seconds
Training time for batch size 32 on 4 GPUs: 70.33 seconds
Training time for batch size 128 on 1 GPUs: 17.37 seconds
Training time for batch size 128 on 2 GPUs: 20.04 seconds
Training time for batch size 128 on 4 GPUs: 19.99 seconds
Training time for batch size 512 on 1 GPUs: 18.57 seconds
Training time for batch size 512 on 2 GPUs: 11.35 seconds
Training time for batch size 512 on 4 GPUs: 9.88 seconds
Training time for batch size 2048 on 1 GPUs: 19.68 seconds
Training time for batch size 2048 on 2 GPUs: 11.19 seconds
Training time for batch size 2048 on 4 GPUs: 10.02 seconds
Training time for batch size 8192 on 1 GPUs: 21.34 seconds
Training time for batch size 8192 on 2 GPUs: 13.03 seconds
Training time for batch size 8192 on 4 GPUs: 11.28 seconds
Out of memory with batch size 32768 on 1 GPUs.


**Answer:**

The data comes from a scaling experiment using multiple GPUs. For smaller batches (32, 128), adding more GPUs increases the training time, suggesting inefficiency, possibly due to the overhead of synchronizing data across GPUs. As the batch size increases (512, 2048, 8192), the training time decreases with more GPUs, showing effective utilization of the additional GPUs. This scenario is indicative of strong scaling, where we fix the total problem size (here the batch size passed to the `DataLoader`, which `DataParallel` splits across the GPUs) and increase the resources. If weak scaling were used (increasing the problem size with the resources, i.e., keeping the batch size per GPU fixed), the numbers might show a less drastic speedup due to the increased per-GPU computation.

|        | Batch-size 32 per GPU |              | Batch-size 128 per GPU |              | Batch-size 512 per GPU |              |
|--------|-----------------------|--------------|------------------------|--------------|------------------------|--------------|
|        | Time (sec)            | Speedup      | Time (sec)             | Speedup      | Time (sec)             | Speedup      |
| 1-GPU  | 23.44 sec             | 1x                       | 17.37 sec              | 1x    | 18.57 sec              | 1x    |
| 2-GPU  | 60.17 sec             | 2(23.44) / 60.17 ≈ 0.7791258102x | 20.04 sec              | 2(17.37) / 20.04 ≈1.73353293413x | 11.35 sec              | 2(18.57) / 11.35 ≈ 3.27224669604x |
| 4-GPU  | 70.33 sec             | 4(23.44) / 70.33 ≈ 1.33314375089x | 19.99 sec              | 4(17.37) / 19.99 ≈ 3.47573786893x | 9.88 sec               | 4(18.57) / 9.88 ≈ 7.51821862348x |


Observing the speedup values, it is clear that for a batch size of 32 using 2 GPUs, there is actually a decrease in performance (speedup ≈ 0.78x), since it takes longer to process with 2 GPUs than with just one. This suggests that for smaller batch sizes, the additional time required to coordinate between multiple GPUs may outweigh the performance gains. With a batch size of 32 using 4 GPUs, the speedup is approximately 1.33x, which is above 1x but still far below the ideal 4x. Moreover, when the batch sizes are increased to 128 and 512, there is a notable improvement in processing speed for configurations with 2 and 4 GPUs, relative to a single-GPU setup.

In this context, with Weak-Scaling, each GPU handles a constant workload despite an increase in the number of GPUs. Conversely, Strong-Scaling keeps the overall workload fixed while adding more GPUs. The tables above are read as Weak-Scaling, treating the per-GPU batch size as unchanged while more GPUs are added. Note, however, that the code passes the same `batch_size` to the `DataLoader` for every GPU count and `DataParallel` splits each batch across the devices, so the per-GPU batch size is only constant if the loader's batch size is multiplied by the number of GPUs. The results show non-linear efficiency in Weak-Scaling, particularly at smaller batch sizes, likely due to communication overheads and small workloads on multiple GPUs. Switching to Strong-Scaling, where the total batch size stays constant as GPUs increase, might improve efficiency for smaller batches but could result in underutilization for larger batches.

## 2.3

|        | Batch-size 32 per GPU |       | Batch-size 128 per GPU |       | Batch-size 512 per GPU |       |
|--------|-----------------------|-------|------------------------|-------|------------------------|-------|
|        | Compute(sec)          | Comm(sec) | Compute(sec)          | Comm(sec) | Compute(sec)          | Comm(sec) |
| 2-GPU  | 11.72                 | 48.45    | 8.685                  | 11.355   | 9.285                  | 2.065    |
| 4-GPU  | 5.86                  | 64.47    | 4.3425                 | 15.6475  | 4.6425                 | 5.2375   |


**Answer:**
 
 The 'Compute' time can be considered as the total training time on a single GPU divided by the number of GPUs. The 'Communication' time is the difference between the total time on multiple GPUs and the computed 'Compute' time: 


1) For 2 GPUs with batch-size 32:
- Compute time: 23.44 / 2 = 11.72 seconds
- Communication time: 60.17 - 11.72 = 48.45 seconds
2) For 2 GPUs with batch-size 128:
- Compute time: 17.37 / 2 = 8.685 seconds
- Communication time: 20.04 − 8.685 = 11.355 seconds
3) For 2 GPUs with batch-size 512:
- Compute time: 18.57 / 2 = 9.285 seconds
- Communication time: 11.35 − 9.285 = 2.065 seconds
4) For 4 GPUs with batch-size 32:
- Compute time: 23.44 / 4 = 5.86 seconds
- Communication time: 70.33 − 5.86 = 64.47 seconds
5) For 4 GPUs with batch-size 128:
- Compute time: 17.37 / 4 = 4.3425 seconds
- Communication time: 19.99 - 4.3425 = 15.6475 seconds
6) For 4 GPUs with batch-size 512:
- Compute time: 18.57 / 4 = 4.6425 seconds
- Communication time: 9.88 - 4.6425 = 5.2375 seconds

The compute time per GPU, obtained by dividing single GPU training time by the number of GPUs, decreases as more GPUs are used. However, the time spent on communication, calculated by subtracting compute time from total multi-GPU training time, notably rises with the addition of more GPUs. This trend indicates that the communication and data synchronization overheads among multiple GPUs significantly affect the overall training efficiency as the GPU count increases.

## 2.4

**Answer:**

Equation for Allreduce: 2(N-1)(K/N) 
  - K = # of model parameters (in this specific scenario, we are using resnet18)
  - N = # of GPUs utilized

Equation for bandwidth utilization = allreduce communication cost / communication time

Allreduce cost for 2 and 4 GPUs: 
- K = # of parameters in resnet18 = 11,689,512
    - 2-GPU allreduce cost = 2(2-1)(11689512 / 2) = 11,689,512
    - 4-GPU allreduce cost = 2(4-1)(11689512 / 4) = 17,534,268
    Convert values to GB:
    - 2-GPU allreduce cost = (11689512 * 4 bytes) / (2^30) = 0.044 GB
    - 4-GPU allreduce cost = (17534268 * 4 bytes) / (2^30) = 0.065 GB

2-GPU Bandwidth Utilization for different batch sizes:
- Bandwidth Utilization: batch-size-per-GPU 32 = 0.044 / 48.45 = 0.00090815273
- Bandwidth Utilization: batch-size-per-GPU 128 = 0.044 / 11.355 = 0.00387494495
- Bandwidth Utilization: batch-size-per-GPU 512 = 0.044 / 2.065 = 0.02130750605

4-GPU Bandwidth Utilization for different batch sizes:
- Bandwidth Utilization: batch-size-per-GPU 32 = 0.065 / 64.47 = 0.00100822087
- Bandwidth Utilization: batch-size-per-GPU 128 = 0.065 / 15.6475 = 0.00415401821
- Bandwidth Utilization: batch-size-per-GPU 512 = 0.065 / 5.2375 = 0.01241050119




|        | Batch-size-per-GPU 32                  | Batch-size-per-GPU 128                 | Batch-size-per-GPU 512                 |
|--------|----------------------------------------|----------------------------------------|----------------------------------------|
|        | Bandwidth Utilization (GB/s)           | Bandwidth Utilization (GB/s)           | Bandwidth Utilization (GB/s)           |
| 2-GPU  | 0.00090815273 |0.00387494495  |0.02130750605|
| 4-GPU  | 0.00100822087|0.00415401821   |0.01241050119 |

In this case, the values show that bandwidth utilization improves as the batch size per GPU increases, for both 2 and 4 GPUs. At smaller batch sizes the utilization is very low because the estimated communication time is dominated by overhead. Compared with 2 GPUs, 4 GPUs achieve slightly higher utilization at batch sizes 32 and 128 but lower utilization at batch size 512 (0.0124 vs. 0.0213 GB/s). Overall, larger batch sizes are more effective at utilizing the communication bandwidth in a parallel training scenario, while adding GPUs does not uniformly improve it.