# Amazon SageMaker Workshop
### _**Pipelines**_

---
In this part of the workshop we will take all our previous work from the labs and automate the whole ML workflow. This makes the process more robust, and any updates to data preparation, modeling, evaluation, inference and monitoring can be put into production faster and more reliably.

---

## Contents

a. [Background](#background) - Getting the work from previous labs.

b. [Create the training pipeline](#Create_pipeline) - featuring [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)
1. [Creating data preparation step](#dataprep_step)
2. [Creating training step](#train_step)
3. [Creating evaluation step](#eval_step)
4. [Creating approve and register model steps](#appr_model_reg_step)
5. [Finish the pipeline](#end_creation_pipe)

c. [Create the end-to-end solution automatically](#SM_Projects) - Create end-to-end ML solutions with CI/CD (featuring [SageMaker Projects](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects.html))
1. customize the project with our pipeline and code
2. trigger training pipeline
3. trigger deployment pipeline

---
<a id='background'></a>

## Background

In the previous labs we created multiple resources to prepare the data (_2-DataPrep_), train the model (_3-Modeling_), evaluate model performance (_4-Evaluation_), deploy and customize inference logic (_4-Deployment/RealTime_) and monitor the deployed model (_5-Monitoring_).

Now it's time to **bring everything together**!

We will create a pipeline with 5 steps:

1. Data preparation
2. Training
3. Evaluation
4. Approve model
5. Save to model registry

We will build the pipeline iteratively, adding one step at a time.

---

In [2]:
#Load libraries

import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

sagemaker.__version__

'2.165.0'

In [3]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
bucket_name                              -> 'sagemaker-zara-blouses-generation'
bucket_prefix                            -> 'loading_dataset'
customers_count                          -> 10000
customers_feature_group_name             -> 'fscw-customers-06-21-08-51'
domain_id                                -> 'd-ivd5gnez0yil'
initialized                              -> True
orders_count                             -> 100000
orders_feature_group_name                -> 'fscw-orders-06-21-08-51'
products_count                           -> 17001
products_feature_group_name              -> 'fscw-products-06-21-08-51'
region                                   -> 'eu-central-1'
sm_role                                  -> 'arn:aws:iam::567821811420:role/service-role/Amazo


SET CONSTANTS

In [4]:
# Set names of pipeline objects
project = "zara-course-project"

pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-txt2img"
model_package_group_name = f"{project}-model-group"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"

In [5]:
# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

In [6]:
# Set S3 urls for processed data
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"
baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/baseline"

evaluation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/evaluation"
prediction_baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/prediction_baseline"

output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"
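
The URL constants above all follow the same `s3://<bucket>/<prefix>/<name>` layout. As a minimal sketch, a small helper could generate them from a single list of names so the layout lives in one place. The helper `build_s3_urls` and the bucket and prefix values are hypothetical and not part of the workshop code:

```python
def build_s3_urls(bucket_name, bucket_prefix, names):
    """Return a {name: S3 URL} mapping using the s3://<bucket>/<prefix>/<name> layout."""
    return {name: f"s3://{bucket_name}/{bucket_prefix}/{name}" for name in names}


# Hypothetical bucket and prefix, for illustration only
urls = build_s3_urls(
    "example-bucket",
    "example-prefix",
    ["train", "validation", "test", "baseline", "evaluation", "prediction_baseline", "output"],
)
print(urls["train"])
```

In the notebook, the same call with `bucket_name` and `bucket_prefix` would give one dictionary instead of seven separate variables.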

In [7]:
%store train_s3_url
%store validation_s3_url
%store test_s3_url
%store baseline_s3_url
%store model_package_group_name
%store evaluation_s3_url
%store prediction_baseline_s3_url
%store output_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'test_s3_url' (str)
Stored 'baseline_s3_url' (str)
Stored 'model_package_group_name' (str)
Stored 'evaluation_s3_url' (str)
Stored 'prediction_baseline_s3_url' (str)
Stored 'output_s3_url' (str)


In [8]:
print(f"Train S3 url: {train_s3_url}")
print(f"Validation S3 url: {validation_s3_url}")
print(f"Test S3 url: {test_s3_url}")
print(f"Data baseline S3 url: {baseline_s3_url}")
print(f"Evaluation metrics S3 url: {evaluation_s3_url}")
print(f"Model prediction baseline S3 url: {prediction_baseline_s3_url}")

Train S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/train
Validation S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/validation
Test S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/test
Data baseline S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/baseline
Evaluation metrics S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/evaluation
Model prediction baseline S3 url: s3://sagemaker-zara-blouses-generation/loading_dataset/prediction_baseline


In [9]:
try:
    input_s3_url
except NameError:      
    # If input_s3_url is not defined, upload the dataset to S3 and store the path
    input_s3_url = sagemaker.Session().upload_data(
        path="data/bank-additional/bank-additional-full.csv",
        bucket=bucket_name,
        key_prefix=f"{bucket_prefix}/input"
    )
    print(f"Upload the dataset to {input_s3_url}")

    %store input_s3_url

FileNotFoundError: [Errno 2] No such file or directory: 'data/bank-additional/bank-additional-full.csv'

In [10]:
%store -r bucket_name
%store -r bucket_prefix
%store -r region
%store -r docker_image_name

no stored variable or alias docker_image_name


In [11]:
bucket, prefix, region, docker_image_name

NameError: name 'bucket' is not defined

In [12]:
# Suppress default INFO logging
import logging
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

In [13]:
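import sagemaker
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
# Pipeline session used by the processor and estimator below: with it, .run() and .fit()
# return step arguments for the pipeline definition instead of starting a job immediately
session = PipelineSession()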

---
<a id="Create_pipeline"></a>
# Create the training pipeline with SageMaker Pipelines

<a id="dataprep_step"></a>

## 1. Create data preparation step

Get the raw data location and the S3 URI where our code for data preparation was stored:

In [14]:
%store -r s3uri_raw
s3uri_raw

no stored variable or alias s3uri_raw


NameError: name 's3uri_raw' is not defined

In [None]:
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)

This first step will receive some inputs:

In [None]:
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)

In [None]:
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set model approval param
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", 
    default_value=0.75
)

# Set S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

In [None]:
# Parameters for data preparation step
input_data = ParameterString(
    name="InputDataUrl",
    default_value=s3uri_raw # S3 URI where we stored the raw data
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

In [None]:
from my_labs_solutions.dataprep_solution import get_dataprep_processor
sklearn_processor = get_dataprep_processor(processing_instance_type, processing_instance_count, role)
sklearn_processor

In [None]:
# Processing step for feature engineering
step_process = ProcessingStep(
    name="CustomerChurnProcess",  # choose any name
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(
            output_name="validation", source="/opt/ml/processing/validation"
        ),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code=s3_dataprep_code_uri,
    job_arguments=["--input-data", input_data],
)

Preprocessing step

In [None]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Input and output locations inside the processing container.
    # Defaults match the ProcessingInput/ProcessingOutput paths configured for the processing job.
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = "y"
    
    # Load data
    df_data = pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")

    # Indicator variable to capture when pdays takes a value of 999
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

    # Indicator for individuals not actively employed
    df_data["not_working"] = np.where(
        np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
    )

    # remove unnecessary data
    df_model_data = df_data.drop(
        ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
        axis=1,
    )

    df_model_data = pd.get_dummies(df_model_data)  # Convert categorical variables to sets of indicators

    # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
    df_model_data = pd.concat(
        [
            df_model_data["y_yes"].rename(target_col),
            df_model_data.drop(["y_no", "y_yes"], axis=1),
        ],
        axis=1,
    )

    # Shuffle and split the dataset into 70% train, 20% validation, 10% test
    train_data, validation_data, test_data = np.split(
        df_model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_model_data)), int(0.9 * len(df_model_data))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    # Save the baseline dataset for model monitoring
    df_model_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
    
    print("## Processing complete. Exiting.")
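
The split in `preprocessing.py` shuffles the rows and then cuts them at 70% and 90% of the row count. The sketch below reproduces that cut-point arithmetic with the standard library only, as an illustration of the idea rather than a replacement for the `np.split` call. The function name `split_rows` is hypothetical, and the shuffled order will differ from the pandas `sample` order even with the same seed:

```python
import random


def split_rows(rows, cuts=(0.7, 0.9), seed=1729):
    """Shuffle rows, then cut them into train/validation/test parts."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    # Cut points mirror the script: 70% and 90% of the row count
    first_cut = int(cuts[0] * len(shuffled))
    second_cut = int(cuts[1] * len(shuffled))
    return shuffled[:first_cut], shuffled[first_cut:second_cut], shuffled[second_cut:]


train, validation, test = split_rows(range(100))
print(len(train), len(validation), len(test))
```

Passing the cut points directly (0.7 and 0.9) instead of adding fractions avoids floating-point surprises such as `0.7 + 0.2` evaluating to slightly less than `0.9`.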

In [None]:
sklearn_processor = SKLearnProcessor(
        framework_version="0.23-1",
        role=sm_role,
        instance_type=process_instance_type_param,
        instance_count=1,
        base_job_name=f"{pipeline_name}/preprocess",
        sagemaker_session=session,
    )
    
processing_inputs=[
    ProcessingInput(source=input_s3_url_param, destination="/opt/ml/processing/input")
]

processing_outputs=[
    ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train", 
                     destination=train_s3_url),
    ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation",
                     destination=validation_s3_url),
    ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test",
                     destination=test_s3_url),
    ProcessingOutput(output_name="baseline_data", source="/opt/ml/processing/output/baseline", 
                     destination=baseline_s3_url),
]

processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
)
    
# Define processing step
step_process = ProcessingStep(
    name=f"{pipeline_name}-preprocess-data",
    step_args=processor_args,
)

## Create the first iteration of the Pipeline

We will create a simple pipeline that receives some inputs and has just one data preparation step:

In [None]:
from time import strftime, gmtime
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel

You can associate SageMaker Experiments with Pipelines to help track multiple moving pieces (ML hyperparameters, data, artifacts, plots, metrics, etc.), also known as [ML lineage tracking](https://docs.aws.amazon.com/sagemaker/latest/dg/lineage-tracking.html).

In [None]:
# Experiment configs
create_date = lambda: strftime("%Y-%m-%d-%H-%M-%S", gmtime())

experiment_name=f"pipeline-customer-churn-prediction-xgboost-{create_date()}"
trial_name=f"pipeline-framework-trial-{create_date()}"
pipeline_name = "ChurnMLPipeline"

In [None]:
pipeline_experiment_config = PipelineExperimentConfig(
    experiment_name = experiment_name,
    trial_name = trial_name
)

In [None]:
# Pipeline with just input parameters and 1 step for data prep
pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_type,
            processing_instance_count,
        ],
        steps=[step_process],
        sagemaker_session=sagemaker_session,
    )

In [None]:
# Validate that pipeline was configured correctly and load its definition
import json
json.loads(pipeline.definition())
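
The loaded definition is a nested dictionary, which can be hard to scan by eye. As a rough sketch, the helper below pulls out just the parameter names and the step names and types. The sample JSON is hand-written and heavily trimmed, and the key names (`Parameters`, `Steps`, `Name`, `Type`) are assumed to match what `pipeline.definition()` returns; `summarize_definition` is a hypothetical helper:

```python
import json

# Hand-written, heavily trimmed stand-in for the string returned by pipeline.definition()
sample_definition = json.dumps({
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "InputDataUrl", "Type": "String", "DefaultValue": "s3://example-bucket/raw"},
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
    ],
    "Steps": [
        {"Name": "CustomerChurnProcess", "Type": "Processing", "Arguments": {}},
    ],
})


def summarize_definition(definition_json):
    """Return (parameter names, [(step name, step type), ...]) from a definition string."""
    definition = json.loads(definition_json)
    parameters = [p["Name"] for p in definition.get("Parameters", [])]
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    return parameters, steps


parameters, steps = summarize_definition(sample_definition)
print(parameters)
print(steps)
```

In the notebook, `summarize_definition(pipeline.definition())` would be the equivalent call.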

#### Ok, looks good. Let's create the pipeline:

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.describe()

In [None]:
execution.list_steps()
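
`execution.list_steps()` returns one dictionary per step. A minimal sketch for turning that into a compact status overview is shown below. The sample records are invented for illustration, the field names (`StepName`, `StepStatus`, `FailureReason`) are assumed to match the service response, and `summarize_steps` is a hypothetical helper:

```python
def summarize_steps(steps):
    """Map each step name to its status, appending the failure reason when present."""
    summary = {}
    for step in steps:
        status = step.get("StepStatus", "Unknown")
        reason = step.get("FailureReason")
        summary[step["StepName"]] = f"{status}: {reason}" if reason else status
    return summary


# Invented records shaped like the entries of execution.list_steps()
sample_steps = [
    {"StepName": "CustomerChurnProcess", "StepStatus": "Succeeded"},
    {"StepName": "ExampleFailingStep", "StepStatus": "Failed", "FailureReason": "example reason"},
]
step_summary = summarize_steps(sample_steps)
print(step_summary)
```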

In [None]:
# If we wanted to wait for execution to end:
# execution.wait()

---

<a id="train_step"></a>

## 2. Create modeling step

In [None]:
%store -r s3_model
%store -r train_script_name
s3_modeling_code_uri, train_script_name

In [None]:
model_id, model_version = "model-txt2img-stabilityai-stable-diffusion-v2-1-base", "*"

In [None]:
from my_labs_solutions.modeling_solution import get_modeling_estimator

xgb_train = get_modeling_estimator(bucket,
                                   prefix,
                                   s3_modeling_code_uri, 
                                   docker_image_name,
                                   role,
                                   entry_point_script = train_script_name)
xgb_train

In [None]:
from sagemaker.inputs import TrainingInput

In [None]:
# Instantiate an XGBoost estimator object
estimator = sagemaker.estimator.Estimator(
    image_uri=s3_model,
    role=sm_role, 
    instance_type=train_instance_type_param,
    instance_count=train_instance_count_param,
    output_path=output_s3_url,
    sagemaker_session=session,
    base_job_name=f"{pipeline_name}/train",
)

# Define algorithm hyperparameters
estimator.set_hyperparameters(
    num_round=150, # the number of rounds to run the training
    max_depth=5, # maximum depth of a tree
    eta=0.5, # step size shrinkage used in updates to prevent overfitting
    alpha=2.5, # L1 regularization term on weights
    objective="binary:logistic",
    eval_metric="auc", # evaluation metrics for validation data
    subsample=0.8, # subsample ratio of the training instance
    colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
    min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
    early_stopping_rounds=10, # the model trains until the validation score stops improving
    verbosity=1, # verbosity of printing messages
)

training_inputs = {
    "train": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "validation_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
}

training_args = estimator.fit(training_inputs)

# Define training step
step_train = TrainingStep(
    name=f"{pipeline_name}-train",
    step_args=training_args
)

Fine-tune the pretrained model

In [None]:
from sagemaker import image_uris, instance_types, model_uris, script_uris

# Currently, not all the Stable Diffusion models in JumpStart support fine-tuning. Thus, we manually select a model
# which supports fine-tuning.
train_model_id, train_model_version, train_scope = (
    "model-txt2img-stabilityai-stable-diffusion-v2-1-base",
    "*",
    "training",
)

# Tested with ml.g4dn.2xlarge (16GB GPU memory) and ml.g5.2xlarge (24GB GPU memory) instances. Other instances may work as well.
# If ml.g5.2xlarge instance type is available, please change the following instance type to speed up training.
training_instance_type = instance_types.retrieve_default(
    region=None,
    model_id=train_model_id,
    model_version=train_model_version,
    scope=train_scope
)

# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,  # automatically inferred from model_id
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type,
)

# Retrieve the training script. This contains all the necessary files including data processing, model training etc.
train_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope=train_scope
)
# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)

SET TRAINING PARAMS

In [None]:
import sagemaker.metric_definitions
# Sample training data is available in this bucket
training_data_bucket = f"jumpstart-cache-prod-{region}"  # region was restored with %store -r above
training_data_prefix = "training-datasets/dogs_sd_finetuning/"

training_dataset_s3_path = f"s3://{training_data_bucket}/{training_data_prefix}"

output_bucket = sagemaker_session.default_bucket()
output_prefix = "jumpstart-example-sd-training"

# Retrieve the default metric definitions to emit to CloudWatch Logs
metric_definitions = sagemaker.metric_definitions.retrieve_default(
    model_id=train_model_id, model_version=train_model_version,
)

s3_output_location = f"s3://{output_bucket}/{output_prefix}/output"

In [None]:
from sagemaker import hyperparameters

# Retrieve the default hyper-parameters for fine-tuning the model
hyperparameters = hyperparameters.retrieve_default(
    model_id=train_model_id, model_version=train_model_version
)

# [Optional] Override default hyperparameters with custom values
hyperparameters["max_steps"] = "400"
print(hyperparameters)

In [None]:
from sagemaker.tuner import IntegerParameter
from sagemaker.tuner import ContinuousParameter
from sagemaker.tuner import HyperparameterTuner


use_amt = True

hyperparameter_ranges = {
    "learning_rate": ContinuousParameter(1e-7, 3e-6, "Linear"),
    "max_steps": IntegerParameter(50, 400, "Linear"),
}

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.utils import name_from_base
from sagemaker.tuner import HyperparameterTuner

training_job_name = name_from_base(f"jumpstart-example-{train_model_id}-transfer-learning")

# Create SageMaker Estimator instance
sd_estimator = Estimator(
    role=role,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",  # Entry-point file in source_dir and present in train_source_uri.
    instance_count=1,
    instance_type=training_instance_type,
    max_run=360000,
    metric_definitions=metric_definitions,
    hyperparameters=hyperparameters,
    output_path=s3_output_location,
    base_job_name=training_job_name,
)


if use_amt:
    # Let estimator emit fid_score metric to AMT
    sd_estimator.set_hyperparameters(compute_fid="True")
    tuner_parameters = {
        "estimator": sd_estimator,
        "metric_definitions": [{"Name": "fid_score", "Regex": "fid_score=([-+]?\\d\\.?\\d*)"}],
        "objective_metric_name": "fid_score",
        "objective_type": "Minimize",
        "hyperparameter_ranges": hyperparameter_ranges,
        "max_jobs": 3,
        "max_parallel_jobs": 3,
        "strategy": "Bayesian",
        "base_tuning_job_name": training_job_name,
    }

    tuner = HyperparameterTuner(**tuner_parameters)
    tuner.fit({"training": training_dataset_s3_path}, logs=True)
else:
    # Launch a SageMaker Training job by passing s3 path of the training data
    sd_estimator.fit({"training": training_dataset_s3_path}, logs=True)
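
The tuner finds the objective metric by applying the `Regex` from `metric_definitions` to the training logs. The sketch below applies the same pattern to made-up log lines with Python's `re` module, assuming the service's regex matching behaves like Python's for this pattern. `extract_fid` is a hypothetical helper:

```python
import re

# Same pattern as in tuner_parameters, written as a raw string
FID_PATTERN = re.compile(r"fid_score=([-+]?\d\.?\d*)")


def extract_fid(log_line):
    """Return the captured fid_score text from a log line, or None when absent."""
    match = FID_PATTERN.search(log_line)
    return match.group(1) if match else None


print(extract_fid("step 50: fid_score=0.83"))
# The pattern expects a single digit before the decimal point,
# so a value such as 12.5 is captured only up to the dot
print(extract_fid("step 50: fid_score=12.5"))
```

If scores of 10 or more are expected, a pattern such as `fid_score=([-+]?\d+\.?\d*)` would capture the full number. Testing the regex locally like this before launching a tuning job is a cheap way to catch such mismatches.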

Notice that we can link one step's output to another step's input by accessing the step's properties:
```python
# Get output from processing step with key `train`
step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
```
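
The property expression above does not hold an S3 URI while the pipeline is being defined. It is a placeholder that is resolved only when the pipeline runs. The toy class below illustrates how such a deferred reference can be built up through attribute and item access. It is a conceptual model only, not the SDK's implementation; `PropertyRef` is a hypothetical name, and the `{"Get": ...}` shape is an assumption modeled on how references appear in the pipeline definition JSON:

```python
class PropertyRef:
    """Toy deferred reference that records the access path instead of a value."""

    def __init__(self, path):
        self._path = path

    def __getattr__(self, name):
        # Called only for attributes that do not exist, i.e. the property names
        if name.startswith("_"):
            raise AttributeError(name)
        return PropertyRef(f"{self._path}.{name}")

    def __getitem__(self, key):
        return PropertyRef(f"{self._path}['{key}']")

    def to_request(self):
        """Render the recorded path as a placeholder for the pipeline definition."""
        return {"Get": self._path}


properties = PropertyRef("Steps.CustomerChurnProcess")
ref = properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
print(ref.to_request())
```

Because the reference names the producing step, the pipeline can also infer from it that the training step must run after the processing step.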

## Create the second iteration of the Pipeline (updating the definition)

We will update the pipeline by adding an input parameter for the training step and the training step itself, resulting in a pipeline with 2 steps:

In [None]:
# Add an input parameter to define the training instance type
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.xlarge"
)

In [None]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
    ],
    steps=[step_process, step_train],
    sagemaker_session=sagemaker_session,
)

In [None]:
# Update the pipeline
pipeline.upsert(role_arn=role)

---

<a id="appr_model_reg_step"></a>

## 4. Create approve model and save to model registry steps

In [None]:
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet  # the JsonGet in condition_step is deprecated
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

Add new input parameter for the model registration step:

In [None]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
)

Create register model step:

In [None]:
# Model metrics that will be associated with RegisterModel step
'''
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                "S3Uri"
            ]
        ),
        content_type="application/json",
    )
)
'''

In [None]:
model_package_group_name="CustomerChurnPackageGroup"

# Register model step that will be conditionally executed
step_register = RegisterModel(
    name="CustomerChurnRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    #model_metrics=model_metrics,
)

Create condition step for **accuracy of at least 0.8**:

In [None]:
# Condition step for evaluating model quality and branching execution
cond_gte = ConditionGreaterThanOrEqualTo(  # You can change the condition here
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.accuracy.value",  # This should follow the structure of your report_dict defined in the evaluate.py file.
    ),
    right=0.8,  # You can change the threshold here
)
step_cond = ConditionStep(
    name="CustomerChurnAccuracyCond",
    conditions=[cond_gte],
    if_steps=[step_register],  # register the model only when the condition holds
    else_steps=[],  # otherwise do nothing
)
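
The condition step above reads one value out of the evaluation report by its JSON path and compares it with a threshold. A plain-Python sketch of that logic follows, assuming the report has the nested structure implied by the path `binary_classification_metrics.accuracy.value`. The sample reports and the helpers `json_get` and `should_register` are hypothetical:

```python
def json_get(report, json_path):
    """Walk a nested dict along a dot-separated path and return the value found."""
    value = report
    for key in json_path.split("."):
        value = value[key]
    return value


def should_register(report, threshold=0.8):
    """Mirror the condition: register only when accuracy >= threshold."""
    accuracy = json_get(report, "binary_classification_metrics.accuracy.value")
    return accuracy >= threshold


# Invented reports shaped like the assumed evaluation.json
good_report = {"binary_classification_metrics": {"accuracy": {"value": 0.85}}}
weak_report = {"binary_classification_metrics": {"accuracy": {"value": 0.72}}}
print(should_register(good_report), should_register(weak_report))
```

Running a check like this against a locally downloaded `evaluation.json` is a quick way to confirm that the path in `JsonGet` matches the report before executing the pipeline.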

## Create the fourth and final iteration of the Pipeline (updating the definition)

We will update the pipeline with the final approve model (condition) and save model steps, resulting in a pipeline with 5 steps: data prep, training, evaluation, approval, and save to registry.

In [None]:
pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
        ],
        steps=[step_process, step_train, step_eval, step_cond],
        sagemaker_session=sagemaker_session,
    )

In [None]:
pipeline.upsert(role_arn=role)

Let's start final execution:

In [None]:
execution = pipeline.start()

<a id='end_creation_pipe'></a>
    
# 5. End of pipeline creation

With step caching enabled, repeated runs should now be faster.
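
Step caching lets a re-run reuse the outputs of an earlier run instead of recomputing a step. The toy model below illustrates the idea under two stated assumptions: a cache hit requires the same step type and arguments as a previous successful run, and that run must fall within the expiry window. It is a conceptual sketch, not how SageMaker implements caching, and `cache_key` and `is_cache_hit` are hypothetical names:

```python
import hashlib
import json


def cache_key(step_type, arguments):
    """Derive a stable key from the step type and its arguments."""
    payload = json.dumps({"type": step_type, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_cache_hit(cache, step_type, arguments, now, max_age_seconds):
    """True when an identical step finished no more than max_age_seconds ago."""
    finished_at = cache.get(cache_key(step_type, arguments))
    return finished_at is not None and now - finished_at <= max_age_seconds


# A previous run of a processing step finished at t=0 seconds
args = {"InstanceType": "ml.m5.xlarge", "InputDataUrl": "s3://example-bucket/raw"}
cache = {cache_key("Processing", args): 0}

thirty_minutes = 30 * 60
print(is_cache_hit(cache, "Processing", args, now=600, max_age_seconds=thirty_minutes))
print(is_cache_hit(cache, "Processing", args, now=3600, max_age_seconds=thirty_minutes))
```

Under this model, changing any argument (for example the instance type) produces a different key, so the step runs again.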

Let's get the final result of the pipeline. Read evaluation report:

In [None]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

#### Run again with caches and changing input parameters:

In [None]:
# Obs.: If we want to override the input parameters with other ones:

execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved", # Would approve automatically
    )
)
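
Parameters passed to `pipeline.start()` replace the defaults declared on the pipeline, and any parameter not mentioned keeps its default. The sketch below models that merge with plain dictionaries. It is a conceptual illustration only; `resolve_parameters` is a hypothetical helper, and rejecting unknown names is an assumption about sensible behavior rather than a statement about the service:

```python
def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one execution."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Defaults as declared in the parameter definitions of this notebook
defaults = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}
effective = resolve_parameters(
    defaults,
    {"ProcessingInstanceType": "ml.c5.xlarge", "ModelApprovalStatus": "Approved"},
)
print(effective)
```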

In [None]:
# # Obs.: if we wanted to stop pipeline execution:
# execution.stop()

In [None]:
# # Obs.: if we wanted to delete the whole pipeline:
# pipeline.delete()

Let's put the whole pipeline code into a Python script:

In [None]:
%%writefile my_labs_solutions/pipeline_definition.py
import os
import json
from time import strftime, gmtime

import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet  # the JsonGet in condition_step is deprecated
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

from .dataprep_solution import get_dataprep_processor
from .modeling_solution import get_modeling_estimator
from .evaluation_solution import get_evaluation_processor

BASE_DIR = os.path.dirname(os.path.realpath(__file__))

def get_my_solutions_vars():
    vars_path = os.path.join(".", "pipelines", "my_labs_solutions", "my-solution-vars.json")

    with open(vars_path, "rb") as f:
        my_vars = json.loads(f.read())
        
    return my_vars

def get_pipeline(region,
                 role=None,
                 default_bucket=None,
                 model_package_group_name="MLOpsCustomerChurnPackageGroup",  # Choose any name
                 pipeline_name="MLOpsFinalChurnMLPipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
                 base_job_prefix="CustomerChurn",  # Choose any name
                ) -> Pipeline:
    
    # Get config vars
    my_vars = get_my_solutions_vars()
    bucket = my_vars["bucket"]
    prefix = my_vars["prefix"]
    region = my_vars["region"]
    docker_image_name = my_vars["docker_image_name"]
    s3uri_raw = my_vars["s3uri_raw"]
    s3_dataprep_code_uri = my_vars["s3_dataprep_code_uri"]
    s3_modeling_code_uri = my_vars["s3_modeling_code_uri"]
    train_script_name = my_vars["train_script_name"]
    s3_evaluation_code_uri = my_vars["s3_evaluation_code_uri"]
    role = my_vars["role"]

    sagemaker_session = sagemaker.session.Session()

    # Parameters for data preparation step
    input_data = ParameterString(
        name="InputDataUrl",
        default_value=s3uri_raw # S3 URI where we stored the raw data
    )
    processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
    )
    processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.xlarge"
    )

    # Add an input parameter to define the training instance type
    training_instance_type = ParameterString(
        name="TrainingInstanceType", default_value="ml.m5.xlarge"
    )
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )


    # Cache for 30 minutes
    cache_config = CacheConfig(enable_caching=True, expire_after="T30m")

    sklearn_processor = get_dataprep_processor(processing_instance_type, processing_instance_count, role)

    # Processing step for feature engineering
    step_process = ProcessingStep(
        name="CustomerChurnProcess",  # choose any name
        processor=sklearn_processor,
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(
                output_name="validation", source="/opt/ml/processing/validation"
            ),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ],
        code=s3_dataprep_code_uri,
        job_arguments=["--input-data", input_data],
        cache_config=cache_config
    )


    xgb_train = get_modeling_estimator(bucket,
                                       prefix,
                                       s3_modeling_code_uri, 
                                       docker_image_name,
                                       role,
                                       entry_point_script = train_script_name)


    step_train = TrainingStep(
        name="CustomerChurnTrain",
        estimator=xgb_train,
        inputs={
            "train": TrainingInput(
                        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                            "train"
                        ].S3Output.S3Uri,
                        content_type="text/csv"
                     ),
            "validation": TrainingInput(
                        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                            "validation"
                        ].S3Output.S3Uri,
                        content_type="text/csv"
                     )
        },
        cache_config=cache_config
    )     


    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )

    script_eval = get_evaluation_processor(docker_image_name, role)

    # Processing step for evaluation
    step_eval = ProcessingStep(
            name="CustomerChurnEval",
            processor=script_eval,
            inputs=[
                ProcessingInput(
                    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                    destination="/opt/ml/processing/model",
                ),
                ProcessingInput(
                    source=step_process.properties.ProcessingOutputConfig.Outputs[
                        "test"
                    ].S3Output.S3Uri,
                    destination="/opt/ml/processing/test",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="evaluation", source="/opt/ml/processing/evaluation"
                ),
            ],
            code=s3_evaluation_code_uri,
            property_files=[evaluation_report],
            cache_config=cache_config
    )


    # Model metrics that will be associated with RegisterModel step
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                    "S3Uri"
                ]
            ),
            content_type="application/json",
        )
    )

    #model_package_group_name="CustomerChurnPackageGroup"

    # Register model step that will be conditionally executed
    step_register = RegisterModel(
        name="CustomerChurnRegisterModel",
        estimator=xgb_train,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )


    # Condition step for evaluating model quality and branching execution
    cond_gte = ConditionGreaterThanOrEqualTo(  # You can change the condition here
        left=JsonGet(
            step=step_eval,
            property_file=evaluation_report,
            json_path="binary_classification_metrics.accuracy.value",  # This should follow the structure of your report_dict defined in the evaluate.py file.
        ),
        right=0.8,  # You can change the threshold here
    )
    step_cond = ConditionStep(
        name="CustomerChurnAccuracyCond",
        conditions=[cond_gte],
        if_steps=[step_register],  # Register the model only when accuracy >= 0.8
        else_steps=[],
    )



    # Experiment configs
    create_date = lambda: strftime("%Y-%m-%d-%H-%M-%S", gmtime())

    experiment_name=f"pipeline-customer-churn-prediction-xgboost-{create_date()}"
    trial_name=f"pipeline-framework-trial-{create_date()}"

    pipeline_experiment_config = PipelineExperimentConfig(
        experiment_name = experiment_name,
        trial_name = trial_name
    )


    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
        ],
        # Pass the experiment config defined above; otherwise it is never used
        pipeline_experiment_config=pipeline_experiment_config,
        steps=[step_process, step_train, step_eval, step_cond],
        sagemaker_session=sagemaker_session,
    )
    
    return pipeline
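
To make the branching rule in `step_cond` more concrete, here is a minimal plain-Python sketch of the check it performs. It assumes that `evaluation.json` contains nested keys matching the `json_path` used in the pipeline definition. The sample report and its numbers are made up, and `resolve_json_path` and `should_register` are hypothetical helper names, not part of the SageMaker SDK. The sketch only handles plain dot-separated keys.

```python
import json

# Same path and threshold as in the ConditionStep of the pipeline definition
ACCURACY_PATH = "binary_classification_metrics.accuracy.value"
ACCURACY_THRESHOLD = 0.8


def resolve_json_path(document, dotted_path):
    """Walk a nested dict along a dot-separated list of keys."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


def should_register(report, threshold=ACCURACY_THRESHOLD):
    """Return True when the reported accuracy is >= the threshold."""
    return resolve_json_path(report, ACCURACY_PATH) >= threshold


# Hypothetical report contents (illustrative numbers only)
sample_report = json.loads(
    '{"binary_classification_metrics": {"accuracy": {"value": 0.91}}}'
)

print(should_register(sample_report))  # True, because 0.91 >= 0.8
```

If your `evaluate.py` writes a different report structure, the `json_path` in the pipeline has to change accordingly, otherwise the condition cannot find the metric.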


<a id='SM_Projects'></a>
# Customizing the Build/Train/Deploy MLOps Project Template

SageMaker Projects introduce MLOps templates that automatically provision the underlying resources needed to enable 
CI/CD capabilities for your Machine Learning Development Lifecycle (MLDC). You can use one of the built-in 
templates or create your own custom templates.

In this workshop we will use one of the **pre-built MLOps templates** to bootstrap your ML project and establish a CI/CD 
pattern from seed code.

### MLOps Template for Build, Train, and Deploy

> Imagine now that you are a data scientist who has just joined the company. You need to get access to the ML resources.

To get started with SageMaker Projects, [they must first be enabled in the SageMaker Studio console](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-studio-updates.html).
This can be done for existing users or while creating new ones:

<img src="media/enable_projects.png">

Within Amazon SageMaker Studio, you can now select “Projects” from a drop-down menu on the “Components and registries” 
tab as shown below:

<img src="media/select_projects.png">

From the projects page you’ll have the option to launch a pre-configured SageMaker MLOps template. Click `Create project` and select the build, train, and deploy template:

<img src="media/create_project.png">

Name the project `ChurnProject`.

> NOTE: Launching this template will kick off a model building pipeline by default and will train a regression model. This will incur a small cost.

Once the project is created from the MLOps template, the following architecture will be deployed:

<img src="media/deep_dive.png">


## Modifying the IAM Role for CodeBuild

Attach AdminRole to the CodeBuild role.

## Modifying the Seed Code for Custom Use Case

After your project has been created, the architecture shown above will be deployed and the visualization of the 
Pipeline will be available in the “Pipelines” drop-down menu within SageMaker Studio.

In order to modify the seed code from this launched template, we’ll first need to clone the AWS CodeCommit 
repositories to our local SageMaker Studio instance. From the list of projects, select the one that was just 
created. Under the “Repositories” tab you can select the hyperlinks to locally clone the AWS CodeCommit repos:

<img src="media/clone_repos.png">


### Clone the `...modelbuild` repo (click on `clone repo...`)

The SageMaker project template creates these repositories.

The `...-modelbuild` repository contains the code for preprocessing, training, and evaluating the model. The pre-built template includes an example of a regression model trained on the [UCI Abalone dataset](https://archive.ics.uci.edu/ml/datasets/abalone):

<img src="media/repo_directory.png">


**In our case we want to create a pipeline for predicting Churn, as in the previous labs.** We can modify these files to solve our own customer churn use case.

---

### Modifying the code for the Churn problem

This is the sample structure of the Project (Abalone):

<img src="media/repo_directory.png" width="40%">


#### Let's use everything we just built:

In the `...modelbuild` repo:

1. Replace `codebuild-buildspec.yml` in your current Studio project (Abalone) with the one found in [modelbuild/codebuild-buildspec.yml](modelbuild/codebuild-buildspec.yml) (Churn).

The final `codebuild-buildspec.yml` should look like this one (note the comment on the first line):

<img src="media/buildspec.png" width="60%">


2. Go to `pipelines` and delete the `abalone` directory.

<img src="media/dir_del.png" width="40%">


3. Cut the `my_labs_solutions` directory and paste it into the `...modelbuild/pipelines` directory.

<img src="media/dir_cut.png" width="40%">

<img src="media/dir_paste.png" width="40%">

In the end the `...modelbuild` repo should look like this:

<img src="media/dir_structure.png" width="40%">
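Before committing, the three changes above can be checked with a short script. This is only a sketch: `check_modelbuild_layout` is a hypothetical helper, and it verifies nothing beyond the three items named in the steps (the buildspec file, the removed `abalone` directory, and the pasted `my_labs_solutions` directory). The demo at the end runs against a throwaway directory rather than your real repository.

```python
import tempfile
from pathlib import Path


def check_modelbuild_layout(repo_root):
    """Return a list of problems found in the modelbuild repo layout."""
    root = Path(repo_root)
    problems = []
    if not (root / "codebuild-buildspec.yml").is_file():
        problems.append("codebuild-buildspec.yml is missing")
    if (root / "pipelines" / "abalone").exists():
        problems.append("pipelines/abalone should have been deleted")
    if not (root / "pipelines" / "my_labs_solutions").is_dir():
        problems.append("pipelines/my_labs_solutions is missing")
    return problems


# Demo on a temporary directory that mimics the expected layout
with tempfile.TemporaryDirectory() as tmp:
    demo_root = Path(tmp)
    (demo_root / "codebuild-buildspec.yml").write_text("# placeholder\n")
    (demo_root / "pipelines" / "my_labs_solutions").mkdir(parents=True)
    demo_problems = check_modelbuild_layout(demo_root)

print(demo_problems)  # [] means the layout matches the steps above
```

To check your own clone, call `check_modelbuild_layout` with the path to the cloned `...modelbuild` directory.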

## Trigger a new training Pipeline Execution through git commit

By committing these changes to the AWS CodeCommit repository (easily done in the SageMaker Studio source control tab), a 
new Pipeline execution will be triggered, since an EventBridge rule monitors the repository for commits. After a few moments, 
we can monitor the execution by selecting the Pipeline inside the SageMaker Project.

Go to the `...modelbuild/pipelines` directory and click on the git symbol to commit and push the changes:

<img src="media/git_push.png">

This triggers the training pipeline. Go to the “Pipelines” tab inside the SageMaker Project and click on our only pipeline. You'll see:

<img src="media/execute_pipeline.png">

Select the most recent execution:

<img src="media/dag.png">
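
The most recent execution can also be looked up from a notebook instead of the Studio UI. The sketch below assumes a boto3 SageMaker client is passed in and uses its `list_pipeline_executions` call. `latest_execution` is a hypothetical helper name, and the pipeline name is a placeholder that you need to replace with the name shown in your project.

```python
def latest_execution(sm_client, pipeline_name):
    """Return the summary of the newest pipeline execution, or None."""
    response = sm_client.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response["PipelineExecutionSummaries"]
    return summaries[0] if summaries else None


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# summary = latest_execution(boto3.client("sagemaker"), "<your-pipeline-name>")
# if summary is not None:
#     print(summary["PipelineExecutionArn"], summary["PipelineExecutionStatus"])
```

Passing the client in as an argument keeps the helper easy to test with a stub object.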


## Trigger the ModelDeploy Pipeline

Once the training pipeline has completed, we can go to the “Model groups” tab inside the SageMaker Project and inspect the metadata attached to the model artifacts. If everything looks good, we can manually approve the model:

<img src="media/model_metrics.png">

<img src="media/approve_model.png">
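
The manual approval can also be scripted. The following is a minimal sketch, assuming a boto3 SageMaker client and its `list_model_packages` and `update_model_package` calls. `approve_latest_pending` is a hypothetical helper name, the approval description is an arbitrary example, and the model package group name is a placeholder for the group shown in your project.

```python
def approve_latest_pending(sm_client, model_package_group_name):
    """Approve the newest model version that is still pending manual approval.

    Returns the ARN of the approved model package, or None if nothing is pending.
    """
    response = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="PendingManualApproval",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    packages = response["ModelPackageSummaryList"]
    if not packages:
        return None

    model_package_arn = packages[0]["ModelPackageArn"]
    sm_client.update_model_package(
        ModelPackageArn=model_package_arn,
        ModelApprovalStatus="Approved",
        ApprovalDescription="Approved after reviewing the evaluation metrics",
    )
    return model_package_arn


# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# approve_latest_pending(boto3.client("sagemaker"), "<your-model-package-group>")
```

Review the metrics before running this, since the status change has the same effect as approving in the UI.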

This approval will trigger the ModelDeploy pipeline (in CodePipeline):

<img src="media/execute_pipeline_deploy.png">

After we deploy to a staging environment and run some tests, we will have to **approve the deployment to production** by approving in the `ApproveDeployment` stage:

<img src="media/approve_deploy_prod.png">



Finally, if we go back to Studio, we will see the Production endpoint for real-time inference:

<img src="media/endpoints.png">
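
Once the endpoint is in service, it can be called from a notebook. The sketch below assumes a boto3 `sagemaker-runtime` client and its `invoke_endpoint` call, and it sends `text/csv` because that is the content type registered with the model in the pipeline. `predict_csv` is a hypothetical helper name, the endpoint name is a placeholder, and the feature rows must match the columns produced by your data preparation step.

```python
def predict_csv(runtime_client, endpoint_name, rows):
    """Send rows of feature values as CSV and return the raw response text."""
    # One CSV line per row, values separated by commas, no header
    payload = "\n".join(",".join(str(value) for value in row) for row in rows)
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Accept="text/csv",
        Body=payload,
    )
    return response["Body"].read().decode("utf-8")


# Usage (requires AWS credentials and a deployed endpoint, so it is commented out):
# import boto3
# runtime = boto3.client("sagemaker-runtime")
# print(predict_csv(runtime, "<your-production-endpoint>", [feature_row]))
```

The response is returned as raw text because its exact format depends on the inference logic you deployed.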

---