# End-To-End MLOps Pipeline With SageMaker (AWS)

Let's begin by setting up the environment.

In [46]:
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import json
import sys
import os
from pathlib import Path
import boto3
import ipytest

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


Throughout this project we will have the ability to run our functions in local mode or not. By setting this to `True` we will run all of the pipelines locally (and not connect to AWS). Setting it to `False` will run the pipeline in SageMaker.

In [47]:
LOCAL_MODE = False

Now we are going to load the environment variables that we need to run this project.

In [48]:
import logging
# This stops SageMaker from reporting in this cell.
logging.getLogger('sagemaker.config').disabled = True

from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession
import sagemaker

CODE_FOLDER = Path("code") 
CODE_FOLDER.mkdir(exist_ok=True) 
sys.path.extend([f"./{CODE_FOLDER}"])

DATA_FILEPATH = "penguins.csv" # Path to data

ipytest.autoconfig(raise_on_error=True) # Testing library

bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

COMET_API_KEY = os.environ.get("COMET_API_KEY", None)
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME", None)

Initialise our pipeline session and initialise required config variables.

In [49]:
pipeline_session = PipelineSession(default_bucket=bucket) if not LOCAL_MODE else LocalPipelineSession(default_bucket=bucket)
instance_type = "ml.m5.xlarge" if not LOCAL_MODE else "local"

config = {
        "session": pipeline_session,
        "instance_type": instance_type,
        "image": None,
    }

config["framework_version"] = "2.12"
config["py_version"] = "py310"

S3_LOCATION = f"s3://{bucket}/penguins"

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

# What Data Are We Dealing With?
There is a file in this project called [`data_analysis.ipynb`](https://github.com/4igeek/ml-ops-pipeline/blob/main/program/data_analysis.ipynb) and in that file I conduct some exploratory data analysis so that we can understand the dataset that we are going to be working with. If you're interested then make sure you take a read of that file before moving on.

# Splitting & Transforming The Data
Now we're going to start building a pipeline (which is just a chain of components). The first component will be for transforming our data.

*Note: If you haven't set up the project yet then you need to read [Configuring AWS & Local Environment for MLOps Pipelines Project](https://digitalredneck.co.uk/configuring-aws-local-environment-for-mlops-pipelines-project/) to get up to this point.* 

We've already uploaded our data to s3 so what this pipeline component is going to do is go to the s3 bucket and retrieve the data and then create a model out of that dataset. In order to do that there are a number of steps that we need to go through. We need to process the data, then we need to train the model, then we need to evaluate the model to make sure it is producing good results, and then we'll be in the position where we can register the model that we can then deploy for use in the future.

The pipeline is the structure that is going to combine all of these steps and execute them one after the other.

<img src="https://digitalredneck.co.uk/PipeLine_split_transform.jpg" style="width: 100%; height: auto;" />

This step is going to take the data from our s3 bucket and then split the data (training and testing sets) and transform the data (replacing missing values etc). The output of this component is going to be saved back in s3. Then the next step will be training and then evaluation etc.

We'll use the [Scikit-Learn Pipeline](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html) for the transformations, and a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) with a [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) to execute a processing script. For more information on this check out the [SageMaker Pipelines Overview](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) for an intro into the fundamental components of a SageMaker Pipeline.

## Step 1: The Processing Script
We need to write a script that will be used to split and transform the data. This Processing Step will create a SageMaker Processing Job in the background. It will then run the script and upload the output to AWS s3.

We will create a folder called "processing" and add it to the system path so that we can use it later.

In [50]:
(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)

The first line in the following cell i.e. `%%writefile {CODE_FOLDER}/processing/script.py` essentially writes the code written in the cell to a new file in a folder called `processing` without executing it. This means we're using Jupyter to write python files that will then be used in production (and not Jupyter itself).

In [51]:
%%writefile {CODE_FOLDER}/processing/script.py
# | filename: script.py
# | code-line-numbers: true

import os
import tarfile
import tempfile
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler


def preprocess(base_directory):
    """Load, split, and transform the data."""

    # Get all of the data in our data directory and receive a 
    # shuffled dataset that contains all of the data.
    df = _read_data_from_input_csv_files(base_directory)

    # This is where we create a SciKit-Learn pipeline to transform the dataset.

    # Create a ColumnTransformer for the target feature.
    # This will transform the categorical data in the target feature
    # using an ordinal encoder: so 0, 1, 2 etc for the different classes.
    target_transformer = ColumnTransformer(
        transformers=[("species", OrdinalEncoder(), [0])],
    )

    # This is applied to all of the numerical columns in the dataset.
    # We're going to impute missing values with the mean of all the other values.
    # We're also going to scale (normalise) all of the values.
    numeric_transformer = make_pipeline(
        SimpleImputer(strategy="mean"),
        StandardScaler(),
    )

    # This will be applied to all of the categorical columns in the dataset.
    # We're going to impute missing values with the most common of all the other values.
    # We're also going to one-hot encode the columns to create new features.
    categorical_transformer = make_pipeline(
        SimpleImputer(strategy="most_frequent"),
        OneHotEncoder(),
    )

    # This is where we use the transformers we just created in a ColumnTransformer.
    features_transformer = ColumnTransformer(
        transformers=[
            (
                "numeric", # The name of this transformer
                numeric_transformer,
                make_column_selector(dtype_exclude="object"),
            ),
            (
                "categorical", # The name of this transformer 
                categorical_transformer, 
                ["island"]), # Only being applied to the island column.
                # We don't transform the "sex" column and as such SciKit Learn 
                # will drop it. By ignoring it it will be dropped.
                # The reason we're doing that is because "sex" doesn't have any 
                # predictive power in this dataset and as such we don't need it.
        ],
    )

    # Now that the data has been transformed we're now going to split it.
    # We need to split the data before transforming it.
    # 70% for training, 15% for validation, and 15% for test
    df_train, df_validation, df_test = _split_data(df)

    # This is going to help us when we get to monitoring our model
    # These are the baselines for our raw data.
    _save_train_baseline(base_directory, df_train)
    _save_test_baseline(base_directory, df_test)

    # This is where we now transform the data. This is how the transformers are applied.
    # This will be applied to all of the sets in our data i.e. train, validation, and test.
    # We "fit" the train set and from the info collected from that we will then transform
    # The validation and test set using the info collected by calling "fit_transform".
    
    # The reason we do this is in case some of the classes are missing in the validation 
    # or test sets. If they are we will know because we got that from "fit_transform"
    # By doing it this way, we can ensure that the encodings of the data are the same 
    # in all sets i.e. class A=0, class B=1, class C=2 etc. 
    y_train = target_transformer.fit_transform( # fit_transform 
        np.array(df_train.species.values).reshape(-1, 1),
    )
    y_validation = target_transformer.transform( # transform
        np.array(df_validation.species.values).reshape(-1, 1),
    )
    y_test = target_transformer.transform( # transform
        np.array(df_test.species.values).reshape(-1, 1),
    )

    # Now that we've got our "y" labels we can drop them from the three main datasets.
    # We are going to drop the target feature for the three sets.
    df_train = df_train.drop("species", axis=1)
    df_validation = df_validation.drop("species", axis=1)
    df_test = df_test.drop("species", axis=1)

    # This is where we transform the features in the dataset (minus the target column).
    X_train = features_transformer.fit_transform(df_train)  # noqa: N806
    X_validation = features_transformer.transform(df_validation)  # noqa: N806
    X_test = features_transformer.transform(df_test)  # noqa: N806

    # Once this is done our data is ready to be saved.
    _save_splits(
        base_directory,
        X_train,
        y_train,
        X_validation,
        y_validation,
        X_test,
        y_test,
    )

    # This is where we save the SciKit Learn transformation pipeline.
    # When we deploy this model in production we are going to be receiving raw data.
    # Before we can make a prediction on that raw data we need to transform the data
    # so that it works with the model (i.e. has had the same transformations made to it).
    # We need to transform that raw data using the exact same process as we have here.
    _save_model(base_directory, target_transformer, features_transformer)


def _read_data_from_input_csv_files(base_directory):
    """Read the data from the input CSV files.

    This function reads every CSV file available and
    concatenates them into a single dataframe.
    """

    # This script will grab data out of every csv file in the directory
    # This is because more data can be added to the project (in separate files)
    # And that new data will be included in the processing step.
    input_directory = Path(base_directory) / "input"
    files = list(input_directory.glob("*.csv"))

    # If there are no files then we are going to raise an error
    if len(files) == 0:
        message = f"The are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    # If there are files then we are going to save them to a DataFrame
    # This DataFrame will contain the data from all of the files in the directory.
    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)

    # Shuffle and return the data
    return df.sample(frac=1, random_state=42)


