In [None]:
# Upgrade pip in the kernel's environment before the notebook's dependencies are installed.
%pip install --upgrade -q pip

In [None]:
# Install pinned versions of the libraries this notebook uses locally.
# NOTE(review): the training and evaluation scripts written further down call
# `install("lightgbm")` without a version pin, so the version inside the job
# containers may differ from the one pinned here.
%pip install -q lightgbm==3.3.5 pandas==1.0.5

In [None]:
import json
import os

import boto3
import pandas as pd
import sagemaker
from sagemaker.deserializers import CSVDeserializer
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.estimator import Estimator
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
    FileSource
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor
)
from sagemaker.serializers import CSVSerializer
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig,
    ClarifyCheckStep,
    ModelExplainabilityCheckConfig
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.lambda_step import (
    LambdaStep,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterBoolean
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import (
    ProcessingStep,
    CreateModelStep,
    TuningStep
)

In [None]:
# --- Session, AWS clients and execution role ---------------------------------
sess = sagemaker.Session()
write_bucket = sess.default_bucket()
write_prefix = "census-income-pipeline"

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
# Pinned to the session region like the two clients above, so all three clients
# talk to the same region regardless of the ambient boto3 configuration.
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)

sagemaker_role = sagemaker.get_execution_role()

# The `%%writefile src/...` cells below write into this directory and do not
# create it themselves, so make sure it exists on a fresh checkout.
os.makedirs("src", exist_ok=True)

# --- Source data location (read-only bucket) ---------------------------------
read_bucket = "avw-sagemaker-data"
read_prefix = "census-income"

raw_data_key = f"s3://{read_bucket}/{read_prefix}"

# --- Key prefixes inside the write bucket ------------------------------------
processed_data_key = f"{write_prefix}/processed"

train_data_key = f"{write_prefix}/train"

validation_data_key = f"{write_prefix}/validation"

test_data_key = f"{write_prefix}/test"

# --- Fully qualified S3 URIs used by the pipeline steps ----------------------
census_income_data_uri = f"{raw_data_key}/census-income.csv"
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/scripts"
# `model_prefix` is the key-only form of `estimator_output_uri`; the tuning step
# uses it (together with the bucket) to locate the best model artifact.
model_prefix = f"{write_prefix}/training_jobs"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"
clarify_bias_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/bias_config"
clarify_explainability_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/explainability_config"
bias_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/bias"
explainability_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/explainability"

# --- Compute sizing and training image ---------------------------------------
train_model_id, train_model_version, train_scope = "lightgbm-classification-model", "*", "training"
process_instance_type = "ml.m5.large"
train_instance_count = 1
train_instance_type = "ml.m5.large"
predictor_instance_count = 1
predictor_instance_type = "ml.m5.large"
clarify_instance_count = 1
clarify_instance_type = "ml.m5.large"

# The image is looked up by model id / version / scope; `region` and `framework`
# are deliberately passed as None here.
# NOTE(review): presumably the SDK falls back to the session region — confirm.
train_image_uri = retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=train_instance_type
)

In [None]:
# Names of the SageMaker resources created by this notebook.
pipeline_name = "CensusIncomeLGBPipeline"
pipeline_model_name = "census-income-lgb-pipeline"
model_package_group_name = "census-income-lgb-model-group"
base_job_name_prefix = "census-income"
tuning_job_name_prefix = "census-income-tuning"
endpoint_config_name = f"{pipeline_model_name}-endpoint-config"
endpoint_name = f"{pipeline_model_name}-endpoint"

# Label column of the processed datasets. The preprocessing, training and
# evaluation scripts all read and write the label as "Class", so the Clarify
# data configs (which take their label from this variable) must use the same name.
target_col = "Class"

In [None]:
# Runtime-overridable pipeline parameters. Each default mirrors the plain Python
# value defined in the configuration cell.

# Instance sizing for the processing, training, deployment and Clarify stages.
process_instance_type_param = ParameterString(name="ProcessingInstanceType", default_value=process_instance_type)
train_instance_type_param = ParameterString(name="TrainingInstanceType", default_value=train_instance_type)
train_instance_count_param = ParameterInteger(name="TrainingInstanceCount", default_value=train_instance_count)
deploy_instance_type_param = ParameterString(name="DeployInstanceType", default_value=predictor_instance_type)
deploy_instance_count_param = ParameterInteger(name="DeployInstanceCount", default_value=predictor_instance_count)
clarify_instance_type_param = ParameterString(name="ClarifyInstanceType", default_value=clarify_instance_type)

