# Deploy BertQA to Inferentia1 + SageMaker

https://huggingface.co/bert-large-uncased-whole-word-masking-finetuned-squad

**SageMaker Studio Kernel**: Python 3 (PyTorch 1.13 Python 3.9 CPU Optimized)  
**Instance**: ml.t3.medium

## 1) Update SageMaker SDK

In [None]:
%pip install -U sagemaker

## 2) Initialize session

In [None]:
import os
import boto3
import sagemaker

import re  # local to this cell: only needed for the version check below

# Minimum SageMaker SDK version this notebook expects, as (major, minor, patch)
MIN_SAGEMAKER_VERSION = (2, 196, 0)

print(sagemaker.__version__)
# Compare versions numerically. A plain string comparison is lexicographic, so
# "2.20.0" >= "2.196.0" would be True and "10.0.0" >= "2.196.0" would be False.
version_parts = [int(part) for part in re.findall(r"\d+", sagemaker.__version__)[:3]]
version_parts += [0] * (3 - len(version_parts))  # pad short versions such as "2.196"
if tuple(version_parts) < MIN_SAGEMAKER_VERSION:
    print("You need to upgrade or restart the kernel if you already upgraded")

# Local folder that receives the files written by the %%writefile cells below
os.makedirs("src", exist_ok=True)

# Session objects reused by the estimator and model cells further down
sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
bucket = sess.default_bucket()

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {bucket}")
print(f"sagemaker session region: {region}")

## 3) Create artifacts to compile & run the model

### 3.1) Dependencies file

In [None]:
%%writefile src/requirements.txt
--extra-index-url https://pip.repos.neuron.amazonaws.com
torchvision
numpy==1.22.2
accelerate==0.20.3
transformers==4.34.1
torch-neuron==1.13.1.2.9.6.0
neuron-cc[tensorflow]==1.20.3.0+ed6db4a2e

### 3.2) Python script for compiling and deploying the model

This script will download model weights from HF, compile each module to inf1 and save the compiled artifacts to S3

The envvar **NEURON_RT_NUM_CORES** controls how many NeuronCores are allocated per process. SageMaker can launch multiple processes in a single endpoint, which means you can increase the throughput of your endpoint by deploying multiple copies of your model to different cores.

In [None]:
%%writefile src/inference.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import os
# To use one neuron core per worker
os.environ["NEURON_RT_NUM_CORES"] = os.environ.get("TP_DEGREE", "1")
import time
import json
import torch
import shutil
import argparse
import torch.neuron
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
from transformers import BertForQuestionAnswering, BertTokenizer
from filelock import Timeout, FileLock

# File-based lock held by model_fn while it loads the model, so that model
# loading is serialized (one worker at a time) to avoid running out of memory.
# NOTE(review): presumably every worker process in the container resolves the
# same /tmp path and therefore shares this lock - confirm.
lock_path='/tmp/new_packages.lock'
lock = FileLock(lock_path)

def model_fn(model_dir):
    """Load the tokenizer and the compiled TorchScript model from ``model_dir``.

    Args:
        model_dir: Directory that contains the tokenizer files and ``model.pt``.

    Returns:
        A ``(model, tokenizer)`` tuple, which is what ``predict_fn`` unpacks.
    """
    print("Waiting for the lock acquire...")
    # this lock is necessary to load one worker at a time and avoid OOM.
    # Using it as a context manager guarantees it is released even when loading
    # raises, so a failing worker cannot leave the lock held.
    with lock:
        t=time.time()
        print("Loading model...")
        # load tokenizer and neuron model from model_dir
        tokenizer = AutoTokenizer.from_pretrained(model_dir)
        model = torch.jit.load(os.path.join(model_dir, "model.pt"))
        print(f"Model loaded. Elapsed: {time.time()-t}s")
    return model, tokenizer

