# Predict park accessibility in NYC using Machine Learning

In this notebook, we work through how we can ingest the parquet file produced by our ETL pipeline into an ML algorithm for training a model to predict accessibility. We will go through the entire data science lifecyle -- 

data ingest --> data exploration --> data cleaning --> staging the training data --> machine learning.

Accessiblity here is defined as whether parks are categorized as Level 4 or not.  

For Machine learning, we will use two approaches -- 

1/ Train an XGBoost model to predict whether accessible parks are available based on individual tax returns and park data 

2/ Train a model using SageMaker Autopilot and compare the results. SageMaker Autopilot is a fully managed ML service that can train various machine learning models with different hyperparameters, and allow you to pick the best one. Here we will use the SageMaker Python SDK to make API calls to SageMaker Autopilot to train and deploy an ML model.

# Import Libraries and Data

Note that to import the data in parquet format into a pandas dataframe, we will need to install pyarrow library.

In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, auc, classification_report
from sklearn.model_selection import train_test_split
import io
import sagemaker.amazon.common as smac
import boto3
import re
from sagemaker import get_execution_role
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.predictor import csv_serializer, json_deserializer


In [None]:
# if needed, updated boto3
!pip install --upgrade pip
if boto3.__version__ != '1.10.33':
    !pip install boto3=='1.10.33'
else:
    pass


In [None]:
!pip install pyarrow==0.12.0
#import pyarrow.parquet as pq

In [None]:
FILENAME = 'part-00000-588c7835-a113-419a-8cea-580b11e397e8-c000.snappy.parquet' # replace this with your parquet file
df = pd.read_parquet(FILENAME, engine ='pyarrow')
df.head()

In [None]:
df.Playground_ID.value_counts()

In [None]:
print("Shape of Dataset = {}".format(df.shape))

In [None]:
list(df.columns)

# Data Cleaning prior to Exploration

There are several data cleaning steps that need to be performed:

1) There isn't enough data to identify each playground Id separately but we can extract the Borough information as a categorical variable <br/>
2) We need to convert the AGI to ordinal values <br/>
3) We need to convert Accessible and Adaptible Swing to numeric values <br/>
4) We will drop the name and location columns <br/>
5) We will drop any columns on volunteer prepared taxes <br/>

In [None]:
df['Borough'] = [str(x)[0] for x in df.Playground_ID.values]
convert_cols = {'Accessible': {'Y': 1, 'N': 0},
                'Adaptive_Swing': {'Y': 1, 'N': 0}}#,
              # 'adjusted_gross_income': {'':0, '$1 under $25,000':1, '$25,000 under $50,000': 2, '$50,000 under $75,000':3, 
              #                          '$75,000 under $100,000':4, '$100,000 under $200,000':5, '$200,000 or more':6}}
df.replace(convert_cols, inplace = True)

In [None]:
COLSCONVERT = ['Adaptive_Swing', 'num_of_exemptions',
       'Accessible', 'num_of_dependents', 'num_of_returns',
       'num_of_joint_returns', 'num_of_head_of_household_returns',
       'num_of_single_returns']
indexlist = []
for col in COLSCONVERT:
    indexlist = indexlist + list(df.loc[df[col]=='**'].index)
df = df.drop(index= indexlist)
df[COLSCONVERT] = df[COLSCONVERT].apply(pd.to_numeric)
df.replace({'Level': {None:0, '1':0, '2':0, '3':0, '4':1}}, inplace=True)

In [None]:
COLS_TO_DROP = ['lat', 'lon', 'Playground_ID', 'Name', 'Location', 'Prop_ID','num_of_volunteer_prepared_returns_Total',
                  'num_of_volunteer_prepared_returns_Num_of_volunteer_income_tax_assistance_prepared_returns',
                 'num_with_paid_preparers_signature','num_of_volunteer_prepared_returns_Num_of_tax_counseling_for_the_elderly_prepared_returns', 
                'zipcode','School_ID','Status']
df = df.drop(columns = COLS_TO_DROP)
print(df.shape)

In [None]:
# let's save this dataframe at this stage as a separate dataframe for AutoPilot
autodf = df.copy()

Mark everything with Level 4 as 1, and else 0. Why have we done this?

Look at the graph below. It shows that most playgrounds in NYC are classified as Level 4 playgrounds.  Level 4 means Accessible Playgrounds with Transfer Platforms and Ground Level Play Features; and thus represents the most sophisticated playgrounds.

We want to predict whether a given Playground ID in a given school district, given demographic information about the income levels of households in that zipcode is a Level 4 playground or not. 

In [None]:
sns.countplot(df.Level)