# Model bias check: skip flag, baseline registration flag and an optional
# externally supplied constraints file (empty string by default).
skip_check_model_bias_param = ParameterBoolean(name="SkipModelBiasCheck", default_value=False)
register_new_baseline_model_bias_param = ParameterBoolean(name="RegisterNewModelBiasBaseline", default_value=False)
supplied_baseline_constraints_model_bias_param = ParameterString(
    name="ModelBiasSuppliedBaselineConstraints", default_value=""
)

# Model explainability check: same three knobs as the bias check.
skip_check_model_explainability_param = ParameterBoolean(name="SkipModelExplainabilityCheck", default_value=False)
register_new_baseline_model_explainability_param = ParameterBoolean(
    name="RegisterNewModelExplainabilityBaseline", default_value=False
)
supplied_baseline_constraints_model_explainability_param = ParameterString(
    name="ModelExplainabilitySuppliedBaselineConstraints", default_value=""
)

# Approval status given to the model package when it is registered.
model_approval_status_param = ParameterString(name="ModelApprovalStatus", default_value="Approved")

In [None]:
%%writefile src/preprocessing.py

"""Clean the census-income dataset and split it into train/validation/test sets.

Reads ``census-income.csv`` from the processing input directory, imputes missing
values, one-hot encodes the categorical features and writes four CSV files
(train, validation, test and the full processed dataset) below
``/opt/ml/processing``.
"""

import argparse
import os
import pandas as pd
import numpy as np
import logging
from sklearn.model_selection import train_test_split

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Name of the label column in the raw and processed data.
LABEL_COL = "Class"


def clean_and_encode(df):
    """Impute missing values and one-hot encode the categorical features.

    The literal string "unknown" is treated as missing. Object-typed columns
    are converted to categoricals; every categorical feature (the label
    excluded) is filled with its mode and one-hot encoded, every other column
    is filled with its median.

    Returns a new, fully numeric-encoded DataFrame.
    """
    df = df.replace("unknown", np.nan)

    categorical_features = list(df.loc[:, df.dtypes == "object"].columns.values)
    for f in categorical_features:
        df[f] = df[f].astype("category")
    # The label is never treated as a feature to impute-by-mode or to encode.
    categorical_features = [c for c in categorical_features if c != LABEL_COL]

    for f in df.columns:
        fill_value = df[f].mode()[0] if f in categorical_features else df[f].median()
        # Assign the filled column back instead of `fillna(inplace=True)` on a
        # column selection, which is not guaranteed to modify the parent frame.
        df[f] = df[f].fillna(fill_value)

    return pd.get_dummies(df, columns=categorical_features)


