# 1. SageMaker Training for Diffusion model
---

본 모듈에서는 Amzaon SageMaker API을 효과적으로 이용하기 위해 multigpu-distributed 학습을 위한 PyTorch 프레임워크 자체 구현만으로 모델 훈련을 수행해 봅니다.

In [1]:
install_needed = True  # should only be True once
# install_needed = False

In [2]:
import sys
import IPython

if install_needed:
    print("installing deps and restarting kernel")
#     !{sys.executable} -m pip install -U split-folders tqdm albumentations crc32c wget
    !{sys.executable} -m pip install 'sagemaker[local]' --upgrade
    !{sys.executable} -m pip install -U bokeh smdebug sagemaker-experiments gdown
    !{sys.executable} -m pip install -U sagemaker torch torchvision
    !/bin/bash ./local/local_mode_setup.sh
    IPython.Application.instance().kernel.do_shutdown(True)

installing deps and restarting kernel
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p36/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p36/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p36/bin/python -m pip install --upgrade pip' command.[0m
nvidia-docker2 already installed. We are good to go!
Stopping docker: [60G[[0;32m  OK  [0;39m]
Starting docker:	.[60G[[0;32m  OK  [0;39m]
SageMaker instance route table setup is ok. We are good to go.
SageMaker instance routing for Docker is ok. We are good to go!


## 2. 환경 설정

<p>Sagemaker 학습에 필요한 기본적인 package를 import 합니다. </p>
<p>boto3는 HTTP API 호출을 숨기는 편한 추상화 모델을 가지고 있고, Amazon EC2 인스턴스 및 S3 버켓과 같은 AWS 리소스와 동작하는 파이선 클래스를 제공합니다. </p>
<p>sagemaker python sdk는 Amazon SageMaker에서 기계 학습 모델을 교육 및 배포하기 위한 오픈 소스 라이브러리입니다.</p>

In [2]:
import joblib
import matplotlib.pyplot as plt
import sagemaker
# import splitfolders

import datetime
import glob
import os
import time
import warnings

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

# import wget
# import tarfile
import shutil

import boto3
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision

# from tqdm import tqdm
from time import strftime
from PIL import Image
from torch.utils.data import Dataset
from torchvision import datasets, transforms

from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch

from sagemaker.debugger import (Rule,
                                rule_configs,
                                ProfilerConfig, 
                                FrameworkProfile, 
                                DetailedProfilingConfig, 
                                DataloaderProfilingConfig, 
                                PythonProfilingConfig)

warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'

In [3]:
role = get_execution_role()

In [4]:
sagemaker.__version__

'2.63.2'

In [5]:
def create_experiment(experiment_name):
    try:
        sm_experiment = Experiment.load(experiment_name)
    except:
        sm_experiment = Experiment.create(experiment_name=experiment_name,
                                          tags=[
                                              {
                                                  'Key': 'multigpu',
                                                  'Value': 'yes'
                                              },
                                              {
                                                  'Key': 'multinode',
                                                  'Value': 'yes'
                                              },
                                          ])

In [6]:
def create_trial(experiment_name, set_param, i_type, i_cnt, spot):
    create_date = strftime("%m%d-%H%M%s")
    
    if set_param['sagemakerdp']:
        algo = 'sdp'
#     elif set_param['sagemakermp']:
#         algo = 'smp'
    else:
        algo = 'ds'
    
    spot = 's' if spot else 'd'
    i_tag = 'test'
    if i_type == 'ml.p3.16xlarge':
        i_tag = 'p3'
    elif i_type == 'ml.p3dn.24xlarge':
        i_tag = 'p3dn'
    elif i_type == 'ml.p4d.24xlarge':
        i_tag = 'p4d'    
        
    trial = "-".join([i_tag,str(i_cnt),algo, spot])
       
    sm_trial = Trial.create(trial_name=f'{experiment_name}-{trial}-{create_date}',
                            experiment_name=experiment_name)

    job_name = f'{sm_trial.trial_name}'
    return job_name

In [7]:
bucket = 'diffusion-sagemaker-211011'
code_location = f's3://{bucket}/sm_codes'
output_path = f's3://{bucket}/poc_diffusion/output' 
s3_log_path = f's3://{bucket}/tf_logs' 

In [8]:
metric_definitions=[
     {'Name': 'train:lr', 'Regex': 'lr - (.*?),'},
     {'Name': 'train:Loss', 'Regex': 'loss -(.*?),'},
]

In [9]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs

rules=[ 
    Rule.sagemaker(rule_configs.loss_not_decreasing()),
    Rule.sagemaker(rule_configs.overfit()),
    ProfilerRule.sagemaker(rule_configs.ProfilerReport()),
]

In [290]:
# hyperparameters = {
#     'schedule_sampler' : 'uniform',
#     'lr': 1e-4,
#     'weight_decay': 0.0,
#     'lr_anneal_steps' : 100,
#     'batch_size' : 32,
#     'microbatch' : -1,
#     'ema_rate' : '0.9999',
#     'log_interval' : 5,
#     'save_interval' : 5,
# #     'resume_checkpoint' : "/opt/ml/code/resume_ckt/model000100.pt",
#     'use_fp16': True,
#     'fp16_scale_growth' : 1e-3,
#     's3_log_path' : s3_log_path,
#     'sagemakerdp' : True,
#     }


hyperparameters = {
    'attention_resolutions': '32,16,8',
    'class_cond': False,
    'diffusion_steps': 1000,
    'image_size': 128,
    'channel_mult': '1,1,2,4,4',
    'learn_sigma': True,
    'noise_schedule': 'linear',
    'num_channels': 256,
    'num_heads': 4,
    'num_res_blocks': 3,
    'resblock_updown': True,
    'use_fp16': True,
    'use_scale_shift_norm': True,
#     'schedule_sampler' : 'uniform',
    'lr': 1e-4,
    'weight_decay': 0.0,
#     'lr_anneal_steps' : 0,
    'batch_size' : 16,
#     'microbatch' : -1,
    'ema_rate' : '0.9999',
    'log_interval' : 1,
    'save_interval' : 1,
#     'resume_checkpoint' : "/opt/ml/code/resume_ckt/model000100.pt",
#     'fp16_scale_growth' : 1e-3,
    's3_log_path' : s3_log_path,   ### 로그를 위한 s3_log_path 추가
    'sagemakerdp' : True,
    }

# mp_parameters = {
#         'num_microbatches': 16,
#         'num_partitions' : 4,
#         'placement_strategy': 'cluster', # cluster , spread
#         'pipeline': 'interleaved',
#         'optimize': 'speed',
#         'memory_weight': 0.2,
#         'ddp': True,
# }

experiment_name = 'diffusion-poc-exp2'
instance_type = 'ml.p4d.24xlarge'  # 'ml.p3.16xlarge', 'ml.p3dn.24xlarge', 'ml.p4d.24xlarge', 'local_gpu'
# instance_type = 'local_gpu'
instance_count = 2
do_spot_training = False
max_wait = None
max_run = 3*60*60


# !gdown https://drive.google.com/uc?id=1vF8Ht0VThpobtmShD52_INhpIgy6eEXq
# !gdown https://drive.google.com/uc?id=1kaIqFwTLD7Ml3ib9NQpjoUSD4FUD21-I

# !rm -rf dataset
# !mkdir dataset
# !unzip birds.zip -d dataset/
# !tar zxvf CUB_200_2011.tgz -C dataset/

In [291]:
if instance_type =='local_gpu':
    from sagemaker.local import LocalSession
    from pathlib import Path

    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
    s3_data_path = 'file:///home/ec2-user/SageMaker/improved-diffusion-sagemaker/datasets/cifar10'
    source_dir = f'{Path.cwd()}/scripts'
    checkpoint_s3_bucket = None
else:
    sess = boto3.Session()
    sagemaker_session = sagemaker.Session()
    sm = sess.client('sagemaker')
    s3_data_path = 's3://dataset-us-west-2-cyj/cifar10'
    source_dir = 'scripts'
    checkpoint_s3_bucket = f's3://{bucket}/checkpoints'

In [292]:
image_uri = None
distribution = None
train_job_name = 'sagemaker'


train_job_name = 'smp-dist'
distribution = {}

if hyperparameters['sagemakerdp']:
    distribution["smdistributed"]={ 
                        "dataparallel": {
                            "enabled": True
                        }
                }

# elif hyperparameters['sagemakermp']:
#     distribution['smdistributed'] = { "modelparallel": {
#                                               "enabled":True,
#                                               "parameters": {
#                                                   "partitions": mp_parameters['num_partitions'],
#                                                   "microbatches": mp_parameters['num_microbatches'],
#                                                   "placement_strategy": mp_parameters['placement_strategy'],
#                                                   "pipeline": mp_parameters['pipeline'],
#                                                   "optimize": mp_parameters['optimize'],
#                                                   "memory_weight": mp_parameters['memory_weight'],
#                                                   "ddp": mp_parameters['ddp'],
#                                               }
#                                           }
#                                       }
#     distribution["mpi"]={
#                         "enabled": True,
#                         "processes_per_host": 8, # Pick your processes_per_host
#                         "custom_mpi_options": "-verbose -x orte_base_help_aggregate=0 -x FI_EFA_USE_DEVICE_RDMA=1 -x FI_PROVIDER=efa " #  -x SMP_SKIP_GRAPH_VALIDATION=1
#                   }

else:
    distribution["mpi"]={
                        "enabled": True,
    #                     "processes_per_host": 8, # Pick your processes_per_host
    #                     "custom_mpi_options": "-verbose -x orte_base_help_aggregate=0 "
                  }

if do_spot_training:
    max_wait = max_run

print("train_job_name : {} \ntrain_instance_type : {} \ntrain_instance_count : {} \nimage_uri : {} \ndistribution : {}".format(train_job_name, instance_type, instance_count, image_uri, distribution))    

train_job_name : smp-dist 
train_instance_type : ml.p4d.24xlarge 
train_instance_count : 2 
image_uri : None 
distribution : {'smdistributed': {'dataparallel': {'enabled': True}}}


In [293]:
image_uri='322537213286.dkr.ecr.us-west-2.amazonaws.com/diffusion-sagemaker-smddp:smddp-1.2.2-pt-1.8.1'

In [302]:
# all input configurations, parameters, and metrics specified in estimator 
# definition are automatically tracked
estimator = PyTorch(
    entry_point='image_train.py',
    source_dir=source_dir,
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version='1.8.1',
    py_version='py36',
    image_uri=image_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size=256,
    code_location = code_location,
    output_path=output_path,
    hyperparameters=hyperparameters,
    distribution=distribution,
    disable_profiler=True,
    debugger_hook_config=False,
    metric_definitions=metric_definitions,
#     rules=rules,
    max_run=max_run,
    use_spot_instances=do_spot_training,  # spot instance 활용
    max_wait=max_wait,
    subnets=['subnet-02e36c042e58264e6'],   ## 	subnet-05c77affac40aa7f3 (2b)  subnet-02e36c042e58264e6 (2c)
    security_group_ids=['sg-0bc738570daec9015'],
    checkpoint_s3_uri=checkpoint_s3_bucket,
    TrainingInputMode='File' ## FastFile
)

#### lustre preload

In [303]:
# ## https://docs.aws.amazon.com/fsx/latest/LustreGuide/preload-file-contents-hsm.html
# # sudo lfs hsm_restore path/to/file
# # sudo lfs hsm_action path/to/file
# !find /home/ec2-user/SageMaker/dstaset-2a -type f -print0 | xargs -0 -n 1 sudo lfs hsm_restore

In [304]:
# Configure FSx Input for your SageMaker Training job

from sagemaker.inputs import FileSystemInput

file_system_directory_path= '/hlz2pbmv/BIRDS'  # '/5n6znbmv'    g4ljfbmv
 
file_system_id='fs-0cd6d9b6c3c7f614e'  # fs-0849611d06d289065  063be12d6ca6d7862

file_system_access_mode='rw'
file_system_type='FSxLustre'
train_fs = FileSystemInput(file_system_id=file_system_id,
                                    file_system_type=file_system_type,
                                    directory_path=file_system_directory_path,
                                    file_system_access_mode=file_system_access_mode)

In [305]:
# Configure FSx Input for your SageMaker Training job

from sagemaker.inputs import FileSystemInput

file_system_directory_path= '/bwh3hbmv/cifar10'  # '/5n6znbmv'    g4ljfbmv
 
file_system_id='fs-0ac78e311f71fd34a'  # fs-0849611d06d289065  063be12d6ca6d7862

file_system_access_mode='rw'
file_system_type='FSxLustre'
train_fs = FileSystemInput(file_system_id=file_system_id,
                                    file_system_type=file_system_type,
                                    directory_path=file_system_directory_path,
                                    file_system_access_mode=file_system_access_mode)

In [None]:
# Configure FSx Input for your SageMaker Training job - cifar10-2

from sagemaker.inputs import FileSystemInput

file_system_directory_path= '/pwa3hbmv/cifar10'  # '/5n6znbmv'    g4ljfbmv
 
file_system_id='fs-0a50fc761273ae496'  # fs-0849611d06d289065  063be12d6ca6d7862

file_system_access_mode='rw'
file_system_type='FSxLustre'
train_fs = FileSystemInput(file_system_id=file_system_id,
                                    file_system_type=file_system_type,
                                    directory_path=file_system_directory_path,
                                    file_system_access_mode=file_system_access_mode)

In [306]:
if instance_type =='local_gpu':
    inputs = s3_data_path
else:
    inputs = train_fs
#     inputs = s3_data_path

In [307]:
%%time
create_experiment(experiment_name)
job_name = create_trial(experiment_name, hyperparameters, instance_type, instance_count, do_spot_training)

# Now associate the estimator with the Experiment and Trial
estimator.fit(
    inputs={'training': inputs}, 
    job_name=job_name,
    experiment_config={
      'TrialName': job_name,
      'TrialComponentDisplayName': job_name,
    },
    wait=False,
)

INFO:sagemaker:Creating training-job with name: diffusion-poc-exp2-p4d-2-sdp-d-1028-10551635418552


CPU times: user 247 ms, sys: 0 ns, total: 247 ms
Wall time: 857 ms


In [308]:
job_name=estimator.latest_training_job.name
# job_name='dalle-poc-exp5-p4d-2-d-0530-12261622377580'
# dalle-poc-exp4-p4d-2-d-0525-03071621912021 --> public
# dalle-poc-exp4-p4d-2-d-0525-03091621912148 --> another private
# job_name='dalle-poc-exp1-p4d-1-sdp-d-1006-13111633525892'

In [311]:
sagemaker_session.logs_for_job(job_name=job_name, wait=True)

2021-10-28 11:03:22 Starting - Preparing the instances for training
2021-10-28 11:03:22 Downloading - Downloading input data....................................................................................................................................
2021-10-28 11:52:58 Training - Downloading the training image............................[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2021-10-28 11:57:45,493 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2021-10-28 11:57:45,575 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m
[35mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[35mbash: no job control in this shell[0m
[35m2021-10-28 11:57:46,589 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[35m2021-10-28 11:57:4

KeyboardInterrupt: 

In [121]:
test={}

In [122]:
test['PMI_RANK'] = "1"
test['OMPI_COMM_WORLD_RANK'] = "3"

In [123]:
    for varname in ["PMI_RANK", "OMPI_COMM_WORLD_RANK"]:
        if varname in test:
            print(int(test[varname]))

1
3


In [95]:
log_suffix = log_suffix + "-rank%03i" % rank
print(log_suffix)

log-rank014


### Clean Up Amazon SageMaker Experiment Resources
- https://docs.aws.amazon.com/sagemaker/latest/dg/experiments-cleanup.html

In [None]:
import boto3
sm = boto3.Session().client('sagemaker')

In [None]:
def cleanup_boto3(experiment_name):
    trials = sm.list_trials(ExperimentName=experiment_name)['TrialSummaries']
    print('TrialNames:')
    for trial in trials:
        trial_name = trial['TrialName']
        print(f"\n{trial_name}")

        components_in_trial = sm.list_trial_components(TrialName=trial_name)
        print('\tTrialComponentNames:')
        for component in components_in_trial['TrialComponentSummaries']:
            component_name = component['TrialComponentName']
            print(f"\t{component_name}")
            sm.disassociate_trial_component(TrialComponentName=component_name, TrialName=trial_name)
            try:
                # comment out to keep trial components
                sm.delete_trial_component(TrialComponentName=component_name)
            except:
                # component is associated with another trial
                continue
            # to prevent throttling
            time.sleep(.5)
        sm.delete_trial(TrialName=trial_name)
    sm.delete_experiment(ExperimentName=experiment_name)
    print(f"\nExperiment {experiment_name} deleted")

In [None]:
# Use experiment name not display name
experiment_name = "dalle-poc-exp4"
cleanup_boto3(experiment_name)

In [None]:
!pip install piexif

In [None]:
import piexif

In [None]:
image_size=256

In [None]:
image_file = '/home/ec2-user/SageMaker/lg-ai-research/dalle-sagemaker-dp-mp/test2.png'

In [None]:
from skimage import io, color

In [None]:
image_transform1 = T.Compose([
    T.ToPILImage(),
    T.RandomResizedCrop(image_size,
                        scale=(0.8, 1.),
                        ratio=(1., 1.)),
    T.ToTensor()
])

In [None]:
try:
    array_img = io.imread(image_file)
    image_tensor = image_transform1(array_img)
except (PIL.UnidentifiedImageError, OSError, ValueError) as corrupt_image_exceptions:
    print(f"An exception occurred trying to load file.")

In [None]:
image_tensor.shape

In [None]:
trans = transforms.ToPILImage()
plt.imshow(trans(image_tensor))

In [None]:
im = Image.open(image_file)
rgb_im = im.convert('RGB')
rgb_im.save('test.jpg')

In [None]:
image_file = '/home/ec2-user/SageMaker/lg-ai-research/dalle-sagemaker-dp-mp/test.jpg'
image_file = '/home/ec2-user/SageMaker/dataset/BIRDS/CUB_200_2011/images/029.American_Crow/American_Crow_0053_25203.jpg'

In [None]:
array_img = PIL.Image.open(image_file)

In [None]:
array_img.info

In [None]:
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [None]:
import PIL

In [None]:
image_transform = T.Compose([
    T.RandomResizedCrop(image_size,
                        scale=(0.8, 1.),
                        ratio=(1., 1.)),
    T.ToTensor()
])

In [None]:
try:
#     piexif.remove(image_file)
    array_img = PIL.Image.open(image_file)
    array_img = array_img.convert('RGB')
    
    image_tensor = image_transform(array_img)
except (PIL.UnidentifiedImageError, OSError, ValueError) as corrupt_image_exceptions:
    print(f"An exception occurred trying to load file.")

In [None]:
image_tensor.shape

In [None]:
trans = transforms.ToPILImage()
plt.imshow(trans(image_tensor))

In [None]:
array_img.info.get("transparency", None)

In [None]:
if array_img.info.get("transparency", None):
    print(f"[transparency] An exception occurred trying to load file.")


In [None]:
array_img = PIL.Image.open(image_file)
            img = self.img_convert(array_img)
        except (PIL.UnidentifiedImageError, OSError) as corrupt_image_exceptions:
            print(f"An exception occurred trying to load file {image_file}.")
            print(f"Skipping index {ind}")
            return self.skip_sample(ind)

        try:
            if img.info.get("transparency", None):

In [359]:
entt = {"additional_framework_parameters":{"sagemaker_distributed_dataparallel_custom_mpi_options":"","sagemaker_distributed_dataparallel_enabled":True,"sagemaker_instance_type":"local_gpu"},"channel_input_dirs":{"training":"/opt/ml/input/data/training"},"current_host":"algo-1-84g0b","framework_module":"sagemaker_pytorch_container.training:main","hosts":["algo-1-84g0b"],"hyperparameters":{"batch_size":1024,"ema_rate":"0.9999","fp16_scale_growth":0.001,"log_interval":5,"lr":0.0001,"lr_anneal_steps":5000,"microbatch":16,"sagemakerdp":True,"save_interval":5,"schedule_sampler":"uniform","use_fp16":False,"weight_decay":0.0},"input_config_dir":"/opt/ml/input/config","input_data_config":{"training":{"TrainingInputMode":"File"}},"input_dir":"/opt/ml/input","is_master":True,"job_name":"diffusion-poc-exp1-test-1-sdp-d-1023-04051634961959","log_level":20,"master_hostname":"algo-1-84g0b","model_dir":"/opt/ml/model","module_dir":"/opt/ml/code","module_name":"image_train","network_interface_name":"eth0","num_cpus":64,"num_gpus":8,"output_data_dir":"/opt/ml/output/data","output_dir":"/opt/ml/output","output_intermediate_dir":"/opt/ml/output/intermediate","resource_config":{"current_host":"algo-1-84g0b","hosts":["algo-1-84g0b"]},"user_entry_point":"image_train.py"}

In [422]:
entt

{'additional_framework_parameters': {'sagemaker_distributed_dataparallel_custom_mpi_options': '',
  'sagemaker_distributed_dataparallel_enabled': True,
  'sagemaker_instance_type': 'local_gpu'},
 'channel_input_dirs': {'training': '/opt/ml/input/data/training'},
 'current_host': 'algo-1-84g0b',
 'framework_module': 'sagemaker_pytorch_container.training:main',
 'hosts': ['algo-1-84g0b'],
 'hyperparameters': {'batch_size': 1024,
  'ema_rate': '0.9999',
  'fp16_scale_growth': 0.001,
  'log_interval': 5,
  'lr': 0.0001,
  'lr_anneal_steps': 5000,
  'microbatch': 16,
  'sagemakerdp': True,
  'save_interval': 5,
  'schedule_sampler': 'uniform',
  'use_fp16': False,
  'weight_decay': 0.0},
 'input_config_dir': '/opt/ml/input/config',
 'input_data_config': {'training': {'TrainingInputMode': 'File'}},
 'input_dir': '/opt/ml/input',
 'is_master': True,
 'job_name': 'diffusion-poc-exp1-test-1-sdp-d-1023-04051634961959',
 'log_level': 20,
 'master_hostname': 'algo-1-84g0b',
 'model_dir': '/opt/ml/

In [357]:
entt['job_name']

'diffusion-poc-exp1-test-1-sdp-d-1023-04051634961959'

In [360]:
import json

In [361]:
json.loads()

<function json.loads(s, *, encoding=None, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)>