# Practice Code for the Sessions


## Session 1 - Introduction and Initial Setup


- This framework consists of four pipelines:
  - training
  - inference
  - deployment
  - monitoring


In [39]:
# Load the autoreload extension in mode 2 so that edited modules are
# re-imported automatically instead of requiring a kernel restart.
%load_ext autoreload
%autoreload 2
# Load and run the dotenv extension — presumably this reads a local .env file
# into the environment; BUCKET and ROLE are read from os.environ further down.
%load_ext dotenv
%dotenv

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [40]:
import json
import logging
import platform
import sys
from pathlib import Path

import ipytest

In [43]:
# Folder at which the code scripts are saved
CODE_FOLDER = Path("code")

# Make the scripts importable from the notebook. The membership check keeps
# sys.path free of duplicate entries when this cell is executed more than once.
_code_path = f"./{CODE_FOLDER}"
if _code_path not in sys.path:
    sys.path.append(_code_path)

CODE_FOLDER

PosixPath('code')

In [44]:
# Name of the dataset file used by the sessions.
DATA_FILEPATH = "penguins.csv"

# Configure ipytest with raise_on_error enabled.
ipytest.autoconfig(raise_on_error=True)

# Only let ERROR (and above) records through the "sagemaker.config" logger.
sagemaker_config_logger = logging.getLogger("sagemaker.config")
sagemaker_config_logger.setLevel(logging.ERROR)

In [85]:
# Switch that selects where the pipelines run; the cells below build a local
# or a SageMaker configuration depending on this value.
LOCAL_MODE = True  # to run the pipelines locally
# LOCAL_MODE = False  # to run the pipelines on Sagemaker

In [86]:
# Read the project settings from the environment.
import os

# Mandatory settings: a missing variable raises KeyError.
bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

# Optional settings: the value is None when the variable is not set.
COMET_API_KEY = os.getenv("COMET_API_KEY")
COMET_PROJECT_NAME = os.getenv("COMET_PROJECT_NAME")

In [87]:
# # show the values
# bucket, role, COMET_API_KEY, COMET_PROJECT_NAME,

In [88]:
# Detect the machine architecture of the computer running the notebook.
# platform.machine() reports the machine type without spawning a shell, so the
# cell no longer depends on the `uname` command being available.
# `architecture` stays a one-element list so that `architecture[0]` keeps
# working exactly as it did with the previous shell capture.
architecture = [platform.machine()]
IS_ARM64_ARCHITECTURE = architecture[0] == "arm64"
IS_ARM64_ARCHITECTURE

True

In [89]:
# Pipeline session for runs on SageMaker; it stays None in local mode.
import sagemaker
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession

if LOCAL_MODE:
    pipeline_session = None
else:
    pipeline_session = PipelineSession(default_bucket=bucket)

In [90]:
# Display the current value of LOCAL_MODE (True means the local mode is active).
LOCAL_MODE

True

In [91]:
# Build the configuration dictionary for the selected mode.
if LOCAL_MODE:
    # Local mode: the local Docker image is only set on arm64 machines.
    local_image = "sagemaker-tensorflow-toolkit-local" if IS_ARM64_ARCHITECTURE else None
    config = {
        "session": LocalPipelineSession(default_bucket=bucket),
        "instance_type": "local",
        "image": local_image,
    }
else:
    # SageMaker mode: reuse the pipeline session created above, no custom image.
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None,
    }

# Framework settings that are the same in both modes.
config.update(framework_version="2.12", py_version="py310")

# check the configuration
config

{'session': <sagemaker.workflow.pipeline_context.LocalPipelineSession at 0x32f0f7f10>,
 'instance_type': 'local',
 'image': 'sagemaker-tensorflow-toolkit-local',
 'framework_version': '2.12',
 'py_version': 'py310'}

In [95]:
# S3 location, inside the project bucket, used for the penguins data.
import boto3

S3_LOCATION = f"s3://{bucket}/penguins"

# Show the bucket name and the resulting location, one per line.
print(bucket, S3_LOCATION, sep="\n")

bucket-mlschool-sample
s3://bucket-mlschool-sample/penguins


In [104]:
# SageMaker session plus boto3 clients for the SageMaker and IAM services.
sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
# Region name reported by a default boto3 session.
region = boto3.Session().region_name

# Display the region.
region
# sagemaker_session.account_id() # shows the account id

'us-east-1'

## Session 3 - Splitting and Transforming the Data


### Step 1 - Creating the Preprocessing Script


In [107]:
# Display the folder that holds the code scripts.
CODE_FOLDER

PosixPath('code')

In [108]:
# Display the path string of the processing folder inside the code folder.
f"./{CODE_FOLDER}/processing"

'./code/processing'

In [109]:
# Create the folder that will hold the preprocessing script.
(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)

# Make the folder importable. The membership check keeps sys.path free of
# duplicate entries when this cell is executed more than once.
_processing_path = f"./{CODE_FOLDER}/processing"
if _processing_path not in sys.path:
    sys.path.append(_processing_path)

In [156]:
%%writefile {CODE_FOLDER}/processing/script.py
# | filename: script.py

import os
import tarfile
import tempfile
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler

def _read_data_from_input_csv_files(base_directory):
    """Read the data from the input CSV files.

    This function reads every CSV file available and concatenates them
    into a single dataframe.
    """
    input_directory = Path(base_directory)/"input"
    files = list(input_directory.glob("*.csv"))

    if len(files) == 0:
        message = f"There are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)

    # shuffle data
    return df.sample(frac=1, random_state=42)

