In [12]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [13]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group

- All-to-one: reduce; one-to-all: broadcast
- rank: the integer ID of a process, in the range [0, world_size - 1]

In [14]:
def ddp_setup(rank, world_size):
    """Initialise the default process group for single-node multi-GPU training.

    Args:
        rank: Index of the calling process; also used as its CUDA device index.
        world_size: Total number of processes taking part in the group.
    """
    # Rendezvous endpoint shared by every process on this machine.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12345"

    # Bind this process to its own GPU *before* creating the NCCL group so that
    # no NCCL/CUDA state is ever created on the default device (cuda:0).
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)


In [15]:
class Trainer:
    """Minimal training loop around a ``DistributedDataParallel``-wrapped model.

    The trainer moves the model to ``gpu_id``, wraps it in DDP and runs plain
    optimisation steps over ``train_dataloader`` for a number of epochs.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        train_dataloader: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
    ) -> None:
        """Store the training components and wrap the model for DDP.

        Args:
            model: Module to train; it is moved to ``gpu_id`` and wrapped in DDP.
            train_dataloader: Loader yielding ``(xs, ys)`` batches.
            optimizer: Optimizer already bound to ``model``'s parameters.
            gpu_id: Device index this process trains on.
        """
        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        # Move first, then wrap: DDP is handed the module that already lives on
        # the target device.
        self.model = DistributedDataParallel(model.to(gpu_id), device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        """Run one optimisation step (forward, loss, backward, update)."""
        self.optimizer.zero_grad()
        output = self.model(xs)
        # NOTE(review): the model built elsewhere in this notebook has a single
        # output and float targets; cross-entropy over one class looks like it
        # would be constantly zero -- confirm this loss is intended.
        loss = F.cross_entropy(output, ys)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        """Train for one full pass over the dataloader.

        Args:
            epoch: Zero-based epoch index, forwarded to the sampler for reseeding.
        """
        # Reseed the sampler before any iterator is created for this epoch.
        # Samplers without ``set_epoch`` (non-distributed ones) are left alone.
        sampler = getattr(self.train_dataloader, "sampler", None)
        if hasattr(sampler, "set_epoch"):
            sampler.set_epoch(epoch)

        # Report the configured batch size rather than fetching a throw-away
        # batch, which cost an extra iterator per epoch and failed on an empty
        # loader.
        batch_size = self.train_dataloader.batch_size
        print(f"[GPU: {self.gpu_id} Epoch: {epoch}, Batch size: {batch_size} | Steps {len(self.train_dataloader)}]")

        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        """Run ``max_epoch`` consecutive epochs, numbered from 0."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)


In [16]:
class MyTrainDataset(Dataset):
    """Toy in-memory dataset of random ``(features, target)`` tensor pairs."""

    def __init__(self, size) -> None:
        """Pre-generate ``size`` samples: a 20-element input and a 1-element target."""
        super().__init__()
        self.size = size
        samples = []
        for _ in range(size):
            # Draw the features before the target, one sample at a time.
            features = torch.rand(20)
            target = torch.rand(1)
            samples.append((features, target))
        self.data = samples

    def __len__(self):
        """Return the number of samples requested at construction time."""
        return self.size

    def __getitem__(self, index):
        """Return the stored ``(features, target)`` pair at ``index``."""
        sample = self.data[index]
        return sample


In [17]:
# Build the toy dataset and display its first (features, target) pair.
train_dataset = MyTrainDataset(size=2048)
train_dataset[0]

(tensor([0.7652, 0.2057, 0.5130, 0.2730, 0.3109, 0.4562, 0.1879, 0.2597, 0.4491,
         0.8231, 0.3092, 0.9490, 0.8383, 0.4265, 0.8909, 0.0449, 0.5804, 0.8927,
         0.2376, 0.4476]),
 tensor([0.8712]))

In [18]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
    """Per-process entry point: set up DDP, train, then tear the group down.

    Args:
        rank: Index of this process; also used as its GPU id.
        world_size: Total number of participating processes.
        max_epochs: Number of epochs to train for.
        batch_size: Per-process batch size handed to the dataloader.
    """
    ddp_setup(rank, world_size)

    # Always release the process group, even when training raises.
    try:
        train_dataset = MyTrainDataset(2048)
        train_dataloader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            pin_memory=True,
            # DataLoader rejects shuffle=True combined with an explicit sampler
            # (ValueError); shuffling is delegated to DistributedSampler.
            shuffle=False,
            sampler=DistributedSampler(train_dataset),
        )

        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

        trainer = Trainer(
            model=model,
            gpu_id=rank,
            optimizer=optimizer,
            train_dataloader=train_dataloader,
        )

        trainer.train(max_epochs)
    finally:
        destroy_process_group()
    

