# Shadow Deployment for Asynchronous Processes with a Breast Cancer Prediction Model

A **shadow deployment** releases version B alongside version A, forks version A's incoming requests, and sends a copy of each to version B without impacting production traffic. This is particularly useful for testing how a new version behaves under production load.
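
The forking idea can be illustrated in plain Python. The sketch below is a conceptual model only, not how SageMaker routes traffic, and this notebook itself shadows asynchronously through Data Capture and batch transform rather than forking live requests. `model_a`, `model_b`, and `handle_request` are hypothetical stand-ins: the client always receives version A's answer, while version B runs in a background thread and its results (or errors) are only recorded.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for version A (production) and version B (shadow)
def model_a(x):
    return x * 2

def model_b(x):
    raise RuntimeError("shadow model crashed")  # simulate a faulty new version

shadow_pool = ThreadPoolExecutor(max_workers=2)
shadow_results = []  # (request, result or error) pairs kept for later comparison

def _run_shadow(request):
    try:
        shadow_results.append((request, model_b(request)))
    except Exception as exc:  # shadow failures are recorded, never raised to the caller
        shadow_results.append((request, exc))

def handle_request(request):
    # Fork: hand a copy of the request to the shadow model in the background
    shadow_pool.submit(_run_shadow, request)
    # Only version A's answer is returned to the client
    return model_a(request)

responses = [handle_request(r) for r in range(5)]
shadow_pool.shutdown(wait=True)  # let background shadow calls finish before inspecting them
print(responses)            # [0, 2, 4, 6, 8]
print(len(shadow_results))  # 5
```

Because the shadow call is submitted to a thread pool and its exceptions are caught, a crash in version B never reaches the caller, which is the property a shadow deployment relies on.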

In this notebook, we will utilize the **Data Capture** utility in ***[SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)***. Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. With Model Monitor, you can set alerts that notify you when there are deviations in the model quality. Early and proactive detection of these deviations enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing quality issues without having to monitor models manually or build additional tooling.

For the **shadow deployment**, we enable data capture (part of Model Monitor) on the real-time inference endpoint for model version 1, so that its requests and responses are captured and stored in an Amazon S3 bucket. We then use the captured input data as the input to a batch transform job that generates inferences from model version 2. Optionally, we can use Amazon Athena and Amazon QuickSight to build a dashboard and gain insights from the inferences, or simply run a hash comparison between the two sets of inferences to show whether they differ.
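
A hash comparison only tells you whether two sets of inferences are identical, not where they differ. A minimal sketch, assuming each model's scores are available as a list of floats; `predictions_digest` is a hypothetical helper, and formatting to `decimals` places is an assumed tolerance so that tiny floating-point noise does not change the hash.

```python
import hashlib

def predictions_digest(scores, decimals=6):
    """Return a SHA-256 hex digest of a list of prediction scores.

    Each score is formatted to a fixed number of decimals first, so tiny
    floating-point noise does not change the hash (an assumed tolerance).
    """
    canonical = "\n".join(f"{float(s):.{decimals}f}" for s in scores)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

v1_scores = [0.12, 0.97, 0.05]   # hypothetical model v1 (real-time) scores
v2_same = [0.12, 0.97, 0.05]     # hypothetical model v2 (batch) scores, identical
v2_changed = [0.12, 0.91, 0.05]  # hypothetical model v2 scores, one value differs

print(predictions_digest(v1_scores) == predictions_digest(v2_same))     # True
print(predictions_digest(v1_scores) == predictions_digest(v2_changed))  # False
```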



# Architecture

![](images/Picture1.png)

## Setup

Let's start by specifying:

- The SageMaker role ARN used to give training and hosting access to your data. The snippet below uses the same role as your SageMaker notebook instance. If you are running elsewhere, specify the full ARN of a role with the AmazonSageMakerFullAccess policy attached.
- The S3 bucket that you want to use for training data and model artifacts.

In [None]:
import os
import boto3
import re
import sagemaker
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import time
import json
import csv
import math
import sagemaker.amazon.common as smac

role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# S3 bucket for saving code and model artifacts.
# Feel free to specify a different bucket and prefix
sess = sagemaker.Session()
bucket = sess.default_bucket()

prefix = 'sagemaker/shadow-breast-cancer-prediction' # place to upload training files within the bucket

---
## Data

Data sources:
- https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data
- https://www.kaggle.com/uciml/breast-cancer-wisconsin-data

Let's download the data, save it locally as data.csv, and take a look at it.

In [None]:
data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data', header = None)

# specify columns extracted from wdbc.names
data.columns = ["id","diagnosis","radius_mean","texture_mean","perimeter_mean","area_mean","smoothness_mean",
                "compactness_mean","concavity_mean","concave points_mean","symmetry_mean","fractal_dimension_mean",
                "radius_se","texture_se","perimeter_se","area_se","smoothness_se","compactness_se","concavity_se",
                "concave points_se","symmetry_se","fractal_dimension_se","radius_worst","texture_worst",
                "perimeter_worst","area_worst","smoothness_worst","compactness_worst","concavity_worst",
                "concave points_worst","symmetry_worst","fractal_dimension_worst"] 

# save the data
data.to_csv("data.csv", sep=',', index=False)

# print the shape of the data file
print(data.shape)

# show the top few rows
display(data.head())

# describe the data object
display(data.describe())

# we will also summarize the categorical field diagnosis
display(data.diagnosis.value_counts())


#### Key observations:
* Data has 569 observations and 32 columns.
* First field is 'id'.
* Second field, 'diagnosis', is an indicator of the actual diagnosis ('M' = Malignant; 'B' = Benign).
* There are 30 other numeric features available for prediction.

## Create Features and Labels
#### Randomly split the data into roughly 80% training, 10% validation, and 10% testing.

In [None]:
rand_split = np.random.rand(len(data))
train_list = rand_split < 0.8
val_list = (rand_split >= 0.8) & (rand_split < 0.9)
test_list = rand_split >= 0.9

data_train = data[train_list]
data_val = data[val_list]
data_test = data[test_list]

# Labels: 1 for malignant ('M'), 0 for benign; features: the 30 numeric columns
train_y = (data_train.iloc[:, 1] == 'M').astype(int).to_numpy()
train_X = data_train.iloc[:, 2:].to_numpy()

val_y = (data_val.iloc[:, 1] == 'M').astype(int).to_numpy()
val_X = data_val.iloc[:, 2:].to_numpy()

test_y = (data_test.iloc[:, 1] == 'M').astype(int).to_numpy()
test_X = data_test.iloc[:, 2:].to_numpy()
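
The split above draws from NumPy's global random state without a seed, so the exact rows (and the roughly 80/10/10 proportions) change on every run. A minimal sketch of a reproducible variant using a seeded `numpy.random.default_rng` generator; `split_masks` and the seed value 42 are illustrative choices, not part of the original notebook.

```python
import numpy as np

def split_masks(n_rows, seed=42, train_frac=0.8, val_frac=0.1):
    """Return boolean masks for an approximate train/validation/test split.

    Uses the same thresholding as the notebook's cell, but draws from a
    seeded generator so the split is identical across runs.
    """
    rng = np.random.default_rng(seed)
    u = rng.random(n_rows)
    train = u < train_frac
    val = (u >= train_frac) & (u < train_frac + val_frac)
    test = u >= train_frac + val_frac
    return train, val, test

train_mask, val_mask, test_mask = split_masks(569)
print(train_mask.sum(), val_mask.sum(), test_mask.sum())
```

The returned masks can be used in place of `train_list`, `val_list`, and `test_list`.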

In [None]:
#Write test data to a csv file

pd.DataFrame(test_X).to_csv("test_sample.csv",header=None, index=None)

Now, we'll convert the datasets to the recordIO-wrapped protobuf format used by the Amazon SageMaker algorithms, and then upload this data to S3.  We'll start with training data.

In [None]:
train_file = 'linear_train.data'

f = io.BytesIO()
smac.write_numpy_to_dense_tensor(f, train_X.astype('float32'), train_y.astype('float32'))
f.seek(0)

boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', train_file)).upload_fileobj(f)

Next we'll convert and upload the validation dataset.

In [None]:
validation_file = 'linear_validation.data'

f = io.BytesIO()
smac.write_numpy_to_dense_tensor(f, val_X.astype('float32'), val_y.astype('float32'))
f.seek(0)

boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation', validation_file)).upload_fileobj(f)
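
In the recordIO-wrapped protobuf format, each row is serialized as a protobuf `Record`, and each serialized record is wrapped in a RecordIO frame. The sketch below shows only the framing layer, based on our reading of the writer in the SageMaker Python SDK's `sagemaker.amazon.common` module: a magic number (`0xCED7230A`), the payload length, the payload, then zero padding to a 4-byte boundary. It assumes little-endian 32-bit header words and uses arbitrary byte strings as payloads rather than real protobuf records; `write_record` and `read_records` are hypothetical helpers.

```python
import io
import struct

RECORDIO_MAGIC = 0xCED7230A  # magic number that starts every record

def write_record(f, payload: bytes):
    # Header: magic number, then payload length, as 32-bit little-endian ints
    f.write(struct.pack("<II", RECORDIO_MAGIC, len(payload)))
    f.write(payload)
    # Pad the payload with zero bytes up to a multiple of 4 bytes
    f.write(b"\x00" * (-len(payload) % 4))

def read_records(f):
    while True:
        header = f.read(8)
        if len(header) < 8:
            return  # end of stream
        magic, length = struct.unpack("<II", header)
        if magic != RECORDIO_MAGIC:
            raise ValueError("not a RecordIO stream")
        yield f.read(length)
        f.read(-length % 4)  # skip the padding

buf = io.BytesIO()
for payload in [b"abc", b"hello world!", b""]:
    write_record(buf, payload)
buf.seek(0)
print(list(read_records(buf)))  # [b'abc', b'hello world!', b'']
```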

---
## Train

Now we can begin to specify our linear model.  Amazon SageMaker's Linear Learner actually fits many models in parallel, each with slightly different hyperparameters, and then returns the one with the best fit.  This functionality is automatically enabled.  We can influence this using parameters like:

- `num_models` to increase the total number of models run. The specified parameters will always be one of those models, but the algorithm also chooses models with nearby parameter values in order to find a nearby solution that may be more optimal. For model v1 we use the maximum of 32; model v2 uses 16.
- `loss`, which controls how we penalize mistakes in our model estimates. For model v1, let's use absolute loss: we haven't spent much time cleaning the data, and absolute loss is less sensitive to outliers. Model v2 uses Huber loss instead, which is quadratic for small errors and linear for large ones.
- `wd` or `l1`, which control regularization. Regularization can prevent model overfitting by preventing our estimates from becoming too finely tuned to the training data, which can actually hurt generalizability. In this case, we'll leave these parameters at their default "auto".
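
To see why the loss choice matters for outliers, here is a minimal sketch of the per-example losses as functions of the residual (prediction minus label). These are the textbook definitions; the exact scaling Linear Learner applies internally may differ, and `delta=1.0` for Huber loss is an assumed threshold.

```python
import numpy as np

def squared_loss(residual):
    return 0.5 * residual ** 2

def absolute_loss(residual):
    return np.abs(residual)

def huber_loss(residual, delta=1.0):
    # Quadratic for |residual| <= delta, linear beyond it
    r = np.abs(residual)
    return np.where(r <= delta, 0.5 * r ** 2, delta * (r - 0.5 * delta))

residuals = np.array([0.5, 2.0, 10.0])  # the last value plays the role of an outlier
print(squared_loss(residuals))   # values: 0.125, 2.0, 50.0
print(absolute_loss(residuals))  # values: 0.5, 2.0, 10.0
print(huber_loss(residuals))     # values: 0.125, 1.5, 9.5
```

For the outlier residual of 10, squared loss grows to 50 while absolute and Huber loss grow only linearly (10 and 9.5), which is why they are less sensitive to noisy rows.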

### Specify container images used for training and hosting SageMaker's linear-learner

In [None]:
# See 'Algorithms Provided by Amazon SageMaker: Common Parameters' in the SageMaker documentation for an explanation of these values.
from sagemaker import image_uris
container = image_uris.retrieve('linear-learner', boto3.Session().region_name)

### Set up shadow model v1 training parameters

In [None]:
linear_job_v1 = 'breast-cancer-model-v1-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())


print("Job name is:", linear_job_v1)

linear_training_params_v1 = {
    "RoleArn": role,
    "TrainingJobName": linear_job_v1,
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.c4.2xlarge",
        "VolumeSizeInGB": 10
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/train/".format(bucket, prefix),
                    "S3DataDistributionType": "ShardedByS3Key"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/validation/".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        }

    ],
    "OutputDataConfig": {
        "S3OutputPath": "s3://{}/{}/".format(bucket, prefix)
    },
    "HyperParameters": {
        "feature_dim": "30",
        "mini_batch_size": "100",
        "predictor_type": "regressor",
        "epochs": "10",
        "num_models": "32",
        "loss": "absolute_loss"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 60 * 60
    }
}

### Set up shadow model v2 training parameters

In [None]:
linear_job_v2 = 'breast-cancer-model-v2-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

print("Job name is:", linear_job_v2)

linear_training_params_v2 = {
    "RoleArn": role,
    "TrainingJobName": linear_job_v2,
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.c4.2xlarge",
        "VolumeSizeInGB": 10
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/train/".format(bucket, prefix),
                    "S3DataDistributionType": "ShardedByS3Key"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/validation/".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        }

    ],
    "OutputDataConfig": {
        "S3OutputPath": "s3://{}/{}/".format(bucket, prefix)
    },
    "HyperParameters": {
        "feature_dim": "30",
        "mini_batch_size": "50",
        "predictor_type": "regressor",
        "epochs": "5",
        "num_models": "16",
        "loss": "huber_loss"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 60 * 60
    }
}
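
The two training configurations above are identical except for their hyperparameters. A small sketch that makes the difference explicit; `diff_hyperparameters` is a hypothetical helper, and the dictionaries simply copy the `HyperParameters` values from the two cells.

```python
def diff_hyperparameters(hp_a, hp_b):
    """Return {name: (value_in_a, value_in_b)} for every key whose value differs."""
    keys = sorted(set(hp_a) | set(hp_b))
    return {k: (hp_a.get(k), hp_b.get(k)) for k in keys if hp_a.get(k) != hp_b.get(k)}

# Values copied from the two HyperParameters blocks above
hp_v1 = {"feature_dim": "30", "mini_batch_size": "100", "predictor_type": "regressor",
         "epochs": "10", "num_models": "32", "loss": "absolute_loss"}
hp_v2 = {"feature_dim": "30", "mini_batch_size": "50", "predictor_type": "regressor",
         "epochs": "5", "num_models": "16", "loss": "huber_loss"}

changes = diff_hyperparameters(hp_v1, hp_v2)
for name, (v1, v2) in changes.items():
    print(f"{name}: v1={v1} v2={v2}")
# epochs: v1=10 v2=5
# loss: v1=absolute_loss v2=huber_loss
# mini_batch_size: v1=100 v2=50
# num_models: v1=32 v2=16
```

In the notebook itself, the same helper could be called on `linear_training_params_v1["HyperParameters"]` and `linear_training_params_v2["HyperParameters"]`.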

Now let's kick off our training jobs in SageMaker's distributed, managed training, using the parameters we just created. Because training is managed, we don't have to wait for a job to finish before continuing, but in this case we use boto3's `training_job_completed_or_stopped` waiter so that each model has finished training before we deploy it.

### Create shadow model v1 training job

In [None]:
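%%time

from botocore.exceptions import WaiterError

region = boto3.Session().region_name
sm = boto3.client('sagemaker')

sm.create_training_job(**linear_training_params_v1)
print(sm.describe_training_job(TrainingJobName=linear_job_v1)['TrainingJobStatus'])

try:
    # Blocks until the job is Completed or Stopped; raises WaiterError if the job fails
    sm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=linear_job_v1)
except WaiterError:
    pass  # inspect the status below instead

# Re-read the status now that the waiter has returned
description = sm.describe_training_job(TrainingJobName=linear_job_v1)
status = description['TrainingJobStatus']
print(status)
if status == 'Failed':
    print('Training failed with the following error: {}'.format(description['FailureReason']))
    raise Exception('Training job failed')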

### Create shadow model v2 training job

In [None]:
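%%time

from botocore.exceptions import WaiterError

region = boto3.Session().region_name
sm = boto3.client('sagemaker')

sm.create_training_job(**linear_training_params_v2)
print(sm.describe_training_job(TrainingJobName=linear_job_v2)['TrainingJobStatus'])

try:
    # Blocks until the job is Completed or Stopped; raises WaiterError if the job fails
    sm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=linear_job_v2)
except WaiterError:
    pass  # inspect the status below instead

# Re-read the status now that the waiter has returned
description = sm.describe_training_job(TrainingJobName=linear_job_v2)
status = description['TrainingJobStatus']
print(status)
if status == 'Failed':
    print('Training failed with the following error: {}'.format(description['FailureReason']))
    raise Exception('Training job failed')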
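
The `training_job_completed_or_stopped` waiter treats a `Failed` job as a terminal failure and raises a `WaiterError` instead of returning. If you prefer to read the final status directly, a polling loop is one alternative. A minimal sketch, where `wait_for_terminal_status` is a hypothetical helper and a fake status sequence stands in for `describe_training_job` calls:

```python
import time

TERMINAL_STATES = {"Completed", "Failed", "Stopped"}

def wait_for_terminal_status(get_status, delay_seconds=30, max_attempts=120):
    """Poll get_status() until it returns a terminal training-job status.

    get_status is any zero-argument callable returning the current status string.
    """
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError(f"status still not terminal after {max_attempts} checks")

# Simulated status sequence standing in for describe_training_job calls
fake_statuses = iter(["InProgress", "InProgress", "Failed"])
final = wait_for_terminal_status(lambda: next(fake_statuses), delay_seconds=0)
print(final)  # Failed
```

Against SageMaker you would pass `lambda: sm.describe_training_job(TrainingJobName=linear_job_v1)['TrainingJobStatus']` as `get_status`.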

---
## Host

Now that we've trained the linear algorithm on our data, let's set up models that can later be hosted. For each model version, we will:
1. Point to the scoring container
1. Point to the model.tar.gz that came from training
1. Create the hosting model
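
SageMaker writes a training job's artifact under the job's `S3OutputPath`, in a `<TrainingJobName>/output/model.tar.gz` subfolder. A minimal sketch of building that S3 URI; `model_artifact_uri`, the bucket, and the job name are illustrative. The authoritative value is the `ModelArtifacts.S3ModelArtifacts` field returned by `describe_training_job`.

```python
def model_artifact_uri(s3_output_path, training_job_name):
    """Build the S3 URI of model.tar.gz for a finished training job.

    Assumes the layout <S3OutputPath>/<TrainingJobName>/output/model.tar.gz.
    """
    return "{}/{}/output/model.tar.gz".format(s3_output_path.rstrip("/"), training_job_name)

uri = model_artifact_uri("s3://my-bucket/sagemaker/shadow-breast-cancer-prediction/",
                         "breast-cancer-model-v1-2021-01-01-00-00-00")
print(uri)
```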

In [None]:
# Create a Model object for model v1 (the endpoint is deployed later)

from time import gmtime, strftime
from sagemaker import image_uris
from sagemaker.model import Model

# Use the S3 URI that SageMaker reports for the training job's model artifact
model1_url = sm.describe_training_job(TrainingJobName=linear_job_v1)['ModelArtifacts']['S3ModelArtifacts']
image_uri1 = image_uris.retrieve('linear-learner', region)
model1 = Model(image_uri1, model_data=model1_url, role=role)

print(model1)

In [None]:
# Create a Model object for model v2 (also deployed later)

from sagemaker import image_uris

# Use the S3 URI that SageMaker reports for the training job's model artifact
model2_url = sm.describe_training_job(TrainingJobName=linear_job_v2)['ModelArtifacts']['S3ModelArtifacts']
image_uri2 = image_uris.retrieve('linear-learner', region)
model2 = Model(image_uri2, model_data=model2_url, role=role)

print(model2)

In [None]:
data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
print("Default bucket is...", bucket)

## Enable Model Monitoring with Data Capture for model v1

In [None]:
## Create DataCaptureConfig and Enable DataCapture for model v1

from sagemaker.model_monitor import DataCaptureConfig

endpoint_name1 = 'Shadow-breast-cancer-1-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name1))

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=100,
                        destination_s3_uri=s3_capture_upload_path)

predictor = model1.deploy(initial_instance_count=1,
                    instance_type='ml.m4.xlarge',
                    endpoint_name=endpoint_name1,
                    data_capture_config=data_capture_config)

print(model1.name)

## Deploy model v2

In [None]:
# Deploy model v2. Deploying also creates the SageMaker model that the batch transform job refers to via model2.name

endpoint_name2 = 'Shadow-breast-cancer-2-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

print("EndpointName={}".format(endpoint_name2))

predictor2 = model2.deploy(initial_instance_count=1,
                    instance_type='ml.m4.xlarge',
                    endpoint_name=endpoint_name2)

print(model2.name)

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name=endpoint_name1,
                      sagemaker_session=sess,
                      serializer=CSVSerializer(),
                      deserializer=JSONDeserializer())

In [None]:
from sagemaker.predictor import Predictor
    
print("Sending data to the endpoint")
with open('test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = predictor.predict(payload)
        time.sleep(1)
        
print("Done!")    

## View captured data 

In [None]:
# Note: captured data can take a while to appear in S3, so poll until it does

s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name1)
print(current_endpoint_capture_prefix)

capture_files = []
for _ in range(30):  # wait up to about 5 minutes
    result = s3_client.list_objects_v2(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
    capture_files = [capture_file.get("Key") for capture_file in result.get('Contents', [])]
    if capture_files:
        break
    print("Waiting for captured data to appear in S3...")
    time.sleep(10)

print("Found Capture Files:")
print("\n ".join(capture_files))
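
Data Capture organizes its files by endpoint, production variant, and hour. A minimal sketch that parses a capture key, assuming the documented layout `<capture prefix>/<endpoint name>/<variant name>/yyyy/mm/dd/hh/<file>.jsonl`; `parse_capture_key` and the example key are hypothetical, and `AllTraffic` is the variant name the SageMaker Python SDK typically assigns when deploying a single model.

```python
from datetime import datetime

def parse_capture_key(key, capture_prefix):
    """Split a Data Capture object key into endpoint, variant, hour, and file name."""
    if not key.startswith(capture_prefix):
        raise ValueError("key is not under the capture prefix")
    relative = key[len(capture_prefix):].lstrip("/")
    endpoint, variant, yyyy, mm, dd, hh, filename = relative.split("/")
    return {
        "endpoint": endpoint,
        "variant": variant,
        "hour": datetime(int(yyyy), int(mm), int(dd), int(hh)),
        "file": filename,
    }

key = ("sagemaker/shadow-breast-cancer-prediction/datacapture/"
       "Shadow-breast-cancer-1-2021-01-01-00-00-00/AllTraffic/2021/01/01/10/"
       "example-capture-file.jsonl")
info = parse_capture_key(key, "sagemaker/shadow-breast-cancer-prediction/datacapture")
print(info["endpoint"], info["variant"], info["hour"])
```

Because the date components are zero-padded and S3 lists keys in lexicographic order, the listing is also ordered by hour, which is why `capture_files[-1]` points at one of the most recent files.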

In [None]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

In [None]:
import json

print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))
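
Each line of a capture file is a standalone JSON document; this notebook reads `captureData.endpointInput.data` (the CSV row sent to the endpoint) and `captureData.endpointOutput.data` (the model's JSON response). A minimal sketch of parsing a whole capture file, assuming the data fields hold plain CSV and JSON text rather than base64; the sample records are illustrative and contain only the fields used here, and `parse_capture` and `make_line` are hypothetical helpers.

```python
import json

def make_line(features, score):
    # Build one illustrative capture record with only the fields this notebook reads
    return json.dumps({"captureData": {
        "endpointInput": {"data": features},
        "endpointOutput": {"data": json.dumps({"predictions": [{"score": score}]})},
    }})

# A trailing newline leaves an empty last element after split("\n")
sample_capture = make_line("1.0,2.0,3.0", 0.93) + "\n" + make_line("4.0,5.0,6.0", 0.04) + "\n"

def parse_capture(jsonl_text):
    """Return (inputs, scores) from the text of one Data Capture JSON Lines file."""
    inputs, scores = [], []
    for line in jsonl_text.split("\n"):
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)["captureData"]
        inputs.append(record["endpointInput"]["data"])
        scores.append(json.loads(record["endpointOutput"]["data"])["predictions"][0]["score"])
    return inputs, scores

inputs, scores = parse_capture(sample_capture)
print(inputs)  # ['1.0,2.0,3.0', '4.0,5.0,6.0']
print(scores)  # [0.93, 0.04]
```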

## Save the Captured Input from Real-Time Inference

In [None]:
# Save the input data from Data Capture to send to batch prediction.
# Parse every non-empty line of the capture file instead of a fixed count of 20,
# which fails if the file holds fewer records or ends with a blank line.
capture_records = [json.loads(line) for line in capture_file.split('\n') if line.strip()]

with open('Captured_input.csv', 'w') as file:
    for record in capture_records:
        file.write(record["captureData"]["endpointInput"]["data"])
        file.write('\n')

## Save the Output from Real-Time Prediction for Comparison

In [None]:
with open('RealTimePrediction_output.csv', 'w') as file:
    for record in capture_records:
        file.write(record["captureData"]["endpointOutput"]["data"])
        file.write('\n')

In [None]:
!printf "\n\nShowing first five lines\n\n"    
!head -n 5 Captured_input.csv 

In [None]:
#Upload Captured Input file to S3 bucket
batch_input_loc = sess.upload_data(path='Captured_input.csv', bucket=bucket, key_prefix='batch')

In [None]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

input_data_path = 's3://{}/{}/{}'.format(bucket, 'batch', 'Captured_input.csv')
output_prefix = '{}/{}'.format('batch_output', timestamp_prefix)
output_data_path = 's3://{}/{}'.format(bucket,output_prefix)

#output_data_path = 's3://{}/{}/{}'.format(bucket, 'batch_output', timestamp_prefix)
job_name = 'shadow-deployment-job-' + timestamp_prefix

print(input_data_path)
print(output_data_path)

## Batch Transform on model v2 using the captured data

In [None]:
# Create a batch transformer for model v2

from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=model2.name,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    strategy='SingleRecord',
    assemble_with='Line',
    output_path=output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept='text/csv'
)

In [None]:
#Calling Batch transform with Captured data from RealTime Prediction

transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = 'text/csv',
                      split_type = 'Line')
transformer.wait()
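
Conceptually, the transform job configured above reads `Captured_input.csv`, splits it into lines (`split_type='Line'`), sends each line as its own request (`strategy='SingleRecord'`), and joins the responses with newlines (`assemble_with='Line'`) into `Captured_input.csv.out`. A toy simulation of that flow; `simulate_single_record_transform` and `fake_model_v2` are hypothetical and stand in for the managed service and model v2.

```python
def simulate_single_record_transform(input_text, predict):
    """Toy model of a transform job with split_type='Line',
    strategy='SingleRecord' and assemble_with='Line'.

    predict is any callable taking one CSV line and returning one output string.
    """
    records = [line for line in input_text.split("\n") if line]  # split_type='Line'
    outputs = [predict(record) for record in records]             # one request per record
    return "\n".join(outputs) + "\n"                               # assemble_with='Line'

# Hypothetical stand-in for model v2: returns the sum of the features as the "score"
def fake_model_v2(csv_line):
    return str(sum(float(v) for v in csv_line.split(",")))

captured_input = "1.0,2.0\n3.0,4.0\n"
print(simulate_single_record_transform(captured_input, fake_model_v2), end="")
# 3.0
# 7.0
```

The comparison step relies on line *i* of the batch output corresponding to line *i* of the captured input.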

In [None]:
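# Download the output file from Batch Transform.
# The transform job writes one output file per input file, named <input file>.out

from botocore.exceptions import ClientError

s3_file_path = '{}/Captured_input.csv.out'.format(output_prefix)
save_as = 'BatchPrediction_output.csv'

try:
    s3_client.download_file(bucket, s3_file_path, save_as)
    print("File downloaded successfully!")
except ClientError as err:
    print("File not found!", err)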

## Compare the difference between Realtime and Batch Inference outputs for the two models

In [None]:
Realtime_predictions = []
Batch_predictions = []

with open('RealTimePrediction_output.csv') as read_obj:
    csv_reader = csv.reader(read_obj)
    for row in csv_reader:
        data = json.loads(row[0])
        Realtime_predictions.append(data["predictions"][0]["score"])
        
print("Realtime predictions - ",Realtime_predictions)

with open('BatchPrediction_output.csv') as read_obj:
    csv_reader = csv.reader(read_obj)
    for row in csv_reader:
        Batch_predictions.append(float(row[0]))
        
print("Batch predictions - ",Batch_predictions)

In [None]:
def compare_values(list1,list2):
    difference = []
    if len(list1) != len(list2):
        print("Predictions are not of the same sample size")
    else:
        print("The sample size is same..let's proceed with comparison..")
        zip_object = zip(list1, list2)
        
        for list1_i, list2_i in zip_object:
            difference.append(abs(list1_i-list2_i))
            
    return difference 

       
diff = compare_values(Realtime_predictions,Batch_predictions)
print(diff)
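
`compare_values` returns the raw per-record differences. A minimal sketch that condenses them into a few summary numbers; `summarize_differences` is a hypothetical helper, the 0.05 tolerance is arbitrary, and mapping scores to labels with a 0.5 threshold is an assumption based on the regressor being trained on 0/1 labels.

```python
import numpy as np

def summarize_differences(realtime, batch, tolerance=0.05):
    """Summarize absolute differences between two equal-length score lists."""
    a = np.asarray(realtime, dtype=float)
    b = np.asarray(batch, dtype=float)
    if a.shape != b.shape:
        raise ValueError("prediction lists must have the same length")
    diff = np.abs(a - b)
    return {
        "max_abs_diff": float(diff.max()),
        "mean_abs_diff": float(diff.mean()),
        "n_over_tolerance": int((diff > tolerance).sum()),
        # Assumed 0.5 threshold: scores >= 0.5 are treated as malignant
        "labels_disagree": int(((a >= 0.5) != (b >= 0.5)).sum()),
    }

summary = summarize_differences([0.10, 0.80, 0.55], [0.12, 0.70, 0.45])
print(summary)
```

In the notebook, you would call `summarize_differences(Realtime_predictions, Batch_predictions)`.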

Alternatively, we can use Amazon Athena and Amazon QuickSight to build a dashboard and gain insights from the inferences.

## Delete the resources

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

In [None]:
#sm.delete_endpoint(EndpointName=endpoint_name1)
#sm.delete_endpoint(EndpointName=endpoint_name2)