# UDACITY Designing Your First Workflow - Step Functions

## Step Functions & SageMaker

In the prior exercises, we've been working with many small services. This can be overwhelming for a data scientist that wants to establish a consistent methodology for handling data. Step Functions is an orchestration service that can allow us to utilize SageMaker in a methodical and consistent way. Step Functions also integrates with Lambda, which can allow us to potentially automate our entire machine learning pipeline end-to-end. Let's get a handle on what a 'step' in a step function looks like.

In this exercise, you will create a preprocessing step and a training step. Then you will create a step function to chain the two steps.

## Exercise: Grant Permissions and install packages.

Attach the IAMFullAccess and the StepFunctionsFullAccess polices to your SageMaker execution role.

In [1]:
%%bash
pip install stepfunctions


Collecting stepfunctions
  Downloading stepfunctions-2.3.0.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.1/67.1 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting boto3>=1.14.38 (from stepfunctions)
  Downloading boto3-1.35.8-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.36.0,>=1.35.8 (from boto3>=1.14.38->stepfunctions)
  Downloading botocore-1.35.8-py3-none-any.whl.metadata (5.7 kB)
Downloading boto3-1.35.8-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.1/139.1 kB[0m [31m20.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.35.8-py3-none-any.whl (12.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.5/12.5 MB[0m [31m77.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hBuilding wheels for collected packages: stepfunctions
  Building wheel for

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.13.1 requires botocore<1.34.132,>=1.34.70, but you have botocore 1.35.8 which is incompatible.
amazon-sagemaker-sql-magic 0.1.3 requires sqlparse==0.5.0, but you have sqlparse 0.5.1 which is incompatible.
autogluon-common 0.8.3 requires pandas<1.6,>=1.4.1, but you have pandas 2.1.4 which is incompatible.
autogluon-core 0.8.3 requires pandas<1.6,>=1.4.1, but you have pandas 2.1.4 which is incompatible.
autogluon-core 0.8.3 requires scikit-learn<1.4.1,>=1.1, but you have scikit-learn 1.4.2 which is incompatible.
autogluon-features 0.8.3 requires pandas<1.6,>=1.4.1, but you have pandas 2.1.4 which is incompatible.
autogluon-features 0.8.3 requires scikit-learn<1.4.1,>=1.1, but you have scikit-learn 1.4.2 which is incompatible.
autogluon-multimodal 0.8.3 requires pandas<1.6,>=1.4.1, but you have pan

Successfully installed boto3-1.35.8 botocore-1.35.8 stepfunctions-2.3.0


[0m

In [None]:
# Todo
#s3://sagemaker-us-east-1-852197184467/l3e3/data/
import os
import boto3
s3_bucket = "sagemaker-us-east-1-852197184467"
s3_prefix = "l3e3/data"
file_name = "HelloBlazePreprocess.py"
def upload_file_to_s3(file_name, s3_prefix):
    object_name = os.path.join(s3_prefix, file_name)
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(file_name, s3_bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return

upload_file_to_s3(file_name, s3_prefix)

processing_job_input_path = "s3://" + "/".join([s3_bucket, s3_prefix, file_name])

print(processing_job_input_path)

## Exercise: Fill out preprocessing step.

The 'step' interface is designed to be quite similar to the Preprocessing Job in lesson 2. The main difference between these is the ability of a 'step' to interface with other steps. Given the successful outcome of a single step, the next step specified in a workflow will automatically continue. In our case, a training step will launch given the successful outcome of a preprocessing step. The preprocessing step has been encoded for you. Upload the preprocessing code 'HelloBlazePreprocess.py' and the zipped dataset 'reviews_Musical_Instruments_5.json.zip' to s3, and fill out the constants in the code below. 

Code below is the preprocessing step. Fill in the constants in the code.

In [2]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
import sagemaker

role = get_execution_role()

PREPROCESSING_JOB_NAME = "preprocess-job-01"
input_data = 's3://sagemaker-us-east-1-852197184467/l3e3/data/reviews_Musical_Instruments_5.json.zip'
input_preprocessing_code = 's3://sagemaker-us-east-1-852197184467/l3e3/data/HelloBlazePreprocess.py'
sess = sagemaker.Session()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)


processed_data_train = "{}{}/{}".format("s3://", sess.default_bucket(), 'hello_blaze_train_scikit')
processed_data_test = "{}{}/{}".format("s3://", sess.default_bucket(), 'hello_blaze_test_scikit')

inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input', input_name = 'input-1'),  ProcessingInput(source=input_preprocessing_code , destination='/opt/ml/processing/input/code', input_name = 'code')]


outputs=[ProcessingOutput(source='/opt/ml/processing/output/train', destination=processed_data_train, output_name = 'train_data'), ProcessingOutput(source='/opt/ml/processing/output/test', destination=processed_data_test, output_name = 'test_data')]


processing_step = ProcessingStep(
    "SageMaker pre-processing step 4",
    processor=sklearn_processor,
    job_name=PREPROCESSING_JOB_NAME,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/HelloBlazePreprocess.py"],
)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


## Exercise: Fill out Training Step

Upon the success of the preprocessing step, we wish to execute a training step. A training step is defined below. Fill the constants in the code.

In [3]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

WORKFLOW_OUTPUT = "s3://sagemaker-us-east-1-852197184467/l3e3/workflow_output/"
TRAINING_JOB_NAME = "test-job-train-001"

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, framework="blazingtext", version="latest"
)

helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=WORKFLOW_OUTPUT,
    sagemaker_session=sess,
)

helloBlazeEstimator.set_hyperparameters(mode='supervised')

training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={"train": sagemaker.TrainingInput(processed_data_train, content_type="text/plain"), "validation": sagemaker.TrainingInput(processed_data_test, content_type="text/plain")},
    job_name=TRAINING_JOB_NAME,
    wait_for_completion=True,
)

## Exercise: Create Workflow & Execute It. 

To link the steps, you'll need to create a role that is capable of doing so. Go to IAM and create a Step Functions role, and attach the CloudWatchEventsFullAccess and SageMakerFullAccess policies. Once done, make use of the above steps to create a workflow. Quick debugging tip: jobs must have a unique name; you'll need to rename job names when debugging. Consider creating a method that will dynamically create unique job names! 

In [4]:
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

workflow_role = "arn:aws:iam::852197184467:role/step-function-0001"

workflow_graph = Chain([processing_step, training_step])
workflow = Workflow(
    name="SageMakerProcessingWorkflow0001",
    definition=workflow_graph,
    role=workflow_role,
)

workflow.create()

execution = workflow.execute(
    inputs={
        "PreprocessingJobName": PREPROCESSING_JOB_NAME,  # Each pre processing job (SageMaker processing job) requires a unique name,
        "TrainingJobName": TRAINING_JOB_NAME  # Each Sagemaker Training job requires a unique name,       
    }
)

execution_output = execution.get_output(wait=True)

You can track the outcome of this workflow through a custom UI that gets generated! Check it out!

In [5]:
execution.render_progress()