In [None]:
!pip install -U pandas pandas-profiling scikit-learn sagemaker openpyxl

# Download data

In [None]:
import requests
import zipfile
import io
import pandas as pd


url = "https://kh3-ls-storage.s3.us-east-1.amazonaws.com/Updated Project guide data set/Propensify.zip"  # Replace with the data URL

# Bound the request so a stalled connection cannot hang the notebook.
response = requests.get(url, timeout=60)

# Fail fast: every later cell needs train_df/test_df, so a failed download must
# stop the run here rather than print a message and surface later as a NameError.
if response.status_code != 200:
    raise RuntimeError(f"Failed to download file, status code: {response.status_code}")

# Unpack the archive straight from memory into the local data folder.
with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
    zip_ref.extractall("../data/propensify")

train_df = pd.read_excel("../data/propensify/train.xlsx") # historical data
test_df = pd.read_excel("../data/propensify/test.xlsx")  # unseen data

# Display the DataFrames shape to verify
print(train_df.shape, test_df.shape)

## Write the training and testing datasets to S3

In [None]:
import sagemaker
# SageMaker session; `session` and `bucket` are reused by the upload cell below.
session = sagemaker.Session()
# Default bucket of the session, used as the S3 upload destination.
bucket = session.default_bucket()

print(bucket)

In [None]:
# Persist both splits as CSV files (index column omitted) next to the extracted data.
for split_df, csv_path in ((train_df, "../data/train.csv"), (test_df, "../data/test.csv")):
    split_df.to_csv(csv_path, index=False)

In [None]:
# Upload both CSV files to the same bucket/key prefix and keep the returned paths.
s3_destination = {"bucket": bucket, "key_prefix": "sagemaker/propensify"}

train_path = session.upload_data(path="../data/train.csv", **s3_destination)
test_path = session.upload_data(path="../data/test.csv", **s3_destination)

print(f"Train path: {train_path}")
print(f"Test path: {test_path}")

# Fit the Pipeline Model on SageMaker!


In [24]:
%%writefile train.py

import argparse
import os
import pandas as pd
import joblib
import numpy as np
from sklearn.preprocessing import StandardScaler, FunctionTransformer, LabelEncoder
from sklearn.compose import ColumnTransformer
import xgboost as xgb
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from imblearn.combine import SMOTEENN
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Define custom preprocessing functions

def treat_missing_values(X, random_state=42):
    """Keep the modelling columns and impute or drop missing values.

    Steps, in order: select ``columns_to_keep``; collapse the ``basic.*``
    schooling levels into ``basic``; fill missing schooling from the row's
    profession; fill missing ``custAge`` with the median age of the row's
    profession; fill missing ``day_of_week`` with a randomly drawn weekday;
    drop every row that still contains a missing value.

    Parameters
    ----------
    X : pandas.DataFrame
        Must contain every column listed in ``columns_to_keep`` (including the
        target ``responded``); a missing column raises ``KeyError``.
    random_state : int or None, default 42
        Seed for the random ``day_of_week`` imputation, so repeated runs give
        the same result. Pass ``None`` for a fresh, unseeded draw.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified). Because of the final
        ``dropna`` it may have fewer rows than ``X``; surviving rows keep
        their original index labels.
    """
    ## Keeping only those columns that are required
    columns_to_keep = ['custAge', 'profession', 'marital', 'schooling', 'default', 'housing',
                       'loan', 'contact', 'month', 'day_of_week', 'campaign', 'pdays', 'previous',
                       'poutcome', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx',
                       'euribor3m', 'nr.employed', 'pmonths', 'pastEmail', 'responded']

    # Explicit copy: the assignments below must not be made on a slice of the caller's frame.
    X = X[columns_to_keep].copy()

    ## Feature engineering for schooling
    schooling_category = {
        'basic.4y' : 'basic',
        'basic.6y' : 'basic',
        'basic.9y' : 'basic',
        'high.school': 'high.school',
        'illiterate':'illiterate',
        'professional.course': 'professional.course',
        'university.degree':'university.degree',
        'unknown':'unknown',
    }
    X['schooling'] = X['schooling'].replace(schooling_category)

    ## Imputation of missing values in education based on profession
    imputation_mapping = {
        'blue-collar' : 'basic',
        'self-employed': 'illiterate',
        'technician'   : 'professional.course',
        'admin.'        : 'university.degree',
        'services'      : 'high.school',
        'management'    : 'university.degree',
        'retired'       : 'unknown',
        'entrepreneur'  : 'university.degree'
    }
    # Professions absent from the mapping leave the value missing; those rows are dropped at the end.
    X['schooling'] = X['schooling'].combine_first(X['profession'].map(imputation_mapping))

    ## Imputing age values with the median age of the row's profession
    median_age_by_profession = X.groupby('profession')['custAge'].median()
    X['custAge'] = X['custAge'].fillna(X['profession'].map(median_age_by_profession))

    ## Impute a random weekday for missing 'day_of_week' values (seeded for reproducibility)
    missing_day = X['day_of_week'].isna()
    if missing_day.any():
        rng = np.random.default_rng(random_state)
        X.loc[missing_day, 'day_of_week'] = rng.choice(
            ['mon', 'tue', 'wed', 'thu', 'fri'], size=int(missing_day.sum())
        )

    ## Drop remaining missing values
    return X.dropna()

