# Module 3. Training on Amazon SageMaker using SageMaker Data Parallelism Library
---

***[주의] 본 세션은 최소 1개 이상의 `ml.p3.16xlarge` 훈련 인스턴스가 필요합니다. 이벤트 엔진으로 핸즈온 진행 시는 반드시 훈련 인스턴스의 리소스 가용 여부를 확인해 주세요.***

본 모듈에서는 re:Invent 2020에서 새로 추가된 SageMaker Data Parallelism Library를 사용하여 분산 훈련을 수행합니다.
분산 훈련 및 디버깅에 대한 세션을 아래 동영상에서 보실 수 있습니다.

- [Amazon SageMaker를 통한 딥러닝 분산 학습 및 디버거 프로파일링 활용하기](https://www.youtube.com/watch?v=lsTtoACAPj4)

## Amazon SageMaker Distributed Training
Amazon SageMaker에서는 2가지 분산 학습 알고리즘을 AWS 분산 학습 클러스터에 특화하여 제공하고 있습니다. 
SageMaker Data Parallelism library는 GPU 간 또는 여러 인스턴스 간의 네트워크를 통해 주고 받는 모델 파라메터의 병목을 개선합니다. 이를 위해, backward pass에서의 최적화된 AllReduce 오퍼레이터 활용 방법 등의 다양한 기법을 적용하였고, 특히 Balanced Fusion Buffer로 Elastic Fabric Adapter의 성능을 최대한 발휘할 수 있도록 하였습니다. 
SageMaker Model Parallelism library는 모델 구조를 분석하여 방향성 비사이클 그래프로 구성한 다음, subgraph형태로 분리하여 각 GPU 노드에서 학습을 진행하는 방식입니다. 따라서, 단일 GPU 노드에서는 일부 모델 파라미터에 대해서만 학습이 일어나므로 GPU 메모리의 부담이 줄어듭니다.

### Traditional Data Parallelism
![smdataparallel1](imgs/smdataparallel1.jpg)
**[그림 1]** Parameter Server and Ring All-reduce

딥러닝 훈련은 매 스텝(step)마다 gradient를 계산해서 모델의 파라메터를 반복적으로 업데이트하는 과정입니다. Data parallel 환경에서는 여러 GPU에서 동일한 모델 파라미터와 다른 입력값으로 각 GPU마다 다른 gradient를 계산하고, 이를 다 합산한 값을 가지고 각 GPU의 모델 파라메터를 업데이트합니다.
대표적인 방식으로, 파라메터 서버(Parameter Server)와 Horovod에서 쓰이는 Ring All-reduce 방식이 있습니다.

파라메터 서버는 각 GPU에서 계산된 gradient를 파라메터 서버에서 취합 후 모델 파라메터를 관리하는 방식입니다. 이 방식의 문제는 특정 파라메터가 몰리는 GPU 쪽 트래픽에서 병목이 발생합니다. 
따라서, 모든 GPU가 고르게 데이터를 주고 받기 위해 별도의 파라메터 서버를 사용하는 대신 Ring All-reduce 방식도 많이 사용합니다. 특정 GPU에 트래픽이 몰리는 현상을 개선할 수 있지만, 이 또한 communication cost가 많이 소요됩니다. **[그림 1]** 의 오른쪽과 같이 현 GPU의 gradient를 다음 GPU로 패싱하면서 한 바퀴를 돌고 gradient들을 누적 합산하여 평균 gradient를 계산해야 합니다. 그런 다음, 모델 파라메터 업데이트를 위해 다시 한 바퀴를 돌아야 하기 때문에 두 바퀴를 돌게 됩니다.

### Balanced Fusion Buffers (BFB)

SageMaker Data Parallelism Library은 파라메터 서버를 그대로 이용하면서도 모든 GPU를 고르게 활용하는데, 여기에서 핵심이 되는 컨셉이 바로 Balanced Fusion Buffer입니다. 

쉬운 예시로, 4장의 GPU가 있고 2개의 파라메터가 있다고 가정하겠습니다.파라메터 Wh1 용량이 500메가이고 Wh2 용량이 100메가라면 기존 파라메터 서버에서는 2장의 GPU만 활용하고 나머지 2장의 GPU는 놀고 있습니다. 게다가 첫번째 GPU에서 500메가 파라메터를 계산하느라 추가적인 병목이 발생합니다.

Balanced Fusion Buffer는 각 GPU마다 동일한 버퍼 사이즈를 할당합니다. 각 GPU에서 동일하게 150메가 파라메터를 처리하므로 모든 GPU 리소스를 사용할 수 있습니다. 구체적으로는 Wh1을 4개로 분할하여 각 GPU에 할당하고, 마지막 GPU에서는 150메가 버퍼 중에서 50메가는 Wh1 파라메터를 처리하고, 100메가는 Wh2 파라메터를 처리합니다. 


또한, 기존 방법은 어떤 gradient가 준비되었는지 확인하기 위한 별도의 협상 과정인 negotiation이 필요한데, Balanced Fusion Buffer에서는 순서를 기억하고 있다가 곧바로 매핑해 주기 때문에 negotiation에 들어가는 추가 지연이 없습니다.

예를 들어, 버퍼 사이즈가 100MB이고 5 3 4 2 1 순서대로 들어오는 gradient 용량은 각각 50MB, 50MB, 250MB, 75MB, 100MB 라고 가정하겠습니다. 
0번 GPU는 5와 3이 0번 버퍼로 들어갑니다. 그리고 4는 용량이 크기 때문에 3개의 버퍼에 분산되어 들어갑니다. 2는 3번 버퍼에서 남는 공간과 그 다음 버퍼의 앞에 들어가고요 (클릭) 1도 마찬가지로 4번 버퍼의 남는 공간과 마지막 버퍼의 앞에 들어갑니다.

그런데, 1번 GPU에서 순서가 꼬여서 5 4 3 2 1 이 들어오면 어떻게 될까요? 3이 4 뒤에 있다 하더라도 Balanced Fusion Buffer에 매핑되는 순서는 변함이 없기 때문에, 버퍼 내 순서가 그대로 보장됩니다. **[그림 2]** 에서 4보다 3이 먼저 들어가는 것을 확인할 수 있습니다.

![smdataparallel2](imgs/smdataparallel2.jpg)
**[그림 2]** Balanced Fusion Buffer without negotiation

<br>

## 1. Training script
---

아래 코드 셀은 `src` 디렉토리에 SageMaker 훈련 스크립트인 `train_smdataparallel.py`를 저장합니다.<br>
훈련에 필요한 코드는 몇 줄의 코드만 수정하시면 되며, 자세한 내용을 개발자 문서에서 확인할 수 있습니다.

#### Code Snippet: PyTorch Training Script 
```python
import smdistributed.dataparallel.torch.distributed as dist
from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP

# Scale Parameters
dist.init_process_group()
batch_size //= dist.get_world_size() // 8
batch_size = max(batch_size, 1)

# Set rank in DistributedSampler
train_sampler = DistributedSampler(train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
train_loader = torch.utils.data.DataLoader(..)

# Pin each GPU
model = DDP(model.to(device))
torch.cuda.set_device(dist.get_local_rank())
model.cuda(dist.get_local_rank())

# Checkpoint on master node
if dist.get_rank() == 0:
    torch.save(checkpoint_dir)
``` 

In [1]:
%%writefile ./src/train_smdataparallel.py

import argparse
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import os
import sys
import time, datetime
import gc

import warnings
warnings.filterwarnings('ignore')

from torch.utils.data import Dataset
from sklearn.metrics import recall_score
import logging
import logging.handlers

import matplotlib.pyplot as plt
import joblib


# Import SMDataParallel PyTorch Modules
from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP
import smdistributed.dataparallel.torch.distributed as dist
dist.init_process_group()


HEIGHT = 137
WIDTH = 236
BATCH_SIZE = 256
NUM_WORKERS = 4

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))


