# Penguins Pipeline in Production

This notebook aims to create a [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to build an end-to-end Machine Learning system to solve the problem of classifying penguin species based on the [Penguins dataset](https://www.kaggle.com/datasets/parulpandey/palmer-archipelago-antarctica-penguin-data).

### Install required libraries

In [2]:
# !pip install -q --upgrade pip
# !pip install -q --upgrade awscli boto3
# !pip install -q --upgrade scikit-learn==0.23.2
# !pip install -q --upgrade PyYAML==6.0
# !pip install -q --upgrade sagemaker==2.165.0
# !pip show sagemaker

In [3]:
%load_ext autoreload
%autoreload 2

## Initial Setup

In [4]:
BUCKET = "ml-school-gibran-2023"

# !aws s3api create-bucket --bucket $BUCKET --region us-east-2 --create-bucket-configuration LocationConstraint=us-east-2

In [5]:
import pandas as pd
import sagemaker
import urllib.request

from pathlib import Path


PENGUINS_FOLDER = Path("penguins")
S3_FILEPATH = f"s3://{BUCKET}/{PENGUINS_FOLDER}"
LOCAL_FILEPATH = Path(PENGUINS_FOLDER)/ "data.csv"

# Create the local folder if it doesn't exist.
PENGUINS_FOLDER.mkdir(parents=True, exist_ok=True)

# Download the official Penguins dataset and store it locally.
urllib.request.urlretrieve(
    "https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv", 
    LOCAL_FILEPATH
)

# Upload the dataset to S3. We need to do this to make it available to 
# the preprocessing step.
INPUT_DATA_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(LOCAL_FILEPATH), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Dataset S3 location: {INPUT_DATA_URI}")

Dataset S3 location: s3://ml-school-gibran-2023/penguins/data.csv


In [6]:
df = pd.read_csv(LOCAL_FILEPATH)
df

Unnamed: 0,species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,FEMALE
3,Adelie,Torgersen,,,,,
4,Adelie,Torgersen,36.7,19.3,193.0,3450.0,FEMALE
...,...,...,...,...,...,...,...
339,Gentoo,Biscoe,,,,,
340,Gentoo,Biscoe,46.8,14.3,215.0,4850.0,FEMALE
341,Gentoo,Biscoe,50.4,15.7,222.0,5750.0,MALE
342,Gentoo,Biscoe,45.2,14.8,212.0,5200.0,FEMALE


# Session 1 - Data Preprocessing

In [12]:
import os
import numpy as np
import boto3
import json
import numpy as np
import argparse
import tempfile

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

In [13]:
iam_client = boto3.client("iam")
sagemaker_client = boto3.client("sagemaker")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Step 1 - Preprocess the Dataset

In [14]:
%%writefile {PENGUINS_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "data.csv"


def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_directory) / "train" 
    validation_path = Path(base_directory) / "validation" 
    test_path = Path(base_directory) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    

def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", 'wb'))
    

def _save_classes(base_directory, classes):
    """
    Saves the list of classes from the dataset.
    """
    path = Path(base_directory) / "classes"
    path.mkdir(parents=True, exist_ok=True)
    
    print("CLASSES", np.asarray(classes))

    np.asarray(classes).tofile(path / "classes.csv", sep = ",") 
    

def _generate_baseline_dataset(split_name, base_directory, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    baseline_path = Path(base_directory) / f"{split_name}-baseline" 
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)
    
    
def preprocess(base_directory, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """
    
    df = pd.read_csv(data_filepath)
    
    numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]]
    
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    categorical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns),
            ("categorical", categorical_preprocessor, ["island"]),
        ]
    )
    

    X = df.drop(["sex"], axis=1)
    columns = list(X.columns)
    
    X = X.to_numpy()
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    X_test = pd.DataFrame(test, columns=columns)
    
    y_train = X_train.species
    y_validation = X_validation.species
    y_test = X_test.species
    
    label_encoder = LabelEncoder()
    
    y_train = label_encoder.fit_transform(y_train)
    y_validation = label_encoder.transform(y_validation)
    y_test = label_encoder.transform(y_test)
    
    X_train.drop(["species"], axis=1, inplace=True)
    X_validation.drop(["species"], axis=1, inplace=True)
    X_test.drop(["species"], axis=1, inplace=True)

    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    _generate_baseline_dataset("train", base_directory, X_train, y_train)
    
    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    _generate_baseline_dataset("test", base_directory, X_test, y_test)
    
    # Transform the data using the Scikit-Learn pipeline.
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)
    
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=preprocessor)
    _save_classes(base_directory, label_encoder.classes_)
        

if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, DATA_FILEPATH)


Overwriting penguins/preprocessor.py


## Step 2 - Testing the Preprocessing Script

In [15]:
from penguins.preprocessor import preprocess

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )
    
    print(f"Folders: {os.listdir(directory)}")

CLASSES ['Adelie' 'Chinstrap' 'Gentoo']
Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline', 'classes']


## Step 3 - Pipeline Configuration

In [16]:
dataset_location = ParameterString(
    name="dataset_location",
    default_value=INPUT_DATA_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

train_dataset_baseline_destination = ParameterString(
    name="train_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/train",
)

test_dataset_baseline_destination = ParameterString(
    name="test_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/test",
)

## Step 4 - Caching Pipeline Steps

In [17]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Step 5 - Setting up a Processing Step

In [18]:
sklearn_processor = SKLearnProcessor(
    base_job_name="penguins-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role,
)

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes", destination=preprocessor_destination),
        ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline", destination=train_dataset_baseline_destination),
        ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline", destination=test_dataset_baseline_destination),
    ],
    code=f"{PENGUINS_FOLDER}/preprocessor.py",
    cache_config=cache_config
)

## Step 6 - Running the Pipeline

In [19]:
session1_pipeline = Pipeline(
    name="penguins-session1-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination
    ],
    steps=[
        preprocess_data_step, 
    ]
)

In [20]:
# session1_pipeline.upsert(role_arn=role)
# execution = session1_pipeline.start()

# Session 2 - Model Training and Tuning

In [21]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.parameter import IntegerParameter
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

## Step 1 - Training the Model

In [22]:
%%writefile {PENGUINS_FOLDER}/train.py

import os
import argparse

import numpy as np
import pandas as pd
import tensorflow as tf

from pathlib import Path
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD


def train(base_directory, train_path, validation_path, epochs=50, batch_size=32):
    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train.drop(X_train.columns[-1], axis=1, inplace=True)
    
    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation.drop(X_validation.columns[-1], axis=1, inplace=True)
    
    model = Sequential([
        Dense(10, input_shape=(X_train.shape[1],), activation="relu"),
        Dense(8, activation="relu"),
        Dense(3, activation="softmax"),
    ])

    model.compile(
        optimizer=SGD(learning_rate=0.01),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    model.fit(
        X_train, 
        y_train, 
        validation_data=(X_validation, y_validation),
        epochs=epochs, 
        batch_size=batch_size,
        verbose=2,
    )

    predictions = np.argmax(model.predict(X_validation), axis=-1)
    print(f"Validation accuracy: {accuracy_score(y_validation, predictions)}")
    
    model_filepath = Path(base_directory) / "model" / "001"
    model.save(model_filepath)
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    args, _ = parser.parse_known_args()
    
    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size
    )

Overwriting penguins/train.py


## Step 2 - Testing the Model Training Script

In [23]:
from penguins.preprocessor import preprocess
from penguins.train import train


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )
    
    print("preprocess done!")
    print(f"base directory: {directory}")

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=15
    )

CLASSES ['Adelie' 'Chinstrap' 'Gentoo']
preprocess done!
base directory: /tmp/tmp036yqxcg
Epoch 1/15
Extension horovod.torch has not been built: /usr/local/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
[2023-06-30 18:51:53.192 tensorflow-2-6-cpu-py-ml-t3-medium-1f322d27c215073601493d5d980e:146 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2023-06-30 18:51:53.284 tensorflow-2-6-cpu-py-ml-t3-medium-1f322d27c215073601493d5d980e:146 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
8/8 - 1s - loss: 1.4438 - accuracy: 0.0962 - val_loss: 1.3222 - val_accuracy: 0.1373
Epoch 2/15
8/8 - 0s - loss: 1.3653 - accuracy: 0.1088 - val_loss: 1.2703 - val_accuracy: 0.0588
Epoch 3/15
8/8 - 0s - loss: 1.3025 - accuracy: 0.0962 - val_loss: 1.2318 - val_accuracy: 0.0980
Epoch

INFO:tensorflow:Assets written to: /tmp/tmp036yqxcg/model/001/assets


In [24]:
estimator = TensorFlow(
    entry_point=f"{PENGUINS_FOLDER}/train.py",
    
    hyperparameters={
        "epochs": 50,
        "batch_size": 32
    },
    
    framework_version="2.6",
    instance_type="ml.m5.large",
    py_version="py38",
    instance_count=1,
    script_mode=True,
    
    # The default profiler rule includes a timestamp which will change each time
    # the pipeline is upserted, causing cache misses. Since we don't need
    # profiling, we can disable it to take advantage of caching.
    disable_profiler=True,

    role=role,
)

In [25]:
train_model_step = TrainingStep(
    name="train-model",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 4 - Hyperparameter Tuning

In [26]:
objective_metric_name = "val_accuracy"
objective_type = "Maximize"
metric_definitions = [{"Name": objective_metric_name, "Regex": "val_accuracy: ([0-9\\.]+)"}]
    
hyperparameter_ranges = {
    "epochs": IntegerParameter(10, 50),
}

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    objective_type=objective_type,
    max_jobs=3,
    max_parallel_jobs=3,
)

In [27]:
tune_model_step = TuningStep(
    name = "tune-model",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 5 - Switching between Training and Tuning

In [28]:
USE_TUNING_STEP = False

## Step 6 - Running the Pipeline

In [29]:
session2_pipeline = Pipeline(
    name="penguins-session2-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
    ],
    steps=[
        preprocess_data_step, 
        tune_model_step if USE_TUNING_STEP else train_model_step
    ]
)

In [30]:
# session2_pipeline.upsert(role_arn=role)
# execution = session2_pipeline.start()

# Session 3 - Model Registration

This session extends the SageMaker Pipeline with a step to evaluate and register the model if it reaches a predefined accuracy threshold. We'll use a Processing Step with a ScriptProcessor running TensorFlow to execute an evaluation script. We'll use a Condition Step to determine whether the model's accuracy is above a threshold and a Model Step to register the model. After we register the model, we'll deploy it manually. To learn more about the Model Registry, check Register and Deploy Models with Model Registry.

In [31]:
import time
import tarfile

from sagemaker import ModelPackage
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.model import Model
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.predictor import Predictor
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.properties import PropertyFile

## Step 1 - Model Evaluation

In [32]:
%%writefile {PENGUINS_FOLDER}/evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd

from pathlib import Path
from tensorflow import keras
from sklearn.metrics import accuracy_score


MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation/"


def evaluate(model_path, test_path, output_path):
    # The first step is to extract the model package so we can load 
    # it in memory.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    model = keras.models.load_model(Path(model_path) / "001")
    
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test.drop(X_test.columns[-1], axis=1, inplace=True)
    
    predictions = np.argmax(model.predict(X_test), axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    # Let's create an evaluation report using the model accuracy.
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }
    
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH, 
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )

Overwriting penguins/evaluation.py


## Step 2 - Testing the Evaluation Script

In [33]:
from penguins.preprocessor import preprocess
from penguins.train import train
from penguins.evaluation import evaluate


with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )

    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(Path(directory) / "model.tar.gz", "w:gz") as tar:
        tar.add(Path(directory) / "model" / "001", arcname="001")
        
    
    # We can now call the evaluation script.
    evaluate(
        model_path=directory, 
        test_path=Path(directory) / "test",
        output_path=Path(directory) / "evaluation",
    )

CLASSES ['Adelie' 'Chinstrap' 'Gentoo']
Epoch 1/10
8/8 - 1s - loss: 1.2710 - accuracy: 0.2762 - val_loss: 1.2251 - val_accuracy: 0.2745
Epoch 2/10
8/8 - 0s - loss: 1.2390 - accuracy: 0.2887 - val_loss: 1.1976 - val_accuracy: 0.3725
Epoch 3/10
8/8 - 0s - loss: 1.2130 - accuracy: 0.2971 - val_loss: 1.1738 - val_accuracy: 0.3725
Epoch 4/10
8/8 - 0s - loss: 1.1904 - accuracy: 0.3222 - val_loss: 1.1519 - val_accuracy: 0.3529
Epoch 5/10
8/8 - 0s - loss: 1.1692 - accuracy: 0.3640 - val_loss: 1.1320 - val_accuracy: 0.3529
Epoch 6/10
8/8 - 0s - loss: 1.1508 - accuracy: 0.3849 - val_loss: 1.1133 - val_accuracy: 0.3922
Epoch 7/10
8/8 - 0s - loss: 1.1337 - accuracy: 0.3891 - val_loss: 1.0960 - val_accuracy: 0.4118
Epoch 8/10
8/8 - 0s - loss: 1.1180 - accuracy: 0.3891 - val_loss: 1.0817 - val_accuracy: 0.4118
Epoch 9/10
8/8 - 0s - loss: 1.1038 - accuracy: 0.4017 - val_loss: 1.0680 - val_accuracy: 0.4118
Epoch 10/10
8/8 - 0s - loss: 1.0901 - accuracy: 0.4142 - val_loss: 1.0546 - val_accuracy: 0.4118

INFO:tensorflow:Assets written to: /tmp/tmp6jwiz6a_/model/001/assets


Test accuracy: 0.47058823529411764


## Step 3 - Setting up a Processing Step

In [34]:
tensorflow_processor = TensorFlowProcessor(
    base_job_name="penguins-evaluation-processor",
    framework_version="2.6",
    py_version="py38",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role
)

# This is a workaround to a problem with the SageMaker SDK: 
# By default, the TensorFlowProcessor runs the script using
# /bin/bash as its entrypoint. We want to ensure we run it 
# using python3.
tensorflow_processor.framework_entrypoint_command = ["python3"]


# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)


# Notice how this step uses the model generated by the tuning or training
# step, and the test set generated by the preprocessing step.
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    processor=tensorflow_processor,
    inputs=[
        ProcessingInput(
            source=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        ),
        ProcessingInput(
            source=(
                tune_model_step.get_top_model_s3_uri(top_k=0, s3_bucket=sagemaker_session.default_bucket()) 
                if USE_TUNING_STEP 
                else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
            ),
            destination="/opt/ml/processing/model",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=f"{S3_FILEPATH}/evaluation"),
    ],
    code=f"{PENGUINS_FOLDER}/evaluation.py",
    property_files=[evaluation_report],
    cache_config=cache_config
)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 4 - Configuring the Model Metrics

In [35]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            evaluate_model_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            "evaluation.json"]
        ),
        content_type="application/json",
    )
)

## Step 5 - Model Registration

In [36]:
model_package_group_name = "penguins"

model = TensorFlowModel(
    model_data=(
        tune_model_step.get_top_model_s3_uri(top_k=0, s3_bucket=sagemaker_session.default_bucket())
        if USE_TUNING_STEP
        else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
    ),
    framework_version="2.6",
    sagemaker_session=PipelineSession(),
    role=role,
)

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status="Approved",
        
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    ),
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 6 - Setting up a Condition Step

In [37]:
accuracy_threshold = ParameterFloat(
    name="accuracy_threshold", 
    default_value=0.70
)

condition_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value"
    ),
    right=accuracy_threshold
)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ", 
        values=[
            "Execution failed because the model's accuracy was lower than", 
            accuracy_threshold
        ]
    ),
)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[register_model_step],
    else_steps=[fail_step], 
)

## Step 7 - Running the Pipeline

In [38]:
session3_pipeline = Pipeline(
    name="penguins-session3-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        accuracy_threshold,
    ],
    steps=[
        preprocess_data_step, 
        tune_model_step if USE_TUNING_STEP else train_model_step, 
        evaluate_model_step,
        condition_step
    ],
)

In [39]:
# session3_pipeline.upsert(role_arn=role)
# execution = session3_pipeline.start()

# Session 4 - Model Deployment

This session extends the SageMaker Pipeline with a step to deploy the model to an endpoint. We'll use a Lambda Step to create an endpoint and deploy the model. To control the endpoint's inputs and outputs, we'll modify the model's assets to include code that customizes the processing of a request.

In [40]:
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.tensorflow.model import TensorFlowPredictor
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.lambda_helper import Lambda
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.s3 import S3Downloader

## Step 1 - Deploy Latest Model From the Registry

In [41]:
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=1,
)

package = response["ModelPackageSummaryList"][0]
package

{'ModelPackageGroupName': 'penguins',
 'ModelPackageVersion': 1,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-2:833751773833:model-package/penguins/1',
 'CreationTime': datetime.datetime(2023, 6, 28, 19, 31, 52, 64000, tzinfo=tzlocal()),
 'ModelPackageStatus': 'Completed',
 'ModelApprovalStatus': 'Approved'}

In [42]:
endpoint_name = "penguins-class-endpoint"

In [43]:
model_package = ModelPackage(
    model_package_arn=package["ModelPackageArn"], 
    sagemaker_session=sagemaker_session,
    role=role, 
)

model_package.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1, 
    instance_type="ml.m5.large",
)

INFO:sagemaker:Creating model with name: penguins-2023-06-30-18-53-27-736
INFO:sagemaker:Creating endpoint-config with name penguins-class-endpoint
INFO:sagemaker:Creating endpoint with name penguins-class-endpoint


---!

In [44]:
# Sample payload
predictor = Predictor(endpoint_name=endpoint_name)

payload = """
0.6569590202313976, -1.0813829646495108, 1.2097102831892812, 0.9226343641317372, 1.0, 0.0, 0.0
-0.7751048801481084, 0.8822689351285553,  -1.2168066120762704, 0.9226343641317372, 0.0, 1.0, 0.0
-0.837387834894918, 0.3386660813829646, -0.26237731892812, -1.92351941317372, 0.0, 0.0, 1.0
"""

response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
response = json.loads(response.decode("utf-8"))

print(json.dumps(response, indent=2))
print(f"\nSpecies: {np.argmax(response['predictions'], axis=1)}")

{
  "predictions": [
    [
      0.00610151328,
      0.0144627569,
      0.979435682
    ],
    [
      0.698282719,
      0.25099811,
      0.050719168
    ],
    [
      0.956152499,
      0.040099889,
      0.00374757638
    ]
  ]
}

Species: [2 0 0]


In [45]:
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: penguins-class-endpoint
INFO:sagemaker:Deleting endpoint with name: penguins-class-endpoint


## Step 2 - Preparing the Inference Code

In [54]:
CODE_FOLDER = PENGUINS_FOLDER / "code"

# Create the local folder if it doesn't exist.
CODE_FOLDER.mkdir(parents=True, exist_ok=True)

In [55]:
%%writefile $CODE_FOLDER/inference.py

import os
import json
import boto3
import requests
import numpy as np
import pandas as pd

from pickle import load
from pathlib import Path


PIPELINE_FILE = Path("/tmp") / "pipeline.pkl"
CLASSES_FILE = Path("/tmp") / "classes.csv"

s3 = boto3.resource("s3")


def handler(data, context):
    """
    This is the entrypoint that will be called by SageMaker when the endpoint
    receives a request. You can see more information at 
    https://github.com/aws/sagemaker-tensorflow-serving-container.
    """
    print("Handling endpoint request")
    
    instance = _process_input(data, context)
    output = _predict(instance, context)
    return _process_output(output, context)


def _process_input(data, context):
    print("Processing input data...")
    
    if context is None:
        # The context will be None when we are testing the code
        # directly from a notebook. In that case, we can use the
        # data directly.
        endpoint_input = data
    elif context.request_content_type in ("application/json", "application/octet-stream"):
        # When the endpoint is running, we will receive a context
        # object. We need to parse the input and turn it into 
        # JSON in that case.
        endpoint_input = json.loads(data.read().decode("utf-8"))

        if endpoint_input is None:
            raise ValueError("There was an error parsing the input request.")
    else:
        raise ValueError(f"Unsupported content type: {context.request_content_type or 'unknown'}")
        
    return _transform(endpoint_input)


def _predict(instance, context):
    print("Sending input data to model to make a prediction...")
    
    model_input = json.dumps({"instances": [instance]})
    
    if context is None:
        # The context will be None when we are testing the code
        # directly from a notebook. In that case, we want to return
        # a fake prediction back.
        result = {
            "predictions": [
                [0.2, 0.5, 0.3]
            ]
        }
    else:
        # When the endpoint is running, we will receive a context
        # object. In that case we need to send the instance to the
        # model to get a prediction back.
        response = requests.post(context.rest_uri, data=model_input)
        
        if response.status_code != 200:
            raise ValueError(response.content.decode('utf-8'))
            
        result = json.loads(response.content)
    
    print(f"Response: {result}")
    return result


def _process_output(output, context):
    print("Processing prediction received from the model...")
    
    response_content_type = "application/json" if context is None else context.accept_header
    
    prediction = np.argmax(output["predictions"][0])
    confidence = output["predictions"][0][prediction]
    
    print(f"Prediction: {prediction}. Confidence: {confidence}")
    
    result = json.dumps({
        "species": _get_class(prediction),
        "prediction": int(prediction),
        "confidence": confidence
    }), response_content_type
    
    return result


def _get_pipeline():
    """
    This function will download the Scikit-Learn pipeline from S3 if it doesn't
    already exist. The function will use the `S3_LOCATION` environment
    variable to determine the location of the pipeline.
    """
    
    if not PIPELINE_FILE.exists():
        s3_uri = os.environ.get("S3_LOCATION", None)
        
        s3_parts = s3_uri.split('/', 3)
        bucket = s3_parts[2]
        key = s3_parts[3]

        s3.Bucket(bucket).download_file(f"{key}/pipeline.pkl", str(PIPELINE_FILE))
        
    return load(open(PIPELINE_FILE, 'rb'))


def _get_class(prediction):
    """
    This function returns the class name of a given prediction. 
    
    The function downloads the file with the list of classes from S3 if it doesn't
    already exist. The function will use the `S3_LOCATION` environment
    variable to determine the location of the file.
    """
    
    if not CLASSES_FILE.exists():
        s3_uri = os.environ.get("S3_LOCATION", None)
        
        s3_parts = s3_uri.split('/', 3)
        bucket = s3_parts[2]
        key = s3_parts[3]

        s3.Bucket(bucket).download_file(f"{key}/classes.csv", str(CLASSES_FILE))
            
    with open(CLASSES_FILE) as f:
        file = f.readlines()
        
    classes = list(map(lambda x: x.replace("'", ""), file[0].split(',')))
    return classes[prediction]


def _transform(payload):
    """
    This function transforms the payload in the request using the
    Scikit-Learn pipeline that we created during the preprocessing step.
    """
    
    print("Transforming input data...")

    island = payload.get("island", "")
    culmen_length_mm = payload.get("culmen_length_mm", 0)
    culmen_depth_mm = payload.get("culmen_depth_mm", 0)
    flipper_length_mm = payload.get("flipper_length_mm", 0)
    body_mass_g = payload.get("body_mass_g", 0)
    
    data = pd.DataFrame(
        columns=["island", "culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g"], 
        data=[[
            island, 
            culmen_length_mm, 
            culmen_depth_mm, 
            flipper_length_mm, 
            body_mass_g
        ]]
    )
    
    result = _get_pipeline().transform(data)
    return result[0].tolist()


Writing penguins/code/inference.py


SageMaker's default TensorFlow inference container doesn't come with Scikit-Learn installed, so we need to provide a requirements.txt file with the libraries we want SageMaker to install in our endpoint.

In [56]:
%%writefile $CODE_FOLDER/requirements.txt

numpy==1.19.5
pandas==1.2.5
scikit-learn==0.23.2

Writing penguins/code/requirements.txt


## Step 3 - Testing the Inference Code

In [58]:
%env S3_LOCATION=$preprocessor_destination.default_value

env: S3_LOCATION=s3://ml-school-gibran-2023/penguins/preprocessing


In [63]:
from penguins.code.inference import handler

handler(
    data={
        "island": "Biscoe", 
        "culmen_length_mm": 48.6,
        "culmen_depth_mm": 16.0,
        "flipper_length_mm": 230.0,
        "body_mass_g": 5800.0,
    },
    context=None
)

Handling endpoint request
Processing input data...
Transforming input data...
Sending input data to model to make a prediction...
Response: {'predictions': [[0.2, 0.5, 0.3]]}
Processing prediction received from the model...
Prediction: 1. Confidence: 0.5


('{"species": "Chinstrap", "prediction": 1, "confidence": 0.5}',
 'application/json')

## Step 4 - Registering the Model

In [64]:
model = TensorFlowModel(
    model_data=train_model_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir=str(CODE_FOLDER),
    env={
        "S3_LOCATION": preprocessor_destination,
    },
    framework_version="2.6",
    sagemaker_session=PipelineSession(),
    role=role,
)

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status="Approved",
        
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    )
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 5 - Deploying the Model

In [68]:
%%writefile $PENGUINS_FOLDER/lambda.py

import os
import json
import boto3
import time

sagemaker = boto3.client("sagemaker")

def lambda_handler(event, context):
    model_package_arn = event["model_package_arn"]
    endpoint_name = event["endpoint_name"]
    data_capture_percentage = event["data_capture_percentage"]
    data_capture_destination = event["data_capture_destination"]
    role = event["role"]
    
    timestamp = time.strftime("%m%d%H%M%S", time.localtime())
    model_name = f"penguins-model-{timestamp}"
    endpoint_config_name = f"penguins-endpoint-config-{timestamp}"

    sagemaker.create_model(
        ModelName=model_name, 
        ExecutionRoleArn=role, 
        Containers=[{
            "ModelPackageName": model_package_arn
        }] 
    )

    sagemaker.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "ModelName": model_name,
                "InstanceType": "ml.m5.large",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "VariantName": "AllTraffic",
            }
        ],
        DataCaptureConfig={
            "EnableCapture": True,
            "InitialSamplingPercentage": data_capture_percentage,
            "DestinationS3Uri": data_capture_destination,
            "CaptureOptions": [
                {
                    'CaptureMode': "Input"
                },
                {
                    'CaptureMode': "Output"
                },
            ],
            "CaptureContentTypeHeader": {
                "JsonContentTypes": [
                    "application/json",
                    "application/octect-stream"
                ]
            }
        },
    )

    sagemaker.create_endpoint(
        EndpointName=endpoint_name, 
        EndpointConfigName=endpoint_config_name,
    )
    
    return {
        "statusCode": 200,
        "body": json.dumps("Endpoint deployed successfully")
    }

Overwriting penguins/lambda.py


In [69]:
def create_lambda_role(role_name):
    try:
        response = iam_client.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description="Lambda Pipeline Role"
        )

        role_arn = response['Role']['Arn']

        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )

        iam_client.attach_role_policy(
            PolicyArn='arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
            RoleName=role_name
        )

        return role_arn

    except iam_client.exceptions.EntityAlreadyExistsException:
        response = iam_client.get_role(RoleName=role_name)
        return response['Role']['Arn']


lambda_role = create_lambda_role("lambda-pipeline-role")

ClientError: An error occurred (AccessDenied) when calling the CreateRole operation: User: arn:aws:sts::833751773833:assumed-role/AmazonSageMaker-ExecutionRole-20230626T144841/SageMaker is not authorized to perform: iam:CreateRole on resource: arn:aws:iam::833751773833:role/lambda-pipeline-role because no identity-based policy allows the iam:CreateRole action