# GPU DataParallel
- 데이터의 Batch를 Parallel하게 학습하는 방법
- Batch size가 256이고 4개의 GPU가 있을 때, 각 GPU마다 64 batch size로 학습하는 테크닉
- 여러 GPU를 이용해 batch size를 높여 학습시간을 단축할 수 있음

- DataParallel 방법은 총 4가지가 있음
 - pytorch의 nn.DataParallel
 - pytorch 외부 패키지 pytorch-encoding
 - pytorch의 nn.DistributedDataParallel
 - Nvidia의 Apex

![gpu image](https://miro.medium.com/max/1400/1*F6SXjBp6BCoFTZ26RKnz9A.png)

## 0. 준비물
- Environment : 4개의 GPU 서버
- Dataset : MNIST
- Model : VGG16

In [1]:
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import models

# import matplotlib.pyplot as plt
# import numpy as np

In [2]:
# Dataset & DataLoader
transform = transforms.Compose(
[
    transforms.ToTensor(),
    transforms.Normalize([0], [1])
])

data_path = os.path.join(os.getenv('HOME'), 'data')
train_batch = 2048
test_batch = 128
trainset = torchvision.datasets.MNIST(root = data_path, train = True,
                                      download = True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size = train_batch,
                                          shuffle = True, num_workers=4)

testset = torchvision.datasets.MNIST(root = data_path, train = False,
                                     download = True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size = test_batch,
                                         shuffle = True, num_workers=4)

# GPU set
num_gpus = torch.cuda.device_count()
if num_gpus >= 1:
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Number of GPU : ', num_gpus)   

# Model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, 3, padding=1)
        self.conv2 = nn.Conv2d(10, 10, 3, padding=1)
        self.pool = nn.MaxPool2d(2,2)
        self.fc1 = nn.Linear(10 * 7 * 7, 10)
        
    def forward(self, x):
        x = F.relu(self.conv1(x)) # 28 28
        x = self.pool(x)          # 14 14
        x = F.relu(self.conv2(x)) # 14 14
        x = self.pool(x)          # 7 7
        x = x.view(-1, 10 * 7 * 7)
        x = F.relu(self.fc1(x))
        
        return x
    
model = Net()

# loss & oiptimizer
loss_func = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr = 0.1)

model

Number of GPU :  4


Net(
  (conv1): Conv2d(1, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=490, out_features=10, bias=True)
)

## 1. Pytorch의 DataParallel
- 가장 간단한 방법
- 하지만 데이터를 scatter하고 gather하는 과정에서 1개의 GPU에 메모리가 몰리는 단점이 있음
 - output의 GPU를 다른 GPU로 할당하면 조금이나마 메모리 분산이 가능함
 - 하지만 이것도 문제해결을 위한 방법은 아님

In [3]:
import torch.nn as nn
    
model_parallel = nn.DataParallel(model)
model_parallel.to(device)

