# Setup

In [84]:
# Autoreload
%load_ext autoreload
%autoreload 2

# Load env variables
%load_ext dotenv
%dotenv

# System Libraries
import json
import logging
import sys
from pathlib import Path
import os
import warnings

# Tests
import ipytest

# Create folders for temporal scripts
CODE_FOLDER = Path("code_scripts")
sys.path.extend([f"./{CODE_FOLDER}"])

# Data
DATA_FILEPATH = "../data/clean/clean-stroke-data.csv"

ipytest.autoconfig(raise_on_error=True)

# Change the default logging level form INFO (verbose) to ERROR instead.
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)

# Ignore warnings
warnings.filterwarnings("ignore")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [85]:
DATA_FILEPATH = "../data/clean/clean-stroke-data.csv"

In [86]:
# Environmental variables

bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

COMET_API_KEY = os.environ.get("COMET_API_KEY", None)
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME", None)

# Configure Pipeline

In [87]:
# True for local pipeline and False to run in SageMaker

LOCAL_MODE = False

In [88]:
import sagemaker
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession

pipeline_session = PipelineSession(default_bucket=bucket) if not LOCAL_MODE else None

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=bucket),
        "instance_type": "local",
        "image": None,
    }
    
else:
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None,
    }

# Initialize Session

In [89]:
import boto3

S3_LOCATION = f"s3://{bucket}/stroke"

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

In [90]:
# # Create the bucket
# s3 = boto3.client('s3')
# s3.create_bucket(Bucket=bucket,
#                  CreateBucketConfiguration={
#                      'LocationConstraint': region,})rn

# print(f"Bucket {bucket} created successfully!")

# Data Ingestion

In [91]:
import numpy as np
import pandas as pd

clean_df = pd.read_csv(DATA_FILEPATH)
clean_df.head()

Unnamed: 0,gender,age,hypertension,heart_disease,ever_married,work_type,Residence_type,avg_glucose_level,bmi,smoking_status,stroke
0,Male,67.0,0,1,Yes,Private,Urban,228.69,36.6,formerly smoked,1
1,Male,80.0,0,1,Yes,Private,Rural,105.92,32.5,never smoked,1
2,Female,49.0,0,0,Yes,Private,Urban,171.23,34.4,smokes,1
3,Female,79.0,1,0,Yes,Self-employed,Rural,174.12,24.0,never smoked,1
4,Male,81.0,0,0,Yes,Private,Urban,186.21,29.0,formerly smoked,1


In [92]:
## Upload data to bucket 

# s3 = boto3.client('s3')

# s3.upload_file(DATA_FILEPATH, bucket, 'stroke/data/data.csv')

# Splitting and Transforming Data

## Preprocessing Script

In [93]:
(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)

sys.path.extend([f"./{CODE_FOLDER}/processing"])

In [94]:
%%writefile {CODE_FOLDER}/processing/script.py
# | filename: script.py
# | code-line-numbers: true

import os
import tarfile
import tempfile
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder


def preprocess(base_directory):
    """
    Load, split, and transform the data.
    """
    
    df = _read_data_from_input_csv_files(base_directory)
        
    X_cat = df.drop(columns = ["stroke"]).select_dtypes(include=['object'])
    cat_features = X_cat.columns.to_list()

    categorical_transformer = make_pipeline(
                                    OneHotEncoder())
    
    features_transformer = ColumnTransformer(
                                    transformers=[
                                        ("categorical", 
                                        categorical_transformer, 
                                        cat_features),
                                    ],
                                    remainder='passthrough')
    

    
    df_train, df_validation, df_test = _split_data(df)
    
    _save_train_baseline(base_directory, df_train)
    _save_test_baseline(base_directory, df_test)

    y_test = df_test.pop("stroke").values
    y_train = df_train.pop("stroke").values
    y_validation = df_validation.pop("stroke").values

    X_train = features_transformer.fit_transform(df_train)  
    X_validation = features_transformer.transform(df_validation) 
    X_test = features_transformer.transform(df_test)  

    feature_names = features_transformer.get_feature_names_out().tolist()

    _save_splits(
            base_directory,
            X_train,
            y_train,
            X_validation,
            y_validation,
            X_test,
            y_test,
            feature_names)
    
    _save_model(base_directory, features_transformer)
    

def _read_data_from_input_csv_files(base_directory):
    """
    Read the data from the input CSV files.

    This function reads every CSV file available and
    concatenates them into a single dataframe.
    """
    
    input_directory = Path(base_directory) / "input"
    files = list(input_directory.glob("*.csv"))

    if len(files) == 0:
        message = f"The are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)

    return df.sample(frac=1, random_state=42)   


def _split_data(df):
    """
    Split the data into train, validation, and test.
    """
    
    stratify_column = df['stroke']
    df_train, temp = train_test_split(df, test_size=0.3, stratify=stratify_column, random_state=42)
    df_validation, df_test = train_test_split(temp, test_size=0.5, stratify=temp['stroke'], random_state=42)

    return df_train, df_validation, df_test


def _save_train_baseline(base_directory, df_train):
    """
    Save the untransformed training data to disk.
    Determines the baseline for the model    
    """
    
    baseline_path = Path(base_directory) / "train-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_train.copy()

    df = df.drop("stroke", axis=1)

    df.to_csv(baseline_path / "train-baseline.csv", header=True, index=False)

    
def _save_test_baseline(base_directory, df_test):
    """Save the untransformed test data to disk.

    We will need the test data to compute a baseline to
    determine the quality of the model predictions when deployed.
    """
    
    baseline_path = Path(base_directory) / "test-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_test.copy()

    df.to_csv(baseline_path / "test-baseline.csv", header=False, index=False)


def _save_splits(base_directory,
                 X_train,
                 y_train,
                 X_validation,  
                 y_validation,
                 X_test,
                 y_test,
                 feature_names):
    
    """Save data splits to disk.

    This function concatenates the transformed features
    and the target variable, and saves each one of the split
    sets to disk.
    """
    
    train = np.concatenate((X_train, y_train.reshape(-1, 1)), axis=1)
    validation = np.concatenate((X_validation, y_validation.reshape(-1, 1)), axis=1)
    test = np.concatenate((X_test, y_test.reshape(-1, 1)), axis=1)
    
    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
      
    pd.DataFrame(train, columns=feature_names + ['stroke']).to_csv(train_path / "train.csv", header=True, index=False)
    pd.DataFrame(validation, columns=feature_names + ['stroke']).to_csv(validation_path / "validation.csv", header=True, index=False)
    pd.DataFrame(test, columns=feature_names + ['stroke']).to_csv(test_path / "test.csv", header=True, index=False)



