# Churn Predictive Analytics using Amazon SageMaker

---
## Background

The purpose of this lab is to demonstrate the basics of building an advanced analytics solution using Amazon SageMaker. In this notebook we will create a customer churn analytics solution by training an XGBoost churn model, and batching churn prediction scores into a data warehouse. 

This notebook extends one of the example tutorial notebooks: [Customer Churn Prediction with XGBoost](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/xgboost_customer_churn/xgboost_customer_churn_neo.ipynb). The extended learning objectives are highlighted in bold below.

#### Learning Objectives 

 - **Learn how to query ground truth data from our data warehouse into a pandas dataframe for exploration and feature engineering.**
 - Train an XGBoost model to perform churn prediction.
 - **Learn how to run a Batch Transform job to calculate churn scores in batch.**
 - Optimize your model using SageMaker Neo.
 - **Run a Glue job programmatically to demonstrate data processing and feature engineering at scale using SparkML.**
 - **Create a production scale inference pipeline that consists of a SparkML feature engineering pipeline that feeds into an XGBoost churn classification model.**

---

## Prerequisites

The lab guide [here](https://github.com/dylan-tong-aws/aws-advanced-analytics-jumpstarter/blob/master/lab-guides/SageMaker%20Lab-Churn%20Predictive%20Analytics.pdf) takes you through the prerequisite steps required by this notebook. 

In summary:
 - You've built the lab environment using this CloudFormation [template](https://github.com/dylan-tong-aws/aws-advanced-analytics-jumpstarter/blob/master/cf-templates/adv-analytics-lab.yaml). This template launches a Redshift cluster in your default VPC.
 - You've taken note of the Redshift cluster credentials, endpoint, and the workshop IAM role ARN that was created by the template.
 - This notebook should be running in your default VPC. 
 - The security groups on your Redshift cluster and your notebook instance must be able to communicate with each other using TCP on port 5439.
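
If the connection to Redshift fails later in the notebook, a quick TCP check can help separate a security group problem from a credentials problem. The following is a minimal sketch using only the standard library; the helper name `can_reach` is hypothetical and not part of the lab materials.

```python
import socket

def can_reach(host, port=5439, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False
```

You would call it as `can_reach('<<YOUR CLUSTER ENDPOINT>>')`. A `False` result suggests checking the security group rules and VPC placement before looking at credentials.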
 
---

## Setup

_This notebook should run on any Amazon SageMaker notebook instance_

Let's first install the Redshift drivers on this notebook instance, so that we can query Redshift data directly from our notebook. You could have installed the drivers at launch time using SageMaker [lifecycle configurations](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-lifecycle-config.html).

In [None]:
!conda install -y -c anaconda psycopg2

Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting. Replace `<<REPLACE WITH YOUR BUCKET NAME>>` with the name of your bucket.

- The IAM role ARN used to give training and hosting access to your data. By default, we'll use the IAM permissions that have been allocated to your notebook instance. The role should have the permissions to access your S3 bucket, and full execution permissions on Amazon SageMaker. In practice, you could minimize the scope of required permissions.

In [None]:
import boto3
import psycopg2
import sqlalchemy as sa
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime

import sagemaker
from sagemaker.predictor import csv_serializer
from sagemaker import get_execution_role

sess = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name

bucket = '<<REPLACE WITH YOUR BUCKET NAME>>'
prefix = 'churn-analytics-lab'

---
## Data

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator’s churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes–after all, predicting the future is tricky business! But I’ll also show how to deal with prediction errors.

The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets.  Let's download and read that dataset in now:

In [None]:
!wget http://dataminingconsultant.com/DKD2e_data_sets.zip
!unzip -o DKD2e_data_sets.zip

It's often desirable to leverage data sources from your databases to train machine learning models. The data is in tabular format, and often contains ground truth.

In other situations, you might want to load data into a database so that you can run queries at scale to facilitate exploration and experimentation.

In the following steps, we're going to load our data set into our database to demonstrate how you can query data from your notebook into a pandas dataframe.

In [None]:
DATASOURCE_PREFIX = '{}/datasets/raw'.format(prefix)
DATASOURCE_S3URI = 's3://{}/{}/{}'.format(bucket,DATASOURCE_PREFIX,'churn.csv')

boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(DATASOURCE_PREFIX,'churn.csv')).upload_file('Data sets/churn.txt')

print("Data set transferred to your S3 bucket at: {}".format(DATASOURCE_S3URI))

Provide the connection and credentials required to connect to your Redshift instance. You'll need to modify the cell below with the appropriate **host endpoint** and **password** to your database. If you followed the lab guide instructions, you should have this information.

In practice, security standards might prohibit you from providing credentials in clear text. As a best practice in production, you should utilize a service like [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) to manage your database credentials.

In [None]:
creds = {
    "host_name": "<<YOUR CLUSTER ENDPOINT>>",
    "port_num": "5439",
    "db_name": "workshop",
    "username": "admin",
    "password": "<<YOUR PASSWORD>>"
}
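# Print the connection settings with the password masked
print({k: ('********' if k == 'password' else v) for k, v in creds.items()})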

def get_conn(creds): 
    conn = psycopg2.connect(dbname=creds['db_name'], 
                            user=creds['username'], 
                            password=creds['password'],
                            port=creds['port_num'],
                            host=creds['host_name'])
    return conn
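
As a sketch of the Secrets Manager approach mentioned above, the helper below builds the same `creds` dictionary from a JSON secret instead of clear text. It assumes the secret stores the fields `host`, `port`, `dbname`, `username`, and `password`; both that layout and the function name `creds_from_secret` are assumptions for illustration. The client is passed in so the function can be exercised without AWS access.

```python
import json

def creds_from_secret(client, secret_id):
    """Fetch a JSON secret and map it onto the keys that get_conn() expects."""
    response = client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    # Field names on the right-hand side are an assumed secret layout
    return {
        "host_name": secret["host"],
        "port_num": str(secret["port"]),
        "db_name": secret["dbname"],
        "username": secret["username"],
        "password": secret["password"],
    }
```

In the notebook this could be called as `creds = creds_from_secret(boto3.client('secretsmanager'), '<<YOUR SECRET NAME>>')`, provided the notebook's IAM role is allowed to read that secret.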

Set **REDSHIFT_IAM_ROLE** to the value of the IAM Role ARN (eg. arn:aws:iam::803235869972:role/workshop-rWorkshopRole-TPJWUEY4PMP6) that was created for you by the CloudFormation template that you executed during the lab setup.

In [None]:
REDSHIFT_IAM_ROLE = '<<SET YOUR IAM ROLE>>'

Next, we load the data into our database cluster.

In [None]:
cmd = 'CREATE TABLE call_stats( \
  state VARCHAR NOT NULL, \
  acctlen INT NOT NULL, \
  areacode INT2 NOT NULL, \
  phone VARCHAR NOT NULL, \
  intlplan VARCHAR NOT NULL, \
  vmailplan VARCHAR NOT NULL, \
  vmailmsg INT NOT NULL, \
  daymins REAL NOT NULL, \
  daycalls INT NOT NULL, \
  daycharge REAL NOT NULL, \
  evemins REAL NOT NULL, \
  evecalls INT NOT NULL, \
  evecharge REAL NOT NULL, \
  nightmins REAL NOT NULL, \
  nightcalls INT NOT NULL, \
  nightcharge REAL NOT NULL, \
  intlmins REAL NOT NULL, \
  intlcalls INT NOT NULL, \
  intlcharge REAL NOT NULL, \
  custservcalls INT NOT NULL, \
  churn VARCHAR);'

def create_table() :
    try :
        conn = None
        with get_conn(creds) as conn:
            with conn.cursor() as cur:
                cur.execute(cmd)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
                
create_table()

cmd = "COPY call_stats from \
'{}' IAM_ROLE '{}' DELIMITER ',' IGNOREHEADER as 1".format(DATASOURCE_S3URI,REDSHIFT_IAM_ROLE);

def load_data() :
    try :
        conn = None
        with get_conn(creds) as conn:
            with conn.cursor() as cur:
                cur.execute(cmd)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

start = time.time()         
load_data()
end = time.time()

print("Data was loaded in {} seconds.".format(end-start))

---
## Explore

Now we can run queries against your database. In the cell below, we retrieve the entire table. 
  *query = 'select * from public.call_stats;'*

However, in practice, the data table will often contain more data than what is practical to operate on within a notebook instance, or relevant attributes are spread across multiple tables. Being able to run SQL queries and loading the data into a pandas dataframe will be helpful during the initial stages of development.
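
To illustrate pushing work into the database rather than pulling every row, here is a small sketch of an aggregate query. It uses the standard library's `sqlite3` as a stand-in for Redshift so that it runs anywhere, and the rows are made up for illustration. The table and column names follow the `call_stats` table created earlier.

```python
import sqlite3

# sqlite3 stands in for Redshift here; the four rows are made-up sample data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE call_stats (state TEXT, custservcalls INT, churn TEXT)")
conn.executemany(
    "INSERT INTO call_stats VALUES (?, ?, ?)",
    [("OH", 1, "False."), ("OH", 5, "True."), ("NJ", 0, "False."), ("NJ", 2, "False.")],
)

# Summarize per state inside the database instead of fetching every row
summary_query = """
    SELECT state,
           COUNT(*) AS customers,
           AVG(custservcalls) AS avg_custserv_calls,
           SUM(CASE WHEN churn = 'True.' THEN 1 ELSE 0 END) AS churners
    FROM call_stats
    GROUP BY state
    ORDER BY state
"""
summary_rows = conn.execute(summary_query).fetchall()
conn.close()
```

The same query text could be sent to Redshift, but note that `get_df` as written applies the fixed 21-entry `colnames` list, so an aggregate query like this one would need its own column names.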

In [None]:
# Sample query for testing
query = 'select * from public.call_stats;'

colnames = ['State','Account Length','Area Code','Phone','Intl Plan', 'VMail Plan', 'VMail Message','Day Mins',
            'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls','Night Charge',
            'Intl Mins','Intl Calls','Intl Charge','CustServ Calls', 'Churn?']

def get_df(creds, query):
    with get_conn(creds) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            result_set = cur.fetchall()
          #  colnames = [desc.name for desc in cur.description]
            df = pd.DataFrame.from_records(result_set, columns=colnames)
    return df

churn = get_df(creds, query)

display(churn)

By modern standards, it’s a relatively small dataset, with only 3,333 records, where each record uses 21 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:

- `State`: the US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ
- `Account Length`: the number of days that this account has been active
- `Area Code`: the three-digit area code of the corresponding customer’s phone number
- `Phone`: the remaining seven-digit phone number
- `Intl Plan`: whether the customer has an international calling plan: yes/no
- `VMail Plan`: whether the customer has a voice mail feature: yes/no
- `VMail Message`: presumably the average number of voice mail messages per month
- `Day Mins`: the total number of calling minutes used during the day
- `Day Calls`: the total number of calls placed during the day
- `Day Charge`: the billed cost of daytime calls
- `Eve Mins`, `Eve Calls`, `Eve Charge`: the minutes used, calls placed, and billed cost for calls placed during the evening
- `Night Mins`, `Night Calls`, `Night Charge`: the minutes used, calls placed, and billed cost for calls placed during nighttime
- `Intl Mins`, `Intl Calls`, `Intl Charge`: the minutes used, calls placed, and billed cost for international calls
- `CustServ Calls`: the number of calls placed to Customer Service
- `Churn?`: whether the customer left the service: true/false

The last attribute, `Churn?`, is known as the target attribute–the attribute that we want the ML model to predict.  Because the target attribute is binary, our model will be performing binary prediction, also known as binary classification.

Let's begin exploring the data:

In [None]:
# Frequency tables for each categorical feature
for column in churn.select_dtypes(include=['object']).columns:
    display(pd.crosstab(index=churn[column], columns='% observations', normalize='columns'))

# Summary statistics and histograms for each numeric feature
display(churn.describe())
%matplotlib inline
hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))

We can see immediately that:
- `State` appears to be quite evenly distributed
- `Phone` takes on too many unique values to be of any practical use.  It's possible parsing out the prefix could have some value, but without more context on how these are allocated, we should avoid using it.
- Only 14% of customers churned, so there is some class imbalance, but nothing extreme.
- Most of the numeric features are surprisingly nicely distributed, with many showing a bell-like, roughly Gaussian shape. `VMail Message` is a notable exception, and `Area Code` shows up as a feature we should convert to non-numeric.

In [None]:
churn = churn.drop('Phone', axis=1)
churn['Area Code'] = churn['Area Code'].astype(object)

Next let's look at the relationship between each of the features and our target variable.

In [None]:
for column in churn.select_dtypes(include=['object']).columns:
    if column != 'Churn?':
        display(pd.crosstab(index=churn[column], columns=churn['Churn?'], normalize='columns'))

for column in churn.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = churn[[column, 'Churn?']].hist(by='Churn?', bins=30)
    plt.show()

Interestingly we see that churners appear:
- Fairly evenly distributed geographically
- More likely to have an international plan
- Less likely to have a voicemail plan
- To exhibit some bimodality in daily minutes (either higher or lower than the average for non-churners)
- To have a larger number of customer service calls (which makes sense as we'd expect customers who experience lots of problems may be more likely to churn)

In addition, we see that churners take on very similar distributions for features like `Day Mins` and `Day Charge`.  That's not surprising as we'd expect minutes spent talking to correlate with charges.  Let's dig deeper into the relationships between our features.

In [None]:
display(churn.corr())
pd.plotting.scatter_matrix(churn, figsize=(12, 12))
plt.show()

We see several features that essentially have 100% correlation with one another.  Including these feature pairs in some machine learning algorithms can create catastrophic problems, while in others it will only introduce minor redundancy and bias.  Let's remove one feature from each of the highly correlated pairs: `Day Charge` from the pair with `Day Mins`, `Eve Charge` from the pair with `Eve Mins`, `Night Charge` from the pair with `Night Mins`, and `Intl Charge` from the pair with `Intl Mins`:

In [None]:
churn = churn.drop(['Day Charge', 'Eve Charge', 'Night Charge', 'Intl Charge'], axis=1)
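
The pairs dropped above were identified by eye from the correlation matrix. As a rough sketch of how that check could be automated, the code below computes Pearson correlations in plain Python and lists pairs above a threshold. The toy values are made up (the charge is simply a fixed rate times the minutes), and the helper names are hypothetical.

```python
from itertools import combinations
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

def highly_correlated_pairs(columns, threshold=0.99):
    """Return column-name pairs whose absolute correlation meets the threshold."""
    pairs = []
    for (name_a, a), (name_b, b) in combinations(columns.items(), 2):
        if abs(pearson(a, b)) >= threshold:
            pairs.append((name_a, name_b))
    return pairs

# Toy values (made up): Day Charge is 0.17 times Day Mins
toy = {
    "Day Mins": [100.0, 150.0, 200.0, 250.0, 300.0],
    "Day Charge": [17.0, 25.5, 34.0, 42.5, 51.0],
    "Day Calls": [90, 120, 80, 110, 100],
}
found = highly_correlated_pairs(toy)
```

On the real dataframe, the same idea can be applied to the output of `churn.corr()` by scanning the off-diagonal entries.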

Now that we've cleaned up our dataset, let's determine which algorithm to use.  As mentioned above, there appear to be some variables where both high and low (but not intermediate) values are predictive of churn.  In order to accommodate this in an algorithm like linear regression, we'd need to generate polynomial (or bucketed) terms.  Instead, let's attempt to model this problem using gradient boosted trees.  Amazon SageMaker provides an XGBoost container that we can use to train in a managed, distributed setting, and then host as a real-time prediction endpoint.  XGBoost uses gradient boosted trees which naturally account for non-linear relationships between features and the target variable, as well as accommodating complex interactions between features.

Amazon SageMaker XGBoost can train on data in either a CSV or LibSVM format.  For this example, we'll stick with CSV.  It should:
- Have the target variable in the first column
- Not have a header row

But first, let's convert our categorical features into numeric features.

In [None]:
model_data = pd.get_dummies(churn)
model_data = pd.concat([model_data['Churn?_True.'], model_data.drop(['Churn?_False.', 'Churn?_True.'], axis=1)], axis=1)
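
To make the required layout concrete, here is a small standalone sketch that one-hot encodes a categorical column and writes the target first with no header row. The two rows are made-up sample values, and the `encode` helper is hypothetical; in the notebook itself `pd.get_dummies` and `to_csv` do this work.

```python
import csv
import io

# Made-up sample rows for illustration only
rows = [
    {"Intl Plan": "yes", "Day Mins": 265.1, "Churn?": "False."},
    {"Intl Plan": "no", "Day Mins": 129.1, "Churn?": "True."},
]

# One indicator column per category, in sorted order: ['no', 'yes']
categories = sorted({r["Intl Plan"] for r in rows})

def encode(row):
    """Return [target, numeric feature, one-hot indicators...] for one record."""
    target = 1 if row["Churn?"] == "True." else 0
    one_hot = [1 if row["Intl Plan"] == c else 0 for c in categories]
    return [target, row["Day Mins"]] + one_hot

buffer = io.StringIO()
csv.writer(buffer, lineterminator="\n").writerows(encode(r) for r in rows)
csv_text = buffer.getvalue()
```

Each output line starts with the 0/1 target, followed by the features, which is the shape the XGBoost container expects for CSV input.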

And now let's split the data into training, validation, and test sets.  This will help prevent us from overfitting the model, and allow us to test the model's accuracy on data it hasn't already seen.

In [None]:
train_data, validation_data, test_data = np.split(model_data.sample(frac=1, random_state=1729), [int(0.7 * len(model_data)), int(0.9 * len(model_data))])
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)
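
The cut points `0.7` and `0.9` passed to `np.split` yield a 70/20/10 split. The sketch below reproduces the index arithmetic in plain Python to show the resulting set sizes for 3,333 rows. The helper name is hypothetical, and the standard library shuffle will not reproduce the exact row order that pandas produces with the same seed.

```python
import random

def split_indices(n_rows, cuts=(0.7, 0.9), seed=1729):
    """Shuffle row indices and cut them at the given cumulative fractions."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    first_cut = int(cuts[0] * n_rows)
    second_cut = int(cuts[1] * n_rows)
    return indices[:first_cut], indices[first_cut:second_cut], indices[second_cut:]

train_idx, validation_idx, test_idx = split_indices(3333)
split_sizes = (len(train_idx), len(validation_idx), len(test_idx))
```

Because every index lands in exactly one of the three slices, the test set stays disjoint from the data used for training and validation.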

In [None]:
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)
display(train_data)

Now we'll upload these files to S3.

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

---
## Train

Moving onto training, first we'll need to specify the locations of the XGBoost algorithm containers.

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
xgb_training_container = get_image_uri(boto3.Session().region_name, 'xgboost')

Then, because we're training with the CSV file format, we'll create `s3_input`s that our training function can use as a pointer to the files in S3.

In [None]:
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')

Now, we can specify a few parameters like what type of training instances we'd like to use and how many, as well as our XGBoost hyperparameters.  A few key hyperparameters are:
- `max_depth` controls how deep each tree within the algorithm can be built.  Deeper trees can lead to better fit, but are more computationally expensive and can lead to overfitting.  There is typically some trade-off in model performance that needs to be explored between a large number of shallow trees and a smaller number of deeper trees.
- `subsample` controls sampling of the training data.  This technique can help reduce overfitting, but setting it too low can also starve the model of data.
- `num_round` controls the number of boosting rounds.  This is essentially the subsequent models that are trained using the residuals of previous iterations.  Again, more rounds should produce a better fit on the training data, but can be computationally expensive or lead to overfitting.
- `eta` controls how aggressive each round of boosting is.  It is the step size (learning rate) applied to each new tree, so smaller values lead to more conservative boosting.
- `gamma` controls how aggressively trees are grown.  Larger values lead to more conservative models.
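
To build some intuition for how `eta` and `num_round` interact, consider a deliberately simplified picture: assume each boosting round fits the current residual perfectly and the update is then scaled by `eta`. Under that assumption, which is an idealization and not how real trees behave, each round removes a fraction `eta` of whatever error remains.

```python
def remaining_residual(initial_residual, eta, num_round):
    """Residual left when every round removes a fraction `eta` of what remains."""
    residual = initial_residual
    for _ in range(num_round):
        residual -= eta * residual
    return residual

# Same number of rounds, different step sizes
small_step = remaining_residual(10.0, eta=0.2, num_round=5)
large_step = remaining_residual(10.0, eta=0.8, num_round=5)
```

With `eta=0.2` roughly a third of the initial error is still left after five rounds, while `eta=0.8` has removed almost all of it. This is why a lower `eta` is usually paired with a higher `num_round`.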

More detail on XGBoost's hyperparameters can be found on their GitHub [page](https://github.com/dmlc/xgboost/blob/master/doc/parameter.md).

In [None]:
xgb = sagemaker.estimator.Estimator(xgb_training_container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m5.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, prefix),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

## Compile
[Amazon SageMaker Neo](https://aws.amazon.com/sagemaker/neo/) optimizes models to run up to twice as fast, with no loss in accuracy. When calling the `compile_model()` function, we specify the target instance family (`ml_m5`) as well as the S3 location where the compiled model will be stored.

In [None]:
compiled_model = xgb
try:
    xgb.create_model()._neo_image_account(boto3.Session().region_name)
except:
    print('Neo is not currently supported in', boto3.Session().region_name)
else:
    output_path = '/'.join(xgb.output_path.split('/')[:-1])
    compiled_model = xgb.compile_model(target_instance_family='ml_m5', 
                                   input_shape={'data':[1, 69]},
                                   role=role,
                                   framework='xgboost',
                                   framework_version='0.7',
                                   output_path=output_path)
    compiled_model.name = 'deployed-xgboost-customer-churn'
    compiled_model.image = get_image_uri(sess.boto_region_name, 'xgboost-neo', repo_version='latest')
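
The `input_shape` of `[1, 69]` corresponds to one record with 69 feature columns after one-hot encoding. The following sketch shows one way to account for that number. The level counts for `State` and `Area Code` are assumptions about this dataset and are not read from the data here.

```python
# Numeric columns that remain after dropping Phone and the four charge columns
numeric_features = [
    "Account Length", "VMail Message", "Day Mins", "Day Calls", "Eve Mins",
    "Eve Calls", "Night Mins", "Night Calls", "Intl Mins", "Intl Calls",
    "CustServ Calls",
]

# Number of indicator columns each categorical feature expands into
categorical_levels = {
    "State": 51,      # assumed: 50 states plus DC
    "Area Code": 3,   # assumed number of distinct area codes
    "Intl Plan": 2,   # yes / no
    "VMail Plan": 2,  # yes / no
}

expected_features = len(numeric_features) + sum(categorical_levels.values())
```

In the notebook you can confirm the value against the data with `model_data.shape[1] - 1`, where the subtraction removes the target column.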

## Batch Inference

Next we're going to evaluate our model by using a Batch Transform to generate churn scores in batch from our `test_data`.

First, we upload the test data to S3. SageMaker Batch Transform is designed to run asynchronously and ingest input data from S3. This differs from SageMaker's real-time inference endpoints, which receive input data from synchronous HTTP requests.

Batch Transform is often the ideal option for advanced analytics use cases for several reasons:

 - Batch Transform is better optimized for throughput in comparison with real-time inference endpoints. Thus, Batch Transform is ideal for processing large volumes of data for analytics.
 - Offline asynchronous processing is acceptable for most analytics use cases.
 - Batch Transform is more cost efficient when real-time inference isn't necessary. You only need to pay for resources used during batch processing. There is no need to pay for ongoing resources like a hosted endpoint for real-time inference.

In [None]:
batch_input = test_data.iloc[:,1:]
batch_input.to_csv('test.csv', header=False, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')

s3uri_batch_input ='s3://{}/{}/test'.format(bucket, prefix)
print('Batch Transform input S3 uri: {}'.format(s3uri_batch_input))

s3uri_batch_output= 's3://{}/{}/out'.format(bucket, prefix)
print('Batch Transform output S3 uri: {}'.format(s3uri_batch_output))

In [None]:
from sagemaker.transformer import Transformer
BATCH_INSTANCE_TYPE = 'ml.c5.xlarge'

transformer = compiled_model.transformer(instance_count=1,
                                         strategy='SingleRecord',
                                         assemble_with='Line',
                                         instance_type= BATCH_INSTANCE_TYPE,
                                         accept = 'text/csv',
                                         output_path=s3uri_batch_output)
    
transformer.transform(s3uri_batch_input, split_type= 'Line', content_type= 'text/csv')

There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values.  In this case, we're simply predicting whether the customer churned (`1`) or not (`0`), which produces a simple confusion matrix.

In [None]:
batched_churn_scores = pd.read_csv(s3uri_batch_output+'/test.csv.out', names=['scores'])
gt_df = pd.DataFrame(test_data['Churn?_True.']).reset_index(drop=True)
# join_axes was removed in pandas 1.0; reindexing to gt_df.index gives the same alignment
results_df = pd.concat([gt_df, batched_churn_scores], axis=1).reindex(gt_df.index)

pd.crosstab(index=results_df['Churn?_True.'], columns=np.round(results_df['scores']), rownames=['actual'], colnames=['predictions'])
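
Rounding the scores amounts to using a cutoff of about 0.5. As a sketch of how the confusion matrix changes with a different cutoff, the helper below counts (actual, predicted) pairs for an explicit threshold. The labels and scores are made up for illustration, and the function name is hypothetical.

```python
from collections import Counter

def confusion_counts(actual, scores, threshold=0.5):
    """Count (actual, predicted) pairs, predicting 1 when score >= threshold."""
    counts = Counter()
    for label, score in zip(actual, scores):
        predicted = 1 if score >= threshold else 0
        counts[(label, predicted)] += 1
    return counts

# Made-up labels and scores for illustration only
toy_actual = [0, 0, 1, 1, 0, 1]
toy_scores = [0.10, 0.70, 0.90, 0.30, 0.20, 0.55]

cm_default = confusion_counts(toy_actual, toy_scores)
cm_lower = confusion_counts(toy_actual, toy_scores, threshold=0.3)
```

In this toy example, lowering the threshold from 0.5 to 0.3 catches the one churner that was previously missed. On real data a lower threshold typically also produces more false positives, so the cutoff is a business trade-off.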

---

## Scaling Your Solution

In the previous section we used pandas for data wrangling. This is convenient for exploration and prototyping. For small to moderate data sets, you could run your scripts in Lambda as a quick and simple pipelining solution.

However, for large data sets, you'll want to leverage a data processing framework like Spark. In the following section, we'll demonstrate the use of [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) to serve as a data pipeline for both training and inference. AWS Glue is a serverless ETL service that is built on Apache Spark. 

---

### Training Pipelining

Run the next cell to view the PySpark script that performs the equivalent transformations that were previously accomplished using Pandas.

This script was developed within an AWS Glue [development environment](https://docs.aws.amazon.com/glue/latest/dg/notebooks-with-glue.html). AWS Glue provides managed notebooks for Zeppelin and an integration with SageMaker managed Jupyter notebooks. 

To keep this lab brief, we fast forward to the stage where we've already developed our PySpark scripts. This notebook demonstrates the execution of a Glue ETL job through the SDK within a Jupyter notebook. In practice, the AWS Glue job could be launched automatically using AWS Glue's [workflow functionality](https://docs.aws.amazon.com/glue/latest/dg/trigger-job.html), or via an external orchestration tool (eg. Apache Airflow, AWS Step Functions).

In [None]:
!pygmentize ../scripts/churn-analytics-data-pipeline.py

Next, let's prepare for our AWS Glue ETL job. We do the following in the cell below:
 - Upload the PySpark script to an S3 location where AWS Glue can pick it up.
 - Set the location of the input data
 - Set the location for the job to output the processed data sets.

In [None]:
script_location_prefix = '{}/scripts/churn-analytics-data-pipeline.py'.format(prefix)
script_location = 's3://{}/{}'.format(bucket,script_location_prefix)

start = time.time()
boto3.Session().resource('s3').Bucket(bucket).Object(script_location_prefix).upload_file('../scripts/churn-analytics-data-pipeline.py')
end = time.time()

print("Scripts were uploaded to {} in {} seconds.".format(script_location,end-start))

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Input location of the raw data. We uploaded churn.csv to this key prefix previously
s3_input_bucket = bucket
s3_input_key_prefix = DATASOURCE_PREFIX

# Output location of the data. The input data will be split, transformed, and 
# uploaded to output/train and output/validation
s3_output_bucket = bucket
s3_output_key_prefix = timestamp_prefix + '/churn'

### Upload MLeap dependencies to S3

For our job, we will also have to pass MLeap dependencies to Glue. MLeap is an additional library we are using which does not come bundled with default Spark.

MLeap provides a portable serialization format for Spark pipelines, along with an execution engine to run them. In the above AWS Glue job, we perform a number of data transformations which we export using MLeap. This allows us to later deploy this transformation pipeline into our SageMaker production environment as part of an inference pipeline.

Similar to most of the packages in the Spark ecosystem, MLeap is implemented as a Scala package with a front-end wrapper written in Python so that it can be used from PySpark. We need to make sure that the MLeap Python library as well as the JAR are available within the Glue job environment. In the following cell, we will download the MLeap Python dependency and JAR from a SageMaker hosted bucket and upload them to the S3 bucket in your account that we specified above. 

If you are using other Python libraries like `nltk` in your code, you need to download the wheel file from PyPI and upload it to S3 in the same way. At this point, Glue only supports passing pure Python libraries in this way (e.g. you cannot pass `Pandas` or `OpenCV`). However, you can use `NumPy` and `SciPy` without having to pass them as packages because they are pre-installed in the Glue environment. 


First, we download the libraries.

In [None]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar    

Next, we upload the libraries to an S3 location that our AWS Glue job can pick up.

In [None]:
python_dep_location = sess.upload_data(path='python.zip', bucket=bucket, key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=bucket, key_prefix='dependencies/jar')

Lastly, we'll have our AWS Glue job upload our MLeap serialized SparkML model at the following location.

In [None]:
s3_model_bucket = bucket
s3_model_key_prefix = s3_output_key_prefix + '/mleap'

print('AWS Glue will upload the MLeap model to {}/{}'.format(s3_model_bucket,s3_model_key_prefix))

### Run a Glue Job

Next we'll be creating a Glue client via Boto so that we can invoke the `create_job` API for Glue. `create_job` will create a job definition which can be executed to run our script.

The `AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. It is measured in units of `DPU`. For more information on `DPU`, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).

In [None]:
glue_client = boto3.client('glue')
job_name = 'churn-analytics-pipeline-' + timestamp_prefix
response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to featurize the telco churn dataset',
    Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location
    },
    DefaultArguments={
        '--job-language': 'python',
        '--extra-jars' : jar_dep_location,
        '--extra-py-files': python_dep_location
    },
    AllocatedCapacity=5,
    Timeout=60,
)
glue_job_name = response['Name']
print('Create an AWS Glue job definition named {}'.format(glue_job_name))

We're now ready to run our AWS Glue job. Next we call the `start_job_run` API to execute the job. We provide the parameters that we've set in the previous steps to specify the data input, output, script and model locations in S3.

In [None]:
glue_job_args = {
                    '--S3_INPUT_BUCKET': s3_input_bucket,
                    '--S3_INPUT_KEY_PREFIX': s3_input_key_prefix,
                    '--S3_OUTPUT_BUCKET': s3_output_bucket,
                    '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
                    '--S3_MODEL_BUCKET': s3_model_bucket,
                    '--S3_MODEL_KEY_PREFIX': s3_model_key_prefix
                }
    
job_run_id = glue_client.start_job_run(JobName=job_name,
                                       Arguments = glue_job_args)['JobRunId']
print('Running Job ID: {}'.format(job_run_id))
print('Arguments provided: {}'.format(glue_job_args))

### Checking Glue job status

Now we will check the job status to see whether it has `succeeded`, `failed` or `stopped`. Once the job has succeeded, the transformed data is available in S3 in CSV format, which we can use with XGBoost for training. If the job fails, go to the [AWS Glue console](https://us-west-2.console.aws.amazon.com/glue/home), click the **Jobs** tab on the left, and select this particular job. The link under **Logs** opens the CloudWatch logs, which can help you see what exactly went wrong during the job execution.

In [None]:
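# Include TIMEOUT and ERROR so the loop cannot spin forever on those end states
TERMINAL_STATES = ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT', 'ERROR')
job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
print(job_run_status)
while job_run_status not in TERMINAL_STATES:
    time.sleep(30)
    job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
    print(job_run_status)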
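
For unattended runs it can be useful to bound how long the notebook waits. The sketch below is a generic polling helper with a maximum number of polls. The function name, the `max_polls` default, and the set of terminal states are assumptions for illustration; the status lookup is passed in as a callable so the helper can be tried without a running Glue job.

```python
import time

# Assumed set of Glue job run states after which no further change is expected
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}

def wait_for_terminal_state(get_status, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Call get_status() until it returns a terminal state or max_polls is reached."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state in time")
```

In this notebook, `get_status` could be `lambda: glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState']`.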

### Train Your Churn Predictor

The AWS Glue job processes our raw data, and produces training and validation sets that are suited for training our XGBoost model. AWS Glue is built on Apache Spark, so it's designed to easily scale-out and process large data volumes. We simply need to dial up the number of DPUs to increase resources available to drive up the parallel processing power of our Spark cluster.

In this lab, we operate on a small dataset, so there's no value in retraining our XGBoost model. To save on time and resources, we leverage the XGBoost model that we've already trained. However, the AWS Glue job did serve the purpose of generating our SparkML pipeline model, which we'll use in the following sections to build an inference pipeline.


---
## Inference Pipelining

Next we will create an inference pipeline that consists of our SparkML model and our XGBoost model. The pipeline is able to take raw data as input, leverage our SparkML pipeline for data processing (feature engineering), and then generate churn scores using our XGBoost model as a Batch Transform (alternatively, a real-time inference endpoint could be used). This type of inference pipeline is practical when pre-processing the data ahead of time isn't viable, and you want to either perform near real-time inference, or encapsulate your batch processing pipeline into a single model. 

---

Deploying a model in SageMaker requires two components:

* Docker image residing in ECR.
* Model artifacts residing in S3.

**SparkML**

* SageMaker provides a Docker image for SparkML running on MLeap. For more information, please see [SageMaker SparkML Serving](https://github.com/aws/sagemaker-sparkml-serving-container). 
* The model artifacts for our SparkML model were serialized with MLeap by the AWS Glue job that we ran previously.

**XGBoost**

* SageMaker provides Docker images for all its algorithms. We can use the same XGBoost container as we did for training. 
* The model artifacts for our XGBoost model were created by our SageMaker training job.

### SparkML Container: Define a Schema

The SparkML serving container requires a schema for the `predict` method call. To spare you the pain of passing the schema with every request, `sagemaker-sparkml-serving` allows you to set the schema once through the model definition.

We'll do just that in the following cell...

In [None]:
schema = {
    "input": [{"name": "State","type": "string"}, 
              {"name": "AccountLength", "type": "int"}, 
              {"name": "AreaCode","type": "int"}, 
              {"name": "Phone","type": "string"}, 
              {"name": "IntlPlan","type": "string"}, 
              {"name": "VMailPlan","type": "string"},
              {"name": "VMailMessage","type": "int"},
              {"name": "DayMins","type": "float"},
              {"name": "DayCalls","type": "int"},
              {"name": "DayCharge","type": "float"},
              {"name": "EveMins","type": "float"},
              {"name": "EveCalls","type": "int"},
              {"name": "EveCharge","type": "float"},
              {"name": "NightMins","type": "float"},
              {"name": "NightCalls","type": "int"},
              {"name": "NightCharge","type": "float"},
              {"name": "IntlMins","type": "float"},
              {"name": "IntlCalls","type": "int"},
              {"name": "IntlCharge","type": "float"},
              {"name": "CustServCalls","type": "int"},],
    "output": {"name": "features","type": "double","struct": "vector"}
}

schema_json = json.dumps(schema)
print(schema_json)
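
To make the role of the schema more concrete, here is a minimal local sketch that casts one CSV record according to a schema of the same shape. It assumes CSV fields are matched to the `input` entries by position. The helper `cast_row`, the three-column `demo_schema`, and the sample record are illustrative only; this is a pre-flight check you could run yourself, not the container's implementation.

```python
import csv
import io

# Python casts for the column types used in the schema above.
CASTS = {"string": str, "int": int, "float": float}


def cast_row(csv_line, schema):
    """Parse one CSV record and cast each field to its schema type."""
    fields = next(csv.reader(io.StringIO(csv_line)))
    columns = schema["input"]
    if len(fields) != len(columns):
        raise ValueError(
            "expected {} fields, got {}".format(len(columns), len(fields)))
    # Fields are paired with schema columns by position.
    return {
        column["name"]: CASTS[column["type"]](value.strip())
        for column, value in zip(columns, fields)
    }


# Shortened, hypothetical schema with the same structure as the full one.
demo_schema = {
    "input": [
        {"name": "State", "type": "string"},
        {"name": "AccountLength", "type": "int"},
        {"name": "DayMins", "type": "float"},
    ]
}

row = cast_row("KS,128,265.1", demo_schema)
print(row)
```

A record with a missing column or a non-numeric value in an `int` or `float` column raises a `ValueError` here, which is cheaper to catch locally than in a failed transform job.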

### Creating a `PipelineModel`

Next we'll create a SageMaker `PipelineModel` with SparkML and XGBoost. The `PipelineModel` ensures that the containers and models are deployed behind a single API endpoint and are invoked in the specified order. 

In [None]:
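from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel
from sagemaker.amazon.amazon_estimator import get_image_uri

# Look up the SageMaker XGBoost image for the current region (printed for reference).
training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="latest")
print(training_image)

# S3 location of the MLeap-serialized SparkML model produced by the Glue job.
sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key_prefix, 'model.tar.gz')

# The schema is passed once through an environment variable on the SparkML container.
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
# Reuse the XGBoost container and model artifacts from the training section.
xgb_model = Model(model_data=xgb.model_data, image=xgb_training_container)

# The order of the list defines the order of the pipeline: SparkML first, then XGBoost.
model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])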

Next, we register this PipelineModel with SageMaker, so that it can be used for things like a Batch Transform.

In [None]:
sm_model.sagemaker_session = sess
container_def = sm_model.pipeline_container_def(instance_type=BATCH_INSTANCE_TYPE)
sess.create_model(model_name, role, container_def)
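
For reference, registering a pipeline model corresponds roughly to a SageMaker `CreateModel` request whose `Containers` list holds one entry per stage, in execution order. The sketch below only builds such a request as a plain dictionary. The helper name `build_pipeline_model_request`, the placeholder image URIs, S3 paths and role ARN are all hypothetical, and the exact payload produced by the SDK calls above may differ.

```python
def build_pipeline_model_request(model_name, role_arn, stages):
    """Build a CreateModel-style request for a multi-container pipeline.

    `stages` is an ordered list of dicts with keys 'image', 'model_data'
    and optionally 'env'. The order of the list is the order in which
    the containers process each request.
    """
    containers = []
    for stage in stages:
        container = {
            "Image": stage["image"],
            "ModelDataUrl": stage["model_data"],
        }
        if stage.get("env"):
            container["Environment"] = dict(stage["env"])
        containers.append(container)
    return {
        "ModelName": model_name,
        "ExecutionRoleArn": role_arn,
        "Containers": containers,
    }


# Placeholder values, for illustration only.
request = build_pipeline_model_request(
    model_name="inference-pipeline-demo",
    role_arn="<workshop-iam-role-arn>",
    stages=[
        {   # Stage 1: SparkML feature engineering, with the schema set once.
            "image": "<sparkml-serving-image-uri>",
            "model_data": "s3://<bucket>/<prefix>/model.tar.gz",
            "env": {"SAGEMAKER_SPARKML_SCHEMA": "<schema-json>"},
        },
        {   # Stage 2: XGBoost churn classifier.
            "image": "<xgboost-image-uri>",
            "model_data": "s3://<bucket>/<prefix>/xgboost/model.tar.gz",
        },
    ],
)
print([c["Image"] for c in request["Containers"]])
```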

### Run the Inference Pipeline

MLeap can potentially offer significant performance improvements in production. Refer to the following benchmarks, which demonstrate inference times that are orders of magnitude faster:
https://github.com/combust/mleap/blob/master/mleap-benchmark/README.md

In the next step, we're going to run our inference pipeline in batch. This differs from the previous SageMaker Batch Transform job, which operated on data sets that we had pre-processed. In this case, our Batch Transform uses our `PipelineModel`, which operates on our raw data.

In [111]:
s3uri_batch_output = 's3://{}/{}/pipeline/out'.format(bucket, prefix)
s3uri_batch_input= 's3://reinvent2018-sagemaker-pytorch/datasets/uci-telco-churn/raw/churn_nolabels.csv'    

transformer = sagemaker.transformer.Transformer(model_name = model_name,
                                                instance_count=1,
                                                strategy='SingleRecord',
                                                assemble_with='Line',
                                                instance_type= BATCH_INSTANCE_TYPE,
                                                accept = 'text/csv',
                                                sagemaker_session=sess,
                                                base_transform_job_name='serial-inference-batch',
                                                output_path=s3uri_batch_output)
 
transformer.transform(s3uri_batch_input, split_type= 'Line', content_type= 'text/csv')
transformer.wait()

.....................................!


This Batch Transform job transforms your raw data into churn scores, which can then be loaded into your database. With Redshift, you can run a COPY command as demonstrated earlier in the lab. 

In practice, this pipeline should be automated end-to-end using an orchestration tool like Apache Airflow or AWS Step Functions. The Batch Transform job could be kicked off on a schedule, or by an S3 event when a daily batch of data lands in the data lake. Once the Batch Transform job completes successfully, the workflow can simply load the churn scores into your data warehouse, and your BI users can run predictive churn analytics.
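
As a hedged illustration of the S3-event option, the sketch below turns an S3 "object created" event into a request for the SageMaker `CreateTransformJob` API, mirroring the transformer settings used above (`SingleRecord` strategy, `Line` splitting and assembly, `text/csv`). The function name, job name, bucket names and model name are hypothetical. Inside a Lambda function or workflow task, the resulting dictionary could be passed to `boto3.client('sagemaker').create_transform_job(**request)`.

```python
from urllib.parse import unquote_plus


def transform_request_from_s3_event(event, job_name, model_name,
                                    output_s3_uri, instance_type):
    """Build a CreateTransformJob request for the object named in an S3 event."""
    s3_info = event["Records"][0]["s3"]
    bucket = s3_info["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event notifications.
    key = unquote_plus(s3_info["object"]["key"])
    return {
        "TransformJobName": job_name,
        "ModelName": model_name,
        "BatchStrategy": "SingleRecord",
        "TransformInput": {
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}".format(bucket, key),
                }
            },
            "ContentType": "text/csv",
            "SplitType": "Line",
        },
        "TransformOutput": {
            "S3OutputPath": output_s3_uri,
            "Accept": "text/csv",
            "AssembleWith": "Line",
        },
        "TransformResources": {
            "InstanceType": instance_type,
            "InstanceCount": 1,
        },
    }


# Minimal, hypothetical event containing only the fields the function reads.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-data-lake"},
                "object": {"key": "daily/churn+2019-01-01.csv"}}}
    ]
}

request = transform_request_from_s3_event(
    sample_event,
    job_name="serial-inference-batch-demo",
    model_name="inference-pipeline-demo",
    output_s3_uri="s3://my-data-lake/pipeline/out",
    instance_type="<batch-instance-type>",
)
print(request["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"])
```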