# Featuretools Implementation with Dask

A simple run of Deep Feature Synthesis from the Automated Loan Repayment notebook takes about 25 hours on an AWS machine with 64 GB of RAM! Featuretools does have support for parallel processing if you have multiple cores (which nearly every single laptop now does), but it currently sends the entire EntitySet to each process which means you might exhaust the memory on any one core. For example, that AWS machine has 8 GB per core, which might seem like a lot until you realize the EntitySet takes up about 11 GB and setting `n_jobs=-1` will cause an out of memory error. Therefore, we cannot use the parallel processing in Featuretools and instead have to build our own implementation with Dask. Fortunately, options such as Dask make it easy to take advantage of multiple cores on our own machine. In this notebook, we'll see how to run Deep Feature Synthesis in about 3 hours on a personal laptop with 16 GB of RAM. 

## Roadmap

Following is our plan of action for implementing Dask

1. Convert `object` data types to `category`
    * This reduces memory consumption significantly
2. Create 100 partitions of data and save to disk
    * Each partition will contain data from all 7 seven tables for 1/100 of the client ids, `SK_ID_CURR`
    * Each partition can be used to make an EntitySet and hence a feature matrix
3. Write a function to take a partition and create an `EntitySet`
4. Write a function to take an `EntitySet` and calculate a `feature_matrix`
    * Since we already have the feature names, we can use `ft.calculate_feature_matrix`
5. Use Dask with system processes to generate feature matrices for 8 partitions at a time
    * Save these subset feature matrices to disk
    * Using proceses will start 8 workers, one for each core, with 2 GB of memory each
    * We can't generate the entire feature matrix at once using processes because the final feature matrix is too large to fit on a single core
6. Use Dask with threads to read in subset feature matrices and create final feature matrix
    * Using threads will start 1 worker with 16 GB of memory, enough to hold the entire feature matrix
    * Can save this feature matrix to disk for later use in a machine learning pipeline
    
This might seem like a lot of tasks, but each one is a relatively simple step. At the end, we'll have a working implementation of Dask that lets us take full advantage of our computing resources. While we could solve this whole problem by just renting a larger machine, this approach will give us a chance to learn about how to work with constraints and engineer a solution. Sometimes having too many resources can limit your creativity, and working with constraints forces us to be innovative! 

In [100]:
# pandas and numpy for data manipulation
import pandas as pd
import numpy as np

# featuretools for automated feature engineering
import featuretools as ft
import featuretools.variable_types as vtypes

# Utilities
import sys
import psutil
import os

In [110]:
def convert_types(df):
    # Iterate through each column
    for c in df:
        
        # Convert ids and booleans to integers
        if ('SK_ID' in c):
            df[c] = df[c].fillna(0).astype(np.int32)
            
        # Convert objects to category
        elif (df[c].dtype == 'object') and (df[c].nunique() < df.shape[0]):
            df[c] = df[c].astype('category')
        
        # Booleans mapped to integers
        elif list(df[c].unique()) == [1, 0]:
            df[c] = df[c].astype(bool)
        
        # Float64 to float32
        elif df[c].dtype == float:
            df[c] = df[c].astype(np.float32)
            
        # Int64 to int32
        elif df[c].dtype == int:
            df[c] = df[c].astype(np.int32)
        
    return df

In [102]:
# Read in the datasets and replace the anomalous values
app_train = pd.read_csv('../input/application_train.csv').replace({365243: np.nan})
app_test = pd.read_csv('../input/application_test.csv').replace({365243: np.nan})
bureau = pd.read_csv('../input/bureau.csv').replace({365243: np.nan})
bureau_balance = pd.read_csv('../input/bureau_balance.csv').replace({365243: np.nan})
cash = pd.read_csv('../input/POS_CASH_balance.csv').replace({365243: np.nan})
credit = pd.read_csv('../input/credit_card_balance.csv').replace({365243: np.nan})
previous = pd.read_csv('../input/previous_application.csv').replace({365243: np.nan})
installments = pd.read_csv('../input/installments_payments.csv').replace({365243: np.nan})

