# Run Experimental Pipeline

### Preliminaries

#### Update the SageMaker Python SDK

In [None]:
!pip install --quiet --upgrade sagemaker

#### Import dependencies

In [None]:
import os

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession

import json

In [None]:
# Absolute path of the current working directory; the script paths used by the
# pipeline steps are joined onto it.
BASE_DIR = os.path.abspath(os.curdir)


def get_sagemaker_client(region):
    """Creates a boto3 SageMaker client for the given region.

    Args:
        region: the aws region the client is created in

    Returns:
        the client returned by `boto3.Session.client("sagemaker")`
    """
    return boto3.Session(region_name=region).client("sagemaker")


def get_session(region, default_bucket):
    """Builds a sagemaker session bound to the given region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session` instance
    """
    aws_session = boto3.Session(region_name=region)

    # Both clients are created from the same boto3 session so they share its
    # region.
    session_kwargs = {
        "boto_session": aws_session,
        "sagemaker_client": aws_session.client("sagemaker"),
        "sagemaker_runtime_client": aws_session.client("sagemaker-runtime"),
        "default_bucket": default_bucket,
    }
    return sagemaker.session.Session(**session_kwargs)


def get_pipeline_session(region, default_bucket):
    """Builds a pipeline session bound to the given region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `PipelineSession` instance
    """
    aws_session = boto3.Session(region_name=region)
    return PipelineSession(
        boto_session=aws_session,
        sagemaker_client=aws_session.client("sagemaker"),
        default_bucket=default_bucket,
    )


def get_pipeline_custom_tags(new_tags, region, sagemaker_project_arn=None):
    """Adds the tags of a SageMaker project to `new_tags`.

    The lookup is best-effort: if the tags cannot be listed, the error is
    printed and `new_tags` is returned with whatever it already contained.

    Args:
        new_tags: list of tags to extend; it is modified in place
        region: the aws region used to create the sagemaker client
        sagemaker_project_arn: ARN of the resource whose tags are listed.
            When None there is nothing to look up and `new_tags` is returned
            unchanged.

    Returns:
        the same `new_tags` list, extended with the project tags
    """
    if sagemaker_project_arn is None:
        # list_tags needs a ResourceArn; skip a request that could only fail
        # and print a misleading error.
        return new_tags

    try:
        sm_client = get_sagemaker_client(region)
        response = sm_client.list_tags(ResourceArn=sagemaker_project_arn)
        new_tags.extend(response["Tags"])
    except Exception as e:
        # Deliberately broad: missing tags must not prevent the caller from
        # continuing, so the failure is reported and otherwise ignored.
        print(f"Error getting project tags: {e}")
    return new_tags

# Sagemaker Pipelines

#### [General Overview](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html)

#### In short: A SageMaker pipeline is a series of interconnected steps used to describe and enable a concrete workflow.

This pipeline definition encodes a pipeline using a directed acyclic graph (DAG) that can be exported as a JSON definition. This DAG gives information on the requirements for and relationships between each step of your pipeline. The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. 

SageMaker Pipelines are composed of *steps*. These steps define the actions that the pipeline takes and the relationships between steps using properties.

#### [Step Types: ](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html)
The linked documentation describes the requirements of each step type and provides an example implementation of each step. Those examples are not functional on their own because they don't provide the resources and inputs needed. For a tutorial that implements these steps, see "Create and Manage SageMaker Pipelines" in the AWS documentation.

Amazon SageMaker Model Building Pipelines support the following step types:

