# Setup

In [22]:
!pip install -U pip watermark -q --root-user-action=ignore

Keyring is skipped due to an exception: 'keyring.backends'


## Imports

In [4]:
import base64
import json
import os
import time
from io import BytesIO

import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker.model_monitor import (
    CronExpressionGenerator,
    DataCaptureConfig,
    DatasetFormat,
    DefaultModelMonitor,
)
from sagemaker.pipeline import PipelineModel
from sagemaker.s3 import S3Downloader
from sagemaker.serverless.serverless_inference_config import ServerlessInferenceConfig
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.transformer import Transformer
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

## Versions

In [3]:
# Load the watermark extension and report the versions of the imported packages.
%load_ext watermark
%watermark --iversions

json     : 2.0.9
sagemaker: 2.116.0
pandas   : 1.3.5
numpy    : 1.18.1
boto3    : 1.26.8



## Parameters

In [6]:
# S3 location shared by every artifact this notebook writes.
bucket = "datarocket-stg-sagemaker"
prefix = "california-housing"

# SageMaker session, execution role and low-level clients.
sm_region = boto3.Session().region_name
sm_role = sagemaker.get_execution_role()
sm_session = sagemaker.Session(default_bucket=bucket)
sm_client = boto3.client("sagemaker", region_name=sm_region)
runtime = boto3.client("sagemaker-runtime")

# Hold out 25% of the California housing data; the held-out features become `testX`.
data = fetch_california_housing()
X_train, X_test, y_train, y_test = train_test_split(
    data.data,
    data.target,
    test_size=0.25,
    random_state=42,
)
testX = pd.DataFrame(X_test, columns=data.feature_names)

# Model artifact, model/endpoint names and endpoint configuration.
model_file_name = "model.joblib"
model_name = "california-housing"
endpoint_name_serverless = "california-housing-serverless"
endpoint_name_realtime = "california-housing-realtime"
serverless_inference_config = ServerlessInferenceConfig(
    memory_size_in_mb=2048,
    max_concurrency=10,
)
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f"s3://{bucket}/{prefix}/data_capture",
)

## Data

In [5]:
# Shape of the held-out feature frame as (rows, columns).
testX.shape

(5160, 8)

In [6]:
# Preview the first rows of the held-out features.
testX.head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
0,1.6812,25.0,4.192201,1.022284,1392.0,3.877437,36.06,-119.01
1,2.5313,30.0,5.039384,1.193493,1565.0,2.679795,35.14,-119.46
2,3.4801,52.0,3.977155,1.185877,1310.0,1.360332,37.8,-122.44
3,5.7376,17.0,6.163636,1.020202,1705.0,3.444444,34.28,-118.72
4,3.725,34.0,5.492991,1.028037,1063.0,2.483645,36.62,-121.93


In [7]:
# Make sure the local output directory exists before writing to it:
# DataFrame.to_csv does not create missing parent directories.
os.makedirs("./data", exist_ok=True)

# The headerless file is the Batch Transform input; the file with a header row
# is the one uploaded as the monitoring baseline dataset.
testX[data.feature_names].to_csv("./data/california_housing.csv", header=False, index=False)
testX[data.feature_names].to_csv("./data/california_housing_with_header.csv", index=False)

sm_session.upload_data(
    path="./data/california_housing.csv", bucket=bucket, key_prefix=f"{prefix}/batch_data"
)

# S3 prefix handed to the transformer as its input location.
batch_data = f"s3://{bucket}/{prefix}/batch_data/"
batch_data

's3://datarocket-stg-sagemaker/california-housing/batch_data/'

## Model Data

In [8]:
# Package the serialized model file into the model.tar.gz archive.
!tar czvf model.tar.gz $model_file_name

model.joblib


In [9]:
# Build the S3 object key with "/" explicitly: os.path.join uses the platform
# separator, which would produce a backslash-delimited key on Windows.
key = f"{prefix}/model.tar.gz"

# The context manager closes the archive even if the upload raises.
with open("model.tar.gz", "rb") as fObj:
    boto3.Session().resource("s3").Bucket(bucket).Object(key).upload_fileobj(fObj)

In [10]:
# S3 URI of the uploaded model archive.
model_data = f"s3://{bucket}/{key}"
model_data

's3://datarocket-stg-sagemaker/california-housing/model.tar.gz'

## Model Code

In [11]:
# Show the inference entry point that is deployed with the model.
!pygmentize ./code/inference.py

