# CS611: Project Pipeline

In this notebook we will build an Amazon SageMaker Pipeline to:
1. preprocess the URL data file,
2. train a model using XGBoost,
3. register the machine learning model in the Model Registry, and
4. deploy an endpoint.

## Dataset

In [2]:
%pip install sagemaker

[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [4]:
# AWS / SageMaker handles shared by every cell below.
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
# Default SageMaker bucket; the S3 paths built in later cells are rooted here.
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

# Session passed to the processors, estimator and models that become pipeline steps.
pipeline_session = PipelineSession()

print(bucket)

sagemaker-us-east-1-843047345337


In [5]:
# Local CSV that is uploaded to S3 below; code/preprocess.py reads a file with this exact name.
DATASET_FILE = 'raw_url_dataset.csv'

GRP_NAME = 'GRP1' # CHANGE THIS TO YOUR GROUP NAME; it prefixes every S3 key and resource name below
DATA_PREFIX = f'{GRP_NAME}/data/' # S3 prefix to store data
MODEL_OUTPUT_S3_PATH = f's3://{bucket}/{GRP_NAME}/model/' # S3 prefix to store the XGBoost training information and model.

BASE_JOB_PROCESSING_NAME = f'{GRP_NAME}-processing'  # base_job_name for preprocessing
BASE_JOB_TRAINING_NAME = f'{GRP_NAME}-training'  # base_job_name for training
BASE_JOB_EVALUATION_NAME = f'{GRP_NAME}-evaluation'  # base_job_name for evaluation

PIPELINE_NAME = f'{GRP_NAME}-pipeline'  # SageMaker Pipeline name
MODEL_PACKAGE_GROUP_NAME = f'{GRP_NAME}-ModelPackageGroup'  # Model package group name in the Model Registry

# Echo the derived names so the reader can confirm them before anything is created.
print(f'DATA_PREFIX: {DATA_PREFIX}')
print(f'PIPELINE_NAME: {PIPELINE_NAME}')
print(f'MODEL_PACKAGE_GROUP_NAME: {MODEL_PACKAGE_GROUP_NAME}')

DATA_PREFIX: GRP1/data/
PIPELINE_NAME: GRP1-pipeline
MODEL_PACKAGE_GROUP_NAME: GRP1-ModelPackageGroup


Define Pipeline parameters used to parametrize the pipeline:

In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data (no default value: it is supplied when the pipeline execution is started)
input_data = ParameterString(name='InputData')

# status of newly trained model in registry
model_approval_status = ParameterString(name='ModelApprovalStatus', 
                                        default_value='Approved')

# processing step parameters
processing_instance_type = ParameterString(name='ProcessingInstanceType', 
                                           default_value='ml.m5.large')
processing_instance_count = ParameterInteger(name='ProcessingInstanceCount', 
                                             default_value=1)

# training step parameters
training_instance_type = ParameterString(name='TrainingInstanceType', 
                                         default_value='ml.m5.large')

# model performance step parameters: the AUC value the condition step compares
# against the AUC read from the evaluation report
auc_threshold = ParameterFloat(name='AUCThreshold', 
                                         default_value=0.90)

Upload data:

In [7]:
# Upload the local dataset under DATA_PREFIX; the returned S3 URI is later passed
# as the InputData parameter when the pipeline is started.
raw_s3_path = sagemaker_session.upload_data(DATASET_FILE, key_prefix=DATA_PREFIX)
print(raw_s3_path)

s3://sagemaker-us-east-1-843047345337/GRP1/data/raw_url_dataset.csv


## Preprocessing

In [8]:
# Local folder that receives the scripts written by the %%writefile cells below.
!mkdir -p code

In [9]:
%%writefile code/preprocess.py

import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile
import logging
import re

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

# ================================================================================
# Helper functions
def clean_url(url):
    """Return the lower-cased leading segment of ``url`` up to its first ``/``.

    An optional leading ``http://`` or ``https://`` scheme is skipped. The scheme
    is matched case-insensitively so that ``HTTPS://Example.com/x`` yields
    ``example.com`` rather than ``https:``; this agrees with the ``IsHTTPS``
    feature, which lower-cases the URL before testing its scheme.

    Args:
        url: URL string, with or without a scheme.

    Returns:
        The text between the optional scheme and the first ``/``, lower-cased.

    Raises:
        ValueError: If no such segment exists (empty string, or a string that
            starts with ``/``).
    """
    pattern = r"(?:^https?://)?([^/]+)"
    match = re.match(pattern, url, flags=re.IGNORECASE)
    if match is None:
        raise ValueError("Input not a URL.")
    return match.group(1).lower()

def calculate_url_char_prob(url):
    """Average the per-character weights over the cleaned URL.

    Each character of ``clean_url(url)`` that appears in the hard-coded weight
    table contributes its weight; characters not in the table contribute
    nothing. The sum is divided by the full length of the cleaned URL.

    Args:
        url: URL string accepted by ``clean_url``.

    Returns:
        The mean weight per character, or ``0`` when the cleaned URL is empty.
    """
    char_probabilities = {
        'a': 0.08, 'b': 0.018, 'c': 0.08, 'd': 0.028,
        'e': 0.088, 'f': 0.012, 'g': 0.032, 'h': 0.023,
        'i': 0.063, 'j': 0.007, 'k': 0.013, 'l': 0.04,
        'm': 0.068, 'n': 0.057, 'o': 0.11, 'p': 0.022,
        'q': 0.002, 'r': 0.07, 's': 0.054, 't': 0.054,
        'u': 0.035, 'v': 0.012, 'w': 0.011, 'x': 0.004,
        'y': 0.016, 'z': 0.005,
        '0': 0.001, '1': 0.0001, '2': 0.0001, '3': 0.0001, '4': 0.0001,
        '5': 0.0001, '6': 0.0001, '7': 0.0001, '8': 0.0001, '9': 0.0001,
    }
    host = clean_url(url)
    host_length = len(host)
    if not host_length > 0:
        return 0
    # Accumulate left to right starting from 0.0, skipping characters without a weight.
    weight_sum = sum(
        (char_probabilities[ch] for ch in host if ch in char_probabilities),
        0.0,
    )
    return weight_sum / host_length

def count_digit(url):
    """Count the digit characters in the cleaned URL.

    Args:
        url: URL string accepted by ``clean_url``.

    Returns:
        Number of characters in ``clean_url(url)`` for which ``str.isdigit`` is true.
    """
    return sum(1 for ch in clean_url(url) if ch.isdigit())

def find_longest_sequence(s, pattern):
    """Return the longest non-overlapping match of ``pattern`` in ``s``.

    Args:
        s: Text to search.
        pattern: Regular expression passed to ``re.findall``.

    Returns:
        The longest match; the earliest one wins on ties. An empty string is
        returned when there is no match at all.
    """
    candidates = re.findall(pattern, s)
    if not candidates:
        return ""
    longest = candidates[0]
    for candidate in candidates[1:]:
        # Strict comparison keeps the first of several equally long matches.
        if len(candidate) > len(longest):
            longest = candidate
    return longest

def char_continuation_rate(url):
    """Share of the cleaned URL covered by its longest letter, digit and special runs.

    For each of three character classes (ASCII letters, digits, and characters
    that are neither letters, digits nor whitespace) the longest consecutive
    run in ``clean_url(url)`` is found; the three lengths are added and divided
    by the length of the cleaned URL.

    Args:
        url: URL string accepted by ``clean_url``.

    Returns:
        The ratio described above, or ``0`` when the cleaned URL is empty.
    """
    host = clean_url(url)
    longest_run_total = 0
    for class_pattern in (r'[a-zA-Z]+', r'\d+', r'[^A-Za-z\d\s]+'):
        longest_run_total += len(find_longest_sequence(host, class_pattern))
    host_length = len(host)
    if host_length > 0:
        return longest_run_total / host_length
    return 0

def extract_url_features(df):
    """Build the URL feature frame from the ``URL`` column of ``df``.

    Args:
        df: DataFrame that contains a ``URL`` column.

    Returns:
        A new DataFrame with one column per feature, in this order:
        ``CharContinuationRate``, ``URLCharProb``, ``DomainLength``,
        ``SubdomainCount``, ``NoOfDigitsInURL``, ``IsHTTPS``.

    Raises:
        ValueError: If ``df`` has no ``URL`` column.
    """
    # Minimum requirement for the call.
    if 'URL' not in df.columns:
        raise ValueError("The dataframe must contain a column named 'URL'.")

    # (column name, per-URL function) pairs; extend this table to add features.
    feature_builders = (
        ('CharContinuationRate', char_continuation_rate),
        ('URLCharProb', calculate_url_char_prob),
        ('DomainLength', lambda url: len(clean_url(url))),
        ('SubdomainCount', lambda url: clean_url(url).count('.')-1),
        ('NoOfDigitsInURL', count_digit),
        ('IsHTTPS', lambda url: 1 if url.lower().startswith("https://") else 0),
    )

    url_column = df['URL']
    features_df = pd.DataFrame()
    for column_name, builder in feature_builders:
        features_df[column_name] = url_column.apply(builder)
    return features_df

# ================================================================================
RANDOM_STATE = 2024
LABEL_COLUMN = 'label'

# Folder under which this script reads its input and writes its outputs.
base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

# Root logger at DEBUG with a stream handler; the inference functions below log through it.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    # Processing-job entry point: featurize, split, scale, then write the CSVs and the scaler.
    df = pd.read_csv(f"{base_dir}/input/raw_url_dataset.csv")

    feature_data = extract_url_features(df)
    FEATURE_COLUMN = list(feature_data.columns)
    label_data = df[LABEL_COLUMN]

    # Stratified 80/20 split, then the 20% is halved into validation and test.
    x_train, x_temp, y_train, y_temp = train_test_split(
        feature_data,
        label_data,
        test_size=0.20,
        stratify=label_data,
        random_state=RANDOM_STATE,
    )
    x_val, x_test, y_val, y_test = train_test_split(
        x_temp,
        y_temp,
        test_size=0.50,
        stratify=y_temp,
        random_state=RANDOM_STATE,
    )

    # Fit the scaler on the training features only, then apply it to every split.
    scaler = StandardScaler()
    scaler.fit(x_train)
    x_train, x_val, x_test = [scaler.transform(part) for part in (x_train, x_val, x_test)]

    print(f'Test feature: {x_test}')
    for split_label, split_features in (
        ('training', x_train),
        ('validation', x_val),
        ('testing', x_test),
    ):
        print(f'The shape of {split_label} set: {split_features.shape}')

    # Label first, then features; files are written without header or index.
    output_columns = [LABEL_COLUMN] + FEATURE_COLUMN
    for split_name, split_labels, split_features in (
        ("train", y_train, x_train),
        ("validation", y_val, x_val),
        ("test", y_test, x_test),
    ):
        split_dataset = pd.concat(
            [split_labels.reset_index(drop=True), pd.DataFrame(split_features)],
            axis=1,
        )
        split_dataset.columns = output_columns
        split_dataset.to_csv(f"{base_dir}/{split_name}/{split_name}.csv", header=None, index=False)

    # Package the fitted scaler as model.tar.gz so model_fn can load it at inference time.
    joblib.dump(scaler, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add("model.joblib")

def input_fn(input_data, content_type):
    """Parse the request body into a DataFrame.

    Args:
        input_data: Request payload as text.
        content_type: ``"text/csv"`` (parsed with ``pd.read_csv``, so the first
            row is taken as the header) or ``"application/json"`` (the decoded
            object becomes a one-row frame).

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: For any other content type.
    """
    logger.debug("Starting input:")
    if content_type not in ("text/csv", "application/json"):
        raise ValueError("{} not supported by script!".format(content_type))

    if content_type == "text/csv":
        frame = pd.read_csv(StringIO(input_data))
        logger.debug(f"Sample input csv: {frame}")
    else:
        record = json.loads(input_data)
        frame = pd.DataFrame([record])
        logger.debug(f"Sample input json: {frame}")
    return frame

def output_fn(prediction, accept):
    """Encode ``prediction`` as a ``text/csv`` response.

    Args:
        prediction: Value returned by ``predict_fn``.
        accept: Accept type requested by the caller. The response is always
            encoded as ``text/csv``; this value is only reported in the error
            message.

    Returns:
        A ``worker.Response`` whose body is the CSV encoding of ``prediction``.

    Raises:
        RuntimeError: If encoding or building the response fails. The original
            exception is chained as the cause.
    """
    try:
        return worker.Response(encoders.encode(prediction, "text/csv"), mimetype="text/csv")
    except Exception as exc:
        # RuntimeError is the built-in; the previously used name `RuntimeException`
        # does not exist in Python and turned every failure into a NameError.
        raise RuntimeError(
            "Original accept: {}. Encoder error with text/csv.".format(accept)
        ) from exc
        
def predict_fn(input_data, model):
    """Turn raw URLs into scaled feature values.

    Args:
        input_data: DataFrame with a ``URL`` column, as produced by ``input_fn``.
        model: Fitted scaler returned by ``model_fn``.

    Returns:
        The result of ``model.transform`` applied to the extracted URL features.
    """
    logger.debug("Starting: Feature scaling")
    logger.debug(f"Data into feature extract: {input_data}")
    scaled_features = model.transform(extract_url_features(input_data))
    logger.debug(f"Sample feature: {scaled_features}")
    return scaled_features

def model_fn(model_dir):
    """Load the fitted StandardScaler stored as ``model.joblib`` in ``model_dir``.

    Args:
        model_dir: Directory that contains ``model.joblib``.

    Returns:
        The deserialized scaler object.
    """
    scaler_path = os.path.join(model_dir, "model.joblib")
    return joblib.load(scaler_path)
    

Overwriting code/preprocess.py


In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

CODE_PREFIX = f'{GRP_NAME}/code/' # S3 prefix to store code
# Upload the script written by the previous cell; the processing job runs it from S3.
processcode_s3_path = sagemaker_session.upload_data('code/preprocess.py', key_prefix=CODE_PREFIX)
print(processcode_s3_path)

# Also reused later when the scaler is wrapped in an SKLearnModel.
sklearn_framework_version = "1.2-1"
processing_dir = f's3://{bucket}/{GRP_NAME}/processing'

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=BASE_JOB_PROCESSING_NAME,
    role=role,
    sagemaker_session=pipeline_session
)

# The container paths below must match the ones hard-coded in code/preprocess.py
# (it reads <base_dir>/input and writes <base_dir>/{scaler_model,train,validation,test}).
# Keep "scaler_model" as the FIRST output: the model-creation cell reads Outputs[0]
# to locate the scaler archive.
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model", destination=f"{processing_dir}/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=f"{processing_dir}/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=f"{processing_dir}/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=f"{processing_dir}/test"),
    ],
    code=processcode_s3_path,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