def split_dataset(X, y, train_ratio, val_ratio, test_ratio, seed):
    """Split features/labels into train, validation and test partitions.

    The three ratios are interpreted relative to their sum, so the resulting
    partitions have the requested proportions of the *whole* dataset. The
    validation fraction is rescaled because the second split only sees what is
    left after the test set has been removed.

    Returns ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    Raises ``ValueError`` if any ratio is not strictly positive.
    """
    if min(train_ratio, val_ratio, test_ratio) <= 0:
        raise ValueError("train, validation and test ratios must all be positive")

    test_fraction = test_ratio / (train_ratio + val_ratio + test_ratio)
    val_fraction = val_ratio / (train_ratio + val_ratio)

    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X, y, test_size=test_fraction, random_state=seed
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_fraction, random_state=seed
    )
    return X_train, X_val, X_test, y_train, y_val, y_test


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-ratio", type=float, default=0.8)
    parser.add_argument("--validation-ratio", type=float, default=0.1)
    parser.add_argument("--test-ratio", type=float, default=0.1)
    # Seed for both splits so repeated pipeline runs produce the same partitions.
    parser.add_argument("--random-state", type=int, default=42)
    args, _ = parser.parse_known_args()
    logger.info("Received arguments {}".format(args))

    local_dir = "/opt/ml/processing"

    input_data_path = os.path.join("/opt/ml/processing/census-income", "census-income.csv")

    logger.info("Reading census income data from {}".format(input_data_path))
    df = clean_and_encode(pd.read_csv(input_data_path))

    X = df.drop(columns=[LABEL_COL])
    y = df[LABEL_COL]

    logger.debug("Splitting data into train, validation, and test sets")
    X_train, X_val, X_test, y_train, y_val, y_test = split_dataset(
        X, y, args.train_ratio, args.validation_ratio, args.test_ratio, args.random_state
    )

    # Re-attach the label as the last column; `assign` returns new frames, so the
    # split results are not modified through a slice of the original data.
    train_df = X_train.assign(**{LABEL_COL: y_train})
    val_df = X_val.assign(**{LABEL_COL: y_val})
    test_df = X_test.assign(**{LABEL_COL: y_test})
    full_df = X.assign(**{LABEL_COL: y})

    logger.info("Train data shape after preprocessing: {}".format(train_df.shape))
    logger.info("Validation data shape after preprocessing: {}".format(val_df.shape))
    logger.info("Test data shape after preprocessing: {}".format(test_df.shape))

    logger.debug("Writing processed datasets to container local path.")
    train_output_path = os.path.join(f"{local_dir}/train", "train.csv")
    validation_output_path = os.path.join(f"{local_dir}/val", "validation.csv")
    test_output_path = os.path.join(f"{local_dir}/test", "test.csv")
    full_processed_output_path = os.path.join(f"{local_dir}/full", "dataset.csv")

    # Make sure every output directory exists before writing into it.
    for output_path in (
        train_output_path,
        validation_output_path,
        test_output_path,
        full_processed_output_path,
    ):
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

    logger.info("Saving train data to {}".format(train_output_path))
    train_df.to_csv(train_output_path, index=False)

    logger.info("Saving validation data to {}".format(validation_output_path))
    val_df.to_csv(validation_output_path, index=False)

    logger.info("Saving test data to {}".format(test_output_path))
    test_df.to_csv(test_output_path, index=False)

    logger.info("Saving full processed data to {}".format(full_processed_output_path))
    full_df.to_csv(full_processed_output_path, index=False)


In [None]:

# Stage the preprocessing script in S3; the processing step below references
# the uploaded copy through its `code` argument.
preprocessing_script_key = f"{write_prefix}/scripts/preprocessing.py"
s3_client.upload_file(
    Filename="src/preprocessing.py",
    Bucket=write_bucket,
    Key=preprocessing_script_key,
)

# Scikit-learn processor that runs the script on a single instance.
# NOTE(review): this uses the plain `process_instance_type` value, not the
# `process_instance_type_param` pipeline parameter declared earlier.
sklearn_processor = SKLearnProcessor(
    role=sagemaker_role,
    framework_version="0.23-1",
    instance_type=process_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_name_prefix}-processing",
)

# The raw CSV is made available at the container path the script reads from.
raw_census_input = ProcessingInput(
    source=census_income_data_uri,
    destination="/opt/ml/processing/census-income",
)

# One output per directory the script writes to; the output names are what the
# downstream steps use to look up the resulting S3 URIs.
train_split_output = ProcessingOutput(
    output_name="train_data",
    source="/opt/ml/processing/train",
    destination=f"{processing_output_uri}/train_data",
)
validation_split_output = ProcessingOutput(
    output_name="validation_data",
    source="/opt/ml/processing/val",
    destination=f"{processing_output_uri}/validation_data",
)
test_split_output = ProcessingOutput(
    output_name="test_data",
    source="/opt/ml/processing/test",
    destination=f"{processing_output_uri}/test_data",
)
full_dataset_output = ProcessingOutput(
    output_name="processed_data",
    source="/opt/ml/processing/full",
    destination=f"{processing_output_uri}/processed_data",
)

process_step = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[raw_census_input],
    outputs=[
        train_split_output,
        validation_split_output,
        test_split_output,
        full_dataset_output,
    ],
    job_arguments=[
        "--train-ratio", "0.8",
        "--validation-ratio", "0.1",
        "--test-ratio", "0.1",
    ],
    code=f"s3://{write_bucket}/{write_prefix}/scripts/preprocessing.py",
)

