# Workflow

This notebook contains the Step Functions workflow definition for the training and baseline jobs.

In [None]:
# Install/upgrade the sagemaker, stepfunctions and boto3 SDKs into the
# environment of the running kernel (sys.executable is the kernel's interpreter)
import sys
# Upgrade pip itself first
!{sys.executable} -m pip install --upgrade pip
# Requires the v2 SageMaker SDK (sagemaker>=2.0.0)
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.0.0" # 2.0.0
# The step functions SDK is installed from the `sagemaker-v2` branch of a fork
!{sys.executable} -m pip install -qU git+https://github.com/brightsparc/aws-step-functions-data-science-sdk-python.git@sagemaker-v2 
# Print the installed package details so the versions in use are visible
!{sys.executable} -m pip show sagemaker stepfunctions

In [None]:
import boto3
import json
import os
import time
import uuid

import sagemaker
from sagemaker.image_uris import retrieve 
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.model_monitor.dataset_format import DatasetFormat

import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

Load variables from environment

In [None]:
# AWS region of the default boto3 session and the execution role of this notebook
region = boto3.Session().region_name
role = sagemaker.get_execution_role()

# Pipeline configuration is read from environment variables; a missing
# variable raises KeyError in this cell
pipeline_name = os.environ['PIPELINE_NAME']
model_name = os.environ['MODEL_NAME']
workflow_pipeline_arn = os.environ['WORKFLOW_PIPELINE_ARN']
create_experiment_function_name = os.environ['CREATE_EXPERIMENT_LAMBDA']
query_training_function_name = os.environ['QUERY_TRAINING_LAMBDA']

# SageMaker session and the default bucket it resolves
session = sagemaker.session.Session()
bucket = session.default_bucket()

# Echo the resolved settings for the reader
print(f'region: {region}')
print(f'role: {role}')
print(f'pipeline: {pipeline_name}')
print(f'model name: {model_name}')
print(f'bucket: {bucket}')

Load the input data from the mlops notebook and print values

In [None]:
# Restore the `input_data` variable saved with %store, then display it.
# Later cells read its 'BaselineUri', 'TrainingUri' and 'ValidationUri' keys.
%store -r input_data 
input_data 

Specify the training model output base uri

In [None]:
# Base S3 uri for model output, namespaced by model name inside the default bucket
output_data = dict(ModelOutputUri=f's3://{bucket}/{model_name}/model')

## Define Training Resources

### Input Schema

Define the input schema for the step functions which can then be used as arguments to resources

In [None]:
# Every execution parameter is declared as a string; the keys below are the
# placeholders that later cells look up through execution_input[...]
execution_input = ExecutionInput(
    schema=dict.fromkeys(
        (
            "GitBranch",
            "GitCommitHash",
            "DataVersionId",
            "ExperimentName",
            "TrialName",
            "BaselineJobName",
            "BaselineOutputUri",
            "TrainingJobName",
        ),
        str,
    )
)

### Define the model monitor baseline

Define the environment variables

In [None]:
# The baseline dataset is CSV; the format description is passed as a JSON string
dataset_format = DatasetFormat.csv()

# Environment variables handed to the processing container. The source and
# output paths are the container-local paths used by the processing
# input/output definitions.
env = dict(
    dataset_format=json.dumps(dataset_format),
    dataset_source="/opt/ml/processing/input/baseline_dataset_input",
    output_path="/opt/ml/processing/output",
    # Open question from the original author: does this have to be disabled
    # when running as a processing job?
    publish_cloudwatch_metrics="Disabled",
)

Define the processing inputs and outputs 

In [None]:
# Single input: the baseline dataset, placed at the container path that the
# `dataset_source` environment variable points to
inputs = [
    ProcessingInput(
        input_name="baseline_dataset_input",
        source=input_data['BaselineUri'],
        destination="/opt/ml/processing/input/baseline_dataset_input",
    ),
]

# Single output: the container output directory, written to the uri supplied
# at execution time through the "BaselineOutputUri" execution input
outputs = [
    ProcessingOutput(
        output_name="monitoring_output",
        source="/opt/ml/processing/output",
        destination=execution_input["BaselineOutputUri"],
    ),
]

Create the baseline processing job using the sagemaker [model monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) container.

In [None]:
# Get the default model monitor container for the notebook's region.
# `region` is already resolved in the environment cell, so it is not looked up again.
model_monitor_container_uri = retrieve(region=region, framework="model-monitor", version="latest")

# Use the base Processor, passing the baseline settings through to the
# container as environment variables
monitor_analyzer = Processor(
    image_uri=model_monitor_container_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1800,  # fail the job if it runs longer than 30 minutes
    env=env
)

Test the model baseline processing job by running inline

