In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
import torch
torch.cuda.device_count()

2

**single node multi gpu**

In [42]:
%%writefile multigpu.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

def ddp_setup(rank, world_size):
    """
    rank: unique id of eeach process
    world_size: no of gpus or devices
    """

    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(10), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size
    
    def __getitem__(self, index):
        return self.data[index]

class Trainer:
    def __init__(self, model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int) -> None:

        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.optimizer = optimizer
        self.train_data = train_data
        self.save_every = save_every
        self.model = DDP(model, device_ids = [gpu_id])


    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()


    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_data))[0])
        print(f"[GPU {self.gpu_id}] Epoch {epoch} | \
                Batchsize: {batch_size} | Steps: {len(self.train_data)}")

        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        check_point = self.model.state_dict()
        path = "checkpoint.pt"
        torch.save(check_point, path)
        print(f"Epoch {epoch} | Training checkpoint saved at {path}")

    def train(self, max_epochs):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
        

def load_train_objs():
    train_set = MyTrainDataset(20248)
    model = torch.nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr = 1e-3)
    return train_set, model, optimizer

def prepare_dataloader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler = DistributedSampler(dataset))

def main(rank, world_size, total_epochs, save_every, batch_size):

    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, 20, 5, 16), nprocs=world_size)

Overwriting multigpu.py


In [43]:
%%script bash
python3 multigpu.py

[GPU 1] Epoch 0 |                 Batchsize: 16 | Steps: 633
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPU 1] Epoch 1 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 2 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 3 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 4 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 5 |                 Batchsize: 16 | Steps: 633
Epoch 5 | Training checkpoint saved at checkpoint.pt
[GPU 1] Epoch 6 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 7 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 8 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 9 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 10 |                 Batchsize: 16 | Steps: 633
Epoch 10 | Training checkpoint saved at checkpoint.pt
[GPU 1] Epoch 11 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 12 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 13 |                 Batchsize

In [53]:
%%writefile torchrun_multigpu.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

def ddp_setup():
    """
    rank: unique id of eeach process
    world_size: no of gpus or devices
    """
    
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    init_process_group(backend="nccl")

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(10), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size
    
    def __getitem__(self, index):
        return self.data[index]

class Trainer:
    def __init__(self, model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
        snapshot_path) -> None:

        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.optimizer = optimizer
        self.train_data = train_data
        self.save_every = save_every
        
        self.snapshot_path = snapshot_path

        if os.path.exists(snapshot_path):
            print("Loading snapshot")
            self._load_snapshot(snapshot_path)

        self.model = DDP(model, device_ids = [self.gpu_id])

    
    def _load_snapshot(self, snapshot_path):
        device_loc = f"cuda:{self.gpu_id}"
        snapshot = torch.load(snapshot_path, map_location=device_loc)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

        
    def _save_snapshot(self, epoch):
        snapshot = {
            "MODEL_STATE": self.model.module.state_dict(),
            "EPOCHS_RUN": epoch,
        }
        torch.save(snapshot, self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()


    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_data))[0])
        print(f"[GPU {self.gpu_id}] Epoch {epoch} | \
                Batchsize: {batch_size} | Steps: {len(self.train_data)}")

        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        check_point = self.model.state_dict()
        path = "checkpoint.pt"
        torch.save(check_point, path)
        print(f"Epoch {epoch} | Training checkpoint saved at {path}")

    def train(self, max_epochs):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
        

def load_train_objs():
    train_set = MyTrainDataset(20248)
    model = torch.nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr = 1e-3)
    return train_set, model, optimizer

def prepare_dataloader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler = DistributedSampler(dataset))

def main(total_epochs, save_every, batch_size, snapshot_path = "snapshot.pt"):

    ddp_setup()
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)

if __name__ == "__main__":
    main(20, 5, 16)

Overwriting torchrun_multigpu.py


In [54]:
%%script bash
torchrun --standalone --nproc_per_node=gpu torchrun_multigpu.py

[GPU 1] Epoch 0 |                 Batchsize: 16 | Steps: 633[GPU 0] Epoch 0 |                 Batchsize: 16 | Steps: 633

Epoch 0 | Training checkpoint saved at checkpoint.pt
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPU 1] Epoch 1 |                 Batchsize: 16 | Steps: 633[GPU 0] Epoch 1 |                 Batchsize: 16 | Steps: 633

[GPU 1] Epoch 2 |                 Batchsize: 16 | Steps: 633
[GPU 0] Epoch 2 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 3 |                 Batchsize: 16 | Steps: 633
[GPU 0] Epoch 3 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 4 |                 Batchsize: 16 | Steps: 633
[GPU 0] Epoch 4 |                 Batchsize: 16 | Steps: 633
[GPU 1] Epoch 5 |                 Batchsize: 16 | Steps: 633
[GPU 0] Epoch 5 |                 Batchsize: 16 | Steps: 633
Epoch 5 | Training checkpoint saved at checkpoint.ptEpoch 5 | Training checkpoint saved at checkpoint.pt

[GPU 1] Epoch 6 |                 Batchsize: 16 | Steps:

W0320 11:39:51.439000 307 torch/distributed/run.py:793] 
W0320 11:39:51.439000 307 torch/distributed/run.py:793] *****************************************
W0320 11:39:51.439000 307 torch/distributed/run.py:793] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0320 11:39:51.439000 307 torch/distributed/run.py:793] *****************************************
