# Conceitos de Paralelismo e treinamento em múltiplas GPUs [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://githubtocolab.com/chcomin/curso-visao-computacional-2024/blob/main/M12_topicos_extra/2%20-%20Paralelismo%20\(GPU\).ipynb)

### Paralelismo no pré-processamento de dados

O dataloader do Pytorch possibilita paralelizar de forma fácil o pré-processamento de dados. O arquivo dataloader.py possui um exemplo de um dataset com 15 itens, batch size de tamanho 4 e 3 `workers`, que são processos paralelos criados pelo dataloader. O dataset retorna o índice do processo responsável por ler cada item do dataset.

Nota: precisamos criar o arquivo dataloader.py porque o dataloader cria novos processos do interpretador Python, o que não pode ser feito em código interativo.

In [12]:
import dataloader

dataloader.main()

batch_idx=0
['idx=0 processado por worker 0', 'idx=1 processado por worker 0', 'idx=2 processado por worker 0', 'idx=3 processado por worker 0']
batch_idx=1
['idx=4 processado por worker 1', 'idx=5 processado por worker 1', 'idx=6 processado por worker 1', 'idx=7 processado por worker 1']
batch_idx=2
['idx=8 processado por worker 2', 'idx=9 processado por worker 2', 'idx=10 processado por worker 2', 'idx=11 processado por worker 2']
batch_idx=3
['idx=12 processado por worker 0', 'idx=13 processado por worker 0', 'idx=14 processado por worker 0']


### Treinando com múltiplas GPUs

In [13]:
import torch 
from torch import nn

device1 = 'cuda:0'
# Vamos usar a cpu como se fosse outra gpu. Normalmente usaríamos 'cuda:1'
device2 = 'cpu' 
# Lista de dispositivos disponíveis. Poderiam ser n GPUs!
devices = [device1, device2]

#### Execução de partes do cálculo em diferentes dispositivos

O Pytorch permite trocar a qualquer momento de dispositivo. **O cálculo do gradiente é propagado entre os dispotitivos**. Mas é preciso tomar cuidado que cópias entre dispositivos são custosas e requerem sincronização.

In [14]:
x0 = torch.tensor([1., 2., 3.], requires_grad=True, device=device1)
x1 = 2*x0.to(device2)        # Envia tensor para device2
x2 = (3*x1 + 5).to(device1)  # Envia tensor para device1
x3 = x2.sum().to(device2)    # Envia tensor para device2
x3.backward()
# Valor esperado: grad=2*3
print(x0.grad)

tensor([6., 6., 6.], device='cuda:0')


Portanto, podemos colocar cada parte do modelo em uma GPU distinta, e calcular os gradientes normalmente:

In [15]:
class Model(nn.Module):
    '''Uma rede básica só para exemplificar.'''
    
    def __init__(self, device1, device2):
        super().__init__()

        self.device1 = device1
        self.device2 = device2

        # Camadas estão no device1
        self.layers1 = nn.Sequential(
            nn.Conv2d(1,1,1),
            nn.BatchNorm2d(1),
            nn.ReLU()
        ).to(device1)

        # Camadas estão no device2
        self.layers2 = nn.Sequential(
            nn.Conv2d(1,1,1),
            nn.BatchNorm2d(1),
            nn.ReLU()
        ).to(device2)

    def forward(self, x):

        x = x.to(self.device1)
        # Aplica camada e envia para o device2
        x = self.layers1(x).to(self.device2)
        x = self.layers2(x)

        return x
    
model = Model(device1, device2)
x = torch.rand(1, 1, 224, 224, device=device1)
out = model(x)
out.sum().backward()

O exemplo acima é útil quando um modelo não cabe em uma única GPU. Os parâmetros, ativações e gradientes de cada camada são salvas na GPU que a camada está alocada. Todo o resto do treinamento da rede é exatamente igual ao caso que a rede está em uma única GPU, pois o otimizador irá atualizar os parâmetros corretamente em cada dispositivo. 

A desvantagem dessa implementação é que uma GPU fica ociosa enquanto a outra está executando. É possível otimizar esse exemplo para manter ambas as GPUs ocupadas, mas a lógica fica um pouco mais complicada. Esse processo é chamado de *pipeline parallelism*. Veja um exemplo em https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html#speed-up-by-pipelining-inputs  

