### SageMaker Preprocessor with Scikit-learn Pipeline

In [173]:
%%writefile sklearn_preprocessor.py

import sys
import io
import os
import shutil

import argparse
import json
import joblib

import numpy as np
import pandas as pd

from sklearn.compose import make_column_transformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker
)

# Since we got a headerless CSV file we specify the column names here.
feature_columns_names = [
    "Loan_ID",
    "Gender",
    "Married",
    "Dependents",
    "Education",
    "Self_Employed",
    "ApplicationIncome",
    "CoapplicationIncome",
    "LoanAmount",
    "Loan_Amount_Term",
    "Credit_History",
    "Property_Area",
]

label_column = "Loan_Status"

# Upstream dataset header, for reference (this script's CSVs are headerless, so the
# names above are what actually gets used):
# Loan_ID	Gender	Married	Dependents	Education	Self_Employed	ApplicantIncome	CoapplicantIncome	LoanAmount	Loan_Amount_Term	Credit_History	Property_Area
#
# dtypes passed to pd.read_csv. Every key must match a name in feature_columns_names
# or label_column exactly; a misspelled key is not applied to any column.
columns_dtype = {
    "Loan_ID": "object",
    "ApplicationIncome": "float64",
    "CoapplicationIncome": "float64",
    "LoanAmount": "float64",
    "Loan_Amount_Term": "float64",
    "Gender": "category",
    "Married": "category",
    "Dependents": "category",
    "Education": "category",
    "Credit_History": "category",
    "Property_Area": "category",
    "Loan_Status": "category",
}

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # SageMaker-specific arguments. Defaults are read from the environment variables set
    # by the SageMaker training container; os.environ[...] raises KeyError when they are
    # missing, so the script fails fast instead of reading an unintended directory.
    parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"])

    args = parser.parse_args()

    # Read every file in the train channel into a single DataFrame. Sorting the listing
    # makes the concatenation (row) order reproducible between runs.
    input_files = [os.path.join(args.train, file) for file in sorted(os.listdir(args.train))]

    if len(input_files) == 0:
        raise ValueError(("There are no files in {}.\n" +
                          "This usually indicates that the channel ({}) was incorrectly specified, \n" +
                          "the data specification in S3 was incorrectly specified, or the specified role\n" +
                          "does not have permission to access the data.").format(args.train, "train"))

    # The CSVs are headerless; the label is expected as the first column.
    raw_data = [
        pd.read_csv(
            file,
            header=None,
            names=[label_column] + feature_columns_names,
            dtype=columns_dtype,
        )
        for file in input_files
    ]
    concat_data = pd.concat(raw_data)

    # Labels are not preprocessed; predict_fn re-inserts the label after featurizing.
    concat_data = concat_data.drop(columns=[label_column])

    # Column groups for the preprocessing pipeline. Columns not listed here (Loan_ID,
    # Self_Employed, Credit_History) are dropped by make_column_transformer's default
    # remainder="drop".
    numeric_columns = ["ApplicationIncome", "CoapplicationIncome", "LoanAmount", "Loan_Amount_Term"]
    ordinal_columns = ["Gender", "Married", "Dependents", "Education"]
    one_hot_columns = ["Property_Area"]

    preprocessor = make_column_transformer(
        # Numeric columns: fill missing values with the training mean.
        (Pipeline([
            ("imputer", SimpleImputer(strategy="mean")),
        ]), numeric_columns),
        # Categorical columns: fill with the most frequent value, then map to integers.
        # Categories unseen at fit time are encoded as 1024 instead of raising.
        (Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=1024)),
        ]), ordinal_columns),
        # Property_Area: fill with the most frequent value, then one-hot encode; unseen
        # values yield all-zero columns for this feature instead of raising.
        (Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore")),
        ]), one_hot_columns),
    )

    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))

    print("Model Saved!")
    
