# 1. Multi-GPU Distributed Training - Script Mode
---

본 모듈에서는 Amazon SageMaker API를 효과적으로 이용하기 위해 multi-GPU 분산 학습을 위한 PyTorch 프레임워크 자체 구현만으로 모델 훈련을 수행해 봅니다.

In [1]:
# Switch for the dependency-install cell that follows: set to True for the first run only.
install_needed = True  # should only be True once
# This second assignment overrides the first, so the install step is skipped unless it is removed.
install_needed = False

In [2]:
import sys
import IPython

# One-time environment setup; the body runs only when `install_needed` is True.
if install_needed:
    print("installing deps and restarting kernel")
#     !{sys.executable} -m pip install -U split-folders tqdm albumentations crc32c wget
    # Install/upgrade the SageMaker SDK (with the local-mode extra) and the debugger/experiments packages
    # into the interpreter that backs this kernel.
    !{sys.executable} -m pip install 'sagemaker[local]' --upgrade
    !{sys.executable} -m pip install -U bokeh smdebug sagemaker-experiments
    !{sys.executable} -m pip install -U sagemaker
    # Run the repository's local-mode setup script.
    !/bin/bash ./local/local_mode_setup.sh
    # Restart the kernel so the freshly installed packages are picked up.
    IPython.Application.instance().kernel.do_shutdown(True)

## 2. 환경 설정

<p>SageMaker 학습에 필요한 기본적인 package를 import 합니다. </p>
<p>boto3는 HTTP API 호출을 숨기는 편한 추상화 모델을 가지고 있고, Amazon EC2 인스턴스 및 S3 버킷과 같은 AWS 리소스와 동작하는 파이썬 클래스를 제공합니다. </p>
<p>SageMaker Python SDK는 Amazon SageMaker에서 기계 학습 모델을 학습 및 배포하기 위한 오픈 소스 라이브러리입니다.</p>

In [1]:
import joblib
import matplotlib.pyplot as plt
import sagemaker
# import splitfolders

import datetime
import glob
import os
import time
import warnings

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

# import wget
# import tarfile
import shutil

import boto3
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision

# from tqdm import tqdm
from time import strftime
from PIL import Image
from torch.utils.data import Dataset
from torchvision import datasets, transforms

from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch

from sagemaker.debugger import (Rule,
                                rule_configs,
                                ProfilerConfig, 
                                FrameworkProfile, 
                                DetailedProfilingConfig, 
                                DataloaderProfilingConfig, 
                                PythonProfilingConfig)

warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'

In [2]:
# Execution role for this notebook; passed to the PyTorch estimator as `role=`.
role = get_execution_role()

In [3]:
# Display the installed SageMaker SDK version as the cell output.
sagemaker.__version__

'2.35.0'

In [39]:
def create_experiment(experiment_name):
    """Return the SageMaker Experiment called ``experiment_name``, creating it if absent.

    Args:
        experiment_name: Name of the experiment to load or create.

    Returns:
        The loaded ``Experiment``, or a newly created one tagged
        ``multigpu=yes`` and ``multinode=yes``.

    Raises:
        Exception: Any failure from ``Experiment.load`` other than the
            experiment not existing is re-raised unchanged.
    """
    try:
        return Experiment.load(experiment_name)
    except Exception as ex:
        # Only a missing experiment should trigger creation; credential,
        # throttling or network errors must surface instead of being masked.
        if "ResourceNotFound" not in str(ex):
            raise

    return Experiment.create(
        experiment_name=experiment_name,
        tags=[
            {'Key': 'multigpu', 'Value': 'yes'},
            {'Key': 'multinode', 'Value': 'yes'},
        ],
    )

