# ModelTrainer Demo - Distributed Training Using Torchrun

### Prerequisites

In [None]:
!pip uninstall sagemaker-core -y

In [None]:
!pip install sagemaker ../../dist/sagemaker_core-1.0.1.tar.gz

In [None]:
!pip install "transformers" "datasets[s3]" "sagemaker" "boto3" --upgrade --quiet

### Setup - AWS SageMaker Session, Bucket, Role

In [None]:
import sagemaker
import boto3
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
# set this to a bucket name to use a specific bucket instead of the session default
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

# Resolve the IAM role used by the training jobs. If get_execution_role() raises ValueError
# (presumably when not running inside a SageMaker notebook/Studio), fall back to looking up an
# existing IAM role named 'sagemaker_execution_role'.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

# Re-create the session pinned to the chosen bucket so later uploads go there.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

### Load and Prepare the Dataset

In [None]:
import os

# Hugging Face access token, needed to download gated models such as Llama 2.
# Never hard-code the token in the notebook (it ends up in version control and in rendered outputs):
# read it from the HF_TOKEN environment variable, or prompt for it interactively.
# Any token that was ever hard-coded here should be considered leaked and revoked.
from getpass import getpass

access_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face access token: ")
model_id = "meta-llama/Llama-2-7b-hf"
model_id_2 = "facebook/opt-13b"

dataset_name = "tatsu-lab/alpaca"

In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer 

from huggingface_hub.hf_api import HfFolder;

# Load the tokenizer. `token` is the argument from_pretrained uses to authenticate against the
# Hugging Face Hub; it is required for gated models such as Llama 2.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=access_token)

# Load dataset from huggingface.co
dataset = load_dataset(dataset_name)

# Shuffle every split with a fixed seed for reproducibility (no downsampling is done here).
dataset = dataset.shuffle(seed=42)


#### Split dataset into Train and Validation Sets

In [None]:
# Hold out 1% of the (already shuffled) train split for validation.
# Splitting the in-memory dataset keeps the shuffle from the previous cell. Re-loading
# "train[:1%]" / "train[1%:]" from the Hub would discard it, and would always validate on the
# first 1% of the raw file. The guard keeps this cell idempotent on re-run.
if "validation" not in dataset.keys():
    train_validation_split = dataset["train"].train_test_split(test_size=0.01, shuffle=False)
    dataset["train"] = train_validation_split["train"]
    dataset["validation"] = train_validation_split["test"]

#### Prepare Data - Tokenize and Chunk Dataset 

In [None]:
from itertools import chain
from functools import partial


def group_texts(examples, block_size=2048):
    """Concatenate a batch of tokenized examples and re-split it into fixed-size blocks.

    Args:
        examples: batch mapping each column name (e.g. ``input_ids``, ``attention_mask``) to a
            list of per-example token sequences.
        block_size: number of tokens per output block.

    Returns:
        A dict with the same columns, each holding a list of chunks of at most ``block_size``
        tokens, plus a ``labels`` column that mirrors ``input_ids``. If the batch has at least
        ``block_size`` tokens, the trailing partial block is dropped. Otherwise the whole (shorter)
        sequence is returned as a single chunk.
    """
    columns = list(examples.keys())
    flattened = {name: list(chain.from_iterable(examples[name])) for name in columns}
    n_tokens = len(flattened[columns[0]])
    # Drop the ragged tail so every block is full; padding could be used instead if the model
    # supports it.
    if n_tokens >= block_size:
        n_tokens -= n_tokens % block_size
    starts = range(0, n_tokens, block_size)
    grouped = {name: [seq[s:s + block_size] for s in starts] for name, seq in flattened.items()}
    grouped["labels"] = list(grouped["input_ids"])
    return grouped

# Columns of the raw train split; they are removed after tokenization so only tokenizer outputs remain.
# NOTE(review): assumes the validation split has the same columns as the train split.
column_names = dataset["train"].column_names

