In [2]:
# Load necessary extensions
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv


# Standard library imports
import sys
import logging
from pathlib import Path

# Third-party library imports
import ipytest
import json

# Local imports
from src.paths import CODE_FOLDER, DATA_DIR, INFERENCE_CODE_FOLDER, PARENT_DIR

sys.path.append(str(PARENT_DIR / CODE_FOLDER))
sys.path.append(str(PARENT_DIR / INFERENCE_CODE_FOLDER))

DATA_FILE_PATH = DATA_DIR / "penguins.csv"

ipytest.autoconfig(raise_on_error=True)

# Prevent SageMaker SDK to log events related to the default
# configuration using the INFO level
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)


In [3]:
LOCAL_MODE = False

In [4]:
import os

bucket = os.getenv("BUCKET")
role = os.getenv("ROLE")

S3_LOCATION = f"s3://{bucket}/penguins"

In [5]:
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession

pipeline_session = PipelineSession(default_bucket=bucket) if not LOCAL_MODE else None

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=bucket),
        "instance_type": "local",
        "image": None
    }
else:
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None
    }

config["framework_version"] = "2.11"
config["py_version"] = "py39"

In [6]:
import boto3

sagemaker_session = sagemaker.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

By default, only the specified columns in transformers are transformed and combined in the output, and the non-specified columns are dropped. (default of 'drop')

## Creating the Preprocessing Script

In [7]:
%%writefile {CODE_FOLDER}/preprocessor.py
"""
This module preprocesses data for machine learning tasks. It includes functions to read data from CSV files,
split the data into training, validation, and test sets, save baseline data, transform data, and save the
processed data and models.
"""

import os
import tarfile
import tempfile
from pathlib import Path

# Import statements...
from typing import Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler


def preprocess(base_directory: str) -> None:
    """
    Preprocess the data by loading, splitting, transforming, saving the splits, and saving the model.

    Args:
        base_directory: The base directory where the input data and outputs will be managed.
    """
    # 1. Load supplied data
    df = _read_data_from_csv_files(base_directory)

    # 2. Split data into train and test sets
    df_train, df_validation, df_test = _split_data(df)

    # 3. Save baseline data
    _save_baselines(base_directory, df_train, df_test)

    # 3. Transform the train and test sets
    target_transformer = ColumnTransformer(
        transformers=[("species", OrdinalEncoder(), [0])]
    )

    numeric_transformer = make_pipeline(
        SimpleImputer(strategy="mean"), StandardScaler()
    )

    categorical_transformer = make_pipeline(
        SimpleImputer(strategy="most_frequent"), OneHotEncoder()
    )

    features_transformer = ColumnTransformer(
        transformers=[
            (
                "numeric",
                numeric_transformer,
                make_column_selector(dtype_exclude="object"),
            ),
            ("categorical", categorical_transformer, ["island"]),
        ]
    )

    y_train = target_transformer.fit_transform(
        np.array(df_train.species.values).reshape(-1, 1)
    )
    y_validation = target_transformer.transform(
        np.array(df_validation.species.values).reshape(-1, 1)
    )
    y_test = target_transformer.transform(
        np.array(df_test.species.values).reshape(-1, 1)
    )

    df_train = df_train.drop(columns=["species"], axis=1)
    df_validation = df_validation.drop(columns=["species"], axis=1)
    df_test = df_test.drop(columns=["species"], axis=1)

    X_train = features_transformer.fit_transform(df_train)
    X_validation = features_transformer.transform(df_validation)
    X_test = features_transformer.transform(df_test)

    # 4. Save the train and test splits
    _save_splits(
        base_directory, X_train, y_train, X_validation, y_validation, X_test, y_test
    )

    # 5. Save the model (transformers) in tar.gz format
    _save_model(base_directory, target_transformer, features_transformer)


def _read_data_from_csv_files(base_directory: str) -> pd.DataFrame:
    """
    Read and concatenate data from CSV files located in the input directory.

    Args:
        base_directory: The directory where CSV files are located.

    Returns:
        A DataFrame containing the concatenated data.
    """
    input_directory = Path(base_directory) / "input"
    files = [file for file in input_directory.glob("*.csv")]

    if len(files) == 0:
        raise ValueError(f"No csv files found in {input_directory}")

    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)

    # Shuffle the data
    return df.sample(frac=1, random_state=42)