def _save_model(base_directory, features_transformer):
    """Save the Scikit-Learn transformation pipelines.

    This function creates a model.tar.gz file that
    contains the two transformation pipelines we built
    to transform the data.
    """
    
    with tempfile.TemporaryDirectory() as directory:
        joblib.dump(features_transformer, Path(directory) / "features.joblib")

        model_path = Path(base_directory) / "model"
        model_path.mkdir(parents=True, exist_ok=True)

        with tarfile.open(f"{(model_path / 'model.tar.gz').as_posix()}", "w:gz") as tar:
            tar.add(Path(directory) / "features.joblib", arcname="features.joblib",)


if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")

Overwriting code_scripts/processing/script.py


## Test Preprocessing Script

In [95]:
%%ipytest -s
# | code-fold: true

import os
import shutil
import tarfile
import tempfile

import pytest
from code_scripts.processing.script import preprocess


@pytest.fixture(autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        preprocess(base_directory=directory)

    yield directory

    shutil.rmtree(directory)

def test_preprocess_directories(directory):
    assert (directory / "train").exists()
    assert (directory / "test").exists()
    assert (directory / "train-baseline").exists()
    assert (directory / "test-baseline").exists()
    assert (directory / "model" / "model.tar.gz").exists()
    assert (directory / "train" / "train.csv").stat().st_size > 0
    assert (directory / "test" / "test.csv").stat().st_size > 0
    assert (directory / "train-baseline" / "train-baseline.csv").stat().st_size > 0
    assert (directory / "test-baseline" / "test-baseline.csv").stat().st_size > 0

    # Check if the model file is created
    assert (directory / "model" / "model.tar.gz").stat().st_size > 0

def test_preprocess_generates_data_splits(directory):
    output_directories = os.listdir(directory)

    assert "train" in output_directories
    assert "validation" in output_directories
    assert "test" in output_directories


def test_splits_are_transformed(directory):
    train = pd.read_csv(directory / "train" / "train.csv", header=None)
    validation = pd.read_csv(directory / "validation" / "validation.csv", header=None)
    test = pd.read_csv(directory / "test" / "test.csv", header=None)

    ## After transforming the data, the number of features should be 20:
    
    # * 2 - gender (one-hot encoded)
    # * 1 - age
    # * 1 - hypertension
    # * 1 - heart_disease
    # * 2 - ever_married (one-hot encoded)
    # * 5 - work_type (one-hot encoded)
    # * 2 - Residence_type (one-hot encoded)
    # * 1 - avg_glucose_level
    # * 1 - bmi
    # * 4 - smoking_status (one-hot encoded)

    number_of_features = 20

    # The transformed splits should have an additional column for the target variable.
    assert train.shape[1] == number_of_features + 1
    assert validation.shape[1] == number_of_features + 1
    assert test.shape[1] == number_of_features + 1


def test_train_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(
                    directory / "train-baseline" / "train-baseline.csv",
                    header=None)
    
    gender = baseline.iloc[:, 0].unique()

    assert "Male" in gender
    assert "Female" in gender
   
def test_test_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(directory / "test-baseline" / "test-baseline.csv",
                           header=None)
     
    gender = baseline.iloc[:, 0].unique()

    assert "Male" in gender
    assert "Female" in gender

def test_train_baseline_includes_header(directory):
    baseline = pd.read_csv(directory / "train-baseline" / "train-baseline.csv")
    assert baseline.columns[-1] == "smoking_status"

def test_test_baseline_does_not_include_header(directory):
    baseline = pd.read_csv(directory / "test-baseline" / "test-baseline.csv")
    assert baseline.columns[-1] != "smoking_status"

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m
[32m[32m[1m7 passed[0m[32m in 1.28s[0m[0m


## Caching 

In [96]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

## Pipeline Configuration

In [97]:
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

## Processing Step

In [98]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    base_job_name="preprocess-data",
    framework_version="1.2-1",
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [99]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=f"{(CODE_FOLDER / 'processing' / 'script.py').as_posix()}",
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train", # Transformed
                destination=f"{S3_LOCATION}/preprocessing/train",
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/validation",
                destination=f"{S3_LOCATION}/preprocessing/validation",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test", # Transformed
                destination=f"{S3_LOCATION}/preprocessing/test",
            ),
            ProcessingOutput(
                output_name="model",
                source="/opt/ml/processing/model",
                destination=f"{S3_LOCATION}/preprocessing/model",
            ),
            ProcessingOutput(
                output_name="train-baseline",
                source="/opt/ml/processing/train-baseline",
                destination=f"{S3_LOCATION}/preprocessing/train-baseline",
            ),
            ProcessingOutput(
                output_name="test-baseline",
                source="/opt/ml/processing/test-baseline",
                destination=f"{S3_LOCATION}/preprocessing/test-baseline",
            ),
        ],
    ),
    cache_config=cache_config,
)

## Preprocessing Pipeline

In [100]:
from sagemaker.workflow.pipeline import Pipeline

preprocessing_pipeline = Pipeline(
    name="preprocessing-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"])

preprocessing_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/preprocessing-pipeline',
 'ResponseMetadata': {'RequestId': 'd41be779-9f32-4521-8206-5b208917ea68',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd41be779-9f32-4521-8206-5b208917ea68',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Tue, 04 Jun 2024 15:24:37 GMT'},
  'RetryAttempts': 0}}

In [101]:
# # %%script false --no-raise-error
# # | eval: false

# preprocessing_pipeline.start()

# Training the Model

## Training Script

In [102]:
(CODE_FOLDER / "training").mkdir(parents=True, exist_ok=True)

sys.path.extend([f"./{CODE_FOLDER}/training"])

In [103]:
%%writefile {CODE_FOLDER}/training/script.py
# | filename: script.py
# | code-line-numbers: true

import argparse
import json
import os
import tarfile

from pathlib import Path
from comet_ml import Experiment

import pandas as pd
import xgboost as xgb
from packaging import version
from sklearn.metrics import roc_auc_score, f1_score, recall_score, precision_score

def train(
    model_directory,
    train_path,
    validation_path,
    pipeline_path,
    experiment,
    eta=0.3):
    """
    Train the model, generate metrics, log in comet and save the model
    """

    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train = X_train.drop(X_train.columns[-1], axis=1)
        
    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation = X_validation.drop(X_validation.columns[-1], axis=1)
    
    
    model = xgb.XGBClassifier(objective='binary:logistic', 
                              eval_metric='auc', 
                              eta=eta,
                              nthread=1)
                               
    model.fit(X_train, 
              y_train,
              eval_set=[(X_validation, y_validation)],
              early_stopping_rounds=10
              )
    
    # Predictions
    predictions = model.predict(X_validation)
    y_pred_proba = model.predict_proba(X_validation)[:, 1]

    # Evaluate the model 
    roc_auc= roc_auc_score(y_validation, y_pred_proba)
    precision = precision_score(y_validation, predictions)
    recall = recall_score(y_validation, predictions)
    f1 = f1_score(y_validation, predictions)

    print(f'auc: {roc_auc}')  
    print(f'precision: {precision}')  
    print(f'recall: {recall}')  
    print(f'f1: {f1}')  

    # SAVE MODEL AS BOOSTER
    booster = model.get_booster()
    model_filepath = Path(model_directory) / "xgbclass"
    booster.save_model(model_filepath)
    # booster.save_model(model_filepath.as_posix())
    
    # # # SAVE MODEL AS TXT
    # model_filepath = Path(model_directory) / "model.txt"
    # model.save_model(model_filepath)

    # # # # SAVE MODEL AS PICKLE
    # import pickle
    # model_filepath = Path(model_directory) / "model.pkl"
    # with open(model_filepath, 'wb') as f:
    #     # pickle.dump(model.get_booster(), f)        
    #     pickle.dump(model, f)


  
    # Bundle transformation pipelines with model
    with tarfile.open(Path(pipeline_path) / "model.tar.gz", "r:gz") as tar:
        tar.extractall(model_directory)

    if experiment:
        experiment.log_parameters(
            {
                "eta": eta,
            })

        experiment.log_dataset_hash(X_train)
        
        experiment.log_confusion_matrix(
                                y_validation.astype(int), predictions.astype(int))
        
        experiment.log_model("stroke", model_filepath.as_posix())
        experiment.log_metric("roc_auc", roc_auc)
        experiment.log_metric("precision", precision)
        experiment.log_metric("recall", recall)
        experiment.log_metric("f1", f1)

        
if __name__ == "__main__":
    
    # Hyperparameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--eta", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    # Create a Comet experiment to log the metrics and parameters
    comet_api_key = os.environ.get("COMET_API_KEY", None)
    comet_project_name = os.environ.get("COMET_PROJECT_NAME", None)

    experiment = (
        Experiment(
            project_name=comet_project_name,
            api_key=comet_api_key,
            auto_metric_logging=True,
            auto_param_logging=True,
            log_code=True,
        )
        if comet_api_key and comet_project_name
        else None)

    training_env = json.loads(os.environ.get("SM_TRAINING_ENV", "{}"))
    job_name = training_env.get("job_name", None) if training_env else None

    
    # SageMaker's training job name = experiment name
    if job_name and experiment:
        experiment.set_name(job_name)

    # SageMaker will create a model.tar.gz file with anything
    # inside this directory when the training script finishes.
    # SageMaker creates one channel for each one of the inputs to the Training Step.
    train(model_directory=os.environ["SM_MODEL_DIR"], 
          train_path=os.environ["SM_CHANNEL_TRAIN"],
          validation_path=os.environ["SM_CHANNEL_VALIDATION"],
          pipeline_path=os.environ["SM_CHANNEL_PIPELINE"],
          experiment=experiment,
          eta=args.eta)

Overwriting code_scripts/training/script.py


## Test Training Script

In [104]:
%%ipytest -s
#| code-fold: true

import os
import shutil
import pytest
import tempfile
from pathlib import Path
import pandas as pd

from code_scripts.processing.script import preprocess
from code_scripts.training.script import train

@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    # preprocess(base_directory=directory)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        preprocess(base_directory=directory)
                   
    train(model_directory=directory / "model",
          train_path=directory / "train", 
          validation_path=directory / "validation",
          pipeline_path=directory / "model",
          experiment=None,
          eta=0.3)
    
    yield directory
    
    shutil.rmtree(directory)

def test_train_bundles_model_assets(directory):
    bundle = os.listdir(directory / "model")
    assert "xgbclass" in bundle
    assert "features.joblib" in bundle
    # assert "model.txt" in bundle
    # assert "model.pkl" in bundle



[0]	validation_0-auc:0.71030
[1]	validation_0-auc:0.75081
[2]	validation_0-auc:0.73930
[3]	validation_0-auc:0.73361
[4]	validation_0-auc:0.73631
[5]	validation_0-auc:0.74221
[6]	validation_0-auc:0.72924
[7]	validation_0-auc:0.75003
[8]	validation_0-auc:0.75692
[9]	validation_0-auc:0.76124
[10]	validation_0-auc:0.75854
[11]	validation_0-auc:0.75765
[12]	validation_0-auc:0.75880
[13]	validation_0-auc:0.75436
[14]	validation_0-auc:0.75619
[15]	validation_0-auc:0.75381
[16]	validation_0-auc:0.75331
[17]	validation_0-auc:0.75299
[18]	validation_0-auc:0.75235
auc: 0.7612445664607641
precision: 0.0
recall: 0.0
f1: 0.0
[32m.[0m
    if is_sparse(dtype):

    elif is_categorical_dtype(dtype) and enable_categorical:

    if is_categorical_dtype(dtype)

    return is_int or is_bool or is_float or is_categorical_dtype(dtype)

t_4eef12270e334cb2ae7b175e9128d3df.py::test_train_bundles_model_assets
t_4eef12270e334cb2ae7b175e9128d3df.py::test_train_bundles_model_assets
    if is_sparse(data):

t_4eef

## Training Step

In [105]:
%%writefile {CODE_FOLDER}/training/requirements.txt
#| label: requirements.txt
#| filename: requirements.txt
#| code-line-numbers: false

comet_ml
urllib3==1.26.5
pandas
scikit-learn==1.2.1
urllib3==1.26.5
xgboost==1.7.1

Overwriting code_scripts/training/requirements.txt


In [106]:
from sagemaker.xgboost.estimator import XGBoost

estimator = XGBoost(base_job_name="training",
                    entry_point = "script.py", 
                    framework_version='1.7-1', 
                    source_dir=f"{(CODE_FOLDER / 'training').as_posix()}",
                    hyperparameters={
                            "eta": 0.3,
                        },
                    environment={
                        "COMET_API_KEY": COMET_API_KEY,
                        "COMET_PROJECT_NAME": COMET_PROJECT_NAME,
                    },
                    # py_version=config["py_version"],
                    instance_type=config["instance_type"],
                    sagemaker_session=config["session"],
               

                    role=role,
                    instance_count=1
                    )


INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


In [107]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "train"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "validation": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "validation"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "pipeline": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "model"
                    ].S3Output.S3Uri,
                    content_type="application/tar+gzip",
                ),
            },
        ),
        cache_config=cache_config,
    )


