# Implementation with Step Functions

Now that we have a satisfactory machine learning model, we want to run it as part of a process that executes every day at 3 AM. The process uses the latest transactional information that we have dumped on S3 to create forecasts for each reseller.
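
The daily 3 AM trigger can be expressed as a CloudWatch Events schedule rule. The following is a minimal sketch, assuming a hypothetical rule name `daily-forecast-3am` and assuming that 3 AM is meant in UTC, which is the time zone CloudWatch Events cron expressions use.

```python
def daily_schedule_expression(hour_utc, minute=0):
    """Build a CloudWatch Events cron expression that fires once a day (UTC)."""
    if not (0 <= hour_utc <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    # Fields: minutes hours day-of-month month day-of-week year
    return "cron({} {} * * ? *)".format(minute, hour_utc)


def build_rule_request(rule_name, hour_utc):
    """Keyword arguments for the CloudWatch Events put_rule call."""
    return {
        "Name": rule_name,
        "ScheduleExpression": daily_schedule_expression(hour_utc),
        "State": "ENABLED",
    }


# "daily-forecast-3am" is a hypothetical rule name used for illustration.
rule_request = build_rule_request("daily-forecast-3am", 3)
print(rule_request["ScheduleExpression"])

# With AWS credentials configured, the rule could be created with:
#   import boto3
#   boto3.client("events").put_rule(**rule_request)
```

A rule on its own does nothing: it still needs a target, which in this workflow would be the process that runs the forecast.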


## 1. Upload data to S3

First, create a bucket for this experiment. Then upload the data from the following public location to your own S3 bucket. To make the crawler's work easier, use two different prefixes (folders): one for the billing information and one for the reseller information.

### Download the data

In [1]:
# your bucket name
your_bucket = 'blackb-mggaska-implementation'

In [7]:
!wget https://ml-lab-mggaska.s3.amazonaws.com/sales-forecast/billing_sm.csv
!wget https://ml-lab-mggaska.s3.amazonaws.com/sales-forecast/reseller_sm.csv
!wget https://ml-lab-mggaska.s3.amazonaws.com/sales-forecast/awswrangler-0.0b2-py3.6.egg

--2019-07-02 16:35:59--  https://ml-lab-mggaska.s3.amazonaws.com/sales-forecast/billing_sm.csv
Resolving ml-lab-mggaska.s3.amazonaws.com (ml-lab-mggaska.s3.amazonaws.com)... 52.216.20.203
Connecting to ml-lab-mggaska.s3.amazonaws.com (ml-lab-mggaska.s3.amazonaws.com)|52.216.20.203|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15803443 (15M) [binary/octet-stream]
Saving to: ‘billing_sm.csv.1’


2019-07-02 16:35:59 (99.2 MB/s) - ‘billing_sm.csv.1’ saved [15803443/15803443]

--2019-07-02 16:35:59--  https://ml-lab-mggaska.s3.amazonaws.com/sales-forecast/reseller_sm.csv
Resolving ml-lab-mggaska.s3.amazonaws.com (ml-lab-mggaska.s3.amazonaws.com)... 52.216.20.203
Connecting to ml-lab-mggaska.s3.amazonaws.com (ml-lab-mggaska.s3.amazonaws.com)|52.216.20.203|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 210111 (205K) [binary/octet-stream]
Saving to: ‘reseller_sm.csv.1’


2019-07-02 16:35:59 (32.0 MB/s) - ‘reseller_sm.csv.1’ saved [210111/2

Now we upload to an S3 location

In [8]:
import boto3

# S3 keys always use forward slashes, so write them directly instead of using os.path.join.
s3_bucket = boto3.resource('s3').Bucket(your_bucket)
s3_bucket.upload_file('billing_sm.csv', 'billing/billing_sm.csv')
s3_bucket.upload_file('reseller_sm.csv', 'reseller/reseller_sm.csv')
s3_bucket.upload_file('awswrangler-0.0b2-py3.6.egg', 'python/awswrangler-0.0b2-py3.6.egg')


## 2. Create Crawler

To use this CSV data in a Glue ETL job, we first have to create a Glue crawler pointing to the location of each file. The crawler tries to infer the data type of each column. The safest approach is to create one crawler per table, each pointing to a different location.

1. Go to the AWS Console.
2. Under Services, select AWS Glue.
3. Under Crawlers, select Add Crawler and create two crawlers, one pointing to each S3 location (one for billing and one for reseller).

    3.1 Name: Billing, Data Store, Specific Path in my Account, Navigate to your bucket and your folder Billing, create an IAM role billing-crawler-role, add database implementationdb, Next, Finish
    
    3.2 After the crawler is created select Run it now.
    
    3.3 Name: Reseller, Data Store, Specific Path in my Account, Navigate to your bucket and your folder Reseller, create an IAM role reseller-crawler-role, select database implementationdb, Next, Finish
    
    3.4 After the crawler is created select Run it now.

After both crawlers have run, you should see that one table has been added for each. You can use Athena to inspect the tables and double-check that the data was loaded properly.
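
The console steps above could also be scripted with the Glue API. The following is a rough sketch rather than the procedure used in this lab. It assumes that the two IAM roles and the `implementationdb` database already exist, and `your-bucket-name` is a placeholder. Roles created through the console may carry a different full name than the one typed in the wizard.

```python
def build_crawler_request(name, role, database, bucket, prefix):
    """Keyword arguments for glue.create_crawler, one S3 prefix per crawler."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": "s3://{}/{}/".format(bucket, prefix)}]},
    }


def create_and_run_crawlers(glue, crawler_requests):
    """Create each crawler and start it right away."""
    for request in crawler_requests:
        glue.create_crawler(**request)
        glue.start_crawler(Name=request["Name"])


bucket = "your-bucket-name"  # placeholder: replace with your own bucket
crawler_requests = [
    build_crawler_request("Billing", "billing-crawler-role",
                          "implementationdb", bucket, "billing"),
    build_crawler_request("Reseller", "reseller-crawler-role",
                          "implementationdb", bucket, "reseller"),
]
for request in crawler_requests:
    print(request["Name"], "->", request["Targets"]["S3Targets"][0]["Path"])

# With AWS credentials configured:
#   import boto3
#   create_and_run_crawlers(boto3.client("glue"), crawler_requests)
```

Keeping one crawler per prefix mirrors the advice above: each crawler sees a single file layout, so it produces exactly one table.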

## 3. Create Glue Job

Now we are going to create a Glue ETL job in Python 3.6. In this job we can combine both the ETL from Notebook #2 and the preprocessing pipeline from Notebook #4.

Note that instead of reading from a CSV file, we are going to use Athena to read from the tables created by the Glue crawlers.

Glue is a "serverless" service, so the processing power assigned to a job is measured in DPUs (data processing units). Each DPU is equivalent to 4 vCPUs and 16 GB of RAM.
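
To make the DPU arithmetic concrete, here is a small sketch that converts a DPU allocation into memory and vCPUs using the ratio stated above. The two allocations shown are the ones a Python Shell job accepts (0.0625 or 1 DPU).

```python
GB_PER_DPU = 16    # memory per DPU, as stated above
VCPU_PER_DPU = 4   # vCPUs per DPU, as stated above


def dpu_resources(dpus):
    """Return (memory in GB, vCPUs) for a given DPU allocation."""
    return dpus * GB_PER_DPU, dpus * VCPU_PER_DPU


for dpus in (0.0625, 1):
    memory_gb, vcpus = dpu_resources(dpus)
    print("{} DPU -> {} GB RAM, {} vCPU".format(dpus, memory_gb, vcpus))
```

With a maximum capacity of 1, the job therefore gets the full 16 GB, which leaves room for loading the billing table into a pandas DataFrame.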

1. Open the AWS Console
2. Under Services go to AWS Glue
3. Under Jobs, add a new job.
4. Name: etlandpipeline, Role: Glueadmin, Type: Python Shell, Python 3. Select "A new script authored by you". Under "Security Configuration...", select "Python library path" and browse to the location where you stored the egg of the awswrangler library. Under "Maximum capacity", enter 1. Then click "Save job and edit script".
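
For reference, the same job settings can be written down as a `create_job` request. This is a sketch under assumptions: `your-bucket-name` is a placeholder, and the script location `scripts/etlandpipeline.py` is a hypothetical path, since the console stores the script for you. The egg path matches the `python/` prefix used in the upload step.

```python
def build_job_request(name, role, script_location, egg_location, max_capacity=1.0):
    """Keyword arguments for glue.create_job describing a Python Shell job."""
    return {
        "Name": name,
        "Role": role,
        "Command": {
            "Name": "pythonshell",     # job type: Python Shell
            "PythonVersion": "3",
            "ScriptLocation": script_location,
        },
        # Extra Python libraries (the awswrangler egg) shipped with the job.
        "DefaultArguments": {"--extra-py-files": egg_location},
        "MaxCapacity": max_capacity,   # DPUs
    }


bucket = "your-bucket-name"  # placeholder: replace with your own bucket
job_request = build_job_request(
    name="etlandpipeline",
    role="Glueadmin",
    script_location="s3://{}/scripts/etlandpipeline.py".format(bucket),  # hypothetical path
    egg_location="s3://{}/python/awswrangler-0.0b2-py3.6.egg".format(bucket),
)
print(job_request["Command"]["Name"], job_request["MaxCapacity"])

# With AWS credentials configured:
#   import boto3
#   boto3.client("glue").create_job(**job_request)
```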

In the Script tab copy and paste the following script adapted to Glue from the previous notebooks. 
<b> ALWAYS REMEMBER TO ADAPT YOUR ROLE ARN AND BUCKET </b>

In [9]:
! cat etlandpipeline.py

import datetime
from datetime import date
import io
from io import StringIO
import pickle

import awswrangler
import boto3
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OneHotEncoder


df_r = awswrangler.athena.read( "implementationdb", "select * from reseller" )
df = awswrangler.athena.read( "implementationdb", "select * from billing" )
bucket = 'blackb-mggaska-implementation'
df['date'] = pd.to_datetime(df['date'])


print('dataframe',df.shape)
print('dataframer',df_r.shape)

#---FUNCTIONS-------------------------------

def write_dataframe_to_csv_on_s3(dataframe, bucket, filename):
    """ Write a dataframe to a CSV on S3 """
    # Create buffer
    csv_buffer = StringIO()
    # Write dataframe to buffer
    dataframe.to_csv(csv_buffer, sep=",", header=None,index=None)
    # Create S3 object
    s3_resource = boto3.resource("s3") 
    # W

## 4. Orchestration with Lambda, SageMaker, CloudWatch and Step Functions


Now it's time to create all the necessary steps to schedule the training, deployment and inference with the model.
For this, we are going to use an architecture similar to the <a href='https://github.com/aws-samples/serverless-sagemaker-orchestration'>serverless sagemaker orchestration</a> but adapted to our specific problem.

First we need to create Lambda functions capable of:

    1. training the model
    2. waiting for training to finish
    3. deploying the model
    4. waiting for the deployment to finish
    5. predicting, saving the predictions and deleting the endpoint
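
The five functions above can be chained in a Step Functions state machine. The following is one possible definition in Amazon States Language, built as a Python dictionary. It is a sketch under several assumptions: the ARN pattern is a placeholder, the function names `lambdaModelDeploy` and `lambdaModelPredict` are inferred from the script file names, the 60-second wait is arbitrary, and the `Failed` branch assumes the await function reports failures through the `status` field.

```python
import json

# Placeholder ARN pattern: fill in your own region and account ID.
LAMBDA_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:{}"


def task(function_name, next_state=None):
    """A Task state that invokes one Lambda function."""
    state = {"Type": "Task", "Resource": LAMBDA_ARN.format(function_name)}
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


def wait(seconds, next_state):
    """A Wait state that pauses before polling again."""
    return {"Type": "Wait", "Seconds": seconds, "Next": next_state}


def choice(done_status, done_state, retry_state):
    """Branch on the `status` field returned by the await function."""
    return {
        "Type": "Choice",
        "Choices": [
            {"Variable": "$.status", "StringEquals": done_status, "Next": done_state},
            {"Variable": "$.status", "StringEquals": "Failed", "Next": "Failed"},
        ],
        "Default": retry_state,
    }


definition = {
    "StartAt": "TrainModel",
    "States": {
        "TrainModel": task("lambdaModelTrain", "WaitForTraining"),
        "WaitForTraining": wait(60, "CheckTraining"),
        "CheckTraining": task("lambdaModelAwait", "IsTrainingDone"),
        "IsTrainingDone": choice("Completed", "DeployModel", "WaitForTraining"),
        "DeployModel": task("lambdaModelDeploy", "WaitForDeployment"),
        "WaitForDeployment": wait(60, "CheckDeployment"),
        "CheckDeployment": task("lambdaModelAwait", "IsDeploymentDone"),
        "IsDeploymentDone": choice("InService", "Predict", "WaitForDeployment"),
        "Predict": task("lambdaModelPredict"),
        "Failed": {"Type": "Fail", "Cause": "Training or deployment failed"},
    },
}
print(json.dumps(definition, indent=2))
```

The same await function serves both polling loops because it inspects the `stage` field of the event, so the state machine only has to pass the previous output along.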




### 4.1 Create a role with SageMaker and S3 access

To execute these Lambda functions, we need a role with SageMaker and S3 permissions.

Go to the AWS console and create a role with AmazonS3FullAccess and AmazonSageMakerFullAccess policies.
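
If you prefer to script this step, the role could be created with the IAM API as sketched below. The role name `lambda-sagemaker-orchestration` is hypothetical, and the trust policy assumes that only the Lambda service needs to assume the role.

```python
import json

# AWS managed policies named in the step above.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
]


def lambda_trust_policy():
    """Trust policy that lets the Lambda service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_lambda_role(iam, role_name):
    """Create the role and attach both managed policies."""
    iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(lambda_trust_policy()),
    )
    for policy_arn in MANAGED_POLICY_ARNS:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)


print(json.dumps(lambda_trust_policy(), indent=2))

# With AWS credentials configured (role name is hypothetical):
#   import boto3
#   create_lambda_role(boto3.client("iam"), "lambda-sagemaker-orchestration")
```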

### 4.2 Train Model Lambda

    1. Go to the AWS Console and under Services, select Lambda
    2. Go to the Functions Pane and select Create Function
    3. Author from scratch
    4. Name it lambdaModelTrain, choose runtime Python 3.6 and as executing role, select the role you created in the previous step


This Lambda function doesn't need to receive any parameters. It should return the name of the resulting hyperparameter tuning job, which the next Lambda function uses to check the status, together with the container it used and a status of InProgress.
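
The shape of that return value can be sketched as follows. The timestamp format is an inference from the sample job name `invoice-forecast20190702190151` shown later in this section, and the helper names are hypothetical.

```python
from time import gmtime, strftime


def build_tuning_job_name(prefix, when=None):
    """Append a UTC timestamp so that every run gets a unique job name.

    Tuning job names are limited to 32 characters, so keep the prefix short.
    """
    timestamp = strftime("%Y%m%d%H%M%S", when if when is not None else gmtime())
    return prefix + timestamp


def build_train_response(container, job_name):
    """Payload handed to the next step of the workflow."""
    return {
        "container": container,
        "stage": "Training",
        "status": "InProgress",
        "name": job_name,
    }


job_name = build_tuning_job_name("invoice-forecast")
response = build_train_response(
    "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest", job_name
)
print(response)
```

Returning a plain dictionary keeps the function easy to test from the Lambda console and lets later steps add their own keys to the same event.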


In [12]:
! cat lambdafunctions/lambdaModelTrain.py

import json
import boto3
import copy
from time import gmtime, strftime


region = boto3.Session().region_name    
smclient = boto3.Session().client('sagemaker')
role = 'arn:aws:iam::452432741922:role/service-role/AmazonSageMaker-ExecutionRole-20181022T121720'


bucket_path='s3://blackb-mggaska-implementation'
prefix = "invoice-forecast"

container = '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'


def lambda_handler(event, context):
        
    tuning_job_config = {
    "ParameterRanges": {
      "CategoricalParameterRanges": [],
      "ContinuousParameterRanges": [
        {
          "MaxValue": "1",
          "MinValue": "0",
          "Name": "eta"
        },
        {
          "MaxValue": "2",
          "MinValue": "0",
          "Name": "alpha"
        },
        {
          "MaxValue": "10",
          "MinValue": "1",
          "Name": "min_child_weight"
        }
      ],
      "IntegerParameterRanges": [
        {
     

#### Test the lambda function

Create a test event with no parameters, save it and run the test. In your AWS console, under SageMaker > Hyperparameter tuning jobs, you should see the HPO job running.

### 4.3 Await Train Model Lambda

    1. Go to the AWS Console and under Services, select Lambda
    2. Go to the Functions Pane and select Create Function
    3. Author from scratch
    4. Name it lambdaModelAwait, choose runtime Python 3.6 and as executing role, select the role you created in the previous step
    
    
This Lambda function receives the output of the previous step and lets us check whether the process is done. If it is done, it returns the name of the best training job.

Previous response (we can use it in the test event):
        
        {
          "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
          "stage": "Training",
          "status": "InProgress",
          "name": "invoice-forecast20190702190151"
        }
        
In the code editor paste the following code:

In [16]:
! cat lambdafunctions/lambdaModelAwait.py

import boto3
import os

sagemaker = boto3.client('sagemaker')


def lambda_handler(event, context):
    stage = event['stage']
    if stage == 'Training':
        name = event['name']
        training_details = describe_training_job(name)
        print(training_details)
        status = training_details['HyperParameterTuningJobStatus']
        if status == 'Completed':
            s3_output_path = training_details['TrainingJobDefinition']['OutputDataConfig']['S3OutputPath']
            model_data_url = os.path.join(s3_output_path, training_details['BestTrainingJob']['TrainingJobName'], 'output/model.tar.gz')
            event['message'] = 'HPO tunning job "{}" complete. Model data uploaded to "{}"'.format(name, model_data_url)
            event['model_data_url'] = model_data_url
            event['best_training_job'] = training_details['BestTrainingJob']['TrainingJobName']
        elif status == 'Failed':
            failure_reason = training_details['FailureReason'

First, you will see a response like

    Response:
    {
      "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
      "stage": "Training",
      "status": "InProgress",
      "name": "invoice-forecast20190702190151"
    }

Once the training is completed, the state is going to change and you'll see the new status and the name of the best training job.

    {
      "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
      "stage": "Training",
      "status": "Completed",
      "name": "invoice-forecast20190702190151",
      "message": "HPO tunning job \"invoice-forecast20190702190151\" complete. Model data uploaded to \"s3://blackb-mggaska-implementation/invoice-forecast/xgboost/invoice-forecast20190702190151-005-dde9844e/output/model.tar.gz\"",
      "model_data_url": "s3://blackb-mggaska-implementation/invoice-forecast/xgboost/invoice-forecast20190702190151-005-dde9844e/output/model.tar.gz",
      "best_training_job": "invoice-forecast20190702190151-005-dde9844e"
    }
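
The `model_data_url` in this response is the tuning job's S3 output path joined with the best training job name and the fixed suffix `output/model.tar.gz`. A minimal sketch of that step, assuming the output path is the `xgboost` prefix visible in the URL above:

```python
import posixpath


def build_model_data_url(s3_output_path, best_training_job):
    """Locate the model artifact written by the best training job."""
    # posixpath always joins with "/", which is what S3 URLs expect.
    return posixpath.join(s3_output_path, best_training_job, "output/model.tar.gz")


model_data_url = build_model_data_url(
    "s3://blackb-mggaska-implementation/invoice-forecast/xgboost",
    "invoice-forecast20190702190151-005-dde9844e",
)
print(model_data_url)
```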

### 4.4 Deploy Model In SageMaker Lambda

In this Lambda function, we use the best training job from the previous step to deploy a predictor.



In [17]:
! cat lambdafunctions/lambdaModelDeploy.py

import boto3
import os

sagemaker = boto3.client('sagemaker')
EXECUTION_ROLE = 'arn:aws:iam::452432741922:role/service-role/AmazonSageMaker-ExecutionRole-20181022T121720'
INSTANCE_TYPE = 'ml.m4.xlarge'
container = '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'

def lambda_handler(event, context):
    best_training_job = event['best_training_job']
    endpoint = 'demobb-invoice-prediction'
    model_data_url = event['model_data_url']
    print('Creating model resource from training artifact...')
    create_model(best_training_job, container, model_data_url)
    print('Creating endpoint configuration...')
    create_endpoint_config(best_training_job)
    print('There is no existing endpoint for this model. Creating new model endpoint...')
    create_endpoint(endpoint, best_training_job)
    event['stage'] = 'Deployment'
    event['status'] = 'Creating'
    event['message'] = 'Started deploying model "{}" to endpoint "{}"'.format(best_training_job, endpo

In your SageMaker console you should see an endpoint with status Creating. When you test the function, the output should look like this:

    {
      "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
      "stage": "Deployment",
      "status": "Creating",
      "name": "invoice-forecast20190702190151",
      "message": "Started deploying model \"invoice-forecast20190702190151-005-dde9844e\" to endpoint \"demobb-invoice-prediction\"",
      "model_data_url": "s3://blackb-mggaska-implementation/invoice-forecast/xgboost/invoice-forecast20190702190151-005-dde9844e/output/model.tar.gz",
      "best_training_job": "invoice-forecast20190702190151-005-dde9844e"
    }
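
The handler calls three helpers (`create_model`, `create_endpoint_config` and `create_endpoint`) whose bodies are not shown in this section. The sketch below illustrates the SageMaker requests such helpers would plausibly send. It is not the author's implementation: the variant name `AllTraffic`, the single instance and the placeholder role ARN are assumptions.

```python
def build_deploy_calls(name, container, model_data_url, role_arn, endpoint,
                       instance_type="ml.m4.xlarge"):
    """Ordered (method name, kwargs) pairs for the three SageMaker calls."""
    return [
        ("create_model", {
            "ModelName": name,
            "PrimaryContainer": {"Image": container, "ModelDataUrl": model_data_url},
            "ExecutionRoleArn": role_arn,
        }),
        ("create_endpoint_config", {
            "EndpointConfigName": name,
            "ProductionVariants": [{
                "VariantName": "AllTraffic",   # assumed variant name
                "ModelName": name,
                "InitialInstanceCount": 1,     # assumed instance count
                "InstanceType": instance_type,
            }],
        }),
        ("create_endpoint", {"EndpointName": endpoint, "EndpointConfigName": name}),
    ]


def deploy(sagemaker, calls):
    """Issue the calls in order: model, endpoint configuration, endpoint."""
    for method_name, kwargs in calls:
        getattr(sagemaker, method_name)(**kwargs)


best_job = "invoice-forecast20190702190151-005-dde9844e"
calls = build_deploy_calls(
    name=best_job,
    container="811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
    model_data_url="s3://blackb-mggaska-implementation/invoice-forecast/xgboost/"
                   + best_job + "/output/model.tar.gz",
    role_arn="arn:aws:iam::ACCOUNT_ID:role/YOUR_SAGEMAKER_ROLE",  # placeholder
    endpoint="demobb-invoice-prediction",
)
print([method_name for method_name, _ in calls])

# With AWS credentials configured:
#   import boto3
#   deploy(boto3.client("sagemaker"), calls)
```

Reusing the best training job name for both the model and the endpoint configuration keeps the resources easy to trace back to the run that produced them.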

### Test the Await function with the Deployment stage

This is a good chance to test the Await function again, this time with the Deployment stage. If you create a new test event on lambdaModelAwait, you should see a response like this:

    Response:
    {
      "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
      "stage": "Deployment",
      "status": "Creating",
      "name": "invoice-forecast20190702190151",
      "message": "Started deploying model \"invoice-forecast20190702190151-005-dde9844e\" to endpoint \"demobb-invoice-prediction\"",
      "model_data_url": "s3://blackb-mggaska-implementation/invoice-forecast/xgboost/invoice-forecast20190702190151-005-dde9844e/output/model.tar.gz",
      "best_training_job": "invoice-forecast20190702190151-005-dde9844e"
    }

After the model is in service you should see something like this:

    {
      "container": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest",
      "stage": "Deployment",
      "status": "InService",
      "name": "invoice-forecast20190702190151",
      "message": "Deployment completed for endpoint \"demobb-invoice-prediction\".",
      "model_data_url": "s3://blackb-mggaska-implementation/invoice-forecast/xgboost/invoice-forecast20190702190151-005-dde9844e/output/model.tar.gz",
      "best_training_job": "invoice-forecast20190702190151-005-dde9844e"
    }
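
The Deployment branch of the await function is not shown in this section. A plausible sketch of it follows, assuming it polls `describe_endpoint` and copies the endpoint status into the event. The function name `check_deployment` and the choice to raise on failure are assumptions.

```python
def check_deployment(sagemaker, event, endpoint_name):
    """Return a copy of the event updated with the current endpoint status."""
    details = sagemaker.describe_endpoint(EndpointName=endpoint_name)
    status = details["EndpointStatus"]
    if status == "Failed":
        # Assumed behaviour: surface the failure instead of polling forever.
        raise RuntimeError("Deployment of endpoint {} failed: {}".format(
            endpoint_name, details.get("FailureReason", "unknown reason")))
    updated = dict(event)
    updated["status"] = status
    if status == "InService":
        updated["message"] = 'Deployment completed for endpoint "{}".'.format(endpoint_name)
    return updated


# With AWS credentials configured:
#   import boto3
#   event = {"stage": "Deployment", "status": "Creating"}
#   print(check_deployment(boto3.client("sagemaker"), event, "demobb-invoice-prediction"))
```

While the endpoint is still being created, the event comes back with `status` set to `Creating`, so a caller can simply wait and invoke the function again.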


### 4.5 Use model to predict

Now we are going to use the deployed model to predict. This last Lambda function doesn't take any parameters, but we need to change its default configuration: set the memory to 1024 MB and the timeout to 15 minutes.
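
These two settings can also be applied through the Lambda API. A small sketch, assuming the function is named `lambdaModelPredict` (inferred from the script file name). Note that the API expects the timeout in seconds.

```python
def build_predict_config(function_name, memory_mb=1024, timeout_minutes=15):
    """Keyword arguments for lambda.update_function_configuration."""
    return {
        "FunctionName": function_name,
        "MemorySize": memory_mb,            # in MB
        "Timeout": timeout_minutes * 60,    # the API takes seconds
    }


predict_config = build_predict_config("lambdaModelPredict")  # assumed function name
print(predict_config)

# With AWS credentials configured:
#   import boto3
#   boto3.client("lambda").update_function_configuration(**predict_config)
```

The generous timeout matters because the function calls the endpoint once per input line, so its run time grows with the size of the file to predict.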



In [22]:
! cat lambdafunctions/lambdaModelPredict.py

import os
import io
import boto3
import json
import csv
from io import StringIO

# grab static variables
ENDPOINT_NAME = 'demobb-invoice-prediction'
runtime = boto3.client('runtime.sagemaker')
s3 = boto3.client('s3')
bucket = 'blackb-mggaska-implementation'
key = 'to_predict.csv'
def lambda_handler(event, context):
    response = s3.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')
    results = []
    for line in  content.splitlines():
        response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                               ContentType='text/csv',
                                               Body=line)
        result = json.loads(response['Body'].read().decode())
        results.append(result)
    i = 0
    multiLine = ""
    for item in results:
        if (i > 0):
            multiLine = multiLine + '\n'
        multiLine = multiLine + str(item