DataParallel(
  (module): Net(
    (conv1): Conv2d(1, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (fc1): Linear(in_features=490, out_features=10, bias=True)
  )
)

In [4]:
# Train

import time

EPOCH = 2
for e in range(1, EPOCH+1):
    model_parallel.train()
    start_time = time.time()
    running_loss = 0
    
    for i, data in enumerate(trainloader):
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model_parallel(images)
        loss = loss_func(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss
        now = time.time()
        print('\r[%d/%d]-----[%d/%d] LOSS : %.3f------ Time : %d' 
              %(e, EPOCH, i, 60000/512, running_loss, now - start_time), end = '')
        
    print('\n')

[1/2]-----[29/117] LOSS : 62.476------ Time : 18

[2/2]-----[29/117] LOSS : 43.534------ Time : 3



## 1-1. Outputs을 다른 GPU에 할당하기
- batch 분배, loss 계산, output 등이 모두 0번 GPU에 몰려있어서 다른 GPU로 분산시키기
- 효과는 매우 적음

In [7]:
os.environ["CUDA_VISIBLE_DEVICES"] = '0, 1, 2, 3'
model_parallel = nn.DataParallel(model, output_device=1)

## 2. Custom된 dataparallel(외부 패키지)
- loss 계산을 1개의 GPU가 아닌 각 GPU로 분산시키는 방법
- 1개의 GPU에 메모리가 몰리는 문제를 해결할 수 있음
- 1-1의 방법보다는 상대적으로 메모리 분배가 잘됨

> 외부패키지인 Pytorch-Encoding 패키지에서 parallel.py 파일을 불러와야함
<br>

##### 적용 방법
- https://github.com/zhanghang1989/PyTorch-Encoding 에서 encoding/parallel.py 다운로드
- model에 DataParallelModel 적용
- loss function에 DataParallelCriterion 적용

##### 현재 사용 x -> 에러뜸
- https://github.com/zhanghang1989/PyTorch-Encoding/issues/361
- 이걸 만든 사람도 이제 이거 쓰지 말고 pytorch의 DDP 쓰라고함
- 이거 업그레이드 안 할 듯...

In [3]:
from parallel import DataParallelModel, DataParallelCriterion

model_parallel2 = DataParallelModel(model).to(device)
loss_func2 = DataParallelCriterion(loss_func)

In [None]:
# Train

import time

EPOCH = 2
for e in range(1, EPOCH+1):
    model_parallel2.train()
    start_time = time.time()
    running_loss = 0
    
    for i, data in enumerate(trainloader):
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model_parallel2(images)
        loss = loss_func2(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss
        now = time.time()
        print('\r[%d/%d]-----[%d/%d] LOSS : %.3f------ Time : %d' 
              %(e, EPOCH, i, 60000/512, running_loss, now - start_time), end = '')
        
    print('\n')

## 3. Pytorch의 Distributed Data Parallel
- Pytorch에서 제공해주는 DDP 사용
- Batch 뿐만 아니라 여러 컴퓨터(머신)에 대해서도 Parallel연산이 가능함
- Multi processor를 사용해야하기 때문에 약간 복잡함(설정이 필요함)
 - 학습을 진행시키는 코드를 함수로 작성(main_worker)
   - GPU 분산 학습을 위해 main_worker 초기화(init_process_group)
   - dataloader 분산처리를 위해 sampler 설정
   - model 분산처리를 위해 DDP 적용
 - 해당 함수를 multi-processor로 실행
 
> Jupyter Notebook에서는 Multi-processor가 작동하지 않음<br>
해결 방법은 있는데 뭔가 복잡해보임
https://discuss.pytorch.org/t/multi-gpu-ddp-in-jupyter-notebook/104302

In [53]:
%%writefile ddp_example.py
# py파일로 저장 후 실행

import os
import time

os.environ["CUDA_VISIBLE_DEVICES"] = '0, 1, 2, 3'

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import models

import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

import warnings
warnings.filterwarnings('ignore')


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, 3, padding=1)
        self.conv2 = nn.Conv2d(10, 10, 3, padding=1)
        self.pool = nn.MaxPool2d(2,2)
        self.fc1 = nn.Linear(10 * 7 * 7, 10)
        
    def forward(self, x):
        x = F.relu(self.conv1(x)) # 28 28
        x = self.pool(x)          # 14 14
        x = F.relu(self.conv2(x)) # 14 14
        x = self.pool(x)          # 7 7
        x = x.view(-1, 10 * 7 * 7)
        x = F.relu(self.fc1(x))
        
        return x

def main_worker(rank, world_size):
    # Init DDP
    dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    # DataLoader
    transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0], [1])
    ])

    data_path = os.path.join(os.getenv('HOME'), 'data')
    train_batch = 2048
    test_batch = 128
    trainset = torchvision.datasets.MNIST(root = data_path, train = True,
                                          download = True, transform=transform)
    train_sampler = DistributedSampler(trainset)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size = train_batch,
                                              num_workers=4, pin_memory=True, sampler=train_sampler)

    testset = torchvision.datasets.MNIST(root = data_path, train = False,
                                         download = True, transform=transform)
    test_sampler = DistributedSampler(testset)
    testloader = torch.utils.data.DataLoader(testset, batch_size = test_batch,
                                             num_workers=4, pin_memory=True, sampler=test_sampler)    
    # Model
    model = Net()
    model_ddp = DDP(model.to(rank), device_ids=[rank])
    
    # Optimizer
    loss_func = nn.CrossEntropyLoss().to(rank)
    optimizer = optim.SGD(model_ddp.parameters(), lr = 0.1)
    
    # train
    EPOCH = 2
    for e in range(1, EPOCH+1):
        train_sampler.set_epoch(e)
        model_ddp.train()
        
        start_time = time.time()
        running_loss = 0
        for i, data in enumerate(trainloader):
            images, labels = data
            images, labels = images.to(rank), labels.to(rank)

            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss
            now = time.time()
            print('\r[%d/%d]-----[%d/%d] LOSS : %.3f------ Time : %d' 
                  %(e, EPOCH, i, 60000/512, running_loss, now - start_time), end = '')
        print('\n')