def _split_data(df, random_state=None):
    """Split the data into train, validation, and test.

    The first split keeps 70% of the rows for training; the remaining 30%
    is then divided in half between validation and test.

    Args:
        df: Dataframe to split.
        random_state: Optional seed passed to both splits so that the
            result can be reproduced. The default (None) keeps the previous
            unseeded behaviour.

    Returns:
        A tuple `(df_train, df_validation, df_test)`.
    """
    df_train, temp = train_test_split(df, test_size=0.3, random_state=random_state)
    df_validation, df_test = train_test_split(
        temp,
        test_size=0.5,
        random_state=random_state,
    )

    return df_train, df_validation, df_test

def _save_train_baseline(base_directory, df_train):
    """Save the untransformed training data to disk.

    We will need the training data to compute a baseline to
    determine the quality of the data that the model receives
    when deployed.
    """
    baseline_path = Path(base_directory)/"train-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_train.copy().dropna()
    df = df.drop("species", axis=1)

    df.to_csv(baseline_path / "train-baseline.csv", header=True, index=False)

def _save_test_baseline(base_directory, df_test):
    """Save the untransformed test data to disk.

    We will need the test data to compute a baseline to
    determine the quality of the model predictions when deployed.
    """
    baseline_path = Path(base_directory) / "test-baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = df_test.copy().dropna()
    df.to_csv(baseline_path / "test-baseline.csv", header=False, index=False)

def _save_splits(
    base_directory,
    X_train,
    y_train,
    X_validation,
    y_validation,
    X_test,
    y_test
):
    """Save data splits to disk.

    This function concatenates the transformed features and the target variable,
    saves each one of the split sets to disk.
    """
    # combine X,y
    train = np.concatenate((X_train, y_train), axis=1)
    validation = np.concatenate((X_validation, y_validation), axis=1)
    test = np.concatenate((X_test, y_test), axis=1)

    # define paths to save these combined datasets
    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    # create directories to save these datasets
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    # save these datasets
    pd.DataFrame(train).to_csv(train_path/"train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path/"validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path/"test.csv", header=False,index=False)

def _save_model(base_directory, target_transformer, features_transformer):
    """Package the two fitted transformers into a model.tar.gz file.

    Each transformer is dumped with joblib into a temporary folder and
    then added to `<base_directory>/model/model.tar.gz` under the names
    "target.joblib" and "features.joblib".
    """
    artifacts = {
        "target.joblib": target_transformer,
        "features.joblib": features_transformer,
    }

    with tempfile.TemporaryDirectory() as directory:
        staging = Path(directory)
        for filename, transformer in artifacts.items():
            joblib.dump(transformer, staging / filename)

        model_path = Path(base_directory) / "model"
        model_path.mkdir(parents=True, exist_ok=True)

        archive = (model_path / "model.tar.gz").as_posix()
        with tarfile.open(archive, "w:gz") as tar:
            # arcname stores each file at the root of the archive.
            for filename in artifacts:
                tar.add(staging / filename, arcname=filename)

# pipeline to load, split, save data, preprocess, save model
def preprocess(base_directory):
    """Load the supplied data, split it and transform it.

    The steps run in this order: read the input CSV files, split the rows
    into train/validation/test, save the untransformed train and test
    baselines, fit the target and feature transformers on the training
    split, apply them to all three splits, and finally save the transformed
    splits together with the fitted transformers.
    """
    data = _read_data_from_input_csv_files(base_directory)

    # Non-object columns: fill missing values with the mean, then standardize.
    numeric_steps = make_pipeline(SimpleImputer(strategy="mean"), StandardScaler())

    # "island": fill missing values with the most frequent one, then one-hot encode.
    categorical_steps = make_pipeline(
        SimpleImputer(strategy="most_frequent"),
        OneHotEncoder(),
    )

    features_transformer = ColumnTransformer(
        transformers=[
            ("numeric", numeric_steps, make_column_selector(dtype_exclude="object")),
            ("categorical", categorical_steps, ["island"]),
        ],
    )

    # The target is passed as a single-column array, hence column index 0.
    target_transformer = ColumnTransformer(
        transformers=[("species", OrdinalEncoder(), [0])]
    )

    # Split before fitting so the transformers only learn from the training rows.
    df_train, df_validation, df_test = _split_data(data)

    _save_train_baseline(base_directory, df_train)
    _save_test_baseline(base_directory, df_test)

    def species_column(frame):
        """Return the "species" values of `frame` as a single-column 2D array."""
        return np.array(frame["species"].values).reshape(-1, 1)

    # Encode the target: fit on train, reuse the fitted encoder for the rest.
    y_train = target_transformer.fit_transform(species_column(df_train))
    y_validation = target_transformer.transform(species_column(df_validation))
    y_test = target_transformer.transform(species_column(df_test))

    # Transform the features after removing the target column.
    X_train = features_transformer.fit_transform(  # noqa: N806
        df_train.drop("species", axis=1),
    )
    X_validation = features_transformer.transform(  # noqa: N806
        df_validation.drop("species", axis=1),
    )
    X_test = features_transformer.transform(  # noqa: N806
        df_test.drop("species", axis=1),
    )

    _save_splits(
        base_directory,
        X_train,
        y_train,
        X_validation,
        y_validation,
        X_test,
        y_test,
    )

    # Keep the fitted transformers so the same transformations can be applied later.
    _save_model(base_directory, target_transformer, features_transformer)

if __name__ == "__main__":
    # Run the preprocessing when the file is executed as a script, using
    # /opt/ml/processing as the base directory.
    preprocess(base_directory="/opt/ml/processing")

Overwriting code/processing/script.py


In [None]:
# test the script
# start @ 35 cell