[34mimport[39;49;00m [04m[36mos[39;49;00m

[34mimport[39;49;00m [04m[36mjoblib[39;49;00m

[33m"""[39;49;00m
[33m    https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html[39;49;00m
[33m[39;49;00m
[33m    input_fn: Takes request data and deserializes the data into an object for prediction.[39;49;00m
[33m    predict_fn: Takes the deserialized request object and performs inference against the loaded model.[39;49;00m
[33m    output_fn: Takes the result of prediction and serializes this according to the response content type.[39;49;00m
[33m"""[39;49;00m


[34mdef[39;49;00m [32mpredict_fn[39;49;00m(input_object, model):
    [33m"""[39;49;00m
[33m    """[39;49;00m
    [36mprint[39;49;00m([33m"[39;49;00m[33mcalling model[39;49;00m[33m"[39;49;00m)
    predictions = model.predict(input_object)
    [34mreturn[39;49;00m predictions


[34mdef[39;49;00m [32mmodel_fn[39;49;00m(model_dir):
    [33m"""[39;49;00m
[33m    """[39;

In [12]:
# Show the requirements file located in the model's source directory.
!pygmentize ./code/requirements.txt

boto3==1.24.17
botocore==1.27.18
requests==2.28.1
nltk==3.7


# SageMaker

## SKLearnModel

In [13]:
# https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-model

# Wrap the uploaded model archive in a scikit-learn serving model; the inference
# handlers come from code/inference.py.
model = SKLearnModel(
    name=model_name,
    role=sm_role,
    model_data=model_data,
    framework_version="1.0-1",
    py_version="py3",
    source_dir="code",
    entry_point="inference.py",
    sagemaker_session=sm_session,
)

## SageMaker Model

In [14]:
# Create the SageMaker Model resource from the definition above.
model.create()

## Model Registry

In [15]:
# Register the model as an approved version in the model package group named after the model.
register = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    transform_instances=["ml.m5.large", "ml.m5.xlarge"],
    model_package_group_name=model_name,
    approval_status="Approved",
    description="My sample California housing model package group"
    # NOTE(review): `description` has no trailing comma, so re-enabling the line
    # below as-is would be a syntax error; `model_metrics` is not defined in the visible cells.
    # model_metrics=model_metrics,
)

## Batch Transform

In [16]:
# Batch Transform definition: one ml.m5.large instance, CSV output assembled
# line by line and written under the result/ prefix.
transformer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.large",
    max_payload=10,
    accept="text/csv",
    assemble_with="Line",
    output_path=f"s3://{bucket}/{prefix}/result/",
)

Using already existing model: california-housing


In [17]:
# Run the job over the uploaded CSV, one record per line. With join_source="Input"
# each output row carries the input features followed by the prediction.
transformer.transform(
    batch_data,
    join_source="Input",
    split_type="Line",
    content_type="text/csv",
)

...........................
[34m2022-12-06 02:23:42,816 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2022-12-06 02:23:42,819 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2022-12-06 02:23:42,820 INFO - sagemaker-containers - nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
  

In [18]:
# Download the output data from S3 to local file system
batch_output = transformer.output_path
# Create the local target directory (no error if it already exists)
!mkdir -p data/output
!aws s3 cp --recursive $batch_output data/output/
# Head to see what the batch output looks like
!head data/output/*

download: s3://datarocket-stg-sagemaker/california-housing/result/california_housing.csv.out to data/output/california_housing.csv.out
1.6812,25.0,4.192200557103064,1.0222841225626742,1392.0,3.8774373259052926,36.06,-119.01,0.5052255264069264
2.5313,30.0,5.039383561643835,1.1934931506849316,1565.0,2.6797945205479454,35.14,-119.46,0.7278549527417028
3.4801,52.0,3.977154724818276,1.185877466251298,1310.0,1.3603322949117342,37.8,-122.44,4.84144142727633
5.7376,17.0,6.163636363636364,1.02020202020202,1705.0,3.4444444444444446,34.28,-118.72,2.630331391414141
3.725,34.0,5.492990654205608,1.02803738317757,1063.0,2.4836448598130842,36.62,-121.93,2.3375484145021646
4.7147,12.0,5.251482799525504,0.9750889679715302,2400.0,2.8469750889679717,34.08,-117.61,1.6975502158730171
5.0839,36.0,6.221719457013575,1.0950226244343892,670.0,3.0316742081447963,33.89,-118.02,2.309398987518038
3.6908,38.0,4.962825278810409,1.0483271375464684,1011.0,3.758364312267658,33.92,-118.08,1.6731999047619046
4.8036,4.0,3.9

In [19]:
# Column labels for the headerless batch output: the input features followed by the prediction.
feature_names = [*data.feature_names, "predict"]

pd.read_csv(
    "data/output/california_housing.csv.out",
    names=feature_names,
    header=None,
)

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,predict
0,1.6812,25.0,4.192201,1.022284,1392.0,3.877437,36.06,-119.01,0.505226
1,2.5313,30.0,5.039384,1.193493,1565.0,2.679795,35.14,-119.46,0.727855
2,3.4801,52.0,3.977155,1.185877,1310.0,1.360332,37.80,-122.44,4.841441
3,5.7376,17.0,6.163636,1.020202,1705.0,3.444444,34.28,-118.72,2.630331
4,3.7250,34.0,5.492991,1.028037,1063.0,2.483645,36.62,-121.93,2.337548
...,...,...,...,...,...,...,...,...,...
5155,6.6260,51.0,5.532213,0.974790,771.0,2.159664,34.04,-118.42,4.547683
5156,2.1898,30.0,4.509091,0.945455,410.0,2.484848,40.18,-122.21,0.862516
5157,2.1667,37.0,3.272152,1.056962,2173.0,4.584388,34.02,-118.26,1.259680
5158,6.8869,6.0,7.382385,1.030075,2354.0,2.528464,38.51,-121.06,3.007282


## Deploy Endpoint - Serverless

In [20]:
# Deploy the model to a serverless endpoint using the serverless inference
# configuration (memory size and max concurrency) defined in the Parameters cell.
predictor_serverless = model.deploy(
    endpoint_name=endpoint_name_serverless,
    serverless_inference_config=serverless_inference_config,
)

Using already existing model: california-housing


------!

In [21]:
# Deploy the same model to an instance-backed realtime endpoint (one ml.t2.medium)
# with data capture enabled.
predictor_realtime = model.deploy(
    endpoint_name=endpoint_name_realtime,
    instance_type="ml.t2.medium",
    initial_instance_count=1,
    data_capture_config=data_capture_config,
)

Using already existing model: california-housing


---------!

## Predictor

In [22]:
# Score the same held-out features against both endpoints so the results can be compared.
predictions_realtime = predictor_realtime.predict(testX[data.feature_names])
# Fixed: this previously called predictor_realtime, so the serverless endpoint was
# never exercised and the "serverless" frame was just a second realtime result.
predictions_serverless = predictor_serverless.predict(testX[data.feature_names])

df_predictions_realtime = pd.DataFrame(predictions_realtime, columns=["predict"])
df_predictions_serverless = pd.DataFrame(predictions_serverless, columns=["predict"])

In [23]:
# Sanity check: both prediction frames should have one row per input record.
df_predictions_realtime.shape, df_predictions_serverless.shape

((5160, 1), (5160, 1))

## Predictions Realtime

In [24]:
# Display the realtime predictions frame.
df_predictions_realtime

Unnamed: 0,predict
0,0.505226
1,0.727855
2,4.841441
3,2.630331
4,2.337548
...,...
5155,4.547683
5156,0.862516
5157,1.259680
5158,3.007282


## Predictions Serverless

In [25]:
# Display the serverless predictions frame.
df_predictions_serverless

Unnamed: 0,predict
0,0.505226
1,0.727855
2,4.841441
3,2.630331
4,2.337548
...,...
5155,4.547683
5156,0.862516
5157,1.259680
5158,3.007282


## Invoke Endpoint (application/x-npy)

In [26]:
# line 59 - https://github.com/aws/sagemaker-scikit-learn-container/blob/master/src/sagemaker_sklearn_container/serving.py
# lines 191, 48 - https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/encoders.py

# Serialise the feature matrix into an in-memory .npy payload
buffer = BytesIO()
np.save(buffer, testX[data.feature_names].values)

# Both endpoints receive the same body and content type.
npy_request = {"Body": buffer.getvalue(), "ContentType": "application/x-npy"}

x_npy_response_realtime = runtime.invoke_endpoint(
    EndpointName=predictor_realtime.endpoint_name, **npy_request
)
x_npy_response_serverless = runtime.invoke_endpoint(
    EndpointName=predictor_serverless.endpoint_name, **npy_request
)

# Each response body is read once and parsed as JSON.
x_npy_predictions_realtime = json.loads(x_npy_response_realtime["Body"].read())
x_npy_predictions_serverless = json.loads(x_npy_response_serverless["Body"].read())

df_x_npy_predictions_realtime = pd.DataFrame(x_npy_predictions_realtime, columns=["Predict"])
df_x_npy_predictions_serverless = pd.DataFrame(x_npy_predictions_serverless, columns=["Predict"])

In [27]:
# Sanity check: one prediction per input record from each endpoint.
df_x_npy_predictions_serverless.shape, df_x_npy_predictions_realtime.shape

((5160, 1), (5160, 1))

## Invoke Endpoint (text/csv)

In [28]:
# Encode the features once as headerless CSV; both endpoints receive the same request.
csv_request = {
    "Body": testX[data.feature_names].to_csv(header=False, index=False).encode("utf-8"),
    "ContentType": "text/csv",
}

csv_response_realtime = runtime.invoke_endpoint(
    EndpointName=predictor_realtime.endpoint_name, **csv_request
)
csv_response_serverless = runtime.invoke_endpoint(
    EndpointName=predictor_serverless.endpoint_name, **csv_request
)

# Each response body is read once and parsed as JSON.
csv_predictions_realtime = json.loads(csv_response_realtime["Body"].read())
csv_predictions_serverless = json.loads(csv_response_serverless["Body"].read())

df_csv_predictions_realtime = pd.DataFrame(csv_predictions_realtime, columns=["Predict"])
df_csv_predictions_serverless = pd.DataFrame(csv_predictions_serverless, columns=["Predict"])

In [29]:
# Sanity check: one prediction per input record from each endpoint.
df_csv_predictions_realtime.shape, df_csv_predictions_serverless.shape

((5160, 1), (5160, 1))

## Invoke Endpoint (application/json)

In [30]:
# Lambda-style event whose "data" field holds the feature rows as nested lists.
event = {"data": json.loads(testX[data.feature_names].to_json(orient="values"))}
context = None


def lambda_handler(event, context):
    """Forward ``event["data"]`` to the realtime endpoint as a JSON payload.

    Shaped like an AWS Lambda entry point; ``context`` is accepted but unused.
    Returns the raw ``invoke_endpoint`` response.
    """
    return runtime.invoke_endpoint(
        EndpointName=predictor_realtime.endpoint_name,
        Body=json.dumps(event["data"]),
        ContentType="application/json",
    )


json_response_realtime = lambda_handler(event, context)

# The response body is read once and parsed as JSON.
json_predictions_realtime = json.loads(json_response_realtime["Body"].read())
df_json_predictions_realtime = pd.DataFrame(json_predictions_realtime, columns=["Predict"])

In [31]:
# Sanity check: one prediction per input record.
df_json_predictions_realtime.shape

(5160, 1)

## Capture realtime

In [32]:
# https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html
# https://github.com/aws/amazon-sagemaker-examples/blob/main/aws_sagemaker_studio/getting_started/xgboost_customer_churn_studio.ipynb
# https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker_model_monitor/model_monitor_batch_transform

# Poll S3 for capture files: up to 20 attempts, 5 s apart (roughly 100 s in total).
capture_files = []
for _ in range(20):
    capture_files = S3Downloader.list(f"s3://{bucket}/{prefix}/data_capture")
    if capture_files:
        break
    time.sleep(5)

if capture_files:
    print("Found Data Capture Files:")
    print(capture_files)
else:
    # Report the timeout explicitly instead of announcing an empty list as "found".
    print("No data capture files found yet - invoke the endpoint and re-run this cell.")

Found Data Capture Files:
['s3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/14/51-32-792-5c2e490c-4ccd-4f33-9520-d8807cad4eea.jsonl', 's3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/15/50-52-882-2346ad33-3082-44fb-b0bf-53e33fbf28ff.jsonl', 's3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/16/15-46-832-0a6fb23e-57b9-47eb-8450-ced1f2ad5a67.jsonl', 's3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/16/19-12-200-eb41e363-83a0-4684-bf7e-5dd26e7ca1f0.jsonl', 's3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/16/21-28-726-13eb317c-944d-4c2a-aa45-70f341032e19.jsonl', 's3://datarocket-stg-sagemaker/california-housing/data_capture/california-housing-realtime/AllTraffic/2022/12/04/16/23

In [33]:
# Read the last listed capture file.
# NOTE(review): raises IndexError if `capture_files` is empty; presumably the last
# listed key is the most recent capture - confirm the listing order.
capture_file = S3Downloader.read_file(capture_files[-1])

In [34]:
# Pretty-print the first line of the capture file as JSON, truncated to 1000 characters.
print(json.dumps(json.loads(capture_file.split("\n")[0]), indent=2)[:1000])

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "application/json",
      "mode": "INPUT",
      "data": "[[1.6812, 25.0, 4.1922005571, 1.0222841226, 1392.0, 3.8774373259, 36.06, -119.01], [2.5313, 30.0, 5.0393835616, 1.1934931507, 1565.0, 2.6797945205, 35.14, -119.46], [3.4801, 52.0, 3.9771547248, 1.1858774663, 1310.0, 1.3603322949, 37.8, -122.44], [5.7376, 17.0, 6.1636363636, 1.0202020202, 1705.0, 3.4444444444, 34.28, -118.72], [3.725, 34.0, 5.4929906542, 1.0280373832, 1063.0, 2.4836448598, 36.62, -121.93], [4.7147, 12.0, 5.2514827995, 0.975088968, 2400.0, 2.846975089, 34.08, -117.61], [5.0839, 36.0, 6.221719457, 1.0950226244, 670.0, 3.0316742081, 33.89, -118.02], [3.6908, 38.0, 4.9628252788, 1.0483271375, 1011.0, 3.7583643123, 33.92, -118.08], [4.8036, 4.0, 3.9246575342, 1.0359589041, 1050.0, 1.7979452055, 37.39, -122.08], [8.1132, 45.0, 6.8790560472, 1.01179941, 943.0, 2.7817109145, 34.18, -118.23], [2.5417, 30.0, 5.0860215054, 1.1720430108, 242.0, 2.602150

In [None]:
# S3 prefixes for the monitoring baseline: the input dataset and the generated results.
baseline_prefix = f"{prefix}/baselining"
baseline_data_prefix = f"{baseline_prefix}/data"
baseline_results_prefix = f"{baseline_prefix}/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

# Upload the CSV that includes a header row as the baseline dataset.
baseline_data_path = sagemaker.s3.S3Uploader.upload(
    "data/california_housing_with_header.csv",
    baseline_data_uri,
)

In [None]:
# Monitor whose jobs run on a single ml.m5.xlarge instance.
my_default_monitor = DefaultModelMonitor(
    role=sm_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Run the baselining job over the uploaded CSV (first row is the header) and
# write its results under the baseline results prefix; wait=True blocks until it finishes.
baseline_job = my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

In [None]:
# List the objects stored under the baseline results prefix.
print("Found Files:")
S3Downloader.list(f"s3://{bucket}/{baseline_results_prefix}")

In [None]:
# Flatten the per-feature baseline statistics into a DataFrame and preview the first rows.
baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
# Flatten the per-feature suggested constraints into a DataFrame and preview the first rows.
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

In [None]:
# Schedule hourly monitoring of the realtime endpoint, checked against the
# baseline statistics and suggested constraints.
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name="my-monitoring-schedule",
    endpoint_input=predictor_realtime.endpoint_name,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
)

In [None]:
# List the monitor's executions.
executions = my_default_monitor.list_executions()

## Delete Resources

In [10]:
def empty_and_delete_model_package(sagemaker_client, mpg_name, delay_seconds=1):
    """Delete every model package in a model package group, then the group itself.

    Args:
        sagemaker_client: boto3 SageMaker client (or any object exposing
            ``get_paginator``, ``delete_model_package`` and
            ``delete_model_package_group``).
        mpg_name: Name of the model package group to remove.
        delay_seconds: Pause after each package deletion. Defaults to the
            one-second pause this function has always applied; pass 0 to disable.
    """
    # list_model_packages is paginated: walk every page so that a group holding more
    # packages than fit in one response is fully emptied before the group is deleted.
    paginator = sagemaker_client.get_paginator("list_model_packages")
    package_arns = [
        summary["ModelPackageArn"]
        for page in paginator.paginate(ModelPackageGroupName=mpg_name)
        for summary in page.get("ModelPackageSummaryList") or []
    ]

    # Collect first, delete second, so deletions never interleave with listing.
    for arn in package_arns:
        sagemaker_client.delete_model_package(ModelPackageName=arn)
        if delay_seconds:
            time.sleep(delay_seconds)

    # Delete model package group
    sagemaker_client.delete_model_package_group(ModelPackageGroupName=mpg_name)


# model.delete_model()
# sm_session.delete_monitoring_schedule("my-monitoring-schedule")
# predictor_realtime.delete_endpoint(delete_endpoint_config=True)
# predictor_serverless.delete_endpoint(delete_endpoint_config=True)

# Remove the model package group this notebook registers into (model.register uses
# model_package_group_name=model_name) rather than an unrelated hard-coded group name.
empty_and_delete_model_package(sm_client, model_name)