train_model_step = create_training_step(estimator)

## Training Pipeline

In [108]:
training_pipeline = Pipeline(
    name="training-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"])

training_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/training-pipeline',
 'ResponseMetadata': {'RequestId': '615fb08b-2840-4272-8e38-57688e3d5ad1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '615fb08b-2840-4272-8e38-57688e3d5ad1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Tue, 04 Jun 2024 15:24:41 GMT'},
  'RetryAttempts': 0}}

In [109]:
# # %%script false --no-raise-error
# # | eval: false

# training_pipeline.start()

# Training Container

## Docker Image

In [110]:
import shutil

(CODE_FOLDER / "containers" / "training").mkdir(parents=True, exist_ok=True)
shutil.copy2(
    CODE_FOLDER / "training" / "script.py",
    CODE_FOLDER / "containers" / "training" / "train.py",
)

WindowsPath('code_scripts/containers/training/train.py')

In [111]:
%%writefile {CODE_FOLDER}/containers/training/requirements.txt
# | filename: requirements.txt
# | code-line-numbers: true

sagemaker-training
packaging
pandas
scikit-learn==1.2.1
comet_ml
urllib3==1.26.5
xgboost==1.7.1

Overwriting code_scripts/containers/training/requirements.txt


In [112]:
%%writefile {CODE_FOLDER}/containers/training/Dockerfile
# | filename: Dockerfile
# | code-line-numbers: true

