## Exercise: Fill out Preprocess Step


In [13]:
%%bash
pip install stepfunctions



In [18]:
from sagemaker import get_execution_role
import sagemaker

role = get_execution_role()
print(role)
print(sagemaker.Session().default_bucket())
print(sagemaker.Session())

arn:aws:iam::248116845806:role/service-role/AmazonSageMaker-ExecutionRole-20220101T170667
sagemaker-us-east-1-248116845806
<sagemaker.session.Session object at 0x7f87c428eba8>


In [19]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
import sagemaker

role = get_execution_role()
print(role)
print(sagemaker.Session().default_bucket())
print(sagemaker.Session())

PREPROCESSING_JOB_NAME = 'test-job-preprocess-17'
input_data = 's3://sagemaker-us-east-1-248116845806/reviews_Musical_Instruments_5.json.zip'
input_preprocessing_code = 's3://sagemaker-us-east-1-248116845806/HelloBlazePreprocess.py'
sess = sagemaker.Session()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)


processed_data_train = "{}{}/{}".format("s3://", sess.default_bucket(), '/hello_blaze_train_scikit')
processed_data_test = "{}{}/{}".format("s3://", sess.default_bucket(), '/hello_blaze_test_scikit')

inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input', input_name = 'input-1'),  ProcessingInput(source=input_preprocessing_code , destination='/opt/ml/processing/input/code', input_name = 'code')]


outputs=[ProcessingOutput(source='/opt/ml/processing/output/train', destination=processed_data_train, output_name = 'train_data'), ProcessingOutput(source='/opt/ml/processing/output/test', destination=processed_data_test, output_name = 'test_data')]


processing_step = ProcessingStep(
    "SageMaker pre-processing step 4",
    processor=sklearn_processor,
    job_name=PREPROCESSING_JOB_NAME,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/HelloBlazePreprocess.py"],
)



print(" ".join([processed_data_train, processed_data_test]))
print(inputs)


arn:aws:iam::248116845806:role/service-role/AmazonSageMaker-ExecutionRole-20220101T170667
sagemaker-us-east-1-248116845806
<sagemaker.session.Session object at 0x7f87c5ce9748>
s3://sagemaker-us-east-1-248116845806//hello_blaze_train_scikit s3://sagemaker-us-east-1-248116845806//hello_blaze_test_scikit
[<sagemaker.processing.ProcessingInput object at 0x7f87c74144a8>, <sagemaker.processing.ProcessingInput object at 0x7f87c47a5470>]


## Exercise: Fill out Training Step

In [20]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

WORKFLOW_OUTPUT = "s3://sagemaker-us-east-1-248116845806/ex3-output/"
TRAINING_JOB_NAME = "test-job-train-17"

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, framework="blazingtext", version="latest"
)

helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=WORKFLOW_OUTPUT,
    sagemaker_session=sess,
)

helloBlazeEstimator.set_hyperparameters(mode='supervised')

training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={"train": sagemaker.TrainingInput(processed_data_train, content_type="text/plain"), "validation": sagemaker.TrainingInput(processed_data_test, content_type="text/plain")},
    job_name=TRAINING_JOB_NAME,
    wait_for_completion=True,
)

## Exercise: Create & Execute Workflow

In [21]:
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

workflow_role = 'arn:aws:iam::248116845806:role/step-function-execution-role'

workflow_graph = Chain([processing_step, training_step])
workflow = Workflow(
    name="SageMakerProcessingWorkflow-17",
    definition=workflow_graph,
    role=workflow_role,
)

workflow.create()

# Execute workflow
execution = workflow.execute(
    inputs={
        "PreprocessingJobName": PREPROCESSING_JOB_NAME,  # Each pre processing job (SageMaker processing job) requires a unique name,
        "TrainingJobName": TRAINING_JOB_NAME  # Each Sagemaker Training job requires a unique name,       
    }
)

execution_output = execution.get_output(wait=True)


In [22]:
execution.render_progress()