In [None]:
%%writefile src/lightgbm_train.py

"""Train a LightGBM classifier on the processed census-income data.

Reads ``train.csv`` and ``validation.csv`` from the training channels, fits an
``LGBMClassifier`` with the hyperparameters given on the command line, prints
the F1 scores in the format scraped by the tuner's metric regex, and writes the
fitted model plus a ``metrics.json`` file.
"""

import argparse
import os
import subprocess
import sys
import joblib
import json
import pandas as pd
from sklearn.metrics import f1_score
from sklearn.model_selection import cross_val_score


def install(package):
    """Install ``package`` with pip into the running interpreter's environment.

    ``-q`` is placed after ``install`` so that it is pip's quiet flag and not an
    option of the Python interpreter. Raises ``subprocess.CalledProcessError``
    if pip exits with a non-zero status.
    """
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])


# LightGBM must be installed before it can be imported below.
install("lightgbm")

import lightgbm as lgb


def prepare_data(df):
    """Split ``df`` into features and the "Class" label.

    Object-typed columns are converted to pandas categoricals (in place on
    ``df``) before the label column is separated.

    Returns ``(X, y)``.
    """
    categorical_features = list(
        df.loc[:, df.dtypes == "object"].columns.values
    )
    for f in categorical_features:
        df[f] = df[f].astype("category")
    X = df.drop(columns=["Class"])
    y = df["Class"]
    return X, y


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Static hyperparameters.
    parser.add_argument("--boosting_type", type=str, default="gbdt")
    parser.add_argument("--objective", type=str, default="binary")

    # Tunable hyperparameters.
    parser.add_argument("--n_estimators", type=int, default=200)
    parser.add_argument("--learning_rate", type=float, default=0.001)
    parser.add_argument("--num_leaves", type=int, default=30)
    parser.add_argument("--max_bin", type=int, default=300)

    # Data and output locations, defaulting to the SM_* environment variables.
    parser.add_argument("--train_data_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation_data_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))

    args = parser.parse_args()

    train_df = pd.read_csv(f"{args.train_data_dir}/train.csv")
    val_df = pd.read_csv(f"{args.validation_data_dir}/validation.csv")

    # Every parsed hyperparameter is forwarded to the model, including the
    # static ones, so none of them is silently ignored.
    params = {
        "boosting_type": args.boosting_type,
        "objective": args.objective,
        "n_estimators": args.n_estimators,
        "learning_rate": args.learning_rate,
        "num_leaves": args.num_leaves,
        "max_bin": args.max_bin,
    }

    X, y = prepare_data(train_df)
    model = lgb.LGBMClassifier(**params)

    # Cross-validated macro F1 on the training data, then a final fit on all of it.
    scores = cross_val_score(model, X, y, scoring="f1_macro")
    train_f1 = scores.mean()
    model = model.fit(X, y)

    X_val, y_val = prepare_data(val_df)
    val_f1 = f1_score(y_val, model.predict(X_val))

    # These lines are parsed from the job logs; keep the format unchanged.
    print(f"[0]#011train-f1:{train_f1:.2f}")
    print(f"[0]#011validation-f1:{val_f1:.2f}")

    metrics_data = {
        "hyperparameters": params,
        "binary_classification_metrics": {
            "validation:f1": {"value": val_f1},
            "train:f1": {"value": train_f1},
        },
    }

    metrics_location = args.output_data_dir + "/metrics.json"

    model_location = args.model_dir + "/lightgbm-model"

    with open(metrics_location, "w") as f:
        json.dump(metrics_data, f)

    with open(model_location, "wb") as f:
        joblib.dump(model, f)


In [None]:
# Hyperparameters that stay fixed across all tuning jobs.
static_hyperparams = dict(boosting_type="gbdt", objective="binary")

# Estimator that runs `src/lightgbm_train.py` in the training image.
# NOTE(review): this uses the plain `train_instance_type` / `train_instance_count`
# values, not the corresponding pipeline parameters declared earlier.
lgb_estimator = Estimator(
    image_uri=train_image_uri,
    role=sagemaker_role,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    source_dir="src",
    entry_point="lightgbm_train.py",
    hyperparameters=static_hyperparams,
    output_path=estimator_output_uri,
    code_location=estimator_output_uri,
    framework_version="1.3-1",
)

# Search space explored by the tuner.
hyperparameter_ranges = dict(
    n_estimators=IntegerParameter(10, 400),
    learning_rate=ContinuousParameter(0.0001, 0.5, scaling_type="Logarithmic"),
    num_leaves=IntegerParameter(2, 200),
    max_bin=IntegerParameter(50, 500),
)

# The objective metric is scraped from the training logs with this regex; it
# matches the `validation-f1:<value>` line printed by the training script.
validation_f1_metric = {"Name": "validation-f1", "Regex": "validation-f1:([0-9\\.]+)"}

tuner_config_dict = dict(
    estimator=lgb_estimator,
    max_jobs=20,
    max_parallel_jobs=2,
    objective_metric_name="validation-f1",
    metric_definitions=[validation_f1_metric],
    hyperparameter_ranges=hyperparameter_ranges,
    base_tuning_job_name=tuning_job_name_prefix,
    strategy="Random",
)

tuner = HyperparameterTuner(**tuner_config_dict)

# Both channels are fed from the outputs of the data processing step.
tuning_channel_outputs = process_step.properties.ProcessingOutputConfig.Outputs
train_channel = TrainingInput(
    s3_data=tuning_channel_outputs["train_data"].S3Output.S3Uri,
    content_type="text/csv",
    s3_data_type="S3Prefix",
)
validation_channel = TrainingInput(
    s3_data=tuning_channel_outputs["validation_data"].S3Output.S3Uri,
    content_type="text/csv",
    s3_data_type="S3Prefix",
)

tuning_step = TuningStep(
    name="LGBModelTuning",
    tuner=tuner,
    inputs={"train": train_channel, "validation": validation_channel},
)

In [None]:
# S3 location of the best model artifact from the tuning step (rank 0).
best_model_data_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=write_bucket,
    prefix=model_prefix,
)

