# Scaling Featuretools with Dask

* https://dask.pydata.org/en/latest/
* "Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love"
* "Dask's schedulers scale to thousand-node clusters and its algorithms have been tested on some of the largest supercomputers in the world."
* Works with NumPy, Pandas, Scikit-Learn. Mimic their APIs.

In [23]:
import os
from datetime import datetime
from glob import glob

import numpy as np
import pandas as pd
import featuretools as ft

from dask import bag
from dask.diagnostics import ProgressBar
from featuretools.primitives import *

In [5]:
pbar = ProgressBar()
pbar.register()

## 1. Partition data

In [3]:
# data is taken from kaggle.com/c/talkingdata-adtracking-fraud-detection
input_file = '../data/train_sample.csv'
output_dir = "../data/partitioned"

def partition_by(df, column, output_dir):
    directory = f"{output_dir}/{column}"
    if not os.path.exists(directory):
        os.makedirs(directory)
    df.groupby(column).apply(lambda x: x.to_csv(f"{directory}/train_{x.name}.csv", index=False))

partition_by(pd.read_csv(input_file), 'app', output_dir)

## 2. Create distributed EntitySets

In [6]:
input_path = '../data/partitioned/app'

dtypes = {
    'ip': 'uint32',
    'app': 'uint16',
    'device': 'uint16',
    'os': 'uint16',
    'channel': 'uint16',
    'is_attributed': 'uint8'
}
to_read = ['app', 'device', 'os', 'channel', 'is_attributed', 'click_time']
to_parse = ['click_time']

In [7]:
filenames = glob(f"{input_path}/train_*.csv")

In [8]:
def createEntitySet(filename):
    df = pd.read_csv(filename, usecols=to_read, dtype=dtypes, parse_dates=to_parse)
    df['id'] = range(len(df))
    
    es = ft.EntitySet(id='clicks')
    es = es.entity_from_dataframe(
        entity_id='clicks',
        dataframe=df,
        index='id',
        time_index='click_time',
        
        variable_types={
            'app': ft.variable_types.Categorical,
            'device': ft.variable_types.Categorical,
            'os': ft.variable_types.Categorical,
            'channel': ft.variable_types.Categorical,
            'is_attributed': ft.variable_types.Boolean,
        }
    )

    es = es.normalize_entity(base_entity_id='clicks', new_entity_id='apps', index='app', make_time_index=False)
    es.add_last_time_indexes()
    return es

In [9]:
b = bag.from_sequence(filenames)
entity_sets = b.map(createEntitySet)

## 3. Calculate feature matrices and definitions

In [11]:
def calc_feature_matrix(es, entity_id, cutoff_time):
    feature_matrix, feature_defs = ft.dfs(
        entityset=es,
        target_entity=entity_id,
        cutoff_time=cutoff_time,
        training_window=ft.Timedelta("3 days"),
        max_depth=3
    )

    return feature_matrix, feature_defs

In [26]:
# For the sake of simplicity we take predefined time
cutoff_time = datetime.datetime(2017, 11, 9, 15, 59, 51)

datetime.datetime(2017, 11, 9, 15, 59, 51)

In [29]:
feature_matrices = entity_sets.map(calc_feature_matrix, entity_id='apps', cutoff_time=cutoff_time)

## 4. Compute the distributed features

In [30]:
out = feature_matrices.compute()
_, feature_defs = out[0]
feature_matrices = list(map(list, zip(*out)))[0]
feature_matrix = pd.concat(feature_matrices)

[########################################] | 100% Completed |  4.4s
[########################################] | 100% Completed |  4.5s


In [31]:
feature_defs

[<Feature: COUNT(clicks)>,
 <Feature: PERCENT_TRUE(clicks.is_attributed)>,
 <Feature: NUM_UNIQUE(clicks.device)>,
 <Feature: NUM_UNIQUE(clicks.os)>,
 <Feature: NUM_UNIQUE(clicks.channel)>,
 <Feature: MODE(clicks.device)>,
 <Feature: MODE(clicks.os)>,
 <Feature: MODE(clicks.channel)>,
 <Feature: NUM_UNIQUE(clicks.DAY(click_time))>,
 <Feature: NUM_UNIQUE(clicks.YEAR(click_time))>,
 <Feature: NUM_UNIQUE(clicks.MONTH(click_time))>,
 <Feature: NUM_UNIQUE(clicks.WEEKDAY(click_time))>,
 <Feature: MODE(clicks.DAY(click_time))>,
 <Feature: MODE(clicks.YEAR(click_time))>,
 <Feature: MODE(clicks.MONTH(click_time))>,
 <Feature: MODE(clicks.WEEKDAY(click_time))>]

In [32]:
feature_matrix

Unnamed: 0_level_0,COUNT(clicks),PERCENT_TRUE(clicks.is_attributed),NUM_UNIQUE(clicks.device),NUM_UNIQUE(clicks.os),NUM_UNIQUE(clicks.channel),MODE(clicks.device),MODE(clicks.os),MODE(clicks.channel),NUM_UNIQUE(clicks.DAY(click_time)),NUM_UNIQUE(clicks.YEAR(click_time)),NUM_UNIQUE(clicks.MONTH(click_time)),NUM_UNIQUE(clicks.WEEKDAY(click_time)),MODE(clicks.DAY(click_time)),MODE(clicks.YEAR(click_time)),MODE(clicks.MONTH(click_time)),MODE(clicks.WEEKDAY(click_time))
app,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
163,1,0.000000,1,1,1,0,0,4,1,1,1,1,9,2017,11,3
19,478,0.146444,76,14,8,0,24,213,4,1,1,4,9,2017,11,3
134,4,0.000000,3,3,1,3032,607,347,2,1,1,2,7,2017,11,1
538,1,0.000000,1,1,1,3032,607,347,1,1,1,1,7,2017,11,1
48,3,0.333333,1,2,1,1,13,213,1,1,1,1,7,2017,11,1
16,3,0.000000,1,2,1,1,18,268,2,1,1,2,7,2017,11,1
202,6,0.166667,1,5,1,1,6,421,3,1,1,3,7,2017,11,1
115,1,1.000000,1,1,1,1,22,203,1,1,1,1,9,2017,11,3
8,2004,0.001996,3,51,3,1,19,145,4,1,1,4,9,2017,11,3
32,286,0.003497,2,37,2,1,19,376,4,1,1,4,9,2017,11,3
