In [None]:
%%capture

!pip install tensorflow==2.3.0
!pip install sagemaker-experiments

#### Import Necessary Libraries

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sagemaker.tensorflow.serving import TensorFlowModel
from sagemaker.multidatamodel import MultiDataModel
from tensorflow.keras.datasets import cifar10
from sagemaker.tuner import ContinuousParameter,  IntegerParameter, HyperparameterTuner
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.analytics import TrainingJobAnalytics
from sagemaker.analytics import ExperimentAnalytics
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.model_monitor import CronExpressionGenerator
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing import image
from sagemaker.tensorflow.model import TensorFlowPredictor
from IPython.display import Image
from time import gmtime, strftime
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker import get_execution_role
from tensorflow.keras import utils
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from datetime import datetime
import time
import tensorflow as tf
import numpy as np
import sagemaker
import logging
import boto3
import time
import os

#### Setup Logger

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
!python --version

In [None]:
logger.info(f'[Using TensorFlow version: {tf.__version__}]')
logger.info(f'[Using SageMaker version: {sagemaker.__version__}]')

#### Seed for Reproducability

In [None]:
SEED = 123
np.random.seed(SEED)
tf.random.set_seed(SEED)

#### Create Roles, Sessions and Data Locations

In [None]:
role = get_execution_role()
session = boto3.Session()
sagemaker_session = sagemaker.Session()

s3 = session.resource('s3')
TF_FRAMEWORK_VERSION = '2.3.0'
BUCKET = sagemaker.Session().default_bucket()
PREFIX = 'cv-models'
MONITORING_FOLDER = 'DEMO-tf2-ModelMonitor'

# Load the Cifar Dataset

In [None]:
(X_train, y_train), (X_test, y_test) = cifar10.load_data()

In [None]:
logger.info(f'X_train Shape: {X_train.shape}')
logger.info(f'y_train Shape: {y_train.shape}')
logger.info(f'X_test Shape : {X_test.shape}')
logger.info(f'y_test Shape : {y_test.shape}')

#### Data Preparation

##### Rescale 
Rescales the images by dividing the pixel values by 255: [0,255] ⇒ [0,1]

In [None]:
X_train = X_train.astype('float32')/255
X_test = X_test.astype('float32')/255

##### One Hot Encode Target Labels
One-hot encoding is a process by which categorical variables are converted into a numeric form. One-hot encoding converts the (1 × n) label vector to a label matrix of dimensions (10 × n), where n is the number of sample images. So, if we have 1,000 images in our dataset, the label vector will have the dimensions (1 × 1000). After one-hot encoding, the label matrix dimensions will be (1000 × 10). That’s why, when we define our network architecture in the next step, we will make the output softmax layer contain 10 nodes, where each node represents the probability of each class we have.

In [None]:
num_classes = len(np.unique(y_train))
y_train = utils.to_categorical(y_train, num_classes)
y_test = utils.to_categorical(y_test, num_classes)

##### Split Data
Break original train set further into train and validation sets.

In [None]:
X_train, X_validation = X_train[500:], X_train[:500]
y_train, y_validation = y_train[500:], y_train[:500]

##### Save to Local

Create a local `data/cifar_10` directory to save the datasets.

In [None]:
DATASET_PATH = './data/cifar_10'

In [None]:
os.makedirs(DATASET_PATH, exist_ok=True)

Save train, validation and test sets to local `data` directory

In [None]:
np.save(f'{DATASET_PATH}/X_train.npy', X_train)
np.save(f'{DATASET_PATH}/y_train.npy', y_train)
np.save(f'{DATASET_PATH}/X_validation.npy', X_validation)
np.save(f'{DATASET_PATH}/y_validation.npy', y_validation)
np.save(f'{DATASET_PATH}/X_test.npy', X_test)
np.save(f'{DATASET_PATH}/y_test.npy', y_test)

##### Copy Datasets to S3
Copy train, validation and test sets from the local dir to S3, since SageMaker expects datasets to be in S3 for training.