FROM python:3.10-slim

RUN apt-get -y update && apt-get install -y --no-install-recommends \
    python3 \
    build-essential libssl-dev pkg-config libhdf5-dev

# Let's install the required Python packages from 
# the requirements.txt file.
COPY requirements.txt .
RUN pip install --user --upgrade pip
RUN pip3 install -r requirements.txt

# We are going to be running the training script
# as the entrypoint of this container.
COPY train.py /opt/ml/code/train.py
ENV SAGEMAKER_PROGRAM train.py


Overwriting code_scripts/containers/training/Dockerfile


## Building the Docker Image

In [113]:
# IMAGE_NAME = "training-container"

# if not LOCAL_MODE:
#     # If we aren't running the code in Local Mode, we need
#     # to specify we want to build the Docker image for the
#     # linux/amd64 architecture before uploading it to ECR.
#     print("Building Docker image for linux/amd64 architecture...")

#     !docker build --platform="linux/amd64" -t $IMAGE_NAME \
#         $CODE_FOLDER/containers/training/
# else:
#     # If we are running in Local Mode, we can use the
#     # default Docker build command.
#     print("Building Docker image for arm64 architecture...")

#     !docker build -t $IMAGE_NAME \
#         $CODE_FOLDER/containers/training/

## Pushing Docker Image to ECR

Run in the terminal the following to push the image: 

./src/image_script.sh "False" "training-container"

## Training Step

In [114]:
IMAGE_NAME = "training-container"

account_id = boto3.client("sts").get_caller_identity().get("Account")
tag = ":latest"

training_container_image = (
    IMAGE_NAME
    if LOCAL_MODE
    else (f"{account_id}.dkr.ecr.{region}.amazonaws.com/{IMAGE_NAME}:latest")
)

training_container_image

'730335307143.dkr.ecr.eu-central-1.amazonaws.com/training-container:latest'

In [115]:
from sagemaker.estimator import Estimator

xgb_docker_estimator = Estimator(
    image_uri=training_container_image,
    hyperparameters={
        "eta": 0.3,
    },
    environment={
        "COMET_API_KEY": COMET_API_KEY,
        "COMET_PROJECT_NAME": COMET_PROJECT_NAME,
    },
    instance_count=1,
    instance_type=config["instance_type"],
    sagemaker_session=config["session"],
    role=role,
)

xgb_docker_train_model_step = create_training_step(xgb_docker_estimator)

## Training Pipeline

In [116]:
training_docker_pipeline = Pipeline(
    name="training-docker-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        # This time we want to use the new training step
        # we created using the custom Docker image.
        xgb_docker_train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

training_docker_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/training-docker-pipeline',
 'ResponseMetadata': {'RequestId': '34a74205-cffd-4241-9e66-9ee7b607c15c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '34a74205-cffd-4241-9e66-9ee7b607c15c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '95',
   'date': 'Tue, 04 Jun 2024 15:24:44 GMT'},
  'RetryAttempts': 0}}

In [117]:
# # %%script false --no-raise-error
# # | eval: false

# training_docker_pipeline.start()

# Tuning the Model

##  Tuning Step

In [118]:
USE_TUNING_STEP = True

In [119]:
LOCAL_MODE # Local mode must be false to use tuning step

False

In [120]:
from sagemaker.parameter import ContinuousParameter
from sagemaker.tuner import HyperparameterTuner

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="validation:auc",
    objective_type="Maximize",
    hyperparameter_ranges={
        "eta": ContinuousParameter(0.05, 0.5),
    },
    metric_definitions=[
        {"Name": "validation:auc", 
         "Regex": "auc: ([0-9\\.]+)"
        }
    ],
    max_jobs=3,
    max_parallel_jobs=3)

In [121]:
from sagemaker.workflow.steps import TuningStep

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "pipeline": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "model"
                ].S3Output.S3Uri,
                content_type="application/tar+gzip",
            ),
        },
    ),
    cache_config=cache_config,
)

## Tuning Pipeline

In [122]:
tuning_pipeline = Pipeline(
    name="tuning-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

tuning_pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/tuning-pipeline',
 'ResponseMetadata': {'RequestId': 'b03fd267-0060-4ac5-bd2b-40b6d41b81c7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b03fd267-0060-4ac5-bd2b-40b6d41b81c7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Tue, 04 Jun 2024 15:24:47 GMT'},
  'RetryAttempts': 0}}

In [123]:
# # # %%script false --no-raise-error
# # # | eval: false

# tuning_pipeline.start()

# Evaluating the Model

## Evaluation Script

In [124]:
(CODE_FOLDER / "evaluation").mkdir(parents=True, exist_ok=True)

In [125]:
%%writefile {CODE_FOLDER}/evaluation/script.py
# | filename: script.py
# | code-line-numbers: true

import json
import tarfile
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, precision_score, f1_score, recall_score, roc_curve 
import xgboost as xgb