# SageMaker model wrapping that artifact in the training image.
model = sagemaker.model.Model(
    image_uri=train_image_uri,
    model_data=best_model_data_uri,
    role=sagemaker_role,
    sagemaker_session=sess,
)

# The instance type comes from the deploy pipeline parameter.
inputs = sagemaker.inputs.CreateModelInput(instance_type=deploy_instance_type_param)

create_model_step = CreateModelStep(
    name="CensusIncomeModel",
    model=model,
    inputs=inputs,
)

In [None]:
# Job settings for the Clarify check; the explainability check defined in a
# later cell reuses this same object.
check_job_config = CheckJobConfig(
    role=sagemaker_role,
    instance_type=clarify_instance_type,
    instance_count=1,
    volume_size_in_gb=30,
    sagemaker_session=sess,
)

# The bias analysis runs on the training split produced by the processing step.
bias_input_s3_uri = process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
model_bias_data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=bias_input_s3_uri,
    s3_output_path=bias_report_output_uri,
    s3_analysis_config_output_path=clarify_bias_config_output_uri,
    label=target_col,
    dataset_type="text/csv",
)

# Model the check runs against: the one created by the CreateModel step.
model_config = sagemaker.clarify.ModelConfig(
    model_name=create_model_step.properties.ModelName,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
)

# Facet and label settings for the bias metrics.
bias_config = sagemaker.clarify.BiasConfig(
    label_values_or_threshold=[1],
    facet_name="Sex",
    facet_values_or_threshold=[0],
    group_name="Age",
)

model_predictions_config = sagemaker.clarify.ModelPredictedLabelConfig(
    probability_threshold=0.5
)

model_bias_check_config = ModelBiasCheckConfig(
    data_config=model_bias_data_config,
    data_bias_config=bias_config,
    model_config=model_config,
    model_predicted_label_config=model_predictions_config,
    methods=["DPPL"],
)

# Skip / register / supplied-baseline behaviour is driven by pipeline parameters.
model_bias_check_step = ClarifyCheckStep(
    name="ModelBiasCheck",
    clarify_check_config=model_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_bias_param,
    register_new_baseline=register_new_baseline_model_bias_param,
    supplied_baseline_constraints=supplied_baseline_constraints_model_bias_param,
)