In [None]:
!aws s3 cp ./{DATASET_PATH}/X_train.npy s3://{BUCKET}/{PREFIX}/cifar_10/train/
!aws s3 cp ./{DATASET_PATH}/y_train.npy s3://{BUCKET}/{PREFIX}/cifar_10/train/
!aws s3 cp ./{DATASET_PATH}/X_validation.npy s3://{BUCKET}/{PREFIX}/cifar_10/validation/
!aws s3 cp ./{DATASET_PATH}/y_validation.npy s3://{BUCKET}/{PREFIX}/cifar_10/validation/
!aws s3 cp ./{DATASET_PATH}/X_test.npy s3://{BUCKET}/{PREFIX}/cifar_10/test/
!aws s3 cp ./{DATASET_PATH}/y_test.npy s3://{BUCKET}/{PREFIX}/cifar_10/test/

# Create Training Inputs

In [None]:
train_input = TrainingInput(s3_data=f's3://{BUCKET}/{PREFIX}/cifar_10/train', 
                            distribution='FullyReplicated', 
                            content_type='npy')
validation_input = TrainingInput(s3_data=f's3://{BUCKET}/{PREFIX}/cifar_10/validation', 
                                 distribution='FullyReplicated', 
                                 content_type='npy')
test_input = TrainingInput(s3_data=f's3://{BUCKET}/{PREFIX}/cifar_10/test', 
                           distribution='FullyReplicated', 
                           content_type='npy')

In [None]:
inputs = {'train': train_input, 'val': validation_input, 'test': test_input}

# Prepare a Experiment Tracker
SageMaker Experiment tracker works on a hierarchical model of Experiment, Trial and Trial component respectively.
Amazon SageMaker Experiments is a capability of Amazon SageMaker that lets you organize, track, compare, and evaluate your machine learning experiments.
A trial consists of one or more trial components, such as a data preprocessing job and a training job.
The TrialName is passed on while fitting an ML model such that the job is recorded by the TrialName.

An Experiment can be separated for a certain project, while TrialName can be related to varying model training jobs.

In [None]:
sm = boto3.client('sagemaker')

In [None]:
cifar_experiment = Experiment.create(
    experiment_name="cifar-10-dataset-experiment", 
    description="objects", 
    sagemaker_boto_client=sm)

In [None]:
for num_hidden_channel in [32]:
    trial_name = f"cnn-training-job-{num_hidden_channel}-hidden-channels-{int(time.time())}"
    cnn_trial = Trial.create(
        trial_name=trial_name, 
        experiment_name=cifar_experiment.experiment_name,
        sagemaker_boto_client=sm,
    )

# Using HyperParameter Tuner (OPTIONAL)
 Its is a feature provided by SageMaker. We can provide a range of possible values and the SageMaker will check for the best combination of hyperparameters in that range.
 A down side to it is that we cannot use Experiment Tracker while using Hyperparameter Tuner.

In [None]:
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
hyperparameter_ranges = {
    "learning_rate": ContinuousParameter(1e-4, 1, scaling_type="Logarithmic")
}

In [None]:
model_name = 'cifar-10'
estimator_parameters = {'entry_point':'cifar_train.py',
                        'instance_type': 'ml.m5.2xlarge',
                        'instance_count': 1,
                        'model_dir': '/opt/ml/model',
                        'role': role,
                        'output_path': f's3://{BUCKET}/{PREFIX}/cifar_10/out',
                        'base_job_name': f'mme-cv-{model_name}',
                        'framework_version': TF_FRAMEWORK_VERSION,
                        'py_version': 'py37',
                        'script_mode': True}
model = TensorFlow(**estimator_parameters)

In [None]:
objective_metric_name = "loss"
objective_type = "Minimize"
metric_definitions = [{"Name": "loss", "Regex": "loss : ([0-9\\.]+)"}]
tuner = HyperparameterTuner(
    model,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    max_jobs=9,
    max_parallel_jobs=5,
    objective_type=objective_type,
)

In [None]:
tuner.fit(inputs, wait=True)  # EXPERIMENT CONFIG CANNOT BE USED IN HYPER PARAMETER TUNING

#### Getting Tuner Information

In [None]:
print(tuner.latest_tuning_job.job_name)
boto3.client("sagemaker").describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuner.latest_tuning_job.job_name
) #tensorflow-training-220627-0556

# Training Without HyperParameter Tuning