def label_encoding(X):
    """Group categorical levels into coarser labels and bin ``pdays``/``pmonths``.

    Remaps ``profession``, ``marital``, ``schooling``, ``day_of_week`` and
    ``default`` to grouped labels, derives ``quarter`` from ``month``, and
    replaces ``pdays``/``pmonths`` with the categorical ``pdays_bin`` and
    ``pmonths_bin`` columns. Values not present in a mapping become NaN.

    Parameters
    ----------
    X : pandas.DataFrame
        Must contain ``profession``, ``marital``, ``schooling``, ``month``,
        ``day_of_week``, ``default``, ``pdays`` and ``pmonths``.

    Returns
    -------
    pandas.DataFrame
        A new frame without ``month``, ``pdays`` and ``pmonths`` and with the
        added ``quarter``, ``pdays_bin`` and ``pmonths_bin`` columns. The
        input frame is left untouched.
    """
    # Work on a copy: assigning into the caller's frame would make a second call
    # on the same frame map the already-grouped labels to NaN.
    X = X.copy()

    ## Label encoding for 'profession'
    profession_groups = {'student': 'Dependents', 'retired': 'Dependents',
                         'unemployed': 'Unemployed&Unknown', 'unknown': 'Unemployed&Unknown',
                         'admin.': 'Working', 'blue-collar': 'Working', 'entrepreneur': 'Working',
                         'housemaid': 'Working', 'management': 'Working', 'self-employed': 'Working',
                         'services': 'Working', 'technician': 'Working'}
    X['profession'] = X['profession'].map(profession_groups)

    ## Label encoding for 'marital'
    marital_groups = {'single': 'Single&Divorced', 'divorced': 'Single&Divorced',
                      'married': 'married', 'unknown': 'Unknown'}
    X['marital'] = X['marital'].map(marital_groups)

    ## Label encoding for 'schooling'
    schooling_groups = {'basic': 'Uneducated&BasicEducation', 'high.school': 'Uneducated&BasicEducation',
                        'illiterate': 'Uneducated&BasicEducation', 'unknown': 'Unknown',
                        'professional.course': 'Educated', 'university.degree': 'Educated'}
    X['schooling'] = X['schooling'].map(schooling_groups)

    ## Transforming month to quarter in a new column
    month_to_quarter = {
        'jan': 'Q1', 'feb': 'Q1', 'mar': 'Q1',
        'apr': 'Q2', 'may': 'Q2', 'jun': 'Q2',
        'jul': 'Q3', 'aug': 'Q3', 'sep': 'Q3',
        'oct': 'Q4', 'nov': 'Q4', 'dec': 'Q4'
    }
    X['quarter'] = X['month'].map(month_to_quarter)

    ## Dropping month column
    X = X.drop(columns='month')

    ## Label encoding for 'day_of_week'
    day_groups = {'mon': 'WeekBeginning', 'tue': 'WeekBeginning', 'wed': 'WeekBeginning',
                  'thu': 'WeekEnding', 'fri': 'WeekEnding'}
    X['day_of_week'] = X['day_of_week'].map(day_groups)

    ## Label encoding for 'default'
    X['default'] = X['default'].map({'no': 'No', 'unknown': 'Yes&Unknown', 'yes': 'Yes&Unknown'})

    ## Feature engineering of other variables
    ## pdays: the 999 check comes first so it is not caught by the '> 10' bucket
    conditions = [
        (X['pdays'] == 999),
        (X['pdays'] < 5),
        ((X['pdays'] >= 5) & (X['pdays'] <= 10)),
        (X['pdays'] > 10)
    ]
    choices = ['first visit', 'less than 5 days', '5 to 10 days', 'more than 10 days']

    ## Create the 'pdays_bin' column based on conditions
    X['pdays_bin'] = np.select(conditions, choices, default='unknown')

    ## pmonths
    # NOTE(review): the labels say "2 months" while the threshold is 0.2; confirm the intended unit.
    conditions = [
        (X['pmonths'] == 999),
        (X['pmonths'] <= 0.2),
        (X['pmonths'] > 0.2)
    ]
    choices = ['first visit', 'less than 2 months', 'more than 2 months']

    ## Create the 'pmonths_bin' column based on conditions
    X['pmonths_bin'] = np.select(conditions, choices, default='unknown')

    ## drop pdays and pmonths now that their binned versions exist
    X = X.drop(['pdays', 'pmonths'], axis=1)

    return X