In [None]:
print("Baseline Model accuracy = {}".format(len(df[df['Level']==1])/len(df)))

In [None]:
df.dropna(inplace=True)
df.shape

# Data Exploration

Having converted the raw data into numerical format, we can explore the dataset and extract some meaningful information and look for any correlations in the data, which may be indicative of whether income is correlated to playground type

In [None]:
# Total returns versus AGI shows an overall trend that we need to normalize out. 
sns.barplot(x='size_of_adjusted_gross_income', y = 'num_of_returns', data = df)
plt.title("Total Returns versus AGI")
plt.show()

In [None]:
# Number of dependents versus number of exceptions
sns.regplot(x = 'num_of_dependents', y = 'num_of_exemptions', data = df)
plt.title("Scatter plot of dependents versus exceptions")
plt.show()

In [None]:
# Introduce new columns which are the normalized number of joint, head of household and single returns
COLS_TO_NORMALIZE = ['num_of_joint_returns', 'num_of_head_of_household_returns',
       'num_of_single_returns']
for col in COLS_TO_NORMALIZE:
    df['normalized_'+ col] = df[col]/df['num_of_returns']

In [None]:
sns.barplot(x='size_of_adjusted_gross_income', y = 'normalized_num_of_head_of_household_returns', data = df)
plt.title("AGI versus normalized HOH returns")
plt.show()

In [None]:
sns.barplot(x='Accessible', y = 'num_of_dependents', hue = 'Borough', data = df)
plt.title("Playground Accessibility by number of dependents and Borough")
plt.show()

In [None]:
sns.boxplot(x = 'Accessible', y = 'normalized_num_of_joint_returns', hue = 'Borough', data = df)
plt.title("Playground accessibility by normalized number of joint returns")
plt.show()

In [None]:
# Not much correlation between adjusted gross income level and number of available playgrounds. Certain zip codes simply
# dont have any filers in the high income categories. 
sns.countplot(x = 'Level' , hue = 'size_of_adjusted_gross_income', data = df)
plt.title("Distribution of Accessibility Levels by Income Accross all Boroughs")
plt.show()

In [None]:
# How does this play out at a Borough specific level? Let's look at Manhattan and Bronx
bordf = df[df['Borough']=='X']
sns.countplot(x = 'Level', hue = 'size_of_adjusted_gross_income', data = bordf)
plt.title("Distribution of Accessibility Levels by Income in the Bronx")
plt.show()

In [None]:
sns.countplot(x = 'size_of_adjusted_gross_income', hue = 'Level', data = bordf)
plt.title("Distribution of Accessibility Levels by Income in the Manhattan")
plt.show()

In [None]:
# How does this play out at a Borough specific level? Let's look at Manhattan and Bronx
bordf = df[df['Borough']=='M']
sns.countplot(x = 'Level', hue = 'size_of_adjusted_gross_income', data = bordf)
plt.title("Distribution of Accessibility Levels by Income in the Manhattan")
plt.show()

In [None]:
sns.countplot(x = 'size_of_adjusted_gross_income', hue = 'Level', data = bordf)
plt.title("Distribution of Accessibility Levels by Income in the Manhattan")
plt.show()

The second plot in each Borough category shows that the availability distribution of playgrounds is almost identical accross all income groups, suggesting that these variables are not strongly correlated

In [None]:
# Make a correlation plot to confirm. Convert the Level column to numeric
df['Level'] = df.Level.astype('float32')
sns.heatmap(df.corr())
plt.title('Correlation Matrix Plot showing different features and their correlation with each other')
plt.show()

In [None]:
df.corr().Level

# Data Preprocessing

For further preparation of the data for ML, we need to drop some more columns:
1) We have subsumed the accessibility column under level, so we can drop that. Remember that anything that was not wheelchair accessible is classifed as Level 0 <br/>

2) The adaptive swing column is correlated to Level 0 -- Not accessible, and can help pick that out. <br/> 

3) We will only keep the normalized columns for joint, HOH and Single returns and drop the unnormalized ones. <br/>

4) We will keep the total number of returns column <br/> 

5) SInce the total returns is a pretty large number, we will standardize the columns as well <br/>


# Machine Learning modeling

Here we will try to predict the level of playground based on the data available.
For any ML model, we need to provide the label column first. Let's drop some of the fields above and prepare the data for ML training in SageMaker

In [None]:
final_df = df.drop(columns =['Accessible', 'num_of_returns', 
                             'num_of_dependents','num_of_joint_returns', 'num_of_head_of_household_returns',
                             'num_of_single_returns'])

In [None]:
final_df.shape

