In [14]:
"""
Changes:
    During this test the script does not fetch the latest object from S3.
    Instead it reads a specific, hard-coded training-data key.
    
    The pip installs have been commented out.

"""



import os
os.environ['AWS_DEFAULT_REGION'] = 'eu-west-2'


# import subprocess
# import sys
# # Install sagemaker and xgboost as not present 
# subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'xgboost'])


## import libraries
import pandas as pd
import numpy as np
import xgboost
import io
import sagemaker, boto3
from sagemaker import get_execution_role
from sagemaker.inputs import TrainingInput 
from sklearn.metrics import precision_score, recall_score
from datetime import datetime
import pytz

# Create a timezone object for London
tz = pytz.timezone('Europe/London')



"""
Script to train an XGBoost model.
The code fetches data from the specified S3 location, cleans it, saves it to the S3 training-data
location, starts the XGBoost container, trains the model and saves the model back to S3 for
future deployment.
"""

## Get the data.

from typing import Tuple
from pandas import DataFrame
from datetime import datetime
import pytz

# Create a timezone object for London
tz = pytz.timezone('Europe/London')

def get_data(
    bucket, prefix, file_path, object_key='trainingBucket/April-Dataset_v3.csv'
) -> Tuple[DataFrame, DataFrame, DataFrame]:
    """
    Load a training-data CSV from S3, keep the model features and split it.

    By default a fixed object key is read (the behaviour used for the current test run).
    Pass ``object_key=None`` to read the most recently modified object under ``prefix``
    instead, which is what the training pipeline needs when it is triggered by a new upload.

    Parameters:
        bucket: The name of the S3 bucket to fetch and deposit data, currently 'cmg-sagemaker-compliance-cases-data'.
        prefix: The name of folder where the training data is stored, currently 'training_data'.
                Only used when ``object_key`` is None.
        file_path: The name of the folder in S3 for saving temp pre-processing data.
        object_key: Key of the CSV to load, or None to pick the latest object under ``prefix``.

    Returns:
        Tuple[DataFrame, DataFrame, DataFrame]: training_data, testing_data, validation_data.
        + the training and testing splits are written to S3 by train_test_split.

    Raises:
        FileNotFoundError: ``object_key`` is None and nothing exists under ``prefix``.
    """

    print(f"{datetime.now(tz).strftime('%H:%M:%S')}: Getting Data", flush=True) ## Print status Message to logs.

    if object_key is None:
        ## Only list the bucket when the latest upload is actually wanted;
        ## with an explicit key the listing would be a wasted round trip.
        s3_bucket = boto3.resource('s3').Bucket(bucket)
        objects = list(s3_bucket.objects.filter(Prefix=prefix))
        if not objects:
            raise FileNotFoundError(f"No objects found under s3://{bucket}/{prefix}")
        object_key = max(objects, key=lambda obj: obj.last_modified).key

    ## Retrieve CSV file from S3 bucket and load it into a pandas DataFrame.
    obj = boto3.client('s3').get_object(Bucket=bucket, Key=object_key)
    df = pd.read_csv(io.BytesIO(obj['Body'].read()))

    ## Features needed for the model (plus the case id and the label).
    feature_columns = [
        'master_case_number',
        'target',

        'prev_rlq_percent_comp',
        'cur_rlq_percent_comp',
        'curr_rlq_paid',
        'curr_rlq_due',

        'direct_payment',
        'change_income_when',
        'full_year_liability_per_case',
        'pwc_age_diff',
        'dfb_failure_when',
        'deo',
        'credit_4',
        'dfb_failure_duration',
        'debit_std',
        'liability_allocated_1',
        'call_inbound_pwc_int_cnt',
        'annual_review_count',
        'unemployed',
        'out_of_arrears_when',
        'deo_when',
        'liability_allocated_median',
        'compliance_notification_count',
        'liability_due_4',
        'liability_due_median',
        'end_employer_when',
        'sum_qc_age_group_3',
        'standing_order_pmop',
        'no_action_when',
        'sr_open',
        'credit_1',
        'liability_allocated_sum',
    ]
    df = df[feature_columns]

    ## Split data into training, testing and validation datasets.
    training_data, testing_data, validation_data = train_test_split(df, bucket, file_path)

    ## NOTE(review): display() is not imported in this file; it is presumably supplied by
    ## the notebook runtime and would raise NameError if this ran as a plain script - confirm.
    display(df)

    return training_data, testing_data, validation_data