s3://sagemaker-us-east-1-843047345337/GRP1/code/preprocess.py




In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Pipeline step wrapping the preprocessing job; later steps read its outputs
# through step_process.properties / step_process.arguments.
step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

## Training (XGBoost) + Hyperparameter Tuning

In [12]:
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.steps import TrainingStep, TuningStep
from sagemaker.tuner import (
    ContinuousParameter,
    IntegerParameter,
    HyperparameterTuner,
)

# Seed passed to the tuner below (same value as the one inside code/preprocess.py).
RANDOM_STATE = 2024

# XGBoost container image; also reused by the evaluation ScriptProcessor.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",
    py_version="py3",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=MODEL_OUTPUT_S3_PATH,
    base_job_name=BASE_JOB_TRAINING_NAME,
    sagemaker_session=pipeline_session,
    role=role,
)

# Hyperparameters held fixed for every tuning job.
# NOTE(review): `reg_lambda` is fixed here while `lambda` is tuned below; presumably both
# target L2 regularization. Confirm which name the SageMaker XGBoost container honors.
xgb_train.set_hyperparameters(
    objective="binary:logistic",
    num_round=100,
    eta=0.2,
    gamma=4,
    reg_lambda=10,
    subsample=0.7,
    verbosity=0
)

# Search space explored by the tuner (all ranges are sampled on a logarithmic scale).
hyperparameter_ranges = {
    "min_child_weight": IntegerParameter(5, 10, scaling_type="Logarithmic"),
    "max_depth": IntegerParameter(4, 8, scaling_type="Logarithmic"),
    "alpha": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),
    "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic")
}

