# W261 Final Project

#### *Anusha Munjuluri, Arvindh Ganesan, Kim Vignola, Christina Papadimitriou*

### Notebook Set-up

In [10]:
# imports
import re
import time
import json
import yaml
# import pydot
import operator
import numpy as np
import pandas as pd
import seaborn as sns
from math import sqrt
from csv import reader
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import Image
from random import seed, randrange
from collections import defaultdict
import matplotlib.patches as mpatches


# Imports for DF and MLIB
from pyspark.sql import types
import pyspark.sql
import pyspark.sql.functions
import pyspark.sql.functions as func
from pyspark.sql.functions import col, countDistinct, approxCountDistinct, count, when, desc
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer
from pyspark.sql import DataFrameStatFunctions as statFunc

In [11]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [12]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "final_project"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

## 1. Question Formulation


### Background 

The following analysis is based on a Kaggle dataset from Criteo, an internet advertising company focused on retargeting. Criteo's goal is to increase online clickthrough rates among consumers who have previously visited an advertiser's website. This information will be used by Criteo to more efficiently provide the right ads to the right people. Optimizing the retargeting process not only helps advertisers become more efficient in terms of how they spend their dollars, but also it reduces clutter for consumers who do not want to be "followed" by ads for irrelevant products (or ones they may have already purchased!). **Our goal is to create a model that will most accurately predict clickthroughs (label = 1); Due to binary categorical nature of the output label (0,1), we are exploring classification models for analysis.** 

Features given in the data set most likely represent characterstics about consumer behavior (history of clickthroughs, site visitiation, etc.), the ads themselves (product, creative approach, placement, etc.) and general metrics such as the date the ad was published.  However **since there is no visibility into what each feature represents, our challenge is to make our predictions based on the data alone. With over 6 million records to train each day (~45 million per week), this will require a scalable approach**.


### Dataset Introduction 

The training dataset consists of a portion of Criteo's traffic over a period of 7 days. Each row corresponds to a display ad served by Criteo and the first column indicates whether this ad has been clicked or not. The positive (clicked) and negatives (non-clicked) examples have both been subsampled (but at different rates 75% - 0 Class, 25% - Class) in order to reduce the dataset size.

**There are 13 numerical features (mostly count features) and 26 categorical features in this dataset. The values of the categorical features have been hashed onto 32 bits for anonymization purposes. The semantic of these features is undisclosed**. Some features may have missing values. All the rows are chronologically ordered. The test set is computed in the same way as the training set but it corresponds to events on the day following the training period and does not have the label column. Since, there is no time data available, we are not considering this dataset to be a time series model.


### Key Questions: Features and Model

#### 1. Which features are most important in predicting clickthroughs?

Having this information can help Criteo focus on the metrics that are most critical to their product. With 39 features, there is a high risk of overfitting. We should identify a model that provides an optimal tradeoff between bias and variance. **Since we didnt get any metadata about the features, we are relying on EDA and regularization techniques to help us determine the important features and reduce dimensionality of the feature space**.


#### 2. Which machine learning approach not only provides the highest accuracy in predicting clickthroughs, but is also scalable enough to be useful in a production environment?

As internet patterns and product choices change rapidly, the ideal model should be trained daily to update the following day's retargeting model. Scaling would help us achieve shorter training times than processing records sequentially. **Any ML algorithm which can be trained using associative and commutative properties (ex. simple addition, with no state dependencies) such as Batch Logisitc Regression or Tree Algorithms based can be used for scaling the training approach.**

## 2. EDA & Discussion of Challenges

### Goals for this section:

#### PART A: Loading and Creating files with pre-processing 

1. Create RDDs for full train and test files. 
2. Sample full train RDD to create trainRDD for training, heldOutRDD for testing and a random sample of 1000 rows as toy dataset for demonstration purpose ('toy1000.txt').
3. Take another random sample of 300 rows from the train file ('toy_test300.txt') which will be used to validate ML models in later sections. 

#### PART B: EDA with Toy Dataset and RDDs/Pandas

Perform EDA on toy dataset using pandas/RDDs including visualizations that help inform our data transformation decisions.  

#### PART C:  EDA on Full Dataset with Dataframes 

Perform EDA on full train dataset using Dataframes.

#### PART D: Data Transformation on Toy Dataset
Data transformations using Spark RDDs on toy dataset.  

**NOTE: In section 4, we perform the same data transformations on entire dataset using Spark Dataframes for Spark ML models.**

### PART A: LOADING AND CREATING FILES WITH PRE-PROCESSING

### Files and Columns Details:

1. **Full Train File 'train.txt'**:  Number of rows: 45840617 (~ 45 million; ~ 10 GB) 40 columns
2. **Full Test File 'test.txt'**: Number of rows: 6042135 (~ 6 million; 1.4 GB) 39 columns (all columns except label column)
3. **Toy Train File 'toy100.txt'**: A random sample of 1000 rows as toy dataset for demonstration purpose with the same number of columns as the train file. 
4. **Toy Test File 'toy_test300.txt'**: A random sample of 300 rows as toy test dataset with the same of columns as the train file and validation of ML models in later section. 
4. **Columns**: 40
    -  13 Numerical Features I1-I13
    -  26 Categorical Features C1-C26
    -  1 Label Column - 0 or 1   

### 2.1 Data Loading

In [13]:
# take a look at the full train file data top row 
!head -n 1 data/train.txt

0	1	1	5	0	1382	4	15	2	181	1	2		2	68fd1e64	80e26c9b	fb936136	7b4723c4	25c83c98	7e0ccccf	de7995b8	1f89b562	a73ee510	a8cd5504	b2cb9c98	37c9c164	2824a5f6	1adce6ef	8ba8b39a	891b62e7	e5ba7672	f54016b9	21ddcdc9	b1252a9d	07b5194c		3a171ecb	c5c50484	e8b83407	9727dd16


In [14]:
# load the full train and test files
fullTrainRDD = sc.textFile('data/train.txt')
testRDD = sc.textFile('data/test.txt')

FIELDS = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13',
          'C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14',
          'C15','C16','C17','C18','C19','C20','C21','C22','C23','C24','C25','C26','Label']

# Get categorical and numeric field names separately
NUM_FIELDS = FIELDS[0:13]
CAT_FIELDS = list(set(FIELDS)-set(NUM_FIELDS))

In [15]:
# number of rows in full train/test data
print(f"Number of records in train data: {fullTrainRDD.count()} ...")
print(f"Number of records in test data: {testRDD.count()} ...")

Number of records in train data: 45840617 ...
Number of records in test data: 6042135 ...


### 2.2 Creating Train and Test Split

In [16]:
# Generate 80/20 (pseudo)random train/test split 
trainRDD, heldOutRDD = fullTrainRDD.randomSplit([0.8,0.2], seed = 1)
print(f"... held out {heldOutRDD.count()} records for evaluation and assigned {trainRDD.count()} for training.")

... held out 9167871 records for evaluation and assigned 36672746 for training.


### 2.3 Creating a toy RDD

#### 2.3.1 Toy Train Set 

A train toy data set was created by randomly sampling 1000 records from the dataset: 

`!gshuf -n 1000 data/train.txt >> data/toy1000.txt`

In [17]:
toyRDD = sc.textFile('data/toy1000.txt')
print(f"Number of records in toy data: {toyRDD.count()} ...")

Number of records in toy data: 1000 ...


In [18]:
print(toyRDD.take(1))

['1\t0\t478\t13\t\t3396\t194\t11\t13\t312\t0\t7\t\t\t05db9164\t207b2d81\t1757640a\t06148e59\t25c83c98\tfbad5c96\tf36791d8\t0b153874\ta73ee510\tc7009b63\t2714650d\t1a69f1c0\t9a88e2e2\t07d13a8f\t0c67c4ca\t8075af0c\te5ba7672\t395856b0\t21ddcdc9\tb1252a9d\t8e4884c0\t\t423fab69\tb936bfbe\t001f3601\tf2fc1d6e']


#### 2.3.1 Toy Test Set 

We will randomly sample a toy held-out dataset frim the `train.txt` file.

`!gshuf -n 300 data/train.txt >> data/toy_test300.txt`

In [19]:
toy_testRDD = sc.textFile('data/toy_test300.txt')

### 2.4 Pre-Processing

In [20]:
# helper functions
def parse(line):
    """
    Map line --> tuple of (features, label)
    """
    fields = line.split('\t')
    features,label = fields[1:], fields[0]
    return(features, label)

