In [14]:
import numpy as np
#import scipy
import pandas as pd
from pandas.api.types import is_numeric_dtype
import gc
import os, psutil
import pickle

from warnings import simplefilter
# Ignore pandas PerformanceWarning messages for the rest of the session.
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
# Turn off pandas' chained-assignment (SettingWithCopy) checks.
pd.set_option('mode.chained_assignment', None)

# Input locations, built relative to the current working directory.
base_dir =  'kaggle/input/amex-default-prediction/'
filename = base_dir + 'train_data.csv'
filename_label = base_dir + 'train_labels.csv'


In [15]:
def memory_usage(metric='MB', places=1,print_out=True):
    """Report the resident memory of this process and the memory used system-wide.

    Parameters
    ----------
    metric : str
        Unit of the reported figures: 'B', 'KB', 'MB' or 'GB' (powers of 1024).
        Any other value raises ``KeyError``.
    places : int
        Number of decimal places used in the printed message.
    print_out : bool
        When true, print a one-line summary in addition to returning the value.

    Returns
    -------
    float
        Resident set size of the current process, expressed in ``metric``.
    """
    # Exponent applied to 1024 for each unit ('KB' is 1024**1).
    metric_mapping = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}
    divisor = 1024 ** metric_mapping[metric]

    mem_used = psutil.Process(os.getpid()).memory_info().rss / divisor
    # `.used` is the same field the positional index 3 referred to.
    mem_used_total = psutil.virtual_memory().used / divisor
    if print_out:
        print(f'Memory used Process: {mem_used:.{places}F}{metric} Memory used Total: {mem_used_total:.{places}F}{metric}')

    return mem_used

In [16]:
#custom aggregate functions
def lm_diff(series):
    """Return the last value minus the one before it, or 0 when fewer than two values exist."""
    if len(series) <= 1:
        return 0
    latest, previous = series.iloc[-1], series.iloc[-2]
    return latest - previous

def squared_mean(series):
    """Return the mean of the squared values of ``series``."""
    squares = series.pow(2)
    return squares.mean()

def missing_values(series):
    """Count the missing (NaN/None) entries in ``series``."""
    is_missing = series.isna()
    return is_missing.sum()

def missing_last_value(series):
    """Return 1 when the final entry of ``series`` is missing, otherwise 0.

    An empty series has no final entry and yields 0.
    """
    if len(series) == 0:
        return 0
    return int(series.isna().iloc[-1])

