In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory


import os
# Collect the path of every file under the read-only Kaggle input directory,
# then print them one per line.
input_files = [
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
]
for path in input_files:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# DDP with 2 GPUs

In [2]:
%%writefile train_ddp.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.distributed import init_process_group, destroy_process_group
import torch.distributed as dist


def ddp_setup():
    """Initialise the NCCL process group for this torchrun worker.

    Reads LOCAL_RANK from the environment (set by torchrun), makes that GPU
    the current CUDA device, then creates the process group bound to it.

    Returns:
        int: the local rank, which is also the CUDA device index this
        process uses.
    """
    local_rank = int(os.environ["LOCAL_RANK"])
    # Make this rank's GPU current *before* the process group exists, so no
    # CUDA work during initialisation can land on the default device (GPU 0).
    torch.cuda.set_device(local_rank)
    dist.init_process_group(
        backend="nccl",
        # ``device_id`` is documented as a torch.device; pass one explicitly
        # rather than relying on the int being coerced.
        device_id=torch.device("cuda", local_rank),
    )
    return local_rank

class CNN(nn.Module):
    """Three-stage conv net producing 10 class logits.

    Each stage is conv(5x5, padding=2) -> ReLU -> 2x2 max-pool. fc1 expects a
    128x3x3 feature map, which is what a 1x28x28 MNIST image becomes after
    the three pools (28 -> 14 -> 7 -> 3).
    """

    def __init__(self):
        super().__init__()
        # Keep this registration order: it fixes parameter order and
        # state_dict keys (and the order weights are initialised in).
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2)
        self.fc1 = nn.Linear(in_features=128 * 3 * 3, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)
        self.relu = nn.ReLU()

    def forward(self, x):
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(self.relu(conv(x)))
        flat = x.view(len(x), -1)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)

