# Amazon SageMaker Model Parallelism - Pipeline Parallelism
---


#### Note

- 이미 기본적인 Hugging Face 용법 및 자연어 처리에 익숙하신 분들은 앞 모듈을 생략하고 이 모듈부터 핸즈온을 시작하셔도 됩니다.
- 이 노트북은 SageMaker 기본 API를 참조하므로, SageMaker Studio, SageMaker 노트북 인스턴스 또는 AWS CLI가 설정된 로컬 시스템에서 실행해야 합니다. SageMaker Studio 또는 SageMaker 노트북 인스턴스를 사용하는 경우 PyTorch 기반 커널을 선택하세요.
- 훈련(Training) job 수행 시 최소 `ml.p3.2xlarge` 이상의 훈련 인스턴스가 필요하며, 분산 훈련 핸즈온은 `ml.p3.16xlarge` 인스턴스를 권장합니다. 만약 인스턴스 사용에 제한이 걸려 있다면 [Request a service quota increase for SageMaker resources](https://docs.aws.amazon.com/sagemaker/latest/dg/regions-quotas.html#service-limit-increase-request-procedure)를 참조하여 인스턴스 제한을 해제해 주세요.

<br>

## 1. Preparation
---

SageMaker 훈련을 위해 전처리된 데이터셋을 S3에 업로드합니다.

In [None]:
%%bash

#!/usr/bin/env bash

echo '{
    "runtimes": {
        "nvidia": {
            "path": "nvidia-container-runtime",
            "runtimeArgs": []
        }
    }
}' > daemon.json

sudo cp daemon.json /etc/docker/daemon.json && rm daemon.json

DAEMON_PATH="/etc/docker"
MEMORY_SIZE=10G

FLAG=$(cat $DAEMON_PATH/daemon.json | jq 'has("data-root")')
# echo $FLAG

if [ "$FLAG" == true ]; then
    echo "Already revised"
else
    echo "Add data-root and default-shm-size=$MEMORY_SIZE"
    sudo cp $DAEMON_PATH/daemon.json $DAEMON_PATH/daemon.json.bak
    sudo cat $DAEMON_PATH/daemon.json.bak | jq '. += {"data-root":"/home/ec2-user/SageMaker/.container/docker","default-shm-size":"'$MEMORY_SIZE'"}' | sudo tee $DAEMON_PATH/daemon.json > /dev/null
    sudo service docker restart
    echo "Docker Restart"
fi

sudo docker info | grep Root

In [None]:
import os
import sys
import logging
import boto3
import botocore
import sagemaker
import time
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

logging.basicConfig(
    level=logging.INFO, 
    format='[{%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

logging.info(f"sagemaker role arn: {role}")
logging.info(f"sagemaker bucket: {sess.default_bucket()}")
logging.info(f"sagemaker session region: {sess.boto_region_name}")

In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer

# Define the model repo
model_id = 'bert-base-multilingual-cased'

# dataset used
dataset_name = 'nsmc'

# s3 key prefix for the data
s3_prefix = 'datasets/nsmc'

In [None]:
# load dataset
train_dataset, eval_dataset = load_dataset(dataset_name, split=['train', 'test'])

num_samples_for_debug = 2000
train_dataset = train_dataset.shuffle(seed=42).select(range(num_samples_for_debug))
eval_dataset = eval_dataset.shuffle(seed=42).select(range(num_samples_for_debug))

logging.info(f" loaded train_dataset length is: {len(train_dataset)}")
logging.info(f" loaded eval_dataset length is: {len(eval_dataset)}")
logging.info(train_dataset[0])

In [None]:
tokenizer = AutoTokenizer.from_pretrained(model_id)

def tokenize(batch):
    return tokenizer(batch['document'], padding='max_length', max_length=128, truncation=True)

# tokenize dataset
train_dataset = train_dataset.map(tokenize, batched=True, remove_columns=['id', 'document'])
eval_dataset = eval_dataset.map(tokenize, batched=True, remove_columns=['id', 'document'])

# set format for pytorch
train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
eval_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])

train_dataset = train_dataset.rename_column("label", "labels")
eval_dataset = eval_dataset.rename_column("label", "labels")

In [None]:
train_dir = 'train'
eval_dir = 'eval'
!rm -rf {train_dir} {eval_dir}

os.makedirs(train_dir, exist_ok=True)
os.makedirs(eval_dir, exist_ok=True) 

if not os.listdir(train_dir):
    train_dataset.save_to_disk(train_dir)
if not os.listdir(eval_dir):
    eval_dataset.save_to_disk(eval_dir)

In [None]:
# save train_dataset to s3
train_input_path = f's3://{sess.default_bucket()}/{s3_prefix}/{train_dir}'
train_dataset.save_to_disk(train_input_path)

# save eval_dataset to s3
eval_input_path = f's3://{sess.default_bucket()}/{s3_prefix}/{eval_dir}'
eval_dataset.save_to_disk(eval_input_path)

<br>