def _split_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split the DataFrame into training, validation, and test sets.

    Args:
        df: The DataFrame to be split.

    Returns:
        A tuple containing the training, validation, and test DataFrames.
    """
    df_train, temp = train_test_split(df, test_size=0.3)
    df_validation, df_test = train_test_split(temp, test_size=0.5)

    return df_train, df_validation, df_test


def _save_baselines(
    base_directory: str, df_train: pd.DataFrame = None, df_test: pd.DataFrame = None
) -> None:
    """
    Save baseline versions of the training and test data sets.

    Args:
        base_directory: Directory where the baseline data will be saved.
        df_train: Training data DataFrame.
        df_test: Test data DataFrame.
    """
    for split, data in [("train", df_train), ("test", df_test)]:
        baseline_path = Path(base_directory) / f"{split}-baseline"
        baseline_path.mkdir(parents=True, exist_ok=True)

        df = data.copy().dropna()

        # Save header only for the train baseline
        header = True if split == "train" else False
        df.to_csv(baseline_path / f"{split}-baseline.csv", index=False, header=header)


def _save_splits(
    base_directory: str,
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_validation: np.ndarray,
    y_validation: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
) -> None:
    """
    Save the training, validation, and test sets after concatenating features with their respective targets.

    Args:
        base_directory: Directory where the data splits will be saved.
        X_train: Features of the training set.
        y_train: Target of the training set.
        X_validation: Features of the validation set.
        y_validation: Target of the validation set.
        X_test: Features of the test set.
        y_test: Target of the test set.
    """
    train = np.concatenate((X_train, y_train), axis=1)
    validation = np.concatenate((X_validation, y_validation), axis=1)
    test = np.concatenate((X_test, y_test), axis=1)

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", index=False, header=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv", index=False, header=False
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", index=False, header=False)


def _save_model(base_directory: str, target_transformer, features_transformers) -> None:
    """
    Save the preprocessing model (transformers) in tar.gz format.

    Args:
        base_directory: Directory where the model will be saved.
        target_transformer: The transformer used for the target variable.
        features_transformers: The transformers used for the feature variables.
    """
    with tempfile.TemporaryDirectory() as directory:
        joblib.dump(target_transformer, os.path.join(directory, "target.joblib"))
        joblib.dump(features_transformers, os.path.join(directory, "features.joblib"))

        model_path = Path(base_directory) / "model"
        model_path.mkdir(parents=True, exist_ok=True)

        with tarfile.open(f"{str(model_path / 'model.tar.gz')}", "w:gz") as tar:
            tar.add(os.path.join(directory, "target.joblib"), arcname="target.joblib")
            tar.add(
                os.path.join(directory, "features.joblib"), arcname="features.joblib"
            )


if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")


Overwriting /Users/carlos/Projects/penguin-classifier/src/preprocessor.py


In [8]:
%%ipytest -s

import pytest
import pandas as pd
import tempfile
from pathlib import Path
import shutil
import tarfile
from preprocessor import preprocess


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)

    shutil.copy2(DATA_FILE_PATH, input_directory / "data.csv")

    directory = Path(directory)
    preprocess(directory)

    yield directory
    
    shutil.rmtree(directory)


def test_preprocess_generate_baselines(directory):
    output_directories = os.listdir(directory)

    assert "train-baseline" in output_directories
    assert "test-baseline" in output_directories


def test_preprocess_generate_data_splits(directory):
    output_directories = os.listdir(directory)

    assert "train" in output_directories
    assert "validation" in output_directories
    assert "test" in output_directories


def test_preprocess_creates_two_models(directory):
    model_path = directory / "model"

    tar = tarfile.open(model_path / "model.tar.gz", "r:gz")

    assert "target.joblib" in tar.getnames()
    assert "features.joblib" in tar.getnames()


def tests_splits_are_transformed(directory):
    train = pd.read_csv(directory / "train" / "train.csv", header=None)
    validation = pd.read_csv(directory / "validation" / "validation.csv", header=None)
    test = pd.read_csv(directory / "test" / "test.csv", header=None)

    # The number of features should be 7
    # * 3 - island (one-hot encoded)
    # * 1 - culmen_length_mm
    # * 1 - culmen_depth_mm
    # * 1 - flipper_length_mm
    # * 1 - body_mass_g
    num_features = 7

    # The number of targets should be 1
    assert train.shape[1] == num_features + 1
    assert validation.shape[1] == num_features + 1
    assert test.shape[1] == num_features + 1
    

[32m.[0m[32m.[0m[32m.[0m[32m.[0m
[32m[32m[1m4 passed[0m[32m in 0.16s[0m[0m


### Setting up the Processing Step

In [9]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="P15D") # type: ignore

In [10]:
from sagemaker.workflow.parameters import ParameterString

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

In [11]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="1.2-1",
    base_job_name="preprocess-data",
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

In [12]:
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input"
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=f"{S3_LOCATION}/preprocessing/train",
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/validation",
                destination=f"{S3_LOCATION}/preprocessing/validation",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test",
                destination=f"{S3_LOCATION}/preprocessing/test",
            ),
            ProcessingOutput(
                output_name="model",
                source="/opt/ml/processing/model",
                destination=f"{S3_LOCATION}/preprocessing/model",
            ),
            ProcessingOutput(
                output_name="train-baseline",
                source="/opt/ml/processing/train-baseline",
                destination=f"{S3_LOCATION}/preprocessing/train-baseline",
            ),
            ProcessingOutput(
                output_name="test-baseline",
                source="/opt/ml/processing/test-baseline",
                destination=f"{S3_LOCATION}/preprocessing/test-baseline",
            ),
        ],
    ),
    cache_config=cache_config,
)



### Creating the Pipeline

In [13]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(
    use_custom_job_prefix=True
)

preprocessing_pipeline = Pipeline(
    name="preprocessing-pipeline",
    parameters=[dataset_location],
    steps=[preprocessing_step],
    sagemaker_session=config["session"],
    pipeline_definition_config=pipeline_definition_config,
)

preprocessing_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:035306718946:pipeline/preprocessing-pipeline',
 'ResponseMetadata': {'RequestId': '6409536e-96b4-4bdf-997d-b4670a8ada1a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6409536e-96b4-4bdf-997d-b4670a8ada1a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Fri, 01 Dec 2023 13:17:43 GMT'},
  'RetryAttempts': 0}}

In [14]:
# %%script false --no-raise-error

preprocessing_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:035306718946:pipeline/preprocessing-pipeline/execution/5msi3g8gmcum', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x1032fcd50>)

## Building Models and the Training Pipeline

In [15]:
%%writefile {CODE_FOLDER}/train.py

import argparse
import os

import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD


def train(model_directory: str, train_path: str, validation_path: str, epochs: int =50, batch_size: int=32) -> None:
    """
    Train a model using the training and validation data sets.

    Args:
        model_directory: Directory where the model will be saved.
        train_path: Path to the training data set.
        validation_path: Path to the validation data set.
        epochs: Number of epochs to train the model.
        batch_size: Batch size used during training.
    """
    # Load training and validation data sets
    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train.drop(X_train.columns[-1], axis=1, inplace=True)

    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation.drop(X_validation.columns[-1], axis=1, inplace=True)

    # Build a Sequential model
    model = Sequential([
        Dense(10, activation="relu", input_shape=(X_train.shape[1],)),
        Dense(8, activation="relu"),
        Dense(3, activation="softmax")
    ])

    model.compile(
        optimizer=SGD(learning_rate=0.01),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    
    model.fit(
        X_train,
        y_train,
        validation_data=(X_validation, y_validation),
        epochs=epochs,
        batch_size=batch_size,
        verbose=2
    )

    # Make predictions
    predictions = model.predict(X_validation)
    predictions = np.argmax(predictions, axis=-1)
    print(f"Validation accuracy: {accuracy_score(y_validation, predictions)}")

    # Save the model
    model_filepath = Path(model_directory) / "001"
    model.save(model_filepath)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)

    args, _ = parser.parse_known_args()

    train(
        model_directory=os.environ["SM_MODEL_DIR"],
        train_path=os.environ["SM_CHANNEL_TRAIN"],
        validation_path=os.environ["SM_CHANNEL_VALIDATION"],

        epochs=args.epochs,
        batch_size=args.batch_size
    )

Overwriting /Users/carlos/Projects/penguin-classifier/src/train.py


In [16]:
%%ipytest -s

import os
import shutil
import tarfile
import pytest
import tempfile
import joblib

from preprocessor import preprocess
from train import train


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILE_PATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        epochs=1
    )
    
    yield directory
    
    shutil.rmtree(directory)


def test_train_saves_a_folder_with_model_assets(directory):
    output = os.listdir(directory / "model")
    assert "001" in output
    
    assets = os.listdir(directory / "model" / "001")
    assert "saved_model.pb" in assets

2023-12-01 10:17:46.774914: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


8/8 - 1s - loss: 1.2502 - accuracy: 0.1339 - val_loss: 1.1430 - val_accuracy: 0.1765 - 604ms/epoch - 76ms/step
Validation accuracy: 0.17647058823529413


INFO:tensorflow:Assets written to: /var/folders/ft/p0zj8zms6wldqq4t_fyn6w300000gn/T/tmpvbo9zizm/model/001/assets


[32m.[0m
[32m[32m[1m1 passed[0m[32m in 1.41s[0m[0m


### Setting up the Training Step

In [17]:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    base_job_name="training",
    entry_point=f"{CODE_FOLDER}/train.py",
    hyperparameters={
        "epochs": 50,
        "batch_size": 32
    },
    metrics_definitions=[
        {"Name": "loss", "Regex": "loss: ([0-9\\.]+)"},
        {"Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
        {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
        {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"}
    ],
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    disable_profiler=True,
    sagemaker_session=config["session"],
    role=role,
)

In [18]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

train_model_step = TrainingStep(
    name="train-model",
    step_args=estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
        }
    ),
    cache_config=cache_config,
)



In [19]:
USE_TUNING_STEP = False and not LOCAL_MODE

In [20]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.parameter import IntegerParameter

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="val_accuracy",
    objective_type="Maximize",
    hyperparameter_ranges={
        "epochs": IntegerParameter(10, 50),
    },
    metric_definitions=[{"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"}],
    max_jobs=3,
    max_parallel_jobs=3,
)

In [21]:
from sagemaker.workflow.steps import TuningStep

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
    ),
    cache_config=cache_config,
)

In [22]:
training_pipeline = Pipeline(
    name="training-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

training_pipeline.upsert(role_arn=role)

Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'PipelineArn': 'arn:aws:sagemaker:us-east-2:035306718946:pipeline/training-pipeline',
 'ResponseMetadata': {'RequestId': '28df84da-146e-4559-a031-ef71c4d8eee9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '28df84da-146e-4559-a031-ef71c4d8eee9',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Fri, 01 Dec 2023 13:17:51 GMT'},
  'RetryAttempts': 0}}

In [23]:
# %%script false --no-raise-error

training_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:035306718946:pipeline/training-pipeline/execution/nsyiike64qs7', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x1032fcd50>)

## Evaluating and Versioning Models

In [44]:
%%writefile {CODE_FOLDER}/evaluation.py

import json
import tarfile
import pandas as pd
import numpy as np

from pathlib import Path
from sklearn.metrics import accuracy_score
from tensorflow import keras


def evaluate(model_path: str, test_path: str, output_path: str) -> None:
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test.drop(X_test.columns[-1], axis=1, inplace=True)

    # Let's now extract the model package so we can load 
    # it in memory.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))

    model = keras.models.load_model(Path(model_path) / "001")

    predictions = model.predict(X_test)
    predictions = np.argmax(predictions, axis=-1)

    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }

    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path="/opt/ml/processing/model/", 
        test_path="/opt/ml/processing/test/",
        output_path="/opt/ml/processing/evaluation/"
    )

Overwriting /Users/carlos/Projects/penguin-classifier/src/evaluation.py


In [45]:
%%ipytest -s
#| code-fold: true
#| output: false

import os
import shutil
import tarfile
import pytest
import tempfile
import joblib

from preprocessor import preprocess
from train import train
from evaluation import evaluate


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILE_PATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        epochs=1
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model" / "001", arcname="001")
        
    evaluate(
        model_path=directory, 
        test_path=directory / "test",
        output_path=directory / "evaluation",
    )

    yield directory / "evaluation"
    
    shutil.rmtree(directory)


def test_evaluate_generates_evaluation_report(directory):
    output = os.listdir(directory)
    assert "evaluation.json" in output


def test_evaluation_report_contains_accuracy(directory):
    with open(directory / "evaluation.json", 'r') as file:
        report = json.load(file)
        
    assert "metrics" in report
    assert "accuracy" in report["metrics"]
    

8/8 - 0s - loss: 1.1274 - accuracy: 0.2427 - val_loss: 1.1343 - val_accuracy: 0.2157 - 497ms/epoch - 62ms/step




Validation accuracy: 0.21568627450980393


INFO:tensorflow:Assets written to: /var/folders/ft/p0zj8zms6wldqq4t_fyn6w300000gn/T/tmp4noe6nxa/model/001/assets


Test accuracy: 0.11764705882352941
[32m.[0m



8/8 - 1s - loss: 1.4623 - accuracy: 0.1967 - val_loss: 1.3159 - val_accuracy: 0.2549 - 524ms/epoch - 65ms/step
Validation accuracy: 0.2549019607843137


INFO:tensorflow:Assets written to: /var/folders/ft/p0zj8zms6wldqq4t_fyn6w300000gn/T/tmpo4x2rbho/model/001/assets


Test accuracy: 0.1568627450980392
[32m.[0m
[32m[32m[1m2 passed[0m[32m in 2.85s[0m[0m


In [47]:
from sagemaker.tensorflow import TensorFlowProcessor

evaluation_processor = TensorFlowProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [48]:
model_assets = train_model_step.properties.ModelArtifacts.S3ModelArtifacts

if USE_TUNING_STEP:
    model_assets = tune_model_step.get_top_model_s3_uri(
        top_k=0, s3_bucket=config["session"].default_bucket()
    )

In [49]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report", output_name="evaluation", path="evaluation.json"
)

In [50]:

evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=evaluation_processor.run(
        inputs=[
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation", source="/opt/ml/processing/evaluation"
            ),
        ],
        code=f"{CODE_FOLDER}/evaluation.py",
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
)



### Registering the Model

In [51]:
MODEL_PACKAGE_GROUP = "penguins"

In [52]:
from sagemaker.tensorflow.model import TensorFlowModel

tensorflow_model = TensorFlowModel(
    model_data=model_assets,
    framework_version=config["framework_version"],
    role=role,
    sagemaker_session=config["session"],
    )

In [53]:
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluate_model_step.properties.ProcessingOutputConfig.Outputs[
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

In [54]:
from sagemaker.workflow.model_step import ModelStep

register_model_step = ModelStep(
    name="register-model",
    step_args=tensorflow_model.register(
        model_package_group_name=MODEL_PACKAGE_GROUP,
        approval_status="Approved",
        model_metrics=model_metrics,
        content_types=["text/csv"],
        response_types=["application/json"],
        inference_instances=[config["instance_type"]],
        transform_instances=[config["instance_type"]],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version=config["framework_version"],
    ),
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


### Setting up a Condition Step

In [55]:
from sagemaker.workflow.parameters import ParameterFloat

accuracy_threshold = ParameterFloat(name="accuracy_threshold", default_value=0.70)

In [56]:
from sagemaker.workflow.fail_step import FailStep

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the model's accuracy was lower than",
            accuracy_threshold,
        ],
    ),
)

In [57]:
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=accuracy_threshold,
)

In [58]:
from sagemaker.workflow.condition_step import ConditionStep

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step] if not LOCAL_MODE else [],
    else_steps=[fail_step],
)

### Creating the Pipeline

In [59]:
evaluating_pipeline = Pipeline(
    name="evaluating-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

evaluating_pipeline.upsert(role_arn=role)

Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.processing:Uploaded None to s3://sigmoidal-bucket/evaluating-pipeline/code/968040ffa8955b343adf7f02b3f05a26/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sigmoidal-bucket/evaluating-pipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.processing:Uploaded None to s3://sigmoidal-bucket/evaluating-pipeline/code/968040ffa8955b343adf7f02b3f05a26/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sigmoidal-bucket/evaluating-pipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:us-east-2:035306718946:pipeline/evaluating-pipeline',
 'ResponseMetadata': {'RequestId': 'a729b39f-5ac4-4022-811e-6e0e30d37783',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a729b39f-5ac4-4022-811e-6e0e30d37783',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Fri, 01 Dec 2023 13:51:08 GMT'},
  'RetryAttempts': 0}}

In [60]:
evaluating_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:035306718946:pipeline/evaluating-pipeline/execution/lqfsi88fhfkb', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x1032fcd50>)

## Deploying Models and Serving Predictions

In [40]:
from sagemaker.predictor import Predictor

ENDPOINT = "penguins-endpoint"
DATA_CAPTURE_DESTINATION = f"{S3_LOCATION}/monitoring/data-capture"

In [61]:
# Deploying model from registry
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=MODEL_PACKAGE_GROUP,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=1,
)

package = (
    response["ModelPackageSummaryList"][0]
    if response["ModelPackageSummaryList"]
    else None
)
package

{'ModelPackageGroupName': 'penguins',
 'ModelPackageVersion': 2,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-2:035306718946:model-package/penguins/2',
 'CreationTime': datetime.datetime(2023, 12, 1, 11, 3, 56, 501000, tzinfo=tzlocal()),
 'ModelPackageStatus': 'Completed',
 'ModelApprovalStatus': 'Approved'}

In [62]:
from sagemaker import ModelPackage

if package:
    model_package = ModelPackage(
        model_package_arn=package["ModelPackageArn"],
        sagemaker_session=sagemaker_session,
        role=role,
    )

In [63]:
model_package.deploy(
    endpoint_name=ENDPOINT, 
    initial_instance_count=1, 
    instance_type=config["instance_type"]
)

INFO:sagemaker:Creating model with name: penguins-2023-12-01-14-05-01-090
INFO:sagemaker:Creating endpoint-config with name penguins-endpoint
INFO:sagemaker:Creating endpoint with name penguins-endpoint


---!

In [64]:
payload = """
0.6569590202313976,-1.0813829646495108,1.2097102831892812,0.9226343641317372,1.0,0.0,0.0
-0.7751048801481084,0.8822689351285553,-1.2168066120762704,0.9226343641317372,0.0,1.0,0.0
-0.837387834894918,0.3386660813829646,-0.26237731892812,-1.92351941317372,0.0,0.0,1.0
"""

In [65]:
import numpy as np

predictor = Predictor(endpoint_name=ENDPOINT)

try:
    response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
    response = json.loads(response.decode("utf-8"))

    print(json.dumps(response, indent=2))
    print(f"\nSpecies: {np.argmax(response['predictions'], axis=1)}")
except Exception as e:
    print(e)

{
  "predictions": [
    [
      0.0326238759,
      0.00740541192,
      0.959970713
    ],
    [
      0.783189774,
      0.148078725,
      0.0687314644
    ],
    [
      0.971578062,
      0.0213339273,
      0.00708800321
    ]
  ]
}

Species: [2 0 0]


In [66]:
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: penguins-endpoint
INFO:sagemaker:Deleting endpoint with name: penguins-endpoint


In [67]:
%%writefile {INFERENCE_CODE_FOLDER}/preprocessing_component.py

import os
import pandas as pd
import json
import joblib

from io import StringIO

try:
    from sagemaker_containers.beta.framework import encoders, worker
except ImportError:
    # We don't have access to the `worker` instance when testing locally.
    # We'll set it to None so we can change the way functions create
    # a response.
    worker = None


TARGET_COLUMN = "species"
FEATURE_COLUMNS = [
    "island",
    "culmen_length_mm",
    "culmen_depth_mm",
    "flipper_length_mm",
    "body_mass_g",
    "sex",
]


def model_fn(model_dir):
    """
    Deserializes the model that will be used in this container.
    """

    return joblib.load(os.path.join(model_dir, "features.joblib"))
    

def input_fn(input_data, content_type):
    """
    Parses the input payload and creates a Pandas DataFrame.

    This function will check whether the target column is present in the
    input data, and will remove it.
    """

    if content_type == "text/csv":
        df = pd.read_csv(StringIO(input_data), header=None, skipinitialspace=True)

        if len(df.columns) == len(FEATURE_COLUMNS) + 1:
            df = df.drop(df.columns[0], axis=1)

        df.columns = FEATURE_COLUMNS
        return df

    if content_type == "application/json":
        df = pd.DataFrame([json.loads(input_data)])

        if "species" in df.columns:
            df = df.drop("species", axis=1)

        return df

    raise ValueError(f"{content_type} is not supported!")


def predict_fn(input_data, model):
    """
    Preprocess the input using the transformer.
    """

    try:
        response = model.transform(input_data)
        return response
    except ValueError as e:
        print("Error transforming the input data", e)
        return None


def output_fn(prediction, accept):
    """
    Formats the prediction output to generate a response.

    The default accept/content-type between containers for serial inference
    is JSON. Since this model will preceed a TensorFlow model, we want to
    return a JSON object following TensorFlow's input requirements.
    """

    if prediction is None:
        raise Exception("There was an error transforming the input data")

    instances = [p for p in prediction.tolist()]
    response = {"instances": instances}
    return (
        worker.Response(json.dumps(response), mimetype=accept)
        if worker
        else (response, accept)
    )

    raise Exception(f"{accept} accept type is not supported.")

Writing /Users/carlos/Projects/penguin-classifier/src/inference/preprocessing_component.py


In [69]:
%%ipytest

from preprocessing_component import input_fn, predict_fn, output_fn, model_fn


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILE_PATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    
    with tarfile.open(directory / "model" / "model.tar.gz") as tar:
        tar.extractall(path=directory / "model")
    
    yield directory / "model"
    
    shutil.rmtree(directory)



def test_input_csv_drops_target_column_if_present():
    input_data = """
    Adelie, Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_json_drops_target_column_if_present():
    input_data = json.dumps({
        "species": "Adelie", 
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })
    
    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_csv_works_without_target_column():
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6


