# Predict all-cause 30-day hospital readmission risk

## Thinking about Data

It is important to understand the relationships between the tables and the data they contain, because that is how you identify the information relevant to the prediction. The tool you used to generate the data created several CSV files, which you will upload to an S3 bucket. Based on the generated data, the tables in your data set are related as shown below. If you are using your own data with this notebook, creating some visualizations of it will help you understand these relationships.
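As a minimal sketch of checking one of these relationships in code, the snippet below joins a toy encounters table to a toy patients table with pandas and looks for encounters that reference unknown patients. The column names (`Id`, `PATIENT`, `GENDER`, `ENCOUNTERCLASS`) and the values are hypothetical stand-ins; adjust them to the headers in your own CSV files.

```python
import pandas as pd

# Toy stand-ins for patients.csv and encounters.csv (hypothetical columns and values)
patients = pd.DataFrame({
    "Id": ["p1", "p2", "p3"],
    "GENDER": ["M", "F", "F"],
})
encounters = pd.DataFrame({
    "Id": ["e1", "e2", "e3", "e4"],
    "PATIENT": ["p1", "p1", "p2", "p9"],  # p9 has no row in patients
    "ENCOUNTERCLASS": ["ambulatory", "inpatient", "wellness", "emergency"],
})

# Each encounter belongs to one patient, and a patient can have many encounters.
# validate="many_to_one" raises a MergeError if patient ids are not unique;
# indicator=True adds a _merge column telling which side each row matched.
joined = encounters.merge(
    patients,
    how="left",
    left_on="PATIENT",
    right_on="Id",
    suffixes=("_encounter", "_patient"),
    validate="many_to_one",
    indicator=True,
)

# Encounters that point at a patient id missing from patients.csv
orphans = joined[joined["_merge"] == "left_only"]
# Number of encounters per patient
per_patient = encounters.groupby("PATIENT").size()

print("Orphan encounters:", orphans["PATIENT"].tolist())
print(per_patient)
```

The same pattern (a left join plus `indicator=True`) works for any parent/child pair of tables, such as encounters and procedures.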

## Cleaning and Visualizing Your Data

<img src="EHR.png">

## Steps involved in this machine learning project

1. Understand your data
2. Store your data and convert it to Parquet for better performance and lower storage cost
3. Perform feature selection and feature engineering using Spark
4. Pre-process the data: use StringIndexer and one-hot encoding to convert categorical variables into the format required for training
5. Train a Spark ML model for data pre-processing and serialize it with the MLeap library so it can be used in the inference pipeline
6. Convert the data set from a Spark DataFrame into a format XGBoost supports, i.e. CSV
7. Split the data set into training and validation sets for model training and validation
8. Train an XGBoost model using the SageMaker XGBoost algorithm and validate its predictions on the validation data set
9. Tune the model with a hyperparameter tuning (HPO) job over the chosen hyperparameters
10. Take the best tuned model and create an inference pipeline that includes the Spark ML model and the XGBoost model
11. Create the endpoint configuration to deploy the inference pipeline
12. Deploy the inference pipeline for real-time prediction
13. Invoke the real-time prediction API with a request

You need to **update** the **S3 Bucket** and **KMS Key Id** values below for your environment. This notebook requires certain resources to be created; a CloudFormation template has been provided to create them.

In [None]:
import boto3
import botocore 
import time

bucket = '' # Update this to the bucket that was created in your lab account as part of this environment.
sse_kms_id = '' ## Get this value from the CloudFormation template
glue_crawler_db = 'ehr-db-readmission' ## Glue Database created by Glue crawler (contains raw data)
s3 = boto3.resource('s3')
region = boto3.Session().region_name

# Preprocessing using Apache Spark in AWS Glue

### Upload Glue Scripts to S3

In [None]:
%%bash

# Download Dependencies
wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

In [None]:
# Upload the Glue scripts and dependencies to S3, encrypted with the KMS key
from sagemaker import Session as Sess

# SageMaker session
sess = Sess()

# Same SSE-KMS settings for every upload
kms_args = {"ServerSideEncryption": "aws:kms", 'SSEKMSKeyId': sse_kms_id}

for path in ['../glue_scripts/convert_to_parquet',
             '../glue_scripts/produce_training_data',
             'python.zip',
             'mleap_spark_assembly.jar']:
    result = sess.upload_data(path=path, bucket=bucket, key_prefix='scripts', extra_args=kms_args)
    print(result)

### Upload raw data to S3
#### SKIP THIS STEP IF YOU GENERATED YOUR OWN DATA SET. MAKE SURE TO UPDATE THE SCRIPTS ACCORDINGLY FOR YOUR DATA SET

In [None]:
%%bash

# Download raw data
wget https://hospital-readmission-blog.s3-us-west-2.amazonaws.com/synthetic_5000_population.zip
unzip -d raw-data synthetic_5000_population.zip

In [None]:
%%bash -s "$bucket" "$sse_kms_id"

# Uploading raw data to S3
export bucket=$1
export sse_kms_id=$2

YY=$(date +%Y)
MM=$(date +%m)
DD=$(date +%d)

# Upload each table to its own date-partitioned prefix: raw-data/<table>/YYYY/MM/DD/<table>.csv
for table in allergies imaging_studies payer_transitions patients encounters conditions \
             medications careplans observations procedures immunizations organizations \
             providers payers; do
  aws s3 cp raw-data/$table.csv s3://$bucket/raw-data/$table/$YY/$MM/$DD/$table.csv --sse aws:kms --sse-kms-key-id $sse_kms_id
done


### Your S3 Bucket is now ready with raw data and required scripts

In [None]:
## Check the uploaded raw data in the S3 bucket
!aws s3 ls 's3://'$bucket'/raw-data/' --recursive --page-size=1 --human-readable --summarize

#### Let's look at Patients Data

In [None]:
import pandas as pd

print ("Patients Data")
df = pd.read_csv('raw-data/patients.csv')
df.head(3)

#### Let's look at Encounters Data

In [None]:
import pandas as pd

print ("Encounters Data")
df = pd.read_csv('raw-data/encounters.csv')
df.head(3)

### Create and run AWS Glue Preprocessing Job

Next, create a Glue client via Boto3 so that you can invoke the Glue `start_job_run` API. This API creates an immutable run (execution) of a job definition; the job definitions used here were created by the CloudFormation template. The `JobRunId` returned for a run is what you need to check that run's status. Job parameters such as the S3 bucket name are passed as arguments of the run.

Finally, check the job status to see whether it has succeeded, failed, or stopped. Once the job has succeeded, the transformed data is in S3 in the required format. If the job fails, go to the AWS Glue console, click the **Jobs** tab on the left, and select the job; the link under **Logs** opens its CloudWatch logs, which show what went wrong during the run.
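The cells that start the Glue jobs below point you to the console to check status. If you prefer to wait inside the notebook, a sketch like the following polls the Boto3 Glue `get_job_run` API with the `JobRunId` returned by `start_job_run`. The helper name `wait_for_glue_job`, the polling interval, and the `max_polls` limit are illustrative choices, not part of the original notebook.

```python
import time

# Glue job-run states that mean the run is over (successfully or not)
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def wait_for_glue_job(glue_client, job_name, run_id, poll_seconds=60, max_polls=None):
    """Poll get_job_run until the run reaches a terminal state.

    glue_client is a Boto3 Glue client (boto3.client('glue')); any object with a
    compatible get_job_run method works. Returns the final JobRunState string.
    """
    polls = 0
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        print(f"{job_name} ({run_id}): {state}")
        if state in TERMINAL_STATES:
            if state != "SUCCEEDED":
                # ErrorMessage is typically only present for unsuccessful runs
                print("Error:", run.get("ErrorMessage", "n/a"))
            return state
        polls += 1
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"{job_name} still {state} after {polls} polls")
        time.sleep(poll_seconds)
```

For example, `wait_for_glue_job(glue, 'glue-etl-convert-to-parquet', job_run_id)` once the job has been started and `job_run_id` is set.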

### Start AWS Glue crawler

- You can now log in to the [AWS console](https://console.aws.amazon.com/glue/home?region=us-east-1#catalog:tab=crawlers) (adjust `region=` in the link if your resources are in another Region) to run the AWS Glue crawler. Look for the crawler named **ehr-crawler-readmission** (the default name provided in CloudFormation) and run it. When the crawler run succeeds, its **Tables Added** attribute is updated to the number of tables the crawler discovered. The screenshots below guide you through the process.

- Click **Databases** in the AWS Glue console, look for the database named **ehr-db-readmission** (the default name provided in CloudFormation), and click on it. You can then click the link `Tables in ehr-db-readmission` to check the available tables and their properties. 

![Run Crawler](../images/6.png)

![Run Crawler](../images/7.png)

![Glue Database](../images/8.png)

![Glue Database](../images/9.png)
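Instead of using the console, you could start the crawler from the notebook. The sketch below assumes the default crawler name **ehr-crawler-readmission** from CloudFormation and uses the Boto3 Glue `start_crawler` and `get_crawler` calls; the helper name and polling settings are hypothetical.

```python
import time


def run_crawler_and_wait(glue_client, crawler_name="ehr-crawler-readmission",
                         poll_seconds=30, max_polls=120):
    """Start a Glue crawler and wait until it is READY again.

    Returns the status of the last crawl ('SUCCEEDED', 'FAILED' or 'CANCELLED').
    """
    try:
        glue_client.start_crawler(Name=crawler_name)
        print(f"Started {crawler_name}")
    except glue_client.exceptions.CrawlerRunningException:
        print(f"{crawler_name} is already running, waiting for it to finish...")

    for _ in range(max_polls):
        # Give the crawler time to leave READY before each check
        time.sleep(poll_seconds)
        crawler = glue_client.get_crawler(Name=crawler_name)["Crawler"]
        state = crawler["State"]  # READY, RUNNING or STOPPING
        print(f"{crawler_name}: {state}")
        if state == "READY":
            return crawler.get("LastCrawl", {}).get("Status")
    raise TimeoutError(f"{crawler_name} did not finish after {max_polls} polls")
```

For example, `run_crawler_and_wait(boto3.client('glue'))`; afterwards the tables should appear in the **ehr-db-readmission** database.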

#### Start CSV to Parquet conversion Glue Job and Wait for Job to Succeed

This Glue job is set up to use Spark 2.4 and Python 3. The job requires a Python script, which is uploaded to the S3 bucket and provided to the job when the CloudFormation template creates it. You can generate such scripts in the Glue console, so you don't have to write them from scratch, and then modify the generated script for your use case. In this case, the script was generated to read the data from the Glue crawler database and then select only the columns required for pre-processing and model training, based on domain knowledge. It drops null values and converts the data types to ones supported by the machine learning algorithm. Finally, it saves the data set to the S3 bucket in Parquet format, partitioned by the partition keys.

In [None]:
### Create and run AWS Glue Preprocessing Job to convert CSV to Parquet Format

# Check that the Glue job (created by the CloudFormation template) exists
glue = boto3.client('glue')

try:
    glue.get_job(JobName='glue-etl-convert-to-parquet')
    print("Job already exists, continuing...")
except glue.exceptions.EntityNotFoundException:
    print('{}\n'.format("Job Not Found, Check the output of the CloudFormation template"))

# Run the job in AWS Glue
try:
    job_name='glue-etl-convert-to-parquet'
    response = glue.start_job_run(JobName=job_name,
                                  Arguments={
                                            '--s3_bucket' : bucket,
                                            '--glue_crawler_db' : glue_crawler_db ##This value is from cloud formation template
                                    })
    job_run_id = response['JobRunId']
    print('{}\n'.format(response))
except glue.exceptions.ConcurrentRunsExceededException:
    print("Job run already in progress, continuing...")


job_url = "https://console.aws.amazon.com/glue/home?region="+region+"#etl:tab=jobs"
print ("You can go to the AWS Glue console to check the status of glue-etl-convert-to-parquet, as shown in the screenshot below: "+job_url)


![Glue Database](../images/16.png)

#### Start Glue Job to Produce Training Data
#### Note: this job runs for approximately 30 minutes, depending on the data set size

In [None]:
### Create and run AWS Glue Preprocessing Job

# Check that the Glue job (created by the CloudFormation template) exists
glue = boto3.client('glue')

try:
    glue.get_job(JobName='glue-etl-produce-traing-data')
    print("Job already exists, continuing...")
except glue.exceptions.EntityNotFoundException:
    print('{}\n'.format("Job Not Found, Check the output of the CloudFormation template"))

# Run the job in AWS Glue
try:
    job_name='glue-etl-produce-traing-data'
    response = glue.start_job_run(JobName=job_name,
                                  Arguments={
                                            '--sse_kms_id': sse_kms_id,
                                            '--s3_bucket' : bucket
                                    })
    job_run_id = response['JobRunId']
    print('{}\n'.format(response))
except glue.exceptions.ConcurrentRunsExceededException:
    print("Job run already in progress, continuing...")


This Glue job uses **Spark 2.2 and Python 2**. The job requires a Python script, which is uploaded to the S3 bucket from the code repository. Spark 2.2 is used here instead of the latest supported version, Spark 2.4, because the **MLeap** libraries used to serialize the Spark ML pre-processing model do not currently support Spark 2.4. You can find more details at https://github.com/aws/sagemaker-sparkml-serving-container.

The provided script reads all S3 partitions, but you can filter the partitions it reads on the required partition key. In this solution, partitions are based on date, but you can define your own *partition strategy*. Based on an understanding of the data and domain knowledge, multiple tables are joined and unnecessary columns are dropped to perform feature selection.

Since this notebook uses **XGBoost**, a supervised learning algorithm, the training data set must include **label data**. The script calculates the label, i.e. **30-day readmission**, by sorting all encounters of a given patient_id by timestamp and then taking **the difference between the encounter_stop of the previous encounter and the encounter_start of the current encounter**. This gives the number of hours since the last encounter, which identifies whether the encounter happened within 30 days of the previous one.

For feature engineering, imputation is used to fill some of the missing values in the training data, and **birth_date** is converted into **age**. Once this is done, the script generates a feature vector using **Spark ML one-hot encoding** and serializes the resulting model with the MLeap serialization library. Since the XGBoost algorithm supports CSV training data, the Spark DataFrame is finally converted into CSV files and saved to the S3 bucket.
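To make the label and age logic concrete, here is a minimal pandas sketch of the same idea; the Glue job itself does this in Spark, and its exact column names and rules may differ. The column names (`patient_id`, `encounter_start`, `encounter_stop`, `birth_date`) and the toy data are hypothetical. The sketch assumes the label belongs to the later encounter (the one that starts within 30 days of the previous encounter's stop) and computes age at the start of each encounter using 365.25-day years.

```python
import pandas as pd

READMISSION_WINDOW_HOURS = 30 * 24  # 30 days expressed in hours


def add_readmission_label(encounters):
    """Label each encounter 1 if it started within 30 days of the same
    patient's previous encounter ending, else 0; also derive age."""
    df = encounters.sort_values(["patient_id", "encounter_start"]).copy()
    # encounter_stop of the previous encounter for the same patient (NaT for the first)
    prev_stop = df.groupby("patient_id")["encounter_stop"].shift(1)
    # Hours between the previous encounter's stop and this encounter's start
    df["hours_since_last"] = (df["encounter_start"] - prev_stop).dt.total_seconds() / 3600
    # NaN (no previous encounter) compares as False, so first encounters get 0
    df["readmission"] = (df["hours_since_last"] <= READMISSION_WINDOW_HOURS).astype(int)
    # Age in whole years at the start of the encounter (365.25-day years)
    df["age"] = ((df["encounter_start"] - df["birth_date"]).dt.days // 365.25).astype(int)
    return df


# Hypothetical toy data: two patients, five encounters
toy = pd.DataFrame({
    "patient_id": ["p1", "p1", "p1", "p2", "p2"],
    "encounter_start": pd.to_datetime(["2019-01-01 08:00", "2019-01-20 09:00", "2019-06-01 10:00",
                                       "2019-03-01 08:00", "2019-03-05 08:00"]),
    "encounter_stop": pd.to_datetime(["2019-01-02 08:00", "2019-01-21 09:00", "2019-06-01 12:00",
                                      "2019-03-01 12:00", "2019-03-05 12:00"]),
    "birth_date": pd.to_datetime(["1980-05-01"] * 3 + ["2010-12-31"] * 2),
})

print(add_readmission_label(toy)[["patient_id", "encounter_start", "hours_since_last", "readmission", "age"]])
```

In Spark, the counterpart of `groupby(...).shift(1)` is typically a window partitioned by patient and ordered by start time, with `lag` applied to the stop timestamp.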

#### Check AWS Glue Job Status and Wait for it to Succeed

In [None]:
job_url = "https://console.aws.amazon.com/glue/home?region="+region+"#etl:tab=jobs"
print ("You can go to AWS Glue Console to check status for glue-etl-produce-traing-data: "+job_url)

#### Explore AWS Glue Jobs
![Glue Jobs](../images/14.png)
![Glue Jobs](../images/15.png)

### Validate Spark ML model for data pre-processing
Open the Jupyter notebook **Sparkml-model-test.ipynb** to understand and validate the output generated by the Spark ML model for data pre-processing. Once done, come back to this notebook.
![Sparkml-model-test.ipynb](../images/13.png)

# Training an Amazon SageMaker XGBoost Model

Now that your data is preprocessed in a format that XGBoost recognizes, you can run a simple training job to train a binary classifier model on it. You can run this entire process in this Jupyter notebook. Run the cell labeled **Run Amazon SageMaker XGBoost Training Job**. It runs an **XGBoost algorithm** training job in Amazon SageMaker and monitors its progress. Once the job is ‘Completed’, you can move on to the next cell.

After a few minutes, the job should complete successfully and save its output model artifacts to the specified S3 location. Later in this notebook, the best tuned model is deployed as part of an inference pipeline that combines the Spark ML pre-processing model with the XGBoost model.

### Run Amazon SageMaker XGBoost Training Job

Now you will use the SageMaker XGBoost algorithm to train a model on this data set. The Glue pre-processing job already uploaded the preprocessed training, validation, and test data to your S3 bucket. Update **train_prefix**, **validation_prefix**, and **test_data_prefix** with the S3 prefixes of those data sets, as listed by the following cells.
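Rather than copying the date from the listings by hand, you could pick the most recent date partition programmatically. A minimal sketch, assuming the keys follow the `<prefix>/YYYY/M/D/...` layout used in this notebook (e.g. `train-data/2020/4/9`); the function name is hypothetical.

```python
import re


def latest_date_prefix(keys, root):
    """Return the most recent '<root>/YYYY/M/D' prefix found in a list of S3 keys,
    or None if no key matches."""
    # Matches e.g. 'train-data/2020/4/9/' at the start of a key
    pattern = re.compile(rf"^{re.escape(root)}/(\d{{4}})/(\d{{1,2}})/(\d{{1,2}})/")
    found = {}
    for key in keys:
        match = pattern.match(key)
        if match:
            # Compare dates numerically (so 4/10 sorts after 4/9) but keep the
            # prefix exactly as it appears in S3, including any zero padding
            date = tuple(int(part) for part in match.groups())
            found[date] = match.group(0).rstrip("/")
    return found[max(found)] if found else None
```

Here `keys` could be collected with a Boto3 `list_objects_v2` paginator on your bucket with `Prefix='train-data/'`, after which `train_prefix = latest_date_prefix(keys, 'train-data')`.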

In [None]:
## Get S3 Location of Training Data Set
!aws s3 ls 's3://'$bucket'/train-data/' --recursive --page-size=1

In [None]:
## Get S3 Location of Validation Data Set
!aws s3 ls 's3://'$bucket'/validation-data/' --recursive --page-size=1

In [None]:
## Get S3 Location of Test Data Set
!aws s3 ls 's3://'$bucket'/test-data/' --recursive --page-size=1

In [None]:
## Update the training, validation and test data set S3 prefixes
train_prefix = "train-data/2020/4/9" ## Update S3 Prefix from above output
validation_prefix = "validation-data/2020/4/9" ## Update S3 Prefix from above output
test_data_prefix = 'test-data/2020/4/9' ## Update S3 Prefix from above output

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
import boto3
import botocore
from botocore.exceptions import ClientError
from sagemaker import get_execution_role

from sagemaker import Session as Sess

# SageMaker session
sess = Sess()

# Boto3 session
session = boto3.session.Session()
role = get_execution_role()
region = session.region_name

training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="0.90-1")
print (training_image)

In [None]:
### Run Amazon SageMaker XGBoost Training Job
from sagemaker.amazon.amazon_estimator import get_image_uri

import random
import string


# Get XGBoost container image for current region
training_image = get_image_uri(region, 'xgboost', repo_version="0.90-1")

# Create a unique training job name
training_job_name = 'xgboost-readmission-'+''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))

# Create the training job in Amazon SageMaker
sagemaker = boto3.client('sagemaker')
response = sagemaker.create_training_job(
    TrainingJobName=training_job_name,
    HyperParameters={
        'early_stopping_rounds': '5',
        'num_round': '10',
        'objective': 'binary:logistic', ## Binary classification (readmitted or not); the model outputs the probability of readmission
        'eval_metric': 'auc' ## Evaluation metric is Area Under the ROC Curve

    },
    AlgorithmSpecification={
        'TrainingImage': training_image,
        'TrainingInputMode': 'File',
    },
    RoleArn=role,
    InputDataConfig=[
        {
            'ChannelName': 'train',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://{}'.format(bucket+'/'+train_prefix),
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'ContentType': 'text/csv',
            'CompressionType': 'None',
            'RecordWrapperType': 'None',
            'InputMode': 'File'
        },
        {
            'ChannelName': 'validation',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://{}'.format(bucket+'/'+validation_prefix),
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'ContentType': 'text/csv',
            'CompressionType': 'None',
            'RecordWrapperType': 'None',
            'InputMode': 'File'
        },
    ],
    OutputDataConfig={
        'S3OutputPath': 's3://{}/xgb'.format(bucket),
        'KmsKeyId' : sse_kms_id
    },
    ResourceConfig={
        'InstanceType': 'ml.m5.4xlarge', ## XGBoost loads the training data into memory, so choose an instance type with enough RAM (ml.m5 is general purpose; ml.r5 is memory optimized)
        'InstanceCount': 2, ## Distributed training
        'VolumeSizeInGB': 10
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 3600
    },)

print('{}\n'.format(response))

# Monitor the status until completed
job_run_status = sagemaker.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    job_run_status = sagemaker.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
    print (job_run_status)
    time.sleep(30)



### Hyperparameter Tuning to find the best model

Explore the hyperparameters of the SageMaker XGBoost algorithm at https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html

Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.
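Once a tuning job has finished, it can be useful to see how all the candidate models compared, not just the best one. A sketch using the Boto3 SageMaker `list_training_jobs_for_hyper_parameter_tuning_job` call, sorted by the objective metric; the helper name and `top_n` default are illustrative.

```python
def rank_tuning_jobs(sm_client, tuning_job_name, top_n=5):
    """Return (job name, objective value, tuned hyperparameters) tuples for the
    best training jobs of a tuning job, best first."""
    response = sm_client.list_training_jobs_for_hyper_parameter_tuning_job(
        HyperParameterTuningJobName=tuning_job_name,
        SortBy="FinalObjectiveMetricValue",
        SortOrder="Descending",  # validation:auc is maximized, so highest first
        MaxResults=top_n,
    )
    ranked = []
    for summary in response["TrainingJobSummaries"]:
        # Failed or stopped jobs may not report a final objective value
        metric = summary.get("FinalHyperParameterTuningJobObjectiveMetric", {})
        ranked.append((summary["TrainingJobName"],
                       metric.get("Value"),
                       summary.get("TunedHyperParameters", {})))
    return ranked
```

For example, after the tuning job completes: `for name, auc, params in rank_tuning_jobs(smclient, tuning_job_name): print(name, auc, params)`.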

In [None]:
### Define the training job used by the hyperparameter tuning job
from sagemaker.amazon.amazon_estimator import get_image_uri

# Get XGBoost container image for current region
training_image = get_image_uri(region, 'xgboost', repo_version="0.90-1")

training_job_definition = {
    "AlgorithmSpecification": {
      "TrainingImage": training_image,
      "TrainingInputMode": "File"
    },
    "InputDataConfig": [
      {
        "ChannelName": "train",
        "CompressionType": "None",
        "ContentType": "text/csv",
        "DataSource": {
          "S3DataSource": {
            "S3DataDistributionType": "FullyReplicated",
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}'.format(bucket+'/'+train_prefix)
          }
        }
      },
      {
        "ChannelName": "validation",
        "CompressionType": "None",
        "ContentType": "text/csv",
        "DataSource": {
          "S3DataSource": {
            "S3DataDistributionType": "FullyReplicated",
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}'.format(bucket+'/'+validation_prefix)
          }
        }
      }
    ],
    "OutputDataConfig": {
      "S3OutputPath": "s3://{}/xgb".format(bucket),
      "KmsKeyId" : sse_kms_id
    },
    "ResourceConfig": {
      "InstanceCount": 2, ## Distributed training
      "InstanceType": "ml.m5.4xlarge",
      "VolumeSizeInGB": 10
    },
    "RoleArn": role,
    "StaticHyperParameters": {
      "eval_metric": "auc",
      "num_round": "100",
      "objective": "binary:logistic",
      "rate_drop": "0.3", ## Only used by the "dart" booster; it has no effect with the default "gbtree" booster
    },
    "StoppingCondition": {
      "MaxRuntimeInSeconds": 3600
    }
}

In [None]:
tuning_job_config = {
    "ParameterRanges": {
      "CategoricalParameterRanges": [],
      "ContinuousParameterRanges": [
        {
          "MaxValue": "1",
          "MinValue": "0",
          "Name": "eta",
        },
        {
          "MaxValue": "10",
          "MinValue": "1",
          "Name": "min_child_weight",
        },
        {
          "MaxValue": "2",
          "MinValue": "0",
          "Name": "alpha",            
        }
      ],
      "IntegerParameterRanges": [
        {
          "MaxValue": "10",
          "MinValue": "1",
          "Name": "max_depth",
        }
      ]
    },
    "ResourceLimits": {
      "MaxNumberOfTrainingJobs": 5,
      "MaxParallelTrainingJobs": 2
    },
    "Strategy": "Bayesian",
    "HyperParameterTuningJobObjective": {
      "MetricName": "validation:auc",
      "Type": "Maximize"
    }
  }

In [None]:
# Create a unique tuning job name (tuning job names are limited to 32 characters)
import random
import string

tuning_job_name = 'xgboost-readmission-'+''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))

smclient = boto3.Session().client('sagemaker')

smclient.create_hyper_parameter_tuning_job(HyperParameterTuningJobName = tuning_job_name,
                                            HyperParameterTuningJobConfig = tuning_job_config,
                                            TrainingJobDefinition = training_job_definition)

In [None]:
# Monitor the status until completed
job_run_status = smclient.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name)['HyperParameterTuningJobStatus']
while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    job_run_status = smclient.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name)['HyperParameterTuningJobStatus']
    print (job_run_status)
    time.sleep(30)

# Deploying an Amazon SageMaker Endpoint 

Now that you have a set of model artifacts, set up an inference pipeline whose containers execute sequentially in Amazon SageMaker. You start by setting up a Model that points to all of your model artifacts, then you set up a **SageMaker Endpoint configuration** to specify your instance configuration, and finally you stand up a **SageMaker Endpoint**. 

With this endpoint, you pass the raw data and no longer need to write pre-processing logic in your application code. The same pre-processing steps that ran for training can be applied to inference input data for better consistency and ease of management.

**Deploying a model in SageMaker requires two components:**
- Docker image residing in ECR.
- Model artifacts residing in S3.

**SparkML**

For SparkML, the SageMaker team provides a Docker image for MLeap-based SparkML serving. For more information, see [SageMaker SparkML Serving](https://github.com/aws/sagemaker-sparkml-serving-container). The MLeap-serialized SparkML model was uploaded to S3 by the SparkML job you ran in AWS Glue.

**XGBoost**

For XGBoost, you use the same Docker image you used for training. The XGBoost model artifacts were uploaded to S3 by the training jobs you just ran; the artifacts of the best job from the hyperparameter tuning job are used here.

# Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint

In [None]:
##Get the best training job name 
best_training_job = smclient.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name)['BestTrainingJob']['TrainingJobName']
print ('Best training job : ' + best_training_job)

info = smclient.describe_training_job(TrainingJobName=best_training_job)
best_model_data_loc = info['ModelArtifacts']['S3ModelArtifacts']
print('Model Artifact Location : ' + best_model_data_loc)


### Passing the schema of the payload via environment variable
The **SparkML serving container** needs to know the schema of the request that will be passed to it when calling the **predict** method. To save you from passing the schema with every request, **sagemaker-sparkml-serving** lets you pass it via an environment variable when creating the model definition. *You will need this schema definition in the next step, when creating the model.*
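Hand-writing the schema JSON is error-prone, and its column order must match the CSV payloads sent later. A sketch of building the same schema from an ordered list of `(name, type)` pairs; the column list mirrors the schema in the next cell, while `SCHEMA_COLUMNS` and `build_sparkml_schema` are hypothetical names.

```python
import json

# (name, type) pairs in the exact order the CSV payload columns will be sent
SCHEMA_COLUMNS = [
    ("encounters_encounterclass", "string"),
    ("patient_gender", "string"),
    ("patient_marital", "string"),
    ("patient_ethnicity", "string"),
    ("patient_race", "string"),
    ("encounters_reasoncode", "string"),
    ("encounters_code", "string"),
    ("procedures_code", "string"),
    ("patient_healthcare_expenses", "double"),
    ("patient_healthcare_coverage", "double"),
    ("encounters_total_claim_cost", "double"),
    ("encounters_payer_coverage", "double"),
    ("encounters_base_encounter_cost", "double"),
    ("procedures_base_cost", "double"),
    ("providers_utilization", "long"),
    ("age", "double"),
]


def build_sparkml_schema(columns, output_name="features"):
    """Return the JSON string for the SAGEMAKER_SPARKML_SCHEMA environment variable."""
    names = [name for name, _ in columns]
    if len(names) != len(set(names)):
        raise ValueError("duplicate column names in schema")
    schema = {
        "input": [{"type": col_type, "name": name} for name, col_type in columns],
        # The Spark ML pipeline emits a single vector column of doubles
        "output": {"type": "double", "name": output_name, "struct": "vector"},
    }
    return json.dumps(schema)


schema_json_from_list = build_sparkml_schema(SCHEMA_COLUMNS)
print(schema_json_from_list)
```

Keeping the column list in one place also gives you the payload column order for free: `[name for name, _ in SCHEMA_COLUMNS]`.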

In [None]:
import json
schema = {"input":[{"type":"string","name":"encounters_encounterclass"},{"type":"string","name":"patient_gender"},{"type":"string","name":"patient_marital"},{"type":"string","name":"patient_ethnicity"},{"type":"string","name":"patient_race"},{"type":"string","name":"encounters_reasoncode"},{"type":"string","name":"encounters_code"},{"type":"string","name":"procedures_code"},{"type":"double","name":"patient_healthcare_expenses"},{"type":"double","name":"patient_healthcare_coverage"},{"type":"double","name":"encounters_total_claim_cost"},{"type":"double","name":"encounters_payer_coverage"},{"type":"double","name":"encounters_base_encounter_cost"},{"type":"double","name":"procedures_base_cost"},{"type":"long","name":"providers_utilization"},{"type":"double","name":"age"}],"output":{"type":"double","name":"features","struct":"vector"}}
schema_json = json.dumps(schema)
print(schema_json)

### Creating a `PipelineModel` that comprises the SparkML and XGBoost models in the right order

Next, you create a **SageMaker PipelineModel** with SparkML and XGBoost. The `PipelineModel` ensures that both containers are deployed behind a *single API endpoint* in the correct order. The same model could also be used for Batch Transform, so that a single job runs predictions through the whole pipeline. 

Here, during the `Model` creation for SparkML, you will pass the schema definition that you built in the previous cell.

In [None]:
!aws s3 ls 's3://'$bucket'/spark-ml-model' --recursive

In [None]:
## Update the S3 prefix from the output above
sparkml_model_prefix = 'spark-ml-model/2020/4/9'

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Get XGBoost container image for current region
training_image = get_image_uri(region, 'xgboost', repo_version="0.90-1")


sparkml_data = 's3://{}/{}/{}'.format(bucket,sparkml_model_prefix,'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model_data = '{}'.format(best_model_data_loc)
xgb_model = Model(model_data=xgb_model_data, image=training_image)

model_name = 'inference-pipeline-readmission-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])

### Deploying the `PipelineModel` to an endpoint for realtime inference
Next you deploy the model you just created with the `deploy()` method to start an inference endpoint and then you will send some requests to the endpoint to verify that it works as expected.

In [None]:
endpoint_name = 'inference-pipeline-readmission-ep-' + timestamp_prefix
## Deploying two instances for high availability; SageMaker automatically deploys them in different AZs
sm_model.deploy(initial_instance_count=2, instance_type='ml.m5.4xlarge', endpoint_name=endpoint_name) 

In [None]:
# Monitor the endpoint status until it is no longer being created
endpoint_status = sagemaker.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
while endpoint_status not in ('OutOfService','InService','Failed'):
    endpoint_status = sagemaker.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    print(endpoint_status)
    time.sleep(30)


### Invoking the newly created inference endpoint with a payload to transform the data
Now you invoke the endpoint with a valid payload that SageMaker SparkML Serving can recognize. Pass the input payload to the request:

* Pass it as a valid CSV string. In this case, the schema passed via the environment variable is used to interpret the columns. For CSV format, every column in the input has to be a basic data type (e.g. int, double, string); it cannot be a Spark `Array` or `Vector`.
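Building the CSV string by concatenation makes it easy to get the column order wrong. A minimal sketch of a helper that serializes a dict in schema order; the function name is hypothetical, and because it does no CSV quoting it rejects values containing commas or newlines rather than guessing how the serving container handles quoted fields.

```python
def to_csv_payload(record, column_order):
    """Serialize one record (a dict) as a single CSV line in schema column order."""
    values = []
    for column in column_order:
        if column not in record:
            raise KeyError(f"missing column: {column}")
        text = str(record[column])
        # No quoting is attempted, so reject values that would break the row
        if "," in text or "\n" in text:
            raise ValueError(f"{column} contains a delimiter: {text!r}")
        values.append(text)
    return ",".join(values)
```

With the schema defined earlier, the column order can be taken from `[c['name'] for c in schema['input']]`, so the payload always matches what the SparkML container expects.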


#### Get Test Data from S3

First, load the test data set and pick records whose values you can use in the prediction request. You will then see how to pass the payload to the endpoint in CSV format.

In [None]:
## Install PyArrow libraries to read test data set parquet files directly from S3 filesystem
!pip install pyarrow==0.15.1

In [None]:
## Load the test data set (Parquet files) from S3 into pandas
import pandas as pd 
import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

# Python 3.6 or later
p_dataset = pq.ParquetDataset(
    f"s3://{bucket}/{test_data_prefix}",
    filesystem=fs
)

test_data = p_dataset.read().to_pandas()


#### Fetch Test Data where the patient was not readmitted within 30 days

In [None]:
test_data_0 = test_data[(test_data['readmission'] == 0) & (test_data['encounters_reasoncode'] != 0) & (test_data['procedures_code'] != 0)]
test_data_0.head(2)



#### Fetch Test Data where the patient was readmitted within 30 days

In [None]:
test_data_1 = test_data[(test_data['readmission'] == 1) & (test_data['encounters_reasoncode'] != 0)]
test_data_1.head(5)


#### Passing the payload in CSV format
Based on the test data, you can update the request sent to the prediction API and validate the results. Note that you used `binary:logistic` to train your model, so the output is the **probability** that the patient is readmitted within 30 days.
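Since the endpoint returns a probability, turning it into a yes/no decision needs a threshold. A sketch, assuming the response body is a CSV value such as `b'0.42'`; the helper names are hypothetical and the 0.5 default threshold is an assumption. In practice you would choose the threshold on validation data: a lower threshold flags more patients (more true readmissions caught, but more false alarms).

```python
def parse_probability(response_body):
    """Turn the endpoint's CSV response (bytes or str, e.g. b'0.42') into a float."""
    if isinstance(response_body, bytes):
        response_body = response_body.decode("utf-8")
    # Take the first value in case the response has a trailing newline or extra fields
    return float(response_body.strip().split(",")[0])


def to_label(probability, threshold=0.5):
    """Return 1 (likely readmitted within 30 days) if probability >= threshold, else 0."""
    if not 0.0 <= probability <= 1.0:
        raise ValueError(f"not a probability: {probability}")
    return int(probability >= threshold)
```

For example, `to_label(parse_probability(predictor.predict(payload)))` after the predictor below has been created.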

In [None]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
from sagemaker import Session as Sess
# SageMaker session
sess = Sess()

## Payload schema = encounters_encounterclass,patient_gender,patient_marital,patient_ethnicity,patient_race,
## encounters_reasoncode,encounters_code,procedures_code,patient_healthcare_expenses,
## patient_healthcare_coverage,encounters_total_claim_cost,encounters_payer_coverage,encounters_base_encounter_cost,
## procedures_base_cost,providers_utilization,age
encounters_encounterclass='ambulatory'
patient_gender='M'
patient_marital='NM'
patient_ethnicity='nonhispanic'
patient_race='white'
encounters_reasoncode='10509002'
encounters_code='185345009'
procedures_code='23426006'
patient_healthcare_expenses='167365.25'
patient_healthcare_coverage='2638.16'
encounters_total_claim_cost='129.16'
encounters_payer_coverage='49.16'
encounters_base_encounter_cost='129.16'
procedures_base_cost='516.65'
providers_utilization='10924'
age='8'

payload = ",".join([encounters_encounterclass, patient_gender, patient_marital, patient_ethnicity, patient_race,
                    encounters_reasoncode, encounters_code, procedures_code, patient_healthcare_expenses,
                    patient_healthcare_coverage, encounters_total_claim_cost, encounters_payer_coverage,
                    encounters_base_encounter_cost, procedures_base_cost, providers_utilization, age])

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print('probability of readmission:')
print(predictor.predict(payload))


#### Different payload
Now let's update the request with different values for encounter_class, procedure_code, encounter_code, gender, patient_healthcare_expenses, etc. and see the results. Try to change values and check the prediction to understand the model better.

In [None]:
encounters_encounterclass='ambulatory'
patient_gender='F'
patient_marital='M'
patient_ethnicity='nonhispanic'
patient_race='white'
encounters_reasoncode='10509002'
encounters_code='439740005'
procedures_code='274804006'
patient_healthcare_expenses='809476.7'
patient_healthcare_coverage='39888.39'
encounters_total_claim_cost='1456'
encounters_payer_coverage='64.16'
encounters_base_encounter_cost='129.16'
procedures_base_cost='516.65'
providers_utilization='12885'
age='33'

payload = ",".join([encounters_encounterclass, patient_gender, patient_marital, patient_ethnicity, patient_race,
                    encounters_reasoncode, encounters_code, procedures_code, patient_healthcare_expenses,
                    patient_healthcare_coverage, encounters_total_claim_cost, encounters_payer_coverage,
                    encounters_base_encounter_cost, procedures_base_cost, providers_utilization, age])

print('probability of readmission:')
print(predictor.predict(payload))


#### [Optional] Deleting the Endpoint
If you do not plan to use this endpoint, then it is a good practice to delete the endpoint so that you do not incur the cost of running it.

In [None]:
boto_session = sess.boto_session
sm_client = boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)
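Deleting the endpoint alone leaves the endpoint configuration and the pipeline model registered in SageMaker. A sketch that looks them up from the endpoint and deletes all three with Boto3; the helper name is hypothetical, and it must run while the endpoint still exists (i.e. instead of the previous cell, not after it).

```python
def delete_pipeline_resources(sm_client, endpoint_name):
    """Delete an endpoint together with its endpoint config and model(s)."""
    # Look up what the endpoint was built from before deleting it
    endpoint = sm_client.describe_endpoint(EndpointName=endpoint_name)
    config_name = endpoint["EndpointConfigName"]
    config = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = sorted({variant["ModelName"] for variant in config["ProductionVariants"]})

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return config_name, model_names
```

For example, `delete_pipeline_resources(boto3.client('sagemaker'), endpoint_name)`.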