def main():
    world_size = torch.cuda.device_count()
    mp.spawn(main_worker,
            args=(world_size,),
            nprocs=world_size, join=True)

if __name__ == '__main__':
    main()

Overwriting ddp_example.py


## 4. Apex 이용하기
- 3번 Example인 DDP랑 비슷하지만 Multi-processor를 사용하지 않음

In [52]:
%%writefile ddp_example2.py
# py파일로 저장 후 실행

import os
import time
import argparse
os.environ["CUDA_VISIBLE_DEVICES"] = '0, 1, 2, 3'

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler

from apex import amp
from apex.parallel import DistributedDataParallel as DDP

import warnings
warnings.filterwarnings('ignore')

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, 3, padding=1)
        self.conv2 = nn.Conv2d(10, 10, 3, padding=1)
        self.pool = nn.MaxPool2d(2,2)
        self.fc1 = nn.Linear(10 * 7 * 7, 10)
        
    def forward(self, x):
        x = F.relu(self.conv1(x)) # 28 28
        x = self.pool(x)          # 14 14
        x = F.relu(self.conv2(x)) # 14 14
        x = self.pool(x)          # 7 7
        x = x.view(-1, 10 * 7 * 7)
        x = F.relu(self.fc1(x))
        
        return x

def main():
    # parser
    local_rank = int(os.environ['LOCAL_RANK'])
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", default=1, type=int)
    args = parser.parse_args()
    
    # Init DDP
    args.world_size = 1
    dist.init_process_group(backend='nccl')
    torch.cuda.set_device(local_rank)
    # DataLoader
    transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0], [1])
    ])

    data_path = os.path.join(os.getenv('HOME'), 'data')
    train_batch = 128
    test_batch = 128
    trainset = torchvision.datasets.MNIST(root = data_path, train = True,
                                          download = True, transform=transform)
    train_sampler = DistributedSampler(trainset)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size = train_batch,
                                              num_workers=4, pin_memory=True, sampler=train_sampler)

    testset = torchvision.datasets.MNIST(root = data_path, train = False,
                                         download = True, transform=transform)
    test_sampler = DistributedSampler(testset)
    testloader = torch.utils.data.DataLoader(testset, batch_size = test_batch,
                                             num_workers=4, pin_memory=True, sampler=test_sampler)    
    # Model
    model = Net().cuda()
    # Optimizer
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr = 0.1)
    
    model_ddp = DDP(model)
    
    # train
    EPOCH = 5
    for e in range(1, EPOCH+1):
        train_sampler.set_epoch(e)
        model_ddp.train()
        
        start_time = time.time()
        running_loss = 0
        for i, data in enumerate(trainloader):
            images, labels = data
            images, labels = images.cuda(), labels.cuda()

            optimizer.zero_grad()
            outputs = model_ddp(images)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss
            now = time.time()
            print('\r[%d/%d]-----[%d/%d] LOSS : %.3f------ Time : %d' 
                  %(e, EPOCH, i, 60000/512, running_loss, now - start_time), end = '')
        print('\n')
        
    if local_rank == 0:
        print('final loss : ')
if __name__ == '__main__':
    main()
    


Overwriting ddp_example2.py


## Reference codes

In [76]:
%%writefile ddp_example3.py
import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

os.environ["CUDA_VISIBLE_DEVICES"] = '0, 1, 2, 3'

def set_random_seeds(random_seed=0):

    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)

def evaluate(model, device, test_loader):

    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total

    return accuracy

def main():

    num_epochs_default = 2
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume
    
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Initializes the distributed backend which will take care of sychronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)
    model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume == True:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914), (0.2023)),
    ])

    # Data should be prefetched
    # Download should be set to be False, because it is not multiprocess safe
    data_path = os.path.join(os.getenv('HOME'), 'data')
    train_set = torchvision.datasets.MNIST(root=data_path, train=True, download=False, transform=transform) 
    test_set = torchvision.datasets.MNIST(root=data_path, train=False, download=False, transform=transform)

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        
        # Save and evaluate model routinely
        if epoch % 10 == 0:
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    
    main()

Overwriting ddp_example3.py