def evaluate(model_path, test_path, output_path):
    """
    Model loadin, evaluation and generation of metrics report
    """
        
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test = X_test.drop(X_test.columns[-1], axis=1)

    # Extract the model and load it in memory.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    # # LOAD AS TXT
    # model = xgb.XGBClassifier()
    # model.load_model(Path(model_path) / "model.txt") 
    # # predictions = np.argmax(model.predict(X_test), axis=-1)
    # predictions = model.predict(X_test)
    # # Make predictions for model_logloss
    # y_pred_proba = model.predict_proba(X_test)[:, 1]
    
    
    # LOAD AS BOOSTER
    model_filepath = Path(model_path) / "xgbclass"
    model = xgb.Booster()
    model.load_model(model_filepath)
    # booster.set_param("nthread", 1)
    # model.load_model(model_filepath.as_posix())
    # Make predictions
    dtest = xgb.DMatrix(X_test)
    y_pred_proba = model.predict(dtest)
    
    # # LOAD AS PICKLE BOOSTER
    # import pickle
       
    # model_filepath = Path(model_path) / "model.pkl"
    # with open(model_filepath, 'rb') as f:
    #     model = xgb.Booster()
    #     model = pickle.load(f)
        
    # dtest = xgb.DMatrix(X_test)
    # y_pred_proba = model.predict(dtest)    
 

    # # LOAD AS PICKLE 
    # import pickle
    # model_filepath = Path(model_path) / "model.pkl"

    # with open(model_filepath, 'rb') as f:
    #     model = pickle.load(f)
        
    # predictions = model.predict(X_test)
    # # Make predictions for model_logloss
    # y_pred_proba = model.predict_proba(X_test)[:, 1]  
    
    
    # Evaluate the model with the custom threshold
    roc_auc = roc_auc_score(y_test, y_pred_proba)     
    print(f'auc: {roc_auc}') 
    
    # Calculate ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)

    # Find the threshold that gives the best trade-off
    optimal_idx = np.argmax(tpr - fpr)
    best_threshold = float(thresholds[optimal_idx])
    
    # Evaluate the model with the custom threshold
    y_pred_custom_threshold = (y_pred_proba >= best_threshold).astype(int)
    precision_custom = precision_score(y_test, y_pred_custom_threshold)
    recall_custom = recall_score(y_test, y_pred_custom_threshold)
    f1_custom = f1_score(y_test, y_pred_custom_threshold)
    
    # Log in the expected format
    print(f'best_threshold: {best_threshold}')  
    print(f'auc: {roc_auc}')  
    print(f'precision (custom threshold): {precision_custom}')  
    print(f'recall (custom threshold): {recall_custom}')  
    print(f'f1 (custom threshold): {f1_custom}')  


    # Eevaluation report
    evaluation_report = {
        "metrics": {
            "auc": {"value": roc_auc},
            "best_threshold": {"value": best_threshold},
            "precision_custom": {"value": precision_custom},
            "recall_custom": {"value": recall_custom},
            "f1_custom": {"value": f1_custom},
        },
    }

    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path="/opt/ml/processing/model/",
        test_path="/opt/ml/processing/test/",
        output_path="/opt/ml/processing/evaluation/",
    )

Overwriting code_scripts/evaluation/script.py


## Test Evaluation Script

In [126]:
%%ipytest -s
# | code-fold: true

import os
import shutil
import tarfile
import pytest
import tempfile
import json
from pathlib import Path

from processing.script import preprocess
from training.script import train
from evaluation.script import evaluate


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

   
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        eta=0.3
    )

    # Load the XGBoost model as per Sagemaker requirements
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model/xgbclass", arcname="xgbclass")
        # tar.add(directory / "model/model.txt", arcname="model.txt")
        # tar.add(directory / "model/model.pkl", arcname="model.pkl")


    evaluate(
        model_path=directory,
        test_path=directory / "test",
        output_path=directory / "evaluation",
    )

    yield directory / "evaluation"

    shutil.rmtree(directory)


def test_evaluate_generates_evaluation_report(directory):
    output = os.listdir(directory)
    assert "evaluation.json" in output


def test_evaluation_report_contains_auc(directory):
    with open(directory / "evaluation.json", "r") as file:
        report = json.load(file)

    assert "metrics" in report
    assert "auc" in report["metrics"]

[0]	validation_0-auc:0.71030
[1]	validation_0-auc:0.75081
[2]	validation_0-auc:0.73930
[3]	validation_0-auc:0.73361
[4]	validation_0-auc:0.73631
[5]	validation_0-auc:0.74221
[6]	validation_0-auc:0.72924
[7]	validation_0-auc:0.75003
[8]	validation_0-auc:0.75692
[9]	validation_0-auc:0.76124
[10]	validation_0-auc:0.75854
[11]	validation_0-auc:0.75765
[12]	validation_0-auc:0.75880
[13]	validation_0-auc:0.75436
[14]	validation_0-auc:0.75619
[15]	validation_0-auc:0.75381
[16]	validation_0-auc:0.75331
[17]	validation_0-auc:0.75299
[18]	validation_0-auc:0.75235
auc: 0.7612445664607641
precision: 0.0
recall: 0.0
f1: 0.0
auc: 0.8607047872340425
best_threshold: 0.04540661722421646
auc: 0.8607047872340425
precision (custom threshold): 0.14432989690721648
recall (custom threshold): 0.875
f1 (custom threshold): 0.24778761061946902
[32m.[0m[0]	validation_0-auc:0.71030
[1]	validation_0-auc:0.75081
[2]	validation_0-auc:0.73930
[3]	validation_0-auc:0.73361
[4]	validation_0-auc:0.73631
[5]	validation_0

## Get Model Assets

In [127]:
model_assets = train_model_step.properties.ModelArtifacts.S3ModelArtifacts

if USE_TUNING_STEP:
    model_assets = tune_model_step.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=config["session"].default_bucket(),
    )
    
print("Model assets path:", model_assets.expr)

Model assets path: {'Std:Join': {'On': '/', 'Values': ['s3:/', 'stroke-bucket', {'Get': 'Steps.tune-model.TrainingJobSummaries[0].TrainingJobName'}, 'output/model.tar.gz']}}


## Save Output Evaluation Report

In [128]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json")

##  Evaluation Step

In [129]:
from sagemaker.xgboost import XGBoostProcessor

evaluation_processor = XGBoostProcessor(
    base_job_name="evaluation-processor",
    framework_version='1.7-1',
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"])


INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


In [130]:
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=evaluation_processor.run(
        code=f"{(CODE_FOLDER / 'evaluation' / 'script.py').as_posix()}",
        inputs=[
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),

            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
        ],
        
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
            ),
        ],
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
)

##  Evaluation Pipeline

In [131]:
evaluation_pipeline = Pipeline(
    name="evaluation-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

evaluation_pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/evaluation-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/evaluation-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/evaluation-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/evaluation-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/evaluation-pipeline',
 'ResponseMetadata': {'RequestId': 'c0842dfa-7cf4-463a-9084-1af428bd2058',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c0842dfa-7cf4-463a-9084-1af428bd2058',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Tue, 04 Jun 2024 15:24:51 GMT'},
  'RetryAttempts': 0}}

In [132]:
# # %%script false --no-raise-error
# # | eval: false

# evaluation_pipeline.start()

# Registering the Model

In [133]:
BASIC_MODEL_PACKAGE_GROUP = "basic-stroke"

## Creating the Model

In [134]:
from sagemaker.xgboost.model import XGBoostModel

xgboost_model = XGBoostModel(
    model_data=model_assets,
    role=role,
    sagemaker_session=config["session"],
    framework_version="1.7-1",
)