## 2. SageMaker Training (Development Stage)
---
SageMaker에 대한 대표적인 오해가 여전히 많은 분들이 SageMaker 훈련을 위해 소스 코드를 전면적으로 수정해야 한다고 생각합니다. 하지만, 실제로는 별도의 소스 코드 수정 없이 기존 여러분이 사용했던 파이썬 스크립트에 SageMaker 훈련에 필요한 SageMaker 전용 환경 변수들만 추가하면 됩니다.

SageMaker 훈련은 훈련 작업을 호출할 때, 1) 훈련 EC2 인스턴스 프로비저닝 - 2) 컨테이너 구동을 위한 도커 이미지 및 훈련 데이터 다운로드 - 3) 컨테이너 구동 - 4) 컨테이너 환경에서 훈련 수행 - 5) 컨테이너 환경에서 S3의 특정 버킷에 저장 - 6) 훈련 인스턴스 종료로 구성됩니다. 따라서, 훈련 수행 로직은 아래 예시와 같이 기존 개발 환경과 동일합니다.

`/opt/conda/bin/python train_hf.py --num_epochs 5 --train_batch_size 32 ...`

이 과정에서 컨테이너 환경에 필요한 환경 변수(예: 모델 경로, 훈련 데이터 경로) 들은 사전에 지정되어 있으며, 이 환경 변수들이 설정되어 있어야 훈련에 필요한 파일들의 경로를 인식할 수 있습니다. 대표적인 환경 변수들에 대한 자세한 내용은 https://github.com/aws/sagemaker-containers#important-environment-variables 을 참조하세요.


### SageMaker Pipeline Parallelism

In [None]:
#instance_type = 'ml.p3.16xlarge'
instance_type = 'local_gpu'
num_gpus = 8
instance_count = 1
batch_size = 32

if instance_type in ['local', 'local_gpu']:
    from sagemaker.local import LocalSession
    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
else:
    sagemaker_session = sagemaker.session.Session()
    
# hyperparameters, which are passed into the training job
hyperparameters = {
    'num_epochs': 2,                    # number of training epochs
    'seed': 42,                         # seed
    'train_batch_size': batch_size,     # batch size for training
    'eval_batch_size': batch_size*2,    # batch size for evaluation
    'warmup_steps': 0,                  # warmup ste|||ps
    'learning_rate': 3e-5,              # learning rate used during training
    'use_fp16': True,                   # use FP16?
    'log_interval': 100,                # log interval
    'model_id': model_id                # pre-trained model
}
 
smp_options = {
    "enabled": True,
    "parameters": {
        "optimize": "speed",
        "microbatches": 8,
        "partitions": 2,
        "pipeline": "interleaved",
        "ddp": True,
        #"shard_optimizer_state": True
    }
}

mpi_options = {
    "enabled" : True,                      # Required
    "processes_per_host" : 8,              # Required
    #"custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none "
}
  

In [None]:
from sagemaker.pytorch import PyTorch
image_uri = '763104351884.dkr.ecr.{}.amazonaws.com/pytorch-training:1.12.1-gpu-py38-cu113-ubuntu20.04-sagemaker'.format(region)

# define Training Job Name 
job_name = f'smdmp-nsmc-{time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())}'
chkpt_s3_path = f's3://{sess.default_bucket()}/{s3_prefix}/native/checkpoints'

# create the Estimator
sm_estimator = PyTorch(
    entry_point           = 'train_smdmp_pipeline.py',  # fine-tuning script used in training jon
    source_dir            = './scripts',        # directory where fine-tuning script is stored
      image_uri = image_uri,
    instance_type         = instance_type,      # instances type used for the training job
    instance_count        = instance_count,     # the number of instances used for training
    base_job_name         = job_name,           # the name of the training job
    role                  = role,               # IAM role used in training job to access AWS ressources, e.g. S3
    sagemaker_session=sagemaker_session,
    py_version            = 'py38',             # the python version used in the training job
    hyperparameters       = hyperparameters,    # the hyperparameter used for running the training job
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    
    disable_profiler     = True,
    debugger_hook_config  = False,
    #keep_alive_period_in_seconds = 20*60     # warm pool    
    #checkpoint_s3_uri     = chkpt_s3_path,
    #checkpoint_local_path ='/opt/ml/checkpoints',  
)

In [None]:
# define a data input dictonary with our uploaded s3 uris
data = {
    'train': train_input_path,
    'eval': eval_input_path
}

# starting the train job with our uploaded datasets as input
sm_estimator.fit(data, wait=False)
train_job_name = sm_estimator.latest_training_job.job_name

In [None]:
from IPython.core.display import display, HTML

def make_console_link(region, train_job_name, train_task='[Training]'):
    train_job_link = f'<b> {train_task} Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={region}#/jobs/{train_job_name}">Training Job</a></b>'   
    cloudwatch_link = f'<b> {train_task} Review <a target="blank" href="https://console.aws.amazon.com/cloudwatch/home?region={region}#logStream:group=/aws/sagemaker/TrainingJobs;prefix={train_job_name};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a></b>'
    return train_job_link, cloudwatch_link  
        
train_job_link, cloudwatch_link = make_console_link(region, train_job_name, '[SageMaker Model Parallelism (Pipeline Parallelism) Training]')

display(HTML(train_job_link))
display(HTML(cloudwatch_link))