# Assignment 3: Create a SageMaker pipeline
In this assignment you create an end-to-end ML workflow using [SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/).

Refer to the notebook [`03-sagemaker-pipeline.ipynb`](../03-sagemaker-pipeline.ipynb) for code snippets and a general guidance for the exercises in this assignment.

## Import packages

In [16]:
%pip install 'sagemaker[feature-processor]' --force-reinstall

Collecting sagemaker[feature-processor]
  Using cached sagemaker-2.253.1-py3-none-any.whl.metadata (17 kB)
Collecting attrs<26,>=24 (from sagemaker[feature-processor])
  Using cached attrs-25.4.0-py3-none-any.whl.metadata (10 kB)
Collecting boto3<2.0,>=1.39.5 (from sagemaker[feature-processor])
  Using cached boto3-1.40.60-py3-none-any.whl.metadata (6.6 kB)
Collecting cloudpickle>=2.2.1 (from sagemaker[feature-processor])
  Using cached cloudpickle-3.1.1-py3-none-any.whl.metadata (7.1 kB)
Collecting docker (from sagemaker[feature-processor])
  Using cached docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting fastapi (from sagemaker[feature-processor])
  Using cached fastapi-0.120.1-py3-none-any.whl.metadata (28 kB)
Collecting google-pasta (from sagemaker[feature-processor])
  Using cached google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting graphene<4,>=3 (from sagemaker[feature-processor])
  Using cached graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collec

In [18]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
import mlflow
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer
from importlib.metadata import version, PackageNotFoundError

from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep,
    CacheConfig
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    ModelQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)
from sagemaker.workflow.parallelism_config import ParallelismConfiguration
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig 
from sagemaker.image_uris import retrieve
from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step
from sagemaker.model_monitor import DatasetFormat, model_monitoring
from IPython.display import HTML

(sagemaker.__version__, boto3.__version__, mlflow.__version__)

('2.253.1', '1.40.60', '2.22.2')

## Set constants
You need some fixed string literals to simplify coding. Create these literals here.

In [2]:
# Set names of pipeline objects
project = "from-idea-to-prod"
pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-xgb"
model_package_group_name = f"{project}-model-group"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"

# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

# Set S3 urls for processed data
output_s3_prefix = f"s3://{bucket_name}/{bucket_prefix}"
output_s3_url = f"{output_s3_prefix}/output"
train_s3_url = f"{output_s3_prefix}/train"
validation_s3_url = f"{output_s3_prefix}/validation"
test_s3_url = f"{output_s3_prefix}/test"
evaluation_s3_url = f"{output_s3_prefix}/evaluation"
baseline_s3_url = f"{output_s3_prefix}/baseline"
baseline_results_s3_url = f"{baseline_s3_url}/results"
prediction_baseline_s3_url = f"{output_s3_prefix}/prediction_baseline"
prediction_baseline_results_s3_url = f"{prediction_baseline_s3_url}/results"

## Exercise 1: Create pipeline
Follow the steps to create a SageMaker pipeline with your ML workflow:
- Setup pipeline [parameters](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters)
- Build the pipeline [steps](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#steps)
- Construct the [pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline)
- [Upsert](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.upsert) the pipeline

Configure outputs and inputs for each pipeline step.

In [7]:
bucket_name = sagemaker.Session().default_bucket()
bucket_prefix = "from-idea-to-prod/xgboost" 

sagemaker.config INFO - Fetched defaults config from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [8]:
output_s3_prefix = f"s3://{bucket_name}/{bucket_prefix}"
output_s3_url = f"{output_s3_prefix}/output"

train_s3_url = f"{output_s3_prefix}/train"
validation_s3_url = f"{output_s3_prefix}/validation"
test_s3_url = f"{output_s3_prefix}/test"
evaluation_s3_url = f"{output_s3_prefix}/evaluation"

baseline_s3_url = f"{output_s3_prefix}/baseline"
baseline_results_s3_url = f"{baseline_s3_url}/results"

prediction_baseline_s3_url = f"{output_s3_prefix}/prediction_baseline"
prediction_baseline_results_s3_url=f"{prediction_baseline_s3_url}/results"

In [13]:
#Jake, you will need to update these values later if you don't finish

input_s3_url = f"s3://{bucket_name}/{bucket_prefix}/input"
mlflow_arn = "arn:aws:sagemaker:us-east-2:284038851123:mlflow-tracking-server/mlflow-d-21qjmpg5nbfo-16-17-36-05"


In [14]:
# Setup pipeline parameters
# Set S3 url for input dataset
# input_s3_url_param = ParameterString()
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set model approval status for the model registry
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value=model_approval_status
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold",
    default_value=0.75
)

# Parametrize the S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

# Model package group name
model_package_group_name_param = ParameterString(
    name="ModelPackageGroupName",
    default_value=model_package_group_name,
)

# MLflow tracking server ARN
tracking_server_arn_param = ParameterString(
    name="TrackingServerARN",
    default_value=mlflow_arn,
)

In [None]:
# Processing step
# sklearn_processor = SKLearnProcessor()

# processing_inputs=[]

# processing_outputs=[]

# processor_args = sklearn_processor.run()

# step_process = ProcessingStep()

In [19]:
# Python function code is in the local files
from pipeline_steps.preprocess import preprocess

In [None]:
# Training step
# estimator = sagemaker.estimator.Estimator()

# estimator.set_hyperparameters()

# training_inputs = {}

# training_args = estimator.fit(training_inputs)

# step_train = TrainingStep()

Create an executable Python script for the evaluation step

In [None]:
%%writefile evaluation_assignment.py

import json
import pathlib
import pickle as pkl
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime as dt
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":   
    # All paths are local for the processing container
        
    # Read model tar file

    # Load model
    
    # Read test data

    # Run predictions

    # Evaluate predictions

    # Save evaluation report

Use this script to setup a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) object. Don't forget to pass the [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile) where the evaluation script outputs the metrics to the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) constructor.

In [None]:
# Evaluation step
# script_processor = ScriptProcessor()

# eval_inputs=[]

# eval_outputs=[]

# eval_args = script_processor.run()

# evaluation_report = PropertyFile()

# step_eval = ProcessingStep()

Use a model artifact from the training step and a property file from the evaluation step to create a [Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model) and [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics) objects. Use these objects to construct a [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep).

In [None]:
# Register step
# model = Model()

# model_metrics = ModelMetrics()

# register_args = model.register()

# step_register = ModelStep()

Add a [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) to stop the pipeline execution if the model performance metric doesn't meet the specified threshold. 

In [None]:
# Fail step
# step_fail = FailStep()

Use [Conditions](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditions) and `JsonGet` to construct a condition and the condition step.

In [None]:
# Condition step
# cond_lte = ConditionGreaterThan()

# step_cond = ConditionStep()

Use step array to create a [Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline) object.

In [None]:
# Create pipeline
# pipeline = Pipeline()

In [None]:
# Create a new or update existing Pipeline
# pipeline.upsert(role_arn=sm_role)

In [None]:
# Print the pipeline definition
# pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
# pipeline_definition

## Exercise 2: Execute the pipeline

In [None]:
# Start an execution
# execution = pipeline.start()

In [None]:
# Un-comment this call if you want the notebook to wait until the pipeline's execution finished
# Execution time of this pipeline is about 13 minutes
# execution.wait()

In [None]:
# List execution steps
# execution.list_steps()

## Continue with the assignment 4
Navigate to the [assignment 4](04-assignment-sagemaker-project.ipynb) notebook.