In [1]:
import pandas as pd
import numpy as np

import featuretools as ft
import os
import warnings
# Silence warnings for the whole notebook session. The first call ignores every
# warning category, so it already covers the two woodwork-specific filters below.
warnings.filterwarnings('ignore')
warnings.filterwarnings('ignore', category=FutureWarning, module='woodwork.*')
warnings.filterwarnings('ignore', category=UserWarning, module='woodwork.*')


In [2]:
#!pip install --upgrade numpy pandas matplotlib seaborn woodwork featuretools scikit-learn pyarrow

In [3]:
#!pip install --upgrade dask distributed nodejs dask-labextension

In [4]:
def entityset_from_partition(path):
    """Create an EntitySet from one partition directory of CSV files.

    Parameters
    ----------
    path : str
        Directory containing ``app.csv``, ``bureau.csv``, ``bureau_balance.csv``,
        ``previous.csv``, ``credit.csv``, ``installments.csv`` and ``cash.csv``.
        The last path component must end in the partition number,
        e.g. ``'./input/partitions/t90'`` -> 90.

    Returns
    -------
    dict
        ``{'es': <EntitySet>, 'num': <int>}`` where ``num`` is the partition
        number, used later to name the saved feature matrix.

    Raises
    ------
    ValueError
        If the last path component does not end in digits.
    """
    # The filters are repeated here in addition to the notebook-level ones --
    # presumably so they also apply when this runs inside a Dask worker process.
    warnings.filterwarnings('ignore')
    warnings.filterwarnings('ignore', category=FutureWarning, module='woodwork.*')
    warnings.filterwarnings('ignore', category=UserWarning, module='woodwork.*')

    # Parse the trailing digits of the directory name instead of slicing the
    # path at a fixed character offset, which only worked for one exact prefix.
    folder = os.path.basename(os.path.normpath(path))
    digits = folder[len(folder.rstrip('0123456789')):]
    if not digits:
        raise ValueError(
            'Cannot determine partition number from path %r: '
            'the directory name must end in digits' % (path,))
    partition_num = int(digits)

    # Read in data
    app = pd.read_csv('%s/app.csv' % path)
    bureau = pd.read_csv('%s/bureau.csv' % path)
    bureau_balance = pd.read_csv('%s/bureau_balance.csv' % path)
    previous = pd.read_csv('%s/previous.csv' % path)
    credit = pd.read_csv('%s/credit.csv' % path)
    installments = pd.read_csv('%s/installments.csv' % path)
    cash = pd.read_csv('%s/cash.csv' % path)

    # Empty entityset
    es = ft.EntitySet(id = 'clients')

    # Dataframes that already carry a unique index column
    es = es.add_dataframe(dataframe_name = 'app', dataframe = app,
                          index = 'SK_ID_CURR')
    es = es.add_dataframe(dataframe_name = 'bureau', dataframe = bureau,
                          index = 'SK_ID_BUREAU',
                          time_index = 'bureau_credit_application_date')
    es = es.add_dataframe(dataframe_name = 'previous', dataframe = previous,
                          index = 'SK_ID_PREV',
                          time_index = 'previous_decision_date')

    # Dataframes without a unique index: a new index column is created
    es = es.add_dataframe(dataframe_name = 'bureau_balance', dataframe = bureau_balance,
                          make_index = True, index = 'bb_index',
                          time_index = 'bureau_balance_date')
    es = es.add_dataframe(dataframe_name = 'cash', dataframe = cash,
                          make_index = True, index = 'cash_index',
                          time_index = 'cash_balance_date')
    es = es.add_dataframe(dataframe_name = 'installments', dataframe = installments,
                          make_index = True, index = 'installments_index',
                          time_index = 'installments_paid_date')
    es = es.add_dataframe(dataframe_name = 'credit', dataframe = credit,
                          make_index = True, index = 'credit_index',
                          time_index = 'credit_balance_date')

    # (parent, shared key column, child); the key has the same name on both
    # sides of every relationship. Order matches the original call sequence.
    relationships = (
        ('app', 'SK_ID_CURR', 'bureau'),
        ('bureau', 'SK_ID_BUREAU', 'bureau_balance'),
        ('app', 'SK_ID_CURR', 'previous'),
        ('previous', 'SK_ID_PREV', 'cash'),
        ('previous', 'SK_ID_PREV', 'installments'),
        ('previous', 'SK_ID_PREV', 'credit'),
    )
    for parent, key, child in relationships:
        es = es.add_relationship(parent_dataframe_name = parent,
                                 parent_column_name = key,
                                 child_dataframe_name = child,
                                 child_column_name = key)

    return ({'es': es, 'num': partition_num})

