# Managing ML workflows with SageMaker Pipelines

<img align="left" width="130" src="https://raw.githubusercontent.com/PacktPublishing/Amazon-SageMaker-Cookbook/master/Extra/cover-small-padded.png"/>

This notebook contains the code to help readers work through one of the recipes in the book [Machine Learning with Amazon SageMaker Cookbook: 80 proven recipes for data scientists and developers to perform ML experiments and deployments](https://www.amazon.com/Machine-Learning-Amazon-SageMaker-Cookbook/dp/1800567030).

### How to do it...

In [None]:
# Create the local scratch directory used for downloads below; -p keeps this
# from failing when the directory already exists.
!mkdir -p tmp

In [None]:
# Pieces of the raw-GitHub URL that hosts the Chapter01 data files.
g = "raw.githubusercontent.com"
p = "PacktPublishing"
a = "Amazon-SageMaker-Cookbook"
mc = "master/Chapter01"

# Base URL: https://<host>/<org>/<repo>/<branch/chapter>/files
path = "https://" + "/".join((g, p, a, mc, "files"))

In [None]:
# File name of the dataset; reused below when building the S3 input URI.
csv = "management_experience_and_salary.csv"

# Download the dataset from the repository URL in `path` into tmp/.
!wget -P tmp {path}/{csv}

In [None]:
# Replace the placeholder with the name of an S3 bucket before running.
s3_bucket = "<insert S3 bucket name here>"
prefix = "chapter09"

# Where the dataset is uploaded: s3://<bucket>/<prefix>/input/<csv file name>
input_data_uri = "s3://" + "/".join((s3_bucket, prefix, "input", csv))

In [None]:
# Upload the downloaded CSV to the S3 URI that the InputData parameter
# defaults to.
!aws s3 cp tmp/{csv} {input_data_uri}

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Both instance-type parameters start from the same default.
default_instance_type = "ml.m5.xlarge"

# Instance type consumed by the SKLearnProcessor defined further down.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value=default_instance_type,
)

# Instance type parameter for training; registered with the Pipeline below.
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value=default_instance_type,
)

# S3 URI of the dataset; used as the source of the processing step's input.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

In [None]:
from sagemaker import get_execution_role 

# Execution role handed to the SKLearnProcessor defined below.
role = get_execution_role()

In [None]:
# Pieces of the raw-GitHub URL that hosts the Chapter09 scripts.
g = "raw.githubusercontent.com"
p = "PacktPublishing"
a = "Amazon-SageMaker-Cookbook"
mc = "master/Chapter09"

# Base URL: https://<host>/<org>/<repo>/<branch/chapter>/scripts
path = "https://" + "/".join((g, p, a, mc, "scripts"))

In [None]:
# Download the preprocessing script into tmp/; the ProcessingStep below
# points at tmp/preprocessing.py.
!wget -P tmp {path}/preprocessing.py

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Framework version passed to the SKLearnProcessor.
framework_version = "0.23-1"

# Processor for the preprocessing script. The instance type is the
# ProcessingInstanceType pipeline parameter rather than a fixed string.
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=1,
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Input: the dataset referenced by the InputData parameter, made available
# to the script under /opt/ml/processing/input.
processing_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]

# Output: the contents of /opt/ml/processing/output, exposed under the name
# "output" so the training step can refer to it.
processing_outputs = [
    ProcessingOutput(output_name="output", source="/opt/ml/processing/output"),
]

# Pipeline step that runs tmp/preprocessing.py with the processor above.
step_process = ProcessingStep(
    name="ProcessingStep",
    processor=sklearn_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    code="tmp/preprocessing.py",
)

In [None]:
import sagemaker 
import boto3
from sagemaker import get_execution_role 

# Execution role passed to the estimator and to pipeline.upsert() below.
role = get_execution_role()
# SageMaker session handed to the estimator below.
session = sagemaker.Session()
# Region of the default boto3 session; used to look up the algorithm image.
region_name = boto3.Session().region_name

In [None]:
from sagemaker.image_uris import retrieve 

# S3 location passed to the estimator as its output path.
model_path = f"s3://{s3_bucket}/{prefix}/model"

# Image URI of the built-in "linear-learner" algorithm for this region.
container = retrieve(
    "linear-learner",
    region_name, "1"
)

# The instance type comes from the TrainingInstanceType pipeline parameter
# (default "ml.m5.xlarge") instead of a hard-coded string, so the parameter
# registered with the Pipeline actually controls the training job. This
# mirrors how the processor uses ProcessingInstanceType.
estimator = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type=training_instance_type,
    output_path=model_path,
    sagemaker_session=session
)

# Configure Linear Learner as a regressor with a mini-batch size of 4.
estimator.set_hyperparameters(
    predictor_type='regressor',
    mini_batch_size=4
)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Reference to the S3 URI of the processing step's "output"; using it as the
# training data wires the training step to the processing step's result.
s3_input_data = (
    step_process.properties
    .ProcessingOutputConfig
    .Outputs["output"]
    .S3Output
    .S3Uri
)

# Single "train" channel fed with the processed data as CSV.
train_channel = TrainingInput(s3_data=s3_input_data, content_type="text/csv")

step_train = TrainingStep(
    name="TrainStep",
    estimator=estimator,
    inputs={"train": train_channel},
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Plain literal: there is nothing to interpolate in the name.
pipeline_name = "Pipeline"

# Assemble the pipeline: register the three parameters and run the
# processing step followed by the training step that consumes its output.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data,
    ],
    steps=[step_process, step_train],
)

In [None]:
# Create the pipeline (or update it if it already exists) using the
# execution role obtained above.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution without overriding any parameter, and keep the handle
# for the describe/wait/list_steps calls below.
execution = pipeline.start()

In [None]:
# Show the description of the execution that was just started.
execution.describe()

In [None]:
# Wait for the execution to finish before inspecting its steps.
execution.wait()

In [None]:
# List the steps of this execution.
execution.list_steps()

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

session = sagemaker.session.Session()
viz = LineageTableVisualizer(session)

# Walk the execution's steps in the reverse of the order list_steps() returns.
ess = reversed(execution.list_steps())

for execution_step in ess:
    print(execution_step)
    # Render the lineage table for this step in the notebook output.
    lineage_table = viz.show(pipeline_execution_step=execution_step)
    display(lineage_table)
    # Pause between steps (presumably to stay clear of API rate limits).
    time.sleep(5)

In [None]:
from pprint import pprint

# Pretty-print the description returned by pipeline.describe().
pprint(pipeline.describe())

In [None]:
# Optional cleanup: uncomment the next line to delete the pipeline when you
# are done with it.
# pipeline.delete()