In [None]:
# The explainability analysis also runs on the training split.
explainability_input_s3_uri = process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
model_explainability_data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=explainability_input_s3_uri,
    s3_output_path=explainability_report_output_uri,
    s3_analysis_config_output_path=clarify_explainability_config_output_uri,
    label=target_col,
    dataset_type="text/csv",
)

# SHAP settings: fixed seed, 100 samples, mean-absolute aggregation, and
# per-record SHAP values are kept.
shap_config = sagemaker.clarify.SHAPConfig(
    seed=829,
    num_samples=100,
    agg_method="mean_abs",
    save_local_shap_values=True,
)

# Reuses the `model_config` and `check_job_config` objects from the bias check cell.
model_explainability_config = ModelExplainabilityCheckConfig(
    data_config=model_explainability_data_config,
    model_config=model_config,
    explainability_config=shap_config,
)

# Skip / register / supplied-baseline behaviour is driven by pipeline parameters.
model_explainability_step = ClarifyCheckStep(
    name="ModelExplainabilityCheck",
    clarify_check_config=model_explainability_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_explainability_param,
    register_new_baseline=register_new_baseline_model_explainability_param,
    supplied_baseline_constraints=supplied_baseline_constraints_model_explainability_param,
)

In [None]:
%%writefile src/evaluate.py

"""Evaluate the trained LightGBM model on the held-out test split.

Extracts ``model.tar.gz``, loads the pickled model, computes the F1 score on
``test.csv`` and writes it to ``evaluation.json`` in the evaluation output
directory.
"""

import logging
import pathlib
import tarfile
import subprocess
import sys
import joblib
import json


def install(package):
    """Install ``package`` with pip into the running interpreter's environment.

    ``-q`` is placed after ``install`` so that it is pip's quiet flag and not an
    option of the Python interpreter. Raises ``subprocess.CalledProcessError``
    if pip exits with a non-zero status.
    """
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])


# LightGBM has to be installed so the pickled model can be loaded.
install("lightgbm")

import pandas as pd

from sklearn.metrics import f1_score


