In [1]:
import sys

!{sys.executable} -m pip install --upgrade stepfunctions

Collecting stepfunctions
  Downloading stepfunctions-2.3.0.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.1/67.1 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: stepfunctions
  Building wheel for stepfunctions (setup.py) ... [?25ldone
[?25h  Created wheel for stepfunctions: filename=stepfunctions-2.3.0-py2.py3-none-any.whl size=78149 sha256=9b25e2572cc7f5201113112e972f2d8d12a392ca5fe0f1dbc023d7487c81793b
  Stored in directory: /home/ec2-user/.cache/pip/wheels/e8/8e/d2/b36300e3b204b743131942831a469a45ee5e5180884f7306c5
Successfully built stepfunctions
Installing collected packages: stepfunctions
Successfully installed stepfunctions-2.3.0


In [60]:
import uuid
import logging
import stepfunctions
import boto3
import sagemaker

from sagemaker.amazon.amazon_estimator import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

# Set the Step Functions SDK stream logger to INFO level
stepfunctions.set_stream_logger(level=logging.INFO)

# SageMaker session and the default S3 bucket associated with it
session = sagemaker.Session()
bucket = session.default_bucket()

# Region name configured on the default boto3 session
region = boto3.Session().region_name

# Random hex string used as a per-run suffix for the resource names below.
# NOTE: this name shadows the built-in id(); it is kept because later cells
# refer to it.
id = uuid.uuid4().hex

# Name templates for the AWS Glue job and the AWS Lambda function created later.
# If you change these default names, you may need to change the Step Functions
# execution role.
glue_job_name_template = "glue-customer-churn-etl-{}"
lambda_name_template = "query-training-status-{}"

# Unique name for the AWS Glue job to be created
job_name = glue_job_name_template.format(id)

# Unique name for the AWS Lambda function to be created
function_name = lambda_name_template.format(id)

In [61]:
# Paste the ARN of the role we created for this course; it is passed to the
# Step Functions workflow as its execution role.
workflow_execution_role = "arn:aws:iam::891377264944:role/tim-course-mlops"

# SageMaker execution role.
# sagemaker.get_execution_role() can be used when running inside a SageMaker
# notebook instance.
sagemaker_execution_role = sagemaker.get_execution_role()

In [62]:
# Re-create the SageMaker session, look up its default bucket and print the
# bucket name as a quick sanity check.
session = sagemaker.Session()
# If you cannot get the right default bucket, you can specify an S3 location you created manually
bucket = session.default_bucket()
print(bucket)

sagemaker-us-east-2-891377264944


In [63]:
# IAM role ARN passed as Role when the AWS Glue job is created
# (same role we created in homework 1)
glue_role = "arn:aws:iam::891377264944:role/tim-course-mlops"

In [64]:
# IAM role ARN passed as Role when the AWS Lambda function is created
# (same role we created in homework 1)
lambda_role = "arn:aws:iam::891377264944:role/tim-course-mlops"

In [65]:
# Project folder name inside the bucket; name it anything you want
project_name = "ml_deploy"

# S3 prefixes, under the project folder, for the two dataset splits
train_prefix = "train"
val_prefix = "validation"

# Copy the customer churn CSV into this notebook instance first, then upload
# the local file to the project location in S3 for model training
project_s3_uri = "s3://{}/{}".format(bucket, project_name)
data_source = S3Uploader.upload(
    local_path="./data/customer-churn.csv",
    desired_s3_uri=project_s3_uri,
    sagemaker_session=session,
)

# Train and validation dataset locations in S3
split_uri_template = "s3://{}/{}/{}/"
train_data = split_uri_template.format(bucket, project_name, train_prefix)
validation_data = split_uri_template.format(bucket, project_name, val_prefix)

In [66]:
# Copy glue_etl.py to the notebook instance, then upload the script to the
# project location in the S3 bucket
glue_script_location = S3Uploader.upload(
    local_path="./code/glue_etl.py",
    desired_s3_uri="s3://{}/{}".format(bucket, project_name),
    sagemaker_session=session,
)
glue_client = boto3.client("glue")

# Command section of the job: a "glueetl" job running the uploaded script
glue_command = {
    "Name": "glueetl",
    "ScriptLocation": glue_script_location,
    "PythonVersion": "3",
}

# Full settings of the ETL job that splits the training and validation datasets
glue_job_settings = dict(
    Name=job_name,
    Description="PySpark job to extract the data and split in to training and validation data sets",
    Role=glue_role,  # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={"MaxConcurrentRuns": 2},
    Command=glue_command,
    DefaultArguments={"--job-language": "python"},
    GlueVersion="3.0",
    WorkerType="Standard",
    NumberOfWorkers=2,
    Timeout=60,
)

# Create the ETL job in Glue
response = glue_client.create_job(**glue_job_settings)

In [67]:
import zipfile

# Model validation pipeline
# Copy query_training_status.py to the local notebook instance first
zip_name = "query_training_status.zip"
lambda_source_code = "./code/query_training_status.py"

# Zip the script. The context manager closes the archive even if adding the
# file raises, so no open handle is leaked. The member is stored under its
# bare file name (no directory component).
with zipfile.ZipFile(zip_name, mode="w") as zf:
    zf.write(lambda_source_code, arcname=lambda_source_code.split("/")[-1])

# Copy the zipped script to S3 for Lambda use. This is the last expression of
# the cell, so the resulting S3 URI is displayed as the cell output.
S3Uploader.upload(
    local_path=zip_name,
    desired_s3_uri="s3://{}/{}".format(bucket, project_name),
    sagemaker_session=session,
)

's3://sagemaker-us-east-2-891377264944/ml_deploy/query_training_status.zip'

In [68]:
# Lambda client
lambda_client = boto3.client("lambda")

# S3 location of the zipped handler code (project folder + zip file name)
lambda_code_location = {
    "S3Bucket": bucket,
    "S3Key": "{}/{}".format(project_name, zip_name),
}

# Create the Lambda function used for model result validation
response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime="python3.9",
    Role=lambda_role,
    Handler="query_training_status.lambda_handler",
    Code=lambda_code_location,
    Description="Queries a SageMaker training job and return the results.",
    Timeout=15,
    MemorySize=128,
)

In [69]:
# Retrieve the XGBoost algorithm container image for training
container = sagemaker.image_uris.retrieve("xgboost", region, "latest")

# Create the XGBoost estimator (model) on a single ml.m4.xlarge instance.
# SageMaker Python SDK v2 renamed train_instance_count / train_instance_type to
# instance_count / instance_type; the current names are used here because the
# old spellings are deprecated.
xgb = sagemaker.estimator.Estimator(
    container,
    sagemaker_execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/{}/output".format(bucket, project_name),
)

# Set initial hyperparameter configurations
xgb.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    objective="binary:logistic",
    eval_metric="error",
    num_round=100,
)