# Random search maximizing validation AUC; max_jobs equals max_parallel_jobs,
# so the whole budget of three jobs may run concurrently.
objective_metric_name="validation:auc"
tuner_log = HyperparameterTuner(
    xgb_train,
    objective_metric_name,
    hyperparameter_ranges,
    max_jobs=3,
    max_parallel_jobs=3,
    strategy="Random",
    objective_type="Maximize",
    random_seed=RANDOM_STATE
)

# Train/validation channels point at the S3 outputs of the preprocessing step.
hpo_args = tuner_log.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)


step_train = TuningStep(
    name="TrainHPTuning",
    step_args=hpo_args,
)


## Evaluate model [Best model post tuning]

In [13]:
%%writefile code/evaluate.py

import json
import logging
import math
import pickle
import tarfile
import os

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_squared_error, roc_auc_score, f1_score
import pathlib

# Root logger at DEBUG with a stream handler so progress messages reach the job output.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    # Evaluate the trained XGBoost model on the test split and write a JSON report.
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    model = xgboost.Booster()
    model.load_model("xgboost-model")

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    # The file has no header row: column 0 is the label, the rest are features.
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)
    predictions = np.array(predictions, dtype=float)
    # Threshold the predicted scores at 0.5 to obtain class labels for F1.
    binary_predictions = np.where(predictions > 0.5, 1, 0)

    logger.debug("Calculating AUC.")
    auc = roc_auc_score(y_test, predictions)
    # Store the score under a separate name so the imported `f1_score` function
    # is not overwritten by its own result.
    f1 = f1_score(y_test, binary_predictions)
    report_dict = {
        "xgb_metrics": {
            "AUC": {"value": auc},
            "f1_score": {"value": f1}
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with AUC: %f", auc)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))


Overwriting code/evaluate.py


In [14]:
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

# Key prefix matching MODEL_OUTPUT_S3_PATH (without bucket and trailing slash);
# used to build the S3 URI of the best tuning job's model artifact.
MODEL_PREFIX = f'{GRP_NAME}/model'
evalout_s3_path = f"s3://{bucket}/{GRP_NAME}/evaluation"
evalcode_s3_path = sagemaker_session.upload_data('code/evaluate.py', key_prefix=CODE_PREFIX)
print(evalcode_s3_path)

# Runs evaluate.py inside the XGBoost image retrieved for training, since the script imports xgboost.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=BASE_JOB_EVALUATION_NAME,
    sagemaker_session=pipeline_session,
    role=role,
)

# Exposes evaluation.json (written by evaluate.py into the "evaluation" output)
# so the condition step can read values from it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

eval_args = script_eval.run(
    inputs=[
        # Model artifact of the top-ranked tuning job (top_k=0); evaluate.py expects it
        # at /opt/ml/processing/model/model.tar.gz.
        ProcessingInput(
            source=step_train.get_top_model_s3_uri(
                top_k=0, 
                s3_bucket=bucket, 
                prefix=MODEL_PREFIX
            ),
            destination="/opt/ml/processing/model",
        ),
        # Test split produced by the preprocessing step.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=evalout_s3_path
        ),
    ],
    code=evalcode_s3_path,
)

