# Machine Learning Workflow Automation using the Step Functions Data Science SDK

>__NOTE:__ This Notebook uses the _Python 3 (Data Science)_ Kernel.

---

## 1. Pre-Requisites

### Load the Step Functions Data Science Python Library

In [None]:
%%capture
# Install pinned versions of the Step Functions Data Science SDK and the SageMaker
# Python SDK; %%capture suppresses pip's output in the notebook.
!pip install stepfunctions==2.2.0 sagemaker==2.49.1

### Simulate Basic CodeBuild Environment Variables

In [None]:
import os
import boto3

# Stand-ins for the environment variables the CodeBuild project provides.
os.environ.update(MODEL_NAME="abalone", PIPELINE_NAME="abalone-cicd-pipeline")

# The pipeline's S3 bucket name is read from SSM Parameter Store.
ssm_response = boto3.client("ssm").get_parameter(Name="PipelineBucketName")
os.environ["BUCKET_NAME"] = ssm_response["Parameter"]["Value"]

os.environ.update(
    DATA_PREFIX="abalone_data",
    EPOCHS="200",
    BATCH_SIZE="8",
    THRESHOLD="2.1",
)

### Create `custom_writefile` Cell Magic

In [None]:
from IPython.core.error import UsageError
from IPython.core.magic import register_cell_magic

@register_cell_magic
def custom_writefile(line, cell):
    """Append the cell body to a file, then execute it in the current session.

    Usage: put ``%%custom_writefile <path>`` on the first line of a cell. The body is
    appended (mode "a") to ``<path>``, so successive cells build up one script, and is
    then run so the notebook state matches what was written.

    Raises:
        UsageError: no target path was given on the magic line.
        Exception: whatever the cell itself raised, so "Run All" stops on a failing
            cell instead of silently continuing.
    """
    args = line.split()
    if not args:
        raise UsageError("%%custom_writefile requires a target file path")
    path = args[0]
    print("Writing {}".format(path))
    with open(path, "a") as f:
        f.write(cell)
        # Keep consecutive appended cells on separate lines.
        if not cell.endswith("\n"):
            f.write("\n")
    print("Running Cell")
    result = get_ipython().run_cell(cell)
    # run_cell reports errors but returns normally; re-raise so the failure propagates.
    result.raise_error()

---

## 2. Workflow Setup

### Import Required Python Libraries

In [None]:
!mkdir ../workflow

In [None]:
%%custom_writefile ../workflow/main.py

import io
import os
import random
import time
import uuid
import boto3
import botocore
import zipfile
import json
from time import gmtime, strftime, sleep
from botocore.exceptions import ClientError

import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TuningStep,
    TransformStep,
    Task,
    EndpointConfigStep,
    EndpointStep,
    LambdaStep
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor
from sagemaker.s3 import S3Uploader
from sagemaker.sklearn.processing import SKLearnProcessor

### Configure Global Variables

In [None]:
%%custom_writefile ../workflow/main.py

# Shared SageMaker session, region and execution role for every step definition.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()

# boto3 clients used by the helper functions, created once and reused.
sfn_client, lambda_client, codepipeline_client, ssm_client = (
    boto3.client(service_name)
    for service_name in ("stepfunctions", "lambda", "codepipeline", "ssm")
)

### Create Helper/Utility Functions

#### `get_workflow_role` Function

In [None]:
%%custom_writefile ../workflow/main.py

def get_workflow_role():
    """Return the workflow role stored in the SSM parameter ``WorkflowRoleParameter``.

    The value is used as the IAM role for the Lambda function and the Step Functions
    workflow. On a ``ClientError`` the AWS error message is printed and re-raised as a
    plain ``Exception``.
    """
    try:
        parameter = ssm_client.get_parameter(Name="WorkflowRoleParameter")["Parameter"]
    except ClientError as err:
        message = err.response["Error"]["Message"]
        print(message)
        raise Exception(message)
    return parameter["Value"]

#### `update_lambda` Function

In [None]:
%%custom_writefile ../workflow/main.py

