# Adding a Conditional Step to Evaluate the Model Performance in AWS SageMaker Pipelines
## Training Pipeline - Using Bring your own code method
### Usecase- Customer Churn Prediction 
-------------------------------------------------------------------------------------------------------------------------

# Table of Contents

* [1. Data](#1.-Data)
	* [1.1 Importing data](#1.1-Importing-data)
	* [1.2 Data description](#1.2-Data-description)
	* [1.3 EDA](#1.3-EDA)    
* [2. Sagemaker Pipeline](#2.-Sagemaker-Pipeline)
	* [2.1 Architecture](#2.1-Architecture)
	* [2.2 Install predefined Sagemaker libraries](#2.2-Install-predefined-Sagemaker-libraries)
	* [2.3 Import MLOps define functions](#2.3-Import-MLOps-define-functions)
	* [2.4 Define Preprocessing Stage](#2.4-Define-Preprocessing-Stage)
	* [2.5 Define Training stage](#2.5-Define-Training-stage)
	* [2.6 Define the Model Evaluation step](#2.5-Define-the-Model-Evaluation-step)
	* [2.7 Define the Model Register step](#2.6-Define-the-Model-Register-step)
	* [2.7 Define the Condition Step](#2.6-Define-the-Condition-Step)
	* [2.9 Define required parameters for get pipeline](#2.7-Define-required-parameters-for-get-pipeline)   


# 1. Data

## 1.1 Importing data

In [4]:
import pandas as pd
df = pd.read_csv("data/customer_churn.csv")
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


## 1.2 Data description

Each row represents a customer, each column contains customer’s attributes described on the column Metadata. The raw data contains 7043 rows (customers) and 21 columns (features). The “Churn” column is our target. 

- **`CustomerID:`** Customer ID 

- **`Gender:`** Whether the customer is a male or a female 

- **`SeniorCitizen:`** Whether the customer is a senior citizen or not (1, 0) 

- **`Partner:`** Whether the customer has a partner or not (Yes, No) 

- **`Dependents:`** Whether the customer has dependents or not (Yes, No) 

- **`tenure:`** Number of months the customer has stayed with the company 

- **`PhoneService:`** Whether the customer has a phone service or not (Yes, No) 

- **`MultipleLines:`** Whether the customer has multiple lines or not (Yes, No, No phone service) 

- **`InternetService:`** Customer’s internet service provider (DSL, Fiber optic, No) 

- **`OnlineSecurity:`** Whether the customer has online security or not (Yes, No, No internet service) 

- **`OnlineBackup:`** Whether the customer has online backup or not (Yes, No, No internet service)

- **`DeviceProtection:`** Whether the customer has device protection or not (Yes, No, No internet service)

- **`TechSupport:`** Whether the customer has tech support or not (Yes, No, No internet service)

- **`StreamingTV:`** Whether the customer has streaming TV or not (Yes, No, No internet service)

- **`StreamingMovies:`** Whether the customer has streaming movies or not (Yes, No, No internet service)

- **`Contract:`** The contract term of the customer (Month-to-month, One year, Two year)

- **`PaperlessBilling:`** Whether the customer has paperless billing or not (Yes, No)

- **`PaymentMethod:`** The customer’s payment method (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic))

- **`MonthlyCharges:`** The amount charged to the customer monthly

- **`TotalCharges:`** The total amount charged to the customer

- **`Churn:`** Whether the customer churned or not (Yes or No)

### 1.3 EDA

If you need to do further analysis about the data, please follow the Auto EDA library (Pandas Profiling)
```python
pip install pandas-profiling

#importing required packages
import pandas as pd
import pandas_profiling
import numpy as np

#descriptive statistics
pandas_profiling.ProfileReport(df)
```

# 2. Sagemaker Pipeline

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

- Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
- Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
- Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
- Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
- Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
- Transform job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from a dataset, get inferences from large datasets, and run inference when a persistent endpoint is not needed.
- Post processing - (Optional) A step that filtering the final predicted output base : In here we don't include that step into our pipeline
- Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

### 2.1 Architecture

This **training** pipeline contains preprocess, training and model register steps.

![architecture](images/eval11.PNG)

Lets beginning the code

![workflow](images/eval1.PNG)

## 2.2 Install predefined Sagemaker libraries


Initailly we have to install AWS predefined Sagemaker libraries.

In [12]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os
import pytz
from datetime import datetime

import boto3
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model


sess = sagemaker.Session()

![workflowimage](images/eval2.PNG)

## 2.3 Import Other define functions


Next step is start session and in this process we are defining our <b> AWS region, sagemaker client, boto 3 session</b> and <b>default s3 bucket </b>.

In [13]:
def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.
    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

In [7]:
default_bucket = sess.default_bucket()
account_id = boto3.client("sts").get_caller_identity().get("Account")
region = boto3.session.Session().region_name

In [None]:
## Store this dataset in the s3 bucket
df.to_csv(f"s3://{default_bucket}/customer_churn/inference/training_input_dataset/telco_cutomer_churn.csv", header = False)

Enter the project name for import config files

![workflowimage](images/eval3.PNG)

## 2.4 Define Preprocessing Stage

This is the script used in preprocessing 

In [33]:
!pygmentize customer_churn_training_preprocessing/preprocessing.py

[34mimport[39;49;00m [04m[36mboto3[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m
[34mimport[39;49;00m [04m[36margparse[39;49;00m
[34mimport[39;49;00m [04m[36mos[39;49;00m
[34mimport[39;49;00m [04m[36mnumpy[39;49;00m [34mas[39;49;00m [04m[36mnp[39;49;00m
[34mfrom[39;49;00m [04m[36msklearn[39;49;00m[04m[36m.[39;49;00m[04m[36mpreprocessing[39;49;00m [34mimport[39;49;00m MinMaxScaler


[34mdef[39;49;00m [32mchange_format[39;49;00m(df):
    df[[33m'[39;49;00m[33mTotalCharges[39;49;00m[33m'[39;49;00m] = pd.to_numeric(df.TotalCharges, errors=[33m'[39;49;00m[33mcoerce[39;49;00m[33m'[39;49;00m)
    
    [34mreturn[39;49;00m df

[34mdef[39;49;00m [32mmissing_value[39;49;00m(df):
    [36mprint[39;49;00m([33m"[39;49;00m[33mcount of missing values: (before treatment)[39;49;00m[33m"[39;49;00m, df.isnull().sum())
    
    df[[33m'[39;49;00m[33mTotalCharges[39;49;00m[33m'

This is the code for run the **preprocessing** part. You can add more inputs or outputs according to your requirement. Please refer the guideline document for more details.

In [10]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="customer-churn-prediction-training-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="cutomer-churn-prediction-training", # Choose any name
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    #data versioning control using date
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data =  f"s3://{default_bucket}/customer_churn/training_input_dataset/telco_cutomer_churn.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output1/"
    preprocessed_output2 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output2/"
    
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/customer_churn/training/{date_folder}/model/xgboost/", 
    )
    
    
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_training_preprocessing/preprocessing.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
            ProcessingOutput(output_name="output2", destination=preprocessed_output2,  source="/opt/ml/processing/output2"),       
        ]
    )
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
        ],
        steps=[
            step_preprocess,
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


![workflowimage](images/eval4.PNG)

![workflowimage](images/eval5.PNG)

## 2.5 Define Training stage

We used this script for model training stage.

In [40]:
!pygmentize customer_churn_training/model/train_without.py

[37m#Import the neccessary libaries in here[39;49;00m
[34mimport[39;49;00m [04m[36mos[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m
[34mfrom[39;49;00m [04m[36mxgboost[39;49;00m [34mimport[39;49;00m XGBClassifier,plot_importance
[37m#from imblearn.over_sampling import SMOTE[39;49;00m
[37m#from imblearn.combine import SMOTETomek # doctest: +NORMALIZE_WHITESPACE[39;49;00m
[34mfrom[39;49;00m [04m[36msklearn[39;49;00m[04m[36m.[39;49;00m[04m[36mmodel_selection[39;49;00m [34mimport[39;49;00m train_test_split
[34mfrom[39;49;00m [04m[36msklearn[39;49;00m[04m[36m.[39;49;00m[04m[36mmetrics[39;49;00m [34mimport[39;49;00m precision_recall_fscore_support [34mas[39;49;00m score
[34mfrom[39;49;00m [04m[36msklearn[39;49;00m[04m[36m.[39;49;00m[04m[36mmetrics[39;49;00m [34mimport[39;49;00m accuracy_score, precision_score, recall_score, auc,roc_curve,r2_score,confusion_matrix,roc_auc_score,

Please execute bellow code to create the pipeline. This pipeline contains **preprocessing and training** steps together.

In [13]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="customer-churn-prediction-training-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="customer-churn-prediction-training", # Choose any name
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    #data versioning control using date
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data =  f"s3://{default_bucket}/customer_churn/training_input_dataset/telco_cutomer_churn.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output1/"
    preprocessed_output2 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output2/"
    
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/customer_churn/training/{date_folder}/model/xgboost/", 
    )
    
    
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_training_preprocessing/preprocessing.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
            ProcessingOutput(output_name="output2", destination=preprocessed_output2,  source="/opt/ml/processing/output2"),       
        ]
    )
    
    ###### --------------------- TRAINING --------------------------------------------------------------------
    
    # Training step for generating model artifacts
    ecr_repository_est = "customer-churn-prediction-training-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    recommender_image_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository_est + tag
    )
    
    estimator = Estimator(image_uri=recommender_image_uri,
                      role=role,
                      sagemaker_session=sess,                                  # Technical object
                      output_path=model_path,
                      base_job_name=f'{base_job_prefix}-training-job',
                      input_mode='File',                                       # Copy the dataset and then train    
                      train_instance_count=1,
                      train_instance_type= "ml.m5.4xlarge",
                      debugger_hook_config=False,
                      disable_profiler = True,
                      metric_definitions=[
                            {'Name': 'roc_auc_score:' , 'Regex': 'roc_auc_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'Precision' , 'Regex': 'precision:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'recall_score' , 'Regex': 'recall_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'f1_score' , 'Regex': 'f1_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'accuracy_score' , 'Regex': 'accuracy_score:([-+]?[0-9]*\.?[0-9]+)'},
                      ],
                      #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Training'}],
                      subnets = subnets.split(':'),
                      security_group_ids = security_group_ids.split(':')
                         )

    # start training
    step_train = TrainingStep(
        name= f"{base_job_prefix}-training",
        estimator= estimator,
        inputs = {
            "input1": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "input2": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output2"].S3Output.S3Uri,
               content_type="text/csv",
            ),
        }
    )
    

    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            model_path,
            model_approval_status
        ],
        steps=[
            step_preprocess,
            step_train,
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


![workflowimage](images/eval6.PNG)

## 2.6 Define the Model Evaluation step

In this step we are going to evaluate the model performace and we used accuracy, precision, f1 score and recall values to evaluate it. Later we will use these matrices for another script.

In [None]:
import os
import pickle
import boto3
import gzip
from io import BytesIO
#import joblib
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model
from sagemaker.network import NetworkConfig

from datetime import datetime
from dateutil import tz
import json

sess = sagemaker.Session()

from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="customer-churn-prediction-training-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="customer-churn-prediction-training", # Choose any name
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    #data versioning control using date
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data =  f"s3://{default_bucket}/customer_churn/training_input_dataset/telco_cutomer_churn.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output1/"
    preprocessed_output2 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output2/"
    preprocessed_output3 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output3/"
    preprocessed_output4 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output4/"
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/customer_churn/training/{date_folder}/model/xgboost/", 
    )
    
    
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_training_preprocessing/preprocessing.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
            ProcessingOutput(output_name="output2", destination=preprocessed_output2,  source="/opt/ml/processing/output2"),
            ProcessingOutput(output_name="X_val", destination=preprocessed_output3, source="/opt/ml/processing/X_val"),
            ProcessingOutput(output_name="y_val", destination=preprocessed_output4, source="/opt/ml/processing/y_val"),
        ]
    )
    
    ###### --------------------- TRAINING --------------------------------------------------------------------
    
    # Training step for generating model artifacts
    ecr_repository_est = "customer-churn-prediction-training-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    recommender_image_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository_est + tag
    )
    
    estimator = Estimator(image_uri=recommender_image_uri,
                      role=role,
                      sagemaker_session=sess,                                  # Technical object
                      output_path=model_path,
                      base_job_name=f'{base_job_prefix}-training-job',
                      input_mode='File',                                       # Copy the dataset and then train    
                      train_instance_count=1,
                      train_instance_type= "ml.m5.4xlarge",
                      debugger_hook_config=False,
                      disable_profiler = True,
                      metric_definitions=[
                            {'Name': 'roc_auc_score:' , 'Regex': 'roc_auc_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'Precision' , 'Regex': 'precision:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'recall_score' , 'Regex': 'recall_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'f1_score' , 'Regex': 'f1_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'accuracy_score' , 'Regex': 'accuracy_score:([-+]?[0-9]*\.?[0-9]+)'},
                      ],
                      #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Training'}],
                      subnets = subnets.split(':'),
                      security_group_ids = security_group_ids.split(':')
                         )

    # start training
    step_train = TrainingStep(
        name= f"{base_job_prefix}-training",
        estimator= estimator,
        inputs = {
            "input1": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "input2": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output2"].S3Output.S3Uri,
               content_type="text/csv",
            ),
        }
    )
    
    # Processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri="120582440665.dkr.ecr.ap-southeast-1.amazonaws.com/customer-churn-prediction-evaluation-image:latest",
        command=["python3"],
        instance_type="ml.m5.2xlarge",
        instance_count=1,
        base_job_name=f"{base_job_prefix}-eval",
        sagemaker_session=sagemaker_session,
        role=role,
    )
    
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    
    step_eval = ProcessingStep(
        name=f"{base_job_prefix}-eval-job",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "X_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/X_val",
            ),
             ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "y_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/y_val",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        #code=os.path.join(BASE_DIR, "evaluate.py"),
        code = "customer_churn_training_evaluation/evaluation.py",
        property_files=[evaluation_report],
    )

    # Register model step that will be conditionally executed
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json",
        )
    )
    
    
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            model_path,
            model_approval_status
        ],
        steps=[
            step_preprocess,
            step_train,
            step_eval,
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


![workflowimage](images/eval7.PNG)

## 2.7 Define the Model Register step



Register your model in model registery using bellow code

Please execute bellow code to create the pipeline.This pipeline contains **preprocessing,training, evaluating and model register** steps together

In [14]:
import os
import pickle
import boto3
import gzip
from io import BytesIO
#import joblib
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model
from sagemaker.network import NetworkConfig

from datetime import datetime
from dateutil import tz
import json

sess = sagemaker.Session()

In [15]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

In [16]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="customer-churn-prediction-training-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="customer-churn-prediction-training", # Choose any name
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    #data versioning control using date
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data =  f"s3://{default_bucket}/customer_churn/training_input_dataset/telco_cutomer_churn.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output1/"
    preprocessed_output2 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output2/"
    preprocessed_output3 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output3/"
    preprocessed_output4 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output4/"
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/customer_churn/training/{date_folder}/model/xgboost/", 
    )
    
    
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_training_preprocessing/preprocessing.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
            ProcessingOutput(output_name="output2", destination=preprocessed_output2,  source="/opt/ml/processing/output2"),
            ProcessingOutput(output_name="X_val", destination=preprocessed_output3, source="/opt/ml/processing/X_val"),
            ProcessingOutput(output_name="y_val", destination=preprocessed_output4, source="/opt/ml/processing/y_val"),
        ]
    )
    
    ###### --------------------- TRAINING --------------------------------------------------------------------
    
    # Training step for generating model artifacts
    ecr_repository_est = "customer-churn-prediction-training-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    recommender_image_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository_est + tag
    )
    
    estimator = Estimator(image_uri=recommender_image_uri,
                      role=role,
                      sagemaker_session=sess,                                  # Technical object
                      output_path=model_path,
                      base_job_name=f'{base_job_prefix}-training-job',
                      input_mode='File',                                       # Copy the dataset and then train    
                      train_instance_count=1,
                      train_instance_type= "ml.m5.4xlarge",
                      debugger_hook_config=False,
                      disable_profiler = True,
                      metric_definitions=[
                            {'Name': 'roc_auc_score:' , 'Regex': 'roc_auc_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'Precision' , 'Regex': 'precision:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'recall_score' , 'Regex': 'recall_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'f1_score' , 'Regex': 'f1_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'accuracy_score' , 'Regex': 'accuracy_score:([-+]?[0-9]*\.?[0-9]+)'},
                      ],
                      #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Training'}],
                      subnets = subnets.split(':'),
                      security_group_ids = security_group_ids.split(':')
                         )

    # start training
    step_train = TrainingStep(
        name= f"{base_job_prefix}-training",
        estimator= estimator,
        inputs = {
            "input1": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "input2": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output2"].S3Output.S3Uri,
               content_type="text/csv",
            ),
        }
    )
    
    # Processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri="120582440665.dkr.ecr.ap-southeast-1.amazonaws.com/customer-churn-prediction-evaluation-image:latest",
        command=["python3"],
        instance_type="ml.m5.2xlarge",
        instance_count=1,
        base_job_name=f"{base_job_prefix}-eval",
        sagemaker_session=sagemaker_session,
        role=role,
    )
    
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    
    step_eval = ProcessingStep(
        name=f"{base_job_prefix}-eval-job",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "X_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/X_val",
            ),
             ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "y_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/y_val",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        #code=os.path.join(BASE_DIR, "evaluate.py"),
        code = "customer_churn_training_evaluation/evaluation.py",
        property_files=[evaluation_report],
    )

    # Register model step that will be conditionally executed
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json",
        )
    )
    

    ###### --------------------- Model Registry ----------------------------------------------------------------
    
    #registering the model

    step_register = RegisterModel(
        name= f"{base_job_prefix}-registermodel",
        estimator= estimator,
        model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types= ["text/csv"],
        response_types= ["text/csv"],
        inference_instances= ["ml.t2.medium", "ml.m5.xlarge"],
        transform_instances= ["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
        display_name="XG boost model- LR 0.05",
        description="XG boost model - only changed the learning rate to 0.05",
        #tags=None,
    )
    
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            model_path,
            model_approval_status
        ],
        steps=[
            step_preprocess,
            step_train,
            step_eval,
            step_register
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


![workflowimage](images/eval8.PNG)

## 2.8 Define the Condition Step

A condition step requires a list of conditions, a list of steps to run if the condition evaluates to true, and a list of steps to run if the condition evaluates to false.
<p>Now we are going to add that step to the pipeline and in here we are going to check the model performance exceed the accuracy level 0.8 or not. If the accuracy level exceed that treshold value, it will automatically going to the defined conditions. In here it's storing the model artifact in the model registry step. The following example shows how to create a Condition step definition. </p>

In [None]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="customer-churn-prediction-training-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="customer-churn-prediction-training", # Choose any name
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    #data versioning control using date
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data =  f"s3://{default_bucket}/customer_churn/training_input_dataset/telco_cutomer_churn.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output1/"
    preprocessed_output2 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output2/"
    preprocessed_output3 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output3/"
    preprocessed_output4 = f"s3://{default_bucket}/customer_churn/training/{date_folder}/output4/"
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/customer_churn/training/{date_folder}/model/xgboost/", 
    )
    
    
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",  # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
    )

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_training_preprocessing/preprocessing.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
            ProcessingOutput(output_name="output2", destination=preprocessed_output2,  source="/opt/ml/processing/output2"),
            ProcessingOutput(output_name="X_val", destination=preprocessed_output3, source="/opt/ml/processing/X_val"),
            ProcessingOutput(output_name="y_val", destination=preprocessed_output4, source="/opt/ml/processing/y_val"),
        ]
    )
    
    ###### --------------------- TRAINING --------------------------------------------------------------------
    
    # Training step for generating model artifacts
    ecr_repository_est = "customer-churn-prediction-training-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    recommender_image_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository_est + tag
    )
    
    estimator = Estimator(image_uri=recommender_image_uri,
                      role=role,
                      sagemaker_session=sess,                                  # Technical object
                      output_path=model_path,
                      base_job_name=f'{base_job_prefix}-training-job',
                      input_mode='File',                                       # Copy the dataset and then train    
                      train_instance_count=1,
                      train_instance_type= "ml.m5.4xlarge",
                      debugger_hook_config=False,
                      disable_profiler = True,
                      metric_definitions=[
                            {'Name': 'roc_auc_score:' , 'Regex': 'roc_auc_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'Precision' , 'Regex': 'precision:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'recall_score' , 'Regex': 'recall_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'f1_score' , 'Regex': 'f1_score:([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'accuracy_score' , 'Regex': 'accuracy_score:([-+]?[0-9]*\.?[0-9]+)'},
                      ],
                      #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Training'}],
                      subnets = subnets.split(':'),
                      security_group_ids = security_group_ids.split(':')
                         )

    # start training
    step_train = TrainingStep(
        name= f"{base_job_prefix}-training",
        estimator= estimator,
        inputs = {
            "input1": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "input2": TrainingInput(
                s3_data= step_preprocess.properties.ProcessingOutputConfig.Outputs["output2"].S3Output.S3Uri,
               content_type="text/csv",
            ),
        }
    )
    
    # Processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri="120582440665.dkr.ecr.ap-southeast-1.amazonaws.com/customer-churn-prediction-evaluation-image:latest",
        command=["python3"],
        instance_type="ml.m5.2xlarge",
        instance_count=1,
        base_job_name=f"{base_job_prefix}-eval",
        sagemaker_session=sagemaker_session,
        role=role,
    )
    
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    
    step_eval = ProcessingStep(
        name=f"{base_job_prefix}-eval-job",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "X_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/X_val",
            ),
             ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs[
                    "y_val"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/y_val",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        #code=os.path.join(BASE_DIR, "evaluate.py"),
        code = "customer_churn_training_evaluation/evaluation.py",
        property_files=[evaluation_report],
    )

    # Register model step that will be conditionally executed
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json",
        )
    )
    

    ###### --------------------- Model Registry ----------------------------------------------------------------
    
    #registering the model

    step_register = RegisterModel(
        name= f"{base_job_prefix}-registermodel",
        estimator= estimator,
        model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types= ["text/csv"],
        response_types= ["text/csv"],
        inference_instances= ["ml.t2.medium", "ml.m5.xlarge"],
        transform_instances= ["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
        display_name="XG boost model- LR 0.05",
        description="XG boost model - only changed the learning rate to 0.05",
        #tags=None,
    )
   
    # Condition step for evaluating model quality and branching execution
    cond_lte = ConditionGreaterThanOrEqualTo(  # You can change the condition here
        left=JsonGet(
            step_name=step_eval.name,
            property_file=evaluation_report,
            json_path="binary_classification_metrics.accuracy.value",  # This should follow the structure of your report_dict defined in the evaluate.py file.
        ),
        right=0.8,  # You can change the threshold here
    )
    step_cond = ConditionStep(
        name=f"{base_job_prefix}-accuracycond",
        conditions=[cond_lte],
        if_steps=[step_register],
        else_steps=[],
    )
    
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            model_path,
            model_approval_status
        ],
        steps=[
            step_preprocess,
            step_train,
            step_eval,
            #step_register
            step_cond
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


![workflowimage](images/eval9.PNG)

Addtional if we need we can use stop/pipeline failed condition by adding below command to the pipeline.


In [None]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterInteger

mse_threshold_param = ParameterInteger(name="accuracyThreshold", default_value=0.8)
step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(
        on=" ", values=["Execution failed due to Accuracy >", mse_threshold_param]
    ),
)

![workflowimage](images/eval10.PNG)

## 2.9 Define required parameters for get pipeline

Define subnets and parameters for get_pipeline function here

In [None]:
subnets = 'your_subnet'
sg = 'your security group'

In [58]:
role='your IAM role'
#role=None
default_bucket=default_bucket
pipeline_def = get_pipeline(region, 
                            subnets, 
                            sg, 
                            role,
                            default_bucket)

train_instance_count has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
train_instance_type has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [None]:
pipeline_def.upsert(role_arn=role)

In [19]:
execution = pipeline_def.start()

![workflowimage](images/eval12.PNG)

![workflowimage](images/eval11.PNG)

#### To see the execution ID





In [None]:
execution.describe()

In [None]:
execution.list_steps()

## Lineage
Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)