def predict_fn(data, model_tokenizer):
    """Answer a question about a context with the compiled QA model.

    Args:
        data: Mapping with the keys ``"question"`` and ``"context"``.
        model_tokenizer: The ``(model, tokenizer)`` tuple returned by ``model_fn``.

    Returns:
        ``{"answer": <decoded answer text>}``.

    Raises:
        ValueError: If ``"question"`` or ``"context"`` is missing from ``data``.
    """
    # unpack model and tokenizer
    model, tokenizer = model_tokenizer

    input_question = data.get("question")
    input_context = data.get("context")
    # Fail early with a clear message instead of an opaque tokenizer error
    if input_question is None or input_context is None:
        raise ValueError('Request must contain both "question" and "context" fields')

    # Tokenize and pad to a fixed length. max_length matches the default sequence
    # length (384) used by the compilation step in this same script.
    inputs = tokenizer.encode_plus(input_question, input_context, return_tensors="pt", max_length=384, padding='max_length', truncation=True)
    input_ids = inputs["input_ids"]
    inputs_pr = input_ids, inputs['attention_mask'], inputs['token_type_ids']

    # Perform inference with the compiled model
    with torch.no_grad():
        output = model(*inputs_pr)

    # Most likely start/end token positions of the answer span
    start_index = int(torch.argmax(output['start_logits']))
    end_index = int(torch.argmax(output['end_logits']))

    # Convert token indices to answer text (empty when end_index < start_index)
    answer_tokens = input_ids[0][start_index:end_index + 1]
    answer_text = tokenizer.decode(answer_tokens)

    return {"answer": answer_text}

def input_fn(input_data, content_type, context=None):
    """Deserialize the request body.

    Only ``application/json`` is accepted; the body is parsed with ``json.loads``
    and the resulting object is returned. Any other content type raises.
    """
    if content_type != 'application/json':
        raise Exception(f"Unsupported mime type: {content_type}. Supported: application/json")
    payload = json.loads(input_data)
    return payload

def output_fn(prediction, content_type, context=None):
    """Serialize the prediction for the response.

    Only ``application/json`` is accepted; the prediction is encoded with
    ``json.dumps``. Any other content type raises.
    """
    if content_type != "application/json":
        raise Exception(f"Invalid content-type: {content_type}")
    body = json.dumps(prediction)
    return body

if __name__ == "__main__":
    # Compilation entry point: downloads the model, traces it with torch.neuron
    # and stores the compiled model, the tokenizer and the serving code in model_dir.
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--model_name", type=str, default="bert-large-uncased-whole-word-masking-finetuned-squad")
    parser.add_argument("--input_question", type=str, default="What is the capital of France?")
    parser.add_argument("--input_context", type=str, default="Paris is the capital of France.")
    parser.add_argument("--max_len", type=int, default=384)

    # Read SM_MODEL_DIR leniently so that --model_dir alone is enough when the
    # environment variable is not set.
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    args, _ = parser.parse_known_args()
    if args.model_dir is None:
        parser.error("--model_dir is required when the SM_MODEL_DIR environment variable is not set")

    # Build tokenizer and model
    model = BertForQuestionAnswering.from_pretrained(args.model_name)
    tokenizer = BertTokenizer.from_pretrained(args.model_name)

    # Tokenize and format the input (padded to a fixed length of max_len tokens)
    inputs = tokenizer.encode_plus(
        args.input_question,
        args.input_context,
        return_tensors="pt",
        max_length=args.max_len,
        padding='max_length',
        truncation=True
    )

    # Convert example inputs to a format that is compatible with TorchScript tracing
    example_inputs = inputs['input_ids'], inputs['attention_mask'], inputs['token_type_ids']

    # warmup the model; gradients are not needed for a forward-only pass
    with torch.no_grad():
        model(**inputs)

    # Sanity check only: the result of the plain TorchScript trace is discarded
    try:
        torch.jit.trace(model, example_inputs, strict=False)
        print("Cool! Model is jit traceable")
    except Exception as e:
        print("Ops. Something went wrong. Model is not traceable")
        # Report the cause instead of silently dropping it
        print(f"Tracing error: {e}")

    # Compile for Inferentia and store the artifacts next to the tokenizer files
    neuron_model = torch.neuron.trace(model, example_inputs, strict=False)
    neuron_model.save(os.path.join(args.model_dir, "model.pt"))
    tokenizer.save_pretrained(args.model_dir)

    # Ship the serving code with the model. Paths are resolved relative to this
    # script so the copy does not depend on the current working directory.
    code_path=os.path.join(args.model_dir, "code")
    os.makedirs(code_path, exist_ok=True)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    for file_name in ("inference.py", "requirements.txt"):
        shutil.copy(os.path.join(script_dir, file_name), os.path.join(code_path, file_name))

In [None]:
import os
import json
import logging
from sagemaker.pytorch import PyTorch

# CPU training image in which the compilation script runs
compile_image_uri = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training:1.13.1-cpu-py39-ubuntu20.04-sagemaker"

# Environment variables made available to the script
compile_env = {"TP_DEGREE": "1"}

