This notebook provides the framework for running a batch transform that predicts hourly call volume from NFORS data. The cells at the beginning of the notebook are all required to run the transform; the cells at the end contain the code I used to create the datasets and are included for reference.

In [9]:
# Set local to True to run in local mode; otherwise, set it to False
# local = False
local = True

# Getting the SageMaker role

In [10]:
import sagemaker
from sagemaker import get_execution_role

# S3 prefix
prefix = 'hourly_call_volume'


if local:
    sagemaker_session = sagemaker.LocalSession()
    role = 'arn:aws:iam::445861113736:role/service-role/AmazonSageMaker-ExecutionRole-20190903T114521'
else:
    sagemaker_session = sagemaker.Session()
    role = get_execution_role()

## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: The ARN of the IAM role that SageMaker uses to run the job.
* __train_instance_type__ *(optional)*: The type of SageMaker instances for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.
* __hyperparameters__ *(optional)*: A dictionary passed to the train function as hyperparameters.

The source code for the SKLearn estimator is available here: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn
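
The entry-point script itself is not shown in this notebook. As a rough sketch of how such a script could receive the `hyperparameters` dictionary, the SageMaker Scikit-learn container passes each key as a command-line flag and exposes the model and data locations through the `SM_MODEL_DIR` and `SM_CHANNEL_TRAIN` environment variables. The function name `parse_training_args` and the fallback paths are assumptions made for illustration, not the contents of `hourly_call_prediction.py`.

```python
import argparse
import os


def parse_training_args(argv=None):
    """Parse the arguments a SageMaker training job passes to the script."""
    parser = argparse.ArgumentParser()
    # Each key in the estimator's `hyperparameters` dict arrives as a flag.
    parser.add_argument('--n_estimators', type=int, default=100)
    # SageMaker sets these environment variables inside the container; the
    # fallback paths are placeholders for running the sketch elsewhere.
    parser.add_argument('--model-dir', type=str,
                        default=os.environ.get('SM_MODEL_DIR', './model'))
    parser.add_argument('--train', type=str,
                        default=os.environ.get('SM_CHANNEL_TRAIN', './data'))
    return parser.parse_args(argv)


# Mimic the flag produced by hyperparameters={'n_estimators': 1000}.
args = parse_training_args(['--n_estimators', '1000'])
print(args.n_estimators, args.model_dir, args.train)
```

The `train` argument corresponds to the `'train'` channel name used when calling `fit`.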

In [13]:
from sagemaker.sklearn.estimator import SKLearn

script_path = 'hourly_call_prediction.py'
sklearn = SKLearn(
    entry_point=script_path,
    train_instance_type="ml.c4.xlarge",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={'n_estimators': 1000})

## Train SKLearn Estimator on call volume data

In [15]:
#The data should already be saved to the ./data directory
WORK_DIRECTORY = 'data'
train_input = sagemaker_session.upload_data(WORK_DIRECTORY, key_prefix="{}/{}".format(prefix, WORK_DIRECTORY) )

#Training the model
sklearn.fit({'train': train_input})

Creating tmpr8a_8veh_algo-1-4phej_1 ... done
Attaching to tmpr8a_8veh_algo-1-4phej_1
algo-1-4phej_1  | 2020-01-22 16:54:19,875 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training
algo-1-4phej_1  | 2020-01-22 16:54:19,880 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
algo-1-4phej_1  | 2020-01-22 16:54:19,900 sagemaker_sklearn_container.training INFO     Invoking user training script.
algo-1-4phej_1  | 2020-01-22 16:54:20,225 sagemaker-containers INFO     Module hourly_call_prediction does not provide a setup.py. 
algo-1-4phej_1  | Generating setup.py
algo-1-4phej_1  | 2020-01-22 16:54:20,225 sagemaker-containers INFO     Generating setup.cfg
algo-1-4phej_1  | 2020-01-22 16:54:20,225 sagemaker-containers INFO     Generating MANIFEST.in
algo-1-4phej_1  | 2020-01-22 16:54:20,226 sagemaker-containers INFO     Installing module with the following 

tmpr8a_8veh_algo-1-4phej_1 exited with code 0
Aborting on container exit...
===== Job Complete =====


## Batch Transform <a class="anchor" id="batch_transform"></a>
We can also use the trained model for asynchronous batch inference on S3 data using SageMaker Batch Transform.

In [16]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn.transformer(instance_count=1, instance_type='ml.m4.xlarge')

### Run Transform Job <a class="anchor" id="run_transform_job"></a>
Using the Transformer, run a transform job on the S3 input data.
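
Each input file is sent to the container as a request body with the content type given to `transform`. The following is a minimal sketch of how an entry-point script might parse that payload, assuming it implements the container's `input_fn(request_body, request_content_type)` hook and that the payload uses the `prediction_data` key written by the dataset cells at the end of this notebook. The actual `hourly_call_prediction.py` is not shown here and may differ.

```python
import json


def input_fn(request_body, request_content_type):
    """Turn one batch-transform request body into a list of feature records."""
    if request_content_type != 'application/json':
        raise ValueError('Unsupported content type: ' + request_content_type)
    if isinstance(request_body, (bytes, bytearray)):
        request_body = request_body.decode('utf-8')
    payload = json.loads(request_body)
    # 'prediction_data' is the key used by the dataset cells in this notebook.
    return payload['prediction_data']


# A one-record payload in the same layout as test_data.json (values are illustrative).
example_body = json.dumps({
    'model_name': 'calls_by_hour',
    'model_version': 1.0,
    'prediction_data': [
        {'fire_department.firecares_id': '79592',
         'description.day_of_week': 'Monday',
         'hour': '13'},
    ],
})
records = input_fn(example_body, 'application/json')
print(records[0]['description.day_of_week'])
```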

In [34]:
#Again, the test data should already be saved to the prediction_data directory
WORK_DIRECTORY = 'prediction_data'
batch_input_s3 = sagemaker_session.upload_data(WORK_DIRECTORY, key_prefix="{}/{}".format(prefix, WORK_DIRECTORY) )

# Start a transform job and wait for it to finish
transformer.transform(batch_input_s3, content_type='application/json')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()

Attaching to tmpf2v8y628_algo-1-6lmtb_1
[36malgo-1-6lmtb_1  |[0m Processing /opt/ml/code
[36malgo-1-6lmtb_1  |[0m Building wheels for collected packages: hourly-call-prediction
[36malgo-1-6lmtb_1  |[0m   Building wheel for hourly-call-prediction (setup.py) ... [?25ldone
[36malgo-1-6lmtb_1  |[0m [?25h  Created wheel for hourly-call-prediction: filename=hourly_call_prediction-1.0.0-py2.py3-none-any.whl size=7376 sha256=1c97c7c5221e67b131dfe666c1dc0b718835d17cb37ebf6628bbe35cdd4e339c
[36malgo-1-6lmtb_1  |[0m   Stored in directory: /tmp/pip-ephem-wheel-cache-olfsomla/wheels/35/24/16/37574d11bf9bde50616c67372a334f94fa8356bc7164af8ca3
[36malgo-1-6lmtb_1  |[0m Successfully built hourly-call-prediction
[36malgo-1-6lmtb_1  |[0m Installing collected packages: hourly-call-prediction
[36malgo-1-6lmtb_1  |[0m Successfully installed hourly-call-prediction-1.0.0
[36malgo-1-6lmtb_1  |[0m   import imp
[36malgo-1-6lmtb_1  |[0m [2020-01-22 17:27:05 +0000] [217] [INFO] Starting gunic

[36malgo-1-6lmtb_1  |[0m [2020-01-22 17:27:08 +0000] [4146] [INFO] Booting worker with pid: 4146
[36malgo-1-6lmtb_1  |[0m [2020-01-22 17:27:08 +0000] [4301] [INFO] Booting worker with pid: 4301
[36malgo-1-6lmtb_1  |[0m   import imp
[36malgo-1-6lmtb_1  |[0m 172.18.0.1 - - [22/Jan/2020:17:27:09 +0000] "GET /execution-parameters HTTP/1.1" 404 232 "-" "-"
[36malgo-1-6lmtb_1  |[0m 2020-01-22 17:27:09,775 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)
[36malgo-1-6lmtb_1  |[0m   import imp
[36malgo-1-6lmtb_1  |[0m 172.18.0.1 - - [22/Jan/2020:17:27:10 +0000] "POST /invocations HTTP/1.1" 200 6663 "-" "-"
Gracefully stopping... (press Ctrl+C again to force)
Waiting for transform job: sagemaker-scikit-learn-2020-01-22-17-26-59-165
.


In [36]:
transformer.output_path

's3://sagemaker-us-east-2-445861113736/sagemaker-scikit-learn-2020-01-22-17-26-59-165'

In [39]:
dir(transformer)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_current_job_name',
 '_ensure_last_transform_job',
 '_prepare_init_params_from_job_description',
 '_reset_output_path',
 '_retrieve_base_name',
 '_retrieve_image_name',
 'accept',
 'assemble_with',
 'attach',
 'base_transform_job_name',
 'delete_model',
 'env',
 'instance_count',
 'instance_type',
 'latest_transform_job',
 'max_concurrent_transforms',
 'max_payload',
 'model_name',
 'output_kms_key',
 'output_path',
 'sagemaker_session',
 'stop_transform_job',
 'strategy',
 'tags',
 'transform',
 'volume_kms_key',
 'wait']

In [33]:
# json.loads parses a JSON string, not a file path, so read the file with json.load
with open('./prediction_data/test_data.json') as data_file:
    data = json.load(data_file)
df = pd.io.json.json_normalize(data['prediction_data'])
features = pd.get_dummies(df)

### Check Output Data  <a class="anchor" id="check_output_data"></a>
After the transform job has completed, download the output data from S3. For each file "f" in the input data, we have a corresponding file "f.out" containing the predicted labels from each input row. We can compare the predicted labels to the true labels saved earlier.
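
As a small illustration of that comparison, the sketch below parses the text of an `f.out` file and computes the mean absolute error against observed counts. It assumes the output file holds a single JSON list of numbers, as the output printed below suggests. The helper names and the sample values are hypothetical and are not results from this model.

```python
import json


def load_predictions(out_text):
    """Parse the contents of an 'f.out' file holding one JSON list of numbers."""
    return [float(value) for value in json.loads(out_text)]


def mean_absolute_error(predicted, observed):
    """Average absolute difference between paired predictions and observations."""
    if len(predicted) != len(observed):
        raise ValueError('predicted and observed must have the same length')
    return sum(abs(p - o) for p, o in zip(predicted, observed)) / len(predicted)


# Illustrative values only; real inputs would come from the downloaded
# 'test_data.json.out' file and the observed call counts.
predicted = load_predictions('[14.0, 64.5, 32.0]')
observed = [12.0, 70.5, 32.0]
print(mean_absolute_error(predicted, observed))
```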

In [32]:
# Download the output data from S3 to local filesystem
batch_output = transformer.output_path
!mkdir -p batch_data/output
!aws s3 cp --recursive $batch_output/ batch_data/output/
# Head to see what the batch output looks like
!head batch_data/output/*

[14.262, 13.651, 13.776, 64.899, 32.359, 80.751, 119.24432222222221, 67.791, 14.291, 9.6, 77.111, 11.977, 71.26, 17.555, 76.62, 71.955, 65.988, 69.633, 72.4, 67.801, 16.868, 15.341, 14.491, 68.154, 73.672, 13.88, 11.946, 94.801953968254, 30.46, 76.214, 12.099, 32.963, 111.37256608946599, 75.409, 13.584, 10.49, 13.494, 14.924, 20.175, 13.632, 11.323, 79.148, 13.037, 13.741, 71.156, 12.984, 13.376, 71.529, 13.335, 14.749, 63.824, 73.622, 13.242, 10.425, 14.079, 70.725, 35.103, 14.916, 68.318, 16.027, 13.933, 13.86, 15.153, 61.511, 12.992, 69.234, 69.987, 13.4, 65.712, 11.144, 73.905, 63.855, 78.564, 69.216, 10.006, 12.519, 34.603, 65.763, 78.152, 13.837, 87.715, 13.497, 69.304, 68.77, 13.061, 69.625, 11.177, 9.125, 73.762, 69.42, 68.098, 70.547, 66.93, 16.691, 69.59, 16.193, 12.927, 73.688, 65.495, 72.673, 74.601, 66.103, 78.257, 14.602, 77.926, 12.563, 11.194, 73.189, 13.907, 64.898, 78.776, 86.40980952380953, 74.366, 33.019, 9.718, 9.49, 75.839, 12.261, 14.203, 77.327, 13.315, 14.497, 

In [None]:
# Pulling the predictions, comparing to the observed number of calls

In [None]:
from io import StringIO

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

output_loc = './batch_data/output/test_data.json.out'

# The output file holds the predictions as one bracketed, comma-separated list
with open(output_loc, "r") as myfile:
    data = myfile.readlines()

string_output = data[0].replace('[', '').replace(']', '')

predicted = np.genfromtxt(StringIO(string_output), delimiter=',')
# Read the 'observed' file, skipping its first row and keeping the second column
actual = np.genfromtxt('observed', delimiter=',')[1:, 1]
plt.scatter(predicted, actual, alpha=0.3)
plt.xlabel('predicted')
plt.ylabel('actual')

# Creating the training data and uploading to S3
This cell only works locally because it depends on Elasticsearch.
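
The core of this step is counting incidents per department, date, day of the week, and hour. The sketch below shows that aggregation with only the standard library, assuming timestamps begin with `YYYY-MM-DDTHH:MM:SS`. The field names are simplified, the sample incidents are made up, and the day of the week is derived from the timestamp here, whereas the notebook reads it from `description.day_of_week`.

```python
from collections import Counter
from datetime import datetime


def hourly_counts(incidents):
    """Count incidents per (department, date, day of week, hour)."""
    counts = Counter()
    for incident in incidents:
        # Keep only the 'YYYY-MM-DDTHH:MM:SS' part of the timestamp.
        opened = datetime.strptime(incident['event_opened'][:19],
                                   '%Y-%m-%dT%H:%M:%S')
        key = (incident['firecares_id'],
               opened.date().isoformat(),
               opened.strftime('%A'),
               opened.strftime('%H'))
        counts[key] += 1
    return counts


# Made-up incidents for illustration; field names are simplified.
sample = [
    {'firecares_id': '79592', 'event_opened': '2020-01-20T08:15:00'},
    {'firecares_id': '79592', 'event_opened': '2020-01-20T08:47:30'},
    {'firecares_id': '93345', 'event_opened': '2020-01-21T23:05:10'},
]
for key, calls in sorted(hourly_counts(sample).items()):
    print(key, calls)
```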


In [None]:
#Importing libraries required for performing the query
import datetime
import json

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from elasticsearch_dsl import Q
from tqdm import tqdm_notebook

#Setting up the query
es = Elasticsearch()
s = Search(using=es,index='*-fire-incident-*')
s = s.source(['description.event_opened',
                     'description.day_of_week',
                    'NFPA.type',
                     'fire_department.firecares_id'])


#Performing the query
q = Q("match",fire_department__firecares_id =  '79592') | Q("match",fire_department__firecares_id =  '93345')
results = s.query(q)

#Converting query results to a pandas dataframe
df = pd.DataFrame((d.to_dict() for d in tqdm_notebook(results.scan())))
json_struct = json.loads(df.to_json(orient="records"))

df = pd.io.json.json_normalize(json_struct)

#Converting date
df['date'] = df['description.event_opened'].apply(lambda x: x[:10])
df['month'] = df.apply(lambda x: x['date'][5:7], axis=1)
df['hour'] = df['description.event_opened'].apply(lambda x: x[11:13])


#Converting df dates to datetime objects
df['date'] = df.apply(lambda x: datetime.datetime.strptime(x['date'],'%Y-%m-%d'),axis=1)

#It's convenient to serialize (pickle) the dataframe because it's faster to load it rather than re-create it.
df.to_pickle('query_results')

#Hourly is a dataframe of call counts grouped by department, date, day of week, and hour
group_cols = ['fire_department.firecares_id', 'date', 'description.day_of_week', 'hour']
#size() counts the rows in each group and names the resulting column 'calls'
hourly = df.groupby(group_cols).size().reset_index(name='calls')

#Formatting the hourly dataframe into a json
jsondata = {}
jsondata['model_name'] = 'calls_by_hour'
jsondata['model_version'] = 1.0
jsondata['prediction_data'] = hourly.drop('date',axis=1).to_dict(orient='records')

#Saving the json to the data directory
with open('./data/training_data.json', 'w') as outfile:
    json.dump(jsondata, outfile)
    


# Creating the test dataset
This involves generating a dataframe with every possible combination of department, day of the week, and hour of the day. The dataset has 7 × 24 × n rows, where n is the number of departments included for predictions.

Also, this cell requires that the training set has already been created.
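
As a quick check of the size stated above, the sketch below builds the same kind of grid with `itertools.product` for the two FireCARES IDs queried in the training-data cell. With n = 2, the grid has 7 × 24 × 2 = 336 rows. Hard-coding the department list is an assumption made so the sketch runs without the training file.

```python
from itertools import product

# The two FireCARES IDs used in the Elasticsearch query of this notebook.
departments = ['79592', '93345']
days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
        'Friday', 'Saturday', 'Sunday']
hours = range(24)

# One row per (department, day of week, hour) combination.
grid = list(product(departments, days, hours))
print(len(grid))
```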

In [30]:
#Make every combination of departments, days of week, and hour
from itertools import product
import json
import pandas as pd



#load the training dataset
with open('./data/training_data.json') as data_file:
    data = json.load(data_file)
hourly = pd.io.json.json_normalize(data['prediction_data'])

#Getting the list of every department that shows up in the hourly dataframe
dep_list = hourly['fire_department.firecares_id'].unique()
days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
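#Zero-pad the hours ('00' to '23') to match the two-character strings sliced from the timestamps in the training data
hours = ['{:02d}'.format(i) for i in range(24)]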

#Creating the dataframe of all possible combinations
test_df = pd.DataFrame(list(product(dep_list, days, hours)), columns=['fire_department.firecares_id', 'description.day_of_week', 'hour'])

#Formatting it as a json
jsondata = {}
jsondata['model_name'] = 'calls_by_hour'
jsondata['model_version'] = 1.0
jsondata['prediction_data'] = test_df.to_dict(orient='records')

#Saving it locally
with open('./prediction_data/test_data.json', 'w') as outfile:
    json.dump(jsondata, outfile)
    
    