def update_lambda(name, zip_name):
    """Replace the code of Lambda function ``name`` with the zip at ``zip_name``.

    A new version is published (``Publish=True``). The zip is uploaded inline as bytes
    rather than from S3.
    """
    # Read the package inside a context manager so the file handle is always closed.
    with open(zip_name, mode="rb") as package:
        zip_bytes = package.read()
    lambda_client.update_function_code(
        FunctionName=name,
        ZipFile=zip_bytes,
        Publish=True,
    )

#### `get_lambda` Function

In [None]:
%%custom_writefile ../workflow/main.py

def get_lambda(name, bucket, description):
    """Package, upload and create (or update) the Lambda function ``name``.

    ``../artifacts/{name}.py`` is zipped to ``../artifacts/{name}.zip`` and uploaded to
    ``s3://{bucket}/lambda``. A python3.8 function (handler ``{name}.lambda_handler``,
    120 s timeout, 128 MB) is then created using the workflow role. If a function with
    that name already exists, its code is updated instead.

    Returns:
        The function name, for use as a LambdaStep ``FunctionName``.

    Raises:
        ClientError: any Lambda error other than "function already exists".
    """
    print("Creating Lambda Package ")
    zip_name = f"../artifacts/{name}.zip"
    lambda_src = f"../artifacts/{name}.py"
    with zipfile.ZipFile(zip_name, mode="w") as package:
        package.write(lambda_src, arcname=os.path.basename(lambda_src))
    print("Uploading Lambda Package to S3 ")
    S3Uploader.upload(
        local_path=zip_name,
        desired_s3_uri=f"s3://{bucket}/lambda",
    )

    try:
        print(f"Creating Lambda Function '{name}' ...")
        lambda_client.create_function(
            FunctionName=name,
            Runtime="python3.8",
            Role=get_workflow_role(),
            Handler=f"{name}.lambda_handler",
            Code={
                "S3Bucket": bucket,
                "S3Key": f"lambda/{name}.zip"
            },
            Description=description,
            Timeout=120,
            MemorySize=128
        )
    except ClientError as e:
        # Only a name conflict means "already exists". Anything else (invalid role,
        # missing S3 object, throttling) is a real failure and must not be masked by
        # an update attempt.
        if e.response["Error"]["Code"] != "ResourceConflictException":
            raise
        print(f"Lambda Function '{name}' already exists, updating ...")
        update_lambda(name, zip_name)

    return name

#### `get_execution_id` Function

In [None]:
%%custom_writefile ../workflow/main.py

def get_execution_id(name=None):
    """Return an identifier for this run, used to name jobs and S3 prefixes.

    When ``name`` is given, it looks up that CodePipeline's state. If the "Build" stage
    contains a "BuildModel" action, the stage's latest pipeline execution ID is
    returned. Otherwise, including when ``name`` is None, the stage or action is
    missing, or the state lacks the expected keys, it falls back to a UTC timestamp
    formatted as ``YYYYmmddHHMMSS``. The function therefore never returns None.
    """
    def _timestamp():
        return strftime('%Y%m%d%H%M%S', gmtime())

    if name is None:
        return _timestamp()
    try:
        response = codepipeline_client.get_pipeline_state(name=name)
        for stage in response["stageStates"]:
            if stage["stageName"] != "Build":
                continue
            if any(action["actionName"] == "BuildModel" for action in stage["actionStates"]):
                return stage["latestExecution"]["pipelineExecutionId"]
    except KeyError:
        # Missing keys (e.g. no latestExecution yet) fall through to the timestamp.
        pass
    return _timestamp()
        

### Workflow Execution Parameters and Schema

#### Workflow Parameters

In [None]:
%%custom_writefile ../workflow/main.py

# Every job and S3 prefix for this run is keyed on the pipeline execution ID.
execution_id = get_execution_id(name=os.environ["PIPELINE_NAME"])
model, data_prefix, bucket_name = (
    os.environ[variable] for variable in ("MODEL_NAME", "DATA_PREFIX", "BUCKET_NAME")
)
model_prefix = execution_id
model_name = f"{model}-{execution_id}"
training_job_name, preprocessing_job_name, evaluation_job_name = (
    f"{model}-{job_kind}Job-{execution_id}"
    for job_kind in ("Training", "Processing", "Evaluation")
)
# TensorFlow CPU training image used to run the evaluation script.
deeplearning_container_image = (
    f"763104351884.dkr.ecr.{region}.amazonaws.com/"
    "tensorflow-training:2.5.0-cpu-py37-ubuntu18.04-v1.0"
)