class BangaliDataset(Dataset):
    def __init__(self, imgs, label_df=None, transform=None):
        self.imgs = imgs
        self.label_df = label_df.reset_index(drop=True)
        self.transform = transform
        
    def __len__(self):
        return len(self.label_df)
    
    def __getitem__(self, idx):
        
        img_idx = self.label_df.iloc[idx].id
        img = (self.imgs[img_idx]).astype(np.uint8)
        img = 255 - img
    
        img = img[:,:,np.newaxis]
        img = np.repeat(img, 3, axis=2)
        
        if self.transform is not None:
            img = self.transform(image=img)['image']        
        
        if self.label_df is not None:
            label_1 = self.label_df.iloc[idx].grapheme_root
            label_2 = self.label_df.iloc[idx].vowel_diacritic
            label_3 = self.label_df.iloc[idx].consonant_diacritic           
            return img, np.array([label_1, label_2, label_3])        
        else:
            return img
        
        
def _set_seed(seed=42):
    np.random.seed(seed)
    random.seed(seed)
    mx.random.seed(seed)

def _get_images(train_dir, num_folds=5, vld_fold_idx=4, data_type='train'):

    logger.info("=== Getting Labels ===")
    logger.info(train_dir)
    
    label_df = pd.read_csv(os.path.join(train_dir, 'train_folds.csv'))
    #label_df = pd.read_csv(f'{train_dir}/train_folds.csv')
     
    trn_fold = [i for i in range(num_folds) if i not in [vld_fold_idx]]
    vld_fold = [vld_fold_idx]

    trn_idx = label_df.loc[label_df['fold'].isin(trn_fold)].index
    vld_idx = label_df.loc[label_df['fold'].isin(vld_fold)].index

    logger.info("=== Getting Images ===")    
    files = [f'{train_dir}/{data_type}_image_data_{i}.feather' for i in range(4)]
    logger.info(files)
    
    image_df_list = [pd.read_feather(f) for f in files]
    imgs = [df.iloc[:, 1:].values.reshape(-1, HEIGHT, WIDTH) for df in image_df_list]
    del image_df_list
    gc.collect()
    imgs = np.concatenate(imgs, axis=0)
    
    trn_df = label_df.loc[trn_idx]
    vld_df = label_df.loc[vld_idx]
    
    return imgs, trn_df, vld_df       
                           
                           
