# Using Featuretools with Dask to generate a feature matrix

<p style="margin:30px">
    <img width=50% src="https://www.featuretools.com/wp-content/uploads/2017/12/FeatureLabs-Logo-Tangerine-800.png" alt="Featuretools" />
</p>

In this tutorial, we show how [Featuretools](http://www.featuretools.com) can be used to perform feature engineering on a multi-table dataset of 3 million online grocery orders provided by Instacart. We will generate a feature matrix that can be used to train a machine learning model to predict what product a customer buys next.

*Note: This notebook requires a dataset from Instacart. You can download the dataset [here](https://www.instacart.com/datasets/grocery-shopping-2017). Once you have downloaded the data, be sure to place the CSV files contained in the archive in a directory called `data/instacart`. If you use a different directory name, you will need to update the code below to point to the proper location.*

## Highlights

* We demonstrate how to generate features in a scalable manner using [Dask](http://dask.pydata.org/en/latest/)
* We automatically generate label times using [Compose](https://github.com/FeatureLabs/compose), an approach that can be reused for numerous prediction problems
* We save the resulting feature matrix to disk so it can be used in downstream machine learning models

In [None]:
import os
import composeml as cp
import featuretools as ft
import dask.dataframe as dd
import numpy as np
import pandas as pd

from dask.distributed import Client
ft.__version__

## Step 1. Load and preprocess data

First, we will create a Dask distributed client so we can track the progress of our computation on the Dask dashboard that is created when the client is initialized.

In [None]:
client = Client()
client

Next, we will specify our input and output directories and set the blocksize used to read the raw CSV files into Dask dataframes. On a 4-processor machine with 16GB of memory available for the Dask workers, a `40MB` blocksize worked well. You may need to adjust this number for your specific environment. Refer to the [Dask documentation](https://docs.dask.org/en/latest/best-practices.html) for additional info.

In [None]:
data_dir = os.path.join("data", "instacart")
output_dir = os.path.join("data", "instacart", "dask_data")
blocksize = "40MB"
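
As a rough guide to what a given `blocksize` means in practice, the sketch below estimates how many partitions a single CSV file would be split into, assuming the file is cut into consecutive blocks of at most `blocksize` bytes. The helper names `parse_size` and `estimate_partitions` and the 600 MB file size are hypothetical, chosen only for illustration; they are not part of Dask.

```python
import math

# Decimal units, matching the "40MB" style string used above
# (assumption: 1 MB is treated as 10**6 bytes).
_UNITS = {"KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_size(text):
    """Convert a string such as '40MB' into a number of bytes."""
    text = text.strip().upper()
    for suffix, factor in _UNITS.items():
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)
    return int(text)  # plain byte count with no unit suffix


def estimate_partitions(file_size_bytes, blocksize):
    """Estimate how many blocks a single file is split into."""
    return max(1, math.ceil(file_size_bytes / parse_size(blocksize)))


# Hypothetical 600 MB file read with the blocksize chosen above.
n_partitions = estimate_partitions(600 * 10**6, "40MB")
print(n_partitions)
```

A smaller blocksize gives more, smaller partitions (more scheduling overhead, less memory per task); a larger one does the opposite.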

Now we will read our data into Dask dataframes. This operation completes quickly because Dask does not bring the data into memory at this stage.

In [None]:
order_products = dd.concat([dd.read_csv(os.path.join(data_dir, "order_products__prior.csv"), blocksize=blocksize),
                            dd.read_csv(os.path.join(data_dir, "order_products__train.csv"), blocksize=blocksize)])
orders = dd.read_csv(os.path.join(data_dir, "orders.csv"), blocksize=blocksize)
departments = dd.read_csv(os.path.join(data_dir, "departments.csv"), blocksize=blocksize)
products = dd.read_csv(os.path.join(data_dir, "products.csv"), blocksize=blocksize)

In the next few cells, we will perform some required preprocessing to clean up our data. We will merge some of the raw dataframes and derive absolute order times from the relative times used in the raw data. This will allow us to use cutoff times as part of the Deep Feature Synthesis process.

In [None]:
order_products = order_products.merge(products).merge(departments)

In [None]:
def add_time(df):
    # reset_index returns a new frame, so the result must be assigned
    df = df.reset_index(drop=True)
    # Initialise with NaT so the column has a datetime dtype
    df["order_time"] = pd.NaT
    columns = df.columns.tolist()
    days_since = columns.index("days_since_prior_order")
    hour_of_day = columns.index("order_hour_of_day")
    order_time = columns.index("order_time")

    # The first order for each user is anchored to an arbitrary start date
    df.iloc[0, order_time] = pd.Timestamp('Jan 1, 2015') + pd.Timedelta(df.iloc[0, hour_of_day], "h")
    # Each later order is offset from the previous one
    for i in range(1, df.shape[0]):
        df.iloc[i, order_time] = (df.iloc[i - 1, order_time]
                                  + pd.Timedelta(df.iloc[i, days_since], "d")
                                  + pd.Timedelta(df.iloc[i, hour_of_day], "h"))

    to_drop = ["order_number", "order_dow", "order_hour_of_day", "days_since_prior_order", "eval_set"]
    df = df.drop(to_drop, axis=1)

    return df
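
The row-by-row loop above follows a simple recurrence: each order time is the previous one plus a number of days and hours. Under that reading, the same values can be obtained from running totals, as in the pandas sketch below. This is an illustrative alternative on toy data for a single user, not the notebook's method, and it assumes the first order has a missing `days_since_prior_order`.

```python
import numpy as np
import pandas as pd

# Toy orders for a single user; the first order has no prior order.
toy = pd.DataFrame({
    "order_hour_of_day": [8, 10, 9],
    "days_since_prior_order": [np.nan, 7.0, 3.0],
})

start = pd.Timestamp("2015-01-01")
# Running totals of the day and hour offsets reproduce the loop's recurrence.
days = pd.to_timedelta(toy["days_since_prior_order"].fillna(0).cumsum(), unit="D")
hours = pd.to_timedelta(toy["order_hour_of_day"].cumsum(), unit="h")
toy["order_time"] = start + days + hours

print(toy["order_time"].tolist())
```

On real data the same computation would have to be applied per `user_id` group.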

In [None]:
orders = orders.groupby("user_id").apply(add_time)
order_products = order_products.merge(orders[["order_id", "order_time"]])
order_products["order_product_id"] = order_products["order_id"] * 1000 + order_products["add_to_cart_order"]
order_products = order_products.drop(["product_id", "department_id", "add_to_cart_order"], axis=1)
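
The `order_product_id` built above packs the order ID and the cart position into a single integer. The sketch below shows the encoding and how to reverse it. It assumes `add_to_cart_order` is always below 1000 (otherwise keys could collide); the helper names and the sample order ID are hypothetical.

```python
def make_order_product_id(order_id, add_to_cart_order):
    """Pack an order ID and a cart position into one integer key."""
    return order_id * 1000 + add_to_cart_order


def split_order_product_id(order_product_id):
    """Recover (order_id, add_to_cart_order) from the packed key."""
    return divmod(order_product_id, 1000)


# Hypothetical order ID with the third item added to the cart.
key = make_order_product_id(order_id=2539329, add_to_cart_order=3)
print(key, split_order_product_id(key))
```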

Now that the preprocessing work is complete, we will save the results to disk. This will allow us to start from this point in the process in the future, without having to repeat all of the preprocessing steps. If you have already saved the results to disk previously, you can skip the cell below.

#### Note: The process of saving to CSV is computationally intensive and may take 45 minutes or more, depending on the system you are using. You can use the Dask dashboard to monitor the progress.

In [None]:
orders.to_csv(os.path.join(output_dir, "orders-*.csv"), index=False)
order_products.to_csv(os.path.join(output_dir, "order_products-*.csv"), index=False)

If you have already performed the preprocessing steps and saved the processed files to disk, you can read them in with the commands in the following cell.

In [None]:
orders = dd.read_csv(os.path.join(output_dir, "orders-*.csv"), blocksize=blocksize)
order_products = dd.read_csv(os.path.join(output_dir, "order_products-*.csv"), blocksize=blocksize)
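
When a timestamp column is written to CSV and read back, it arrives as text unless the reader is told to parse it. The pandas sketch below shows the difference on a small in-memory CSV whose contents are made up for illustration. `dd.read_csv` forwards extra keyword arguments such as `parse_dates` to `pandas.read_csv`, so the same option should apply there; whether it is needed depends on how your Featuretools version converts the `order_time` column.

```python
import io

import pandas as pd

# Made-up rows in the same layout as the saved orders files.
csv_text = (
    "order_id,user_id,order_time\n"
    "1,10,2015-01-01 08:00:00\n"
    "2,10,2015-01-08 18:00:00\n"
)

as_text = pd.read_csv(io.StringIO(csv_text))
as_dates = pd.read_csv(io.StringIO(csv_text), parse_dates=["order_time"])

print(as_text["order_time"].dtype)   # text dtype
print(as_dates["order_time"].dtype)  # datetime dtype
```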

In [None]:
orders.head()

In [None]:
order_products.head()

## Step 2. Create a Featuretools entityset

When using Dask dataframes to create an entityset, variable type inference is not performed as it is with entitysets created from pandas dataframes. As a result, users must specify the Featuretools variable types for all of the columns in the dataframes that make up the entityset when using Dask. In the following cell we define the data types for the `order_products` and `orders` entities.

In [None]:
order_products_vtypes = {
    "order_id": ft.variable_types.Id,
    "reordered": ft.variable_types.Boolean,
    "product_name": ft.variable_types.Categorical,
    "aisle_id": ft.variable_types.Categorical,
    "department": ft.variable_types.Categorical,
    "order_time": ft.variable_types.Datetime,
    "order_product_id": ft.variable_types.Index,
}

order_vtypes = {
    "order_id": ft.variable_types.Index,
    "user_id": ft.variable_types.Id,
    "order_time": ft.variable_types.DatetimeTimeIndex,
}
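
Because every column needs an explicit type when using Dask, it can help to check a mapping for gaps before building the entityset. The sketch below uses a hypothetical helper, `missing_variable_types`, with plain strings standing in for the Featuretools type classes; one column is deliberately left out of the placeholder mapping to show what the check reports.

```python
def missing_variable_types(columns, vtypes):
    """Return the columns that have no entry in a variable-type mapping."""
    return sorted(set(columns) - set(vtypes))


# Column names as they stand after the preprocessing in Step 1.
order_products_columns = [
    "order_id", "reordered", "product_name", "aisle_id",
    "department", "order_time", "order_product_id",
]

# Placeholder strings instead of ft.variable_types classes;
# "aisle_id" is omitted on purpose.
placeholder_vtypes = {
    "order_id": "Id",
    "reordered": "Boolean",
    "product_name": "Categorical",
    "department": "Categorical",
    "order_time": "Datetime",
    "order_product_id": "Index",
}

missing = missing_variable_types(order_products_columns, placeholder_vtypes)
print(missing)
```

With a real Dask dataframe, the column list could come from `order_products.columns`.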

Now that we have defined the data types, we can create the entityset and establish the relationship between the two entities.

In [None]:
es = ft.EntitySet("instacart")
es.entity_from_dataframe(entity_id="order_products",
                         dataframe=order_products,
                         index="order_product_id",
                         variable_types=order_products_vtypes,
                         time_index="order_time")

es.entity_from_dataframe(entity_id="orders",
                         dataframe=orders,
                         index="order_id",
                         variable_types=order_vtypes,
                         time_index="order_time")

es.add_relationship(ft.Relationship(es["orders"]["order_id"], es["order_products"]["order_id"]))

Next, we will normalize the `orders` entity to create a new `users` entity that we will later use as the target entity during the deep feature synthesis process.

In [None]:
es.normalize_entity(base_entity_id="orders", new_entity_id="users", index="user_id")
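
Conceptually, normalizing `orders` on `user_id` extracts one row per distinct user into a new table. The pandas sketch below approximates that idea on toy data; it is not how Featuretools implements `normalize_entity`, and the `first_order_time` column name is hypothetical.

```python
import pandas as pd

orders_toy = pd.DataFrame({
    "order_id": [1, 2, 3],
    "user_id": [10, 10, 20],
    "order_time": pd.to_datetime(["2015-01-01", "2015-01-08", "2015-01-02"]),
})

# One row per user, keeping the time each user first appears.
users_toy = (
    orders_toy.groupby("user_id", as_index=False)["order_time"]
    .min()
    .rename(columns={"order_time": "first_order_time"})
)
print(users_toy)
```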

To finish creating the entityset, we will add last time indexes and set some interesting values.

In [None]:
es.add_last_time_indexes()

In [None]:
es["order_products"]["department"].interesting_values = ['produce', 'dairy eggs', 'snacks', 'beverages', 'frozen', 'pantry', 'bakery', 'canned goods', 'deli', 'dry goods pasta']
es["order_products"]["product_name"].interesting_values = ['Banana', 'Bag of Organic Bananas', 'Organic Baby Spinach', 'Organic Strawberries', 'Organic Hass Avocado', 'Organic Avocado', 'Large Lemon', 'Limes', 'Strawberries', 'Organic Whole Milk']

## Step 3. Use Compose to generate our cutoff times dataframe

In the cells that follow we will demonstrate how [Compose](https://github.com/FeatureLabs/compose) can be used to generate the label times dataframe that will be used as cutoff times for deep feature synthesis.

In [None]:
def bought_product(df, product_name):
    purchased = df.product_name.str.contains(product_name).any()
    return purchased
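
To see how the labeling function behaves, the sketch below applies it to a toy window of purchases (the rows are made up). Note that `str.contains` does substring matching and treats the pattern as a regular expression by default, so `'Banana'` also matches `'Bag of Organic Bananas'`; passing `regex=False` would make the match literal.

```python
import pandas as pd


def bought_product(df, product_name):
    # True if any product name in the window contains product_name.
    return df.product_name.str.contains(product_name).any()


# Toy window of purchases for one user.
window = pd.DataFrame({"product_name": ["Bag of Organic Bananas", "Limes"]})

bought_banana = bought_product(window, "Banana")
bought_avocado = bought_product(window, "Avocado")
print(bought_banana, bought_avocado)
```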

In [None]:
lm = cp.LabelMaker(
    target_entity='user_id',
    time_index='order_time',
    labeling_function=bought_product,
    window_size='4w',
)

In [None]:
def denormalize(es):
    df = es['order_products'].df.merge(es['orders'].df).merge(es['users'].df)
    return df

Compose does not currently work on Dask dataframes, so we must first run `.compute()` on the denormalized entityset to switch to pandas.

In [None]:
df = denormalize(es).compute()

In [None]:
label_times = lm.search(
    df.sort_values('order_time'),
    minimum_data='2015-03-15',
    num_examples_per_instance=2,
    product_name='Banana',
    verbose=True,
)

## Step 4. Run Deep Feature Synthesis

At this point we are ready to run deep feature synthesis to generate our feature matrix. This will execute quickly and the resulting feature matrix will be returned as a Dask dataframe. This process does not cause the feature matrix to be computed or brought into memory.

In [None]:
feature_matrix, features = ft.dfs(target_entity="users",
                                  cutoff_time=label_times,
                                  entityset=es,
                                  trans_primitives=["day", "year", "month", "weekday", "num_words", "num_characters"],
                                  agg_primitives=["sum", "std", "max", "min", "mean", "count", "percent_true"],
                                  verbose=True)
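
The role of the cutoff times can be illustrated with a small pandas sketch: for each user, only rows observed at or before that user's cutoff time contribute to the aggregated features. This is a conceptual approximation on made-up data, not the Featuretools implementation, and it mimics only the `count` and `percent_true` primitives.

```python
import pandas as pd

rows = pd.DataFrame({
    "user_id": [10, 10, 10, 20],
    "reordered": [True, False, True, True],
    "order_time": pd.to_datetime(
        ["2015-01-05", "2015-02-01", "2015-04-01", "2015-03-01"]),
})
cutoffs = pd.DataFrame({
    "user_id": [10, 20],
    "cutoff_time": pd.to_datetime(["2015-03-15", "2015-03-15"]),
})

# Keep only rows observed at or before each user's cutoff time.
visible = rows.merge(cutoffs, on="user_id")
visible = visible[visible["order_time"] <= visible["cutoff_time"]]

# Aggregate the remaining rows per user.
features_toy = visible.groupby("user_id")["reordered"].agg(
    count="count", percent_true="mean")
print(features_toy)
```

The April order for user 10 falls after the cutoff and is therefore excluded, which is what prevents information from the future leaking into the features.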

Now that we have a Dask feature matrix, we can save it to disk for future use.

In [None]:
feature_matrix.to_csv(os.path.join(output_dir, "feature_matrix-*.csv"), index=False)

Finally, let's read back in the feature matrix we just saved, compute it and take a look at what we created.

In [None]:
fm = dd.read_csv(os.path.join(output_dir, "feature_matrix-*.csv")).compute()
fm.head()

In [None]:
fm.shape

In [None]:
print(f"Feature matrix memory usage: {fm.memory_usage().sum() / 1_000_000:.2f} MB")

Now that we have finished, we can close our Dask client.

In [None]:
client.close()