## Train Llama 2 70B on SageMaker using AWS Trainium instances

This example shows how to pretrain a Llama 2 70B model using NeuronX Distributed on AWS Trainium instances. NeuronX Distributed is a package that supports distributed training and inference mechanisms for Neuron devices. It provides XLA-friendly implementations of some of the more popular distributed training and inference techniques. The library can be installed with pip.

In this notebook, we show how to pretrain a Llama 2 70B model using the tensor parallelism, pipeline parallelism, sequence parallelism, activation checkpointing, and constant mask optimization features of the neuronx-distributed package.



### Install required packages

In [None]:
!pip install -U sagemaker boto3

In [None]:
! pip install transformers datasets[s3]

### Create and upload data

For this example we download the [wikicorpus data](https://huggingface.co/datasets/wikicorpus) from Hugging Face Datasets, tokenize it with the Llama 2 tokenizer, and then upload it to S3 for use during training.

In [None]:
from huggingface_hub.hf_api import HfFolder
access_token = "hf_xxxx" # Update the access token to download the tokenizer
HfFolder.save_token(access_token)
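
Hardcoding an access token in a notebook makes it easy to leak when the notebook is shared. As a minimal sketch of one alternative, the token could be read from an environment variable instead. The variable name `HF_TOKEN` and the helper `read_hf_token` are assumptions made for illustration; the helper is not part of `huggingface_hub`.

```python
import os


def read_hf_token(env_var="HF_TOKEN"):
    """Return the Hugging Face token stored in `env_var` (hypothetical helper)."""
    token = os.environ.get(env_var, "").strip()
    # Reject a missing value and the "hf_xxxx" placeholder used in this notebook.
    if not token or token == "hf_xxxx":
        raise ValueError(f"Set {env_var} to a valid Hugging Face access token")
    return token
```

The returned value could then be passed on, for example as `HfFolder.save_token(read_hf_token())`.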

In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer
from itertools import chain
import os

dataset_name = "wikicorpus"
dataset_config_name = "raw_en"
save_path = "data/wikicorpus_llama2_7B_tokenized_4k"


tokenizer_name = "meta-llama/Llama-2-70b-hf"

save_path = os.path.expanduser(save_path)

if not os.path.exists(save_path):
    os.makedirs(save_path)

block_size = 4096

raw_datasets = load_dataset(dataset_name, dataset_config_name)

tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)


column_names = raw_datasets["train"].column_names
text_column_name = "text" if "text" in column_names else column_names[0]

### Tokenize the data using the Llama 2 tokenizer

In [None]:
def tokenize_function(examples):
    return tokenizer(examples[text_column_name])

tokenized_datasets = raw_datasets.map(
    tokenize_function,
    batched=True,
    remove_columns=column_names,
    load_from_cache_file=True,
    desc="Running tokenizer on dataset",
)

if block_size > tokenizer.model_max_length:
    print("block_size > tokenizer.model_max_length")
block_size = min(block_size, tokenizer.model_max_length)

# Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size.
def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, and if the total_length < block_size  we exclude this batch and return an empty dict.
    # We could add padding if the model supported it instead of this drop, you can customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    load_from_cache_file=True,
    desc=f"Grouping texts in chunks of {block_size}",
)

train_dataset = lm_datasets["train"]
print(len(train_dataset))
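
To make the chunking step concrete, here is a small self-contained sketch that applies the same concatenate-then-split logic to a toy batch. The function name `group_into_blocks` and the toy token IDs are made up for illustration; `block_size` is passed as an argument here rather than read from a global.

```python
from itertools import chain


def group_into_blocks(examples, block_size):
    """Concatenate every column, then split it into block_size-long chunks."""
    concatenated = {k: list(chain(*examples[k])) for k in examples}
    total_length = len(concatenated[next(iter(examples))])
    # Drop the tail that does not fill a complete block.
    total_length = (total_length // block_size) * block_size
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }
    # For causal language modeling the labels are a copy of the inputs.
    result["labels"] = result["input_ids"].copy()
    return result


# Three toy "documents" with 10 tokens in total.
toy_batch = {"input_ids": [[1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]}
blocks = group_into_blocks(toy_batch, block_size=4)
print(blocks["input_ids"])  # [[1, 2, 3, 4], [5, 6, 7, 8]]
```

Document boundaries are ignored when concatenating, and the last two tokens (9 and 10) are dropped because they do not fill a full block of 4.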

### Upload data to S3

In [None]:
import sagemaker
import boto3
sess = sagemaker.Session()

# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:  
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

In [None]:
training_input_path = f's3://{sess.default_bucket()}/neuronx_distributed/data'
print(f"uploading training dataset to: {training_input_path}")
# save train_dataset to S3
train_dataset.save_to_disk(training_input_path)

print(f"uploaded data to: {training_input_path}")

At this point the data should be uploaded to S3, and we are ready to start the training job.

### Run the training job

For the training job we will use trn1.32xlarge instances. Each trn1.32xlarge instance has 32 NeuronCores, and we use tensor parallelism and pipeline parallelism to shard the model across them for training. The cell below provides the basic settings for pretraining Llama 2 70B on Trn1.

In [None]:
PROCESSES_PER_NODE = 32
WORLD_SIZE = 8 # number of nodes in the cluster; change this to adjust the instance_count parameter
# Global batch size
GBS=512
# Input sequence length
SEQ_LEN=4096
# Pipeline parallel degree
PP_DEGREE=8
# Tensor parallel degree
TP_DEGREE=8
# Data parallel size
DP = PROCESSES_PER_NODE * WORLD_SIZE // (TP_DEGREE * PP_DEGREE)
# Batch size per model replica
BS = GBS // DP
# Number of microbatches for pipeline execution
# Set equal to BS so each microbatch contains a single data sample
NUM_MICROBATCHES=BS
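
The arithmetic in the cell above only works when the parallel degrees divide the worker count evenly. The following sketch spells out that relationship and adds the divisibility checks; `parallel_layout` is a hypothetical helper written for this illustration and is not part of NeuronX Distributed. Note that `WORLD_SIZE` in this notebook is the number of nodes, so the total worker count is `PROCESSES_PER_NODE * WORLD_SIZE`.

```python
def parallel_layout(procs_per_node, num_nodes, tp, pp, global_batch, seq_len):
    """Derive data-parallel size and batch sizes from the parallel degrees."""
    workers = procs_per_node * num_nodes
    if workers % (tp * pp) != 0:
        raise ValueError("TP_DEGREE * PP_DEGREE must divide the number of workers")
    dp = workers // (tp * pp)
    if global_batch % dp != 0:
        raise ValueError("GBS must be divisible by the data-parallel size")
    batch_per_replica = global_batch // dp
    return {
        "workers": workers,
        "dp": dp,
        "batch_per_replica": batch_per_replica,
        "num_microbatches": batch_per_replica,  # one sample per microbatch
        "tokens_per_step": global_batch * seq_len,
    }


layout = parallel_layout(32, 8, tp=8, pp=8, global_batch=512, seq_len=4096)
print(layout)
```

With the values used in this notebook, 256 workers are split into 4 model replicas of 64 workers each, every replica processes 128 samples per step, and one global batch contains 2,097,152 tokens.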

In [None]:
hyperparameters = {}
hyperparameters["train_batch_size"] = int(BS)
hyperparameters["use_meta_device_init"] = 1
hyperparameters["training_dir"] = "/opt/ml/input/data/train" # path where sagemaker uploads the training data
hyperparameters["training_config"] = "config.json" # config file containing the Llama 2 70B configuration; change this to adjust the number of parameters
hyperparameters["max_steps"] = 30000
hyperparameters["seq_len"] = SEQ_LEN
hyperparameters["pipeline_parallel_size"] = PP_DEGREE
hyperparameters["tensor_parallel_size"] = TP_DEGREE
hyperparameters["num_microbatches"] = int(NUM_MICROBATCHES)
hyperparameters["lr"] = 0.00015
hyperparameters["min_lr"] = 1e-05
hyperparameters["beta1"] = 0.9
hyperparameters["beta2"] = 0.95
hyperparameters["weight_decay"] = 0.1
hyperparameters["warmup_steps"] = 2000
hyperparameters["constant_steps"] = 0
hyperparameters["use_zero1_optimizer"] = 1
hyperparameters["tb_dir"] = "/opt/ml/checkpoints/tensorboard" # The tensorboard logs will be stored here and eventually pushed to S3.
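
SageMaker passes the entries of `hyperparameters` to the entry-point script as `--key value` command-line arguments. The sketch below shows how a script could read a few of them with `argparse`. It is an illustration only: the real `run_llama_nxd.py` may declare its arguments differently, and the defaults shown here are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse a subset of the hyperparameters defined in this notebook."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_batch_size", type=int, default=1)
    parser.add_argument("--seq_len", type=int, default=4096)
    parser.add_argument("--tensor_parallel_size", type=int, default=8)
    parser.add_argument("--pipeline_parallel_size", type=int, default=1)
    parser.add_argument("--num_microbatches", type=int, default=1)
    parser.add_argument("--lr", type=float, default=1.5e-4)
    parser.add_argument("--training_dir", type=str, default="/opt/ml/input/data/train")
    # parse_known_args tolerates hyperparameters that this sketch does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


sample_argv = ["--train_batch_size", "128", "--lr", "0.00015", "--max_steps", "30000"]
args = parse_args(sample_argv)
print(args.train_batch_size, args.lr)  # 128 0.00015
```

Because every value arrives as a string, declaring a `type` for each argument keeps numeric settings such as `lr` from being compared or multiplied as text.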

In [None]:
cache_dir = "/opt/ml/checkpoints/neuron_cache" # path to neuron cache
docker_image = "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training-neuronx:1.13.1-neuronx-py310-sdk2.15.0-ubuntu20.04"

In [None]:
import time

from sagemaker.pytorch import PyTorch


# define Training Job Name 
job_name = f'llama-neuron-nemo-{time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())}'
checkpoint_s3_uri = "s3://" + sagemaker_session_bucket + "/nemo_llama_experiment"
checkpoint_dir = '/opt/ml/checkpoints'

env = {}

env['FI_PROVIDER'] = 'efa'
env['NCCL_PROTO'] = 'simple'
env['FI_EFA_USE_DEVICE_RDMA'] = '1'
env['RDMAV_FORK_SAFE'] = '1'
env['FI_EFA_FORK_SAFE'] = '1'
env['NCCL_SOCKET_IFNAME'] = 'ens'
#env['XLA_USE_BF16']='1'
env['NEURON_FUSE_SOFTMAX'] = '1'
env['MALLOC_ARENA_MAX'] = '128'
env['XLA_DOWNCAST_BF16'] = '1'
env['NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS'] = '5'

# Exclude the loopback and docker interfaces (this overrides the 'ens' value set above)
env['NCCL_SOCKET_IFNAME'] = '^lo,docker'
env['NEURON_CC_FLAGS'] = "--model-type=transformer --distribution-strategy=llm-training --enable-saturate-infinity --cache_dir=" + cache_dir

# estimator 
pt_estimator = PyTorch(
    entry_point='run_llama_nxd.py',
    source_dir='./scripts',
    instance_type="ml.trn1.32xlarge",
    image_uri=docker_image,
    instance_count=WORLD_SIZE,
    hyperparameters=hyperparameters,
    role=role,
    base_job_name=job_name,
    environment=env,
    input_mode="FastFile",
    disable_output_compression=True,
    keep_alive_period_in_seconds=600, # this is added to enable warm pool capability
    checkpoint_s3_uri=checkpoint_s3_uri,
    checkpoint_local_path=checkpoint_dir,
    distribution={"torch_distributed": {"enabled": True}} # enable torchrun 
)

In [None]:
pt_estimator.fit({"train":training_input_path})

### Terminate the warm pool

Run the cell below to terminate the warm pool if you no longer need it.

In [None]:
sess.update_training_job(pt_estimator.latest_training_job.job_name, resource_config={"KeepAlivePeriodInSeconds":0})

In this example we looked at how to pretrain a Llama 2 70B model using Amazon SageMaker training jobs on AWS Trainium instances.