# SageMaker Model Training Workflow with an AWS Glue DataBrew Recipe and AWS Step Functions

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Create Resources](#Create-Resources)
1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)
1. [Run the Workflow](#Run-the-Workflow)
1. [Clean Up](#Clean-Up)

## Introduction

With growing data volumes and large-scale adoption of machine learning solutions, cleansing and visualizing data for model training has become a key task in the ML workflow. As data scientists and engineers look for more insights from data, they also want to reduce the time it takes to derive those insights, and they look for data profiling and transformation capabilities in a visual data preparation tool.
In this blog post, we walk through a solution that creates an ML workflow using AWS Step Functions from a SageMaker notebook, uses AWS Glue DataBrew for the visual data preparation step, and runs DataBrew recipe jobs as part of the ML workflow.

This notebook is part of the SageMaker model training with DataBrew recipe job blog post.

## Use case overview

For our use case, we use a public dataset of direct marketing campaigns. The marketing campaigns were based on phone calls. The dataset contains a few demographic and economic attributes of the campaign contacts, along with their final decision on whether to subscribe to the product.
The classification goal is to predict whether the client will subscribe to a term deposit (variable y).
The dataset is publicly available and is attributed to S. Moro, P. Cortez, and P. Rita; it is hosted by the University of California Irvine (UCI) Machine Learning Repository.

https://archive.ics.uci.edu/ml/datasets/Bank+Marketing

For our use case, this campaign CSV file is maintained by your organization’s Marketing team, which uploads it to Amazon Simple Storage Service (Amazon S3). We then create a series of data preparation steps using AWS Glue DataBrew and build a product subscription model with an ML workflow that uses SageMaker and the AWS Step Functions Data Science SDK.

* [AWS Glue DataBrew](https://aws.amazon.com/glue/features/databrew/)
* [AWS Glue DataBrew Developer Guide](https://docs.aws.amazon.com/databrew/latest/dg/what-is.html)
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)


## Visual Workflow 

![title](img/step_function_workflow.png)

## DataBrew Plugin Setup (Optional)

First, we install the AWS Glue DataBrew plugin for SageMaker and load all the required modules. The plugin lets you work with the familiar AWS Glue DataBrew console from within SageMaker.
Then we create fine-grained IAM roles for the AWS Glue DataBrew, Lambda, and Step Functions resources that we will create. These IAM roles grant the services permissions within your AWS environment.

## AWS Glue DataBrew Plugin Installation

Make sure to add the correct permissions to the role your SageMaker Studio runs with. A good place to start is by:

1. Adding the *AwsGlueDataBrewFullAccessPolicy* managed policy to your SageMaker execution role; and

2. Creating an *AwsGlueDataBrewSpecificS3BucketPolicy* customer managed policy with s3:GetObject and s3:PutObject permissions for the bucket(s) you want to operate on, and adding this policy to your SageMaker execution role. Refer to the IAM policy documentation for Amazon S3 for more details.
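The customer managed policy from step 2 could look roughly like the following. This is a minimal sketch that builds the policy document in Python; `my-databrew-bucket` is a placeholder bucket name, and granting the actions on `arn:aws:s3:::<bucket>/*` (all objects in the bucket) is an assumption you may want to narrow to specific prefixes.

```python
import json


def databrew_s3_policy(bucket_names):
    """Build an IAM policy document that allows s3:GetObject and s3:PutObject
    on every object in the given buckets."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                # Object-level actions apply to object ARNs, hence the "/*" suffix.
                "Resource": [f"arn:aws:s3:::{name}/*" for name in bucket_names],
            }
        ],
    }


# 'my-databrew-bucket' is a hypothetical bucket name; replace it with your own.
print(json.dumps(databrew_s3_policy(["my-databrew-bucket"]), indent=2))
```

You could paste the printed JSON into the IAM console's JSON policy editor, or pass it to boto3 with `iam_client.create_policy(PolicyName=..., PolicyDocument=json.dumps(...))`.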

Detailed plugin installation steps are documented on the page below. Open a SageMaker system terminal and follow those steps.

* [SageMaker Plugin installation](https://github.com/aws/aws-glue-databrew-jupyter-extension/blob/main/SageMaker-Installation-Instructions.md#installing-the-plugin)

![title](img/sagemaker_databrew.png)

### Adding DataBrew recipe steps

After creating a new DataBrew project and importing the dataset, apply the following transformations.

### Categorical data mapping

Ordinal categorical values have a natural order or hierarchy, such as education level or economic status. They can be labeled numerically as 1, 2, and 3, from lowest to highest. DataBrew makes such variables easy to handle with categorical mapping, which you can access from the toolbar under Mapping.
Categorical mapping is also used for custom one-to-one mappings, such as “Yes” / “No” to 1/0. Here is an example mapping for the output variable y, which is converted to the numeric values 1/0 with the categorical mapping function.

![title](img/databrew_receipe_cat_mapping.png)
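Conceptually, categorical mapping is a lookup from each category to a number. The following plain-Python sketch illustrates the idea outside DataBrew; it is not how DataBrew implements the transform, and the function name `categorical_map` and the `default` handling for unmapped values are assumptions.

```python
def categorical_map(values, mapping, default=None):
    """Replace each categorical value with its numeric code.

    Values that are missing from `mapping` become `default`.
    """
    return [mapping.get(value, default) for value in values]


# Output variable y: "yes"/"no" -> 1/0, as in the recipe step above.
print(categorical_map(["no", "yes", "no"], {"yes": 1, "no": 0}))  # [0, 1, 0]

# A hypothetical ordinal mapping from lowest (1) to highest (3).
print(categorical_map(["low", "high", "medium"], {"low": 1, "medium": 2, "high": 3}))  # [1, 3, 2]
```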

### One hot encoding

For non-ordinal categorical values such as marital status or education, one-hot encoding is the most common way to convert them to a numerical format. You can access it from the column actions next to the column name, or from the DataBrew project toolbar under Encoding.

![title](img/databrew_receipe_one_hot_encoding.png)
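One-hot encoding creates one 0/1 column per category. The following sketch shows the idea in plain Python; the `<prefix>_<category>` column naming and alphabetical column order are assumptions and may differ from the column names DataBrew generates.

```python
def one_hot(values, prefix):
    """Return (column_names, rows): one 0/1 column per distinct category."""
    categories = sorted(set(values))
    names = ["{}_{}".format(prefix, c) for c in categories]
    position = {c: i for i, c in enumerate(categories)}
    rows = []
    for value in values:
        row = [0] * len(categories)
        row[position[value]] = 1  # exactly one "hot" column per row
        rows.append(row)
    return names, rows


names, rows = one_hot(["married", "single", "married", "divorced"], "marital")
print(names)  # ['marital_divorced', 'marital_married', 'marital_single']
print(rows)   # [[0, 1, 0], [0, 0, 1], [0, 1, 0], [1, 0, 0]]
```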

### Binning

Binning is a data preprocessing technique used to reduce the effects of minor observation errors. The binning transformation groups more or less continuous values into a smaller number of "bins". For example, the current dataset contains people's ages, which we can group into a smaller number of age intervals.

You can access binning from the Scale menu on the toolbar.


![title](img/databrew_receipe_binning.png)
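A binning step can be sketched with Python's `bisect` module. The bin edges (30 and 60) and labels below are hypothetical, chosen only to show how ages map to intervals; in this sketch a value equal to an edge falls into the bin above it.

```python
import bisect


def bin_values(values, edges, labels):
    """Assign each value to a labeled bin.

    `edges` must be sorted; a value equal to an edge falls into the bin above it.
    """
    if len(labels) != len(edges) + 1:
        raise ValueError("need exactly one more label than edges")
    return [labels[bisect.bisect_right(edges, v)] for v in values]


# Hypothetical age bins: under 30, 30 to 59, 60 and over.
print(bin_values([17, 25, 42, 67], [30, 60], ["young", "middle", "senior"]))
# ['young', 'young', 'middle', 'senior']
```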

### Binarization

Binarization is the process of dividing data into two groups and assigning one of two values to all members of the same group. You can use the Binarize transformation by defining a threshold t and assigning the value 0 to all data points below the threshold and 1 to those above it.
In our marketing dataset, the duration column specifies the last contact duration in seconds (numeric). This metric is highly useful for predicting the outcome, because a longer conversation usually indicates that the customer is more interested in the offering. Adding a binarized long-call metric based on call duration therefore helps our prediction model.
We use a 5-minute (300-second) threshold to create the new long-call metric.


![title](img/databrew_receipe_binarization.png)
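The long-call metric amounts to comparing each duration with the 300-second threshold. In this sketch, a value exactly equal to the threshold maps to 0; that tie-breaking choice is an assumption, so check how the DataBrew Binarize transform treats it if it matters for your data.

```python
def binarize(values, threshold):
    """Return 1 for values above `threshold` and 0 otherwise (ties map to 0)."""
    return [1 if v > threshold else 0 for v in values]


durations = [120, 300, 301, 900]  # call durations in seconds
print(binarize(durations, 300))  # [0, 0, 1, 1]
```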

### Add a Flag column

Another useful recipe step is to add a column derived from a transformation of existing columns. We add a flag column for clients who have a house but no loan, to give this group more weight.

![title](img/databrew_receipe_flag_column.png)
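The flag column can be expressed as a condition over two existing columns. This sketch assumes the dataset's `housing` and `loan` columns hold the strings "yes", "no", or "unknown", and treats "unknown" as not meeting the condition; the function name is hypothetical.

```python
def house_no_loan_flag(housing, loan):
    """Return 1 where housing is "yes" and loan is "no", otherwise 0."""
    if len(housing) != len(loan):
        raise ValueError("housing and loan must have the same length")
    return [
        1 if has_house == "yes" and has_loan == "no" else 0
        for has_house, has_loan in zip(housing, loan)
    ]


print(house_no_loan_flag(["yes", "yes", "no", "unknown"], ["no", "yes", "no", "no"]))
# [1, 0, 0, 0]
```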

### Delete unused columns

Finally, drop unused columns before writing the final output file for model creation. In the next step, we use the SageMaker built-in XGBoost algorithm for model training, which requires the target variable to be the first column in the dataset.

![title](img/databrew_receipe_drop_columns_and_move_to_first.PNG)
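For CSV training data, the SageMaker built-in XGBoost algorithm expects the target in the first column and no header row. The sketch below mimics the drop-and-reorder step with the standard `csv` module; the sample columns and the choice of `contact` as the dropped column are hypothetical. In this notebook the header itself is removed later by the Glue job, so the sketch makes header output optional.

```python
import csv
import io


def move_target_first(csv_text, target, drop=(), include_header=False):
    """Drop the columns in `drop`, move `target` to column 0, and return CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    keep = [c for c in reader.fieldnames if c != target and c not in drop]
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    if include_header:
        writer.writerow([target] + keep)
    for row in reader:
        writer.writerow([row[target]] + [row[c] for c in keep])
    return out.getvalue()


sample = "age,duration,contact,y\n30,120,cellular,0\n45,400,telephone,1\n"
print(move_target_first(sample, "y", drop={"contact"}), end="")
# 0,30,120
# 1,45,400
```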

After saving the recipe above, create a new DataBrew recipe job from it; we use this job in the ETL step below.

## SageMaker Permission Setup

Before beginning this tutorial, make sure you have the permissions required to create the resources used in this solution.
Set up the following roles and permissions to implement this ML workflow.

Add the following permission policies to your SageMaker Studio execution role:
1. AWSStepFunctionsFullAccess
2. AWS Lambda (Write access level)
3. AwsGlueDataBrewFullAccessPolicy

Create a new role for Step Functions execution with the permissions needed to orchestrate DataBrew jobs, SageMaker training, and the Lambda function:
1. AmazonSageMaker-StepFunctionsWorkflowExecutionRole
2. Add the iam:PassRole permission for the SageMaker service to the policy definition.

Create an AWS Glue DataBrew role to access data stored in S3 and to create DataBrew recipe jobs.

Create an AWS Lambda role to query the SageMaker training status.
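The `iam:PassRole` permission lets Step Functions hand the SageMaker execution role to the training job it starts. A minimal sketch of such a policy follows; restricting it with the `iam:PassedToService` condition key is a common hardening choice rather than something this notebook prescribes, and the account ID and role name in the example ARN are placeholders.

```python
import json


def pass_role_policy(role_arn):
    """Allow passing `role_arn`, but only to the SageMaker service."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": role_arn,
                "Condition": {
                    "StringEquals": {"iam:PassedToService": "sagemaker.amazonaws.com"}
                },
            }
        ],
    }


# Placeholder ARN: substitute your SageMaker execution role.
print(json.dumps(pass_role_policy("arn:aws:iam::123456789012:role/MySageMakerExecutionRole"), indent=2))
```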


To make it easier to get started, we created an AWS CloudFormation template that configures the SageMaker notebook role with the policies required for Step Functions orchestration, and creates new roles for DataBrew job creation and AWS Lambda execution. The source code for the CloudFormation template is available in the GitHub repo.


Pass the following parameters to the template to pre-configure the roles with fine-grained resource access:

* SageMakerExecutionRole - Copy the role name from SageMaker Studio.
* DatabrewGlueJobName - Preferred name for your DataBrew recipe job.
* S3Bucket - Source data S3 bucket name.
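If you deploy the template from the notebook rather than the console, the three parameters map onto the `Parameters` argument of boto3's CloudFormation `create_stack` call. The sketch below only builds that list; the stack name, template path, and the `CAPABILITY_NAMED_IAM` capability (needed when a template creates IAM roles with custom names) in the commented call are assumptions about the template in the repo.

```python
def stack_parameters(sagemaker_role_name, databrew_job_name, s3_bucket):
    """Build the Parameters list expected by CloudFormation create_stack."""
    values = {
        "SageMakerExecutionRole": sagemaker_role_name,
        "DatabrewGlueJobName": databrew_job_name,
        "S3Bucket": s3_bucket,
    }
    return [{"ParameterKey": key, "ParameterValue": value} for key, value in values.items()]


params = stack_parameters("MySageMakerRole", "job-marketing-research-recipe", "my-source-bucket")
print(params[0])  # {'ParameterKey': 'SageMakerExecutionRole', 'ParameterValue': 'MySageMakerRole'}

# Hypothetical deployment call (stack name and template path are placeholders):
# import boto3
# cfn = boto3.client("cloudformation")
# with open("template.yaml") as f:
#     cfn.create_stack(
#         StackName="databrew-sagemaker-workflow-roles",
#         TemplateBody=f.read(),
#         Parameters=params,
#         Capabilities=["CAPABILITY_NAMED_IAM"],
#     )
```

Using the same DataBrew job name here as `recipe_job_name` later in the notebook presumably keeps the role permissions and the workflow input in sync.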


### Import the Required Modules

In [None]:
# Check the currently installed version of the stepfunctions SDK.
!pip show stepfunctions

# Clone the repo and install SDK version > 2.2.0, which is required for the DataBrew integration:
# https://github.com/aws/aws-step-functions-data-science-sdk-python/pull/151
# Remove any previous clone first so that re-running this cell does not fail.
!rm -rf /tmp/aws-step-functions-data-science-sdk-python
!git clone https://github.com/aws/aws-step-functions-data-science-sdk-python.git /tmp/aws-step-functions-data-science-sdk-python
!pip install /tmp/aws-step-functions-data-science-sdk-python/.

In [None]:
import uuid
import logging
import stepfunctions
import boto3
import sagemaker

from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
bucket = session.default_bucket()
id = uuid.uuid4().hex


# Name of the AWS Glue DataBrew recipe job you created earlier in DataBrew, and a unique name
# for the AWS Glue ETL job. If you change these names, you may need to change the Step Functions execution role.
recipe_job_name = 'job-marketing-research-recipe'
etl_job_name = 'job-marketing-research-etl-{}'.format(id)

#Create a unique name for the AWS Lambda function to be created. If you change
#the default name, you may need to change the Step Functions execution role.
function_name = 'sage_training_query_status-{}'.format(id)

### Configure Execution Roles

In [None]:
# Paste the AmazonSageMaker-StepFunctionsWorkflowExecutionRole ARN (see the SageMaker Permission Setup section).
workflow_execution_role = ''

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
sagemaker_execution_role = sagemaker.get_execution_role() #Replace with ARN if not in an AWS SageMaker notebook

In [None]:
sagemaker_execution_role

In [None]:
# Paste the query_training_status_role ARN (see the SageMaker Permission Setup section).
lambda_role = ''

# Paste the AWS Glue ETL role ARN; this role allows the Glue job to write to S3.
glue_role = ''

In [None]:
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)
project_name = 'databrew_blog'

### Source DataSet Location

Upload the source dataset to S3 for the DataBrew transformation; the processed output is later used for training.

In [None]:
data_source = S3Uploader.upload(local_path='./data/bank-additional.csv',
                               desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                               sagemaker_session=session)
recipe_prefix = 'recipe'
train_prefix = 'train'
val_prefix = 'validation'

recipe_data = 's3://{}/{}/{}/'.format(bucket, project_name, recipe_prefix)
train_data = 's3://{}/{}/{}/'.format(bucket, project_name, train_prefix)
validation_data = 's3://{}/{}/{}/'.format(bucket, project_name, val_prefix)

### Create the AWS Glue Job

In [None]:
glue_script_location = S3Uploader.upload(local_path='./code/glue_etl.py',
                               desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                               sagemaker_session=session)
glue_client = boto3.client('glue')

response = glue_client.create_job(
    Name=etl_job_name,
    Description='PySpark job to split data into training and validation data sets and remove the header',
    Role=glue_role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='2.0',
    WorkerType='Standard',
    NumberOfWorkers=2,
    Timeout=60
)

### Create the AWS Lambda Function

In [None]:
import zipfile
zip_name = 'lambda_training_job_status.zip'
lambda_source_code = './code/lambda_training_job_status.py'

zf = zipfile.ZipFile(zip_name, mode='w')
zf.write(lambda_source_code, arcname=lambda_source_code.split('/')[-1])
zf.close()


S3Uploader.upload(local_path=zip_name, 
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  sagemaker_session=session)

In [None]:
lambda_client = boto3.client('lambda')

response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime='python3.12',  # python3.7 is deprecated in AWS Lambda
    Role=lambda_role,
    Handler='lambda_training_job_status.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, zip_name)
    },
    Description='Queries a SageMaker training job and returns the results.',
    Timeout=15,
    MemorySize=128
)

### Configure the AWS SageMaker Estimator

In [None]:
container = sagemaker.image_uris.retrieve('xgboost', region, '1.2-1')

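# SageMaker Python SDK v2 uses instance_count/instance_type (formerly train_instance_count/train_instance_type).
xgb = sagemaker.estimator.Estimator(container,
                                    sagemaker_execution_role,
                                    instance_count=1,
                                    instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, project_name))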

xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        objective='binary:logistic',
                        eval_metric='error',
                        num_round=100)


## Build a Machine Learning Workflow

You can use a state machine workflow to create a model training pipeline. The AWS Step Functions Data Science SDK provides several SageMaker workflow steps that you can use to construct an ML pipeline. In this tutorial, you create the following steps:

* [**ETLStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/services.html) - Starts an AWS Glue DataBrew recipe job to extract the latest data from our source location and prepare it, followed by an AWS Glue job that splits the prepared data into training and validation sets.
* [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - Creates the training step and passes the defined estimator.
* [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - Creates a model in SageMaker using the artifacts created during the TrainingStep.
* [**LambdaStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) - Creates the task state step within our workflow that calls a Lambda function.
* [**ChoiceStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Choice) - Creates the choice state step within our workflow.
* [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - Creates the endpoint config step to define the new configuration for our endpoint.
* [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - Creates the endpoint step to deploy our model endpoint.
* [**FailStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) - Creates fail state step within our workflow.

In [None]:
# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(schema={
    'TrainingJobName': str,
    'DatabrewJobName': str,
    'GlueETLJobName': str,
    'ModelName': str,
    'EndpointName': str,
    'LambdaFunctionName': str
})
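Because SageMaker rejects duplicate names, it can help to generate and sanity-check the per-run names before calling `workflow.execute`. This sketch reuses the notebook's naming prefixes; the validation rule (letters, digits, and hyphens, starting with a letter or digit, at most 63 characters) is our reading of the SageMaker naming constraints for training jobs, models, and endpoints, so treat it as an assumption and verify it against the API reference. `build_execution_inputs` is a hypothetical helper.

```python
import re
import uuid

# Assumed SageMaker name rule: alphanumerics and hyphens, no leading hyphen, max 63 chars.
SAGEMAKER_NAME = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")
MAX_NAME_LENGTH = 63


def build_execution_inputs(run_id, recipe_job_name, etl_job_name, function_name, endpoint_name):
    """Return an execution-input dict matching the ExecutionInput schema above."""
    inputs = {
        "TrainingJobName": "regression-{}".format(run_id),
        "DatabrewJobName": recipe_job_name,
        "GlueETLJobName": etl_job_name,
        "ModelName": "MarketingCampaignPrediction-{}".format(run_id),
        "EndpointName": endpoint_name,
        "LambdaFunctionName": function_name,
    }
    for key in ("TrainingJobName", "ModelName", "EndpointName"):
        name = inputs[key]
        if len(name) > MAX_NAME_LENGTH or not SAGEMAKER_NAME.fullmatch(name):
            raise ValueError("{}={!r} is not a valid SageMaker name".format(key, name))
    return inputs


inputs = build_execution_inputs(uuid.uuid4().hex, "job-marketing-research-recipe",
                                "job-marketing-research-etl", "sage_training_query_status",
                                "MarketingCampaign")
print(sorted(inputs))
```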

### Create a step with an AWS Glue DataBrew recipe job
In the following cell, we create a DataBrew step that runs an AWS Glue DataBrew recipe job. The DataBrew job extracts the latest data from our source location, removes unnecessary columns, and performs a few data cleansing operations. AWS Glue DataBrew performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage.

In [None]:
recipe_step = steps.GlueDataBrewStartJobRunStep(
    'Extract, Transform, Load',
    parameters={"Name": execution_input['DatabrewJobName']}
)

### Create an ETL step with AWS Glue Job
In the following cell, we create a Glue step that runs an AWS Glue job. The Glue job splits the data into training and validation sets and saves them in CSV format to S3. Glue performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) compute step in the AWS Step Functions Data Science SDK documentation.

In [None]:
etl_step = steps.GlueStartJobRunStep('Split Train & Test DataSet',
    parameters={"JobName": execution_input['GlueETLJobName'],
                "Arguments":{
                    '--S3_SOURCE': recipe_data,
                    '--S3_DEST': 's3a://{}/{}/'.format(bucket, project_name),
                    '--TRAIN_KEY': train_prefix + '/',
                    '--VAL_KEY': val_prefix +'/'}
               }
)
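The PySpark script `./code/glue_etl.py` is not shown in this notebook. To illustrate what the step does, here is a plain-Python sketch of the same idea: drop the header row and randomly split the rows into header-less training and validation CSV data. The 80/20 ratio and fixed seed are assumptions, and a real Glue job would read its `--S3_SOURCE`/`--S3_DEST` arguments (for example with `awsglue.utils.getResolvedOptions`) and use Spark rather than in-memory lists.

```python
import csv
import io
import random


def split_train_validation(csv_text, train_fraction=0.8, seed=42):
    """Remove the header row, shuffle the data rows, and return
    (train_csv, validation_csv) as header-less CSV strings."""
    rows = list(csv.reader(io.StringIO(csv_text)))[1:]  # [1:] drops the header
    random.Random(seed).shuffle(rows)
    cut = int(len(rows) * train_fraction)

    def to_csv(subset):
        out = io.StringIO()
        csv.writer(out, lineterminator="\n").writerows(subset)
        return out.getvalue()

    return to_csv(rows[:cut]), to_csv(rows[cut:])


data = "y,age\n" + "".join("{},{}\n".format(i % 2, 20 + i) for i in range(10))
train_csv, validation_csv = split_train_validation(data)
print(len(train_csv.splitlines()), len(validation_csv.splitlines()))  # 8 2
```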

### Create a SageMaker Training Step 

In the following cell, we create the training step and pass the estimator we defined above. See  [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
training_step = steps.TrainingStep(
    'Model Training', 
    estimator=xgb,
    data={
        'train': TrainingInput(train_data, content_type='text/csv'),
        'validation': TrainingInput(validation_data, content_type='text/csv')
    },
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)

### Create a Model Step 

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See  [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [None]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    result_path='$.ModelStepResults'
)

### Create a Lambda Step
In the following cell, we define a Lambda step that invokes the previously created Lambda function as part of our Step Functions workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
lambda_step = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={  
        "FunctionName": execution_input['LambdaFunctionName'],
        'Payload':{
            "TrainingJobName.$": '$.TrainingJobName'
        }
    }
)
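The notebook packages `./code/lambda_training_job_status.py` but does not show it. The choice rule later reads `Payload.trainingMetrics[0].Value`, so a handler along the following lines would fit; it is a hypothetical sketch, not the file from the repo. It calls the SageMaker `DescribeTrainingJob` API through boto3 and returns `FinalMetricDataList`, keeping only the metric name and value because each entry's `Timestamp` is a `datetime`, which Lambda cannot serialize to JSON. Moving `validation:error` to the front of the list is our own choice so that index 0 refers to the validation metric.

```python
def lambda_handler(event, context, sagemaker_client=None):
    """Return the final metrics of the training job named in event["TrainingJobName"].

    `sagemaker_client` is an extra parameter for local testing; Lambda itself
    calls the handler with (event, context) only.
    """
    if sagemaker_client is None:
        import boto3  # available in the Lambda Python runtime

        sagemaker_client = boto3.client("sagemaker")

    job = sagemaker_client.describe_training_job(TrainingJobName=event["TrainingJobName"])
    metrics = [
        {"MetricName": m["MetricName"], "Value": m["Value"]}
        for m in job.get("FinalMetricDataList", [])
    ]
    # Stable sort: validation:error first, every other metric keeps its order.
    metrics.sort(key=lambda m: m["MetricName"] != "validation:error")
    return {"statusCode": 200, "trainingMetrics": metrics}
```

Locally, you can pass any object with a `describe_training_job` method as `sagemaker_client` to exercise the handler without AWS credentials.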

### Create a Choice State Step 
In the following cell, we create a choice step to build a dynamic workflow. This choice step branches based on the results of our SageMaker training job: does the model meet the accuracy threshold, so that the endpoint should be deployed, or should the workflow fail? We add specific rules to this choice step later in this notebook.

In [None]:
check_accuracy_step = steps.states.Choice(
    'Accuracy > 90%'
)

### Create an Endpoint Configuration Step
In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m4.xlarge'
)

### Create the Model Endpoint Step
In the following cell, we create the Endpoint step, which deploys the new model as a managed API endpoint if our choice state succeeds. Because `update=False`, the step creates a new endpoint; set `update=True` to update an existing SageMaker endpoint instead.

In [None]:
endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
    update=False
)

### Create the Fail State Step
In addition, we create a Fail step which proceeds from our choice state if the validation accuracy of our model is lower than the threshold we define. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
fail_step = steps.states.Fail(
    'Model Accuracy Too Low',
    comment='Validation accuracy lower than threshold'
)

### Add Rules to Choice State
In the cells below, we add a threshold rule to our choice state. If the validation accuracy of our model is below 0.90, we move to the Fail state. If it is above 0.90, we move to the endpoint configuration step and then deploy the endpoint. See [here](https://github.com/dmlc/xgboost/blob/master/doc/parameter.rst) for more information on how XGBoost calculates classification error.

For binary classification problems, the XGBoost algorithm defines the model error as:

\begin{equation*}
\text{error} = \frac{\text{incorrect predictions}}{\text{total number of predictions}}
\end{equation*}

Since accuracy = 1 - error, achieving an accuracy above 90% requires an error below 0.10.
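To make the threshold concrete, the following sketch computes the error from the formula above for a handful of made-up predictions. Following the XGBoost documentation for the `error` metric, a predicted probability larger than 0.5 counts as the positive class; the labels and probabilities are illustrative only.

```python
def xgboost_error(labels, probabilities):
    """Binary classification error: wrong predictions / all predictions,
    treating probabilities > 0.5 as the positive class."""
    if len(labels) != len(probabilities):
        raise ValueError("labels and probabilities must have the same length")
    wrong = sum(int(p > 0.5) != y for y, p in zip(labels, probabilities))
    return wrong / len(labels)


err = xgboost_error([1, 0, 1, 0], [0.9, 0.2, 0.4, 0.6])
print(err, 1 - err)  # 0.5 0.5
print(err < 0.1)     # False: this model would go to the Fail state
```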

In [None]:
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(variable=lambda_step.output()['Payload']['trainingMetrics'][0]['Value'], value=.1)

check_accuracy_step.add_choice(rule=threshold_rule, next_step=endpoint_config_step)
check_accuracy_step.default_choice(next_step=fail_step)
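The choice state compares the first training metric returned by the Lambda step against 0.1. The function below mirrors that routing in plain Python so you can reason about edge cases; it is an illustration, not code that Step Functions runs. The input shape (the Lambda's return value wrapped in `Payload`) is the one `threshold_rule` reads, and the state names are the ones defined in this notebook.

```python
ENDPOINT_CONFIG_STATE = "Create Model Endpoint Config"
FAIL_STATE = "Model Accuracy Too Low"


def next_state(lambda_step_output, max_error=0.1):
    """Mimic check_accuracy_step: NumericLessThan on the first metric, else the default."""
    error = lambda_step_output["Payload"]["trainingMetrics"][0]["Value"]
    return ENDPOINT_CONFIG_STATE if error < max_error else FAIL_STATE


print(next_state({"Payload": {"trainingMetrics": [{"MetricName": "validation:error", "Value": 0.08}]}}))
# Create Model Endpoint Config
print(next_state({"Payload": {"trainingMetrics": [{"MetricName": "validation:error", "Value": 0.1}]}}))
# Model Accuracy Too Low (NumericLessThan is strict)
```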

### Link all the Steps Together
Finally, create your workflow definition by chaining together all of the steps we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
endpoint_config_step.next(endpoint_step)

In [None]:
workflow_definition = steps.Chain([
    recipe_step,
    etl_step,
    training_step,
    model_step,
    lambda_step,
    check_accuracy_step
])

## Run the Workflow
Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [None]:
workflow = Workflow(
    name='MarketingCampaignInference_{}'.format(id),
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

# render workflow graph
workflow.render_graph()

# create workflow
workflow.create()



In [None]:
# run the workflow
execution = workflow.execute(
    inputs={
        'TrainingJobName': 'regression-{}'.format(id), # Each Sagemaker Job requires a unique name,
        'DatabrewJobName': recipe_job_name,
        'GlueETLJobName': etl_job_name,
        'ModelName': 'MarketingCampaignPrediction-{}'.format(id), # Each Model requires a unique name,
        'EndpointName': 'MarketingCampaign', # Each Endpoint requires a unique name
        'LambdaFunctionName': function_name
    }
)

## Step Functions output

![title](img/step_function_output.PNG)

## Clean Up
When you are done, make sure to clean up your AWS account by deleting resources you won't be reusing.
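A cleanup sketch follows. It deletes the endpoint, endpoint configuration, and model created by the workflow (the endpoint configuration shares the model name, as in `endpoint_config_step`), plus the Lambda function and Glue job created in this notebook. The hypothetical `clean_up` helper takes the boto3 clients as arguments so it can be exercised without AWS. It does not handle resources that were never created (for example, if the workflow ended in the Fail state, no endpoint exists and `delete_endpoint` raises a `ClientError`), and it does not remove S3 data, the DataBrew project and job, or the CloudFormation stack.

```python
def clean_up(sagemaker_client, lambda_client, glue_client,
             endpoint_name, model_name, function_name, etl_job_name):
    """Delete the SageMaker, Lambda, and Glue resources created by this notebook."""
    sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
    # endpoint_config_step used the model name as the endpoint config name.
    sagemaker_client.delete_endpoint_config(EndpointConfigName=model_name)
    sagemaker_client.delete_model(ModelName=model_name)
    lambda_client.delete_function(FunctionName=function_name)
    glue_client.delete_job(JobName=etl_job_name)


# Example call inside the notebook (uses variables defined earlier):
# clean_up(boto3.client("sagemaker"), boto3.client("lambda"), boto3.client("glue"),
#          endpoint_name="MarketingCampaign",
#          model_name="MarketingCampaignPrediction-{}".format(id),
#          function_name=function_name, etl_job_name=etl_job_name)
# workflow.delete()  # removes the Step Functions state machine
```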