# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

In [2]:
%load_ext autoreload
%autoreload 2

In [12]:
import boto3
import tempfile
import sagemaker
import pandas as pd

from pathlib import Path

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket to store everything the pipeline generates. Set `BUCKET` to the name of the bucket you want to use. This name has to be globally unique.

If you want to create a bucket in a region other than `us-east-1`, use this command instead of the one in the code cell below:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.
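
The same region check can be done from Python. This is a minimal sketch, assuming boto3's S3 client; the helper names `bucket_creation_kwargs` and `create_bucket` are hypothetical and not part of the original notebook. It leaves out the location constraint for `us-east-1` and sets it for any other region.

```python
def bucket_creation_kwargs(bucket, region):
    """Builds the keyword arguments for the S3 `create_bucket` call."""
    kwargs = {"Bucket": bucket}
    # us-east-1 is the default location, so it is not passed as a constraint.
    if region and region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def create_bucket(bucket, region):
    """Creates the bucket in the given region (hypothetical helper)."""
    import boto3  # imported here so the helper above works without boto3

    client = boto3.client("s3", region_name=region)
    return client.create_bucket(**bucket_creation_kwargs(bucket, region))
```

Calling `create_bucket(BUCKET, region)` would then work for either case, provided the credentials in use are allowed to create buckets.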

In [5]:
BUCKET = "maurizio-s3-bucket-mnist"

!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/maurizio-s3-bucket-mnist"
}


## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` file contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel intensities of a 28x28 image (numbers from 0 to 255).
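
To make the row layout concrete, here is a minimal sketch that turns one row into a label and a 28x28 grid. It assumes the pixels are stored row by row, which the column names (`1x1`, `1x2`, ..., `28x28`) suggest but the dataset description here does not state outright. `row_to_image` is a hypothetical helper and the row is synthetic.

```python
IMAGE_SIDE = 28  # 28 x 28 = 784 pixel values per row


def row_to_image(row):
    """Splits a 785-value row into its label and a 28x28 list of pixel rows."""
    values = [int(value) for value in row]
    if len(values) != 1 + IMAGE_SIDE * IMAGE_SIDE:
        raise ValueError(f"expected 785 values, got {len(values)}")

    label, pixels = values[0], values[1:]
    # Assumption: pixels are laid out row by row (row-major order).
    image = [
        pixels[r * IMAGE_SIDE:(r + 1) * IMAGE_SIDE]
        for r in range(IMAGE_SIDE)
    ]
    return label, image


# Synthetic row: label 7 followed by 784 values in the 0-255 range.
row = [7] + [i % 256 for i in range(784)]
label, image = row_to_image(row)
print(label, len(image), len(image[0]))
```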

Let's extract the `dataset.tar.gz` file.

In [5]:
MNIST_FOLDER = "mnist"
DATASET_FOLDER = Path("dataset")

!tar -xvzf dataset.tar.gz --no-same-owner

dataset/
dataset/mnist_test.csv
dataset/mnist_train.csv


Let's load the first 10 rows of the train set.

In [6]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv")
df.head(10)

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Uploading dataset to S3

In [8]:
S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

Train set S3 location: s3://maurizio-s3-bucket-mnist/mnist/mnist_train.csv
Test set S3 location: s3://maurizio-s3-bucket-mnist/mnist/mnist_test.csv


## Preprocessing the data

Let's create a script to preprocess the original dataset.

The script should split the data into train, validation, and test sets so we can later train and evaluate a model. We will use Scikit-Learn to preprocess the data and then save the preprocessing pipeline so we can reuse it at inference time.

The train set uses the first 70% of the rows. The validation set takes the next 15%, and the test set takes the remaining 15%.

We save the fitted pipeline so we can later scale production data using the same parameters we learned on the train set.
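
A minimal sketch of a positional 70/15/15 split, assuming the rows left over after the train and validation sets form the test set. `split_bounds` and `split_rows` are hypothetical helpers; they use integer arithmetic so the boundaries do not depend on floating-point rounding.

```python
def split_bounds(n_rows, train_pct=70, validation_pct=15):
    """Returns the row indices where the train and validation sets end."""
    train_end = n_rows * train_pct // 100
    validation_end = n_rows * (train_pct + validation_pct) // 100
    return train_end, validation_end


def split_rows(rows):
    """Splits a sequence into train, validation, and test parts by position."""
    train_end, validation_end = split_bounds(len(rows))
    return (
        rows[:train_end],
        rows[train_end:validation_end],
        rows[validation_end:],
    )


train, validation, test = split_rows(list(range(100)))
print(len(train), len(validation), len(test))
```

With a pandas DataFrame, the same boundaries can be applied through `df.iloc[:train_end]`, `df.iloc[train_end:validation_end]`, and `df.iloc[validation_end:]`. Note that a positional split does not shuffle the rows.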


In [10]:
%%writefile {DATASET_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump

# Preprocess the data: record its size and shape and the number
# of classes, divide it into train and validation datasets,
# and record the location of the baselines.

# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIR = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIR) / "input" / "data.csv"

def save_splits(base_dir, train, validation, test):
    """
    The goal of this script is to output the three
    dataset splits. This function saves each of these
    splits to disk.
    """
    
    train_path = Path(base_dir) / "train" 
    validation_path = Path(base_dir) / "validation" 
    test_path = Path(base_dir) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    # Save each split without a header or index column.
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)

def save_pipeline(base_dir, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_dir) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    # Use a context manager so the file is closed after writing.
    with open(pipeline_path / "pipeline.pkl", "wb") as f:
        dump(pipeline, f)
    
def generate_baseline_dataset(split_name, base_dir, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    baseline_path = Path(base_dir) / f"{split_name}-baseline" 
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)
    

def preprocess(base_dir, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into train, validation, and test sets.
    """
    
    df = pd.read_csv(data_filepath)
    
    #numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]]
    
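    # The first column holds the label; the remaining columns are pixels.
    label_column = df.columns[0]

    # Split by position: the first 70% of the rows go to the train set,
    # the next 15% to the validation set, and the last 15% to the test set.
    train_end = int(.7 * len(df))
    validation_end = int(.85 * len(df))
    train = df.iloc[:train_end]
    validation = df.iloc[train_end:validation_end]
    test = df.iloc[validation_end:]

    X_train = train.drop(columns=label_column)
    X_test = test.drop(columns=label_column)

    y_train = train[label_column]
    y_test = test[label_column]

    # Assumption: the pipeline only standardizes the pixel columns.
    # We fit it on the train set so production data can later be scaled
    # with the same parameters. The saved splits are left untransformed.
    preprocessor = Pipeline(steps=[("scaler", StandardScaler())])
    preprocessor.fit(X_train)

    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    generate_baseline_dataset("train", base_dir, X_train, y_train)

    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    generate_baseline_dataset("test", base_dir, X_test, y_test)

    save_splits(base_dir, train, validation, test)
    save_pipeline(base_dir, pipeline=preprocessor)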
        

if __name__ == "__main__":
    preprocess(BASE_DIR, DATA_FILEPATH)
    

Overwriting dataset/preprocessor.py
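
The baseline files written by `generate_baseline_dataset` use the JSON Lines layout (one JSON object per line), which is what `to_json(orient='records', lines=True)` produces. Here is a minimal sketch for reading the first few records back with the standard library. `read_baseline_records` is a hypothetical helper, and the two records below are made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def read_baseline_records(path, limit=5):
    """Reads up to `limit` records from a JSON Lines file."""
    records = []
    with open(path) as baseline:
        for line in baseline:
            if len(records) >= limit:
                break
            records.append(json.loads(line))
    return records


# Write a tiny made-up baseline file and read it back.
with tempfile.TemporaryDirectory() as directory:
    path = Path(directory) / "train-baseline.json"
    path.write_text(
        '{"1x1": 0, "1x2": 0, "groundtruth": "5"}\n'
        '{"1x1": 0, "1x2": 0, "groundtruth": "0"}\n'
    )
    records = read_baseline_records(path)

print([record["groundtruth"] for record in records])
```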


## Testing the preprocessing script

We can now load the script we just created and run it locally to ensure it outputs every file we need.

We will set up a SageMaker Processing Job to run this script, but it is worth testing the code locally first. In this case, we can call the `preprocess()` function with a local directory and the local copy of the dataset.

In [13]:
import os
from itertools import islice

from dataset.preprocessor import preprocess

# Assumption: we test against the local copy of the train set
# that we extracted from dataset.tar.gz earlier.
LOCAL_FILEPATH = DATASET_FOLDER / "mnist_train.csv"

def print_baseline(directory, split_name):
    print()
    print(f"Baseline {split_name}:")
    baseline_path = Path(directory) / f"{split_name}-baseline" / f"{split_name}-baseline.json"
    with open(baseline_path) as baseline:
        # Read at most the first five records.
        lines = list(islice(baseline, 5))

    for line in lines:
        print(line.rstrip("\n"))


with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_dir=directory,
        data_filepath=LOCAL_FILEPATH
    )

    print(f"Folders: {os.listdir(directory)}")

    print_baseline(directory, "train")
    print_baseline(directory, "test")