# Implement an ML Pipeline Using AWS Glue Workflows

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)
1. [Run the Workflow](#Run-the-Workflow)
1. [Evaluate the deployed model](#Evaluate-the-deployed-model)
1. [Clean Up](#Clean-Up)

---

## Introduction

This notebook describes how to use an AWS Glue workflow with PySpark and Python shell scripts to build a machine learning pipeline that covers data preparation, model training, model evaluation, and model registration. The workflow is defined as follows:

<div align="center"><img width=600 src="images/glue_workflow_pipeline.png"></div>

## Setup

### IAM Permission and Role

* Required IAM roles

To run this notebook and the Glue workflow, two IAM roles need the right permissions:

  * IAM role for the SageMaker (Studio) notebook, i.e. the execution role
    * Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/).
    * Find the SageMaker execution role in the console (open the notebook instance details, or the user profile details under the SageMaker Studio domain).
    * Open that role in IAM and attach the following managed IAM policy:
        * arn:aws:iam::aws:policy/AWSGlueConsoleSageMakerNotebookFullAccess

  * IAM role for the Glue jobs, which read and write data in S3 and run model training on SageMaker
    * This role, `AWS-Glue-S3-SageMaker-Access`, is created by the script below.
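The helper `setup_iam_roles.create_glue_role` lives in `./code` and is not shown in this notebook. As a rough sketch of one thing such a role needs, assuming the standard IAM trust-policy format, the Glue job role must be assumable by the AWS Glue service principal. The function name `glue_trust_policy` is hypothetical.

```python
import json

def glue_trust_policy() -> dict:
    """Hypothetical helper: trust policy that lets AWS Glue assume the job role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }

# boto3's iam.create_role expects the trust policy as a JSON string
trust_policy_json = json.dumps(glue_trust_policy())
print(trust_policy_json)
```

With boto3, this string would be passed as `AssumeRolePolicyDocument` to `iam.create_role(RoleName=..., AssumeRolePolicyDocument=...)`; the actual helper in `./code` may differ.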



### Import the Required Modules

In [2]:
import os
import sys
import uuid
import logging
import boto3
import time
from datetime import datetime

import sagemaker

from sagemaker.s3 import S3Uploader, S3Downloader

# Make the helper modules in ./code importable
sys.path.insert(0, os.path.abspath("./code"))
import setup_iam_roles

session = sagemaker.Session()

region = boto3.Session().region_name
bucket = session.default_bucket()

id = uuid.uuid4().hex

# SageMaker Execution Role
sagemaker_execution_role = sagemaker.get_execution_role()

# Endpoint name with a month-day-hour-minute suffix, e.g. gw-customer-churn-endpoint-8-3-6-51
current_time = datetime.now()
timestamp_suffix = f"{current_time.month}-{current_time.day}-{current_time.hour}-{current_time.minute}"

endpoint_name = f"gw-customer-churn-endpoint-{timestamp_suffix}"

prefix = 'sagemaker/DEMO-xgboost-customer-churn'

Create an IAM role for the Glue jobs that grants:
* Access to the S3 bucket
* Permission to run SageMaker training jobs and deploy models
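The two bullets above translate into an IAM permissions policy. The sketch below is one possible inline policy, assuming the job only touches objects in `bucket` and calls the SageMaker APIs used by the training/deployment code in this notebook; it is not the policy `setup_iam_roles` creates, and Glue's own logging permissions are omitted. `iam:PassRole` is included because the job hands the SageMaker execution role to the training job and model. The function name `glue_job_permissions` is hypothetical.

```python
import json

def glue_job_permissions(bucket: str, sagemaker_role_arn: str) -> dict:
    """Hypothetical inline permissions policy for the Glue job role (a sketch only)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Read raw data, write processed data and model artifacts
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                # List objects under the data prefixes
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Training, model creation and endpoint deployment calls (plus the waiters' describes)
                "Effect": "Allow",
                "Action": [
                    "sagemaker:CreateTrainingJob",
                    "sagemaker:DescribeTrainingJob",
                    "sagemaker:CreateModel",
                    "sagemaker:CreateEndpointConfig",
                    "sagemaker:CreateEndpoint",
                    "sagemaker:DescribeEndpoint",
                ],
                "Resource": "*",
            },
            {
                # Let the job pass the SageMaker execution role to SageMaker
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": sagemaker_role_arn,
            },
        ],
    }

policy = glue_job_permissions("example-bucket", "arn:aws:iam::123456789012:role/ExampleSageMakerRole")
print(json.dumps(policy, indent=2))
```

A policy like this could be attached with `iam.put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=json.dumps(policy))`.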

In [3]:
glue_role_name = "AWS-Glue-S3-SageMaker-Access"
glue_role_arn = setup_iam_roles.create_glue_role(glue_role_name, bucket)
glue_role_arn

Using ARN from existing role: AWS-Glue-S3-SageMaker-Access


'arn:aws:iam::452533547478:role/AWS-Glue-S3-SageMaker-Access'

### Prepare the Dataset
This notebook uses the XGBoost algorithm to automate the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.

The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets.

In [53]:
train_prefix = "train"
val_prefix = "validation"
test_prefix = "test"

raw_data = f"s3://{bucket}/{prefix}/input"
batch_transform_output = f"s3://{bucket}/{prefix}/batch_transform"
processed_data = f"s3://{bucket}/{prefix}/processed"

train_data = f"{processed_data}/{train_prefix}/"
validation_data = f"{processed_data}/{val_prefix}/"
test_data = f"{processed_data}/{test_prefix}/"
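The evaluation code later in this notebook reads column 0 of the processed CSV as the label and the remaining columns as features. That matches the CSV convention of SageMaker's built-in XGBoost algorithm: target in the first column, no header row. The preprocessing script (`glue_preprocessing.py`) is not shown, so the sketch below only illustrates that layout with the standard `csv` module; `to_xgboost_csv` and the sample column names are hypothetical.

```python
import csv
import io

def to_xgboost_csv(rows, label_key):
    """Write dict rows as header-less CSV with the label in the first column."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        # Keep feature order as in the dict, but move the label to the front
        features = [value for key, value in row.items() if key != label_key]
        writer.writerow([row[label_key], *features])
    return buf.getvalue()

# Toy rows for illustration only
sample = [
    {"feature_a": 1, "feature_b": 2.5, "churn": 0},
    {"feature_a": 3, "feature_b": 4.0, "churn": 1},
]
print(to_xgboost_csv(sample, label_key="churn"))
```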

Upload the data to the S3 bucket:

In [5]:
S3Uploader.upload(
    local_path="../data/churn_processed.csv",
    desired_s3_uri=f"{raw_data}",
    sagemaker_session=session,
)

's3://sagemaker-ap-southeast-2-452533547478/sagemaker/DEMO-xgboost-customer-churn/input/churn_processed.csv'

## Build a Machine Learning Workflow

We use a Glue workflow as the orchestration engine and Glue jobs as its steps: one job for data preprocessing and one for model training and deployment.

* [**Glue Workflow**](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html) - Orchestration engine for ML workflow.
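* [**Glue Job**](https://docs.aws.amazon.com/glue/latest/dg/author-job.html) - Business logic of each step, run as a Spark ETL job (`glueetl`) or a Python shell job (`pythonshell`).
* [**Glue Trigger**](https://docs.aws.amazon.com/glue/latest/dg/trigger-job.html) - Starts Glue jobs within the workflow, either on demand or when a condition on other jobs is met.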
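The workflow built in this section is a short chain: an on-demand trigger starts the data processing job, and a conditional trigger starts the training/deployment job after processing succeeds. As a sketch, that chain can be modelled as a dependency graph and ordered with Python's `graphlib` (Python 3.9+); the node names are simplified versions of the names used below, without the `id` suffix.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on
workflow_graph = {
    "DataProcessingJob": {"TriggerDataProcessingJob"},
    "TriggerModelTrainingDeploymentJob": {"DataProcessingJob"},
    "ModelTrainingDeploymentJob": {"TriggerModelTrainingDeploymentJob"},
}

# A valid execution order for the workflow's triggers and jobs
run_order = list(TopologicalSorter(workflow_graph).static_order())
print(run_order)
```

This only mirrors the structure; Glue itself decides when each trigger fires.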

Once the Glue workflow is created, you can view its details in the AWS Glue console under **Workflows** by selecting the created workflow. It should look similar to this:

<div align="center"><img width=500 src="images/glue_workflow.png"></div>

### Create AWS Glue Workflow

#### Create Glue Workflow Object


In [None]:
glue_client = boto3.client("glue")

In [None]:
glue_workflow_name = f"CustomerChurnMLWorkflow-{id}"
response = glue_client.create_workflow(
    Name=glue_workflow_name,
    Description='AWS Glue workflow to process data and create training jobs'
)

#### Create Glue Jobs

In [None]:
# Data Processing Job
data_processing_script_path = S3Uploader.upload(
    local_path="./code/glue_preprocessing.py",
    desired_s3_uri=f"s3://{bucket}/{prefix}/glue/scripts",
    sagemaker_session=session,
)
data_processing_job_name = f"DataProcessingJob-{id}"
response = glue_client.create_job(
    Name=data_processing_job_name,
    Description='Preparing data for SageMaker training',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': data_processing_script_path,
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--enable-metrics": "",
        "--additional-python-modules": "pyarrow==2,awswrangler==2.9.0,fsspec==0.7.4"
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=10.0,
    GlueVersion='2.0'
)

In [None]:
# Model Training & Deployment Job
model_training_deployment_script_path = S3Uploader.upload(
    local_path="./code/model_training_deployment.py",
    desired_s3_uri=f"s3://{bucket}/{prefix}/glue/scripts",
    sagemaker_session=session
)

model_training_deployment_job_name = f"ModelTrainingDeploymentJob-{id}"
response = glue_client.create_job(
    Name=model_training_deployment_job_name,
    Description='Model training and deployment',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'pythonshell',
        'ScriptLocation': model_training_deployment_script_path,
        'PythonVersion': '3'
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--enable-metrics": ""
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=1,
    GlueVersion='1.0'
)

In [None]:
model_output_path = f"s3://{bucket}/{prefix}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
)

processed_data, sagemaker_execution_role, image_uri, model_output_path

#### Create Glue Triggers

In [None]:
data_processing_trigger_name = f'TriggerDataProcessingJob-{id}'
response = glue_client.create_trigger(
    Name=data_processing_trigger_name,
    Description='Triggering Data Processing Job',
    Type='ON_DEMAND',
    WorkflowName=glue_workflow_name,
    Actions=[
        {
            'JobName': data_processing_job_name,
            'Arguments': {
                '--INPUT_DIR': raw_data,
                '--PROCESSED_DIR': processed_data
            },
        },
    ]
)
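The `Arguments` of a trigger action reach the job script as `--KEY value` pairs on the command line. Inside a Glue job they are normally read with `awsglue.utils.getResolvedOptions`, which only exists in the Glue runtime. The sketch below is a local stand-in built on `argparse` (not how `getResolvedOptions` is implemented), useful for testing a script outside Glue; `resolve_options` and the sample values are hypothetical.

```python
import argparse

def resolve_options(argv, names):
    """Local stand-in for getResolvedOptions: return {name: value} for the requested --name args.

    Extra arguments that Glue adds (for example job run metadata) are ignored.
    """
    parser = argparse.ArgumentParser(prog="glue-job", allow_abbrev=False)
    for name in names:
        parser.add_argument(f"--{name}", required=True)
    known, _unknown = parser.parse_known_args(argv)
    return vars(known)

# Simulated sys.argv[1:] for the data processing job started by the trigger above
argv = [
    "--INPUT_DIR", "s3://example-bucket/input",
    "--PROCESSED_DIR", "s3://example-bucket/processed",
    "--JOB_ID", "j_123",
]
print(resolve_options(argv, ["INPUT_DIR", "PROCESSED_DIR"]))
```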


In [None]:
model_train_deploy_trigger_name = f'TriggerModelTrainingDeploymentJob-{id}'
response = glue_client.create_trigger(
    Name=model_train_deploy_trigger_name,
    Description='Triggering Model Training Deployment Job',
    WorkflowName=glue_workflow_name,
    Type='CONDITIONAL',
    StartOnCreation=True,
    Predicate={
        'Conditions': [
            {
                'LogicalOperator': 'EQUALS',
                'JobName': data_processing_job_name,
                'State': 'SUCCEEDED'
            },
        ]
    },
    Actions=[
        {
            'JobName': model_training_deployment_job_name,
            'Arguments': {
                '--train_input_path': processed_data,
                '--model_output_path': model_output_path,
                '--algorithm_image': image_uri,
                '--role_arn': sagemaker_execution_role,
                '--endpoint_name': endpoint_name
            }
        }
    ]
)
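The conditional trigger above fires only when its `Predicate` holds. The sketch below evaluates a predicate of the same shape against a map of job states, to make the semantics concrete: each condition compares a job's latest state with `EQUALS`, and a top-level `Logical` of `AND` or `ANY` combines several conditions. It is an illustration, not Glue's implementation; treating a missing `Logical` key as `AND` is an assumption, and `predicate_fires` is a hypothetical name.

```python
def predicate_fires(predicate, job_states):
    """Return True if a CONDITIONAL trigger predicate would be satisfied.

    job_states maps job name -> latest run state, e.g. {"DataProcessingJob": "SUCCEEDED"}.
    Assumption: a missing "Logical" key is treated as "AND".
    """
    results = [
        cond["LogicalOperator"] == "EQUALS"
        and job_states.get(cond["JobName"]) == cond["State"]
        for cond in predicate["Conditions"]
    ]
    if predicate.get("Logical", "AND") == "ANY":
        return any(results)
    return all(results)

predicate = {
    "Conditions": [
        {"LogicalOperator": "EQUALS", "JobName": "DataProcessingJob", "State": "SUCCEEDED"}
    ]
}
print(predicate_fires(predicate, {"DataProcessingJob": "SUCCEEDED"}))  # True
print(predicate_fires(predicate, {"DataProcessingJob": "FAILED"}))     # False
```

The practical consequence: if data processing fails, the training and deployment job is never started in that workflow run.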


In [None]:
import boto3
import sys
from datetime import datetime
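# In the Glue job script, arguments are read with getResolvedOptions;
# this notebook copy takes a plain dict instead so it can run locally.
# from awsglue.utils import getResolvedOptions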

class ModelRun:

    def __init__(self, args):
        # In the Glue job: args = getResolvedOptions(sys.argv, ['train_input_path', 'model_output_path', 'algorithm_image', 'role_arn', 'endpoint_name'])
        current_time = datetime.now()
        self.train_input_path = args['train_input_path']
        self.model_output_path = args['model_output_path']
        self.algorithm_image = args['algorithm_image']
        self.role_arn = args['role_arn']
        timestamp_suffix = f"{current_time.month}-{current_time.day}-{current_time.hour}-{current_time.minute}"
        self.training_job_name = 'gw-xgb-churn-pred-' + timestamp_suffix
        self.endpoint = args['endpoint_name']
        
    def create_training_job(self):
        print("Started training job...")
        
        try:
            response = sagemaker.create_training_job(
                TrainingJobName=self.training_job_name,
                HyperParameters={
                    'max_depth': '5',
                    'eta': '0.2',
                    'gamma': '4',
                    'min_child_weight': '6',
                    'subsample': '0.8',
                    'silent': '0',
                    'objective': 'binary:logistic',
                    'num_round': '100',
                    'eval_metric': 'auc'
                },
                AlgorithmSpecification={
                    'TrainingImage': self.algorithm_image,
                    'TrainingInputMode': 'File'
                },
                RoleArn=self.role_arn,
                InputDataConfig=[
                    {
                        'ChannelName': 'train',
                        'DataSource': {
                            'S3DataSource': {
                                'S3DataType': 'S3Prefix',
                                'S3Uri': self.train_input_path + '/train',
                                'S3DataDistributionType': 'FullyReplicated'
                            }
                        },
                        'ContentType': 'text/csv',
                        'CompressionType': 'None'
                    },
                    {
                        'ChannelName': 'validation',
                        'DataSource': {
                            'S3DataSource': {
                                'S3DataType': 'S3Prefix',
                                'S3Uri': self.train_input_path + '/validation',
                                'S3DataDistributionType': 'FullyReplicated'
                            }
                        },
                        'ContentType': 'text/csv',
                        'CompressionType': 'None'
                    }
                ],
                OutputDataConfig={
                    'S3OutputPath': self.model_output_path
                },
                ResourceConfig={
                    'InstanceType': 'ml.m5.xlarge',
                    'InstanceCount': 1,
                    'VolumeSizeInGB': 20
                },
                StoppingCondition={
                    'MaxRuntimeInSeconds': 86400
                }
            )
            print("Training job has been created...")
        except Exception as e:
            print(e)
            print('Unable to create training job')
            raise
            
    def describe_training_job(self):
        status = sagemaker.describe_training_job(
            TrainingJobName=self.training_job_name
        )['TrainingJobStatus']
        print(self.training_job_name + " job status: ", status)
        print("Waiting for " + self.training_job_name + " training job to complete...")
        sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=self.training_job_name)
        resp = sagemaker.describe_training_job(TrainingJobName=self.training_job_name)
        status = resp['TrainingJobStatus']
        print("Training job " + self.training_job_name + " ended with status: " + status)
        if status == 'Failed':
            message = resp['FailureReason']
            print('Training job {} failed with the following error: {}'.format(self.training_job_name, message))
            raise Exception('Creation of sagemaker Training job failed')
        return status

    def create_endpoint_config(self):

        endpoint_name = self.endpoint

        print("Creating model..")
        create_model = sagemaker.create_model(
            ModelName=endpoint_name,
            PrimaryContainer=
            {
                'Image': self.algorithm_image,
                'ModelDataUrl': f"{self.model_output_path}/{self.training_job_name}/output/model.tar.gz"
            },
            ExecutionRoleArn=self.role_arn
        )

        resp = sagemaker.create_endpoint_config(
            EndpointConfigName=endpoint_name,
            ProductionVariants=[
                {
                    'VariantName': '{}-variant-1'.format(endpoint_name),
                    'ModelName': endpoint_name,
                    'InitialInstanceCount': 1,
                    'InstanceType': 'ml.m5.large'
                }
            ])

        print(resp)
        return resp

    def create_endpoint(self):
        print("Creating endpoint..")
        endpoint_name = self.endpoint
        response = sagemaker.create_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=endpoint_name
        )
        print(response)

    def describe_endpoint(self):
        status = sagemaker.describe_endpoint(EndpointName=self.endpoint)['EndpointStatus']
        print(self.endpoint + " endpoint is now in status:", status)
        print("Waiting for " + self.endpoint + " to be In-service...")
        sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=self.endpoint)
        resp = sagemaker.describe_endpoint(EndpointName=self.endpoint)
        status = resp['EndpointStatus']
        print(self.endpoint + " endpoint is now in status:", status)
        if status == 'Failed':
            message = resp['FailureReason']
            print('Test Endpoint {} creation failed with the following error: {}'.format(self.endpoint, message))
            raise Exception('Endpoint creation failed')
        return status

    
    def create_batch_transform_job(self, batch_job_name, inference_input_location, inference_output_location):
        # create_endpoint_config registers the model under the endpoint name
        model_name = self.endpoint
        
        request = {
            "TransformJobName": batch_job_name,
            "ModelName": model_name,
            "TransformOutput": {
                "S3OutputPath": inference_output_location,
                "Accept": "text/csv",
                "AssembleWith": "Line",
            },
            "TransformInput": {
                "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": inference_input_location}},
                "ContentType": "text/csv",
                "SplitType": "Line",
                "CompressionType": "None",
            },
            "TransformResources": {"InstanceType": "ml.m5.xlarge", "InstanceCount": 1},
        }
        sagemaker.create_transform_job(**request)
        print("Created Transform job with name: ", batch_job_name)
        
    


In [None]:
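# Create a boto3 SageMaker client. This rebinds `sagemaker`, which until now referred
# to the SageMaker Python SDK; the ModelRun methods look the name up at call time.
# This cell runs the same steps as the Glue training/deployment job, for local debugging.
# Skip it if you run the workflow below: the Glue job receives the same `endpoint_name`,
# so creating the model and endpoint twice will likely fail with a name conflict.
sagemaker = boto3.client('sagemaker')
s3 = boto3.resource('s3')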

args = dict(
    train_input_path=processed_data,
    model_output_path=model_output_path,
    algorithm_image=image_uri,
    role_arn=sagemaker_execution_role,
    endpoint_name=endpoint_name
)

obj = ModelRun(args)

# Create training job
obj.create_training_job()

# Describe training job
status = obj.describe_training_job()

# Create endpoint conf
resp = obj.create_endpoint_config()

# Create endpoint for model
obj.create_endpoint()

# Describe endpoint
status = obj.describe_endpoint()
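Two details of the SageMaker calls in `ModelRun` are easy to miss: `CreateTrainingJob` takes `HyperParameters` as a string-to-string map (hence `'5'`, `'0.2'`, and so on), and the trained model is written to `<S3OutputPath>/<TrainingJobName>/output/model.tar.gz`, which is the `ModelDataUrl` that `create_endpoint_config` builds. A small sketch of both, with hypothetical helper names:

```python
def to_hyperparameters(params: dict) -> dict:
    """Convert Python values to the str -> str map that CreateTrainingJob expects."""
    return {key: str(value) for key, value in params.items()}

def model_artifact_uri(output_path: str, training_job_name: str) -> str:
    """Location of the model archive SageMaker writes at the end of a training job."""
    return f"{output_path.rstrip('/')}/{training_job_name}/output/model.tar.gz"

hp = to_hyperparameters({"max_depth": 5, "eta": 0.2, "num_round": 100})
uri = model_artifact_uri("s3://example-bucket/prefix/output", "gw-xgb-churn-pred-8-3-6-51")
print(hp)
print(uri)
```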



## Run the Workflow

Start a run of the Glue workflow, then poll its status until it finishes:

In [None]:
# Start a new run of the workflow
response = glue_client.start_workflow_run(
    Name=glue_workflow_name
)

In [None]:
def check_workflow_state(workflow_name, run_id):
    resp = glue_client.get_workflow_run(
        Name=workflow_name,
        RunId=run_id,
        IncludeGraph=True
    )
    return resp['Run']['Status']

print('Checking workflow state:')
while True:
    workflow_status = check_workflow_state(glue_workflow_name, response['RunId'])
    if workflow_status in ['COMPLETED', 'STOPPED', 'ERROR']:
        print(workflow_status)
        break
    else:
        print('.')
    time.sleep(30)
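A `COMPLETED` workflow run does not by itself mean every job succeeded: the run finishes once no more actions can start, and failed jobs are counted in the run's `Statistics`. The sketch below, with a hypothetical `summarize_workflow_run` helper, inspects the `Run` dict returned by `glue_client.get_workflow_run` and treats the run as successful only if no action failed or timed out.

```python
def summarize_workflow_run(run: dict):
    """Return (ok, summary) for a get_workflow_run(...)["Run"] dict."""
    stats = run.get("Statistics", {})
    failed = stats.get("FailedActions", 0) + stats.get("TimeoutActions", 0)
    succeeded = stats.get("SucceededActions", 0)
    ok = run["Status"] == "COMPLETED" and failed == 0
    return ok, f"{run['Status']}: {succeeded} succeeded, {failed} failed or timed out"

# Example shape: processing failed, so training never ran
example_run = {"Status": "COMPLETED", "Statistics": {"TotalActions": 1, "SucceededActions": 0, "FailedActions": 1}}
print(summarize_workflow_run(example_run))
```

In this notebook it could be called as `summarize_workflow_run(glue_client.get_workflow_run(Name=glue_workflow_name, RunId=response['RunId'])['Run'])`.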

## Evaluate the deployed model

Once the workflow run has completed, run the cells below to evaluate the deployed model on the test data.

In [49]:
import pandas as pd
import numpy as np

from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    confusion_matrix,
    roc_curve,
)

sagemaker_runtime_client = boto3.Session().client('sagemaker-runtime')
sagemaker_client = boto3.Session().client('sagemaker')

In [50]:
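# By default this reuses `endpoint_name` from the Setup section. To evaluate an
# endpoint from an earlier run instead, set its name here, e.g.:
# endpoint_name = 'gw-customer-churn-endpoint-8-3-6-51'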

In [51]:
def evaluate_model(endpoint_name, s3_object_uri):
    # Read the test CSV from S3 (pandas needs the s3fs package for s3:// URIs); column 0 is the label
    df = pd.read_csv(s3_object_uri, header=None)
    
    payload = df[df.columns[1:]].to_csv(header=False, index=False).encode("utf-8")

    response = sagemaker_runtime_client.invoke_endpoint(
        EndpointName=endpoint_name, 
        ContentType='text/csv', 
        Body=payload)

    result = response['Body'].read().decode()

    prediction_probabilities = np.asarray(result.split(','), dtype=float)
    predictions = np.round(prediction_probabilities)
    
    y_test = df[0]

    precision = precision_score(y_test, predictions)
    recall = recall_score(y_test, predictions)
    accuracy = accuracy_score(y_test, predictions)
    conf_matrix = confusion_matrix(y_test, predictions)
    fpr, tpr, _ = roc_curve(y_test, prediction_probabilities)
    
    return accuracy, precision, recall, conf_matrix
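`evaluate_model` splits the endpoint response on commas and rounds each probability. The exact CSV response format can depend on the XGBoost container version, so the sketch below (hypothetical helpers, standard library only) accepts comma- or newline-separated values and applies an explicit 0.5 threshold. One difference worth knowing: `np.round` rounds half to even, so a probability of exactly 0.5 becomes 0 there, while `to_labels` maps it to 1.

```python
import re

def parse_predictions(body: str):
    """Split an endpoint CSV response into floats, accepting comma- or newline-separated values."""
    return [float(tok) for tok in re.split(r"[,\n]", body.strip()) if tok.strip()]

def to_labels(probabilities, threshold=0.5):
    """Map churn probabilities to 0/1 labels with an explicit threshold."""
    return [1 if p >= threshold else 0 for p in probabilities]

probs = parse_predictions("0.91,0.08\n0.5")
print(probs, to_labels(probs))
```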
    

In [52]:
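# The preprocessing job writes the test split under `test_data`; this assumes it is a single CSV object
test_data_uri = S3Downloader.list(test_data, sagemaker_session=session)[0]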
accuracy, precision, recall, conf_matrix = evaluate_model(endpoint_name, test_data_uri)

In [24]:
accuracy, precision, recall, conf_matrix

(0.96,
 0.9615384615384616,
 0.9615384615384616,
 array([[23,  1],
        [ 1, 25]]))
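The three scores above follow directly from the confusion matrix. In scikit-learn's layout for binary labels the matrix is `[[TN, FP], [FN, TP]]`, so here TN = 23, FP = 1, FN = 1 and TP = 25. The helper below (a hypothetical name, no zero-division handling) recomputes the printed values from those four counts.

```python
def metrics_from_confusion(cm):
    """Accuracy, precision and recall from a 2x2 confusion matrix [[TN, FP], [FN, TP]]."""
    (tn, fp), (fn, tp) = cm
    accuracy = (tp + tn) / (tn + fp + fn + tp)   # 48 / 50
    precision = tp / (tp + fp)                   # 25 / 26
    recall = tp / (tp + fn)                      # 25 / 26
    return accuracy, precision, recall

print(metrics_from_confusion([[23, 1], [1, 25]]))
# (0.96, 0.9615384615384616, 0.9615384615384616)
```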

### Register the model and run batch transform

In [42]:
sagemaker_client = boto3.client('sagemaker')
sagemaker_runtime_client = boto3.client('sagemaker-runtime')

In [43]:
def create_model_package_group(model_package_group_name):
    try:
        return sagemaker_client.describe_model_package_group(
            ModelPackageGroupName=model_package_group_name
        )
    except Exception:
        # The group does not exist yet, so create it
        model_package_group_input_dict = {
            "ModelPackageGroupName": model_package_group_name,
            "ModelPackageGroupDescription": "Sample model package group from Glue Workflow Demo"
        }

        response = sagemaker_client.create_model_package_group(**model_package_group_input_dict)
        return response
    
def create_model_package_group_version(model_package_group_name, container):
    modelpackage_inference_specification =  {
        "InferenceSpecification": {
            "Containers": [
             {
                "Image": container['Image'],
                "ModelDataUrl": container['ModelDataUrl']
             }
            ],
            "SupportedContentTypes": [ "text/csv" ],
            "SupportedResponseMIMETypes": [ "text/csv" ],
        }
    }

    create_model_package_input_dict = {
        "ModelPackageGroupName" : model_package_group_name,
        "ModelPackageDescription" : "Model to predict customer churn.",
        "ModelApprovalStatus" : "PendingManualApproval"
    }
    create_model_package_input_dict.update(modelpackage_inference_specification)

    create_model_package_response = sagemaker_client.create_model_package(**create_model_package_input_dict)
    return create_model_package_response

def get_model_name(endpoint_name):
    response = sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_name)
    return response['ProductionVariants'][0]['ModelName']    

def register_model_package_group(endpoint_name):
    
    model_name = get_model_name(endpoint_name)
    
    response = sagemaker_client.describe_model(ModelName = model_name)
    container = response['PrimaryContainer']
    
    create_model_package_group(model_name)
    create_model_package_group_version(model_name, container)
        


In [44]:
# register model group
register_model_package_group(endpoint_name)
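The package is registered with `ModelApprovalStatus` set to `PendingManualApproval`. Once the evaluation metrics look acceptable, the newest package in the group could be approved through the SageMaker `ListModelPackages` and `UpdateModelPackage` APIs. This is a sketch that is not part of the original workflow; `approve_latest_model_package` is a hypothetical name, and the client is passed in so the function can be tried without AWS access.

```python
def approve_latest_model_package(client, group_name, description="Approved after evaluation"):
    """Approve the most recently created package in a model package group.

    `client` is expected to be a boto3 SageMaker client, e.g. `sagemaker_client`.
    """
    packages = client.list_model_packages(
        ModelPackageGroupName=group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )["ModelPackageSummaryList"]
    if not packages:
        raise ValueError(f"No model packages found in group {group_name!r}")
    arn = packages[0]["ModelPackageArn"]
    client.update_model_package(
        ModelPackageArn=arn,
        ModelApprovalStatus="Approved",
        ApprovalDescription=description,
    )
    return arn
```

Because `register_model_package_group` names the group after the model, a call would look like `approve_latest_model_package(sagemaker_client, get_model_name(endpoint_name))`.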

In [55]:
def create_batch_transform_job(endpoint_name, inference_input_location, inference_output_location):
    model_name = get_model_name(endpoint_name)
    batch_job_name = f"batch-transform-job-by-{model_name}"

    request = {
        "TransformJobName": batch_job_name,
        "ModelName": model_name,
        "TransformOutput": {
            "S3OutputPath": inference_output_location,
            "Accept": "text/csv",
            "AssembleWith": "Line",
        },
        "TransformInput": {
            "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": inference_input_location}},
            "ContentType": "text/csv",
            "SplitType": "Line",
            "CompressionType": "None",
        },
        # The test CSV has the label in column 0; send only the feature columns to the model
        "DataProcessing": {"InputFilter": "$[1:]"},
        "TransformResources": {"InstanceType": "ml.m5.xlarge", "InstanceCount": 1},
    }
    # Use the boto3 client defined above; the name `sagemaker` may still refer to the SDK module
    sagemaker_client.create_transform_job(**request)
    print("Created Transform job with name: ", batch_job_name)


In [56]:
# kick off batch transform job
inference_output_location = batch_transform_output
inference_input_location = test_data_uri
create_batch_transform_job(endpoint_name, inference_input_location, inference_output_location)

Created Transform job with name:  batch-transform-job-by-gw-customer-churn-endpoint-8-3-6-51
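`create_transform_job` returns immediately. To wait for the job and locate its results, the sketch below uses the boto3 waiter `transform_job_completed_or_stopped` and the batch transform naming convention of one `<input file name>.out` object per input file under `S3OutputPath`. Both helper names are hypothetical, and the client is passed in.

```python
import posixpath

def wait_for_transform(client, job_name):
    """Block until a transform job stops and return its final status (client: boto3 SageMaker client)."""
    # Depending on the waiter's acceptors, a failed job may already raise from wait();
    # the explicit status check covers the case where wait() returns.
    client.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=job_name)
    desc = client.describe_transform_job(TransformJobName=job_name)
    if desc["TransformJobStatus"] == "Failed":
        raise RuntimeError(f"{job_name} failed: {desc.get('FailureReason')}")
    return desc["TransformJobStatus"]

def transform_output_uri(input_uri, output_path):
    """S3 URI of the result object batch transform writes for a single input file."""
    file_name = posixpath.basename(input_uri)
    return f"{output_path.rstrip('/')}/{file_name}.out"
```

For the job above, a possible call is `wait_for_transform(sagemaker_client, f"batch-transform-job-by-{get_model_name(endpoint_name)}")`, followed by reading `transform_output_uri(test_data_uri, batch_transform_output)`.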


## Clean Up

When you are done, clean up your AWS account by deleting the resources you won't reuse. Run the cells below to delete the Glue jobs, triggers, and workflow, as well as the SageMaker endpoint resources.

In [None]:
# delete the jobs
for job_name in [data_processing_job_name, model_training_deployment_job_name]:
    glue_client.delete_job(JobName=job_name)

# delete the triggers    
for trigger_name in [data_processing_trigger_name, model_train_deploy_trigger_name]:
    glue_client.delete_trigger(Name=trigger_name)
    
# Delete the workflow
response = glue_client.delete_workflow(
    Name=glue_workflow_name
)


In [None]:
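sagemaker_client = boto3.Session().client('sagemaker')

# Look up the endpoint config and model before the endpoint is deleted
config_name = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']
model_name = sagemaker_client.describe_endpoint_config(EndpointConfigName=config_name)['ProductionVariants'][0]['ModelName']

sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
sagemaker_client.delete_endpoint_config(EndpointConfigName=config_name)
sagemaker_client.delete_model(ModelName=model_name)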
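The model package group created in the registry section is not removed by the cells above. A group generally has to be empty before it can be deleted, so the sketch below lists every package in the group first, deletes them, and then deletes the group. `delete_model_package_group_and_packages` is a hypothetical helper; the client is passed in.

```python
def delete_model_package_group_and_packages(client, group_name):
    """Delete all model packages in a group, then the group (client: boto3 SageMaker client)."""
    # Collect every package ARN first so deletions don't interfere with pagination
    arns = []
    kwargs = {"ModelPackageGroupName": group_name}
    while True:
        page = client.list_model_packages(**kwargs)
        arns.extend(s["ModelPackageArn"] for s in page["ModelPackageSummaryList"])
        token = page.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    for arn in arns:
        client.delete_model_package(ModelPackageName=arn)
    client.delete_model_package_group(ModelPackageGroupName=group_name)
    return arns
```

The group name is the model name used in the registry section, i.e. the value of `get_model_name(endpoint_name)`; look it up before the endpoint config is deleted.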
