In [4]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

# S3 prefix
bucket = sagemaker_session.default_bucket()
prefix = "Scikit-LinearLearner-snmpcpu"

In [3]:
import pandas as pd
import numpy as np
import pyarrow
from sklearn.model_selection import train_test_split

df=pd.read_parquet("7e940b15d4914503a2c42e3e75ac196c.parquet")
df=df.to_csv("7e940b15d4914503a2c42e3e75ac196c.csv")
df=pd.read_csv("7e940b15d4914503a2c42e3e75ac196c.csv")

df['ts']=pd.to_datetime(df['ts'])
data_cpu=df.drop('Unnamed: 0',axis=1)

#data_cpu['day']=data_cpu['ts'].dt.day
data_cpu['day_of_week']=data_cpu['ts'].dt.day_of_week.astype(np.float)
data_cpu['hour']=data_cpu['ts'].dt.hour.astype(np.float)
data_cpu['minutes']=data_cpu['ts'].dt.minute.astype(np.float)
data_cpu['date']=data_cpu['ts'].dt.strftime("%Y-%m-%d")
data_cpu['date']=pd.to_datetime(data_cpu['date'])

time_series=data_cpu[['minutes','hour','day_of_week','snmpcpu_processor_1','date']]
time_series=time_series.drop('date',axis=1)

columns=list(time_series.columns)


x_train, x_test = train_test_split(time_series, test_size=0.25)
x_eval = x_test[['minutes', 'hour', 'day_of_week',]]
x_train.to_csv("train.csv")
x_test.to_csv("test.csv")
x_eval.to_csv("eval.csv", header=False, index=False)

trainpath = sagemaker_session.upload_data(
    path="train.csv", bucket=bucket, key_prefix="sagemaker/sklearn-train"
)

testpath = sagemaker_session.upload_data(
    path="test.csv", bucket=bucket, key_prefix="sagemaker/sklearn-train"
)

#print(trainpath)
#print(testpath)

sagemaker_session.upload_data(
    path="eval.csv", bucket=bucket, key_prefix="sagemaker/sklearn-eval"
)

eval_s3_prefix = f"s3://{bucket}/sagemaker/sklearn-eval/"
#eval_s3_prefix

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  from ipykernel import kernelapp as app
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  app.launch_new_instance()


In [7]:
%%writefile script.py

import argparse
import joblib
import os

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
import logging
import sys

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))


# inference functions ---------------
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf


if __name__ == "__main__":

    print("extracting arguments")
    parser = argparse.ArgumentParser()


    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="train.csv")
    parser.add_argument("--test-file", type=str, default="test.csv")
    parser.add_argument(
        "--features", type=str
    )  # in this script we ask user to explicitly name features
    parser.add_argument(
        "--target", type=str
    )  # in this script we ask user to explicitly name the target

    args, _ = parser.parse_known_args()

    print("reading data")
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print("building training and testing datasets")
    X_train = train_df[args.features.split()]
    X_test = test_df[args.features.split()]
    y_train = train_df[args.target]
    y_test = test_df[args.target]

    # train
    print("training model")
    model =LinearRegression() 

    model.fit(X_train, y_train)

    # print abs error
    print("validating model")
    abs_err = np.abs(model.predict(X_test) - y_test)
    logger.info(f"Absolute Error: {abs_err}")

    # print couple perf metrics
    for q in [10, 50, 90]:
        logger.info("AE-at-" + str(q) + "th-percentile: " + str(np.percentile(a=abs_err, q=q)))

    # persist model
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)
    print("model persisted at " + path)
    logger.info(f"model persisted at {path}")

Overwriting script.py


In [8]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "1.0-1"
training_job_1_name = "sklearn-snmp-cpu-1"