def train_test_split(df, bucket, file_path):
    """
    Shuffle the data, split it 80/10/10 and upload the first two splits to S3.

    Parameters:
        df: DataFrame containing at least 'master_case_number' and 'target' columns.
        bucket: S3 bucket the split files are uploaded to.
        file_path: Folder (key prefix) in the bucket for the uploaded files.

    Returns:
        (train_data, test_data, validation_data): the 80%, 10% and 10% slices of the
        shuffled data, each with 'target' as the first column and without
        'master_case_number'. Only train_data ('temp_train.csv') and test_data
        ('temp_test.csv') are uploaded; validation_data is returned only.
    """
    ## Move 'target' to the front and drop the case id, which is not a feature.
    ## NOTE(review): presumably the label must be the first CSV column for the
    ## XGBoost container - confirm against the algorithm documentation.
    target = df.target
    features = df.drop(columns=['master_case_number', 'target'])
    df = pd.concat([target, features], axis=1)

    ## The shuffle below is already deterministic through random_state; the global
    ## seed is kept so that anything relying on it afterwards behaves as before.
    np.random.seed(42)
    shuffled = df.sample(frac=1, random_state=42)

    ## Explicit positional slices instead of np.split, which goes through the
    ## deprecated DataFrame.swapaxes. slices = train:80%, test:10%, val:10%
    train_end = int(0.8 * len(df))
    test_end = int(0.9 * len(df))
    train_data = shuffled.iloc[:train_end]
    test_data = shuffled.iloc[train_end:test_end]
    validation_data = shuffled.iloc[test_end:]

    ## upload cleaned data to S3.
    upload(train_data, bucket, file_path, 'temp_train.csv')
    upload(test_data, bucket, file_path, 'temp_test.csv')

    return train_data, test_data, validation_data


def upload(df, bucket, file_path, file_name):
    """
    Write a DataFrame to S3 as a CSV without header row or index column.

    The CSV text is built in memory, so nothing is written to local disk.
    The object key is '<file_path>/<file_name>' inside ``bucket``.
    """
    destination_key = '/'.join((file_path, file_name))
    ## to_csv() without a target returns the CSV as a string.
    csv_text = df.to_csv(header=False, index=False)
    s3_client = boto3.client('s3')
    s3_client.put_object(Body=csv_text, Bucket=bucket, Key=destination_key)

    
def get_data_pointer(bucket, file_path):
    """
    Build the TrainingInput objects that point the estimator at the split data in S3.

    Returns:
        (s3_input_train, s3_input_test): CSV inputs in 'Pipe' mode for
        's3://<bucket>/<file_path>/temp_train' and '.../temp_test'.
    """
    def _pipe_csv_input(stem):
        ## NOTE(review): the URI has no '.csv' suffix, so it presumably acts as a key
        ## prefix matching the uploaded '<stem>.csv' - confirm.
        return TrainingInput(
            s3_data=f"s3://{bucket}/{file_path}/{stem}", content_type="csv", input_mode='Pipe'
        )

    ## Training input first, then the testing input.
    train_pointer = _pipe_csv_input("temp_train")
    test_pointer = _pipe_csv_input("temp_test")
    return train_pointer, test_pointer