def feature_transformation(X):
    """Split off the target, one-hot encode the categoricals and standardize the numerics.

    Returns a ``(features, target)`` tuple: ``features`` is the dummy-encoded
    frame with the continuous columns replaced by their standardized values,
    ``target`` is the untouched ``responded`` column.

    NOTE(review): the scaler is created and fitted inside this function, and
    the dummy columns depend on the categories present in ``X``; every call
    therefore derives its own scaling and column set from the data it is given.
    """
    target = X['responded']
    features = X.drop(['responded'], axis=1)

    # One-hot encode categorical columns (first level of each is dropped)
    categorical_columns = ['loan', 'marital', 'schooling', 'default', 'housing', 'day_of_week',
                           'poutcome', 'pdays_bin', 'pmonths_bin', 'profession', 'quarter', 'contact']
    encoded = pd.get_dummies(features, columns=categorical_columns, drop_first=True)

    # Standardize the continuous columns and write them back in place of the raw values
    continuous_columns = ['custAge', 'campaign', 'previous', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx',
                           'euribor3m', 'nr.employed', 'pastEmail']
    encoded[continuous_columns] = StandardScaler().fit_transform(encoded[continuous_columns])

    return encoded, target

def train_and_evaluate_model(X_encoded, y):
    """Fit an XGBoost classifier on resampled training data and report hold-out metrics.

    The target is label-encoded, the data is split 80/20 with a fixed seed,
    SMOTEENN is applied to the training portion only, and the fitted model is
    scored on the untouched 20% (accuracy, classification report and a
    confusion-matrix heatmap).

    Returns the fitted ``xgb.XGBClassifier``.
    """
    # Encode the target for xgb
    target_encoded = LabelEncoder().fit_transform(y)

    # Hold out 20% of the rows for evaluation
    X_train, X_test, y_train, y_test = train_test_split(
        X_encoded, target_encoded, test_size=0.2, random_state=42
    )

    # Resample the training portion only, so the evaluation rows stay as observed
    resampler = SMOTEENN(random_state=42)
    X_balanced, y_balanced = resampler.fit_resample(X_train, y_train)

    # Create and train the XGB classifier
    xgb_clf = xgb.XGBClassifier(random_state=42)
    xgb_clf.fit(X_balanced, y_balanced)

    # Score the hold-out rows
    y_pred = xgb_clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    separator = '-'*50
    print(separator)
    print(f"Accuracy: {accuracy:.2f}")
    print("Classification Report:")
    print(classification_report(y_test, y_pred))
    print(separator)

    # Confusion matrix rendered as an annotated heatmap
    conf_matrix = confusion_matrix(y_test, y_pred)

    plt.figure(figsize=(6,4))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=['Predicted No', 'Predicted Yes'],
        yticklabels=['Actual No', 'Actual Yes'],
    )
    plt.title('Confusion Matrix - XGB ')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

    return xgb_clf