#### Paralelismo de dados

O paralelismo de dados consiste em dividir um batch entre os dispositivos. Por exemplo, em um sistema com 4 GPUs, um batch de tamanho 64 é dividido em minibatches de tamanho 16, e cada minibatch é enviado para uma GPU. Os dados divididos são chamados de *shards*.

Inicialmente, a rede é copiada para cada GPU. No treinamento, cada modelo processa o minibatch de forma independente, e os gradientes são calculados também de forma independente em cada GPU. Após o cálculo do gradiente, é preciso combinar os gradientes calculados e enviar o resultado para cada GPU. Essa operação é chamada de *allreduce*

In [17]:
def scatter(x, devices):
    '''Divide um batch em n diferentes dispositivos.'''

    n = len(devices)
    # .chunk(n) divide o batch em n minibatches de tamanho bs/n
    x_shard = list(x.chunk(n))
    # Envia cada minibatch para o dispositivo correto
    for idx in range(n):
        x_shard[idx] = x_shard[idx].to(devices[idx])

    return x_shard

# Lista de dispositivos
devices = [device1, device2]
# Batch de 16 imagens
x = torch.rand(16, 1, 224, 224)
# Divide em 2 minibatches de tamanho 8
x_shards = scatter(x, devices)
print(x_shards[0].shape, x_shards[0].device)
print(x_shards[1].shape, x_shards[1].device)

torch.Size([8, 1, 224, 224]) cuda:0
torch.Size([8, 1, 224, 224]) cpu


Como mencionado, precisamos de uma operação allreduce para somar os gradientes calculados nas GPUs e enviar o resultado para todas as GPUs. A soma é feita na GPU 0, que recebe os gradientes das outras GPUs, soma os valores, e envia o resultado.

In [19]:
def allreduce(grad_shards):
    """Agrega os valores de gradientes no dispositivo 0 e envia o resultado de 
    volta para os outros dispositivos."""

    n = len(grad_shards)
    # Soma os valores que estão nos devices 1, 2, 3,... e armazena no device 0
    for i in range(1, n):
        # Envia o dado para o device 0
        x0 = grad_shards[i].to(grad_shards[0].device)
        # Soma in-place o valor enviado com o que está no device 0
        grad_shards[0][:] += x0

    # Envia o resultado para cada device
    for i in range(1, n):
        xi = grad_shards[0].to(grad_shards[i].device)
        grad_shards[i][:] = xi

# Exemplo de gradiente em diferentes dispositivos
x = torch.tensor([1., 2.])
grad_shards = scatter(x, devices)
# soma os valores
allreduce(grad_shards)
# Resultado esperado: o valor 3 em ambos os dispositivos. Note que o dispositivo
# de tensores na CPU não são impressos automaticamente
grad_shards

[tensor([3.], device='cuda:0'), tensor([3.])]

Loop de treinamento

Podemos então criar um loop de treinamento. Abaixo faremos apenas o treinamento, sem o passo de validação, que seria similar.

In [11]:
import copy

def get_dataset():
    '''Gambiarra para simular um dataloader de segmentação com 10 batches. Cada 
    batch possui 16 imagens de 1 canal e 16 imagens de target com valor 0'''

    images = torch.rand(10, 16, 1, 224, 224)
    targets = torch.zeros(10, 16, 224, 224, dtype=torch.long)
    dl = list(zip(images, targets))
    
    return dl

