# Deploying a Hugging Face model to Google Vertex AI for Bulk Embedding Inference

Inspired by the [GCP tutorial]( https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/community-content/pytorch_text_classification_using_vertex_sdk_and_gcloud/pytorch-text-classification-vertex-ai-train-tune-deploy.ipynb) we will deploy a `sentence-transformers` model on a [Vertex AI](https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api) endpoint. We will use [TorchServe](https://pytorch.org/serve/) to serve a Hugging Face model available on the [Hub](hf.co). To accelerate inference we will also use features from the `optimum` [library](https://github.com/huggingface/optimum) to apply graph optimization and/or quantization to the model.

**Resources**:
   - On batching:
        - https://huggingface.co/docs/transformers/main_classes/pipelines#pipeline-batching
   - On TorchServe:
        - https://github.com/pytorch/serve/tree/master/examples/Huggingface_Transformers
   - On Locust:
        - https://medium.com/@tferreiraw/performing-load-tests-with-python-locust-io-62de7d91eebd
        - https://medium.com/@ashmi_banerjee/3-step-tutorial-to-performance-test-ml-serving-apis-using-locust-and-fastapi-40e6cc580adc

### Set up your local development environment

1. Follow the Google Cloud guide to [setting up a Python development environment](https://cloud.google.com/python/docs/setup) 
2. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/) 
3. Create a virtual environment (virtualenv, pyenv) with Python 3 (<3.9) and activate the environment
4. Launch jupyter notebook from this environment


In [None]:
https://cloud.google.com/products/calculator/

### Install packages

In [2]:
# !pip -q install --upgrade google-cloud-aiplatform #Vertex AI sdk
!pip -q install --upgrade transformers, datasets, locust, locust-plugins

### Set up your Google Cloud project

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager)
1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project)
1. Enable following APIs in your project required for running the tutorial
    - [Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com)
    - [Cloud Storage API](https://console.cloud.google.com/flows/enableapi?apiid=storage.googleapis.com)
    - [Container Registry API](https://console.cloud.google.com/flows/enableapi?apiid=containerregistry.googleapis.com)
    - [Cloud Build API](https://console.cloud.google.com/flows/enableapi?apiid=cloudbuild.googleapis.com)
   
1. Enter your project ID in the cell below. Then run the cell to make sure the Cloud SDK uses the right project for all the commands in this notebook.

### Authenticate to gcloud

 1. In the Cloud Console, go to the [**Create service account key** page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).,
 2. Click **Create service account**.,
 3. In the **Service account name** field, enter a name, and click **Create**,
 4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type \"Vertex AI\" into the filter box, and select **Vertex AI Administrator**. Type \"Storage Object Admin\" into the filter box, and select **Storage Object Admin**.
 5. Click *Create*. A JSON file that contains your key downloads to your local environment.
 6. Enter the path to your service account key as the `GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [3]:
# %env GOOGLE_APPLICATION_CREDENTIALS ./keys/huggingface-ml-e974975230cc.json #change to your service account key

In [1]:
# Get your Google Cloud project ID using google.auth
import google.auth

_, PROJECT_ID = google.auth.default()
print("Project ID: ", PROJECT_ID)

#Or set it yourself manually
# PROJECT_ID = "huggingface-ml"

Project ID:  huggingface-ml


### Create a cloud storage bucket

In [2]:
BUCKET_NAME = "gs://andrew-reed-bucket"  # <---CHANGE THIS TO YOUR BUCKET
REGION = "us-east4"

**If the bucket doesn't exist, run the following:**

In [3]:
# ! gsutil mb -l $REGION $BUCKET_NAME

Access the content of the bucket

In [4]:
! gsutil ls -al $BUCKET_NAME

### Imports

In [5]:
import torch
import base64
import json
import os
import random
import sys
import transformers

import google.auth
from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip
from google.protobuf.json_format import MessageToDict

In [6]:
print(f"Notebook runtime: {'GPU' if torch.cuda.is_available() else 'CPU'}")
print(f"PyTorch version : {torch.__version__}")
print(f"Transformers version : {transformers.__version__}")

Notebook runtime: GPU
PyTorch version : 2.0.1+cu118
Transformers version : 4.32.1


In [7]:
APP_NAME = "test_bge_embedder"

## Deployment

#### *Overview*

Deploying a PyTorch model on [Vertex AI Predictions](https://cloud.google.com/vertex-ai/docs/predictions/getting-predictions) requires to use a custom container that serves online predictions. You will deploy a container running [PyTorch's TorchServe](https://pytorch.org/serve/) tool in order to serve predictions from a fine-tuned sentence transformer model `msmarco-distilbert-base-tas-b` available in [Hugging Face Transformers](https://huggingface.co/sentence-transformers/msmarco-distilbert-base-tas-b). 

Essentially, to deploy a PyTorch model on Vertex AI Predictions following are the steps:
1. Package the trained model artifacts including [default](https://pytorch.org/serve/#default-handlers) or [custom](https://pytorch.org/serve/custom_service.html) handlers by creating an archive file using [Torch model archiver](https://github.com/pytorch/serve/tree/master/model-archiver),
2. Build a [custom container](https://cloud.google.com/vertex-ai/docs/predictions/custom-container-requirements) compatible with Vertex AI Predictions to serve the model using Torchserve
3. Upload the model with custom container image to serve predictions as a Vertex AI Model resource,
4. Create a Vertex AI Endpoint and [deploy the model](https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api) resource

### Save model locally

In [8]:
MODEL_DIR = "./"+APP_NAME+"_predictor"
os.makedirs(MODEL_DIR, exist_ok=True)

In [9]:
from transformers import AutoTokenizer, AutoModel

model_id = "BAAI/bge-large-en"

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModel.from_pretrained(model_id)

pt_save_directory = os.path.join(MODEL_DIR, "model")

tokenizer.save_pretrained(pt_save_directory)
model.save_pretrained(pt_save_directory)

Downloading (…)okenizer_config.json:   0%|          | 0.00/366 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/720 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

In [10]:
from transformers import pipeline

def cls_pooling(pipeline_output):
    """
    Return the [CLS] token embedding
    """
    return [_h[0][0] for _h in pipeline_output]
        
        
tokenizer = AutoTokenizer.from_pretrained(pt_save_directory)
model = AutoModel.from_pretrained(pt_save_directory)

BATCH_SIZE = 4
pipe = pipeline("feature-extraction", model=model, tokenizer=tokenizer, batch_size=BATCH_SIZE)

input_texts = ["I like the new ORT pipeline", "blah blah blah", "Hello, I'm andrew"]
embeddings = cls_pooling(pipe(input_texts))

In [12]:
len(embeddings), len(embeddings[0])

(3, 1024)

### Create a custom model handler 

Please refer to the [TorchServe documentation](https://pytorch.org/serve/custom_service.html) for defining a custom handler.

In [105]:
# use int for value, False to disable batching
BATCH_SIZE = 32

In [106]:
%%bash -s $APP_NAME $BATCH_SIZE

# %%writefile test_bge_embedder_predictor/custom_handler.py
APP_NAME=$1
BATCH_SIZE=$2

cat << EOF > ./${APP_NAME}_predictor/custom_handler.py

import os
import json
import logging

import torch
from transformers import AutoModel, AutoTokenizer, pipeline

from ts.torch_handler.base_handler import BaseHandler

logger = logging.getLogger(__name__)

class SentenceTransformersHandler(BaseHandler):
    """
    The handler takes an input string and returns the embedding 
    based on the serialized transformers checkpoint.
    """
    def __init__(self):
        super(SentenceTransformersHandler, self).__init__()
        self.initialized = False
        self.batch_size = $BATCH_SIZE

    def initialize(self, ctx):
        """ Loads the model.onnx file and initialized the model object.
        Instantiates Tokenizer for preprocessor to use and a feature extraction pipeline
        """
        self.manifest = ctx.manifest

        properties = ctx.system_properties
        model_dir = properties.get("model_dir")
        self.device = torch.device("cuda:" + str(properties.get("gpu_id")) if torch.cuda.is_available() else "cpu")

        # Read model serialize/pt file
        serialized_file = self.manifest["model"]["serializedFile"]
        model_pt_path = os.path.join(model_dir, serialized_file)
        if not os.path.isfile(model_pt_path):
            raise RuntimeError("Missing the model.onnx or pytorch_model.bin file")
        
        # Load model
        self.model = AutoModel.from_pretrained(model_dir)
        logger.debug('Transformer model from path {0} loaded successfully'.format(model_dir))
        logger.info(f"model_pt_path: {model_pt_path}")
        logger.info(f"model_dir: {model_dir}")
        
        # Ensure to use the same tokenizer used during training
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir, model_max_length=512)
        
        # Create an optimum pipeline
        # Use BetterTransformer for fused kernel + sparsity optimizations
        # https://medium.com/pytorch/bettertransformer-out-of-the-box-performance-for-huggingface-transformers-3fbe27d50ab2
        if self.batch_size:
            logger.info(f"----Loading pipeline with batch_size={self.batch_size}")
            self.pipeline = pipeline("feature-extraction",
                                     model=self.model, 
                                     tokenizer=self.tokenizer, 
                                     device=self.device, 
                                     accelerator="bettertransformer",
                                     batch_size=self.batch_size)
        else:
            logger.info(f"----Loading pipeline without batching")
            self.pipeline = pipeline("feature-extraction",
                                     model=self.model, 
                                     tokenizer=self.tokenizer, 
                                     device=self.device, 
                                     accelerator="bettertransformer")

        self.initialized = True

    def preprocess(self, requests):
        """ Preprocessing input request by tokenizing
            Extend with your own preprocessing steps as needed
            
            [{'data': b'I am creating an endpoint using TorchServe and HF transformers\n'}]
        """
        # print(f'{"---"*20}')
        # print(requests)
        # print(type(requests))
        # print(len(requests))
        # print(f'{"---"*20}')
        
        input_texts = []
        for idx, request in enumerate(requests):
            text = request.get("data")
            if text is None:
                text = request.get("body")
                
            text = text.decode('utf-8')
            input_texts.append(text)
        logger.info("Received text: '%s'", input_texts)
        
        return input_texts

    def inference(self, input_texts):
        """ Predict the class of a text using a trained transformer model.
        """
        
        def cls_pooling(pipeline_output):
            """
            Return the [CLS] token embedding
            """
            return [_h[0][0] for _h in pipeline_output]
        
        embeddings = cls_pooling(self.pipeline(input_texts))

        logger.info(f"Model embedded: {len(embeddings)}")
        return embeddings

    def postprocess(self, inference_output):
        return inference_output
    
EOF

### Create custom container image

**Create a Dockerfile with TorchServe as base image**

**NB**: to define the right Torchserve parameters such as `workers` please consult (https://github.com/pytorch/serve/blob/master/docs/performance_guide.md) 

In [107]:
%%bash -s $APP_NAME $BATCH_SIZE

APP_NAME=$1
BATCH_SIZE=$2

cat << EOF > ./${APP_NAME}_predictor/Dockerfile

FROM pytorch/torchserve:latest-gpu

# install dependencies
RUN python3 -m pip install --upgrade pip
RUN pip3 install transformers

USER model-server

# copy model artifacts, custom handler and other dependencies
COPY custom_handler.py /home/model-server/
COPY ./model/ / /home/model-server/

# create torchserve configuration file
USER root
RUN printf "\nservice_envelope=json" >> /home/model-server/config.properties
RUN printf "\ninference_address=http://0.0.0.0:7080" >> /home/model-server/config.properties
RUN printf "\nmanagement_address=http://0.0.0.0:7081" >> /home/model-server/config.properties
RUN printf "\nmanagement_address=http://0.0.0.0:7081" >> /home/model-server/config.properties
RUN if [ "$BATCH_SIZE" = False ]; then \
        : \
        else \
        printf '\nmodels={\
          "$APP_NAME": {\
            "1.0": {\
                "defaultVersion": true,\
                "marName": "$APP_NAME.mar",\
                "minWorkers": 1,\
                "maxWorkers": 8,\
                "batchSize": "$BATCH_SIZE",\
                "maxBatchDelay": 100,\
                "responseTimeout": 200\
            }\
          }}' >> /home/model-server/config.properties; \
    fi
    
# expose health and prediction listener ports from the image
EXPOSE 7080
EXPOSE 7081

# create model archive file packaging model artifacts and dependencies
RUN torch-model-archiver -f \
  --model-name=$APP_NAME \
  --version=1.0 \
  --serialized-file=/home/model-server/pytorch_model.bin \
  --handler=/home/model-server/custom_handler.py \
  --extra-files "/home/model-server/config.json,/home/model-server/tokenizer.json,/home/model-server/tokenizer_config.json,/home/model-server/special_tokens_map.json,/home/model-server/vocab.txt" \
  --export-path=/home/model-server/model-store

# run Torchserve HTTP serve to respond to prediction requests
CMD ["torchserve", \
     "--start", \
     "--ts-config=/home/model-server/config.properties", \a
     "--models", \
     "$APP_NAME=$APP_NAME.mar", \
     "--model-store", \
     "/home/model-server/model-store"]

EOF

echo "Writing ./${APP_NAME}_predictor/Dockerfile"

Writing ./test_bge_embedder_predictor/Dockerfile


In [108]:
APP_DIR = APP_NAME+"_predictor"

In [109]:
APP_NAME, APP_DIR

('test_bge_embedder', 'test_bge_embedder_predictor')

**Build container**

In [110]:
CUSTOM_PREDICTOR_IMAGE_URI = f"gcr.io/{PROJECT_ID}/pytorch_predict_{APP_NAME}"
print(f"CUSTOM_PREDICTOR_IMAGE_URI = {CUSTOM_PREDICTOR_IMAGE_URI}")

CUSTOM_PREDICTOR_IMAGE_URI = gcr.io/huggingface-ml/pytorch_predict_test_bge_embedder


In [111]:
!docker build \
  --tag=$CUSTOM_PREDICTOR_IMAGE_URI \
  ./$APP_DIR

Sending build context to Docker daemon  1.342GB
Step 1/16 : FROM pytorch/torchserve:latest-gpu
 ---> 04eef250c14e
Step 2/16 : RUN python3 -m pip install --upgrade pip
 ---> Using cache
 ---> 2f06486ed62f
Step 3/16 : RUN pip3 install transformers
 ---> Using cache
 ---> 4ed472215157
Step 4/16 : USER model-server
 ---> Using cache
 ---> 4fa0cd41a25e
Step 5/16 : COPY custom_handler.py /home/model-server/
 ---> 7580986c8753
Step 6/16 : COPY ./model/ / /home/model-server/
 ---> 6cb9c33fbc7b
Step 7/16 : USER root
 ---> Running in 1a10cec8c6bc
Removing intermediate container 1a10cec8c6bc
 ---> a91921ac1cff
Step 8/16 : RUN printf "\nservice_envelope=json" >> /home/model-server/config.properties
 ---> Running in 1dadb9c7aaf0
Removing intermediate container 1dadb9c7aaf0
 ---> 0933c09bbd31
Step 9/16 : RUN printf "\ninference_address=http://0.0.0.0:7080" >> /home/model-server/config.properties
 ---> Running in 61a0618d1592
Removing intermediate container 61a0618d1592
 ---> 2f9eb17850d4
Step 10/16 

**Test API locally**

In [112]:
!docker run -td --rm -p 7080:7080 --gpus all --name=$APP_NAME $CUSTOM_PREDICTOR_IMAGE_URI

d8c16e4de0f1b3fb868f1e89eebfb924b43584df12d8106820747e2d79b227dd


1. Health check

In [114]:
!curl http://localhost:7080/ping

{
  "status": "Healthy"
}


2. Send request

In [115]:
%%bash -s $APP_NAME

APP_NAME=$1

cat > ./${APP_NAME}_predictor/instances.json <<END
{ 
   "instances": [
     { 
       "data": {
         "b64": "$(echo 'I am creating an endpoint using TorchServe and HF transformers' | base64 --wrap=0)"
       }
     }
   ]
}
END

curl -s -X POST \
  -H "Content-Type: application/json; charset=utf-8" \
  -d @./${APP_NAME}_predictor/instances.json \
  http://localhost:7080/predictions/$APP_NAME/

{"predictions": [[-0.3966214656829834, 0.13638117909431458, -0.7998953461647034, 0.17386069893836975, -0.32081007957458496, 0.20749905705451965, -0.430697500705719, 0.18672336637973785, 0.6046302914619446, 0.37625569105148315, 0.8786728978157043, 0.08130515366792679, 0.7122514843940735, -0.5567383170127869, -0.3595656752586365, 0.13959753513336182, -0.7658876776695251, -0.6352641582489014, -0.38556528091430664, -0.08639071136713028, -0.19624273478984833, -0.370313823223114, -0.7243615984916687, -0.9205985069274902, -0.689978301525116, 0.7531246542930603, 0.053085941821336746, -0.04321198910474777, 1.5050652027130127, 1.1163926124572754, -0.7939869165420532, -0.8258238434791565, 0.2806147336959839, -1.0460847616195679, -0.4688354730606079, -0.8166561126708984, 0.4421796202659607, -0.7080443501472473, -0.9239137768745422, -0.642957866191864, 0.11196565628051758, 0.1299777328968048, 0.6359308362007141, -1.1601362228393555, -1.0760389566421509, 0.31392139196395874, -0.11758226901292801, -0

3. Run load test

In [116]:
MAX_SEQ_LEN = pipe.tokenizer.max_len_single_sentence

In [117]:
%%bash -s $APP_NAME $MAX_SEQ_LEN

APP_NAME=$1
MAX_SEQ_LEN=$2

cat > ./locustfile.py << END

import json
from base64 import b64encode
from locust import HttpUser, task


class BulkEncodeUser(HttpUser):
    
    @task
    def encode_text(self):
        test_input = "hello " * $MAX_SEQ_LEN
        instances = {
            "instances": [
                {"data": {"b64": b64encode(f"{test_input}".encode()).decode("utf-8")}}
            ]
        }

        self.client.post(
            "/predictions/${APP_NAME}/",
            headers={"Content-Type": "application/json; charset=utf-8"},
            json=instances,
        )
END

In [118]:
LOAD_TEST_RESULTS_DIR = "./load-test-results"
os.makedirs(LOAD_TEST_RESULTS_DIR, exist_ok=True)

In [None]:
%%time
!locust \
    --headless \
    --users 100 \
    --spawn-rate 100 \
    --iterations 1_000 \
    --host http://localhost:7080 \
    --csv=$LOAD_TEST_RESULTS_DIR/results \
    # --only-summary 

Notes on BGE-large:
- Without batching, we're at 1min 11sec to run 1000 requests at 81% GPU utilization on both GPU's
- With batchsize=8, we're at 1min 8sec to run 1000 requests a 60-80% GPU utilization on both GPU's
- With batchsize=32, we're at 1min 11sec to run 1000 requests a 60-80% GPU utilization on both GPU's

Hmmmm... need to look into this


3. Stop the container

In [None]:
!docker stop local_sbert_embedder_optimum

### Push image Container Registry

In [None]:
!docker push $CUSTOM_PREDICTOR_IMAGE_URI

### Create model and endpoint to VertexAI

We create a model resource on Vertex AI and deploy the model to a Vertex AI Endpoints. You must deploy a model to an endpoint before using the model. The deployed model runs the custom container image to serve predictions.

**Initialize the Vertex AI SDK for Python**

In [None]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

**Create a Model resource with custom serving container**

In [None]:
VERSION = 1
model_display_name = f"{APP_NAME}-v{VERSION}"
model_description = "PyTorch based sentence transformers embedder with custom container"

MODEL_NAME = APP_NAME
health_route = "/ping"
predict_route = f"/predictions/{MODEL_NAME}"
serving_container_ports = [7080]

In [None]:
model = aiplatform.Model.upload(
    display_name=model_display_name,
    description=model_description,
    serving_container_image_uri=CUSTOM_PREDICTOR_IMAGE_URI,
    serving_container_predict_route=predict_route,
    serving_container_health_route=health_route,
    serving_container_ports=serving_container_ports,
)

model.wait()

print(model.display_name)
print(model.resource_name)

**Create an Endpoint for Model with Custom Container**

In [None]:
endpoint_display_name = f"{APP_NAME}-endpoint"
endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)

**Deploy the Model to Endpoint**

See more on the [documentation](https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api).

To select the right machine type according to your budget select go to [Google Cloud Pricing Calculator](https://cloud.google.com/products/calculator) and [Finding the ideal machine type](https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#finding_the_ideal_machine_type).

In [None]:
traffic_percentage = 100
machine_type = "n1-standard-8"
deployed_model_display_name = model_display_name
min_replica_count = 1
max_replica_count = 3
sync = True

model.deploy(
    endpoint=endpoint,
    deployed_model_display_name=deployed_model_display_name,
    machine_type=machine_type,
    traffic_percentage=traffic_percentage,
    min_replica_count=min_replica_count,
    max_replica_count=max_replica_count,
    sync=sync,
)

### Invoking the Endpoint with deployed Model using Vertex AI SDK to make predictions

**Get the endpoint id**

In [None]:
endpoint_display_name = f"{APP_NAME}-endpoint"
filter = f'display_name="{endpoint_display_name}"'

for endpoint_info in aiplatform.Endpoint.list(filter=filter):
    print(
        f"Endpoint display name = {endpoint_info.display_name} resource id ={endpoint_info.resource_name} "
    )

endpoint = aiplatform.Endpoint(endpoint_info.resource_name)

In [None]:
endpoint.list_models()

**Formatting input for online prediction**

In [None]:
test_instances = [
    b"This is an example of model deployment using a sentence transformers model and optimum",
]*100

In [None]:
len(tokenizer(test_instances[0])["input_ids"])

In [None]:
#test_instances

In [None]:
%%time
print("=" * 100)
for instance in test_instances:
    print(f"Input text: \n\t{instance.decode('utf-8')}\n")
    b64_encoded = base64.b64encode(instance)
    test_instance = [{"data": {"b64": f"{str(b64_encoded.decode('utf-8'))}"}}]
    print(f"Formatted input: \n{json.dumps(test_instance, indent=4)}\n")
    prediction = endpoint.predict(instances=test_instance)
    #print(f"Prediction response: \n\t{prediction}")
    print("=" * 100)

In [None]:
%%time
prediction = endpoint.predict(instances=test_instance)