#### Workflow Execution Schema

In [None]:
%%custom_writefile ../workflow/main.py

# Values supplied per execution through Workflow.execute(inputs=...); all are strings.
execution_input = ExecutionInput(
    schema=dict.fromkeys(
        (
            "ModelName",
            "PreprocessingJobName",
            "TrainingJobName",
            "EvaluationProcessingJobName",
        ),
        str,
    )
)

### Data Configuration

In [None]:
%%custom_writefile ../workflow/main.py

# S3 URIs are always "/"-separated, so they are built with f-strings rather than
# os.path.join, whose separator depends on the host OS.
s3_bucket_base_uri = f"s3://{bucket_name}"
# Raw input file and the root prefix for preprocessed data.
input_data = f"{s3_bucket_base_uri}/{data_prefix}/raw/abalone.data"
output_data = f"{s3_bucket_base_uri}/{data_prefix}"
# Written by the processing step; read by training and evaluation respectively.
preprocessed_training_data = f"{output_data}/input/training"
preprocessed_testing_data = f"{output_data}/testing"
# Where the training job's artifact and the evaluation report are expected to land.
model_data_s3_uri = f"{s3_bucket_base_uri}/{model_prefix}/{training_job_name}/output/model.tar.gz"
output_model_evaluation_s3_uri = f"{s3_bucket_base_uri}/{model_prefix}/{training_job_name}/evaluation"

---

## 3. Data Processing Step

### Data Processing Artifact

In [None]:
!mkdir ../artifacts

In [None]:
%%writefile ../artifacts/preprocessing.py

import os
import pandas as pd
import numpy as np

prefix = "/opt/ml"
processing_path = os.path.join(prefix, "processing")
preprocessing_input_path = os.path.join(processing_path, "input")
preprocessing_output_path = os.path.join(processing_path, "output")

# Column layout of the headerless raw abalone.data file.
column_names = ["sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "rings"]


def build_datasets(data):
    """Turn the raw abalone frame into training, validation and testing arrays.

    ``data`` must contain the columns in ``column_names``; it is not modified. Each
    returned float array has the ``rings`` label in column 0, then the numeric
    features, then the one-hot ``sex`` columns (pandas places dummy columns after the
    others). Rows are split in their existing order: the first 80% for training, the
    next 15% for validation and the last 5% for testing.
    """
    labels = data["rings"].to_numpy().reshape(len(data), 1)
    print("Creating Catagorical Features")
    # dtype=float keeps the one-hot columns numeric. Newer pandas returns bool dummies,
    # which would otherwise be written to the CSVs as True/False.
    features = pd.get_dummies(data.drop(columns=["rings"]), dtype=float).to_numpy()
    dataset = np.concatenate((labels, features), axis=1)
    print("Splitting Data into Training, Validation and, Test Datasets")
    return np.split(dataset, [int(.8 * len(dataset)), int(.95 * len(dataset))])


if __name__ == "__main__":
    print("Preprocessing Data")
    data = pd.read_csv(os.path.join(preprocessing_input_path, "abalone.data"), names=column_names)
    training, validation, testing = build_datasets(data)
    # Make sure the local output folders exist before writing.
    for subdir in ("training", "testing"):
        os.makedirs(os.path.join(preprocessing_output_path, subdir), exist_ok=True)
    pd.DataFrame(training).to_csv(os.path.join(preprocessing_output_path, "training/training.csv"), header=False, index=False)
    pd.DataFrame(validation).to_csv(os.path.join(preprocessing_output_path, "training/validation.csv"), header=False, index=False)
    pd.DataFrame(testing).to_csv(os.path.join(preprocessing_output_path, "testing/testing.csv"), header=False, index=False)
    print("Done!")

### Data Processing Step Definition

In [None]:
%%custom_writefile ../workflow/main.py

# Runs preprocessing.py in a scikit-learn processing container. The outputs reuse the
# URIs from the data configuration, so the training and evaluation steps read exactly
# where this step writes.
processing_step = ProcessingStep(
    "Pre-process Data",
    processor=SKLearnProcessor(
        framework_version="0.23-1",
        role=role,
        instance_type="ml.m5.xlarge",
        instance_count=1,
        max_runtime_in_seconds=1200,
    ),
    job_name=execution_input["PreprocessingJobName"],
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input",
            input_name="input"
        ),
        ProcessingInput(
            # upload_data runs when this step is defined, not on each workflow execution.
            source=sagemaker_session.upload_data(
                path="../artifacts/preprocessing.py",
                bucket=bucket_name,
                key_prefix=f"{data_prefix}/code"
            ),
            destination="/opt/ml/processing/input/code",
            input_name="code"
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output/training",
            destination=preprocessed_training_data,
            output_name="training"
        ),
        ProcessingOutput(
            source="/opt/ml/processing/output/testing",
            destination=preprocessed_testing_data,
            output_name="testing"
        )
    ],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)

