# Compiling and Deploying HuggingFace Pretrained CLIP on Inf2 on Amazon SageMaker

## Overview

In this notebook, we will compile and deploy a pretrained CLIP model from HuggingFace Transformers, using the [AWS Deep Learning Containers](https://github.com/aws/deep-learning-containers). We use AWS Deep Learning Containers because they offer a convenient, pre-configured environment with the necessary deep learning framework and AWS Neuron dependencies.

CLIP (Contrastive Language–Image Pretraining) is a model designed to understand and relate text and image data. It is especially known to perform well on zero-shot classification tasks, where it can classify images into categories it hasn't been explicitly trained on, by understanding the relationship between text and image content. You can find more information on the model architecture in this paper: https://arxiv.org/abs/2103.00020, and a full list of CLIP models on this page: https://huggingface.co/models?sort=trending&search=clip.
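
To make the zero-shot idea concrete, here is a minimal sketch of the scoring step in plain Python. It assumes toy 3-dimensional embeddings; the vectors, the label strings, and the `logit_scale` value are illustrative assumptions, not outputs or parameters taken from the real model. The image embedding is compared with one text embedding per candidate label, and a softmax turns the scaled similarities into probabilities.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    highest = max(scores)
    exps = [math.exp(s - highest) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def zero_shot_classify(image_embedding, text_embeddings, logit_scale=100.0):
    """Score one image embedding against one text embedding per label.

    logit_scale is a hypothetical temperature chosen for illustration.
    """
    labels = list(text_embeddings)
    logits = [
        logit_scale * cosine_similarity(image_embedding, text_embeddings[label])
        for label in labels
    ]
    return dict(zip(labels, softmax(logits)))


# Toy embeddings (assumed values, for illustration only)
image_embedding = [0.9, 0.1, 0.0]
text_embeddings = {
    "a photo of a cat": [1.0, 0.0, 0.0],
    "a photo of a dog": [0.0, 1.0, 0.0],
    "a photo of a truck": [0.0, 0.0, 1.0],
}

probabilities = zero_shot_classify(image_embedding, text_embeddings)
best_label = max(probabilities, key=probabilities.get)
print(best_label)
```

No label-specific training is involved: adding a new category only requires adding a new text prompt to the candidate list.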

By the end of this tutorial, you will have a clear understanding of how to optimize a CLIP model for AWS infrastructure, including model compilation and deployment on Inferentia 2.

This Jupyter Notebook was tested on an ml.t3.medium SageMaker Notebook instance with the PyTorch 2.1.2 Python 3.10 CPU kernel, in the us-east-1 region.

## Install Dependencies:

In [None]:
%pip install --upgrade pip

In [None]:
%pip install --upgrade sagemaker boto3 awscli

## Compile the model into an AWS Neuron optimized TorchScript

In the following section we will compile the model into an AWS Neuron optimized TorchScript. We start with a src directory where we create the following files:

- A **'compile_clip.py'** file: In this script we perform several key tasks:
    - Loading the CLIP model: We import and load the CLIP model (specifically, the 'openai/clip-vit-large-patch14' version) from the Hugging Face Transformers library.
    - Sample input preparation: We retrieve a sample input comprising both text and image data from the CIFAR100 dataset. The CIFAR100 dataset is a collection of images classified into 100 different classes, and these class names provide the textual component.
    - Processing the sample input: We then process this input using the CLIPProcessor to ensure the data is in the correct tensor format for the model. This step involves tokenizing the text and appropriately formatting the image data.
    - Model Compilation for AWS Neuron: Using torch_neuronx.trace(), we compile the CLIP model for optimized execution on AWS Neuron hardware. This step is crucial for deploying the model on AWS Inferentia chips.
    - Saving the Optimized Model: Finally, the compiled model is saved as a TorchScript file, allowing the model to be executed in a variety of environments.
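
The final packaging step can be tried locally without any Neuron hardware. The sketch below is a stand-alone illustration, assuming a placeholder file in place of the real compiled model; `package_model` is a hypothetical helper name, not part of the notebook. It shows that the archive ends up with a single top-level member named neuron_compiled_model.pt, which is the file name the inference script later looks for.

```python
import os
import tarfile
import tempfile


def package_model(artifact_path, output_dir, arcname="neuron_compiled_model.pt"):
    """Write artifact_path into output_dir/model.tar.gz under the name arcname."""
    os.makedirs(output_dir, exist_ok=True)
    archive_path = os.path.join(output_dir, "model.tar.gz")
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname controls the name stored inside the archive, so the file
        # sits at the archive root regardless of where it lived on disk.
        tar.add(artifact_path, arcname=arcname)
    return archive_path


with tempfile.TemporaryDirectory() as workdir:
    # Placeholder bytes standing in for the compiled TorchScript file
    placeholder = os.path.join(workdir, "placeholder.pt")
    with open(placeholder, "wb") as f:
        f.write(b"not a real model")

    archive_path = package_model(placeholder, os.path.join(workdir, "model"))

    with tarfile.open(archive_path, "r:gz") as tar:
        member_names = tar.getnames()

print(member_names)
```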

 

In [None]:
import os

os.makedirs("src", exist_ok=True)

In [None]:
%%writefile src/compile_clip.py
import os
import tarfile
import torch
import torch_neuronx
from transformers import CLIPProcessor, CLIPModel
from torchvision.datasets import CIFAR100

# Disable parallelism in Hugging Face tokenizer
os.environ["TOKENIZERS_PARALLELISM"] = "false"

model_name = 'openai/clip-vit-large-patch14'


if __name__=='__main__':
    # Create the input preprocessor and model
    processor = CLIPProcessor.from_pretrained(model_name) 
    model = CLIPModel.from_pretrained(model_name, return_dict=False)
    model.eval()

    # Load cifar100 dataset
    cifar100 = CIFAR100(root=os.path.expanduser("~/.cache"), download=True, train=False)

    # Get text captions for the model to classify the image against
    text = cifar100.classes

    # Get sample input (the first image in the CIFAR-100 dataset)
    image = cifar100[0][0]

    # Process sample input text and image data from the CIFAR100 dataset
    inputs = processor(text=text, images=image, return_tensors="pt", padding=True)
    
    # Example input that the function will use to trace the model's execution
    example = (inputs['input_ids'], inputs['pixel_values'])

    # Compile PyTorch model for optimized execution on inf2, using the AWS Neuron SDK
    model_neuron = torch_neuronx.trace(model, example, compiler_args='--enable-saturate-infinity --target=inf2')

    # Save the TorchScript for inference deployment
    torch.jit.save(model_neuron, '/tmp/neuron_compiled_model.pt')
    with tarfile.open(os.path.join("/opt/ml/model/", 'model.tar.gz'), "w:gz") as tar:
        tar.add('/tmp/neuron_compiled_model.pt', "neuron_compiled_model.pt")
    

## Get SageMaker execution role


In [None]:
import sagemaker

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
sess_bucket = sess.default_bucket()

prefix = "inf2_compiled_model"

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess_bucket}")
print(f"sagemaker session region: {sess.boto_region_name}")

## Create PyTorch estimator

In this section we create a PyTorch estimator using the PyTorch class from the 'sagemaker.pytorch' module. This estimator is a high-level abstraction for running PyTorch jobs in SageMaker, simplifying the process of training and deploying PyTorch models. It allows you to execute a custom Python script (here compile_clip.py, which contains the steps to compile the CLIP model with AWS Neuron) on a SageMaker-managed instance. We compile the model on an ml.trn1.2xlarge instance.

Configuration of the Estimator:
- entry_point: The path to your Python script (compile_clip.py) that contains the code for compiling the CLIP model.
- source_dir: Specifies the directory (src) where additional code and files related to the entry_point script are located.
- role, sagemaker_session: These parameters pass the AWS role and the SageMaker session information, respectively, which are essential for accessing AWS resources.
- instance_count: Set to 1, indicating that the job will run on a single instance.
- output_path: Specifies the S3 bucket path where the output of the job (the compiled model) will be stored.
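- disable_profiler and disable_output_compression: disable_profiler=True turns off the SageMaker profiler, which this compilation job does not need. disable_output_compression=True uploads the contents of the model directory to S3 as they are instead of packing them into another archive, so the model.tar.gz written by compile_clip.py is stored unchanged.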
- image_uri: This specifies the Docker image to be used for the compiling job. It points to an AWS Deep Learning Container image with PyTorch and the AWS Neuron SDK, optimized for model compilation and training. Make sure to pick the latest version of your Deep Learning Container from: https://github.com/aws/deep-learning-containers/blob/master/available_images.md  
- volume_size: Defines the size of the EBS volume attached to the instance.

estimator.fit() initiates the SageMaker training job, which will take around 10 minutes to complete. It will use the configurations specified above to launch a SageMaker training instance, run your compile_clip.py script on this instance, and output the results to the specified S3 path. 
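
The image_uri value follows a regular pattern, so it can be assembled from its parts. The helper below is a small sketch; `neuron_dlc_image_uri` is a hypothetical name, and the default account ID is simply the one used in this notebook. The registry account may differ in some regions, so check the available images list before relying on it.

```python
def neuron_dlc_image_uri(region, repository, tag, account_id="763104351884"):
    """Build an ECR image URI for a Deep Learning Container.

    account_id defaults to the registry account used in this notebook;
    treat it as an assumption for any other region.
    """
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


training_image = neuron_dlc_image_uri(
    region="us-east-1",
    repository="pytorch-training-neuronx",
    tag="2.1.2-neuronx-py310-sdk2.18.0-ubuntu20.04",
)
print(training_image)
```

Keeping the repository and tag as separate arguments makes it easier to switch between the training and inference images, or to move to a newer SDK tag.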


In [None]:
from sagemaker.pytorch import PyTorch

instance_type = "ml.trn1.2xlarge"

estimator = PyTorch(
    entry_point="compile_clip.py",
    source_dir="src",
    role=role,
    sagemaker_session=sess,
    instance_count=1,
    instance_type=instance_type,
    output_path=f"s3://{sess_bucket}/{prefix}",
    disable_profiler=True,
    disable_output_compression=True,
    image_uri=f"763104351884.dkr.ecr.{sess.boto_region_name}.amazonaws.com/pytorch-training-neuronx:2.1.2-neuronx-py310-sdk2.18.0-ubuntu20.04",
    volume_size=128,
)

estimator.framework_version = "1.13.1"  # workaround when using image_uri

In [None]:
%%time
estimator.fit()

## Deploy Container and run inference based on the pretrained model

To deploy a pretrained PyTorch model with a custom inference script, you will create a PyTorchModel object and set a different entry_point.

The entry_point will be the inference script (inference.py). The inference.py script contains several key functions that SageMaker invokes during the inference process:
- **model_fn**: This function is called once when the SageMaker endpoint is first started. It loads the model from the provided directory (typically from the path where model artifacts are unarchived) and returns the model object.
- **input_fn**: Each time an inference request is made, this function is invoked to process the incoming data (e.g., JSON payload, images) into a format that the model can work with.
- **predict_fn**: After input_fn, the predict_fn function is called with the processed data and the model loaded by model_fn. This is where the actual inference (prediction) happens.
- **output_fn**: Finally, the output_fn function formats the output of predict_fn into the response format that will be returned to the client.
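
The order in which these four functions are called can be simulated locally. The sketch below uses stand-in implementations (a toy "model" that scores each label by its length) and a hypothetical `handle_request` driver; the real serving container performs the equivalent chaining itself, so this is only meant to illustrate the data flow.

```python
import json

JSON_CONTENT_TYPE = "application/json"


def model_fn(model_dir):
    """Called once at startup. Returns a stand-in model, not a real one."""
    def toy_model(labels):
        # Illustrative scoring only: longer labels get higher scores
        return [float(len(label)) for label in labels]
    return toy_model


def input_fn(serialized_input_data, content_type=JSON_CONTENT_TYPE):
    """Called per request: deserialize the body into model input."""
    if content_type != JSON_CONTENT_TYPE:
        raise ValueError("Unsupported content type: " + content_type)
    return json.loads(serialized_input_data)["candidate_labels"]


def predict_fn(input_data, model):
    """Called per request: run the model on the deserialized input."""
    return dict(zip(input_data, model(input_data)))


def output_fn(prediction_output, accept=JSON_CONTENT_TYPE):
    """Called per request: serialize the prediction for the client."""
    return json.dumps(prediction_output), accept


def handle_request(body, model):
    """Hypothetical driver showing the per-request call order."""
    return output_fn(predict_fn(input_fn(body), model))


model = model_fn("/opt/ml/model")  # loaded once, reused for every request
request_body = json.dumps({"candidate_labels": ["cat", "truck"]})
response_body, response_content_type = handle_request(request_body, model)
print(response_body)
```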

After the inference script is prepared, you use the **PyTorchModel** class from the SageMaker Python SDK to create a model object. This object requires the S3 URI of the compiled model artifacts, the role for SageMaker to access AWS resources, and the Docker image URI for the inference container. The entry_point parameter points to your inference.py script, which contains the logic for handling inference requests.

Lastly, the deploy method of the PyTorchModel object is used to create a SageMaker Endpoint -- a hosted prediction service that we can use to perform inference.

In [None]:
import os

os.makedirs("code", exist_ok=True)

In [None]:
%%writefile code/inference.py

import os
import io
import json
import base64

import torch
import torch_neuronx

from PIL import Image
from transformers import CLIPProcessor

os.environ["TOKENIZERS_PARALLELISM"] = "false"

JSON_CONTENT_TYPE = 'application/json'

def model_fn(model_dir):
    """Loads the model from the provided directory."""
    model_file = os.path.join(model_dir, 'neuron_compiled_model.pt')
    model_neuron = torch.jit.load(model_file)
    return model_neuron

def input_fn(serialized_input_data, content_type=JSON_CONTENT_TYPE):
    """Processes incoming data into a format the model can work with."""
    if content_type == JSON_CONTENT_TYPE:
        input_data = json.loads(serialized_input_data)
        
        base_64_img_str = input_data['image']
        image = Image.open(io.BytesIO(base64.decodebytes(bytes(base_64_img_str, "utf-8"))))
        text = input_data['candidate_labels']
        
        return (image, text)

    else:
        raise Exception('Requested unsupported ContentType: ' + content_type)

def predict_fn(input_data, models):
    """Takes the model and input data, and generates the prediction."""
    model_neuron = models
    processor = CLIPProcessor.from_pretrained('openai/clip-vit-large-patch14')
    image, text = input_data
    
    inputs = processor(text=text, images=image, return_tensors="pt", padding=True)
    input_data = (inputs['input_ids'], inputs['pixel_values'])
    
    output_neuron = model_neuron(*input_data)
    
    softmax_probs = output_neuron[0][0].softmax(dim=-1)
        
    label_probabilities = {text[i]: 100 * prob.item() for i, prob in enumerate(softmax_probs)}
    sorted_label_probabilities = dict(sorted(label_probabilities.items(), key=lambda item: item[1], reverse=True))
    formatted_label_probabilities = {label: f"{prob}%" for label, prob in sorted_label_probabilities.items()}
    
    return formatted_label_probabilities

def output_fn(prediction_output, accept=JSON_CONTENT_TYPE):
    """Formats the output of predict_fn into the response format that will be returned to the client."""
    if accept == JSON_CONTENT_TYPE:
        return json.dumps(prediction_output), accept

    raise Exception('Requested unsupported ContentType in Accept: ' + accept)

Path of compiled pretrained model in S3:

In [None]:
s3_model_uri = f"{estimator.model_data['S3DataSource']['S3Uri']}model.tar.gz"
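
The expression above relies on the S3 prefix ending with a slash. As a slightly more defensive sketch, the hypothetical helper below accepts the prefix with or without a trailing slash. It assumes model_data has the dictionary shape used in the cell above, which is what the estimator exposes when output compression is disabled; the example bucket and prefix are made up.

```python
def model_archive_uri(model_data, archive_name="model.tar.gz"):
    """Return the S3 URI of archive_name under the estimator's output prefix.

    model_data is assumed to be a dict of the form
    {"S3DataSource": {"S3Uri": "s3://bucket/prefix/"}}.
    """
    prefix = model_data["S3DataSource"]["S3Uri"]
    # Normalize so that exactly one slash separates prefix and file name
    return prefix.rstrip("/") + "/" + archive_name


example_model_data = {
    "S3DataSource": {"S3Uri": "s3://example-bucket/inf2_compiled_model/job/output/model/"}
}
print(model_archive_uri(example_model_data))
```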

Note, the **image_uri** is again from the [Neuron Containers](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#neuron-containers). This is a Docker image that is optimized for running PyTorch inference on AWS Neuron. This image includes the necessary dependencies and configurations for PyTorch and AWS Neuron SDK.

In [None]:
from sagemaker.pytorch.model import PyTorchModel

ecr_image = f"763104351884.dkr.ecr.{sess.boto_region_name}.amazonaws.com/pytorch-inference-neuronx:2.1.2-neuronx-py310-sdk2.18.0-ubuntu20.04"

pytorch_model = PyTorchModel(
    model_data=s3_model_uri,
    role=role,
    source_dir="code",
    entry_point="inference.py",
    image_uri=ecr_image,
)

# Let SageMaker know that the model has already been compiled with the Neuron compiler
pytorch_model._is_compiled_model = True

The arguments to the deploy function allow us to set the number and type of instances that will be used for the Endpoint.

Here you will deploy the model to a single **ml.inf2.xlarge** instance.
It may take 6-10 min to deploy.

In [None]:
%%time

predictor = pytorch_model.deploy(
    instance_type="ml.inf2.xlarge",
    initial_instance_count=1,
)

In [None]:
print(predictor.endpoint_name)

## Perform inference on your deployed endpoint

In order to perform inference on the deployed endpoint, we will need to convert the image we want to classify into a base64 encoded string, with the **image_to_base64** function. It can handle both a filepath (a string pointing to an image file) and a PIL Image object. 
This encoding is necessary because the inference request needs to be serialized into a JSON format before we send it to the endpoint. 
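
The need for base64 can be checked with the standard library alone. The sketch below uses arbitrary bytes as a stand-in for image data (an assumption for illustration): raw bytes cannot be placed in a JSON document directly, while the base64 text form survives the round trip and decodes back to the original bytes, using the same decoding call as input_fn.

```python
import base64
import json

# Arbitrary binary data standing in for the bytes of an image file
raw_bytes = bytes(range(256))

# json.dumps rejects bytes objects, which is why an encoding step is needed
try:
    json.dumps({"image": raw_bytes})
    bytes_are_json_serializable = True
except TypeError:
    bytes_are_json_serializable = False

# Client side: encode to base64 text and build the JSON request body
payload = json.dumps(
    {
        "image": base64.b64encode(raw_bytes).decode("utf-8"),
        "candidate_labels": ["apple", "bear"],
    }
)

# Server side: parse the JSON and decode the base64 text back to bytes
received = json.loads(payload)
decoded_bytes = base64.decodebytes(received["image"].encode("utf-8"))

print(bytes_are_json_serializable, decoded_bytes == raw_bytes)
```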


In [None]:
import base64
import io
import os

from PIL import Image


def image_to_base64(img) -> str:
    """Convert a PIL Image or local image file path to a base64 string"""
    if isinstance(img, str):
        if os.path.isfile(img):
            with open(img, "rb") as f:
                return base64.b64encode(f.read()).decode("utf-8")
        else:
            raise FileNotFoundError(f"File {img} does not exist")

    elif isinstance(img, Image.Image):
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    else:
        raise ValueError(f"Expected str (filename) or PIL Image. Got {type(img)}")

In [None]:
# Take example image from CIFAR-100
image = Image.open(
    io.BytesIO(
        base64.decodebytes(
            bytes(
                "iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAIAAAD8GO2jAAAGUUlEQVR4nGVW3Y5cRxms7+s+8+PZtXdNHMexY0wimaAQCXFhuEVccoHEC3DBK/ESPAL3kEiJFGQkB5tYDt4YO95dj3e8Oztzzumvios+M15gRpqfMzpdXfVVVY/94Y8P3LJEwDA8DAZJETF/efDs689OXjxavHiE0jaT3TfzJ9nHP7j5M0tN6Xu47d28u3frp/s3P5zuXIUlgyTVBcnIo+RmDhlsCwDAKI5yunHro6vXbp6fzp8/+vLxF3+K9TI3MyfN0S5f0RKglw//cjZ/5qPfXr581T0JlCg5zBTKTU7uSRTM7L8wkiABo/Huzu4uGEff/f3lN5+JnTydvn46nu6PR5PSLpvZ/u61O9dvfjiZTCUKeWBgTrfcZHM3CTDfiLSFkQQJkq7fuvOTe7+ZP3+wXh416dJqcdivlmgmsT4bT/d2lneabKNRJgtkkiQzszDPTZPMk6QqUX25wKMCYGx+5+OfL3/9+8X3j7vz0zevnkUpo8kVAbP9G7d//Mudnd0mG5kgQJAEWAJz05gNDCqA/R8JqzA7l2f3fvU7SB1Lt1pGVzw1pe9Csbv/TvLMCDFJAiASMJrnUTZLFWB4Aibgf0jUd8ABGyFjdgkwkdUxQUKCOyXJIIEOIQy5yebZJcIMkJkBDgMEQXj7yTa06mBKJS1KUOMmQJJogiRTyGBhlscjeBIHm1aV6rrbjVuF2jq4zlBVa0C0jRcgV71Og0kGDRKRMt9KL0BmEAzDAAS8RZGsjlEVoK7ILRJFVggT8yTDXHTBTaI0iLKZ9mbZ7TcNHqkAAxJB31yTBNSFzJFnDd2NGGZGiVXFbe60IQWIqDfrwi8C6tZp9VZSMhCCocsNiltd0ZEggYIMNtxub0nI5MOM31KBRNBUgrSIYZMBBYPt+ixn0pySQG1DZheFqRHUZsCbMhNEMUIlpCAiwB5dtz59c3T4bL1adm1p1+c5Su8aqmijxkXT2NsUbMaqQQj2fb/uSldoiIy+9H0pPH3z+uGDrx7+42+vX71cn5/lrg8PUDJ4XYjbqEPVRYCZBNaLkFgi+r5ftV1bglH68+P7X/x5Pl/Mruz0xY6OXhweHiwXJ+7KUYoSSG3zW0t0cAMMGsxHimQp0Zfo+1L6QrFbL7599PnBk/tHLx93qzKdjNxsverevTx576NPD4/mue87Y5CS+WaDW5EB2wKIIskIsrBfL5P1u9NRx+7qLOPW7Q9u3Si9mjxOCX27nmbsTJu/fv5VXrdr80xpqCBdGMSmYoeCrI0hmbMZN2Q67dDZlekH9/avreYnc3Z9b1qV0kxXOZ0++fab714c5a7rakdtfO2sjufFBFSXqaoYIqUSioiI6Ps2uhYs0Z+beRL7wuOuSZPdvVmTz9etWSEpWC+GzGBkJWRmmzDV1A2tK9XQS1QQgPtoNJZk7iUi5TGjvP++rv/i0/zs+Mxg7ubJaXC3ZElSSkOUBZAyq8AcKFntH/WMIMOy8sSVAJgFXJ7UGc/PS85ZycxAM2hwYwgCbfh3ULMtiIPNQgLq0U5SUQsGKYYycYqFVMt/Pvk+t12Bp6AAmilkrHM2DFWKQRAOBThEIkhC2jpMohAMDmXar1698Gx52XaCh2wbJgCBWnkDmGw4H+qe63SG+nvbz9V4NAiwkVaX4th2c16cdzAnBPONuhSMqt0pXghF9drQy5vzdGDCbVtHiTJB3L1x41+vDnLb9zQPklviAZjLkdzczH3Ya4Ri3UUQSJ7N3IIETFSQMBkBkGaKWAmjcXO6WOSj+YJw1YPGzJK5u5u5W0CkRALMyV0oXQHMUpQiSixRC5KQmaIoIXZSG21r5vuz6d0fXsvL1bnnnNxz9pQSDAYyWNpChkwmGdlTkLEQySyBhGQGpeQwM9GTj73caY6v6/iU63Xhu6OP3/vkdp5Nk5mJEV3blmAJkjCTaG5e0yC
QhAFKOehk45rGcurr00W3Kmwskjez1K2X/37h3NmZvTMdlZN5BvP5yZwRLAGzbaDMYMlSTiJIutc/AMgq13Q8Kmdd6fuz15evjLFsF4cnq67du7yzHk1evjkb5XTzRvrkR9evXEr3vz7Iq9evILOUUpNrsXlKLMG20OoYaQgzQmlSjtuzp+FpsWoXZ210V8fj5G4t/ODwdR8yIDn292alj6fPT0D7D+4UwZGygvYMAAAAAElFTkSuQmCC",
                "utf-8",
            )
        )
    )
)

image

Because input_fn expects JSON-encoded requests, we need a JSON serializer to convert the payload to JSON before it is sent to the endpoint.

output_fn returns a JSON string, so we also need a JSON deserializer to parse the response.

In [None]:
predictor.serializer = sagemaker.serializers.JSONSerializer()
predictor.deserializer = sagemaker.deserializers.JSONDeserializer()

In [None]:
from torchvision.datasets import CIFAR100

# Define CIFAR100 dataset and candidate labels from the dataset.
cifar100 = CIFAR100(root=os.path.expanduser("~/.cache"), download=True, train=False)
candidate_labels = cifar100.classes

data = {
    "image": image_to_base64(image),
    "candidate_labels": candidate_labels,
}

predictor.predict(data)


## Benchmarking your endpoint

The following cells create a benchmark file (metrics.json) for your endpoint. We first define a Metrics class, providing methods to add the metrics and store them in a structured json file. 

Then, we define some helper functions: 

- **inference_latency** runs the endpoint request, collects client side latency and any errors.
- **random_image_from_cifar100** picks a random image from the CIFAR100 dataset, encodes it as a base64 string, and returns it together with the candidate labels in the payload format expected by the endpoint.

Then, in the **benchmark** function, we use parallel processing (with Parallel and delayed from the joblib library) to simulate multiple clients sending inference requests to the endpoint. **number_of_clients** and **number_of_runs** define the parallelism level and the total number of inference requests, respectively.

We then calculate throughput (number of inferences per second) and various latency metrics (the 0th, 50th, 90th, 95th, and 99th percentile latencies) based on the collected data.
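
The measurement logic can be tried without a deployed endpoint. The sketch below is an illustration under stated assumptions: `fake_endpoint` is a stand-in that just sleeps for a few milliseconds, `ThreadPoolExecutor` from the standard library replaces joblib, and `percentile` is a hand-written linear-interpolation percentile rather than the NumPy call used in the notebook.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def percentile(values, q):
    """Linear-interpolation percentile of values for q in [0, 1]."""
    ordered = sorted(values)
    position = q * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def fake_endpoint(_payload):
    """Stand-in for predictor.predict: pretends inference takes about 5 ms."""
    time.sleep(0.005)
    return {"label": "100%"}


def timed_call(fn, payload):
    """Return the client-side latency of one call, in milliseconds."""
    start = time.perf_counter()
    fn(payload)
    return (time.perf_counter() - start) * 1000


def run_benchmark(fn, number_of_clients, number_of_runs):
    """Send number_of_runs requests using number_of_clients worker threads."""
    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=number_of_clients) as pool:
        latencies = list(
            pool.map(lambda payload: timed_call(fn, payload), range(number_of_runs))
        )
    elapsed = time.perf_counter() - wall_start
    return {
        "throughput": number_of_runs / elapsed,  # requests per second
        "latency_p50": percentile(latencies, 0.50),
        "latency_p99": percentile(latencies, 0.99),
    }


summary = run_benchmark(fake_endpoint, number_of_clients=2, number_of_runs=20)
print(summary)
```

Throughput is computed from wall-clock time across all clients, while the percentiles describe individual request latencies, so the two numbers respond differently when the number of clients changes.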



In [None]:
class Metrics:
    """Provides methods to add metrics and then write them to a JSON file."""

    # The constructor initializes the empty metrics structure and stores the name of the output file.
    def __init__(self, fname) -> None:
        self.metrics = {
            "KPI": {},
            "CFG": {"TestDef": {}, "TestOpt": {}},
            "Baselines": {"Hardware": {}, "Software": {}},
        }
        self.metrics_fname = fname

    # Metric Addition Methods:
    # add_testdef, add_testopt, and add_kpi methods allow adding custom test definitions, test options, and key performance indicators respectively to the metrics.
    def add_testdef(self, key: str, value: any):
        self.metrics["CFG"]["TestDef"][key] = value

    def add_testopt(self, key: str, value: any):
        self.metrics["CFG"]["TestOpt"][key] = value

    def add_kpi(self, key: str, value: any):
        self.metrics["KPI"][key] = value

    # writes the collected metrics into a JSON file
    def write(self):
        # overwrite/create file
        with open(self.metrics_fname, "w") as f:
            json.dump(self.metrics, f)

In [None]:
import random
import time

import numpy as np
from joblib import Parallel, delayed
from tqdm import tqdm
import json

In [None]:
def random_image_from_cifar100():
    """Randomly selects an image from the CIFAR100 dataset and returns a request payload (dict) with the base64-encoded image and the candidate labels."""
    # Randomly select an image
    random_index = random.randint(0, len(cifar100) - 1)
    image, _ = cifar100[random_index]

    # Convert image to base64
    image_base64 = image_to_base64(image)

    return {
        "image": image_base64,
        "candidate_labels": candidate_labels,
    }

In [None]:
def inference_latency(model, *data):
    """Returns the latency of a model inference along with any errors."""
    error = False
    start = time.time()
    try:
        results = model(*data)
    except Exception:  # Record the failure; only the error flag is reported
        error = True
        results = None
    end = time.time()
    return {"latency": (end - start) * 1000, "error": error, "result": results}

In [None]:
def benchmark(number_of_clients, number_of_runs):
    """Performs a benchmark test on a given endpoint."""

    # warmup
    for _ in range(8):
        data = random_image_from_cifar100()
        predictor.predict(data)
        print("warming up endpoint...")

    t = tqdm(range(number_of_runs), position=0, leave=True)

    # execute concurrent requests
    results = Parallel(n_jobs=number_of_clients, prefer="threads")(
        delayed(inference_latency)(predictor.predict, random_image_from_cifar100())
        for _ in t
    )

    avg_throughput = number_of_runs / t.format_dict["elapsed"]

    # compute metrics
    latencies = [res["latency"] for res in results]
    errors = [res["error"] for res in results]
    error_percentage = sum(errors) / len(errors) * 100
    percentiles = {
        "latency_p0": np.quantile(latencies, 0.0),
        "latency_p50": np.quantile(latencies, 0.50),
        "latency_p90": np.quantile(latencies, 0.90),
        "latency_p95": np.quantile(latencies, 0.95),
        "latency_p99": np.quantile(latencies, 0.99),
    }

    # Return metrics
    return {
        "throughput": avg_throughput,
        "percentiles": percentiles,
        "error_percentage": error_percentage,
    }


In [None]:
def post_metrics(metrics_result):
    """Adds the benchmark metrics to a JSON file in a structured format."""
    metrics = Metrics("metrics.json")
    metrics.add_testdef("ModelName", "CLIP")
    metrics.add_testopt(
        "NumThreads", metrics_result.get("n_threads", number_of_clients)
    )
    metrics.add_testopt(
        "NumInferences", metrics_result.get("inferences", number_of_runs)
    )
    metrics.add_kpi("Throughput", round(metrics_result["throughput"], 2))
    for key, value in metrics_result["percentiles"].items():
        metrics.add_kpi(key, round(value, 2))
    metrics.add_kpi("ErrorPercentage", round(metrics_result["error_percentage"], 2))

    metrics.write()

In [None]:
# Set variables for benchmark
number_of_clients = 2
number_of_runs = 1000

# Benchmark CLIP and post metrics. This creates a metrics.json file where the metrics can be reviewed.
benchmark_result = benchmark(number_of_clients, number_of_runs)
post_metrics(benchmark_result)

## Cleanup
Delete the endpoint when it is no longer in use to avoid ongoing charges.

In [None]:
predictor.delete_model()
predictor.delete_endpoint()