class amex_helper():
    """Chunked reader that turns per-statement rows of a CSV into one feature row per customer.

    ``read_data`` first scans the file to collect column statistics (means, standard
    deviations, missing-value ratios, categorical value counts) and then streams the file
    in chunks through ``process_chunk``, which imputes, clips, rescales and aggregates the
    rows of each ``customer_ID``.
    """

    def __init__(self, chunksize= 500000):
        """Store the chunk size and the static column metadata.

        Parameters
        ----------
        chunksize : int
            Number of CSV rows read per chunk in the feature-building pass.
        """
        # Rows of the last customer of a full chunk, held back until the next chunk arrives.
        self.previous_chunk_data= None
        self.chunksize = chunksize
        self.key_columns = ['customer_ID','S_2']
        self.categorical_columns = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
        self.non_numeric_columns = self.key_columns + self.categorical_columns

        self.numeric_dtype = np.float32

        # (column_name, add_value, multiply_value) triples used to turn noisy floats into ints.
        # Values taken from https://www.kaggle.com/code/raddar/amex-data-int-types-train/notebook
        self.int_scaling = [
            ('B_4',0,78), ('B_16',0,12), ('B_19',0,1), ('B_20',0,17), ('B_22',0,2), ('B_31',0,1), ('B_32',0,1), ('B_33',0,1), ('B_41',0,1),
            ('D_39',0,34), ('D_44',0,8), ('D_49',0,71), ('D_51',0,3), ('D_59',48/5,48), ('D_65',0,38), ('D_70',0,4),
            ('D_72',0,3), ('D_74',0,14), ('D_75',0,15), ('D_78',0,2), ('D_79',0,2), ('D_80',0,5), ('D_81',0,1), ('D_82',0,2),
            ('D_83',0,1), ('D_84',0,2), ('D_86',0,1), ('D_87',0,1), ('D_89',0,9), ('D_91',0,2), ('D_92',0,1), ('D_93',0,1),
            ('D_94',0,1), ('D_96',0,1), ('D_103',0,1), ('D_106',0,23), ('D_107',0,3), ('D_108',0,1), ('D_109',0,1),('D_111',0,2),
            ('D_113',0,5), ('D_122',0,7), ('D_123',0,1), ('D_124',22,22), ('D_125',0,1), ('D_127',0,1), ('D_129',0,1), ('D_135',0,1),
            ('D_136',0,4), ('D_137',0,1), ('D_138',0,2), ('D_139',0,1), ('D_140',0,1), ('D_143',0,1), ('D_145',0,11), ('R_2',0,1),
            ('R_3',0,10), ('R_4',0,1), ('R_5',0,2),('R_8',0,1), ('R_9',0,6), ('R_10',0,1),('R_11',0,2), ('R_13',0,31),
            ('R_15',0,1), ('R_16',0,2), ('R_17',0,35), ('R_18',0,31),('R_19',0,1), ('R_20',0,1), ('R_21',0,1),('R_22',0,1),
            ('R_23',0,1), ('R_24',0,1), ('R_25',0,1), ('R_26',0,28), ('R_28',0,1), ('S_6',0,1), ('S_11',5,25), ('S_15',10/3,10),
            ('S_18',0,1), ('S_20',0,1),
            ('S_8',0,100), ('S_13',0,100) # these have overlap and should be treated differently
              ]

    def read_data(self, filename, update_columns=True, first_chunk_only=False, raw=False, stats_max_chunks=2):
        """Read ``filename`` and return either the raw frame or the per-customer features.

        Parameters
        ----------
        filename : str
            Path of the CSV file to read.
        update_columns : bool
            When true, rescan the file to refresh the column lists, drop list, aggregate
            statistics and categorical encodings stored on the instance.
        first_chunk_only : bool
            When true, stop after the first chunk of the feature-building pass.
        raw : bool
            When true, return ``pd.read_csv(filename)`` unchanged.
        stats_max_chunks : int or None
            Number of 400000-row chunks used for the statistics scan. The default of 2
            keeps the historical behaviour (the scan stopped after the second chunk);
            pass ``None`` to scan the whole file.

        Returns
        -------
        pandas.DataFrame
            The raw frame, or the feature frame sorted by customer and re-indexed 0..n-1
            (the ``customer_ID`` index is discarded).
        """
        print(f'Reading Data ')
        if update_columns:
            self._update_column_metadata(filename, stats_max_chunks)

        print(f'Reading {filename} in chunks')
        if raw:
            return pd.read_csv(filename)

        # Never start a pass with rows left over from an earlier, interrupted pass.
        self.previous_chunk_data = None
        output = []
        with pd.read_csv(filename, chunksize=self.chunksize) as reader:
            for i, chunk in enumerate(reader):
                print(f'Reading chunk: {i+1}')
                output.append(self.process_chunk(chunk))
                memory_usage("GB",1)

                if first_chunk_only:
                    break
                gc.collect()

        # If the final chunk had exactly `chunksize` rows, its last customer is still held
        # back; process it now so it is not lost. (With first_chunk_only the held-back
        # customer may be incomplete, so it stays excluded as before.)
        if self.previous_chunk_data is not None and not first_chunk_only:
            leftover = self.previous_chunk_data
            self.previous_chunk_data = None
            output.append(self._build_features(leftover))

        print(f'finished reading all chunks')
        gc.collect()
        self.previous_chunk_data = None
        print(f'combining all chunks')
        data = pd.concat(output, copy=False).sort_index().reset_index(drop=True)
        print(f'minimising data types')
        self.print_memory_usage(data, label='before')
        self.update_dtypes = self.compress(data)
        data = data.astype(self.update_dtypes)
        self.print_memory_usage(data, label='compressed')

        return data

    def _update_column_metadata(self, filename, stats_max_chunks=2):
        """Scan ``filename`` and refresh column lists, drop list, statistics and encodings."""
        #read all columns names and update lists
        self.all_columns = pd.read_csv(filename, index_col=False, nrows=0).columns.tolist()
        self.numeric_columns = [x for x in self.all_columns if x not in self.non_numeric_columns]

        print(f'Calculating data sets aggregate statistics')
        aggregate_final = None
        final_column_na = None
        final_count_values = []
        drop_cuttoff = 0.35

        with pd.read_csv(filename, chunksize=400000) as reader:
            for i, chunk in enumerate(reader):

                #calculate chunk aggregates
                aggregate = chunk[self.numeric_columns].agg(['mean', 'count', 'min', 'max',squared_mean]).transpose()
                #count number of na in chunk
                chunk_column_na = chunk.agg(['count',missing_values]).transpose()
                #get count of each value in categorical columns
                chunk_count_values = dict()
                for column in self.categorical_columns:
                    chunk_count_values[column] = chunk[column].value_counts().to_dict()
                final_count_values.append(pd.DataFrame(chunk_count_values))

                del chunk
                gc.collect()

                #update aggregates
                if aggregate_final is not None:
                    total_count = aggregate_final['count'] + aggregate['count']
                    # Weighted sums; a column that is all-NaN in one chunk has mean NaN and
                    # count 0, which must contribute 0 rather than poison the running mean.
                    for stat in ('mean', 'squared_mean'):
                        previous_sum = (aggregate_final[stat] * aggregate_final['count']).fillna(0)
                        current_sum = (aggregate[stat] * aggregate['count']).fillna(0)
                        aggregate_final[stat] = (previous_sum + current_sum) / total_count
                    aggregate_final['count'] = total_count
                    aggregate_final['min'] = pd.concat([aggregate_final['min'],  aggregate['min']], axis=1).min(axis=1)
                    aggregate_final['max'] = pd.concat([aggregate_final['max'],  aggregate['max']], axis=1).max(axis=1)
                else:
                    aggregate_final = aggregate

                #update NA values
                if final_column_na is not None:
                    final_column_na['count'] = final_column_na['count'] + chunk_column_na['count']
                    final_column_na['missing_values'] = final_column_na['missing_values'] + chunk_column_na['missing_values']
                else:
                    final_column_na = chunk_column_na

                # Optional cap on how much of the file feeds the statistics.
                if stats_max_chunks is not None and i + 1 >= stats_max_chunks:
                    break

        #Na Values final ('count' is the non-null count, so the denominator is the row total)
        final_column_na['percent_missing'] = final_column_na['missing_values'] / (final_column_na['missing_values'] +final_column_na['count'])
        self.columns_drop = final_column_na[final_column_na['percent_missing']> drop_cuttoff].index.tolist()
        #Add redundant features to drop list
        for redundant in ['D_103','D_139']: #tied to D_107 and D_145 respectively
            if redundant not in self.columns_drop:
                self.columns_drop.append(redundant)

        #update meta data list for columns to remove columns that will be dropped
        # Order-preserving filters: the column order must stay deterministic and match
        # the row order of the aggregate statistics.
        dropped = set(self.columns_drop)
        self.categorical_columns = [x for x in self.categorical_columns if x not in dropped]
        self.non_numeric_columns = [x for x in self.non_numeric_columns if x not in dropped]
        self.all_columns = [x for x in self.all_columns if x not in dropped]
        self.numeric_columns = [x for x in self.numeric_columns if x not in dropped]
        self.int_scaling = [x for x in self.int_scaling if x[0] not in dropped]

        #aggregates values final
        aggregate_final['var'] = aggregate_final['squared_mean'] - aggregate_final['mean']**2
        aggregate_final['std'] = aggregate_final['var']**(1/2)
        aggregate_final = aggregate_final[~aggregate_final.index.isin(self.columns_drop)] #remove columns that will be dropped
        self.aggregate_stats  = aggregate_final

        #merge all value counts (rows are category values, columns are feature names)
        final_count_values = pd.concat(final_count_values).groupby(level=0).sum()
        #remove columns that will be dropped
        final_count_values = final_count_values.loc[:, ~final_count_values.columns.isin(self.columns_drop)]
        self.categorical_mode = final_count_values.idxmax().to_dict()
        #get dict of values to encode categorical values as integer codes
        categorical_values = dict()
        for column in final_count_values.columns:
            mapping_dict = final_count_values[final_count_values[column] != 0].reset_index()['index'].to_dict()
            categorical_values[column] = { mapping_dict[x]:x for x in mapping_dict}
        self.categorical_encode = categorical_values

    def process_chunk(self, chunk):
        """Build per-customer features for one CSV chunk, keeping customers whole.

        A chunk with at least ``chunksize`` rows may end in the middle of a customer, so
        the rows of its last ``customer_ID`` are held back in ``previous_chunk_data`` and
        prepended to the next chunk.

        Parameters
        ----------
        chunk : pandas.DataFrame
            Raw rows as produced by the chunked ``pd.read_csv`` reader.

        Returns
        -------
        pandas.DataFrame
            One row per ``customer_ID`` (the index) with the engineered features.
        """
        may_be_truncated = len(chunk) >= self.chunksize

        # Prepend carried-over rows first, so a customer spanning several chunks stays whole.
        if self.previous_chunk_data is not None:
            chunk = pd.concat([self.previous_chunk_data, chunk])

        if may_be_truncated:
            last_id_in_chunk = chunk['customer_ID'].iloc[-1]
            is_last_id = chunk['customer_ID'] == last_id_in_chunk
            last_id_in_chunk_data = chunk[is_last_id].copy()
            chunk = chunk.loc[~is_last_id]
        else:
            last_id_in_chunk_data = None

        self.previous_chunk_data = last_id_in_chunk_data

        return self._build_features(chunk)

    def _build_features(self, chunk):
        """Impute, clip, rescale and aggregate the rows of complete customers."""
        #calculate NA aggregates (before anything is filled)
        x_na_aggregate = chunk.groupby("customer_ID")[self.numeric_columns].agg([missing_values, missing_last_value])
        x_na_aggregate.columns = ['_'.join(x) for x in x_na_aggregate.columns]

        #Drop columns with too many na
        chunk = chunk.drop(self.columns_drop, axis=1)

        #fill NA: column mean for R_/D_ columns, mode for categoricals, 0 for the rest
        numeric_means = self.aggregate_stats['mean'].to_dict()
        numeric_means = { x:numeric_means[x] for x in numeric_means if  x.startswith('R_') or x.startswith('D_')}
        print('filling na')
        chunk = chunk.fillna(numeric_means)
        chunk = chunk.fillna(self.categorical_mode)
        print('filling 0')
        chunk = chunk.fillna(0) #fill remaining with 0

        #encode categorical values
        chunk = chunk.replace(self.categorical_encode)

        #set numeric datatype
        update_dtypes_numeric = {x: self.numeric_dtype for x in self.numeric_columns}
        chunk = chunk.astype(update_dtypes_numeric)

        update_dtypes_categorical = {x: np.int16 for x in self.categorical_columns}
        chunk = chunk.astype(update_dtypes_categorical)

        #clip outliers to mean +/- n standard deviations
        # Bounds are Series indexed by column name so clip aligns them by label.
        n_deviations_limit = 10
        column_stats = self.aggregate_stats.loc[self.numeric_columns, ['mean', 'std']]
        upper_limit = column_stats['mean'] + column_stats['std'] * n_deviations_limit
        lower_limit = column_stats['mean'] - column_stats['std'] * n_deviations_limit
        chunk[self.numeric_columns] = chunk[self.numeric_columns].clip(lower_limit, upper_limit, axis=1)

        #Remove noise from float columns ie turn float -> int
        for column_name, add_value, multiply_value in self.int_scaling:
            chunk[column_name] = ((chunk[column_name] + add_value) * multiply_value).round(0).astype(np.int16)

        #create aggregates for each customer_id numeric columns
        print('Doing Aggregates')
        x_aggregate = chunk.groupby("customer_ID")[self.numeric_columns].agg(['first', 'mean', 'std', 'min', 'max', 'last', lm_diff])
        x_aggregate.columns = ['_'.join(x) for x in x_aggregate.columns]
        #fill std columns with 0 incase of only one value as it results in nan
        std_fill_na_columns = {x:0 for x in x_aggregate.columns if '_std' in x}
        x_aggregate = x_aggregate.fillna(std_fill_na_columns)

        #create aggregates for each customer_id categorical columns
        x_aggregate_category = chunk.groupby("customer_ID")[self.categorical_columns].agg(['first', 'last'])
        x_aggregate_category.columns = ['_'.join(x) for x in x_aggregate_category.columns]

        #Get the number of months a record has been active
        chunk['S_2'] = pd.to_datetime(chunk['S_2'])
        x_date = chunk.groupby("customer_ID")['S_2'].agg(['first','last'])
        x_aggregate['months'] = (x_date['last'].dt.year  - x_date['first'].dt.year)*12 + (x_date['last'].dt.month  - x_date['first'].dt.month)+1

        del chunk, x_date
        gc.collect()

        #Features for how metrics have changed over time
        # Snapshot the column list because new columns are added inside the loop.
        for column in list(x_aggregate.columns):
            if 'first' in column:
                column_first = column
                column = column.replace('_first', '')
                column_last = column +'_last'

                x_aggregate[column+'_change_sub'] = (x_aggregate[column_first] - x_aggregate[column_last])/ x_aggregate['months']
                x_aggregate[column+'_change_div'] = ((x_aggregate[column_first] / x_aggregate[column_last])/ x_aggregate['months']).replace([np.inf, -np.inf], np.nan).fillna(0)

        #create combination features from the last observed value of selected columns
        features = ['B_3','B_1','B_37','B_9','B_2','B_7','B_18','D_48','D_44','D_39','P_2','P_3','P_4','R_1','R_2','R_3','S_3','S_23','S_7']
        column_suffix = '_last'
        for p_column in features:
            for s_column in features:
                column_name = f'{p_column}_{s_column}'
                x_aggregate[column_name + '_multi' + column_suffix] = (x_aggregate[p_column+column_suffix] * x_aggregate[s_column+column_suffix])
                if p_column != s_column:
                    x_aggregate[column_name + '_ratio' + column_suffix] = (x_aggregate[p_column+column_suffix] / x_aggregate[s_column+column_suffix]).replace([np.inf, -np.inf], np.nan).fillna(0)
                    x_aggregate[column_name + '_sub' + column_suffix] = (x_aggregate[p_column+column_suffix] - x_aggregate[s_column+column_suffix])

        update_cols = {x: np.float32 for x in x_aggregate.select_dtypes(include=[float]).columns}
        x_aggregate = x_aggregate.astype(update_cols)

        #feature comparing the first and last category of each customer
        # NOTE(review): despite the '_change' name the value is 1 when first == last
        # (i.e. unchanged); kept as is so existing feature values do not flip.
        for column in list(x_aggregate_category.columns):
            if 'first' in column:
                column_first = column
                column = column.replace('_first', '')
                column_last = column +'_last'

                x_aggregate_category[column+'_change'] = (x_aggregate_category[column_first] == x_aggregate_category[column_last]).astype(np.int16)

        customer_features = pd.concat([x_aggregate, x_aggregate_category, x_na_aggregate], axis=1)

        #drop first aggregate
        columns_drop = [x for x in customer_features.columns if '_first' in x]
        customer_features = customer_features.drop(columns_drop, axis=1)

        return customer_features

    def print_memory_usage(self, data, metric='MB', label=''):
        """Print the total of ``data.memory_usage()`` in the requested unit.

        Parameters
        ----------
        data : pandas.DataFrame
            Object whose ``memory_usage()`` values are summed.
        metric : str
            'B', 'KB', 'MB' or 'GB' (powers of 1024).
        label : str
            Free text inserted into the printed message.
        """
        # Exponent applied to 1024 for each unit ('KB' is 1024**1).
        metric_mapping = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}
        multiplier= metric_mapping[metric]
        memory = data.memory_usage().sum() / (1024**multiplier)
        print(f'Memory usage {label}: {memory:.2f}{metric}')

    def compress(self, data):
        """Suggest smaller dtypes for the float64 and int64 columns of ``data``.

        Returns
        -------
        dict
            Maps column name to a numpy dtype for every column that can be narrowed;
            columns of any other dtype, or whose range does not fit, are left out.
            Downcasting to float16 is deliberately not offered.
        """
        int8_limits = np.iinfo(np.int8)
        int16_limits = np.iinfo(np.int16)
        int32_limits = np.iinfo(np.int32)
        float32_limits = np.finfo(np.float32)

        column_dtypes = {}
        for col in data.columns:
            col_dtype = data[col].dtype
            if col_dtype == 'object':
                continue

            col_min = data[col].min()
            col_max = data[col].max()

            if col_dtype == 'float64':
                if (col_min > float32_limits.min) and (col_max < float32_limits.max):
                    column_dtypes[col] = np.float32

            if col_dtype == 'int64':
                # int8 is only chosen when the values use at most half of its range.
                if (col_min > int8_limits.min/2) and (col_max < int8_limits.max/2):
                    column_dtypes[col] = np.int8
                elif (col_min > int16_limits.min) and (col_max < int16_limits.max):
                    column_dtypes[col] = np.int16
                elif (col_min > int32_limits.min) and (col_max < int32_limits.max):
                    column_dtypes[col] = np.int32
        return column_dtypes
    
    


