In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import datetime
import random
import string
import matplotlib.pyplot as plt
import seaborn as sns

from numba import jit
from multiprocessing import cpu_count
from dask.multiprocessing import get
from functools import reduce

nCores = cpu_count()
sns.set()

%load_ext line_profiler

In [43]:
row_range = 10 ** np.arange(5,8)

## Initializing Data

The computer I primarily use has 32GB of RAM (DDR4-3200,CAS14).  Additionally, I created a 64GB swap file partition on a Samsung 960 Evo M.2 Flash Drive (if anyone has any experience using Intel Optane drive for this, let me know about your experiences).  I wanted to create a dataframe that exceeded 32GB in memory to test the efficacy of Pandas vs Dask vs Spark.  The following parameters should accomplish that with 100 million rows.

The "data" contains a random date between 1900 and 2000, a random float between 0 and 1, a random int between 0 and 3333, and a "categorical" string of [a-z].

The random int column, c2, contains NaN values, 10% of the total.

The categorical column, c3, also contains NaN values, 10% of the total.

In [2]:
def gen_data(nRows, startdate):
    @jit
    def numerical_data(nRows, startdate):
        c1 = np.random.rand(nRows)
        c2 = np.random.randint(0,3333,nRows)
        c3 = np.random.choice(['a','b','c','d','e',
                   'f','g','h','i','j',
                   'k','l','m','n','o',
                   'p','q','r','s','t',
                   'u','v','w','x','y','z'],size=nRows)

        return c1, c2, c3

    c1, c2, c3 = numerical_data(nRows, startdate)
    date = [startdate + datetime.timedelta(int(i)) for i in np.random.randint(1,36524,size=nRows)]
    
    data = pd.DataFrame({'date':date,
                         'c1':c1,
                         'c2':c2,
                         'c3':c3
                        })

    data.date = pd.to_datetime(data.date)

    ### picking random null values
    data.loc[data.sample(frac=.1).index,'c2'] = np.nan
    data.loc[data.sample(frac=.1).index,'c3'] = np.nan
    
    return data

## Python, Pandas, Single-Processor

The following function extracts the various parts of time from a datetime column and returns the original dataframe with the new features:

In [3]:
def date_extractor(df, dateCol, interval=['year','month','week','day','dayofweek']):
    '''
    input: dataframe, column of datetime objects, desired list of time-interval features
    output: dataframe with new time-interval features appended
    '''
    df.is_copy = False # we're not dealing with copies, this avoids the warning
    for i in interval:
        df.loc[:, i] = eval('df.%s.dt.%s'% (dateCol, i))
    df.drop(dateCol, axis=1, inplace=True)
    
    return df

This next function fills our missing values and creates an addition feature: `feat_isnull`

In [4]:
def impute_null(df, cols):
    '''
    input: dataframe, numerical columns with null values
    output: dataframe with null values imputed with mean and a new feature indicating that entry was null
    '''
    df.is_copy = False # we're not dealing with copies, this avoids the warning
    for col in cols:
        # creating the new feature which indicates isnull
        feat_is_null = col + '_isnull'
        df.loc[:, feat_is_null] = np.int8(df[col].isnull())
        
         # imputing median
        impute_value = np.nanmean(df[col])
        df.loc[:, col].fillna(impute_value, inplace=True)
    
    return df

For categorical features, we'll treat them slightly differently.

In [5]:
def impute_null_cat(df, cols, naStr):
    '''
    input: dataframe, categorical column with null values, string to signify missing value
    output: dataframe with null values imputed with 'UNK' and a new feature indicating entry was null
    '''
    df.is_copy = False # we're not dealing with copies, this avoids the warning
    for col in cols:
        # creating the new feature which indicates isnull
        feat_is_null = col + '_isnull'
        df.loc[:, feat_is_null] = np.int8(df[col].isnull())
        
        # imputing missing code
        df.loc[:, col].fillna(naStr, inplace=True)
        
    return df

