# PyTorch : Data Parallelism multi-GPU et multi-nœuds
## Mise en pratique

*Notebook rédigé par l'équipe assistance IA de l'IDRIS, novembre 2020*

Ce document présente la méthode à adopter sur Jean Zay pour distribuer votre entraînement PyTorch selon la méthode du ***Data Parallelism***. Il prend comme référence la [documentation pytorch](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) et illustre la [documentation IDRIS](http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-multi.html).

Dans l'exemple proposé, nous entraînons un réseau de neurones convolutionnel sur la base de données MNIST. L'apprentissage s'exécute sur plusieurs GPU et plusieurs nœuds de calcul Jean Zay.

Il s'agit ici de :
* préparer la base de données MNIST
* rédiger le script Python pour l'apprentissage distribué (Data Parallelism)
* réaliser une exécution parallèle sur Jean Zay

Il est à noter que les données MNIST et le modèle utilisé dans cet exemple sont très simples. Cela permet de présenter un code court et de tester rapidement la configuration du *Data Parallelism*, mais pas de mesurer une accélération de l'apprentissage. En effet, les temps de transfert entre GPU et le temps d'initialisation des *kernels* GPU ne sont pas négligeables par rapport aux temps d'exécution.

------------------------

### Environnement de calcul

Ce notebook est prévu pour être exécuté à partir d'une machine frontale de Jean-Zay. Le *hostname* doit être jean-zay[1-5].

In [1]:
!hostname

jean-zay2


Un module PyTorch doit avoir été chargé pour le bon fonctionnement de ce Notebook. Par exemple, le module `pytorch-gpu/py3/1.7.0` :

In [2]:
!module list

