# DeepSpeed on Amazon SageMaker
---



#### Note

- If you are already familiar with basic Hugging Face usage and natural language processing, you can skip the previous modules and start the hands-on from this module.
- This notebook calls the SageMaker API, so run it in SageMaker Studio, on a SageMaker notebook instance, or on a local machine with the AWS CLI configured. If you use SageMaker Studio or a SageMaker notebook instance, select a PyTorch-based kernel.
- Training jobs require at least an `ml.p3.2xlarge` training instance, and `ml.p3.16xlarge` is recommended for the distributed training hands-on. If your account has an instance quota limit, see [Request a service quota increase for SageMaker resources](https://docs.aws.amazon.com/sagemaker/latest/dg/regions-quotas.html#service-limit-increase-request-procedure) to request an increase.

<br>

## 1. Preparation
---

Upload the preprocessed dataset to S3 for SageMaker training.

In [5]:
install_needed = True
install_needed = False

if install_needed:
    !pip install -r scripts/requirements.txt

print("## requirements.txt: ")
! cat scripts/requirements.txt
print("## installed packages ")
! pip list | grep -E "datasets|transformers|fsspec|evaluate|deepspeed|s3fs|boto3|sagemaker|scikit-learn"

## requirements.txt: 
datasets==2.14.6 
transformers==4.30.2
fsspec==2023.9.2
evaluate==0.4.0
deepspeed==0.14.2
s3fs==2024.5.0
boto3==1.34.109 
sagemaker==2.221.0 
scikit-learn==1.4.2
## installed packages 
boto3                     1.34.109
datasets                  2.14.6
deepspeed                 0.14.2
evaluate                  0.4.0
fsspec                    2023.9.2
s3fs                      2024.5.0
sagemaker                 2.221.0
scikit-learn              1.4.2
transformers              4.30.2


In [6]:
import os
import sys
import logging
import boto3
import botocore
import sagemaker
import time
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

logging.basicConfig(
    level=logging.INFO, 
    format='[{%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

logging.info(f"sagemaker role arn: {role}")
logging.info(f"sagemaker bucket: {sess.default_bucket()}")
logging.info(f"sagemaker session region: {sess.boto_region_name}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/SageMaker/.xdg/config/sagemaker/config.yaml
[{2147392357.py:28} INFO - sagemaker role arn: arn:aws:iam::057716757052:role/dt2gsmoon
[{2147392357.py:29} INFO - sagemaker bucket: sagemaker-us-east-1-057716757052
[{2147392357.py:30} INFO - sagemaker session region: us-east-1


In [7]:
from datasets import load_dataset
from transformers import AutoTokenizer

# Define the model repo
model_id = 'bert-base-multilingual-cased'

# dataset used
dataset_name = 'nsmc'

# s3 key prefix for the data
s3_prefix = 'datasets/nsmc'



In [8]:
# load dataset
train_dataset, eval_dataset = load_dataset(dataset_name, split=['train', 'test'])

num_samples_for_debug = 2000
train_dataset = train_dataset.shuffle(seed=42).select(range(num_samples_for_debug))
eval_dataset = eval_dataset.shuffle(seed=42).select(range(num_samples_for_debug))

logging.info(f" loaded train_dataset length is: {len(train_dataset)}")
logging.info(f" loaded eval_dataset length is: {len(eval_dataset)}")
logging.info(train_dataset[0])

[{3407595512.py:8} INFO -  loaded train_dataset length is: 2000
[{3407595512.py:9} INFO -  loaded eval_dataset length is: 2000
[{3407595512.py:10} INFO - {'id': '10020916', 'document': 'For Carl.칼 세이건으로 시작해서 칼 세이건으로 끝난다.', 'label': 1}


In [9]:
tokenizer = AutoTokenizer.from_pretrained(model_id)

def tokenize(batch):
    return tokenizer(batch['document'], padding='max_length', max_length=128, truncation=True)

# tokenize dataset
train_dataset = train_dataset.map(tokenize, batched=True, remove_columns=['id', 'document'])
eval_dataset = eval_dataset.map(tokenize, batched=True, remove_columns=['id', 'document'])

# set format for pytorch
train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
eval_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])

train_dataset = train_dataset.rename_column("label", "labels")
eval_dataset = eval_dataset.rename_column("label", "labels")



In [10]:
train_dir = 'train'
eval_dir = 'eval'
!rm -rf {train_dir} {eval_dir}

os.makedirs(train_dir, exist_ok=True)
os.makedirs(eval_dir, exist_ok=True) 

if not os.listdir(train_dir):
    train_dataset.save_to_disk(train_dir)
if not os.listdir(eval_dir):
    eval_dataset.save_to_disk(eval_dir)

Saving the dataset (1/1 shards): 100%|██████████| 2000/2000 [00:00<00:00, 303693.00 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 2000/2000 [00:00<00:00, 430715.14 examples/s]


In [11]:
# save train_dataset to s3
train_input_path = f's3://{sess.default_bucket()}/{s3_prefix}/{train_dir}'
train_dataset.save_to_disk(train_input_path)

# save eval_dataset to s3
eval_input_path = f's3://{sess.default_bucket()}/{s3_prefix}/{eval_dir}'
eval_dataset.save_to_disk(eval_input_path)

[{credentials.py:550} INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Saving the dataset (1/1 shards): 100%|██████████| 2000/2000 [00:00<00:00, 20804.51 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 2000/2000 [00:00<00:00, 33585.60 examples/s]


<br>

## 2. Debugging (Development Stage)
---

Before training on SageMaker, first develop and debug the model training code in a local development environment. If you work on a SageMaker notebook instance, use a GPU instance (p-family or g-family).

### DeepSpeed debugging

In [12]:
# %%time

# TRAIN_DEEPSPEED_CMD = f"""cd scripts && deepspeed train_deepspeed.py \
# """

# print(f'Running command: \n{TRAIN_DEEPSPEED_CMD}')
# ! {TRAIN_DEEPSPEED_CMD}

<br>

## 3. SageMaker Training (Development Stage)
---
A common misconception about SageMaker is that training on it requires rewriting your source code from scratch. In practice, you can keep your existing Python script largely unchanged and only add handling for the SageMaker-specific environment variables that training needs.

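When you launch a SageMaker training job, it runs through six steps: 1) provision the training EC2 instances, 2) download the Docker image and the training data, 3) start the container, 4) run training inside the container, 5) save the results from the container to the designated S3 bucket, and 6) terminate the training instances. The training logic itself is therefore the same as in your existing development environment, as in the example below.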

`/opt/conda/bin/python train_hf.py --num_epochs 5 --train_batch_size 32 ...`

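The environment variables that the container needs (for example, the model path and the training data path) are predefined, and the script must read them to locate the files required for training. For details on the most common environment variables, see https://github.com/aws/sagemaker-containers#important-environment-variables.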
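
As a minimal sketch of this pattern, a training script can read the SageMaker-provided paths through `argparse` defaults. `SM_MODEL_DIR`, `SM_NUM_GPUS`, and `SM_CHANNEL_<NAME>` are set by the SageMaker training toolkit; `SM_CHANNEL_TRAIN` and `SM_CHANNEL_EVAL` assume the channels are named `train` and `eval`, as in this notebook. The function name `parse_args` and the local fallback paths are assumptions for illustration and are not taken from `train_deepspeed.py`.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and SageMaker-provided paths.

    The fallback paths are hypothetical local directories, so the same
    script can also be run outside a SageMaker container.
    """
    parser = argparse.ArgumentParser()

    # Hyperparameters: SageMaker passes the `hyperparameters` dict as CLI flags
    parser.add_argument("--num_epochs", type=int, default=3)
    parser.add_argument("--train_batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=3e-5)

    # Paths: taken from SageMaker environment variables when they are present
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "./model"))
    parser.add_argument("--train_dir", default=os.environ.get("SM_CHANNEL_TRAIN", "./train"))
    parser.add_argument("--eval_dir", default=os.environ.get("SM_CHANNEL_EVAL", "./eval"))
    parser.add_argument("--num_gpus", type=int, default=int(os.environ.get("SM_NUM_GPUS", 0)))

    # parse_known_args ignores flags that this sketch does not declare
    args, _ = parser.parse_known_args(argv)
    return args
```

Inside the container the script can then load the data with, for example, `datasets.load_from_disk(args.train_dir)` and write the final model under `args.model_dir`, without any other SageMaker-specific code.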


### DeepSpeed migration


To use DeepSpeed on SageMaker, launch training with `mpirun` rather than the `deepspeed` command. Pass the `mpirun` parameters through `distribution = {"mpi": mpi_options}` when you create the SageMaker Estimator; you do not need to call `deepspeed.init_distributed(...)`.

```python
import os
import sys

import deepspeed
import torch
import torch.distributed as dist

if 'WORLD_SIZE' in os.environ:
    # Environment variables set by torch.distributed.launch or torchrun
    world_size = int(os.environ['WORLD_SIZE'])
    rank = int(os.environ['RANK'])
    local_rank = int(os.environ['LOCAL_RANK'])
elif 'OMPI_COMM_WORLD_SIZE' in os.environ:
    # Environment variables set by mpirun
    world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
    rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
    local_rank = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
else:
    sys.exit("Can't find the environment variables for local rank")

dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(local_rank)

# SageMaker Training does not need to call deepspeed.init_distributed().
# SM_CHANNELS is set by the SageMaker training toolkit inside the container.
if 'SM_CHANNELS' not in os.environ:
    deepspeed.init_distributed("nccl")
```
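
The training script `train_deepspeed.py` is not shown in this notebook, so the following is only a sketch of how a DeepSpeed configuration dict could be assembled for the world size resolved above. The keys are standard DeepSpeed config fields; the helper name `make_ds_config` and the chosen values (fp16, ZeRO stage 2) are assumptions for illustration. DeepSpeed expects `train_batch_size` to equal the micro batch size per GPU times the gradient accumulation steps times the world size, so the sketch derives it instead of hard-coding it.

```python
def make_ds_config(micro_batch_size, world_size, grad_accum_steps=1, zero_stage=2):
    """Build a minimal DeepSpeed config dict (illustrative values only)."""
    return {
        "train_micro_batch_size_per_gpu": micro_batch_size,
        "gradient_accumulation_steps": grad_accum_steps,
        # Must equal micro batch size * accumulation steps * world size
        "train_batch_size": micro_batch_size * grad_accum_steps * world_size,
        "fp16": {"enabled": True},
        "zero_optimization": {"stage": zero_stage},
    }


# Example: batch size 32 per GPU on a single 8-GPU instance
ds_config = make_ds_config(micro_batch_size=32, world_size=8)
```

A dict like this can be passed to `deepspeed.initialize(...)` through its `config` argument, or written to a JSON file and referenced from the script.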

In [13]:
instance_type = 'ml.p3.16xlarge'
#instance_type = 'local_gpu'
num_gpus = 8
instance_count = 1
batch_size = 32

if instance_type in ['local', 'local_gpu']:
    from sagemaker.local import LocalSession
    sagemaker_session = LocalSession()
    sagemaker_session.config = {'local': {'local_code': True}}
else:
    sagemaker_session = sagemaker.session.Session()
    
# hyperparameters, which are passed into the training job
hyperparameters = {
    'num_epochs': 3,                    # number of training epochs
    'seed': 42,                         # seed
    'train_batch_size': batch_size,     # batch size for training
    'eval_batch_size': batch_size*2,    # batch size for evaluation
    'warmup_steps': 0,                  # warmup steps
    'learning_rate': 3e-5,              # learning rate used during training
    'model_id': model_id                # pre-trained model
}

[{credentials.py:1075} INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [14]:
mpi_options = {
    "enabled" : True,            # Required
    "processes_per_host" : 8,    # Required
    # "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none"
}
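
The cells above set `num_gpus = 8` and `processes_per_host = 8` independently, which is easy to get out of sync when the instance type changes. A small sketch that derives one MPI process per GPU from the instance type is shown below. The helper names and the lookup table are assumptions for illustration; the table covers only the p3 sizes, and other types (including `local_gpu`) would need to be added by hand.

```python
# GPU counts for the p3 family; extend this table for other instance types
GPUS_PER_INSTANCE = {
    "ml.p3.2xlarge": 1,
    "ml.p3.8xlarge": 4,
    "ml.p3.16xlarge": 8,
}


def build_mpi_options(instance_type, custom_mpi_options=None):
    """Return an `mpi` distribution dict with one process per GPU."""
    if instance_type not in GPUS_PER_INSTANCE:
        raise ValueError(
            f"Unknown GPU count for {instance_type}; add it to GPUS_PER_INSTANCE"
        )
    options = {
        "enabled": True,
        "processes_per_host": GPUS_PER_INSTANCE[instance_type],
    }
    if custom_mpi_options:
        options["custom_mpi_options"] = custom_mpi_options
    return options


def total_ranks(instance_type, instance_count):
    """Total number of MPI ranks across all training instances."""
    return GPUS_PER_INSTANCE[instance_type] * instance_count
```

With this helper, `distribution = {"mpi": build_mpi_options(instance_type)}` stays consistent when `instance_type` is changed in one place.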


In [15]:
from sagemaker.pytorch import PyTorch
#image_uri = f"763104351884.dkr.ecr.{region}.amazonaws.com/huggingface-pytorch-training:2.0.0-transformers4.28.1-gpu-py310-cu118-ubuntu20.04"
image_uri = '763104351884.dkr.ecr.{}.amazonaws.com/pytorch-training:1.12.1-gpu-py38-cu113-ubuntu20.04-sagemaker'.format(region)

# define Training Job Name 
job_name = f'deepspeed-nsmc-{time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())}'
chkpt_s3_path = f's3://{sess.default_bucket()}/{s3_prefix}/native/checkpoints'

# create the Estimator
sm_estimator = PyTorch(
    entry_point           = 'train_deepspeed.py',  # fine-tuning script used in the training job
    source_dir            = './scripts',        # directory where fine-tuning script is stored
    image_uri             = image_uri,          # training container image
    instance_type         = instance_type,      # instances type used for the training job
    instance_count        = instance_count,     # the number of instances used for training
    base_job_name         = job_name,           # prefix of the training job name (the SDK appends a timestamp)
    role                  = role,               # IAM role used in training job to access AWS resources, e.g. S3
    sagemaker_session     = sagemaker_session,
    py_version            = 'py38',             # the python version used in the training job
    hyperparameters       = hyperparameters,    # the hyperparameter used for running the training job
    distribution          = {"mpi": mpi_options},
    disable_profiler      = True,
    debugger_hook_config  = False,
    #keep_alive_period_in_seconds = 20*60     # warm pool    
    #checkpoint_s3_uri     = chkpt_s3_path,
    #checkpoint_local_path ='/opt/ml/checkpoints',  
)

[{credentials.py:1075} INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [16]:
# define a data input dictionary with our uploaded s3 uris
data = {
    'train': train_input_path,
    'eval': eval_input_path
}

# starting the train job with our uploaded datasets as input
sm_estimator.fit(data, wait=False)
train_job_name = sm_estimator.latest_training_job.job_name

[{session.py:1002} INFO - Creating training-job with name: deepspeed-nsmc-2024-05-21-12-06-35-2024-05-21-12-06-40-420


In [17]:
from IPython.display import display, HTML

def make_console_link(region, train_job_name, train_task='[Training]'):
    train_job_link = f'<b> {train_task} Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={region}#/jobs/{train_job_name}">Training Job</a></b>'
    cloudwatch_link = f'<b> {train_task} Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={region}#logStream:group=/aws/sagemaker/TrainingJobs;prefix={train_job_name};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a></b>'
    return train_job_link, cloudwatch_link  
        
train_job_link, cloudwatch_link = make_console_link(region, train_job_name, '[PyTorch DeepSpeed Training]')

display(HTML(train_job_link))
display(HTML(cloudwatch_link))

In [18]:
sm_estimator.logs()

2024-05-21 12:06:42 Starting - Starting the training job
2024-05-21 12:06:42 Pending - Training job waiting for capacity...............
2024-05-21 12:09:12 Pending - Preparing the instances for training......
2024-05-21 12:10:05 Downloading - Downloading input data...
2024-05-21 12:10:35 Downloading - Downloading the training image..................
2024-05-21 12:13:36 Training - Training image download completed. Training in progress....bash: cannot set terminal process group (-1): Inappropriate ioctl for device
bash: no job control in this shell
2024-05-21 12:14:04,082 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training
2024-05-21 12:14:04,144 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)
2024-05-21 12:14:04,155 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.
2024-05-21 12:14:04,158 sagemaker_pytorch_container.training INFO     Invoking user training script.
2024

UnexpectedStatusException: Error for Training job deepspeed-nsmc-2024-05-21-12-06-35-2024-05-21-12-06-40-420: Failed. Reason: AlgorithmError: InstallRequirementsError:
ExitCode 1
ErrorMessage ""
Command "/opt/conda/bin/python3.8 -m pip install -r requirements.txt", exit code: 1
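
The run above failed while the container was installing `scripts/requirements.txt` with its own Python 3.8, and the log does not say which package caused it. Versions pinned for the notebook kernel may not be installable there; for example, scikit-learn 1.4.x requires Python 3.9 or newer, though the log does not confirm that this is the cause. As a sketch, the status and failure reason of a job can be read back with the `DescribeTrainingJob` API. The helper name `summarize_training_job` is an assumption for illustration; `describe_training_job` and the response keys are part of the boto3 SageMaker client.

```python
def summarize_training_job(job_name, sm_client=None):
    """Return the status fields of a training job from DescribeTrainingJob."""
    if sm_client is None:
        import boto3  # imported lazily so that a stub client can be injected

        sm_client = boto3.client("sagemaker")

    desc = sm_client.describe_training_job(TrainingJobName=job_name)
    return {
        "status": desc["TrainingJobStatus"],
        "secondary_status": desc.get("SecondaryStatus"),
        # FailureReason is only present when the job has failed
        "failure_reason": desc.get("FailureReason"),
    }
```

Calling `summarize_training_job(train_job_name)` after `fit(..., wait=False)` gives a quick check without streaming the full log. The detailed pip output is in the CloudWatch log stream linked above.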

<br>

## 4. (Optional) Inference
---

### Copy S3 model artifact to local directory

Copy the model artifact stored in S3 to a local path and extract it.

In [None]:
import json, os

local_model_dir = 'model_from_sagemaker'

if not os.path.exists(local_model_dir):
    os.makedirs(local_model_dir)

!aws s3 cp {sm_estimator.model_data} {local_model_dir}/model.tar.gz
!tar -xzf {local_model_dir}/model.tar.gz -C {local_model_dir}

download: s3://sagemaker-us-east-1-057716757052/deepspeed-nsmc-2024-05-21-11-30-10-2024-05-21-11-30-13-891/output/model.tar.gz to model_from_sagemaker/model.tar.gz
tar: Ignoring unknown extended header keyword `LIBARCHIVE.creationtime'


In [None]:
import torch
import transformers
import numpy as np
from collections import OrderedDict
from transformers import BertForSequenceClassification, AutoTokenizer
        
MODEL_NAME = "bert-base-multilingual-cased"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = BertForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)#.to(device)

Some weights of the model checkpoint at bert-base-multilingual-cased were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-multilingual

### Load DDP model to a non-DDP model
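When a model is trained with data parallelism, a `module.` prefix is added to the keys of the model weights, which causes an error when the weights are loaded into a non-wrapped model. A post-processing step that strips the prefix is therefore required. If that is inconvenient, you can instead save the unwrapped model explicitly (without `module`) after DDP training.

Reference: https://discuss.pytorch.org/t/how-to-switch-model-trained-on-2-gpus-to-1-gpu/20039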

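```python
# Unwrap the DDP wrapper (if any) so the saved keys carry no `module.` prefix
model_to_save = model.module if hasattr(model, 'module') else model
...
# `model_path` is a placeholder for the output file path
torch.save({'model': model_to_save.state_dict()}, model_path)
```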


In [None]:
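import glob

# Load the checkpoint onto the CPU so this also works on a machine without a GPU
model_filename = glob.glob(f'{local_model_dir}/*.pt')[0]
state_dict = torch.load(model_filename, map_location='cpu')

# Strip the `module.` prefix that the data-parallel wrapper adds to parameter names
new_state_dict = {}
for key in state_dict:
    new_key = key.replace('module.','')
    new_state_dict[new_key] = state_dict[key]

model.load_state_dict(new_state_dict)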

<All keys matched successfully>

In [None]:
text = "이 영화 너무 재미있어요"
encode_plus_token = tokenizer.encode_plus(
    text,
    max_length=128,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding="max_length",
    return_attention_mask=True,
    return_tensors="pt",
    truncation=True
)

output = model(**encode_plus_token)
softmax_fn = torch.nn.Softmax(dim=1)
softmax_output = softmax_fn(output[0])
_, prediction = torch.max(softmax_output, dim=1)

predicted_class_idx = prediction.item()
score = softmax_output[0][predicted_class_idx]
print(f"predicted_class: {predicted_class_idx}, score={score}")

predicted_class: 1, score=0.9327744841575623