step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

s3://sagemaker-us-east-1-843047345337/GRP1/code/evaluate.py


## Model package creation

In [15]:
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel

# Build the two-container inference model (feature scaler -> XGBoost) that is
# registered in the Model Registry by the next cell.
# Outputs[0] of the preprocessing step is the "scaler_model" output, whose
# destination folder receives model.tar.gz from code/preprocess.py.
scaler_model_s3 = "{}/model.tar.gz".format(
    step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

# code/preprocess.py doubles as the inference script: it defines
# input_fn / predict_fn / output_fn / model_fn for the scaler container.
scaler_model = SKLearnModel(
    model_data=scaler_model_s3,
    role=role,
    sagemaker_session=pipeline_session,
    entry_point="code/preprocess.py",
    framework_version=sklearn_framework_version,
    code_location = MODEL_OUTPUT_S3_PATH    
)

# Define the image URI for XGBoost
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",  # Specify the appropriate version for XGBoost
    image_scope="inference",
    instance_type="ml.m5.xlarge",
)

# XGBoost container serving the artifact of the top-ranked tuning job (top_k=0).
xgb_model = Model(
    image_uri=inference_image_uri,
    model_data=step_train.get_top_model_s3_uri(
        top_k=0, 
        s3_bucket=bucket, 
        prefix=MODEL_PREFIX
    ),
    predictor_cls=XGBoostPredictor,
    sagemaker_session=pipeline_session,
    role=role,
)