# 1) Tokenize the "text" column (batched, without token_type_ids).
# 2) Concatenate and re-split the tokens into 2048-token blocks via group_texts (which also adds "labels").
lm_dataset = dataset.map(
    lambda sample: tokenizer(sample["text"],return_token_type_ids=False), batched=True, remove_columns=list(column_names)
).map(
    partial(group_texts, block_size=2048),
    batched=True,
)

### Save Data Locally and in S3

In [None]:
# Save the processed dataset (all splits) to a local directory.
# The ModelTrainer examples below pass this local path as their "dataset" input.

training_input_path = f'processed/data/'
lm_dataset.save_to_disk(training_input_path)

print(f"Saved data to: {training_input_path}")

In [None]:
# Save the same processed dataset to the session's default S3 bucket (writing to an s3:// URI
# presumably relies on the S3 support installed via "datasets[s3]").
# This rebinds training_input_path to the S3 URI used as the 'train' channel by the Estimator examples.
training_input_path = f's3://{sess.default_bucket()}/processed/data/'
print(f"training dataset to: {training_input_path}")# save train_dataset to s3
lm_dataset.save_to_disk(training_input_path)

print(f"uploaded data to: {training_input_path}")

# Distributed Training Using Torchrun (multiprocess single host)

## Huggingface Estimator Example - Single Node

In [None]:
import time
from sagemaker.huggingface import HuggingFace
from sagemaker.pytorch import PyTorch
# define Training Job Name (timestamped so repeated runs get unique names)
job_name = f'huggingface-fsdp-{time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())}'


# hyperparameters, which are passed into the training job
# NOTE(review): 'access_token' is stored with the training job's hyperparameters, so anyone who can
# describe the job can read it — consider a secret store instead.
hyperparameters={
    'bf16': True, # enable mixed precision training
    'dataset_path': '/opt/ml/input/data/train', # path where sagemaker will save training dataset ('train' channel)
    'epochs': 1, # number of epochs to train
    'max_steps':100, # cap on training steps for this demo (presumably overrides epochs in the script)
    'fsdp': '"full_shard auto_wrap"', # fully sharded data parallelism (quoted, presumably because the value contains a space)
    'fsdp_transformer_layer_cls_to_wrap': "LlamaDecoderLayer", # transformer layer to wrap
    'gradient_checkpointing': True, # enable gradient checkpointing
    'optimizer': "adamw_torch", # optimizer
    'per_device_train_batch_size': 1, # batch size per device during training
    'model_id': model_id, # model id from huggingface.co/models
    'access_token': access_token # Hugging Face token for the gated model
}

# this environment variables are required for P4d instances to enable EFA.
env = {}
env['FI_PROVIDER'] = 'efa'
env['NCCL_PROTO'] = 'simple'
env['FI_EFA_USE_DEVICE_RDMA'] = '1'
env['RDMAV_FORK_SAFE'] = '1'

# estimator: one ml.p4d.24xlarge running scripts/run_clm_lora.py under torchrun
huggingface_estimator = HuggingFace(
    entry_point='run_clm_lora.py',
    source_dir='./scripts',
    instance_type="ml.p4d.24xlarge",
    instance_count=1,
    volume_size=96,
    role=role,
    job_name=job_name,
    transformers_version='4.28.1',
    pytorch_version='2.0.0',
    py_version="py310",
    environment=env,
    hyperparameters = hyperparameters,
    disable_output_compression=True,
    keep_alive_period_in_seconds=600, # keep the instance warm for 10 minutes for follow-up jobs
    distribution={"torch_distributed": {"enabled": True}} # enable torchrun 
)

In [None]:
# define a data input dictionary with our uploaded s3 uris
# (training_input_path holds the S3 URI from the save-to-S3 cell if cells were run in order;
# the 'train' key becomes the channel read via dataset_path)
data = {'train': training_input_path}

# starting the train job with our uploaded datasets as input (blocks until the job finishes)
huggingface_estimator.fit(data, wait=True)

## Model Trainer Examples - Single Node

