# Soporte básico de Paralelismo de Datos en Pytorch

Este tutorial explora algunos conceptos básicos relacionados con el soporte nativo para implementar paralelismo de datos en Pytorch.

En Pytorch existen dos mecanismos básicos para implementar paralelismo de datos en Pytorch: DataParallel y DistributedDataParallel.

El siguiente ejemplo ilustra el uso de DataParallel para explotar el mecanismo DataParallel. Este mecanismo es apto para el uso en un nodo equipado de varios aceleradores (normalmente GPUs) que pueden ser configuradas como trabajadores. Debido a que usa un único proceso Python, y por lo tanto, puede estar afectada por el GIL, no es la opción más recomendable. En cambio, se debería utilizar, incluso en este escenario con un único nodo, la opción más avanzada DistributedDataParallel, que sí genera varios procesos Python que pueden ejecutarse de forma concurrente.

Empecemos por un pequeño ejemplo comenado de uso de DataParallel

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# Definición del modelo: uno modelo linear de una sola capa
class SimpleModel(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(SimpleModel, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        return self.linear(x)


# Generador del conjunto de datos: aleatorio
class MyDataset(Dataset):
    def __len__(self):
        return 1000

    def __getitem__(self, idx):
        return torch.randn(10), torch.tensor([1.0])  


def main():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Instanciamos el modelo y lo enviamos a un dispositivo
    model = SimpleModel(10, 1)
    model = model.to(device)

    # Pasamos el modelo por el wrapper DataParallel para tener una versión paralela del mismo
    if torch.cuda.device_count() > 1:
        model = nn.DataParallel(model)

    # Definición del conjunto de datos y el DataLoader
    dataset = MyDataset()
    dataloader = DataLoader(dataset, batch_size=32)

    # Definición de la función de pérdida y el optimizador
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # Para cada epoch
    for epoch in range(10): 
        running_loss = 0.0
        # Para cada sample del conjunto de datos
        for i, data in enumerate(dataloader, 0):
            inputs, labels = data[0].to(device), data[1].to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print('[%d] loss: %.3f' % (epoch + 1, running_loss / i))

    print('Finalizó el entrenamiento')


if __name__ == '__main__':
    main()


[1] loss: 1.421
[2] loss: 0.373
[3] loss: 0.073
[4] loss: 0.014
[5] loss: 0.002
[6] loss: 0.000
[7] loss: 0.000
[8] loss: 0.000
[9] loss: 0.000
[10] loss: 0.000
Finalizó el entrenamiento


Ahora, vamos a hacer el mismo ejemplo pero con el mecanismo de DistributedDataParallel. El código se escribe a un fichero ddp.py y se lanza con el launcher torchrun en el nodo actual. Nótese que solo hemos comentado las líneas de código que diferen respecto a la versión que usa DataParallel.


In [21]:
%%writefile ddp.py
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
import os

class SimpleModel(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(SimpleModel, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        return self.linear(x)

class MyDataset(Dataset):
    def __len__(self):
        return 1000

    def __getitem__(self, idx):
        return torch.randn(10), torch.tensor([1.0])


def main():
    # Inicialización del backend para las comunicaciones distribuidas (nccl)
    dist.init_process_group(backend='nccl')
    
    # Global rank del proceso actual
    global_rank = dist.get_rank()

    # El dispositivo usado será igual al global_rank
    torch.cuda.set_device(global_rank)

    
    model = SimpleModel(10, 1).cuda()

    # En esta ocasión pasamos el modelo por el Wrapper DDP
    ddp_model = DDP(model, device_ids=[global_rank], output_device=global_rank)

    # El dataLoader ahora usa un sampler distribuido
    dataset = MyDataset()
    sampler = DistributedSampler(dataset)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)


    criterion = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)

    for epoch in range(10):
        running_loss = 0.0
        for i, data in enumerate(dataloader, 0):
            inputs, labels = data[0].cuda(), data[1].cuda()

            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print('[%d] loss: %.3f' % (epoch + 1, running_loss / i))

    print('Finished Training')


if __name__ == '__main__':
    main()


Overwriting ddp.py


In [22]:
!torchrun --nproc_per_node=2 ddp.py


[1] loss: 0.892
[1] loss: 0.872
[2] loss: 0.528
[2] loss: 0.543
[3] loss: 0.222
[3] loss: 0.236
[4] loss: 0.101
[4] loss: 0.109
[5] loss: 0.044
[5] loss: 0.041
[6] loss: 0.019
[6] loss: 0.019
[7] loss: 0.007
[7] loss: 0.008
[8] loss: 0.003
[8] loss: 0.003
[9] loss: 0.001
[9] loss: 0.001
[10] loss: 0.001
Finished Training
[10] loss: 0.001
Finished Training
