## Import libraries and set up the role and S3 buckets

In [1]:
import boto3
import sagemaker
import pandas as pd
import matplotlib.pyplot as plt
import sagemaker, boto3, json
from sagemaker import get_execution_role
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker


from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
import boto3

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# IAM role used to access AWS resources
aws_role = get_execution_role()

# Start one SageMaker session and pin every boto3 client to that session's
# region, so S3, SageMaker and SageMaker-runtime requests all target the same region.
sess = sagemaker.Session()
region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)

# Set up parameters for identifying write / read locations
write_bucket = sess.default_bucket()  # default bucket of the SageMaker session
write_prefix = "spam-mail"

read_bucket = "spam-mail-dataset"
dataset = 'spam.csv'

# Keys will be used during pipeline steps
raw_data_key = f"s3://{read_bucket}"
processed_data_key = f"s3://{write_bucket}/{write_prefix}/processed"
train_data_key = f"s3://{write_bucket}/{write_prefix}/train"
validation_data_key = f"s3://{write_bucket}/{write_prefix}/validation"
test_data_key = f"s3://{write_bucket}/{write_prefix}/test"

# Full S3 paths
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/scripts"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"
clarify_bias_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/bias_config"
clarify_explainability_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/explainability_config"
bias_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/bias"
explainability_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/explainability"

# Retrieve the scikit-learn container image URI for this region
training_image = retrieve(framework="sklearn", region=region, version="0.23-1")

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


##### Pipeline parameters

In [3]:
# Names of the pipeline and of the resources it creates
pipeline_name = "SpamHamLogRegPipeline"
pipeline_model_name = "spam-ham-logreg-pipeline"
model_package_group_name = "spam-ham-logreg-model-group"
base_job_name_prefix = "spam-ham"

# Endpoint names are derived from the model name
endpoint_name = pipeline_model_name + "-endpoint"
endpoint_config_name = endpoint_name + "-config"

# Name of the target column in the data (1 = Spam, 0 = Ham)
target_col = "label"

# Instance type / count for each stage
process_instance_type = "ml.c5.xlarge"                       # Preprocessing
train_instance_count, train_instance_type = 1, "ml.m4.xlarge"          # Training
predictor_instance_count, predictor_instance_type = 1, "ml.m4.xlarge"  # Inference

In [4]:
# Pipeline runtime input parameters.
# Each parameter gets a name and a default value taken from the constants
# defined in the previous cell.

# Instance types (string parameters)
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType", default_value=process_instance_type
)
train_instance_type_param = ParameterString(
    name="TrainingInstanceType", default_value=train_instance_type
)
deploy_instance_type_param = ParameterString(
    name="DeployInstanceType", default_value=predictor_instance_type
)

# Instance counts (integer parameters)
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount", default_value=train_instance_count
)
deploy_instance_count_param = ParameterInteger(
    name="DeployInstanceCount", default_value=predictor_instance_count
)

# Approval status passed to the model registration step
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)

## Preprocess Data

In [5]:
# Upload the preprocessing script to S3 so the processing step can reference it
s3_client.upload_file(
    Filename="preprocessing.py",
    Bucket=write_bucket,
    Key=f"{write_prefix}/scripts/preprocessing.py",
)

# Processor that runs the script with python3 in the scikit-learn container.
# `training_image` is the same sklearn 0.23-1 image URI retrieved in the setup
# cell, so it is reused here instead of being looked up a second time.
# The shared session is passed explicitly, like the other pipeline objects.
# NOTE(review): the ProcessingInstanceType pipeline parameter
# (process_instance_type_param) is not used here; the static
# `process_instance_type` is. Wiring the parameter in would also require
# listing it in Pipeline(parameters=...).
script_processor = ScriptProcessor(
    image_uri=training_image,
    command=["python3"],
    role=aws_role,
    instance_count=1,
    instance_type=process_instance_type,
    base_job_name=f"{write_prefix}-processing",
    sagemaker_session=sess,
)

# Root directory inside the processing container for inputs and outputs
local_dir = '/opt/ml/processing'