---

## 4. Model Training Step

### TensorFlow Model Training Artifact

In [None]:
%%writefile ../artifacts/model_training.py

import argparse
import os
import tensorflow as tf
import numpy as np
import pandas as pd

from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn import preprocessing

tf.get_logger().setLevel('ERROR')

if __name__ == "__main__":
    print(f"Tensorflow Version: {tf.__version__}")
    # Column order written by preprocessing.py: the label first, then the numeric
    # features, then the one-hot "sex" columns.
    column_names = ["rings", "length", "diameter", "height", "whole weight", "shucked weight",
                    "viscera weight", "shell weight", "sex_F", "sex_I", "sex_M"]
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=2)
    parser.add_argument('--batch-size', type=int, default=8)
    # Defaults come from the SM_* environment variables of the training container.
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    # parse_known_args tolerates extra arguments passed in by the platform.
    args, _ = parser.parse_known_args()
    epochs = args.epochs
    batch_size = args.batch_size
    training_path = args.training
    model_path = args.model_dir
    train_data = pd.read_csv(os.path.join(training_path, 'training.csv'), sep=',', names=column_names)
    val_data = pd.read_csv(os.path.join(training_path, 'validation.csv'), sep=',', names=column_names)
    train_y = train_data['rings'].to_numpy()
    train_X = train_data.drop(['rings'], axis=1).to_numpy()
    val_y = val_data['rings'].to_numpy()
    val_X = val_data.drop(['rings'], axis=1).to_numpy()
    # Per-row normalization; evaluate.py applies the same transform to the test set.
    train_X = preprocessing.normalize(train_X)
    val_X = preprocessing.normalize(val_X)
    network_layers = [
        # Input width comes from the data instead of a hard-coded feature count.
        Dense(64, activation="relu", kernel_initializer="normal", input_dim=train_X.shape[1]),
        Dense(64, activation="relu"),
        Dense(1, activation="linear")
    ]
    model = Sequential(network_layers)
    # Regression on a continuous target: MAE is tracked alongside the MSE loss
    # ("accuracy" is meaningless here).
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    model.summary()
    model.fit(
        train_X,
        train_y,
        validation_data=(val_X, val_y),
        batch_size=batch_size,
        epochs=epochs,
        shuffle=True,
        verbose=1
    )

    # Model Evaluation Format (loaded by evaluate.py)
    model.save(os.path.join(model_path, 'model.h5'))
    # TF Serving Format (versioned SavedModel directory)
    model_version = 1
    export_path = os.path.join(model_path, str(model_version))
    tf.keras.models.save_model(
        model,
        export_path,
        overwrite=True,
        include_optimizer=True,
        save_format=None,
        signatures=None,
        options=None
    )

### Training Step Definition

In [None]:
%%custom_writefile ../workflow/main.py

# Trains model_training.py on the preprocessed training channel with TensorFlow 2.4.
training_step = TrainingStep(
    "Model Training",
    estimator=TensorFlow(
        entry_point='../artifacts/model_training.py',
        role=role,
        hyperparameters={
            'epochs': int(os.environ['EPOCHS']),
            'batch-size': int(os.environ['BATCH_SIZE']),
        },
        # SageMaker Python SDK v2 argument names: train_instance_* were renamed to
        # instance_*, and script_mode was removed (v2 always uses script mode).
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='2.4',
        py_version="py37",
        # model_data_s3_uri expects artifacts under <output_path>/<job name>/output/.
        output_path=f"{s3_bucket_base_uri}/{model_prefix}"
    ),
    data={"training": sagemaker.inputs.TrainingInput(preprocessed_training_data, content_type="csv")},
    job_name=execution_input["TrainingJobName"],
    wait_for_completion=True,
)

