# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's make sure we are running the latest version of the SageMaker SDK. **Restart the notebook** after you upgrade the library.

In [2]:
# !pip install -q --upgrade pip
# !pip install -q --upgrade awscli boto3
# !pip install -q --upgrade sagemaker==2.146.0
# !pip show sagemaker

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import os
import sagemaker
import numpy as np
import boto3
import json
import pandas as pd
import urllib.request
import argparse
import tempfile
from pathlib import Path

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

In [5]:
iam_client = boto3.client("iam")
sagemaker_client = boto3.client("sagemaker")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket to store everything the pipeline generates. Set `BUCKET` to the name of the bucket you want to use. Bucket names must be globally unique.

The following command passes `--create-bucket-configuration`, which is needed to create a bucket in any region other than `us-east-1` (for `us-east-1`, drop that argument):

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

In [6]:
BUCKET = "mlschool01"

!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region


An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.


## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).

Let's extract the `dataset.tar.gz` file.

In [7]:
MNIST_FOLDER = Path('/root/ml.school/mnist')
# MNIST_FOLDER = "/root/ml.school/mnist"
DATASET_FOLDER = Path(MNIST_FOLDER) / "dataset"

!tar -xvzf $MNIST_FOLDER/dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

# TRIM ORIGINAL MNIST DATASET SO THAT IT CAN BE USED WITH THE FREE KERNEL FROM SAGEMAKER
# df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv")
# train, _ = np.split(df, [int(.7 * len(df))])
# pd.DataFrame(train).to_csv(Path(MNIST_FOLDER) / "dataset/mnist_train.csv", index=False)
# # df.shape
# train
# !tar -xvzf $MNIST_FOLDER/dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

dataset/
dataset/mnist_test.csv
dataset/mnist_train.csv


In [8]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv")
df.shape

(60000, 785)

Let's load the first 10 rows of the test set.
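
A preview like this could be sketched as follows. The example builds a small synthetic stand-in with the layout described earlier (one label followed by 784 pixel values per row) instead of reading the real file, so it runs anywhere. The `pixel0`...`pixel783` column names are assumptions made for illustration; only the `label` column name is taken from the preprocessing script.

```python
import io

import numpy as np
import pandas as pd

# Synthetic stand-in for mnist_test.csv: a header row, then one label and
# 784 pixel values per row. The real file would be read from
# DATASET_FOLDER / "mnist_test.csv" instead.
rng = np.random.default_rng(0)
columns = ["label"] + [f"pixel{i}" for i in range(784)]  # assumed names
rows = np.column_stack([
    rng.integers(0, 10, size=25),          # labels from 0 to 9
    rng.integers(0, 256, size=(25, 784)),  # pixel values from 0 to 255
])
csv_text = pd.DataFrame(rows, columns=columns).to_csv(index=False)

# nrows=10 reads only the first 10 data rows, which keeps the preview cheap.
sample = pd.read_csv(io.StringIO(csv_text), nrows=10)

# Separate the label from the pixels and view each row as a 28x28 image.
labels = sample["label"].to_numpy()
images = sample.drop(columns="label").to_numpy().reshape(-1, 28, 28)

print(sample.shape)  # (10, 785)
print(images.shape)  # (10, 28, 28)
```

Reshaping to 28x28 is only needed for visual inspection; the pipeline below works on the flat 784-value rows.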

## Uploading dataset to S3

In [9]:
S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"),
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print (S3_FILEPATH)
print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

s3://mlschool01//root/ml.school/mnist
Train set S3 location: s3://mlschool01/root/ml.school/mnist/mnist_train.csv
Test set S3 location: s3://mlschool01/root/ml.school/mnist/mnist_test.csv
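
Notice the double slash in the printed `S3_FILEPATH`, while the upload locations show a single slash. The sketch below reproduces where the extra slash comes from: `MNIST_FOLDER` is an absolute path, so interpolating it after `/` keeps its leading slash. The `clean_uri` variant is a hypothetical alternative, not what this notebook uses; other cells refer to the double-slash form, so switching would change where the pipeline reads and writes.

```python
from pathlib import PurePosixPath

BUCKET = "mlschool01"
MNIST_FOLDER = PurePosixPath("/root/ml.school/mnist")

# Interpolating an absolute path keeps its leading slash, hence the "//".
raw_uri = f"s3://{BUCKET}/{MNIST_FOLDER}"

# Hypothetical alternative: strip the leading slash before building the URI.
clean_uri = f"s3://{BUCKET}/{str(MNIST_FOLDER).lstrip('/')}"

print(raw_uri)    # s3://mlschool01//root/ml.school/mnist
print(clean_uri)  # s3://mlschool01/root/ml.school/mnist
```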


In [14]:
dd = pd.read_csv("s3://mlschool01//root/ml.school/mnist/preprocessing/test.csv")
dd.shape

(8999, 785)
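
The row count above looks one short of a round number. A likely explanation, assuming the file was written with `header=False` as the preprocessing script in the next section does, is that `read_csv` treated the first data row as column names. The following sketch shows the effect on a tiny made-up table:

```python
import io

import numpy as np
import pandas as pd

# Write a 4-row table without a header, the way the splits are saved.
data = np.arange(12).reshape(4, 3)
buffer = io.StringIO()
pd.DataFrame(data).to_csv(buffer, header=False, index=False)
csv_text = buffer.getvalue()

# By default, read_csv uses the first line as the header and drops it from the data.
with_default = pd.read_csv(io.StringIO(csv_text))

# header=None tells pandas that every line is data.
with_header_none = pd.read_csv(io.StringIO(csv_text), header=None)

print(with_default.shape)      # (3, 3)
print(with_header_none.shape)  # (4, 3)
```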

## Preprocessing Dataset

In [15]:
%%writefile {MNIST_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump

# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIR = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIR) / "input" / "mnist_train.csv"

# # FOR LOCAL TESTING
# BASE_DIR = '/root/ml.school/mnist/opt/ml/processing'
# DATA_FILEPATH = Path(DATASET_FOLDER) /  "mnist_train.csv"

def save_splits(base_dir, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_dir) / "train" 
    validation_path = Path(base_dir) / "validation" 
    test_path = Path(base_dir) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    
def save_pipeline(base_dir, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_dir) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    # Use a context manager so the file is closed after writing.
    with open(pipeline_path / "pipeline.pkl", "wb") as f:
        dump(pipeline, f)
    
def generate_baseline_dataset(split_name, base_dir, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    baseline_path = Path(base_dir) / f"{split_name}-baseline" 
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)
    
def preprocess(base_dir, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """
    df = pd.read_csv(data_filepath)
    numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]]
    numerical_columns.remove('label')
    
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])
    
    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns)
        ]
    )
    
    X = df
    columns = list(X.columns)
    
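    X = X.to_numpy()
    np.random.shuffle(X)
    # Keep only half of the shuffled rows to reduce the size of the dataset.
    X, _ = np.split(X, [int(.5 * len(X))])
    # Split what is left: 50% train, 20% validation, and 30% test.
    train, validation, test = np.split(X, [int(.5 * len(X)), int(.7 * len(X))])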
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    X_test = pd.DataFrame(test, columns=columns)
    
    y_train = X_train.label
    y_validation = X_validation.label
    y_test = X_test.label
    
    
    label_encoder = LabelEncoder()
    
    y_train = label_encoder.fit_transform(y_train)
    y_validation = label_encoder.transform(y_validation)
    y_test = label_encoder.transform(y_test)
    
    
    X_train.drop(["label"], axis=1, inplace=True)
    X_validation.drop(["label"], axis=1, inplace=True)
    X_test.drop(["label"], axis=1, inplace=True)
    
    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    generate_baseline_dataset("train", base_dir, X_train, y_train)
    
    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    generate_baseline_dataset("test", base_dir, X_test, y_test)
    
    # Transform the data using the Scikit-Learn pipeline.
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)

    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    save_splits(base_dir, train, validation, test)
    save_pipeline(base_dir, pipeline=preprocessor)

if __name__ == "__main__":
    preprocess(BASE_DIR, DATA_FILEPATH)


Overwriting /root/ml.school/mnist/preprocessor.py
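
As a quick check of the split proportions, the sketch below replays the two `np.split` calls from `preprocess` on a stand-in array of 60,000 row indices (the size of `mnist_train.csv`). It uses index positions only, not the real data, and skips the shuffle because it does not affect the sizes.

```python
import numpy as np

# Stand-in for the 60,000 rows of mnist_train.csv.
rows = np.arange(60_000)

# First split: keep the first half, discard the rest.
half, _ = np.split(rows, [int(0.5 * len(rows))])

# Second split: cut at 50% and 70% of what is left.
train, validation, test = np.split(
    half, [int(0.5 * len(half)), int(0.7 * len(half))]
)

print(len(train), len(validation), len(test))  # 15000 6000 9000
```

`np.split` takes cut positions rather than sizes, so `[0.5, 0.7]` yields a 50/20/30 split of the retained half.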


In [16]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv", nrows = 10)
df.shape

(10, 785)

### TESTING PREPROCESS

In [17]:
# from preprocessor import preprocess


# def print_baseline(split_name):
#     print()
#     print(f"Baseline {split_name}:")
#     with open(Path(directory) / f"{split_name}-baseline" / f"{split_name}-baseline.json") as baseline:
#         lines = [next(baseline) for _ in range(5)]
        
#     for l in lines:
#         print(l[:-1])
    

# with tempfile.TemporaryDirectory() as directory:
#     print (DATASET_FOLDER / "mnist_train.csv")
#     preprocess(
#         base_dir=directory, 
#         data_filepath= Path(DATASET_FOLDER) / "mnist_train.csv"
#         # data_filepath = TRAIN_SET_S3_URI
#     )
    
#     print(f"Folders: {os.listdir(directory)}")
    
#     print_baseline("train")
#     print_baseline("test")

In [18]:
dataset_location = ParameterString(
    name="dataset_location",
    default_value=TRAIN_SET_S3_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

train_dataset_baseline_destination = ParameterString(
    name="train_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/train",
)

test_dataset_baseline_destination = ParameterString(
    name="test_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/test",
)

timestamp_signature = ParameterString(
    name="timestamp_signature",
    default_value="",
)

In [19]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

In [20]:
sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type='ml.t3.medium',
    instance_count=1,
    role=role,
)

In [21]:
preprocess_step = ProcessingStep(
    name="preprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline", destination=train_dataset_baseline_destination),
        ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline", destination=test_dataset_baseline_destination),
    ],
    code=f"{MNIST_FOLDER}/preprocessor.py",
    cache_config=cache_config
)

In [22]:
session1_pipeline1 = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination
    ],
    steps=[
        preprocess_step, 
    ]
)

### RUN PIPELINE FOR SESSION 1

In [23]:
# session1_pipeline1.upsert(role_arn=role)
# execution = session1_pipeline1.start()

## Training Model

In [24]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.parameter import IntegerParameter, ContinuousParameter
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

In [25]:
%%writefile {MNIST_FOLDER}/train.py

import argparse
import pandas as pd
import os
from pathlib import Path
import numpy as np
import tempfile
import logging

from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from torchvision import models
from torchvision import datasets
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp

import boto3

# import torchmetrics
# from torchmetrics.classification import MulticlassAccuracy
import time


logger = logging.getLogger(__name__)
# Define the neural network model
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )
        self.softmax = nn.Softmax(dim=1)
        # self.accuracy = MulticlassAccuracy(num_classes=10)
        self.val_acc = 0

    def forward(self, input_data):
        x = self.flatten(input_data)
        # Return raw logits: nn.CrossEntropyLoss applies log-softmax itself,
        # so applying softmax here first would squash the values it receives.
        logits = self.linear_relu_stack(x)
        return logits
    
    def validation_accuracy(self, inputs, vals):
        # argmax over logits selects the same class as argmax over softmax.
        with torch.no_grad():
            pred = self(inputs).cpu().numpy()
        predictions = np.argmax(pred, axis=-1)
        self.val_acc = accuracy_score(vals, predictions)
        # self.log('val_acc', self.accuracy.item())
        return self.val_acc
    
    
class CustomDataset(Dataset):
    def __init__(self, train, target):
        super().__init__()
        self.train = train
        self.target = target
    
    def __len__(self):
        return len(self.target)
    
    def __getitem__(self, index):

        return self.train[index], self.target[index]    
    
# Download the MNIST dataset if you don't already have it
def download_mnist_datasets():
    train_data = datasets.MNIST(
        root = "data",
        download = True,
        train = True,
        transform = ToTensor()
    )
    
    print (train_data)
    
    validation_data = datasets.MNIST(
        root ="data",
        download= True,
        train = False,
        transform = ToTensor()
    )
    
    return train_data, validation_data

    
def train_one_epoch(model, data_loader, loss_fn, optimiser, device, val_input, val_target):
    
    for inputs, targets in data_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Calculate loss
        predictions = model(inputs)
        loss = loss_fn(predictions, targets)
        # Back propagate loss and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
    
    val_acc = model.validation_accuracy(val_input, val_target)
    print (f"Loss: {loss.item()} | val_accuracy: {val_acc}")
    logger.info(f"val_accuracy: {val_acc}")
    # return loss.item()

    
def train_epochs(model, data_loader, loss_fn, optimiser, device, epochs, val_input, val_target):
    for i in range(epochs):
        print (f"Epoch {i+1}")
        logger.info(f"Epoch: {i+1}")
        train_one_epoch(model, data_loader, loss_fn, optimiser, device, val_input, val_target)
        print ("------------------------------------------------------")
    print ("Training completed")
    
        
def train(base_directory, train_path, validation_path, epochs=50, batch_size=32, learning_rate = 0.001):
    
#     # Download MNIST dataset if you dont already have one
#     train_data, validation_data = download_mnist_datasets()
#     print ('downloaded')
    
#     # Create data loader for the train set
#     train_data_loader = DataLoader(train_data, batch_size)
#     for inputs, targets in train_data_loader:
#         print (inputs.dtype)
#         print (targets.dtype)

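    # Load dataset. The splits were saved without a header row, so
    # header=None keeps the first sample from being read as column names.
    X = pd.read_csv(Path(train_path) / "train.csv", header=None)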
    y = X[X.columns[-1]]
    X.drop(X.columns[-1], axis=1, inplace=True)
    X_train = torch.tensor(X.values, dtype=torch.float32)
    y_train = torch.tensor(y.values, dtype=torch.int64)
    
    print(X_train.shape)
    
    testmnist = CustomDataset(X_train, y_train)
    dataloader = DataLoader(testmnist, batch_size=batch_size, shuffle = False)
    # for inputs, targets in dataloader:
    #     print (inputs.shape)
    #     print("-------------------------------------------------")
    #     print(targets.shape)
    #     print(" ================================================= ")
    
    X_val = pd.read_csv(Path(validation_path) / "validation.csv", header=None)
    y_val = X_val[X_val.columns[-1]]
    X_val.drop(X_val.columns[-1], axis=1, inplace=True)
    X_validation = torch.tensor(X_val.values, dtype=torch.float32)
    y_validation = torch.tensor(y_val.values, dtype=torch.int64)


    # Build model
    # Set device for training
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Net().to(device)
    
    # print (X_train.dtype)
    # print (model.weight.dtype)
    
    # Instantiate loss function & optimiser
    loss_fn = nn.CrossEntropyLoss()
    optimiser = torch.optim.Adam(model.parameters(),
                                lr = learning_rate)
    
    # Train Model
    train_epochs(model, dataloader, loss_fn, optimiser, device, epochs, X_validation, y_validation)
    # train_one_epoch(model, dataloader, loss_fn, optimiser, device)
    
    # # Store model
    # torch.save(model.state_dict(), Path(base_directory)/"model/")
    # print ("Model trained and stored")
    
    # Store model
    model_dir = Path(base_directory) / "model" / "001"
    S3_path = "s3://mlschool01//root/ml.school/mnist"
    # model_dir = Path(S3_path) / "model" / "001"
    
    model_dir.mkdir(parents=True, exist_ok=True)
    with open(os.path.join(model_dir, 'model.pth'), 'wb') as f:
        torch.save(model.state_dict(), f)
        # TEST UPLOAD OF THE MODEL TO S3
        # s3 = boto3.resource('s3')
        # s3.Bucket("mlschool01").upload_file(os.path.join(model_dir, 'model.pth'), "root/ml.school/mnist/model/model.tar.gz")

    
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=0.01)
    args, _ = parser.parse_known_args()
    
    # PRODUCTION
    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size,
        learning_rate = args.learning_rate
    )
    
    # # LOCAL TESTING
    # base_dir = Path(os.getcwd()) / "opt/ml/processing/"
    # train_path = Path(base_dir) / "train/"
    # validation_path = Path(base_dir) / "validation/"
    # train(
    #     base_directory= base_dir,
    #     train_path= train_path,
    #     validation_path= validation_path,
    #     epochs=args.epochs,
    #     batch_size=args.batch_size
    # )

Overwriting /root/ml.school/mnist/train.py
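
The comment at the bottom of the script says that hyperparameters reach the entry point as script arguments. The sketch below shows how the same `argparse` setup handles such a list. The argument values, and the `--unexpected_flag` entry in particular, are made up for illustration.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=3)
parser.add_argument("--batch_size", type=int, default=32)
parser.add_argument("--learning_rate", type=float, default=0.01)

# Simulated command line; a real training job would supply its own.
argv = ["--epochs", "20", "--learning_rate", "0.001", "--unexpected_flag", "1"]

# parse_known_args returns unrecognized arguments instead of failing on them.
args, unknown = parser.parse_known_args(argv)

print(args.epochs, args.batch_size, args.learning_rate)  # 20 32 0.001
print(unknown)  # ['--unexpected_flag', '1']
```

Using `parse_known_args` instead of `parse_args` keeps the script from exiting when it receives arguments it did not declare.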


## Testing training script

In [26]:
# from preprocessor import preprocess
# from train import train


# with tempfile.TemporaryDirectory() as directory:
#     # First, we preprocess the data and create the 
#     # dataset splits.
#     preprocess(
#         base_dir=directory, 
#         data_filepath = Path(DATASET_FOLDER) /  "mnist_train.csv"
#     )

#     # Then, we train a model using the train and 
#     # validation splits.
#     train(
#         base_directory=directory, 
#         train_path=Path(directory) / "train", 
#         validation_path=Path(directory) / "validation",
#         epochs=5
#     )

## Training and Tuning switch

In [27]:
USE_TUNING_STEP = True
# USE_TUNING_STEP = False

## Setting up training step

In [28]:
hyperparameters = {
    "epochs": 20,
    "batch_size": 32,
    "learning_rate":0.001,
}

estimator = PyTorch(
    entry_point=f"{MNIST_FOLDER}/train.py",
    hyperparameters=hyperparameters,
    framework_version="1.13.1",
    py_version="py39",
    instance_type="ml.m5.large",
    instance_count=1,
    script_mode=True,
    disable_profiler=True,
    role=role,
)


In [29]:
training_step = TrainingStep(
    name="training",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Set up tuning step

In [30]:
hyperparameter_ranges = {
    "epochs": IntegerParameter(10, 50),
    "learning_rate":ContinuousParameter(0.001,0.004)
}

objective_metric_name = "val_accuracy"
objective_type = "Maximize"
metric_definitions = [{"Name": objective_metric_name, "Regex": "val_accuracy: ([0-9\\.]+)"}]
    
tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    objective_type=objective_type,
    max_jobs=3,
    max_parallel_jobs=3,
)
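
The tuner reads the objective metric by applying the regular expression in `metric_definitions` to the training job's output. The sketch below applies the same pattern to a few log lines shaped like the ones `train.py` prints. The numbers in the sample lines are invented.

```python
import re

# Same pattern as in metric_definitions, written as a raw string.
metric_regex = r"val_accuracy: ([0-9\.]+)"

# Sample lines in the format train.py prints (values are made up).
log_lines = [
    "Epoch 1",
    "Loss: 1.5512 | val_accuracy: 0.9125",
    "------------------------------------------------------",
]

matches = []
for line in log_lines:
    match = re.search(metric_regex, line)
    if match:
        # The captured group is the metric value.
        matches.append(float(match.group(1)))

print(matches)  # [0.9125]
```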

In [31]:
tuning_step = TuningStep(
    name = "tuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

In [32]:
session2_pipeline = Pipeline(
    name="mnist-session2-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step
    ]
)

In [33]:
# session2_pipeline.upsert(role_arn=role)
# execution = session2_pipeline.start()

# Model Evaluation

In [34]:
import tarfile

from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

## Evaluate Model

In [35]:
%%writefile {MNIST_FOLDER}/evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

from pathlib import Path
# from tensorflow import keras
from sklearn.metrics import accuracy_score

## LOCAL TESTING
# BASE_DIR = '/root/ml.school/mnist/opt/ml/processing'
# MODEL_PATH = Path(BASE_DIR) / "model/"
# TEST_PATH = Path(BASE_DIR) / "test/"
# OUTPUT_PATH = Path(BASE_DIR) / "evaluation/"

## ACTUAL USE
MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation/"

# Define the neural network model
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )
        self.softmax = nn.Softmax(dim=1)
        # self.accuracy = MulticlassAccuracy(num_classes=10)
        self.val_acc = 0

    def forward(self, input_data):
        x = self.flatten(input_data)
        logits = self.linear_relu_stack(x)
        predictions = self.softmax(logits)
        return predictions
    
    def validation_accuracy(self, inputs, vals):
        pred = self(inputs).detach().numpy()
        predictions = np.argmax(pred, axis=-1)
        self.val_acc = accuracy_score(vals, predictions)
        # self.log('val_acc', self.accuracy.item())
        return self.val_acc

def model_fn(model_dir):
    model = Net()
    print (model_dir)
    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
        model.load_state_dict(torch.load(f, map_location="cpu"))
    return model
    
def evaluate(model_path, test_path, output_path):
    ## FOR LOCAL TESTING
    # with tarfile.open(Path(model_path) / "model.tar.gz", "w:gz") as tar:
    #     tar.add(Path(model_path) / "001", arcname="001")
    
    # The first step is to extract the model package provided
    # by SageMaker.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    # We can now load the model from disk.
    # model = keras.models.load_model(Path(model_path) / "001")
    # model_dir = Path(model_path) / "model" / "001"
    model_dir = Path(model_path) / "001"
    model = model_fn(model_dir)
    # model = torch.load(Path(model_path) / "001")
    model.eval()
    
    # Read test data
    # The test split was saved without a header row.
    X_test = pd.read_csv(Path(test_path) / "test.csv", header=None)
    y_test = X_test[X_test.columns[-1]]
    X_test.drop(X_test.columns[-1], axis=1, inplace=True)
    
    # Convert dataframe to tensors
    X_test = torch.tensor(X_test.values, dtype=torch.float32)
    y_test = torch.tensor(y_test.values, dtype=torch.int64)
    
    # Compute the accuracy of the model on the test set.
    accuracy = model.validation_accuracy(X_test, y_test)
    pred = model(X_test).detach().numpy()
    predictions = np.argmax(pred, axis=-1)
    print(f"Test accuracy: {accuracy}")
    print(f"Predictions: {predictions}")

    # Add the accuracy of the model to our evaluation report.
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }
    
    # Save the evaluation report to the output path.
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH, 
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )

Overwriting /root/ml.school/mnist/evaluation.py
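
The evaluation script expects `model.tar.gz` to unpack into a `001` folder containing `model.pth`. The sketch below walks through that round trip with a placeholder file, assuming the archive holds the contents of the `model` directory that `train.py` writes to. The file contents and the temporary folder layout are made up for illustration.

```python
import tarfile
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as directory:
    root = Path(directory)

    # Mimic what train.py writes: <base>/model/001/model.pth
    model_dir = root / "model" / "001"
    model_dir.mkdir(parents=True)
    (model_dir / "model.pth").write_bytes(b"placeholder weights")

    # Package the folder so "001" sits at the top level of the archive.
    archive = root / "model.tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(model_dir, arcname="001")

    # Mimic what evaluation.py does: extract, then look inside "001".
    extract_dir = root / "processing" / "model"
    extract_dir.mkdir(parents=True)
    with tarfile.open(archive) as tar:
        tar.extractall(path=extract_dir)

    restored = extract_dir / "001" / "model.pth"
    restored_ok = restored.read_bytes() == b"placeholder weights"

print(restored_ok)  # True
```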


## Test Evaluation Script

In [36]:
# from preprocessor import preprocess
# from train import train
# from evaluation import evaluate


# with tempfile.TemporaryDirectory() as directory:
#     # First, we preprocess the data and create the 
#     # dataset splits.
#     preprocess(
#         base_dir=directory, 
#         # data_filepath=LOCAL_FILEPATH
#         data_filepath=Path(DATASET_FOLDER) /  "mnist_train.csv"
#     )

#     # Then, we train a model using the train and 
#     # validation splits.
#     train(
#         base_directory=directory, 
#         train_path=Path(directory) / "train", 
#         validation_path=Path(directory) / "validation",
#         epochs=1
#     )
    
#     # After training a model, we need to prepare a package just like
#     # SageMaker would. This package is what the evaluation script is
#     # expecting as an input.
#     with tarfile.open(Path(directory) / "model.tar.gz", "w:gz") as tar:
#         tar.add(Path(directory) / "model" / "001", arcname="001")
        
#     print (f"directory : {directory}")
    
#     # We can now call the evaluation script.
#     evaluate(
#         model_path=directory, 
#         test_path=Path(directory) / "test",
#         output_path=Path(directory) / "evaluation",
#     )

## Pipeline configuration

In [37]:
evaluation_destination = ParameterString(
    name="evaluation_destination",
    default_value=f'{S3_FILEPATH}/evaluation',
)

In [38]:
S3_FILEPATH

's3://mlschool01//root/ml.school/mnist'

## Setting up processor

In [39]:
#Initialize the PyTorchProcessor
pytorch_processor = PyTorchProcessor(
    framework_version='1.13.1',
    role=role,
    instance_type='ml.t3.medium',
    instance_count=1,
    base_job_name='mnist-evaluation-processor',
    py_version="py39",
)

pytorch_processor.framework_entrypoint_command = ["python3"]

## Configure model input

In [40]:
# This is the input in case we want to use the best model generated
# by the Tuning Step. The best training job is at index 0.
tuning_model_input = ProcessingInput(
    source=tuning_step.get_top_model_s3_uri(
        top_k=0, 
        s3_bucket=sagemaker_session.default_bucket()
    ),
    destination="/opt/ml/processing/model",
)

# This is the input in case we want to use the trained model
# from the Training Step.
training_model_input = ProcessingInput(
    source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model"
)

# We can now select the appropriate input depending on which step
# we are using.
model_input = tuning_model_input if USE_TUNING_STEP else training_model_input

## Set up processing step

In [41]:
print (MNIST_FOLDER)

/root/ml.school/mnist


In [42]:
# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)


# Notice how this step uses the model generated by the tuning or training
# step, and the test set generated by the preprocessing step.
evaluation_step = ProcessingStep(
    name="evaluation",
    processor=pytorch_processor,
    inputs=[
        model_input,
        ProcessingInput(
            source=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=evaluation_destination),
    ],
    code=f"{MNIST_FOLDER}/evaluation.py",
    # code=f"{S3_FILEPATH}/evaluation.py",
    property_files=[evaluation_report],
    cache_config=cache_config
)

## Pipeline

In [43]:
session3_pipeline = Pipeline(
    name="mnist-session3-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        evaluation_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step,
        evaluation_step
    ]
)

In [44]:
# session3_pipeline.upsert(role_arn=role)
# execution = session3_pipeline.start()

# Model Registration

## Approval and Threshold configuration

In [45]:
import time

from sagemaker import ModelPackage
from sagemaker.model import Model
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.predictor import Predictor
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join

In [46]:
model_approval_status = ParameterString(
    name="model_approval_status", 
    default_value="Approved"
)

accuracy_threshold = ParameterFloat(
    name="accuracy_threshold", 
    default_value=0.60
)

## Configuring Model Assets

In [47]:
# This is the model data in case we want to use the best model generated
# by the Tuning Step.
tuning_model_data = tuning_step.get_top_model_s3_uri(
    top_k=1, 
    s3_bucket=sagemaker_session.default_bucket()
)

# This is the model data in case we want to use the trained model
# from the Training Step.
training_model_data = training_step.properties.ModelArtifacts.S3ModelArtifacts

# We can now select the appropriate model data depending on which step
# we are using.
model_data = tuning_model_data if USE_TUNING_STEP else training_model_data

## Configure Model

In [48]:
model = PyTorchModel(
    model_data=model_data,
    framework_version="1.13.1",
    sagemaker_session=PipelineSession(),
    py_version="py39",
    role=role,
    entry_point = None
)

In [49]:
print (evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'])

{'S3Uri': ParameterString(name='evaluation_destination', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://mlschool01//root/ml.school/mnist/evaluation'), 'LocalPath': '/opt/ml/processing/evaluation', 'S3UploadMode': 'EndOfJob'}


## Setting up Model Metrics

In [50]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            "evaluation.json"]
        ),
        content_type="application/json",
    )
)

## Set up model step

In [51]:
model_package_group_name = "mnist-model-package-group"

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status=model_approval_status,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="PYTORCH",
        framework_version="1.13.1",
    ),
)



In [52]:
# ml.t3.medium
# ml.m5.large

## Set up condition step

In [53]:
condition_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value"
    ),
    right=accuracy_threshold
)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ", 
        values=[
            "Execution failed because the model's accuracy was lower than", 
            accuracy_threshold
        ]
    ),
)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[register_model_step],
    else_steps=[fail_step], 
)
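
The condition reads `metrics.accuracy.value` from the `evaluation.json` report written by the evaluation script. The sketch below shows, in plain Python, how that path would resolve against a report and how the comparison behaves. The report contents and the `resolve_json_path` helper are assumptions for illustration; the real lookup is performed by SageMaker's `JsonGet` when the pipeline runs.

```python
import json

# Hypothetical report; the accuracy value is made up for illustration.
report = json.loads('{"metrics": {"accuracy": {"value": 0.93}}}')


def resolve_json_path(document, json_path):
    """Follow a dot-separated path such as "metrics.accuracy.value"."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node


accuracy = resolve_json_path(report, "metrics.accuracy.value")
accuracy_threshold = 0.60  # default value of the pipeline parameter

# Mirrors ConditionGreaterThanOrEqualTo: register when True, fail otherwise.
next_step = "register-model" if accuracy >= accuracy_threshold else "fail"
print(accuracy, next_step)
```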

In [54]:
session4_pipeline = Pipeline(
    name="mnist-session4-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        evaluation_destination,
        model_approval_status,
        accuracy_threshold,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step, 
        evaluation_step,
        condition_step
    ],
)

In [55]:
# session4_pipeline.upsert(role_arn=role)
# execution = session4_pipeline.start()

In [56]:
# import json
# import boto3
# key = 'arn:aws:kms:ap-southeast-1:972688410650:key/fcc541cf-7260-42a6-8135-38f301b62c98'
# s3 = boto3.resource('s3')

# obj = s3.Object(BUCKET, key)
# data = json.load(obj.get()['Body']) 

In [57]:
def get_latest_approved_model_package(model_package_group_name):
    """
    Returns the latest approved model package registered under the 
    specified model package group.
    """
    try:
        # We can use the boto3 SageMaker's API to list the existing
        # model packages with the specified name. We only care about
        # approved models.
        response = sagemaker_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            # Newest first, so the first element is the latest package.
            SortOrder="Descending",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        if len(approved_packages) == 0:
            print(f"No approved model packages for \"{model_package_group_name}\"")
            return None

        # At this point we identified the latest approved model,
        # so we can return it.
        print(f"Latest approved model package: {approved_packages[0]['ModelPackageArn']}")
        return approved_packages[0]

    except ClientError as e:
        print(e.response["Error"]["Message"])
        raise Exception(e.response["Error"]["Message"])
        

# We can now use the function to get the latest approved model from the Model Registry.
approved_model_package = get_latest_approved_model_package(model_package_group_name)

model_description = None
if approved_model_package:
    approved_model_package_arn = approved_model_package["ModelPackageArn"]

    model_description = sagemaker_client.describe_model_package(
        ModelPackageName=approved_model_package_arn
    )

# model_description

Latest approved model package: arn:aws:sagemaker:ap-southeast-1:972688410650:model-package/mnist-model-package-group/2
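
`get_latest_approved_model_package` returns the first element of the response, so the result depends on the order in which the packages come back. A minimal sketch of selecting the newest package explicitly by its `CreationTime`, using made-up summaries shaped like the entries of `ModelPackageSummaryList` (the ARNs and dates below are placeholders, and `newest_package` is a hypothetical helper):

```python
from datetime import datetime

# Placeholder summaries; real ones come from list_model_packages().
summaries = [
    {"ModelPackageArn": "arn:example:model-package/group/1",
     "CreationTime": datetime(2023, 5, 1, 10, 0)},
    {"ModelPackageArn": "arn:example:model-package/group/2",
     "CreationTime": datetime(2023, 5, 3, 9, 30)},
]


def newest_package(packages):
    """Return the most recently created package, or None if empty."""
    if not packages:
        return None
    return max(packages, key=lambda package: package["CreationTime"])


latest = newest_package(summaries)
print(latest["ModelPackageArn"])
```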


In [58]:
# Attempt to deploy the registered model package to an endpoint.
# This did not work: the PyTorch model could not be invoked properly through SageMaker.
# model_package = ModelPackage(
#     model_package_arn=approved_model_package_arn, 
#     sagemaker_session=sagemaker_session,
#     role=role, 
# )

# endpoint_name = "mnist-endpoint"

# model_package.deploy(
#     endpoint_name=endpoint_name,
#     initial_instance_count=1, 
#     instance_type="ml.m5.large",
# )

## Trying to create custom mnist dataset

In [59]:
import random

In [60]:
# Build a comma-separated payload of 28 * 28 random pixel values (0-255).
payload = ", ".join(str(random.randint(0, 255)) for _ in range(28 * 28))
print(payload)

73, 239, 10, 205, 216, 211, 24, 93, 112, 76, 163, 134, 249, 156, 100, 78, 244, 47, 42, 149, 54, 250, 129, 2, 131, 148, 241, 188, 115, 70, 67, 12, 209, 89, 194, 210, 181, 214, 96, 175, 224, 205, 9, 192, 83, 46, 15, 123, 117, 66, 75, 142, 7, 57, 77, 113, 19, 216, 155, 178, 183, 10, 46, 244, 140, 235, 168, 54, 207, 252, 116, 198, 133, 221, 60, 240, 134, 114, 166, 93, 8, 8, 71, 82, 224, 155, 50, 213, 137, 217, 135, 34, 131, 14, 50, 208, 21, 119, 128, 250, 56, 173, 6, 191, 248, 177, 222, 21, 19, 103, 81, 170, 247, 138, 82, 211, 65, 14, 151, 94, 157, 135, 207, 59, 34, 121, 57, 248, 31, 252, 157, 14, 104, 99, 90, 82, 183, 136, 189, 11, 228, 4, 2, 218, 97, 139, 207, 9, 72, 4, 105, 162, 51, 86, 26, 117, 181, 153, 205, 224, 67, 196, 233, 233, 32, 13, 124, 134, 119, 180, 159, 199, 160, 29, 100, 34, 104, 193, 153, 16, 34, 18, 123, 205, 118, 31, 229, 239, 182, 167, 194, 200, 208, 91, 75, 206, 86, 162, 109, 44, 224, 251, 236, 144, 23, 166, 201, 237, 147, 108, 12, 70, 111, 213, 6, 182, 165, 207, 16, 

In [61]:
# predictor.delete_endpoint()

## Preparing Inference Code

In [62]:
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.tensorflow.model import TensorFlowPredictor
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.lambda_helper import Lambda
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.s3 import S3Downloader

In [63]:
CODE_FOLDER = Path(MNIST_FOLDER) / "code"

In [64]:
%%writefile $CODE_FOLDER/inference.py

import os
import json
import boto3
import requests
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms as transforms
from PIL import Image
import logging

from pickle import load
from pathlib import Path

PIPELINE_FILE = Path("/tmp") / "pipeline.pkl"

# By default, we will read the S3 location of the Scikit-Learn
# pipeline from an environment variable that we'll configure
# when registering the model.
PIPELINE_S3_LOCATION = os.environ.get("PIPELINE_S3_LOCATION", None)
logger = logging.getLogger(__name__)

s3 = boto3.resource("s3")

def handler(data, context, model_dir, pipeline_s3_location=PIPELINE_S3_LOCATION):
    """
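    Convenience entrypoint that chains the functions below to handle a
    single request. The `handler(data, context)` convention comes from
    https://github.com/aws/sagemaker-tensorflow-serving-container; the
    SageMaker PyTorch container calls `model_fn`, `input_fn`,
    `predict_fn`, and `output_fn` directly.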
    """
    
    print("Handling endpoint request")
    download_pipeline(pipeline_s3_location)
    input_data = input_fn(data)
    model = model_fn(model_dir)
    predictions = predict_fn(input_data, model)
    output_json = output_fn(predictions)
    
    return output_json


def download_pipeline(pipeline_s3_location):
    """
    This function will download the Scikit-Learn pipeline from S3
    if it doesn't already exist. We need the pipeline to pre-process
    the data before we run it through the model.
    """
    
    s3_parts = pipeline_s3_location.split('/', 3)
    bucket = s3_parts[2]
    key = s3_parts[3]
    
    if not PIPELINE_FILE.exists():
        s3.Bucket(bucket).download_file(f"{key}/pipeline.pkl", str(PIPELINE_FILE))
    
    print ("Pipeline Downloaded")
    
# Define the neural network model
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )
        self.softmax = nn.Softmax(dim=1)
        # self.accuracy = MulticlassAccuracy(num_classes=10)
        self.val_acc = 0

    def forward(self, input_data):
        x = self.flatten(input_data)
        logits = self.linear_relu_stack(x)
        predictions = self.softmax(logits)
        return predictions
    
    def validation_accuracy(self, inputs, vals):
        pred = self(inputs).detach().numpy()
        predictions = np.argmax(pred, axis=-1)
        # Fraction of predictions that match the labels.
        self.val_acc = float(np.mean(predictions == np.asarray(vals)))
        # self.log('val_acc', self.accuracy.item())
        return self.val_acc

def model_fn(model_dir):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info('Loading model')
    model = Net()
    print (model_dir)
    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
        # map_location lets a model saved on a GPU load on a CPU-only instance.
        model.load_state_dict(torch.load(f, map_location=device))
    model.to(device).eval()
    logger.info('Successfully loaded model')
    return model

def input_fn(request_body, content_type='application/json'):
    logger.info('Deserializing input data')
    if content_type == 'image/png':
        # The request body carries the raw PNG bytes.
        from io import BytesIO
        image = Image.open(BytesIO(request_body))
    elif content_type == 'application/json':
        # The request body is a JSON document with the URL of the image.
        input_data = json.loads(request_body)
        url = input_data['url']
        logger.info(f'Image url: {url}')
        image = Image.open(requests.get(url, stream=True).raw)
    else:
        raise Exception(f'Requested unsupported ContentType in content_type {content_type}')

    # Convert to a single grayscale channel, resize to 28x28, and turn the
    # PIL image into a Torch tensor.
    image_transform = transforms.Compose([
        transforms.Resize(size=(28, 28)),
        transforms.PILToTensor()
    ])
    img_tensor = image_transform(image.convert("L"))

    # nn.Linear expects floating-point input, so cast the uint8 pixels.
    return img_tensor.to(dtype=torch.float32)

def predict_fn(input_data, model):
    logger.info('Generating prediction')
    if torch.cuda.is_available():
        input_data = input_data.view(1,28,28).cuda()
    else:
        input_data = input_data.view(1,28,28)
    
    with torch.no_grad():
        model.eval()
        # Move the output to the CPU before converting it to NumPy.
        out = model(input_data).detach().cpu().numpy()
        predictions = np.argmax(out, axis=-1)
        return predictions
    
def output_fn(prediction_output, accept='application/json'):
    logger.info('Serializing generated output')
    # Convert NumPy integers to plain ints so they can be serialized to JSON.
    results = [int(prediction) for prediction in prediction_output]
    if accept == 'application/json':
        return json.dumps(results), accept
    raise Exception(f'Requested unsupported content type in accept: {accept}')

# # FOR LOCAL TESTING || JSON CONTAINING URL OF IMAGE
# url_dict = {"url" : "https://conx.readthedocs.io/en/latest/_images/MNIST_57_0.png"}
# json_object = json.dumps(url_dict)

# input_fn(json_object)

Overwriting /root/ml.school/mnist/code/inference.py
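
`download_pipeline` splits the S3 URI with `split('/', 3)` to separate the bucket from the key prefix. A short sketch of what that split yields, using a placeholder URI (the bucket, the prefix, and the `split_s3_uri` helper are assumptions, not values from the pipeline):

```python
def split_s3_uri(uri):
    """Split "s3://bucket/some/prefix" into (bucket, key_prefix)."""
    # Parts: ["s3:", "", "bucket", "some/prefix"]
    parts = uri.split("/", 3)
    bucket = parts[2]
    # A URI with no prefix has only three parts.
    key = parts[3] if len(parts) > 3 else ""
    return bucket, key


bucket, key = split_s3_uri("s3://example-bucket/mnist/preprocessing")
print(bucket, key)
```

Note that a URI with a double slash after the bucket name would produce a key prefix that starts with `/`, which is then carried into the object key passed to `download_file`.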


In [65]:
%%writefile $CODE_FOLDER/requirements.txt

numpy==1.19.5
pandas==1.2.5
scikit-learn==0.23.2
torch==1.13.1

Overwriting /root/ml.school/mnist/code/requirements.txt


### Testing endpoint

In [66]:
# pytorch_model = PyTorchModel(model_data='s3://mlschool01/root/ml.school/mnist/model/model.tar.gz', role=role, entry_point='inference.py', framework_version='1.13.1', py_version="py39", source_dir=str(CODE_FOLDER))
# predictor = pytorch_model.deploy(instance_type='ml.m5.large', initial_instance_count=1)

### Registering model after writing inference script

In [67]:
model = PyTorchModel(
    model_data=model_data,
    framework_version="1.13.1",
    sagemaker_session=PipelineSession(),
    env={
        "PIPELINE_S3_LOCATION": preprocessor_destination,
    },
    py_version="py39",
    role=role,
    entry_point = "inference.py",
    source_dir=str(CODE_FOLDER),
)

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status=model_approval_status,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="PYTORCH",
        framework_version="1.13.1",
    ),
)

### Data capture configuration

In [68]:
data_capture_enabled = ParameterBoolean(
    name="data_capture_enabled",
    default_value=True,
)

data_capture_percentage = ParameterInteger(
    name="data_capture_percentage",
    default_value=100,
)

data_capture_destination = ParameterString(
    name="data_capture_destination",
    default_value=f"{S3_FILEPATH}/monitoring/data-capture",
)

### Deploying model

In [69]:
%%writefile $MNIST_FOLDER/lambda.py

import os
import json
import boto3

sagemaker = boto3.client("sagemaker")

def lambda_handler(event, context):
    model_package_arn = event["model_package_arn"]
    model_name = event["model_name"]
    endpoint_name = event["endpoint_name"]
    endpoint_config_name = event["endpoint_config_name"]
    data_capture_enabled = event["data_capture_enabled"]
    data_capture_percentage = event["data_capture_percentage"]
    data_capture_destination = event["data_capture_destination"]
    role = event["role"]
    
    
    # The first step is to create a new model. We will use the
    # ARN of the model we registered.
    sagemaker.create_model(
        ModelName=model_name, 
        ExecutionRoleArn=role, 
        Containers=[{
            "ModelPackageName": model_package_arn
        }] 
    )

    # Then, we need to create an Endpoint Configuration.
    # Here is where we specify the hardware we need for the
    # endpoint and the Data Capture configuration. 
    sagemaker.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "ModelName": model_name,
                "InstanceType": "ml.m5.large",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "VariantName": "AllTraffic",
            }
        ],
        DataCaptureConfig={
            "EnableCapture": data_capture_enabled,
            "InitialSamplingPercentage": data_capture_percentage,
            "DestinationS3Uri": data_capture_destination,
            "CaptureOptions": [
                {
                    'CaptureMode': "Input"
                },
                {
                    'CaptureMode': "Output"
                },
            ],
            "CaptureContentTypeHeader": {
                "JsonContentTypes": [
                    "application/json",
                    "application/octet-stream"
                ]
            }
        },
    )

    # Finally, we can create the new Endpoint using the
    # Endpoint Configuration we created above.
    sagemaker.create_endpoint(
        EndpointName=endpoint_name, 
        EndpointConfigName=endpoint_config_name,
    )
    
    return {
        "statusCode": 200,
        "body": json.dumps("Endpoint deployed successfully"),
        "model_name": model_name,
    }

Overwriting /root/ml.school/mnist/lambda.py
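
`lambda_handler` reads eight keys from the incoming event and raises a `KeyError` on the first one that is missing. A minimal sketch of a pre-check that reports all missing keys at once; the `missing_event_keys` helper and the sample event are hypothetical and not part of the deployed function:

```python
REQUIRED_KEYS = (
    "model_package_arn",
    "model_name",
    "endpoint_name",
    "endpoint_config_name",
    "data_capture_enabled",
    "data_capture_percentage",
    "data_capture_destination",
    "role",
)


def missing_event_keys(event):
    """Return the required keys that are absent from the event, in order."""
    return [key for key in REQUIRED_KEYS if key not in event]


# Sample event with placeholder values and two keys left out on purpose.
event = {
    "model_package_arn": "arn:example:model-package/group/1",
    "model_name": "mnist-model-0101000000",
    "endpoint_name": "mnist-endpoint",
    "endpoint_config_name": "mnist-endpoint-config-0101000000",
    "data_capture_enabled": True,
    "data_capture_percentage": 100,
}
print(missing_event_keys(event))
```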


### Creating Lambda Role

In [70]:
def create_lambda_role(role_name):
    try:
        response = iam_client.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description="Lambda Pipeline Role"
        )

        role_arn = response['Role']['Arn']

        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )

        iam_client.attach_role_policy(
            PolicyArn='arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
            RoleName=role_name
        )

        return role_arn

    except iam_client.exceptions.EntityAlreadyExistsException:
        response = iam_client.get_role(RoleName=role_name)
        return response['Role']['Arn']
    

lambda_role = create_lambda_role("lambda-pipeline-role")

### Set up Lambda Step

In [71]:
function_name = f"deploy-endpoint-fn-{time.strftime('%m%d%H%M%S', time.localtime())}"
endpoint_name = "mnist-endpoint"

deploy_step = LambdaStep(
    name="deploy",
    lambda_func=Lambda(
        function_name=function_name,
        execution_role_arn=lambda_role,
        script=str(Path(MNIST_FOLDER) / "lambda.py"),
        handler="lambda.lambda_handler",
        timeout=600,
        memory_size=3008,
    ),
    inputs={
        # We can use the timestamp_signature pipeline parameter
        # to add a suffix to the model and the endpoint configuration
        # and avoid name collisions.
        "model_name": Join(on="-", values=["mnist-model", timestamp_signature]),
        "endpoint_config_name": Join(on="-", values=["mnist-endpoint-config", timestamp_signature]),

        # We use the ARN of the model we registered to
        # deploy it to the endpoint.
        "model_package_arn": register_model_step.properties.ModelPackageArn,

        "endpoint_name": endpoint_name,
        "data_capture_enabled": data_capture_enabled,
        "data_capture_percentage": data_capture_percentage,
        "data_capture_destination": data_capture_destination,
        "role": role,
    },
    outputs=[
        LambdaOutput(output_name="model_name", output_type=LambdaOutputTypeEnum.String)
    ]
)
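
The `Join` expressions above append `timestamp_signature` to fixed prefixes so that every execution creates uniquely named resources. The sketch below reproduces that naming in plain Python; `Join` itself is resolved by SageMaker when the pipeline runs, so `"-".join` is only a stand-in here.

```python
import time

# Same format the notebook passes as the timestamp_signature parameter:
# month, day, hour, minute, second (10 digits).
timestamp_signature = time.strftime("%m%d%H%M%S", time.localtime())

model_name = "-".join(["mnist-model", timestamp_signature])
endpoint_config_name = "-".join(["mnist-endpoint-config", timestamp_signature])

print(model_name)
print(endpoint_config_name)
```

The format has one-second resolution and no year, so two executions started within the same second would produce the same names.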

In [72]:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[
        register_model_step, deploy_step
    ],
    else_steps=[fail_step], 
)

In [73]:
session5_pipeline = Pipeline(
    name="mnist-session5-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        timestamp_signature,
        evaluation_destination,
        model_approval_status,
        accuracy_threshold,
        data_capture_enabled,
        data_capture_percentage,
        data_capture_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step, 
        evaluation_step,
        condition_step
    ],
)

In [74]:
# session5_pipeline.upsert(role_arn=role)

# execution = session5_pipeline.start(parameters=dict(
#     timestamp_signature=time.strftime("%m%d%H%M%S", time.localtime())
# ))

# Data Monitoring

In [75]:
import random

from time import sleep
from datetime import datetime
from threading import Thread, Event

from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep, ModelQualityCheckConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.inputs import CreateModelInput, TransformInput
from sagemaker.model import Model
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import CreateModelStep, TransformStep
from sagemaker.model_monitor import CronExpressionGenerator, EndpointInput, DefaultModelMonitor, ModelQualityMonitor, DatasetFormat
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.s3 import S3Uploader

## Data Quality Configuration

In [76]:
data_quality_skip_check = ParameterBoolean(
    name="data_quality_skip_check", 
    default_value=True
)

data_quality_register_new_baseline = ParameterBoolean(
    name="data_quality_register_new_baseline", 
    default_value=True
)

data_quality_supplied_baseline_statistics = ParameterString(
    name="data_quality_supplied_baseline_statistics", 
    default_value=""
)

data_quality_supplied_baseline_constraints = ParameterString(
    name="data_quality_supplied_baseline_constraints", 
    default_value=""
)

## Set up data quality check step

In [77]:
check_job_config = CheckJobConfig(
    instance_type="ml.t3.xlarge",
    instance_count=1,
    sagemaker_session=sagemaker_session,
    volume_size_in_gb=20,
    role=role,
)

data_quality_check_config = DataQualityCheckConfig(
    # We will use the train dataset we generated during the preprocessing 
    # step to generate the data quality baseline.
    baseline_dataset=preprocess_step.properties.ProcessingOutputConfig.Outputs["train-baseline"].S3Output.S3Uri,
    
    dataset_format=DatasetFormat.json(lines=True),
    output_s3_uri=Join(on='/', values=[S3_FILEPATH, "monitoring", "data-quality"]),
)

data_quality_check_step = QualityCheckStep(
    name="data-quality-check",
    check_job_config=check_job_config,
    quality_check_config=data_quality_check_config,
    skip_check=data_quality_skip_check,
    register_new_baseline=data_quality_register_new_baseline,
    supplied_baseline_statistics=data_quality_supplied_baseline_statistics,
    supplied_baseline_constraints=data_quality_supplied_baseline_constraints,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config
)

## Model Quality Configuration

In [78]:
model_quality_skip_check = ParameterBoolean(
    name="model_quality_skip_check", 
    default_value=True
)

model_quality_register_new_baseline = ParameterBoolean(
    name="model_quality_register_new_baseline", 
    default_value=True
)

model_quality_supplied_baseline_statistics = ParameterString(
    name="model_quality_supplied_baseline_statistics", 
    default_value=""
)

model_quality_supplied_baseline_constraints = ParameterString(
    name="model_quality_supplied_baseline_constraints", 
    default_value=""
)

## Create Model

In [79]:
model = PyTorchModel(
    model_data=model_data,
    framework_version="1.13.1",
    sagemaker_session=PipelineSession(),
    env={
        "PIPELINE_S3_LOCATION": preprocessor_destination,
    },
    py_version="py39",
    role=role,
    entry_point = "inference.py",
    source_dir=str(CODE_FOLDER),
)

create_model_step = ModelStep(
    name="create-model",
    step_args=model.create(instance_type="ml.m5.large"),
)

## Generate baseline predictions

In [94]:
transformer = Transformer(
    # We can specify the name of the model the Batch Transform Job will 
    # use by using the property from the Model Step that created the model.
    model_name=create_model_step.properties.ModelName,
    instance_type="ml.m4.xlarge",
    instance_count=1,
    
    # The baseline set that we generated in the preprocessing step
    # is in JSON format, where every line is a JSON sample.
    accept="application/json",
    strategy="SingleRecord",
    assemble_with="Line",
    
    output_path=f"{S3_FILEPATH}/transform",
)

transform_step = TransformStep(
    name="transform",
    transformer=transformer,
    inputs=TransformInput(
        
        # We will use the test dataset we generated during the preprocessing 
        # step to run it through the model and generate predictions.
        data=preprocess_step.properties.ProcessingOutputConfig.Outputs["test-baseline"].S3Output.S3Uri,

        join_source="Input",
        content_type="application/json",
        split_type="Line",
    ),
    cache_config=cache_config
)

## Set up model quality check step

In [95]:
model_quality_check_config = ModelQualityCheckConfig(
    # We are going to use the output of the Transform Step to generate
    # the model quality baseline.
    baseline_dataset=transform_step.properties.TransformOutput.S3OutputPath,
    
    dataset_format=DatasetFormat.json(lines=True),
    output_s3_uri=Join(on='/', values=[S3_FILEPATH, "monitoring", "model-quality"]),
    
    # We need to specify the problem type and the fields where the prediction
    # and groundtruth are so the process knows how to interpret the results.
    problem_type="MulticlassClassification",
    inference_attribute="$.SageMakerOutput.prediction",
    ground_truth_attribute="groundtruth",
)

model_quality_check_step = QualityCheckStep(
    name="model-quality-check",
    check_job_config=check_job_config,
    quality_check_config=model_quality_check_config,
    skip_check=model_quality_skip_check,
    register_new_baseline=model_quality_register_new_baseline,
    supplied_baseline_statistics=model_quality_supplied_baseline_statistics,
    supplied_baseline_constraints=model_quality_supplied_baseline_constraints,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config
)
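
Because the Transform Step uses `join_source="Input"`, each output line should contain the original input record with the model's response added under `SageMakerOutput`. That is why the check reads the prediction from `$.SageMakerOutput.prediction` and the label from `groundtruth`. A minimal sketch of how such lines could be scored, assuming made-up records and a model response that carries a `prediction` field:

```python
import json

# Made-up JSON Lines output; field names follow the check configuration.
transform_output = """\
{"groundtruth": 7, "SageMakerOutput": {"prediction": 7}}
{"groundtruth": 2, "SageMakerOutput": {"prediction": 2}}
{"groundtruth": 1, "SageMakerOutput": {"prediction": 7}}
{"groundtruth": 0, "SageMakerOutput": {"prediction": 0}}
"""

records = [json.loads(line) for line in transform_output.splitlines()]

# Compare the label with the joined prediction for each record.
correct = sum(
    record["groundtruth"] == record["SageMakerOutput"]["prediction"]
    for record in records
)
accuracy = correct / len(records)
print(accuracy)
```

If the endpoint returns a different shape (for example a bare JSON list), `inference_attribute` would need to point at that shape instead.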

## Set up model metrics

In [96]:
model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    )
)

## Register Model

In [97]:
register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        drift_check_baselines=drift_check_baselines,
        approval_status=model_approval_status,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="PYTORCH",
        framework_version="1.13.1",
    ),
)

## Set up condition step

In [98]:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[
        create_model_step, 
        transform_step, 
        model_quality_check_step, 
        register_model_step,
        deploy_step
    ],
    else_steps=[fail_step], 
)

## Run pipeline

In [99]:
session6_pipeline = Pipeline(
    name="mnist-session6-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        timestamp_signature,
        data_capture_enabled,
        data_capture_percentage,
        data_capture_destination,
        data_quality_skip_check,
        data_quality_register_new_baseline,
        data_quality_supplied_baseline_statistics,
        data_quality_supplied_baseline_constraints,
        model_quality_skip_check,
        model_quality_register_new_baseline,
        model_quality_supplied_baseline_statistics,
        model_quality_supplied_baseline_constraints,
        evaluation_destination,
        model_approval_status,
        accuracy_threshold,
    ],
    steps=[
        preprocess_step, 
        data_quality_check_step,
        tuning_step if USE_TUNING_STEP else training_step, 
        evaluation_step,
        condition_step
    ],
)

In [100]:
session6_pipeline.upsert(role_arn=role)
execution = session6_pipeline.start(parameters=dict(
    timestamp_signature=time.strftime("%m%d%H%M%S", time.localtime())
))

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