sklearn_estimator_1 = SKLearn(
    entry_point="script.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name=training_job_1_name,
    metric_definitions=[{"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}],
    sagemaker_session=sagemaker_session,
    hyperparameters={
        "features": "minutes hour day_of_week",
        "target": "snmpcpu_processor_1",
    },
)

In [9]:
sklearn_estimator_1.fit({"train": trainpath, "test": testpath})

2022-11-21 20:02:48 Starting - Starting the training job...
2022-11-21 20:03:13 Starting - Preparing the instances for trainingProfilerReport-1669060967: InProgress
.........
2022-11-21 20:04:33 Downloading - Downloading input data...
2022-11-21 20:05:16 Training - Training image download completed. Training in progress..[34m2022-11-21 20:05:18,879 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2022-11-21 20:05:18,882 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2022-11-21 20:05:18,890 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2022-11-21 20:05:19,243 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2022-11-21 20:05:19,255 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2022-11-21 20:05:19,266 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus

## Model Monitor

we create a monitoring schedule for a deployed model. We'll begin by reloading our data from the previous demo.

In [12]:
from sagemaker.model_monitor import DataCaptureConfig

capture_uri = f's3://{bucket}/data-capture'

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,#percentage of events that pass through our model we want to capture, here 100 we capture everything
    destination_s3_uri=capture_uri
)

In [13]:
predictor=sklearn_estimator_1.deploy(initial_instance_count=1,instance_type='ml.m5.xlarge',
                                    data_capture_config=data_capture_config
                                    )

-----!

In [14]:
predictor.endpoint

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


'sklearn-snmp-cpu-1-2022-11-21-20-32-01-311'

In [15]:
predictor.serializer = sagemaker.serializers.CSVSerializer()

[Minutes, hour, weekday]

In [21]:

inputs=x_eval.sample(5).values
inputs

array([[36.,  0.,  5.],
       [ 1., 14.,  1.],
       [45.,  9.,  5.],
       [35., 10.,  3.],
       [53., 10.,  1.]])

In [22]:

x_pred=predictor.predict(inputs)
x_pred# output of 5

array([2.70718733, 2.63649008, 2.69301744, 2.6738877 , 2.67088027])

Predictions from Inputs

In [24]:
#predictor.predict([[38., 16.,  4.]])

**We define the Model Monitor and suggest a baseline:**

to capture our data

In [26]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20, #how much size they have to hold the data as they are processing it.
    max_runtime_in_seconds=3600,#they will fail if they have to process it for more than 3600 seconds.
)

**monitor.suggest_baseline(..)**

Just using our input data, the monitor can generate lots of rules about what our inputs should be from like data type to range the data should be within, if all of our training data is within a particular range, the suggest baseline method can create a statistical test for our future input and make sure that our data is not skewing or drifting.

In [28]:
my_monitor.suggest_baseline(
    baseline_dataset="s3://sagemaker-us-east-1-940426109786/sagemaker/sklearn-train/train.csv",
    dataset_format=DatasetFormat.csv(header=False),# data format so that the serializer for the endpoint knows what data to pull in and how.
)


Job Name:  baseline-suggestion-job-2022-11-21-21-03-00-922
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-940426109786/sagemaker/sklearn-train/train.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-940426109786/model-monitor/baselining/baseline-suggestion-job-2022-11-21-21-03-00-922/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
...........................[34m2022-11-21 21:07:16,643 - matplotlib.font_manager - INFO - Generating new fontManager, this may take some time...[0m
[34m2022-11-21 21:07:17.185086: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; d

<sagemaker.processing.ProcessingJob at 0x7f9a8610b4d0>

Lastly, the Model Monitor must be scheduled, or it won't actually run regular processing jobs on the captured data:

Statistics and constraints are the outputs of the suggest baseline method above. The statistics are monitoring statistics about the operational properties of the model endpoint and suggested constraints are hard limits, datatypes or numerical ranges that we want the inputs and outputs to adhere to.

model monitor has a buffer like 20 minutes


In [29]:
from sagemaker.model_monitor import CronExpressionGenerator

my_monitor.create_monitoring_schedule(
    monitor_schedule_name='my-monitoring-schedule',
    endpoint_input=predictor.endpoint_name, # endpoint name
    statistics=my_monitor.baseline_statistics(),
    constraints=my_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),# defines a schedule on which model monitor can run.
)

Now it is ready to check for performance and data drift of our model.

## Clarify

This Clarify demo builds on the previous demo: we follow the same pattern of define-configure-schedule for our Monitor. Clarify, however, needs more config. We define SHAPConfig, ModelConfig, ExplainabilityAnalysisConfig, and pass them all to the scheduling method.

In [31]:
x_train.columns.to_list()[0:3]

['minutes', 'hour', 'day_of_week']

In [34]:
model_explainability_monitor = sagemaker.model_monitor.ModelExplainabilityMonitor(
    role=role,
    sagemaker_session=sagemaker_session,
    max_runtime_in_seconds=1800,# how long before time out
)


shap_config = sagemaker.clarify.SHAPConfig(
    baseline=[x_train.mean().astype(int).to_list()[0:3]],
    num_samples=int(x_train.size),
    agg_method="mean_abs",
    save_local_shap_values=False,
)


model_config = sagemaker.clarify.ModelConfig(
    model_name="sklearn-snmp-cpu-1-2022-11-21-20-32-01-311",
    instance_count=1,
    instance_type='ml.m4.xlarge',
    content_type="text/csv",
    accept_type="text/csv",
)

analysis_config = sagemaker.model_monitor.ExplainabilityAnalysisConfig(
        explainability_config=shap_config,
        model_config=model_config,
        headers=x_train.columns.to_list()[0:3],
    )

explainability_uri = f"s3://{bucket}/model_explainability"
model_explainability_monitor.create_monitoring_schedule(
    output_s3_uri=explainability_uri,
    analysis_config=analysis_config,
    endpoint_input=predictor.endpoint_name,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
)

In [None]:
import time
import random as rand

while i<5:
    time.sleep(rand.randint(1,60))# run at random number of seconds
    print("Inferring...")
    predictor.predict(inputs)
    i+=1

Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...
Inferring...


In [None]:
time.sleep(rand.randint(1,60))

In [3]:
#model_explainability_monitor.delete_monitoring_schedule()

In [None]:
my_monitor.delete_monitoring_schedule()

In [5]:
import boto3

# Specify your AWS Region
aws_region='us-east-1'

# Specify the name of your endpoint configuration
model_name='sklearn-snmp-cpu-1-2022-11-21-20-32-01-311'

# Create a low-level SageMaker service client.
sagemaker_client = boto3.client('sagemaker', region_name=aws_region)

# Delete model
sagemaker_client.delete_model(ModelName=model_name)
                        

{'ResponseMetadata': {'RequestId': '0983481d-c8b8-4b5f-b69f-dcc0d3ed82b1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0983481d-c8b8-4b5f-b69f-dcc0d3ed82b1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Mon, 21 Nov 2022 22:19:41 GMT'},
  'RetryAttempts': 0}}

{'MonitoringScheduleSummaries': [],
 'ResponseMetadata': {'RequestId': '14db7b6b-1853-4a73-92d0-148a74c9108c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '14db7b6b-1853-4a73-92d0-148a74c9108c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '34',
   'date': 'Mon, 21 Nov 2022 22:33:10 GMT'},
  'RetryAttempts': 0}}

In [10]:
sagemaker_session.delete_monitoring_schedule("monitoring-schedule-2022-11-21-21-25-00-454")


Deleting Monitoring Schedule with name: monitoring-schedule-2022-11-21-21-25-00-454


In [11]:
sagemaker_session.delete_monitoring_schedule("my-monitoring-schedule")


Deleting Monitoring Schedule with name: my-monitoring-schedule


In [12]:
sagemaker_session.delete_endpoint('sklearn-snmp-cpu-1-2022-11-21-20-32-01-311')

In [15]:
sagemaker_session.list_monitoring_schedules()

{'MonitoringScheduleSummaries': [],
 'ResponseMetadata': {'RequestId': '33e5cbdd-943f-409a-afc1-a3668ed0dad6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '33e5cbdd-943f-409a-afc1-a3668ed0dad6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '34',
   'date': 'Mon, 21 Nov 2022 22:34:16 GMT'},
  'RetryAttempts': 0}}

