## Amazon SageMaker Initialization
Run the following cells to upgrade the SageMaker Python SDK to the latest version, import SageMaker modules, and retrieve information about your current SageMaker work environment, such as your AWS account ID, the AWS Region, and the ARN of your Amazon SageMaker execution role.

**NOTE:** This step might require a kernel restart.

# setup

In [None]:
FILE_SYSTEM_ID = "..."
FSX_SECURITY_GROUP_ID = "..."
FSX_SUBNET = "..."
BASE_PATH = "..."
PRETRAINED_MODEL = "..."
PRETRAINED_DIR = "..."

In [None]:
%pip install --upgrade "sagemaker>=2.212"
%pip install sagemaker-experiments

Collecting sagemaker>=2.212
  Downloading sagemaker-2.218.0-py3-none-any.whl.metadata (14 kB)
Collecting dill>=0.3.8 (from pathos->sagemaker>=2.212)
  Using cached dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting multiprocess>=0.70.16 (from pathos->sagemaker>=2.212)
  Using cached multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading sagemaker-2.218.0-py3-none-any.whl (1.5 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.5/1.5 MB 10.9 MB/s eta 0:00:00
Using cached dill-0.3.8-py3-none-any.whl (116 kB)
Using cached multiprocess-0.70.16-py310-none-any.whl (134 kB)
Installing collected packages: dill, multiprocess, sagemaker
  Attempting uninstall: dill
    Found existing installation: dill 0.3.7
    Uninstalling dill-0.3.7:
      Successfully uninstalled dill-0.3.7
  Attempting uninstall: multiprocess
    Found existing installation: multiprocess 0.70.15
    Uninstalling multiprocess-0.70.15:
      Successfully

In [None]:
%%time
import os

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch

role = (
    get_execution_role()
)  # provide a pre-existing role ARN as an alternative to creating a new role
print(f"SageMaker Execution Role: {role}")

client = boto3.client("sts")
account = client.get_caller_identity()["Account"]
print(f"AWS account: {account}")

session = boto3.session.Session()
region = session.region_name
print(f"AWS region: {region}")

sm_boto_client = boto3.client("sagemaker")
sagemaker_session = sagemaker.session.Session(boto_session=session)

# get default bucket
default_bucket = sagemaker_session.default_bucket()
print("Default bucket for this session: ", default_bucket)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
SageMaker Execution Role: arn:aws:iam::905418125508:role/service-role/AmazonSageMaker-ExecutionRole-20240317T151227
AWS account: 905418125508
AWS region: us-east-1
Default bucket for this session:  sagemaker-us-east-1-905418125508
CPU times: user 1.95 s, sys: 224 ms, total: 2.18 s
Wall time: 2.84 s


## Download and prepare GLUE/SST2 data
Here you will download and prepare the GLUE/SST2 dataset, and then copy the files to S3.

### Install the Hugging Face Transformers and Datasets libraries

In [None]:
! pip install -q datasets==2.15.0 transformers pytest

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pathos 0.3.2 requires dill>=0.3.8, but you have dill 0.3.7 which is incompatible.
pathos 0.3.2 requires multiprocess>=0.70.16, but you have multiprocess 0.70.15 which is incompatible.

In [None]:
import datasets
from datasets import load_dataset, load_from_disk, load_metric

In [None]:
from sagemaker.pytorch import PyTorch
import transformers
import logging

from transformers import (
    AutoTokenizer,
)

from transformers.testing_utils import CaptureLogger

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [None]:
logger = logging.getLogger(__name__)

# Config Setup

### Choose Model
Choose to train either the GPT-NeoX or Llama-v2 model.

In [None]:
model_type = "llama_v2"  # [gpt_neox, llama_v2]
max_context_width = 4096  # For Llama v2 model

### Load data
This section loads the [GLUE/SST2](https://huggingface.co/datasets/glue/viewer/sst2/train) dataset and splits it into training and validation datasets. You can update this section to load any Hugging Face dataset you want.

In [None]:
hyperparameters = {
    "dataset_name": "glue",
    "dataset_config_name": "sst2",
    "do_train": True,
    "do_eval": True,
    "cache_dir": "tmp",
}

In [None]:
raw_datasets = load_dataset(
    hyperparameters["dataset_name"],
    hyperparameters["dataset_config_name"],
)

In [None]:
# Remove existing validation dataset as it is too small
# to shard across all ranks.
del raw_datasets["validation"]
if "validation" not in raw_datasets.keys():
    validation_percentage = "10%"
    raw_datasets["validation"] = load_dataset(
        hyperparameters["dataset_name"],
        hyperparameters["dataset_config_name"],
        split=f"train[:{validation_percentage}]",
        cache_dir=hyperparameters["cache_dir"],
    )

    raw_datasets["train"] = load_dataset(
        hyperparameters["dataset_name"],
        hyperparameters["dataset_config_name"],
        split=f"train[{validation_percentage}:]",
        cache_dir=hyperparameters["cache_dir"],
    )
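The percent slices used above partition the original training split into two disjoint pieces. The arithmetic can be sketched in plain Python, assuming the slice boundary is rounded to the nearest row. The helper names `percent_boundary` and `split_indices` are hypothetical and are not part of the `datasets` API.

```python
def percent_boundary(num_rows: int, percent: int) -> int:
    """Row index where a `train[:N%]` slice ends (assumes nearest-row rounding)."""
    return int(round(num_rows * percent / 100.0))


def split_indices(num_rows: int, validation_percent: int):
    """Return (validation, train) index ranges mirroring train[:N%] and train[N%:]."""
    cut = percent_boundary(num_rows, validation_percent)
    return range(0, cut), range(cut, num_rows)


# Toy dataset of 1000 rows with a 10% validation slice.
validation_idx, train_idx = split_indices(num_rows=1000, validation_percent=10)
print(len(validation_idx), len(train_idx))
```

Because both slices share the same boundary, no row lands in both splits and none is lost.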

### Load tokenizer
Nearly every NLP task begins with a tokenizer. A tokenizer converts your text data into a format (token) that can be processed by the NLP model.
The following cell loads the tokenizer that matches the pretrained model named in `PRETRAINED_MODEL` (a Llama 2 7B checkpoint in this example) using [AutoTokenizer.from_pretrained()](https://huggingface.co/docs/transformers/v4.19.4/en/autoclass_tutorial#autotokenizer).

In [None]:
tokenizer_kwargs = {
    "cache_dir": hyperparameters["cache_dir"],
}

# Pretrained meta-llama/Llama-2-7b-hf requires HuggingFace access, https://huggingface.co/meta-llama/Llama-2-7b-hf
# There also exist pretrained models without special access requirement e.g., https://huggingface.co/NousResearch/Llama-2-7b-chat-hf
PRETRAINED_MODEL='NousResearch/Llama-2-7b-chat-hf'
tokenizer = AutoTokenizer.from_pretrained(PRETRAINED_MODEL, **tokenizer_kwargs)
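To make the text-to-token mapping concrete, here is a toy whitespace tokenizer. It is only an illustration of the idea: the class `ToyTokenizer` is hypothetical, and the real Llama 2 tokenizer uses a learned subword vocabulary rather than whole words.

```python
class ToyTokenizer:
    """Whitespace tokenizer with a growing vocabulary; for illustration only."""

    def __init__(self):
        self.token_to_id = {}
        self.id_to_token = []

    def encode(self, text: str) -> list[int]:
        ids = []
        for token in text.split():
            # Assign the next free integer ID to each unseen token.
            if token not in self.token_to_id:
                self.token_to_id[token] = len(self.id_to_token)
                self.id_to_token.append(token)
            ids.append(self.token_to_id[token])
        return ids

    def decode(self, ids: list[int]) -> str:
        return " ".join(self.id_to_token[i] for i in ids)


toy = ToyTokenizer()
ids = toy.encode("a good movie is a good time")
print(ids)
print(toy.decode(ids))
```

Repeated words map to the same ID, which is the property the model relies on when it looks up embeddings.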

### Preprocess data

The following two cells define one function that runs the tokenizer and another that concatenates the tokenized texts and splits them into chunks of `block_size` tokens, then apply both to the dataset.

In [None]:
def tokenize_function(examples):
    tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base")

    with CaptureLogger(tok_logger) as cl:
        output = tokenizer(examples[text_column_name])
        # clm input could be much much longer than block_size
        if "Token indices sequence length is longer than the" in cl.out:
            tok_logger.warning(
                "^^^^^^^^^^^^^^^^ Please ignore the warning above - this long input will be chunked into smaller bits before being passed to the model."
            )
    return output


# Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size.
def group_texts(block_size, examples):
    # Concatenate all texts.
    concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
    # customize this part to your needs.
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
    # Split by chunks of block_size. Building `result` outside the `if` keeps it defined
    # even when the batch holds fewer than block_size tokens (one short chunk is kept).
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result
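A small worked example may help show what the grouping step does to a batch. This is a simplified, self-contained sketch (the name `group_texts_sketch` is hypothetical); it always truncates to whole blocks, so a batch shorter than `block_size` yields no chunks.

```python
def group_texts_sketch(block_size, examples):
    # Flatten each column (a list of token lists) into one long list.
    concatenated = {k: [tok for seq in v for tok in seq] for k, v in examples.items()}
    total_length = len(next(iter(concatenated.values())))
    # Keep only whole blocks; the trailing remainder is dropped.
    usable = (total_length // block_size) * block_size
    result = {
        k: [t[i : i + block_size] for i in range(0, usable, block_size)]
        for k, t in concatenated.items()
    }
    # For causal language modeling, the labels are a copy of the inputs.
    result["labels"] = [chunk[:] for chunk in result["input_ids"]]
    return result


# Three tokenized texts with 9 tokens in total, grouped into blocks of 4.
batch = {"input_ids": [[1, 2, 3], [4, 5], [6, 7, 8, 9]]}
chunks = group_texts_sketch(4, batch)
print(chunks["input_ids"])  # [[1, 2, 3, 4], [5, 6, 7, 8]]
```

Token `9` is the remainder and is discarded, and text boundaries are ignored: the first block spans the first two texts.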

In [None]:
column_names = raw_datasets["train"].column_names
text_column_name = "text" if "text" in column_names else column_names[0]

# tokenize_function is pickled when it is hashed for caching. Load the logger here, before the
# function is used, to avoid a _LazyModule error in the Hasher.
tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base")

tokenized_datasets = raw_datasets.map(
    tokenize_function,
    batched=True,
    num_proc=1,
    remove_columns=column_names,
    desc="Running tokenizer on dataset",
)

import functools

lm_datasets = tokenized_datasets.map(
    functools.partial(group_texts, max_context_width),
    batched=True,
    #     num_proc=args.preprocessing_num_workers,
    desc=f"Grouping texts in chunks of {max_context_width}",
)

In [None]:
if hyperparameters["do_train"]:
    if "train" not in tokenized_datasets:
        raise ValueError("--do_train requires a train dataset")
    train_dataset = lm_datasets["train"]


if hyperparameters["do_eval"]:
    if "validation" not in tokenized_datasets:
        raise ValueError("--do_eval requires a validation dataset")
    eval_dataset = lm_datasets["validation"]

In [None]:
training_dataset_location = None
validation_dataset_location = None


if hyperparameters["do_train"]:
    train_dataset.to_json("./training.json")
    training_dataset_location = "s3://{}/dataset/train/".format(default_bucket)

if hyperparameters["do_eval"]:
    eval_dataset.to_json("./validation.json")
    validation_dataset_location = "s3://{}/dataset/validation/".format(default_bucket)

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]
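The files written above are expected to contain one JSON object per line, assuming the default JSON Lines output of `Dataset.to_json`. The sketch below writes and reads such a file using only the standard library. The field names and values are illustrative assumptions about what the tokenizer and grouping step produce, not output copied from this notebook.

```python
import json
import os
import tempfile

# Illustrative records: one chunk of token IDs per line.
records = [
    {"input_ids": [1, 2, 3, 4], "attention_mask": [1, 1, 1, 1], "labels": [1, 2, 3, 4]},
    {"input_ids": [5, 6, 7, 8], "attention_mask": [1, 1, 1, 1], "labels": [5, 6, 7, 8]},
]

path = os.path.join(tempfile.mkdtemp(), "training.json")

# Write one JSON object per line.
with open(path, "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read the file back line by line.
with open(path) as f:
    loaded = [json.loads(line) for line in f if line.strip()]

print(len(loaded), sorted(loaded[0].keys()))
```

A line-oriented file like this can be streamed by a training script without loading the whole dataset into memory.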

In [None]:
if training_dataset_location is not None:
    command = "aws s3 cp ./training.json {}".format(training_dataset_location)
    os.system(command)

if validation_dataset_location is not None:
    command = "aws s3 cp ./validation.json {}".format(validation_dataset_location)
    os.system(command)

upload: ./training.json to s3://sagemaker-us-east-1-905418125508/dataset/train/training.json
upload: ./validation.json to s3://sagemaker-us-east-1-905418125508/dataset/validation/validation.json


In [None]:
if hyperparameters["do_train"]:
    command = "rm ./training.json"
    os.system(command)

if hyperparameters["do_eval"]:
    command = "rm ./validation.json"
    os.system(command)

In [None]:
%store training_dataset_location
%store validation_dataset_location

Stored 'training_dataset_location' (str)
Stored 'validation_dataset_location' (str)


In [None]:
%store

Stored variables and their in-db values:
training_dataset_location               -> 's3://sagemaker-us-east-1-905418125508/dataset/tra
validation_dataset_location             -> 's3://sagemaker-us-east-1-905418125508/dataset/val


## Specify Amazon S3 Bucket Paths
Here you specify the paths of the training data that your job will use. The bucket must be in the same AWS Region as the one where training will run. In the cells above, you downloaded the GLUE/SST2 dataset, split it into training and validation sets, and uploaded the resulting JSON files to an S3 bucket in your account. This example trains on those JSON files.

After you successfully run this example tensor parallel + fully sharded data parallel training job, you can modify the S3 bucket to where your own dataset is stored.

In [None]:
%store -r training_dataset_location
%store -r validation_dataset_location

In [None]:
s3_train_bucket = training_dataset_location
s3_test_bucket = validation_dataset_location

The following S3 bucket will store the output artifacts of the training job. You can modify this as needed.

In [None]:
s3_output_bucket = f"s3://sagemaker-{region}-{account}/smp-fsdp/{model_type}-outputdir/"

## Define Data Channels for SageMaker Training Using Amazon S3
In this step, define SageMaker training data channels to the S3 buckets.



In [None]:
# Set below var to True if you want to use fsx (see next cell)
use_fsx = False
if not use_fsx:
    # NOTE: `mock_data` is not defined in this notebook. Define it yourself
    # before relying on the fallback branches below.
    if s3_train_bucket is not None:
        train = sagemaker.inputs.TrainingInput(
            s3_train_bucket, distribution="FullyReplicated", s3_data_type="S3Prefix"
        )
        data_channels = {"train": train}
    else:
        data_channels = {"train": mock_data}
    if s3_test_bucket is not None:
        test = sagemaker.inputs.TrainingInput(
            s3_test_bucket, distribution="FullyReplicated", s3_data_type="S3Prefix"
        )
        data_channels["test"] = test
    else:
        data_channels["test"] = mock_data
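Inside the training container, each channel name becomes a directory under `/opt/ml/input/data/`, and the SageMaker training toolkit also exposes it through an `SM_CHANNEL_<NAME>` environment variable. The following sketch shows how a training script might resolve a channel directory. The helper `channel_dir` is hypothetical and is not taken from `train.py`.

```python
import os


def channel_dir(channel_name, env=None):
    """Resolve the local directory for a SageMaker input channel.

    Prefers the SM_CHANNEL_<NAME> environment variable and falls back to the
    conventional path under /opt/ml/input/data/.
    """
    env = os.environ if env is None else env
    return env.get(
        f"SM_CHANNEL_{channel_name.upper()}",
        f"/opt/ml/input/data/{channel_name}",
    )


# With an empty environment, the conventional paths are returned.
print(channel_dir("train", env={}))
print(channel_dir("test", env={}))
```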

In [None]:
tensor_parallel_degree = 8  # An integer in [1, world_size]. Note: we recommend using TP_DEGREE in [1,8] for intra-node communication as inter-node TP communication is slow.
hybrid_shard_degree = (
    0  # An integer in [0, world_size // tensor_parallel_degree] and its default value is 0.
)
offload_activations = True  # Enables SM activation offloading implementation.
activation_loading_horizon = (
    2  # Activation loading horizon, a positive integer and its default value is 2.
)
save_steps = 50  # Save step interval.
max_steps = 50  # Maximum training steps.

hyperparameters = {
    "train_batch_size": 2,
    "val_batch_size": 4,
    "fast_validation": 0,
    "max_steps": max_steps,
    "epochs": 100,
    "seed": 12345,
    "bf16": 1,
    "fp8":0,
    "lr": 0.0001,
    "min_lr": 1e-05,
    "beta1": 0.9,
    "beta2": 0.95,
    "lr_decay_style": "cosine",
    "lr_decay_iters": 47683,
    "warmup": 0.0032,
    "plateau": 0.0,
    "delayed_param": 1,
    "num_kept_checkpoints": 2,
    "checkpoint_freq": save_steps,
    "checkpoint_dir": "/opt/ml/checkpoints",
    "validation_freq": save_steps,
    "logging_freq": 1,
    "weight_decay": 0.2,
    "clean_cache": 0,
    "activation_checkpointing": 1,
    "enable_memory_profiling": 0,
    "forward_prefetch": 1,
    "vocab_size": 50257,
    "limit_all_gathers": 1,
    "backward_fetch_policy": "backward_pre",
    "sharding_strategy": "hybrid_shard",
    "auto_wrap_policy": "transformer_auto_wrap_policy",
    "model_type": model_type,
    "use_smp_flash_attn": 1,
    "use_smp_implementation": 1,
    "distributed_backend": "nccl",
}

if use_fsx:
    # make sure to update paths for training_dir and test_dir based on the paths of datasets in fsx
    # If you want to resume training, set checkpoint_dir to the same path as a previous job.
    SM_TRAIN_DIR = "/opt/ml/input/data/train"
    hyperparameters["checkpoint_dir"] = f"{SM_TRAIN_DIR}/smp-v2/{model_type}/checkpointdir"
    hyperparameters["training_dir"] = f"{SM_TRAIN_DIR}/datasets/c4/en/hf-tokenized/llama/train"
    hyperparameters["test_dir"] = f"{SM_TRAIN_DIR}/datasets/c4/en/hf-tokenized/llama/val"
    hyperparameters["zipped_data"] = 1
    hyperparameters["dataset_type"] = "hf"
else:
    hyperparameters["zipped_data"] = 0
    hyperparameters["dataset_type"] = "gpt_jsonl"

# The checkpoint path (hyperparameters['checkpoint_dir'] or checkpoint_s3_uri) is not unique per job.
# You need to modify as needed for different runs.
# If same path is used for unrelated runs, this may increase time when downloading unnecessary checkpoints,
# and cause conflicts when loading checkpoints.

metric_definitions = [
    {"Name": "base_metric", "Regex": "<><><><><><>"}
]  # Add your custom metric definitions
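The `Regex` value above is a placeholder. Each metric definition pairs a name with a regular expression that has one capture group, which SageMaker applies to the job's log output. As a sketch, the definitions below are tested locally with `re` against a hypothetical log line; the log format and the metric names are assumptions, so adapt them to what your training script actually prints.

```python
import re

# Hypothetical log line; the real format depends on what train.py prints.
sample_log = "Batch 12 Loss: 2.3456, Speed: 1.25 samples/sec"

# Illustrative definitions, each with exactly one capture group for the value.
metric_definitions_sketch = [
    {"Name": "train_loss", "Regex": r"Loss: ([0-9.]+)"},
    {"Name": "samples_per_sec", "Regex": r"Speed: ([0-9.]+) samples/sec"},
]

# Apply each regex locally to confirm it captures the intended number.
extracted = {}
for definition in metric_definitions_sketch:
    match = re.search(definition["Regex"], sample_log)
    if match:
        extracted[definition["Name"]] = float(match.group(1))

print(extracted)
```

Checking the patterns locally like this is cheaper than discovering an empty metric after a multi-hour job.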

In [None]:
if use_fsx:
    hyperparameters["hf_pretrained_model_name_or_dir"] = f"{SM_TRAIN_DIR}{PRETRAINED_DIR}"
else:
    hyperparameters["hf_pretrained_model_name_or_dir"] = PRETRAINED_MODEL

In [None]:
# Select your model size.
model_config = "7b"  # [7b, 65b]

if model_type == "llama_v2":
    if model_config == "7b":
        model_params = {
            "max_context_width": 4096,
            "hidden_width": 4096,
            "num_layers": 32,
            "num_heads": 32,
            "llama_intermediate_size": 11008,
        }
    elif model_config == "65b":
        model_params = {
            "max_context_width": 4096,
            "hidden_width": 8192,
            "num_layers": 80,
            "num_heads": 64,
            "llama_intermediate_size": 22016,
        }
    else:
        raise RuntimeError("Unknown model config")

for k, v in model_params.items():
    hyperparameters[k] = v
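As a sanity check on the `7b` settings, you can estimate the parameter count from the shape values. This is a rough sketch under stated assumptions: standard multi-head attention, a gated MLP with three projections, no bias terms, untied input and output embeddings, and a 32,000-token vocabulary (that of the Llama 2 tokenizer). The function name is hypothetical.

```python
def estimate_llama_params(hidden_width, num_layers, intermediate_size, vocab_size):
    """Approximate parameter count for a Llama-style decoder (see assumptions above)."""
    attention = 4 * hidden_width * hidden_width   # q, k, v, and output projections
    mlp = 3 * hidden_width * intermediate_size    # gate, up, and down projections
    norms = 2 * hidden_width                      # two RMSNorm weights per layer
    per_layer = attention + mlp + norms
    embeddings = 2 * vocab_size * hidden_width    # input embedding + output head
    final_norm = hidden_width
    return num_layers * per_layer + embeddings + final_norm


n_params = estimate_llama_params(
    hidden_width=4096, num_layers=32, intermediate_size=11008, vocab_size=32000
)
print(f"{n_params / 1e9:.2f}B parameters")  # 6.74B parameters
```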

## Specify Essential Parameters for a SageMaker Training Job
Next, you use the SageMaker `PyTorch` estimator class to define a SageMaker training job, passing values for the number of EC2 instances, the instance type, the size of the volume attached to the instances, and the training job name through the following parameters:

- `instance_count`
- `instance_type`
- `volume_size`
- `base_job_name`

### Update the Type and Number of EC2 Instances to Use
The instance type and the number of instances you specify for the `instance_type` and `instance_count` parameters, respectively, determine the total number of GPUs (world size).
$$\text{(world size) = (the number of GPUs on a single instance)}\times\text{(the number of instances)}$$

In [None]:
instance_type = "ml.p4d.24xlarge"

# You need >= 1 p4d for 7b model.
# You need >= 8 p4d for 65b model.
instance_count = 1

# set to the number of GPUs on that instance
processes_per_host = 8
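The world-size formula and the degree ranges noted in the configuration comments can be checked before launching a job. The following is a minimal sketch with hypothetical helper names. The divisibility check at the end is an assumption rather than a rule stated in this notebook.

```python
def world_size(gpus_per_instance, instance_count):
    """Total number of GPUs across all instances."""
    return gpus_per_instance * instance_count


def check_degrees(world, tensor_parallel_degree, hybrid_shard_degree):
    """Raise ValueError if the degrees fall outside the ranges noted in this notebook."""
    if not 1 <= tensor_parallel_degree <= world:
        raise ValueError("tensor_parallel_degree must be in [1, world_size]")
    if not 0 <= hybrid_shard_degree <= world // tensor_parallel_degree:
        raise ValueError(
            "hybrid_shard_degree must be in [0, world_size // tensor_parallel_degree]"
        )
    # Assumption: the tensor parallel degree should divide the world size evenly.
    if world % tensor_parallel_degree != 0:
        raise ValueError("tensor_parallel_degree should divide world_size")


# One ml.p4d.24xlarge instance with 8 GPUs, using the degrees from this example.
world = world_size(gpus_per_instance=8, instance_count=1)
check_degrees(world, tensor_parallel_degree=8, hybrid_shard_degree=0)
print(world)  # 8
```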

### Specify a Base Job Name

In [None]:
machine_str = instance_type.split(".")[1] + instance_type.split(".")[2][:3]
base_job_name = f'smp-{model_config}-{machine_str}-hs{hybrid_shard_degree}-ao{offload_activations}-bs{hyperparameters["train_batch_size"]}'
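To see what the two lines above produce, the same string logic is wrapped in a standalone function below (the function name is hypothetical) and evaluated with the values used in this example.

```python
def build_base_job_name(
    instance_type, model_config, hybrid_shard_degree, offload_activations, train_batch_size
):
    # "ml.p4d.24xlarge" -> family "p4d", size "24xlarge" -> "p4d24x"
    _, family, size = instance_type.split(".")
    machine_str = family + size[:3]
    return (
        f"smp-{model_config}-{machine_str}-hs{hybrid_shard_degree}"
        f"-ao{offload_activations}-bs{train_batch_size}"
    )


name = build_base_job_name("ml.p4d.24xlarge", "7b", 0, True, 2)
print(name)  # smp-7b-p4d24x-hs0-aoTrue-bs2
```

SageMaker appends a timestamp to this base name when it creates the training job.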

In [None]:
if not use_fsx:
    checkpoint_bucket = f"s3://sagemaker-{region}-{account}"  # no trailing slash, to avoid "//" in the URI below
    checkpoint_s3_uri = (
        f"{checkpoint_bucket}/experiments/smp_fsdp-{model_type}-checkpoints/{base_job_name}/"
    )
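When you compose S3 URIs from several pieces, a small helper can guard against doubled slashes, which S3 treats as an empty path segment. The function `s3_join` below is a hypothetical utility, not part of the SageMaker SDK, and the bucket name is a made-up example.

```python
def s3_join(bucket_uri, *parts):
    """Join an S3 bucket URI and path parts with single slashes and a trailing slash."""
    cleaned = [bucket_uri.rstrip("/")]
    # Strip surrounding slashes from each part and skip empty parts.
    cleaned += [part.strip("/") for part in parts if part.strip("/")]
    return "/".join(cleaned) + "/"


uri = s3_join(
    "s3://sagemaker-us-east-1-123456789012/",  # made-up bucket name
    "experiments",
    "smp_fsdp-llama_v2-checkpoints",
    "/smp-7b-p4d24x-hs0-aoTrue-bs2/",
)
print(uri)
```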

In [None]:
kwargs = {}
if use_fsx:
    # Use the security group and subnet that were used to create the FSx file system
    kwargs["security_group_ids"] = [FSX_SECURITY_GROUP_ID]
    kwargs["subnets"] = [FSX_SUBNET]

smp_estimator = PyTorch(
    entry_point="train.py",
    hyperparameters=hyperparameters,
    source_dir=os.path.join(os.getcwd(), "./shared-scripts"),
    role=role,
    checkpoint_s3_uri=checkpoint_s3_uri if not use_fsx else None,
    checkpoint_local_path=hyperparameters["checkpoint_dir"] if use_fsx else None,
    instance_type=instance_type,
    volume_size=400,
    instance_count=instance_count,
    sagemaker_session=sagemaker_session,
    distribution={
        "torch_distributed": {"enabled": True},  # Use torchrun.
        "smdistributed": {
            "modelparallel": {
                "enabled": True,
                "parameters": {
                    "tensor_parallel_degree": tensor_parallel_degree,
                    "hybrid_shard_degree": hybrid_shard_degree,
                    "sm_activation_offloading": offload_activations,
                    "activation_loading_horizon": activation_loading_horizon,
                },
            }
        },
    },
    py_version="py310",
    framework_version="2.2.0",
    # image_uri=$IMAGE,  # Either provide `framework_version` or `image_uri`
    output_path=s3_output_bucket,
    max_run=86400,
    debugger_hook_config=False,
    base_job_name=base_job_name,
    metric_definitions=metric_definitions,
    **kwargs,
)

# FIT ESTIMATOR

Finally, run the `estimator.fit` method to launch the SageMaker fine-tuning job of the model with hybrid sharding and activation offloading.

In [None]:
smp_estimator.fit(inputs=data_channels)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker:Creating training-job with name: smp-7b-p4d24x-hs0-aoTrue-bs2-2024-03-27-23-51-11-911


2024-03-27 23:51:12 Starting - Starting the training job...
2024-03-27 23:51:31 Pending - Training job waiting for capacity...
2024-03-27 23:51:58 Pending - Preparing the instances for training........................
2024-03-27 23:56:03 Downloading - Downloading input data.........
2024-03-27 23:57:33 Downloading - Downloading the training image............
2024-03-27 23:59:24 Training - Training image download completed. Training in progress.....
bash: cannot set terminal process group (-1): Inappropriate ioctl for device
bash: no job control in this shell
2024-03-28 00:00:15,447 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training
2024-03-28 00:00:15,555 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)
2024-03-28 00:00:15,564 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.
2024-03-28 00:00:15,565 sagemaker_pytorch_con

## Accessing the Launched SageMaker Training Job
You can access the launched training job from [SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html).  
Go to `Amazon SageMaker -> Training -> Training jobs`.  
You can also access the training logs from here with `View Logs` which opens CloudWatch directly.

## Accessing the Training Logs

You can access the training logs from [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html).

You can use CloudWatch to track SageMaker GPU and memory utilization during training and inference. To view the metrics and logs that SageMaker writes to CloudWatch, see [SageMaker Jobs and Endpoint Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/monitoring-cloudwatch.html#cloudwatch-metrics-jobs) in the Amazon SageMaker Developer Guide.

If you are a new user of CloudWatch, see [Getting Started with Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/GettingStarted.html).

For additional information on monitoring and analyzing Amazon SageMaker training jobs, see [Monitor and Analyze Training Jobs Using Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/training-metrics.html).

## Deploying Trained Model for Inference

In most cases, a trained model can be deployed on a single device for inference because inference requires far less memory than training: it does not need to hold gradients or optimizer state.
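A back-of-the-envelope estimate illustrates the gap. The sketch below assumes about 6.74 billion parameters (Llama 2 7B), 2 bytes per parameter for bf16 inference weights, and a common rule of thumb of 16 bytes per parameter for mixed-precision training with Adam (bf16 weights and gradients plus fp32 master weights and two optimizer moments). Activations and the inference key-value cache are ignored, so treat the numbers as rough lower bounds.

```python
def weights_gib(num_params, bytes_per_param):
    """Memory in GiB needed to hold num_params values of the given width."""
    return num_params * bytes_per_param / 2**30


LLAMA_7B_PARAMS = 6_738_415_616  # approximate size of Llama 2 7B

# bf16 weights only.
inference_gib = weights_gib(LLAMA_7B_PARAMS, 2)
# Rule of thumb: 2 (weights) + 2 (grads) + 4 (master weights) + 4 + 4 (Adam moments).
training_gib = weights_gib(LLAMA_7B_PARAMS, 16)

print(f"inference: {inference_gib:.1f} GiB, training state: {training_gib:.1f} GiB")
```

Under these assumptions the inference weights fit on a single 40 GB GPU, while the training state alone does not, which is why training is sharded across devices.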

After you build and train your models, you can deploy them to get predictions in one of two ways:

* To set up a persistent endpoint to get predictions from your models, use SageMaker hosting services. For an overview on deploying a single model or multiple models with SageMaker hosting services, see [Deploy a Model on SageMaker Hosting Services](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-deployment.html#how-it-works-hosting).
* To get predictions for an entire dataset, use SageMaker batch transform. For an overview on deploying a model with SageMaker Batch Transform, see [Get Inferences for an Entire Dataset with Batch Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-batch.html).

To learn more about deploying models for inference using SageMaker, see [Deploy Models for Inference](https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-model.html).
