## Distributed Data Parallel (DDP) Process
- Step 1: Multiple processes are launched, each loading data and model separately.
- Step 2: Each process performs forward propagation independently.
- Step 3: Each process computes the loss and performs backpropagation.
- Step 4: Gradients are synchronized across GPUs using all-reduce.
- Step 5: Each process updates its model weights with the synchronized gradients.

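The five steps can be traced without any GPUs. The following is a minimal pure-Python simulation, not PyTorch's implementation: two hypothetical "processes" each hold a copy of a one-parameter model `y = w * x`, compute a local gradient on their own shard, and a stand-in function plays the role of the collective. The names `local_gradient`, `all_reduce_mean`, and `ddp_step` are illustrative assumptions.

```python
# Toy simulation of DDP training steps (no torch, no real communication).

def local_gradient(w, shard):
    """Gradient of the mean squared error for y = w * x on one process's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)

def all_reduce_mean(values):
    """Stand-in for all-reduce: every process receives the same average."""
    mean = sum(values) / len(values)
    return [mean] * len(values)

def ddp_step(weights, shards, lr):
    # Steps 2-3: each process runs forward/backward on its own shard.
    grads = [local_gradient(w, shard) for w, shard in zip(weights, shards)]
    # Step 4: gradients are averaged across processes.
    synced = all_reduce_mean(grads)
    # Step 5: each process applies the same update to its own copy.
    return [w - lr * g for w, g in zip(weights, synced)]

# Step 1: two processes start from identical weights and load disjoint shards.
data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
shards = [data[0::2], data[1::2]]
weights = [0.0, 0.0]

for _ in range(50):
    weights = ddp_step(weights, shards, lr=0.05)

print(weights)  # both replicas hold the same value, close to the true slope 2.0
```

Because the shards have equal size, the averaged gradient equals the gradient over the full batch, which is why the replicas behave like a single model trained on all the data.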

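Compared to `DataParallel`, which drives all GPUs from a single process, DDP runs one process per GPU. This avoids contention on Python's GIL, spreads computation more evenly instead of concentrating work on one primary GPU, reduces synchronization overhead, and supports multi-node training.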

## Basic Concepts in Distributed Data Parallel (DDP)

1. **Group**: A collection of processes involved in a distributed training task. Typically, all GPUs participate in a single group.

2. **World Size**: The total number of processes participating in the distributed training. Usually equal to the number of GPUs used.

3. **Node**: A machine or container running the training. Each node can contain multiple GPUs.

4. **Rank (Global Rank)**: The unique identifier assigned to each process in the distributed training. Used for communication and coordination.

5. **Local Rank**: The rank of a process within a node. Helps identify which GPU a process should use on a given node.

6. **Backend**: The communication protocol used for inter-process communication (e.g., NCCL, Gloo, MPI).  
   - **NCCL**: Optimized for GPU-to-GPU communication.  
   - **Gloo**: Supports both CPU and GPU training.  
   - **MPI**: Common in HPC environments.

7. **All-Reduce**: The operation used to synchronize gradients across all processes. Each process computes its own gradients, and `all-reduce` ensures they are averaged and shared.

8. **Broadcast**: Used to distribute initial model weights from rank 0 to all other processes.

9. **Synchronization**: Ensures that all GPUs have the same model parameters and gradients after each update.

10. **Gradient Bucketing**: Groups small gradient tensors together to optimize communication efficiency, reducing overhead.

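As a rough illustration of how node, rank, local rank, and world size relate, the sketch below enumerates the processes of a hypothetical job with 2 nodes and 4 GPUs per node. It assumes every node runs the same number of processes and that global ranks are assigned node by node, which is the usual layout when launching with `torchrun`; `rank_layout` is an illustrative helper, not a PyTorch function.

```python
def rank_layout(num_nodes, gpus_per_node):
    """Return (node, local_rank, global_rank) for every process in the job."""
    layout = []
    for node in range(num_nodes):
        for local_rank in range(gpus_per_node):
            # Assumed numbering: ranks are contiguous within a node.
            global_rank = node * gpus_per_node + local_rank
            layout.append((node, local_rank, global_rank))
    return layout

layout = rank_layout(num_nodes=2, gpus_per_node=4)
world_size = len(layout)

for node, local_rank, global_rank in layout:
    print(f"node={node} local_rank={local_rank} rank={global_rank} world_size={world_size}")
```

The local rank repeats on every node (0 to 3 here), which is why it selects the GPU, while the global rank is unique across the whole job.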

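To make gradient bucketing concrete, here is a simplified greedy sketch. It is not DDP's actual algorithm (DDP, for example, fills buckets in roughly the reverse order of `model.parameters()` and exposes the size limit as `bucket_cap_mb`, 25 MB by default); the function `assign_buckets` and the tensor names and sizes are hypothetical.

```python
def assign_buckets(grad_sizes_mb, bucket_cap_mb):
    """Greedily pack gradient tensors into buckets of at most bucket_cap_mb.

    A tensor larger than the cap gets a bucket of its own.
    """
    buckets, current, current_size = [], [], 0.0
    for name, size in grad_sizes_mb:
        # Close the current bucket if this tensor would overflow it.
        if current and current_size + size > bucket_cap_mb:
            buckets.append(current)
            current, current_size = [], 0.0
        current.append(name)
        current_size += size
    if current:
        buckets.append(current)
    return buckets

# Hypothetical per-tensor gradient sizes in MB, in the order they become ready.
grads = [("classifier.bias", 0.01), ("classifier.weight", 3.0),
         ("layer2.weight", 12.0), ("layer1.weight", 12.0), ("embeddings", 90.0)]

buckets = assign_buckets(grads, bucket_cap_mb=25)
print(buckets)
print(f"{len(grads)} tensors -> {len(buckets)} all-reduce calls")
```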
In [None]:
%%writefile ddp.py

# %% [markdown]
# # Text Classification Example

# %% [markdown]
# ## Step 1: Import packages

# %%
from transformers import AutoTokenizer, AutoModelForSequenceClassification, BertTokenizer, BertForSequenceClassification

import torch.distributed as dist

# torchrun supplies the rank, world size, and master address through environment variables
dist.init_process_group(backend="nccl")

# %% [markdown]
# ## Step 2: Load the data

# %%
import pandas as pd

data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
data

# %%
data = data.dropna()
data

# %% [markdown]
# ## Step 3: Create the Dataset

# %%
from torch.utils.data import Dataset

class MyDataset(Dataset):

    def __init__(self) -> None:
        super().__init__()
        self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
        self.data = self.data.dropna()

    def __getitem__(self, index):
        return self.data.iloc[index]["review"], self.data.iloc[index]["label"]

    def __len__(self):
        return len(self.data)

# %%
dataset = MyDataset()
# for i in range(5):
#     print(dataset[i])

# %% [markdown]
# ## Step 4: Split the dataset

# %%
import torch
from torch.utils.data import random_split


# The fixed seed makes every process produce the same train/validation split
trainset, validset = random_split(dataset, lengths=[0.9, 0.1], generator=torch.Generator().manual_seed(42))
len(trainset), len(validset)

# %%
for i in range(5):
    print(trainset[i])

# %% [markdown]
# ## Step 5: Create the DataLoaders

# %%
tokenizer = BertTokenizer.from_pretrained("/gemini/code/model")

def collate_func(batch):
    texts, labels = [], []
    for item in batch:
        texts.append(item[0])
        labels.append(item[1])
    inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
    inputs["labels"] = torch.tensor(labels)
    return inputs

# %%
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# DistributedSampler gives each process its own shard of the data; it takes the place of shuffle=True
trainloader = DataLoader(trainset, batch_size=32, collate_fn=collate_func, sampler=DistributedSampler(trainset))
validloader = DataLoader(validset, batch_size=64, collate_fn=collate_func, sampler=DistributedSampler(validset))

# %%
next(enumerate(validloader))[1]

# %% [markdown]
# ## Step 6: Create the model and optimizer

# %%
from torch.optim import Adam
import os
from torch.nn.parallel import DistributedDataParallel as DDP

model = BertForSequenceClassification.from_pretrained("/gemini/code/model")

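# Each process moves its replica to the GPU indexed by its LOCAL_RANK
if torch.cuda.is_available():
    model = model.to(int(os.environ["LOCAL_RANK"]))

# DDP broadcasts rank 0's weights at construction and all-reduces gradients during backward()
model = DDP(model)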

# %%
optimizer = Adam(model.parameters(), lr=2e-5)

# %% [markdown]
# ## Step 7: Training and evaluation

# %%
def print_rank_0(info):
    # Print only on the global rank 0 process to avoid duplicated log lines
    if int(os.environ["RANK"]) == 0:
        print(info)

# %%
def evaluate():
    model.eval()
    acc_num = 0
    with torch.inference_mode():
        for batch in validloader:
            if torch.cuda.is_available():
                batch = {k: v.to(int(os.environ["LOCAL_RANK"])) for k, v in batch.items()}
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            acc_num += (pred.long() == batch["labels"].long()).float().sum()
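    # Sum the per-process counts of correct predictions (all_reduce defaults to SUM)
    dist.all_reduce(acc_num)
    # Approximate when DistributedSampler pads validset with repeated samples
    return acc_num / len(validset)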

def train(epoch=3, log_step=100):
    global_step = 0
    for ep in range(epoch):
        model.train()
        # Reseed the sampler so that each epoch uses a different shuffle
        trainloader.sampler.set_epoch(ep)
        for batch in trainloader:
            if torch.cuda.is_available():
                batch = {k: v.to(int(os.environ["LOCAL_RANK"])) for k, v in batch.items()}
            optimizer.zero_grad()
            output = model(**batch)
            loss = output.loss
            loss.backward()
            optimizer.step()
            if global_step % log_step == 0:
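                # Average the loss across processes for logging only; gradients were already synchronized in backward()
                dist.all_reduce(loss, op=dist.ReduceOp.AVG)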
                print_rank_0(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
            global_step += 1
        acc = evaluate()
        print_rank_0(f"ep: {ep}, acc: {acc}")

# %% [markdown]
# ## Step 8: Train the model

# %%
train()

In [None]:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, Dataset, random_split
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import Adam
from torchtext.datasets import YelpReviewFull
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import os

# Initialize distributed process group for multi-GPU training
dist.init_process_group(backend="nccl")  # Using NCCL for efficient GPU communication

# Load tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-cased")
model = AutoModelForSequenceClassification.from_pretrained(
    "google-bert/bert-base-cased", num_labels=5, torch_dtype="auto"
)

# Move model to the appropriate device based on the process rank
if torch.cuda.is_available():
    model = model.to(int(os.environ["LOCAL_RANK"]))

# Wrap the model with DistributedDataParallel for synchronized multi-GPU training
model = DDP(model)

# Custom Dataset class to handle YelpReviewFull dataset
class YelpDataset(Dataset):
    def __init__(self, split="train"):
        super().__init__()
        self.data = list(YelpReviewFull(split=split))  # Load dataset into memory

    def __getitem__(self, index):
        return self.data[index][1], self.data[index][0] - 1  # Adjust labels to be 0-based

    def __len__(self):
        return len(self.data)

# Create dataset instances.
# YelpReviewFull ships predefined train and test splits, so random_split() with a
# fixed seed is no longer needed: every process loads identical splits.
train_dataset = YelpDataset(split="train")
valid_dataset = YelpDataset(split="test")

# Function to preprocess batches (tokenization, padding, and conversion to tensors)
def collate_fn(batch):
    texts, labels = zip(*batch)
    inputs = tokenizer(list(texts), max_length=128, padding="max_length", truncation=True, return_tensors="pt")
    inputs["labels"] = torch.tensor(labels)
    return inputs

# DistributedSampler gives each process a distinct shard of the data (shuffled by default)
train_sampler = DistributedSampler(train_dataset)
valid_sampler = DistributedSampler(valid_dataset)

# Create DataLoaders for training and validation
train_loader = DataLoader(train_dataset, batch_size=32, collate_fn=collate_fn, sampler=train_sampler)
valid_loader = DataLoader(valid_dataset, batch_size=64, collate_fn=collate_fn, sampler=valid_sampler)

# Optimizer setup
optimizer = Adam(model.parameters(), lr=2e-5)

# Function to ensure only rank 0 prints log messages
def print_rank_0(info):
    if int(os.environ.get("RANK", 0)) == 0:
        print(info)

# Evaluation function
def evaluate():
    model.eval()
    acc_num = torch.tensor(0.0, device=model.device)
    with torch.inference_mode():
        for batch in valid_loader:
            if torch.cuda.is_available():
                batch = {k: v.to(model.device) for k, v in batch.items()}
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            acc_num += (pred.long() == batch["labels"].long()).float().sum()

    # Synchronize accuracy across all GPUs
    dist.all_reduce(acc_num, op=dist.ReduceOp.SUM)
    return acc_num.item() / len(valid_dataset)

# Training function
def train(epochs=3, log_step=100):
    global_step = 0
    for epoch in range(epochs):
        model.train()
        train_loader.sampler.set_epoch(epoch)  # Ensure proper shuffling across epochs
        for batch in train_loader:
            if torch.cuda.is_available():
                batch = {k: v.to(model.device) for k, v in batch.items()}

            optimizer.zero_grad()
            output = model(**batch)
            loss = output.loss
            loss.backward()
            optimizer.step()

            # Log training progress on rank 0
            if global_step % log_step == 0:
                loss_item = loss.detach().clone()
                dist.all_reduce(loss_item, op=dist.ReduceOp.AVG)  # Sync loss across GPUs
                print_rank_0(f"Epoch: {epoch}, Step: {global_step}, Loss: {loss_item.item()}")
            global_step += 1

        # Evaluate the model after each epoch
        acc = evaluate()
        print_rank_0(f"Epoch: {epoch}, Accuracy: {acc}")

# Start training
train()

dist.destroy_process_group()  # Cleanup distributed processes


In [None]:
# !torchrun --nproc_per_node=2 ddp.py

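`torchrun` starts one process per GPU and passes each process its identity through environment variables such as `RANK`, `LOCAL_RANK`, and `WORLD_SIZE`, which is where the `os.environ[...]` lookups in the script above get their values. A small sketch of reading them with single-process fallbacks might look like this; the helper name `read_dist_env` and the fallback values are assumptions for illustration.

```python
import os

def read_dist_env(env=None):
    """Read the process identity that torchrun exports, with single-process defaults."""
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("RANK", 0)),
        "local_rank": int(env.get("LOCAL_RANK", 0)),
        "world_size": int(env.get("WORLD_SIZE", 1)),
    }

# Simulated environment of the second process in `torchrun --nproc_per_node=2`.
info = read_dist_env({"RANK": "1", "LOCAL_RANK": "1", "WORLD_SIZE": "2"})
print(info)
```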
## Note

If the dataset size is not divisible by the number of processes, `DistributedSampler` still gives every process the same number of samples. With `drop_last=False` (the default), it pads the index list by repeating samples from the beginning until the total is evenly divisible, so a few samples are seen twice per epoch. With `drop_last=True`, it drops the tail of the data instead. This `drop_last` is an argument of `DistributedSampler`; the `DataLoader` argument of the same name only controls whether each process keeps its final incomplete batch.

Options:
1. **Keep the default padding (`drop_last=False`):**
   - No sample is skipped, but the repeated samples slightly bias metrics such as the accuracy computed in `evaluate()` above, which divides by the unpadded dataset length.

2. **Drop the tail (`DistributedSampler(dataset, drop_last=True)`):**
   - No sample is duplicated, at the cost of skipping fewer than `num_replicas` samples per epoch.

3. **Set `num_replicas` and `rank` explicitly:**
   - Both default to the world size and rank of the current process group; override them only if you need a custom split of the data.

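The index assignment can be mimicked in a few lines of plain Python. The sketch below follows the documented behavior of `DistributedSampler` with shuffling turned off (pad by repeating samples from the start when `drop_last=False`, trim the tail when `drop_last=True`); `shard_indices` is an illustrative stand-in, not the library's code, and it assumes the dataset has at least as many samples as there are replicas.

```python
import math

def shard_indices(dataset_len, num_replicas, rank, drop_last=False):
    """Indices one rank would see, mimicking DistributedSampler(shuffle=False)."""
    indices = list(range(dataset_len))
    if drop_last and dataset_len % num_replicas != 0:
        # Trim the tail so every rank gets the same number of samples.
        num_samples = math.ceil((dataset_len - num_replicas) / num_replicas)
        indices = indices[: num_samples * num_replicas]
    else:
        # Pad by repeating samples from the start of the dataset.
        num_samples = math.ceil(dataset_len / num_replicas)
        padding = num_samples * num_replicas - dataset_len
        indices += indices[:padding]
    # Each rank takes every num_replicas-th index, starting at its own rank.
    return indices[rank::num_replicas]

for drop_last in (False, True):
    shards = [shard_indices(10, 3, rank, drop_last) for rank in range(3)]
    print(f"drop_last={drop_last}: {shards}")
```

With 10 samples and 3 ranks, the padded variant gives every rank 4 indices (samples 0 and 1 appear twice), while `drop_last=True` gives every rank 3 indices and sample 9 is never visited.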

## DDP Training with Trainer:
`Trainer` needs no code changes for DDP: the same script runs on one or several GPUs, and the way it is launched decides which mode is used. The essentials are:

### Distributed Training:
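- `per_device_train_batch_size` is the batch size on each GPU, so the effective batch size per optimizer step is `per_device_train_batch_size` multiplied by the number of GPUs (and by `gradient_accumulation_steps`, if set).
- In a distributed run, `Trainer` shards the dataset automatically so that each GPU processes a different portion of it.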

### Setting up DDP:
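- DDP is enabled automatically when the script is started by a distributed launcher such as `torchrun --nproc_per_node=N`, which sets environment variables like `RANK`, `LOCAL_RANK`, and `WORLD_SIZE`. `CUDA_VISIBLE_DEVICES` only restricts which GPUs are visible; launching with plain `python` on a multi-GPU machine makes `Trainer` fall back to `DataParallel` instead.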

### Trainer in Distributed Mode:
- If the script is launched with `torchrun` or `accelerate launch`, DDP is enabled automatically. `Trainer` places a full copy of the model on every GPU and distributes the dataset across them.
- Each GPU processes its own batch of `per_device_train_batch_size` samples and computes gradients independently; the gradients are then averaged across GPUs before the optimizer step.

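Because `per_device_train_batch_size` is a per-GPU setting, the number of samples consumed per optimizer step grows with the number of processes. The arithmetic below is a sketch; `effective_batch_size` and `steps_per_epoch` are hypothetical helpers, and the step count assumes each rank receives an equal (padded) share of the data and keeps its last partial batch.

```python
import math

def effective_batch_size(per_device_batch_size, world_size, grad_accum_steps=1):
    """Samples contributing to one optimizer step across all processes."""
    return per_device_batch_size * world_size * grad_accum_steps

def steps_per_epoch(num_samples, per_device_batch_size, world_size):
    """Batches each rank iterates over in one epoch (no gradient accumulation)."""
    samples_per_rank = math.ceil(num_samples / world_size)
    return math.ceil(samples_per_rank / per_device_batch_size)

# Values matching the script below: 100 training samples, batch size 8 per GPU.
for world_size in (1, 2):
    print(
        f"world_size={world_size} "
        f"effective_batch={effective_batch_size(8, world_size)} "
        f"steps_per_epoch={steps_per_epoch(100, 8, world_size)}"
    )
```

Doubling the number of GPUs doubles the effective batch size and roughly halves the number of optimizer steps per epoch, so the learning rate or schedule may need revisiting when scaling up.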

In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from transformers import DataCollatorWithPadding
import numpy as np
import evaluate

# Load the dataset
dataset = load_dataset("yelp_review_full")
print(f"Example from dataset: {dataset['train'][100]}")

# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-cased")

# Tokenize the dataset
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

tokenized_datasets = dataset.map(tokenize_function, batched=True)
print(f"Tokenized dataset: {tokenized_datasets}")

# Subsample for quick experimentation
small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(100))
small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(50))

# Load the model
model = AutoModelForSequenceClassification.from_pretrained("google-bert/bert-base-cased", num_labels=5, torch_dtype="auto")
print(f"Model config: {model.config}")

# Metrics to compute during training
acc_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metric.compute(predictions=predictions, references=labels, average="macro")
    acc.update(f1)
    return acc

# Define training arguments
training_args = TrainingArguments(
    output_dir="test_trainer",                          # Output directory for model checkpoints
    per_device_train_batch_size=8,                      # Batch size per device during training
    per_device_eval_batch_size=16,                      # Batch size per device during evaluation
    logging_steps=10,                                   # Number of steps between logging
    evaluation_strategy="epoch",                        # Evaluate at the end of each epoch (renamed `eval_strategy` in newer transformers releases)
    save_strategy="epoch",                              # Save checkpoints at the end of each epoch
    num_train_epochs=4,                                 # Number of training epochs
    save_total_limit=3,                                 # Maximum number of saved checkpoints
    learning_rate=2e-5,                                 # Learning rate
    weight_decay=0.01,                                  # Weight decay for regularization
    metric_for_best_model="f1",                         # Metric to monitor for best model
    load_best_model_at_end=True,                        # Load the best model after training
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start training
trainer.train()


In [None]:
# !torchrun --nproc_per_node=2 ddp_trainer.py