No monitoring schedules

# Batch processing
## Create a Model Package Group for the trained model to be registered

Create a new Model Package Group or use an existing one to register the model

In [12]:
import boto3
import time

client = boto3.client("sagemaker")

model_package_group_name = "sklearn-snmp-cpu" + str(round(time.time()))
model_package_group_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageGroupDescription": "My sample sklearn model package group",
}

create_model_pacakge_group_response = client.create_model_package_group(
    **model_package_group_input_dict
)
model_package_arn = create_model_pacakge_group_response["ModelPackageGroupArn"]
print(f"ModelPackageGroup Arn : {model_package_arn}")


ModelPackageGroup Arn : arn:aws:sagemaker:us-east-1:940426109786:model-package-group/sklearn-snmp-cpu1669048935


### Register the model of the training job in the Model Registry

Once the model is registered, you will see it in the Model Registry tab of the SageMaker Studio UI. The model is registered with the approval_status set to "Approved". By default, the model is registered with the approval_status set to PendingManualApproval. Users can then navigate to the Model Registry to manually approve the model based on any criteria set for model evaluation or this can be done via API.

In [13]:
inference_instance_type = "ml.m5.xlarge"
model_package_1 = sklearn_estimator_1.register(
    model_package_group_name=model_package_arn,
    inference_instances=[inference_instance_type],
    transform_instances=[inference_instance_type],
    content_types=["text/csv"],
    response_types=["text/csv"],
    approval_status="Approved",
)