def input_fn(request_body, content_type):
    """Parse a CSV request payload into a DataFrame with named, typed columns.

    Only ``text/csv`` is accepted. The same function serves both labelled data
    (the batch transform over the training set) and unlabelled inference requests,
    so the presence of the label is inferred from the column count: one more column
    than ``feature_columns_names`` means the label is present as the first column.

    Values are read as strings with leading whitespace stripped, then cast with
    ``columns_dtype`` (the dtypes used when fitting). Without this, pandas' per-request
    type inference diverges from fit time, e.g. ``Dependents`` "1" would become the
    integer 1 and " Male" would keep its leading space. Both would then be treated
    as unknown categories by the encoders. The label column is left as str.

    Args:
        request_body: headerless CSV payload as text.
        content_type: MIME type of ``request_body``.

    Returns:
        pandas.DataFrame. If the column count matches neither layout, the columns
        are left unnamed (integer labels) and no dtype casting is applied.

    Raises:
        ValueError: if ``content_type`` is not ``text/csv``.
    """
    if content_type != "text/csv":
        raise ValueError("{} not supported by script!".format(content_type))

    df = pd.read_csv(
        io.StringIO(request_body),
        header=None,
        dtype=str,
        skipinitialspace=True,
    )

    if len(df.columns) == len(feature_columns_names) + 1:
        df.columns = [label_column] + feature_columns_names
    elif len(df.columns) == len(feature_columns_names):
        df.columns = feature_columns_names
    else:
        # Unknown layout: return unnamed columns and let the model surface the error.
        return df

    # Cast to the fit-time dtypes. The label stays a plain string so that
    # predict_fn's "Y"/"N" replacement works on ordinary string values.
    casts = {
        column: dtype
        for column, dtype in columns_dtype.items()
        if column in df.columns and column != label_column
    }
    return df.astype(casts)
        

def output_fn(prediction, accept):
    """Serialize the preprocessed array for the next container in the pipeline.

    JSON is the default content type between containers for serial inference.
    The response mimetype is set to the same value as ``accept`` so the receiving
    container can read the payload correctly.

    Args:
        prediction: array returned by predict_fn.
        accept: requested response MIME type ("application/json" or "text/csv").

    Raises:
        ValueError: for any other accept type.
    """
    if accept not in ("application/json", "text/csv"):
        raise ValueError("{} accept type is not supported by this script".format(accept))

    if accept == "text/csv":
        body = encoders.encode(prediction, accept)
    else:
        rows = prediction.tolist()
        body = json.dumps({"instances": [{"features": row} for row in rows]})
    return worker.Response(body, mimetype=accept)
        

def predict_fn(input_data, model):
    """Featurize ``input_data`` with the fitted preprocessor.

    The default predict_fn calls ``.predict()``. The model here is a fitted
    ColumnTransformer, so ``.transform()`` is used instead.

    Output layout:
        - labelled input: the label (``"Y"`` -> 1, ``"N"`` -> 0) as the first column,
          followed by the transformed features;
        - unlabelled input: the transformed features only.

    As configured in this script's ``__main__`` block, the transformed features are
    mean-imputed numeric columns, then ordinal-encoded categoricals, then one-hot
    encoded ``Property_Area``. No standardization is applied.

    ``input_data`` is not modified.
    """
    features = model.transform(input_data)

    if label_column not in input_data:
        return features

    # Encode the label into a new Series instead of writing back into the caller's frame.
    labels = input_data[label_column].replace({"Y": 1, "N": 0})
    return np.column_stack([labels, features])
    
    
def model_fn(model_dir):
    """Load the fitted preprocessor written by the training entry point.

    Args:
        model_dir: directory containing ``model.joblib``.

    Returns:
        The deserialized preprocessor.
    """
    # NOTE(review): joblib.load unpickles; only point it at artifacts this job produced.
    return joblib.load(os.path.join(model_dir, "model.joblib"))

Writing sklearn_preprocessor.py


### SageMaker Estimator with Scikit-learn Random Forest Classifier

In [175]:
%%writefile sklearn_estimator.py