In [70]:
# Fields that must be supplied when the workflow is executed, all strings.
# SageMaker expects unique names for each job, model and endpoint; if these
# names are not unique the execution will fail.
execution_input_schema = {
    "TrainingJobName": str,
    "GlueJobName": str,
    "ModelName": str,
    "EndpointName": str,
    "LambdaFunctionName": str,
}

# Placeholder object the steps below index into for execution-time values
execution_input = ExecutionInput(schema=execution_input_schema)

In [71]:
# Arguments handed to the Glue job run (consumed by glue_etl.py, which is not
# shown here): the uploaded source data, the destination location and the
# train/validation key prefixes
etl_arguments = {
    "--S3_SOURCE": data_source,
    "--S3_DEST": "s3a://{}/{}/".format(bucket, project_name),
    "--TRAIN_KEY": train_prefix + "/",
    "--VAL_KEY": val_prefix + "/",
}

# Workflow step that starts a run of the Glue ETL job named in the execution input
etl_step = steps.GlueStartJobRunStep(
    "Extract, Transform, Load",
    parameters={
        "JobName": execution_input["GlueJobName"],
        "Arguments": etl_arguments,
    },
)

In [72]:
# Input channels for training; both locations are declared as CSV content
training_channels = {
    "train": TrainingInput(train_data, content_type="text/csv"),
    "validation": TrainingInput(validation_data, content_type="text/csv"),
}