def model(bucket, file_path):
    """
    Fetch the AWS XGBoost algorithm container and build an (unfitted) Estimator.

    Defines the container parameters and sets the basic estimator hyper-parameters.

    Parameters:
        bucket: S3 bucket that receives the model artefacts, under
                'model_dev_pipeline/model_artefacts/'.
        file_path: Not used by this function; kept so existing calls keep working.

    Returns:
        The configured sagemaker.estimator.Estimator.
    """

    print(f"{datetime.now(tz).strftime('%H:%M:%S')}: Defining Model", flush=True)

    ## Execution role handed to the estimator.
    role = get_execution_role()

    # Sagemaker session
    sess = sagemaker.Session()

    ## get xgboost container
    container = sagemaker.image_uris.retrieve("xgboost", sess.boto_region_name, "1.5-1")

    ## define estimator parameters.
    xgb = sagemaker.estimator.Estimator(
        container,
        role,
        instance_count=1,
        instance_type='ml.p3.2xlarge',
        ## Write artefacts to the bucket the caller passed in rather than a fixed name.
        output_path=f"s3://{bucket}/model_dev_pipeline/model_artefacts/",
        sagemaker_session=sess,
    )

    ## set basic estimator hyper-parameters.
    ## NOTE(review): rate_drop looks like a DART-booster setting and tweedie_variance_power
    ## a tweedie-objective setting; with objective 'binary:logistic' and no booster chosen
    ## they are presumably ignored - confirm before removing.
    xgb.set_hyperparameters(
        eval_metric="auc",
        objective="binary:logistic",
        num_round=10,
        rate_drop=0.3,
        tweedie_variance_power=1.4,
        seed=42,
    )

    return xgb



def fit_model(xgb, s3_input_train, s3_input_test):
    """
    Run the training job and hand the same estimator back.

    The training input feeds the "train" channel and the testing input feeds the
    "validation" channel of ``xgb.fit``.
    """
    timestamp = datetime.now(tz).strftime('%H:%M:%S')
    print(f"{timestamp}: fitting model", flush=True) ## Status message for the logs.

    channels = {"train": s3_input_train, "validation": s3_input_test}
    xgb.fit(channels)
    return xgb



# ## check_data_exists
# def check_data_exists(train_path, validation_path):
#     if not os.path.exists(train_path):
#         print(f"Training data not found at {train_path}")
#     else:
#         print(f"Training data found at {train_path}")

#     if not os.path.exists(validation_path):
#         print(f"Validation data not found at {validation_path}")
#     else:
#         print(f"Validation data found at {validation_path}")

# # Specify your paths
# train_path = '/opt/ml/input/data/train'
# validation_path = '/opt/ml/input/data/validation'

# # Check if data exists at the paths
# check_data_exists(train_path, validation_path)



def evaluate_model_and_upload(xgb, validation_data, bucket):
    """
    Score the trained model on held-out data and promote it if it is good enough.

    The estimator is deployed to a temporary endpoint, predictions for
    ``validation_data`` are thresholded at 0.5, and precision and recall are computed.
    If both are greater than 0.7 the latest model artefact is copied from
    'model_dev_pipeline/model_artefacts/' to 'model_artefacts/' in ``bucket``.
    The endpoint is deleted afterwards, including when evaluation raises.

    Parameters:
        xgb: Fitted estimator exposing ``deploy``.
        validation_data: DataFrame with a 'target' column plus the feature columns.
        bucket: S3 bucket holding the model artefacts.
    """
    from sagemaker.serializers import CSVSerializer
    from sagemaker.deserializers import CSVDeserializer

    X_test = validation_data.drop('target', axis=1)
    y_test = validation_data.target
    print('length X_test', X_test.shape, flush=True) ## Print shape of data to logs.
    print('length y_test', y_test.shape, flush=True)

    # Deploy the trained model to an endpoint
    predictor = xgb.deploy(
        initial_instance_count=1,
        instance_type='ml.m4.xlarge',
        serializer=CSVSerializer(),
        deserializer=CSVDeserializer()
    )

    # From here on the endpoint exists, so make sure it is always removed again.
    try:
        # Send plain rows of values rather than the DataFrame itself.
        # NOTE(review): CSVSerializer presumably does not handle DataFrames directly - confirm.
        y_pred = predictor.predict(X_test.to_numpy())
        print('length y_pred', len(y_pred), flush=True) ## Print length of y_pred for logs.

        # Convert y_test and y_pred to a common data type. The deserialized rows are
        # flattened so the predictions are one-dimensional like y_test.
        y_test = y_test.astype(float)
        y_pred = np.array(y_pred).astype(float).ravel()
        # Threshold the predicted probabilities to obtain binary predictions
        y_pred = (y_pred > 0.5).astype(int)

        # Calculate the precision and recall
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        print('Precision: ', precision, flush=True) ## Print metrics Messages to logs.
        print('Recall ', recall, flush=True)

        # Check if the precision and recall are both greater than 0.7
        if precision > 0.7 and recall > 0.7:
            print(f"{datetime.now(tz).strftime('%H:%M:%S')}: Model Passed Varification", flush=True) ## Print status Message to logs.

            # Create an S3 client
            s3 = boto3.client('s3')

            # Set the source and target S3 locations
            source_prefix = 'model_dev_pipeline/model_artefacts/'
            target_prefix = 'model_artefacts/'

            # Copy the latest model artifact from the source S3 location to the target S3 location
            copy_latest_model_artifact(s3, bucket, source_prefix, target_prefix)

        else:
            print(f"{datetime.now(tz).strftime('%H:%M:%S')}: Model Failed Varification", flush=True) ## Print status Message to logs.
    finally:
        ## delete endpoint used for evaluation.
        delete_endpoint(predictor)