In [40]:
def create_trial(experiment_name, set_param, i_type, i_cnt, spot):
    """Create a Trial under ``experiment_name`` and return its name for use as a job name.

    The trial name has the form
    ``<experiment_name>-<instance tag>-<instance count>-<s|d>-<MMDD-HHMM>``.

    Args:
        experiment_name: Experiment the trial is attached to.
        set_param: Accepted for call compatibility; not read by this function.
        i_type: Instance type string, mapped to a short tag.
        i_cnt: Instance count, embedded in the name.
        spot: Truthy selects the ``'s'`` marker, falsy selects ``'d'``.

    Returns:
        The ``trial_name`` of the created trial, as a string.
    """
    timestamp = strftime("%m%d-%H%M")

    # Short tag per known instance type; anything else is labelled 'test'.
    tag_by_instance = {
        'ml.p3.16xlarge': 'p3',
        'ml.p3dn.24xlarge': 'p3dn',
        'ml.p4d.24xlarge': 'p4d',
    }
    instance_tag = tag_by_instance.get(i_type, 'test')
    capacity_tag = 's' if spot else 'd'

    suffix = f"{instance_tag}-{i_cnt}-{capacity_tag}"
    new_trial = Trial.create(
        trial_name=f'{experiment_name}-{suffix}-{timestamp}',
        experiment_name=experiment_name,
    )
    return f'{new_trial.trial_name}'

In [41]:
# S3 bucket used for this run, and the two URIs derived from it that are
# later passed to the estimator as `code_location` and `output_path`.
bucket = 'bucket-exp-dalle-210410'
code_location = f's3://{bucket}/sm_codes'
output_path = f's3://{bucket}/poc_dalle/output/'

In [42]:
# Metric name / regex pairs passed to the estimator as `metric_definitions`.
# NOTE(review): the two patterns differ in spacing ('lr - ' vs 'loss -');
# confirm both match what the training script actually logs.
metric_definitions=[
     {'Name': 'train:lr', 'Regex': 'lr - (.*?),'},
     {'Name': 'train:Loss', 'Regex': 'loss -(.*?),'},
]

In [43]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs

# Built-in rules passed to the estimator as `rules`: two debugger rules
# (loss_not_decreasing, overfit) and the ProfilerReport profiler rule.
rules=[ 
    Rule.sagemaker(rule_configs.loss_not_decreasing()),
    Rule.sagemaker(rule_configs.overfit()),
    ProfilerRule.sagemaker(rule_configs.ProfilerReport()),
]

In [109]:
# Hyperparameters passed to the estimator as `hyperparameters`.
# 'num-partitions' is also read when the `distribution` dict is built.
hyperparameters = {
        'EPOCHS' : 1,
        'BATCH_SIZE' : 32, # 24
        'LEARNING_RATE' : 1e-3,
        'LR_DECAY_RATE' : 0.98,
        'NUM_TOKENS' : 8192,
        'NUM_LAYERS' : 2,
        'NUM_RESNET_BLOCKS' : 2,
        'SMOOTH_L1_LOSS' : False,
        'EMB_DIM' : 512,
        'HID_DIM' : 256,
        'KL_LOSS_WEIGHT' : 0,
        'STARTING_TEMP' : 1.,
        'TEMP_MIN' : 0.5,
        'ANNEAL_RATE' : 1e-6,
        'NUM_IMAGES_SAVE' : 4,
        'model_parallel': True,  ## False : DeepSpeeds
        'num_microbatches': 8,
        'num-partitions' : 2,
        'placement_strategy': 'spread',
        'pipeline': 'interleaved',
        'optimize': 'speed',
        'ddp': True,
    }

# Run-level settings used by the cells that follow.
experiment_name = 'dalle-poc-exp1'
# 'local_gpu' selects the LocalSession branch in the session-setup cell.
instance_type = 'local_gpu'  # 'ml.p3.16xlarge', 'ml.p3dn.24xlarge', 'ml.p4d.24xlarge', 'local_gpu'
instance_count = 1
do_spot_training = False
# max_wait stays None unless spot training is enabled, in which case it is set to max_run later.
max_wait = None
max_run = 2*60*60


In [110]:
# Choose the SageMaker session, training-data location and source directory
# depending on whether the job runs in local mode or on a managed instance.
if instance_type =='local_gpu':
    from sagemaker.local import LocalSession
    from pathlib import Path

    # Local mode: use a LocalSession configured with local_code=True,
    # a file:// dataset path and an absolute source_code directory.
    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
    s3_data_path = 'file:///home/ec2-user/SageMaker/napkin-Dalle/dataset'
    source_dir = f'{Path.cwd()}/source_code'
else:
    # Managed training: regular SageMaker session, dataset read from S3.
    sess = boto3.Session()
    sagemaker_session = sagemaker.Session()
    sm = sess.client('sagemaker')
    bucket_name = 'dataset-cyj-coco-210410'
    s3_data_path = f's3://{bucket_name}/dataset'
    source_dir = 'source_code'