The following functions deal with encoding categorical features.  We will not be using the sklearn encoder as it has issues dealing with previously unseen values.  For example, let's say you have a feature with 6 levels: `{A, B, C, D, E, F}`, however, `F` is relatively rare.  After creating a validation set, it turns out that `F` is not seen in the `x_train` but is in `x_valid`.  

Sklearn's encoder may not be able to "train" on `x_train` and properly convert `x_valid` due to the varying number of values.  In the functions below, instead, we pass `unkStr` to recode a previously unseen value.  For this example, we will encode these as we did missing values.

In [6]:
def cat_encode_train(df, col, unkStr):
    '''
    input: dataframe, name of single categorical column
    output: dictionary representing previous levels and new numerical encodings
    '''
    keys = set(df[col])
    keys.add(unkStr)
    values = np.arange(0, len(keys)+1)
    
    return dict(zip(keys,values))


def cat_encoder(df, col, dict_enc, unkStr):
    '''
    input: dataframe, name of single categorical column, dictionary of encodings, string to use as unknown
    output: dataframe with categorical values encoded
    note: you probably want to match the unknown str with the string used as null in impute_null_cat
    '''
    df.is_copy = False # we're not dealing with copies, this avoids the warning
    # need to replace unknown values with unkStr
    df.loc[~df[col].isin(set(dict_enc.keys())), col] = unkStr

    df.loc[:,col] = df.loc[:,col].map(lambda x : dict_enc[x])
    
    return df

In [7]:
results_py = pd.DataFrame(columns = [
                                     'pd_date',
                                     'pd_impute',
                                     'pd_encode',
                                     'pd_total',
                                     'nRows',
                                     'memory'
                                    ])

# parameters for generating data
startdate = datetime.date(1900,1,1)

for i,nIter in enumerate(row_range): 
    data = gen_data(nIter, startdate)
    
    # date features
    print('Size: %s'% nIter)
    tmp_ = %timeit -n 1 -r 1 -o date_extractor(data, 'date')
    out1_ = tmp_.average
    
    # imputing nulls
    tmp_ = %timeit -n 1 -r 1 -o impute_null(data, ['c2'])
    out2_ = tmp_.average
    tmp_ = %timeit -n 1 -r 1 -o impute_null_cat(data, ['c3'], 'UNK')
    out2_ += tmp_.average
    
    # encode cat features
    tmp_ = %timeit -n 1 -r 1 -o cat_encode_train(data, 'c3', 'UNK')
    out3_ = tmp_.average
    dict_ = cat_encode_train(data, 'c3', 'UNK')
    tmp_ = %timeit -n 1 -r 1 -o cat_encoder(data, 'c3', dict_, 'UNK')
    out3_ += tmp_.average
    
    row = {'pd_date':out1_,
           'pd_impute':out2_,
           'pd_encode':out3_,
           'pd_total':out1_ + out2_ + out3_,
           'nRows':nIter,
           'memory':data.memory_usage().sum() / 1e6
          }
    
    results_py.loc[i] = row

Size: 100000
45.3 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
4.48 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
8.44 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
1.2 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
35.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Size: 1000000
381 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
24.5 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
75.6 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
12.9 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
344 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [8]:
results_py

Unnamed: 0,pd_date,pd_impute,pd_encode,pd_total,nRows,memory
0,0.045273,0.012925,0.036581,0.094779,100000.0,6.60008
1,0.381167,0.100056,0.356497,0.83772,1000000.0,66.00008


## Dask, Multi-Processor

For this next section, we'll be using map_partitions to apply the previous written functions across data sets to parallelize their operations.  While we could re-write these functions, this is the quickest way to get the benefit of multi-core parallel operation.  However, we're going to have to rewrite our `impute_null()` and `cat_encoder()` functions.  