app_test['TARGET'] = np.nan

# Join together training and testing
app = app_train.append(app_test, ignore_index = True, sort = True)

# Need `SK_ID_CURR` in every dataset
bureau_balance = bureau_balance.merge(bureau[['SK_ID_CURR', 'SK_ID_BUREAU']], 
                                      on = 'SK_ID_BUREAU', how = 'left')

print(f"""Total memory before converting types: \
{round(np.sum([x.memory_usage().sum() / 1e9 for x in 
[app, bureau, bureau_balance, cash, credit, previous, installments]]), 2)} gb.""")

# Convert types to reduce memory usage
app = convert_types(app)
bureau = convert_types(bureau)
bureau_balance = convert_types(bureau_balance)
cash = convert_types(cash)
credit = convert_types(credit)
previous = convert_types(previous)
installments = convert_types(installments)

print(f"""Total memory after converting types: \
{round(np.sum([x.memory_usage().sum() / 1e9 for x in 
[app, bureau, bureau_balance, cash, credit, previous, installments]]), 2)} gb.""")

# Set the index for locating
for dataset in [app, bureau, bureau_balance, cash, credit, previous, installments]:
    dataset.set_index('SK_ID_CURR', inplace = True)

Total memory before converting types: 4.38 gb.
Total memory after converting types: 2.06 gb.


In [90]:
print('Object memory usage.')
print(bureau['CREDIT_TYPE'].astype('object').memory_usage() / 1e9, 'gb')

print('Category memory usage.')
print(bureau['CREDIT_TYPE'].astype('category').memory_usage() / 1e9, 'gb')

print('Length of data: ', bureau.shape[0])
print('Number of unique categories: ', bureau['CREDIT_TYPE'].nunique())

Object memory usage.
0.027462848 gb
Category memory usage.
0.015448612 gb
Length of data:  1716428
Number of unique categories:  15


# Partitioning Data

Next, we partition the data into 104 separate datasets based on the client id, `SK_ID_CURR`. Each partition by itself can be used to make an `EntitySet` and later a feature matrix. One partition will contain seven data tables, each with only the data associated with the set clients. 104 partitions is sort of an arbitrary number and it might be worth exploring other options to see which works best.

In [115]:
def create_partition(user_list, partition):
    """Creates a dataset with only the users in `user_list`."""

    
    # Make the directory
    directory = '../input/partitions/p%d' % (partition + 1)
    if os.path.exists(directory):
        return
    
    else:
        os.makedirs(directory)
        
        # Subset based on user list
        app_subset = app[app.index.isin(user_list)].copy().reset_index()
        bureau_subset = bureau[bureau.index.isin(user_list)].copy().reset_index()

        # Drop SK_ID_CURR from bureau_balance, cash, credit, and installments
        bureau_balance_subset = bureau_balance[bureau_balance.index.isin(user_list)].copy().reset_index(drop = True)
        cash_subset = cash[cash.index.isin(user_list)].copy().reset_index(drop = True)
        credit_subset = credit[credit.index.isin(user_list)].copy().reset_index(drop = True)
        previous_subset = previous[previous.index.isin(user_list)].copy().reset_index()
        installments_subset = installments[installments.index.isin(user_list)].copy().reset_index(drop = True)
        

        # Save data to the directory
        app_subset.to_csv('%s/app.csv' % directory, index = False)
        bureau_subset.to_csv('%s/bureau.csv' % directory, index = False)
        bureau_balance_subset.to_csv('%s/bureau_balance.csv' % directory, index = False)
        cash_subset.to_csv('%s/cash.csv' % directory, index = False)
        credit_subset.to_csv('%s/credit.csv' % directory, index = False)
        previous_subset.to_csv('%s/previous.csv' % directory, index = False)
        installments_subset.to_csv('%s/installments.csv' % directory, index = False)

        if partition % 10 == 0:
            print('Saved all files in partition {} to {}.'.format(partition + 1, directory))

