# UDACITY Designing Your First Workflow - Step Functions

## Step Functions & SageMaker

In the prior exercises, we worked with many small services. This can be overwhelming for a data scientist who wants a consistent methodology for handling data. Step Functions is an orchestration service that lets us use SageMaker in a methodical, repeatable way. It also integrates with Lambda, which makes it possible to automate an entire machine learning pipeline end to end. Let's get a handle on what a 'step' in a step function looks like.

In this exercise, you will create a preprocessing step and a training step. Then you will create a step function to chain the two steps.
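To make the idea of a 'step' more concrete, here is a rough sketch of the kind of Amazon States Language definition that chaining two task states corresponds to. The state names and resource ARNs are taken from the steps defined later in this notebook. The `Parameters` blocks are left out, and `execution_order` is a hypothetical helper written only for illustration, so treat this as an approximation of what the SDK generates rather than its exact output.

```python
import json

# Hand-written sketch of a two-state definition; Parameters are omitted.
definition = {
    "StartAt": "SageMaker pre-processing step 4",
    "States": {
        "SageMaker pre-processing step 4": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
            "Next": "SageMaker Training Step",  # runs only after this state succeeds
        },
        "SageMaker Training Step": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "End": True,  # terminal state of the workflow
        },
    },
}


def execution_order(definition):
    """Follow the Next pointers from StartAt until a state marked End."""
    order = []
    name = definition["StartAt"]
    while name is not None:
        order.append(name)
        state = definition["States"][name]
        name = None if state.get("End") else state.get("Next")
    return order


print(json.dumps(execution_order(definition), indent=2))
```

Each state names its successor through `Next`, which is how the success of one step leads into the following one.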

## Exercise: Grant Permissions and install packages.

Attach the IAMFullAccess and AWSStepFunctionsFullAccess policies to your SageMaker execution role.

In [7]:
%%bash
pip install stepfunctions





## Exercise: Fill out preprocessing step.

The 'step' interface is designed to closely resemble the Preprocessing Job from lesson 2. The main difference is that a 'step' can interface with other steps: when one step succeeds, the next step specified in the workflow starts automatically. In our case, the training step will launch once the preprocessing step succeeds. The preprocessing step has been written for you. Upload the preprocessing code 'HelloBlazePreprocess.py' and the zipped dataset 'reviews_Musical_Instruments_5.json.zip' to S3, and fill out the constants in the code below.
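One possible way to do the upload from the notebook is sketched here with boto3's `upload_file`. The bucket name and prefix are placeholders, and `s3_uri` and `upload` are hypothetical helpers, not part of the exercise. The actual upload calls are commented out because they need AWS credentials and the two local files.

```python
def s3_uri(bucket, key):
    """Build the s3:// URI form used by the constants in this notebook."""
    return f"s3://{bucket}/{key}"


def upload(local_path, bucket, key):
    """Upload one local file to S3 and return its s3:// URI."""
    import boto3  # imported here so s3_uri can be used without AWS access

    boto3.client("s3").upload_file(local_path, bucket, key)
    return s3_uri(bucket, key)


# Placeholder bucket and prefix (assumptions); replace with your own.
BUCKET = "my-example-bucket"
PREFIX = "instruments/stepfunctions"

print(s3_uri(BUCKET, f"{PREFIX}/HelloBlazePreprocess.py"))

# Uncomment once the files are in the working directory:
# upload("HelloBlazePreprocess.py", BUCKET, f"{PREFIX}/HelloBlazePreprocess.py")
# upload("reviews_Musical_Instruments_5.json.zip", BUCKET,
#        f"{PREFIX}/reviews_Musical_Instruments_5.json.zip")
```

The URIs returned by `upload` are the values that `input_preprocessing_code` and `input_data` would be set to.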

The code below defines the preprocessing step. Fill in the constants.

In [8]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
import sagemaker

role = get_execution_role()

#PREPROCESSING_JOB_NAME = "sagemaker-scikit-learn-2021-12-07-15-18-27-558"
PREPROCESSING_JOB_NAME = "stepf-instruments-process-job5"
input_data = 's3://edgarin-mlend-c2/toys/instruments/reviews_Musical_Instruments_5.json.zip'
input_preprocessing_code = 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/HelloBlazePreprocess.py'
sess = sagemaker.Session()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)

processed_data_train = "s3://edgarin-mlend-c2/toys/instruments/stepfunctions/input-procesado-con-job-4/blaze_train_scikit/"
processed_data_test = "s3://edgarin-mlend-c2/toys/instruments/stepfunctions/input-procesado-con-job-4/blaze_test_scikit/"

inputs = [
    ProcessingInput(source=input_data, destination='/opt/ml/processing/input', input_name='input-1'),
    ProcessingInput(source=input_preprocessing_code, destination='/opt/ml/processing/input/code', input_name='code'),
]

outputs = [
    ProcessingOutput(source='/opt/ml/processing/output/train', destination=processed_data_train, output_name='train_data'),
    ProcessingOutput(source='/opt/ml/processing/output/test', destination=processed_data_test, output_name='test_data'),
]

processing_step = ProcessingStep(
    "SageMaker pre-processing step 4",
    processor=sklearn_processor,
    job_name=PREPROCESSING_JOB_NAME,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/HelloBlazePreprocess.py"],
)
processing_step


SageMaker pre-processing step 4 ProcessingStep(resource='arn:aws:states:::sagemaker:createProcessingJob.sync', parameters={'ProcessingJobName': 'stepf-instruments-process-job5', 'ProcessingInputs': [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/reviews_Musical_Instruments_5.json.zip', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/HelloBlazePreprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train_data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/input-

## Exercise: Fill out Training Step

Once the preprocessing step succeeds, we want to run a training step. A training step is defined below. Fill in the constants in the code.

In [9]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

WORKFLOW_OUTPUT = "s3://edgarin-mlend-c2/toys/instruments/stepfunctions/model/"
TRAINING_JOB_NAME = "stepf-instruments-training-job5"

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, framework="blazingtext", version="latest"
)

helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=WORKFLOW_OUTPUT,
    sagemaker_session=sess,
)

helloBlazeEstimator.set_hyperparameters(mode='supervised')

training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={
        "train": sagemaker.TrainingInput(processed_data_train, content_type="text/plain"),
        "validation": sagemaker.TrainingInput(processed_data_test, content_type="text/plain"),
    },
    job_name=TRAINING_JOB_NAME,
    wait_for_completion=True,
)
training_step

SageMaker Training Step TrainingStep(resource='arn:aws:states:::sagemaker:createTrainingJob.sync', parameters={'AlgorithmSpecification': {'TrainingImage': '811284229777.dkr.ecr.us-east-1.amazonaws.com/blazingtext:1', 'TrainingInputMode': 'File'}, 'OutputDataConfig': {'S3OutputPath': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/model/'}, 'StoppingCondition': {'MaxRuntimeInSeconds': 360000}, 'ResourceConfig': {'InstanceCount': 1, 'InstanceType': 'ml.m5.large', 'VolumeSizeInGB': 30}, 'RoleArn': 'arn:aws:iam::114256180253:role/service-role/AmazonSageMaker-ExecutionRole-20211127T235743', 'InputDataConfig': [{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/input-procesado-con-job-4/blaze_train_scikit/', 'S3DataDistributionType': 'FullyReplicated'}}, 'ContentType': 'text/plain', 'ChannelName': 'train'}, {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/st

## Exercise: Create Workflow & Execute It. 

To link the steps, you'll need a role that is allowed to do so. Go to IAM, create a Step Functions role, and attach the CloudWatchEventsFullAccess and AmazonSageMakerFullAccess policies. Once done, use the steps above to create a workflow. Quick debugging tip: job names must be unique, so you'll need to rename the jobs each time you rerun while debugging. Consider creating a method that dynamically generates unique job names!
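A minimal sketch of such a method follows, assuming a UTC timestamp plus a short random suffix is unique enough for debugging. `unique_job_name` is a hypothetical helper, and the 63-character cap reflects my understanding of the SageMaker job-name limit, so verify it against the API reference before relying on it.

```python
from datetime import datetime, timezone
import uuid


def unique_job_name(prefix, max_length=63):
    """Return prefix plus a UTC timestamp and a short random suffix.

    The prefix is expected to be non-empty and to contain only
    letters, digits, and hyphens.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    tail = f"-{stamp}-{suffix}"
    # Trim the prefix, not the tail, so uniqueness survives truncation.
    return prefix[: max_length - len(tail)].rstrip("-") + tail


print(unique_job_name("stepf-instruments-process"))
print(unique_job_name("stepf-instruments-training"))
```

In this notebook the job name is passed when each step is constructed, so the steps (and the workflow built from them) would need to be rebuilt with fresh names before each rerun.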

In [10]:
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

workflow_role = "arn:aws:iam::114256180253:role/mi-stepfunctions-ml-role"

workflow_graph = Chain([processing_step, training_step])
workflow = Workflow(
    name="InstrumentsProcessTrainWorkflow5-txtoutput",
    definition=workflow_graph,
    role=workflow_role,
)

workflow.create()

execution = workflow.execute(
    # Commented out because I realized it isn't needed
    #inputs={
    #    "PreprocessingJobName": PREPROCESSING_JOB_NAME,  # Must be unique
    #    "TrainingJobName": TRAINING_JOB_NAME  # Must be unique       
    #}
    inputs={
        "foo": "Bar"
    }
)

execution_output = execution.get_output(wait=True)
execution_output

{'TrainingJobName': 'stepf-instruments-training-job5',
 'TrainingJobArn': 'arn:aws:sagemaker:us-east-1:114256180253:training-job/stepf-instruments-training-job5',
 'ModelArtifacts': {'S3ModelArtifacts': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/model/stepf-instruments-training-job5/output/model.tar.gz'},
 'TrainingJobStatus': 'Completed',
 'SecondaryStatus': 'Completed',
 'HyperParameters': {'mode': 'supervised'},
 'AlgorithmSpecification': {'TrainingImage': '811284229777.dkr.ecr.us-east-1.amazonaws.com/blazingtext:1',
  'TrainingInputMode': 'FILE'},
 'RoleArn': 'arn:aws:iam::114256180253:role/service-role/AmazonSageMaker-ExecutionRole-20211127T235743',
 'InputDataConfig': [{'ChannelName': 'train',
   'DataSource': {'S3DataSource': {'S3DataType': 'S3_PREFIX',
     'S3Uri': 's3://edgarin-mlend-c2/toys/instruments/stepfunctions/input-procesado-con-job-4/blaze_train_scikit/',
     'S3DataDistributionType': 'FULLY_REPLICATED'}},
   'ContentType': 'text/plain',
   'CompressionTy

You can track the outcome of this workflow through a custom UI that gets generated! Check it out!

In [11]:
execution.render_progress()
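Once the execution has finished, the output dictionary printed earlier can be reduced to the one field a later deployment step would most likely need: the model artifact location. The sketch below uses a trimmed copy of that output. `model_artifact_uri` is a hypothetical helper, not part of the Step Functions SDK.

```python
def model_artifact_uri(execution_output):
    """Return the S3 URI of the trained model, or None if it is missing."""
    return execution_output.get("ModelArtifacts", {}).get("S3ModelArtifacts")


# Trimmed copy of the execution output shown earlier in this notebook.
sample_output = {
    "TrainingJobName": "stepf-instruments-training-job5",
    "ModelArtifacts": {
        "S3ModelArtifacts": "s3://edgarin-mlend-c2/toys/instruments/stepfunctions/model/stepf-instruments-training-job5/output/model.tar.gz"
    },
    "TrainingJobStatus": "Completed",
}

print(model_artifact_uri(sample_output))
```

In the notebook itself, the same helper could be called on `execution_output` directly.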