# PyTorch Distributed Data Parallel (DDP) processing tutorial

1. Import the PyTorch modules needed for DDP processing

In [1]:
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader

from torchvision import transforms
from torchvision.datasets import ImageFolder

  from .autonotebook import tqdm as notebook_tqdm


2. Design your own model

In [3]:
class FClayer(nn.Module):
    """Fully connected building block: a linear projection followed by ReLU."""

    def __init__(self, dim_in, dim_out) -> None:
        super().__init__()

        # The sequential container stays under the attribute name `main`, so
        # parameter names remain `main.0.weight` / `main.0.bias`.
        projection = nn.Linear(dim_in, dim_out)
        activation = nn.ReLU()
        self.main = nn.Sequential(projection, activation)

    def forward(self, x):
        """Apply the linear layer and then ReLU to `x`; return the result."""
        activated = self.main(x)
        return activated

class simpleNN(nn.Module):
    """Stack of `num_layers` fully connected layers.

    Every layer except the last is an `FClayer` (Linear + ReLU); the last one is
    a plain `nn.Linear`. Layers are registered as attributes `fc0`, `fc1`, ...
    """

    def __init__(self, dim_in, dim_out, hidden_dim, num_layers) -> None:
        super().__init__()
        self.num_layers = num_layers

        # Layer widths: input, then (num_layers - 1) hidden widths, then output.
        widths = [dim_in, *([hidden_dim] * (num_layers - 1)), dim_out]
        last_idx = num_layers - 1

        # range(num_layers) bounds the loop so exactly `num_layers` layers are built.
        width_pairs = zip(widths, widths[1:])
        for idx, (width_in, width_out) in zip(range(num_layers), width_pairs):
            if idx == last_idx:
                block = nn.Linear(width_in, width_out)
            else:
                block = FClayer(width_in, width_out)
            setattr(self, f'fc{idx}', block)

    def forward(self, x):
        """Pass `x` through fc0 ... fc{num_layers - 1} in order and return the output."""
        out = x
        for idx in range(self.num_layers):
            out = getattr(self, f'fc{idx}')(out)
        return out
    

# Build an example 8-layer network (64 inputs -> 4 outputs) and print its structure.
model = simpleNN(
    dim_in=64,
    dim_out=4,
    hidden_dim=256,
    num_layers=8,
)
print(model)

