# 1. SageMaker Training job execution for GPT-NEOX
---

본 모듈에서는 Amazon SageMaker API를 효과적으로 이용하기 위해, multi-GPU 분산(distributed) 학습을 위한 PyTorch 프레임워크 자체 구현만으로 모델 훈련을 수행해 봅니다.

In [None]:
# Switch for the dependency-install cell that follows: when True, that cell
# installs packages and restarts the kernel. Flip to False after the first run.
install_needed = True  # should only be True once
# install_needed = False

In [None]:
import sys
import IPython

# One-time environment setup: install/upgrade the packages used by this
# notebook with the kernel's own interpreter, then restart the kernel.
if install_needed:
    print("installing deps and restarting kernel")
#     !{sys.executable} -m pip install -U split-folders tqdm albumentations crc32c wget
    !{sys.executable} -m pip install 'sagemaker[local]' --upgrade
    !{sys.executable} -m pip install -U smdebug sagemaker-experiments
    !{sys.executable} -m pip install -U sagemaker
    !{sys.executable} -m pip install -U lm_dataformat ftfy deepspeed
    # Runs a helper script shipped next to the notebook (its contents are not
    # shown here; presumably local-mode configuration -- confirm).
    !/bin/bash ./local/local_change_setting.sh
    # do_shutdown(True) requests a kernel restart so the new packages are used.
    IPython.Application.instance().kernel.do_shutdown(True)

## 2. 환경 설정

<p>SageMaker 학습에 필요한 기본적인 package를 import 합니다. </p>
<p>boto3는 HTTP API 호출을 숨기는 편한 추상화 모델을 가지고 있고, Amazon EC2 인스턴스 및 S3 버킷과 같은 AWS 리소스와 동작하는 파이썬 클래스를 제공합니다. </p>
<p>SageMaker Python SDK는 Amazon SageMaker에서 기계 학습 모델을 학습 및 배포하기 위한 오픈 소스 라이브러리입니다.</p>

In [None]:
import joblib
import matplotlib.pyplot as plt
import sagemaker
# import splitfolders

import datetime
import glob
import os
import time
import warnings

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

# import wget
# import tarfile
import shutil

import boto3
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision

# from tqdm import tqdm
from time import strftime
from PIL import Image
from torch.utils.data import Dataset
from torchvision import datasets, transforms

from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch

from sagemaker.debugger import (Rule,
                                rule_configs,
                                ProfilerConfig, 
                                FrameworkProfile, 
                                DetailedProfilingConfig, 
                                DataloaderProfilingConfig, 
                                PythonProfilingConfig)

# Suppress all Python warnings for the remainder of the notebook session.
warnings.filterwarnings('ignore')
# Ask the inline matplotlib backend to render figures in 'retina' format.
%config InlineBackend.figure_format = 'retina'

In [None]:
# Execution role of the current SageMaker environment; passed to the
# PyTorch estimator further down as its `role` argument.
role = get_execution_role()

In [None]:
# Display the installed SageMaker Python SDK version as the cell output.
sagemaker.__version__

In [None]:
# %%bash
# rm -rf ./data
# cd ./gpt-neox
# python prepare_data.py -d ../data
# aws s3 sync ../data s3://dataset-us-west-2-cyj/gpt_neox_dataset/

In [None]:
def create_experiment(experiment_name):
    """Return the SageMaker Experiment called ``experiment_name``, creating it if needed.

    The experiment is first looked up with ``Experiment.load``; if that call
    fails for any reason, a new experiment with the same name is created.

    Args:
        experiment_name: Name of the experiment to load or create.

    Returns:
        The loaded or newly created ``Experiment`` object.

    Raises:
        Exception: Whatever ``Experiment.create`` raises when creation fails.
    """
    try:
        sm_experiment = Experiment.load(experiment_name)
    except Exception:
        # Keep the best-effort fallback, but no longer swallow
        # KeyboardInterrupt / SystemExit as the bare `except:` did.
        sm_experiment = Experiment.create(experiment_name=experiment_name)
    return sm_experiment