In [5]:
def feature_matrix_from_entityset(es_dict, feature_defs, return_fm = False,
                                  output_dir = './input/tm'):
    """Compute a feature matrix for one partition and save it as CSV.

    Parameters
    ----------
    es_dict : dict
        Mapping with key ``'es'`` (the entityset) and key ``'num'`` (the
        integer partition number used in the output file name).
    feature_defs : list
        Feature definitions passed to ``ft.calculate_feature_matrix``.
    return_fm : bool, default False
        When true, the computed feature matrix is also returned.
    output_dir : str, default './input/tm'
        Directory the CSV is written to; created if it does not exist.

    Returns
    -------
    The feature matrix when ``return_fm`` is true, otherwise ``None``.
    The matrix is always written to ``<output_dir>/t<num>_fm.csv`` with
    its index included.
    """
    # Local import kept from the original -- presumably so the function is
    # self-contained when it is shipped to a Dask worker.
    import os

    # Extract the entityset
    es = es_dict['es']

    # Calculate the feature matrix in a single chunk covering every row of 'app'
    feature_matrix = ft.calculate_feature_matrix(feature_defs,
                                                 entityset = es,
                                                 n_jobs = 1,
                                                 verbose = 0,
                                                 chunk_size = es['app'].shape[0])

    # exist_ok avoids the check-then-create race when several workers reach
    # this line at the same time.
    os.makedirs(output_dir, exist_ok = True)

    out_path = os.path.join(output_dir, 't%d_fm.csv' % es_dict['num'])
    feature_matrix.to_csv(out_path, index = True)

    if return_fm:
        return feature_matrix

In [6]:
# Load the previously saved feature definitions and report how many were read.
features_file = './input/time_features.txt'
feature_defs = ft.load_features(features_file)
print(len(feature_defs))

450


In [7]:
import dask.bag as db
from dask.distributed import Client

# Start a local Dask client backed by worker processes. No worker count is
# passed, so the number of workers is left to Dask's defaults (the earlier
# "8 cores" note was machine-specific -- TODO confirm on the target machine).
client = Client(processes = True)

In [8]:
# Directories of the partitions to process: t90 through t104
# (the upper bound of range is exclusive).
paths = [
    './input/partitions/t%d' % partition_id
    for partition_id in range(90, 105)
]

In [9]:
# Build the lazy pipeline in one expression:
#   partition path -> entityset dict -> feature matrix written to disk
b = (
    db.from_sequence(paths)
      .map(entityset_from_partition)
      .map(feature_matrix_from_entityset, feature_defs = feature_defs)
)

# Trigger execution; FutureWarnings raised while computing are suppressed.
with warnings.catch_warnings():
    warnings.simplefilter('ignore', FutureWarning)
    b.compute()

In [None]:
base = './input/tm/'

# Partition matrices are saved as 't<num>_fm.csv'; the previous 'tm.csv' filter
# matched none of them, leaving nothing to concatenate. Sorting makes the
# row order reproducible, since os.listdir returns entries in arbitrary order.
fm_paths = sorted(base + p for p in os.listdir(base) if p.endswith('_fm.csv'))
if not fm_paths:
    raise FileNotFoundError("No '*_fm.csv' feature matrices found in %s" % base)

# The files were written with their index, so the first column is read back
# as the index instead of as an ordinary data column.
fms = [pd.read_csv(path, index_col = 0) for path in fm_paths]
feature_matrix = pd.concat(fms, axis = 0)

In [None]:
# Write the combined feature matrix to a single CSV (index included, the to_csv default).
feature_matrix.to_csv("./input/time_features.csv")