def edit_data_types(line):
    """
    Map tuple of (features, label) --> tuple of (formated features, label)
    
    * '' is replaced with 'null'
    * numerical fields are converted to integers
    * make label column numeric
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if value == '':
            formated_features.append('null')
        else:
            if i < 13:
                formated_features.append(float(value)) 
            else:
                formated_features.append(value)
    return (formated_features, int(label))

In [21]:
# Parsing, making '' as np.nan and converting numerical features and output label to int
trainRDDCached = trainRDD.map(parse).map(edit_data_types).cache()
toyRDDCached = toyRDD.map(parse).map(edit_data_types).cache()

In [22]:
print(trainRDDCached.take(1))

[([1.0, 1.0, 5.0, 0.0, 1382.0, 4.0, 15.0, 2.0, 181.0, 1.0, 2.0, 'null', 2.0, '68fd1e64', '80e26c9b', 'fb936136', '7b4723c4', '25c83c98', '7e0ccccf', 'de7995b8', '1f89b562', 'a73ee510', 'a8cd5504', 'b2cb9c98', '37c9c164', '2824a5f6', '1adce6ef', '8ba8b39a', '891b62e7', 'e5ba7672', 'f54016b9', '21ddcdc9', 'b1252a9d', '07b5194c', 'null', '3a171ecb', 'c5c50484', 'e8b83407', '9727dd16'], 0)]


In [23]:
print(toyRDDCached.take(1))

[([0.0, 478.0, 13.0, 'null', 3396.0, 194.0, 11.0, 13.0, 312.0, 0.0, 7.0, 'null', 'null', '05db9164', '207b2d81', '1757640a', '06148e59', '25c83c98', 'fbad5c96', 'f36791d8', '0b153874', 'a73ee510', 'c7009b63', '2714650d', '1a69f1c0', '9a88e2e2', '07d13a8f', '0c67c4ca', '8075af0c', 'e5ba7672', '395856b0', '21ddcdc9', 'b1252a9d', '8e4884c0', 'null', '423fab69', 'b936bfbe', '001f3601', 'f2fc1d6e'], 1)]


#### 2.4.1 Some extra pre-processing for a pandas DataFrame for EDA use

In [24]:
# helper function 'null' to np.nan for pandas df 
def null_to_nan(line):
    """
    converts "null" to np.nan in RDD
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if value == 'null':
            formated_features.append(np.nan)
        else:
            formated_features.append(value)
    return (formated_features, label)

# put the toy RDD into a pandas dataframe for EDA charting
toyRDDtoPandas = toyRDDCached.map(null_to_nan) \
                                .map(lambda x: np.append(x[0], [x[1]])) \
                                .collect()

toy_df = pd.DataFrame(toyRDDtoPandas, columns=FIELDS)

In [25]:
toy_df.head()

Unnamed: 0,I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,...,C18,C19,C20,C21,C22,C23,C24,C25,C26,Label
0,0.0,478.0,13.0,,3396.0,194.0,11.0,13.0,312.0,0.0,...,395856b0,21ddcdc9,b1252a9d,8e4884c0,,423fab69,b936bfbe,001f3601,f2fc1d6e,1
1,2.0,0.0,,,1209.0,0.0,2.0,1.0,0.0,1.0,...,526e8765,,,8b7fb864,,32c7478e,45b2acf4,,,0
2,,1.0,1.0,8.0,489.0,66.0,0.0,11.0,8.0,,...,8f9b4e88,,,050a23dc,,32c7478e,8a3cfad4,,,0
3,0.0,179.0,7.0,8.0,996.0,67.0,6.0,44.0,344.0,0.0,...,2804effd,,,723b4dfd,,32c7478e,b34f3128,,,0
4,2.0,2.0,10.0,27.0,1101.0,29.0,11.0,42.0,241.0,1.0,...,e88ffc9d,21ddcdc9,b1252a9d,dca9a28d,ad3062eb,bcdee96c,3fdb382b,cb079c2d,1c2df582,1


### PART B: EDA WITH TOY DATASET USING RDDs/PANDAS

### 2.5 Labels

First we take a look at the percentage of records that belong to each label for our classification problem in order to determine whether our classes are balanced. 

In [26]:
# TOY DATA
# counting records for each class 
count_label_0 = toyRDDCached.filter(lambda x: x[1] == 0).count()
count_label_1 = toyRDDCached.filter(lambda x: x[1] == 1).count()
total = count_label_0 + count_label_1

print(f"{np.round(count_label_0/total*100, 2)} % of the records have label=0 and {np.round(count_label_1/total*100, 2)} % have label=1...")

72.3 % of the records have label=0 and 27.7 % have label=1...


In [27]:
# FULL DATA  ( Don't re-run this!!! )
# counting records for each class 
count_label_0 = trainRDDCached.filter(lambda x: x[1] == 0).count()
count_label_1 = trainRDDCached.filter(lambda x: x[1] == 1).count()
total = count_label_0 + count_label_1

print(f"{np.round(count_label_0/total*100, 2)} % of the records have label=0 and {np.round(count_label_1/total*100, 2)} % have label=1...")

KeyboardInterrupt: 

The result above show that the **labels are imbalanced**, with 75% of records having label=0 (i.e. unclicked ads). However, we will not attempt to balance the labels at this stage. **Being aware of this imbalance, we will carefully examine the prediction results to detect any model bias (i.e. predicting always label=0)**

### 2.6 Counting nulls in each column

Then, we take a look at the percentage of null values for all columns in our dataset. 

In [None]:
def get_pct_nulls_in_column(dataRDD, var_position):
    """
    Counts the % nulls in a column 
    """

    null_count = dataRDD.map(lambda x: x[0][var_position]) \
                             .filter(lambda x: x == 'null').count()
    total_count = dataRDD.map(lambda x: x[0][var_position]).count()

    return null_count/total_count*100

In [None]:
# TOY DATA
for var_position, var in enumerate(FIELDS):

    if var_position < 39:
        null_pct = get_pct_nulls_in_column(toyRDDCached, var_position)
        print("FEATURE {}: {}% is null".format(var, np.round(null_pct,2)))

We notice that some columns have a high % of null values. **We could exclude columns that have more than 50% nulls when calculated on full train dataset because those columns will likely not contribute a lot to the prediction results. However, since those variables with more than 50% missing values are categorical variables, the one-hot encoding approach that we will take later on will take care of this issue**.

### 2.7 Numeric Features

#### 2.7.1 Get Summary Statistics and Distributions

Next, we take a look at the summary statistics for our numerical features in order to determine their distributions and detect outliers. 

In [None]:
# RDD version 
def get_stats(dataRDD, var_position):
    """
    Get statistics for numeric variables 
    stats: mean, median, variance, min, max 
    """

    mean = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').mean() 
    variance = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').variance() 
    minimum = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').min() 
    maximum = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').max() 

    return mean, variance, minimum, maximum

In [None]:
# save the means in a dictionary
mean_dict_toy = {}
st_dev_dict_toy = {}

# get summary stats with RDDs
for var_position, var in enumerate(FIELDS):
    if var_position < 13:
        mean, variance, minimum, maximum = get_stats(toyRDDCached, var_position)
        print("FEATURE {}: \t mean={}, \t variance={}, \t min={}, \t max={}".format(var, np.round(mean, 2), np.round(variance, 2), minimum, maximum))
        mean_dict_toy[var_position] = mean
        st_dev_dict_toy[var_position] = np.sqrt(variance)

In [None]:
# Pandas version 
num_columns = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13']
toy_df_num = toy_df[num_columns].astype(np.float)
toy_df_num.describe()

In [None]:
# Take a look at histograms for each numeric feature 
toy_df_num.hist(figsize=(20,15), bins=15)
plt.show()