def _get_data_loader(imgs, trn_df, vld_df):

    import albumentations as A
    from albumentations import (
        Rotate,HorizontalFlip, IAAPerspective, ShiftScaleRotate, CLAHE, RandomRotate90,
        Transpose, ShiftScaleRotate, Blur, OpticalDistortion, GridDistortion, HueSaturationValue,
        IAAAdditiveGaussianNoise, GaussNoise, MotionBlur, MedianBlur, RandomBrightnessContrast, IAAPiecewiseAffine,
        IAASharpen, IAAEmboss, Flip, OneOf, Compose
    )
    from albumentations.pytorch import ToTensor, ToTensorV2

    train_transforms = A.Compose([
        Rotate(20),
            OneOf([
                IAAAdditiveGaussianNoise(),
                GaussNoise(),
            ], p=0.2),
            OneOf([
                MotionBlur(p=.2),
                MedianBlur(blur_limit=3, p=0.1),
                Blur(blur_limit=3, p=0.1),
            ], p=0.2),
            ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.2, rotate_limit=45, p=0.2),
            OneOf([
                OpticalDistortion(p=0.3),
                GridDistortion(p=.1),
                IAAPiecewiseAffine(p=0.3),
            ], p=0.2),
            OneOf([
                CLAHE(clip_limit=2),
                IAASharpen(),
                IAAEmboss(),
                RandomBrightnessContrast(),            
            ], p=0.3),
            HueSaturationValue(p=0.3),
        ToTensor()
        ], p=1.0)


    valid_transforms = A.Compose([
        ToTensor()
    ])

    from torch.utils.data import Dataset, DataLoader
    trn_dataset = BangaliDataset(imgs=imgs, label_df=trn_df, transform=train_transforms)
    vld_dataset = BangaliDataset(imgs=imgs, label_df=vld_df, transform=valid_transforms)
    
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    
    trn_sampler = torch.utils.data.distributed.DistributedSampler(
        trn_dataset, 
        num_replicas=world_size, # worldsize만큼 분할
        rank=rank)
    
    trn_loader = DataLoader(trn_dataset, 
                            shuffle=False, 
                            num_workers=8,
                            pin_memory=True,
                            batch_size=BATCH_SIZE,
                           sampler=trn_sampler)  
    
    vld_loader = DataLoader(vld_dataset, shuffle=False, num_workers=NUM_WORKERS, batch_size=BATCH_SIZE)  
    return trn_loader, vld_loader


def _rand_bbox(size, lam):
    '''
    CutMix Helper function.
    Retrieved from https://github.com/clovaai/CutMix-PyTorch/blob/master/train.py
    '''
    W = size[2]
    H = size[3]
    # 폭과 높이는 주어진 이미지의 폭과 높이의 beta distribution에서 뽑은 lambda로 얻는다
    cut_rat = np.sqrt(1. - lam)
    
    # patch size 의 w, h 는 original image 의 w,h 에 np.sqrt(1-lambda) 를 곱해준 값입니다.
    cut_w = np.int(W * cut_rat)
    cut_h = np.int(H * cut_rat)

    # patch의 중심점은 uniform하게 뽑힘
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2


def _format_time(elapsed):
    '''
    Takes a time in seconds and returns a string hh:mm:ss
    '''
    # Round to the nearest second.
    elapsed_rounded = int(round((elapsed)))
    
    # Format as hh:mm:ss
    return str(datetime.timedelta(seconds=elapsed_rounded))
            