---

## 5. Model Evaluation Step

### Model Evaluation Artifact

In [None]:
%%writefile ../artifacts/evaluate.py

import json
import os
import tarfile
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn import preprocessing

def load_model(model_path):
    """Load the ``model.h5`` Keras model saved by model_training.py and re-compile it."""
    model = tf.keras.models.load_model(os.path.join(model_path, 'model.h5'))
    model.compile(optimizer='adam', loss='mse')
    return model


def evaluate_model(prefix, model):
    """Predict on the test set and write ground truth and predictions to JSON.

    Reads ``<prefix>/processing/testing/testing.csv`` and writes
    ``<prefix>/processing/evaluation/evaluation.json`` with the keys ``GroundTruth``
    and ``Predictions``, which hold parallel lists of floats.
    """
    column_names = ["rings", "length", "diameter", "height", "whole weight", "shucked weight",
                    "viscera weight", "shell weight", "sex_F", "sex_I", "sex_M"]
    input_path = os.path.join(prefix, "processing/testing")
    output_path = os.path.join(prefix, "processing/evaluation")
    test_df = pd.read_csv(os.path.join(input_path, "testing.csv"), names=column_names)
    y = test_df['rings'].to_numpy()
    # Same per-row normalization as applied during training.
    X = preprocessing.normalize(test_df.drop(['rings'], axis=1).to_numpy())
    # A single batched predict() call replaces one call per row, whose per-call
    # overhead dominated the runtime.
    predictions = [float(value) for value in np.ravel(model.predict(X))]
    truths = [float(value) for value in y]
    print(f"Generated {len(predictions)} predictions")
    report = {
        "GroundTruth": truths,
        "Predictions": predictions
    }
    os.makedirs(output_path, exist_ok=True)
    with open(os.path.join(output_path, "evaluation.json"), "w") as f:
        json.dump(report, f)


if __name__ == "__main__":
    print("Extracting model archive ...")
    prefix = "/opt/ml"
    model_path = os.path.join(prefix, "model")
    tarfile_path = os.path.join(prefix, "processing/model/model.tar.gz")
    # NOTE(review): extractall trusts member paths. That is acceptable only because
    # the archive is this workflow's own training output.
    with tarfile.open(tarfile_path) as tar:
        tar.extractall(path=model_path)
    print("Loading Trained Model ...")
    model = load_model(model_path)
    print("Evaluating Trained Model ...")
    evaluate_model(prefix, model)
    print("Done!")

### Model Evaluation Step Definition

In [None]:
%%custom_writefile ../workflow/main.py

# Container that runs evaluate.py (TensorFlow DLC image from the parameters cell).
evaluation_processor = Processor(
    image_uri=deeplearning_container_image,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    role=role,
    max_runtime_in_seconds=1200
)
# The evaluation script is uploaded to S3 when this cell runs.
evaluation_code_uri = sagemaker_session.upload_data(
    path="../artifacts/evaluate.py",
    bucket=bucket_name,
    key_prefix=os.path.join(data_prefix, "code")
)

test_data_input = ProcessingInput(
    source=preprocessed_testing_data,
    destination="/opt/ml/processing/testing",
    input_name="input"
)
model_artifact_input = ProcessingInput(
    source=model_data_s3_uri,
    destination="/opt/ml/processing/model",
    input_name="model"
)
evaluation_code_input = ProcessingInput(
    source=evaluation_code_uri,
    destination="/opt/ml/processing/input/code",
    input_name="code"
)
evaluation_report_output = ProcessingOutput(
    source="/opt/ml/processing/evaluation",
    destination=output_model_evaluation_s3_uri,
    output_name="evaluation"
)

evaluation_step = ProcessingStep(
    "Model Evaluation",
    processor=evaluation_processor,
    job_name=execution_input["EvaluationProcessingJobName"],
    inputs=[test_data_input, model_artifact_input, evaluation_code_input],
    outputs=[evaluation_report_output],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/evaluate.py"]
)

---

## 6. Analyze Evaluation Metrics Step

### Lambda Function (`lambda_handler()`) to Analyze Evaluation Metrics