In [None]:
final_df = pd.get_dummies(final_df)
df1 = final_df.Level
df2 = final_df.drop(columns = 'Level')
final_df = pd.concat([df1, df2], axis = 1)
final_df.dropna(inplace=True)

In [None]:
final_df.shape

In [None]:
X_train, X_test = train_test_split(final_df, test_size = 0.2, random_state = 42)
sns.countplot(X_test.Level)

In [None]:
X_train.head()

In [None]:
X_train.isnull().any()


Upload Training data to S3

In [None]:
role = get_execution_role()
session = sagemaker.Session()
bucket = session.default_bucket() # you can replace with this your processed bucket name <<user-id>>-processed
prefix = 'sagemaker/accessibility'

train_file = 'train.csv'
X_train.to_csv(train_file,index=False,header=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
train_data = sagemaker.session.s3_input('s3://{}/{}/train'.format(bucket, prefix), 
                                        content_type='csv')

In [None]:
container = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')


In [None]:
output_location = 's3://{}/{}/output'.format(bucket, prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))


In [None]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(container,
                                       role, 
                                       train_instance_count=1, 
                                       train_instance_type='ml.c4.xlarge',
                                       output_path=output_location,
                                       sagemaker_session=sess)

xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit({'train': train_data, 'validation': train_data})

# Using SageMaker Autopilot

As you can see, training an ML model, feature engineering, preprocessing can take quite some time and be a fairly involved process. Let's see how we can use SageMaker Autopilot, which is SageMaker's built in AutoML feature to take the raw dataset, and train a bunch of ML models, and let AutoPilot take care of the preprocessing, feature engineering and model training all within a few simple API calls. 

Lets start by looking at autodf -- we will drop the accessibility column, since we have already incorporated that into Level. Note that to go from autodf to the final training dataset for XGBoost above we had to implement a sequence of feature engineering steps:

1) We converted Borough columns to one hot encoding

2) We dropped all the non normalized columns which are strongly correlated with the feature engineered normalized ones.

3) We had to split the data randomly into training and test datasets which we uploaded separately to S3.

4) We had to save the csv files locally without headers and indexes. 

5) While this is not required for XGBoost, we removed missing values. Linear learner algorithm for example doesn't accept missing values. 

Autopilot takes care of all the feature engineering steps for us. Simply feed in the initial dataframe as an input, save it as csv, point Autopilot to the dependent variable and it will take care of the rest

In [None]:
#look at the autodf dataframe again
autodf.head()


In [None]:
# note that we have missing values in almost every column. We will drop those as AutoML doesn't support missing values
#for now.
autodf.isnull().any()

In [None]:
#Load the autodf into s3 directly
autodf.dropna(inplace=True)
print(autodf.shape)
autodf = autodf.sample(frac=1)

#Let's take out a random sample for testing and set it aside
TRAIN_LENGTH = X_train.shape[0]
test_sample = autodf[TRAIN_LENGTH+1:]
print("Shape of test data = {}".format(test_sample.shape))
test_sample.drop(columns=['Accessible']).to_csv('automl-test.csv', index=False)


autodf[:TRAIN_LENGTH].drop(columns=['Accessible']).to_csv('automl-train.csv', index= False)
autotrainpath = session.upload_data(path="automl-train.csv", key_prefix=prefix + "/input")
print(autotrainpath)

In [None]:
# configure the AutoML job: specify the target attribute name and the location of the input file
input_data_config = [{
      'DataSource': {
        'S3DataSource': {
          'S3DataType': 'S3Prefix',
          'S3Uri': 's3://{}/{}/input'.format(bucket,prefix)
        }
      },
      'TargetAttributeName': 'Level'
    
    }
  ]

output_data_config = {
    'S3OutputPath': 's3://{}/{}/automloutput'.format(bucket,prefix)
  }
print(input_data_config)
print(output_data_config)

In [None]:
from time import gmtime, strftime, sleep
timestamp_suffix = strftime('%d-%H-%M-%S', gmtime())

In [None]:
region = boto3.Session().region_name
sm = boto3.Session().client(service_name='sagemaker',region_name=region)
auto_ml_job_name = 'automl-dm-' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      RoleArn=role,
                     ProblemType='BinaryClassification',
                     AutoMLJobObjective = {'MetricName':'Accuracy'})

In [None]:
# Kick off the Autopilot job
import time
start = time.time()
print ('JobStatus - Secondary Status')
print('------------------------------')


describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
print (describe_response['AutoMLJobStatus'] + " - " + describe_response['AutoMLJobSecondaryStatus'])
job_run_status = describe_response['AutoMLJobStatus']
    