In [None]:
model_name = 'cifar-10'
hyperparameters = {'epochs': 1}
estimator_parameters = {'entry_point':'cifar_train.py',
                        'instance_type': 'ml.m5.2xlarge',
                        'instance_count': 1,
                        'model_dir': '/opt/ml/model',  # This shouldn't be changed
                        'role': role,
                        'hyperparameters': hyperparameters,
                        'output_path': f's3://{BUCKET}/{PREFIX}/cifar_10/out',
                        'base_job_name': f'mme-cv-{model_name}',
                        'framework_version': TF_FRAMEWORK_VERSION,
                        'py_version': 'py37',
                        'script_mode': True}
model = TensorFlow(**estimator_parameters)

In [None]:
# USE THE TRIAL NAME THAT YOU HAVE CREATED
cnn_training_job_name = "cnn-training-job-{}".format(int(time.time()))
print(cnn_training_job_name)
model.fit(inputs, job_name=cnn_training_job_name,
        experiment_config={
            "ExperimentName": "cifar-10-dataset-experiment", 
            "TrialName": 'cnn-training-job-32-hidden-channels-1655871250',
            "TrialComponentDisplayName": "Training",
        })

#### Viewing Contents of the recorded experiment

In [None]:
trial_component_analytics = ExperimentAnalytics(
    experiment_name='cifar-10-dataset-experiment',
    sort_by="metrics.test:accuracy.max",
    sort_order="Descending",
    metric_names=['test:accuracy'],
   # parameter_names=['hidden_channels', 'epochs', 'dropout', 'optimizer']
)
analytic_table = trial_component_analytics.dataframe()
analytic_table.head()

In [None]:
# CONSIDER USING THE TRAINING JOB NAME THAT YOU HAVE CREATED 
analytics = TrainingJobAnalytics(training_job_name = 'cnn-training-job-1655871262', metric_names=['test:accuracy'])

# Experiment Cleanup (In Case the Experiment is No Longer needed)

In [None]:
def cleanup_sme_sdk(experiment):
    for trial_summary in experiment.list_trials():
        trial = Trial.load(trial_name=trial_summary.trial_name)
        for trial_component_summary in trial.list_trial_components():
            tc = TrialComponent.load(
                trial_component_name=trial_component_summary.trial_component_name)
            trial.remove_trial_component(tc)
            try:
                # comment out to keep trial components
                tc.delete()
            except:
                # tc is associated with another trial
                continue
            # to prevent throttling
            time.sleep(.5)
        trial.delete()
        experiment_name = experiment.experiment_name
    experiment.delete()
    print(f"\nExperiment {experiment_name} deleted")

In [None]:
cleanup_sme_sdk('cifar-10-dataset-experiment')

# Deploy

#### Load Trained Model (Optional, Not To be used if New model is to be deployed)

In [None]:
# Update the model path with the path to the .tar.gz file
model_path = f's3://{BUCKET}/cv-models/cifar_10/out/cnn-training-job-1655978931/output/model.tar.gz'

model = TensorFlowModel(model_data=model_path, role=role, framework_version="2.3.0")

#### Prepare a Data Capture Config
Data Capture Config is used to provide a storage location and storage mechanism for the datas being sent to the deployed endpoint. It is only required to be implemented if we are also implementing Model Monitoring.

In [None]:
from sagemaker.model_monitor import DataCaptureConfig
data_capture_prefix = "{}/monitoring/datacapture/".format(MONITORING_FOLDER)
s3_capture_upload_path = "s3://{}/{}".format(BUCKET, data_capture_prefix)

data_capture_configuration=DataCaptureConfig(
        enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
    )

#### Deploy the model

In [None]:
endpoint_name=f'tensorflow-cv-{int(time.time())}'
predictor = model.deploy(initial_instance_count=1,   # USE tuner.deploy if HyperParameter Tuner has been used.
                       instance_type='ml.m5.xlarge',
                       endpoint_name=endpoint_name,
 data_capture_config = data_capture_configuration)
print(f"\nSuccessfully deployed at {endpoint_name}...")

In [None]:
import boto3
sm.describe_endpoint(EndpointName = endpoint_name)

# Invoking the Deployed Model