For imputing the mean on a continuous variable, we'll need to collect all possible values and calculate the mean (instead of taking the mean across partitions, and finding the mean of that).  This next block illustrate that these are not guaranteed to be equivalent values:

In [9]:
test = np.random.rand(10)
test2 = np.random.rand(10)
test3 = np.random.rand(20)

# Mean of means
print('Mean of means: %s' % np.mean([np.mean(test), np.mean(test2), np.mean(test3)]))

# Mean of all values
print('Mean of all values: %s' % (np.sum([np.sum(test),np.sum(test2),np.sum(test3)]) / 40))

Mean of means: 0.4688591570581349
Mean of all values: 0.4708584894709233


In [10]:
def impute_null_dask(dd, cols):
    '''
    input: dask dataframe, numerical columns with null values
    output: dask dataframe with null values imputed with mean and a new feature indicating that entry was null
    '''
    #dd.is_copy = False # we're not dealing with copies, this avoids the warning
    for col in cols:
        # creating the new feature which indicates isnull
        feat_is_null = col + '_isnull'
        
        # unfortunately, have to use some trickery here as the new column assignment is not a string
        eval('dd.assign(%s=data.loc[:,"%s"].isnull().astype(np.uint8))' % (feat_is_null, col))
        
         # we're going to have to aggregate with a tuple to get both the total sum and number of rows
        sum_len = dd.loc[:,col].map_partitions(lambda x : (np.nansum(x), len(x))).compute()
        sum_len = reduce(lambda x, y : (x[0] + y[0], x[1] + y[1]), sum_len)
        impute_val = sum_len[0] / sum_len[1]
        dd = data.map_partitions(lambda df : df.loc[:,col].fillna(impute_val))
    
    return dd  

Our label encoder takes all of the possible values within a feature and creates a numerical mapping.  However, since our data will be split randomly into parititons, we cannot be certain that each partition will have an identical set of values.  In this example, I split the data into 12 partitions, but for this function, I do not want 12 potential encodings.

We'll change the function to, instead, aggregate all the potential encodings before creating a numerical mapping.

In [11]:
def cat_encode_train_dask(dd, col, unkStr):
    '''
    input: Dask Dataframe
    output: Dictionary containing categorical-to-numerical mappings
    note: Need to be careful to grab all potential mappings across all partitions
    '''
    tmp_ = dd.map_partitions(lambda df : set(df[col])).compute()
    keys = reduce(lambda x, y : x | y, tmp_)
    keys.add(unkStr)
    values = np.arange(0, len(keys)+1)
    
    return dict(zip(keys,values))

In [12]:
results_dd = pd.DataFrame(columns = [
                                     'dd_date',
                                     'dd_impute',
                                     'dd_encode',
                                     'dd_total',
                                     'nRows'
                                    ])

# parameters for generating data
startdate = datetime.date(1900,1,1)

for i,nIter in enumerate(10 ** np.arange(5,8)): 
    data_ = gen_data(nIter, startdate)
    data = dd.from_pandas(data_, npartitions=12)
    
    print('Size: %s'% nIter)
    # date features
    tmp_ = %timeit -n 1 -r 1 -o  data.\
                                    map_partitions(lambda df : date_extractor(df,'date')).\
                                    compute()
    out1_ = tmp_.average
    
    # impute nulls
    tmp_ = %timeit -n 1 -r 1 -o  impute_null_dask(data, ['c2']).compute()
    out2_ = tmp_.average
    data = dd.from_pandas(data_, npartitions=12)
    tmp_ = %timeit -n 1 -r 1 -o  data.\
                                    map_partitions(lambda df : impute_null_cat(df, ['c3'], 'UNK')).\
                                    compute()
    out2_ += tmp_.average
    
    # encode cat features
    tmp_ = %timeit -n 1 -r 1 -o cat_encode_train_dask(data, 'c3', 'UNK')
    out3_ = tmp_.average
    dict_ = cat_encode_train_dask(data.\
                                   map_partitions(lambda df : impute_null_cat(df, ['c3'], 'UNK')),
                                  'c3',
                                  'UNK')
    data = dd.from_pandas(data_, npartitions=12)
    tmp_ = %timeit -n 1 -r 1 -o data.\
                                    map_partitions(lambda df : date_extractor(df,'date')).\
                                    map_partitions(lambda df : impute_null(df, ['c2'])).\
                                    map_partitions(lambda df : impute_null_cat(df, ['c3'], 'UNK')).\
                                    map_partitions(lambda df: cat_encoder(df, 'c3', dict_, 'UNK')).\
                                    compute()
    out3_ += tmp_.average
    
    row = {'dd_date':out1_,
           'dd_impute':out2_,
           'dd_encode':out3_-out2_-out1_,
           'dd_total':out3_,
           'nRows':nIter,
          }
    
    results_dd.loc[i] = row