def train(model, dl, devices):

    # Copia o modelo para cada GPU
    models = [copy.deepcopy(model).to(device) for device in devices]
    # Lista de parâmetros de cada modelo, note que os parâmetros estão
    # em dispositivos diferentes
    params_devs = [list(model.parameters()) for model in models]
    # Cria uma lista flat para o otimizador
    params_all = [param for params in params_devs for param in params]
    optim = torch.optim.SGD(params_all)
    loss_func = nn.CrossEntropyLoss()

    for epoch in range(10):
        print(epoch)
        for x_b, t_b in dl:
            optim.zero_grad()
            # Divide os dados entre as GPUs
            x_shards = scatter(x_b, devices)
            t_shards = scatter(t_b, devices)
            # Aplica o modelo e calcula gradientes em cada GPU
            for model, x_shard, t_shard in zip(models, x_shards, t_shards):
                scores = model(x_shard)
                loss = loss_func(scores, t_shard)
                loss.backward()

            # Sincroniza os dispositivos para evitar copiar gradientes 
            # do batch anterior
            torch.cuda.synchronize()
            # Soma o gradiente de cada parâmetro na GPU 0 e copia o resultado
            # para todas as GPUs
            num_params = len(params_devs[0])
            for param_idx in range(num_params):
                grad_shards = []
                for params in params_devs:
                    # `params` são os parâmetros do modelo em uma GPU
                    # `params[param_idx]` é um parâmetro específico do modelo
                    grad_shards.append(params[param_idx].grad)
                # Soma os gradientes e copia entre os modelos
                with torch.no_grad():
                    allreduce(grad_shards)

            # Atualiza os parâmetros
            optim.step()

dl = get_dataset()
# `model`` pode ser qualquer modelo (ResNet, ViT, etc). Vamos fingir que uma camada
# conv é o nosso modelo
model = nn.Conv2d(1,1,1)
train(model, dl, devices)

0
1
2
3
4
5
6
7
8
9


No código que implementamos é preciso tomar cuidado com camadas BatchNorm. A camada BatchNorm calcula as estatísticas de cada batch, mas elas estão sendo calculadas de forma independente em cada GPU. Essa camada conhecidamente é problemática em paralelização. Uma estratégia é além de compartilhar gradientes compartilhar também os parâmetros do batch norm. É comum também utilizar a camada LayerNorm ao invés de BatchNorm, pois ela não possui esse problema.

Há um gargalo na nossa implementação, toda a lógica é orquestrada por um único processo da CPU. Em uma implementação completamente paralela, são criados n processos para n GPUs, e cada processo se comunica com a respectiva GPU. Há uma implementação desse procedimento na classe **nn.parallel.DistributedDataParallel**

### Visão geral sobre opções de paralelismo e escalabilidade

* Pipeline parallelism (PP): Consiste em executar diferentes camadas de um modelo em diferentes GPUs. Primeiro exemplo deste notebook.

* Data Parallelism (DP): Consiste em replicar um modelo em múltiplas GPUs. Cada GPU processa um batch diferente de dados. É necessário agregar os gradientes entre as GPUs (implementado acima). Cada GPU precisa ter memória suficiente para executar o modelo. O caso de apenas um processo na CPU é implementado pela camada nn.DataParallel do Pytorch. 

* Distributed Data Parallelism (DDP): O mesmo que DataParallel, mas é criado um processo distinto na CPU para cada GPU, o que evita gargalos. A classe to Pytorch nn.parallel.DistributedDataParallel possui uma implementação dessa estratégia. As bibliotecas Accelerate e Fabric permitem implementar essa estratégia de forma simples em um loop de treinamento. 

* Model Parallelism (MP): As camadas de um modelo são divididas entre GPUs. Por exemplo, metade da matriz de uma camada linear fica em uma GPU e metade na outra. Essa estratégia permite executar modelos que não cabem em uma GPU, mas como é necessário muita comunicação entre GPUs, a velocidade de processamente tende a ser menor. O modelo AlexNet usou essa estratégia, já que a GPU utilizada possuía apenas 3GB

* Fully Sharded Data Parallel (FSDP): Conceito que divide os parâmetros e gradientes do modelo e estados do otimizador em diversas partes (shards), que são alocadas em GPUs distintas. Cada GPU executa apenas parte do método .forward e dos cálculos do gradiente. Isso permite ganhar o benefício do paralelismo de dados do DP e também executar modelos enormes com centenas de GB de tamanho. A biblioteca DeepSpeed é a referência atual dessa estratégia.

* CPU-disk/Offloading: consiste em enviar partes de parâmetros e ativações do modelo para a RAM ou disco, o que pertmite executar modelos com trilhões de parâmetros. A biblioteca DeepSpeed também é a referência nessa estratégia.