# Deploy a TSFM Inference Endpoint on Amazon SageMaker


In this notebook, we walk through the process of deploying an IBM Granite time series inference service endpoint on Amazon SageMaker.

In [104]:
# obtain tsfm requirements
import shutil
import subprocess
from pathlib import Path


# so that some code directories from the granite-tsfm repo can be added to the sys.path
shutil.rmtree("code/granite-tsfm", ignore_errors=True)
assert not Path("code/granite-tsfm").exists()
subprocess.run(
    check=True,
    stdout=subprocess.DEVNULL,
    cwd="code",
    args=[
        "git",
        "clone",
        "-b",
        "sagemaker",
        "--depth",
        "1",
        "https://github.com/ibm-granite/granite-tsfm.git",
    ],
)
shutil.copy("code/granite-tsfm/services/inference/requirements.txt", "code")
# set up tsfm services layer
# 1. boilerplate code
subprocess.run(
    check=True, stdout=subprocess.DEVNULL, cwd="code/granite-tsfm/services/inference", args=["make", "boilerplate"]
)
# 2. tsfm-granite public models
# you must have git-lfs installed for this
subprocess.run(check=True, stdout=subprocess.DEVNULL, args=["git-lfs"])
subprocess.run(
    check=True, stdout=subprocess.DEVNULL, cwd="code/granite-tsfm/services/inference", args=["make", "clone_models"]
)

Cloning into 'granite-tsfm'...
Cloning into 'mytest-tsfm'...
Filtering content: 100% (15/15), 104.18 MiB | 21.61 MiB/s, done.


CompletedProcess(args=['make', 'clone_models'], returncode=0)

In [93]:
# obtain the short commit hash of each repo so that model and endpoint names can include it
result = subprocess.run(
    cwd="code/granite-tsfm/services/inference",
    args=["git", "rev-parse", "--short", "HEAD"],
    capture_output=True,
    text=True,
)
tsfm_release = f"tsfm-public-{result.stdout.strip()}"

result = subprocess.run(
    cwd="code/granite-tsfm/services/inference/mytest-tsfm",
    args=["git", "rev-parse", "--short", "HEAD"],
    capture_output=True,
    text=True,
)
tsfm_model_release = f"model-release-{result.stdout.strip()}"
identifier = f"{tsfm_release}-{tsfm_model_release}"
identifier

'tsfm-public-95ed712-model-release-fc98672'
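
Because this identifier ends up inside the model and endpoint names, it can help to check the final names before deploying. The sketch below is illustrative only: `is_valid_sagemaker_name` is a hypothetical helper, and the rule it encodes (alphanumerics and hyphens, starting and ending with an alphanumeric, at most 63 characters) is an assumption about SageMaker's naming constraints that you should confirm against the AWS API reference.

```python
import re

# Assumption: names may contain only alphanumerics and hyphens, must start and
# end with an alphanumeric character, and may be at most 63 characters long.
_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
_MAX_NAME_LENGTH = 63


def is_valid_sagemaker_name(name: str) -> bool:
    """Return True if `name` satisfies the assumed naming constraints."""
    return len(name) <= _MAX_NAME_LENGTH and bool(_NAME_PATTERN.match(name))


# The identifier shown in the cell output above.
example_identifier = "tsfm-public-95ed712-model-release-fc98672"
for prefix in ("model", "endpoint"):
    candidate = f"{prefix}-{example_identifier}"
    print(candidate, len(candidate), is_valid_sagemaker_name(candidate))
```

If a name turns out to be too long, shortening the prefixes in `tsfm_release` or `tsfm_model_release` is one option.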

In [105]:
import subprocess

import boto3
from sagemaker import Session
from sagemaker.pytorch import PyTorchModel


# Make sure you have run the AWS CLI command "aws configure" to set up your system
# with your AWS credentials

boto3_session = boto3.Session()

sess = Session(boto_session=boto3_session)
role = "arn:aws:iam::481118440516:role/SagemakerFullAccessRole"

# create the tarball of code and model artifacts
subprocess.run(
    check=True,
    stdout=subprocess.DEVNULL,
    args=[
        "tar",
        "--exclude",
        "*.git/**",
        "-czvf",
        "model.tar.gz",
        "code/inference.py",
        "code/requirements.txt",
        "code/granite-tsfm/services/inference",
    ],
)

# upload it to our default bucket so that SageMaker can get it
tsfm_tarball = sess.upload_data(path="model.tar.gz", bucket=sess.default_bucket(), key_prefix="model/pytorch")

## PyTorch Model Object

The `PyTorchModel` class lets you define an environment for running inference with your
model artifact. Like the `PyTorch` class discussed
[in this notebook for training a PyTorch model](get_started_mnist_train.ipynb), it is a high-level API used to set up a Docker image for your model hosting service.

Once it is properly configured, it can be used to create a SageMaker
endpoint on an EC2 instance. The SageMaker endpoint is a containerized environment that uses your trained model
to run inference on incoming data via RESTful API calls.

Some common parameters used to instantiate the `PyTorchModel` class are:
- `entry_point`: A user-defined Python file whose functions the inference image uses as handlers for incoming requests
- `source_dir`: The directory containing the `entry_point`
- `role`: An IAM role used to make AWS service requests
- `model_data`: The S3 location of the compressed model artifact. It can be a path to a local file if the endpoint
is to be deployed on the SageMaker instance you are using to run this notebook (local mode)
- `framework_version`: The version of the PyTorch package to be used
- `py_version`: The Python version to be used

We elaborate on the `entry_point` below.
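
As a rough illustration of what an `entry_point` file can contain, here is a minimal sketch of two handler functions. It is not the `inference.py` used in this notebook: the function bodies are placeholders, and the `transform_fn(model, request_body, content_type, accept)` signature is an assumption based on the common SageMaker inference toolkit convention, which may differ between container versions.

```python
import json


def model_fn(model_dir):
    """Load and return the object that will serve requests.

    Hypothetical stand-in: a real handler would load model weights from
    `model_dir` instead of returning a plain dictionary.
    """
    return {"model_dir": model_dir}


def transform_fn(model, request_body, content_type, accept):
    """Decode a request, run (placeholder) inference, and encode the response."""
    if content_type != "application/json":
        raise ValueError(f"Unsupported content type: {content_type}")
    payload = json.loads(request_body)
    # Placeholder "inference": report which top-level keys were received.
    result = {"received_keys": sorted(payload), "model_dir": model["model_dir"]}
    return json.dumps(result)


# Local smoke test of the handlers, with no SageMaker involved.
loaded = model_fn("/opt/ml/model")
print(transform_fn(loaded, '{"model_id": "x", "data": {}}', "application/json", "application/json"))
```

Exercising the handlers locally like this is a cheap way to catch decoding errors before paying for a deployment.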



In [106]:
model = PyTorchModel(
    entry_point="inference.py",
    source_dir="code",
    role=role,
    model_data=tsfm_tarball,
    framework_version="2.5",
    py_version="py311",
    name=f"model-{identifier}",
)

## Deploy the inference container
Once the `PyTorchModel` class is instantiated, we can call its `deploy` method to run the container for the hosting
service. Some common parameters of the `deploy` method are:

- `initial_instance_count`: the number of SageMaker instances to be used to run the hosting service.
- `instance_type`: the type of SageMaker instance to run the hosting service. Set it to `local` if you want to run the hosting service on the local SageMaker instance. Local mode is typically used for debugging. 
- `serializer`: A python callable used to serialize (encode) the request data.
- `deserializer`: A python callable used to deserialize (decode) the response data.

Commonly used serializers and deserializers are implemented in `sagemaker.serializers` and `sagemaker.deserializers`
submodules of the SageMaker Python SDK. 

Because the `transform_fn` handler expects JSON-encoded requests, we use a `JSONSerializer`
to encode the request data as a JSON string.
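
To make the serializer and deserializer roles concrete, the sketch below mimics them with the standard `json` module. `serialize_request` and `deserialize_response` are hypothetical names used only for illustration; they are not the SDK classes, which also handle content-type headers and response streams.

```python
import json


def serialize_request(data) -> str:
    """Client side: turn a Python object into the JSON string sent to the endpoint."""
    return json.dumps(data)


def deserialize_response(body: bytes):
    """Client side: turn the raw response bytes back into Python objects."""
    return json.loads(body.decode("utf-8"))


request_text = serialize_request({"model_id": "ttm-1024-96-r1", "parameters": {}})
print(request_text)

# Pretend the endpoint echoed the request back as UTF-8 bytes.
print(deserialize_response(request_text.encode("utf-8")))
```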

In [107]:
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import JSONSerializer


instance_type = "ml.c4.xlarge"

# this could take up to five minutes to complete
predictor = model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
    endpoint_name=f"endpoint-{identifier}",
)
predictor

-------!

<sagemaker.pytorch.model.PyTorchPredictor at 0x7f544d237510>

The `predictor` we get above can be used to make prediction requests against a SageMaker endpoint.
For more information, see [the API reference for SageMaker Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html#sagemaker.predictor.predictor).

Now, let's test the endpoint with some dummy data.

We'll create a single univariate time series in which every row has the ID "A".

In [123]:
from datetime import datetime

import numpy as np
import pandas as pd


tslength = 1024
start_date = datetime(2020, 1, 1)
date_range = [d.isoformat() for d in pd.date_range(start=start_date, periods=tslength, freq="h")]
ids = ["A" for _ in range(tslength)]
values = np.random.rand(tslength)

# Create the DataFrame
df = pd.DataFrame({"timestamp": date_range, "ID": ids, "value": values})

df[0:10]

|   | timestamp | ID | value |
|---|---|---|---|
| 0 | 2020-01-01T00:00:00 | A | 0.849956 |
| 1 | 2020-01-01T01:00:00 | A | 0.149866 |
| 2 | 2020-01-01T02:00:00 | A | 0.446576 |
| 3 | 2020-01-01T03:00:00 | A | 0.933218 |
| 4 | 2020-01-01T04:00:00 | A | 0.114864 |
| 5 | 2020-01-01T05:00:00 | A | 0.437336 |
| 6 | 2020-01-01T06:00:00 | A | 0.392566 |
| 7 | 2020-01-01T07:00:00 | A | 0.382814 |
| 8 | 2020-01-01T08:00:00 | A | 0.290531 |
| 9 | 2020-01-01T09:00:00 | A | 0.894302 |


Prepare the payload

In [124]:
payload = {
    "inference_type": "forecasting",  # we currently support only 'forecasting'
    "model_id": "ttm-1024-96-r1",
    "parameters": {},
    "schema": {
        "timestamp_column": "timestamp",
        "id_columns": ["ID"],  # multiple columns are supported
        "target_columns": ["value"],  # what we're generating a forecast for
    },
    "data": df.to_dict(orient="list"),
    "future_data": {},  # used for things like exogenous data
}
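
Before sending a payload like this, a quick local consistency check can save a failed request. The sketch below is one possible check, not part of the service: `check_payload` is a hypothetical helper that only verifies that the columns named in `schema` exist in `data` and that all data columns have the same length.

```python
def check_payload(payload: dict) -> None:
    """Raise ValueError if the schema and the column-oriented data disagree."""
    schema = payload["schema"]
    data = payload["data"]

    # Every column referenced by the schema must be present in the data.
    required = [schema["timestamp_column"], *schema["id_columns"], *schema["target_columns"]]
    missing = [column for column in required if column not in data]
    if missing:
        raise ValueError(f"Columns named in schema but missing from data: {missing}")

    # A column-oriented payload needs equally long columns.
    lengths = {column: len(values) for column, values in data.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"Columns have different lengths: {lengths}")


# Tiny hand-written payload with the same layout as the one above.
example_payload = {
    "schema": {"timestamp_column": "timestamp", "id_columns": ["ID"], "target_columns": ["value"]},
    "data": {
        "timestamp": ["2020-01-01T00:00:00", "2020-01-01T01:00:00"],
        "ID": ["A", "A"],
        "value": [0.1, 0.2],
    },
}
check_payload(example_payload)
print("payload looks consistent")
```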

Use the predictor to generate a forecast

In [125]:
# import json
res = predictor.predict(payload)

In [126]:
res.keys()

dict_keys(['model_id', 'created_at', 'results', 'input_data_points', 'output_data_points'])

In [127]:
# convert back to a pandas dataframe
# note that [0] are the predictions for the first--and in this case only--timeseries "A"
results_df = pd.DataFrame.from_dict(res["results"][0])
# the first 10 predictions
results_df[0:10]

|   | timestamp | ID | value |
|---|---|---|---|
| 0 | 2020-02-12T16:00:00 | A | 0.5489 |
| 1 | 2020-02-12T17:00:00 | A | 0.564634 |
| 2 | 2020-02-12T18:00:00 | A | 0.55699 |
| 3 | 2020-02-12T19:00:00 | A | 0.608186 |
| 4 | 2020-02-12T20:00:00 | A | 0.580682 |
| 5 | 2020-02-12T21:00:00 | A | 0.512144 |
| 6 | 2020-02-12T22:00:00 | A | 0.495369 |
| 7 | 2020-02-12T23:00:00 | A | 0.478212 |
| 8 | 2020-02-13T00:00:00 | A | 0.495077 |
| 9 | 2020-02-13T01:00:00 | A | 0.50149 |


In [128]:
# notice that the results start one hour beyond the last date in the input data
from datetime import timedelta


assert datetime.fromisoformat(df["timestamp"].iloc[-1]) + timedelta(hours=1) == datetime.fromisoformat(
    results_df["timestamp"].iloc[0]
)
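
The same check can be reproduced without the endpoint, using only the parameters of the input series defined earlier (1024 hourly points starting on 2020-01-01). This is a small worked sketch of the arithmetic, with variable names chosen here for illustration.

```python
from datetime import datetime, timedelta

start = datetime(2020, 1, 1)
n_points = 1024
step = timedelta(hours=1)

# The last input timestamp is (n_points - 1) steps after the start.
last_input = start + (n_points - 1) * step
# The first forecast timestamp is one step after the last input.
first_forecast = last_input + step

print(last_input.isoformat())      # 2020-02-12T15:00:00
print(first_forecast.isoformat())  # 2020-02-12T16:00:00
```

The second value matches the first timestamp in the forecast table above.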

## (Optional) Clean up 

If you do not plan to keep using the endpoint, delete it to free up compute
resources. If you used local mode, you will need to manually delete the Docker container bound
to port 8080 (the port that listens for incoming requests).


In [130]:
predictor.delete_model()
predictor.delete_endpoint()