# 2 containers in this model running in a sequential manner
pipeline_model = PipelineModel(
    models=[scaler_model, xgb_model], 
    role=role, 
    sagemaker_session=pipeline_session
)


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


## Register Model + Condition step

In [16]:
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# S3 location of the report written by the evaluation step (its only output, index 0).
evaluation_s3_uri = "{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)
print(evaluation_s3_uri)

# Attach the evaluation report to the registered model package as model statistics.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

register_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=MODEL_PACKAGE_GROUP_NAME,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

# Make this available in model registry
# NOTE: the step reuses PIPELINE_NAME as its name, so it shows up in the execution
# as "<PIPELINE_NAME>-RegisterModel".
step_register_pipeline_model = ModelStep(
    name=PIPELINE_NAME,
    step_args=register_args,
)

# Condition reads as `left <= right`, i.e. AUCThreshold <= measured AUC:
# the model is registered only when the test AUC reaches the threshold.
cond_lte = ConditionLessThanOrEqualTo(
    right=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="xgb_metrics.AUC.value",
    ),
    left=auc_threshold,
)

# No else-branch: when the AUC is below the threshold nothing is registered.
step_cond = ConditionStep(
    name="CheckAUC-Evaluation",
    conditions=[cond_lte],
    if_steps=[step_register_pipeline_model],
    else_steps=[],
)