In [116]:
# Break into 104 chunks
chunk_size = app.shape[0] // 103

# Construct an id list
id_list = [list(app.iloc[i:i+chunk_size].index) for i in range(0, app.shape[0], chunk_size)]

In [117]:
from itertools import chain

# Sanity check that we have not missed any ids
print('Number of ids in id_list:         {}.'.format(len(list(chain(*id_list)))))
print('Total length of application data: {}.'.format(len(app)))

Number of ids in id_list:         356255.
Total length of application data: 356255.


In [None]:
start = timer()
for i, ids in enumerate(id_list):
    # Create a partition based on the ids
    create_partition(ids, i)
    
end = timer()
print(f'Partitioning took {round(end - start)} seconds.')

Saved all files in partition 1 to ../input/partitions/p1.
Saved all files in partition 11 to ../input/partitions/p11.
Saved all files in partition 21 to ../input/partitions/p21.
Saved all files in partition 31 to ../input/partitions/p31.
Saved all files in partition 41 to ../input/partitions/p41.


We can independently generate the feature matrix for each partition because the partition contains all the data for that group of clients. These partitioned feature matrices can then be joined together into larger feature matrices, and eventually one single matrix with all of the clients.

#### Load in Feature names

We already calculated the feature names, so we can read them in. This avoids the need to have to recalculate the features on each partition. Instead of using `ft.dfs`, if we have the feature names, we can use `ft.calculate_feature_matrix` and pass in the `EntitySet` and the feature names.

In [None]:
featurenames = ft.load_features('../input/feature_names.txt')
print(len(featurenames))

For each feature matrix, we'll make 1820 features! 

#### Variable Types

If the Automated notebook, we specified the variable types when adding entities to the entityset. However, since we already properly defined the data types for each column, Featuretools will now infer the correct variable type. For example, while before we have Booleans mapped to integers which would be interpreted as numeric, now the Booleans are represented as Booleans and hence will be correctly inferred by Featuretools.

In [None]:
# app_types = {'FLAG_CONT_MOBILE': vtypes.Boolean, 'FLAG_DOCUMENT_10': vtypes.Boolean, 'FLAG_DOCUMENT_11': vtypes.Boolean, 'FLAG_DOCUMENT_12': vtypes.Boolean, 'FLAG_DOCUMENT_13': vtypes.Boolean, 'FLAG_DOCUMENT_14': vtypes.Boolean, 'FLAG_DOCUMENT_15': vtypes.Boolean, 'FLAG_DOCUMENT_16': vtypes.Boolean, 'FLAG_DOCUMENT_17': vtypes.Boolean, 'FLAG_DOCUMENT_18': vtypes.Boolean, 'FLAG_DOCUMENT_19': vtypes.Boolean, 'FLAG_DOCUMENT_2': vtypes.Boolean, 'FLAG_DOCUMENT_20': vtypes.Boolean, 'FLAG_DOCUMENT_21': vtypes.Boolean, 'FLAG_DOCUMENT_3': vtypes.Boolean, 'FLAG_DOCUMENT_4': vtypes.Boolean, 'FLAG_DOCUMENT_5': vtypes.Boolean, 'FLAG_DOCUMENT_6': vtypes.Boolean, 'FLAG_DOCUMENT_7': vtypes.Boolean, 'FLAG_DOCUMENT_8': vtypes.Boolean, 'FLAG_DOCUMENT_9': vtypes.Boolean, 'FLAG_EMAIL': vtypes.Boolean, 'FLAG_EMP_PHONE': vtypes.Boolean, 'FLAG_MOBIL': vtypes.Boolean, 'FLAG_PHONE': vtypes.Boolean, 'FLAG_WORK_PHONE': vtypes.Boolean, 'LIVE_CITY_NOT_WORK_CITY': vtypes.Boolean, 'LIVE_REGION_NOT_WORK_REGION': vtypes.Boolean, 'REG_CITY_NOT_LIVE_CITY': vtypes.Boolean, 'REG_CITY_NOT_WORK_CITY': vtypes.Boolean, 'REG_REGION_NOT_LIVE_REGION': vtypes.Boolean, 'REG_REGION_NOT_WORK_REGION': vtypes.Boolean, 'REGION_RATING_CLIENT': vtypes.Ordinal, 'REGION_RATING_CLIENT_W_CITY': vtypes.Ordinal, 'HOUR_APPR_PROCESS_START': vtypes.Ordinal}
# previous_types = {'NFLAG_LAST_APPL_IN_DAY': vtypes.Boolean, 
#              'NFLAG_INSURED_ON_APPROVAL': vtypes.Boolean}

