# Deploy Inference Pipeline with Scikit-learn preprocessor and classifier to AWS with SageMaker

The goal is to deploy an ML pipeline (a preprocessor followed by a classifier) to the cloud. The preprocessor runs in the `Sagemaker Scikit-learn container` and the classifier runs in the `SageMaker XGBoost algorithm container`.

## Define Sagemaker session and role

In [62]:
import sagemaker
from sagemaker import get_execution_role

# Execution role of this notebook instance; it is handed to the SageMaker
# estimators and models created further down.
role = get_execution_role()

# One session object is shared by every upload, training and deployment call.
sagemaker_session = sagemaker.Session()

# S3 location: the session's default bucket plus a key prefix for this pipeline.
bucket = sagemaker_session.default_bucket()
prefix = "Scikit-DefaultPredictor-pipeline"

In [63]:
# Display the resolved bucket, key prefix and role as the cell output.
bucket, prefix, role

('sagemaker-eu-central-1-918203234730',
 'Scikit-DefaultPredictor-pipeline',
 'arn:aws:iam::918203234730:role/administrator')

## Preprocessing data and training the model
### Upload the data for training

In [64]:
WORK_DIRECTORY = "data"

# Local file to upload and the S3 key prefix it is stored under.
local_train_file = "{}/{}".format(WORK_DIRECTORY, "train.csv")
train_key_prefix = "{}/{}".format(prefix, "train")

# The value returned by upload_data is kept and later passed to fit() and
# transform() as the training input.
train_input = sagemaker_session.upload_data(
    path=local_train_file,
    bucket=bucket,
    key_prefix=train_key_prefix,
)

#### Training script

In [18]:
# featurizer_remote.py
import pandas as pd
import numpy as np
import os
import joblib
from io import StringIO
import argparse
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# Single source of truth for the raw CSV layout: (column name, dtype) pairs in
# file order. The two public structures below are derived from it so the name
# list and the dtype mapping cannot drift apart.
_COLUMN_SCHEMA = (
    ('uuid', "object"),
    ('default', "float64"),
    ('account_amount_added_12_24m', "int64"),
    ('account_days_in_dc_12_24m', "float64"),
    ('account_days_in_rem_12_24m', "float64"),
    ('account_days_in_term_12_24m', "float64"),
    ('account_incoming_debt_vs_paid_0_24m', "float64"),
    ('account_status', "float64"),
    ('account_worst_status_0_3m', "float64"),
    ('account_worst_status_12_24m', "float64"),
    ('account_worst_status_3_6m', "float64"),
    ('account_worst_status_6_12m', "float64"),
    ('age', "int64"),
    ('avg_payment_span_0_12m', "float64"),
    ('avg_payment_span_0_3m', "float64"),
    ('merchant_category', "object"),
    ('merchant_group', "object"),
    ('has_paid', "bool"),
    ('max_paid_inv_0_12m', "float64"),
    ('max_paid_inv_0_24m', "float64"),
    ('name_in_email', "object"),
    ('num_active_div_by_paid_inv_0_12m', "float64"),
    ('num_active_inv', "int64"),
    ('num_arch_dc_0_12m', "int64"),
    ('num_arch_dc_12_24m', "int64"),
    ('num_arch_ok_0_12m', "int64"),
    ('num_arch_ok_12_24m', "int64"),
    ('num_arch_rem_0_12m', "int64"),
    ('num_arch_written_off_0_12m', "float64"),
    ('num_arch_written_off_12_24m', "float64"),
    ('num_unpaid_bills', "int64"),
    ('status_last_archived_0_24m', "int64"),
    ('status_2nd_last_archived_0_24m', "int64"),
    ('status_3rd_last_archived_0_24m', "int64"),
    ('status_max_archived_0_6_months', "int64"),
    ('status_max_archived_0_12_months', "int64"),
    ('status_max_archived_0_24_months', "int64"),
    ('recovery_debt', "int64"),
    ('sum_capital_paid_account_0_12m', "int64"),
    ('sum_capital_paid_account_12_24m', "int64"),
    ('sum_paid_inv_0_12m', "int64"),
    ('time_hours', "float64"),
    ('worst_status_active_inv', "float64"),
)

# Column names in file order; used as `names=` when the header-less CSV is read.
feature_columns_names = [column for column, _ in _COLUMN_SCHEMA]

