# Time to create the preprocessing pipeline required to feed the model

In [4]:
!pip install numpy
!pip install -q --upgrade sagemaker
!pip install -q --upgrade pip
!pip install -q --upgrade awscli

%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path

CODE_FOLDER = Path("code")
sys.path.append(f"./{CODE_FOLDER}")

from constants import *
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
distributed 2022.7.0 requires tornado<6.2,>=6.0.3, but you have tornado 6.3.2 which is incompatible.[0m[31m
[0m



## This preprocessing is inspired by <a href="https://www.kaggle.com/code/damienbeneschi/mnist-eda-preprocessing-classifiers">MNIST - EDA, Preprocessing & Classifiers</a> by Damien Beneschi on Kaggle

In [64]:
%%writefile {CODE_FOLDER}/preprocessor.py
import os
import numpy as np
import json
import numpy as np
import tempfile
import pandas as pd
from pathlib import Path
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
from pickle import dump

# Root folder for the processing job's inputs and outputs; it must agree with the
# ProcessingInput destination (".../input") and ProcessingOutput sources
# (".../train", ".../validation", ".../test", ".../pipeline") set in the notebook.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "mnist_train.csv" # The CSV file name must match the name of the file at the S3 dataset location



def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)

    
def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", "wb"))
    
# def _save_baseline(base_directory, df_train, df_test):
#     """
#     During the data and quality monitoring steps, we will need a baseline
#     to compute constraints and statistics. This function will save that
#     baseline to the disk.
#     """

#     for split, data in [("train", df_train), ("test", df_test)]:
#         print(split)
#         baseline_path = Path(base_directory) / f"{split}-baseline"
#         baseline_path.mkdir(parents=True, exist_ok=True)

#         df = data.copy()
        
#         df.to_json(
#             baseline_path / f"{split}-baseline.json", orient="records", lines=True
#         )


class PixelTransformer(BaseEstimator, TransformerMixin):
    """Drop pixel columns whose intensity never changes.

    A pixel column is treated as uninformative when it is always black
    (maximum == 0) or always white (minimum == 255). ``fit`` learns which
    columns to keep from the data it is given (the training split), and
    ``transform`` re-applies exactly that selection. Every split passed
    through a fitted transformer therefore ends up with the same columns
    in the same order.
    """

    def fit(self, X, y=None):
        """Learn the pixel columns to keep.

        Parameters
        ----------
        X : pandas.DataFrame
            Pixel intensities, one column per pixel.
        y : ignored
            Accepted for scikit-learn API compatibility.

        Returns
        -------
        PixelTransformer
            The fitted transformer (``self``).
        """
        # Vectorised form of "drop if max == 0, then drop if min == 255":
        # a column survives only when it is neither always 0 nor always 255.
        keep_mask = (X.max() != 0) & (X.min() != 255)
        self.columns_to_keep_ = X.columns[keep_mask.to_numpy()]
        return self

    def transform(self, X):
        """Select the columns learned in ``fit``.

        Parameters
        ----------
        X : pandas.DataFrame
            Pixel intensities containing (at least) the columns kept in ``fit``.

        Returns
        -------
        pandas.DataFrame
            ``X`` restricted to the kept columns, in the order learned in ``fit``.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit`` has not been called.
        KeyError
            If ``X`` is missing one of the kept columns.
        """
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, "columns_to_keep_")
        return X.loc[:, self.columns_to_keep_]

def preprocess(base_directory, data_filepath):
    """
    Preprocess the supplied raw dataset and split it into a train,
    validation, and a test set.

    The rows are shuffled and split roughly 70% / 15% / 15%. The
    pixel-cleaning pipeline is fitted on the training split only and then
    applied to the validation and test splits. All three splits therefore
    share the same feature columns, and nothing computed from
    validation/test leaks into preprocessing.

    Every saved row holds the kept pixel columns followed by ``label`` as the
    last column. The splits and the fitted pipeline are written under
    ``base_directory`` by ``_save_splits`` and ``_save_pipeline``.

    Parameters
    ----------
    base_directory : str or pathlib.Path
        Output root for the ``train``, ``validation``, ``test`` and
        ``pipeline`` folders.
    data_filepath : str or pathlib.Path
        CSV with a ``label`` column plus one column per pixel.
    """
    pixels_df = pd.read_csv(data_filepath)

    pipeline = Pipeline(
        steps=[
            ("pixel_transformer", PixelTransformer()),
        ]
    )

    # Fixed seeds keep the shuffle and both splits reproducible across runs.
    df = pixels_df.sample(frac=1, random_state=42)
    df_train, df_holdout = train_test_split(df, test_size=0.3, random_state=42)
    df_validation, df_test = train_test_split(df_holdout, test_size=0.5, random_state=42)

    y_train = df_train["label"]
    y_validation = df_validation["label"]
    y_test = df_test["label"]

    #_save_baseline(base_directory, df_train, df_test)

    df_train = df_train.drop(["label"], axis=1)
    df_validation = df_validation.drop(["label"], axis=1)
    df_test = df_test.drop(["label"], axis=1)

    # Learn the columns to keep from the training split only, then apply the
    # same selection to validation and test so all splits line up column-wise.
    X_train = pipeline.fit_transform(df_train)
    X_validation = pipeline.transform(df_validation)
    X_test = pipeline.transform(df_test)

    # Append the label as the last column of each split.
    # NOTE(review): some consumers (e.g. SageMaker's built-in XGBoost) expect the
    # label in the FIRST CSV column — confirm against the training step.
    train = np.column_stack((X_train, y_train))
    validation = np.column_stack((X_validation, y_validation))
    test = np.column_stack((X_test, y_test))

    _save_splits(base_directory, train, validation, test)
    # NOTE(review): pickle records PixelTransformer by its module path; when this
    # file runs as a script that module is "__main__", so loading pipeline.pkl
    # elsewhere may require PixelTransformer to be importable under that name.
    _save_pipeline(base_directory, pipeline=pipeline)



if __name__ == "__main__":
    # Script entry point: read the CSV under BASE_DIRECTORY/input and write all outputs under BASE_DIRECTORY.
    preprocess(base_directory=BASE_DIRECTORY, data_filepath=DATA_FILEPATH)


Overwriting code/preprocessor.py


In [59]:
import os
import tempfile

from preprocessor import preprocess

# Smoke-test the preprocessing script locally before running it on SageMaker:
# write its outputs to a throwaway directory and list the folders it created.
DATASET_FOLDER = Path("dataset")
LOCAL_FILEPATH = DATASET_FOLDER / "mnist_train.csv"

with tempfile.TemporaryDirectory() as directory:
    preprocess(base_directory=directory, data_filepath=LOCAL_FILEPATH)
    print(f"Folders: {os.listdir(directory)}")

init called
Folders: ['train', 'validation', 'test', 'pipeline']


In [65]:
# Pipeline parameter holding the S3 URI of the raw MNIST training CSV.
dataset_location = ParameterString(
    name="dataset_location",
    default_value="s3://mnistmlschool/mnist/mnist_train.csv",
)

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Step caching with a 15-day expiry.
# NOTE(review): the SDK documents expire_after as an ISO 8601 duration
# (e.g. "P30D"); confirm "15d" is accepted as intended.
cache_config = CacheConfig(enable_caching=True, expire_after="15d")

In [68]:
from sagemaker.workflow.pipeline_context import PipelineSession

# With a PipelineSession, processor.run() produces step arguments for
# ProcessingStep instead of launching a processing job immediately.
pipeline_session = PipelineSession()

sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.large",  # required for this type of data - might have to check credits when running it
    instance_count=1,
    role=role,
    sagemaker_session=pipeline_session,
)

processing_inputs = [
    ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),
]

# One output channel per folder written by preprocessor.py.
processing_outputs = [
    ProcessingOutput(output_name=channel, source=f"/opt/ml/processing/{channel}")
    for channel in ("train", "validation", "test", "pipeline")
]
# Currently disabled outputs:
# ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes"),
# ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline"),
# ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline"),

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    step_args=sklearn_processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=processing_inputs,
        outputs=processing_outputs,
    ),
    cache_config=cache_config,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [69]:
# Session-1 pipeline: a single preprocessing step whose input location is
# exposed as the `dataset_location` pipeline parameter.
session1_pipeline = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[dataset_location],
    steps=[preprocess_data_step],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session,
)

# Create or update the pipeline definition, then start an execution.
session1_pipeline.upsert(role_arn=role)
execution = session1_pipeline.start()

# Preprocessor code worked :D time to move to model creation