In [None]:
import sys
# make the local model_trainer package (one directory up) importable
sys.path.append("..") 

# NOTE(review): TorchDistributedConfig and TrainingRunMode are not used in the visible cells.
from model_trainer.ModelTrainer import ModelTrainer, ImageSpec, SourceCodeConfig, TorchDistributedConfig, TrainingRunMode
from sagemaker_core.shapes import ResourceConfig

# this environment variables are required for P4d instances to enable EFA.
env = {}
env['FI_PROVIDER'] = 'efa'
env['NCCL_PROTO'] = 'simple'
env['FI_EFA_USE_DEVICE_RDMA'] = '1'
env['RDMAV_FORK_SAFE'] = '1'


# single ml.p4d.24xlarge instance with a 96 GB volume (same as the Estimator example)
instance_type = "ml.p4d.24xlarge" 
resource_config = ResourceConfig(
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=96,
)

# Training image spec; the versions mirror the HuggingFace Estimator's
# transformers_version / pytorch_version / py_version above.
image_spec = ImageSpec(
    instance_type=instance_type,
    version="4.28.1",
    framework_name="huggingface",
    base_framework_version="pytorch2.0.0",
    image_scope="training",
    py_version="py310",
    distribution={"torch_distributed": {"enabled": True}},
)

# show the resolved training image URI
print(image_spec.get_image_uri())


In [None]:
# ModelTrainer reused by Examples 1-4 below. The EFA environment variables are set here,
# once, for every run of this trainer.
model_trainer = ModelTrainer(
    training_image=image_spec,
    role=role,
    environment=env,
    resource_config=resource_config,
)

##### Example 1

Install the requirements and set all training arguments inside the user-provided `command` parameter.

In [None]:
# Example 1: one shell command that installs requirements.txt and then launches torchrun on 8 GPUs.
# model_trainer.run() below provides a single input channel named "dataset", which SageMaker exposes
# at /opt/ml/input/data/dataset. --dataset_path must therefore point there: no "train" or "valid"
# channel is provided in this example, so the previous /opt/ml/input/data/train and --valid_path
# /opt/ml/input/data/valid paths do not exist.
source_code_config = SourceCodeConfig(
    source_dir="scripts",
    command=f"/bin/sh -c\
        'pip install -r /opt/ml/input/data/code/requirements.txt && \
        torchrun --nnodes 1 \
            --nproc_per_node 8 \
            /opt/ml/input/data/code/run_clm_lora.py \
            --bf16 True \
            --dataset_path /opt/ml/input/data/dataset \
            --epochs 1 \
            --max_steps 100 \
            --fsdp \"full_shard auto_wrap\" \
            --fsdp_transformer_layer_cls_to_wrap LlamaDecoderLayer \
            --gradient_checkpointing True \
            --optimizer adamw_torch \
            --per_device_train_batch_size 1 \
            --model_id {model_id} \
            --access_token {access_token}'"
)

In [None]:
# local directory written by save_to_disk earlier; passed as the "dataset" input channel
data_path = "processed/data/"

model_trainer.run(
    source_code_config=source_code_config,
    inputs={"dataset": data_path},
)

##### Issues Noted With Example 1

1. Internally, passing everything after `-c` (the quoted `'{sub_command}'`) as a single argument (via `shlex.split()`) fails with the error below, because each container argument may be at most 256 characters long:

```bash
ClientError: An error occurred (ValidationException) when calling the CreateTrainingJob operation: 1 validation 
error detected: Value '[-c, pip install -r /opt/ml/input/data/code/requirements.txt &&         torchrun --nnodes 1 
--nproc_per_node 4             --master_addr algo-1             --master_port 7777             
/opt/ml/input/data/code/run_clm_lora.py             --bf16 True             --dataset_path /opt/ml/input/data/train
--valid_path /opt/ml/input/data/valid             --output_dir /opt/ml/model             --epochs 1             
--max_steps 100             --fsdp "full_shard auto_wrap"             --fsdp_transformer_layer_cls_to_wrap 
LlamaDecoderLayer             --gradient_checkpointing True             --optimizer adamw_torch             
--per_device_train_batch_size 1             --model_id meta-llama/Llama-2-7b-hf             --access_token 
<REDACTED_HF_TOKEN>]' at 'algorithmSpecification.containerArguments' failed to satisfy 
constraint: Member must satisfy constraint: [Member must have length less than or equal to 256, Member must have 
length greater than or equal to 0, Member must satisfy regular expression pattern: .*]
```