def _split_data(df):
    """Split the data into train, validation, and test."""
    df_train, temp = train_test_split(df, test_size=0.3)
    df_validation, df_test = train_test_split(temp, test_size=0.5)

    return df_train, df_validation, df_test


def _save_train_baseline(base_directory, df_train):
    """Save the untransformed training data to disk.

    We will need the training data to compute a baseline to
    determine the quality of the data that the model receives
    when deployed.
    """
    baseline_path = Path(base_directory) / "train-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_train.copy().dropna()

    # To compute the data quality baseline, we don't need the
    # target variable, so we'll drop it from the dataframe.
    df = df.drop("species", axis=1)

    df.to_csv(baseline_path / "train-baseline.csv", header=True, index=False)


def _save_test_baseline(base_directory, df_test):
    """Save the untransformed test data to disk.

    We will need the test data to compute a baseline to
    determine the quality of the model predictions when deployed.
    """
    baseline_path = Path(base_directory) / "test-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_test.copy().dropna()

    # We'll use the test baseline to generate predictions later,
    # and we can't have a header line because the model won't be
    # able to make a prediction for it.
    df.to_csv(baseline_path / "test-baseline.csv", header=False, index=False)


def _save_splits(
    base_directory,
    X_train,  # noqa: N803
    y_train,
    X_validation,  # noqa: N803
    y_validation,
    X_test,  # noqa: N803
    y_test,
):
    """Save data splits to disk.

    This function concatenates the transformed features
    and the target variable, and saves each one of the split
    sets to disk.
    """
    train = np.concatenate((X_train, y_train), axis=1)
    validation = np.concatenate((X_validation, y_validation), axis=1)
    test = np.concatenate((X_test, y_test), axis=1)

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv",
        header=False,
        index=False,
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def _save_model(base_directory, target_transformer, features_transformer):
    """Save the Scikit-Learn transformation pipelines.

    This function creates a model.tar.gz file that
    contains the two transformation pipelines we built
    to transform the data.
    """
    with tempfile.TemporaryDirectory() as directory:
        joblib.dump(target_transformer, Path(directory) / "target.joblib")
        joblib.dump(features_transformer, Path(directory) / "features.joblib")

        model_path = Path(base_directory) / "model"
        model_path.mkdir(parents=True, exist_ok=True)

        with tarfile.open(f"{(model_path / 'model.tar.gz').as_posix()}", "w:gz") as tar:
            tar.add(Path(directory) / "target.joblib", arcname="target.joblib")
            tar.add(
                Path(directory) / "features.joblib", arcname="features.joblib",
            )


if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")

Overwriting code/processing/script.py


Let's run some unit tests on the script to ensure everything worked okay.

In [52]:
%%ipytest -s
# | code-fold: true

import os
import shutil
import tarfile
import tempfile

import pytest
from processing.script import preprocess
import pandas as pd


@pytest.fixture(autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)
    preprocess(base_directory=directory)

    yield directory

    shutil.rmtree(directory)


def test_preprocess_generates_data_splits(directory):
    output_directories = os.listdir(directory)

    assert "train" in output_directories
    assert "validation" in output_directories
    assert "test" in output_directories


def test_preprocess_generates_baselines(directory):
    output_directories = os.listdir(directory)

    assert "train-baseline" in output_directories
    assert "test-baseline" in output_directories