def train_model(args):
    from torchvision import datasets, models
    from tqdm import tqdm
    
    imgs, trn_df, vld_df = _get_images(args.train_dir, args.num_folds, args.vld_fold_idx, data_type='train')
    trn_loader, vld_loader = _get_data_loader(imgs, trn_df, vld_df)

    
    logger.info("=== Getting Pre-trained model ===")    
    model = models.resnet18(pretrained=True)
    last_hidden_units = model.fc.in_features
    model.fc = torch.nn.Linear(last_hidden_units, 186)
#     len_buffer =  len(list(module.buffers()))

#     logger.info("=== Buffer ===")    
#     print(f"len_buffer={len_buffer}")
#     print(list(model.buffers()))
    
    # SDP: Pin each GPU to a single process
    # Use SMDataParallel PyTorch DDP for efficient distributed training
    model = DDP(model.to(args.device), broadcast_buffers=False)

    # SDP: Pin each GPU to a single SDP process.
    torch.cuda.set_device(args.local_rank)
    model.cuda(args.local_rank)

    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
    loss_fn = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max',
                                                          verbose=True, patience=5, 
                                                          factor=0.5)

    best_score = -1
    training_stats = []
    logger.info("=== Start Training ===")    

    for epoch_id in range(args.num_epochs):

        ################################################################################
        # ==> Training phase
        ################################################################################    
        trn_loss = []
        model.train()

        # Measure how long the training epoch takes.
        t0 = time.time()
        running_loss = 0.0

        for batch_id, (inputs, targets) in enumerate((trn_loader)):
            inputs = inputs.cuda()
            targets = targets.cuda()
            targets_gra = targets[:, 0]
            targets_vow = targets[:, 1]
            targets_con = targets[:, 2]

            # 50%의 확률로 원본 데이터 그대로 사용    
            if np.random.rand() < 0.5:
                logits = model(inputs)
                grapheme = logits[:, :168]
                vowel = logits[:, 168:179]
                cons = logits[:, 179:]

                loss1 = loss_fn(grapheme, targets_gra)
                loss2 = loss_fn(vowel, targets_vow)
                loss3 = loss_fn(cons, targets_con) 

            else:

                lam = np.random.beta(1.0, 1.0) 
                rand_index = torch.randperm(inputs.size()[0])
                shuffled_targets_gra = targets_gra[rand_index]
                shuffled_targets_vow = targets_vow[rand_index]
                shuffled_targets_con = targets_con[rand_index]

                bbx1, bby1, bbx2, bby2 = _rand_bbox(inputs.size(), lam)
                inputs[:, :, bbx1:bbx2, bby1:bby2] = inputs[rand_index, :, bbx1:bbx2, bby1:bby2]
                # 픽셀 비율과 정확히 일치하도록 lambda 파라메터 조정  
                lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (inputs.size()[-1] * inputs.size()[-2]))

                logits = model(inputs)
                grapheme = logits[:,:168]
                vowel = logits[:, 168:179]
                cons = logits[:, 179:]

                loss1 = loss_fn(grapheme, targets_gra) * lam + loss_fn(grapheme, shuffled_targets_gra) * (1. - lam)
                loss2 = loss_fn(vowel, targets_vow) * lam + loss_fn(vowel, shuffled_targets_vow) * (1. - lam)
                loss3 = loss_fn(cons, targets_con) * lam + loss_fn(cons, shuffled_targets_con) * (1. - lam)

            loss = 0.5 * loss1 + 0.25 * loss2 + 0.25 * loss3    
            trn_loss.append(loss.item())
            running_loss += loss.item()
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            # Printing vital information
            if (batch_id + 1) % (args.log_interval) == 0:
                s = f'[Epoch {epoch_id} Batch {batch_id+1}/{len(trn_loader)}] ' \
                f'loss: {running_loss / args.log_interval:.4f}'
                print(s)
                running_loss = 0

        # Measure how long this epoch took.
        trn_time = _format_time(time.time() - t0)        


        if args.rank == 0:    
            ################################################################################
            # ==> Validation phase
            ################################################################################
            val_loss = []
            val_true = []
            val_pred = []
            model.eval()

            # === Validation phase ===
            logger.info('=== Start Validation ===')        

            with torch.no_grad():
                for inputs, targets in vld_loader:
                    inputs = inputs.cuda()
                    targets = targets.cuda()
                    logits = model(inputs)
                    grapheme = logits[:,:168]
                    vowel = logits[:, 168:179]
                    cons = logits[:, 179:]

                    loss= 0.5* loss_fn(grapheme, targets[:,0]) + 0.25*loss_fn(vowel, targets[:,1]) + \
                    0.25*loss_fn(vowel, targets[:,2])
                    val_loss.append(loss.item())

                    grapheme = grapheme.cpu().argmax(dim=1).data.numpy()
                    vowel = vowel.cpu().argmax(dim=1).data.numpy()
                    cons = cons.cpu().argmax(dim=1).data.numpy()

                    val_true.append(targets.cpu().numpy())
                    val_pred.append(np.stack([grapheme, vowel, cons], axis=1))                

            val_true = np.concatenate(val_true)
            val_pred = np.concatenate(val_pred)
            val_loss = np.mean(val_loss)
            trn_loss = np.mean(trn_loss)

            score_g = recall_score(val_true[:,0], val_pred[:,0], average='macro')
            score_v = recall_score(val_true[:,1], val_pred[:,1], average='macro')
            score_c = recall_score(val_true[:,2], val_pred[:,2], average='macro')
            final_score = np.average([score_g, score_v, score_c], weights=[2,1,1])

            # Printing vital information
            s = f'[Epoch {epoch_id}] ' \
            f'trn_loss: {trn_loss:.4f}, vld_loss: {val_loss:.4f}, score: {final_score:.4f}, ' \
            f'score_each: [{score_g:.4f}, {score_v:.4f}, {score_c:.4f}]'          
            print(s)

            ################################################################################
            # ==> Save checkpoint and training stats
            ################################################################################        
            if final_score > best_score:
                best_score = final_score
                state_dict = model.cpu().state_dict()
                model = model.cuda()
                torch.save(state_dict, os.path.join(args.model_dir, 'model.pt'))

            # Record all statistics from this epoch
            training_stats.append(
                {
                    'epoch': epoch_id + 1,
                    'trn_loss': trn_loss,
                    'trn_time': trn_time,            
                    'val_loss': val_loss,
                    'score': final_score,
                    'score_g': score_g,
                    'score_v': score_v,
                    'score_c': score_c            
                }
            )      

            # === Save Model Parameters ===
            logger.info("Model successfully saved at: {}".format(args.model_dir))            

        