def _train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over this rank's shard of the training data.

    Accuracy is a running figure, counted while the weights are being updated.

    Returns:
        torch.Tensor: int64 CUDA tensor ``[correct, seen]`` for this rank only;
        combine ranks with ``_global_accuracy``.
    """
    model.train()
    stats = torch.zeros(2, dtype=torch.long, device="cuda")
    for x, y in loader:
        x = x.cuda(non_blocking=True)
        y = y.cuda(non_blocking=True)

        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        # Count on the GPU; a per-step .item() would force a host sync.
        stats[0] += (out.argmax(1) == y).sum()
        stats[1] += y.size(0)
    return stats


def _global_accuracy(stats):
    """Sum a per-rank ``[correct, seen]`` tensor over all ranks; return percent.

    This is a collective call: every rank must reach it or the job hangs.
    Returns NaN if no samples were seen on any rank.
    """
    dist.all_reduce(stats, op=dist.ReduceOp.SUM)
    correct, seen = stats.tolist()
    return 100.0 * correct / seen if seen else float("nan")


def main(epochs=5, batch_size=128, lr=1e-3):
    """Train the MNIST CNN with DistributedDataParallel, one process per GPU.

    Meant to be launched by ``torchrun``, which sets the LOCAL_RANK variable
    ``ddp_setup`` reads. Only rank 0 runs the final test evaluation.

    Args:
        epochs: number of passes over the training set.
        batch_size: per-process batch size (each step consumes
            ``batch_size * world_size`` samples in total).
        lr: Adam learning rate.
    """
    local_rank = ddp_setup()
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    if rank == 0:
        print(f"Running DDP on {world_size} GPUs")

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # Download only on rank 0; the barrier makes the other ranks wait until
    # the files exist before they open them.
    if rank == 0:
        datasets.MNIST("./data", train=True, download=True)
        datasets.MNIST("./data", train=False, download=True)
    dist.barrier()

    train_dataset = datasets.MNIST("./data", train=True, transform=transform)
    # Each rank trains on its own shard of the training set.
    train_sampler = DistributedSampler(train_dataset)
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        sampler=train_sampler,
        num_workers=2,
        pin_memory=True,
    )

    # Only rank 0 evaluates, so the test set is not sharded.
    test_dataset = datasets.MNIST("./data", train=False, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    model = DDP(CNN().cuda(), device_ids=[local_rank])
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        # Gives the sampler a new shuffle order each epoch.
        train_sampler.set_epoch(epoch)
        stats = _train_one_epoch(model, train_loader, optimizer, criterion)
        # Collective: all ranks must call this, not just rank 0.
        train_acc = _global_accuracy(stats)
        if rank == 0:
            print(f"Epoch {epoch+1} | Train Acc: {train_acc:.2f}%")

    if rank == 0:
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.cuda(), y.cuda()
                # Bypass the DDP wrapper: its forward may issue collectives
                # that the other ranks would never join.
                out = model.module(x)
                correct += (out.argmax(1) == y).sum().item()
                total += y.size(0)
        print(f"\nTest Accuracy: {100*correct/total:.2f}%")

    dist.destroy_process_group()


if __name__ == "__main__":
    # torchrun starts one copy of this script per GPU; each copy runs main().
    main()


Writing train_ddp.py


In [3]:
# Cell 3: Check how many GPUs you have
import torch
cuda_ok = torch.cuda.is_available()
print(f"CUDA available: {cuda_ok}")
print(f"Number of GPUs: {torch.cuda.device_count()}")
if cuda_ok:
    # Only the first device is named; the count above shows how many exist.
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")

CUDA available: True
Number of GPUs: 2
GPU Name: Tesla T4


In [4]:


# Launch one worker process per GPU. Each worker uses LOCAL_RANK as its CUDA
# device index, so --nproc_per_node must not exceed the GPU count printed by
# the previous cell (2 on this machine). On a single-GPU machine use
# --nproc_per_node=1 instead.
!torchrun --standalone --nproc_per_node=2 train_ddp.py

W0105 07:15:14.682000 119 torch/distributed/run.py:774] 
W0105 07:15:14.682000 119 torch/distributed/run.py:774] *****************************************
W0105 07:15:14.682000 119 torch/distributed/run.py:774] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0105 07:15:14.682000 119 torch/distributed/run.py:774] *****************************************
[W105 07:15:14.103082572 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:15:14.103945206 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:15:20.940558886 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:15:20.940645380 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:15:20.941322093 socket.cpp:200] 

# FSDP with 2 GPUs

In [13]:
%%writefile train_fsdp.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from functools import partial

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms

from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    ShardingStrategy,
)
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy


# ------------------ FSDP SETUP ------------------
def fsdp_setup():
    """Initialise the NCCL process group for this torchrun worker.

    Reads LOCAL_RANK from the environment (set by torchrun), makes that GPU
    the current CUDA device, then creates the process group bound to it.

    Returns:
        int: the local rank, which is also the CUDA device index this
        process uses.
    """
    local_rank = int(os.environ["LOCAL_RANK"])
    # Select the GPU before the process group exists, so no CUDA work during
    # initialisation can land on the default device (GPU 0).
    torch.cuda.set_device(local_rank)

    dist.init_process_group(
        backend="nccl",
        # ``device_id`` is documented as a torch.device; pass one explicitly
        # rather than relying on the int being coerced.
        device_id=torch.device("cuda", local_rank),
    )
    return local_rank


# ------------------ MODEL ------------------
class CNN(nn.Module):
    """Small MNIST classifier: three conv/ReLU/max-pool stages, then two linear layers.

    fc1 takes a flattened 128x3x3 map, matching 1x28x28 inputs after three
    2x2 pools (28 -> 14 -> 7 -> 3). Output is 10 unnormalised logits.
    """

    def __init__(self):
        super().__init__()
        # Attribute order determines parameter order and state_dict keys;
        # keep it unchanged.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2)
        self.fc1 = nn.Linear(128 * 3 * 3, 128)
        self.fc2 = nn.Linear(128, 10)
        self.relu = nn.ReLU()

    def forward(self, x):
        feats = x
        for conv in (self.conv1, self.conv2, self.conv3):
            feats = self.pool(self.relu(conv(feats)))
        flat = feats.view(feats.size(0), -1)
        logits = self.fc2(self.relu(self.fc1(flat)))
        return logits


# ------------------ MAIN ------------------
def _train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over this rank's shard of the training data.

    Accuracy is a running figure, counted while the weights are being updated.

    Returns:
        torch.Tensor: int64 CUDA tensor ``[correct, seen]`` for this rank only;
        combine ranks with ``_global_accuracy``.
    """
    model.train()
    stats = torch.zeros(2, dtype=torch.long, device="cuda")
    for x, y in loader:
        x = x.cuda(non_blocking=True)
        y = y.cuda(non_blocking=True)

        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        # Count on the GPU; a per-step .item() would force a host sync.
        stats[0] += (out.argmax(1) == y).sum()
        stats[1] += y.size(0)
    return stats