In [None]:
%matplotlib inline
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing import image
from IPython.display import Image
import matplotlib.image as mpimg 
import matplotlib.pyplot as plt
import numpy as np
CIFAR10_LABELS = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

In [None]:
# in case you are trying to invoke an existing endpoint.
from sagemaker.tensorflow.model import TensorFlowPredictor
predictor = TensorFlowPredictor(endpoint_name = endpoint_name)      

In [None]:
img = load_img('./data/cifar_10/raw_images/jeep.png', target_size=(32, 32))
data = img_to_array(img)
data = data.astype('float32')
data = data / 255.0
data = data.reshape(1, 32, 32, 3)

In [None]:
payload = {'instances': data}

In [None]:
resp = predictor.predict(payload)
predicted_label = CIFAR10_LABELS[np.argmax(resp['predictions'])]
print(f'Predicted Label: [{predicted_label}]')

# Sending Test Traffic

In [None]:
%%time

import time
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

flat_list = []
for i in range(100):
    data = np.array([X_test[i]])
    payload = {'instances': data}
    resp = predictor.predict(payload)
    predicted_label = CIFAR10_LABELS[np.argmax(resp['predictions'])]
    flat_list.append(predicted_label)
    time.sleep(0.5)

print("Done!")
print("predictions: \t{}".format(np.array(flat_list)))

##### The Input data is stored in the Data Input Capture Location

In [None]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=BUCKET, Prefix=data_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents")]
print("Found Capture Files:")
print("\n ".join(capture_files))
print(BUCKET)

# Prepare baseline Dataset
To implement Model Monitoring, we also need to provide a Baseline value. Generally the Training data itself is used as the baseline dataset.
The baseline data has to be a CSV or a JSON and should contain the fields: probability, prediction, label.

In [None]:
validate_dataset = "validation_with_predictions.csv"

In [None]:
data = np.array([X_test[i]])
    payload = {'instances': data}
    resp = predictor.predict(payload)
    predicted_label = CIFAR10_LABELS[np.argmax(resp['predictions'])]