# Model and preprocessing file names
# main() joins these with --model_dir when saving; model_fn in serve.py
# hardcodes the same two names when loading, so keep both places in sync.
model_file_name = "propensify_model.joblib"
preprocessor_file_name = "preprocessing_pipeline.joblib"

# Main function
def main():
    """Train the model and save the preprocessing pipeline and classifier.

    Command-line arguments
    ----------------------
    --model_dir : directory the two joblib artifacts are written to
        (defaults to the ``SM_MODEL_DIR`` environment variable).
    --train_data : location of train.csv passed to ``pd.read_csv``.

    Exits with an argparse error when no model directory is available and
    raises ``ValueError`` when the training data placeholder was not replaced.
    Both checks run before training so a misconfiguration does not cost a
    full training run.
    """
    # Arguments
    parser = argparse.ArgumentParser()

    # Inbuilt Arguments
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    # Training data location; can also be supplied on the command line instead of editing the default
    train_data_placeholder = "..."
    parser.add_argument("--train_data", type=str, default=train_data_placeholder)  # TODO: Paste the S3 path to your train.csv

    args, _ = parser.parse_known_args()

    # Without SM_MODEL_DIR the default is None, which would only fail in
    # os.path.join after the model has already been trained.
    if not args.model_dir:
        parser.error("--model_dir is required when SM_MODEL_DIR is not set")
    if args.train_data == train_data_placeholder:
        raise ValueError("Training data location not set: pass --train_data or replace the placeholder default")
    os.makedirs(args.model_dir, exist_ok=True)

    # Load data
    train_df = pd.read_csv(args.train_data)

    # Create a preprocessing pipeline
    preprocessing_pipeline = Pipeline([
        ('missing_values', FunctionTransformer(func=treat_missing_values)),
        ('label_encoding', FunctionTransformer(func=label_encoding)),
        ('feature_transformation', FunctionTransformer(func=feature_transformation))
    ])

    # Fit transform the preprocessing pipeline; the last step returns (features, target)
    X_train_transformed, y_train = preprocessing_pipeline.fit_transform(train_df)

    # Train and Evaluate model
    xgb_clf = train_and_evaluate_model(X_train_transformed, y_train)

    # Save the preprocessing pipeline
    # NOTE(review): the pipeline wraps functions defined in this script; confirm they
    # can be resolved where the pipeline is loaded (serve.py does not define them).
    preprocessor_save_path = os.path.join(args.model_dir, preprocessor_file_name)
    joblib.dump(preprocessing_pipeline, preprocessor_save_path)
    print(f"Preprocessing pipeline saved at {preprocessor_save_path}")

    # Save the model
    model_save_path = os.path.join(args.model_dir, model_file_name)
    joblib.dump(xgb_clf, model_save_path)
    print(f"Model saved at {model_save_path}")

# Run the main function when the script runs
if __name__ == "__main__":
    main()


Overwriting train.py


In [25]:
%%writefile requirements.txt
pandas
joblib
scikit-learn
xgboost
imbalanced-learn
matplotlib
seaborn
fsspec
s3fs

Writing requirements.txt


## Launch and check the training job

In [None]:
# Train!
# Choose instance_type: https://aws.amazon.com/sagemaker/pricing/
# Choose framework_version: https://docs.aws.amazon.com/sagemaker/latest/dg/sklearn.html
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role