import os
import io
import json
import joblib
import argparse
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Import SageMaker modules for handling environment and I/O
from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker
)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Training hyperparameters; SageMaker forwards the estimator's `hyperparameters`
    # dict to the script as these command-line flags.
    parser.add_argument('--n-estimators', type=int, default=100)  # number of trees
    parser.add_argument('--max-depth', type=int, default=10)  # maximum depth of each tree
    # Optional seed for a reproducible forest; the default (None) keeps the
    # previous unseeded behaviour.
    parser.add_argument('--random-state', type=int, default=None)

    # SageMaker I/O locations. os.environ.get yields None when run outside SageMaker.
    parser.add_argument('--output-data-dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))

    args = parser.parse_args()

    # Without this check, a missing SM_CHANNEL_TRAIN makes os.listdir(None) list the
    # current working directory, and the script would train on whatever is there.
    if args.train is None or args.model_dir is None:
        parser.error("--train and --model-dir must be given when SM_CHANNEL_TRAIN / SM_MODEL_DIR are unset")

    # Sorted so the concatenation order is reproducible between runs.
    input_files = [os.path.join(args.train, file) for file in sorted(os.listdir(args.train))]

    if len(input_files) == 0:
        raise ValueError(("There are no files in {}.\n" +
                          "This usually indicates that the channel ({}) was incorrectly specified, \n" +
                          "the data specification in S3 was incorrectly specified, or the specified role\n" +
                          "does not have permission to access the data.").format(args.train, "train"))

    # Input is expected to be the preprocessor's batch-transform output: headerless
    # CSV with the label in the first column and features after it.
    raw_data = [pd.read_csv(file, header=None) for file in input_files]
    concat_data = pd.concat(raw_data)

    X_train = concat_data.iloc[:, 1:]  # features
    y_train = concat_data.iloc[:, 0]   # labels

    clf = RandomForestClassifier(
        n_estimators=args.n_estimators,
        max_depth=args.max_depth,
        random_state=args.random_state,
    )
    clf.fit(X_train, y_train)

    joblib.dump(clf, os.path.join(args.model_dir, "model.joblib"))
    

# SageMaker serving functions
def input_fn(request_body, content_type):
    """Deserialize a request for the classifier.

    - ``application/json``: expects ``{"instances": [{"features": [...]}, ...]}``,
      the shape the preprocessor's JSON output uses. Returns a numpy array with one
      entry per instance.
    - ``text/csv``: headerless CSV, returned as a pandas DataFrame.

    Raises:
        ValueError: for any other content type.
    """
    if content_type == "text/csv":
        return pd.read_csv(io.StringIO(request_body), header=None)

    if content_type != "application/json":
        raise ValueError("{} not supported by script".format(content_type))

    payload = json.loads(request_body)
    return np.asarray([instance["features"] for instance in payload["instances"]])
    
    
def output_fn(prediction, accept):
    """Serialize classifier predictions and set the response content type.

    - ``application/json``: ``{"instances": [{"feature": <prediction>}, ...]}``.
      The "feature" key is kept unchanged because endpoint clients may already
      parse responses in this shape.
    - ``text/csv``: encoded with the container's CSV encoder.

    The response mimetype is set to ``accept``.

    Raises:
        ValueError: for any other accept type.
    """
    if accept == "application/json":
        json_output = {"instances": [{"feature": row} for row in prediction.tolist()]}
        return worker.Response(json.dumps(json_output), mimetype=accept)
    if accept == "text/csv":
        # Encode to the requested accept type (no `content_type` exists in this scope).
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    raise ValueError("{} accept type is not supported by script".format(accept))
        
        
def predict_fn(input_data, model):
    """Run the fitted classifier on already-preprocessed features.

    Args:
        input_data: features as returned by input_fn.
        model: object exposing ``predict`` (the estimator loaded by model_fn).

    Returns:
        Whatever ``model.predict`` returns for ``input_data``.
    """
    return model.predict(input_data)


def model_fn(model_dir):
    """Load the classifier saved by this script's training entry point.

    Args:
        model_dir: directory that contains ``model.joblib``.

    Returns:
        The deserialized estimator.
    """
    artifact_path = os.path.join(model_dir, "model.joblib")
    # NOTE(review): joblib.load unpickles, so only point it at trusted artifacts.
    return joblib.load(artifact_path)


Overwriting sklearn_estimator.py


### Fitting the Scikit-learn Preprocessor on SageMaker

In [176]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

# Session, IAM role and region shared by the SageMaker calls in this notebook.
sagemaker_session = sagemaker.Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name

# S3 prefix holding the raw, headerless training CSV(s).
train_input = 's3://farukcan-loan-eligibility/train'
train_channels = {"train": train_input}

# Fit the preprocessing pipeline from sklearn_preprocessor.py as a SageMaker training
# job; the fitted transformer is saved as this job's model artifact (model.joblib).
sklearn_processor = SKLearn(
    entry_point="sklearn_preprocessor.py",
    role=role,
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
)
sklearn_processor.fit(train_channels)

INFO:sagemaker:Creating training-job with name: sagemaker-scikit-learn-2024-04-04-13-07-45-721


2024-04-04 13:07:46 Starting - Starting the training job...
2024-04-04 13:08:04 Starting - Preparing the instances for training...
2024-04-04 13:08:47 Downloading - Downloading the training image......
2024-04-04 13:09:32 Training - Training image download completed. Training in progress..[34m2024-04-04 13:09:40,946 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2024-04-04 13:09:40,950 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:09:40,953 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2024-04-04 13:09:40,970 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2024-04-04 13:09:41,201 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:09:41,205 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m

### Preprocessing Data with SKLearn Transformer

In [177]:
# Run the fitted preprocessor as a batch transform over the raw training data.
# Its input_fn detects the label column and its predict_fn emits the label as the
# first output column, so the output can be used directly to train the classifier.
sklearn_transformer = sklearn_processor.transformer(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    assemble_with="Line",
    accept="text/csv",
)
sklearn_transformer.transform(train_input, content_type="text/csv")

transform_job_name = sklearn_transformer.latest_transform_job.job_name
print("Waiting for transform job: " + transform_job_name)
sklearn_transformer.wait()

# S3 prefix of the featurized, label-first training data.
preprocessed_train = sklearn_transformer.output_path

INFO:sagemaker:Creating model with name: sagemaker-scikit-learn-2024-04-04-13-10-28-500
INFO:sagemaker:Creating transform job with name: sagemaker-scikit-learn-2024-04-04-13-10-29-250


..............................[34m2024-04-04 13:15:32,108 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:15:32,111 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:15:32,111 INFO - sagemaker-containers - nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;


### Training SKLearn Estimator with Preprocessed Data

In [179]:
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

# Execution role for the training job.
role = get_execution_role()

# Train the RandomForest defined in sklearn_estimator.py on the preprocessed,
# label-first data. framework_version matches the preprocessor's container ("1.2-1"),
# so both stages of the inference pipeline run the same scikit-learn release.
sklearn_estimator = SKLearn(
    entry_point='sklearn_estimator.py',
    framework_version='1.2-1',
    role=role,
    instance_type='ml.m5.xlarge',
    # Forwarded to the script as --n-estimators / --max-depth.
    hyperparameters={'n-estimators': 100, 'max-depth': 8},
)

sklearn_estimator.fit({"train": preprocessed_train})

INFO:sagemaker:Creating training-job with name: sagemaker-scikit-learn-2024-04-04-13-20-19-880


2024-04-04 13:20:20 Starting - Starting the training job...
2024-04-04 13:20:37 Starting - Preparing the instances for training...
2024-04-04 13:21:15 Downloading - Downloading the training image......
2024-04-04 13:22:18 Training - Training image download completed. Training in progress.
2024-04-04 13:22:18 Uploading - Uploading generated training model[34m2024-04-04 13:22:09,594 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2024-04-04 13:22:09,597 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:22:09,599 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2024-04-04 13:22:09,613 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2024-04-04 13:22:09,830 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2024-04-04 13:22:09,833 sagemaker-training-toolkit INFO 

### Deploying Inference Pipeline with SKLearn Processor and Estimator Models

In [180]:
from sagemaker.pipeline import PipelineModel
from sagemaker.utils import name_from_base

# One SageMaker model per trained stage.
sklearn_processor_model = sklearn_processor.create_model()
sklearn_estimator_model = sklearn_estimator.create_model()

# Unique, timestamped names for the pipeline model and its endpoint.
model_name = name_from_base("inference-pipeline")
endpoint_name = name_from_base("inference-pipeline-ep")

# Order matters: requests go through the preprocessor first, and its output is
# passed to the classifier.
pipeline_stages = [sklearn_processor_model, sklearn_estimator_model]
sm_model = PipelineModel(name=model_name, role=role, models=pipeline_stages)

sm_model.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
)

INFO:sagemaker:Creating model with name: inference-pipeline-2024-04-04-13-23-03-692
INFO:sagemaker:Creating endpoint-config with name inference-pipeline-ep-2024-04-04-13-23-03-692
INFO:sagemaker:Creating endpoint with name inference-pipeline-ep-2024-04-04-13-23-03-692


-------!

### Making Predictions Using Deployed Inference Pipeline

In [181]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=CSVSerializer(),
    sagemaker_session=sagemaker_session,
)

# One unlabelled applicant, in the preprocessor's feature column order (Loan_ID first).
# Values are joined without spaces so categorical values such as "Male" or "Rural"
# match the categories learned at fit time exactly (" Male" would be an unknown category).
applicant = [
    "LP001003", "Male", "Yes", "1", "Graduate", "No",
    "4583", "1508.0", "128.0", "360.0", "1.0", "Rural",
]
payload = ",".join(applicant)

prediction = predictor.predict(payload)
print(prediction)

# When finished, delete the endpoint to stop incurring charges: predictor.delete_endpoint()

b'{"instances": [{"feature": 1.0}]}'