In [17]:
# a_amex_helper = amex_helper(chunksize= 1000)
# x_input = a_amex_helper.read_data(filename, first_chunk_only=True)
# x_input.head(5)

In [18]:
# Build the per-customer feature table from the full training CSV.
a_amex_helper = amex_helper(chunksize= 400000)
x_input = a_amex_helper.read_data(filename, first_chunk_only=False)
y_input = pd.read_csv(filename_label)
# NOTE(review): read_data returns a frame re-indexed 0..n-1 without customer_ID, so the
# labels are picked purely by row position; this presumably relies on the label file
# listing customers in the same sorted order -- TODO confirm.
y_input = y_input.loc[x_input.index, :]
gc.collect()

Reading Data 
Calculating data sets aggregate statistics
Reading kaggle/input/amex-default-prediction/train_data.csv in chunks
Reading chunk: 1
filling na
filling 0
Doing Aggregates
Memory used Process: 2.0GB Memory used Total: 16.0GB
Reading chunk: 2
filling na
filling 0
Doing Aggregates
Memory used Process: 2.3GB Memory used Total: 15.9GB
Reading chunk: 3
filling na
filling 0
Doing Aggregates
Memory used Process: 2.6GB Memory used Total: 16.1GB
Reading chunk: 4
filling na
filling 0
Doing Aggregates
Memory used Process: 3.0GB Memory used Total: 16.1GB
Reading chunk: 5
filling na
filling 0
Doing Aggregates
Memory used Process: 2.6GB Memory used Total: 17.3GB
Reading chunk: 6
filling na
filling 0
Doing Aggregates
Memory used Process: 2.6GB Memory used Total: 15.9GB
Reading chunk: 7
filling na
filling 0
Doing Aggregates
Memory used Process: 2.7GB Memory used Total: 16.4GB
Reading chunk: 8
filling na
filling 0
Doing Aggregates
Memory used Process: 2.7GB Memory used Total: 16.8GB
Reading c

0

In [19]:
# Write the feature and label frames to parquet files in the working directory.
x_input.to_parquet('x_input.parquet')
y_input.to_parquet('y_input.parquet')

# Store data (serialize)
# The helper instance (with the statistics and encodings set on it by read_data) is pickled.
# NOTE(review): only unpickle this file from a trusted source; pickle can execute code on load.
with open('helper_object.pickle', 'wb') as handle:
    pickle.dump(a_amex_helper, handle, protocol=pickle.HIGHEST_PROTOCOL)