# Define the model training step; the job name comes from the execution input
training_step = steps.TrainingStep(
    "Model Training",
    estimator=xgb,
    data=training_channels,
    job_name=execution_input["TrainingJobName"],
    wait_for_completion=True,
)

In [73]:
# Model object the training step is expected to produce
expected_model = training_step.get_expected_model()

# Define the model store / register step; its result is written under
# $.ModelStepResults
model_step = steps.ModelStep(
    "Save Model",
    model=expected_model,
    model_name=execution_input["ModelName"],
    result_path="$.ModelStepResults",
)

In [74]:
# Payload for the Lambda invocation: forwards $.TrainingJobName from the state input
lambda_payload = {"TrainingJobName.$": "$.TrainingJobName"}

# Define the Lambda step used for model validation
lambda_step = steps.compute.LambdaStep(
    "Query Training Results",
    parameters={
        "FunctionName": execution_input["LambdaFunctionName"],
        "Payload": lambda_payload,
    },
)

In [75]:
# Choice state for the accuracy check in AWS Step Functions. Only the state
# (named "Accuracy > 90%") is created here; its branching rule and default
# branch are attached in a later cell.
check_accuracy_step = steps.states.Choice("Accuracy > 90%")

In [76]:
# Configure the model deployment endpoint: one ml.m4.xlarge instance.
# The endpoint config reuses the execution's ModelName as its own name.
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input["ModelName"],
    model_name=execution_input["ModelName"],
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
)

In [77]:
# Endpoint step (state name "Update Model Endpoint"): deploys the endpoint
# config named after ModelName to the endpoint named in the execution input.
endpoint_step = steps.EndpointStep(
    "Update Model Endpoint",
    endpoint_name=execution_input["EndpointName"],
    endpoint_config_name=execution_input["ModelName"],
    # For continuous training with an existing pipeline, change this to True
    update=False,
)

In [78]:
# Fail state used as the default branch of the accuracy check in AWS Step Functions
fail_step = steps.states.Fail(
    "Model Accuracy Too Low", comment="Validation accuracy lower than threshold"
)

In [79]:
# Value of the first training metric in the Lambda step's payload
# (presumably the validation error, given eval_metric="error" -- confirm
# against query_training_status.py)
first_metric_value = lambda_step.output()["Payload"]["trainingMetrics"][0]["Value"]

# Model validation rule: the metric value must be below 0.1
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=first_metric_value,
    value=0.1,
)

# When the rule matches, continue to the endpoint config step; otherwise fail
check_accuracy_step.add_choice(rule=threshold_rule, next_step=endpoint_config_step)
check_accuracy_step.default_choice(next_step=fail_step)

In [80]:
# Define the end of the step function: after the endpoint config step, run the
# endpoint step. As the last expression of the cell, the value returned by
# next() is displayed as the cell output.
endpoint_config_step.next(endpoint_step)

Update Model Endpoint EndpointStep(resource='arn:aws:states:::sagemaker:createEndpoint', parameters={'EndpointConfigName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f93f8a18580>, 'EndpointName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f93f8a18520>}, type='Task')

In [81]:
# Chain the model training automation into a pipeline, in execution order:
# ETL -> training -> save model -> query training results -> accuracy check.
# The steps after the accuracy check are reached through the Choice state's
# branches rather than through this chain.
workflow_definition = steps.Chain(
    [etl_step, training_step, model_step, lambda_step, check_accuracy_step]
)