# Column name -> dtype string, in the same order as feature_columns_names.
feature_columns_dtype = dict(_COLUMN_SCHEMA)

if __name__ == '__main__':
    # Training entry point: read every file of the "train" channel, fit the
    # preprocessing ColumnTransformer on the feature columns and persist it
    # into the model directory as "preprocessor.joblib".
    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)]
    if not input_files:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    # The files carry no header row and are ';'-separated.
    raw_data = [pd.read_csv(
                file,
                header=None,
                sep=";",
                names=feature_columns_names) for file in input_files]

    train = pd.concat(raw_data)
    train = train.set_index("uuid")

    # Fit on the feature columns only. The label is never transformed, and
    # leaving it in the frame makes it part of the input layout the fitted
    # ColumnTransformer records.
    # NOTE(review): with the label in the fitted layout, label-free inference
    # frames are expected to be rejected on the 0.23 container - confirm.
    features = train.drop(columns=["default"])

    # define features
    categorical_low_card = [col for col in features.columns if "status" in col]
    categorical_high_card = ["merchant_category", "merchant_group", "name_in_email"]
    binary = ["has_paid"]
    # Everything not claimed above is numerical. Keep the file's column order:
    # going through set() would make the order of the numerical output columns
    # depend on string hashing, which changes between interpreter runs.
    non_numerical = set(categorical_low_card + categorical_high_card + binary)
    numerical = [col for col in features.columns if col not in non_numerical]

    # Preprocessing pipeline
    # Numeric features transforming Pipeline
    num_transformer = Pipeline(steps=[
            ('num_imputer', SimpleImputer(strategy="median")),
            ('scaler', StandardScaler())
        ])
    # Categorical features transforming Pipeline
    cat_low_card_transformer = Pipeline(steps=[
        ('cat_low_imputer', SimpleImputer(strategy="most_frequent"))
    ])

    # NOTE(review): despite the step name, this is an encoder with no imputation
    # in front of it - confirm these columns never contain missing values.
    cat_high_card_transformer = Pipeline(steps=[
        ('cat_high_imputer', OrdinalEncoder())
    ])

    # Binary features transforming Pipeline
    binary_transformer = Pipeline(steps=[
        ('ordinal', OrdinalEncoder())
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', num_transformer, numerical),
            ('cat_low_card', cat_low_card_transformer, categorical_low_card),
            ('cat_high_card', cat_high_card_transformer, categorical_high_card),
            ('binary', binary_transformer, binary)
        ])

    preprocessor.fit(features)

    # model_fn loads the artifact back from this exact file name.
    joblib.dump(preprocessor, os.path.join(args.model_dir, "preprocessor.joblib"))

    print("saved model!")

    
def input_fn(input_data, content_type):
    """Parse a CSV request payload into a DataFrame with named columns.

    The payload is read without a header row, so the first record is kept as
    data, and the columns are named from ``feature_columns_names`` so that the
    fitted preprocessor can select them by name.

    Two layouts are accepted, told apart by column count:

    * every column of ``feature_columns_names`` (including 'uuid' and
      'default'): 'uuid' becomes the index, as in the training script;
    * the feature columns only (no 'uuid', no 'default').

    Args:
        input_data: Raw request body, ``str`` or UTF-8 encoded ``bytes``.
        content_type: MIME type of the request; only 'text/csv' is supported.

    Returns:
        pandas.DataFrame with named columns.

    Raises:
        ValueError: If the content type is not 'text/csv' or the number of
            columns matches neither layout.
    """
    if content_type != 'text/csv':
        raise ValueError("{} not supported by script!".format(content_type))

    if isinstance(input_data, bytes):
        input_data = input_data.decode("utf-8")

    # The training files are ';'-separated while ad-hoc requests may be
    # ','-separated, so pick the delimiter from the first record.
    # NOTE(review): assumes no ';' appears inside a value of a ','-separated
    # payload - confirm against the real data.
    first_record = input_data.split("\n", 1)[0]
    separator = ";" if ";" in first_record else ","

    df = pd.read_csv(
        StringIO(input_data),
        header=None,
        sep=separator,
        skipinitialspace=True,  # tolerate "a, b, c" style payloads
    )

    all_columns = list(feature_columns_names)
    features_only = [name for name in all_columns if name not in ("uuid", "default")]

    n_columns = len(df.columns)
    if n_columns == len(all_columns):
        df.columns = all_columns
        df = df.set_index("uuid")
    elif n_columns == len(features_only):
        df.columns = features_only
    else:
        raise ValueError(
            "Expected {} or {} columns in the CSV payload, got {}.".format(
                len(all_columns), len(features_only), n_columns))

    return df