def test_preprocess_creates_two_models(directory):
    model_path = directory / "model"
    tar = tarfile.open(model_path / "model.tar.gz", "r:gz")

    assert "features.joblib" in tar.getnames()
    assert "target.joblib" in tar.getnames()


def test_splits_are_transformed(directory):
    train = pd.read_csv(directory / "train" / "train.csv", header=None)
    validation = pd.read_csv(directory / "validation" / "validation.csv", header=None)
    test = pd.read_csv(directory / "test" / "test.csv", header=None)

    # After transforming the data, the number of features should be 7:
    # * 3 - island (one-hot encoded)
    # * 1 - culmen_length_mm = 1
    # * 1 - culmen_depth_mm
    # * 1 - flipper_length_mm
    # * 1 - body_mass_g
    number_of_features = 7

    # The transformed splits should have an additional column for the target
    # variable.
    assert train.shape[1] == number_of_features + 1
    assert validation.shape[1] == number_of_features + 1
    assert test.shape[1] == number_of_features + 1


def test_train_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(
        directory / "train-baseline" / "train-baseline.csv",
        header=None,
    )

    island = baseline.iloc[:, 0].unique()

    assert "Biscoe" in island
    assert "Torgersen" in island
    assert "Dream" in island


def test_test_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(
        directory / "test-baseline" / "test-baseline.csv", header=None
    )

    island = baseline.iloc[:, 1].unique()

    assert "Biscoe" in island
    assert "Torgersen" in island
    assert "Dream" in island


def test_train_baseline_includes_header(directory):
    baseline = pd.read_csv(directory / "train-baseline" / "train-baseline.csv")
    assert baseline.columns[0] == "island"