In [111]:
# Defaults; distribution and train_job_name are overwritten just below,
# image_uri is left as None in this cell.
image_uri = None
distribution = None
train_job_name = 'sagemaker'


train_job_name = 'smp-dist'

# Distribution settings handed to the estimator: smdistributed model
# parallelism enabled, with MPI enabled alongside it.
distribution = {"smdistributed": {
                  "modelparallel": {
                      "enabled":True,
                      "parameters": {
                          # Partition count comes from the hyperparameters dict.
                          "partitions": hyperparameters['num-partitions'],
#                               "microbatches": 8,
#                               "placement_strategy": "spread",
#                               "pipeline": "interleaved",
#                               "optimize": "speed",
#                               "partitions": 1,
#                               "ddp": True,
                      }
                  }
              },
              "mpi": {
                    "enabled": True,
                    "processes_per_host": 2, # Pick your processes_per_host
                    "custom_mpi_options": "-verbose -x orte_base_help_aggregate=0 "
              },
          }


# With spot training, max_wait is set equal to max_run.
if do_spot_training:
    max_wait = max_run

# Echo the effective job configuration.
print("train_job_name : {} \ntrain_instance_type : {} \ntrain_instance_count : {} \nimage_uri : {} \ndistribution : {}".format(train_job_name, instance_type, instance_count, image_uri, distribution))    

train_job_name : smp-dist 
train_instance_type : local_gpu 
train_instance_count : 1 
image_uri : None 
distribution : {'smdistributed': {'modelparallel': {'enabled': True, 'parameters': {'partitions': 2}}}, 'mpi': {'enabled': True, 'processes_per_host': 2, 'custom_mpi_options': '-verbose -x orte_base_help_aggregate=0 '}}


In [112]:
%%time

# all input configurations, parameters, and metrics specified in estimator 
# definition are automatically tracked
# Build the PyTorch estimator from the settings assembled in the cells above;
# the entry point is train_vae.py inside `source_dir`.
estimator = PyTorch(
    entry_point='train_vae.py',
    source_dir=source_dir,
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version='1.7.1',
    py_version='py36',
    instance_count=instance_count,
    instance_type=instance_type,
#     volume_size=1024,
    code_location = code_location,
    output_path=output_path,
    hyperparameters=hyperparameters,
    distribution=distribution,
#     disable_profiler=True,
    metric_definitions=metric_definitions,
    rules=rules,
    max_run=max_run,
    use_spot_instances=do_spot_training,  # use spot instances
    max_wait=max_wait,
)

CPU times: user 133 µs, sys: 52 µs, total: 185 µs
Wall time: 190 µs


In [None]:
# Make sure the experiment exists, then create a trial; the trial name is
# reused as the training job name.
create_experiment(experiment_name)
job_name = create_trial(experiment_name, hyperparameters, instance_type, instance_count, do_spot_training)

# Now associate the estimator with the Experiment and Trial
estimator.fit(
    inputs={'training': s3_data_path}, 
    job_name=job_name,
    experiment_config={
      'TrialName': job_name,
      'TrialComponentDisplayName': job_name,
    },
    # Launch without waiting; progress is followed in a later cell via logs_for_job.
    wait=False,
)

INFO:sagemaker:Creating training-job with name: dalle-poc-exp1-test-1-d-0417-0639
INFO:sagemaker.local.local_session:Starting training job
INFO:sagemaker.local.image:No AWS credentials found in session but credentials from EC2 Metadata Service are available.
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-1svdy:
    command: train
    container_name: 5y6zfj3v6k-algo-1-1svdy
    environment:
    - '[Masked]'
    - '[Masked]'
    image: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.7.1-gpu-py36
    networks:
      sagemaker-local:
        aliases:
        - algo-1-1svdy
    runtime: nvidia
    stdin_open: true
    tty: true
    volumes:
    - /tmp/tmp33k2xzuj/algo-1-1svdy/output/data:/opt/ml/output/data
    - /tmp/tmp33k2xzuj/algo-1-1svdy/output:/opt/ml/output
    - /tmp/tmp33k2xzuj/algo-1-1svdy/input:/opt/ml/input
    - /tmp/tmp33k2xzuj/model:/opt/ml/model
    - /opt/ml/metadata:/opt/ml/metadata