In [None]:
i = 0
with open(f"{validate_dataset}", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    for i in range(1000):
        data = np.array([X_train[i]])
        payload = {'instances': data}
        resp = predictor.predict(payload)
        probability = max(resp['predictions'][0])
        prediction = np.argmax(resp['predictions'])
        label = y_train[i][0]
        
        baseline_file.write(f"{probability},{prediction},{label}\n")
print("Done!")

#### Copy the File to a S3 Location

In [None]:
!aws s3 cp ./validation_with_predictions.csv s3://{BUCKET}/DEMO-tf2-ModelMonitor/monitoring/baseline/

# MODEL MONITORING

Model Monitoring requires endpoint to be invoked conituously while also generating corresponding ground truth for the input dataset. Thus we generate a network traffic at the endpoint and also generate a corresponding ground truth for the given inputs. As the code below.

In [None]:
import random
from sagemaker.s3 import S3Downloader, S3Uploader
import pandas as pd
import uuid
from datetime import datetime
import numpy as np
import time
MONITORING_FOLDER = 'DEMO-tf2-ModelMonitor'
BUCKET = 'sagemaker-us-east-1-949263681218'
data_capture_prefix = "{}/monitoring/ground_truth".format(MONITORING_FOLDER)
ground_truth_upload_path = "s3://{}/{}".format(BUCKET, data_capture_prefix)

def generate_load_and_ground_truth():
    df = pd.read_csv('validation_with_predictions.csv')
    gt_records = []
    for i, row in df.iterrows():
        suffix = uuid.uuid1().hex
        inference_id = f'{i}-{suffix}'
        data = np.array([X_test[i]])
        payload = {'instances': data}
        args = {'InferenceId': inference_id}
        out = predictor.predict(data = payload, initial_args = args)
        gt_records.append(str({
            "groundTruthData": {
                "data": str(df['label'][i]),
                "encoding": 'CSV',
            },
            "eventMetadata": {
                "eventId": str(inference_id),
            },
            "eventVersion": "0",
        }))
    upload_ground_truth(gt_records, ground_truth_upload_path, datetime.utcnow())


def upload_ground_truth(records, path, upload_time):
    data_to_upload = ",".join(records)
    data_to_upload = data_to_upload
    target_s3_uri = f"{path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"  # This Datewise Folder Hierarchy is a must. s3://bucket/prefixyyyy/mm/dd/hh
    print(f"Uploading {len(records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)


def generate_load_and_ground_truth_forever():
    # for _ in range(2):
    while True:
        generate_load_and_ground_truth()

In [None]:
from threading import Thread
thread = Thread(target=generate_load_and_ground_truth_forever)
thread.start()

#### Create Model Quality Monitor

In [None]:
from sagemaker.model_monitor import ModelQualityMonitor, EndpointInput, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [None]:
my_default_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
)

#### Generate constraints and statistics

In [None]:
baseline_job_name = f"ModelQualityMonitor-test-{datetime.utcnow():%Y-%m-%d-%H-%M}"
baseline_data_uri = f's3://{BUCKET}/{MONITORING_FOLDER}/monitoring/baseline'
baseline_data_output_uri = f's3://{BUCKET}/{MONITORING_FOLDER}/monitoring/baseline/output'
job = my_default_monitor.suggest_baseline(
    job_name = baseline_job_name,
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_data_output_uri,
    problem_type = 'MulticlassClassification',
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label"
)
job.wait(logs=False)

In [None]:
baseline_job = my_default_monitor.latest_baselining_job

#### Check the constraints and Statistics Value

In [None]:
import pandas as pd
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
constraints_df = pd.io.json.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

#### Prepare a Function which handles the input data on the endpoint before monitoring

In [None]:
%%writefile preprocessing.py
import json

def preprocess_handler(inference_record):
    input_dict = json.loads(inference_record.endpoint_input.data)
    output_dict = json.loads(inference_record.endpoint_output.data)
    input_data = str(input_dict['instances'].reshape(3072))[1:-1]
    output_data = str(np.argmax(output_dict['predictions'][0]))
    return_dict = {'prediction000':output_data, 'feature000':input_data}
    return return_dict

In [None]:
preprocessor_s3_dest_path = f"s3://{BUCKET}/{MONITORING_FOLDER}/monitoring/preprocessor"
preprocessor_s3_dest = sagemaker.s3.S3Uploader.upload("preprocessing.py", preprocessor_s3_dest_path)
print(preprocessor_s3_dest)

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

s3_report_path = 's3://{BUCKET}/{MONITORING_FOLDER}/monitoring/preprocessor/processed_output'
mon_schedule_name = "DEMO-tf2-model-monitor-schedule-" + strftime("%Y%m%d-%H%M%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    record_preprocessor_script=preprocessor_s3_dest,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
from threading import Thread
thread = Thread(target=generate_load_and_ground_truth_forever)
thread.start()

# Delete Endpoint

In [None]:
import boto3
sm = boto3.client('sagemaker')

In [None]:
sm.list_endpoints()

In [None]:
endpoint_name='tensorflow-cv-1656302831'

#### Delete Monitoring Schedules related to The Endpoint

In [None]:
sm.list_monitoring_schedules(EndpointName = endpoint_name)

In [114]:
sm.describe_monitoring_schedule(MonitoringScheduleName = 'DEMO-tf2-model-monitor-schedule-20220627-073726')

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:949263681218:monitoring-schedule/demo-tf2-model-monitor-schedule-20220627-073726',
 'MonitoringScheduleName': 'DEMO-tf2-model-monitor-schedule-20220627-073726',
 'MonitoringScheduleStatus': 'Scheduled',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2022, 6, 27, 7, 37, 27, 456000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 6, 27, 9, 6, 12, 538000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'data-quality-job-definition-2022-06-27-07-37-27-181',
  'MonitoringType': 'DataQuality'},
 'EndpointName': 'tensorflow-cv-1656312038',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'DEMO-tf2-model-monitor-schedule-20220627-073726',
  'ScheduledTime': datetime.datetime(2022, 6, 27, 9, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2022, 6, 27, 9, 5, 50, 205000, tzinfo=tzlocal()

In [None]:
sm.delete_monitoring_schedule(MonitoringScheduleName = 'DEMO-tf2-model-monitor-schedule-2022-06-27-04-43-15')

In [None]:
sm.delete_endpoint(EndpointName=endpoint_name)