## Function to Create EntitySet from Partition 

The next function takes a single partition of data and make an `EntitySet`. We won't save these entitysets to disk, but instead will use them in Dask. Therefore, if we want to make any changes to the `EntitySet`, such as adding in interesting values or seed features, we can alter this function and remake the `EntitySet` without having to rewrite all the Entity Sets on disk. Writing the entity sets to disk would be another option if we are sure that they won't ever change. For greater flexibility, we write the data partitions to disk (as done above). 

In [None]:
def entityset_from_partition(path):
    """Create an EntitySet from a partition of data specified as a path."""
    
    # Read in data
    app = pd.read_csv('%s/app.csv' % path)
    bureau = pd.read_csv('%s/bureau.csv' % path)
    bureau_balance = pd.read_csv('%s/bureau_balance.csv' % path)
    previous = pd.read_csv('%s/previous.csv' % path)
    credit = pd.read_csv('%s/credit.csv' % path)
    installments = pd.read_csv('%s/installments.csv' % path)
    cash = pd.read_csv('%s/cash.csv' % path)
    
    # Empty entityset
    es = ft.EntitySet(id = 'clients')
    
    # Entities with a unique index
    es = es.entity_from_dataframe(entity_id = 'app', dataframe = app, index = 'SK_ID_CURR',
                                  variable_types = app_types)

    es = es.entity_from_dataframe(entity_id = 'bureau', dataframe = bureau, index = 'SK_ID_BUREAU')

    es = es.entity_from_dataframe(entity_id = 'previous', dataframe = previous, index = 'SK_ID_PREV',
                                  variable_types = previous_types)

    # Entities that do not have a unique index
    es = es.entity_from_dataframe(entity_id = 'bureau_balance', dataframe = bureau_balance, 
                                  make_index = True, index = 'bureaubalance_index')

    es = es.entity_from_dataframe(entity_id = 'cash', dataframe = cash, 
                                  make_index = True, index = 'cash_index')

    es = es.entity_from_dataframe(entity_id = 'installments', dataframe = installments,
                                  make_index = True, index = 'installments_index')

    es = es.entity_from_dataframe(entity_id = 'credit', dataframe = credit,
                                  make_index = True, index = 'credit_index')
    
    # Relationship between app_train and bureau
    r_app_bureau = ft.Relationship(es['app']['SK_ID_CURR'], es['bureau']['SK_ID_CURR'])

    # Relationship between bureau and bureau balance
    r_bureau_balance = ft.Relationship(es['bureau']['SK_ID_BUREAU'], es['bureau_balance']['SK_ID_BUREAU'])

    # Relationship between current app and previous apps
    r_app_previous = ft.Relationship(es['app']['SK_ID_CURR'], es['previous']['SK_ID_CURR'])

    # Relationships between previous apps and cash, installments, and credit
    r_previous_cash = ft.Relationship(es['previous']['SK_ID_PREV'], es['cash']['SK_ID_PREV'])
    r_previous_installments = ft.Relationship(es['previous']['SK_ID_PREV'], es['installments']['SK_ID_PREV'])
    r_previous_credit = ft.Relationship(es['previous']['SK_ID_PREV'], es['credit']['SK_ID_PREV'])
    
    # Add in the defined relationships
    es = es.add_relationships([r_app_bureau, r_bureau_balance, r_app_previous,
                               r_previous_cash, r_previous_installments, r_previous_credit])

    return es