# Processing step: one input (the raw dataset) and four named outputs.
# The `output_name` values are what later steps use to look up the S3 URIs.
process_step = ProcessingStep(
    name="DataProcessing",
    processor=script_processor,
    inputs=[
        ProcessingInput(
            source=f"s3://{read_bucket}/{dataset}",
            destination=f"{local_dir}/spam-mail",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source=f"{local_dir}/train",
            destination=f"{processed_data_key}/train",
        ),
        ProcessingOutput(
            output_name="validation_data",
            source=f"{local_dir}/val",
            destination=f"{processed_data_key}/val",
        ),
        ProcessingOutput(
            output_name="test_data",
            source=f"{local_dir}/test",
            destination=f"{processed_data_key}/test",
        ),
        ProcessingOutput(
            output_name="processed_data",
            source=f"{local_dir}/processed",
            destination=f"{processed_data_key}/processed",
        ),
    ],
    # Command-line arguments passed to preprocessing.py
    job_arguments=[
        "--train-ratio", "0.8",
        "--validation-ratio", "0.1",
        "--test-ratio", "0.1",
    ],
    # Same S3 location the script was uploaded to above
    code=f"{scripts_uri}/preprocessing.py",
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


## Train the Model

In [6]:
from sagemaker.sklearn import SKLearn

# Hyperparameters passed to the logistic regression training script
hyperparams = {
    "C": 1.0,
    "max_iter": 100,
    "solver": "liblinear",
    "penalty": "l2",
    "fit_intercept": True,
}

# SKLearn estimator that runs logistic_regression_train.py.
# NOTE(review): instance count and type are the static values from the
# configuration cell, NOT the TrainingInstanceCount / TrainingInstanceType
# pipeline parameters, so overriding those parameters at execution time has no
# effect on this step. Confirm that the installed SDK's SKLearn estimator
# accepts pipeline variables for these arguments before wiring them in.
sklearn_estimator = SKLearn(
    entry_point="logistic_regression_train.py",
    output_path=estimator_output_uri,    # S3 path for the output model
    code_location=estimator_output_uri,  # S3 path for the code
    hyperparameters=hyperparams,
    role=aws_role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    framework_version="0.23-1",
    sagemaker_session=sess,              # same session as the other pipeline objects
)

# Training channels read from the S3 locations of the processing step's
# "train_data" and "validation_data" outputs. Step properties are resolved at
# pipeline execution time, which also makes this step depend on process_step.
s3_input_train = TrainingInput(
    s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
    content_type="csv",
    s3_data_type="S3Prefix",
)
s3_input_validation = TrainingInput(
    s3_data=process_step.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri,
    content_type="csv",
    s3_data_type="S3Prefix",
)

# Pipeline training step
train_step = TrainingStep(
    name="LogisticRegressionModelTraining",
    estimator=sklearn_estimator,
    inputs={
        "train": s3_input_train,            # Train data channel
        "validation": s3_input_validation,  # Validation data channel
    },
)

## Create the Model

In [7]:
# Deployment instance type for the model, taken from the pipeline parameter
inputs = sagemaker.inputs.CreateModelInput(
    instance_type=deploy_instance_type_param,
)

# SageMaker model built from the training step's model artifacts and the
# scikit-learn container image
model = sagemaker.model.Model(
    role=aws_role,
    sagemaker_session=sess,
    image_uri=training_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
)

# Pipeline step that creates the model
create_model_step = CreateModelStep(
    name="SpamHamModel",
    model=model,
    inputs=inputs,
)

## Evaluate the Model

In [8]:
# Put the evaluation script on S3 so the evaluation step can reference it
s3_client.upload_file(
    Filename="evaluate.py",
    Bucket=write_bucket,
    Key=f"{write_prefix}/scripts/evaluate.py",
)

# Processor that runs the evaluation script with python3 in the sklearn image
eval_processor = ScriptProcessor(
    role=aws_role,
    sagemaker_session=sess,
    image_uri=training_image,
    command=["python3"],
    instance_count=predictor_instance_count,
    instance_type=predictor_instance_type,
    base_job_name=f"{base_job_name_prefix}-model-eval",
)

# Property file pointing at evaluation.json inside the "evaluation" output;
# it is attached to the evaluation step below.
evaluation_report = PropertyFile(
    name="SpamHamEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Inputs: model artifacts from the training step and test data from the
# processing step, each mounted under /opt/ml/processing
eval_model_input = ProcessingInput(
    source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
eval_test_data_input = ProcessingInput(
    source=process_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)

# Output: contents of /opt/ml/processing/evaluation, written to the model-eval S3 location
eval_report_output = ProcessingOutput(
    output_name="evaluation",
    source="/opt/ml/processing/evaluation",
    destination=model_eval_output_uri,
)

# Model evaluation step
evaluation_step = ProcessingStep(
    name="LogisticRegressionModelEvaluate",
    processor=eval_processor,
    inputs=[eval_model_input, eval_test_data_input],
    outputs=[eval_report_output],
    code=f"s3://{write_bucket}/{write_prefix}/scripts/evaluate.py",
    property_files=[evaluation_report],
)

## Register the Model

In [9]:
# Register the trained model in the model package group.
# The model data comes from the training step's artifacts, and the approval
# status comes from the ModelApprovalStatus pipeline parameter.
register_step = RegisterModel(
    name="LogisticRegressionRegisterModel",
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    estimator=sklearn_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    # Request / response formats
    content_types=["text/csv"],
    response_types=["text/csv"],
    # Instance types listed for real-time inference and batch transform
    inference_instances=[predictor_instance_type],
    transform_instances=[predictor_instance_type],
)

## Deploy the Model

In [10]:
# The function name must contain sagemaker
function_name = "sagemaker-spam-det-lambda-step"

# Lambda helper describing the function used by the Lambda step: it is built
# from lambda_deployer.py and its handler is lambda_handler in that script.
func = Lambda(
    function_name=function_name,
    execution_role_arn=aws_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=3008,
)

# Values handed to the Lambda step; inside `lambda_handler` they are retrieved
# from the `event` object.
lambda_step_inputs = {
    "model_name": pipeline_model_name,
    "endpoint_config_name": endpoint_config_name,
    "endpoint_name": endpoint_name,
    # ARN of the model package produced by the register step
    "model_package_arn": register_step.steps[0].properties.ModelPackageArn,
    "role": aws_role,
    "instance_type": deploy_instance_type_param,
    "instance_count": deploy_instance_count_param,
}

lambda_deploy_step = LambdaStep(
    name="LambdaStepRealTimeDeploy",
    lambda_func=func,
    inputs=lambda_step_inputs,
)

## Check the Model

In [11]:
# ROC AUC value read from the evaluation step's report (property file)
test_roc_auc = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="classification_metrics.roc_auc.value",
)

# Condition: ROC AUC must be at least 0.7 (threshold for model performance)
cond_gte = ConditionGreaterThanOrEqualTo(left=test_roc_auc, right=0.7)

# When the condition holds, create, register and deploy the model;
# otherwise run no further steps.
condition_step = ConditionStep(
    name="SpamHamLogRegEvaluation",
    conditions=[cond_gte],
    if_steps=[create_model_step, register_step, lambda_deploy_step],
    else_steps=[],
)

## Build the pipeline

In [12]:
# Assemble the pipeline from its parameters and top-level steps.
# The create-model, register and deploy steps are not listed directly: they
# are the `if_steps` of condition_step.
# NOTE(review): process_instance_type_param (ProcessingInstanceType) is not
# registered here, so it is not available as a pipeline parameter.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        train_instance_type_param,
        train_instance_count_param,
        deploy_instance_type_param,
        deploy_instance_count_param,
        model_approval_status_param,
    ],
    steps=[process_step, train_step, evaluation_step, condition_step],
    sagemaker_session=sess,
)

In [13]:
# Create the pipeline, or update it if it already exists
pipeline.upsert(role_arn=aws_role)

# Fetch the pipeline description and parse its JSON definition for display
pipeline_description = pipeline.describe()
pipeline_definition = json.loads(pipeline_description["PipelineDefinition"])
pipeline_definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'DeployInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'DeployInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DataProcessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.c5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerArguments': ['--train-ratio',
      

## Execute Pipeline

In [14]:
# Start a pipeline execution and keep the value returned by pipeline.start()
start_response = pipeline.start()

## Delete the Model Endpoint