# Modeling on Amazon SageMaker

This notebook demonstrates how to train and deploy our best machine learning model (identified during local experimentation) in the cloud with Amazon SageMaker. We use SageMaker's "script mode" together with its prebuilt scikit-learn (SKLearn) container for both data preprocessing and modeling.

We build an inference pipeline that combines the data transformation with the trained model. The pipeline contains two SageMaker "models". The first handles data preprocessing. We train it with an SKLearn estimator just like a regular model, but we override serving functions such as ```input_fn```, ```output_fn```, and ```predict_fn``` so that the container applies the fitted ```transform``` operation instead of predicting a value. We fit this preprocessor to the training data and persist the fitted scikit-learn object(s) to S3 as a model artifact. When new data is sent to the pipeline for inference, the preprocessing container loads those pre-fit objects and uses them to transform the data before passing it to the second model, our estimator.
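Conceptually, the two SageMaker models behave like the two steps of a local scikit-learn `Pipeline`: a fitted `ColumnTransformer` transforms each incoming row, and a fitted regressor predicts on the result. The sketch below reproduces that flow locally. The column subset and the random data are illustrative assumptions, not the project's dataset.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

# Tiny synthetic stand-in for the student data (hypothetical values).
rng = np.random.default_rng(0)
n = 40
students = pd.DataFrame({
    "age": rng.integers(15, 22, n),
    "absences": rng.integers(0, 30, n),
    "address": rng.choice(["U", "R"], n),
    "guardian": rng.choice(["mother", "father", "other"], n),
})
grades = rng.integers(0, 21, n)  # stand-in for the G3 label (0-20)

# Step 1: the preprocessor (plays the role of the first SageMaker model).
preprocessor = ColumnTransformer(
    transformers=[
        ("numeric", MinMaxScaler(), ["age", "absences"]),
        ("nominal", OneHotEncoder(drop="if_binary"), ["address", "guardian"]),
    ],
    remainder="drop",
)

# Step 2: the regressor (plays the role of the second SageMaker model).
pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", RandomForestRegressor(n_estimators=10, max_depth=2, random_state=0)),
])
pipeline.fit(students, grades)

# Raw rows go in and predictions come out, analogous to a request flowing
# through the preprocessing container and then the model container.
predictions = pipeline.predict(students.head(3))
print(predictions.shape)  # (3,)
```

Fitting both steps together locally is convenient for experimentation. The SageMaker version splits them so that each step can be trained and hosted in its own container.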

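Note: To run this example, you need an Amazon SageMaker notebook instance whose IAM execution role has the necessary permissions in your AWS environment, including write access to Amazon Simple Storage Service (S3). When `LOCAL_MODE` is `True` (see the constants below), training and batch transform run in Docker containers on the notebook instance itself. The endpoint is always deployed to a hosted instance.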

### Import libraries

In [None]:
import sagemaker
import json
import pandas as pd
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.predictor import csv_serializer, RealTimePredictor

### Configure SageMaker environment

In [None]:
# Get a SageMaker-compatible role used by this Notebook Instance.
role = sagemaker.get_execution_role()

### Define constants

In [None]:
MODEL_HYPERPARAMS = {'max_depth': 2, 'min_samples_leaf': 7, 'min_samples_split': 2, 'n_estimators': 50}
LOCAL_MODE = True
TRAINING_INSTANCE_TYPE = 'local' if LOCAL_MODE else 'ml.c5.xlarge'
INFERENCE_INSTANCE_TYPE = 'ml.c5.xlarge'
sagemaker_session = sagemaker.LocalSession() if LOCAL_MODE else sagemaker.Session()
DATA_DIRECTORY = 'data/'
S3_BUCKET = sagemaker_session.default_bucket()
S3_PREFIX = 'student-performance-sagemaker'

### Write preprocessing and training scripts

The steps in the scripts below were developed during our initial local exploration. See the repository's root directory for details on how we arrived at them.

In [None]:
%%writefile sagemaker_preprocessing.py
import argparse
import joblib
import os
import json
import sys
from io import StringIO
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sagemaker_containers import _encoders as encoders
from sagemaker_containers import _worker as worker

# the following may be required in order to get the SKLearn container to deploy properly
# see the following issue: https://github.com/aws/sagemaker-python-sdk/issues/648
# module_path = os.path.abspath('/opt/ml/code')
# if module_path not in sys.path:
#     sys.path.append(module_path)

feature_columns_names = ['school', 'sex', 'age', 'address', 'famsize', 'Pstatus', 'Medu', 'Fedu',
       'Mjob', 'Fjob', 'reason', 'guardian', 'traveltime', 'studytime',
       'failures', 'schoolsup', 'famsup', 'paid', 'activities', 'nursery',
       'higher', 'internet', 'romantic', 'famrel', 'freetime', 'goout', 'Dalc',
       'Walc', 'health', 'absences', 'G1', 'G2']

label_column = 'G3'

numeric_cols_to_keep = ['age', 'Medu', 'traveltime', 'studytime', 'failures', 'goout', 'Dalc', 'absences']
nominal_cols_to_keep = ['address', 'Fjob', 'guardian', 'higher', 'internet', 'romantic']

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    
    args = parser.parse_args()
    
    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    raw_data = [ pd.read_csv(
        file, 
        header=0, 
        index_col=0) for file in input_files ]
    concat_data = pd.concat(raw_data)
        
    preprocessor = ColumnTransformer(
         transformers = [("numeric", MinMaxScaler(), numeric_cols_to_keep),
                         ("nominal", OneHotEncoder(drop='if_binary', handle_unknown='error'), nominal_cols_to_keep)],
         remainder = 'drop',
         n_jobs = -1
    )
    
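    # Fit on the feature columns only, so that requests without the G3 label
    # still match the columns the transformer was fit on
    preprocessor.fit(concat_data[feature_columns_names])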
    
    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))
    
    
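def input_fn(input_data, content_type):
    """Parse input data payload

    This function is used by Amazon SageMaker only during inference.
    We only accept text/csv. Two layouts reach this function: the batch
    transform of train.csv, which has a header row and a leading index column,
    and real-time requests, which are bare rows with neither. Since we need to
    process both labelled and unlabelled data, we then determine whether the
    label column is present by counting the columns.
    """
    if content_type == 'text/csv':
        # input_data is the CSV text itself, not a filename, so we wrap it in StringIO
        first_line = input_data.lstrip().split('\n', 1)[0]
        first_fields = [field.strip().strip('"') for field in first_line.split(',')]
        if feature_columns_names[0] in first_fields:
            # Header row present (train.csv written by df.to_csv): the first column is the index
            df = pd.read_csv(StringIO(input_data), header=0, index_col=0, skipinitialspace=True)
        else:
            # Bare data rows (a real-time request): no header and no index column
            df = pd.read_csv(StringIO(input_data), header=None, skipinitialspace=True)
        
        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the G3 label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names
        else:
            raise ValueError('Expected {} or {} columns, got {}'.format(
                len(feature_columns_names), len(feature_columns_names) + 1, len(df.columns)))
        
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))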
    
    
def output_fn(prediction, accept):
    """Format prediction output

    This function is used by Amazon Sagemaker only during inference.
    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept) # we use the Sagemaker container helper classes to return the proper types
#         return json.dumps(json_output)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept) # we use the Sagemaker container helper classes to return the proper types
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))
        
        
def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().
    """
    features = model.transform(input_data)
    
    # if labels were passed in, we need to add them back to the dataset because ColumnTransformer will remove them
    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features
    
def model_fn(model_dir):
    """Deserialize fitted model
    """
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor
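The SKLearn serving container loads the artifact with `model_fn` and then, for each request, calls `input_fn`, `predict_fn`, and `output_fn` in that order. The sketch below imitates that chain locally. It uses simplified, hypothetical stand-in handlers and a `serve` helper rather than the container's actual code. A two-column `MinMaxScaler` plays the role of the fitted preprocessor, and `output_fn` returns plain text instead of a `worker.Response`.

```python
from io import StringIO

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

LABEL = "G3"  # label column name, as in sagemaker_preprocessing.py

# Hypothetical, simplified stand-ins for the handlers defined in the script above.
def input_fn(body, content_type):
    if content_type != "text/csv":
        raise ValueError("{} not supported".format(content_type))
    return pd.read_csv(StringIO(body))

def predict_fn(df, model):
    features = model.transform(df[["age", "absences"]])
    if LABEL in df:
        # Put the label back as column 0 so the training script can split it off.
        return np.insert(features, 0, df[LABEL], axis=1)
    return features

def output_fn(prediction, accept):
    if accept != "text/csv":
        raise RuntimeError("{} not supported".format(accept))
    buf = StringIO()
    np.savetxt(buf, prediction, delimiter=",", fmt="%s")  # headerless CSV rows
    return buf.getvalue()

def serve(model, body, content_type="text/csv", accept="text/csv"):
    """One request, in the order the container calls the handlers."""
    data = input_fn(body, content_type)
    prediction = predict_fn(data, model)
    return output_fn(prediction, accept)

# model_fn would normally load this from model.joblib; here it is fit inline.
scaler = MinMaxScaler().fit(pd.DataFrame({"age": [15, 22], "absences": [0, 10]}))

labelled = "age,absences,G3\n22,5,14\n"
unlabelled = "age,absences\n15,10\n"
print(serve(scaler, labelled))    # 14.0,1.0,0.5
print(serve(scaler, unlabelled))  # 0.0,1.0
```

The labelled case shows why the training script can take column 0 as the target: the preprocessor itself places the label there.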

In [None]:
%%writefile sagemaker_modeling.py
import argparse
import joblib
import os
import json
from io import StringIO
import pandas as pd
from sklearn.ensemble import RandomForestRegressor

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    
    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--max-depth", type=int, default=2)
    parser.add_argument("--min-samples-leaf", type=int, default=7)
    parser.add_argument("--min-samples-split", type=int, default=2)
    parser.add_argument("--n-estimators", type=int, default=50)
    
    args = parser.parse_args()
    
    # Take the set of files and read them all into a single pandas dataframe
    train_dir = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(train_dir) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
        
    input_files = list()
    for item in train_dir:
        if os.path.isdir(item): # item in train folder may be a directory with files (directory can't be read by Pandas)
            subitems = os.listdir(item)
            for file in subitems:
                input_files.append(os.path.join(item, file)) # add full path to filename
        else:
            input_files.append(item)
    print('Input files: ', input_files)
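    # the preprocessing transform writes CSV without a header row, so don't let pandas treat row 1 as one
    raw_data = [ pd.read_csv(file, header=None) for file in input_files ]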
    concat_data = pd.concat(raw_data)
        
    # separate features and target variable
    X = concat_data[concat_data.columns[1:]]
    y = concat_data[concat_data.columns[0]]
    
    hyperparameters = {
        "max_depth": args.max_depth,
        "verbose": 1,  # print progress messages while fitting
        "min_samples_leaf": args.min_samples_leaf,
        "min_samples_split": args.min_samples_split,
        "n_estimators": args.n_estimators,
    }
    
    print("Training the regressor")
    model = RandomForestRegressor()
    model.set_params(**hyperparameters)
    model.fit(X, y)
    
    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))

def model_fn(model_dir):
    """
    Deserialize and return the fitted model.
    Note that the filename must match the one used to serialize the model in the main block.
    """
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model
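The preprocessing transform emits plain CSV with no header row, so the training data needs to be read with `header=None`. With pandas' default `header='infer'`, the first data row would be consumed as column names. The short demonstration below uses a made-up two-row CSV in the same layout (label first, then features) to show the difference.

```python
from io import StringIO

import pandas as pd

# Headerless output in the layout the preprocessing step produces (hypothetical values).
transform_output = "14,1.0,0.5\n9,0.0,1.0\n"

inferred = pd.read_csv(StringIO(transform_output))               # first row becomes the header
explicit = pd.read_csv(StringIO(transform_output), header=None)  # every row is kept as data

print(len(inferred), len(explicit))  # 1 2

# Split the label (column 0) from the features, as sagemaker_modeling.py does.
X = explicit[explicit.columns[1:]]
y = explicit[explicit.columns[0]]
print(y.tolist())  # [14, 9]
```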

### Upload data to S3

In [None]:
# we will read in the two data files and concatenate them into a single file to simplify the upload process to S3
math_data = pd.read_csv(filepath_or_buffer = '../data/student-mat.csv', sep=';', header=0)
port_data = pd.read_csv(filepath_or_buffer = '../data/student-por.csv', sep=';', header=0)
df = pd.concat([math_data, port_data])

In [None]:
# create a local directory to stage the data destined for SageMaker
!mkdir -p {DATA_DIRECTORY}

In [None]:
df.to_csv(path_or_buf = DATA_DIRECTORY + 'train.csv', header=True)
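By default, `to_csv` also writes the DataFrame index as an unnamed first column. This is why the preprocessing script reads the file back with `header=0, index_col=0`. A round trip on a small made-up frame shows that these options recover the original columns.

```python
from io import StringIO

import pandas as pd

# Tiny frame standing in for the concatenated math/Portuguese data (hypothetical values).
sample = pd.DataFrame({"school": ["GP", "MS"], "age": [17, 18], "G3": [14, 9]})

buf = StringIO()
sample.to_csv(buf, header=True)  # the index is written too, as for train.csv
text = buf.getvalue()
print(text.splitlines()[0])  # ,school,age,G3

# Same read options as sagemaker_preprocessing.py uses for training.
restored = pd.read_csv(StringIO(text), header=0, index_col=0)
print(list(restored.columns))  # ['school', 'age', 'G3']
```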

In [None]:
train_input = sagemaker_session.upload_data(
    path=DATA_DIRECTORY + 'train.csv',
    bucket=S3_BUCKET,
    key_prefix='{}/{}'.format(S3_PREFIX, 'train'))

### Fit data preprocessor

In [None]:
preprocessing_script_path = 'sagemaker_preprocessing.py'

sklearn_preprocessor = SKLearn(
    entry_point=preprocessing_script_path,
    role=role,
    instance_type=TRAINING_INSTANCE_TYPE,
    sagemaker_session=sagemaker_session,
    py_version="py3",
    framework_version="0.23-1")

sklearn_preprocessor.fit({'train': train_input})

### Transform train data

In [None]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type=TRAINING_INSTANCE_TYPE,
    accept = 'text/csv')
#     accept = 'application/json') # output_fn can also emit JSON, so either accept type works

# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

In [None]:
print('Processed data written to: ', preprocessed_train)

### Create estimator

In [None]:
modeling_script_path = 'sagemaker_modeling.py'

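sklearn_estimator = SKLearn(
    entry_point=modeling_script_path,
    role=role,
    instance_type=TRAINING_INSTANCE_TYPE,
    sagemaker_session=sagemaker_session,
    # the training script declares hyphenated options (e.g. --max-depth), so convert the keys
    hyperparameters={key.replace('_', '-'): value for key, value in MODEL_HYPERPARAMS.items()},
    py_version="py3",
    framework_version="0.23-1")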
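In script mode, the training script receives the estimator's hyperparameters as command-line options, so the keys have to match the option names that `argparse` declares (`--max-depth`, not `--max_depth`). The sketch below imitates that hand-off locally using the same values as `MODEL_HYPERPARAMS`. `to_cli_args` is a hypothetical helper standing in for the container's own conversion.

```python
import argparse

# Same values as MODEL_HYPERPARAMS in this notebook.
example_hyperparams = {'max_depth': 2, 'min_samples_leaf': 7, 'min_samples_split': 2, 'n_estimators': 50}

def to_cli_args(hyperparameters):
    """Hypothetical stand-in for turning a hyperparameter dict into argv."""
    argv = []
    for key, value in hyperparameters.items():
        argv.extend(["--{}".format(key), str(value)])
    return argv

# The same options that sagemaker_modeling.py declares.
parser = argparse.ArgumentParser()
parser.add_argument("--max-depth", type=int, default=2)
parser.add_argument("--min-samples-leaf", type=int, default=7)
parser.add_argument("--min-samples-split", type=int, default=2)
parser.add_argument("--n-estimators", type=int, default=50)

# Underscore keys become --max_depth etc., which the declared options do not recognise.
_, unknown = parser.parse_known_args(to_cli_args(example_hyperparams))
print(unknown)

# Hyphenated keys line up with the declared options.
hyphenated = {key.replace('_', '-'): value for key, value in example_hyperparams.items()}
args = parser.parse_args(to_cli_args(hyphenated))
print(args.max_depth, args.n_estimators)  # 2 50
```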

In [None]:
preprocessed_train_data = sagemaker.inputs.TrainingInput(
    preprocessed_train, # location of preprocessed data in S3
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')

data_channels = {'train': preprocessed_train_data}

In [None]:
sklearn_estimator.fit(inputs=data_channels, logs=True)

### Create SageMaker inference pipeline

In [None]:
preprocessor_step = sklearn_preprocessor.create_model()
estimator_step = sklearn_estimator.create_model()

In [None]:
model_name = 'Student-Performance-Pipeline-Model'
endpoint_name = 'Student-Performance-Pipeline-Endpoint'

In [None]:
sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        preprocessor_step, 
        estimator_step])

In [None]:
sm_model.deploy(initial_instance_count=1, instance_type=INFERENCE_INSTANCE_TYPE, endpoint_name=endpoint_name)

### Test inference for the deployed model

In [None]:
payload = 'GP, M, 17, R, GT3, T, 3, 4, services, health, "home", "mother", 2, 2, 1, no, yes, no, yes, no, yes, yes, no, 5, 2, 2, 1, 1, 5, 5, 17, 18'
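Before sending the request, you can confirm locally that the payload parses into exactly the 32 feature columns the preprocessing container expects. The check below copies the column list from `sagemaker_preprocessing.py` and the payload above. It uses `skipinitialspace=True` so that the spaces after the commas (and the quoted `"home"`/`"mother"` fields) are read cleanly.

```python
from io import StringIO

import pandas as pd

feature_columns_names = ['school', 'sex', 'age', 'address', 'famsize', 'Pstatus', 'Medu', 'Fedu',
       'Mjob', 'Fjob', 'reason', 'guardian', 'traveltime', 'studytime',
       'failures', 'schoolsup', 'famsup', 'paid', 'activities', 'nursery',
       'higher', 'internet', 'romantic', 'famrel', 'freetime', 'goout', 'Dalc',
       'Walc', 'health', 'absences', 'G1', 'G2']

sample_payload = 'GP, M, 17, R, GT3, T, 3, 4, services, health, "home", "mother", 2, 2, 1, no, yes, no, yes, no, yes, yes, no, 5, 2, 2, 1, 1, 5, 5, 17, 18'

# A single bare data row: no header and no index column.
row = pd.read_csv(StringIO(sample_payload), header=None, skipinitialspace=True)
row.columns = feature_columns_names  # raises ValueError if the field count is wrong
print(row.loc[0, "reason"], row.loc[0, "G2"])  # home 18
```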

In [None]:
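from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

# The endpoint is always hosted (INFERENCE_INSTANCE_TYPE), so use a regular Session here even
# when LOCAL_MODE is True; a LocalSession would try to reach a locally running container instead.
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker.Session(),
    serializer=CSVSerializer(),         # sends the payload as text/csv
    deserializer=JSONDeserializer())    # requests and parses an application/json response

print(predictor.predict(payload))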
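The SDK predictor is a convenience wrapper. You can send the same request with the low-level `sagemaker-runtime` client's `invoke_endpoint` call, which is useful from code that does not depend on the SageMaker Python SDK. In the sketch below, `predict_csv` is a hypothetical helper. The client is passed in so that the helper can be exercised offline with a stand-in, and the `[12.5]` response is a made-up value.

```python
import io
import json

def predict_csv(client, endpoint_name, payload):
    """Send CSV text to a SageMaker endpoint and decode the JSON response.

    `client` is expected to be boto3.client("sagemaker-runtime") or any object
    exposing the same invoke_endpoint method.
    """
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",       # the only type the preprocessing input_fn accepts
        Accept="application/json",    # response format requested from the pipeline
        Body=payload,
    )
    # response["Body"] is a stream; read() returns the raw bytes
    return json.loads(response["Body"].read())

# Offline check with a stand-in client; no AWS call is made.
class FakeRuntimeClient:
    def invoke_endpoint(self, **kwargs):
        self.last_request = kwargs
        return {"Body": io.BytesIO(b"[12.5]")}

fake = FakeRuntimeClient()
print(predict_csv(fake, "Student-Performance-Pipeline-Endpoint", "GP,M,17"))  # [12.5]

# Against the real endpoint (requires AWS credentials and a deployed endpoint):
# import boto3
# runtime = boto3.client("sagemaker-runtime")
# print(predict_csv(runtime, endpoint_name, payload))
```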

### Delete endpoint

In [None]:
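sm_client = sagemaker_session.boto_session.client('sagemaker')

# Look up the endpoint config before deleting the endpoint, then remove the endpoint,
# its config, and the pipeline model so no billable or orphaned resources remain
endpoint_config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)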