- *[Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)* (we're using this one for step 1: Data Preprocessing | code: init_cache.py)
- *[Training](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training)* (we're using this one for step 2: training the CNN with Pytorch and capturing metrics of the process | code: train.py, dataset.py, blazeface.py)
- *[Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning)* (future optimization: hyperparameter optimization)
- *Model*
- *CreateModel*
- *[RegisterModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-register-model)* (we're using it for registering the model in the model registry)
- Transform
- Condition
- Callback
- Lambda
- ClarifyCheck
- QualityCheck
- EMR
- Fail 

<br>
<br>

In [None]:
# Settings for the experimental pipeline.
region = "us-east-2"
default_bucket = None  # handed to get_session / get_pipeline_session as-is
role = None  # replaced by the execution role in the pipeline definition when left as None
processing_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.m5.xlarge"
base_job_prefix = "abalone"
model_package_group_name = "abalone-pipeline-exp-models"

# The pipeline name comes from the project's code config one directory up;
# the context manager closes the file as soon as it has been parsed.
with open("../.sagemaker-code-config", encoding="utf-8") as config_file:
    sagemaker_code_config = json.load(config_file)
sagemaker_pipeline_name = sagemaker_code_config["sagemakerPipelineName"]
# Suffix keeps this notebook's pipeline apart from the one named in the config.
pipeline_name = f"{sagemaker_pipeline_name}-experimental"

## Pipeline Definition

In [None]:
# Regular session, used here to resolve the execution role when none was
# configured.
sagemaker_session = get_session(region, default_bucket)
role = (
    sagemaker.session.get_execution_role(sagemaker_session)
    if role is None
    else role
)

# Pipeline session, passed to the processors, estimator, model and Pipeline
# defined below.
pipeline_session = get_pipeline_session(region, default_bucket)

# parameters for pipeline execution
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv",
)

# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# One output per data split; the training and evaluation steps look these up
# by output_name.
preprocess_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]
preprocess_script = os.path.join(BASE_DIR, "scripts/preprocess.py")

step_args = sklearn_processor.run(
    outputs=preprocess_outputs,
    code=preprocess_script,
    arguments=["--input-data", input_data],
)
step_process = ProcessingStep(name="PreprocessAbaloneData", step_args=step_args)

# training step for generating model artifacts
# Model artifacts are written under the session's default bucket.
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain"
# Built-in XGBoost container image; the evaluation and registration steps
# reuse this image_uri.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/abalone-train",
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    # "reg:linear" is the deprecated name of the squared-error regression
    # objective; use the current name.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    # NOTE(review): `silent` also looks deprecated upstream in favour of
    # `verbosity` - confirm the container accepts `verbosity` before switching.
    silent=0,
)
# The train/validation channels read the preprocessing step's outputs, which
# makes this step depend on step_process.
step_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)

# processing step for evaluation
# Runs scripts/evaluate.py with python3 inside the same image used for training.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)

# Inputs: the trained model artifacts and the held-out test split.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

step_args = script_eval.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code=os.path.join(BASE_DIR, "scripts/evaluate.py"),
)

# Names evaluation.json inside the "evaluation" output so the condition step
# can read values from it.
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    step_args=step_args,
    property_files=[evaluation_report],
)

# register model step that will be conditionally executed
# S3 location of the evaluation step's first (and only) output.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0][
    "S3Output"
]["S3Uri"]
evaluation_metrics_source = MetricsSource(
    s3_uri="{}/evaluation.json".format(evaluation_output_uri),
    content_type="application/json",
)
model_metrics = ModelMetrics(model_statistics=evaluation_metrics_source)

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="RegisterAbaloneModel", step_args=step_args)

# condition step for evaluating model quality and branching execution
# MSE value read from the evaluation report written by the evaluation step.
evaluation_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)
cond_lte = ConditionLessThanOrEqualTo(left=evaluation_mse, right=6.0)

# Register the model only when the condition holds; otherwise do nothing.
step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

# pipeline instance
# `parameters` is meant to hold Parameter objects only. The two instance-type
# variables are plain strings already baked into the steps above, so they
# cannot act as pipeline parameters and are not listed here.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
    ],
    # step_register is not listed directly; it is reached through step_cond.
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

In [None]:
# Create the pipeline, or update it if it already exists, using `role` as the
# role ARN.
pipeline.upsert(role_arn=role)

In [None]:
# Start a pipeline execution; no parameter overrides are passed.
pipeline.start()