## Configuring Model Metrics

In [135]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluate_model_step.properties.ProcessingOutputConfig.Outputs[
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    ),
)

## Registering the Model

In [136]:
from sagemaker.workflow.model_step import ModelStep


def create_registration_step(
    model,
    model_package_group_name,
    approval_status="Approved",
    content_types=["text/csv"],
    response_types=["application/json"],
    model_metrics=None,
    drift_check_baselines=None,
):
    """Create a Registration Step using the supplied parameters."""
    return ModelStep(
        name="register",
        step_args=model.register(
            model_package_group_name=model_package_group_name,
            approval_status=approval_status,
            model_metrics=model_metrics,
            drift_check_baselines=drift_check_baselines,
            content_types=content_types,
            response_types=response_types,
            inference_instances=[config["instance_type"]],
            transform_instances=[config["instance_type"]],
            framework_version='1.7-1',
            domain="MACHINE_LEARNING",
            task="CLASSIFICATION",
            # framework="XGBOOST",
        ),
    )


register_model_step = create_registration_step(
    xgboost_model,
    BASIC_MODEL_PACKAGE_GROUP,
    model_metrics=model_metrics,
)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


## Model Pipeline

In [137]:
register_model_pipeline = Pipeline(
    name="register-model-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        register_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

register_model_pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/register-model-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/register-model-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/register-model-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/register-model-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/register-model-pipeline',
 'ResponseMetadata': {'RequestId': '86f83a04-b14e-4ea8-a078-ed19010ec727',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '86f83a04-b14e-4ea8-a078-ed19010ec727',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Tue, 04 Jun 2024 15:24:54 GMT'},
  'RetryAttempts': 0}}

In [138]:
# # # %%script false --no-raise-error
# # # | eval: false

# register_model_pipeline.start()

# Conditional Registration

## Accuracy Threshold

In [139]:
from sagemaker.workflow.parameters import ParameterFloat

# Change the value to the last registered model auc
auc_threshold = ParameterFloat(name="auc_threshold", default_value=0.7)

## Fail Step

In [140]:
from sagemaker.workflow.fail_step import FailStep

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the model's auc was lower than",
            auc_threshold,
        ],
    ),
)

## Defining the Condition

In [141]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.auc.value",
    ),
    right=auc_threshold,
)

## Condition Step

In [142]:
from sagemaker.workflow.condition_step import ConditionStep

condition_step = ConditionStep(
    name="check-model-auc",
    conditions=[condition],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)

## Conditional Pipeline

In [143]:
conditional_register_pipeline = Pipeline(
    name="conditional-register-pipeline",
    parameters=[dataset_location, auc_threshold],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

conditional_register_pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/conditional-register-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/conditional-register-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/conditional-register-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/conditional-register-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/conditional-register-pipeline',
 'ResponseMetadata': {'RequestId': '09d62d2f-9cf0-48d1-a31d-39fd3283c554',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '09d62d2f-9cf0-48d1-a31d-39fd3283c554',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '100',
   'date': 'Tue, 04 Jun 2024 15:24:57 GMT'},
  'RetryAttempts': 0}}

In [144]:
# # # %%script false --no-raise-error
# # # | eval: false

# conditional_register_pipeline.start()

# Serving the Model

## Get Last Model

In [145]:
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=BASIC_MODEL_PACKAGE_GROUP,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=1,
)

package = (
    response["ModelPackageSummaryList"][0]
    if response["ModelPackageSummaryList"]
    else None
)

package

{'ModelPackageGroupName': 'basic-stroke',
 'ModelPackageVersion': 23,
 'ModelPackageArn': 'arn:aws:sagemaker:eu-central-1:730335307143:model-package/basic-stroke/23',
 'CreationTime': datetime.datetime(2024, 6, 4, 17, 2, 46, 588000, tzinfo=tzlocal()),
 'ModelPackageStatus': 'Completed',
 'ModelApprovalStatus': 'Approved'}

In [146]:
(CODE_FOLDER / "serving").mkdir(parents=True, exist_ok=True)


In [147]:
from sagemaker.s3 import S3Downloader