# Passed to the script as command-line arguments (--max_len 384)
compile_hyperparameters = {"max_len": 384}

estimator = PyTorch(
    entry_point="inference.py",  # script written to src/ by the previous cell
    source_dir="src",
    role=role,
    sagemaker_session=sess,
    image_uri=compile_image_uri,
    instance_type='ml.c5.9xlarge',
    instance_count=1,
    volume_size=30,
    container_log_level=logging.DEBUG,
    disable_profiler=True,
    env=compile_env,
    hyperparameters=compile_hyperparameters,
)
estimator.framework_version = '1.13.1' # workaround when using image_uri

In [None]:
# Launch the job that runs src/inference.py as a script; its __main__ section
# compiles the model and writes the artifacts to the model directory.
estimator.fit()

In [None]:
# Show where the compiled model artifact is stored so it can be reused later
print("""
If you decide to run this notebook again, you don't need to re-compile the model.
Just keep the following path and use it to deploy the model next time.
""")
print(estimator.model_data)

## 5) Deploy the compiled model to a SageMaker endpoint on inf1
SageMaker can launch multiple workers, depending on the size of the Inf1 instance. A worker is a standalone Python process that manages one copy of the model. SageMaker puts a load balancer on top of all these processes and distributes the load automatically for your clients. It means that you can increase throughput by launching multiple workers, which serve different clients in parallel.

For instance, if you deploy the model to an **ml.inf1.xlarge**, SageMaker can launch 4 workers with 4 copies of the model. This instance has 4 cores and each copy of the model utilizes 1 core, so 4 simultaneous clients can invoke the endpoint and be served at the same time.

In [None]:
import logging
from sagemaker.utils import name_from_base
from sagemaker.pytorch.model import PyTorchModel

# The number of available accelerators (NeuronCores) depends on the inf1 instance
# type you deploy to; we ask SageMaker to launch 1 worker per core.
num_workers=4

# Neuron inference image used to serve the compiled model
inference_image_uri = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.13.2-ubuntu20.04"

# Environment variables for the serving container
endpoint_env = {
    'TP_DEGREE': '1',
    'SAGEMAKER_MODEL_SERVER_TIMEOUT' : '3600'
}

pytorch_model = PyTorchModel(
    name=name_from_base('bert-large-qa'),
    model_data=estimator.model_data,
    image_uri=inference_image_uri,
    framework_version="1.13.1",
    role=role,
    sagemaker_session=sess,
    container_log_level=logging.DEBUG,
    model_server_workers=num_workers, # 1 worker per core
    env=endpoint_env
    # for production it is important to define vpc_config and use a vpc_endpoint
    #vpc_config={
    #    'Subnets': ['<SUBNET1>', '<SUBNET2>'],
    #    'SecurityGroupIds': ['<SECURITYGROUP1>', '<DEFAULTSECURITYGROUP>']
    #}
)
# NOTE(review): private SDK attribute; presumably marks the artifact as already
# compiled for the target hardware - confirm against the SageMaker SDK.
pytorch_model._is_compiled_model = True

In [None]:
# Create the endpoint on a single ml.inf1.xlarge instance; the returned
# predictor is used by the cells below to invoke and later delete the endpoint.
predictor = pytorch_model.deploy(
    initial_instance_count=1,
    instance_type='ml.inf1.xlarge'
)

## 6) Run a simple test to check the endpoint

In [None]:
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
# Exchange JSON with the endpoint: inference.py only accepts and returns
# application/json.
predictor.serializer = JSONSerializer()
predictor.deserializer = JSONDeserializer()

In [10]:
# Smoke test: send one question/context pair; the deserialized JSON answer is returned
predictor.predict({"question": "What is the capital of France?", "context": "Paris is the capital of France."})

{'answer': 'paris'}

In [19]:
import time
from multiprocessing.pool import ThreadPool

# Send the same request from num_workers threads at once and time the whole batch
request = {"question": "What is the capital of France?", "context": "Paris is the capital of France."}
request_batch = [request for _ in range(num_workers)]

with ThreadPool(num_workers) as pool:
    started = time.time()
    responses = pool.map(predictor.predict, request_batch)
    duration = time.time() - started
    print(f"Total elapsed time for {num_workers} workers: {duration}")

Total elapsed time for 4 workers: 0.08790946006774902


## 7) Cleanup
Delete the endpoint to stop paying for the provisioned resources

In [None]:
# Remove the endpoint created by the deploy cell above
predictor.delete_endpoint()