def copy_latest_model_artifact(s3_client, bucket, source_prefix, target_prefix):
    """
    Copy every object of the "latest" sub-folder of ``source_prefix`` to ``target_prefix``.

    The latest folder is the one whose prefix is greatest in string order.
    NOTE(review): that is only the newest training job if the folder names sort
    chronologically (e.g. timestamped job names) - confirm.

    Parameters:
        s3_client: boto3 S3 client.
        bucket: Bucket that holds both the source and the target location.
        source_prefix: Key prefix whose immediate sub-folders are the candidates.
        target_prefix: Key prefix the objects are copied to; the part of each key
                       after ``source_prefix`` is preserved.

    Raises:
        ValueError: there is no sub-folder under ``source_prefix``.
    """
    # Paginate so listings larger than one response page are not silently truncated.
    paginator = s3_client.get_paginator('list_objects_v2')

    # List the folders within the source folder
    folders = []
    for page in paginator.paginate(Bucket=bucket, Prefix=source_prefix, Delimiter='/'):
        folders.extend(entry['Prefix'] for entry in page.get('CommonPrefixes', []))
    if not folders:
        raise ValueError(f"No model artefact folders found under s3://{bucket}/{source_prefix}")
    latest_folder = max(folders)

    # Copy the objects within the latest folder
    for page in paginator.paginate(Bucket=bucket, Prefix=latest_folder):
        for content in page.get('Contents', []):
            # Construct the source and target keys for each object
            source_key = content['Key']
            print('\n source key', source_key, flush=True)
            target_key = target_prefix + source_key[len(source_prefix):]
            print('target key', target_key, flush=True)
            # Copy the object from the source S3 location to the target S3 location
            s3_client.copy_object(
                CopySource={'Bucket': bucket, 'Key': source_key},
                Bucket=bucket,
                Key=target_key
            )

    print(f"{datetime.now(tz).strftime('%H:%M:%S')}: Model Uploaded to S3", flush=True) ## Print status Message to logs.
    
def delete_endpoint(predictor):
    """Remove the endpoint behind ``predictor`` by calling its ``delete_endpoint()``."""
    ## Thin wrapper so the evaluation step has a single clean-up entry point.
    predictor.delete_endpoint()
    
    

    
#-------------------------------------------------------------------------------------

    
bucket =    'cmg-sagemaker-compliance-cases-data' ## S3 bucket for the training data, temp data and model artefacts.
prefix =    'training_data'      ## Subfolder in bucket for retrieving training data.
file_path = 'temp_training_data' ## Subfolder in bucket for the temporary train/test CSV files.


## Load the data, split it and upload the train/test splits to S3.
training_data, testing_data, validation_data = get_data(bucket, prefix, file_path)