if package:
    response = sagemaker_client.describe_model_package(
        ModelPackageName=package["ModelPackageArn"],
    )

    model_data = response["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
    print(model_data)
    
    S3Downloader.download(model_data, (CODE_FOLDER / "serving").as_posix())
    s3_bucket_name = model_data.split('/')[2]
    print(s3_bucket_name)

    # Extract the common identifier from the model path
    common_identifier = model_data.split('/')[-3].split('-')[1]
    print(f'Common Identifier: {common_identifier}')
    
    s3_key_path = f'evaluation-pipeline/{common_identifier}/evaluate-model/output/evaluation/evaluation.json'
    print(s3_key_path)
    s3_evaluation_url = f's3://{s3_bucket_name}/{s3_key_path}'
    S3Downloader.download(s3_evaluation_url, (CODE_FOLDER / "serving").as_posix())



s3://stroke-bucket/sagemake-8t6nz7mobc9n-xy4c0n1hlV-003-2f36bb35/output/model.tar.gz
stroke-bucket
Common Identifier: 8t6nz7mobc9n
evaluation-pipeline/8t6nz7mobc9n/evaluate-model/output/evaluation/evaluation.json


In [148]:
# Function to fetch the optimal threshold from S3
s3_client = boto3.client('s3')

def fetch_auc(bucket, key):
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    report = json.loads(obj['Body'].read().decode('utf-8'))
    return report['metrics']['auc']['value']

AUC = fetch_auc(s3_bucket_name, s3_key_path)

def fetch_optimal_threshold(bucket, key):
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    report = json.loads(obj['Body'].read().decode('utf-8'))
    return report['metrics']['best_threshold']['value']

OPTIMAL_THRESHOLD = fetch_optimal_threshold(s3_bucket_name, s3_key_path)

print(f"OPTIMAL_THRESHOLD: {OPTIMAL_THRESHOLD}")
print(f"AUC: {AUC}")

OPTIMAL_THRESHOLD: 0.019277844578027725
AUC: 0.8577570921985815


## Prediction Script 

In [149]:
%%writefile {CODE_FOLDER}/serving/app.py
# | filename: app.py
# | code-line-numbers: true

import tarfile
import tempfile
import numpy as np
import json
import joblib
import logging


from flask import Flask, request, jsonify
from pathlib import Path
import xgboost as xgb


MODEL_PATH = Path(__file__).parent
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load the optimal threshold from the evaluation report
def load_optimal_threshold():
    with open(MODEL_PATH / "evaluation.json", "r") as f:
        report = json.load(f)
        return report["metrics"]["best_threshold"]["value"]

OPTIMAL_THRESHOLD = load_optimal_threshold()


class Model:
    model = None

    def load(self):
        """
        Extracts the model package and loads the model in memory
        if it hasn't been loaded yet.
        """
        # We want to load the model only if it is not loaded yet.
        if not Model.model:

            # Before we load the model, we need to extract it in
            # a temporal directory.

            with tempfile.TemporaryDirectory() as directory:
                with tarfile.open(MODEL_PATH / "model.tar.gz") as tar:
                    tar.extractall(path=directory)
                
                # model_filepath = Path(directory) / "xgbclass"
                model_filepath = Path(directory) / "xgbclass"

          
                logger.info(f"Loading model from {model_filepath}")                  
                Model.model = xgb.Booster()
                Model.model.load_model(model_filepath)
                logger.info("Model loaded successfully")

                

    def predict(self, data):
        """
        Generates predictions for the supplied data.
        """
        self.load()
        
        # Make predictions
        dtest = xgb.DMatrix(data)

        return Model.model.predict(dtest)


app = Flask(__name__)
model = Model()


@app.route("/predict/", methods=["POST"])
def predict():
    try:
        data = request.data.decode("utf-8").strip().split('\n')
        features = np.array([row.split(",") for row in data]).astype(np.float32)


        # Generate probability predictions
        predictions_proba = model.predict(features)
        
        results = []
        for prediction_proba in predictions_proba:
            # Apply the threshold to get binary predictions
            pred_value = int(prediction_proba >= OPTIMAL_THRESHOLD)
            confidence = float(prediction_proba)
            results.append({"prediction": pred_value, "confidence": confidence})

        return jsonify(results)
    except Exception as e:
        return jsonify({"error": str(e)}), 500



Overwriting code_scripts/serving/app.py


## Getting some predictions

$ flask --app src/code_scripts/serving/app.py --debug run --host=0.0.0.0 --port=4242

One prediction of 1:

```bash
curl --location --request POST 'http://localhost:4242/predict' \
    --header 'Content-Type: text/plain' \
    --data-raw '1,0,0,1,0,0,0,1,0,1,0,0,0,1,0,68,1,0,206.09,26.7'
```

Three prediction of 1:

```bash
curl --location --request POST 'http://localhost:4242/predict' \
    --header 'Content-Type: text/plain' \
    --data-raw $'1,0,0,1,0,0,0,1,0,1,0,0,0,1,0,68,1,0,206.09,26.7\n0,1,1,0,1,0,0,0,0,0,1,0,0,1,0,48,0,0,84.2,29.7\n0,1,0,1,0,0,1,0,0,1,0,0,0,1,0,76,1,0,243.58,33.6'
```

One prediction of 0:

```bash
curl --location --request POST 'http://localhost:4242/predict' \
    --header 'Content-Type: text/plain' \
    --data-raw '1,0,1,0,0,0,1,0,0,1,0,1,0,0,0,21,0,0,112.38,25.8'
```

Three prediction of 0:

```bash
curl --location --request POST 'http://localhost:4242/predict' \
    --header 'Content-Type: text/plain' \
    --data-raw $'1,0,1,0,0,0,1,0,0,1,0,1,0,0,0,21,0,0,112.38,25.8\n0,1,1,0,0,0,0,0,1,0,1,1,0,0,0,16,0,0,102.3,21.9\n0,1,1,0,0,0,0,0,1,0,1,1,0,0,0,15,0,0,116.5,27.8'
```



# Deploying the Model

In [150]:
from sagemaker.predictor import Predictor

ENDPOINT = "model-stroke-endpoint"

## Model Package

In [151]:
from sagemaker import ModelPackage

if package:
    model_package = ModelPackage(
        model_package_arn=package["ModelPackageArn"],
        sagemaker_session=sagemaker_session,
        role=role,
    )

    print(package["ModelPackageArn"])

arn:aws:sagemaker:eu-central-1:730335307143:model-package/basic-stroke/23


## Deploying the Model

In [152]:
# model_package.deploy(
#     endpoint_name=ENDPOINT,
#     initial_instance_count=1,
#     instance_type=config["instance_type"],
# )

In [153]:
payload = """
1,0,0,1,0,0,0,1,0,1,0,0,0,1,0,68,1,0,206.09,26.7
"""

In [154]:
# import json
# import numpy as np
# from sagemaker.predictor import Predictor

# # Initialize the predictor
# predictor = Predictor(endpoint_name=ENDPOINT)

# # Define the payload
# payload = "1,0,0,1,0,0,0,1,0,1,0,0,0,1,0,68,1,0,206.09,26.7"

# # Send the payload to the endpoint
# response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})

# # Assuming the response is a JSON string of probabilities
# predictions_proba = json.loads(response.decode('utf-8'))

# # Set optimal threshold as in Serving the Model chapter
# OPTIMAL_THRESHOLD = OPTIMAL_THRESHOLD

# # Apply the threshold to get binary predictions
# predictions = []
# for prediction_proba in predictions_proba:
#     pred_value = int(prediction_proba >= OPTIMAL_THRESHOLD)
#     predictions.append(pred_value)

# print(f"Stroke: {predictions}")


In [155]:
# import numpy as np

# predictor = Predictor(endpoint_name=ENDPOINT)

# try:
#     response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
#     response = json.loads(response.decode("utf-8"))

#     print(json.dumps(response, indent=2))
#     print(f"\nSpecies: {np.argmax(response['predictions'], axis=1)}")
# except Exception as e:
#     print(e)

In [156]:
# try:
#     sagemaker_client.delete_endpoint(EndpointName=ENDPOINT)
# except Exception as e:
#     print(e)

# Deploying From the Pipeline

In [157]:
DATA_CAPTURE_PERCENTAGE = 100
DATA_CAPTURE_DESTINATION = f"{S3_LOCATION}/monitoring/data-capture"

In [158]:
(CODE_FOLDER / "lambda").mkdir(parents=True, exist_ok=True)

## Lambda Function

In [159]:
%%writefile {CODE_FOLDER}/lambda/lambda.py
# | filename: lambda.py
# | code-line-numbers: true

import os
import json
import boto3
import time

sagemaker = boto3.client("sagemaker")


def lambda_handler(event, context):
    # If we are calling this function from EventBridge,
    # we need to extract the model package ARN and the
    # approval status from the event details. If we are
    # calling this function from the pipeline, we can
    # assume the model is approved and we can get the
    # model package ARN as a direct parameter.
    if "detail" in event:
        model_package_arn = event["detail"]["ModelPackageArn"]
        approval_status = event["detail"]["ModelApprovalStatus"]
    else:
        model_package_arn = event["model_package_arn"]
        approval_status = "Approved"

    print(f"Model: {model_package_arn}")
    print(f"Approval status: {approval_status}")

    if approval_status != "Approved":
        response = {
            "message": "Skipping deployment.",
            "approval_status": approval_status,
        }

        print(response)
        return {"statusCode": 200, "body": json.dumps(response)}

    endpoint_name = os.environ["ENDPOINT"]
    data_capture_percentage = int(os.environ["DATA_CAPTURE_PERCENTAGE"])
    data_capture_destination = os.environ["DATA_CAPTURE_DESTINATION"]
    role = os.environ["ROLE"]

    timestamp = time.strftime("%m%d%H%M%S", time.localtime())
    model_name = f"{endpoint_name}-model-{timestamp}"
    endpoint_config_name = f"{endpoint_name}-config-{timestamp}"

    sagemaker.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role,
        Containers=[{"ModelPackageName": model_package_arn}],
    )

    sagemaker.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "ModelName": model_name,
                "InstanceType": "ml.m5.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "VariantName": "AllTraffic",
            }
        ],
        # We can enable Data Capture to record the inputs and outputs
        # of the endpoint to use them later for monitoring the model.
        DataCaptureConfig={
            "EnableCapture": True,
            "InitialSamplingPercentage": data_capture_percentage,
            "DestinationS3Uri": data_capture_destination,
            "CaptureOptions": [
                {"CaptureMode": "Input"},
                {"CaptureMode": "Output"},
            ],
            "CaptureContentTypeHeader": {
                "CsvContentTypes": ["text/csv", "application/octect-stream"],
                "JsonContentTypes": ["application/json", "application/octect-stream"],
            },
        },
    )

    response = sagemaker.list_endpoints(NameContains=endpoint_name, MaxResults=1)

    if len(response["Endpoints"]) == 0:
        # If the endpoint doesn't exist, let's create it.
        sagemaker.create_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=endpoint_config_name,
        )
    else:
        # If the endpoint already exists, let's update it with the
        # new configuration.
        sagemaker.update_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=endpoint_config_name,
        )

    return {"statusCode": 200, "body": json.dumps("Endpoint deployed successfully")}