def prepare_data(df):
    """Split ``df`` into features and the "Class" label.

    Object-typed columns are converted to pandas categoricals (in place on
    ``df``) before the label column is separated.

    Returns ``(X, y)``.
    """
    categorical_features = list(
        df.loc[:, df.dtypes == "object"].columns.values
    )
    for f in categorical_features:
        df[f] = df[f].astype("category")
    X = df.drop(columns=["Class"])
    y = df["Class"]
    return X, y


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    # NOTE(review): extractall/joblib.load are only safe on trusted archives;
    # this one is presumably the artifact of this pipeline's own training job.
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading LightGBM model.")
    # Context manager so the file handle is closed once the model is loaded.
    with open("lightgbm-model", "rb") as model_file:
        model = joblib.load(model_file)

    logger.debug("Reading test data.")
    test_local_path = "/opt/ml/processing/test/test.csv"

    test_df = pd.read_csv(test_local_path)

    logger.debug("Calculating F1 score.")
    X_test, y_test = prepare_data(test_df)
    test_f1 = f1_score(y_test, model.predict(X_test))

    # The condition step reads this value via the path
    # "classification_metrics.f1.value", so the nesting must stay as is.
    metric_dict = {
        "classification_metrics": {"f1": {"value": test_f1}}
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing evaluation report with F1: %f", test_f1)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(metric_dict))

In [None]:
# Stage the evaluation script in S3; the step below references the uploaded
# copy through its `code` argument.
evaluate_script_key = f"{write_prefix}/scripts/evaluate.py"
s3_client.upload_file(
    Filename="src/evaluate.py",
    Bucket=write_bucket,
    Key=evaluate_script_key,
)

# The evaluation runs in the same image that was used for training.
eval_processor = ScriptProcessor(
    image_uri=train_image_uri,
    command=["python3"],
    role=sagemaker_role,
    instance_type=predictor_instance_type,
    instance_count=predictor_instance_count,
    base_job_name=f"{base_job_name_prefix}-model-eval",
    sagemaker_session=sess,
)

# Exposes evaluation.json from the "evaluation" output so a later condition can
# read values out of it.
evaluation_report = PropertyFile(
    name="CensusIncomeEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Inputs: the best model artifact from tuning and the held-out test split.
best_model_input = ProcessingInput(
    source=tuning_step.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=write_bucket,
        prefix=model_prefix,
    ),
    destination="/opt/ml/processing/model",
)
test_split_input = ProcessingInput(
    source=process_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)
evaluation_output = ProcessingOutput(
    output_name="evaluation",
    source="/opt/ml/processing/evaluation",
    destination=f"{model_eval_output_uri}",
)

evaluation_step = ProcessingStep(
    name="LGBModelEvaluate",
    processor=eval_processor,
    inputs=[best_model_input, test_split_input],
    outputs=[evaluation_output],
    code=f"s3://{write_bucket}/{write_prefix}/scripts/evaluate.py",
    property_files=[evaluation_report],
)

In [None]:
def _json_metrics_source(s3_uri):
    """Wrap an S3 URI in a MetricsSource with the JSON content type."""
    return MetricsSource(s3_uri=s3_uri, content_type="application/json")


# Properties of the two Clarify check steps that are attached to the model package.
bias_check_properties = model_bias_check_step.properties
explainability_check_properties = model_explainability_step.properties

# Metrics calculated during this run.
model_metrics = ModelMetrics(
    bias_post_training=_json_metrics_source(
        bias_check_properties.CalculatedBaselineConstraints
    ),
    explainability=_json_metrics_source(
        explainability_check_properties.CalculatedBaselineConstraints
    ),
)

# Baselines the checks used for drift detection, plus the explainability
# analysis config file.
drift_check_baselines = DriftCheckBaselines(
    bias_post_training_constraints=_json_metrics_source(
        bias_check_properties.BaselineUsedForDriftCheckConstraints
    ),
    explainability_constraints=_json_metrics_source(
        explainability_check_properties.BaselineUsedForDriftCheckConstraints
    ),
    explainability_config_file=FileSource(
        s3_uri=model_explainability_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
)

# Register the best tuned model in the model package group.
registered_model_data_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=write_bucket,
    prefix=model_prefix,
)

register_step = RegisterModel(
    name="LGBRegisterModel",
    estimator=lgb_estimator,
    model_data=registered_model_data_uri,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[predictor_instance_type],
    transform_instances=[predictor_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
    drift_check_baselines=drift_check_baselines,
)

In [None]:
%%writefile src/lambda_deployer.py

"""
Lambda function that deploys a registered model package to a real-time endpoint.

It creates a SageMaker model from the model package, an endpoint configuration
for that model and finally the endpoint itself. Every setting it needs is read
from the invocation event.
"""

import json
import boto3

# Keys that must be present in the event, in the order they are read.
_EVENT_KEYS = (
    "model_name",
    "model_package_arn",
    "endpoint_config_name",
    "endpoint_name",
    "role",
    "instance_type",
    "instance_count",
)


def lambda_handler(event, context):
    """Create the model, endpoint configuration and endpoint described by ``event``.

    All required keys are read before the first API call, so a missing key
    raises ``KeyError`` without any resource having been created. ``context``
    is not used. Nothing is returned.
    """
    sagemaker_api = boto3.client("sagemaker")

    (
        model_name,
        model_package_arn,
        endpoint_config_name,
        endpoint_name,
        role,
        instance_type,
        instance_count,
    ) = [event[key] for key in _EVENT_KEYS]

    # 1. Model backed by the registered model package.
    sagemaker_api.create_model(
        ModelName=model_name,
        PrimaryContainer={"ModelPackageName": model_package_arn},
        ExecutionRoleArn=role,
    )

    # 2. Endpoint configuration with a single variant receiving all traffic.
    all_traffic_variant = {
        "VariantName": "Alltraffic",
        "ModelName": model_name,
        "InitialInstanceCount": instance_count,
        "InstanceType": instance_type,
        "InitialVariantWeight": 1,
    }
    sagemaker_api.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[all_traffic_variant],
    )

    # 3. The endpoint itself.
    sagemaker_api.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )

In [None]:
# Lambda function built from the deployer script written above.
function_name = "sagemaker-lgb-census-income-lambda-step"
func = Lambda(
    function_name=function_name,
    execution_role_arn=sagemaker_role,
    script="src/lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    memory_size=10240,
    timeout=600,
)

# Event passed to the handler; the keys must match what `lambda_handler` reads.
deploy_event = {
    "model_name": pipeline_model_name,
    "endpoint_config_name": endpoint_config_name,
    "endpoint_name": endpoint_name,
    # ARN of the model package created by the first step of the register collection.
    "model_package_arn": register_step.steps[0].properties.ModelPackageArn,
    "role": sagemaker_role,
    "instance_type": deploy_instance_type_param,
    "instance_count": deploy_instance_count_param,
}

lambda_deploy_step = LambdaStep(
    name="LambdaStepRealTimeDeploy",
    lambda_func=func,
    inputs=deploy_event,
)

In [None]:
# F1 score read from the evaluation report written by the evaluation step.
evaluated_f1 = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="classification_metrics.f1.value",
)

# Gate: continue only when the F1 score is at least 0.9.
cond_gte = ConditionGreaterThanOrEqualTo(left=evaluated_f1, right=0.9)

# Steps that run when the gate passes; nothing runs otherwise.
steps_if_model_passes = [
    create_model_step,
    model_bias_check_step,
    model_explainability_step,
    register_step,
    lambda_deploy_step,
]

condition_step = ConditionStep(
    name="CheckCensusIncomeLGBEvaluation",
    conditions=[cond_gte],
    if_steps=steps_if_model_passes,
    else_steps=[],
)

In [None]:
# Every parameter that can be overridden when an execution is started.
pipeline_parameters = [
    process_instance_type_param,
    train_instance_type_param,
    train_instance_count_param,
    deploy_instance_type_param,
    deploy_instance_count_param,
    clarify_instance_type_param,
    skip_check_model_bias_param,
    register_new_baseline_model_bias_param,
    supplied_baseline_constraints_model_bias_param,
    skip_check_model_explainability_param,
    register_new_baseline_model_explainability_param,
    supplied_baseline_constraints_model_explainability_param,
    model_approval_status_param,
]

# Top-level steps; the model creation, checks, registration and deployment
# steps are nested inside the condition step.
pipeline_steps = [process_step, tuning_step, evaluation_step, condition_step]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sess,
)

In [None]:
# Create the pipeline, or update it if it already exists.
pipeline.upsert(role_arn=sagemaker_role)

# Read the stored definition back and parse it so it can be inspected below.
pipeline_description = pipeline.describe()
pipeline_definition = json.loads(pipeline_description["PipelineDefinition"])
pipeline_definition

In [None]:
# Start an execution with both Clarify checks skipped and each check
# registering the baseline it calculates, so the bias and explainability
# checks are configured the same way.
# NOTE(review): assumes this is the first execution, i.e. no previous baseline
# exists in the model package group to fall back on — confirm.
start_response = pipeline.start(
    parameters=dict(
        SkipModelBiasCheck=True,
        RegisterNewModelBiasBaseline=True,
        SkipModelExplainabilityCheck=True,
        RegisterNewModelExplainabilityBaseline=True,
    )
)

In [None]:
# Load the test split written by the processing step.
test_df = pd.read_csv(f"{processing_output_uri}/test_data/test.csv")

# Client for the deployed endpoint, exchanging CSV in both directions.
predictor = sagemaker.predictor.Predictor(
    endpoint_name,
    sagemaker_session=sess,
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
)

# The processed datasets store the label in the "Class" column (the same column
# that is read for `Label` below), so that is the column to strip from the payload.
payload = test_df.drop(columns=["Class"]).iloc[:5]
result = predictor.predict(payload.values)

# Show the predictions next to the true labels of the same five rows.
prediction_df = pd.DataFrame()
prediction_df["Prediction"] = result
prediction_df["Label"] = test_df["Class"].iloc[:5].values
prediction_df

In [None]:
# Tear down the resources created by this notebook: the Lambda function first,
# then the endpoint, its configuration, the model and finally the pipeline.
func.delete()
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=pipeline_model_name)
sm_client.delete_pipeline(PipelineName=pipeline_name)