s3://sagemaker-us-east-1-843047345337/GRP1/evaluation/evaluation.json


## SageMaker pipeline

In [17]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

# NOTE(review): presumably makes the jobs use the base_job_name prefixes defined earlier;
# the compiled definition below shows ProcessingJobName 'GRP1-processing'. Confirm.
pipeline_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Create a Sagemaker Pipeline.
# The registration step is not listed directly; it is reached through step_cond's if_steps.
pipeline = Pipeline(
    name=PIPELINE_NAME,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
        auc_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    pipeline_definition_config=pipeline_config
)

In [18]:
import json

# The JSON definition is the compiled representation of the step graph (workflow)
# that gets submitted to SageMaker Pipelines; parse it to inspect it as a dict.
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData', 'Type': 'String'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'AUCThreshold', 'Type': 'Float', 'DefaultValue': 0.9}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingJobName': 'GRP1-processing',
    'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecific

In [19]:
# Create the pipeline, or update it if one named PIPELINE_NAME already exists.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:843047345337:pipeline/GRP1-pipeline',
 'ResponseMetadata': {'RequestId': 'f34087a3-f9aa-4745-9d26-ce075a63963d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f34087a3-f9aa-4745-9d26-ce075a63963d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '81',
   'date': 'Mon, 01 Jul 2024 04:07:56 GMT'},
  'RetryAttempts': 0}}

In [20]:
# Start an execution; InputData has no default value, so it must be supplied here.
# All other parameters fall back to their declared defaults.
execution = pipeline.start(
    parameters=dict(
        InputData=raw_s3_path
    )
)

In [21]:
# Block until the execution finishes, then show the per-step results.
execution.wait()
execution.list_steps()

