# Featuretools with DASK

In [1]:
import pandas as pd
import numpy as np
import featuretools as ft
import time
import os
import matplotlib.pyplot as plt
import seaborn as sns
import dask.bag as db
from dask.distributed import Client
% matplotlib inline

In [3]:
DATA_PATH = 'Home_credit_data/'
# Read the seven raw tables in one pass; tuple unpacking keeps the variable names
# used by the rest of the notebook. application_test.csv is deliberately not loaded here.
(app_train, bureau, bureau_balance, cash, credit, previous, installments) = (
    pd.read_csv(DATA_PATH + csv_name)
    for csv_name in (
        'application_train.csv',
        'bureau.csv',
        'bureau_balance.csv',
        'POS_CASH_balance.csv',
        'credit_card_balance.csv',
        'previous_application.csv',
        'installments_payments.csv',
    )
)

In [4]:
# quick cleaning: unrealistic days value (365243 days is > 1000 years)
def _replace_days_sentinel(df, sentinel=365243):
    """Replaces `sentinel` with NaN in the day-count columns of `df`.

    Only columns whose name contains 'DAYS' are touched. Replacing the value
    in every column would also blank out a genuine SK_ID_CURR of 365243 or an
    amount that happens to equal 365243.
    NOTE(review): assumes day-count columns carry 'DAYS' in their name
    (e.g. DAYS_EMPLOYED) -- confirm against the CSV headers.

    The frame is modified in place and also returned.
    """
    for col in df.columns:
        if 'DAYS' in col:
            df[col] = df[col].replace({sentinel: np.nan})
    return df


app_train = _replace_days_sentinel(app_train)
# app_test is not loaded in this notebook, so it is not cleaned here
bureau = _replace_days_sentinel(bureau)
bureau_balance = _replace_days_sentinel(bureau_balance)
cash = _replace_days_sentinel(cash)
credit = _replace_days_sentinel(credit)
previous = _replace_days_sentinel(previous)
installments = _replace_days_sentinel(installments)

### Changing types to save memory

In [5]:
def change_variable_types(df):
    """Downcasts the columns of `df` to smaller dtypes, in place, and returns `df`.

    The first matching rule wins for each column:
      1. name contains 'SK_ID'        -> missing values filled with 0, cast to int32
      2. object dtype, values repeat  -> category
      3. float dtype                  -> float32
      4. int dtype                    -> int32
      5. values are exactly {0, 1}    -> bool
    Columns matching none of the rules are left untouched.
    """
    n_rows = df.shape[0]
    for name in df.columns:
        series = df[name]
        kind = series.dtype
        if 'SK_ID' in name:
            # identifiers: NaN cannot live in an int column, so it becomes 0
            converted = series.fillna(0).astype(np.int32)
        elif kind == 'object' and series.nunique() < n_rows:
            converted = series.astype('category')
        elif kind == float:
            converted = series.astype(np.float32)
        elif kind == int:
            converted = series.astype(np.int32)
        elif set(series.unique()) == {0, 1}:
            # only reached by columns that are neither object, float nor int
            converted = series.astype(bool)
        else:
            continue
        df[name] = converted
    return df

In [6]:
# run the dtype conversion over every table, rebinding each name to the converted frame
(app_train, bureau, bureau_balance, cash, credit, previous, installments) = (
    change_variable_types(table)
    for table in (app_train, bureau, bureau_balance, cash, credit, previous, installments)
)

### Partitioning

In [7]:
# bring the client id (SK_ID_CURR) of each balance row over from bureau, matched on SK_ID_BUREAU
bureau_balance = pd.merge(
    bureau_balance,
    bureau.loc[:, ['SK_ID_CURR', 'SK_ID_BUREAU']],
    how='left',
    on='SK_ID_BUREAU',
)

# the merged-in SK_ID_CURR arrives as float64, so the dtype conversion is run once more
bureau_balance = change_variable_types(bureau_balance)