In [None]:
# plot boxplots of each feature vs. the outcome
fig, ax_grid = plt.subplots(5, 3, figsize=(20,15))
y = toy_df['Label']
for idx, feature in enumerate(num_columns):
    x = toy_df_num[feature]
    sns.boxplot(x, y, ax=ax_grid[idx//3][idx%3], orient='h', linewidth=.5)
    ax_grid[idx//3][idx%3].invert_yaxis()
fig.suptitle("BoxPlots by Label", fontsize=15, y=0.9)
plt.show()

**The above results show that most of the numeric features are heavily skewed with several outliers on the right end. Hence, we decided to impute the missing values in the numeric features with median value for each corresponding column than mean as mean will be skewed due to the large number of outliers.** 

#### 2.7.2 Correlations

In [None]:
# Correlation between numerical features
corr = toy_df_num.corr()
fig, ax = plt.subplots(figsize=(7, 7))
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
cmap = sns.diverging_palette(10, 240, as_cmap=False)
sns.heatmap(corr, mask=mask, cmap=cmap, center=0, linewidths=.2)
plt.title("Correlations between features")
plt.show()

**We observe that some of the numerical variables are highly correlated with each other. In the context of Logistic Regression we would want to eliminate features that are highly correlated with each other. A regularization  technique like Lasso or Ridge could take care of this. However, since Logistic regression is just our baseline model and we are focusing on tree-based algorithms, we do not need to worry about this. Tree-based models know how to assign variable importance and do not require any significant feature engineering.** 

### 2.8 Categorical Features

In this section, we will perform data processing on the 26 categorical features of the dataset. We will start by performing some EDA to compute the **number of unique categories** within each categorical feature and the total counts for each category. 

In [None]:
def count_categories(dataRDD, var, var_position, top):
    """
    input: RDD, name and position of a categorical variable 
    
    output: 
    * number of unique categories in the variable
    * counts of each category occurance by label
    """
    
    # counting category occurance within each categorical feature 
    count_per_category = dataRDD.map(lambda x: ( x[0][var_position], 1)) \
                                           .reduceByKey(lambda x,y: x+y) \
                                           .sortBy(lambda x: -x[1])

    # counting number of unique values within the categorical variable
    num_unique_values = count_per_category.map(lambda x: x[0]).distinct().count()

    print('Unique values within the category:', num_unique_values)
    print(' ')
    top_x = count_per_category.take(top)
    print('Top {} categories by count:'.format(top))
    for i in top_x: 
        print('Category: {}; Count: {}'.format(i[0],i[1]))
    print(' ')

In [None]:
# TOY DATA
for var_position, var in enumerate(FIELDS):

    if var_position > 12 and var_position < 39:
        print(" ")
        print("VARIABLE {}".format(var))
        print(" ")
        count_categories(toyRDDCached, var, var_position=var_position, top=10)

In [None]:
# Bar graphs of category counts within each categorical variable by label
fig, ax = plt.subplots(13, 2,figsize=(15,40))
plt.tight_layout()
fno = 0
# axes are in a two-dimensional array, indexed by [row, col]
for i in range(13):
    for j in range(2):
        fno += 1
        col = "C" + str(fno)
        sns.countplot(y=col, hue="Label", data=toy_df, palette="Greens_d",
                  order=toy_df[col].value_counts().iloc[:10].index,ax=ax[i,j]).set_title('Count By C'+str(fno)+' Label')

**The above analysis shows that some categorical variables have a high number of unique categories. Aditionally, the distribution of counts for most of the categorical variables is very skewed (i.e. some categories appear much more often than others). Considering this information, we decided to bucket the categorical features into 'High', 'Medium', 'Rare' and 'Missing' buckets for each column which will be explained later in the data transformation section.** 

### PART C:  EDA ON FULL DATASET USING DATAFRAMES

### 2.9 Creating a dataframe for TrainRDD (from above)

In [None]:
sampleDF = trainRDDCached.map(lambda x: np.append(x[0], [x[1]])).map(lambda x: x.tolist()).toDF(FIELDS).cache()
                                                                   
# Converting Columns to Numeric Type
for col_name in NUM_FIELDS:
    sampleDF = sampleDF.withColumn(col_name, col(col_name).cast('float'))
sampleDF = sampleDF.withColumn("Label", col("Label").cast('int')).cache()

# Show col types and top row
start = time.time()
sampleDF.head(1)
print(f'\n... {time.time() - start} seconds.')

### 2.10 Categorical Features: 

* Get missing fields, distinct features count and high/medium frequency features for categorical columns. 


* **Each categorical column has a large number of distinct features and missing values (ex. 'C22') which makes categorical columns One Hot Encoding cumbersome** and increases the dimensionality of the feature space by several times. This will make the ML models perform not very well if these categorical columns are used as is. Hence, **as a bucketing technique we are getting the list of features by their counts to group them as 'High', 'Medium', 'Rare' or 'Missing'**. 

    * **High Frequency Features:** are those which which occur at least 10% of the time. Ex: if n = 45 million; 10% of n = 4.5million. ie. give me a list of categorical features (categories) in a column that occur at least 10% of the time. 

    * **Medium Frequency Features:** Categories that occur 2%-10% of the time in a column

    * **Rare Features**: After getting the 'High' and 'Medium' frequency features from a column, remaining features would all be clubbed under the 'Rare' category which occur less than 2% of the time 
    
    * **Missing**: If they are nulls, they are bucketed as 'Missing'.

In [None]:
start = time.time()

# Keep a dictionary of high and low features
high_features = dict()
medium_features =  dict()

# total row count
row_count = sampleDF.count()

for field in CAT_FIELDS:
    print("\nFOR COLUMN "+field+":")
    
    columnDF = sampleDF[[field]].cache()
    
    #Count of missing 
    missing_count = columnDF.select(count(when(col(field) == "null", 1))).head()[0]
    print("No. of Missing Values:", missing_count)
    
    print("Percentage of Missing values:", round(missing_count*100/row_count, 2), "%")
    
    # Distinct
    distinct = columnDF.agg(approxCountDistinct(col(field)).alias(field)).head()[0]
    if missing_count != 0:
        distinct = distinct -1
    print("No. of Distinct Features (without null):", distinct)
    
    # Frequent Items 
    freq10 = columnDF.stat.freqItems([field], 0.1).head()[0]
    freq2 = columnDF.stat.freqItems([field], 0.02).head()[0]
    freq2 = list(set(freq2) - set(freq10))
    print("10% Occuring Features:", freq10)
    print("2% Occuring Features:", freq2)
    
    high_features[field] = freq10
    medium_features[field] =  freq2

print(f'\n... {time.time() - start} seconds.')   

# writing list of features to pickle files for future reference 
with open(r"highfeatures.pickle", "wb") as output_file:
    pickle.dump(high_features, output_file)
with open(r"mediumfeatures.pickle", "wb") as output_file:
    pickle.dump(medium_features, output_file)

### 2.11 Numerical Features

#### 2.11.1 Summary Stats

In [None]:
# Get numeric features summary stats
start = time.time()
numeric_summaryDF = sampleDF[[NUM_FIELDS]].summary().cache()

# Round numeric summary stats
for field in NUM_FIELDS:
    numeric_summaryDF = numeric_summaryDF.withColumn(field, func.round(numeric_summaryDF[field], 2))
    split = set([value[0] for value in numeric_summaryDF[[field]].collect()[3:]])
    splits.append(sorted(list(split)))

# Display numeric summary stats
numeric_summaryDF.show()

print(f'\n... {time.time() - start} seconds.') 

I12 is one of the columns that has the most missing values (> 50%). Missing values are being imputed with medians as discussed in the next sections. 

#### 2.11.2 Column Quantiles to bucket numeric features

Note: values are [min, 25%, median, 75%, max]

In [None]:
splits = []
start = time.time()

# Round numeric summary stats
for field in NUM_FIELDS:
    split = set(statFunc(sampleDF).approxQuantile(field, probabilities = [0.0,0.25,0.50,0.75,1.0], relativeError=0.2))
    splits.append(sorted(list(split)))

print("COLUMN QUANTILES (keeping only unique values for binning):\n")
print(splits)

print(f'\n... {time.time() - start} seconds.') 

#### 2.11.3 Bucket each numerical column based on quantiles

In [None]:
count_buckets = []
buckets = []
start = time.time()

for i,field in enumerate(NUM_FIELDS):
    if len(splits[i])==2:
        splits[i] = [-float("inf")] + splits[i] + [float("inf")]
        
    # Bucketize the values
    bucketizer = Bucketizer(handleInvalid = 'skip', splits = splits[i], inputCol=field, outputCol="bucket")
    result = bucketizer.transform(sampleDF[[field]]).cache()
    print("\nFOR COLUMN "+field+":", "splits are:", splits[i])
    
    # Get ccount of each bucket in a column
    bucket = result.groupBy(["bucket"]).agg(count("bucket").alias("count")).orderBy("bucket")
    count_buckets.append([value[1] for value in bucket.collect()])
    buckets.append([value[0] for value in bucket.collect()])
    print(bucket.show())
    
print("Buckets:", buckets)
print("Bucket Counts:", count_buckets)

print(f'\n... {time.time() - start} seconds.') 

#### 2.11.4 Visualize each of the bucketed numerical column 

Note: if values are too low, they are not shown in the plot. Look at count values in title to determine which buckets are a minority/outliers.

In [None]:
for j, field in enumerate(NUM_FIELDS):
    N = len(buckets[j])
    index = np.arange(N)
    bar_width = 0.95
    labels = []

    for i in range(N):
        labels.append(str(splits[j][i]) + "-" + str(splits[j][i+1]))

    plt.bar(buckets[j], count_buckets[j], align='center', alpha=0.5)
    plt.ylabel('Count')
    plt.xticks(index, labels)   
    plt.title('Bar Plot Column: ' + field + '\n'+str(count_buckets[j]))
plt.show()

* **Above plots show that most of the numeric features have outliers beyond the 75% percentile value on the right end but their count is much smaller compared to the other buckets.** 


* **There are not too many outliers for the Logistic Regression model estimates to be skewed by these values (relying on large sample asymptotics and invoking Central Limit Theorem!) and Decision Trees can anyways innately handle features with outliers.** Hence, we decided not to do any processing such as removing these outliers and retained them as is.  

* However, to be on the safe side we decided to use median (as mean will be skewed because of the outliers) for imputing the values and standardize the features by computing mean post imputation, as means calculated before imputation would be skewed.

### 2.12 Label Column: Dependent Variable

In [None]:
### Label Column Distribution 
start = time.time()
label1_count = sampleDF[['Label']].select(count(when(col('Label') == "1", 1))).head()[0]
label0_count = sampleDF[['Label']].select(count(when(col('Label') == "0", 1))).head()[0]
print("Number of rows with label = 1:", label1_count , '\n% of rows with label = 1:', round(label1_count*100/row_count,2), "%")
print("\nNumber of rows with label = 0:", label0_count , '\n% of rows with label = 0:', round(label0_count*100/row_count,2), "%")

print('\n...', time.time()-start, 'seconds.')

### 2.13 Data Transformation Steps based on EDA  

Based on our EDA on full train dataset, we decided to do the following transformations on columns:

#### A. Numerical Columns:

 **1. Imputing with medians**: 
    
 We decided to impute the missing values in the numeric features with the median value for each corresponding column. **The choice of median instead of mean is made due to the multiple large outliers on the high end that would have caused our mean values to be larger (skewed).** Note, that there is no need to impute the null values for the categorical features since one hot encoding will take care of the nulls later on. 
    
**2. Standardizing**: 
    
In the summary statistics, **we noticed that the numerical features have different ranges for each column, and thus we decided to standardize our data** (i.e. subtracting by mean and dividing by standard deviation of each column), in order to scale them. **Standardization would also help with our Logistic Regression algorithm to coverge faster and use regularization to determine importance of coefficients**.

#### B. Categorical Columns:

**1. Bucketing as 'High', 'Medium', 'Rare', 'Missing':**

The above analysis shows that some categorical variables have a high number of unique categories. Aditionally, the distribution of counts for most of the categorical variables is very skewed (i.e. some categories appear much more often than others). Considering this information, we decided to take the following approach to deal with categorical variables: 

Bucket the categories within each categorical column into 4 groups based on their occurence counts:

    * **High frequency**: categories that occur more times than 10% of the total row count (Example: if the total row count is 1000 -> categories that occur *more than 100 times*)
    * **Medium frequency**: categories that occur more times than 2% and less than 10% of the total row count (Example: if the total row count is 1000 -> categories that occur *50-100 times*)
    * **Low frequency**: categories that occur less times than 2% of the total row count (Example: if the total row count is 1000 -> categories that occur *less than 50 times*)
    * **Missing**: null occurencies (note: since there are a couple of categorical variables with significant percentages of null occurencies, we wanted to retain this information to see if it potentially creates some signal for our models)

**2. One Hot Encoding:**

Convert categorical features to numerical using *One-hot Encoding* as per the buckets obtained above for each column. Specifically, each categorical feature will be converted into 4 columns (`high`, `medium`, `low` and `missing`) that will have values of `1`'s and `0`'s denoting whether that record has a value for that categorical variable that belongs in one of the 4 buckets. This is an alternative data transformation method of categorical variables to numerical, instead of the traditional one-hot enconding. The choice for this method is based on the effort to retain all the information/signals from the categorical variables and at the same time reduce the dimensionality of the dataset compared to a traditional one-hot enconding transformation.

### PART D: DATA TRANSFORMATION ON TOY DATASET

#### 2.13.1 Impute nulls with medians

In [None]:
def impute_nulls(line, mean_or_median_dict):
    """
    Impute the null values of the numerical columns with the mean value of the column
    """
    features, label = line[0], line[1]
    imputed_features = []
    for i, value in enumerate(features):
        if i < 13: 
            if value == 'null':
                imputed_features.append(mean_or_median_dict[i])
            else:
                imputed_features.append(value)
        else: 
            imputed_features.append(value)
    return (imputed_features, int(label))

In [None]:
# getting the median with pandas because RDDs don't have a built-in function 
medians = toy_df_num.median().tolist()
median_dict_toy = dict(zip(mean_dict_toy.keys(), medians))

In [None]:
# imputing nulls with median 
imputedToyRDDCached = toyRDDCached.map(lambda x: impute_nulls(x, median_dict_toy)).cache()

In [None]:
print(imputedToyRDDCached.take(1))

#### 2.13.2 Standardize features 

In [None]:
def standardize(line, mean_dict, st_dev_dict):
    """
    Scale and center data round mean of each feature (mean=0, sd=1)
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if i < 13: 
            formated_features.append((value-mean_dict[i])/st_dev_dict[i])
        else: 
            formated_features.append(value)

    return (formated_features, label)

In [None]:
# TOY DATA 
normedToyRDDCached = imputedToyRDDCached.map(lambda x: standardize(x, mean_dict_toy, st_dev_dict_toy)).cache()

#### 2.13.3 Bucketing Categorical features 

Below, we will demonstrate how the bucketing/one-hot encoding that was applied in a scalable manner to our Criteo dataset, using our toy dataset with 1000 rows.

We will use variable `C1` as an example to demonstrate the implementation. The analysis above showed that variable `C1` has 57 uniques categories, hence we will obtain the occurence counts for each of the 57 categories in `C1`. Each of these categories will then be placed in one of the 4 buckets mentioned above based on its occurence counts.

**Step 1**. Obtain the occurence counts for each category.

In [None]:
# C1 has var_position = 13
category_counts_C1 = normedToyRDDCached.map(lambda x: ( x[0][13], 1)) \
                                       .reduceByKey(lambda x,y: x+y) \
                                       .sortBy(lambda x: -x[1])

category_counts_C1.take(10)

**Step 2**. Classify each category into one of the 4 buckets mentioned above based on its occurence counts and broadcast this information. As we learned throughout the semester, broadcasted variables are very useful in cases where the programmer wants to pass a copy of some useful information to every node in an efficient manner. 

* `>=` 100 times (i.e. 10% of 1000 rows) -> *High frequency* 
* 20-100 times (i.e. 2-10% of 1000 rows) -> *Medium frequency* 
* `<`20 times (i.e. 2% of 1000 rows) -> *Low frequency* 
* `==` 'null' -> *Missing*


In [None]:
# applying on C1 only 
high_frequency_categorices_C1 = sc.broadcast(category_counts_C1.filter(lambda x: x[1] >= 100).map(lambda x: x[0]).collect())
medium_frequency_categorices_C1 = sc.broadcast(category_counts_C1.filter(lambda x: x[1] < 100 and x[1] >= 20).map(lambda x: x[0]).collect())
low_frequency_categorices_C1 = sc.broadcast(category_counts_C1.filter(lambda x: x[1] < 20).map(lambda x: x[0]).collect())

In [None]:
# C1 
print('High frequency categories: {} \n'.format(high_frequency_categorices_C1.value))
print('Medium frequency categories: {} \n'.format(medium_frequency_categorices_C1.value))
print('Low frequency categories: {} \n'.format(low_frequency_categorices_C1.value))

**Step 3.** Applying a homegrown one-hot encoding implementation as explained above. 

In [None]:
# C1 example: the new columns of our dataset now are
FIELDS_NEW_C1 = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13',          # numerical 
          'High_Freq_C1','Medium_Freq_C1','Low_Freq_C1','Missing_C1',      # categorical 
          'Label']                        

In [None]:
def OHE_transform(line, cat_var_position, high_frequency_categorices, medium_frequency_categorices, low_frequency_categorices):
    """
    One hot encoding transformation of an RDD 
    using the high/medium/low/missing logic
    
    returns: (ohe_transformed_features, label)
    """
    features, label = line[0], line[1]
    cat_features = []
    num_features = []
    for i, value in enumerate(features):
        if i > 12 and i < 39: 
            cat_features.append(value)
        else:
            num_features.append(value)

    # define the cat_var_position here 
    high_freq = 1 if any(i in cat_features[cat_var_position] for i in high_frequency_categorices.value) else 0
    medium_freq = 1 if any(i in cat_features[cat_var_position] for i in medium_frequency_categorices.value) else 0
    low_freq = 1 if any(i in cat_features[cat_var_position] for i in low_frequency_categorices.value) else 0
    missing = 1 if any(i in cat_features[cat_var_position] for i in ['null']) else 0
    ohe_features = [high_freq] + [medium_freq] + [low_freq] + [missing]

    return (num_features + ohe_features, label)

In [None]:
# transforming RDD for C1 
var_position = 13
cat_var_position = 0
oheTrasformedToyRDDCached_C1 = normedToyRDDCached.map(lambda x: OHE_transform(x, cat_var_position, high_frequency_categorices_C1, medium_frequency_categorices_C1, low_frequency_categorices_C1)).cache()

In [None]:
print(oheTrasformedToyRDDCached_C1.take(1))

**Step 4**. Transform all 26 categorical features

In [None]:
oheTrasformedToyRDDCached_all = oheTrasformedToyRDDCached_C1

for cat_var_position in list(range(1,26)):
    
    category_counts_C_i = normedToyRDDCached.map(lambda x: ( x[0][cat_var_position+13], 1)) \
                                       .reduceByKey(lambda x,y: x+y) \
                                       .sortBy(lambda x: -x[1])
    
    high_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] >= 100).map(lambda x: x[0]).collect())
    medium_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] < 100 and x[1] >= 20).map(lambda x: x[0]).collect())
    low_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] < 20).map(lambda x: x[0]).collect())

    oheTrasformedToyRDDCached_C_i = normedToyRDDCached.map(lambda x: OHE_transform(x, cat_var_position, high_frequency_categorices_C_i, medium_frequency_categorices_C_i, low_frequency_categorices_C_i)).cache()
    
    oheTrasformedToyRDDCached_all = oheTrasformedToyRDDCached_all.zip(oheTrasformedToyRDDCached_C_i.map(lambda x: x[0][13:])).map(lambda x: (x[0][0]+x[1], x[0][1]))

In [None]:
print(oheTrasformedToyRDDCached_all.take(1))

In [None]:
cat_col_names = []

for i in list(range(1,27)):
    cat_col_names.append('High_Freq_C'+str(i))
    cat_col_names.append('Medium_Freq_C'+str(i))
    cat_col_names.append('Low_Freq_C'+str(i))
    cat_col_names.append('Missing'+str(i))
    
FIELDS_NEW = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13'] + cat_col_names + ['Label']

In [None]:
print(FIELDS_NEW)

### ...Caching toy train and test RDDs post data transformations

In [None]:
toyTrainRDD = oheTrasformedToyRDDCached_all

We will apply the same data transformations to the toy held-out dataset, using the parameters from the train toy set (i.e. median for imputation, mean and st. deviation for standardization).

### ...Data transformation of the toy Test Dataset

In [None]:
# transforming the test toy data using the train data parameters 
# toy_testRDD = sc.textFile('data/toy_test300.txt')
toyTestRDDCached = toy_testRDD.map(parse).map(edit_data_types).cache()
imputedToyTestRDDCached = toyTestRDDCached.map(lambda x: impute_nulls(x, median_dict_toy)).cache()
normedToyTestRDDCached = imputedToyTestRDDCached.map(lambda x: standardize(x, mean_dict_toy, st_dev_dict_toy)).cache()

In [None]:
# transforming categorical features for test toy RDD using parameters obtained for train RDDs 

# initialize to get C1
var_position = 13
cat_var_position = 0
oheTrasformedToyTestRDDCached_C1 = normedToyTestRDDCached.map(lambda x: OHE_transform(x, cat_var_position, high_frequency_categorices_C1, medium_frequency_categorices_C1, low_frequency_categorices_C1)).cache()

# get transformed features for full data 
oheTrasformedToyTestRDDCached_all = oheTrasformedToyTestRDDCached_C1

for cat_var_position in list(range(1,26)):
    
    category_counts_C_i = normedToyRDDCached.map(lambda x: ( x[0][cat_var_position+13], 1)) \
                                       .reduceByKey(lambda x,y: x+y) \
                                       .sortBy(lambda x: -x[1])
    
    high_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] >= 100).map(lambda x: x[0]).collect())
    medium_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] < 100 and x[1] >= 20).map(lambda x: x[0]).collect())
    low_frequency_categorices_C_i = sc.broadcast(category_counts_C_i.filter(lambda x: x[1] < 20).map(lambda x: x[0]).collect())

    oheTrasformedToyTestRDDCached_C_i = normedToyTestRDDCached.map(lambda x: OHE_transform(x, cat_var_position, high_frequency_categorices_C_i, medium_frequency_categorices_C_i, low_frequency_categorices_C_i)).cache()
    
    oheTrasformedToyTestRDDCached_all = oheTrasformedToyTestRDDCached_all.zip(oheTrasformedToyTestRDDCached_C_i.map(lambda x: x[0][13:])).map(lambda x: (x[0][0]+x[1], x[0][1]))

In [None]:
toyTestRDD = oheTrasformedToyTestRDDCached_all

In [None]:
print(toyTestRDD.take(1))

### ...Transform toy RDD train and test sets to pandas DataFrames:
These will be used in section 3B for training and validating the homegrown Random Forest implementation in Python.

In [None]:
toyTrainRDDtoPandas = toyTrainRDD.map(null_to_nan).map(lambda x: np.append(x[0], [x[1]])).collect()
toyTestRDDtoPandas = toyTestRDD.map(null_to_nan).map(lambda x: np.append(x[0], [x[1]])).collect()

toy_train_df = pd.DataFrame(toyTrainRDDtoPandas, columns=FIELDS_NEW) 
toy_test_df = pd.DataFrame(toyTestRDDtoPandas, columns=FIELDS_NEW) 

In [None]:
toy_train_df.head()

In [None]:
toy_test_df.head()

## 3. Algorithm Explanation

### Goals for this section:

#### PART A: Logistic Regression

Build a home grown implementation of Logisitc Regression using RDDs

#### PART B: Random Forest

Build a home grown implementation of Random Forest using Python. 

### Why did we choose these two models?

Given scalability concerns and the need for feature selection, we decided to explore two independent models to assess the optimal performance. We also wanted to identify algorithms that have a history of success in the Spark framework for binary classification. Logistic Regression and Decision Trees met all of these criteria. Logistic Regession is highly scalable and combined with regularization could aid in feature selection. Decision trees have similar benefits and additionally require little pre-processing and any prior feature selection. We continued down these parallel paths to compare the performance of these models.

### Baseline: 

Our baseline is at 74.38% (which is majority of class 0 in train dataset). We will be bechmarking our models' accuracy and performance results in comparison with this baseline. 

### PART A: Logisitc Regression

### 3.1 Homegrown implementation of Logistic Regression

In [None]:
# Define the baseline model
BASELINE_1 = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
#BASELINE_2 = np.array([meanClickthroughs, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [None]:
# Compute log loss for Logisitc Regression
def LRLoss(cachedRDD, W):
    augmentedData = cachedRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    loss = augmentedData.map(lambda x: -x[1] * np.log(1/ (1 + np.exp(-(np.dot(x[0],W))))) - (1-x[1]) * np.log(1-(1/ (1 + np.exp(-(np.dot(x[0],W))))))).mean()
    return loss

In [None]:
#Using meanClickthroughs as our bias term improves loss significantly (it will be used in our baseline)

train_loss_baseline1 = LRLoss(normedRDD_train, BASELINE_1)
print("Train Loss using 0 for baseline", train_loss_baseline1)

train_loss_baseline2 = LRLoss(normedRDD_train, BASELINE_2)
print("Train Loss using meanClickthroughs as baseline", train_loss_baseline2)


In [None]:
# BASELINE = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

# np.zeros(47)
BASELINE = np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [None]:
# Compute the gradient for logistic regression (one step)
def GDUpdate(dataRDD, W, learningRate):
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
    grad = augmentedData.map(lambda x: np.dot(x[0], ((1/ (1 + np.exp(-(np.dot(x[0],W))))) - x[1]))).mean()
    new_model = W - (learningRate * grad)
    return new_model

In [None]:
# Perform one Gradient Descent update with lasso regularization
def GDUpdate_Lasso(dataRDD, W, learningRate, regParam):

    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
    #new_model = None    
    wReg = np.sign(W)
    wReg[0] = 0  #set bias to zero
       
    grad = augmentedData.map(lambda x: np.dot(x[0], ((1/ (1 + np.exp(-(np.dot(x[0],W))))) - x[1]))).mean() + (wReg * regParam)
    new_model = W - (learningRate * grad)
    
    return new_model

In [None]:
new_model = GDUpdate_Lasso(toyTrainRDD, BASELINE, .1, .1)
print(new_model)

In [None]:
def predict_and_score(dataRDD, W, threshold):
    """Assess peformance of current models"""
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
        
    sc.broadcast(W)
    sc.broadcast(threshold)

    def predict_label(line):
        features = line[0]
        label = line[1]
        pred = None
        z = np.dot(features,W)
        prob = np.divide(1.0, (1.0 + np.exp(-z)))
        if prob > threshold:
            pred = float(1.0)
        else:
            pred = float(0.0)
        return label, pred
        
    def map_accuracy(line):
        label_actual = line[0]        
        label_pred = line[1]
        
        if label_actual == 1.0 and label_pred == 1.0:
            return "TP", 1.0
        elif label_actual == 1.0 and label_pred == 0.0:
            return "FN", 1.0
        elif label_actual == 0.0 and label_pred == 0.0:
            return "TN", 1.0
        else:
            if label_actual == 0.0 and label_pred == 1.0:
                return "FP", 1.0
    
    scores = augmentedData.map(predict_label).map(map_accuracy).reduceByKey(lambda x, y: x + y).collect()
    
    TP, FN, TN, FP = 0.0, 0.0, 0.0, 0.0

    for i in scores:
        if i[0] == 'TP':
            TP = i[1]
        elif i[0] == 'FN':
            FN = i[1]
        elif i[0] == 'TN':
            TN = i[1]
        else:
            if i[0] == 'FP':
                FP = i[1]

    if TP != 0.0:
        precision = TP/(TP + FP)
        recall = TP/(TP + FN)
        f1_score = 2*((precision*recall)/(precision+recall))
        accuracy = (TP+TN)/(TP+TN+FP+FN)
        false_positive_rate = FP/(FP+TN)
        
    else:
        precision = 0.0
        recall = 0.0
        f1_score = 0.0
        accuracy = 0.0
        false_positive_rate = 0.0

#     print("scores=", scores)
#     print("True Positives=", TP)
#     print("False Negatives=", FN)
#     print("True Negatives=", TN)
#     print("False Positives=", FP)
#     print("Precision=", precision)
#     print("Recall=", recall)
#     print("F1_score=", f1_score)
#     print('Accuracy=', accuracy)

    return accuracy, precision, recall, f1_score, false_positive_rate
    

In [None]:
updated_model = GDUpdate_Lasso(toyTrainRDD, BASELINE, .1, .1)

In [None]:
predict_and_score(toyTrainRDD, updated_model, threshold=.55)

In [None]:
def GradientDescent(trainRDD, testRDD, wInit, nSteps = 20, 
                    learningRate = 0.1, verbose = True):
    """
    Perform nSteps iterations of Logistic Regression gradient descent and 
    track performance metrics on the test and train set. 
    """
    # initialize lists to track model performance
    train_history, test_history, model_history = [], [], []
    accuracies, f1_scores, precisions, recalls, false_positive_rates = [], [], [], [], []
    
    # perform n updates & compute test and train loss after each
    model = wInit
    for idx in range(nSteps): 
        
        ############## CORE CODE #############
        
        new_model = GDUpdate(trainRDD, model, learningRate=0.1)
        training_loss = LRLoss(trainRDD, new_model)
        test_loss = LRLoss(testRDD, new_model)
        accuracy, precision, recall, f1_score, false_positive_rate = predict_and_score(toyTrainRDD, new_model, threshold=.55)
        
        model = new_model
        
        ############## (END) CORE CODE #############
        
        # keep track of metrics for plotting
        train_history.append(training_loss)
        test_history.append(test_loss)
        model_history.append(model)
        accuracies.append(accuracy)
        f1_scores.append(f1_score)
        precisions.append(precision)
        recalls.append(recall)
        false_positive_rates.append(false_positive_rate)
        
        # console output
        if verbose:
            print("----------")
            print(f"STEP: {idx+1}")
            print(f"training loss: {training_loss}")
            print(f"test loss: {test_loss}")
            print(f"Accuracy: {accuracy}")
            print(f"Precision: {precision}")
            print(f"Recall: {recall}")
            print(f"False Positive Rate: {false_positive_rate}")
            print(f"Model: {[round(w,3) for w in model]}")
    return train_history, test_history, model_history, accuracies, f1_scores, precisions, recalls, false_positive_rates

In [None]:
# run 50 iterations (RUN THIS CELL AS IS)
wInit = BASELINE
#trainRDD, testRDD = normedRDD.randomSplit([0.8,0.2], seed = 2018)
start = time.time()
logLossTrain, logLossTest, model_history, accuracy, f1_score, precision, recall, false_positive_rate = GradientDescent(toyTrainRDD, toyTestRDD, wInit, nSteps = 30)
print(f"\n... trained {30} iterations in {time.time() - start} seconds")

In [None]:
def plotErrorCurves(trainLoss, testLoss, title = None):
    """
    Helper function for plotting.
    Args: trainLoss (list of MSE) , testLoss (list of MSE)
    """
    fig, ax = plt.subplots(1,1,figsize = (16,8))
    x = list(range(len(trainLoss)))[1:]
    ax.plot(x, trainLoss[1:], 'k--', label='Training Loss') #trainLoss[1:]
    ax.plot(x, testLoss[1:], 'r--', label='Test Loss')

    ax.legend(loc='upper right', fontsize='x-large')
    plt.xlabel('Number of Iterations')
    plt.ylabel('Performance')
    if title:
        plt.title(title)
    plt.show()

In [None]:
LR_LogLoss_learningRate_point2 = plotErrorCurves(logLossTrain, logLossTest, title = 'Logisitic Regression LogLoss Trend')
LR_LogLoss_learningRate_point2

In [None]:
LR_LogLoss_lasso = plotErrorCurves(logLossTrain, logLossTest, title = 'Logisitic Regression LogLoss Trend')
LR_LogLoss_lasso

In [None]:
LR_Loss_original = plotErrorCurves(logLossTrain, logLossTest, title = 'Logisitic Regression LogLoss Trend')
LR_Loss_original

In [None]:
def plotPerformance(accuracy, f1_score, title = None):
    """
    Helper function for plotting.    
    """
    fig, ax = plt.subplots(1,1,figsize = (16,8))
    x = list(range(len(accuracy)))[1:]
    ax.plot(x, accuracy[1:], 'r--', label='Accuracy')
    ax.plot(x, f1_score[1:], 'k--', label='F1_score')
    #ax.plot(x, precision[1:], 'g--', label='Precision')
    #ax.plot(x, recall[1:], 'b--', label='Recall')

    ax.legend(loc='upper right', fontsize='x-large')
    plt.xlabel('Number of Iterations')
    plt.ylabel('Performance')
    if title:
        plt.title(title)
    plt.show()
    
    #precision, recall

In [None]:
LR_performance_learningRate_point2 = plotPerformance(accuracy, f1_score, title = 'Logisitic Regression Performance - learning rate .2')
LR_performance_learningRate_point2

In [None]:
LR_performance_lasso = plotPerformance(accuracy, f1_score, title = 'Logisitic Regression Performance - Lasso')
LR_performance_lasso

In [None]:
LR_performance_original = plotPerformance(accuracy, f1_score, title = 'Logisitic Regression Performance - Original')
LR_performance_original

In [None]:
def plotROCcurves(recall, false_positive_rate, title = None):
    """
    Helper function for plotting.    
    """
    fig, ax = plt.subplots(1,1,figsize = (16,8))
    #x = list(range(len(recall)))[1:]
    #y = list(range(len(false_positive_rate)))[1:]
    
    #ax.plot(false_positive_rate[1:], recall[1:], 'r--', label='False Positive Rate')

    ax.plot(false_positive_rate, recall, 'r--')

    #ax.plot(x, false_positive_rate[1:], 'r--', label='False Positive Rate')
    #ax.plot(y, recall[1:], 'k--', label='True Positive Rate')

    #ax.legend(loc='upper right', fontsize='x-large')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    if title:
        plt.title(title)
    plt.show()

In [None]:
ROC_learning_rate2 = plotROCcurves(recall, false_positive_rate, title = "ROC Curve - learning rate = .2")
ROC_learning_rate2

In [None]:
ROC_lasso = plotROCcurves(recall, false_positive_rate, title = "ROC Curve - Lasso")
ROC_lasso 

In [None]:
ROC_original = plotROCcurves(recall, false_positive_rate, title = "ROC Curve - original")
ROC_original 

### PART B: Random Forest

### 3.2 Random Forest (RF)

### 3.2.1 Random Forest Algorithm Details

Random Forest:
    
1. Why we chose RF for this data set?
2. Concepts of RF:
3. How does the home grown implementation work? + Flow diagram
4. Tuning parameters used
5. Cross validation

### 3.2.2 Homegrown Implementation of RF 

In [None]:
# Using transformed toy datasets from above as pandas Dataframes
toy_dataset = toy_train_df.values.tolist()
toy_test_dataset = toy_test_df.values.tolist()

print(toy_datatset[0])
print(toy_test_dataset[0])

In [None]:
# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
        dataset.append(row)
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        if row[column] == 'null':
            row[column] = None
        else:
            row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    # useful if label column coming as categories
    for i, value in enumerate(unique):
        lookup[value] = i 
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup
 
# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
    dataset_split = list()
    dataset_copy = list(dataset)
    fold_size = int(len(dataset) / n_folds)
    for i in range(n_folds):
        fold = list()
        while len(fold) < fold_size:
            # gives one random number
            index = randrange(len(dataset_copy))
            # pop this random number from dataset
            fold.append(dataset_copy.pop(index))
        # add fold generated to list of folds
        dataset_split.append(fold)
    return dataset_split
 
# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
    correct = 0
    for i in range(len(actual)):
        if actual[i] == predicted[i]:
            correct += 1
    return correct / float(len(actual)) * 100.0
 
# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    global flag
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for i,fold in enumerate(folds):
        train_set = list(folds)
        train_set.remove(fold)
        # Converting list of lists to list
        # Ex. converts [[1],[2],[3]] to [1,2,3]
        train_set = sum(train_set, [])
        test_set = list()
        for row in fold:
            row_copy = list(row)
            # Setting the label to None in test set
            row_copy[-1] = None
            test_set.append(row_copy)
        trees, predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]
        accuracy = round(accuracy_metric(actual, predicted),4)
        scores.append(accuracy)
        # using these flags so that the graph will have the nodes of one tree only and will not merge mutliple trees
        if i == len(folds)-1 and flag==1:
            decisiontree= trees[0]
            visit(decisiontree, 0) 
            #graph.write_png('decision_tree_graph.png')  
            print("Sample Decision Tree Graph:\n\n", graph)
            flag = 2
            #print(yaml.dump(decisiontree, default_flow_style=False)) 
            print("Sample Decision Tree Nested Dictionary:\n\n", decisiontree)
    return scores
 
# Split a dataset based on an attribute and an attribute value
def test_split(index, value, dataset):
    left, right = list(), list()
    # index here is the column index in dataset
    # value is the column/feature value based on which split has to be done 
    for row in dataset:
        if row[index] < value:
            left.append(row)
        else:
            right.append(row)
    return left, right
 
# Calculate the Gini index for a split dataset
def gini_index(groups, classes): 
    # classes: number of output/label classes
    # number of split branches at a split 
    # ex. if groups = 2, meaning left and right node of the splits
    
    # count all samples at split point
    # total number of samples in left and right nodes 
    n_instances = float(sum([len(group) for group in groups]))
    # sum weighted Gini index for each group
    gini = 0.0
    for group in groups:
        size = float(len(group))
        # avoid divide by zero - if there are no samples in a node
        if size == 0: 
            continue
        score = 0.0
        # score the group based on the score for each class
        node_labels = [row[-1] for row in group]
        for class_val in classes:
            # count the no. of samples of class = 1,0 etc. 
            p = node_labels.count(class_val) / size
            # calculate unweighted gini index
            score += p * (1-p)
        # weight the group score by its relative size
        # calculate gini index by weighing by the number of instances
        gini += score * (size / n_instances)
    return gini
 
# Select the best split point for a dataset
def get_split(dataset, n_features):
    class_values = list(set(row[-1] for row in dataset))
    b_index, b_value, b_score, b_groups = 999, 999, 999, None
    features = list()
    while len(features) < n_features:
        index = randrange(len(dataset[0])-1)
        if index not in features:
            features.append(index)
    for index in features:
        # Get unique set of values for a feature (index)
        unique_values = set([row[index] for row in dataset])
        # use test split to get the distribution of nodes for each value 
        # get gini index of this test split 
        for value in unique_values:
            # groups here are the left and right node samples after splitting on the value
            groups = test_split(index, value, dataset)
            gini = gini_index(groups, class_values)
            if gini < b_score:
                b_index, b_value, b_score, b_groups = index, value, gini, groups
    # returns a node with the index with which it has to be split, value and two groups after split
    features_importance[str(FIELDS[b_index])] += float(b_score)
    return {'index':b_index, 'value':b_value, 'groups':b_groups}
 
# Create a terminal node value
def to_terminal(group):
    outcomes = [row[-1] for row in group]
    # set the label of the terminal node to be the dominant label of the node samples based on count
    return max(set(outcomes), key=outcomes.count)
 
# Create child splits for a node or make terminal
def split(node, max_depth, min_size, n_features, depth):
    left, right = node['groups']
    del(node['groups'])
    # check for a no split
    if not left or not right:
        node['left'] = node['right'] = to_terminal(left + right)
        return
    # check for max depth
    if depth >= max_depth:
        node['left'], node['right'] = to_terminal(left), to_terminal(right)
        return
    # process left child
    if len(left) <= min_size:
        node['left'] = to_terminal(left)
    else:
        node['left'] = get_split(left, n_features)
        split(node['left'], max_depth, min_size, n_features, depth+1)
    # process right child
    if len(right) <= min_size:
        node['right'] = to_terminal(right)
    else:
        node['right'] = get_split(right, n_features)
        split(node['right'], max_depth, min_size, n_features, depth+1)

# Build a decision tree
def build_tree(train, max_depth, min_size, n_features):
    root = get_split(train, n_features)
    # split is called on the root node to iteratively build the tree
    split(root, max_depth, min_size, n_features, 1)
    return root # this will have the entire tree
 
# Make a prediction with a decision tree
def predict(node, row):
    # node here will be the root node
    if row[node['index']] < node['value']:
        # if the left branch is a node
        # call predict again to pass row down that node
        if isinstance(node['left'], dict):
            return predict(node['left'], row)
        else:
        # else if it is terminal node, just return the label value
            return node['left']
    else:
        if isinstance(node['right'], dict):
            return predict(node['right'], row)
        else:
            return node['right']

# Create a random subsample from the dataset with replacement
def subsample(dataset, ratio):
    sample = list()
    n_sample = round(len(dataset) * ratio)
    while len(sample) < n_sample:
        index = randrange(len(dataset))
        sample.append(dataset[index])
    return sample
 
# Make a prediction with a list of bagged trees
def bagging_predict(trees, row):
    predictions = [predict(tree, row) for tree in trees]
    return max(set(predictions), key=predictions.count)

# Building a tree traversal and graph structure for visualization 
def visit(node, depth, parent = None):
    feature=node['index']
    feature = FIELDS[feature]
    feature_value=node['value']
    left_node = node['left']
    right_node = node['right']
    child_nodes = [left_node, right_node]
    
    if depth ==0:            
        node_root = "D"+str(depth)+"_"+str(feature)
    else:
        parent = FIELDS[parent]
        node_root = "D"+str(depth)+"_"+str(parent)
    
    # processing left_child
    if isinstance(left_node, dict):
        node_child = "D"+str(depth+1)+"_"+str(FIELDS[left_node['index']])
        graph.add_edge(pydot.Edge(node_root, node_child, label="L "+str(feature_value)))
        visit(left_node,depth+1, parent=left_node['index'])
    else:
        node_terminal = "D"+str(depth+1)+"_Class_"+str(left_node)
        graph.add_edge(pydot.Edge(node_root, node_terminal, label="L "+str(feature_value)))
    
    # processing right_child
    if isinstance(right_node, dict):
        node_child = "D"+str(depth+1)+"_"+str(FIELDS[right_node['index']])
        graph.add_edge(pydot.Edge(node_root, node_child, label='R '+str(feature_value)))
        visit(right_node,depth+1, parent=right_node['index'])
    else:
        node_terminal = "D"+str(depth+1)+"_Class_"+str(right_node)
        graph.add_edge(pydot.Edge(node_root, node_terminal, label="R "+str(feature_value)))


# Random Forest Algorithm
def random_forest(train, test, max_depth, min_size, sample_size, n_trees, n_features):
    trees = list()
    for i in range(n_trees):
        sample = subsample(train, sample_size)
        decisiontree = build_tree(sample, max_depth, min_size, n_features)
        trees.append(decisiontree)
    predictions = [bagging_predict(trees, row) for row in test]
    return(trees, predictions)


# get_accuracy
def get_accuracy(dataset, n_trees, n_folds,  max_depth, min_size):
    accuracy_list = []
    global flag
    global features_importance
    print("N-folds:", n_folds, " Max_depth:", max_depth, " Min size:", min_size, "\n")
    for n_trees in number_of_trees:
        start = time.time()
        scores = evaluate_algorithm(dataset, random_forest, n_folds, max_depth, min_size, sample_size, n_trees, n_features)
        print('\nTrees: %d' % n_trees)
        print('Scores: %s' % scores)
        accuracy = round((sum(scores)/float(len(scores))),4)
        accuracy_list.append(accuracy)
        print('Mean Accuracy: %.3f%%' % accuracy)
        print('Training time:', time.time()-start, ' seconds')
    # setting flag back to 1 for next call of get_accuracy
    flag = 1
    features_importance = sorted(features_importance.items(), key=operator.itemgetter(1))
    features_importance = [(item[0],round(item[1],2)) for item in features_importance]
    print('\nFeatures in decreasing order of importance:\n\nSummed gini index for a feature across all trees - lower the gini index the better\n\n', features_importance)
    features_importance = defaultdict(float)
    graph = pydot.Dot(graph_type='digraph')
    flag = 1

### 3.2.3 Training and Testing Homegrown RF on Toy Dataset

In [None]:
sample_size = 1.0
n_features = int(sqrt(len(dataset[0])-1))
number_of_trees = [1, 5, 10]#, 20, 30, 40, 50, 75,100,]#200]
features_importance = defaultdict(float)

print('\n Accuracy of RF on Toy Train Dataset:\n--------------------------------------\n')
get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 7, min_size = 10)

print('\n Accuracy of RF on Toy Test Dataset:\n--------------------------------------\n')
get_accuracy(dataset = toy_test_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 7, min_size = 10)

# display sample decision tree built
Image(filename="decision_tree_graph.png")

### 3.2.4 Comparing RF Accuracy with various Tuning Parameters

In [None]:
#Plotting graphs 
plt.figure(figsize=(15,5),) 
plt.subplot(1,2,1)
plt.title("Accuracy of Random Forest vs Number of Trees \n Number of Folds = 3, Varying Depth, Fixed Min.Size")
plt.xlabel('Number of Trees')
plt.ylabel('Classification Accuracy')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 5, min_size = 20), color='red')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 7, min_size = 20), color='aqua')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 9, min_size = 20), color='darkorchid')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 11, min_size = 20), color = 'lime')
red_patch = mpatches.Patch(color='red', label='depth=5 min_size=20')
aqua_patch = mpatches.Patch(color='aqua', label='depth=7 min_size=20')
darkorchid_patch =  mpatches.Patch(color='darkorchid', label='depth=9 min_size=20')
lime_patch =  mpatches.Patch(color='lime', label='depth=11 min_size=20')
plt.legend(handles=[red_patch, aqua_patch, darkorchid_patch, lime_patch])
plt.subplot(1,2,2)
plt.title("Accuracy of Random Forest vs Number of Trees \n Number of Folds = 3, Depth=5,7, Varying Min.Size")
plt.xlabel('Number of Trees')
plt.ylabel('Classification Accuracy')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 5, min_size = 20), color='red')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 5, min_size = 10), color='aqua')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 7, min_size = 20), color='darkorchid')
plt.plot(number_of_trees, get_accuracy(dataset = toy_dataset, n_trees = number_of_trees, n_folds = 3, max_depth = 7, min_size = 10), color = 'lime')
red_patch = mpatches.Patch(color='red', label='depth=5 min_size=20')
aqua_patch = mpatches.Patch(color='aqua', label='depth=5 min_size=10')
darkorchid_patch =  mpatches.Patch(color='darkorchid', label='depth=7 min_size=20')
lime_patch =  mpatches.Patch(color='lime', label='depth=7 min_size=10')
plt.legend(handles=[red_patch, aqua_patch, darkorchid_patch, lime_patch])

### 3.2.5 RF Performance and Accuracy on Toy Dataset

Notes on tuning parameters:
 
1. How long it takes to train 
2. What is the accuracy compared to baseline?
3. How does accuracy change with various tuning parameters?

### 3.2.6 Scaling Random Forest

Scaling RF:
    
4. Why is RF suitable for scaling?
5. How to scale RF?
6. Potential challenges while sclaing:
7. Decision to use homegrown RF on toy but use Spark ML on full 

## 4. Algorithm Implementation - Using Full Dataset

### PART A: DATA TRANSFORMATION USING DATAFRAMES

### 4.1 Data Transformation Pipeline 

Notes: How EDA data tranformations decisions were used for building pipeline?

In [None]:
# Code

### PART B: LR AND RF WITH SPARK ML

### 4.2 LR in Spark ML

In [None]:
# Code

### 4.3 RF in Spark ML

In [None]:
# Code

### 4.4 Comparing Models

Notes on Models:
    1. Model training time details for LR and RF
    2. Comparing accuracy for LR and RF 

## 5. Application of Course Concepts

### 5.1 Course Concepts:

First List: 
1. Scalability
2. Associative and Commutative 
3. No state dependency 
4. Algorithms chosen with this criteria
5. Bias/Variance Tradeoff 
6. Regularization
Updated:
1. Scalability
2. Associative/Commutative
3. Statelessness
4. GD - batch - Kim?
5. Sparse representation - arvindh?
6. Presenting the graph structure - anusha?
7. One Hot encoding - Christina?
8. Assumpations for Algorithsm - kim, anusha
9. implications of iterative algorithms - arvindh spark vs hadoop

Bias Variance Tradeoff
Given the large number of features in this dataset, we needed to be careful not to overfit our data. A very complex model does not generalize well (high variance) while an overly simple model gives too much credit to the data itself (e.g., high bias: we believe too much in our data to do the work for us). These factors are introduced in model selection. We therefore explored two models (Logistic Regression and Decision Trees) that each allow us to identify important features and also adjust for complexities to avoid overfitting. In Logistic Regression, we used Lasso Regularization to avoid overfitting and identify unimportant features (eg. features where the weights are driven to zero). Decision Trees can also be made simpler or more complex by adjusting the depth of the trees, and has specific tools to identify important features. Together these tools helped us to optimize our models.

Normalization:
Given that the features in the dataset are on widely different scales, we needed to standardize the data to avoid lending over importance to features with bigger values. This was more important for the logistic regression model as decision trees opened on splits of each variable rather than a weighting of variables relative to one another.

Map Reduce Design/Broadcasting/Lazy Evaluation
The map reduce design lends itself very well to this problem. Given that we can compute each record as a pair RDD (label and values), and that the operations are associative and commutative, each row can be computed independently. We can cache our RDDs and use Spark’s functional programming design and lazy evaluation to avoid multiple passes over the data at each step. In linear regression, broadcasting values such as medians (which cannot be obtained in the RDD) helps us to optimize the process as well.

### 5.2 Conclusion:
Accuracy of Models and Performance Notes:
    
### 5.3 Future Ideas for Implementation:

**1. Correlation between features**
In a logistic regression context, features should not be highly correlated with each other. We saw that in our case a lot of features were highly correlated. However, we didn't make any feature exclusions based on multicollinearity, since we focused more on our tree-based algorithm that is not affect by feature correlation. As a future improvement step, we would remove those highly correlated features before feeding into the LR model. 

**2. Imbalance of labels**
Our classification labels were imbalanced (75% label=0, 25% label=1). This could potentially decrease our model performance. A future step that our team would take to improve performance would be to balance the classes using bootstrap random sampling. 

**3. Outliers & Skewness**
One noticeable issue that could be addressed to enhance performance would be to treat some extreme outliers. When looking at our numeric feature distributions we noticed some high positive skews. One way to deal with this would be to put a 'ceiling' on the values of features that would not exceed 3 standard deviations of each variable. Additionally, we would like to try taking the log of the highly skewed numeric features to smooth out the skewed distributions. 

**4. Columns/rows with large % of missing values**
Finally, we noticed that a few variables had a high % of missing values. Due to the fact that we dealt with missing values in our categorical feature transformations, we decided to not make any column exclusions. However, it would be a good idea to exclude those columns and even rows with a certain threshold percentage of missing values and evaluate model performance. It is likely that our performance could be improved by doing so. 