# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's make sure we are running the latest version of the SageMaker SDK. **Restart the notebook** after you upgrade the library.

In [4]:
!pip install -q --upgrade awscli
!pip install -q --upgrade pip
!pip install -q --upgrade sagemaker
!pip install -q --upgrade tensorflow
!pip show sagemaker

[0m[31mERROR: Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.[0m[31m
[0mName: sagemaker
Version: 2.145.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /opt/conda/lib/python3.7/site-packages
Requires: attrs, boto3, google-pasta, importlib-metadata, jsonschema, numpy, packaging, pandas, pathos, platformdirs, protobuf, protobuf3-to-dict, PyYAML, schema, smdebug-rulesconfig
Required-by: 


In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import boto3
import sagemaker
import pandas as pd

from pathlib import Path

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket where you will upload everything the pipeline generates. Set `BUCKET` to the name of the bucket you want to use. S3 bucket names must be globally unique, so pick a name that no other AWS account is using.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.
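If you prefer to create the bucket from Python, the same rule can be expressed with boto3. This is a minimal sketch: `bucket_creation_kwargs` and `create_bucket` are hypothetical helper names, not part of the course material. It omits `CreateBucketConfiguration` for `us-east-1`, because S3 does not accept `us-east-1` as an explicit location constraint, and sets it for every other region.

```python
def bucket_creation_kwargs(bucket: str, region: str) -> dict:
    """Build keyword arguments for boto3's S3 create_bucket call (hypothetical helper)."""
    kwargs = {"Bucket": bucket}
    # us-east-1 is the default region and must not be passed as a
    # LocationConstraint; every other region requires one.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def create_bucket(bucket: str, region: str) -> None:
    """Create the bucket in the given region (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above has no AWS dependency

    s3 = boto3.client("s3", region_name=region)
    s3.create_bucket(**bucket_creation_kwargs(bucket, region))
```

In the notebook you would call `create_bucket(BUCKET, region)` instead of the `aws s3api` command.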

In [4]:
BUCKET = "negan1911-mlschool"

!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/negan1911-mlschool"
}


## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).
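To make the row layout concrete, here is a small sketch that splits one 785-value row into its label and a 28x28 pixel grid. It assumes the `RxC` column names mean row R, column C, as the header order `1x1, 1x2, ...` suggests; the row is synthetic, and `row_to_image` is a hypothetical helper.

```python
import numpy as np


def row_to_image(row):
    """Split a 785-value MNIST CSV row into (label, 28x28 pixel array)."""
    row = np.asarray(row)
    if row.shape != (785,):
        raise ValueError(f"expected 785 values, got {row.shape}")
    label = int(row[0])
    # Assumes pixels are stored row by row (1x1 ... 1x28, 2x1, ...),
    # so a C-order reshape recovers the image.
    image = row[1:].reshape(28, 28)
    return label, image


# Synthetic row: label 7 followed by pixel values 0, 1, 2, ... (mod 256).
synthetic_row = np.concatenate(([7], np.arange(784) % 256))
label, image = row_to_image(synthetic_row)
print(label, image.shape, image[0, 1], image[1, 0])  # 7 (28, 28) 1 28
```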

Let's extract the `dataset.tar.gz` file.

In [5]:
MNIST_FOLDER = "."
DATASET_FOLDER = Path() / "dataset"

!tar -xvzf $MNIST_FOLDER/dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

dataset/
dataset/mnist_test.csv
dataset/mnist_train.csv


Let's load the first 10 rows of the training set.

In [6]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv", nrows=10)
df

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Uploading the dataset to S3

In [7]:
S3_FILEPATH = f"s3://{BUCKET}/mnist"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

Train set S3 location: s3://negan1911-mlschool/mnist/mnist_train.csv
Test set S3 location: s3://negan1911-mlschool/mnist/mnist_test.csv


# Create Validation Pipeline

In [8]:
import os
import sagemaker
import numpy as np
import boto3
import json
import pandas as pd
import urllib.request
import argparse
import tempfile
from pathlib import Path

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

In [9]:
iam_client = boto3.client("iam")
sagemaker_client = boto3.client("sagemaker")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

### Create the preprocessing script

In [10]:
%%writefile preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input"
TRAIN_DATA_FILEPATH = DATA_FILEPATH / "train" / "mnist_train.csv"
TEST_DATA_FILEPATH = DATA_FILEPATH / "test" / "mnist_test.csv"


def save_splits(base_directory, train, validation):
    """
    One of the goals of this script is to output the
    dataset splits. This function saves the train and
    validation splits to disk.
    """
    
    train_path = Path(base_directory) / "train" 
    validation_path = Path(base_directory) / "validation"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    
    
def save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    # Use a context manager so the file handle is closed after writing.
    with open(pipeline_path / "pipeline.pkl", "wb") as f:
        dump(pipeline, f)
    
def preprocess(base_directory, train_data_filepath, test_data_filepath):
    """
    Preprocesses the supplied raw training data: splits it into
    train (80%) and validation (20%) sets and standardizes the
    pixel values. The test set is loaded but not transformed.
    """
    
    df_train = pd.read_csv(train_data_filepath)
    df_test = pd.read_csv(test_data_filepath)
    
    X = df_train.copy()
    columns = list(X.columns)
    
    X = X.to_numpy()
    
    np.random.shuffle(X)
    
    train, validation = np.split(X, [int(.8 * len(X))])
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    

    label_encoder = LabelEncoder()
    
    y_train = label_encoder.fit_transform(X_train.label)
    y_validation = label_encoder.transform(X_validation.label)
    
    X_train.drop(["label"], axis=1, inplace=True)
    X_validation.drop(["label"], axis=1, inplace=True)
    
    
    pipeline = Pipeline(steps=[
        ('preprocessor', ColumnTransformer(
            transformers=[('scaler', StandardScaler(), X_train.columns)]
        ))
    ])
    
    X_train = pipeline.fit_transform(X_train)
    X_validation = pipeline.transform(X_validation)
    
    train = np.concatenate((np.expand_dims(y_train, axis=1), X_train), axis=1)
    validation = np.concatenate((np.expand_dims(y_validation, axis=1), X_validation), axis=1)
    
    save_splits(base_directory, train, validation)

        

if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, TRAIN_DATA_FILEPATH, TEST_DATA_FILEPATH)


Overwriting preprocessor.py


### Test the preprocessing script

In [11]:
import os
import tempfile
from preprocessor import preprocess


with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        train_data_filepath=DATASET_FOLDER / "mnist_train.csv",
        test_data_filepath=DATASET_FOLDER / "mnist_test.csv"
    )
    
    print(f"Folders: {os.listdir(directory)}")
    

Folders: ['train', 'validation']
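`preprocess` shuffles with the global `np.random` state, so every run produces a different 80/20 split. If you want reproducible splits, one option (an alternative, not what the script above does) is a seeded `numpy.random.Generator`. The sketch runs on a small synthetic array; `split_80_20` and `seed=42` are illustrative choices.

```python
import numpy as np


def split_80_20(data, seed=42):
    """Shuffle rows with a seeded generator, then split 80% / 20%."""
    rng = np.random.default_rng(seed)
    shuffled = rng.permutation(data)  # shuffled copy along axis 0
    cut = int(0.8 * len(shuffled))
    return np.split(shuffled, [cut])


data = np.arange(20).reshape(10, 2)  # 10 synthetic rows
train_a, validation_a = split_80_20(data)
train_b, validation_b = split_80_20(data)
print(train_a.shape, validation_a.shape)  # (8, 2) (2, 2)
```

Calling the function twice with the same seed returns identical splits, which makes cached pipeline runs and local tests easier to compare.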


### Pipeline Config

In [12]:
train_dataset_location = ParameterString(
    name="train_dataset_location",
    default_value=TRAIN_SET_S3_URI,
)

test_dataset_location = ParameterString(
    name="test_dataset_location",
    default_value=TEST_SET_S3_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

timestamp_signature = ParameterString(
    name="timestamp_signature",
    default_value="",
)

### Pipeline Preprocess step

In [13]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role,
)

preprocess_step = ProcessingStep(
    name="preprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=train_dataset_location, destination="/opt/ml/processing/input/train"),
        ProcessingInput(source=test_dataset_location, destination="/opt/ml/processing/input/test"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination)
    ],
    code=f"{MNIST_FOLDER}/preprocessor.py",
    cache_config=cache_config
)

### Test the pipeline

### Execute the pipeline

In [14]:
session1_pipeline = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[
        train_dataset_location,
        test_dataset_location,
        preprocessor_destination
    ],
    steps=[
        preprocess_step, 
    ]
)

session1_pipeline.upsert(role_arn=role)
execution = session1_pipeline.start()

# Create Training Pipeline

In [14]:
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

### Write the training script

In [15]:
%%writefile {MNIST_FOLDER}/train.py

import os
import argparse

import numpy as np
import pandas as pd
import tensorflow as tf

from pathlib import Path
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD

def train(base_directory, train_path, validation_path, epochs=50, batch_size=32, learning_rate=0.01):
    # save_splits writes the CSV files without a header row, so read them
    # with header=None to keep the first example.
    X_train = pd.read_csv(Path(train_path) / "train.csv", header=None)
    y_train = X_train[X_train.columns[0]]
    X_train.drop(X_train.columns[0], axis=1, inplace=True)
    
    X_validation = pd.read_csv(Path(validation_path) / "validation.csv", header=None)
    y_validation = X_validation[X_validation.columns[0]]
    X_validation.drop(X_validation.columns[0], axis=1, inplace=True)
    
    model = Sequential([
        Dense(128, input_shape=(X_train.shape[1],), activation="relu"),
        Dense(64, activation="relu"),
        Dense(10, activation="softmax"),
    ])

    model.compile(
        optimizer=SGD(learning_rate=learning_rate),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    model.fit(
        X_train, 
        y_train, 
        validation_data=(X_validation, y_validation),
        epochs=epochs, 
        batch_size=batch_size,
        verbose=2,
    )

    predictions = np.argmax(model.predict(X_validation), axis=-1)
    print(f"Validation accuracy: {accuracy_score(y_validation, predictions)}")
    
    model_filepath = Path(base_directory) / "model" / "001"
    model.save(model_filepath)
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=0.01)
    args, _ = parser.parse_known_args()
    
    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size,
        learning_rate=args.learning_rate
    )

Overwriting ./train.py
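Because `save_splits` writes the splits with `header=False`, they need to be read back with `header=None`; with pandas' default `header="infer"`, the first data row is consumed as column names. The sketch below shows this on a tiny in-memory CSV with synthetic values.

```python
import io

import numpy as np
import pandas as pd

# Three synthetic rows written the way save_splits does: no header, no index.
buffer = io.StringIO()
pd.DataFrame(np.array([[5, 0.1, 0.2], [0, 0.3, 0.4], [4, 0.5, 0.6]])).to_csv(
    buffer, header=False, index=False
)

default_read = pd.read_csv(io.StringIO(buffer.getvalue()))
explicit_read = pd.read_csv(io.StringIO(buffer.getvalue()), header=None)

print(len(default_read), len(explicit_read))  # 2 3
# With header=None, the first column holds the label for every row.
print(explicit_read[0].tolist())  # [5.0, 0.0, 4.0]
```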


### Test the training script

In [16]:
from preprocessor import preprocess
from train import train
import tempfile


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_directory=directory,
        train_data_filepath=DATASET_FOLDER / "mnist_train.csv",
        test_data_filepath=DATASET_FOLDER / "mnist_test.csv"
    )

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10,
        learning_rate=0.01
    )

Epoch 1/10
1500/1500 - 4s - loss: 0.4803 - accuracy: 0.8662 - val_loss: 0.2739 - val_accuracy: 0.9202 - 4s/epoch - 3ms/step
Epoch 2/10
1500/1500 - 3s - loss: 0.2088 - accuracy: 0.9381 - val_loss: 0.2197 - val_accuracy: 0.9382 - 3s/epoch - 2ms/step
Epoch 3/10
1500/1500 - 3s - loss: 0.1562 - accuracy: 0.9532 - val_loss: 0.1928 - val_accuracy: 0.9452 - 3s/epoch - 2ms/step
Epoch 4/10
1500/1500 - 4s - loss: 0.1274 - accuracy: 0.9623 - val_loss: 0.1767 - val_accuracy: 0.9489 - 4s/epoch - 2ms/step
Epoch 5/10
1500/1500 - 3s - loss: 0.1079 - accuracy: 0.9684 - val_loss: 0.1678 - val_accuracy: 0.9528 - 3s/epoch - 2ms/step
Epoch 6/10
1500/1500 - 3s - loss: 0.0929 - accuracy: 0.9727 - val_loss: 0.1599 - val_accuracy: 0.9560 - 3s/epoch - 2ms/step
Epoch 7/10
1500/1500 - 3s - loss: 0.0807 - accuracy: 0.9769 - val_loss: 0.1550 - val_accuracy: 0.9583 - 3s/epoch - 2ms/step
Epoch 8/10
1500/1500 - 3s - loss: 0.0707 - accuracy: 0.9801 - val_loss: 0.1498 - val_accuracy: 0.9610 - 3s/epoch - 2ms/step
Epoch 9/



INFO:tensorflow:Assets written to: /tmp/tmpgns4qozt/model/001/assets


### Setting up a training step

In [17]:
hyperparameters = {
    "epochs": 50,
    "batch_size": 32,
    "learning_rate": 0.01
}

estimator = TensorFlow(
    entry_point=f"{MNIST_FOLDER}/train.py",
    hyperparameters=hyperparameters,
    framework_version="2.6",
    py_version="py38",
    instance_type="ml.m5.large",
    instance_count=1,
    script_mode=True,
    disable_profiler=True,
    role=role,
)

training_step = TrainingStep(
    name="training",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

### Pipeline Process & Train

In [18]:
session2_pipeline = Pipeline(
    name="mnist-session2-pipeline",
    parameters=[
        train_dataset_location,
        test_dataset_location,
        preprocessor_destination
    ],
    steps=[
        preprocess_step, 
        training_step
    ]
)

### Execute Pipeline

In [87]:
session2_pipeline.upsert(role_arn=role)
execution = session2_pipeline.start()

### Model Evaluation

In [19]:
%%writefile {MNIST_FOLDER}/evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd

from pathlib import Path
from tensorflow import keras
from sklearn.metrics import accuracy_score

MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation"

def evaluate(model_path, test_path, output_path):
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
    
    model = keras.models.load_model(Path(model_path) / "001")
    
    X_test = pd.read_csv(Path(test_path) / "mnist_test.csv")
    y_test = X_test[X_test.columns[0]]
    X_test.drop(X_test.columns[0], axis=1, inplace=True)
    
    predictions = np.argmax(model.predict(X_test), axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f'Test Accuracy: {accuracy}')
    
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            }
        }
    }
    
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))
        

if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH,
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )
    

Overwriting ./evaluation.py


### Test the evaluation script

In [20]:
from preprocessor import preprocess
from train import train
from evaluation import evaluate

import tarfile
import tempfile


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_directory=directory,
        train_data_filepath=DATASET_FOLDER / "mnist_train.csv",
        test_data_filepath=DATASET_FOLDER / "mnist_test.csv"
    )

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10,
        learning_rate=0.01
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(Path(directory) / "model.tar.gz", "w:gz") as tar:
        tar.add(Path(directory) / "model" / "001", arcname="001")
    
    # Test the evaluation script
    evaluate(
        model_path=directory,
        test_path=DATASET_FOLDER,
        output_path=Path(directory) / "evaluation"
    )
        

Epoch 1/10
1500/1500 - 4s - loss: 0.5011 - accuracy: 0.8601 - val_loss: 0.2683 - val_accuracy: 0.9234 - 4s/epoch - 3ms/step
Epoch 2/10
1500/1500 - 3s - loss: 0.2112 - accuracy: 0.9375 - val_loss: 0.2103 - val_accuracy: 0.9378 - 3s/epoch - 2ms/step
Epoch 3/10
1500/1500 - 3s - loss: 0.1610 - accuracy: 0.9514 - val_loss: 0.1832 - val_accuracy: 0.9459 - 3s/epoch - 2ms/step
Epoch 4/10
1500/1500 - 3s - loss: 0.1314 - accuracy: 0.9615 - val_loss: 0.1673 - val_accuracy: 0.9514 - 3s/epoch - 2ms/step
Epoch 5/10
1500/1500 - 3s - loss: 0.1114 - accuracy: 0.9676 - val_loss: 0.1576 - val_accuracy: 0.9535 - 3s/epoch - 2ms/step
Epoch 6/10
1500/1500 - 3s - loss: 0.0957 - accuracy: 0.9718 - val_loss: 0.1511 - val_accuracy: 0.9568 - 3s/epoch - 2ms/step
Epoch 7/10
1500/1500 - 4s - loss: 0.0831 - accuracy: 0.9767 - val_loss: 0.1431 - val_accuracy: 0.9601 - 4s/epoch - 2ms/step
Epoch 8/10
1500/1500 - 3s - loss: 0.0736 - accuracy: 0.9793 - val_loss: 0.1395 - val_accuracy: 0.9603 - 3s/epoch - 2ms/step
Epoch 9/



INFO:tensorflow:Assets written to: /tmp/tmpd80hux4e/model/001/assets


Test Accuracy: 0.9297
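One thing worth checking in this test: `evaluate` feeds the raw 0 to 255 pixel values from `mnist_test.csv` into a model trained on standardized features, and `preprocess` never calls `save_pipeline`, so the fitted scaler is not available at evaluation time. This mismatch may account for part of the gap between validation and test accuracy. Below is a minimal sketch of fitting a scaler on training data only, persisting it with `pickle` (as `save_pipeline` would), and reusing it on test data. The arrays are synthetic, and how you would wire this into the pipeline steps is left open.

```python
import pickle

import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
# Synthetic "pixel" data in the 0-255 range standing in for the real splits.
X_train = rng.integers(0, 256, size=(100, 4)).astype(float)
X_test = rng.integers(0, 256, size=(20, 4)).astype(float)

# Fit only on training data, then serialize the fitted transformer.
scaler = StandardScaler().fit(X_train)
serialized = pickle.dumps(scaler)

# At evaluation time: restore the same transformer and apply it to the test
# set, so the model sees features on the scale it was trained on.
restored = pickle.loads(serialized)
X_test_scaled = restored.transform(X_test)

print(np.allclose(X_test_scaled, scaler.transform(X_test)))  # True
```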


In [21]:
import tarfile

from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.workflow.properties import PropertyFile

tensorflow_processor = TensorFlowProcessor(
    framework_version="2.6",
    py_version="py38",
    base_job_name="mnist-evaluation-processor",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role
)

# By default, the TensorFlowProcessor runs the script using
# /bin/bash as its entrypoint. We want to ensure we run it 
# using python3.
tensorflow_processor.framework_entrypoint_command = ["python3"]

In [22]:
# This is the input in case we want to use the trained model
# from the Training Step.
model_input = ProcessingInput(
    source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model"
)


In [23]:
# This is the location where the Processing Step will store 
# the report.
evaluation_destination = ParameterString(
    name="evaluation_destination",
    default_value=f'{S3_FILEPATH}/evaluation',
)


# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)


# Notice how this step uses the model generated by the training
# step and the raw test set from `test_dataset_location`.
evaluation_step = ProcessingStep(
    name="evaluation",
    processor=tensorflow_processor,
    inputs=[
        model_input,
        ProcessingInput(
            source=test_dataset_location,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=evaluation_destination),
    ],
    code=f"{MNIST_FOLDER}/evaluation.py",
    property_files=[evaluation_report],
    cache_config=cache_config
)
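`JsonGet` later reads `metrics.accuracy.value` from the `evaluation.json` that `evaluate` writes. The sketch below reproduces that lookup locally on a report with the same shape, which can help catch a mismatched path before running the pipeline. `get_by_path` is a hypothetical helper that only follows dotted keys; it is not how `JsonGet` is implemented, and the accuracy value is made up.

```python
import json


def get_by_path(document, path):
    """Follow a dotted key path such as 'metrics.accuracy.value'."""
    value = document
    for key in path.split("."):
        value = value[key]  # raises KeyError if the report has a different shape
    return value


# Same structure as the report evaluation.py writes; the number is illustrative.
report_json = json.dumps({"metrics": {"accuracy": {"value": 0.93}}})
accuracy = get_by_path(json.loads(report_json), "metrics.accuracy.value")
print(accuracy)  # 0.93
```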

In [24]:
session3_pipeline = Pipeline(
    name="mnist-session3-pipeline",
    parameters=[
        train_dataset_location,
        test_dataset_location,
        preprocessor_destination,
        evaluation_destination
    ],
    steps=[
        preprocess_step, 
        training_step,
        evaluation_step
    ]
)

In [51]:
session3_pipeline.upsert(role_arn=role)
execution = session3_pipeline.start()

### Register the model in the Model Registry

In [30]:
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.fail_step import FailStep

model_approval_status = ParameterString(
    name="model_approval_status", 
    default_value="Approved"
)

accuracy_threshold = ParameterFloat(
    name="accuracy_threshold", 
    default_value=0.60
)

model = TensorFlowModel(
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    framework_version="2.6",
    sagemaker_session=PipelineSession(),
    role=role,
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            "evaluation.json"]
        ),
        content_type="application/json",
    )
)

model_package_group_name = "mnist-model-package-group"

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status=model_approval_status,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    ),
)

condition_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value"
    ),
    right=accuracy_threshold
)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ", 
        values=[
            "Execution failed because the model's accuracy was lower than", 
            accuracy_threshold
        ]
    ),
)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[register_model_step],
    else_steps=[fail_step], 
)

session4_pipeline = Pipeline(
    name="mnist-session4-pipeline",
    parameters=[
        train_dataset_location,
        test_dataset_location,
        preprocessor_destination,
        evaluation_destination,
        model_approval_status,
        accuracy_threshold,
    ],
    steps=[
        preprocess_step, 
        training_step, 
        evaluation_step,
        condition_step
    ],
)
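The condition step registers the model only when the evaluated accuracy is at least `accuracy_threshold`; otherwise the fail step stops the execution with a message built by `Join(on=" ", ...)`. The plain-Python sketch below mirrors that branching so you can reason about threshold values offline. `route` is a hypothetical function that does not call SageMaker, and the exact way the pipeline renders the threshold in its message may differ.

```python
def route(accuracy: float, threshold: float = 0.60) -> str:
    """Mirror the ConditionStep: register on accuracy >= threshold, else fail."""
    if accuracy >= threshold:  # same comparison as ConditionGreaterThanOrEqualTo
        return "register-model"
    # Same words the FailStep joins with a single space.
    return " ".join(
        ["Execution failed because the model's accuracy was lower than", str(threshold)]
    )


print(route(0.9297))  # register-model
print(route(0.55))  # Execution failed because the model's accuracy was lower than 0.6
```

Note that an accuracy exactly equal to the threshold still registers the model, because the condition is "greater than or equal to".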

In [31]:
session4_pipeline.upsert(role_arn=role)
execution = session4_pipeline.start()