In [8]:
def partition_datasets(clients_list, num_partition):
    """Writes the rows of every table that belong to one group of clients.

    Parameters
    ----------
    clients_list : list
        Index labels of `app_train` selecting the clients of this partition.
    num_partition : int
        Partition number; output goes to 'partitions/part<num_partition>/'.

    The `app_train` rows are selected by index label. Their SK_ID_CURR values
    are then used to select the matching rows of the other six tables, so the
    child rows follow the client id rather than their own row position.
    One CSV per table is written (without the index); nothing is returned.
    Reads the module-level frames app_train, bureau, bureau_balance, cash,
    credit, previous and installments.
    """
    def client_ids_of(df):
        # SK_ID_CURR normally is a column; fall back to the index for frames
        # that were indexed by client id (the behaviour of the previous version)
        if 'SK_ID_CURR' in df.columns:
            return df['SK_ID_CURR']
        return df.index

    def flatten_index(df):
        # keep the index as a column only when it is the sole carrier of
        # SK_ID_CURR; a plain positional index would just add a junk column
        index_holds_id = df.index.name == 'SK_ID_CURR' and 'SK_ID_CURR' not in df.columns
        return df.reset_index(drop=not index_holds_id)

    # subsetting the parent table, then collecting the client ids it contains
    app_train_subset = app_train[app_train.index.isin(clients_list)].copy()
    partition_ids = client_ids_of(app_train_subset).unique()

    # exist_ok lets the partitioning be re-run over an existing directory
    directory = 'partitions/part' + str(num_partition)
    os.makedirs(directory, exist_ok=True)

    flatten_index(app_train_subset).to_csv(directory + '/app_train.csv', index=False)

    child_tables = (
        ('bureau', bureau),
        ('bureau_balance', bureau_balance),
        ('cash', cash),
        ('credit', credit),
        ('previous', previous),
        ('installments', installments),
    )
    for table_name, table in child_tables:
        # boolean-mask selection already yields a new frame
        subset = table[client_ids_of(table).isin(partition_ids)]
        flatten_index(subset).to_csv(directory + '/' + table_name + '.csv', index=False)
    return

In [39]:
# 80 full-size batches; leftover rows (if any) form one extra, smaller batch
batch_size = len(app_train) // 80
clients_ids_list = [
    app_train.index[offset : offset + batch_size].tolist()
    for offset in range(0, len(app_train), batch_size)
]

# write every partition to disk and report the wall-clock time
start = time.time()
for n, clients_ids in enumerate(clients_ids_list):
    partition_datasets(clients_list=clients_ids, num_partition=n)
end = time.time()
print((end - start) / 60, 'minutes')

11.831073566277821 minutes


In [17]:
# release the full-size tables; from here on only the per-partition CSVs are used
del installments, cash, credit, previous, bureau_balance, bureau, app_train

### Entity sets

In [2]:
def create_entity_set_from_partition(path):
    """Builds the featuretools entity set of one partition directory.

    Parameters
    ----------
    path : str
        Partition directory whose name ends with the partition number,
        e.g. 'partitions/part12'. It must contain app_train.csv, bureau.csv,
        bureau_balance.csv, cash.csv, credit.csv, previous.csv and
        installments.csv.

    Returns
    -------
    dict
        {'es': the entity set, 'number': the partition number as an int}

    Raises
    ------
    ValueError
        If `path` does not end with a number.
    """
    import re  # local import: `re` is not among the notebook-level imports

    # take the trailing digits instead of slicing at a fixed offset, so any
    # base directory (and a trailing slash) is accepted
    trailing_number = re.search(r'(\d+)[\\/]*$', path)
    if trailing_number is None:
        raise ValueError('no partition number at the end of path: ' + repr(path))
    num_partition = int(trailing_number.group(1))

    # (entity id, index column) for tables that already have a unique key
    keyed_entities = (
        ('app_train', 'SK_ID_CURR'),
        ('bureau', 'SK_ID_BUREAU'),
        ('previous', 'SK_ID_PREV'),
    )
    # (entity id, name of the index column featuretools is asked to create)
    unkeyed_entities = (
        ('bureau_balance', 'bureaubalance_index'),
        ('cash', 'cash_index'),
        ('installments', 'installments_index'),
        ('credit', 'credit_index'),
    )

    # grabbing the data (all seven tables are read before any entity is built)
    tables = {
        entity_id: pd.read_csv(path + '/' + entity_id + '.csv')
        for entity_id, _ in keyed_entities + unkeyed_entities
    }

    # creating the entity set (see details in other notebook)
    es = ft.EntitySet(id='clients')
    for entity_id, index_column in keyed_entities:
        es = es.entity_from_dataframe(entity_id=entity_id,
                                      dataframe=tables[entity_id],
                                      index=index_column)
    for entity_id, index_column in unkeyed_entities:
        es = es.entity_from_dataframe(entity_id=entity_id,
                                      dataframe=tables[entity_id],
                                      make_index=True,
                                      index=index_column)

    # defining relationships (see details in other notebook) as
    # (parent entity, child entity, shared key column), then adding them
    links = (
        ('app_train', 'bureau', 'SK_ID_CURR'),
        ('bureau', 'bureau_balance', 'SK_ID_BUREAU'),
        ('app_train', 'previous', 'SK_ID_CURR'),
        ('previous', 'cash', 'SK_ID_PREV'),
        ('previous', 'installments', 'SK_ID_PREV'),
        ('previous', 'credit', 'SK_ID_PREV'),
    )
    relationships = [ft.Relationship(es[parent][key], es[child][key])
                     for parent, child, key in links]
    es = es.add_relationships(relationships)

    return {'es': es, 'number': num_partition}