[{'StepName': 'GRP1-pipeline-RegisterModel',
  'StartTime': datetime.datetime(2024, 7, 1, 4, 19, 4, 297000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 7, 1, 4, 19, 5, 627000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:843047345337:model-package/GRP1-ModelPackageGroup/1'}},
  'AttemptCount': 1},
 {'StepName': 'CheckAUC-Evaluation',
  'StartTime': datetime.datetime(2024, 7, 1, 4, 19, 3, 325000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 7, 1, 4, 19, 3, 596000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}},
  'AttemptCount': 1},
 {'StepName': 'EvaluateModel',
  'StartTime': datetime.datetime(2024, 7, 1, 4, 16, 29, 413000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 7, 1, 4, 19, 2, 456000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:843047345337:processing-j

## Endpoint deployment + Testing

In [22]:
# A UTC timestamp suffix keeps the endpoint name unique across runs.
ENDPOINT_NAME = f'{GRP_NAME}-endpoint-' + time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())
print(f'Endpoint Name= {ENDPOINT_NAME}')

Endpoint Name= GRP1-endpoint-2024-07-01-04-19-28


In [23]:
from botocore.exceptions import ClientError

def get_approved_package(model_package_group_name, sagemaker_client=None):
    """Get the most recently created approved model package of a group.

    Args:
        model_package_group_name: The model package group name.
        sagemaker_client: Optional SageMaker client exposing
            ``list_model_packages``. Defaults to the notebook-level
            ``sm_client``.

    Returns:
        The model package summary dict as returned by ``list_model_packages``;
        callers read its ``"ModelPackageArn"`` entry.

    Raises:
        Exception: If the group has no approved package, or if the SageMaker
            API call fails (the ``ClientError`` is chained as the cause).
    """
    client = sagemaker_client if sagemaker_client is not None else sm_client
    list_kwargs = dict(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        # Requested explicitly so the newest package comes first instead of
        # depending on the service's default sort order.
        SortOrder="Descending",
        MaxResults=100,
    )
    try:
        response = client.list_model_packages(**list_kwargs)
        approved_packages = response["ModelPackageSummaryList"]

        # Fetch more packages if none returned with continuation token
        while len(approved_packages) == 0 and "NextToken" in response:
            response = client.list_model_packages(
                NextToken=response["NextToken"], **list_kwargs
            )
            approved_packages.extend(response["ModelPackageSummaryList"])
    except ClientError as e:
        error_message = e.response["Error"]["Message"]
        raise Exception(error_message) from e

    # Return error if no packages found
    if len(approved_packages) == 0:
        error_message = (
            f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
        )
        raise Exception(error_message)

    return approved_packages[0]

In [24]:
# Module-level client that get_approved_package (defined in the previous cell) reads.
sm_client = boto3.client("sagemaker")

# Summary dict of the approved package; its ARN is used for the describe call and for deployment.
pck = get_approved_package(
    MODEL_PACKAGE_GROUP_NAME
)
model_description = sm_client.describe_model_package(ModelPackageName=pck["ModelPackageArn"])

model_description

{'ModelPackageGroupName': 'GRP1-ModelPackageGroup',
 'ModelPackageVersion': 1,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:843047345337:model-package/GRP1-ModelPackageGroup/1',
 'CreationTime': datetime.datetime(2024, 7, 1, 4, 19, 5, 485000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
    'ImageDigest': 'sha256:ed242e33af079f334972acd2a7ddf74d13310d3c9a0ef3a0e9b0429ccc104dcd',
    'ModelDataUrl': 's3://sagemaker-us-east-1-843047345337/GRP1/processing/scaler_model/model.tar.gz',
    'Environment': {'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
     'SAGEMAKER_PROGRAM': 'preprocess.py',
     'SAGEMAKER_REGION': 'us-east-1',
     'SAGEMAKER_SUBMIT_DIRECTORY': 's3://sagemaker-us-east-1-843047345337/GRP1/model/sagemaker-scikit-learn-2024-07-01-04-07-55-170/sourcedir.tar.gz'}},
   {'Image': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.7-1',
    'ImageDigest': 's

Deploy the model from the Model Registry into an endpoint.

In [25]:
from sagemaker import ModelPackage

# Deploy the registered package. The regular SageMaker session is used here
# (not pipeline_session) because this runs outside the pipeline.
model_package_arn = pck["ModelPackageArn"]
model = ModelPackage(
    role=role, model_package_arn=model_package_arn, sagemaker_session=sagemaker_session
)

print("EndpointName= {}".format(ENDPOINT_NAME))
# Starts with a single instance; the autoscaling cell below registers a minimum of two.
model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge", endpoint_name=ENDPOINT_NAME)

INFO:sagemaker:Creating model with name: GRP1-ModelPackageGroup-2024-07-01-04-19-29-321


EndpointName= GRP1-endpoint-2024-07-01-04-19-28


INFO:sagemaker:Creating endpoint-config with name GRP1-endpoint-2024-07-01-04-19-28
INFO:sagemaker:Creating endpoint with name GRP1-endpoint-2024-07-01-04-19-28


--------!

In [26]:
# Confirm the endpoint status after deployment.
response = sm_client.describe_endpoint(EndpointName=ENDPOINT_NAME)
status = response['EndpointStatus']
print("Status: " + status)

Status: InService


In [27]:
# Application Auto Scaling client
asg = boto3.client('application-autoscaling')

# Resource type is variant and the unique identifier is the resource ID.
EndPoint_id = f'endpoint/{ENDPOINT_NAME}/variant/AllTraffic' # resource ID of the endpoint variant to scale
print(EndPoint_id)

# scaling configuration
# MinCapacity (2) is higher than the single instance the endpoint was deployed with;
# presumably this is why the endpoint reports 'Updating' in the status cell further down.
response = asg.register_scalable_target(
    ServiceNamespace='sagemaker', # namespace of the scaled service
    ResourceId=EndPoint_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount', 
    MinCapacity=2,
    MaxCapacity=4
)

# Target-tracking policy on invocations per instance
response = asg.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{ENDPOINT_NAME}',
    ServiceNamespace='sagemaker',
    ResourceId=EndPoint_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 10.0, # target value of the tracked metric
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance',
        },
        'ScaleInCooldown': 300, # cooldown after a scale-in activity
        'ScaleOutCooldown': 60 # cooldown after a scale-out activity
    }
)

endpoint/GRP1-endpoint-2024-07-01-04-19-28/variant/AllTraffic


After the model is deployed, we use some data points from our raw dataset for testing.

In [28]:
import json

smr = boto3.client('sagemaker-runtime')

url1 = "https://www.smu.edu.sg"
# NOTE: url2 is defined but not sent in this cell.
url2 = "https://www.youtube.com/watch?v=jgpRAiar2LQ"

# JSON request: the scaler container's input_fn turns {"URL": ...} into a one-row frame.
payload = json.dumps({"URL": url1})
resp = smr.invoke_endpoint(EndpointName=ENDPOINT_NAME, Body=payload, ContentType='application/json')
print(resp['Body'].read().decode('utf-8'))
                           

0.9960029721260071



In [29]:
import pandas as pd
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name=ENDPOINT_NAME)