In [None]:
# Optional manual test, left disabled: uncomment to run the baseline job
# directly from the notebook and wait for it to finish.
# NOTE(review): `outputs` uses an execution_input placeholder as its destination,
# so an inline run presumably needs a concrete S3 uri instead; confirm before enabling.
#monitor_analyzer.run(inputs=inputs, outputs=outputs, wait=True)

### Defining the Training Job

Define the training job to run in parallel with the processing job

In [None]:
# Resolve the XGBoost training image for the current region
image_uri = retrieve(framework="xgboost", region=region, version="latest")

# Estimator that trains on a single instance and writes the model artifacts
# under the fixed model output uri
xgb = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path=output_data['ModelOutputUri'],  # NOTE: Can't use execution_input here
)

# Hyperparameters are supplied as strings and applied on top of any defaults
hyperparameters = dict(
    max_depth="9",
    eta="0.2",
    gamma="4",
    min_child_weight="300",
    subsample="0.8",
    objective="reg:linear",
    early_stopping_rounds="10",
    num_round="3",
)
xgb.set_hyperparameters(**hyperparameters)

# Training and validation channels, both read as CSV from the input data uris
s3_input_train = sagemaker.inputs.TrainingInput(content_type="csv", s3_data=input_data['TrainingUri'])
s3_input_val = sagemaker.inputs.TrainingInput(content_type="csv", s3_data=input_data['ValidationUri'])
data = dict(train=s3_input_train, validation=s3_input_val)

Test the estimator directly in the notebook

In [None]:
# Optional manual test, left disabled: uncomment to train the estimator
# directly from the notebook using the train/validation channels defined above.
#xgb.fit(inputs=data)

## Define Training Workflow

### 1. Create the Experiment

Define the create experiment lambda.

In the future, add [ResultPath](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-resultpath.html) to filter the results.

In [None]:
# Lambda task that receives the experiment and trial names from the execution
# input; its result is stored under $.CreateTrialResults so the original
# input stays available to the following states
create_experiment_step = steps.compute.LambdaStep(
    "Create Experiment",
    parameters=dict(
        FunctionName=create_experiment_function_name,
        Payload={
            "ExperimentName.$": "$.ExperimentName",
            "TrialName.$": "$.TrialName",
        },
    ),
    result_path="$.CreateTrialResults",
)

### 2a. Run processing Job

Define the processing job with specific failure handling

In [None]:
# Processing step that runs the model monitor analyzer. The job name,
# experiment/trial names and tag values are all taken from the execution input.
baseline_step = steps.sagemaker.ProcessingStep(
    "Baseline Job",
    processor=monitor_analyzer,
    job_name=execution_input["BaselineJobName"],
    inputs=inputs,
    outputs=outputs,
    experiment_config=dict(
        ExperimentName=execution_input["ExperimentName"],
        TrialName=execution_input["TrialName"],
        TrialComponentDisplayName="Baseline",
    ),
    tags=dict(
        GitBranch=execution_input["GitBranch"],
        GitCommitHash=execution_input["GitCommitHash"],
        DataVersionId=execution_input["DataVersionId"],
    ),
)

# Route a failed task to a dedicated Fail state
baseline_step.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("Baseline failed", cause="SageMakerBaselineJobFailed"),
    )
)

### 2b. Run and query training Job

Define the training job and add a validation step

In [None]:
# Training step for the XGBoost estimator. The job name, experiment/trial
# names and tag values come from the execution input; the step result is
# stored under $.TrainingResults.
training_step = steps.sagemaker.TrainingStep(
    "Training Job",
    estimator=xgb,
    data=data,
    job_name=execution_input["TrainingJobName"],
    experiment_config=dict(
        ExperimentName=execution_input["ExperimentName"],
        TrialName=execution_input["TrialName"],
        TrialComponentDisplayName="Training",
    ),
    tags=dict(
        GitBranch=execution_input["GitBranch"],
        GitCommitHash=execution_input["GitCommitHash"],
        DataVersionId=execution_input["DataVersionId"],
    ),
    result_path="$.TrainingResults",
)

# Route a failed task to a dedicated Fail state
training_step.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("Training failed", cause="SageMakerTrainingJobFailed"),
    )
)

Create a model from the training job, note this must follow training to retrieve the expected model

In [None]:
# Must follow the training step: the model is derived from training_step and
# this state reads its input from $.TrainingResults. The model is named after
# the training job and the result is stored under $.ModelStepResults.
model_step = steps.sagemaker.ModelStep(
    "Save Model",
    model=training_step.get_expected_model(),
    model_name=execution_input["TrainingJobName"],
    input_path="$.TrainingResults",
    result_path="$.ModelStepResults",
)

Query the training results, and validate that the RMSE is within an acceptable range

In [None]:
# Lambda task that is given the training job name; its result is stored under
# $.QueryTrainingResults
training_query_step = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={  
        "FunctionName": query_training_function_name,
        'Payload':{
            "TrainingJobName.$": '$.TrainingJobName'
        }
    },
    result_path='$.QueryTrainingResults'
)

