# Launch an MNIST Training Job with Model Parallelism Using Amazon SageMaker Distributed Model Parallel

SageMaker Distributed Model Parallel (SMP) is a model parallelism library for training large deep learning models that were previously difficult to train because of GPU memory limits. SMP automatically and efficiently splits a model across multiple GPUs and instances and coordinates model training, so you can build larger models with more parameters to improve prediction accuracy.

In this notebook, you configure SageMaker Distributed Model Parallel to train a model using the example PyTorch training script `utils/pt_mnist.py` and the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#train-a-model-with-the-sagemaker-python-sdk).


### Additional Resources

If you are new to Amazon SageMaker, the following resources may help when you train a PyTorch model with SMP on SageMaker.

* To learn more about the SageMaker model parallelism library, see [Model Parallel Distributed Training with SageMaker Distributed](http://docs.aws.amazon.com/sagemaker/latest/dg/model-parallel.html).

* To learn more about using the SageMaker Python SDK with PyTorch, see [Using PyTorch with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html).

* To learn more about launching a training job in Amazon SageMaker with your own training image, see [Use Your Own Training Algorithms](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html).

## Initialize Amazon SageMaker

Run the following cells to initialize the notebook instance. They install the required packages and get the SageMaker execution role used to run this notebook.

In [1]:
pip install sagemaker-experiments

Collecting sagemaker-experiments
  Downloading sagemaker_experiments-0.1.25-py3-none-any.whl (40 kB)
Installing collected packages: sagemaker-experiments
Successfully installed sagemaker-experiments-0.1.25
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install sagemaker --upgrade

Requirement already up-to-date: sagemaker in /opt/conda/lib/python3.7/site-packages (2.19.0)
Note: you may need to restart the kernel to use updated packages.


In [3]:
%%time
import sagemaker
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
import boto3
from time import gmtime, strftime

role = get_execution_role() # provide a pre-existing role ARN as an alternative to creating a new role
print(f'SageMaker Execution Role:{role}')

session = boto3.session.Session()

SageMaker Execution Role:arn:aws:iam::143656149352:role/service-role/AmazonSageMaker-ExecutionRole-20200526T075882
CPU times: user 647 ms, sys: 95.7 ms, total: 743 ms
Wall time: 902 ms


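## Prepare the Training Script

Run the following cell to write the example training script used in this demo to `utils/pt_mnist.py`. It is a PyTorch 1.6 training script that uses the MNIST dataset.

The script contains `SMP`-specific operations and decorators that configure model parallel training. For details on the SMP functions and types used in the script, see the comments in the training script.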
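To build intuition for the microbatch handling in the script, here is a minimal, conceptual sketch in plain Python. It is not the `smdistributed` API: the helpers `split_into_microbatches` and `toy_step` are hypothetical stand-ins, and the sketch assumes the batch size is divisible by the number of microbatches. It only illustrates the idea that a step runs once per microbatch and that the per-microbatch losses are then averaged, which is what the script's `loss_mb.reduce_mean()` call expresses.

```python
# Conceptual stand-in only: plain Python, not the smdistributed API.
from statistics import fmean


def split_into_microbatches(batch, num_microbatches):
    """Split a batch into equally sized, contiguous microbatches (hypothetical helper)."""
    if len(batch) % num_microbatches != 0:
        raise ValueError("batch size must be divisible by the number of microbatches")
    size = len(batch) // num_microbatches
    return [batch[i * size:(i + 1) * size] for i in range(num_microbatches)]


def toy_step(microbatch):
    """Hypothetical per-microbatch 'loss': simply the mean of the values."""
    return fmean(microbatch)


batch = list(range(64))                           # batch_size=64, as in the script
microbatches = split_into_microbatches(batch, 4)  # "microbatches": 4 in the estimator config
per_microbatch_losses = [toy_step(mb) for mb in microbatches]
loss = fmean(per_microbatch_losses)               # analogous to averaging across microbatches
print(len(microbatches[0]), loss)
```

With a batch of 64 samples and 4 microbatches, each step call in this sketch sees 16 samples.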

In [6]:
%%writefile utils/pt_mnist.py

# Future
from __future__ import print_function

# Standard Library
import os, time
import argparse
import math
import random

# Third Party
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.cuda.amp import autocast
from torch.optim.lr_scheduler import StepLR
from torchnet.dataset import SplitDataset
from torchvision import datasets, transforms

# First Party
import smdistributed.modelparallel.torch as smp

# SM Distributed: import scaler from smdistributed.modelparallel.torch.amp, instead of torch.cuda.amp

# Make cudnn deterministic in order to get the same losses across runs.
# The following two lines can be removed if they cause a performance impact.
# For more details, see:
# https://pytorch.org/docs/stable/notes/randomness.html#cudnn
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


def aws_s3_sync(source, destination):
    
    """aws s3 sync in quiet mode and time profile"""
    import time, subprocess
    cmd = ["aws", "s3", "sync", "--quiet", source, destination]
    print(f"Syncing files from {source} to {destination}")
    start_time = time.time()
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p.wait()
    end_time = time.time()
    print("Time Taken to Sync: ", (end_time-start_time))
    return

def sync_local_checkpoints_to_s3(local_path="/opt/ml/checkpoints", s3_path=os.path.dirname(os.path.dirname(os.getenv('SM_MODULE_DIR', '')))+'/checkpoints'):
    
    """ sample function to sync checkpoints from local path to s3 """

    import boto3, botocore
    #check if local path exists
    if not os.path.exists(local_path):
        raise RuntimeError(f"Provided local path {local_path} does not exist. Please check")

    #check if s3 bucket exists
    s3 = boto3.resource('s3')
    if 's3://' not in s3_path:
        raise ValueError(f"Provided s3 path {s3_path} is not valid. Please check")

    s3_bucket = s3_path.replace('s3://','').split('/')[0]
    print(f"S3 Bucket: {s3_bucket}")
    try:
        s3.meta.client.head_bucket(Bucket=s3_bucket)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            raise RuntimeError('S3 bucket does not exist. Please check')
    aws_s3_sync(local_path, s3_path)
    return

def sync_s3_checkpoints_to_local(local_path="/opt/ml/checkpoints", s3_path=os.path.dirname(os.path.dirname(os.getenv('SM_MODULE_DIR', '')))+'/checkpoints'):
    
    """ sample function to sync checkpoints from s3 to local path """

    import boto3, botocore
    #create the local path if it does not exist
    if not os.path.exists(local_path):
        print(f"Provided local path {local_path} does not exist. Creating...")
        try:
            os.makedirs(local_path)
        except Exception as e:
            raise RuntimeError(f"failed to create {local_path}")

    #check if s3 bucket exists
    s3 = boto3.resource('s3')
    if 's3://' not in s3_path:
        raise ValueError(f"Provided s3 path {s3_path} is not valid. Please check")

    s3_bucket = s3_path.replace('s3://','').split('/')[0]
    print(f"S3 Bucket: {s3_bucket}")
    try:
        s3.meta.client.head_bucket(Bucket=s3_bucket)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            raise RuntimeError('S3 bucket does not exist. Please check')
    aws_s3_sync(s3_path, local_path)
    return

class Net1(nn.Module):
    def __init__(self):
        super(Net1, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        return x


class Net2(nn.Module):
    def __init__(self):
        super(Net2, self).__init__()
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        output = F.log_softmax(x, 1)
        return output


class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        self.net1 = Net1()
        self.net2 = Net2()

    def forward(self, x):
        x = self.net1(x)
        x = self.net2(x)
        return x


# SM Distributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, scaler, data, target):
    with autocast(enabled=True):
        output = model(data)

    loss = F.nll_loss(output, target, reduction="mean")

    scaled_loss = loss
    model.backward(scaled_loss)
    return output, loss


def train(model, scaler, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # SM Distributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, scaler, data, target)

        # SM Distributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

        if smp.rank() == 0 and batch_idx % 10 == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )


# SM Distributed: Define smp.step for evaluation.
@smp.step
def test_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="sum").item()  # sum up batch loss
    pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
    correct = pred.eq(target.view_as(pred)).sum().item()
    return loss, correct


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            # SM Distributed: Moves input tensors to the GPU ID used by the current process
            # based on the set_device call.
            data, target = data.to(device), target.to(device)

            # Since test_step returns scalars instead of tensors,
            # test_step decorated with smp.step will return lists instead of StepOutput objects.
            loss_batch, correct_batch = test_step(model, data, target)
            test_loss += sum(loss_batch)
            correct += sum(correct_batch)

    test_loss /= len(test_loader.dataset)
    if smp.mp_rank() == 0:
        print(
            "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
                test_loss,
                correct,
                len(test_loader.dataset),
                100.0 * correct / len(test_loader.dataset),
            )
        )
    return test_loss

def main():
    if not torch.cuda.is_available():
        raise ValueError("The script requires CUDA support, but CUDA not available")
    use_ddp = True
    use_horovod = False

    # Fix seeds in order to get the same losses across runs
    random.seed(1)
    np.random.seed(1)
    torch.manual_seed(1)
    torch.cuda.manual_seed(1)

    smp.init()

    # SM Distributed: Set the device to the GPU ID used by the current process.
    # Input tensors should be transferred to this device.
    torch.cuda.set_device(smp.local_rank())
    device = torch.device("cuda")
    kwargs = {"batch_size": 64}
    kwargs.update({"num_workers": 1, "pin_memory": True, "shuffle": False})

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    # SM Distributed: Download only on a single process per instance.
    # When this is not present, the file is corrupted by multiple processes trying
    # to download and extract at the same time
    if smp.local_rank() == 0:
        dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
    smp.barrier()
    dataset1 = datasets.MNIST("../data", train=True, download=False, transform=transform)

    if (use_ddp or use_horovod) and smp.dp_size() > 1:
        partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
        dataset1 = SplitDataset(dataset1, partitions=partitions_dict)
        dataset1.select(f"{smp.dp_rank()}")

    # Download and create dataloaders for train and test dataset
    dataset2 = datasets.MNIST("../data", train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(dataset1, **kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **kwargs)

    model = GroupedNet()

    # SMP handles the transfer of parameters to the right device
    # and the user doesn't need to call 'model.to' explicitly.
    # model.to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=4.0)

    # SM Distributed: Use the DistributedModel container to provide the model
    # to be partitioned across different ranks. For the rest of the script,
    # the returned DistributedModel object should be used in place of
    # the model provided for DistributedModel class instantiation.
    model = smp.DistributedModel(model)
    scaler = smp.amp.GradScaler()
    optimizer = smp.DistributedOptimizer(optimizer)

    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(1, 2):
        train(model, scaler, device, train_loader, optimizer, epoch)
        test_loss = test(model, device, test_loader)
        scheduler.step()
        
    if smp.rank() == 0:
        if os.path.exists('/opt/ml/local_checkpoints'):
            print("-INFO- PATH EXISTS")
        else:
            os.makedirs('/opt/ml/local_checkpoints')
            print("-INFO- PATH DID NOT EXIST; CREATED")

    # Wait until rank 0 has finished preparing the checkpoint directory before any rank saves
    smp.barrier()
    
    if smp.dp_rank() == 0:
        model_dict = model.local_state_dict()
        opt_dict = optimizer.local_state_dict()
        smp.save(
                {"model_state_dict": model_dict, "optimizer_state_dict": opt_dict},
                "/opt/ml/local_checkpoints/pt_mnist_checkpoint.pt",
                partial=True,
            )
    smp.barrier()
    
    if smp.local_rank() == 0:
        print("Start syncing")
        base_s3_path = os.path.dirname(os.path.dirname(os.getenv('SM_MODULE_DIR', '')))
        curr_host = os.getenv('SM_CURRENT_HOST')
        full_s3_path = f'{base_s3_path}/checkpoints/{curr_host}/'
        sync_local_checkpoints_to_s3(local_path='/opt/ml/local_checkpoints', s3_path=full_s3_path)
        print("Finished syncing")
        


if __name__ == "__main__":
    main()

Overwriting utils/pt_mnist.py


## Define a SageMaker Training Job

Next, use the SageMaker Estimator API to define a SageMaker training job. With the [`Estimator`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) you define the number and type of EC2 instances that Amazon SageMaker uses for training, as well as the size of the volume attached to those instances.

You can update the following:
* `processes_per_host`
* `entry_point`
* `instance_count`
* `instance_type`
* `base_job_name`

You can also supply and modify configuration parameters for the SageMaker Distributed Model Parallel library. These parameters are passed through the `distribution` argument, as shown below.

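### Update the Type and Number of EC2 Instances Used

Specify `processes_per_host`. It must be a multiple of the number of partitions, which is 2 by default (for example, 2, 4, ...).

The instance type and instance count that you specify in `instance_type` and `instance_count` determine the number of GPUs Amazon SageMaker uses during training. Specifically, `instance_type` determines the number of GPUs on a single instance, and that number is multiplied by `instance_count`.

Choose `instance_type` and `instance_count` so that the GPUs available for training can accommodate the `partitions` value configured for the library. In this notebook, `partitions` is set through the `parameters` of the estimator's `distribution` argument rather than through a `config` passed to `smp.init` in the training script.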


To look up instance types, see [Amazon EC2 Instance Types](https://aws.amazon.com/sagemaker/pricing/).
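The arithmetic above can be sketched as a small sanity check to run before creating an estimator. This is only an illustration: `check_layout` and the `GPUS_PER_INSTANCE` table are hypothetical names introduced here (the table lists only a few P3 instance types), and the sketch assumes one process per GPU.

```python
# GPU counts for a few instance types (illustrative subset, not a complete list).
GPUS_PER_INSTANCE = {
    "ml.p3.2xlarge": 1,
    "ml.p3.8xlarge": 4,
    "ml.p3.16xlarge": 8,
}


def check_layout(instance_type, instance_count, processes_per_host, partitions):
    """Hypothetical helper: validate the process layout and summarize it."""
    gpus_per_host = GPUS_PER_INSTANCE[instance_type]
    # Assumption of this sketch: each process drives exactly one GPU.
    if processes_per_host > gpus_per_host:
        raise ValueError("processes_per_host exceeds the GPUs on one instance")
    if processes_per_host % partitions != 0:
        raise ValueError("processes_per_host must be a multiple of partitions")
    total_processes = processes_per_host * instance_count
    return {
        "total_gpus": gpus_per_host * instance_count,
        "total_processes": total_processes,
        "data_parallel_replicas": total_processes // partitions,
    }


# Values taken from the estimator defined in this notebook.
layout = check_layout("ml.p3.16xlarge", instance_count=1, processes_per_host=2, partitions=2)
print(layout)
```

For the settings in this notebook, the check reports 8 GPUs on the instance, 2 launched processes, and a single model replica split across 2 partitions.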


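### Upload Checkpoints During Training or Resume from a Previous Checkpoint
The example also provides a custom way to upload checkpoints during training or to resume from the checkpoints of a previous training run. For details, see the `aws_s3_sync`, `sync_local_checkpoints_to_s3`, and `sync_s3_checkpoints_to_local` functions.
You can find them in the `pt_mnist.py` example script. In this example, the script only uploads checkpoints during training, using `sync_local_checkpoints_to_s3`.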
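The way the script derives its S3 checkpoint location can be sketched in isolation. This is a hedged illustration, not the script itself: `checkpoint_s3_path` and `bucket_name` are hypothetical helpers, the example `SM_MODULE_DIR` value and host name are made-up placeholders, and the sketch uses `posixpath` and `startswith` where the script uses `os.path` and a substring check.

```python
import posixpath


def checkpoint_s3_path(module_dir, host):
    """Go two levels up from the module location and append a per-host checkpoint prefix."""
    base = posixpath.dirname(posixpath.dirname(module_dir))
    return f"{base}/checkpoints/{host}/"


def bucket_name(s3_path):
    """Extract the bucket name from an s3:// URI."""
    if not s3_path.startswith("s3://"):
        raise ValueError(f"Provided s3 path {s3_path} is not valid")
    return s3_path[len("s3://"):].split("/")[0]


# Placeholder value; in a training job this would come from os.getenv("SM_MODULE_DIR").
example_module_dir = "s3://example-bucket/example-job/source/sourcedir.tar.gz"
path = checkpoint_s3_path(example_module_dir, "algo-1")
print(path)
print(bucket_name(path))
```

Keeping the host name in the prefix means each instance syncs its partial checkpoints to its own S3 location.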

After you have updated `entry_point`, `instance_count`, `instance_type` and `base_job_name`, run the following to create an estimator. 

In [7]:
sagemaker_session = sagemaker.session.Session(boto_session=session)
mpioptions = "-verbose -x orte_base_help_aggregate=0 "
mpioptions += "--mca btl_vader_single_copy_mechanism none "

all_experiment_names = [exp.experiment_name for exp in Experiment.list()]

#choose an experiment name (only need to create it once)
experiment_name = "SM-MP-DEMO"

# Load the experiment if it exists, otherwise create 
if experiment_name not in all_experiment_names:
    customer_churn_experiment = Experiment.create(
        experiment_name=experiment_name, sagemaker_boto_client=boto3.client("sagemaker")
    )
else:
    customer_churn_experiment = Experiment.load(
        experiment_name=experiment_name, sagemaker_boto_client=boto3.client("sagemaker")
    )

# Create a trial for the current run
trial = Trial.create(
        trial_name="SMD-MP-demo-{}".format(strftime("%Y-%m-%d-%H-%M-%S", gmtime())),
        experiment_name=customer_churn_experiment.experiment_name,
        sagemaker_boto_client=boto3.client("sagemaker"),
    )


smd_mp_estimator = PyTorch(
          entry_point="pt_mnist.py", # Pick your train script
          source_dir='utils',
          role=role,
          instance_type='ml.p3.16xlarge',
          sagemaker_session=sagemaker_session,
          framework_version='1.6.0',
          py_version='py36',
          instance_count=1,
          distribution={
              "smdistributed": {
                  "modelparallel": {
                      "enabled":True,
                      "parameters": {
                          "microbatches": 4,
                          "placement_strategy": "spread",
                          "pipeline": "interleaved",
                          "optimize": "speed",
                          "partitions": 2,
                          "ddp": True,
                      }
                  }
              },
              "mpi": {
                    "enabled": True,
                    "processes_per_host": 2, # Pick your processes_per_host
                    "custom_mpi_options": mpioptions 
              },
          },
          base_job_name="SMD-MP-demo",
      )


Finally, use the estimator to launch the SageMaker training job.

In [8]:
%%time
smd_mp_estimator.fit(
        experiment_config={
            "ExperimentName": customer_churn_experiment.experiment_name,
            "TrialName": trial.trial_name,
            "TrialComponentDisplayName": "Training",
        })

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: SMD-MP-demo-2020-12-13-08-31-43-559


2020-12-13 08:31:44 Starting - Starting the training job...
2020-12-13 08:32:07 Starting - Launching requested ML instances
ProfilerReport-1607848303: InProgress
............
2020-12-13 08:34:09 Starting - Preparing the instances for training.........
2020-12-13 08:35:30 Downloading - Downloading input data...
2020-12-13 08:36:11 Training - Downloading the training image...............
2020-12-13 08:38:33 Training - Training image download completed. Training in progress.
bash: cannot set terminal process group (-1): Inappropriate ioctl for device
bash: no job control in this shell
2020-12-13 08:38:30,339 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training
2020-12-13 08:38:30,418 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.
2020-12-13 08:38:30,427 sagemaker_pytorch_container.training INFO     Invoking user training script.
2020-12-13 08:38:30,916 sagemaker-