# Re-read the raw dataset locally and separate the label from the columns that are sent.
df = pd.read_csv(DATASET_FILE)
label_data = df['label']
feature_data = df.drop('label', axis=1, inplace=False)

# Rows [start_row, start_row + pred_count) are sent in a single CSV request.
start_row = 800
pred_count = 10
# header=True: the scaler container parses the CSV with pd.read_csv and needs the 'URL' column name.
payload = feature_data.iloc[start_row:start_row+pred_count].to_csv(header=True, index=False)
p = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(p.decode("utf-8"))

0.9712852239608765
0.00017983945144806057
0.9798189401626587
0.9846684336662292
0.01017708145081997
0.025043800473213196
0.95907062292099
0.0002088078181259334
0.980344295501709
0.00024241443315986544



In [30]:
# One predicted score per line of the response; pair each with the true label of the same row.
predictions = p.decode("utf-8").split('\n')
for i in range(pred_count):
    print(
        f"Predicted: {predictions[i]} and Actual is: {label_data.iloc[start_row+i]}"
    )

Predicted: 0.9712852239608765 and Actual is: 1
Predicted: 0.00017983945144806057 and Actual is: 0
Predicted: 0.9798189401626587 and Actual is: 1
Predicted: 0.9846684336662292 and Actual is: 1
Predicted: 0.01017708145081997 and Actual is: 0
Predicted: 0.025043800473213196 and Actual is: 0
Predicted: 0.95907062292099 and Actual is: 1
Predicted: 0.0002088078181259334 and Actual is: 0
Predicted: 0.980344295501709 and Actual is: 1
Predicted: 0.00024241443315986544 and Actual is: 0


In [31]:
# Seconds to wait between DescribeEndpoint calls. Polling every second floods the
# cell output and issues far more API calls than an endpoint update needs.
ENDPOINT_POLL_SECONDS = 15

response = sm_client.describe_endpoint(EndpointName=ENDPOINT_NAME)
status = response['EndpointStatus']
print("Status: " + status)

# Keep polling while the endpoint is being updated, reporting the first
# production variant's current instance count on each poll.
while status=='Updating':
    time.sleep(ENDPOINT_POLL_SECONDS)
    response = sm_client.describe_endpoint(EndpointName=ENDPOINT_NAME)
    status = response['EndpointStatus']
    instance_count = response['ProductionVariants'][0]['CurrentInstanceCount']
    print(f"Status: {status}")
    print(f"Current Instance count: {instance_count}")

Status: Updating
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance cou

## Clean-up
- Delete model package
- Delete endpoint
- Delete pipeline

In [32]:
sm_client = boto3.client("sagemaker")

# Collect every package ARN across all result pages first. A single
# list_model_packages call returns only one page, and the group cannot be
# deleted while it still contains packages.
package_paginator = sm_client.get_paginator("list_model_packages")
package_arns = [
    summary["ModelPackageArn"]
    for page in package_paginator.paginate(ModelPackageGroupName=MODEL_PACKAGE_GROUP_NAME)
    for summary in page["ModelPackageSummaryList"]
]

for package_arn in package_arns:
    print(package_arn)
    sm_client.delete_model_package(ModelPackageName=package_arn)

# Delete the now-empty group, then the endpoint and the pipeline.
sm_client.delete_model_package_group(ModelPackageGroupName=MODEL_PACKAGE_GROUP_NAME)
predictor.delete_endpoint()
pipeline.delete()

arn:aws:sagemaker:us-east-1:843047345337:model-package/GRP1-ModelPackageGroup/1


INFO:sagemaker:Deleting endpoint configuration with name: GRP1-endpoint-2024-07-01-04-19-28
INFO:sagemaker:Deleting endpoint with name: GRP1-endpoint-2024-07-01-04-19-28
INFO:sagemaker.workflow.pipeline:If triggers have been setup for this target, they will become orphaned.You will need to clean them up manually via the CLI or EventBridge console.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:843047345337:pipeline/GRP1-pipeline',
 'ResponseMetadata': {'RequestId': 'c5b80ef3-34a7-475b-a984-d24d4836f0fd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c5b80ef3-34a7-475b-a984-d24d4836f0fd',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '81',
   'date': 'Mon, 01 Jul 2024 04:28:14 GMT'},
  'RetryAttempts': 0}}