def parser_args():
    
    parser = argparse.ArgumentParser()

    # Hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument('--num_epochs', type=int, default=1)
    parser.add_argument('--num_folds', type=int, default=5)
    parser.add_argument('--vld_fold_idx', type=int, default=4)
    parser.add_argument('--batch_size', type=int, default=256)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--seed', type=int, default=1)
    parser.add_argument('--log_interval', type=int, default=10) 

    # SageMaker Container environment    
    parser.add_argument('--train_dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--model_dir', type=str, default=os.environ['SM_MODEL_DIR'])    
    parser.add_argument('--num_gpus', type=int, default=os.environ['SM_NUM_GPUS'])
 
    args = parser.parse_args() 
    return args
        
    
if __name__ =='__main__':

    #parse arguments
    args = parser_args() 
    
    args.world_size = dist.get_world_size()
    args.rank = dist.get_rank()
    args.local_rank = dist.get_local_rank()
    #print(f"rank={args.rank}, local_rank={args.local_rank}")
    args.batch_size //= args.world_size // 8
    args.batch_size = max(args.batch_size, 1)
    
    args.use_cuda = args.num_gpus > 0
    print("args.use_cuda : {} , args.num_gpus : {}".format(
        args.use_cuda, args.num_gpus))
    args.device = torch.device("cuda" if args.use_cuda else "cpu")

    train_model(args)
    

Overwriting ./src/train_smdataparallel.py


<br>

## 2. Training on SageMaker
---

훈련 스크립트가 준비되었다면 SageMaker 훈련을 수행하는 법은 매우 간단합니다. SageMaker Python SDK 활용 시, Estimator 인스턴스를 생성하고 해당 인스턴스의 `fit()` 메서드를 호출하는 것이 전부입니다.

#### 1) `Estimator` 인스턴스 생성 
훈련 컨테이너에 필요한 설정들을 지정합니다. 본 핸즈온에서는 훈련 스크립트 파일이 포함된 경로인 소스 경로와(`source_dir`)와 훈련 스크립트 Python 파일만 엔트리포인트(`entry_point`)로 지정해 주면 됩니다.

#### 2) `fit()` 메서드 호출
`estimator.fit(YOUR_TRAINING_DATA_URI)` 메서드를 호출하면, 훈련에 필요한 인스턴스를 시작하고 컨테이너 환경을 시작합니다. 
필수 인자값은 훈련 데이터가 존해자는 S3 경로(`s3://`)이며, 로컬 모드로 훈련 시에는 로컬 경로(`file://`)를 지정하시면 됩니다. 