### Computing feature matrices with DASK

In [3]:
# grabbing the features list created in the other notebook
# (features.txt must be present in the working directory)
features_list = ft.load_features('features.txt')
# number of feature definitions loaded
print(len(features_list))

1820


In [4]:
def compute_feature_matrix(es_dict, features_list):
    """Computes and saves the feature matrix of one partition.

    Parameters
    ----------
    es_dict : dict
        {'es': entity set, 'number': partition number}, as returned by
        create_entity_set_from_partition.
    features_list : list
        Feature definitions to evaluate on the entity set.

    Returns
    -------
    The computed feature matrix. It is also written to
    'matrices_from_partitions/feature_matrix_part<number>.csv', index included.
    """
    es = es_dict['es']
    num_partition = es_dict['number']

    # make sure the output directory exists before the expensive computation,
    # otherwise a missing directory is only discovered at to_csv
    output_dir = 'matrices_from_partitions'
    os.makedirs(output_dir, exist_ok=True)

    # chunk_size equals the number of app_train rows, i.e. the whole partition
    # is processed as a single chunk
    feature_matrix = ft.calculate_feature_matrix(features_list,
                                                 entityset=es,
                                                 n_jobs=1,
                                                 chunk_size=es['app_train'].df.shape[0])
    feature_matrix.to_csv(output_dir + '/feature_matrix_part' + str(num_partition) + '.csv',
                          index=True)
    return feature_matrix

In [5]:
# one directory per partition: part0 ... part80
paths_list = [
    'partitions/part' + str(partition_number)
    for partition_number in range(81)
]
paths_list[:5]

['partitions/part0',
 'partitions/part1',
 'partitions/part2',
 'partitions/part3',
 'partitions/part4']

In [6]:
# use all cores
# processes=False keeps the workers inside this process rather than spawning
# separate worker processes -- NOTE(review): confirm against the dask.distributed docs
client = Client(processes=False)

In [7]:
# number of cores per worker, keyed by worker address
client.ncores()

{'inproc://10.36.117.196/12212/2': 4}

In [8]:
# lazy pipeline over the partition paths:
#   path -> entity set (create_entity_set_from_partition) -> feature matrix (compute_feature_matrix)
b = (
    db.from_sequence(paths_list)
    .map(create_entity_set_from_partition)
    .map(compute_feature_matrix, features_list=features_list)
)

b

dask.bag<map-com..., npartitions=81>

In [9]:
# computing the features matrices
# compute() runs the whole bag pipeline; each partition's matrix is written to disk
# by compute_feature_matrix, and the returned list of matrices is not kept here
start = time.time()
b.compute()
end = time.time()
# elapsed wall-clock time
print((end - start) / 60, 'minutes')

[            TARGET NAME_CONTRACT_TYPE CODE_GENDER FLAG_OWN_CAR  \
 SK_ID_CURR                                                       
 100002         1.0         Cash loans           M            N   
 100003         0.0         Cash loans           F            N   
 100004         0.0    Revolving loans           M            Y   
 100006         0.0         Cash loans           F            N   
 100007         0.0         Cash loans           M            N   
 100008         0.0         Cash loans           M            N   
 100009         0.0         Cash loans           F            Y   
 100010         0.0         Cash loans           M            Y   
 100011         0.0         Cash loans           F            N   
 100012         0.0    Revolving loans           M            N   
 100014         0.0         Cash loans           F            N   
 100015         0.0         Cash loans           F            N   
 100016         0.0         Cash loans           F            

### Building the final dataframe

In [10]:
# paths of the per-partition matrices, without the '.csv' extension
matrices_paths_list = [
    'matrices_from_partitions/feature_matrix_part' + str(partition_number)
    for partition_number in range(81)
]
matrices_paths_list[:5]

['matrices_from_partitions/feature_matrix_part0',
 'matrices_from_partitions/feature_matrix_part1',
 'matrices_from_partitions/feature_matrix_part2',
 'matrices_from_partitions/feature_matrix_part3',
 'matrices_from_partitions/feature_matrix_part4']

In [13]:
# load every per-partition feature matrix back from disk
matrices = [
    pd.read_csv(matrix_path + '.csv', low_memory=False)
    for matrix_path in matrices_paths_list
]

In [15]:
# stack the partitions row-wise (axis=0 is the default of pd.concat)
final_feat_matrix = pd.concat(matrices)
print(final_feat_matrix.shape)

(307511, 1821)


In [None]:
# save the combined matrix; the row index is not written (index=False)
final_feat_matrix.to_csv('final_features_matrix.csv', index=False)