## Import Modules

In [4]:
import pandas as pd

import os
import boto3

import sagemaker
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

## Set up Processing Job

In [3]:
# Account-specific resources used by the processing job.
bucket = 'cdo-ililapse-364524684987-bucket'
# NOTE(review): the processor cell mentions os.environ['CDO_KMS_KEY'] as the source of this
# key; consider reading it from there instead of hardcoding the ARN in the notebook.
kms_key = 'arn:aws:kms:us-east-1:364524684987:key/a083c34e-20a5-4a4b-a2d7-43ab86c2acf8'
subnets = [
    'subnet-0c6285a21fd5322ce',
    'subnet-0c8948f11b21bc735',
    'subnet-0c8687b8292e1ebd5',
]
security_group_ids = ['sg-027a01f4d54125468']

# Execution context of this notebook (IAM role and AWS region).
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# SageMaker session pinned to the project bucket, plus low-level AWS clients.
sm_session = sagemaker.Session()
# NOTE(review): `_default_bucket` is a private attribute of sagemaker.Session. Setting it
# directly presumably sidesteps the bucket lookup/creation done by the public
# `default_bucket=` constructor argument; confirm before switching to the public API.
sm_session._default_bucket = bucket
sm_client = boto3.client('sagemaker')
s3_client = boto3.client('s3')

In [4]:
# VPC placement and isolation flags, later unpacked into sagemaker's NetworkConfig.
network_kwargs = {
    'subnets': subnets,
    'security_group_ids': security_group_ids,
    'enable_network_isolation': True,
    'encrypt_inter_container_traffic': True,
}

In [5]:
my_xid = "x268052"  # used as the job-name prefix and passed to the script as --my_xid

In [6]:
# Scikit-learn processing container that runs the preprocessing script.
# A single large instance is used because the script loads and joins both
# datasets in memory with pandas inside one process.
sklearn_processor = SKLearnProcessor(
    # container image and compute
    framework_version='0.23-1',
    instance_type='ml.m5.24xlarge',
    instance_count=1,
    # identity and session
    role=role,
    sagemaker_session=sm_session,
    base_job_name=f'{my_xid}-preprocessing',
    # encryption of the attached volume and of the job output
    volume_kms_key=kms_key,
    output_kms_key=kms_key,
    # VPC / isolation settings from network_kwargs
    network_config=NetworkConfig(**network_kwargs),
)

In [7]:
# S3 locations for the job, derived from the bucket and user id configured above so the
# paths stay consistent if either one changes (previously both were repeated as literals).
pfmc_s3_prefix = f's3://{bucket}/{my_xid}/data/pfmc'

pfmc_events_input_path_s3 = f'{pfmc_s3_prefix}/events'
pfmc_inf_input_path_s3 = f'{pfmc_s3_prefix}/inf'

# NOTE: the misspelled name `pfmc_ouput_path_s3` is kept because later cells reference it.
pfmc_ouput_path_s3 = f'{pfmc_s3_prefix}/preprocessed'

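## Preprocessing script

The cell below writes the script that runs inside the SageMaker Processing container. It reads the `inf` and `events` CSV inputs and flags each event as a lapse, surrender, or true reinstatement. It then left-joins each `inf` row to the events with the same `agmt_pkge_id` whose `prcs_dt` falls three months after the row's `val_dt` month. The joined table is written out as a single parquet file.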

In [12]:
%%writefile '/root/scripts/pfmc_preprocess_script_whole_dataset.py'

################################################ import modules ################################################

import pandas as pd
import numpy as np
import datetime
import glob

import argparse

# import warnings
# warnings.filterwarnings("ignore")


# !pip install pytictoc
# !pip install pytictoc --upgrade

# from pytictoc import TicToc


################################################ paths ################################################

# define input paths
pfmc_events_input_path = '/opt/ml/processing/input/events'
pfmc_inf_input_path = '/opt/ml/processing/input/inf'

# define output paths
pfmc_output_path = '/opt/ml/processing/output/preprocessed'


################################################ functions ################################################

def collect_input(path, file_format='csv', encoding='ISO-8859-1'):
    '''
    Read every ``*.<file_format>`` file directly inside ``path`` and concatenate them.

    Files are read in sorted filename order so the row order of the result is
    deterministic (``glob`` itself returns matches in filesystem-dependent order).

    params:
        path (str) : path to a local directory containing the input files
        file_format (str) : file extension to match; matched files are always
            parsed with ``pd.read_csv`` regardless of the extension
        encoding (str) : text encoding passed to ``pd.read_csv``

    returns:
        full_frame (DataFrame) : the input files stacked row-wise, re-indexed from 0

    raises:
        ValueError : if no file under ``path`` matches the extension
    '''
    pattern = f'{path}/*.{file_format}'
    all_files = sorted(glob.glob(pattern))

    if not all_files:
        # pd.concat([]) would fail with an opaque "No objects to concatenate";
        # naming the pattern makes an empty or mis-mounted input directory obvious.
        raise ValueError(f'No input files matching {pattern!r}')

    df_list = [pd.read_csv(filename, encoding=encoding) for filename in all_files]

    full_frame = pd.concat(df_list, axis=0, ignore_index=True)

    return full_frame
    
################################################ main ################################################

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--my_xid', type=str, default=None)
    args, _ = parser.parse_known_args()

    # collect the input arguments
    # NOTE(review): my_xid is parsed but not used anywhere below.
    my_xid = args.my_xid


################################################ import data ################################################

    # collect input as DataFrame (the job's S3 inputs are placed in these local directories)
    events = collect_input(pfmc_events_input_path)
    pfmc = collect_input(pfmc_inf_input_path)

################################################ processing ################################################

    ##### pfmc_inf #####
    pfmc['val_dt'] = pfmc['val_dt'].astype('datetime64[ns]')

    # remove rows with a missing (NaT) val_dt, then key each row by its 'YYYY-MM' month;
    # .dt.strftime is vectorized, unlike a per-row Python lambda
    pfmc = pfmc.dropna(subset=['val_dt'])
    pfmc = pfmc.assign(pfmc_cur_month=pfmc['val_dt'].dt.strftime('%Y-%m'))

    ##### events_monthly #####
    events['prcs_dt'] = events['prcs_dt'].astype('datetime64[ns]')
    # events are not NaT-filtered: .dt.strftime yields NaN for a missing prcs_dt,
    # whereas NaT.strftime inside a .map lambda raises ValueError and aborts the job
    events['evnt_cur_month'] = events['prcs_dt'].dt.strftime('%Y-%m')
    # month three months before the event, as a 'YYYY-MM' string comparable with pfmc_cur_month
    events['evnt_3_month_ago'] = (
        (events['prcs_dt'] - pd.DateOffset(months=3)).dt.to_period('M').astype(str)
    )

    # 0/1 indicators derived from the event description (missing descriptions count as 0)
    evnt_desc = events['evnt_desc']
    events['lapse_ind'] = evnt_desc.str.contains('Lapse', na=False).astype(int)
    events['surrender_ind'] = evnt_desc.str.contains('Surrender', na=False).astype(int)
    events['reinstate_ind'] = evnt_desc.str.contains('True Reinstatement', na=False).astype(int)

    ##### join #####
    # Attach to each pfmc row the events whose month three months earlier equals the pfmc
    # month. The left join keeps pfmc rows without a matching event (event columns NaN);
    # a pfmc row matching several events is repeated once per event.
    event_cols = ['agmt_pkge_id', 'evnt_desc', 'lapse_ind', 'surrender_ind', 'reinstate_ind',
                  'evnt_3_month_ago', 'evnt_cur_month']
    df = pfmc.merge(events[event_cols],
                    how='left',
                    left_on=['agmt_pkge_id', 'pfmc_cur_month'],
                    right_on=['agmt_pkge_id', 'evnt_3_month_ago'],
                    suffixes=('', '_remove'))

    # drop only the clashing event columns renamed with the '_remove' suffix; a plain
    # substring test would also drop any original column that happens to contain 'remove'
    df = df.drop(columns=[c for c in df.columns if c.endswith('_remove')])

    # these columns cause mixed types (presumably breaking the parquet write below);
    # errors='ignore' avoids failing at the very end of the job if one is absent
    df = df.drop(columns=['iss_rmo', 'limra_lob_desc'], errors='ignore')

################################################ output data ################################################

    df.to_parquet(f'{pfmc_output_path}/preprocessed_pfmc_inf_events_03.2019-12.2022_whole_dataset.parquet')

Overwriting /root/scripts/pfmc_preprocess_script_whole_dataset.py


## Run Processing Job

In [13]:
%%time
# Launch the preprocessing job for the 2019-2022 data (a previous run took about 1 h 12 min wall time).
# Each ProcessingInput is copied from S3 to its local `destination`, and the ProcessingOutput
# `source` directory is copied back to S3; these local paths must match the paths hardcoded
# at the top of the script.
# NOTE(review): the input names contain typos ('preprocesing', 'pmfc'); they look like channel
# labels only, but are kept unchanged here so the job configuration stays identical.

pfmc_events_input = ProcessingInput(
    input_name='preprocesing_input_pmfc_events',
    source=pfmc_events_input_path_s3,
    destination='/opt/ml/processing/input/events',
)
pfmc_inf_input = ProcessingInput(
    input_name='preprocesing_input_pfmc_inf',
    source=pfmc_inf_input_path_s3,
    destination='/opt/ml/processing/input/inf',
)
preprocessed_output = ProcessingOutput(
    output_name='preprocessing_output',
    source='/opt/ml/processing/output/preprocessed',
    destination=pfmc_ouput_path_s3,
)

sklearn_processor.run(
    code='/root/scripts/pfmc_preprocess_script_whole_dataset.py',
    inputs=[pfmc_events_input, pfmc_inf_input],
    outputs=[preprocessed_output],
    arguments=['--my_xid', my_xid],
)


Job Name:  x268052-preprocessing-2023-01-17-19-18-36-685
Inputs:  [{'InputName': 'preprocesing_input_pmfc_events', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://cdo-ililapse-364524684987-bucket/x268052/data/pfmc/events', 'LocalPath': '/opt/ml/processing/input/events', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'preprocesing_input_pfmc_inf', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://cdo-ililapse-364524684987-bucket/x268052/data/pfmc/inf', 'LocalPath': '/opt/ml/processing/input/inf', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://cdo-ililapse-364524684987-bucket/x268052-preprocessing-2023-01-17-19-18-36-685/input/code/pfmc_preprocess_script_whole_dataset.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputM

## Inspect the output

In [None]:
# Load the job output back from S3. The prefix is the job's output destination;
# the filename must match the one written at the end of the preprocessing script.
pfmc = pd.read_parquet(
    f'{pfmc_ouput_path_s3}/preprocessed_pfmc_inf_events_03.2019-12.2022_whole_dataset.parquet'
)

In [5]:
# print(pfmc.shape)
# pfmc.head()