2. Internally splitting everything with `str.split(" ")` also fails (see the errors linked below):
- https://tiny.amazon.com/xagznvde/IsenLink
- https://tiny.amazon.com/1hk2yznli/IsenLink

##### Example 2
Hyperparameters are set directly in the command, and the training script itself runs `os.system("pip install -r /opt/ml/input/data/code/requirements.txt")` to install the requirements.

In [None]:
# Example 2: torchrun command with every training argument inline. The training script itself
# installs requirements.txt.
# The EFA environment variables are not redefined here: they are already applied through
# `model_trainer`, which was constructed with environment=env, and redefining `env` afterwards
# has no effect on it.
source_code_config = SourceCodeConfig(
    source_dir="scripts",
    command=f"torchrun --nnodes 1 \
            --nproc_per_node 8 \
            /opt/ml/input/data/code/run_clm_lora.py \
            --bf16 True \
            --dataset_path /opt/ml/input/data/dataset \
            --epochs 1 \
            --max_steps 100 \
            --fsdp \"full_shard auto_wrap\" \
            --fsdp_transformer_layer_cls_to_wrap LlamaDecoderLayer \
            --gradient_checkpointing True \
            --optimizer adamw_torch \
            --per_device_train_batch_size 1 \
            --model_id {model_id} \
            --access_token {access_token}"
)

In [None]:
# local directory written by save_to_disk earlier; the "dataset" channel matches --dataset_path
# /opt/ml/input/data/dataset in the command above
dataset_path = "processed/data/"

model_trainer.run(
    source_code_config=source_code_config,
    inputs={"dataset": dataset_path},
)

##### Example 3

Hyperparameters are passed as parameters of the TrainingJob API, and the platform passes them to the script as arguments.

In [None]:
# Example 3: the command only launches torchrun; training arguments come from the hyperparameters
# passed to model_trainer.run() in the next cell.
source_code_config = SourceCodeConfig(
    source_dir="scripts",
    command=f"torchrun --nnodes 1 --nproc_per_node 8 /opt/ml/input/data/code/run_clm_lora.py"
)

In [None]:
# Hyperparameters that the platform forwards to the training script as arguments.
# run() below provides only a "dataset" input channel, exposed at /opt/ml/input/data/dataset, so
# dataset_path must point there. /opt/ml/input/data/train would only exist for a channel named "train".
# NOTE(review): this call uses `hyper_parameters=` while Example 4 uses `hyperparameters=`. Only one
# of them can match ModelTrainer.run()'s signature — confirm against model_trainer/ModelTrainer.py.
hyperparameters={
    'bf16': True, # enable mixed precision training
    'dataset_path': '/opt/ml/input/data/dataset', # path where the "dataset" input channel is available
    'epochs': 1, # number of epochs to train
    'max_steps':100,
    'fsdp': '"full_shard auto_wrap"', # fully sharded data parallelism
    'fsdp_transformer_layer_cls_to_wrap': "LlamaDecoderLayer", # transformer layer to wrap
    'gradient_checkpointing': True, # enable gradient checkpointing
    'optimizer': "adamw_torch", # optimizer
    'per_device_train_batch_size': 1, # batch size per device during training
    'model_id': model_id, # model id from huggingface.co/models
    'access_token': access_token
}

dataset_path = "processed/data/"

model_trainer.run(
    source_code_config=source_code_config,
    hyper_parameters=hyperparameters,
    inputs={"dataset": dataset_path},
)

##### Example 4