# The queried metric must be strictly below this value for the model to be
# accepted. It feeds both the choice rule and the Choice state label, so the
# two cannot drift apart.
rmse_threshold = 10

# Reached through the default branch of the choice, i.e. when the metric is
# NOT below the threshold, so the error is too high (not too low)
check_accuracy_fail_step = steps.states.Fail(
    'Model Error Too High',
    comment='RMSE accuracy higher than threshold'
)

check_accuracy_succeed_step = steps.states.Succeed('Model Error Acceptable')

# TODO: Update query method to query validation error using better result path
# The rule compares the 'Value' of the first entry in 'TrainingMetrics' from
# the lambda payload (presumably the RMSE; verify against the query lambda).
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=training_query_step.output()['QueryTrainingResults']['Payload']['results']['TrainingMetrics'][0]['Value'],
    value=rmse_threshold
)

check_accuracy_step = steps.states.Choice(
    'RMSE < {}'.format(rmse_threshold)
)

# Below the threshold -> succeed; anything else -> fail
check_accuracy_step.add_choice(rule=threshold_rule, next_step=check_accuracy_succeed_step)
check_accuracy_step.default_choice(next_step=check_accuracy_fail_step)

### 3. Add the Error handling in the workflow

We will use the [Catch Block](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/states.html#stepfunctions.steps.states.Catch) to perform error handling. If the Processing Job Step or Training Step fails, the flow will go into failure state.

In [None]:
# Parallel state with two branches:
#   1. the baseline processing job
#   2. training -> save model -> query training results -> accuracy check
sagemaker_jobs = steps.states.Parallel("SageMaker Jobs")
sagemaker_jobs.add_branch(baseline_step)
sagemaker_jobs.add_branch(
    steps.states.Chain([
        training_step,
        model_step,
        training_query_step,
        check_accuracy_step,
    ])
)

# Open question from the original author: is a group-level failure needed in
# addition to the per-job ones?
sagemaker_jobs.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("SageMaker Jobs failed", cause="SageMakerJobsFailed"),
    )
)

## Execute Training Workflow

In [None]:
# Attach to the existing workflow
workflow = Workflow.attach(workflow_pipeline_arn)
workflow

In [None]:
# Full workflow: create the experiment first, then run the parallel SageMaker jobs
workflow_graph = steps.states.Chain([
    create_experiment_step,
    sagemaker_jobs
])

# Replace the definition of the attached state machine with the new graph
workflow.update(workflow_graph)
print('Update workflow: {}'.format(workflow.state_machine_arn))

# `time` comes from the import cell at the top of the notebook
time.sleep(3) # Sleep to ensure workflow updated before we continue

Render the graph of the workflow as defined by the updated definition

In [None]:
# Render the workflow graph inline in the notebook
workflow.render_graph()

We can also inspect the raw workflow definition and verify the execution variables are correctly passed in

In [None]:
# Pretty-print the workflow definition as JSON so the execution input
# placeholders can be checked by eye
print(workflow.definition.to_json(pretty=True))

In [None]:
# Print the CloudFormation template generated for this workflow
print(workflow.get_cloudformation_template())

Now we define the inputs for the workflow

In [None]:
# Dummy job id and git/data version parameters for a manual run
job_id = uuid.uuid1().hex
git_branch = 'master'
git_commit_hash = 'xxx'
data_version_id = 'yyy'

# Experiment is named after the model; the trial appends the job id to it
experiment_name = f"mlops-{model_name}"
trial_name = f"{experiment_name}-{job_id}"

# One value per key of the execution input schema. The baseline and training
# jobs both reuse the trial name as their job name.
workflow_inputs = dict(
    ExperimentName=experiment_name,
    TrialName=trial_name,
    GitBranch=git_branch,
    GitCommitHash=git_commit_hash,
    DataVersionId=data_version_id,
    BaselineJobName=trial_name,
    BaselineOutputUri=f"s3://{bucket}/{model_name}/monitoring/baseline/mlops-{model_name}-pbl-{job_id}",
    TrainingJobName=trial_name,
)
print(json.dumps(workflow_inputs))

Then execute the workflow

In [None]:
# Start a new execution of the workflow with the inputs defined above
execution = workflow.execute(inputs=workflow_inputs)

# Fetch the output without waiting for the execution to finish
execution_output = execution.get_output(wait=False)

Render workflow progress with the [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress).

This generates a snapshot of the current state of your workflow as it executes. Run the cell again to refresh progress or jump to step functions in the console.

In [None]:
# Snapshot of the execution's current progress; re-run this cell to refresh
execution.render_progress()

Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
# Left disabled: the original author flagged this call as hitting a bug
# (details are not recorded in this notebook)
# execution.list_events(html=True) # Bug