# Purchase Modelling with Amazon SageMaker

In this notebook we will demonstrate how to use the browsing data of users on a fictional website to determine which customers are more likely to make a purchase, so that we can target them.

We will demonstrate several capabilities of Amazon SageMaker that make training, deploying, and monitoring models easier.
We will perform some simple data exploration steps to start, then we will use `XGBoost` to train a model that takes in the browsing data of users and predicts whether they will make a purchase.

We will then demonstrate how Amazon SageMaker Model Monitor can detect issues in the data sent to our deployed model, alerting us to potential data problems.

Finally, we will evaluate the accuracy of our model.

In [None]:
import sys
sys.path.insert(0, './src/')

## Data Exploration

The data we use for this solution is a dataset of simulated user visits to a fictional website. We will split the data into two parts: one that we will use to train our model, and one that we will use to evaluate its accuracy and to demonstrate Model Monitor.

In [None]:
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
plt.style.use('seaborn-whitegrid')
mpl.rcParams['figure.dpi'] = 150
import seaborn as sns
import time

In [None]:
import boto3
from package import config

In [None]:
train_file = "training_sample.csv"
s3 = boto3.resource('s3')
# Avoid shadowing Python's built-in `object`
train_object = s3.Object(f"{config.SOLUTIONS_S3_BUCKET}-{config.AWS_REGION}", f"{config.SOLUTION_NAME}/data/{train_file}")
train_object.download_file(train_file)

In [None]:
train_data = pd.read_csv(train_file, index_col="UserID")

Each row in our dataset is a user session. The feature columns are binary: some, like "basket_icon_click", indicate whether the user took a specific action during their visit, while others, like "device_mobile" and "loc_uk", capture more general information such as the device they used and whether they were in the UK.

In [None]:
train_data

A simple statistical summary shows that some features, like "returning_user", are balanced, while others, like "saw_delivery", barely have any positive values.

In [None]:
train_data.describe()

The label for our data is the `ordered` column that indicates whether the user ordered an item during their session.
As expected, our data are skewed, with most sessions not resulting in an order:

In [None]:
train_data.ordered.value_counts()

We can see from the above that out of the ~451k sessions, around 19k included an order. Next we will examine the correlations between pairs of features, and then focus on the correlations between the various features and the dependent variable we are trying to predict, `ordered`.
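To see why this imbalance matters, here is a back-of-the-envelope calculation using the approximate counts quoted above (the exact numbers come from `value_counts()`). A trivial model that always predicts "no order" would already score roughly 96% plain accuracy, which is why we rely on class-aware metrics when we evaluate the model later.

```python
# Approximate counts taken from the text above; replace them with the exact
# output of train_data.ordered.value_counts() when running the notebook.
total_sessions = 451_000
ordered_sessions = 19_000

positive_rate = ordered_sessions / total_sessions
# Accuracy of a trivial classifier that always predicts "no order" (label 0)
majority_baseline_accuracy = 1 - positive_rate

print(f"Share of sessions with an order: {positive_rate:.1%}")
print(f"Always-'no order' accuracy:      {majority_baseline_accuracy:.1%}")
```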

In [None]:
correlations = train_data.corr()

In [None]:
sns.heatmap(correlations, cmap="YlGnBu")

In the pairwise correlations, we can see that certain features are highly correlated, like `sign_in` and `saw_checkout`, while others are strongly negatively correlated, like `device_computer` and `device_mobile`. We are mostly interested in which features correlate most with the `ordered` column, so let's "zoom in" on those correlations.

In [None]:
correlations['ordered'].drop(['ordered']).sort_values(ascending=False).plot(kind='bar')

We can see that several features, like `checked_delivery_detail` and `saw_checkout`, are highly correlated with our label. This should make our prediction problem relatively straightforward, so we will use a small tree model for our predictions, using Amazon SageMaker's support for the `XGBoost` framework.
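Because every column here, including `ordered`, is binary, the Pearson correlation that `DataFrame.corr()` computes by default is the same quantity as the phi coefficient of the 2x2 table of the two columns. The pure-Python sketch below checks this on a small, hypothetical pair of columns; it is only an illustration, not how pandas computes correlations internally.

```python
import math

def pearson(xs, ys):
    """Plain Pearson correlation between two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

def phi(xs, ys):
    """Phi coefficient computed from the 2x2 contingency table of two binary sequences."""
    n11 = sum(1 for x, y in zip(xs, ys) if x == 1 and y == 1)
    n10 = sum(1 for x, y in zip(xs, ys) if x == 1 and y == 0)
    n01 = sum(1 for x, y in zip(xs, ys) if x == 0 and y == 1)
    n00 = sum(1 for x, y in zip(xs, ys) if x == 0 and y == 0)
    num = n11 * n00 - n10 * n01
    den = math.sqrt((n11 + n10) * (n01 + n00) * (n11 + n01) * (n10 + n00))
    return num / den

# Hypothetical toy columns standing in for a feature (e.g. saw_checkout) and the label
saw_checkout = [1, 1, 1, 0, 0, 0, 0, 1, 0, 0]
ordered      = [1, 1, 0, 0, 0, 0, 0, 1, 0, 0]
print(round(pearson(saw_checkout, ordered), 4), round(phi(saw_checkout, ordered), 4))
```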

## Training a classification model for purchase prediction

We are now ready to train our predictive model. Using our labeled data, we will train a classifier that can give us the likelihood of a session including a purchase. We can then use this model to determine sessions that are most likely to end with the user purchasing an item, and use that to target potential customers.

First, we will split our data into a training set and a test set. The test set will allow us to estimate the accuracy of our model before we deploy it. Because our dataset is small, we will perform this split using `scikit-learn` in the notebook. If we had a large dataset, we could use [Amazon SageMaker Processing](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html) to offload the work to a dedicated processing instance, or even to an Apache Spark cluster.

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

X = train_data.drop(['ordered'], axis=1)
y = train_data['ordered']

# Because our data are unbalanced, we stratify our split to ensure we have similar proportions of labels in each split
X_train, X_test, y_train, y_test = train_test_split(
     X, y, test_size=0.2, stratify=y, random_state=42)
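To make `stratify=y` concrete, here is a simplified, pure-Python sketch of a stratified split: it shuffles the row indices of each class separately and takes the same fraction of every class for the test set. This is an illustration of the idea only, not scikit-learn's actual algorithm, and the `stratified_split` helper and the label list are hypothetical.

```python
import random
from collections import defaultdict

def stratified_split(labels, test_size=0.2, seed=42):
    """Return (train_idx, test_idx) so that each label keeps its overall share.

    Hypothetical helper illustrating stratification; not scikit-learn's implementation.
    """
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for i, label in enumerate(labels):
        by_label[label].append(i)

    train_idx, test_idx = [], []
    for indices in by_label.values():
        rng.shuffle(indices)
        n_test = round(len(indices) * test_size)  # same fraction taken from every class
        test_idx.extend(indices[:n_test])
        train_idx.extend(indices[n_test:])
    return sorted(train_idx), sorted(test_idx)

# Hypothetical labels with a similar imbalance to `ordered` (5% positives)
labels = [1] * 50 + [0] * 950
train_idx, test_idx = stratified_split(labels)
test_pos_rate = sum(labels[i] for i in test_idx) / len(test_idx)
print(len(train_idx), len(test_idx), test_pos_rate)
```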


Let's prepare our `XGBoost` estimator. We set the target locations on S3 for our data and for the model output. We will use a single `ml.m5.2xlarge` instance for this training since our data are small. Even if the dataset were very large, `XGBoost` is highly scalable and can handle massive data by using bigger instances or scaling out to multiple instances.

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri 

data_location=f"s3://{config.MODEL_DATA_S3_BUCKET}/data/"
output_path=f"s3://{config.MODEL_DATA_S3_BUCKET}/output/"

In [None]:
xgboost_container = get_image_uri(boto3.Session().region_name,
                          'xgboost', 
                          repo_version='0.90-2')

In [None]:
hyperparameters = {
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.8",
        "objective":"binary:logistic",
        "num_round":"100"}


Next, we write the data into an in-memory buffer and upload it to S3 in the CSV format that XGBoost expects: the label in the first column, followed by the feature columns, with no header row.
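The following cells do this with pandas. As a standard-library sketch of the same layout, the hypothetical helper below writes each row with the label (as a float, like `y_train.astype(float)`) in front of the features, with no header and no index, and then wraps the text in a binary buffer, since an S3 upload such as `upload_fileobj` reads bytes rather than text.

```python
import csv
import io

def to_xgboost_csv_bytes(labels, feature_rows):
    """Hypothetical helper: serialise rows as headerless CSV with the label first,
    then wrap the encoded text in a binary buffer ready for upload."""
    text_buf = io.StringIO()
    writer = csv.writer(text_buf, lineterminator="\n")
    for label, features in zip(labels, feature_rows):
        writer.writerow([float(label), *features])
    return io.BytesIO(text_buf.getvalue().encode("utf-8"))

# Two hypothetical sessions with three binary features each
buf = to_xgboost_csv_bytes([0, 1], [[1, 0, 0], [1, 1, 1]])
print(buf.getvalue().decode("utf-8"))
```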

In [None]:
import io
import sklearn
import os

str_buf = io.StringIO()
bin_buf = io.BytesIO()

In [None]:
train_combined_df = pd.concat([y_train.astype(float), X_train], axis=1)

In [None]:
# Write the CSV text into an in-memory text buffer, then convert it to a binary buffer, because the S3 upload expects bytes
train_combined_df.to_csv(str_buf, header=False, index=False)
str_buf.seek(0)
bin_buf = io.BytesIO(str_buf.getvalue().encode())
bin_buf.seek(0)

In [None]:
key = 'training-data.csv'
boto3.resource('s3').Bucket(config.MODEL_DATA_S3_BUCKET).Object(os.path.join('data', 'train', key)).upload_fileobj(bin_buf)

s3_train_data = f"s3://{config.MODEL_DATA_S3_BUCKET}/data/train/{key}"
print(f"Uploaded training data location: {s3_train_data}")


In [None]:
import sagemaker
from sagemaker.session import s3_input

# construct a SageMaker estimator that uses the XGBoost container image
estimator = sagemaker.estimator.Estimator(xgboost_container, 
                                          hyperparameters=hyperparameters,
                                          role=config.SAGEMAKER_IAM_ROLE,
                                          train_instance_count=1, 
                                          train_instance_type='ml.m5.2xlarge', 
                                          train_volume_size=5, # 5 GB 
                                          output_path=output_path,
                                          base_job_name="{}-xgb".format(config.SOLUTION_PREFIX))

# define the content type and S3 path of the training data
content_type = "text/csv"
train_input = s3_input(s3_train_data, content_type=content_type)

In [None]:
# execute the XGBoost training job
estimator.fit({'train': train_input})

## Brief intermission for SageMaker Model Monitor

While the model is training, we can start familiarizing ourselves with Amazon SageMaker Model Monitor. We will use this capability to monitor the distribution of data sent to our model after we deploy it. The purpose of Model Monitor is to detect changes in the data early, so that we can trigger alerts, investigate any errors in our data ingestion pipelines, and re-train our models if necessary. We give a high-level overview of the offering here; for more information, check out the [developer guide](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html).

A Model Monitor pipeline consists of four overall steps:

1. Data Capture
2. Creating a Baseline
3. Scheduling Monitoring Jobs
4. Interpreting Results

For the first step, _Data Capture_, we need to set up model monitor to capture incoming requests to the endpoint where we deployed our trained model, as well as the resulting model predictions. 

The next step is to _Create a Baseline_ from the data we used to train the model. This allows Model Monitor to recognize when the data fed into the model for predictions have shifted significantly from the training data. To achieve this, Model Monitor uses [Deequ](https://github.com/awslabs/deequ), an open-source library developed by Amazon to measure data quality using Apache Spark.

Next, we'll _Schedule Monitoring Jobs_ to specify which data to collect and how often, in order to produce data quality reports.

Finally, we will _Interpret the Results_ produced by our monitoring jobs. This step allows us to watch for any violations of data quality and receive notifications from CloudWatch.

Before we proceed, however, we need a model, so let's wait for the training job we triggered previously to finish. The training job should take approximately 6 minutes, and you can also monitor its progress in the SageMaker console.

### Deploy an endpoint with Model Monitor Support

The first step, as we mentioned, is Data Capture. We will define a data capture configuration that captures 100% of the requests sent to our endpoint, along with the resulting predictions, and stores them under a prefix in the solution's S3 bucket. We will start by creating the data capture configuration, and then deploy the endpoint.
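`sampling_percentage` controls what share of the requests gets captured. As a conceptual sketch only (we assume each request is sampled independently; this is not the service's implementation), capturing can be pictured as a per-request coin flip: at 100 every request is kept, at 0 none are, and at 50 about half.

```python
import random

def should_capture(sampling_percentage, rng=random):
    """Hypothetical model of request sampling: keep a request with probability pct/100."""
    return rng.random() * 100 < sampling_percentage

rng = random.Random(0)
captured = sum(should_capture(100, rng) for _ in range(1_000))
captured_half = sum(should_capture(50, rng) for _ in range(1_000))
print(captured, captured_half)
```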

In [None]:
endpoint_name = f"{config.SOLUTION_PREFIX}-xgb-endpoint"

from sagemaker.model_monitor import DataCaptureConfig

data_capture_prefix = 'data-capture'
s3_capture_upload_path = 's3://{}/{}'.format(config.MODEL_DATA_S3_BUCKET, data_capture_prefix)

In [None]:
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path
)

endpoint_name = f"{config.SOLUTION_PREFIX}-xgb-endpoint"
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
    wait=True)

### Note:

Currently, Model Monitor supports monitoring endpoints out of the box only for **tabular (csv, flat-json)** datasets. If your endpoint uses some other data format, the following steps will NOT work for you. `XGBoost` is one of the many first-party algorithms that support tabular datasets; you can find more [here](https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-training.html#cdf-common-content-types).

Additionally, the analysis only supports endpoints that have the **same** input and output content type. In this example, we use `XGBoost` which can have both input and output content type be `text/csv`.

In [None]:
from sagemaker.predictor import RealTimePredictor, csv_serializer

predictor = RealTimePredictor(endpoint_name, content_type='text/csv', serializer=csv_serializer)

### Capture endpoint data

Now that we have our endpoint deployed, let's send it some requests based on our test dataset and check whether the data are being captured by Model Monitor.

### Feed the endpoint with modified data

To simulate data issues, we will modify part of our test data and feed it continuously to our endpoint. We do this in a background thread that keeps running until you shut down the notebook kernel. To refresh our memory, let's take a look at the index of each feature.

In [None]:
# Let's take a look at our columns again
list(enumerate(train_data.columns))

We'll modify the `X_test` data points to introduce some data drift. Specifically, we modify two columns, column 1 (`basket_add_list`) and column 22 (`loc_uk`), replacing their 0/1 values with normally distributed random values.

In [None]:
# Take the first 3600 test rows and replace the UserID index with a default integer index
X_test_modified = X_test[:3600].copy(deep=True).reset_index().drop("UserID", axis=1)
X_test_modified["basket_add_list"] = np.random.randn(X_test_modified.shape[0])
X_test_modified["loc_uk"] = np.random.randn(X_test_modified.shape[0]).astype(float) * 3

We can see that the values for these two features are now random floating point numbers instead of 0/1:

In [None]:
X_test_modified

Next, we will create a thread that continuously invokes the endpoint with the modified data. Once the scheduled Model Monitor job runs, which will happen approximately on the hour mark, we will be able to create reports and check for any data issues.

In [None]:
modified_strings = X_test_modified.to_csv(header=None, index=False).strip('\n').split('\n')

In [None]:
def generate_traffic(predictor, data_list):
    """
    Using a list of strings
    """
    while True:
        for example in data_list:
            response = predictor.predict(data=example)
            time.sleep(1.0)

In [None]:
from threading import Thread

thread = Thread(target = generate_traffic, args=(predictor, modified_strings))
thread.start()
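If you would rather stop the traffic without shutting down the kernel, one option is to pass the thread a `threading.Event` and check it between requests. The sketch below is a hypothetical variant of `generate_traffic`; it uses a made-up `FakePredictor` in place of the real endpoint so it can run anywhere. In the notebook you would pass `predictor` and `modified_strings` instead and call `stop_event.set()` when done.

```python
import threading
import time

class FakePredictor:
    """Hypothetical stand-in for the endpoint predictor; it only counts calls."""
    def __init__(self):
        self.calls = 0

    def predict(self, data):
        self.calls += 1
        return b"0.5"

def generate_traffic_until(predictor, data_list, stop_event, delay=1.0):
    """Like generate_traffic, but returns once stop_event is set."""
    while not stop_event.is_set():
        for example in data_list:
            if stop_event.is_set():
                return
            predictor.predict(data=example)
            # Event.wait() acts as a sleep that ends early if the event is set
            stop_event.wait(delay)

stop_event = threading.Event()
fake = FakePredictor()
worker = threading.Thread(target=generate_traffic_until,
                          args=(fake, ["1,0,0", "0,1,1"], stop_event, 0.01),
                          daemon=True)
worker.start()
time.sleep(0.1)
stop_event.set()  # ask the thread to finish
worker.join(timeout=1.0)
print(fake.calls, worker.is_alive())
```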

#### View captured data

Now that we have triggered some predictions on the endpoint, Model Monitor will eventually capture some data on S3 that we can view:

In [None]:
# Give the data some time to be captured
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)

response_list = []
while len(response_list) == 0:
    result = s3_client.list_objects(Bucket=config.MODEL_DATA_S3_BUCKET, Prefix=current_endpoint_capture_prefix)
    contents = result.get('Contents')
    response_list = [] if contents is None else contents
    print("Waiting for endpoint data capture...")
    time.sleep(60)

capture_files = [capture_file.get("Key") for capture_file in response_list]

print("Found Capture Files:")
print("\n ".join(capture_files))

Let's take a look at a single line from one of the files:

In [None]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=config.MODEL_DATA_S3_BUCKET, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

We can see that Model Monitor captures the input data, the response of the endpoint, and some metadata for the prediction event.
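Each capture file is in JSON Lines format, one prediction event per line. The sketch below pulls the feature values and the prediction out of one record. The record is made up, and its field names (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`) are our assumption of the capture layout; check them against the record printed above. The sketch also assumes CSV-encoded payloads rather than base64.

```python
import json

# Illustrative record with made-up values; confirm the field names against your own capture file.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "0,1,0,0", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "0.0213", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2020-01-01T00:00:00Z"},
    "eventVersion": "0",
})

def parse_capture_line(line):
    """Return (feature_values, prediction) from one JSON Lines capture record."""
    record = json.loads(line)
    data = record["captureData"]
    features = [float(v) for v in data["endpointInput"]["data"].split(",")]
    prediction = float(data["endpointOutput"]["data"])
    return features, prediction

features, prediction = parse_capture_line(sample_line)
print(features, prediction)
```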

### Create a baseline

For Model Monitor to be able to detect the data issues we introduced, we first need to teach it what "normal" looks like. We do this by creating a baseline from the original training data that captures the statistics of the various features.
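Model Monitor computes the baseline with Deequ on Spark. The pure-Python sketch below only illustrates the kind of per-feature summary a baseline holds (completeness, mean, standard deviation, min and max). The `baseline_stats` helper and its output layout are hypothetical and do not follow Model Monitor's actual statistics file format.

```python
import statistics

def baseline_stats(columns):
    """Hypothetical per-feature summary in the spirit of a data-quality baseline.

    `columns` maps feature name -> list of values, where None marks a missing value.
    """
    stats = {}
    for name, values in columns.items():
        present = [v for v in values if v is not None]
        stats[name] = {
            "completeness": len(present) / len(values),  # share of non-missing values
            "mean": statistics.mean(present),
            "std_dev": statistics.pstdev(present),       # population standard deviation
            "min": min(present),
            "max": max(present),
        }
    return stats

# Hypothetical binary columns, one with a missing value
stats = baseline_stats({
    "returning_user": [1, 0, 1, 0],
    "saw_delivery": [0, 0, 0, 1],
    "loc_uk": [1, None, 0, 0],
})
for name, summary in stats.items():
    print(name, summary)
```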

In [None]:
# We create a new csv file that includes the header to make the constraints more intuitive
head_str_buf = io.StringIO()
train_combined_df.to_csv(head_str_buf, header=True, index=False)
head_str_buf.seek(0)

head_bin_buf = io.BytesIO(head_str_buf.getvalue().encode())
head_bin_buf.seek(0)

key = 'training-data-with-header.csv'
boto3.resource('s3').Bucket(config.MODEL_DATA_S3_BUCKET).Object(
    os.path.join('data', 'baseline', key)).upload_fileobj(head_bin_buf)

s3_train_data_with_header = 's3://{}/data/baseline/{}'.format(config.MODEL_DATA_S3_BUCKET, key)
print('Uploaded baseline data location: {}'.format(s3_train_data_with_header))


Running the `suggest_baseline` job below should take approximately 5 minutes. When it completes, we should get a set of statistics and constraints for our features.

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

baseline_output = f"s3://{config.MODEL_DATA_S3_BUCKET}/baseline-output/"

default_monitor = DefaultModelMonitor(
    role=config.SAGEMAKER_IAM_ROLE,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
    base_job_name=f"{config.SOLUTION_PREFIX}-model-monitor"
)

In [None]:
default_monitor.suggest_baseline(
    baseline_dataset=s3_train_data_with_header,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_output
)

Now that the job has finished, let's take a look at the statistics produced for the first 10 columns of our training dataset. The statistics depend on the kind of feature: string columns get statistics like the number of distinct values, while numerical features get statistics like the mean and standard deviation. In our example all our data are numeric (binary features, to be exact), so we get back statistical information for each feature similar to the `DataFrame.describe()` call we made above, but computed in a way that scales to massive amounts of data.

In [None]:
import pandas as pd

baseline_job = default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

From these statistics, Model Monitor suggests a set of constraints that we can then use to detect drift in the incoming data. For example, `num_constraints.is_non_negative` being `True` for a feature means that Model Monitor expects that feature's values to be non-negative, so negative values in the incoming data will be reported as a violation.
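To illustrate how constraints like these catch the drift we injected, here is a toy, pure-Python checker for two kinds of rule, non-negativity and minimum completeness. The `check_constraints` helper, the rows, and the output format are all hypothetical; Model Monitor's real violation report has its own schema, which we inspect at the end of this section.

```python
def check_constraints(rows, constraints):
    """Return (feature, check, offending_count) tuples for violated checks.

    `constraints` maps feature -> {"is_non_negative": bool, "completeness": float}.
    Simplified illustration only; not Model Monitor's violation format.
    """
    violations = []
    for feature, rule in constraints.items():
        values = [row.get(feature) for row in rows]
        present = [v for v in values if v is not None]
        if rule.get("is_non_negative"):
            negatives = sum(1 for v in present if v < 0)
            if negatives:
                violations.append((feature, "is_non_negative", negatives))
        min_completeness = rule.get("completeness")
        if min_completeness is not None and len(present) / len(values) < min_completeness:
            violations.append((feature, "completeness", len(values) - len(present)))
    return violations

# Hypothetical rows resembling the drifted data: loc_uk now holds scaled Gaussian noise
rows = [{"loc_uk": -2.7, "returning_user": 1},
        {"loc_uk": 1.3, "returning_user": 0},
        {"loc_uk": -0.4, "returning_user": None}]
constraints = {"loc_uk": {"is_non_negative": True, "completeness": 1.0},
               "returning_user": {"is_non_negative": True, "completeness": 1.0}}
print(check_constraints(rows, constraints))
```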

In [None]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

### Schedule monitoring jobs

The next step is to set up a monitoring schedule. This instructs Model Monitor to compare, at preset intervals, the distribution of the data sent to the endpoint against the baseline.

We use `CronExpressionGenerator` here, to generate an hourly schedule.

**Note:** Model Monitor currently only supports hourly integer rates between 1 and 24 hours.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator

default_monitor.create_monitoring_schedule(
    endpoint_input=endpoint_name,
    statistics=default_monitor.baseline_statistics(),
    constraints=default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    monitor_schedule_name=f"{config.SOLUTION_PREFIX}-schedule"
)


In [None]:
desc_schedule_result = default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

### Interpret Monitoring Results

As mentioned, Model Monitor starts executions on the hour once the schedule has been created, so we may have to wait for an execution to trigger before we can observe any results. If we created the schedule at 13:34, we'd have to wait for the hour mark (14:00), plus up to 20 minutes for the analysis to run. The cell below polls until the first execution appears.
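The waiting time described above can be worked out with `datetime`. This sketch assumes, as stated in the text, that the first execution starts at the next full hour and may be delayed by up to 20 minutes; `monitoring_window` is a hypothetical helper, not part of the SageMaker SDK.

```python
from datetime import datetime, timedelta

def monitoring_window(schedule_created_at, max_delay_minutes=20):
    """Return (earliest, latest) start time of the first hourly execution.

    Assumes executions start on the next full hour plus a 0-20 minute delay.
    """
    next_hour = (schedule_created_at.replace(minute=0, second=0, microsecond=0)
                 + timedelta(hours=1))
    return next_hour, next_hour + timedelta(minutes=max_delay_minutes)

# The example from the text: schedule created at 13:34
earliest, latest = monitoring_window(datetime(2020, 6, 1, 13, 34))
print(earliest.time(), latest.time())
```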

In [None]:
mon_executions = default_monitor.list_executions()
print("We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer).\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    mon_executions = default_monitor.list_executions()
    time.sleep(60)

In [None]:
mon_executions

Now that we have at least one monitoring execution finished we can view the reports:

In [None]:
latest_execution = mon_executions[-1]
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if latest_job['ProcessingJobStatus'] != 'Completed':
    print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

In [None]:
latest_monitoring_violations = default_monitor.latest_monitoring_constraint_violations()

In [None]:
latest_execution_info = latest_execution.describe()

Let us get the reports from S3 and see what violations were detected:

In [None]:
processing_job_arn = latest_execution_info['ProcessingJobArn']

In [None]:
exec_inputs = {inp['InputName']: inp for inp in latest_execution.describe()['ProcessingInputs']}
exec_results = latest_execution.output.destination

In [None]:
from sagemaker.s3 import S3Downloader

violations_filepath = os.path.join(exec_results, 'constraint_violations.json')

violations = json.loads(S3Downloader.read_file(violations_filepath))['violations']


In [None]:
violations

From the results above we can see that Model Monitor detects the errors we introduced into the dataset, warning us about potential data issues. We can use the output of the analysis to trigger alerts or model retraining.

## Accuracy evaluation

Before completing the solution we'll evaluate the performance of our model on the test data we split previously. We start by defining a function that performs predictions on batches of data.

**Note:** Because we send batches of rows in a single request, Model Monitor will likely interpret the extra rows as extra columns in the captured data. This issue goes away at the next analysis if you continue with single-row predictions.

In [None]:
# This convenience function allows us to call the endpoint with batches of data to avoid large requests
def make_predictions(predictor, X_test, rows=500):
    """
    Extract predictions given a predictor and test set.
    """
    # Split the dataset into batches
    split_array = np.array_split(X_test, int(X_test.shape[0] / float(rows) + 1))
    predictions = ''
    # Obtain predictions for each batch
    for array in split_array:
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])

    # Parse the joined predictions (dropping the leading comma) into a float array
    return np.fromstring(predictions[1:], sep=',')
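`make_predictions` computes the number of batches as `int(n / rows + 1)`, which produces one extra, smaller batch when `n` is an exact multiple of `rows`. Either way, no request exceeds `rows` rows. The standard-library sketch below uses ceiling division instead and sizes the batches as evenly as possible, the way `np.array_split` does for a given number of sections. `split_into_batches` is a hypothetical helper, not part of the notebook's code.

```python
import math

def split_into_batches(rows, max_rows=500):
    """Split rows into contiguous batches of at most max_rows, sized as evenly as possible.

    Mirrors np.array_split(rows, ceil(len(rows) / max_rows)).
    """
    n_batches = max(1, math.ceil(len(rows) / max_rows))
    base, extra = divmod(len(rows), n_batches)
    batches, start = [], 0
    for i in range(n_batches):
        size = base + (1 if i < extra else 0)  # the first `extra` batches get one more row
        batches.append(rows[start:start + size])
        start += size
    return batches

batches = split_into_batches(list(range(1201)), max_rows=500)
print([len(b) for b in batches])
```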


In [None]:
test_preds = make_predictions(predictor, X_test.values)
# Threshold our predictions to get integer 0/1 values
test_preds = np.where(test_preds > 0.5, 1, 0)

Because our dataset is unbalanced we should be using metrics that take the frequency of each class into consideration. Two such metrics are the balanced accuracy score and Cohen's Kappa score:
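For a binary problem, both metrics can be written directly in terms of the confusion matrix counts. Balanced accuracy is the mean of the recall on each class, and Cohen's Kappa compares the observed agreement with the agreement expected by chance. The sketch below uses hypothetical counts for a model that never predicts a purchase: its plain accuracy is 95%, but its balanced accuracy is only 0.5 and its Kappa is 0.

```python
def balanced_accuracy(tn, fp, fn, tp):
    """Mean of per-class recall for a binary confusion matrix."""
    recall_pos = tp / (tp + fn)
    recall_neg = tn / (tn + fp)
    return (recall_pos + recall_neg) / 2

def cohen_kappa(tn, fp, fn, tp):
    """Agreement between labels and predictions, corrected for chance agreement."""
    n = tn + fp + fn + tp
    observed = (tp + tn) / n
    # Agreement expected if predictions were independent of the true labels
    expected = ((tp + fn) * (tp + fp) + (tn + fp) * (tn + fn)) / n ** 2
    return (observed - expected) / (1 - expected)

# Hypothetical test set: 950 non-purchases, 50 purchases, and a model that never predicts a purchase
tn, fp, fn, tp = 950, 0, 50, 0
accuracy = (tn + tp) / (tn + fp + fn + tp)
print(accuracy, balanced_accuracy(tn, fp, fn, tp), cohen_kappa(tn, fp, fn, tp))
```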

In [None]:
from sklearn.metrics import balanced_accuracy_score, cohen_kappa_score, classification_report

# test_preds were already thresholded to 0/1 above, since these metrics expect hard labels
print("Balanced accuracy = {}".format(balanced_accuracy_score(y_test, test_preds)))
print("Cohen's Kappa = {}".format(cohen_kappa_score(y_test, test_preds)))

It's interesting to see how our model performs per-class as well:

In [None]:
print(classification_report(y_test, test_preds, target_names=["No Purchase", "Purchase"]))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_predicted):

    cm = confusion_matrix(y_true, y_predicted)
    # Get the per-class normalized value for each cell
    cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    
    # We color each cell according to its normalized value, annotate with exact counts.
    ax = sns.heatmap(cm_norm, annot=cm, fmt="d")
    ax.set(xticklabels=["No Purchase", "Purchase"], yticklabels=["No Purchase", "Purchase"])
    ax.set_ylim([0,2])
    plt.title('Confusion Matrix')
    plt.ylabel('Real Classes')
    plt.xlabel('Predicted Classes')
    plt.show()

In [None]:
plot_confusion_matrix(y_test, test_preds)

From the above we can observe that our model is very accurate overall, but we could try to improve its accuracy for the "Purchase" class. Right now our model has many false positives, where a purchase was predicted but didn't happen. We leave model improvements as an exercise.

Before finishing the solution, let's ensure we turn off monitoring for our endpoint as that is required for us to be able to delete the endpoint later:

In [None]:
default_monitor.delete_monitoring_schedule()
time.sleep(60) # actually wait for the deletion