In [9]:
# Import libraries

import pandas as pd 
import json 
import boto3 
import pathlib 
import io 
import sagemaker 

from sagemaker.deserializers import CSVDeserializer 
from sagemaker.serializers import CSVSerializer 


from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (ProcessingInput,ProcessingOutput,ScriptProcessor)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline 
from sagemaker.workflow.steps import (ProcessingStep,TrainingStep,CreateModelStep)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (ParameterInteger,ParameterFloat,ParameterString,ParameterBoolean)
from sagemaker.workflow.clarify_check_step import (ModelBiasCheckConfig,ClarifyCheckStep,ModelExplainabilityCheckConfig)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep 
from sagemaker.workflow.functions import JsonGet 

from sagemaker.workflow.lambda_step import (LambdaStep,LambdaOutputTypeEnum,LambdaOutput)
from sagemaker.model_metrics import (MetricsSource,ModelMetrics,FileSource)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.image_uris import retrieve

In [None]:
# Initialise the SageMaker session and the AWS service clients
import sagemaker

# The session is created with no explicit arguments; the execution role is
# looked up further down instead of being hard-coded as an ARN.
sess = sagemaker.Session()
write_bucket = sess.default_bucket()
write_prefix = "fraud-detect-demo"

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
# Pin the runtime client to the session region as well, so all three clients
# talk to the same region.
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)

# Fetch the SageMaker execution role
sagemaker_role = sagemaker.get_execution_role()

# S3 location the raw data is read from.
# NOTE(review): prefix corrected from "synthetic_autombile_claims"; confirm it
# matches the actual key layout of the sample-files bucket.
read_bucket = "sagemaker-sample-files"
read_prefix = "datasets/tabular/synthetic_automobile_claims"
# Despite the name this is a full S3 URI, not a bare key
raw_data_key = f"s3://{read_bucket}/{read_prefix}"

# Key prefixes (relative to write_bucket) for uploaded data
processed_data_key = f"{write_prefix}/processed"
train_data_key = f"{write_prefix}/train"
validation_data_key = f"{write_prefix}/validation"
test_data_key = f"{write_prefix}/test"

# Training image; `retrieve` is imported from sagemaker.image_uris
training_image = retrieve(framework="xgboost", region=region, version="1.3-1")

# Full S3 paths
claims_data_uri = f"{raw_data_key}/claims.csv"
customers_data_uri = f"{raw_data_key}/customers.csv"
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/scripts"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"
clarify_bias_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/bias_config"
clarify_explainability_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/explainability_config"
bias_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/bias"
explainability_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/explainability"

In [12]:
# Names of the pipeline and of the resources it creates
pipeline_name = "FraudDetectXGBPipeline"
pipeline_model_name = "fraud-detect-xgb-pipeline"
model_package_group_name = "fraud-detect-xgb-model-group"
base_job_name_prefix = "fraud-detect"

# Endpoint names are derived from the model name
endpoint_config_name = f"{pipeline_model_name}-endpoint-config"
endpoint_name = f"{pipeline_model_name}-endpoint"

# Label column
target_col = "fraud"

# Instance types and counts per stage: processing has its own type, while
# training, inference and Clarify share one type and a count of one.
process_instance_type = "ml.c5.xlarge"
train_instance_type = predictor_instance_type = clarify_instance_type = "ml.m4.xlarge"
train_instance_count = predictor_instance_count = clarify_instance_count = 1

In [None]:
# Pipeline input parameters. Each one carries a default taken from the
# configuration values defined earlier in the notebook.

# --- Compute resources -------------------------------------------------------
process_instance_type_param = ParameterString(name="ProcessingInstanceType", default_value=process_instance_type)
train_instance_type_param = ParameterString(name="TrainingInstanceType", default_value=train_instance_type)
train_instance_count_param = ParameterInteger(name="TrainingInstanceCount", default_value=train_instance_count)
deploy_instance_type_param = ParameterString(name="DeployInstanceType", default_value=predictor_instance_type)
deploy_instance_count_param = ParameterInteger(name="DeployInstanceCount", default_value=predictor_instance_count)
clarify_instance_type_param = ParameterString(name="ClarifyInstanceType", default_value=clarify_instance_type)

