# PyTorch ModelBuilder

This notebook was tested with the `conda_pytorch_p310` kernel on an Amazon SageMaker notebook instance of type `m5`.

In [None]:
!pip install boto3 sagemaker -U -q

In [None]:
!pip install torch==2.0.1 torchvision==0.15.2 transformers==4.31.0 -q

# SageMaker ModelBuilder experience

In the new experience, we have introduced a few new constructs. Here we will focus on the following: 

1. ModelBuilder
2. SchemaBuilder
3. InferenceSpec

In the following section, we will define these constructs and provide examples to elaborate on each one.

4.1 ModelBuilder:

ModelBuilder is a Python class that takes a framework model (such as XGBoost or PyTorch) or an InferenceSpec (described below) and converts it into a SageMaker deployable model. ModelBuilder provides a `build` function that generates the artifacts for deployment. The generated model artifact is specific to the model server, which you can also customize through one of the inputs.

Class definition:
```python
class ModelBuilder(
    model_path: str | None = '/tmp/sagemaker/model-builder/' + uuid.uuid1().hex,
    role_arn: str | None = None,
    sagemaker_session: Session | None = None,
    name: str | None = 'model-name-' + uuid.uuid1().hex,
    mode: Mode | None = Mode.SAGEMAKER_ENDPOINT,
    shared_libs: List[str] = lambda : [],
    dependencies: Dict[str, Any] | None = lambda : { "auto": False },
    env_vars: Dict[str, str] | None = lambda : {},
    log_level: int | None = logging.DEBUG,
    content_type: str | None = None,
    accept_type: str | None = None,
    s3_model_data_url: str | None = None,
    instance_type: str | None = "ml.c5.xlarge",
    schema_builder: SchemaBuilder | None = None,
    model: Any | None = None,
    inference_spec: InferenceSpec = None,
    image_uri: str | None = None,
    model_server: str | None = None
)
```
Example:

The class definition above lists all the options available for customization. However, to deploy a framework model, ModelBuilder only needs the model, a sample input, a sample output, and the role.

```python
model_builder = ModelBuilder(
    model=model,  # Pass in the actual model object. Its "predict" method will be invoked in the endpoint.
    schema_builder=SchemaBuilder(input, output),  # Pass in a "SchemaBuilder", which uses the sample input and output objects to infer the serialization needed.
    role_arn=role,  # Pass in the role ARN or update intelligent defaults.
)
```

4.2 SchemaBuilder:

SchemaBuilder lets you define the input and output for your endpoint. From these, it generates the corresponding marshalling functions for serializing and deserializing the input and output. For further details, please consult the notebook or refer to the video.

Class definition:
```python
class SchemaBuilder(
    sample_input: Any,
    sample_output: Any,
    input_translator: CustomPayloadTranslator = None,
    output_translator: CustomPayloadTranslator = None
)
```
Example:

The CustomPayloadTranslator class provides all the options for customization. However, for [common inference data formats](https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-inference.html), you only need to provide a sample input and output to SchemaBuilder.
```python
input = "How is the demo going?"
output = "Comment la démo va-t-elle?"
schema = SchemaBuilder(input, output)
```
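
To make the idea concrete, the sketch below shows one way a sample object could drive the choice of marshalling functions. It is not how SchemaBuilder is implemented: `pick_marshaller` and its content-type mapping are hypothetical names chosen for illustration, and the snippet uses only the standard library.

```python
import json
from typing import Any, Callable, Tuple


# Hypothetical helper: maps a sample's Python type to a content type plus
# serialize/deserialize functions. Illustration only, not SageMaker's code.
def pick_marshaller(sample: Any) -> Tuple[str, Callable[[Any], bytes], Callable[[bytes], Any]]:
    if isinstance(sample, bytes):
        return "application/octet-stream", lambda b: b, lambda b: b
    if isinstance(sample, str):
        return "text/plain", lambda s: s.encode("utf-8"), lambda b: b.decode("utf-8")
    if isinstance(sample, (dict, list)):
        return (
            "application/json",
            lambda o: json.dumps(o).encode("utf-8"),
            lambda b: json.loads(b.decode("utf-8")),
        )
    # Anything else would need a custom translator in this sketch.
    raise TypeError(f"No default marshaller for {type(sample).__name__}")


sample_input = "How is the demo going?"
content_type, to_bytes, from_bytes = pick_marshaller(sample_input)

# Serialize the sample to bytes and read it back to confirm the round trip.
round_tripped = from_bytes(to_bytes(sample_input))
```

The point of the sketch is that the caller supplies only a representative object; the type of that object is enough to select a matching pair of functions, and unsupported types fall back to a custom translator.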

4.3 InferenceSpec

If you want to specify custom functions to load and invoke the model instead of relying on the framework model's own functions, pass an InferenceSpec with your implementation of the `load` and `invoke` functions.

Class definition:
```python
class InferenceSpec(abc.ABC):
    @abc.abstractmethod
    def load(self, model_dir: str):
        pass

    @abc.abstractmethod
    def invoke(self, input_object: object, model: object):
        pass
```
Example:
```python
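from sagemaker.serve import InferenceSpec
from transformers import pipeline


class MyInferenceSpec(InferenceSpec):
    # Load the model; model_dir is unused here because the pipeline fetches "t5-small" by name
    def load(self, model_dir: str):
        return pipeline("translation_en_to_fr", model="t5-small")

    # Run inference using the object returned by load()
    def invoke(self, input, model):
        return model(input)


inf_spec = MyInferenceSpec()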

```
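
The sketch below illustrates how the two methods would typically fit together at serving time: `load` runs once, and `invoke` runs for each request with the object that `load` returned. The `InferenceSpec` class here is a local stand-in that copies the abstract interface shown above so the snippet runs without the SageMaker SDK; `UppercaseSpec` and `serve` are hypothetical names and not part of the library.

```python
import abc
from typing import Iterable, List


# Local stand-in mirroring the abstract interface shown above, so the
# example runs without the SageMaker SDK installed.
class InferenceSpec(abc.ABC):
    @abc.abstractmethod
    def load(self, model_dir: str):
        pass

    @abc.abstractmethod
    def invoke(self, input_object: object, model: object):
        pass


# Hypothetical spec: the "model" is a plain callable instead of real weights.
class UppercaseSpec(InferenceSpec):
    def __init__(self):
        self.load_calls = 0  # counts how often load() is called

    def load(self, model_dir: str):
        self.load_calls += 1
        return str.upper

    def invoke(self, input_object: object, model: object):
        return model(input_object)


# Hypothetical serving loop: load once, then invoke once per request.
def serve(spec: InferenceSpec, model_dir: str, requests: Iterable[object]) -> List[object]:
    model = spec.load(model_dir)
    return [spec.invoke(request, model) for request in requests]


spec = UppercaseSpec()
responses = serve(spec, "/tmp/model", ["hello", "world"])
```

Keeping expensive setup in `load` and per-request work in `invoke` means the model is constructed once no matter how many requests arrive.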

In this example, we use ModelBuilder to deploy a PyTorch model directly. You can use `Mode` to switch between local testing and deploying to a SageMaker Endpoint.

### PyTorch model deployment

In [None]:
from sagemaker import get_execution_role, Session, image_uris
import boto3

sagemaker_session = Session()
region = boto3.Session().region_name

# Get the execution role.
# On a notebook instance, use its execution role; otherwise replace the placeholder with your own role ARN.
role = get_execution_role()
execution_role = role if role is not None else "your-role-arn"

In [None]:
# Clean up any existing working directory (-f avoids an error if it does not exist yet)
!sudo rm -rf "./working_dir/models/resnet_v2_demo/"

In [None]:
# Save a local working copy of the ResNet model
import torch
from torchvision.transforms import transforms
from torchvision.models import resnet50, ResNet50_Weights
from PIL import Image
import numpy as np
import io

from pathlib import Path

resnet_model_dir = "./working_dir/models/resnet_v2_demo"
!mkdir -p {resnet_model_dir}

resnet_model_path = Path(resnet_model_dir) / "model.pth"
resnet_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
torch.save(resnet_model.state_dict(), resnet_model_path)
image_path = Path('./pytorch/zidane.jpeg').resolve()


In [None]:
# Load model from local disk
resnet_model = resnet50()
resnet_model.load_state_dict(torch.load(str(resnet_model_path)))
resnet_model.eval()  # switch to inference mode so batch norm uses its stored statistics

# Define image transformation
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load image
image = Image.open(str(image_path))
image_tensor = transform(image)
input_batch = image_tensor.unsqueeze(0)

with torch.no_grad():
    output = resnet_model(input_batch)

output

#### Deploy model using ModelBuilder

Now we will deploy the model using ModelBuilder.

By default, when you pass a sample input and output, ModelBuilder can generate the marshalling functions for you. However, if your payload needs custom handling, you need to build your own translators. The process is as follows:

* (1) Inference request serialization (handled by the client)
* (2) Inference request deserialization (handled by the server or algorithm)
* (3) (4) Invoke the model against the payload and send response payload back
* (5) Inference response serialization (handled by the server or algorithm)
* (6) Inference response deserialization (handled by the client)

![diagram](./img/serialization-deserialization.png)

Note that the translators take care of all the serialization and deserialization functions in this flow.
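
The six steps can be traced end to end with a small stand-in. The sketch below uses JSON and the standard library rather than the NumPy-based translators this notebook defines; `JsonListTranslator` and `fake_model` are hypothetical, and only the method names `serialize_payload_to_bytes` and `deserialize_payload_from_stream` are taken from this notebook.

```python
import io
import json
from typing import List


# Hypothetical translator with the same two-method shape as the notebook's
# translators, using JSON so that no third-party packages are needed.
class JsonListTranslator:
    def serialize_payload_to_bytes(self, payload: List[float]) -> bytes:
        return json.dumps(payload).encode("utf-8")

    def deserialize_payload_from_stream(self, stream: io.BytesIO) -> List[float]:
        return json.loads(stream.read().decode("utf-8"))


# Hypothetical model: doubles every value.
def fake_model(values: List[float]) -> List[float]:
    return [2 * v for v in values]


request_translator = JsonListTranslator()
response_translator = JsonListTranslator()

# (1) The client serializes the request.
request_bytes = request_translator.serialize_payload_to_bytes([1.0, 2.5])
# (2) The server deserializes the request.
server_input = request_translator.deserialize_payload_from_stream(io.BytesIO(request_bytes))
# (3) (4) The server invokes the model and produces a response payload.
server_output = fake_model(server_input)
# (5) The server serializes the response.
response_bytes = response_translator.serialize_payload_to_bytes(server_output)
# (6) The client deserializes the response.
client_result = response_translator.deserialize_payload_from_stream(io.BytesIO(response_bytes))
```

Each translator is used on both sides of the wire: its serialize method runs on one side and its deserialize method on the other, so the pair must agree on the byte format.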

In [None]:
from sagemaker.serve import CustomPayloadTranslator

# request translator
class MyRequestTranslator(CustomPayloadTranslator):
    def __init__(self):
        super().__init__()
        # Define image transformation
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
    
    # This function converts the payload to bytes - happens on client side
    def serialize_payload_to_bytes(self, payload: object) -> bytes:
        # converts an image to bytes
        image_tensor = self.transform(payload)
        input_batch = image_tensor.unsqueeze(0)
        input_ndarray = input_batch.numpy()
        return self._convert_numpy_to_bytes(input_ndarray)
        
    # This function converts the bytes to payload - happens on server side
    def deserialize_payload_from_stream(self, stream) -> torch.Tensor:
        # convert payload back to torch.Tensor
        np_array = np.load(io.BytesIO(stream.read()))
        return torch.from_numpy(np_array)
        
    def _convert_numpy_to_bytes(self, np_array: np.ndarray) -> bytes:
        buffer = io.BytesIO()
        np.save(buffer, np_array)
        return buffer.getvalue()
    
# response translator 
class MyResponseTranslator(CustomPayloadTranslator):
    # This function converts the payload to bytes - happens on server side
    def serialize_payload_to_bytes(self, payload: torch.Tensor) -> bytes:
        return self._convert_numpy_to_bytes(payload.numpy())
    
    # This function converts the bytes to payload - happens on client side
    def deserialize_payload_from_stream(self, stream) -> object:
        return torch.from_numpy(np.load(io.BytesIO(stream.read())))
    
    def _convert_numpy_to_bytes(self, np_array: np.ndarray) -> bytes:
        buffer = io.BytesIO()
        np.save(buffer, np_array)
        return buffer.getvalue()

In [None]:
from sagemaker.serve import SchemaBuilder

# pass in the sample input and output, along with above translators
my_schema = SchemaBuilder(
    sample_input=image, 
    sample_output=output,
    input_translator=MyRequestTranslator(), 
    output_translator=MyResponseTranslator()
    )

In [None]:
from sagemaker.serve import InferenceSpec

# custom inference spec
class MyResNetModel(InferenceSpec):
    def invoke(self, input_object: object, model: object):       
        with torch.no_grad():
            output = model(input_object)
        return output
        
    def load(self, model_dir: str):
        model = resnet50()
        model.load_state_dict(torch.load(model_dir+'/model.pth'))
        model.eval()
        return model

my_inference_spec = MyResNetModel()

In [None]:
from sagemaker.serve import ModelBuilder
from sagemaker.serve.mode.function_pointers import Mode
from sagemaker.session import Session
import boto3

# Convert the relative path to an absolute path, which local deployment currently needs in order to work (to be fixed).
resnet_model_dir = str(Path(resnet_model_dir).resolve())

# Create model builder with above custom inference spec and schema builder
model_builder = ModelBuilder(
    mode=Mode.SAGEMAKER_ENDPOINT,  # you can change it to Mode.LOCAL_CONTAINER for local testing
    model_path=resnet_model_dir,
    inference_spec=my_inference_spec,
    schema_builder=my_schema,
    role_arn=execution_role,
)

In [None]:
# Build the model according to the model server specification and save it as files in the working directory
model = model_builder.build()

In [None]:
# deploy is an existing method on the model object; live logging is enabled for easier debugging.
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.c6i.xlarge"
)

In [None]:
# Load the image; preprocessing is applied by the request translator during serialization
image = Image.open(str(image_path))

# make inference call
predictor.predict(image)

## Clean up

In [None]:
predictor.delete_model()
predictor.delete_endpoint()