Size: 100000
192 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
29.3 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
62.6 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
9.62 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
460 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Size: 1000000
271 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
30.7 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
177 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
16.8 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
1.09 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Size: 10000000
1.32 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
82.8 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
1.02 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
202 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
6.64 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loo

In [13]:
results_dd

Unnamed: 0,dd_date,dd_impute,dd_encode,dd_total,nRows
0,0.191643,0.091936,0.186275,0.469853,100000.0
1,0.271246,0.207563,0.632007,1.110816,1000000.0
2,1.320026,1.105501,4.419274,6.844801,10000000.0


## Spark

For this section, we will need to rewrite much of the previous work to utilize Spark.  Additionally, we'll need to configure the various components of Spark when we initialize a Spark Context.

In [14]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().\
        setMaster('local[*]').\
        setAppName('parallel_preprocessing').\
        set('spark.driver.cores','1').\
        set('spark.num.executors','5').\
        set('spark.driver.memory', '4G').\
        set('spark.executor.memory', '5G').\
        set('spark.driver.maxResultSize', '5G')

sc = SparkContext.getOrCreate(conf=conf)
sqc = SQLContext.getOrCreate(sc)

Much like Python, datetime objects in Spark can be split into their constituent components.  Unfortunately, not every piece behaves like its Python counterpart.  In our case, the default 'dayofweek' will be encoded as a string `'Sunday', 'Monday',...` when we want integers `0, 1,...`.  Also unfortunate, changing data types is not the easiest thing in the world for a Spark Data Frame, but we can use `select` and `cast` to take care of it.

In [16]:
def spark_date_extractor(sdf, dateCol):
    '''
    input: Spark dataframe, column of datetime objects
    output: Spark dataframe with new time-interval features appended
    notes: Unfortunately, there's a lot of hard coding here for the individual features
    ''' 
    str_to_int = {
              'Sunday':'0', 
              'Monday':'1',
              'Tuesday':'2',
              'Wednesday':'3',
              'Thursday':'4',
              'Friday':'5',
              'Saturday':'6',
             }
    
    sdf = sdf.\
    withColumn('year', year(dateCol)).\
    withColumn('month', month(dateCol)).\
    withColumn('day', dayofmonth(dateCol)).\
    withColumn('week', weekofyear(dateCol)).\
    withColumn('dayofweek', date_format(dateCol,'EEEE')).\
    na.replace(str_to_int, 1, 'dayofweek').\
    selectExpr('c1', 'c2', 'c3', 'year', 'month', 'day', 'week', 'cast(dayofweek as int) dayofweek')
    
    return sdf

Since we're using the mean value to impute missing continuous values, we'll want to collect all possible values across the partition first as this simple 

In [17]:
def spark_impute_null(sdf, cols):
    '''
    input: Spark Dataframe, numerical columns missing values
    output: Spark Dataframe with missing values filled with mean
    '''
    for col in cols:
        value = sdf.\
                filter(~isnan(col)).\
                select(avg(col)).\
                head()[0]
        
        sdf = sdf.na.fill({col:value})
    
    return sdf