In [None]:
def create_trial(experiment_name, i_type, i_cnt):
    """Create a Trial under ``experiment_name`` and return its name.

    The trial name has the form
    ``{experiment_name}-{tag}-{i_cnt}-{MMDD-HHMMSS}``, where ``tag`` is
    ``'p4d'`` for ``'ml.p4d.24xlarge'`` and ``'test'`` for every other
    instance type. This notebook reuses the returned name as the training
    job name.

    Args:
        experiment_name: Name of the experiment the trial belongs to.
        i_type: Instance type string, used only to pick the tag.
        i_cnt: Instance count, embedded in the trial name.

    Returns:
        The name of the created trial as a string.
    """
    # `%S` (seconds) instead of the original `%s`, which is not a documented
    # strftime directive and is platform-dependent (epoch seconds on glibc).
    create_date = strftime("%m%d-%H%M%S")

    i_tag = 'p4d' if i_type == 'ml.p4d.24xlarge' else 'test'
    trial = f'{i_tag}-{i_cnt}'

    sm_trial = Trial.create(
        trial_name=f'{experiment_name}-{trial}-{create_date}',
        experiment_name=experiment_name,
    )

    return f'{sm_trial.trial_name}'

In [None]:
# S3 bucket that receives everything this notebook writes, and the three
# prefixes inside it: uploaded source code, training output, and logs.
bucket = 'gpt-neox-sagemaker-220308'
code_location, output_path, s3_log_path = (
    f's3://{bucket}/{prefix}'
    for prefix in ('backup_codes', 'gpt_neox_output', 'logs')
)

In [None]:
# Metric definitions handed to the estimator (name + regex pairs).
# Left empty here; the commented entries show the expected shape.
metric_definitions=[
#      {'Name': 'train:lr', 'Regex': 'lr - (.*?),'},
#      {'Name': 'train:Loss', 'Regex': 'loss -(.*?),'},
]

In [None]:
# Hyperparameters handed to the estimator.
hyperparameters = dict(
    d="configs",
    conf_file="small.yml sm_local_setup.yml",
)

# Experiment name used for the SageMaker Experiment and as trial-name prefix.
experiment_name = 'gpt-neox-exp1'

# Instance selection. Known choices: 'ml.p3.16xlarge', 'ml.p3dn.24xlarge',
# 'ml.p4d.24xlarge', or 'local_gpu' to run in local mode.
instance_type = 'ml.p4d.24xlarge'
instance_count = 1

# Run limits; max_wait is filled in later only when spot training is enabled.
do_spot_training = False
max_run = 4 * 3600
max_wait = None

In [None]:
# Pick the session, data location, source directory and checkpoint location
# according to whether the job runs on a managed instance or in local mode.
if instance_type != 'local_gpu':
    # Managed training: data and checkpoints live in S3.
    sess = boto3.Session()
    sm = sess.client('sagemaker')
    sagemaker_session = sagemaker.Session()
    source_dir = 'gpt-neox'
    s3_data_path = 's3://dataset-us-west-2-cyj/gpt_neox_dataset'
    checkpoint_s3_uri = 's3://{}/checkpoints'.format(bucket)
else:
    # Local mode: data and code are read from the current working directory
    # and no checkpoint location is configured.
    from pathlib import Path
    from sagemaker.local import LocalSession

    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
    source_dir = f'{Path.cwd()}/gpt-neox'
    s3_data_path = f'file://{Path.cwd()}/data'
    checkpoint_s3_uri = None

In [None]:
# Build the `distribution` argument for the estimator from the selected backend.
#   'smddp' -> SageMaker distributed data parallel
#   'smmp'  -> SageMaker distributed model parallel, launched through MPI
#   other   -> plain MPI launch (this is the path taken for 'deepspeed')
distribution = {}
flag = 'deepspeed'

if flag == 'smddp':
    distribution["smdistributed"] = {
        "dataparallel": {
            "enabled": True
        }
    }

