# Introduction: Feature Engineering with Spark

Problem: In the `Feature Engineering` notebook, we developed a pipeline for automated feature engineering using a dataset of customer transactions and label times. Running this pipeline on a single partition of customers takes about 15 minutes, which means that computing the features for every partition would take several days if done one at a time.

Solution: Break the dataset into independent partitions of customers and run multiple subsets in parallel. This can be done using multiple processors on a single machine or a cluster of machines.
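
To see why parallelism matters, here is a back-of-the-envelope estimate. It is a sketch rather than a benchmark: it assumes the 1000 partitions created in the first notebook, roughly 15 minutes per partition, and 12 workers that are never idle. `estimate_hours` is a hypothetical helper, not part of the pipeline.

```python
import math

def estimate_hours(n_partitions, minutes_per_partition, n_workers=1):
    """Idealized wall-clock hours when partitions run in rounds of n_workers."""
    rounds = math.ceil(n_partitions / n_workers)
    return rounds * minutes_per_partition / 60

sequential = estimate_hours(1000, 15)
parallel = estimate_hours(1000, 15, n_workers=12)

print(f'One at a time: {sequential:.0f} hours (~{sequential / 24:.1f} days)')
print(f'12 workers:    {parallel:.0f} hours')
```

Under these assumptions the sequential run takes 250 hours (about 10.4 days), while 12 workers finish in about 21 hours. Real runs will be slower than the ideal because of scheduling overhead and uneven partition sizes.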

## Spark with PySpark

[Apache Spark](http://spark.apache.org) is a popular framework for distributed computing and large-scale data processing. It allows us to run computations in parallel, either on a single machine or distributed across a cluster of machines. In this notebook, we will run automated feature engineering in [Featuretools](https://github.com/Featuretools/featuretools) using Spark with the [PySpark library](http://spark.apache.org/docs/2.2.0/api/python/pyspark.html).

The first step is initializing Spark. We can use the `findspark` library to make sure that `pyspark` can find Spark in the Jupyter Notebook. This notebook assumes the Spark cluster is already running. To get started with a Spark cluster, refer to [this guide](https://data-flair.training/blogs/install-apache-spark-multi-node-cluster/). 

(We'll skip the Featuretools details in this notebook, but for an introduction see [this article](https://towardsdatascience.com/automated-feature-engineering-in-python-99baf11cc219). For a comparison of manual to automated feature engineering, see [this article](https://towardsdatascience.com/why-automated-feature-engineering-will-change-the-way-you-do-machine-learning-5c15bf188b96). )

In [1]:
import findspark
# update based on your installation
findspark.init()

## Set up Spark 

A `SparkContext` is the interface to a running Spark cluster. We pass in a number of parameters to the `SparkContext` using a `SparkConf` object. Namely, we'll turn on logging, tell Spark to use 12 cores on our machine, and direct Spark to the location of the master (parent) node. 

Adjust the parameters depending on your cluster setup. I found [this guide](https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html) helpful in choosing the parameters.

In [2]:
import pyspark

# update based on your installation
conf = pyspark.SparkConf()

# Enable logging
conf.set('spark.eventLog.enabled', True);
conf.set('spark.eventLog.dir', './data/tmp/');

# Request one executor with 12 cores and 12 GB of memory
conf.set('spark.num.executors', 1)
conf.set('spark.executor.memory', '12g')
conf.set('spark.executor.cores', 12)

# Set the parent
conf.set('spark.master', 'spark://AMB-R09BLVCJ:7077')
conf.getAll()

dict_items([('spark.eventLog.enabled', 'True'), ('spark.eventLog.dir', './data/tmp/'), ('spark.num.executors', '1'), ('spark.executor.memory', '12g'), ('spark.executor.cores', '12'), ('spark.master', 'spark://AMB-R09BLVCJ:7077')])

## Testing Spark 

Before we get to the feature engineering, we want to test whether our cluster is running correctly. We'll instantiate a `SparkContext` and run a simple program that estimates the value of pi.

In [3]:
sc = pyspark.SparkContext(appName="pi_calc", 
                          conf = conf)
sc

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/09 16:33:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
num_samples = 100000000
import random

def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1

# Parallelize counting samples inside circle using Spark
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

                                                                                

3.14170524
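
The estimate works because a point drawn uniformly from the unit square lands inside the quarter circle of radius 1 with probability π/4, so four times the observed fraction approximates π. The same estimator can be written without Spark, which is handy for checking the logic before involving a cluster. This is a minimal sketch; `estimate_pi`, the sample size, and the seed are illustrative choices and not part of the original run.

```python
import random

def estimate_pi(num_samples, seed=None):
    """Monte Carlo estimate of pi from points drawn uniformly in the unit square."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        # Count the point if it falls inside the quarter circle of radius 1
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / num_samples

approx_pi = estimate_pi(200_000, seed=0)
print(approx_pi)
```

The Spark version distributes exactly this counting step: `filter(inside).count()` tallies the points inside the circle across the workers.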


### Spark Dashboards

After starting the Spark cluster from the command line (before running any of the code in the notebook), you can view a dashboard of the cluster at localhost:8080. This shows basic information such as the number of workers and the currently running or completed jobs.


![](../images/spark_cluster_main.png)

Once a `SparkContext` has been initialized, the job can be viewed at localhost:4040. This shows particular details such as the number of tasks completed and the directed acyclic graph of the operation. 

![](../images/stages.png)

The web dashboards are a helpful tool for debugging your cluster.

Once the cluster is running correctly, we can move on to feature engineering. 

## Data Storage

In previous notebooks, we partitioned our data and created feature matrices for the first 50 partitions. Normally, all of the reading and writing for running with Spark would happen through S3, but for this example we will use our local files.

In [5]:
import os

import pandas as pd
import featuretools as ft

import warnings
warnings.simplefilter('ignore')

partition = 20
CWD = os.getcwd()
directory = f'{CWD}/data/partitions/p' + str(partition)
cutoff_times_file = 'MS-31_labels.csv'


# Read in the data files
members = pd.read_csv(f'{directory}/members.csv', 
                  parse_dates=['registration_init_time'], 
                  infer_datetime_format = True, 
                  dtype = {'gender': 'category'})

trans = pd.read_csv(f'{directory}/transactions.csv',
                   parse_dates=['transaction_date', 'membership_expire_date'], 
                    infer_datetime_format = True)

logs = pd.read_csv(f'{directory}/logs.csv', parse_dates = ['date'])

cutoff_times = pd.read_csv(f'{directory}/{cutoff_times_file}', parse_dates = ['time'])
cutoff_times = cutoff_times.drop_duplicates(subset = ['msno', 'time'])



# Feature Engineering

First we'll make the set of features using a single partition so we don't have to recalculate the feature definitions for each partition. This also ensures that exactly the same features are made for each subset of customers. (It is also possible to load previously calculated feature definitions from disk.) Again, I'm skipping the explanation of what is going on here, so check out the [Featuretools documentation](https://featuretools.alteryx.com/) or some of the [online tutorials](https://www.featuretools.com/demos).

### Features for One Partition

In [6]:
# Create empty entityset
es = ft.EntitySet(id = 'customers')

# Add the members parent table
es.add_dataframe(dataframe_name='members', dataframe=members,
                 index = 'msno', time_index = 'registration_init_time', 
                 logical_types = {'city': 'Categorical', 'bd': 'Categorical',
                                  'registered_via': 'Categorical'})
# Create new features in transactions
trans['price_difference'] = trans['plan_list_price'] - trans['actual_amount_paid']
trans['planned_daily_price'] = trans['plan_list_price'] / trans['payment_plan_days']
trans['daily_price'] = trans['actual_amount_paid'] / trans['payment_plan_days']

# Add the transactions child table
es.add_dataframe(dataframe_name='transactions', dataframe=trans,
                 index = 'transactions_index', make_index = True,
                 time_index = 'transaction_date', 
                 logical_types = {'payment_method_id': 'Categorical', 
                                  'is_auto_renew': 'Boolean', 'is_cancel': 'Boolean'})

# Add transactions interesting values
es.add_interesting_values(dataframe_name='transactions',
                          values={'is_cancel': [False, True],
                                  'is_auto_renew': [False, True]})

# Create new features in logs
logs['total'] = logs[['num_25', 'num_50', 'num_75', 'num_985', 'num_100']].sum(axis = 1)
logs['percent_100'] = logs['num_100'] / logs['total']
logs['percent_unique'] = logs['num_unq'] / logs['total']

# Add the logs child table
es.add_dataframe(dataframe_name='logs', dataframe=logs,
                 index = 'logs_index', make_index = True,
                 time_index = 'date')

# Add the relationships
r_member_transactions = ft.Relationship(es, 'members', 'msno', 'transactions', 'msno')
r_member_logs = ft.Relationship(es, 'members', 'msno', 'logs', 'msno')
es.add_relationships([r_member_transactions, r_member_logs])

es

Entityset: customers
  DataFrames:
    members [Rows: 6817, Columns: 6]
    transactions [Rows: 21992, Columns: 13]
    logs [Rows: 418190, Columns: 13]
  Relationships:
    transactions.msno -> members.msno
    logs.msno -> members.msno

## Custom Primitives

Below is a custom primitive we wrote (see the `Feature Engineering` notebook) for this dataset. It calculates the total amount of a quantity in the previous month.

In [7]:
def total_previous_month(numeric, datetime, time):
    """Return total of `numeric` column in the month prior to `time`."""
    df = pd.DataFrame({'value': numeric, 'date': datetime})
    previous_month = time.month - 1
    year = time.year
   
    # Handle January
    if previous_month == 0:
        previous_month = 12
        year = time.year - 1
        
    # Filter data and sum up total
    df = df[(df['date'].dt.month == previous_month) & (df['date'].dt.year == year)]
    total = df['value'].sum()
    
    return total
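
The only subtle part of this primitive is the year rollover in January. That check can be isolated and exercised on its own. `previous_month_of` is a hypothetical helper written for illustration, and it uses the standard library `date` rather than the pandas timestamp the primitive receives.

```python
from datetime import date

def previous_month_of(day):
    """Return (year, month) of the calendar month before `day`."""
    if day.month == 1:
        # January rolls back to December of the previous year
        return day.year - 1, 12
    return day.year, day.month - 1

print(previous_month_of(date(2015, 1, 15)))  # (2014, 12)
print(previous_month_of(date(2015, 3, 1)))   # (2015, 2)
```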

In [8]:
from featuretools.primitives import make_agg_primitive
from woodwork.column_schema import ColumnSchema
from woodwork.logical_types import Datetime

numeric = ColumnSchema(semantic_tags={'numeric'})
datetime = ColumnSchema(logical_type=Datetime)

# Takes in a numeric column and a datetime column and outputs a number
total_previous = make_agg_primitive(total_previous_month, input_types = [numeric, datetime],
                                    return_type = numeric, 
                                    uses_calc_time = True)

#### Run Deep Feature Synthesis

The first time we create the features, we use `ft.dfs`, passing in the selected primitives, the target dataframe, the critical `cutoff_time`, the maximum depth to which features are stacked, and several other parameters.

In [9]:
# Specify aggregation primitives
agg_primitives = ['sum', 'time_since_last', 'avg_time_between', 'num_unique', 'min', 'last', 
                  'percent_true', 'max', 'count']

# Specify transformation primitives
trans_primitives = ['is_weekend', 'cum_sum', 'day', 'month', 'time_since_previous']

# Specify where primitives
where_primitives = ['sum', 'mean', 'percent_true']

In [10]:
# Run deep feature synthesis
feature_matrix, feature_defs = ft.dfs(entityset=es, target_dataframe_name='members', 
                                      cutoff_time = cutoff_times, 
                                      agg_primitives = agg_primitives,
                                      trans_primitives = trans_primitives,
                                      where_primitives = where_primitives,
                                      max_depth = 2, features_only = False,
                                      chunk_size = 100, n_jobs = 1, verbose = 1)

Built 316 features
Elapsed: 24:05 | Progress: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████


These feature definitions can be saved to disk. Every time we want to make exactly the same features, we can pass them into the `ft.calculate_feature_matrix` function.

In [11]:
ft.save_features(feature_defs, f'{CWD}/data/features.txt')

In [12]:
feature_defs = ft.load_features(f'{CWD}/data/features.txt')
print(f'There are {len(feature_defs)} features.')

There are 316 features.


# Partition to Feature Matrix Function

The main function of this notebook makes features from a single partition.

This function, `partition_to_feature_matrix`, does the following:

1. Takes in the name of a partition 
2. Reads the data from the partition directory
3. Creates an entityset from the data
4. Computes the feature matrix for the partition
5. Saves the feature matrix to the partition directory

Note: because of time and disk space limitations, we will only run this on the first 20 partitions instead of the full 1000 partitions that were created in the first notebook.

In [13]:
N_PARTITIONS = 20
BASE_DIR = f'{CWD}/data/partitions/'
    
def partition_to_feature_matrix(partition, feature_defs = feature_defs, 
                                cutoff_time_name = 'MS-31_labels.csv', write = True):
    """Take in a partition number, create a feature matrix, and save to disk
    
    Params
    --------
        partition (int): number of partition
        feature_defs (list of ft features): features to make for the partition
        cutoff_time_name (str): name of cutoff time file
        write: (boolean): whether to write the data to disk. Defaults to True
        
    Return
    --------
        None: saves the feature matrix to disk
    
    """
    
    partition_dir = BASE_DIR + 'p' + str(partition)
    
    # Read in the data files
    members = pd.read_csv(f'{partition_dir}/members.csv', 
                      parse_dates=['registration_init_time'], 
                      infer_datetime_format = True, 
                      dtype = {'gender': 'category'})

    trans = pd.read_csv(f'{partition_dir}/transactions.csv',
                       parse_dates=['transaction_date', 'membership_expire_date'], 
                        infer_datetime_format = True)
    logs = pd.read_csv(f'{partition_dir}/logs.csv', parse_dates = ['date'])
    
    # Make sure to drop duplicates
    cutoff_times = pd.read_csv(f'{partition_dir}/{cutoff_time_name}', parse_dates = ['time'])
    cutoff_times = cutoff_times.drop_duplicates(subset = ['msno', 'time'])
    
    # Needed for saving
    cutoff_spec = cutoff_time_name.split('_')[0]
    
    # Create empty entityset
    es = ft.EntitySet(id = 'customers')

    # Add the members parent table
    es.add_dataframe(dataframe_name='members', dataframe=members,
                     index = 'msno', time_index = 'registration_init_time', 
                     logical_types = {'city': 'Categorical',
                                      'registered_via': 'Categorical'})
    # Create new features in transactions
    trans['price_difference'] = trans['plan_list_price'] - trans['actual_amount_paid']
    trans['planned_daily_price'] = trans['plan_list_price'] / trans['payment_plan_days']
    trans['daily_price'] = trans['actual_amount_paid'] / trans['payment_plan_days']

    # Add the transactions child table
    es.add_dataframe(dataframe_name='transactions', dataframe=trans,
                     index = 'transactions_index', make_index = True,
                     time_index = 'transaction_date', 
                     logical_types = {'payment_method_id': 'Categorical', 
                                      'is_auto_renew': 'Boolean', 'is_cancel': 'Boolean'})

    # Add transactions interesting values
    es.add_interesting_values(dataframe_name='transactions',
                              values={'is_cancel': [False, True],
                                      'is_auto_renew': [False, True]})
    
    # Create new features in logs
    logs['total'] = logs[['num_25', 'num_50', 'num_75', 'num_985', 'num_100']].sum(axis = 1)
    logs['percent_100'] = logs['num_100'] / logs['total']
    logs['percent_unique'] = logs['num_unq'] / logs['total']
    logs['seconds_per_song'] = logs['total_secs'] / logs['total'] 
    
    # Add the logs child table
    es.add_dataframe(dataframe_name='logs', dataframe=logs,
                     index = 'logs_index', make_index = True,
                     time_index = 'date')

    # Add the relationships
    r_member_transactions = ft.Relationship(es, 'members', 'msno', 'transactions', 'msno')
    r_member_logs = ft.Relationship(es, 'members', 'msno', 'logs', 'msno')
    es.add_relationships([r_member_transactions, r_member_logs])
    
    # Calculate the feature matrix using pre-calculated features
    feature_matrix = ft.calculate_feature_matrix(entityset=es, features=feature_defs, 
                                                 cutoff_time=cutoff_times, cutoff_time_in_index = True,
                                                 chunk_size = 1000)

    if write:
        bytes_to_write = feature_matrix.to_csv(None).encode()

        with open(f'{partition_dir}/{cutoff_spec}_feature_matrix.csv', 'wb') as f:
            f.write(bytes_to_write)
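
To make the naming convention concrete, the sketch below rebuilds the output path with the same string operations the function uses. `feature_matrix_path` is a hypothetical helper and the relative base directory is only an example; the notebook itself uses `BASE_DIR`.

```python
def feature_matrix_path(base_dir, partition, cutoff_time_name):
    """Build the output path the same way partition_to_feature_matrix does."""
    partition_dir = base_dir + 'p' + str(partition)
    # 'MS-31_labels.csv' -> 'MS-31'
    cutoff_spec = cutoff_time_name.split('_')[0]
    return f'{partition_dir}/{cutoff_spec}_feature_matrix.csv'

print(feature_matrix_path('data/partitions/', 10, 'MS-31_labels.csv'))
# data/partitions/p10/MS-31_feature_matrix.csv
```

Each partition therefore gets its own output file, so parallel workers never write to the same path.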

### Test Function

Let's test the function on two different partitions.

In [14]:
from timeit import default_timer as timer

start = timer()
partition_to_feature_matrix(10, feature_defs, 'MS-31_labels.csv', write=True)
end = timer()
print(f'{round(end - start)} seconds elapsed.')

227 seconds elapsed.


In [15]:
feature_matrix = pd.read_csv(f'{BASE_DIR}/p10/MS-31_feature_matrix.csv', low_memory=False)
feature_matrix.head()

Unnamed: 0,msno,time,city,bd,gender,registered_via,AVG_TIME_BETWEEN(transactions.transaction_date),COUNT(transactions),LAST(transactions.actual_amount_paid),LAST(transactions.daily_price),...,DAY(LAST(transactions.transaction_date)),IS_WEEKEND(LAST(logs.date)),IS_WEEKEND(LAST(transactions.membership_expire_date)),IS_WEEKEND(LAST(transactions.transaction_date)),MONTH(LAST(logs.date)),MONTH(LAST(transactions.membership_expire_date)),MONTH(LAST(transactions.transaction_date)),label,days_to_churn,churn_date
0,+2DC4F7/bSQhhmmAW/fys80YqMcQgTvZpvpntkPsnX0=,2015-01-01,5.0,57.0,female,9.0,,1,149.0,4.806452,...,1.0,False,True,False,,2.0,1.0,0.0,,
1,+2DC4F7/bSQhhmmAW/fys80YqMcQgTvZpvpntkPsnX0=,2015-02-01,5.0,57.0,female,9.0,2678400.0,2,149.0,4.806452,...,1.0,True,True,True,2.0,3.0,2.0,0.0,,
2,+2DC4F7/bSQhhmmAW/fys80YqMcQgTvZpvpntkPsnX0=,2015-03-01,5.0,57.0,female,9.0,2548800.0,3,149.0,4.806452,...,1.0,False,False,True,2.0,4.0,3.0,0.0,,
3,+2DC4F7/bSQhhmmAW/fys80YqMcQgTvZpvpntkPsnX0=,2015-04-01,5.0,57.0,female,9.0,2548800.0,3,149.0,4.806452,...,1.0,False,False,True,4.0,4.0,3.0,0.0,,
4,+2DC4F7/bSQhhmmAW/fys80YqMcQgTvZpvpntkPsnX0=,2015-05-01,5.0,57.0,female,9.0,2620800.0,4,149.0,4.806452,...,2.0,False,False,False,5.0,5.0,4.0,0.0,,


In [16]:
start = timer()
partition_to_feature_matrix(19, feature_defs, 'MS-31_labels.csv', write=True)
end = timer()
print(f'{round(end - start)} seconds elapsed.')

269 seconds elapsed.


In [17]:
feature_matrix = pd.read_csv(f'{BASE_DIR}/p19/MS-31_feature_matrix.csv', low_memory = False)
feature_matrix.head()

Unnamed: 0,msno,time,city,bd,gender,registered_via,AVG_TIME_BETWEEN(transactions.transaction_date),COUNT(transactions),LAST(transactions.actual_amount_paid),LAST(transactions.daily_price),...,DAY(LAST(transactions.transaction_date)),IS_WEEKEND(LAST(logs.date)),IS_WEEKEND(LAST(transactions.membership_expire_date)),IS_WEEKEND(LAST(transactions.transaction_date)),MONTH(LAST(logs.date)),MONTH(LAST(transactions.membership_expire_date)),MONTH(LAST(transactions.transaction_date)),label,days_to_churn,churn_date
0,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-01-01,5.0,0.0,,7.0,,0,,,...,,False,False,False,1.0,,,0.0,304.0,
1,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-02-01,5.0,0.0,,7.0,,1,149.0,4.966667,...,31.0,True,True,True,2.0,2.0,1.0,0.0,273.0,
2,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-03-01,5.0,0.0,,7.0,2419200.0,2,149.0,4.966667,...,28.0,True,False,True,3.0,3.0,2.0,0.0,245.0,
3,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-04-01,5.0,0.0,,7.0,2548800.0,3,149.0,4.966667,...,31.0,False,False,False,4.0,4.0,3.0,0.0,214.0,
4,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-05-01,5.0,0.0,,7.0,2563200.0,4,149.0,inf,...,30.0,False,True,False,5.0,5.0,4.0,0.0,184.0,


# Run with Spark

The next cell parallelizes all the feature engineering calculations using Spark. We `map` the list of partitions to the function and let Spark divide the resulting tasks among the available executor cores, so each partition is processed by a single core.

In [18]:
# Create list of partitions
partitions = list(range(N_PARTITIONS))

# Create Spark context - update based on your config
sc = pyspark.SparkContext(master = 'spark://AMB-R09BLVCJ:7077',
                          appName = 'featuretools', conf = conf)

# Parallelize feature engineering
r = sc.parallelize(partitions, numSlices=N_PARTITIONS).\
    map(lambda x: partition_to_feature_matrix(x, feature_defs,
                                              'MS-31_labels.csv')).collect()
sc.stop()
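
Because `partition_to_feature_matrix` returns `None`, `collect()` here yields a list of `None` values, and a partition that keeps raising an exception can eventually abort the whole job. One possible pattern (an assumption for illustration, not something the original run used) is to wrap the function so that every task reports a status instead of raising. The sketch uses Python's built-in `map` and a stand-in job so it runs without a cluster; `with_status` and `fake_partition_job` are hypothetical names.

```python
def with_status(func):
    """Wrap func so each call returns (partition, status) instead of raising."""
    def wrapped(partition):
        try:
            func(partition)
            return partition, 'ok'
        except Exception as exc:
            # Report the failure and let the other partitions continue
            return partition, f'failed: {exc!r}'
    return wrapped

def fake_partition_job(partition):
    """Stand-in for partition_to_feature_matrix; partition 3 fails on purpose."""
    if partition == 3:
        raise FileNotFoundError('members.csv')

results = list(map(with_status(fake_partition_job), range(5)))
failed = [partition for partition, status in results if status != 'ok']
print(failed)  # [3]
```

In principle the wrapped function could be passed to the RDD's `map` in the same way, so the collected list shows which partitions need to be rerun.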

                                                                                

While the run is going on, we can look at the status of the cluster at localhost:8080 and the state of the particular job at localhost:4040. 

__Here is the overall state of the cluster.__

![](../images/spark_cluster2.png)

__Here is information about the submitted job.__

![](../images/spark_job.png)

## Joining the Data

From here, we could read in all the partitioned feature matrices and build a single feature matrix, or if we have a model that supports [incremental (also known as on-line) learning](https://en.wikipedia.org/wiki/Incremental_learning), we can train it with one partition at a time.

In [19]:
feature_matrix = pd.read_csv(f'{BASE_DIR}/p19/MS-31_feature_matrix.csv', low_memory = False)
feature_matrix.head()

Unnamed: 0,msno,time,city,bd,gender,registered_via,AVG_TIME_BETWEEN(transactions.transaction_date),COUNT(transactions),LAST(transactions.actual_amount_paid),LAST(transactions.daily_price),...,DAY(LAST(transactions.transaction_date)),IS_WEEKEND(LAST(logs.date)),IS_WEEKEND(LAST(transactions.membership_expire_date)),IS_WEEKEND(LAST(transactions.transaction_date)),MONTH(LAST(logs.date)),MONTH(LAST(transactions.membership_expire_date)),MONTH(LAST(transactions.transaction_date)),label,days_to_churn,churn_date
0,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-01-01,5.0,0.0,,7.0,,0,,,...,,False,False,False,1.0,,,0.0,304.0,
1,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-02-01,5.0,0.0,,7.0,,1,149.0,4.966667,...,31.0,True,True,True,2.0,2.0,1.0,0.0,273.0,
2,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-03-01,5.0,0.0,,7.0,2419200.0,2,149.0,4.966667,...,28.0,True,False,True,3.0,3.0,2.0,0.0,245.0,
3,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-04-01,5.0,0.0,,7.0,2548800.0,3,149.0,4.966667,...,31.0,False,False,False,4.0,4.0,3.0,0.0,214.0,
4,++sZWXQ1v6+vf3GHez2B+CmMxTBSy8lEONF6d5E3bcI=,2015-05-01,5.0,0.0,,7.0,2563200.0,4,149.0,inf,...,30.0,False,True,False,5.0,5.0,4.0,0.0,184.0,
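
Whether we join everything or train incrementally, the first step is locating the feature matrices that were actually written. Below is a minimal sketch using only the standard library. `existing_feature_matrices` is a hypothetical helper, and the demo builds a throwaway directory in which only partitions 0 and 2 have finished.

```python
import os
import tempfile

def existing_feature_matrices(base_dir, n_partitions, cutoff_spec='MS-31'):
    """Yield (partition, path) for every partition whose feature matrix is on disk."""
    for partition in range(n_partitions):
        path = os.path.join(base_dir, f'p{partition}',
                            f'{cutoff_spec}_feature_matrix.csv')
        if os.path.exists(path):
            yield partition, path

# Demo: create empty feature matrix files for partitions 0 and 2 only
with tempfile.TemporaryDirectory() as base_dir:
    for partition in (0, 2):
        partition_dir = os.path.join(base_dir, f'p{partition}')
        os.makedirs(partition_dir)
        with open(os.path.join(partition_dir, 'MS-31_feature_matrix.csv'), 'w') as f:
            f.write('msno,time\n')
    found = [partition for partition, _ in existing_feature_matrices(base_dir, 3)]

print(found)  # [0, 2]
```

Each yielded path could then be read with `pd.read_csv` and either collected into a list for `pd.concat` or fed to an incremental learner one partition at a time.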


# Conclusions

In this notebook, we saw how to distribute feature engineering in Featuretools using the Spark framework. This big-data processing technology lets us use multiple computers to parallelize calculations, resulting in efficient data science workflows even on large datasets. 

The basic approach is:

1. Divide data into independent partitions
2. Run each subset in parallel with a different worker
3. Join results together if necessary 

The nice part about using frameworks such as Dask and Spark with PySpark is that we don't have to change the underlying Featuretools code. We write our code in native Python, change the backend running the calculations, and distribute the calculations across a cluster of machines. Using this approach, we can scale to much larger datasets and take on even more exciting data science and machine learning problems.

## Next Steps

The final step of the machine learning pipeline is to build a model that makes predictions from these features. This is implemented in the `Modeling` notebook.