Set up `ContainerEntryPoint` and `ContainerArguments` on the user's behalf, including installing `requirements.txt` (not possible yet, but it would be a really nice experience).

In [None]:
# Example 4: ModelTrainer installs requirements.txt and builds the container entry point/arguments.
# ml.p4d.24xlarge has 8 GPUs, so launch 8 processes per node as in Examples 1-3
# (4 processes would leave half of the GPUs idle and change the FSDP world size).
source_code_config = SourceCodeConfig(
    source_dir="scripts", # path to source code directory
    command="torchrun --nnodes 1 --nproc_per_node 8 /opt/ml/input/data/code/run_clm_lora.py", # command to run the training script
    requirements="requirements.txt" # path to requirements file within source code directory
)

In [None]:

# hyperparameters, used directly in the ContainerScriptConfig
# (forwarded to the training script; dataset_path matches the "dataset" channel below)
hyperparameters={
    'bf16': True, # enable mixed precision training
    'dataset_path': '/opt/ml/input/data/dataset', # path where to access training dataset
    'epochs': 1, # number of epochs to train
    'max_steps':100,
    'fsdp': '"full_shard auto_wrap"', # fully sharded data parallelism
    'fsdp_transformer_layer_cls_to_wrap': "LlamaDecoderLayer", # transformer layer to wrap
    'gradient_checkpointing': True, # enable gradient checkpointing
    'optimizer': "adamw_torch", # optimizer
    'per_device_train_batch_size': 1, # batch size per device during training
    'model_id': model_id, # model id from huggingface.co/models
    'access_token': access_token
}

dataset_path = "processed/data/" # local path to training dataset

# NOTE(review): this call uses `hyperparameters=` while Example 3 uses `hyper_parameters=`. Only one
# of them can match ModelTrainer.run()'s signature — confirm against model_trainer/ModelTrainer.py.
model_trainer.run(
    source_code_config=source_code_config, 
    hyperparameters=hyperparameters,
    inputs={"dataset": dataset_path},
)


# Distributed Training Across Multiple Nodes

## Estimator Example - Multi Node

In [None]:
import time
from sagemaker.huggingface import HuggingFace
from sagemaker.pytorch import PyTorch
# define Training Job Name (timestamped so repeated runs get unique names)
job_name = f'huggingface-fsdp-{time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())}'


# hyperparameters, which are passed into the training job
# NOTE(review): these are the same arguments used for run_clm_lora.py. Confirm that
# run_clm_no_trainer.py (the entry point below) accepts all of them.
hyperparameters={
    'bf16': True, # enable mixed precision training
    'dataset_path': '/opt/ml/input/data/train', # path where sagemaker will save training dataset
    'epochs': 1, # number of epochs to train
    'max_steps':100,
    'fsdp': '"full_shard auto_wrap"', # fully sharded data parallelism
    'fsdp_transformer_layer_cls_to_wrap': "LlamaDecoderLayer", # transformer layer to wrap
    'gradient_checkpointing': True, # enable gradient checkpointing
    'optimizer': "adamw_torch", # optimizer
    'per_device_train_batch_size': 1, # batch size per device during training
    'model_id': model_id, # model id from huggingface.co/models
    'access_token': access_token
}

# this environment variables are required for P4d instances to enable EFA.
env = {}
env['FI_PROVIDER'] = 'efa'
env['NCCL_PROTO'] = 'simple'
env['FI_EFA_USE_DEVICE_RDMA'] = '1'
env['RDMAV_FORK_SAFE'] = '1'

# estimator: two ml.p4d.24xlarge instances; torchrun is enabled through `distribution`
huggingface_estimator = HuggingFace(
    entry_point='run_clm_no_trainer.py',
    source_dir='./scripts',
    instance_type="ml.p4d.24xlarge",
    instance_count=2,
    volume_size=96,
    role=role,
    job_name=job_name,
    transformers_version='4.28.1',
    pytorch_version='2.0.0',
    py_version="py310",
    environment=env,
    hyperparameters = hyperparameters,
    disable_output_compression=True,
    keep_alive_period_in_seconds=600,
    distribution={"torch_distributed": {"enabled": True}} # enable torchrun 
)