Creating 5y6zfj3v6k-algo-1-1svdy ... 
Creating 5y6zfj3v6k-algo-1-1svdy ... done
Attaching to 5y6zfj3v6k-algo-1-1svdy
[36m5y6zfj3v6k-algo-1-1svdy |[0m 2021-04-17 06:39:52,826 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training
[36m5y6zfj3v6k-algo-1-1svdy |[0m 2021-04-17 06:39:52,907 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.
[36m5y6zfj3v6k-algo-1-1svdy |[0m 2021-04-17 06:39:52,910 sagemaker_pytorch_container.training INFO     Invoking user training script.
[36m5y6zfj3v6k-algo-1-1svdy |[0m 2021-04-17 06:39:52,911 sagemaker-training-toolkit INFO     Installing module with the following command:
[36m5y6zfj3v6k-algo-1-1svdy |[0m /opt/conda/bin/python3.6 -m pip install . -r requirements.txt
[36m5y6zfj3v6k-algo-1-1svdy |[0m Processing /opt/ml/code
[36m5y6zfj3v6k-algo-1-1svdy |[0m Collecting wandb
[36m5y6zfj3v6k-algo-1-1svdy |[0m   Downloading wandb-0.10.26-py2.py3-none-any.whl (2.1 MB)
[K    

In [None]:
# Take the job name from the estimator's most recent training job.
job_name=estimator.latest_training_job.name

<p><strong>Asynchronous</strong>로 진행된 Training job은 아래와 같은 방법으로 진행상황을 실시간으로 확인할 수 있습니다.</p>

In [None]:
# Follow the logs of the job started with wait=False.
sagemaker_session.logs_for_job(job_name=job_name, wait=True)

In [None]:
# Build the rule-output location as <output_path><job name>/rule-output.
# This is plain concatenation, so it relies on output_path ending with '/'.
rule_output_path = estimator.output_path + estimator.latest_training_job.job_name + "/rule-output"
print(f"You will find the profiler report in {rule_output_path}")

In [None]:
# List the profiler report files under the rule-output prefix.
!aws s3 ls {rule_output_path}/ProfilerReport/profiler-output/

In [None]:
# Copy the profiler report files into {output_dir}/ProfilerReport/.
# NOTE(review): output_dir is not assigned anywhere in the visible notebook — confirm it is defined before running.
!aws s3 cp {rule_output_path}/ProfilerReport/profiler-output/ {output_dir}/ProfilerReport/ --recursive

In [None]:
from IPython.core.display import display, HTML

# Render a link to profiler-report.html under output_dir/ProfilerReport/.
# NOTE(review): output_dir is not assigned anywhere in the visible notebook — confirm it is defined before running.
display(HTML('<b>ProfilerReport : <a href="{}profiler-report.html">Profiler Report</a></b>'.format(output_dir+"/ProfilerReport/")))


In [None]:
# Save these variables with the %store magic.
# NOTE(review): model_dir, output_dir and artifacts_dir are not assigned in the visible notebook — confirm they exist.
%store hyperparameters model_dir output_dir artifacts_dir

<p></p>
<p>Amazon SageMaker에서 모든 학습을 완료하였습니다. </p>

In [1]:
%%writefile test_sync.py

# Module-level imports for the written file. `os` is required by the default
# arguments of the checkpoint helpers defined later in this file, which are
# evaluated when the module is loaded.
import os
import subprocess
import time


def aws_s3_sync(source, destination):
    """Run ``aws s3 sync --quiet source destination`` and print the elapsed time.

    Args:
        source: Path or URI to sync from.
        destination: Path or URI to sync to.

    Returns:
        None. A non-zero exit status from the AWS CLI is printed rather than
        raised, so a failed sync does not abort the caller.
    """
    cmd = ["aws", "s3", "sync", "--quiet", source, destination]
    print(f"Syncing files from {source} to {destination}")
    start_time = time.time()
    # subprocess.run drains stdout/stderr while it waits; Popen(..., PIPE)
    # followed by wait() can deadlock once a pipe buffer fills up.
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    end_time = time.time()
    print("Time Taken to Sync: ", (end_time-start_time))
    if completed.returncode != 0:
        # Surface failures that were previously dropped silently.
        stderr_text = completed.stderr.decode(errors="replace").strip()
        print(f"aws s3 sync exited with code {completed.returncode}: {stderr_text}")
    return


def sync_local_checkpoints_to_s3(local_path="/opt/ml/checkpoints", s3_path=os.path.dirname(os.path.dirname(os.getenv('SM_MODULE_DIR', '')))+'/checkpoints'):
    """Sync checkpoints from ``local_path`` up to ``s3_path`` via ``aws_s3_sync``.

    Args:
        local_path: Existing local directory holding the checkpoints.
        s3_path: Destination ``s3://bucket/prefix`` URI. The default is
            computed once, when the function is defined, from the
            ``SM_MODULE_DIR`` environment variable (two ``dirname`` levels
            up, plus ``/checkpoints``).

    Raises:
        RuntimeError: ``local_path`` does not exist, or the bucket lookup
            returned a 404.
        ValueError: ``s3_path`` does not start with ``s3://``.
    """
    # Validate the arguments first so bad input fails before any AWS client is built.
    if not os.path.exists(local_path):
        raise RuntimeError(f"Provided local path {local_path} does not exist. Please check")
    if not s3_path.startswith('s3://'):
        raise ValueError(f"Provided s3 path {s3_path} is not valid. Please check")

    import boto3
    import botocore

    # Check that the S3 bucket exists.
    s3 = boto3.resource('s3')
    s3_bucket = s3_path[len('s3://'):].split('/')[0]
    print(f"S3 Bucket: {s3_bucket}")
    try:
        s3.meta.client.head_bucket(Bucket=s3_bucket)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            raise RuntimeError('S3 bucket does not exist. Please check') from e
        # Other errors are still tolerated, but no longer silently.
        print(f"Could not verify S3 bucket {s3_bucket} (error code {error_code}); attempting sync anyway")
    aws_s3_sync(local_path, s3_path)
    return

def sync_s3_checkpoints_to_local(local_path="/opt/ml/checkpoints", s3_path=os.path.dirname(os.path.dirname(os.getenv('SM_MODULE_DIR', '')))+'/checkpoints'):
    """Sync checkpoints from ``s3_path`` down to ``local_path`` via ``aws_s3_sync``.

    Args:
        local_path: Local target directory; created when it does not exist.
        s3_path: Source ``s3://bucket/prefix`` URI. The default is computed
            once, when the function is defined, from the ``SM_MODULE_DIR``
            environment variable (two ``dirname`` levels up, plus
            ``/checkpoints``).

    Raises:
        RuntimeError: ``local_path`` could not be created, or the bucket
            lookup returned a 404.
        ValueError: ``s3_path`` does not start with ``s3://``.
    """
    # Create the local path if it does not exist.
    if not os.path.exists(local_path):
        print(f"Provided local path {local_path} does not exist. Creating...")
        try:
            os.makedirs(local_path)
        except OSError as e:
            raise RuntimeError(f"failed to create {local_path}") from e

    # Validate the URI before any AWS client is built.
    if not s3_path.startswith('s3://'):
        raise ValueError(f"Provided s3 path {s3_path} is not valid. Please check")

    import boto3
    import botocore

    # Check that the S3 bucket exists.
    s3 = boto3.resource('s3')
    s3_bucket = s3_path[len('s3://'):].split('/')[0]
    print(f"S3 Bucket: {s3_bucket}")
    try:
        s3.meta.client.head_bucket(Bucket=s3_bucket)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            raise RuntimeError('S3 bucket does not exist. Please check') from e
        # Other errors are still tolerated, but no longer silently.
        print(f"Could not verify S3 bucket {s3_bucket} (error code {error_code}); attempting sync anyway")
    aws_s3_sync(s3_path, local_path)
    return

Writing test_sync.py


In [None]:
# Upload the checkpoints under /opt/ml/local_checkpoints to full_s3_path.
# NOTE(review): the function is only written to test_sync.py above and full_s3_path is not assigned in the
# visible notebook — confirm both are in scope before running this cell.
sync_local_checkpoints_to_s3(local_path='/opt/ml/local_checkpoints', s3_path=full_s3_path)