# --- Model bias check --------------------------------------------------------
skip_check_model_bias_param = ParameterBoolean(name="SkipModelBiasCheck", default_value=False)
register_new_baseline_model_bias_param = ParameterBoolean(name="RegisterNewModelBiasBaseline", default_value=False)
supplied_baseline_constraints_model_bias_param = ParameterString(
    name="ModelBiasSuppliedBaselineConstraints", default_value=""
)

# --- Model explainability check ----------------------------------------------
skip_check_model_explainability_param = ParameterBoolean(name="SkipModelExplainabilityCheck", default_value=False)
register_new_baseline_model_explainability_param = ParameterBoolean(
    name="RegisterNewModelExplainabilityBaseline", default_value=False
)
supplied_baseline_constraints_model_explainability_param = ParameterString(
    name="ModelExplainabilitySuppliedBaselineConstraints", default_value=""
)

# --- Model approval status ---------------------------------------------------
model_approval_status_param = ParameterString(name="ModelApprovalStatus", default_value="Approved")

In [13]:
%%writefile preprocessing.py

import argparse
import pathlib
import boto3
import os
import pandas as pd
import logging
from sklearn.model_selection import train_test_split

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def _normalize_column_names(df):
    """Return ``df`` with column names lower-cased, stripped and spaces replaced by underscores."""
    return df.rename({c: c.lower().strip().replace(' ', '_') for c in df.columns}, axis=1)


def _encode_ordinal(df, column, mapping):
    """Replace the labels in ``column`` with their float rank from ``mapping``; unmapped labels become NaN."""
    df[column] = df[column].map(mapping).astype(float)