In [None]:
# define a data input dictionary with our uploaded s3 uris
# (training_input_path holds the S3 URI from the save-to-S3 cell if cells were run in order)
data = {'train': training_input_path}

# starting the train job with our uploaded datasets as input (blocks until the job finishes)
huggingface_estimator.fit(data, wait=True)

## ModelTrainer Example - Multi Node (Not Working Currently)

In [None]:
import sys
# make the local model_trainer package (one directory up) importable
sys.path.append("..") 

# NOTE(review): TorchDistributedConfig, TrainingRunMode and InstanceGroup are not used in the visible cells.
from model_trainer.ModelTrainer import ModelTrainer, ImageSpec, SourceCodeConfig, TorchDistributedConfig, TrainingRunMode
from sagemaker_core.shapes import ResourceConfig, InstanceGroup

# this environment variables are required for P4d instances to enable EFA.
env = {}
env['FI_PROVIDER'] = 'efa'
env['NCCL_PROTO'] = 'simple'
env['FI_EFA_USE_DEVICE_RDMA'] = '1'
env['RDMAV_FORK_SAFE'] = '1'


# two ml.p4d.24xlarge instances with a 96 GB volume each (same as the multi-node Estimator)
instance_type = "ml.p4d.24xlarge" 
resource_config = ResourceConfig(
    instance_count=2,
    instance_type=instance_type,
    volume_size_in_gb=96,
)

# Training image spec; the versions mirror the HuggingFace Estimator's
# transformers_version / pytorch_version / py_version.
image_spec = ImageSpec(
    instance_type=instance_type,
    version="4.28.1",
    framework_name="huggingface",
    base_framework_version="pytorch2.0.0",
    image_scope="training",
    py_version="py310",
    distribution={"torch_distributed": {"enabled": True}},
)

# show the resolved training image URI
print(image_spec.get_image_uri())


In [None]:
# Two-node ModelTrainer. The EFA environment variables are applied here, at construction time.
model_trainer = ModelTrainer(
    training_image=image_spec,
    role=role,
    environment=env,
    resource_config=resource_config,
)

In [None]:
# Multi-node torchrun launch: 2 nodes x 8 processes (one per GPU on ml.p4d.24xlarge).
# Why not --master_addr/--master_port: that uses torchrun's default static rendezvous, which needs a
# distinct --node_rank on each host. This command string is identical on every host, so both nodes
# would start as node rank 0 and rendezvous would never complete.
# The c10d rendezvous backend assigns node ranks automatically. The store is hosted on algo-1, which
# is the first training host's name, as used for --master_addr before.
# The EFA environment variables are not redefined here: they are already applied through
# `model_trainer` (environment=env).
source_code_config = SourceCodeConfig(
    source_dir="scripts",
    command=f"torchrun --nnodes 2 \
            --nproc_per_node 8 \
            --rdzv_backend c10d \
            --rdzv_endpoint algo-1:7777 \
            --rdzv_id model-trainer-multinode \
            /opt/ml/input/data/code/run_clm_no_trainer.py \
            --bf16 True \
            --dataset_path /opt/ml/input/data/dataset \
            --epochs 1 \
            --max_steps 100 \
            --fsdp \"full_shard auto_wrap\" \
            --fsdp_transformer_layer_cls_to_wrap LlamaDecoderLayer \
            --gradient_checkpointing True \
            --optimizer adamw_torch \
            --per_device_train_batch_size 1 \
            --model_id {model_id} \
            --access_token {access_token}"
)

In [None]:
# local directory written by save_to_disk earlier; the "dataset" channel matches --dataset_path
# /opt/ml/input/data/dataset in the command above
dataset_path = "processed/data/"

model_trainer.run(
    source_code_config=source_code_config,
    inputs={"dataset": dataset_path},
)