def _global_accuracy(stats):
    """Sum a per-rank ``[correct, seen]`` tensor over all ranks; return percent.

    This is a collective call: every rank must reach it or the job hangs.
    Returns NaN if no samples were seen on any rank.
    """
    dist.all_reduce(stats, op=dist.ReduceOp.SUM)
    correct, seen = stats.tolist()
    return 100.0 * correct / seen if seen else float("nan")


def main(epochs=5, batch_size=128, lr=1e-3):
    """Train and evaluate the MNIST CNN with FSDP (FULL_SHARD), one process per GPU.

    Meant to be launched by ``torchrun``, which sets the LOCAL_RANK variable
    ``fsdp_setup`` reads. All ranks take part in both training and evaluation.

    Args:
        epochs: number of passes over the training set.
        batch_size: per-process batch size for training and evaluation.
        lr: Adam learning rate.
    """
    local_rank = fsdp_setup()
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    if rank == 0:
        print(f"Running FSDP on {world_size} GPUs")

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # Download dataset only once (rank 0); the barrier makes the other ranks
    # wait until the files exist before they open them.
    if rank == 0:
        datasets.MNIST("./data", train=True, download=True)
        datasets.MNIST("./data", train=False, download=True)
    dist.barrier()

    train_dataset = datasets.MNIST("./data", train=True, transform=transform)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        sampler=train_sampler,
        num_workers=2,
        pin_memory=True,
    )

    # Shard the test set as well. FSDP's forward is collective, so every rank
    # must run the same number of eval batches; DistributedSampler gives each
    # rank an equal-length slice. If len(test_dataset) is not divisible by
    # world_size it pads with repeated samples, which are then counted twice.
    test_dataset = datasets.MNIST("./data", train=False, transform=transform)
    test_sampler = DistributedSampler(test_dataset, shuffle=False)
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        sampler=test_sampler,
        pin_memory=True,
    )

    model = CNN().cuda()

    # Put every submodule with >= 1M parameters in its own FSDP unit. This CNN
    # has ~406K parameters in total, so no submodule qualifies and the whole
    # model is sharded as the single root unit.
    auto_wrap_policy = partial(size_based_auto_wrap_policy, min_num_params=1_000_000)

    model = FSDP(
        model,
        auto_wrap_policy=auto_wrap_policy,
        sharding_strategy=ShardingStrategy.FULL_SHARD,
        device_id=local_rank,
    )

    # Build the optimizer after wrapping so it holds FSDP's managed parameters.
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # ------------------ TRAIN ------------------
    for epoch in range(epochs):
        # Gives the sampler a new shuffle order each epoch.
        train_sampler.set_epoch(epoch)
        stats = _train_one_epoch(model, train_loader, optimizer, criterion)
        # Collective: all ranks must call this, not just rank 0.
        train_acc = _global_accuracy(stats)
        if rank == 0:
            print(f"Epoch {epoch+1} | Train Acc: {train_acc:.2f}%")

    # ------------------ EVAL (ALL RANKS PARTICIPATE) ------------------
    model.eval()
    stats = torch.zeros(2, dtype=torch.long, device="cuda")
    with torch.no_grad():
        for x, y in test_loader:
            x = x.cuda(non_blocking=True)
            y = y.cuda(non_blocking=True)

            out = model(x)
            stats[0] += (out.argmax(1) == y).sum()
            stats[1] += y.numel()
    test_acc = _global_accuracy(stats)

    if rank == 0:
        print(f"\nTest Accuracy: {test_acc:.2f}%")

    dist.destroy_process_group()


if __name__ == "__main__":
    # torchrun starts one copy of this script per GPU; each copy runs main().
    main()


Overwriting train_fsdp.py


In [14]:
# Same launcher as the DDP run: one worker process per GPU (2 on this machine).
!torchrun --standalone --nproc_per_node=2 train_fsdp.py

W0105 07:25:18.235000 819 torch/distributed/run.py:774] 
W0105 07:25:18.235000 819 torch/distributed/run.py:774] *****************************************
W0105 07:25:18.235000 819 torch/distributed/run.py:774] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0105 07:25:18.235000 819 torch/distributed/run.py:774] *****************************************
[W105 07:25:18.598438027 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:25:18.599278289 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:25:22.198142807 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:25:22.198297641 socket.cpp:200] [c10d] The hostname of the client socket cannot be retrieved. err=-3
[W105 07:25:22.199048230 socket.cpp:200] 