In [None]:
%%writefile ../artifacts/analyze_results.py

import os
import json
import logging
import boto3
import botocore
import math
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Compute the root-mean-square error of the model evaluation results.

    ``event`` must contain ``Bucket`` and ``Key`` locating the JSON report written by
    evaluate.py, which holds parallel ``GroundTruth`` and ``Predictions`` lists.

    Returns:
        ``{"statusCode": 200, "Result": rmse}``; the workflow's Choice state reads
        ``Payload.Result``.

    Raises:
        KeyError: ``Bucket`` or ``Key`` is missing from the event.
        ValueError: the report is empty, or the two lists differ in length.
    """
    logger.debug("## Environment Variables ##")
    logger.debug(os.environ)
    logger.debug("## Event ##")
    logger.debug(event)
    s3 = boto3.client("s3")
    if "Bucket" not in event:
        raise KeyError("S3 'Bucket' not found in Lambda event!")
    bucket = event["Bucket"]
    if "Key" not in event:
        raise KeyError("S3 'Key' not found in Lambda event!")
    key = event["Key"]
    logger.info("Downloading evlauation results file ...")
    json_file = json.loads(s3.get_object(Bucket = bucket, Key = key)['Body'].read())
    logger.info("Analyzing Model Evaluation Results ...")
    y = json_file["GroundTruth"]
    y_hat = json_file["Predictions"]
    # Guard against a ZeroDivisionError on empty results, and against silently
    # ignoring extra predictions (or an IndexError on missing ones).
    if not y:
        raise ValueError("Evaluation results contain no ground-truth values!")
    if len(y) != len(y_hat):
        raise ValueError(
            "GroundTruth has {} values but Predictions has {}".format(len(y), len(y_hat))
        )
    summation = sum((truth - predicted) ** 2 for truth, predicted in zip(y, y_hat))
    rmse = math.sqrt(summation / len(y))
    logger.info("Root Mean Square Error: {}".format(rmse))
    logger.info("Done!")
    return {
        "statusCode": 200,
        "Result": rmse,
    }



### Analyze Evaluation Metrics Step Definition

In [None]:
%%custom_writefile ../workflow/main.py

# Package and deploy the analysis Lambda now; the step references it by name.
analyze_results_function = get_lambda(
    "analyze_results",
    bucket_name,
    "Analyze the results from the Model Evaluation"
)
# S3 key of the report produced by the evaluation step.
evaluation_results_key = f"{model_prefix}/{training_job_name}/evaluation/evaluation.json"

analyze_results_step = LambdaStep(
    "Analyze Evaluation Results",
    parameters={
        "FunctionName": analyze_results_function,
        "Payload": {"Bucket": bucket_name, "Key": evaluation_results_key},
    },
)

---

## 7. Register Trained Model Step

### Register Model Step Definition

In [None]:
%%custom_writefile ../workflow/main.py

# Model object describing the artifact the training step is expected to produce.
expected_model = training_step.get_expected_model()
register_model_step = ModelStep(
    state_id="Register Trained Model",
    model=expected_model,
    model_name=execution_input["ModelName"],
    instance_type="ml.m5.large",
)

---

## 8. Define the Step Functions Workflow

### Define the Workflow _Failure_ States

#### Workflow Failed State

In [None]:
%%custom_writefile ../workflow/main.py

# Shared failure state for every task step below. The cause is deliberately generic:
# the same Catch is attached to processing, training, Lambda and model steps, so a
# processing-specific cause would be misleading for the others.
workflow_failed_state = stepfunctions.steps.states.Fail(
    "ML Workflow Failed", cause="MLWorkflowStepFailed"
)
catch_state = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"], next_step=workflow_failed_state
)
for task_step in (
    processing_step,
    training_step,
    evaluation_step,
    analyze_results_step,
    register_model_step,
):
    task_step.add_catch(catch_state)

#### Model Evaluation Failed State

In [None]:
%%custom_writefile ../workflow/main.py

# Reached by the threshold Choice's default branch (RMSE not below THRESHOLD).
threshold_fail_state = stepfunctions.steps.states.Fail(
    state_id="Model Evaluation Exceeds Threshold",
)

### Define Workflow _Success_ States

#### Model Evaluation Success State

In [None]:
%%custom_writefile ../workflow/main.py

# Reached when the threshold Choice finds the RMSE below THRESHOLD.
threshold_pass_state = stepfunctions.steps.states.Pass(
    state_id="Model Evaluation Below Threshold",
)

### Define Workflow _Choice_ States

#### Model Evaluation Threshold Choice State

In [None]:
%%custom_writefile ../workflow/main.py

# Routes on the RMSE returned by the analysis Lambda (Payload.Result): strictly below
# THRESHOLD passes, anything else falls through to the failure state.
check_threshold_step = steps.states.Choice(state_id="Threshold Evaluation Check")
threshold_rule = ChoiceRule.NumericLessThan(
    variable=analyze_results_step.output()["Payload"]["Result"],
    value=float(os.environ["THRESHOLD"]),
)
check_threshold_step.add_choice(threshold_rule, threshold_pass_state)
check_threshold_step.default_choice(threshold_fail_state)

### Define the Overall Workflow Graph

In [None]:
%%custom_writefile ../workflow/main.py

# Steps run sequentially in this order.
# NOTE(review): register_model_step runs before evaluation and the threshold check,
# so a model is registered even when it later fails the threshold. Confirm this is
# intended.
workflow_steps = [
    processing_step,
    training_step,
    register_model_step,
    evaluation_step,
    analyze_results_step,
    check_threshold_step,
]
ml_workflow_graph = Chain(workflow_steps)

---

## 9. Perform Workflow System Tests (Optional)


In [None]:
# Temporary state machine, used only to exercise the graph from the notebook.
workflow_role = get_workflow_role()
ml_workflow = Workflow(
    name="abalone-workflow-unit-test",
    definition=ml_workflow_graph,
    role=workflow_role,
)

ml_workflow.create()

In [None]:
# Parse the generated state machine definition so the notebook renders it as a dict.
json.loads(ml_workflow.definition.to_json(pretty=True))

In [None]:
system_test_inputs = {
    "ModelName": model_name,
    "PreprocessingJobName": preprocessing_job_name,
    "TrainingJobName": training_job_name,
    "EvaluationProcessingJobName": evaluation_job_name,
}
execution = ml_workflow.execute(inputs=system_test_inputs)

# Block until the execution finishes and keep its final output for inspection.
execution_output = execution.get_output(wait=True)

In [None]:
# Remove the temporary "abalone-workflow-unit-test" state machine created above.
ml_workflow.delete()

---

## 10. Continuous Integration

### Continuous Integration of the Workflow

>__NOTE:__ Ensure all previous State Machines have been deleted

In [None]:
%%writefile -a ../workflow/main.py

def _find_state_machine_arn(name):
    """Return the ARN of the existing Step Functions state machine ``name``, or None."""
    paginator = sfn_client.get_paginator("list_state_machines")
    for page in paginator.paginate():
        for machine in page["stateMachines"]:
            if machine["name"] == name:
                return machine["stateMachineArn"]
    return None


print("Creating ML Workflow")
workflow_name = "abalone-workflow"
ml_workflow = Workflow(
    name=workflow_name,
    definition=ml_workflow_graph,
    role=get_workflow_role(),
)

# Check for an existing state machine explicitly, instead of relying on create()
# raising. The previous try/except/else was inverted: the "already exists" branch did
# not update, and a fresh create was needlessly updated and followed by a sleep.
existing_arn = _find_state_machine_arn(workflow_name)
if existing_arn is None:
    print("Creating Step Functions State Machine")
    ml_workflow.create()
else:
    print("Found Existing State Machine, Updating the State Machine definition")
    # Point the local Workflow at the deployed state machine so update() targets it.
    ml_workflow.state_machine_arn = existing_arn
    ml_workflow.update(ml_workflow_graph)
    # UpdateStateMachine is eventually consistent. Wait so the execution below runs
    # with the new definition.
    time.sleep(120)

print("Executing ML Workflow State Machine")
ml_workflow.execute(
    inputs={
        "ModelName": model_name,
        "PreprocessingJobName": preprocessing_job_name,
        "TrainingJobName": training_job_name,
        "EvaluationProcessingJobName": evaluation_job_name
    }
)

<!-- #### Save Execution Input Variables for Integration -->

**Commit changes to the CodeCommit Repository to Start the Continuous Workflow**