In [19]:
# Count the CUDA devices visible to this process, keep it as `world_size`,
# and display the value as the cell result.
world_size = torch.cuda.device_count()
world_size

4

In [20]:
# Dataset size (2048) divided by batch size (32): the number of batches per
# epoch when a single process iterates over the whole dataset.
2048/32

64.0

In [21]:
# Run the standalone script with plain `python`; the captured output below
# shows four GPU ids (0-3), each reporting 16 steps per epoch.
!python ddp_gpus.py --max_epochs 5 --batch_size 32

[GPU: 0 Epoch: 0, Batch size: 32 | Steps 16]
[GPU: 3 Epoch: 0, Batch size: 32 | Steps 16][GPU: 1 Epoch: 0, Batch size: 32 | Steps 16][GPU: 2 Epoch: 0, Batch size: 32 | Steps 16]


[GPU: 3 Epoch: 1, Batch size: 32 | Steps 16][GPU: 0 Epoch: 1, Batch size: 32 | Steps 16]

[GPU: 2 Epoch: 1, Batch size: 32 | Steps 16]
[GPU: 1 Epoch: 1, Batch size: 32 | Steps 16]
[GPU: 2 Epoch: 2, Batch size: 32 | Steps 16][GPU: 0 Epoch: 2, Batch size: 32 | Steps 16]

[GPU: 3 Epoch: 2, Batch size: 32 | Steps 16]
[GPU: 1 Epoch: 2, Batch size: 32 | Steps 16]
[GPU: 2 Epoch: 3, Batch size: 32 | Steps 16]
[GPU: 3 Epoch: 3, Batch size: 32 | Steps 16]
[GPU: 0 Epoch: 3, Batch size: 32 | Steps 16]
[GPU: 1 Epoch: 3, Batch size: 32 | Steps 16]
[GPU: 2 Epoch: 4, Batch size: 32 | Steps 16][GPU: 0 Epoch: 4, Batch size: 32 | Steps 16]

[GPU: 3 Epoch: 4, Batch size: 32 | Steps 16]
[GPU: 1 Epoch: 4, Batch size: 32 | Steps 16]


In [22]:
# torchrun without --nproc-per-node: the captured output below shows only
# GPU 0, which processes all 64 steps of each epoch.
!torchrun ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

[GPU: 0 Epoch: 0, Batch size: 32 | Steps 64]
[GPU: 0 Epoch: 1, Batch size: 32 | Steps 64]
[GPU: 0 Epoch: 2, Batch size: 32 | Steps 64]
[GPU: 0 Epoch: 3, Batch size: 32 | Steps 64]
[GPU: 0 Epoch: 4, Batch size: 32 | Steps 64]


In [23]:
# Ask torchrun for four processes on this node; the captured output below
# shows GPU ids 0-3, each reporting 16 steps per epoch.
!torchrun --nproc-per-node 4 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

W1213 23:05:32.270000 183123 site-packages/torch/distributed/run.py:803] 
W1213 23:05:32.270000 183123 site-packages/torch/distributed/run.py:803] *****************************************
W1213 23:05:32.270000 183123 site-packages/torch/distributed/run.py:803] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W1213 23:05:32.270000 183123 site-packages/torch/distributed/run.py:803] *****************************************
[GPU: 2 Epoch: 0, Batch size: 32 | Steps 16]
[GPU: 0 Epoch: 0, Batch size: 32 | Steps 16]
[GPU: 3 Epoch: 0, Batch size: 32 | Steps 16][GPU: 1 Epoch: 0, Batch size: 32 | Steps 16]

[GPU: 0 Epoch: 1, Batch size: 32 | Steps 16][GPU: 2 Epoch: 1, Batch size: 32 | Steps 16]

[GPU: 1 Epoch: 1, Batch size: 32 | Steps 16]
[GPU: 3 Epoch: 1, Batch size: 32 | Steps 16]
[GPU: 0 Epoch: 2, Batch size: 32 | Steps 16]
[GPU: 2 