simpleNN(
  (fc0): FClayer(
    (main): Sequential(
      (0): Linear(in_features=64, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc1): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc2): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc3): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc4): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc5): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc6): FClayer(
    (main): Sequential(
      (0): Linear(in_features=256, out_features=256, bias=True)
      (1): ReLU()
    )
  )
  (fc7): Linear(in_features=256, out_features=4, bias

3. Define functions

In [None]:
def main(args):
    """Launch training on a single node, with or without DDP.

    With `args.use_ddp` set, one worker process is spawned per visible GPU.
    Otherwise `main_worker` is called once, in this process, on GPU 0.

    Args:
        args: mutable namespace. Reads `use_ddp`, and under DDP `batch_size`
            and `val_batch_size`. Writes `world_size`, plus `num_workers`,
            `batch_size`, `val_batch_size` (DDP) or `gpu` (non-DDP).

    Raises:
        RuntimeError: if no CUDA device is visible.
    """
    n_gpus = torch.cuda.device_count()
    if n_gpus < 1:
        raise RuntimeError("No CUDA device is available; at least one GPU is required.")

    if args.use_ddp: # a flag for using Pytorch DDP
        # Single node, one process per GPU: the world size is the GPU count.
        # (Multiplying it by itself would make init_process_group wait for
        # processes that are never spawned.)
        args.world_size = n_gpus
        args.num_workers = n_gpus * 4 # it's common to use 4 data-loading workers per GPU
        # Scale the global batch sizes; each worker divides them by world_size again.
        args.batch_size = n_gpus * args.batch_size
        args.val_batch_size = n_gpus * args.val_batch_size
        # mp.spawn passes the process index as the first argument of main_worker.
        mp.spawn(main_worker, nprocs=n_gpus, args=(n_gpus, args))
    else: # a flag for not using Pytorch DDP
        # Only one process runs, so nothing must be split across workers.
        args.world_size = 1
        args.gpu = 0 # set first gpu id
        main_worker(args.gpu, n_gpus, args)

def main_worker(gpu, ngpus_per_node, args):
    """Per-process training routine (called once per GPU when DDP is used).

    Args:
        gpu: index of the CUDA device this process drives.
        ngpus_per_node: number of GPUs on this node.
        args: mutable namespace. Reads `use_ddp` (falling back to
            `distributed`), `world_size`, `batch_size`, `num_workers` and an
            optional node `rank`. Writes `gpu` and, under DDP, `rank`.
    """
    args.gpu = gpu
    torch.cuda.set_device(args.gpu)
    device = torch.device('cuda', args.gpu)

    if args.gpu is not None:
        print("Use GPU: {} for training".format(args.gpu))

    # `use_ddp` is the flag main() switches on; `distributed` is accepted as a
    # fallback for callers that set that name instead.
    use_ddp = bool(getattr(args, 'use_ddp', False) or getattr(args, 'distributed', False))
    # Without DDP there is a single process, so nothing is split across workers.
    world_size = args.world_size if use_ddp else 1

    if use_ddp:
        # Global rank = node rank * GPUs per node + local GPU index.
        node_rank = getattr(args, 'rank', 0) or 0
        args.rank = node_rank * ngpus_per_node + gpu
        # NOTE(review): ports below 1024 usually require elevated privileges;
        # pick a free unprivileged port if initialisation fails.
        dist.init_process_group(backend='nccl', # 'gloo', 'mpi', or 'nccl'; they have different capabilities
                                init_method='tcp://127.0.0.1:88', #{localhost}:PORT
                                world_size=world_size,
                                rank=args.rank)

    # load dataset
    transform = transforms.Compose([transforms.Resize([256, 256]),
                                    transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                                        std=[0.5, 0.5, 0.5])])
    dataset = ImageFolder("your directory", transform=transform)

    # DistributedSampler needs an initialised process group, so it is only
    # created under DDP; otherwise the DataLoader shuffles by itself.
    sampler = DistributedSampler(dataset=dataset, shuffle=True) if use_ddp else None
    dataloader = DataLoader(dataset,
                            batch_size=int(args.batch_size / world_size),
                            shuffle=(sampler is None),
                            num_workers=int(args.num_workers / world_size),
                            sampler=sampler,
                            pin_memory=True)

    # create model
    # NOTE(review): the model is built with dim_in=64 / dim_out=4 and trained
    # with MSELoss, while ImageFolder yields (image, class index) pairs. Adapt
    # the input/output sizes and the loss to your dataset before running.
    model = simpleNN(dim_in=64, dim_out=4, hidden_dim=256, num_layers=8)
    model = model.to(device)
    if use_ddp:
        # DDP also requires the process group, so wrap only in that case.
        model = DistributedDataParallel(model, device_ids=[args.gpu])

    criterion = nn.MSELoss()

    # A DataLoader is an iterable, not an iterator: get an explicit iterator
    # and restart it (as a new epoch) whenever it is exhausted.
    epoch = 0
    data_iter = iter(dataloader)
    for i in range(0, 10000):
        try:
            batch = next(data_iter)
        except StopIteration:
            epoch += 1
            if sampler is not None:
                sampler.set_epoch(epoch) # reshuffle differently each epoch
            data_iter = iter(dataloader)
            batch = next(data_iter)

        # ImageFolder batches are (images, labels) pairs.
        x, y = batch
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        out = model(x)
        loss = criterion(out, y) # (prediction, target) order

        loss.backward()

        '''
        ...
        '''

    if use_ddp:
        dist.destroy_process_group()