def test_input_json_works_without_target_column():
    input_data = json.dumps({
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })
    
    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6


def test_output_raises_exception_if_prediction_is_none():
    with pytest.raises(Exception):
        output_fn(None, "application/json")
    
    
def test_output_returns_tensorflow_ready_input():
    prediction = np.array([
        [-1.3944109908736013,1.15488062669371,-0.7954340636549508,-0.5536447804097907,0.0,1.0,0.0],
        [1.0557485835338234,0.5040085971987002,-0.5824506029515057,-0.5851840035995248,0.0,1.0,0.0]
    ])
    
    response = output_fn(prediction, "application/json")
    
    assert response[0] == {
        "instances": [
            [-1.3944109908736013,1.15488062669371,-0.7954340636549508,-0.5536447804097907,0.0,1.0,0.0],
            [1.0557485835338234,0.5040085971987002,-0.5824506029515057,-0.5851840035995248,0.0,1.0,0.0]
        ]
    }
    
    assert response[1] == "application/json"

    
def test_predict_transforms_data(directory):
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    model = model_fn(str(directory))
    df = input_fn(input_data, "text/csv")
    response = predict_fn(df, model)
    assert type(response) is np.ndarray
    

def test_predict_returns_none_if_invalid_input(directory):
    input_data = """
    Invalid, 39.1, 18.7, 181, 3750, MALE
    """
    
    model = model_fn(str(directory))
    df = input_fn(input_data, "text/csv")
    assert predict_fn(df, model) is None

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                     [100%][0m
[32m[32m[1m8 passed[0m[32m in 0.12s[0m[0m


In [70]:
%%writefile {INFERENCE_CODE_FOLDER}/postprocessing_component.py

import os
import numpy as np
import json
import joblib


try:
    from sagemaker_containers.beta.framework import encoders, worker
except ImportError:
    # We don't have access to the `worker` instance when testing locally.
    # We'll set it to None so we can change the way functions create
    # a response.
    worker = None


def model_fn(model_dir):
    """
    Deserializes the target model and returns the list of fitted categories.
    """

    model = joblib.load(os.path.join(model_dir, "target.joblib"))
    return model.named_transformers_["species"].categories_[0]


def input_fn(input_data, content_type):
    if content_type == "application/json":
        predictions = json.loads(input_data)["predictions"]
        return predictions
    
    raise ValueError(f"{content_type} is not supported.!")


def predict_fn(input_data, model):
    """
    Transforms the prediction into its corresponding category.
    """
    predictions = np.argmax(input_data, axis=-1)
    confidence = np.max(input_data, axis=-1)
    return [
        (model[prediction], confidence)
        for confidence, prediction in zip(confidence, predictions)
    ]

def output_fn(prediction, accept):
    if accept == "text/csv":
        return (
            worker.Response(encoders.encode(prediction, accept), mimetype=accept)
            if worker
            else (prediction, accept)
        )

    if accept == "application/json":
        response = []
        for p, c in prediction:
            response.append({"prediction": p, "confidence": c})

        # If there's only one prediction, we'll return it
        # as a single object.
        if len(response) == 1:
            response = response[0]

        return (
            worker.Response(json.dumps(response), mimetype=accept)
            if worker
            else (response, accept)
        )

    raise Exception(f"{accept} accept type is not supported.")

Writing /Users/carlos/Projects/penguin-classifier/src/inference/postprocessing_component.py


In [71]:
%%ipytest

import numpy as np

from postprocessing_component import predict_fn, output_fn


def test_predict_returns_prediction_as_first_column():
    input_data = [
        [0.6, 0.2, 0.2], 
        [0.1, 0.8, 0.1],
        [0.2, 0.1, 0.7]
    ]
    
    categories = ["Adelie", "Gentoo", "Chinstrap"]
    
    response = predict_fn(input_data, categories)
    
    assert response == [
        ("Adelie", 0.6),
        ("Gentoo", 0.8),
        ("Chinstrap", 0.7)
    ]


def test_output_does_not_return_array_if_single_prediction():
    prediction = [("Adelie", 0.6)]
    response, _ = output_fn(prediction, "application/json")

    assert response["prediction"] == "Adelie"


def test_output_returns_array_if_multiple_predictions():
    prediction = [("Adelie", 0.6), ("Gentoo", 0.8)]
    response, _ = output_fn(prediction, "application/json")

    assert len(response) == 2
    assert response[0]["prediction"] == "Adelie"
    assert response[1]["prediction"] == "Gentoo"


[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.01s[0m[0m


In [72]:
transformation_pipeline_model = Join(
    on="/",
    values=[
        preprocessing_step.properties.ProcessingOutputConfig.Outputs[
            "model"
        ].S3Output.S3Uri,
        "model.tar.gz",
    ],
)

In [73]:
from sagemaker.sklearn.model import SKLearnModel

preprocessing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="preprocessing_component.py",
    source_dir=str(INFERENCE_CODE_FOLDER),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)

In [74]:
post_processing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="postprocessing_component.py",
    source_dir=str(INFERENCE_CODE_FOLDER),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)

In [75]:
from sagemaker.pipeline import PipelineModel

pipeline_model = PipelineModel(
    name="inference-model",
    models=[preprocessing_model, tensorflow_model, post_processing_model],
    sagemaker_session=config["session"],
    role=role,
)

In [76]:
PIPELINE_MODEL_PACKAGE_GROUP = "pipeline"

In [77]:
register_model_step = ModelStep(
    name="register",
    display_name="register-model",
    step_args=pipeline_model.register(
        model_package_group_name=PIPELINE_MODEL_PACKAGE_GROUP,
        model_metrics=model_metrics,
        approval_status="PendingManualApproval",
        content_types=["text/csv", "application/json"],
        response_types=["text/csv", "application/json"],
        inference_instances=[config["instance_type"]],
        transform_instances=[config["instance_type"]],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version=config["framework_version"],
    ),
)



In [78]:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step] if not LOCAL_MODE else [],
    else_steps=[fail_step],
)

In [79]:
model__pipeline = Pipeline(
    name="model-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

model__pipeline.upsert(role_arn=role)

Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.processing:Uploaded None to s3://sigmoidal-bucket/model-pipeline/code/968040ffa8955b343adf7f02b3f05a26/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sigmoidal-bucket/model-pipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:us-east-2:035306718946:pipeline/model-pipeline',
 'ResponseMetadata': {'RequestId': 'c4106408-1a40-4151-924f-eded49ab567a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c4106408-1a40-4151-924f-eded49ab567a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Fri, 01 Dec 2023 14:16:45 GMT'},
  'RetryAttempts': 0}}

In [80]:
model__pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:035306718946:pipeline/model-pipeline/execution/c0z3ozt5uhqy', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x1032fcd50>)