elif flag == 'smmp':
    # NOTE(review): this branch reads hyperparameters keys
    # ('tensor_parallel_degree', 'pipeline_parallel_degree',
    # 'shard_optimizer_state', 'prescaled_batch', 'fp16', 'optimize') that the
    # hyperparameters dict defined in this notebook does not contain; add them
    # before switching `flag` to 'smmp', otherwise this raises KeyError.
    distribution['smdistributed'] = {
        "modelparallel": {
            "enabled": True,
            "parameters": {
                "ddp": True,
                "tensor_parallel_degree": hyperparameters['tensor_parallel_degree'],
                # partitions is a required param in the current SM SDK so it needs to be passed,
                # these two map to the same config
                "partitions": hyperparameters['pipeline_parallel_degree'],
                "shard_optimizer_state": hyperparameters['shard_optimizer_state'] > 0,
                "prescaled_batch": hyperparameters['prescaled_batch'] > 0,
                # "fp16_params" and "optimize" were listed twice in the
                # original literal with identical values; each is kept once.
                "fp16_params": hyperparameters['fp16'] > 0,
                "optimize": hyperparameters['optimize'],
                "auto_partition": True,
                "default_partition": 0,
            }
        }
    }
    # Environment variables forwarded to every MPI process via `-x`.
    mpioptions = "-x NCCL_DEBUG=WARN -x SMDEBUG_LOG_LEVEL=ERROR "
    mpioptions += "-x SMP_DISABLE_D2D=1 -x SMP_D2D_GPU_BUFFER_SIZE_BYTES=1 -x SMP_NCCL_THROTTLE_LIMIT=1 "
    mpioptions += "-x FI_EFA_USE_DEVICE_RDMA=1 -x FI_PROVIDER=efa -x RDMAV_FORK_SAFE=1"

    # Overrides the notebook-level metric_definitions with a placeholder entry.
    metric_definitions = [{"Name": "base_metric", "Regex": "<><><><><><>"}] # Add your custom metric definitions

    distribution["mpi"] = {
        "enabled": True,
        "processes_per_host": 8, # Pick your processes_per_host
        "custom_mpi_options": mpioptions
    }
else:
    distribution["mpi"] = {"enabled": True}

# With spot training enabled, max_wait is set equal to max_run.
if do_spot_training:
    max_wait = max_run

In [None]:
# PyTorch estimator describing the training job: entry script and source
# directory, instance settings, S3 locations, distribution config and run
# limits, all taken from the variables defined in the cells above.
# The profiler and the debugger hook are switched off.
estimator = PyTorch(
    entry_point='sm_train.py',
    source_dir=source_dir,
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version='1.9',
    py_version='py38',
    instance_count=instance_count,
    instance_type=instance_type,
#     volume_size=1024,
    code_location = code_location,
    output_path=output_path,
    hyperparameters=hyperparameters,
    distribution=distribution,
    disable_profiler=True,
    debugger_hook_config=False,
    metric_definitions=metric_definitions,
#     rules=rules,
    max_run=max_run,
    use_spot_instances=do_spot_training,  # use spot instances
    max_wait=max_wait,
    checkpoint_s3_uri=checkpoint_s3_uri
)

In [None]:
# Delete everything matching core* inside ./gpt-neox before launching the job
# (presumably stray core-dump files that should not ship with the source
# directory -- confirm).
!sudo rm -rf ./gpt-neox/core*

In [None]:
%%time

# Make sure the experiment exists, then create a trial whose name doubles as
# the training job name.
create_experiment(experiment_name)
job_name = create_trial(experiment_name, instance_type, instance_count)

# Start training on the 'training' channel and attach the job to the trial.
# wait=False returns immediately; logs are streamed in a later cell.
estimator.fit(
    inputs={'training': s3_data_path}, 
    job_name=job_name,
    experiment_config={
      'TrialName': job_name,
      'TrialComponentDisplayName': job_name,
    },
    wait=False,
)

In [None]:
# Re-read the job name from the estimator's most recent training job.
job_name=estimator.latest_training_job.name

In [None]:
# Stream the job's logs into the notebook; wait=True keeps the call blocking
# (presumably until the job finishes -- see the SDK docs).
sagemaker_session.logs_for_job(job_name=job_name, wait=True)