# Training job settings, collected in one place so they are easy to review and tune.
estimator_settings = dict(
    base_job_name="xgb-pipeline-run",
    framework_version="1.2-1",
    entry_point="train.py",
    dependencies=["requirements.txt"],
    instance_count=1,
    instance_type="ml.m5.large",
    use_spot_instances=True,
    max_wait=600,
    max_run=600,
    role=get_execution_role(),
)
sklearn_estimator = SKLearn(**estimator_settings)

# Launch Training job
sklearn_estimator.fit()

In [None]:
import boto3
sm_client = boto3.client("sagemaker")

training_job_name = sklearn_estimator.latest_training_job.name

# Obtain the location of the model stored on S3 - Optional
# You can directly copy the location of the artifact from S3 also!
job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)
model_artifact = job_description["ModelArtifacts"]["S3ModelArtifacts"]

print(f"Training job name: {training_job_name}")
print(f"Model storage location: {model_artifact}")

## Create the inference script
- Since the model has been trained with good accuracy, we can deploy it.

In [26]:
%%writefile serve.py

import os
import joblib
import pandas as pd

def model_fn(model_dir):
    """Load the trained model and the preprocessing pipeline from ``model_dir``.

    Returns a ``(model, preprocessing_pipeline)`` tuple; the model is loaded first.

    NOTE(review): the file names are hardcoded here and must match the ones
    train.py saves under.
    """
    artifact_names = ("propensify_model.joblib", "preprocessing_pipeline.joblib")

    pipeline_model, preprocessing_pipeline = [
        joblib.load(os.path.join(model_dir, artifact_name))
        for artifact_name in artifact_names
    ]

    return pipeline_model, preprocessing_pipeline

def input_fn(request_body, request_content_type):
    """Parse a JSON Lines request body into a DataFrame.

    Parameters
    ----------
    request_body : str, bytes or bytearray
        One JSON record per line. Bytes are decoded as UTF-8.
    request_content_type : str
        Must be ``"application/json"``.

    Returns
    -------
    pandas.DataFrame
        One row per JSON record.

    Raises
    ------
    ValueError
        If the content type is anything other than ``application/json``.
    """
    import io  # local import: serve.py's top-level imports do not include io

    if request_content_type != "application/json":
        raise ValueError("Only application/json content type supported!")

    if isinstance(request_body, (bytes, bytearray)):
        request_body = request_body.decode("utf-8")

    # Wrap the payload in a file-like object: passing a literal JSON string
    # to read_json is deprecated in recent pandas releases.
    return pd.read_json(io.StringIO(request_body), lines=True)

def predict_fn(input_object, model_and_pipeline):
    """Preprocess the input and return predictions with class probabilities.

    Parameters
    ----------
    input_object : pandas.DataFrame
        Parsed request data, as returned by ``input_fn``.
    model_and_pipeline : tuple
        ``(model, preprocessing_pipeline)`` as returned by ``model_fn``.

    Returns
    -------
    pandas.DataFrame
        One row per input row that survived preprocessing, with columns
        ``prediction``, ``pred_prob_class0``, ``pred_prob_class1``,
        ``new_index`` (the row's index label in ``input_object``) and
        ``Predicted_Response`` (same values as ``prediction``).
    """
    pipeline_model, preprocessing_pipeline = model_and_pipeline

    # Apply the same preprocessing pipeline as in training; its second output (the target) is unused here
    input_transformed, _ = preprocessing_pipeline.transform(input_object)

    predictions = pipeline_model.predict(input_transformed)
    pred_probs = pipeline_model.predict_proba(input_transformed)

    prediction_object = pd.DataFrame(
        {
            "prediction": predictions.tolist(),
            "pred_prob_class0": pred_probs[:, 0].tolist(),
            "pred_prob_class1": pred_probs[:, 1].tolist(),
            # Preprocessing may drop rows (the pipeline built in train.py drops rows
            # with missing values), so report which input rows the predictions
            # belong to; the notebook's result cell aligns on this column.
            "new_index": input_transformed.index.tolist(),
            # Alias read by the notebook's result cell.
            "Predicted_Response": predictions.tolist(),
        }
    )

    return prediction_object