[?1h=Currently Loaded Modulefiles:[m
 1) gcc/8.3.1           4) cudnn/8.0.4.30-cuda-10.2   7) [4mopenmpi/4.0.5-cuda[0m     [m
 2) cuda/10.2           5) intel-mkl/2020.4           8) pytorch-gpu/py3/1.8.0  [m
 3) nccl/2.8.3-1-cuda   6) magma/2.5.4-cuda          [m
[K[?1l>

Création d'un dossier `checkpoint` si il n'existe pas.

In [3]:
!mkdir checkpoint
!rm checkpoint/*

mkdir: cannot create directory ‘checkpoint’: File exists


------------------------------------

### Préparation de la base de données MNIST

La base de données MNIST est disponible sur Jean Zay dans le DSDIR.

**Remarque** : le DSDIR, comme le SCRATCH, est un espace disque GPFS dont la bande passante est d'environ 300 Go/s en écriture et en lecture. Ils sont à privilégier pour les codes ayant une utilisation intense des opérations d'entrées/sorties. Votre space personnel SCRATCH est dédié à vos bases privées et l'espace commun DSDIR comprend la plupart des bases publiques.

Vous pouvez tester l'accès aux données avec la commande ci-dessous :

In [4]:
import os
import torchvision
import torchvision.transforms as transforms

torchvision.datasets.MNIST(root=os.environ['DSDIR'],
                            train=True,
                            transform=transforms.ToTensor(),
                            download=False)

Dataset MNIST
    Number of datapoints: 60000
    Root location: /gpfsdswork/dataset
    Split: Train
    StandardTransform
Transform: ToTensor()

### Rédaction du script Python pour l'apprentissage distribué (Data Parallelism)

Dans cette section, nous rédigeons le script Python d'entraînement dans le fichier 'mnist-distributed.py'.

* Chargement des librairies et définition de la fonction *main* :

In [5]:
%%writefile mnist-distributed.py 

import os
from datetime import datetime
from time import time
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
import idr_torch

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--batch-size', default=128, type =int,
                        help='batch size. it will be divided in mini-batch for each worker')
    parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-c','--checkpoint', default=None, type=str,
                        help='path to checkpoint to load')
    args = parser.parse_args()

    train(args)     

Overwriting mnist-distributed.py


* Création du modèle d'apprentissage (réseau de neurones convulationnel simple à 2 couches) :

In [6]:
%%writefile -a mnist-distributed.py

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


Appending to mnist-distributed.py


* Définition de la fonction d'apprentissage distribué (les *timers* et les affichages sont gérés par le *process* 0, qui est le *process* maître)

In [7]:
%%writefile -a mnist-distributed.py

def train(args):
    
    # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
    dist.init_process_group(backend='nccl', init_method='env://', world_size=idr_torch.size, rank=idr_torch.rank)
    
    # distribute model
    torch.cuda.set_device(idr_torch.local_rank)
    gpu = torch.device("cuda")
    model = ConvNet().to(gpu)
    ddp_model = DistributedDataParallel(model, device_ids=[idr_torch.local_rank])
    if args.checkpoint is not None:
        map_location = {'cuda:%d' % 0: 'cuda:%d' % idr_torch.local_rank}
        ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
    
    # distribute batch size (mini-batch)
    batch_size = args.batch_size 
    batch_size_per_gpu = batch_size // idr_torch.size
    
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()  
    optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)

    # load data with distributed sampler
    train_dataset = torchvision.datasets.MNIST(root=os.environ['DSDIR'],
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=False)
    
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=idr_torch.size,
                                                                    rank=idr_torch.rank)
    
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size_per_gpu,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=train_sampler)

    # training (timers and display handled by process 0)
    if idr_torch.rank == 0: start = datetime.now()         
    total_step = len(train_loader)
    
    for epoch in range(args.epochs):
        if idr_torch.rank == 0: start_dataload = time()
        
        for i, (images, labels) in enumerate(train_loader):
            
            # distribution of images and labels to all GPUs
            images = images.to(gpu, non_blocking=True)
            labels = labels.to(gpu, non_blocking=True) 
            
            if idr_torch.rank == 0: stop_dataload = time()

            if idr_torch.rank == 0: start_training = time()
            
            # forward pass
            outputs = ddp_model(images)
            loss = criterion(outputs, labels)

            # backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            if idr_torch.rank == 0: stop_training = time() 
            if (i + 1) % 200 == 0 and idr_torch.rank == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
                                                                        i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
                                                                        (stop_training - start_training)*1000))
            if idr_torch.rank == 0: start_dataload = time()
                    
        #Save checkpoint at every end of epoch
        if idr_torch.rank == 0:
            torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(idr_torch.size, epoch+1))
    

    if idr_torch.rank == 0:
        print(">>> Training complete in: " + str(datetime.now() - start))


Appending to mnist-distributed.py


* Définition de la fonction principale :

In [8]:
%%writefile -a mnist-distributed.py

if __name__ == '__main__':
    
    # get distributed configuration from Slurm environment
    NODE_ID = os.environ['SLURM_NODEID']
    MASTER_ADDR = os.environ['MASTER_ADDR']
    
    # display info
    if idr_torch.rank == 0:
        print(">>> Training on ", len(idr_torch.hostnames), " nodes and ", idr_torch.size, " processes, master node is ", MASTER_ADDR)
    print("- Process {} corresponds to GPU {} of node {}".format(idr_torch.rank, idr_torch.local_rank, NODE_ID))

    main()

Appending to mnist-distributed.py


### Exemple d'exécution mono-nœud mono-GPU

* Écriture du script batch de soumission

**Rappel**:  si votre unique projet dispose d'heures CPU et GPU ou si votre login est rattaché à plusieurs projets, vous devez impérativement préciser l'attribution sur laquelle doit être décomptée les heures consommées par vos calculs, en ajoutant l'option `--account=my_project@gpu` comme indiqué dans la [documentation IDRIS](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).


In [9]:
%%writefile batch_monogpu.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_monogpu
#SBATCH --output=mnist_pytorch_monogpu.out
#SBATCH --error=mnist_pytorch_monogpu.out
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --gres=gpu:1
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_monogpu.slurm


* Soumission du script batch et affichage de la sortie

In [10]:
%%bash
# submit job
sbatch batch_monogpu.slurm

Submitted batch job 210415


In [11]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_monogpu
print(sq[0])
while len(sq) >= 2:
    print(sq[1],end='\r')
    time.sleep(5)
    sq = !squeue -u $USER -n mnist_pytorch_monogpu
print('\n Done!')

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
            210415   gpu_p13 mnist_py  ssos040  R       0:49      1 r10i2n5 
 Done!


In [12]:
# display output
%cat mnist_pytorch_monogpu.out

Loading pytorch-gpu/py3/1.7.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
    cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
>>> Training on  1  nodes and  1  processes, master node is  r10i2n5
- Process 0 corresponds to GPU 0 of node 0
Epoch [1/8], Step [200/469], Loss: 2.0360, Time data load: 10.056ms, Time training: 2.348ms
Epoch [1/8], Step [400/469], Loss: 1.8188, Time data load: 10.068ms, Time training: 2.361ms
Epoch [2/8], Step [200/469], Loss: 1.5095, Time data load: 10.085ms, Time training: 2.363ms
Epoch [2/8], Step [400/469], Loss: 1.3832, Time data load: 10.171ms, Time training: 2.345ms
Epoch [3/8], Step [200/469], Loss: 1.1738, Time data load: 10.049ms, Time training: 2.344ms
Epoch [3/8], Step [400/469], Loss: 1.1128, Time data load: 10.061ms, Time training: 2.354ms
Epoch [4/8], Step [200/469], Loss: 0.9567, Time data load: 10.052ms, Time traini

### Exemple d'exécution mono-nœud multi-GPU

* Écriture du script batch de soumission

**Rappel**:  si votre unique projet dispose d'heures CPU et GPU ou si votre login est rattaché à plusieurs projets, vous devez impérativement préciser l'attribution sur laquelle doit être décomptée les heures consommées par vos calculs, en ajoutant l'option `--account=my_project@gpu` comme indiqué dans la [documentation IDRIS](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).


In [13]:
%%writefile batch_mononode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_mononode
#SBATCH --output=mnist_pytorch_mononode.out
#SBATCH --error=mnist_pytorch_mononode.out
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --gres=gpu:4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_mononode.slurm


* Soumission du script batch et affichage de la sortie

In [14]:
%%bash
# submit job
sbatch batch_mononode.slurm

Submitted batch job 210422


In [15]:
# watch Slurm queue line until the job is done
# execution should take less than 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_mononode
print(sq[0])
while len(sq) >= 2:
    print(sq[1],end='\r')
    time.sleep(5)
    sq = !squeue -u $USER -n mnist_pytorch_mononode
print('\n Done!')

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
            210422   gpu_p13 mnist_py  ssos040  R       0:23      1 r10i7n0 
 Done!


In [16]:
#display output 
%cat mnist_pytorch_mononode.out

Loading pytorch-gpu/py3/1.7.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
    cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
- Process 3 corresponds to GPU 3 of node 0
>>> Training on  1  nodes and  4  processes, master node is  r10i7n0
- Process 0 corresponds to GPU 0 of node 0
- Process 1 corresponds to GPU 1 of node 0
- Process 2 corresponds to GPU 2 of node 0
Epoch [1/8], Step [200/469], Loss: 2.0835, Time data load: 2.685ms, Time training: 2.338ms
Epoch [1/8], Step [400/469], Loss: 1.8022, Time data load: 2.705ms, Time training: 2.336ms
Epoch [2/8], Step [200/469], Loss: 1.5930, Time data load: 2.713ms, Time training: 2.280ms
Epoch [2/8], Step [400/469], Loss: 1.3419, Time data load: 2.697ms, Time training: 2.327ms
Epoch [3/8], Step [200/469], Loss: 1.2721, Time data load: 2.682ms, Time training: 2.329ms
Epoch [3/8], Step [400/469], Loss: 1.0584, Tim

### Exemple d'exécution multi-nœuds multi-GPU

* Écriture du script batch de soumission

**Rappel**:  si votre unique projet dispose d'heures CPU et GPU ou si votre login est rattaché à plusieurs projets, vous devez impérativement préciser l'attribution sur laquelle doit être décomptée les heures consommées par vos calculs, en ajoutant l'option `--account=my_project@gpu` comme indiqué dans la [documentation IDRIS](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).


In [17]:
%%writefile batch_multinode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_multinode
#SBATCH --output=mnist_pytorch_multinode.out
#SBATCH --error=mnist_pytorch_multinode.out
#SBATCH --nodes=3
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_multinode.slurm


* Soumission du script batch et affichage de la sortie

In [18]:
%%bash
# submit job
sbatch batch_multinode.slurm

Submitted batch job 210558


sbatch: IDRIS: setting exclusive mode for the job.


In [19]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_multinode
print(sq[0])
while len(sq) >= 2:
    print(sq[1],end='\r')
    time.sleep(5)
    sq = !squeue -u $USER -n mnist_pytorch_multinode
print('\n Done!')

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
            210558   gpu_p13 mnist_py  ssos040  R       0:22      3 r11i4n[4-6] 
 Done!


In [20]:
# display output
%cat mnist_pytorch_multinode.out

Loading pytorch-gpu/py3/1.7.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
    cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
- Process 2 corresponds to GPU 2 of node 0
- Process 3 corresponds to GPU 3 of node 0
>>> Training on  3  nodes and  12  processes, master node is  r11i4n4
- Process 0 corresponds to GPU 0 of node 0
- Process 1 corresponds to GPU 1 of node 0
- Process 4 corresponds to GPU 0 of node 1
- Process 7 corresponds to GPU 3 of node 1
- Process 5 corresponds to GPU 1 of node 1
- Process 6 corresponds to GPU 2 of node 1
- Process 10 corresponds to GPU 2 of node 2
- Process 8 corresponds to GPU 0 of node 2
- Process 9 corresponds to GPU 1 of node 2
- Process 11 corresponds to GPU 3 of node 2
Epoch [1/8], Step [200/500], Loss: 2.0609, Time data load: 1.045ms, Time training: 2.901ms
Epoch [1/8], Step [400/500], Loss: 1.8782, Time data load: 

### Exemple d'exécution multi-nœuds à partir d'un checkpoint

* Écriture du script batch de soumission

**Rappel**:  si votre unique projet dispose d'heures CPU et GPU ou si votre login est rattaché à plusieurs projets, vous devez impérativement préciser l'attribution sur laquelle doit être décomptée les heures consommées par vos calculs, en ajoutant l'option `--account=my_project@gpu` comme indiqué dans la [documentation IDRIS](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).


In [21]:
%%writefile batch_multinode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_multinode
#SBATCH --output=mnist_pytorch_multinode.out
#SBATCH --error=mnist_pytorch_multinode.out
#SBATCH --nodes=3
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128 -c ./checkpoint/12GPU_8epoch.checkpoint

Overwriting batch_multinode.slurm


In [22]:
%%bash
# submit job
sbatch batch_multinode.slurm

Submitted batch job 210567


sbatch: IDRIS: setting exclusive mode for the job.


In [23]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_multinode
print(sq[0])
while len(sq) >= 2:
    print(sq[1],end='\r')
    time.sleep(5)
    sq = !squeue -u $USER -n mnist_pytorch_multinode
print('\n Done!')

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
            210567   gpu_p13 mnist_py  ssos040  R       0:21      3 r11i4n[4-6] 
 Done!


In [24]:
# display output
%cat mnist_pytorch_multinode.out

Loading pytorch-gpu/py3/1.7.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
    cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128 -c ./checkpoint/12GPU_8epoch.checkpoint
- Process 11 corresponds to GPU 3 of node 2
- Process 7 corresponds to GPU 3 of node 1
- Process 8 corresponds to GPU 0 of node 2
- Process 9 corresponds to GPU 1 of node 2
- Process 10 corresponds to GPU 2 of node 2
- Process 3 corresponds to GPU 3 of node 0
- Process 4 corresponds to GPU 0 of node 1
- Process 5 corresponds to GPU 1 of node 1
- Process 6 corresponds to GPU 2 of node 1
>>> Training on  3  nodes and  12  processes, master node is  r11i4n4
- Process 1 corresponds to GPU 1 of node 0
- Process 0 corresponds to GPU 0 of node 0
- Process 2 corresponds to GPU 2 of node 0
Epoch [1/8], Step [200/500], Loss: 0.7129, Time data load: 1.039ms, Time training: 2.302ms
Epoch [1/8], Step [