인자값 중 wait은 디폴트 값으로 `wait=True`이며, 모든 훈련 작업이 완료될 때까지 코드 셀이 freezing됩니다. 만약 다른 코드 셀을 실행하거나, 다른 훈련 job을 시작하고 싶다면 `wait=False`로 설정하여 Asynchronous 모드로 변경하면 됩니다.

SageMaker 훈련이 끝나면 컨테이너 환경과 훈련 인스턴스는 자동으로 삭제됩니다. 이 때, SageMaker는 자동으로 `SM_MODEL_DIR` 경로에 저장된 최종 모델 아티팩트를 `model.tar.gz`로 압축하여 훈련 컨테이너 환경에서 S3 bucket으로 저장합니다. 당연히, S3 bucket에 저장된 모델 아티팩트를 다운로드받아 로컬 상에서 곧바로 테스트할 수 있습니다.


#### Code Snippet: SageMaker Estimator 호출
```python
# 분산 훈련 수행 선언
distribution = {"smdistributed": {"dataparallel": {"enabled": True}}}
estimator = PyTorch(
    instance_type='ml.p4d.24xlarge', # ml.p3.16xlarge, ml.p3dn.24xlarge 지원
    instance_count=2,
    framework_version='1.6.0',
    py_version='py36’,
    ...
    distribution=distribution,
)
```

In [2]:
import boto3
import sagemaker

boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)

In [3]:
from sagemaker.pytorch import PyTorch
role = sagemaker.get_execution_role()
bucket = sagemaker.Session().default_bucket()
prefix = 'bangali/train'

In [4]:
estimator = PyTorch(base_job_name='pytorch-smdataparallel-bangali-handwritten',
                    entry_point='train_smdataparallel.py',
                    source_dir='src',
                    role=role,
                    instance_type='ml.p3.16xlarge',
                    instance_count=2,
                    framework_version='1.6.0',
                    py_version='py36',
                    hyperparameters = {'num_epochs': 10, 
                                       'num_folds': 5,
                                       'vld_fold_idx': 4,
                                       'batch_size': 256,
                                       'lr': 0.001,
                                       'log_interval': 10,
                                      },
                    distribution={'smdistributed':{
                                        'dataparallel':{
                                                'enabled': True
                                             }
                                      }
                                  },
                    debugger_hook_config=False)
s3_input_train = sagemaker.TrainingInput(s3_data='s3://{}/{}'.format(bucket, prefix), content_type='csv')    

In [5]:
%%time
estimator.fit(s3_input_train)