def output_fn(prediction, accept):
    """Serialize the transformed features into the response body.

    Args:
        prediction: Array returned by ``predict_fn``; ``.tolist()`` is called
            on it for JSON output.
        accept: Requested response MIME type, 'application/json' or 'text/csv'.

    Returns:
        ``worker.Response`` whose mimetype equals ``accept``. JSON output has
        the shape ``{"instances": [{"features": [...]}, ...]}``, one entry per
        row; CSV output is produced by ``encoders.encode``.

    Raises:
        RuntimeError: If ``accept`` is neither of the supported types.
    """
    # Imported here because `json` is not imported at module level in this script.
    import json

    if accept == "application/json":
        instances = [{"features": row} for row in prediction.tolist()]
        json_output = {"instances": instances}
        return worker.Response(json.dumps(json_output), mimetype=accept)

    if accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)

    # `RuntimeException` does not exist in Python; RuntimeError is the builtin.
    raise RuntimeError("{} accept type is not supported by this script.".format(accept))
        
        
def predict_fn(input_data, model):
    """Preprocess input data with the fitted preprocessor.

    The model is a preprocessor, so ``.transform()`` is used instead of
    ``.predict()``.

    Args:
        input_data: DataFrame produced by ``input_fn``.
        model: Fitted preprocessor returned by ``model_fn``.

    Returns:
        The transformed feature array. When ``input_data`` contains the label
        column 'default', the label is inserted as the first column in front
        of the features.
    """
    # Name of the target column in the raw data; it was referenced here
    # without ever being defined.
    label_column = "default"

    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        # NOTE(review): presumably label-first so the output can be fed to the
        # downstream XGBoost training step as CSV - confirm.
        return np.insert(features, 0, input_data[label_column], axis=1)

    # Return only the set of features
    return features

def model_fn(model_dir):
    """Load the fitted preprocessor stored in ``model_dir``.

    Reads "preprocessor.joblib", the file name the training block writes.
    """
    artifact_path = os.path.join(model_dir, "preprocessor.joblib")
    return joblib.load(artifact_path)

## Create SageMaker Scikit Estimator 

In [65]:
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"
script_path = "featurizer_remote.py"

# Estimator settings gathered in one mapping: entry-point script, IAM role,
# container framework version, training instance type and the shared session.
preprocessor_settings = {
    "entry_point": script_path,
    "role": role,
    "framework_version": FRAMEWORK_VERSION,
    "instance_type": "ml.c4.xlarge",
    "sagemaker_session": sagemaker_session,
}

sklearn_preprocessor = SKLearn(**preprocessor_settings)

In [66]:
# Fit preprocessor
# The "train" channel key is what the training script reads through
# SM_CHANNEL_TRAIN (its --train default).
sklearn_preprocessor.fit({"train": train_input})