Let's test the function to make sure it can make an `EntitySet` from a data partition.

In [None]:
es1 = entityset_from_partition('../input/partitions/p1')
es1

The function looks like it works as intended. The next step is to write a function that can take a single `EntitySet` and the `features` we want to build, and make a feature matrix. This is simple using `ft.calculate_feature_matrix`. 

# Function to Create Feature Matrix from EntitySet 

With the entity set and the feature names, generating the feature matrix is a one-liner in Featuretools. Since we are going to use Dask for parallelizing the operation, we'll set the number of jobs to 1. The `chunk_size` is an extremely important parameter, and I'd suggest experimenting with this to find the optimal value. Since we aren't getting any updates, it might make sense to set the chunk size as large as possible. We can try setting it to the length of the `app` dataframe for each entityset. 

In [None]:
def feature_matrix_from_entityset(es, feature_names):
    """Run deep feature synthesis from an entityset and feature names"""

    feature_matrix = ft.calculate_feature_matrix(feature_names, 
                                                 entityset=es, 
                                                 n_jobs = 1, 
                                                 verbose = 0,
                                                 chunk_size = es['app'].df.shape[0])
    
    return feature_matrix

Below we test the function using the entityset from the first partition.

In [None]:
fm1 = feature_matrix_from_entityset(es1, featurenames)
fm1.shape

We have all the parts needed to create our feature matrixes. The last step is to get Dask to run this in parallel. For the dask implementation, we'll set `n_jobs = 1` and `verbose = 0` because Dask already runs the calculation in parallel and because Dask should have a progress bar that we can view.

# Dask

We will use the Dask utility `delayed` to parallelize the operation. We iterate through each path in a list of the partitions and tell dask to first create the entity set from the partition, then create the featurematrix from the entityset, and finally, append the feature matrix to a list of feature matrices. The last step is to `concat`enate all of the feature matrices together to get one final matrix that we save to disk.

In [9]:
from dask import delayed
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler
from dask.distributed import Client

client = Client(processes = True)

from timeit import default_timer as timer

In [10]:
paths = ['../input/partitions/%s' % file for file in os.listdir('../input/partitions/')]
paths[:8]

['../input/partitions/p100',
 '../input/partitions/p4',
 '../input/partitions/p3',
 '../input/partitions/p101',
 '../input/partitions/p2',
 '../input/partitions/p5',
 '../input/partitions/p19',
 '../input/partitions/p26']

In [11]:
start_index = 1
overall_start = timer()

# Iterate through 8 paths at a time
for i, end_index in enumerate(range(9, len(paths) + 5, 8)):
    
    # Subset to the 8 paths
    if end_index > len(paths):
        subset_paths = paths[start_index:]
    else:
        subset_paths = paths[start_index: end_index]
    
    # Empty list of feature matrices
    fms = []

    # Iterate through the paths
    for path in subset_paths:

        # Make the entityset
        es = delayed(entityset_from_partition)(path)

        # Make the feature matrix and add to the list
        fm = delayed(feature_matrix_from_entityset)(es, feature_names = featurenames)
        fms.append(fm)

    # Final operation will be to concatenate together all of the feature matrices
    X = delayed(pd.concat)(fms, axis = 0)
    
    print(f"Starting feature matrix {i}")
    start = timer()
    feature_matrix = X.compute()
    end = timer()
    
    print(f"Feature Matrix {i} complete, Time Elapsed: {round(end - start, 2)} seconds.")
    
    # Save the feature matrix to disk
    feature_matrix.to_csv('../input/fm/%s.csv' % i, index = True)
    
    # Start index becomes previous ending index
    start_index = end_index