while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
    job_run_status = describe_response['AutoMLJobStatus']
    
    print (describe_response['AutoMLJobStatus'] + " - " + describe_response['AutoMLJobSecondaryStatus'])
    sleep(30)
end = time.time()
print("Time Taken for job = {}".format(end - start))

In [None]:
describe_response

In [None]:
# lets look at the best job
best_candidate = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)['BestCandidate']
best_candidate_name = best_candidate['CandidateName']
print(best_candidate)
print('\n')
print("CandidateName: " + best_candidate_name)
print("FinalAutoMLJobObjectiveMetricName: " + best_candidate['FinalAutoMLJobObjectiveMetric']['MetricName'])
print("FinalAutoMLJobObjectiveMetricValue: " + str(best_candidate['FinalAutoMLJobObjectiveMetric']['Value']))


In [None]:
# Let's now use boto3 to deploy the model
model_name = 'automl-' + timestamp_suffix

model = sm.create_model(Containers=best_candidate['InferenceContainers'],
                            ModelName=model_name,
                            ExecutionRoleArn=role)

print('Model ARN corresponding to the best candidate is : {}'.format(model['ModelArn']))

In [None]:
endpoint_config_name = 'automl-endpoint-config' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

## Create the Endpoint for the AutoML job

In [None]:
%%time
import time

endpoint_name = 'autmlEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sm.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status=='Creating':
    time.sleep(60)
    resp = sm.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)

## Test the Model predictions

Replace the endpoint name below with your endpoint name from the above cell: it should start with 'autmlEndpoint-yyyy-mm-dd-hh-mm-ss'

In [None]:
from sagemaker.predictor import RealTimePredictor, csv_deserializer, csv_serializer
#endpoint  = 'autmlEndpoint-2019-12-23-17-15-48' #replace with your endpoint name here
autopredictor = RealTimePredictor(endpoint = endpoint_name, serializer=csv_serializer, deserializer =csv_deserializer, 
                                  content_type='text/csv', sagemaker_session = session)

In [None]:
y_test = test_sample.Level
X_pred = test_sample.drop(columns = ['Level', 'Accessible']).reset_index().drop(columns = ['index'])
y_pred = [int(autopredictor.predict(np.array(X_pred.loc[x]))[0][0]) for x in range(len(X_pred))]

In [None]:
from sklearn.metrics import accuracy_score
print("Accuracy from AutoML Job = {}".format(accuracy_score(y_test, y_pred)))

# Model Deployment

Here We explore two modes of model deployment -- batch and as a live inference endpoint. 
First we deploy as a live endpoint and obtain model metrics <br/>
Next we do a batch transform job. For this we need to load the test data into S3 as well <br/>

## Live Model Deployment

In [None]:
xgb_predictor = xgb.deploy(initial_instance_count=1,
                                 instance_type='ml.m4.xlarge')


In [None]:
xgb_predictor.content_type = 'text/csv'
xgb_predictor.serializer = csv_serializer
xgb_predictor.deserializer = json_deserializer

In [None]:
predictions = []
y_test = X_test.Level
X_test_drop = X_test.drop(columns = 'Level')
for i in range(X_test.shape[0]):
    predictions.append(xgb_predictor.predict(np.array(X_test_drop.iloc[i])))
predictions = np.array(np.round(predictions))

In [None]:
from sklearn.metrics import accuracy_score, auc, classification_report
print(classification_report(y_test, predictions))

In [None]:
accuracy_score(y_test,predictions)

Notice that the model where we have normalized the features returns a higher accuracy score, but the AutoML does a really good job without virtually any data preparation!

# Make Inferences directly in SQL using Amazon Athena

Next, we will ingest the training data csv file into Amazon Athena and perform inferences directly against the XGBoost SageMaker endpoint. Here we will show how the process works for the XGBoost endpoint. 

To do so, navigate to SageMaker console and go to Endpoints. Identify the endpoint you just created for XGBoost. It should start with 'sagemaker-xgboost-####'. Copy this to Clipboard.

Next, navigate to the **Readme** associated with this workshop to the section **Inferences using Amazon Athena** and complete the rest of the workshop. 

Don't forget to come back to SageMaker and run the 2 cells below, to delete the endpoints you created to avoid paying for them.

# Inferences Using Amazon Athena

To be completed from Readme. Since this feature is in **Preview** as of 05-01-2020, it can only be completed if you are in "us-east-1".  

## Delete Endpoints (Only do so after completing the Athena portion)

See the Readme in the Github for this workshop for next steps on how to complete invoke the model endpoints from Amazon Athena

In [None]:
xgb_predictor.delete_endpoint()


In [None]:
autopredictor.delete_endpoint()