def spark_impute_null_cat(sdf, cols, unkStr):
    '''
    input: Spark Dataframe, categorical columns missing values, new value for missing
    output: Spark Dataframe with missing value replaced with unkStr
    '''
    for col in cols:
        sdf = sdf.withColumn(col, regexp_replace(col, 'NaN', 'UNK'))
    
    return sdf

Much like in Dask, we'll need to be sure to gather all possible values across the partitions.

In [40]:
def spark_cat_encode_train(sdf, col, unkStr):
    '''
    input: spark data frame, col to encode, unkStr to append for future unknowns
    output: dictionary containing previous values and numeric encodings
    '''
    keys = sdf.select(col).rdd.reduce(lambda x, y : set(x) | set(y))
    keys.add(unkStr)
    values = np.arange(0, len(keys)+1).astype(str)
    
    return dict(zip(keys,values))
    
def spark_cat_encoder(sdf, col, dict_enc, unkStr):
    '''
    input: spark dataframe, col to encode, dictionary of encodings, unkStr for unknowns
    output: spark dataframe with string column encoded as integer
    '''
    # need to recode unknown values to unkStr
    cast_unk = udf(lambda x : unkStr if x not in dict_enc else x, StringType())
    sdf = sdf.withColumn(col, cast_unk(col))
    
    # now make the replacement according to dict_enc
    sdf = sdf.na.replace(dict_enc, 1, col)
    # need this monstrosity to select all cols with col recast as int
    sdf = eval('sdf.selectExpr("'+\
               '","'.join(['cast(%s as int) %s' % (i,i) if i == col else i for i in sdf.columns])+\
               '")'
              )
    
    return sdf

In [None]:
results_sp = pd.DataFrame(columns = [
                                     'sp_date',
                                     'sp_impute',
                                     'sp_encode',
                                     'sp_total',
                                     'nRows'
                                    ])

# parameters for generating data
startdate = datetime.date(1900,1,1)

for i,nIter in enumerate(row_range): 
    data = gen_data(nIter, startdate)
    data_schema = StructType([
                              StructField('c1',DoubleType(),True),
                              StructField('c2',DoubleType(),True),
                              StructField('c3',StringType(),True),
                              StructField('date',DateType(),True)
                             ])

    data = sqc.createDataFrame(data, data_schema)
    
    print('Size: %s'% nIter)
    # date features
    tmp_ = %timeit -n 1 -r 1 -o  
    out1_ = tmp_.average
    
    # impute nulls
    tmp_ = %timeit -n 1 -r 1 -o  
    out2_ = tmp_.average

    tmp_ = %timeit -n 1 -r 1 -o  
    out2_ += tmp_.average
    
    # encode cat features
    tmp_ = %timeit -n 1 -r 1 -o 
    out3_ = tmp_.average
    dict_ = 
    tmp_ = %timeit -n 1 -r 1 -o 
    out3_ += tmp_.average
    
    row = {'sp_date':out1_,
           'sp_impute':out2_,
           'sp_encode':out3_-out2_-out1_,
           'sp_total':out3_,
           'nRows':nIter,
          }
    
    results_sp.loc[i] = row

In [None]:
fig, ax = plt.subplots()
ax.plot((results_dates.nRows), results_dates.pandas, label='Python')
ax.set_xlabel("Rows")
ax.set_ylabel("Time(Seconds)")
ax.set_xscale('log')
ax.axis([1,100000,0,.05])
ax.legend()
#fig.savefig('./python_numba_only.png',dpi=300)

In [15]:
startdate = datetime.date(1900,1,1)
data = gen_data(100_000,startdate)

data_schema = StructType([
                          StructField('c1',DoubleType(),True),
                          StructField('c2',DoubleType(),True),
                          StructField('c3',StringType(),True),
                          StructField('date',DateType(),True)
                         ])

data_spark = sqc.createDataFrame(data,data_schema)