Overwriting code_scripts/lambda/lambda.py


In [160]:
lambda_role_name = "lambda-deploy-stroke"
lambda_role_arn = None

try:
    response = iam_client.create_role(
        RoleName=lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": ["lambda.amazonaws.com", "events.amazonaws.com"],
                        },
                        "Action": "sts:AssumeRole",
                    },
                ],
            },
        ),
        Description="Lambda Endpoint Deployment",
    )

    lambda_role_arn = response["Role"]["Arn"]

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        RoleName=lambda_role_name,
    )

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
        RoleName=lambda_role_name,
    )

    print(f'Role "{lambda_role_name}" created with ARN "{lambda_role_arn}".')
except iam_client.exceptions.EntityAlreadyExistsException:
    response = iam_client.get_role(RoleName=lambda_role_name)
    lambda_role_arn = response["Role"]["Arn"]
    print(f'Role "{lambda_role_name}" already exists with ARN "{lambda_role_arn}".')

Role "lambda-deploy-stroke" already exists with ARN "arn:aws:iam::730335307143:role/lambda-deploy-stroke".


In [161]:
ENDPOINT = "lambda-depl-stroke"

In [162]:
from sagemaker.lambda_helper import Lambda

deploy_lambda_fn = Lambda(
    function_name="deployment_function",
    execution_role_arn=lambda_role_arn,
    script=(CODE_FOLDER / "lambda" / "lambda.py").as_posix(),
    handler="lambda.lambda_handler",
    timeout=600,
    session=sagemaker_session,
    runtime="python3.11",
    environment={
        "Variables": {
            "ENDPOINT": ENDPOINT,
            "DATA_CAPTURE_DESTINATION": DATA_CAPTURE_DESTINATION,
            "DATA_CAPTURE_PERCENTAGE": str(DATA_CAPTURE_PERCENTAGE),
            "ROLE": role,
        },
    },
)

deploy_lambda_fn_response = deploy_lambda_fn.upsert()
deploy_lambda_fn_response

{'ResponseMetadata': {'RequestId': '2411feed-1baf-4781-96c1-17b0db3d5903',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Tue, 04 Jun 2024 15:25:04 GMT',
   'content-type': 'application/json',
   'content-length': '1619',
   'connection': 'keep-alive',
   'x-amzn-requestid': '2411feed-1baf-4781-96c1-17b0db3d5903'},
  'RetryAttempts': 0},
 'FunctionName': 'deployment_function',
 'FunctionArn': 'arn:aws:lambda:eu-central-1:730335307143:function:deployment_function',
 'Runtime': 'python3.11',
 'Role': 'arn:aws:iam::730335307143:role/lambda-deploy-stroke',
 'Handler': 'lambda.lambda_handler',
 'CodeSize': 3595,
 'Description': '',
 'Timeout': 600,
 'MemorySize': 128,
 'LastModified': '2024-06-04T15:25:04.000+0000',
 'CodeSha256': 'KXGbRnCljQMzGlLi09xT9dOyewM9JgLXMvtTvTa7sNc=',
 'Version': '$LATEST',
 'Environment': {'Variables': {'ROLE': 'arn:aws:iam::730335307143:role/service-role/AmazonSageMaker-ExecutionRole-20240511T180225',
   'DATA_CAPTURE_PERCENTAGE': '100',
   'DATA_CAPTURE_DE

In [163]:
from sagemaker.workflow.lambda_step import LambdaStep


def create_deployment_step(register_model_step):
    """Create a Deploy Step using the supplied parameters."""
    return LambdaStep(
        name="deploy",
        lambda_func=deploy_lambda_fn,
        inputs={
            "model_package_arn": register_model_step.properties.ModelPackageArn,
        },
    )


deploy_step = create_deployment_step(register_model_step)

In [164]:
from sagemaker.workflow.condition_step import ConditionStep

condition_step = ConditionStep(
    name="check-model-auc",
    conditions=[condition],
    if_steps=[register_model_step, deploy_step],
    else_steps=[fail_step],
)

In [165]:
lambda_pipeline = Pipeline(
    name="lambda-pipeline",
    parameters=[dataset_location, auc_threshold],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

lambda_pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/lambda-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/lambda-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://stroke-bucket/lambda-pipeline/code/af7bccc9ab7aa123484460dff61a6567/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://stroke-bucket/lambda-pipeline/code/f3b7867d7495763812a03744135acb08/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:730335307143:pipeline/lambda-pipeline',
 'ResponseMetadata': {'RequestId': 'f8774ffa-5b39-42c0-ac99-e39d11d709af',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f8774ffa-5b39-42c0-ac99-e39d11d709af',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Tue, 04 Jun 2024 15:25:08 GMT'},
  'RetryAttempts': 0}}

In [166]:
# # # # %%script false --no-raise-error
# # # # | eval: false

# lambda_pipeline.start()