def _save_csv(df, path, description):
    """Write ``df`` to ``path`` without the index, creating the parent directory if it is missing."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    logger.info("Saving {} data to {}".format(description, path))
    df.to_csv(path, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-ratio", type=float, default=0.8)
    parser.add_argument("--validation-ratio", type=float, default=0.1)
    parser.add_argument("--test-ratio", type=float, default=0.1)
    # parse_known_args: unrecognised arguments are ignored rather than fatal
    args, _ = parser.parse_known_args()
    logger.info("Received arguments {}".format(args))

    # Set local path prefix in the processing container
    local_dir = "/opt/ml/processing"

    input_data_path_claims = os.path.join("/opt/ml/processing/claims", "claims.csv")
    input_data_path_customers = os.path.join("/opt/ml/processing/customers", "customers.csv")

    logger.info("Reading claims data from {}".format(input_data_path_claims))
    df_claims = pd.read_csv(input_data_path_claims)

    logger.info("Reading customers data from {}".format(input_data_path_customers))
    df_customers = pd.read_csv(input_data_path_customers)

    logger.debug("Formatting column names.")
    df_claims = _normalize_column_names(df_claims)
    df_customers = _normalize_column_names(df_customers)

    logger.debug("Joining datasets.")
    # Left join keeps every claim, even if no customer record matches
    df_data = df_claims.merge(df_customers, on='policy_id', how='left')

    # Drop selected columns not required for model building
    df_data = df_data.drop(['customer_zip'], axis=1)

    # Ordinal columns are rank-encoded below instead of being one-hot encoded
    ordinal_cols = ["police_report_available", "policy_liability", "customer_education"]

    # Remaining string columns are treated as categorical; missing values get
    # their own 'na' category before one-hot encoding
    cat_cols_all = list(df_data.select_dtypes('object').columns)
    cat_cols = [c for c in cat_cols_all if c not in ordinal_cols]
    df_data[cat_cols] = df_data[cat_cols].fillna('na')

    logger.debug("One-hot encoding categorical columns.")
    df_data = pd.get_dummies(df_data, columns=cat_cols)

    logger.debug("Encoding ordinal columns.")
    _encode_ordinal(df_data, 'police_report_available', {"Yes": 1, "No": 0})
    _encode_ordinal(
        df_data,
        'policy_liability',
        {"15/30": 0, "25/50": 1, "30/60": 2, "100/200": 3},
    )
    _encode_ordinal(
        df_data,
        'customer_education',
        {
            "Below High School": 0,
            "High School": 1,
            "Associate": 2,
            "Bachelor": 3,
            "Advanced Degree": 4,
        },
    )

    df_processed = df_data.copy()
    df_processed.columns = [c.lower() for c in df_data.columns]
    # NOTE(review): "customer_gender_unkown" is kept exactly as spelled in the
    # original; presumably it mirrors a category label in the data - verify.
    df_processed = df_processed.drop(["policy_id", "customer_gender_unkown"], axis=1)

    # Split into train, validation, and test sets
    train_ratio = args.train_ratio
    val_ratio = args.validation_ratio
    test_ratio = args.test_ratio

    remainder_ratio = train_ratio + val_ratio
    if remainder_ratio <= 0:
        raise ValueError("--train-ratio and --validation-ratio must sum to a positive value")
    # The second split runs on the train+validation remainder, so the
    # validation share must be expressed relative to that remainder; passing
    # val_ratio directly would shrink the validation set (0.1 * 0.9 = 0.09).
    val_share_of_remainder = val_ratio / remainder_ratio

    logger.debug("Splitting data into train, validation, and test sets")

    y = df_processed['fraud']
    X = df_processed.drop(['fraud'], axis=1)
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X, y, test_size=test_ratio, random_state=42
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_share_of_remainder, random_state=42
    )

    # Target goes first in every written dataset
    train_df = pd.concat([y_train, X_train], axis=1)
    val_df = pd.concat([y_val, X_val], axis=1)
    test_df = pd.concat([y_test, X_test], axis=1)
    dataset_df = pd.concat([y, X], axis=1)

    logger.info("Train data shape after preprocessing: {}".format(train_df.shape))
    logger.info("Validation data shape after preprocessing: {}".format(val_df.shape))
    logger.info("Test data shape after preprocessing: {}".format(test_df.shape))

    # Save processed datasets to the local paths in the processing container.
    # SageMaker will upload the contents of these paths to S3 bucket
    logger.debug("Writing processed datasets to container local path.")
    train_output_path = os.path.join(f"{local_dir}/train", "train.csv")
    validation_output_path = os.path.join(f"{local_dir}/val", "validation.csv")
    test_output_path = os.path.join(f"{local_dir}/test", "test.csv")
    full_processed_output_path = os.path.join(f"{local_dir}/full", "dataset.csv")

    _save_csv(train_df, train_output_path, "train")
    _save_csv(val_df, validation_output_path, "validation")
    _save_csv(test_df, test_output_path, "test")
    _save_csv(dataset_df, full_processed_output_path, "full processed")


Writing preprocessing.py


In [None]:
from sagemaker.workflow.pipeline_context import  PipelineSession 

# Upload the preprocessing script to S3 so the processing job can fetch it.
# boto3's upload_file keyword names are capitalised: Filename, Bucket, Key.
preprocessing_script_key = f"{write_prefix}/scripts/preprocessing.py"
s3_client.upload_file(
    Filename="preprocessing.py",
    Bucket=write_bucket,
    Key=preprocessing_script_key,
)

# Define the scikit-learn processor that runs the preprocessing script
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_count=1,
    instance_type=process_instance_type,
    base_job_name=f"{base_job_name_prefix}-processing",
)

# Define the data-processing pipeline step.
# Input destinations and output sources must match the container paths that
# preprocessing.py reads from and writes to under /opt/ml/processing.
process_step = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=claims_data_uri, destination="/opt/ml/processing/claims"),
        ProcessingInput(source=customers_data_uri, destination="/opt/ml/processing/customers"),
    ],
    outputs=[
        ProcessingOutput(
            destination=f"{processing_output_uri}/train_data",
            output_name="train_data",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/validation_data",
            output_name="validation_data",
            source="/opt/ml/processing/val",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/test_data",
            output_name="test_data",
            source="/opt/ml/processing/test",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/processed_data",
            output_name="processed_data",
            source="/opt/ml/processing/full",
        ),
    ],
    # Flag names must match the script's argparse options exactly; the script
    # uses parse_known_args, so a misspelt flag would be ignored silently.
    job_arguments=[
        "--train-ratio", "0.8",
        "--validation-ratio", "0.1",
        "--test-ratio", "0.1",
    ],
    code=f"s3://{write_bucket}/{preprocessing_script_key}",
)

In [None]:
%%writefile xgboost_train.py  

import argparse
import os
import joblib
import json
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.metrics import roc_auc_score

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Hyperparameters and algorithm parameters
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--subsample", type=float, default=0.9)
    parser.add_argument("--colsample_bytree", type=float, default=0.8)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--eval_metric", type=str, default="auc")
    parser.add_argument("--nfold", type=int, default=3)
    # Accept the underscore spelling used by every other flag as well as the
    # original hyphenated one; both populate args.early_stopping_rounds.
    parser.add_argument("--early_stopping_rounds", "--early-stopping-rounds", type=int, default=3)

    # Data and output locations; defaults come from SM_* environment variables
    parser.add_argument("--train_data_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation_data_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))

    args = parser.parse_args()

    # Load the training set; "fraud" is the label, every other column a feature
    data_train = pd.read_csv(f"{args.train_data_dir}/train.csv")
    train = data_train.drop("fraud", axis=1)
    label_train = pd.DataFrame(data_train["fraud"])
    dtrain = xgb.DMatrix(train, label=label_train)

    # Load the validation set the same way
    data_validation = pd.read_csv(f"{args.validation_data_dir}/validation.csv")
    validation = data_validation.drop("fraud", axis=1)
    label_validation = pd.DataFrame(data_validation["fraud"])
    dvalidation = xgb.DMatrix(validation, label=label_validation)

    # Booster hyperparameters
    params = {"max_depth": args.max_depth,
              "eta": args.eta,
              "objective": args.objective,
              "subsample": args.subsample,
              "colsample_bytree": args.colsample_bytree}

    num_boost_round = args.num_round
    n_fold = args.nfold
    early_stopping_rounds = args.early_stopping_rounds

    # Cross-validate on the training data with early stopping
    cv_results = xgb.cv(
        params=params,
        dtrain=dtrain,
        num_boost_round=num_boost_round,
        nfold=n_fold,
        early_stopping_rounds=early_stopping_rounds,
        metrics=[args.eval_metric],
        seed=42
    )

    # Train the final model using the number of rounds in the CV results
    model = xgb.train(params=params, dtrain=dtrain, num_boost_round=len(cv_results))

    train_pred = model.predict(dtrain)
    validation_pred = model.predict(dvalidation)

    # Reported metrics are always ROC AUC, independent of --eval_metric
    train_auc = roc_auc_score(label_train, train_pred)
    validation_auc = roc_auc_score(label_validation, validation_pred)

    metric_data = {"hyperparameters": params,
                   "binary_classification_metrics": {"validation:auc": {"value": validation_auc},
                                                     "train:auc": {"value": train_auc}}}

    metric_location = args.output_data_dir + "/metric.json"
    model_location = args.model_dir + "/xgboost-model"

    # Persist the metrics as JSON and the trained booster via joblib
    with open(metric_location, "w") as f:
        json.dump(metric_data, f)
    with open(model_location, "wb") as f:
        joblib.dump(model, f)