def output_fn(prediction_object, request_content_type):
    """Serialize the predictions as JSON Lines (one record per line).

    ``request_content_type`` is accepted but not consulted: the output is
    always produced with ``to_json(orient="records", lines=True)``.
    """
    return prediction_object.to_json(orient="records", lines=True)

Writing serve.py


# Serverless Inference Endpoint
- Most cost-effective option for real-time inference
- Only runs when there is traffic, so the first prediction after an idle period has a small added latency

In [None]:
# Create the deployment
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import Session, get_execution_role

session = Session()
bucket = session.default_bucket()

# NOTE(review): these assignments replace the training_job_name / model_artifact
# values obtained from the training job in the earlier cell.
training_job_name = "..." # TODO: Update with  TrainingJobName 
model_artifact = f"s3://{bucket}/{training_job_name}/output/model.tar.gz"
endpoint_name = "propensify-xgb-pipeline-serverless"

# Model definition: serve.py provides the inference handlers for the trained artifact.
model_settings = dict(
    name=endpoint_name,
    framework_version="1.2-1",
    entry_point="serve.py",
    dependencies=["requirements.txt"],
    model_data=model_artifact,
    role=get_execution_role(),
)
model = SKLearnModel(**model_settings)

In [None]:
# Create a config for serverless inference
from sagemaker.serverless import ServerlessInferenceConfig
# Serverless settings: 1024 MB of memory and at most 4 concurrent invocations.
serverless_config = ServerlessInferenceConfig(
    max_concurrency=4,
    memory_size_in_mb=1024,
)

# Deploy, then keep the name of the created endpoint for the invocation cell.
predictor = model.deploy(serverless_inference_config=serverless_config)
endpoint_name = predictor.endpoint_name

print("Endpoint name:")
print(f"{endpoint_name}")


## Invoke the model


In [None]:
# Load some data that we want to make predictions on
import pandas as pd
import boto3

from io import StringIO

test_df = pd.read_csv("...") # TODO: Paste the S3 path to your test.csv
# The preprocessing pipeline selects a 'responded' column, so unseen data gets a placeholder value for it.
test_df['responded'] = 99 # creating dummy target variable for preprocessing fit


# Serialize the rows to predict on as JSON Lines (one record per line), the format serve.py's input_fn parses
X_pred = test_df.to_json(orient="records", lines=True)

# Submit to the endpoint
sm_runtime = boto3.client("sagemaker-runtime")

response = sm_runtime.invoke_endpoint(EndpointName=endpoint_name, 
                                      Body=X_pred, 
                                      ContentType="application/json", 
                                      Accept="application/json")
# Decode the response from the endpoint
response_body = response['Body']
response_str = response_body.read().decode('utf-8')
# Wrap in StringIO: passing a literal JSON string to read_json is deprecated in recent pandas releases
response_df = pd.read_json(StringIO(response_str), lines=True)

response_df.head()


In [None]:
# saving results

# serve.py's predict_fn, as written in this notebook, returns 'prediction',
# 'pred_prob_class0' and 'pred_prob_class1'; use 'Predicted_Response' /
# 'new_index' only when the endpoint actually provides them.
prediction_column = "Predicted_Response" if "Predicted_Response" in response_df.columns else "prediction"

if "new_index" in response_df.columns:
    # Index labels of the test rows the predictions belong to
    new_index = response_df['new_index'].tolist()
elif len(response_df) == len(test_df):
    # No rows were dropped by preprocessing, so predictions line up positionally
    new_index = test_df.index.tolist()
else:
    raise ValueError(
        f"Endpoint returned {len(response_df)} predictions for {len(test_df)} rows "
        "and no 'new_index' column; cannot align predictions with the test data"
    )

response_df.index = new_index
# Index alignment leaves NaN for test rows that preprocessing dropped
test_df['Predicted_Response'] = response_df[prediction_column]

# Leave the dummy 'responded' placeholder (set to 99 before invoking the endpoint) out of the saved results
test_df.drop(columns=['responded'], errors='ignore').to_csv("../data/result.csv", index=False)

## Cleanup
- Delete the endpoint
- Delete the endpoint config
- Delete the model