### AWS SageMaker XGBoost Batch Transform with SHAP Values
Purpose: generate predictions along with SHAP values
Main Steps: 
1. Create input files in S3 in a format suitable for an AWS SageMaker Batch Transform job
2. Run the Batch Transform job and create a file in S3 with predictions and SHAP values

In [1]:
temp_folder='/home/kate/Research/YearBuilt/Notebooks/Experiments_v2/tmp/'
Experiments_file='/home/kate/Research/YearBuilt/Experiments/DevExperiments.xlsx'
AllExperiments_tab='Experiments'
Experiment_name='prediction'
#Experiment configuration: different datasets to predict from
#1.Each line in the tab contains a model name and the set of features used to build a SageMaker dataset with only those features
Experiment_tab='%s Models'%Experiment_name
#2.ModelFiles: each line is a model name (Model) and the full model file name (ModelData - model.tar.gz) in an S3 bucket. SageMaker models will be created from this data
Experiment_ModelFiles_tab='%s ModelFiles'%Experiment_name

Trial_name_preprocessing='%s-PreparingData'%Experiment_name
Trial_name_inference='%s-Inference'%Experiment_name



bucket='kdproperty'
path_to_data='Data'
path_to_input_data='Data/Experiments/%s/'%Experiment_name
path_to_output_data='Data/Experiments/%s/Prediction/'%Experiment_name
path_to_configuration='Config'


instance_type_inference='ml.m5.xlarge'
#input data files can be split into parts, and the number of instances should be proportional to the number of parts to speed up the process
instance_count_inference=5 


instance_type_preprocessing='ml.t3.2xlarge'
instance_count_preprocessing=1

#timeout (passed to the model server as SAGEMAKER_MODEL_SERVER_TIMEOUT) for waiting on SHAP values
transformation_job_timeout = 3600

In [2]:
import boto3
import os
import sys
import time
import pandas as pd
import numpy as np





import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

from sagemaker.xgboost.model import XGBoostModel

In [3]:
region = boto3.session.Session().region_name
role = 'arn:aws:iam::XYZ:role/service-role/AmazonSageMaker-ExecutionRole-20200819T131882'
sagemaker_session = sagemaker.session.Session(default_bucket=bucket)

In [4]:
#sys.path.append('/home/kate/Research/YearBuilt/Notebooks/Experiments')
import ExperimentsUtils as eu

In [5]:
experiments = pd.read_excel(open(Experiments_file, 'rb'), sheet_name=AllExperiments_tab)

1. Reading experiment configuration from an Excel file

1.1 Target variable and data file name

In [6]:
target=experiments[experiments['Experiment']==Experiment_name]['Target'].values[0]
print('Target of models in %s experiment is %s'%(Experiment_name,target))
data_file=experiments[experiments['Experiment']==Experiment_name]['Dataset'].values[0]
print('Datafile used in %s experiment is %s'%(Experiment_name,data_file))

Target of models in prediction experiment is hasclaim_water
Datafile used in prediction experiment is dwelling_basedata_v4.csv


1.2 Features to create datafiles in S3

In [7]:
model_all_features = pd.read_excel(open(Experiments_file, 'rb'), sheet_name=Experiment_tab)
model_all_features

Unnamed: 0,Model,F1,F2,F3,F4
0,PropertyAgeFold0,cal_year-yearbuilt,cova_deductible,sqft,water_risk_3_blk


1.3 Model files (usually the model.tar.gz produced by training)
SageMaker models will be created later from this information.

In [8]:
model_files = pd.read_excel(open(Experiments_file, 'rb'), sheet_name=Experiment_ModelFiles_tab)
model_files

Unnamed: 0,Model,ModelData
0,PropertyAgeFold0,s3://kdproperty/Models/Experiments/bf2/Propert...


1.4 Verifying that both configurations contain the same set of models

In [9]:
models_from_model_features=model_all_features['Model'].tolist()
models_from_models_files=model_files['Model'].tolist()
#compare in both directions: every model needs both a feature set and a model file
if set(models_from_model_features)!=set(models_from_models_files):
    raise Exception('Different set of models in featuresets and files!')
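
When the check fails, it helps to know which models are missing on which side. A minimal sketch, using plain Python sets and made-up model names (the helper `compare_model_sets` is hypothetical and not part of `ExperimentsUtils`):

```python
def compare_model_sets(feature_models, file_models):
    """Return (models without a model file, models without a feature set)."""
    feature_set, file_set = set(feature_models), set(file_models)
    missing_files = sorted(feature_set - file_set)
    missing_features = sorted(file_set - feature_set)
    return missing_files, missing_features


# Hypothetical model names, for illustration only
missing_files, missing_features = compare_model_sets(
    ["PropertyAgeFold0", "PropertyAgeFold1"],
    ["PropertyAgeFold0", "PropertyAgeFold2"],
)
print("No model file for:", missing_files)
print("No feature set for:", missing_features)
```

In the notebook, the two lists built from the `Model` columns could be passed in directly.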

2. Saving model configurations (sets of features) to S3 for use in data preprocessing

In [10]:
Model_Config_file='%s.csv'%Experiment_name
Models_Config_path = os.path.join(temp_folder, Model_Config_file) 

model_all_features.to_csv(Models_Config_path, header=True, index=False)


input_code = sagemaker_session.upload_data(
        Models_Config_path,
        bucket=bucket,
        key_prefix=path_to_configuration
    )

3. Creating experiments and trials in SageMaker

In [11]:
eu.cleanup_experiment(Experiment_name)
eu.create_experiment(Experiment_name)
eu.create_trial(Experiment_name,Trial_name_preprocessing)
eu.create_trial(Experiment_name,Trial_name_inference)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials

4. Preparing datasets for prediction

4.1 Script to create a datafile in a Processing job

In [12]:
%%writefile preprocessingDataForPrediction.py

#Output CSV files are written without headers and without an index column


import argparse
import os
import pandas as pd
import numpy as np


if __name__=='__main__':
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_file', type=str)
    parser.add_argument('--config_file', type=str) 
    parser.add_argument('--split_to_N_parts', type=int, default=1)
    args, _ = parser.parse_known_args()    
    print('Received arguments {}'.format(args))
    
    split_to_N_parts=args.split_to_N_parts
    input_data_path = os.path.join('/opt/ml/processing/input', args.data_file)
    config_data_path = os.path.join('/opt/ml/processing/config', args.config_file)


    
    print('Reading input data from {}'.format(input_data_path))
    dataset = pd.read_csv(input_data_path, error_bad_lines=False, index_col=False)
    

    print('Reading config data from {}'.format(config_data_path))
    models = pd.read_csv(config_data_path, error_bad_lines=False, index_col=False)      

  
    #iterating through the config file with models and feature sets
    for index, row in models.iterrows():
        model=row['Model']
        print (index, ': Creating featuresets for model %s'%model)
        featureset=row[1:51].tolist()
        featureset=[x for x in featureset if str(x) != 'nan']    

        X = pd.DataFrame()
        for f in featureset:
            X[f]=dataset.eval(f)        
        
        if not os.path.exists('/opt/ml/processing/output/%s'%model):
            os.makedirs('/opt/ml/processing/output/%s'%model)
        output_data_path = os.path.join('/opt/ml/processing/output/%s'%model, 'data.csv')
        if split_to_N_parts>1:
            parts = np.array_split(X, split_to_N_parts)
            for i,p in enumerate(parts):
                output_data_path = os.path.join('/opt/ml/processing/output/%s'%model, 'data_%s.csv'%i)
                p.to_csv(output_data_path,header=False,index=False)
        else:           
            X.to_csv(output_data_path, header=False, index=False)
        

    

    
    
 
    

Overwriting preprocessingDataForPrediction.py
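
The script relies on two ideas: `DataFrame.eval` turns each configured feature, which may be an expression such as `cal_year-yearbuilt`, into a column, and the result is cut into roughly equal parts. A minimal sketch on toy data follows; the values are made up, and `split_rows` is a hypothetical stand-in that slices with `iloc` rather than the `np.array_split` call the script uses:

```python
import pandas as pd

# Toy data using column names from the configuration; the values are made up
dataset = pd.DataFrame({
    "cal_year": [2019, 2020, 2020, 2021, 2021],
    "yearbuilt": [1990, 2005, 1978, 2010, 1964],
    "cova_deductible": [500, 1000, 500, 2500, 1000],
    "sqft": [1500, 2200, 1800, 3000, 1200],
})
featureset = ["cal_year-yearbuilt", "cova_deductible", "sqft"]

# Each entry is evaluated against the dataset; plain column names pass through unchanged
X = pd.DataFrame({f: dataset.eval(f) for f in featureset})


def split_rows(df, n_parts):
    """Split df into n_parts contiguous chunks whose sizes differ by at most one row."""
    base, extra = divmod(len(df), n_parts)
    parts, start = [], 0
    for i in range(n_parts):
        stop = start + base + (1 if i < extra else 0)
        parts.append(df.iloc[start:stop])
        start = stop
    return parts


parts = split_rows(X, 2)
print(X["cal_year-yearbuilt"].tolist())
print([len(p) for p in parts])
```

Because the files are written without headers, the column order of `X` is the only link between the data and the feature names, so it has to match the order the model was trained with.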


4.2 Running the processing job and waiting for its completion

In [13]:
processors=list()



data_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type=instance_type_preprocessing,
                                     instance_count=instance_count_preprocessing)
    
data_processor.run(code='preprocessingDataForPrediction.py',
                        inputs= [ProcessingInput(input_name='data',source='s3://%s/%s/%s'%(bucket,path_to_data,data_file),destination='/opt/ml/processing/input'),
                                ProcessingInput(input_name='config',source='s3://%s/%s/%s'%(bucket,path_to_configuration,Model_Config_file),destination='/opt/ml/processing/config'),
                                ],
                        outputs=[
                                ProcessingOutput(output_name='output', source='/opt/ml/processing/output', destination='s3://%s/%s'%(bucket,path_to_input_data)),                                                          
                                ],
                        arguments=['--data_file',data_file,
                                '--config_file',Model_Config_file,
                                 '--split_to_N_parts',str(instance_count_inference)],
                        experiment_config = {
        'ExperimentName': Experiment_name ,
        'TrialName' : Trial_name_preprocessing,
        'TrialComponentDisplayName' : Trial_name_preprocessing},
                        wait=True
                        )
processors.append(data_processor)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker:Creating processing-job with name sagemaker-scikit-learn-2021-02-28-19-13-33-728



Job Name:  sagemaker-scikit-learn-2021-02-28-19-13-33-728
Inputs:  [{'InputName': 'data', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://kdproperty/Data/dwelling_basedata_v4.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'config', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://kdproperty/Config/prediction.csv', 'LocalPath': '/opt/ml/processing/config', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-XYZ/sagemaker-scikit-learn-2021-02-28-19-13-33-728/input/code/preprocessingDataForPrediction.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'Output

In [14]:
#Stop the execution if there is an issue with creating input data for the models
job_name=data_processor.jobs[-1].describe()['ProcessingJobName']
if not(sagemaker_session.was_processing_job_successful(job_name)):
    raise Exception('Preprocessing job Failed!')    

5. Running inference jobs to predict

5.1. Script for inference with SHAP values. The file must be named inference.py!
See the last lines of the code below for how the SHAP values are produced and returned.

In [15]:
%%writefile inference.py
import json
import os
import pickle as pkl

import numpy as np

import sagemaker_xgboost_container.encoder as xgb_encoders


def model_fn(model_dir):
    """
    Deserialize and return fitted model.
    """
    model_file = "xgboost-model"
    # Use a context manager so the file handle is closed after loading
    with open(os.path.join(model_dir, model_file), "rb") as f:
        booster = pkl.load(f)
    return booster


def input_fn(request_body, request_content_type):
    """
    The SageMaker XGBoost model server receives the request data body and the content type,
    and invokes the `input_fn`.

    Return a DMatrix (an object that can be passed to predict_fn).
    """
    if request_content_type == "text/csv":
        return xgb_encoders.csv_to_dmatrix(request_body.strip('\n'))
    else:
        raise ValueError(
            "Content type {} is not supported.".format(request_content_type)
        )


def predict_fn(input_data, model):
    """
    SageMaker XGBoost model server invokes `predict_fn` on the return value of `input_fn`.

    Return a two-dimensional NumPy array where the first column is the prediction
    and the remaining columns are the feature contributions (SHAP values) for that
    prediction, with the bias term in the last column.
    """
    prediction = model.predict(input_data)
    feature_contribs = model.predict(input_data, pred_contribs=True)
    output = np.hstack((prediction[:, np.newaxis], feature_contribs))
    
    return  output

Overwriting inference.py
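
Each line of the transform output therefore holds the prediction, one contribution per feature, and the bias term. A minimal sketch of reading one such line with the standard library follows; the row is made up, `parse_output_line` is a hypothetical helper, and the sigmoid check assumes a logistic objective, which the notebook does not state:

```python
import csv
import io
import math


def parse_output_line(line, feature_names):
    """Split one output row into prediction, per-feature contributions, and bias."""
    values = [float(v) for v in next(csv.reader(io.StringIO(line)))]
    if len(values) != len(feature_names) + 2:
        raise ValueError("expected prediction + one value per feature + bias")
    prediction, contribs, bias = values[0], values[1:-1], values[-1]
    return prediction, dict(zip(feature_names, contribs)), bias


features = ["cal_year-yearbuilt", "cova_deductible", "sqft", "water_risk_3_blk"]
# Made-up row: the numbers are illustrative, not real model output
line = "0.5,0.5,-0.25,0.25,0.5,-1.0"
prediction, contribs, bias = parse_output_line(line, features)

# Contributions plus bias give the raw margin; under a logistic objective
# (an assumption here) the sigmoid of the margin equals the prediction.
margin = sum(contribs.values()) + bias
print(contribs)
print(margin, 1 / (1 + math.exp(-margin)))
```

Reading line by line like this avoids loading a whole output file into memory, which matters given how large the results can get.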


5.2. Creating models in SageMaker to be used in inference (prediction), based on the model files listed in the ModelFiles tab

In [16]:
models = list()
model_names = list()
#SageMaker client used to delete models left over from previous runs
smclient = boto3.client('sagemaker')
for index, row in model_files.iterrows():
    #Delete the model if it already exists, then create a new one from the model file
    name=row['Model']
    name=name.replace('_','-')
    model_data=row['ModelData']
    print(name,model_data)
    try:
        response = smclient.delete_model(ModelName=name)
        print('%s model was deleted'%name)
    except smclient.exceptions.ClientError:
        print('%s model does not exist'%name)
    xgb_inference_model = XGBoostModel(
        name=name,
        model_data=model_data,
        role=role,
        entry_point='inference.py',
        framework_version="1.0-1",
    )
    models.append(xgb_inference_model)
    model_names.append(name)
    print('%s model was created'%name)

PropertyAgeFold0 s3://kdproperty/Models/Experiments/bf2/PropertyAge-1-2021-02-16-16-37-37/output/model.tar.gz
PropertyAgeFold0 model does not exist
PropertyAgeFold0 model was created


5.3. Running transform jobs using inference.py script and models created above

In [17]:
tranform_jobs = list()
tranformers = list()
i = 0
for m,model_name in zip(models,model_names):   
    s3_batch_input='s3://%s/%s%s'%(bucket,path_to_input_data,model_name)
    s3_batch_output_model = 's3://%s/%s%s'%(bucket,path_to_output_data,model_name)
    print(model_name)
    transformer =  m.transformer(
                                              instance_count=instance_count_inference, 
                                              instance_type=instance_type_inference,
                                              output_path=s3_batch_output_model,
                                              accept='text/csv',
                                              strategy='MultiRecord',
                                              assemble_with='Line',
                                              env = {'SAGEMAKER_MODEL_SERVER_TIMEOUT' : str(transformation_job_timeout)}
                                )
    tranformers.append(transformer)
    transformer.transform(data=s3_batch_input, content_type='text/csv',split_type='Line', wait=False,
    experiment_config = {
        'ExperimentName': Experiment_name ,
        'TrialName' : Trial_name_inference,
        'TrialComponentDisplayName' : '%s-%s'%(Trial_name_inference,model_name.replace('_','-')),})
    job_name = transformer.latest_transform_job.name
    tranform_jobs.append(job_name)
    print('Job %s started'%job_name)
    i = i + 1

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


PropertyAgeFold0


INFO:sagemaker:Creating model with name: PropertyAgeFold0
INFO:sagemaker:Creating transform job with name: PropertyAgeFold0-2021-02-28-19-18-25-542


Job PropertyAgeFold0-2021-02-28-19-18-25-542 started


In [18]:
eu.wait_transform_jobs(processors=tranformers,tranform_jobs=tranform_jobs,check_every_sec=10,print_every_n_output=20,wait_min=60)

Transforming job PropertyAgeFold0-2021-02-28-19-18-25-542 status: InProgress
Continue waiting...
All Transforming Jobs are Completed
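
`eu.wait_transform_jobs` comes from the author's `ExperimentsUtils` module, whose code is not shown here. The general pattern of polling until every job reaches a terminal state can be sketched as below. This is not that helper's implementation: `wait_for_jobs` and its parameters are hypothetical, and the status lookup is injected so the sketch runs without AWS access. With boto3, the lookup could be `lambda name: client.describe_transform_job(TransformJobName=name)['TransformJobStatus']`.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_jobs(job_names, get_status, check_every_sec=10, timeout_sec=3600,
                  sleep=time.sleep):
    """Poll until every job reaches a terminal status or the timeout expires.

    Returns a dict mapping each job name to its last seen status.
    """
    statuses = {}
    pending = list(job_names)
    waited = 0
    while pending:
        for name in list(pending):
            statuses[name] = get_status(name)
            if statuses[name] in TERMINAL_STATUSES:
                pending.remove(name)
        if not pending or waited >= timeout_sec:
            break
        sleep(check_every_sec)
        waited += check_every_sec
    return statuses


# Stand-in status source so the sketch runs offline
_fake_statuses = iter(["InProgress", "InProgress", "Completed"])
result = wait_for_jobs(["demo-job"], lambda name: next(_fake_statuses),
                       sleep=lambda seconds: None)
print(result)
```

Callers should inspect the returned statuses, since a job that ends as `Failed` or `Stopped` also stops the wait.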


## The resulting files are huge, so the best next step is to load them directly into Redshift for analysis and visualization in a BI tool
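
One possible way to prepare that load is to generate a table definition and a `COPY` statement from the feature list. The sketch below is only an illustration: the table name, column naming scheme, and IAM role are hypothetical, and it assumes the output layout described above (prediction, one SHAP value per feature, then the bias term).

```python
import re


def build_redshift_statements(table, feature_names, s3_prefix, iam_role):
    """Build CREATE TABLE and COPY statements for prediction + SHAP columns."""
    # Feature expressions such as 'cal_year-yearbuilt' are not valid column names as-is
    shap_cols = ["shap_" + re.sub(r"\W", "_", f) for f in feature_names]
    columns = ["prediction"] + shap_cols + ["shap_bias"]
    column_defs = ",\n  ".join("%s DOUBLE PRECISION" % c for c in columns)
    create = "CREATE TABLE %s (\n  %s\n);" % (table, column_defs)
    copy = "COPY %s FROM '%s' IAM_ROLE '%s' FORMAT AS CSV;" % (table, s3_prefix, iam_role)
    return create, copy


create, copy = build_redshift_statements(
    table="predictions",  # hypothetical table name
    feature_names=["cal_year-yearbuilt", "cova_deductible", "sqft", "water_risk_3_blk"],
    s3_prefix="s3://kdproperty/Data/Experiments/prediction/Prediction/PropertyAgeFold0",
    iam_role="arn:aws:iam::XYZ:role/HypotheticalRedshiftRole",  # placeholder role
)
print(create)
print(copy)
```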