Starting feature matrix 0
Feature Matrix 0 complete, Time Elapsed: 838.17 seconds.
Starting feature matrix 1
Feature Matrix 1 complete, Time Elapsed: 993.07 seconds.
Starting feature matrix 2
Feature Matrix 2 complete, Time Elapsed: 975.8 seconds.
Starting feature matrix 3
Feature Matrix 3 complete, Time Elapsed: 1022.86 seconds.
Starting feature matrix 4
Feature Matrix 4 complete, Time Elapsed: 1009.8 seconds.
Starting feature matrix 5
Feature Matrix 5 complete, Time Elapsed: 954.63 seconds.
Starting feature matrix 6
Feature Matrix 6 complete, Time Elapsed: 895.52 seconds.
Starting feature matrix 7
Feature Matrix 7 complete, Time Elapsed: 904.09 seconds.
Starting feature matrix 8
Feature Matrix 8 complete, Time Elapsed: 903.8 seconds.
Starting feature matrix 9
Feature Matrix 9 complete, Time Elapsed: 905.32 seconds.
Starting feature matrix 10
Feature Matrix 10 complete, Time Elapsed: 933.32 seconds.
Starting feature matrix 11
Feature Matrix 11 complete, Time Elapsed: 929.27 seconds.
S

In [12]:
# Base directory for feature matrices
base = '../input/fm/'
fm_paths = [base + p for p in os.listdir(base) if '.csv' in p]

['../input/fm/6.csv',
 '../input/fm/7.csv',
 '../input/fm/5.csv',
 '../input/fm/4.csv',
 '../input/fm/0.csv',
 '../input/fm/1.csv',
 '../input/fm/3.csv',
 '../input/fm/2.csv',
 '../input/fm/10.csv',
 '../input/fm/11.csv',
 '../input/fm/12.csv',
 '../input/fm/9.csv',
 '../input/fm/8.csv']

In [13]:
# Start a new client with processes
client = Client(processes = False)

# Empty list for feature matrices
fms = []

# Iterate through the feature matrices
for path in fm_paths:
    # Read in each dataframe and append to list
    X = delayed(pd.read_csv)(path, index_col = 0)
    fms.append(X)

# Concatenate all the matrices together (append rows)
fm_out = delayed(pd.concat)(fms, axis = 0)

# Time how long operate takes
start = timer()
feature_matrix = fm_out.compute()
end = timer()

print(f'Time elasped: {round(end - start, 2)} seconds.')
overall_end = timer()

Time elasped: 97.33 seconds.


In [14]:
feature_matrix.shape

(352693, 1820)

The final feature matrix is exactly the expected shape: 

In [15]:
print(f'Total Time for Feature Matrix Calculation: {round(overall_start - overall_end, 2)}.')

Total Time for Feature Matrix Calculation: -12574.52.


In [None]:
# start = timer()

# # Context for progress and profiling
# with ProgressBar(), Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
#     feature_matrix = X.compute()

# end = timer()

# print(f'Total time elapsed: {round(end - start)} seconds.')

In [None]:
# Save the feature matrix
# feature_matrix.to_csv('../input/feature_matrix.csv', chunksize = 1000)

# Visualizations of Run

We can use Bokeh and the built in plotting capabilities of Dask to plot the resources used during the computation. This is not necessary for the project but might be interesting for understanding how Dask operates. 

In [None]:
import bokeh
from bokeh.io import output_notebook
output_notebook()

In [None]:
prof.visualize()

In [None]:
with open('../input/prof.txt', 'w') as f:
    f.write(str(prof.results))
    
with open('../input/rprof.txt', 'w') as f:
    f.write(str(rprof.results))
    
with open('../input/cprof.txt', 'w') as f:
    f.write(str(cprof.results))

# Conclusions

In this notebook we used Dask to complete an operation that normally would have taken far more resources than we have available on our personal computer. Although featuretools supports parallel processing, there are still some issues that need to be worked out, and we can take matters into our own hands to get the job done! This will allow anyone with reasonable hardware to take advantage of featuretools! The next notebook is Sampling and Feature Selection where we limit reduce the numbers of features to allow for reasonable modeling times. 