model_package_arn_1 = model_package_1.model_package_arn
print("Model Package ARN : ", model_package_arn_1)

Model Package ARN :  arn:aws:sagemaker:us-east-1:940426109786:model-package/sklearn-snmp-cpu1669048935/1


## Create a transform job with the default configurations from the model of the  training job

In [14]:
sklearn_1_transformer = model_package_1.transformer(
    instance_count=1, instance_type=inference_instance_type
)

In [15]:
sklearn_1_transformer.transform(eval_s3_prefix, split_type="Line", content_type="text/csv")

..........................[34m2022-11-21 16:49:55,293 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2022-11-21 16:49:55,295 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2022-11-21 16:49:55,296 INFO - sagemaker-containers - nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
    

Let's inspect the output of the Batch Transform job in S3. It should show the CPU in block group.

**Go to s3 and Download the from the s3 part**

In [16]:
sklearn_1_transformer.output_path

's3://sagemaker-us-east-1-940426109786/1-2022-11-21-16-45-45-088'

In [17]:
output_file_name = "eval.csv.out"

In [18]:
pd.read_csv(output_file_name, sep=",", header=None)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,10790,10791,10792,10793,10794,10795,10796,10797,10798,10799
0,[2.674172600256959,2.656889,2.673542,2.668324,2.661286,2.63597,2.632166,2.662212,2.652649,2.645131,...,2.662187,2.669324,2.664676,2.671665,2.652525,2.665082,2.663946,2.635937,2.638675,2.6566369938885632]


**Actual**

In [22]:
pd.read_csv("test.csv")['snmpcpu_processor_1']

0         2.0
1         2.0
2        38.0
3         4.0
4         2.0
         ... 
10795     2.0
10796     3.0
10797     2.0
10798     4.0
10799     3.0
Name: snmpcpu_processor_1, Length: 10800, dtype: float64

## Reference

[Data Drift](https://www.youtube.com/watch?v=J9T0X9Jxl_w&ab_channel=AWSEvents)