2021-08-24 16:06:56 Starting - Starting the training job...ProfilerReport-1629821209: InProgress
..............................
2021-08-24 16:12:28 Starting - Launching requested ML instances..................
2021-08-24 16:15:43 Starting - Preparing the instances for training..............................
2021-08-24 16:20:44 Downloading - Downloading input data............
2021-08-24 16:22:47 Training - Downloading the training image..[34m2021-08-24 16:23:10,176 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2021-08-24 16:23:10,178 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-08-24 16:23:10,188 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2021-08-24 16:23:10,675 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-08-24 16:23:10,686 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpu


2021-08-24 16:23:45 Training - Training image download completed. Training in progress.
2021-08-24 16:24:25 Uploading - Uploading generated training model

KeyboardInterrupt: 

## Batch transform training data
Use the fitted preprocessor in a batch transform job to preprocess the raw training data and store the result back in S3.

In [None]:
# Build a batch Transformer from the trained SKLearn estimator: one
# ml.m4.xlarge instance, CSV responses, output assembled line by line.
transformer = sklearn_preprocessor.transformer(
    instance_count=1,
    instance_type="ml.m4.xlarge",
    assemble_with="Line",
    accept="text/csv",
)

In [None]:
# Run the raw training data through the preprocessor as a batch transform job.
transformer.transform(train_input, content_type="text/csv")

transform_job_name = transformer.latest_transform_job.job_name
print("Waiting for transform job: " + transform_job_name)
transformer.wait()

# S3 location of the transformed data; used as the XGBoost training input.
preprocessed_train = transformer.output_path

## Fit a Tree-based Model with the preprocessed data

Take the preprocessed training data and fit an XGBoost model.

In [None]:
import boto3
from sagemaker.debugger import Rule, rule_configs
from sagemaker.session import TrainingInput


# S3 location where the XGBoost training job writes its model artifact.
s3_output_location = 's3://{}/{}/{}'.format(bucket, prefix, 'xgboost_model')

# Image URI of the built-in XGBoost algorithm (version 1.2-1) in the current region.
container = sagemaker.image_uris.retrieve("xgboost", boto3.Session().region_name, "1.2-1")
print(container)

# Create an XGBoost estimator using the sagemaker.estimator.Estimator class
xgb_estimator = sagemaker.estimator.Estimator(
    image_uri=container,
    role=role,
    instance_count=1,
    instance_type='ml.m4.xlarge',
    # `volume_size` is the SDK v2 name; `train_volume_size` is the deprecated
    # v1 spelling, while every other argument here already uses v2 names.
    volume_size=5,
    output_path=s3_output_location,
    sagemaker_session=sagemaker_session,
    # Debugger rule that produces the XGBoost training report.
    rules=[Rule.sagemaker(rule_configs.create_xgboost_report())],
)

# Binary classifier trained for 1000 boosting rounds.
xgb_estimator.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    objective="binary:logistic",
    num_round=1000,
)

In [None]:
from sagemaker.session import TrainingInput

# Describe the preprocessed CSV data under the transform output prefix as the
# training input, copied in full to the training instance.
xgb_train_data = sagemaker.inputs.TrainingInput(
    s3_data=preprocessed_train,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)

# A single "train" channel is supplied.
data_channels = {"train": xgb_train_data}

# Start the training job and stream its logs into the notebook.
xgb_estimator.fit(inputs=data_channels, logs=True)

## Serial Inference Pipeline with Scikit preprocessor and xgb

Configure the pipeline model with the fitted Scikit-learn inference model and the fitted XGBoost model.

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from time import gmtime, strftime

# A UTC timestamp gives the model and the endpoint distinct, sortable names.
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_name = "inference-pipeline-" + timestamp_prefix
endpoint_name = "inference-pipeline-ep-" + timestamp_prefix

# Turn each trained estimator into a deployable model.
scikit_learn_inferencee_model = sklearn_preprocessor.create_model()
xgb_model = xgb_estimator.create_model()

# Containers run in list order: preprocessing first, then the classifier.
pipeline_stages = [scikit_learn_inferencee_model, xgb_model]
xgb_predictor = PipelineModel(name=model_name, role=role, models=pipeline_stages)

## Deploy model

In [None]:
from sagemaker.serializers import CSVSerializer

# Requests are serialized as CSV strings because the XGBoost algorithm accepts
# input in CSV format.
xgb_predictor.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1,
    instance_type="ml.c4.xlarge",
    serializer=CSVSerializer(),
)

## Make a request to the pipeline endpoint

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

# Client for the deployed pipeline endpoint; the payload is sent as CSV.
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=CSVSerializer(),
)

# One raw record with the feature columns only (no uuid, no label), and the
# label it is expected to receive.
datapoint = "0, 0.0, 0.0, 0.0, 0.00913, 1.0, 1.0, null, 1.0, 1.0, 20, 6.4, 5.25, 'Youthful Shoes & Clothing', 'Clothing & Shoes', true, 7225.0, 7225.0, 'F', 0.0, 0, 0, 0, 5, 0, 0, 0.0, 0.0, 1, 1, 1, 1, 1, 1, 1, 0, 8815, 0, 27157, 19.8955, null"
actual_default = 0

response = predictor.predict(datapoint)
print(response)