# Data Parallelism

In this session, we will explore data parallelism techniques.

## 1. `torch.nn.DataParallel`
First, let's take a look at how the familiar `torch.nn.DataParallel` works. `torch.nn.DataParallel` is a multi-threaded module that operates on a **single node with multiple GPUs**.


### 1) Forward Pass

1. **Scatter** the input mini-batch and send it to each device.
2. **Broadcast** the model parameters from GPU-1 to GPUs 2, 3, and 4.
3. Perform the **forward pass** on each device using the replicated model to compute the logits.
4. **Gather** the computed logits and collect them on GPU-1.
5. Compute the **loss** from the logits (with loss reduction).

![](../images/dp_forward.png)

<br>

The corresponding code is shown below.


In [None]:
import torch.nn as nn


def data_parallel(module, inputs, labels, device_ids, output_device):
    inputs = nn.parallel.scatter(inputs, device_ids)
    # Scatter the input data to the devices specified by device_ids

    replicas = nn.parallel.replicate(module, device_ids)
    # Replicate the model across the devices in device_ids
   
    outputs = nn.parallel.parallel_apply(replicas, inputs)
    # Perform the forward pass on each device using the replicated models

    logits = nn.parallel.gather(outputs, output_device)
    # Gather the model logits to the output_device (a single device)
    
    return logits


### 2) Backward Pass

1. **Scatter** the computed loss to each device.
2. Perform **backward propagation** on each device using the received loss to compute gradients.
3. **Reduce** all computed gradients to GPU-1 by summing them.
4. Update the model on GPU-1 using the aggregated gradients.

![](../images/dp_backward.png)
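
The reduce step in the list above relies on one property: summing the gradients computed on each shard gives the same result as differentiating the full-batch loss. The sketch below checks this in plain Python for a toy one-parameter model `y = w * x` with a mean squared error loss. The model, the data, and the helper name `shard_grad` are illustrative assumptions, not part of the tutorial code.

```python
def shard_grad(w, xs, ts, total_samples):
    """Gradient w.r.t. w of (1 / total_samples) * sum((w * x - t) ** 2) over the given samples."""
    return sum(2.0 * (w * x - t) * x for x, t in zip(xs, ts)) / total_samples


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ts = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]
w = 0.5
num_devices = 4
chunk = len(xs) // num_devices

# Each "device" computes the gradient contribution of its own shard.
shard_grads = [
    shard_grad(w, xs[i * chunk:(i + 1) * chunk], ts[i * chunk:(i + 1) * chunk], len(xs))
    for i in range(num_devices)
]

# Reduce: sum the per-device gradients on a single device.
reduced_grad = sum(shard_grads)

# Reference: gradient computed on the whole mini-batch at once.
full_grad = shard_grad(w, xs, ts, len(xs))

# Update the parameter on the single device holding the reduced gradient.
lr = 0.01
w_updated = w - lr * reduced_grad
```

Because the reduced gradient matches the full-batch gradient, the update on the single device is the same one a non-parallel training step would have made.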


#### For those who may not be familiar...
- `loss.backward()`: Computes gradients by differentiating the loss
- `optimizer.step()`: Updates parameters using the computed gradients
- The computation cost follows: `backward()` > `step()`

![](../images/backward_step.png)


In [None]:
"""
src/ch4_data_parallelism/data_parallel.py
"""

from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from transformers import BertForSequenceClassification, BertTokenizer
from datasets import load_dataset

# 1. Create dataset
datasets = load_dataset("multi_nli").data["train"]
datasets = [
    {
        "premise": str(p),
        "hypothesis": str(h),
        "labels": l.as_py(),
    }
    for p, h, l in zip(datasets[2], datasets[5], datasets[9])
]
data_loader = DataLoader(datasets, batch_size=128, num_workers=4)

# 2. Create model and tokenizer
model_name = "bert-base-cased"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=3).cuda()

# 3. Create the data parallel module
# device_ids: list of devices to use / output_device: device to gather outputs
model = nn.DataParallel(model, device_ids=[0, 1, 2, 3], output_device=0)

# 4. Create optimizer and loss function
optimizer = Adam(model.parameters(), lr=3e-5)
loss_fn = nn.CrossEntropyLoss(reduction="mean")

# 5. Start training
for i, data in enumerate(data_loader):
    optimizer.zero_grad()
    tokens = tokenizer(
        data["premise"],
        data["hypothesis"],
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )

    logits = model(
        input_ids=tokens.input_ids.cuda(),
        attention_mask=tokens.attention_mask.cuda(),
        return_dict=False,
    )[0]

    loss = loss_fn(logits, data["labels"].cuda())
    loss.backward()
    optimizer.step()

    if i % 10 == 0:
        print(f"step:{i}, loss:{loss}")

    if i == 300:
        break


In [None]:
!python ../src/ch4_data_parallelism/data_parallel.py

![](../images/dp_training.png)

Training works well on multiple GPUs. However, there is a problem: since the logits are gathered on GPU 0, a **GPU memory imbalance** occurs. This issue can be alleviated to some extent by changing the approach to gather the **loss instead of the logits** on device 0. This is because the loss is a scalar and therefore much smaller in size compared to logits.

This approach is equivalent to the `DataParallelCriterion` used in **PyTorch-Encoding**, which was introduced in a blog post by **Daangn Market**:
- Blog: https://medium.com/daangn/pytorch-multi-gpu-%ED%95%99%EC%8A%B5-%EC%A0%9C%EB%8C%80%EB%A1%9C-%ED%95%98%EA%B8%B0-27270617936b  
- Code: https://github.com/zhanghang1989/PyTorch-Encoding

Although the blog explains it in a fairly complex way, the same functionality can be implemented much more easily by simply **overriding the `forward` function**.

![](../images/dp_forward_2.png)

<br>

The key idea is to perform **loss computation and loss reduction inside the multi-threaded execution**. Since the model's `forward` function already runs in multiple threads, placing the loss computation inside the `forward` function makes this very straightforward to implement.

One interesting point is that, with this approach, **loss reduction happens twice**. First, within the multi-threaded execution, each device reduces the `batch_size // 4` per-sample losses of its shard to a single value, which leaves 4 losses in total (step 4 in the figure). Then, the four losses produced by the devices are reduced again into a single loss (step 5 in the figure). Even so, this approach is much more efficient because the loss computation itself is parallelized and the memory burden on GPU 0 is significantly reduced.
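
The two-stage reduction can be checked with a few lines of plain Python. This is a minimal sketch with made-up per-sample losses, assuming mean reduction at both stages and equally sized shards. With unequal shards, the mean of the per-device means would no longer match the global mean exactly.

```python
def mean(values):
    return sum(values) / len(values)


# Hypothetical per-sample losses for a mini-batch of 8 samples.
per_sample_losses = [0.9, 1.1, 0.4, 0.6, 1.5, 0.5, 0.2, 0.8]
num_devices = 4
chunk = len(per_sample_losses) // num_devices

# First reduction: each device averages the losses of its own shard.
device_losses = [
    mean(per_sample_losses[i * chunk:(i + 1) * chunk]) for i in range(num_devices)
]

# Second reduction: the per-device losses are averaged into a single scalar.
final_loss = mean(device_losses)

# Reference: a single mean over the whole mini-batch.
global_loss = mean(per_sample_losses)
```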

    

In [None]:
"""
src/ch4_data_parallelism/custom_data_parallel.py
"""

from torch import nn


# A standard model that outputs logits as the model output
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(768, 3)

    def forward(self, inputs):
        outputs = self.linear(inputs)
        return outputs


# A parallel model that outputs the loss during the forward pass
class ParallelLossModel(Model):
    def __init__(self):
        super().__init__()

    def forward(self, inputs, labels):
        logits = super(ParallelLossModel, self).forward(inputs)
        loss = nn.CrossEntropyLoss(reduction="mean")(logits, labels)
        return loss

Fortunately, the Hugging Face Transformers models that we frequently use provide built-in support for computing the loss directly during the forward pass. Therefore, we can proceed using the Transformers functionality without going through the additional steps described above. The code below computes the loss directly by passing labels to the `labels` argument of a Transformers model.


In [None]:
"""
src/ch4_data_parallelism/efficient_data_parallel.py
"""

# Steps 1 ~ 4 omitted...

# 5. start training
for i, data in enumerate(data_loader):
    optimizer.zero_grad()
    tokens = tokenizer(
        data["premise"],
        data["hypothesis"],
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )

    loss = model(
        input_ids=tokens.input_ids.cuda(),
        attention_mask=tokens.attention_mask.cuda(),
        labels=data["labels"],
    ).loss
    
    # Average the per-GPU losses gathered on the output device: (4,) -> scalar
    loss = loss.mean()
    loss.backward()
    optimizer.step()

    if i % 10 == 0:
        print(f"step:{i}, loss:{loss}")

    if i == 300:
        break

In [None]:
!python ../src/ch4_data_parallelism/efficient_data_parallel.py

<br>

## 2. Limitations of `torch.nn.DataParallel`

### 1) Inefficient in Python because it is a multi-threaded module
Due to the Global Interpreter Lock (GIL), only one thread can execute Python bytecode at a time within a single process, so threads cannot run Python code truly in parallel. Therefore, to achieve true parallelism, the program must fundamentally be implemented as a **multi-process program**, allowing multiple processes to run concurrently.

<br>

### 2) The updated model must be replicated to other devices at every step
In the current approach, gradients computed on each device are gathered onto a single device and used to update the model. As a result, the updated model must be broadcast and replicated to the other devices at every training step, which is quite expensive. However, if gradients were not gathered and each device instead performed its own `step()` locally, the model would not need to be replicated every time. So how can this be implemented?

<br>

### Solution? ➝ All-reduce!! 👍
![](../images/allreduce.png)

The answer is the **All-reduce** operation introduced earlier. If the gradients computed on each device are summed together and then evenly distributed to all devices, each device can perform its own `step()` independently. This eliminates the need to replicate the model from a single device at every step. Therefore, improving the existing approach requires leveraging All-reduce.
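
The following plain-Python sketch illustrates why this works: if every replica starts from the same parameters and receives the same summed gradient, each replica can run its own update and the replicas stay identical without any broadcast. The function name `all_reduce_sum`, the plain SGD update, and the numbers are illustrative assumptions.

```python
def all_reduce_sum(per_device_grads):
    """Return one copy of the element-wise sum of all gradients for every device."""
    total = [sum(values) for values in zip(*per_device_grads)]
    return [list(total) for _ in per_device_grads]


num_devices = 4
lr = 0.5

# Every device starts with an identical copy of the parameters.
params = [[1.0, -2.0] for _ in range(num_devices)]

# Each device computes a different gradient from its own shard of data.
grads = [[0.1, 0.2], [0.3, 0.0], [-0.1, 0.4], [0.1, 0.2]]

# After All-reduce, every device holds the same summed gradient.
reduced = all_reduce_sum(grads)

# Each device performs its own step() locally (plain SGD here).
for device_params, device_grad in zip(params, reduced):
    for j in range(len(device_params)):
        device_params[j] -= lr * device_grad[j]
```

All replicas end up with the same parameters, so no device has to act as the source of truth for the others.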

<br>

### However... 🤔
All-reduce is considered a very expensive operation. Why is that? Let's take a closer look at how All-reduce is implemented internally.

<br>

### Reduce + Broadcast implementation
![](../images/allreduce_1.png)

<br>

### All-to-All implementation
![](../images/allreduce_2.png)
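
To put rough numbers on the cost of these two implementations, the sketch below counts data volume under a deliberately simple model: every device holds a gradient buffer of `message_size` units and sends it whole, with no chunking or tree structure. Both function names and the cost model are assumptions made for illustration. Real communication libraries are more sophisticated.

```python
def reduce_broadcast_root_traffic(num_devices, message_size):
    """Data passing through the root: it receives N-1 buffers, then sends N-1 buffers."""
    received = (num_devices - 1) * message_size
    sent = (num_devices - 1) * message_size
    return received + sent


def all_to_all_total_traffic(num_devices, message_size):
    """Total data on the network when every device sends its buffer to every other device."""
    return num_devices * (num_devices - 1) * message_size


# Example: 4 devices, 100 MB of gradients each.
root_traffic_mb = reduce_broadcast_root_traffic(4, 100)
total_traffic_mb = all_to_all_total_traffic(4, 100)
```

In this model the Reduce + Broadcast variant concentrates traffic on one device, which becomes a bottleneck as the number of devices grows, while the All-to-All variant spreads the load but its total traffic grows quadratically with the number of devices.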

<br><br>


## 3. `torch.nn.parallel.DistributedDataParallel` (DDP)

### Ring All-reduce 💍
Ring All-reduce is a collective operation that Baidu researchers brought to deep learning in 2017. Because it demonstrated significantly higher efficiency compared to previous approaches, it became a core component in the development of DDP.

- https://github.com/baidu-research/baidu-allreduce

![](../images/ring_allreduce.gif)

<br>

![](../images/ring_allreduce.png)
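
The ring algorithm can be simulated in plain Python. The sketch below follows the commonly described two-phase scheme (a scatter-reduce phase followed by an all-gather phase), assuming each device's buffer is pre-split into as many chunks as there are devices. The function name and data layout are illustrative, and the loop simulates simultaneous sends by collecting all messages before applying them.

```python
def ring_all_reduce(buffers):
    """Simulate ring all-reduce. buffers[i][j] is chunk j (a list of numbers) on device i."""
    n = len(buffers)
    data = [[list(chunk) for chunk in device] for device in buffers]
    chunks_sent_per_device = 0

    # Phase 1 (scatter-reduce): after n-1 steps, device i holds the full sum of chunk (i+1) % n.
    for step in range(n - 1):
        messages = [(i, (i - step) % n, list(data[i][(i - step) % n])) for i in range(n)]
        for src, idx, payload in messages:
            dst = (src + 1) % n  # each device only talks to its right neighbour
            data[dst][idx] = [a + b for a, b in zip(data[dst][idx], payload)]
        chunks_sent_per_device += 1

    # Phase 2 (all-gather): the completed chunks travel around the ring once more.
    for step in range(n - 1):
        messages = [(i, (i + 1 - step) % n, list(data[i][(i + 1 - step) % n])) for i in range(n)]
        for src, idx, payload in messages:
            dst = (src + 1) % n
            data[dst][idx] = payload  # overwrite with the fully reduced chunk
        chunks_sent_per_device += 1

    return data, chunks_sent_per_device


# 4 devices, 4 chunks each; chunk j on device i holds the single value 10 * i + j.
buffers = [[[float(10 * i + j)] for j in range(4)] for i in range(4)]
result, chunks_sent = ring_all_reduce(buffers)
```

Each device sends `2 * (n - 1)` chunks of size `1 / n` of the buffer, so the data sent per device stays below twice the buffer size regardless of how many devices join the ring.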

<br>

### What is DDP?
DDP is a data parallelism module designed to address the limitations of the original DataParallel approach. It is a **multi-process module** that works in both **single-node and multi-node, multi-GPU** environments. By leveraging All-reduce, the concept of a master process is eliminated, which greatly simplifies the training workflow.

![](../images/ddp.png)

<br>


In [None]:
"""
src/ch4_data_parallelism/ddp.py
"""

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.optim import Adam
from torch.utils.data import DataLoader, DistributedSampler
from transformers import BertForSequenceClassification, BertTokenizer
from datasets import load_dataset

# 1. Initialize the process group
dist.init_process_group("nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
device = torch.cuda.current_device()

# 2. Create dataset
datasets = load_dataset("multi_nli").data["train"]
datasets = [
    {
        "premise": str(p),
        "hypothesis": str(h),
        "labels": l.as_py(),
    }
    for p, h, l in zip(datasets[2], datasets[5], datasets[9])
]

# 3. Create DistributedSampler
# DistributedSampler is a module used to split data and distribute it across different processes.
sampler = DistributedSampler(
    datasets,
    num_replicas=world_size,
    rank=rank,
    shuffle=True,
)
data_loader = DataLoader(
    datasets,
    batch_size=32,
    num_workers=4,
    sampler=sampler,
    shuffle=False,
    pin_memory=True,
)


# 4. Create model and tokenizer
model_name = "bert-base-cased"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=3).cuda()

# 5. Create the Distributed Data Parallel module
model = DistributedDataParallel(model, device_ids=[device], output_device=device)

# 6. Create optimizer
optimizer = Adam(model.parameters(), lr=3e-5)

# 7. Start training
for i, data in enumerate(data_loader):
    optimizer.zero_grad()
    tokens = tokenizer(
        data["premise"],
        data["hypothesis"],
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )

    loss = model(
        input_ids=tokens.input_ids.cuda(),
        attention_mask=tokens.attention_mask.cuda(),
        labels=data["labels"],
    ).loss

    loss.backward()
    optimizer.step()

    if i % 10 == 0 and rank == 0:
        print(f"step:{i}, loss:{loss}")

    if i == 300:
        break


Since this is a multi-process application, we launch it using `torch.distributed.launch` (recent PyTorch releases recommend `torchrun` as its replacement).


In [None]:
!python -m torch.distributed.launch --nproc_per_node=4 ../src/ch4_data_parallelism/ddp.py
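
In `ddp.py`, each process only sees its own shard of the dataset through `DistributedSampler`. The plain-Python sketch below approximates how the indices are partitioned when shuffling is disabled: the index list is padded so it divides evenly, and each rank then takes every `num_replicas`-th index starting at its own rank. The function name `shard_indices` is hypothetical, and the sketch assumes the dataset has at least as many samples as there are processes.

```python
import math


def shard_indices(dataset_len, num_replicas, rank):
    """Approximate the indices one rank receives from a non-shuffling distributed sampler."""
    indices = list(range(dataset_len))

    # Pad by repeating the first indices so every rank gets the same number of samples.
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices += indices[: total_size - len(indices)]

    # Each rank takes a strided slice starting at its own rank.
    return indices[rank:total_size:num_replicas]


# 10 samples split across 4 processes.
shards = [shard_indices(10, 4, rank) for rank in range(4)]
```

With `shuffle=True`, as in the script above, the index list is permuted before it is split, but every rank still receives the same number of samples.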

### But wait, when is the best time to perform All-reduce?
- Should All-reduce be performed together with the `backward()` operation?
- Or should it be done after `backward()` finishes and before `step()` starts?

![](../images/ddp_analysis_1.png)

<br>

### In conclusion, overlapping `backward()` and `all-reduce` is the most efficient approach.

`all-reduce` is network communication, whereas `backward()` and `step()` are GPU computations, so the two kinds of work can run at the same time. Overlapping them hides communication time behind computation, which significantly improves overall training efficiency.

![](../images/ddp_analysis_2.png)

<br>

Analysis shows that when comparing `backward()` and `step()`, the `backward()` operation is much more computationally expensive.

![](../images/ddp_analysis_3.png)

<br>

Naturally, the more expensive the operation you overlap, the shorter the total training time becomes. The analysis shows that performing `all-reduce` together with `backward()` is much faster than waiting until `backward()` finishes.

![](../images/ddp_analysis_4.png)
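
The argument can be made concrete with an idealized timing model in which two overlapped operations take as long as the slower of the two. The function below and the durations passed to it are hypothetical and only illustrate the reasoning. Real overlap is never perfect, since the last gradients cannot be communicated before `backward()` produces them.

```python
def iteration_time(backward, all_reduce, step, overlap):
    """Idealized time for backward + all-reduce + step under a given overlap strategy."""
    if overlap == "none":
        # all-reduce runs after backward() finishes and before step() starts
        return backward + all_reduce + step
    if overlap == "backward":
        # all-reduce runs concurrently with backward()
        return max(backward, all_reduce) + step
    if overlap == "step":
        # all-reduce runs concurrently with step()
        return backward + max(all_reduce, step)
    raise ValueError(f"unknown overlap strategy: {overlap}")


# Hypothetical durations in milliseconds, with backward() much more expensive than step().
timings = {
    name: iteration_time(backward=10, all_reduce=8, step=2, overlap=name)
    for name in ("none", "backward", "step")
}
```

Under these assumed numbers, overlapping with `backward()` hides almost all of the communication, whereas overlapping with the much shorter `step()` hides only a small part of it.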

<br>

### Common questions that may arise...
- **Q1:** How can `all-reduce` be performed when not all gradients have been computed during `backward()`?
  - **A1:** Since `backward()` proceeds sequentially from the later layers to the earlier ones, gradients can be communicated as soon as each layer's gradients are computed.

<br>

- **Q2:** Then how often is `all-reduce` performed? Is it done for every layer?
  - **A2:** No. **Gradient bucketing** is used. All-reduce is triggered when a bucket becomes full.

<br>

### Gradient Bucketing
Gradient bucketing is a technique in which gradients are accumulated into buckets of a fixed size and sent to other processes once a bucket is full. During the `backward()` pass, gradients computed from the later layers are sequentially stored in a bucket. When the bucket reaches its capacity, an All-reduce operation is performed to distribute the summed gradients to each device.

The diagram may be a bit confusing, but note that what is stored in the bucket is **not model parameters**, but the **gradients produced by each layer**. All buckets have a fixed size, which can be configured in megabytes using the `bucket_cap_mb` argument of `DistributedDataParallel` (25 MB by default).

![](../images/ddp_analysis_5.png)
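
The bucketing logic described above can be sketched in plain Python. This is a simplified illustration, not DDP's actual implementation: gradients arrive in reverse layer order, are appended to the current bucket, and the bucket is flushed (which is where All-reduce would be triggered) once its size reaches the capacity. The layer names and sizes are made up, and the `bucket_cap_mb` parameter is named after DDP's constructor argument.

```python
def bucket_gradients(layer_grad_sizes_mb, bucket_cap_mb):
    """Group layer gradients into buckets in the order backward() produces them."""
    buckets = []
    current, current_size = [], 0

    # backward() runs from the last layer to the first, so iterate in reverse.
    for name, size_mb in reversed(layer_grad_sizes_mb):
        current.append(name)
        current_size += size_mb
        if current_size >= bucket_cap_mb:
            buckets.append(current)  # bucket is full: All-reduce would start here
            current, current_size = [], 0

    # Whatever is left after the first layer's gradient forms the final bucket.
    if current:
        buckets.append(current)
    return buckets


# Hypothetical gradient sizes in MB, listed from the first layer to the last.
layers = [("embedding", 12), ("layer1", 10), ("layer2", 10), ("layer3", 10), ("head", 3)]
buckets = bucket_gradients(layers, bucket_cap_mb=25)
```

The first bucket fills up while `backward()` is still working on the earlier layers, so its All-reduce can already run in the background.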