def test_test_baseline_does_not_include_header(directory):
    baseline = pd.read_csv(directory / "test-baseline" / "test-baseline.csv")
    assert baseline.columns[0] != "island"

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m
[32m[32m[1m8 passed[0m[32m in 0.37s[0m[0m


## Step 2: Caching Config
SageMaker supports caching which means it will try to execute a previous run of the same step (if nothing has changed). For more info check out [Caching Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).

Let's define a caching policy that we can use at every step.

In [53]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

## Step 3: Pipeline Config
We can make our pipeline more flexible by parameterising it. In our case we are going to pass through the location of the dataset. This means we can then switch out datasets by changing the value of this parameter. For more information visit [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html).

Imagine you had a massive dataset and you wanted to test something out; you wouldn't necessarily want to run this test on the entire dataset and rather just a sub-set of the data. This is how you would do it by simply pointing the pipeline to the smaller dataset.

In [54]:
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location", # Name of the ParameterString
    default_value=f"{S3_LOCATION}/data",
)

## Step 4: Setting up the Processing Step

*Note: New AWS accounts don't automatically have access to `ml.m5.xlarge` so you need to make sure you've applied for this. You can read more by visiting [Configuring AWS & Local Environment for MLOps Pipelines Project](https://digitalredneck.co.uk/configuring-aws-local-environment-for-mlops-pipelines-project/) and ensuring you're all properly set up.*

We're now going to set up a processing step. We need this for when we run the pipeline because when we do, SageMaker (internally) will create a cluster and it will run multiple instances (in our case only one) and it will deploy a processing container in the cloud.

SageMaker will copy the data from s3 into the processing container, it will then run the script that we created and when it finished it will grab anything in the output directory and then save it back to s3.

Seeing as we're using SciKit-Learn, we need to tell SageMaker to run the script in a container that supports SciKit-Learn. The way we do this is by either specifying all of the criteria you need or we can make use of the internal classes in SageMaker to handle this for us (we'll be using the SciKit-Learn processor class). You need a processor to define a processing step.

In [55]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    base_job_name="preprocess-data", # Defines the job name that we will see in SageMaker.
    framework_version="1.2-1", # We need to tell SageMaker which version of SciKit-Learn we're using.
    instance_type=config["instance_type"], # What type of computer do I want to use.
    instance_count=1, # The number of instances in the cluster generated by SageMaker
    role=role, # What is the role that the job is going to be running under (environment variable)
    sagemaker_session=config["session"], # This is the session we're going to be running under.
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Now let's define the Processing Step that we'll use in our pipeline. This is the step that we add to the pipeline.

We specify the list of inputs. In this instance it is the dataset that we stored in s3 (make sure you've completed that by visiting [Configuring AWS & Local Environment for MLOps Pipelines Project](https://digitalredneck.co.uk/configuring-aws-local-environment-for-mlops-pipelines-project/) to ensure you're properly set up to run this notebook project).

In [56]:
%%capture
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

preprocessing_step = ProcessingStep(
    name="preprocess-data", # We can see this in SageMaker
    step_args=processor.run(
        code=f"{(CODE_FOLDER / 'processing' / 'script.py').as_posix()}", # What code does the processing job need to run?
        inputs=[ # This is where we tell the processing step where the dataset is i.e. the inputs.
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input", # SageMaker will auto download the data into this location.
            ),
        ],
        outputs=[ # Now we specify the outputs
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train", # Points to the directory inside the processing container.
                destination=f"{S3_LOCATION}/preprocessing/train", # Optional as SageMaker will save automatically.
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/validation",
                destination=f"{S3_LOCATION}/preprocessing/validation",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test",
                destination=f"{S3_LOCATION}/preprocessing/test",
            ),
            ProcessingOutput(
                output_name="model",
                source="/opt/ml/processing/model", # We're saving our processing pipeline to a model.tar.gz file.
                destination=f"{S3_LOCATION}/preprocessing/model", # This is where we save it to.
            ),
            ProcessingOutput(
                output_name="train-baseline",
                source="/opt/ml/processing/train-baseline",
                destination=f"{S3_LOCATION}/preprocessing/train-baseline",
            ),
            ProcessingOutput(
                output_name="test-baseline",
                source="/opt/ml/processing/test-baseline",
                destination=f"{S3_LOCATION}/preprocessing/test-baseline",
            ),
        ],
    ),
    cache_config=cache_config, # We make sure the step is cached.
)

## Step 5: Creating the pipeline
We can no create a pipeline and submits its definition to the SageMaker Pipeline Service to create the pipeline if it doesn't exist (or update it if it does).

In [57]:
from functions import submit_pipeline

steps = [preprocessing_step] # type: ignore
pipeline_name = "data-transform-pipeline"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

In [58]:
%%script false --no-raise-error
# | eval: false
pipeline.start()

Couldn't find program: 'false'


Provided you ran the above cell NOT in local mode you can now see this pipeline in SageMaker studio under the tab pipelines.

<img src="https://digitalredneck.co.uk/SageMaker_pipelines.jpg" style="width:100%; height: auto;" />

We've successfully created our first step in the pipeline. You'll notice a trend moving forwards as we begin to create the next steps in the project. In the next phase we're going to define a training process (and other steps) and you're going to find that we'll be creating scripts, we'll be defining the container that we want that script to run on, we'll be adding those new steps to the pipeline that we just create and everything will become familiar to us.

# Training the Model
Now we're going to extend the pipeline that we just created with a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training). We're going to be using TensorFlow for that so if you need to know more about that then visit the [TensorFlow docs in SageMaker](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/using_tf.html#train-a-model-with-tensorflow).

This will work by taking the output from the last step (the processing step) and then using that as in input into this step where we'll train a machine learning model. This step will then output a model that we will then evaluate it in the next step.

We're going to be working on this next:
<img src="https://digitalredneck.co.uk/SageMaker_training_step.jpg" style="width:100%; height:auto;"/>

*Note: Notice how we are using the data from the processing job ("Dataset splits" in the image above).*

We're also going to implement tracking using [Amazon SageMaker Experiments](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments.html) and [Comet ML](https://www.comet.com/site/).

To do this we're going to create a new folder in the project called `training` and this will house the script (which we can then use later).

In [59]:
(CODE_FOLDER / "training").mkdir(parents=True, exist_ok=True)

## Step 1: Creating the Training Script

In [60]:
%%writefile {CODE_FOLDER}/training/script.py
# | filename: script.py
# | code-line-numbers: true

import argparse
import json
import os
import tarfile

from pathlib import Path
from comet_ml import Experiment

import keras
import numpy as np
import pandas as pd
from keras import Input
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import SGD
from packaging import version
from sklearn.metrics import accuracy_score


def train(
    model_directory,
    train_path,
    validation_path,
    pipeline_path,
    experiment,
    epochs=50,
    batch_size=32,
):
    print(f"Keras version: {keras.__version__}")

    # We're now loading the data in memory and splitting the labels
    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train = X_train.drop(X_train.columns[-1], axis=1)

    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation = X_validation.drop(X_validation.columns[-1], axis=1)

    # We're going to be building a simple sequential model using Keras.
    # The values in the output layer will add up to 1 and the softmax
    # Function is going to return the one with the highest value.
    model = Sequential(
        [
            Input(shape=(X_train.shape[1],)),
            Dense(10, activation="relu"),
            Dense(8, activation="relu"),
            Dense(3, activation="softmax"),
        ]
    )

    # Now we compile the model using a SGD and categorical crossentropy
    # and the metrics we will be tracking is the accuracy of the model.
    model.compile(
        optimizer=SGD(learning_rate=0.01),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    # Then we fit the model
    model.fit(
        X_train,
        y_train,
        validation_data=(X_validation, y_validation), # We pass in the validation set
        epochs=epochs,
        batch_size=batch_size,
        verbose=2, # What type of logging do we need (0, 1, or 2)
    )

    # For logging purposes we're logging out the validation accuracy
    predictions = np.argmax(model.predict(X_validation), axis=-1)
    # This is a SciKit-Learn function that retrieves our accuracy
    val_accuracy = accuracy_score(y_validation, predictions)
    print(f"Validation accuracy: {val_accuracy}")

    # Starting on version 3, Keras changed the model saving format.
    # Since we are running the training script using two different versions
    # of Keras, we need to check to see which version we are using and save
    # the model accordingly.
    model_filepath = (
        Path(model_directory) / "001" # Numerical directory indicates the model version (for TensorFlow)
        if version.parse(keras.__version__) < version.parse("3")
        else Path(model_directory) / "penguins.keras"
    )

    # Then we save the model
    model.save(model_filepath)

    # This model expects transformed data and as such we need to run the following 
    # as we need to save this info (transformation pipeline) along with the model.
    # Let's save the transformation pipelines inside the
    # model directory so they get bundled together.
    with tarfile.open(Path(pipeline_path) / "model.tar.gz", "r:gz") as tar:
        tar.extractall(model_directory)

    # If we have a Comet experiment running then we need to update it with the two 
    # parameters i.e. epochs and batch_size.
    if experiment:
        experiment.log_parameters(
            {
                "epochs": epochs,
                "batch_size": batch_size,
            }
        )
        experiment.log_dataset_hash(X_train)

        # Create a confusion matrix and it is going to log it
        experiment.log_confusion_matrix(
            y_validation.astype(int), predictions.astype(int)
        )

        # Log the model (the output of this process)
        experiment.log_model("penguins", model_filepath.as_posix())


if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to
    # the entry point as script arguments.
    # SageMaker is going to use the command line to call this script
    # and it is going to parse arguments to this script.
    # For now we want to know the number of epochs and the batch size for the model.
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    args, _ = parser.parse_known_args()

    # Let's create a Comet experiment to log the metrics and parameters
    # of this training job.
    # We are going to be using Comet ML to keep track of our experiments.
    comet_api_key = os.environ.get("COMET_API_KEY", None)
    comet_project_name = os.environ.get("COMET_PROJECT_NAME", None)

    # If the Comet ML API key and project name are provided then we will create the experiment.
    experiment = (
        Experiment(
            project_name=comet_project_name,
            api_key=comet_api_key,
            auto_metric_logging=True,
            auto_param_logging=True,
            log_code=True,
        )
        if comet_api_key and comet_project_name
        else None
    )

    # SageMaker will automatically create an environment variable called "SM_TRAINING_ENV" and
    # we're reading in that variable. Inside that variable (which is a JSON object) there is 
    # a value called "job_name" which we are also reading in.
    training_env = json.loads(os.environ.get("SM_TRAINING_ENV", {}))
    job_name = training_env.get("job_name", None) if training_env else None

    # We want to use the SageMaker's training job name as the name
    # of the experiment so we can easily recognise it.
    if job_name and experiment:
        experiment.set_name(job_name)

    train(
        # This is the location where we need to save our model.
        # SageMaker will create a model.tar.gz file with anything
        # inside this directory when the training script finishes.
        model_directory=os.environ["SM_MODEL_DIR"],
        # SageMaker creates one channel for each one of the inputs
        # to the Training Step.
        train_path=os.environ["SM_CHANNEL_TRAIN"],
        validation_path=os.environ["SM_CHANNEL_VALIDATION"],
        pipeline_path=os.environ["SM_CHANNEL_PIPELINE"],
        experiment=experiment,
        epochs=args.epochs,
        batch_size=args.batch_size,
    )


Overwriting code/training/script.py


Now we are going to run some unit tests before running it.

In [61]:
%%ipytest -s
#| code-fold: true

import os
import shutil
import pytest
import tempfile

from processing.script import preprocess
from training.script import train

@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        epochs=1 
    )
    
    yield directory
    
    shutil.rmtree(directory)


def test_train_bundles_model_assets(directory):
    bundle = os.listdir(directory / "model")
    assert "001" in bundle
    
    assets = os.listdir(directory / "model" / "001")
    assert "saved_model.pb" in assets


def test_train_bundles_transformation_pipelines(directory):
    bundle = os.listdir(directory / "model")
    assert "target.joblib" in bundle
    assert "features.joblib" in bundle

Keras version: 2.14.0
8/8 - 0s - loss: 1.2113 - accuracy: 0.2050 - val_loss: 1.1565 - val_accuracy: 0.2353 - 459ms/epoch - 57ms/step
Validation accuracy: 0.23529411764705882


INFO:tensorflow:Assets written to: C:\Users\conta\AppData\Local\Temp\tmpmmyu1qal\model\001\assets


[32m.[0mKeras version: 2.14.0
8/8 - 0s - loss: 1.0050 - accuracy: 0.4979 - val_loss: 1.0572 - val_accuracy: 0.4510 - 496ms/epoch - 62ms/step
Validation accuracy: 0.45098039215686275


INFO:tensorflow:Assets written to: C:\Users\conta\AppData\Local\Temp\tmpono97i3h\model\001\assets


[32m.[0m
[32m[32m[1m2 passed[0m[32m in 2.19s[0m[0m


## Step 2: Creating the Training Step
Now we can create the [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training). Before we do that however, considering that SageMaker is going to run this for us, we need to tell SageMaker that we're using Comet ML. That's what the next cell is achieving. We create this `requirements.txt` file inside the same folder that we run the scripts.

In [62]:
%%writefile {CODE_FOLDER}/training/requirements.txt
#| label: requirements.txt
#| filename: requirements.txt
#| code-line-numbers: false

comet_ml

Overwriting code/training/requirements.txt


When we created the data processing step, we used a processor but when creating a training step we will use something called an estimator. SageMaker uses this estimator to handle end-to-end training and deployment tasks. In the example below we're going to be using the [TensorFlow Estimator](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-estimator) to run our script.

You can read more about this by visiting [SageMaker Estimators](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).

In [63]:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    base_job_name="training",
    entry_point="script.py",
    source_dir=f"{(CODE_FOLDER / 'training').as_posix()}",
    
    # SageMaker will pass these hyperparameters as arguments
    # to the entry point of the training script.
    hyperparameters={
        "epochs": 50,
        "batch_size": 32,
    },
    
    # SageMaker will create these environment variables on the
    # Training Job instance.
    environment={
        "COMET_API_KEY": COMET_API_KEY,
        "COMET_PROJECT_NAME": COMET_PROJECT_NAME,
    },
    
    # SageMaker will track these metrics as part of the experiment
    # associated to this pipeline. The metric definitions tells
    # SageMaker how to parse the values from the Training Job logs.
    metric_definitions=[
        {"Name": "loss", "Regex": "loss: ([0-9\\.]+)"},
        {"Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
        {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
        {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
    ],
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    disable_profiler=True,
    debugger_hook_config=False,
    sagemaker_session=config["session"],
    role=role,
)

Now we can create the Training Step. This will create a SageMaker Training Job in the background, run our script, and then upload the output of that to our s3 bucket.

This step will take the train and validation data from the preprocessing step that we created earlier.

In [64]:
%%capture
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[ # type: ignore
                        "train"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "validation": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[ # type: ignore
                        "validation"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "pipeline": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[ # type: ignore
                        "model"
                    ].S3Output.S3Uri,
                    content_type="application/tar+gzip",
                ),
            },
        ),
        cache_config=cache_config,
    )


train_model_step = create_training_step(estimator)

## Step 3: Creating the pipeline
Now we need to define the SageMaker Pipeline and submit the definition to the SageMaker Pipelines service. This will create the pipeline if it doesn't exist, to update it if it does.

In [65]:
from functions import submit_pipeline

steps = [
            preprocessing_step, # type: ignore
            train_model_step # type: ignore
        ] 
pipeline_name = "training-pipeline"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Now we can run the pipeline as we did before for the previous step.

In [66]:
%%script false --no-raise-error
# | eval: false
pipeline.start()

Couldn't find program: 'false'


Now we have just created the 2nd step in our pipeline. You can run this NOT in local mode to push this step to SageMaker Studio and once there you can look for a pipeline called `"training-pipeline"`. Once you have located that and have clicked into it you'll see an execution of that step. If you click into that you'll see a map with our two steps.

<img src="https://digitalredneck.co.uk/SageMaker_model.jpg" style="width: 100%; height: auto;"/>

*Note: The order in which we send the steps through to SageMaker doesn't matter as SageMaker will determine the order in which they run.*

We can also, at this point, log into Comet ML and take a look at the results from our training.

<img src="https://digitalredneck.co.uk/CometML_experiments.jpg" style="width: 100%; height: auto;"/>

# Custom Training Container
Now we're going to create a custom Docker image to train our model. This will give us full control over the environment where the scripts will be executed.

## Step 1: Preparing the Docker Image
The first thing that we need to do is copy the training script a folder where we'll prepare the Docker image. We're going to use the same script that we created before for this because it is compatible with the latest version of Keras.

*Note: We're going to be running the script using [Keras 3](https://keras.io/) with a [JAX](https://jax.readthedocs.io/en/latest/index.html) backend.*

In [67]:
import shutil

(CODE_FOLDER / "containers" / "training").mkdir(parents=True, exist_ok=True)
shutil.copy2(
    CODE_FOLDER / "training" / "script.py",
    CODE_FOLDER / "containers" / "training" / "train.py",
)

WindowsPath('code/containers/training/train.py')

We will need to install the libraries needed in the training container. Just like we did before we'll create a `requirements.txt` file that SageMaker will execute before the container is created.

In [68]:
%%writefile {CODE_FOLDER}/containers/training/requirements.txt
# | filename: requirements.txt
# | code-line-numbers: true

sagemaker-training
packaging
keras
pandas
scikit-learn
comet_ml
jax[cpu]

Overwriting code/containers/training/requirements.txt


Now we can create the Dockerfile which will contain the instructions needed to build the training image. This will automatically run the `train.py` script once it starts.

*Note: To use JAX backend we need to set the `KERAS_BACKEND` environment variable to `jax`*

In [69]:
%%writefile {CODE_FOLDER}/containers/training/Dockerfile
# | filename: Dockerfile
# | code-line-numbers: true

FROM python:3.10-slim

RUN apt-get -y update && apt-get install -y --no-install-recommends \
    python3 \
    build-essential libssl-dev pkg-config libhdf5-dev

# Let's install the required Python packages from 
# the requirements.txt file.
COPY requirements.txt .
RUN pip install --user --upgrade pip
RUN pip3 install -r requirements.txt

# We are going to be running the training script
# as the entrypoint of this container.
COPY train.py /opt/ml/code/train.py
ENV SAGEMAKER_PROGRAM = train.py

# We want to use JAX as the backend for Keras.
ENV KERAS_BACKEND = jax

Overwriting code/containers/training/Dockerfile


Now that we've created the dockerfile we can now build the image using the `docker build` command. 

In [70]:
%%capture
IMAGE_NAME = "keras-custom-training-container"

if LOCAL_MODE:
    # If we are running in Local Mode, we can use the
    # default Docker build command.
    print("Building Docker image for arm64 architecture...")

    !docker build -t $IMAGE_NAME $CODE_FOLDER/containers/training/
else:
    # If we aren't running the code in Local Mode, we need
    # to specify we want to build the Docker image for the
    # linux/amd64 architecture before uploading it to ECR.
    print("Building Docker image for linux/amd64 architecture...")

    !docker build --platform="linux/amd64" -t $IMAGE_NAME $CODE_FOLDER/containers/training/
    

## Step 3: Publishing Image To Elastic Container Registry (ECR)
Now that we have our image, we can push it to the ECR which is a fully managed Docker container registry where we can manage our containers. We need to perform this step to make the image that we just created available to SageMaker.

In [71]:
%%capture
account = !aws sts get-caller-identity --query Account --output text
region = !aws configure get region
algorithm_name =IMAGE_NAME

# Extract the actual values from the captured output
account = account[0].strip()
region = region[0].strip()
repository = f"{account}.dkr.ecr.{region}.amazonaws.com/{algorithm_name}:latest"
check_image_command = f"aws ecr describe-repositories --repository-names {IMAGE_NAME}"

if not LOCAL_MODE:
    # Check to see if the repository exists already or not
    check_image_exists = !{check_image_command}
    if not check_image_exists[0]:
        # The repository doesn't exist
        !aws ecr create-repository --repository-name keras-custom-training-container
    else:
        # The repository does exist
        !aws ecr describe-repositories --repository-names keras-custom-training-container

    login_command = f"aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {repository}"
    !{login_command}

    # Use f-strings to construct and execute the bash commands
    tag_command = f"docker tag {algorithm_name} {repository}"
    !{tag_command}

    push_command = f"docker push {repository}"
    !{push_command}

## Step 4: Setting up the Training Step
Now let's get the name of our training image. If we're running in local mode then we'll use the name `IMAGE_NAME` but if not then we'll need to get the name of the image that we pushed to ECR.

In [72]:
%%capture
account_id = boto3.client("sts").get_caller_identity().get("Account")
tag = ":latest"

uri_suffix = "amazonaws.com"
if region in ["cn-north-1", "cn-northwest-1"]:
    uri_suffix = "amazonaws.com.cn"

training_container_image = (
    IMAGE_NAME
    if LOCAL_MODE
    else (f"{account_id}.dkr.ecr.{region}.amazonaws.com/{IMAGE_NAME}:latest")
)

training_container_image

Now we can create the [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) and [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) using the function we created before.

In [73]:
%%capture
from sagemaker.estimator import Estimator

keras_estimator = Estimator(
    image_uri=training_container_image,
    instance_count=1,
    instance_type=config["instance_type"],
    sagemaker_session=config["session"],
    role=role,
)

keras_train_model_step = create_training_step(keras_estimator)

## Step 5: Creating the Pipeline
Now that we've got our training model step all configured with a custom Docker image, let's define the SageMaker pipeline.

In [74]:
from functions import submit_pipeline

steps = [
            preprocessing_step, # type: ignore
            keras_train_model_step # type: ignore
        ] 
pipeline_name = "keras-train-model-step"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

Now let's submit the new definition to SageMaker.

In [75]:
%%capture
%%script false --no-raise-error
# | eval: false
pipeline.start()

# Tuning our Model
This section is going to extend the current SageMaker Pipeline with a step to tune the model. This will use a Hyperparameter Tuning Job.

The problem with the current pipeline is that we don't know exactly what the source data will look like. We know how to write a good model for the current problem i.e. the penguins species classification problem but we don't know whether the pipeline will create the best possible model for us. 

To fix this issue, can use a "tuning step" instead of a "training step" which will be automatically optimised for us (in terms of the models hyperparameters). The tuning step will train several versions of our model and then select the best one for us to use depending on its findings.

That means we're going to be improving this step (that we worked on earlier)
<img src="https://digitalredneck.co.uk/SageMaker_training_step.jpg" style="width:100%; height:auto; margin-top:15px;"/>

*Note: This step will not work in LOCAL_MODE so we will configure it to only work if we want to be working in the cloud.*

In [76]:
USE_TUNING_STEP = not LOCAL_MODE

## Step 1: Creating the Tuning Step
The Tuning Step requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) to configure the tuning job.

Here are some of the configuration options:
1. `objective_metric_name`: The name of the metric tuner
2. `objective_type`: This is the object ive of the tuner. Do we want to minimise or maximise it depending on whether we're measuring loss or accuracy.
3. `metric_definitions`: Defines how the tuner will determine the metrics value by looking at the output logs.
4. `max_jobs`: Set the total number of training jobs.
5. `max_parallel_jobs`: Set the maximum number of parallel jobs to run for the tuning step.

We can explore the hyperparameters using a Parameter class (seen below in relation to "epochs").

In [77]:
from sagemaker.parameter import IntegerParameter
from sagemaker.tuner import HyperparameterTuner

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="val_accuracy",
    objective_type="Maximize",
    hyperparameter_ranges={
        "epochs": IntegerParameter(10, 50),
    },
    metric_definitions=[{"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"}],
    max_jobs=3,
    max_parallel_jobs=3,
)

Now that we've done that we can create the Tuning Step. When we deploy this, SageMaker will create a Hyperparameter Tuning Job that will run in the background and use the training script (that we used before) to train different versions of the model.

In [78]:
%%capture
from sagemaker.workflow.steps import TuningStep

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "pipeline": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "model"
                ].S3Output.S3Uri,
                content_type="application/tar+gzip",
            ),
        },
    ),
    cache_config=cache_config,
)

## Step 2: Creating the Pipeline
Now all we need to do is to define the SageMaker Pipeline and then submit its definition.

In [82]:
from functions import submit_pipeline

steps = [
            preprocessing_step, # type: ignore
            tune_model_step, # type: ignore
        ] 
pipeline_name = "tuning-step"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [80]:
%%script false --no-raise-error
# | eval: false
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:586242780547:pipeline/tuning-step/execution/754mo8yue885', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x000001C1ADD22BF0>)

# Evaluating the Model
This section extends the SageMaker Pipeline with a step that will evaluate our model for us (using a holdout set) that we created in the preprocessing step.

As a reminder, this is what we're working on now.

<img src="https://digitalredneck.co.uk/SageMaker_evaluation.jpg" style="width: 100%; height: auto;" />


Notice how this step has two inputs. It has the model and the test set that we created in the preprocessing step. We're going to use the metrics gained from this step to eventually register our model.

## Step 1: Creating the Evaluation Script
We're going to use a [Processing Step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) to execute the evaluation script which is responsible for loading the model and evaluating it on the test set. We're going to store this script in a folder called `evaluation`.

In [83]:
(CODE_FOLDER / "evaluation").mkdir(parents=True, exist_ok=True)

Now we can create the script for that folder.

In [84]:
%%writefile {CODE_FOLDER}/evaluation/script.py
# | filename: script.py
# | code-line-numbers: true

import json
import tarfile
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from tensorflow import keras


def evaluate(model_path, test_path, output_path):
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test = X_test.drop(X_test.columns[-1], axis=1)

    # Let's now extract the model package so we can load
    # it in memory.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))

    model = keras.models.load_model(Path(model_path) / "001")

    predictions = np.argmax(model.predict(X_test), axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    # Let's create an evaluation report using the model accuracy.
    evaluation_report = {
        "metrics": {
            "accuracy": {"value": accuracy},
        },
    }

    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path="/opt/ml/processing/model/",
        test_path="/opt/ml/processing/test/",
        output_path="/opt/ml/processing/evaluation/",
    )

Writing code/evaluation/script.py


Now we're going to run some unit tests:

In [87]:
%%ipytest -s
# | code-fold: true

import os
import shutil
import tarfile
import pytest
import tempfile

from processing.script import preprocess
from training.script import train
from evaluation.script import evaluate


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

    train(
        model_directory=directory / "model",
        train_path=directory / "train",
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        epochs=1,
    )

    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model" / "001", arcname="001")

    evaluate(
        model_path=directory,
        test_path=directory / "test",
        output_path=directory / "evaluation",
    )

    yield directory / "evaluation"

    shutil.rmtree(directory)


def test_evaluate_generates_evaluation_report(directory):
    output = os.listdir(directory)
    assert "evaluation.json" in output


def test_evaluation_report_contains_accuracy(directory):
    with open(directory / "evaluation.json", "r") as file:
        report = json.load(file)

    assert "metrics" in report
    assert "accuracy" in report["metrics"]

Keras version: 2.14.0
8/8 - 1s - loss: 1.0353 - accuracy: 0.5941 - val_loss: 1.0509 - val_accuracy: 0.6275 - 791ms/epoch - 99ms/step
Validation accuracy: 0.6274509803921569


INFO:tensorflow:Assets written to: C:\Users\conta\AppData\Local\Temp\tmped8u7oco\model\001\assets


Test accuracy: 0.6666666666666666
[32m.[0mKeras version: 2.14.0
8/8 - 0s - loss: 1.1661 - accuracy: 0.5983 - val_loss: 1.1440 - val_accuracy: 0.5490 - 485ms/epoch - 61ms/step
Validation accuracy: 0.5490196078431373


INFO:tensorflow:Assets written to: C:\Users\conta\AppData\Local\Temp\tmp8zval3_s\model\001\assets


Test accuracy: 0.6862745098039216
[32m.[0m
[32m[32m[1m2 passed[0m[32m in 4.20s[0m[0m


## Step 2: Referencing the Models Assets

One of the inputs to the evaluation step is the model coming from either the training or tuning step. We can use a flag to determine whether to use the tuning or training step.

In [93]:
model_assets = train_model_step.properties.ModelArtifacts.S3ModelArtifacts # type: ignore

if USE_TUNING_STEP:
    model_assets = tune_model_step.get_top_model_s3_uri( # type: ignore
        top_k=0,
        s3_bucket=config["session"].default_bucket(),
    )

## Step 3: Mapping the Output Property File

We're now going to build a property file and map the evaluation report ot a property file. 

To learn more on this you can read [How to Build and Manage a Property File](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html). 

In [94]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json",
)

## Step 4 - Setting up the Evaluation Step

To run the evaluation script we'll use a Processing Step configured with a [TensorflowProcessor](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-tensorflow.html).

In [95]:
from sagemaker.tensorflow import TensorFlowProcessor

evaluation_processor = TensorFlowProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Now we can define the Processing Step that will run the evaluation script:

In [96]:
%%capture
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=evaluation_processor.run(
        code=f"{(CODE_FOLDER / 'evaluation' / 'script.py').as_posix()}",
        inputs=[
            # The first input is the test split that we generated on
            # the first step of the pipeline when we split and
            # transformed the data.
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
            # The second input is the model that we generated on
            # the Training or Tunning Step.
            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
        ],
        outputs=[
            # The output is the evaluation report that we generated
            # in the evaluation script.
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
            ),
        ],
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
)

## Step 5 - Creating the Pipeline
We can no create a pipeline and submits its definition to the SageMaker Pipeline Service to create the pipeline if it doesn't exist (or update it if it does).

In [97]:
from functions import submit_pipeline

steps = [
            preprocessing_step, # type: ignore
            tune_model_step if USE_TUNING_STEP else train_model_step, # type: ignore
            evaluate_model_step, # type: ignore
        ] 
pipeline_name = "tune-model-step"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.processing:Uploaded None to s3://mlschool-tests/tune-model-step/code/dcb1a1f793193c0e310e6d0f91fd9390/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool-tests/tune-model-step/code/f3b7867d7495763812a03744135acb08/runproc.sh


In [100]:
%%script false --no-raise-error
# | eval: false
pipeline.start()

Couldn't find program: 'false'


After we've run that we can log into SageMaker Studio and then visit the Pipelines section to see the pipeline we just created. When we do and we visit the graph of the model we'll see the following (with the evaluation step we just created at the bottom):

<img src="https://digitalredneck.co.uk/SageMaker_tuning_step.jpg" style="width: 100%; height: auto;" />

# Registering the Model 

We're now going to create a step for our pipeline to [register the model](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-version.html) in the [SageMaker Model Directory](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html).

When we evaluated our model in the evaluation step we created some metrics for this step that we can use to assess how good our model is before saving it. The idea is: if a model is good enough then we will register it. We want to attach these metrics as metadata to our model (so that when we look at it in the model directory, we can see how well it performed on the test set as well).

The inputs into this step are the model and the metrics that come from the evaluation step. SageMaker will then register our model in the model registry.

## Step 1: Configuring the Model Package Group
The model directory uses groups to organise versions of our model. Let's give this group a name:

In [101]:
BASIC_MODEL_PACKAGE_GROUP = "penguin-models"

## Step 2: Creating the Model

The model that we created during the Training Step was made using TensorFlow. SageMaker gives us a class that we can use to represent a TensorFlow model. We need this because pert of the information that is going to be stored in the model directory is the docker container that we need to use to run the model.

In [102]:
from sagemaker.tensorflow.model import TensorFlowModel

tensorflow_model = TensorFlowModel(
    model_data=model_assets, # AWS URL that points to the training/tuning step
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)

## Step 3: Configuring Model Metrics

Now we need to specify the metrics for our model that we're going to attach to the model before it is saved to the model directory.

In [103]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluate_model_step.properties.ProcessingOutputConfig.Outputs[ # type: ignore
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    ),
)

## Step 4: Registering the Model

Now it's time to register the model. To do this we're going to write a function to handle this for us (as we'll be using it again throughout the project). We're going to be using something called [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) to do this.

In [104]:
from sagemaker.workflow.model_step import ModelStep


def create_registration_step(
    model,
    model_package_group_name,
    approval_status="Approved",
    content_types=["text/csv"],
    response_types=["application/json"],
    model_metrics=None,
    drift_check_baselines=None,
):
    """Create a Registration Step using the supplied parameters."""
    return ModelStep(
        name="register",
        step_args=model.register(
            model_package_group_name=model_package_group_name,
            approval_status=approval_status,
            model_metrics=model_metrics,
            drift_check_baselines=drift_check_baselines,
            content_types=content_types,
            response_types=response_types,
            inference_instances=[config["instance_type"]],
            transform_instances=[config["instance_type"]],
            framework_version=config["framework_version"],
            domain="MACHINE_LEARNING",
            task="CLASSIFICATION",
            framework="TENSORFLOW",
        ),
    )


register_model_step = create_registration_step(
    tensorflow_model,
    BASIC_MODEL_PACKAGE_GROUP,
    model_metrics=model_metrics,
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 5: Creating the Pipeline

We can no create a pipeline and submits its definition to the SageMaker Pipeline Service to create the pipeline if it doesn't exist (or update it if it does).

In [108]:
%%capture
from functions import submit_pipeline

steps = [
            preprocessing_step, # type: ignore
            tune_model_step if USE_TUNING_STEP else train_model_step, # type: ignore
            evaluate_model_step, # type: ignore
            register_model_step,
        ] 
pipeline_name = "register-model"

pipeline = submit_pipeline(dataset_location, pipeline_name, steps, pipeline_definition_config, config, role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.processing:Uploaded None to s3://mlschool-tests/register-model/code/dcb1a1f793193c0e310e6d0f91fd9390/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool-tests/register-model/code/f3b7867d7495763812a03744135acb08/runproc.sh
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.processing:Uploaded None to s3://mlschool-tests/register-model/code/dcb1a1f793193c0e310e6d0f91fd9390/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool-tests/register-model/code/f3b7867d7495763812a03744135acb08/runproc.sh


In [110]:
%%script false --no-raise-error
# | eval: false
pipeline.start()

Couldn't find program: 'false'
