# 1: Deploy Llama 2 7b Chat HF to Inferentia2 on SageMaker

- Neuronx 2.15
- SageMaker Notebook Kernel: `conda_python3`
- SageMaker Notebook Instance Type: ml.m5d.large | ml.t3.large

In this notebook, you will use the SageMaker Python SDK to deploy the Llama 2 7B Chat instruction-tuned large language model to an endpoint backed by [AWS Inferentia2 (Inf2)](https://aws.amazon.com/ec2/instance-types/inf2/) accelerators. You will create the files needed to run inference with TorchServe inside a `pytorch-inference-neuronx` container, and then deploy and test the endpoint.

AWS Inferentia2 instances are purpose-built for deep learning (DL) inference. According to AWS, they deliver high performance at the lowest cost in Amazon EC2 and SageMaker for generative artificial intelligence (AI) models, including large language models (LLMs). The [AWS Neuron SDK](https://awsdocs-neuron.readthedocs-hosted.com/) helps developers deploy models on AWS Inferentia accelerators. It integrates natively with frameworks such as PyTorch and TensorFlow, so you can keep your existing workflows and application code and run them on Inf2 instances.

## Runtime 

This notebook takes approximately 20 minutes to run.

## Contents

1. [Prerequisites](#prerequisites)
1. [Setup](#setup)
1. [Build and push inference container](#build-and-push-inference-container)
1. [Create the TorchServe inference files](#create-the-torchserve-inference-files)
1. [Update the model archive](#update-the-model-archive)
1. [Create and deploy model endpoint](#create-and-deploy-model-endpoint)
1. [Test the endpoint](#test-the-endpoint)

## Prerequisites

- Llama 2 Chat model weights compiled with Neuron 2.15 for a tensor parallelism degree of 2 (tp=2), either created by notebook 00 or provided by the workshop as an S3 URI


## Setup

Let's start by installing and importing the required packages for this notebook. 

<div class="alert alert-block alert-warning"><b>Note:</b> Verify that the notebook kernel is `conda_python3`. Also, if a module can't be imported after installation, restart the notebook kernel, then rerun the import cell.</div>

In [None]:
%pip install --upgrade sagemaker --quiet
%pip install torch-model-archiver --quiet

In [None]:
import os
import io
import json
import boto3
import sagemaker
from IPython.display import display
from ipywidgets import widgets
from sagemaker import Model

***

Next, we will initialize the SageMaker session and create a working directory.

***

In [None]:
sagemaker_session = sagemaker.Session()
smr = sagemaker_session.sagemaker_runtime_client
role = sagemaker_session.get_caller_identity_arn()
bucket = sagemaker_session.default_bucket()
account = sagemaker_session.account_id()
region = sagemaker_session.boto_region_name

os.makedirs("build/inference", exist_ok=True)
os.makedirs("build/inference/container", exist_ok=True)

print(f"SageMaker version: {sagemaker.__version__}")
print(f"SageMaker role arn: {role}")
print(f"SageMaker bucket: {bucket}")
print(f"SageMaker session region: {region}")

## Build and push inference container

At the time of writing, the Neuron 2.15 version of the [Neuron inference container](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#neuron-containers) had not been released, so this notebook extends the 2.14.1 image with the pinned Neuron, transformers, and TorchServe package versions listed below.

- `Dockerfile` - Custom container definition with `pytorch-inference-neuronx` as the base. 

In [None]:
%%writefile build/inference/container/Dockerfile

FROM "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuronx:1.13.1-neuronx-py310-sdk2.14.1-ubuntu20.04"

RUN pip install --upgrade \
    torchserve==0.9.0 \
    neuronx-cc==2.11.0.34 \
    sentencepiece==0.1.99 \
    torch-neuronx==1.13.1.1.12.0 \
    transformers==4.34.1 \
    transformers-neuronx==0.8.268 \
    torchvision \
    --extra-index-url=https://pip.repos.neuron.amazonaws.com

***

Pull the base `pytorch-inference-neuronx` container, build the custom image with Docker, then push it to Amazon Elastic Container Registry (Amazon ECR). This cell can take up to **10** minutes to run and doesn't display any output, so please be patient.
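The shell cell below derives two image names: the base image from the Dockerfile's `FROM` line (via `awk`) and the target ECR URI from the account, region, and repository name. A minimal Python sketch of the same string logic follows; `base_image_from_dockerfile` and `ecr_image_uri` are hypothetical helper names, and the account ID `123456789012` is a placeholder.

```python
def base_image_from_dockerfile(text: str) -> str:
    """Return the image named on the first FROM line, with double quotes removed.

    Roughly mirrors: awk -F ' ' '/^FROM/ { gsub(/"/, "", $2); print $2 }'
    (awk prints every FROM line; this sketch keeps only the first).
    """
    for line in text.splitlines():
        if line.startswith("FROM"):
            fields = line.split()
            if len(fields) > 1:
                return fields[1].replace('"', "")
    raise ValueError("no FROM line found")


def ecr_image_uri(account: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build a private ECR image URI: <account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>."""
    return f"{account}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


dockerfile = '''
FROM "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuronx:1.13.1-neuronx-py310-sdk2.14.1-ubuntu20.04"

RUN pip install --upgrade torchserve==0.9.0
'''
print(base_image_from_dockerfile(dockerfile))
# Placeholder account ID, for illustration only
print(ecr_image_uri("123456789012", "us-west-2", "neuronx-inference"))
```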

***

In [None]:
docker_build_out, docker_build_err = "", ""

In [None]:
%%sh --out docker_build_out --err docker_build_err

# The name of the ECR repository to create and push to
repository_name="neuronx-inference"

cd build/inference/container

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-west-2}

source_image_name=$(awk -F ' ' '/^FROM/ { gsub(/"/, "", $2); print $2 }' Dockerfile)

target_image_name="${account}.dkr.ecr.${region}.amazonaws.com/${repository_name}:latest"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${repository_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${repository_name}" > /dev/null
fi

# If the repository has no images yet, build the image and push it to ECR.
# (An existing image is reused, so later Dockerfile changes are not rebuilt automatically.)
if ! aws ecr list-images --repository-name "${repository_name}" | jq -e '.imageIds | length > 0' >/dev/null; then
    # Log in to the registry that hosts the base image so docker build can pull it
    aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin ${source_image_name}

    # Log in to the target registry so the built image can be pushed
    aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin ${target_image_name}

    # Build the Docker image locally with the image name and then push it to ECR
    docker build -q -t ${repository_name} --build-arg region=${region} .
    docker tag ${repository_name} ${target_image_name}

    docker push ${target_image_name}
fi

# print the name of the container so we can get it in the next cell
echo ${target_image_name}

***

Now, let's check the output of the previous cell. If the build succeeded, store the image URI in a variable for use when we deploy the endpoint.
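The check in the next cell treats the last line of the captured stdout as the image URI. A slightly stricter sketch also confirms that the line looks like a private ECR image URI in a standard (non-China) region before using it. The helper name and the regular expression are assumptions for illustration, not an official ECR format specification.

```python
import re

# Rough pattern for <12-digit account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
ECR_URI_PATTERN = re.compile(
    r"^\d{12}\.dkr\.ecr\.[a-z0-9-]+\.amazonaws\.com/[a-z0-9._/-]+:[\w.-]+$"
)


def image_uri_from_build_output(stdout: str, stderr: str) -> str:
    """Return the image URI echoed by the build cell, or raise if the build failed."""
    if "Error response from daemon" in stderr:
        raise RuntimeError(f"Container build failed:\n{stderr}")
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if not lines:
        raise RuntimeError("The build cell produced no output")
    candidate = lines[-1]
    if not ECR_URI_PATTERN.match(candidate):
        raise RuntimeError(f"Unexpected last line of build output: {candidate!r}")
    return candidate


# Usage in the notebook (hypothetical replacement for the split("\n")[-1] step):
# image_uri = image_uri_from_build_output(str(docker_build_out), str(docker_build_err))
```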

***

In [None]:
# Let's check the docker build output for errors. If successful, print the docker image uri.
if "Error response from daemon" in str(docker_build_err):
    print(docker_build_err)
    raise SystemExit("\n\n!!There was an error with the container build!!")
else:
    image_uri = str(docker_build_out).strip().split("\n")[-1]

image_uri

## Create the TorchServe inference files



Create the `inference.py` file. This file contains the core inference logic and input and output handling.  
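From the handler code, a request body is a JSON object with a required `prompt` and optional `max_tokens_to_sample`, `temperature`, `top_p`, and `top_k` fields (defaulting to the `max_length` config value, 0.6, 0.9, and 50), and only the first request in a micro-batch sets the sampling parameters. Each response message is a newline-terminated JSON line of the form `{"outputs": [text]}`. A minimal sketch of building and parsing those messages; `build_request` and `parse_response` are hypothetical client-side helpers, not part of TorchServe or SageMaker.

```python
import json
from typing import Optional


def build_request(prompt: str, max_tokens_to_sample: Optional[int] = None,
                  temperature: float = 0.6, top_p: float = 0.9, top_k: int = 50) -> str:
    """Serialize a request body using the field names inference.py reads."""
    body = {"prompt": prompt, "temperature": temperature, "top_p": top_p, "top_k": top_k}
    if max_tokens_to_sample is not None:
        # When omitted, the handler falls back to max_length from model-config.yaml
        body["max_tokens_to_sample"] = max_tokens_to_sample
    return json.dumps(body)


def parse_response(raw: bytes) -> str:
    """Concatenate the text from each newline-terminated {"outputs": [...]} message."""
    texts = []
    for line in raw.decode("utf-8").splitlines():
        if line.strip():
            texts.extend(json.loads(line)["outputs"])
    return "".join(texts)


print(build_request("What is AWS Inferentia2?", max_tokens_to_sample=64))
print(parse_response(b'{"outputs": ["Inferentia2 is an ML accelerator."]}\n'))
```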

Notable methods:

- `initialize` - Handles loading and initializing the model. If you have used the Hugging Face transformers library before, this code may look familiar. The line `self.model.to_neuron()` loads the model onto the Neuron accelerators, compiling it first if no precompiled artifacts were loaded.
- `preprocess` - Deserializes the request and passes the input prompts into the tokenizer. 
- `inference` - Invokes the model with the request parameters and tokenized input. Supports both streaming and non-streaming response modes.
- `postprocess` - Passes the tokens generated by the model to the tokenizer for decoding back to text before it's returned to the client.
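Two details in `preprocess` and `inference` are easy to miss. First, the prompt list is padded with empty strings up to the compiled micro-batch size. Second, because the tokenizer pads on the left (`padding_side = "left"`), every row of `input_ids` has the same length and each prompt ends at the same position, which is why `inference` can drop the prompt with `gs[len(tokenized_input["input_ids"][idx]):]`. The toy sketch below uses plain lists with made-up token IDs; the real handler uses the tokenizer's EOS token as the pad token, not `0`.

```python
from typing import List

PAD_ID = 0  # hypothetical pad token ID for this toy example


def pad_prompts(prompts: List[str], batch_size: int) -> List[str]:
    """Pad the prompt list with empty strings up to the compiled batch size."""
    if len(prompts) > batch_size:
        raise ValueError(f"compiled for batch size {batch_size}, got {len(prompts)} prompts")
    return prompts + [""] * (batch_size - len(prompts))


def left_pad(batch: List[List[int]], pad_id: int = PAD_ID) -> List[List[int]]:
    """Left-pad each token sequence to the length of the longest one."""
    width = max(len(seq) for seq in batch)
    return [[pad_id] * (width - len(seq)) + seq for seq in batch]


def strip_prompt(generated: List[List[int]], padded_inputs: List[List[int]]) -> List[List[int]]:
    """Drop the padded prompt prefix from each generated sequence."""
    return [gen[len(inp):] for gen, inp in zip(generated, padded_inputs)]


padded = left_pad([[11, 12, 13], [21]])
print(padded)  # [[11, 12, 13], [0, 0, 21]]
# generate() returns prompt + new tokens; fake two new tokens per row
generated = [row + [91, 92] for row in padded]
print(strip_prompt(generated, padded))  # [[91, 92], [91, 92]]
```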

Read through `inference.py` to understand what it's doing.

In [None]:
%%writefile build/inference/inference.py

import logging
import os
import json
import time
from abc import ABC
from threading import Thread

import torch
import torch_neuronx
from transformers import AutoConfig, LlamaTokenizer, StoppingCriteria, StoppingCriteriaList, MaxLengthCriteria
from transformers_neuronx.generation_utils import HuggingFaceGenerationModelAdapter
from transformers_neuronx.llama.model import LlamaForSampling

from ts.handler_utils.hf_batch_streamer import TextIteratorStreamerBatch
from ts.handler_utils.micro_batching import MicroBatching
from ts.protocol.otf_message_handler import send_intermediate_predict_response
from ts.torch_handler.base_handler import BaseHandler
from ts.utils.util import PredictionException

logger = logging.getLogger(__name__)


class LLMHandler(BaseHandler, ABC):
    """
    Transformers handler class for text completion streaming on Inferentia2
    """

    def __init__(self):
        super().__init__()
        self.initialized = False
        self.max_length = None
        self.tokenizer = None
        self.output_streamer = None
        # enable micro batching
        self.handle = MicroBatching(self)

    def initialize(self, ctx):
        self.manifest = ctx.manifest
        properties = ctx.system_properties
        model_dir = properties.get("model_dir")
        model_checkpoint_dir = ctx.model_yaml_config.get("handler", {}).get(
            "model_checkpoint_dir", ""
        )
        model_checkpoint_path = f"{model_dir}/{model_checkpoint_dir}"

        os.environ["NEURON_COMPILE_CACHE_URL"] = f"{model_dir}/neuron_cache"

        # -O1 - not optimized for performance
        # -O2 - default settings
        # -O3 - best performance
        os.environ["NEURON_CC_FLAGS"] = "-O3" 

        # micro batching initialization
        micro_batching_parallelism = ctx.model_yaml_config.get(
            "micro_batching", {}
        ).get("parallelism", None)
        if micro_batching_parallelism:
            logger.info(
                f"Setting micro batching parallelism from model_config_yaml: {micro_batching_parallelism}"
            )
            self.handle.parallelism = micro_batching_parallelism

        micro_batch_size = ctx.model_yaml_config.get("micro_batching", {}).get(
            "micro_batch_size", 1
        )
        logger.info(f"Setting micro batching size: {micro_batch_size}")
        self.handle.micro_batch_size = micro_batch_size

        # settings for model compilation and loading
        amp = ctx.model_yaml_config.get("handler", {}).get("amp", "f32")
        tp_degree = ctx.model_yaml_config.get("handler", {}).get("tp_degree", 6)
        self.max_length = ctx.model_yaml_config.get("handler", {}).get("max_length", 50)

        # allocate "tp_degree" number of neuron cores to the worker process
        os.environ["NEURON_RT_NUM_CORES"] = str(tp_degree)
        try:
            num_neuron_cores_available = (
                torch_neuronx.xla_impl.data_parallel.device_count()
            )
            assert num_neuron_cores_available >= int(tp_degree)
        except (RuntimeError, AssertionError) as error:
            logger.error(
                "Required number of neuron cores for tp_degree "
                + str(tp_degree)
                + " are not available: "
                + str(error)
            )

            raise error

        logger.info("LOADING TOKENIZER")
        self.tokenizer = LlamaTokenizer.from_pretrained(model_dir)
        self.tokenizer.padding_side = "left"
        self.tokenizer.pad_token = self.tokenizer.eos_token
        
        logger.info("LOADING MODEL") 
        self.model = LlamaForSampling.from_pretrained(
            model_checkpoint_path,
            batch_size=self.handle.micro_batch_size,
            amp=amp,
            tp_degree=tp_degree,
        )

        # Load compiled artifacts if they exist
        neuron_artifacts = os.path.join(model_dir, "neuron_artifacts")
        if os.path.isdir(neuron_artifacts):
            logger.info("LOADING COMPILED ARTIFACTS FROM CACHE")
            self.model.load(neuron_artifacts)
        else:
            logger.debug("COMPILING MODEL FOR NEURON")

        self.model.to_neuron()
        logger.info("Model has been successfully compiled")

        logger.debug("LOADING CONFIG")
        
        model_config = AutoConfig.from_pretrained(model_checkpoint_path)
        self.model = HuggingFaceGenerationModelAdapter(model_config, self.model)
        
        # https://github.com/pytorch/serve/blob/447f3ef2b6df2ea9171c219b6f3bd51d2f0adbc5/ts/handler_utils/hf_batch_streamer.py#L7
        self.output_streamer = TextIteratorStreamerBatch(
            self.tokenizer,
            batch_size=self.handle.micro_batch_size,
            skip_prompt=True,
            skip_special_tokens=True,
        )

        self.initialized = True

    def preprocess(self, data):
        logger.debug("PREPROCESS")
        input_text = []
        requests = []
        logger.info(f"Preprocessing {len(data)} requests")
        for req in data:
            body = req.get("body")
            request = json.loads(body)
            logger.debug(request)
            prompt = request["prompt"]
            input_text.append(prompt.strip())
            requests.append(request)

        # Ensure the compiled model can handle the input received
        if len(input_text) > self.handle.micro_batch_size:
            raise ValueError(
                f"Model is compiled for batch size {self.handle.micro_batch_size} but received input of size {len(input_text)}"
            )

        # Pad input to match compiled model batch size
        input_text.extend([""] * (self.handle.micro_batch_size - len(input_text)))
        
        return self.tokenizer(
            input_text, 
            return_tensors="pt", 
            padding=True,
            add_special_tokens=False  # don't prepend the <s> token at the beginning of the input
        ), requests

    @torch.no_grad()
    def inference(self, data, *args, **kwargs):
        logger.debug("INFERENCE")
        tokenized_input, requests = data
        # use the first request to set the hyper parameters
        request = requests[0]
        
        calling_method = self.context.get_request_header(0, "X-Amzn-SageMaker-Forwarded-API")
        if calling_method == "InvokeEndpointWithResponseStream":
            streaming = True
        else:
            streaming = False

        parameters = dict(
            max_new_tokens=request.get("max_tokens_to_sample", self.max_length),
            temperature=request.get("temperature", 0.6),
            top_p=request.get("top_p", 0.9),
            top_k=request.get("top_k", 50),
            do_sample=True,
        )
        logger.info(parameters)

        self.model.reset_generation()

        if streaming:
            # see https://huggingface.co/docs/transformers/main_classes/text_generation for options
            logger.debug("Using hf model generate streaming method")
            generation_kwargs = dict(
                tokenized_input,
                streamer=self.output_streamer,  
                **parameters
            )   
            
            thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
            thread.start()
    
            micro_batch_idx = self.handle.get_micro_batch_idx()
            micro_batch_req_id_map = self.get_micro_batch_req_id_map(micro_batch_idx)
            for new_texts in self.output_streamer:
                formatted_responses = [ f"{json.dumps({'outputs': [new_text]})}\n" for new_text in new_texts[: len(micro_batch_req_id_map)]]
                send_intermediate_predict_response(
                    formatted_responses,
                    micro_batch_req_id_map,
                    "Intermediate Prediction success",
                    200,
                    self.context,
                )
    
            thread.join()
    
            return [""] * len(micro_batch_req_id_map)
        else:

            generation_kwargs = dict(
                tokenized_input,
                **parameters
            )
    
            generated_sequences = self.model.generate(**generation_kwargs)
    
            # remove input tokens from response
            cleaned_generated_sequences = [gs[len(tokenized_input["input_ids"][idx]):] for idx, gs in enumerate(generated_sequences)]
           
            return cleaned_generated_sequences

    def postprocess(self, data):
        logger.debug("IN POST PROCESS")
        decoded_sequences = self.tokenizer.batch_decode(data, skip_special_tokens=True)
        
        # Post process gets called for both streaming and non-streaming requests
        # The streaming processor is looking for a \n as an indication that all bytes for
        # that message have been received, which is why it's added to the end of the response
        responses = [f"{json.dumps({'outputs': [seq] })}\n"  for seq in decoded_sequences]
        
        return responses   
    
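    def handle(self, data, context):
        # Note: __init__ assigns self.handle = MicroBatching(self), which shadows this
        # method on the instance, so TorchServe calls the MicroBatching wrapper instead.
        try:
            return super().handle(data, context)
        except Exception as e:
            raise PredictionException("Unable to process request. " + str(e))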


    def get_micro_batch_req_id_map(self, micro_batch_idx: int):
        start_idx = micro_batch_idx * self.handle.micro_batch_size
        micro_batch_req_id_map = {
            index: self.context.request_ids[batch_index]
            for index, batch_index in enumerate(
                range(start_idx, start_idx + self.handle.micro_batch_size)
            )
            if batch_index in self.context.request_ids
        }

        return micro_batch_req_id_map

***

Create the [TorchServe](https://pytorch.org/serve/large_model_inference.html) `model-config.yaml` file. The TorchServe model server reads this file to set the model's runtime parameters, and `inference.py` reads the parameters under the `handler` section at runtime to initialize the model.
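The handler reads each `handler` setting with a chained `.get(...)` and a fallback default, so a key missing from `model-config.yaml` silently falls back to the value hard-coded in `inference.py`. For example, a missing `tp_degree` falls back to 6, more NeuronCores than an inf2.xlarge provides, and the core-count check in `initialize` would then fail. A minimal sketch of that lookup over an already-parsed config dict; `read_handler_settings` is a hypothetical helper and YAML parsing is omitted.

```python
from typing import Any, Dict

# Defaults copied from the .get(...) calls in inference.py's initialize() method
HANDLER_DEFAULTS: Dict[str, Any] = {
    "model_checkpoint_dir": "",
    "amp": "f32",
    "tp_degree": 6,
    "max_length": 50,
}


def read_handler_settings(model_yaml_config: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve each handler setting, falling back to the handler's default."""
    handler = model_yaml_config.get("handler", {})
    return {key: handler.get(key, default) for key, default in HANDLER_DEFAULTS.items()}


config = {"handler": {"model_checkpoint_dir": "llama2-split", "amp": "f16", "tp_degree": 2, "max_length": 4096}}
print(read_handler_settings(config))
print(read_handler_settings({}))  # every value falls back to the handler default
```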

- handler
    - `model_checkpoint_dir` - the location of the model weights folder under the model path.
    - `amp` - the data type of the model parameters at runtime (for example, `f16`).
    - `tp_degree` - the tensor parallelism degree, that is, the number of NeuronCores the model is sharded across. An inf2.xlarge instance has 1 Inferentia2 accelerator with 2 NeuronCores.
    - `max_length` - the default maximum number of new tokens to generate when a request doesn't set `max_tokens_to_sample`.
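As a rough sanity check on `amp` and `tp_degree`, weight memory is about parameter count × bytes per parameter, split across the tensor-parallel NeuronCores. The sketch below assumes roughly 6.74 billion parameters for Llama 2 7B and the byte widths shown for each `amp` string; it ignores the KV cache, activations, and runtime overhead, so treat the result as a lower bound rather than a sizing guide.

```python
# Assumed bytes per parameter for each amp setting
BYTES_PER_PARAM = {"f32": 4, "f16": 2, "bf16": 2}


def weight_gib_per_core(num_params: float, amp: str, tp_degree: int) -> float:
    """Approximate weight memory per NeuronCore in GiB (weights only)."""
    total_bytes = num_params * BYTES_PER_PARAM[amp]
    return total_bytes / tp_degree / 2**30


print(f"f16, tp=2: {weight_gib_per_core(6.74e9, 'f16', 2):.2f} GiB per core")
print(f"f32, tp=2: {weight_gib_per_core(6.74e9, 'f32', 2):.2f} GiB per core")
```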
    
***


In [None]:
%%writefile build/inference/model-config.yaml

minWorkers: 1
maxWorkers: 1
maxBatchDelay: 10
responseTimeout: 10800
batchSize: 1

handler:
    model_checkpoint_dir: "llama2-split"
    amp: "f16"
    tp_degree: 2 
    max_length: 4096

micro_batching:
    micro_batch_size: 1 
    parallelism:
        preprocess: 2
        inference: 1
        postprocess: 2

***
Create the model's `requirements.txt` file, which TorchServe installs during deployment because `TS_INSTALL_PY_DEP_PER_MODEL` is set to `true` on the endpoint. The Llama tokenizer requires the `sentencepiece` package.

***


In [None]:
%%writefile build/inference/requirements.txt

sentencepiece==0.1.99

***

Define the `model_name` constant. 

<div class="alert alert-block alert-warning"><b>Important: </b> Please refrain from altering the value, as other parts of the workshop rely on it being set to <b>llama-2-7b</b>. </div>

***

In [None]:
model_name = "llama-2-7b"

***

Run the [torch-model-archiver](https://github.com/pytorch/serve/blob/master/model-archiver/README.md) to create the files and folder layout TorchServe expects.
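With `--archive-format no-archive`, the archiver writes a plain directory named after `--model-name` instead of a `.mar` file. The sketch below checks that the directory holds the files passed to the archiver plus a `MAR-INF/MANIFEST.json` manifest. The expected file list is an assumption based on the options used in the next cell, and `missing_archive_files` is a hypothetical helper; adjust the list if your archiver version lays things out differently.

```python
from pathlib import Path
from typing import Iterable, List

# Assumed layout of the no-archive output directory for the options used in this notebook
EXPECTED_FILES = (
    "MAR-INF/MANIFEST.json",
    "inference.py",
    "model-config.yaml",
    "requirements.txt",
)


def missing_archive_files(model_dir: str, expected: Iterable[str] = EXPECTED_FILES) -> List[str]:
    """Return the expected files that are not present under model_dir."""
    root = Path(model_dir)
    return [name for name in expected if not (root / name).is_file()]


# Usage in the notebook, after the archiver cell has run:
# print(missing_archive_files(f"build/inference/{model_name}"))  # [] if everything is there
```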

***


In [None]:
!cd build/inference && torch-model-archiver --model-name {model_name} --version 1.0 --handler inference.py -r requirements.txt --config-file model-config.yaml --archive-format no-archive --force

print("Model package directory contents: \n")

!tree "./build/inference/{model_name}/"

## Update the model archive

Read the `model_data.json` file created by the prepare model notebook to get the S3 URI of the model weights. If your workshop provides shared model weights, you will be prompted to enter the S3 URI provided to you.


In [None]:
with open("model_data.json", "r") as file:
    model_data = json.load(file)
model_data

***

Copy the inference files into the model folder in S3, or enter the S3 path provided to you.
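`aws s3 cp <dir> <prefix> --recursive` uploads every file under the local directory to the same relative path under the S3 prefix, which is how the TorchServe files end up next to the compiled weights. A minimal sketch of that path mapping with `pathlib`; `s3_uri_for` and `planned_uploads` are hypothetical helpers that perform no upload, and `example-bucket` is a placeholder.

```python
from pathlib import Path, PurePosixPath
from typing import Dict


def s3_uri_for(relative_path: str, s3_prefix: str) -> str:
    """Join an S3 prefix and a relative file path the way a recursive copy lays them out."""
    prefix = s3_prefix if s3_prefix.endswith("/") else s3_prefix + "/"
    return prefix + PurePosixPath(relative_path).as_posix()


def planned_uploads(local_dir: str, s3_prefix: str) -> Dict[str, str]:
    """Map every file under local_dir to its destination URI (nothing is uploaded)."""
    root = Path(local_dir)
    return {
        str(path): s3_uri_for(path.relative_to(root).as_posix(), s3_prefix)
        for path in sorted(root.rglob("*"))
        if path.is_file()
    }


print(s3_uri_for("MAR-INF/MANIFEST.json", "s3://example-bucket/llama-2-7b"))
# Usage in the notebook:
# planned_uploads(f"build/inference/{model_name}", model_data["S3DataSource"]["S3Uri"])
```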

<div class="alert alert-block alert-warning"><b>Note: </b> If the next cell displays an input box, focus returns to that cell after it runs. Enter the required value in the input box, then manually select the cell immediately below it before continuing; otherwise, the input box is re-rendered and the value you entered is erased. </div>

***

In [None]:
# copy the files to S3
model_s3_uri = model_data["S3DataSource"]["S3Uri"]
if model_s3_uri != "":
    !aws s3 cp build/inference/{model_name}/ {model_s3_uri} --recursive
    s3_location_input = None
else:
    # Use an ipywidgets Text box to ask for the S3 model location, then update the S3Uri
    
    print("Input the S3 location of the model artifact:")
    print(" * Remember, after entering the value in the text box, you will need to manually select the next cell to continue running the notebook.")
    s3_location_input = widgets.Text(
        placeholder="Enter S3 Uri",
        description="S3:",
        disabled=False
    )
    display(s3_location_input)

***

Validate the S3 model path.
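The next cell appends a trailing `/` and checks that the URI is non-empty. A stricter sketch also confirms the `s3://bucket/...` shape. The trailing slash matters when `S3Uri` is treated as a key prefix (for example, with an `S3Prefix` data type): without it, `.../llama-2-7b` would also match sibling keys such as `.../llama-2-7b-old/`. `normalize_s3_prefix` is a hypothetical helper.

```python
def normalize_s3_prefix(uri: str) -> str:
    """Validate an s3://bucket/... URI and make sure it ends with '/'."""
    uri = uri.strip()
    if not uri.startswith("s3://"):
        raise ValueError(f"Expected a URI starting with s3://, got {uri!r}")
    bucket = uri[len("s3://"):].split("/", 1)[0]
    if not bucket:
        raise ValueError(f"Missing bucket name in {uri!r}")
    return uri if uri.endswith("/") else uri + "/"


# Usage in the validation cell (hypothetical):
# model_data["S3DataSource"]["S3Uri"] = normalize_s3_prefix(s3_location_input.value)
print(normalize_s3_prefix("s3://example-bucket/llama-2-7b"))
```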

***

In [None]:
# If s3 location was entered, validate format and update model_data object
if s3_location_input and s3_location_input.value != "":
    s3_location = s3_location_input.value

    if not s3_location.endswith("/"):
        s3_location += "/"
    model_data["S3DataSource"]["S3Uri"] = s3_location

# Assert that the S3 URI is not empty
assert (
    model_data["S3DataSource"]["S3Uri"] != ""
), "Model S3 source is missing. Please provide an S3 location."

print(f"S3 model uri {model_data['S3DataSource']['S3Uri']}")

***

When the model weights are combined with the TorchServe files, the directory structure will look like the image below.

![](./assets/images/model-package.png)

***

## Create and deploy model endpoint


Create the SageMaker model and deploy the endpoint. 

<div class="alert alert-block alert-warning"><b>Important: </b> Endpoint deployment takes around 10 minutes. Because the endpoint isn't used until lab 5, you don't need to wait for the cell to finish: after running the cell below, move on to the next lab.
</div>

In [None]:
%%time

model = Model(
    name=sagemaker.utils.name_from_base(model_name, short=True),
    model_data=model_data,
    image_uri=image_uri,
    role=role,
    sagemaker_session=sagemaker_session,
    env={
        "TS_INSTALL_PY_DEP_PER_MODEL": "true",
        "FI_EFA_FORK_SAFE": "1",  # https://awsdocs-neuron.readthedocs-hosted.com/en/latest/neuron-runtime/nrt-troubleshoot.html
    },
    # for production it is important to define vpc_config and use a vpc_endpoint
    # vpc_config={
    #    'Subnets': ['<SUBNET1>', '<SUBNET2>'],
    #    'SecurityGroupIds': ['<SECURITYGROUP1>', '<DEFAULTSECURITYGROUP>']
    # }
)
model._is_compiled_model = True

model.deploy(
    endpoint_name=model_name,
    initial_instance_count=1,
    instance_type="ml.inf2.xlarge",
    volume_size=64,
    model_data_download_timeout=3600,
    container_startup_health_check_timeout=600,
    region=region,
)
endpoint_name = model.endpoint_name
endpoint_name

After you run the cell above, remember to move on to the next lab.
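The endpoint is exercised in a later lab, but here is a sketch of how a client could consume the streaming mode the handler implements. `inference.py` switches to streaming when the `X-Amzn-SageMaker-Forwarded-API` header is `InvokeEndpointWithResponseStream`, which is the case when the client calls `invoke_endpoint_with_response_stream` on the boto3 `sagemaker-runtime` client (such as `smr` from the setup cell). Each event carries a `PayloadPart` with raw bytes, and one JSON line may be split across parts, so the sketch buffers until it sees the `\n` the handler appends. `iter_json_lines` and `stream_completion` are hypothetical helpers; the request fields follow what `inference.py` reads.

```python
import json
from typing import Any, Iterable, Iterator


def iter_json_lines(event_stream: Iterable[dict]) -> Iterator[dict]:
    """Reassemble newline-delimited JSON messages from response-stream events."""
    buffer = b""
    for event in event_stream:
        part = event.get("PayloadPart")
        if not part:
            continue  # skip events that carry no payload bytes
        buffer += part["Bytes"]
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)  # final message without a trailing newline


def stream_completion(runtime_client: Any, endpoint_name: str, prompt: str,
                      max_tokens: int = 256) -> Iterator[str]:
    """Yield generated text fragments from a streaming invocation."""
    response = runtime_client.invoke_endpoint_with_response_stream(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps({"prompt": prompt, "max_tokens_to_sample": max_tokens}),
    )
    for message in iter_json_lines(response["Body"]):
        yield from message.get("outputs", [])


# Usage, once the endpoint is InService:
# for chunk in stream_completion(smr, endpoint_name, "What is AWS Inferentia2?"):
#     print(chunk, end="", flush=True)
```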

## Notebook complete

You've deployed Llama 2 Chat on Inferentia2. Move on to the next notebook, which dives deep into text embeddings and semantic search, where you will learn about information retrieval over private data.