In [82]:
# Per-run workflow name, suffixed with the run's random hex string
workflow_name = "MyInferenceRoutine_{}".format(id)

# Define the workflow in AWS Step Functions
workflow = Workflow(
    name=workflow_name,
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input,
)

In [83]:
# Generate the DAG graph of the AWS Step Functions workflow
workflow.render_graph()

In [84]:
# Create the workflow in AWS Step Functions; the returned state machine ARN is
# displayed as the cell output
workflow.create()

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:us-east-2:891377264944:stateMachine:MyInferenceRoutine_1d3dd5c020804a86ad739f8a367e0cd4'

In [85]:
# Values for every field declared in the execution input schema
workflow_inputs = {
    # Each SageMaker job requires a unique name
    "TrainingJobName": "regression-{}".format(id),
    "GlueJobName": job_name,
    # Each model requires a unique name
    "ModelName": "CustomerChurn-{}".format(id),
    # Fixed endpoint name (not suffixed per run); the endpoint validation cell
    # invokes the endpoint under this same name
    "EndpointName": "CustomerChurn",
    "LambdaFunctionName": function_name,
}

# Execute the training automation workflow with these parameters
execution = workflow.execute(inputs=workflow_inputs)

[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m


In [86]:
# Render the current progress of the running execution
execution.render_progress()

In [87]:
# List the events recorded so far for this execution
execution.list_events()

[{'timestamp': datetime.datetime(2024, 3, 13, 7, 21, 41, 409000, tzinfo=tzlocal()),
  'type': 'ExecutionStarted',
  'id': 1,
  'previousEventId': 0,
  'executionStartedEventDetails': {'input': '{\n    "TrainingJobName": "regression-1d3dd5c020804a86ad739f8a367e0cd4",\n    "GlueJobName": "glue-customer-churn-etl-1d3dd5c020804a86ad739f8a367e0cd4",\n    "ModelName": "CustomerChurn-1d3dd5c020804a86ad739f8a367e0cd4",\n    "EndpointName": "CustomerChurn",\n    "LambdaFunctionName": "query-training-status-1d3dd5c020804a86ad739f8a367e0cd4"\n}',
   'inputDetails': {'truncated': False},
   'roleArn': 'arn:aws:iam::891377264944:role/tim-course-mlops'}},
 {'timestamp': datetime.datetime(2024, 3, 13, 7, 21, 41, 461000, tzinfo=tzlocal()),
  'type': 'TaskStateEntered',
  'id': 2,
  'previousEventId': 0,
  'stateEnteredEventDetails': {'name': 'Extract, Transform, Load',
   'input': '{\n    "TrainingJobName": "regression-1d3dd5c020804a86ad739f8a367e0cd4",\n    "GlueJobName": "glue-customer-churn-etl-1d3

In [88]:
# List this workflow's executions, rendered as an HTML table
workflow.list_executions(html=True)

Name,Status,Started,End Time
5186feed-be83-4486-b7f0-1c7ad75a0eb6,RUNNING,"Mar 13, 2024 07:21:41.409 AM",-


In [91]:
# Validate the endpoint by sending it one CSV-encoded feature row
endpoint_name = "CustomerChurn"

# Use the region resolved from the boto3 session at the top of the notebook
# instead of a hard-coded region, so this client targets the same region as the
# other AWS clients created in this notebook.
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name=region)

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="text/csv",
    Body="2.0,400.0,0.38571846040122537,2.0,4.177940384158745,0.0,3.745462710628048,250.0,3.699591756294294,1.0,11.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0",
)
# The response body is read fully and decoded as UTF-8 text before printing
print(response["Body"].read().decode("utf-8"))

0.7669106721878052


In [None]:
# Clean up resources. Note: this does not delete the SageMaker endpoint.
# lambda_client.delete_function(FunctionName=function_name)
# glue_client.delete_job(JobName=job_name)
# workflow.delete()