# DistributedDataParallel (DDP)

이전에는 `torch.nn.DataParallel`을 사용하여 구현했지만 이건 multithreading 쓰는 방법과 큰 차이가 없어서 결국 GIL의 영향을 받습니다.

보통 multi GPU 환경에서 쓰기 위해서 쓰이는데, GPU 각각이 연산 slave 처럼 작동합니다만.. 이때도 역시 GIL 때문에 parameter server 가 되는 main GPU 혹은 CPU 에서 병목이 발생합니다.

그래서 DDP 로 발전됐는데, 이 경우 완전히 multiprocessing 방식으로 작동하면서도, 각 프로세스가 독립적으로 GIL을 갖고 있으므로 병목이 없습니다.

데이터를 복사해서 각각의 GPU에 넣는게 문제인데, 이건 또 각자가 직접 불러올 수 있는 형태의 dataloader를 사용하도록 변경되었습니다 `DistributedSampler` 라는 녀석을 사용합니다.


이 과정이 일반적인 torch model 을 학습하는 방식과 거의 동일하게 할 수 있기 때문에 많이 사용하곤 합니다~




In [2]:
import os
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import torch.multiprocessing as mp # 그냥 multiprocessing 보다 다양한 통신 backend를 지원해서 빠름
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

## multi GPU / CPU

In [5]:
def ddp_setup(rank: int, world_size: int, backend: str = "nccl"):
   """
   Args:
       rank: Unique identifier of each process
       world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"
   os.environ["MASTER_PORT"] = "12355"
   torch.cuda.set_device(rank)
   init_process_group(backend=backend, rank=rank, world_size=world_size)
   # 여기서 nccl 은 gpu 끼리 통신할 때 빠른 속도를 내기 위해서 사용하는 backend
   # gloo 는 cpu 끼리 통신할 때 빠른 속도를 내기 위해서 사용하는 backend

In [None]:
self.model = DDP(model, device_ids=[gpu_id])

In [None]:
train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,  # We don't shuffle
    sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
)

In [None]:
def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # call this additional line at every epoch
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

### training code (full)

In [7]:
def ddp_setup(rank: int, world_size: int):
    """
    CPU를 위한 DDP 설정 함수
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12555"
    init_process_group(backend="gloo", rank=rank, world_size=world_size)

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 512)
        self.fc2 = nn.Linear(512, 10)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
class Trainer:
    def __init__(
        self,
        model: nn.Module,
        train_data: torch.utils.data.DataLoader,
        optimizer: torch.optim.Optimizer,
        rank: int, # 현재 프로세스의 랭크 (이전의 gpu_id 대체)
        save_every: int,
    ):
        self.rank = rank
        self.model = model # 모델은 CPU에 위치
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        # DDP 모델로 감싸기 (device_ids 제거)
        self.model = DDP(self.model)

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = nn.CrossEntropyLoss()(output, targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        self.train_data.sampler.set_epoch(epoch)
        # 데이터는 이미 CPU에 있으므로 .to(device) 호출 불필요
        for source, targets in self.train_data:
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            # rank 0 프로세스만 체크포인트를 저장
            if self.rank == 0 and (epoch + 1) % self.save_every == 0:
                self._save_checkpoint(epoch + 1)
                

def prepare_dataloader(batch_size: int):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
    return torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

def main(rank: int, world_size: int, total_epochs: int, save_every: int, batch_size: int):
    ddp_setup(rank, world_size)
    
    model = MLP()
    optimizer = optim.SGD(model.parameters(), lr=1e-3)
    train_data = prepare_dataloader(batch_size)
    
    print(f"Rank {rank} 시작")
    
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    
    destroy_process_group()

In [8]:
# Jupyter/Colab 환경을 위한 임시 args
class Args:
    total_epochs = 5
    save_every = 2
    batch_size = 32
args = Args()

# 사용 가능한 CPU 코어 수를 world_size로 설정 (전부를 다 쓰거나 일부만 쓸 수 있음)
world_size = mp.cpu_count()
print(f"{world_size}개의 CPU 코어 존재")

# 사용 가능한 GPU 수를 world_size로 설정 (전부를 다 쓰거나 일부만 쓸 수 있음)
# world_size = torch.cuda.device_count()
# print(f"{world_size}개의 GPU 존재")

12개의 CPU 코어 존재


In [9]:
world_size = 4 # mp.cpu_count()

print(f"{world_size}개의 CPU 코어를 사용하여 훈련을 시작합니다.")
mp.spawn(main, args=(world_size, args.total_epochs, args.save_every, args.batch_size), nprocs=world_size)
print("모든 훈련 프로세스가 완료되었습니다.")

4개의 CPU 코어를 사용하여 훈련을 시작합니다.


W1015 22:56:42.452000 25964 torch\multiprocessing\spawn.py:169] Terminating process 17652 via signal SIGTERM
W1015 22:56:42.454000 25964 torch\multiprocessing\spawn.py:169] Terminating process 25200 via signal SIGTERM
W1015 22:56:42.455000 25964 torch\multiprocessing\spawn.py:169] Terminating process 14680 via signal SIGTERM


ProcessExitedException: process 0 terminated with exit code 1

## AllReduce