## Create pointer objects to the uploaded train/test data in S3.
s3_input_train, s3_input_test = get_data_pointer(bucket, file_path)

## Define the estimator (container, instance type, hyper-parameters).
xgb = model(bucket, file_path)

## Fit model to data; the testing split is passed as the "validation" channel.
xgb = fit_model(xgb, s3_input_train, s3_input_test )

## Evaluate model on validation_data and, if precision and recall are both > 0.7, copy the artefact in S3.
evaluate_model_and_upload(xgb, validation_data, bucket)



14:03:18: Getting Data


Unnamed: 0,master_case_number,target,prev_rlq_percent_comp,cur_rlq_percent_comp,curr_rlq_paid,curr_rlq_due,direct_payment,change_income_when,full_year_liability_per_case,pwc_age_diff,...,compliance_notification_count,liability_due_4,liability_due_median,end_employer_when,sum_qc_age_group_3,standing_order_pmop,no_action_when,sr_open,credit_1,liability_allocated_sum
0,1-529368335,1,53.0,96.0,388.0,406.0,0,0.0,1687.0,0,...,0.0,32.0,32.0,0.0,0.0,1,0.0,1.0,200.0,97.0
1,1-4608924138,1,114.0,116.0,2413.0,2084.0,0,0.0,8548.0,0,...,0.0,726.0,495.0,0.0,1.0,1,0.0,0.0,363.0,3080.0
2,1-6652411617,1,126.0,117.0,675.0,579.0,0,0.0,2321.0,0,...,0.0,45.0,45.0,14.0,0.0,0,0.0,1.0,92.0,134.0
3,1-5723622145,1,102.0,116.0,777.0,672.0,0,0.0,2727.0,0,...,0.0,232.0,224.0,0.0,0.0,0,0.0,0.0,142.0,1606.0
4,1-5019975231,1,110.0,94.0,282.0,300.0,0,0.0,1215.0,0,...,0.0,98.0,96.0,0.0,1.0,1,0.0,0.0,103.0,798.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6937,1-41749522344,0,81.0,106.0,1113.0,1051.0,0,10.0,2599.0,0,...,0.0,89.0,89.0,0.0,0.0,0,0.0,2.0,50.0,695.0
6938,1-73415578763,0,93.0,100.0,1664.0,1660.0,0,0.0,6658.0,0,...,0.0,128.0,128.0,0.0,0.0,0,0.0,0.0,154.0,1174.0
6939,1-66572836486,0,88.0,96.0,215.0,223.0,0,0.0,971.0,0,...,0.0,19.0,19.0,0.0,0.0,0,0.0,0.0,19.0,152.0
6940,1-6168485889,0,70.0,138.0,1170.0,849.0,0,0.0,7136.0,0,...,0.0,282.0,278.0,0.0,1.0,0,87.0,0.0,298.0,849.0


14:03:19: Defining Model


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


14:03:19: fitting model


INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2023-10-16-13-03-19-700


Using provided s3_resource
2023-10-16 13:03:19 Starting - Starting the training job......
2023-10-16 13:03:57 Starting - Preparing the instances for training......
2023-10-16 13:05:00 Downloading - Downloading input data...
2023-10-16 13:05:51 Training - Training image download completed. Training in progress.....[34m[2023-10-16 13:06:14.842 ip-10-0-170-92.eu-west-2.compute.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2023-10-16 13:06:14.905 ip-10-0-170-92.eu-west-2.compute.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2023-10-16:13:06:15:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2023-10-16:13:06:15:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2023-10-16:13:06:15:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2023-10-16:13:06:15:INFO] Invoking 


2023-10-16 13:06:39 Uploading - Uploading generated training model
2023-10-16 13:06:39 Failed - Training job failed


UnexpectedStatusException: Error for Training job sagemaker-xgboost-2023-10-16-13-03-19-700: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
Command "/miniconda3/bin/python3 -m custom_eval --eval_metric auc --num_round 10 --objective binary:logistic --rate_drop 0.3 --seed 42 --tweedie_variance_power 1.4", exit code: 1