2021-03-02 06:04:17 Starting - Starting the training job...
2021-03-02 06:04:41 Starting - Launching requested ML instancesProfilerReport-1614665057: InProgress
............
2021-03-02 06:06:42 Starting - Preparing the instances for training.........
2021-03-02 06:08:14 Downloading - Downloading input data......
2021-03-02 06:09:18 Training - Downloading the training image.........[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2021-03-02 06:10:45,894 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2021-03-02 06:10:45,972 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m
[35m2021-03-02 06:10:52,233 sagemaker_pytorch_container.training INFO     Invoking SMDataParallel[0m
[35m2021-03-02 06:10:52,233 sagemaker_pytorch_container.training INFO     Invoking user training script.[0m
[35m2021-03-02 06:10:52,71


2021-03-02 06:11:06 Training - Training image download completed. Training in progress.[35m2021-03-02 06:11:08,025 sagemaker-training-toolkit INFO     Process[es]: [psutil.Process(pid=55, name='orted', status='sleeping', started='06:11:06')][0m
[35m2021-03-02 06:11:08,025 sagemaker-training-toolkit INFO     Orted process found [psutil.Process(pid=55, name='orted', status='sleeping', started='06:11:06')][0m
[35m2021-03-02 06:11:08,025 sagemaker-training-toolkit INFO     Waiting for orted process [psutil.Process(pid=55, name='orted', status='sleeping', started='06:11:06')][0m
[34m[1,8]<stdout>:NCCL version 2.7.8+cuda11.0[0m
[34m[1,0]<stdout>:NCCL version 2.7.8+cuda11.0[0m
[34m[1,13]<stdout>:args.use_cuda : True , args.num_gpus : 8[0m
[34m[1,8]<stdout>:args.use_cuda : True , args.num_gpus : 8[0m
[34m[1,15]<stdout>:args.use_cuda : True , args.num_gpus : 8[0m
[34m[1,14]<stdout>:args.use_cuda : True , args.num_gpus : 8[0m
[34m[1,12]<stdout>:args.use_cuda : True , args.num

[34m[1,5]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,1]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,7]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,6]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,4]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,2]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,3]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,0]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,12]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,14]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,8]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,9]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,11]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,13]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,15]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,10]<stdout>:=== Getting Pre-trained model ===[0m
[34m[1,0]<stdout>:=== Start Training ===[0m
[34m[1,2]<stdout>:=== Start Trainin

[34m[1,0]<stdout>:[Epoch 0] trn_loss: 2.3238, vld_loss: 1.5416, score: 0.5718, score_each: [0.4769, 0.6939, 0.6395][0m
[34m[1,0]<stdout>:Model successfully saved at: /opt/ml/model[0m
[34m[1,12]<stdout>:[Epoch 1 Batch 10/40] loss: 1.4637[0m
[34m[1,8]<stdout>:[Epoch 1 Batch 10/40] loss: 1.5463[0m
[34m[1,14]<stdout>:[Epoch 1 Batch 10/40] loss: 1.6857[0m
[34m[1,15]<stdout>:[Epoch 1 Batch 10/40] loss: 1.4459[0m
[34m[1,10]<stdout>:[Epoch 1 Batch 10/40] loss: 1.4196[0m
[34m[1,11]<stdout>:[Epoch 1 Batch 10/40] loss: 1.6493[0m
[34m[1,13]<stdout>:[Epoch 1 Batch 10/40] loss: 1.7038[0m
[34m[1,9]<stdout>:[Epoch 1 Batch 10/40] loss: 1.4178[0m
[34m[1,5]<stdout>:[Epoch 1 Batch 10/40] loss: 1.4853[0m
[34m[1,2]<stdout>:[Epoch 1 Batch 10/40] loss: 1.5725[0m
[34m[1,6]<stdout>:[Epoch 1 Batch 10/40] loss: 1.5323[0m
[34m[1,0]<stdout>:[Epoch 1 Batch 10/40] loss: 1.8691[0m
[34m[1,3]<stdout>:[Epoch 1 Batch 10/40] loss: 1.7042[0m
[34m[1,7]<stdout>:[Epoch 1 Batch 10/40] loss: 1.5550

[34m[1,0]<stdout>:[Epoch 3 Batch 20/40] loss: 1.0503[0m
[34m[1,6]<stdout>:[Epoch 3 Batch 20/40] loss: 1.0428[0m
[34m[1,10]<stdout>:[Epoch 3 Batch 20/40] loss: 1.0796[0m
[34m[1,7]<stdout>:[Epoch 3 Batch 20/40] loss: 0.9183[0m
[34m[1,2]<stdout>:[Epoch 3 Batch 20/40] loss: 1.3849[0m
[34m[1,11]<stdout>:[Epoch 3 Batch 20/40] loss: 1.4606[0m
[34m[1,5]<stdout>:[Epoch 3 Batch 20/40] loss: 1.5220[0m
[34m[1,14]<stdout>:[Epoch 3 Batch 20/40] loss: 1.3371[0m
[34m[1,12]<stdout>:[Epoch 3 Batch 20/40] loss: 1.4372[0m
[34m[1,8]<stdout>:[Epoch 3 Batch 20/40] loss: 1.0746[0m
[34m[1,15]<stdout>:[Epoch 3 Batch 20/40] loss: 1.1110[0m
[34m[1,3]<stdout>:[Epoch 3 Batch 20/40] loss: 0.6362[0m
[34m[1,13]<stdout>:[Epoch 3 Batch 20/40] loss: 1.1992[0m
[34m[1,9]<stdout>:[Epoch 3 Batch 20/40] loss: 1.2190[0m
[34m[1,1]<stdout>:[Epoch 3 Batch 20/40] loss: 1.2849[0m
[34m[1,4]<stdout>:[Epoch 3 Batch 20/40] loss: 1.4154[0m
[34m[1,8]<stdout>:[Epoch 3 Batch 30/40] loss: 1.1737[0m
[34m[1,

[34m[1,8]<stdout>:[Epoch 5 Batch 30/40] loss: 0.7209[0m
[34m[1,12]<stdout>:[Epoch 5 Batch 30/40] loss: 1.0202[0m
[34m[1,10]<stdout>:[Epoch 5 Batch 30/40] loss: 0.9606[0m
[34m[1,15]<stdout>:[Epoch 5 Batch 30/40] loss: 1.0598[0m
[34m[1,13]<stdout>:[Epoch 5 Batch 30/40] loss: 0.7510[0m
[34m[1,14]<stdout>:[Epoch 5 Batch 30/40] loss: 1.0282[0m
[34m[1,11]<stdout>:[Epoch 5 Batch 30/40] loss: 0.8249[0m
[34m[1,9]<stdout>:[Epoch 5 Batch 30/40] loss: 1.3329[0m
[34m[1,5]<stdout>:[Epoch 5 Batch 30/40] loss: 1.2701[0m
[34m[1,4]<stdout>:[Epoch 5 Batch 30/40] loss: 1.1590[0m
[34m[1,2]<stdout>:[Epoch 5 Batch 30/40] loss: 0.7948[0m
[34m[1,1]<stdout>:[Epoch 5 Batch 30/40] loss: 1.3051[0m
[34m[1,0]<stdout>:[Epoch 5 Batch 30/40] loss: 1.0578[0m
[34m[1,3]<stdout>:[Epoch 5 Batch 30/40] loss: 1.1002[0m
[34m[1,6]<stdout>:[Epoch 5 Batch 30/40] loss: 1.0048[0m
[34m[1,7]<stdout>:[Epoch 5 Batch 30/40] loss: 0.5489[0m
[34m[1,9]<stdout>:[Epoch 5 Batch 40/40] loss: 1.1066[0m
[34m[1,

[34m[1,0]<stdout>:[Epoch 7] trn_loss: 1.2584, vld_loss: 1.1216, score: 0.9253, score_each: [0.9136, 0.9595, 0.9145][0m
[34m[1,0]<stdout>:Model successfully saved at: /opt/ml/model[0m
[34m[1,8]<stdout>:[Epoch 8 Batch 10/40] loss: 0.9226[0m
[34m[1,10]<stdout>:[Epoch 8 Batch 10/40] loss: 0.9002[0m
[34m[1,9]<stdout>:[Epoch 8 Batch 10/40] loss: 0.8841[0m
[34m[1,15]<stdout>:[Epoch 8 Batch 10/40] loss: 0.9265[0m
[34m[1,13]<stdout>:[Epoch 8 Batch 10/40] loss: 1.2898[0m
[34m[1,12]<stdout>:[Epoch 8 Batch 10/40] loss: 1.0144[0m
[34m[1,14]<stdout>:[Epoch 8 Batch 10/40] loss: 1.2343[0m
[34m[1,11]<stdout>:[Epoch 8 Batch 10/40] loss: 1.2575[0m
[34m[1,1]<stdout>:[Epoch 8 Batch 10/40] loss: 0.6322[0m
[34m[1,4]<stdout>:[Epoch 8 Batch 10/40] loss: 0.8864[0m
[34m[1,2]<stdout>:[Epoch 8 Batch 10/40] loss: 0.9004[0m
[34m[1,3]<stdout>:[Epoch 8 Batch 10/40] loss: 1.0486[0m
[34m[1,0]<stdout>:[Epoch 8 Batch 10/40] loss: 1.0554[0m
[34m[1,6]<stdout>:[Epoch 8 Batch 10/40] loss: 1.0144


2021-03-02 06:31:20 Uploading - Uploading generated training model[35m2021-03-02 06:31:11,652 sagemaker-training-toolkit INFO     MPI process finished.[0m
[35m2021-03-02 06:31:11,653 sagemaker-training-toolkit INFO     Reporting training SUCCESS[0m

2021-03-02 06:31:40 Completed - Training job completed
ProfilerReport-1614665057: IssuesFound
Training seconds: 2786
Billable seconds: 2786
CPU times: user 4.51 s, sys: 272 ms, total: 4.78 s
Wall time: 27min 45s


In [6]:
s3_model_dir = estimator.model_data.replace('model.tar.gz', '')
print(s3_model_dir)
!aws s3 ls {s3_model_dir}

s3://sagemaker-us-east-1-143656149352/pytorch-smdataparallel-bangali-handwrit-2021-03-02-06-04-16-979/output/
2021-03-02 06:31:22   41940706 model.tar.gz


<br>

## 3. Getting Model artifacts
---

훈련이 완료된 모델 아티팩트를 로컬(jupyter notebook 인스턴스 or 온프레미스)로 복사합니다.


In [7]:
local_model_dir = './model_ddp'
!rm -rf $local_model_dir

In [8]:
import json, os

if not os.path.exists(local_model_dir):
    os.makedirs(local_model_dir)

!aws s3 cp {s3_model_dir}model.tar.gz {local_model_dir}/model.tar.gz
!tar -xzf {local_model_dir}/model.tar.gz -C {local_model_dir}

download: s3://sagemaker-us-east-1-143656149352/pytorch-smdataparallel-bangali-handwrit-2021-03-02-06-04-16-979/output/model.tar.gz to model_ddp/model.tar.gz
