# Final Project:  Predicting Allstate Claim Loss Value

### W261 / Spring 2020 / Team One 

#### Alex West, Sarah Danzi, John Boudreaux

Data for this report was provided by Kaggle in the Allstate Claims competition: https://www.kaggle.com/c/allstate-claims-severity

In [2]:
# Import packages used in this notebook.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.types import *
import pyspark.sql.functions as F
import seaborn as sns

from pyspark.ml.linalg import Vectors, DenseMatrix
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import PCA as ps_PCA
from pyspark.ml.regression import LinearRegression as ps_linreg
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor as ps_rf

from collections import defaultdict
from sklearn.pipeline import Pipeline as skPipeline
from sklearn.preprocessing import OneHotEncoder 
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error
from sklearn import linear_model
sns.set_style("whitegrid")

# Variable Definitions
unpacked_data_dir  = 'dbfs:/user/john.boudreaux@ischool.berkeley.edu/FINAL_PROJECT/'   # Directory where data exists
random_seed = 2020                                                                     # Random seed 

In [3]:
# Read the training data into a Spark dataframe
s_df = spark.read.format('csv').options(header='true', inferSchema='true').load(unpacked_data_dir+"train.csv")

## Question Formulations

The cost of an insurance claim may be measured in dollars, but carries a much more significant impact to those affected. An automated method of predicting the cost - or severity - of a claim could save time, money, and distress for both the insurer and the insured. To explore potential methods, Allstate released a competition on Kaggle to create an algorithm to accurately predict the cost of a claim. The purpose of this competition was twofold for Allstate: one, to gain access to innovative ideas, and two, to recruit talented individuals. 

For the purposes of this project, our goals are slightly different. While we are interested in the performance of our models, we are also interested in exploring the different systems available to accomplish the task. The data from the Kaggle competition are neatly packaged and big enough to train a model, but not big enough to require distributed computing. In the real world, however, data are never so perfect, and especially for a large company like Allstate, so small. Therefore, through the course of this project we are using the Allstate data and Kaggle competition objective to explore the difference between two data science tools: **scikit-learn** (or sklearn) for smaller non-distributed data, and **Spark** for large distributed data sets. 

The data remain the same, with attribute columns pertaining to properties of a claim filed with Allstate. The outcome variable `loss` is a dollar amount requested by the insured to the insurer. This analysis is critical to the insurance industry; accurately predicting a claim value would allow the company to allocate resources effectively, and potentially predict resource needs at a grander scale. It could also allow a company to predict the total claim value of a customer over the course of their engagement with the company, and evaluate risk. 

A useful model would need to be able to predict the value of a claim within a predetermined order of magnitude. For the purposes of the competition that order of magnitude could be fairly high, say $1,000. In reality, multiplied over millions of customers, this performance may not be adequate. With this in mind, we are inclined to pay close attention to the distribution of the data (in particular the outcome variable `loss`) and perform transformations where appropriate.

To measure the success of our model, we will use Mean Absolute Error (or MAE). This is a prediction problem, not classification, and therefore other metrics of success (accuracy, recall, precision, etc) are meaningless unless we change the structure of the data. We care about the distance from the actual amount, and therefore could use either Mean Absolute Error `MAE = mean(abs(y - y_pred))` or Mean Squared Error `MSE = mean((y - y_pred)^2)`. Both MAE and MSE express average model prediction error in units of the variable of interest, but MSE penalizes large errors more (so if being off by 10 is more than twice as bad as being off by 5, use MSE). In this case, we don't have any indication that error severity increases with dollar amount, and we have another big advantage with MAE: interpretability. And finally, it is the same metric used by the Kaggle competition so we will be able to compare our results to a larger group.

Our notebook will first start with exploratory data analysis, then move to feature engineering and algorithm exploration, discuss performance with algorithm implementation, and conclude with a discussion of the capabilities of Spark and sklearn.

## Exploratory Data Analysis & Discussion of Challenges 

Identifying an approach for predicting Allstate claim loss begins by examining the pool of available data.  This exploratory data analysis activity has several overarching goals:

  * Understand the scale, shape, and distribution of the data.
  * Preliminarily identify features for use.  
  * Store the data, inclusive of any transformations/corrections, for further analysis and scalable processing.

This report addresses each of these steps in sequential order with inline code and visualizations to fully characterize the dataset.  

#### Scale, Shape, & Distribution of Data

We perform our exploratory data analysis on the `train.csv` file provided by Allstate.  The file is 0.065 GB and contains 188,318 rows and 132 columns. Of the 132 columns, 130 are features that describe each observation, or row.  The two remaining columns are `id`, a unique identifier for each observation, and `loss`, identified by the dataset's metadata as the outcome variable. 

Our first observation is thus that the data set is quite small by big data standards.  This, in theory, would allow us to explore and develop predictive models using non-parallelized computing options.  However, we recognize that given the source of the dataset, Kaggle, this training dataset may represent only a subset of the company's actual data.  Thus, we will want to pursue developing both a parallelized ML solution and a non-parallelized ML solution to offer options to the business.  Doing this will also allow us to compare and contrast the solutions to develop statements on efficiency and resource utilization for our Customer.

In [7]:
# Print training filename and size
for item in dbutils.fs.ls(unpacked_data_dir + "train.csv"):
  print("*  File:", item.name)
  print("   Size:", item.size / 2**30, "GB")

# Print shape of the data 
print("\n* Shape:\n   Rows:", s_df.count(), " Columns:", len(s_df.columns))

# Print dataset schema
print("\n*  Dataset Schema:")
print(s_df.printSchema())

# Preview the values of one row (e.g., one datapoint)
print("\n*  Row One:\n", s_df.head())

We next test the dataset to identify if there are any missing values or obvious integrity issues.  Luckily, no such issues are identified:  each cell of the dataset is populated.

In [9]:
# Determine if any values are missing
print("* Count the number of missing values in each column:")
s_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in s_df.columns]).toPandas()

Unnamed: 0,id,cat1,cat2,cat3,cat4,cat5,cat6,cat7,cat8,cat9,cat10,cat11,cat12,cat13,cat14,cat15,cat16,cat17,cat18,cat19,cat20,cat21,cat22,cat23,cat24,cat25,cat26,cat27,cat28,cat29,cat30,cat31,cat32,cat33,cat34,cat35,cat36,cat37,cat38,cat39,...,cat92,cat93,cat94,cat95,cat96,cat97,cat98,cat99,cat100,cat101,cat102,cat103,cat104,cat105,cat106,cat107,cat108,cat109,cat110,cat111,cat112,cat113,cat114,cat115,cat116,cont1,cont2,cont3,cont4,cont5,cont6,cont7,cont8,cont9,cont10,cont11,cont12,cont13,cont14,loss
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Of the 130 feature columns, Spark inferred upon read-in that 116 of the columns are of type `string`.  For the columns of type `string`, all are  named `catXXX`, where `XXX` is a unique numeric identifier.  Given this information, We presume these features to be categorical in nature and seek to confirm our belief.  The plot shown below quantifies the distribution of unique values that exist within each column.  Seventy-five percent of the columns have four unique values or less;  at least half of the columns have only two unique values.  This distribution corresponds with a belief that these columns are indeed categorical.  Given this distribution, we will likely want to strongly consider one-hot encoding emerges as part of our feature engineering effort.  

Several columns, however, have a very high number of unique values:  for example, column `cat116` has 326 unique values and column `cat110` has 131.  Given that we do not have any interpretable meaning for these columns, we withhold making any assumptions of whether one-hot encoding categorical variables with such a large number of categories is of value.  However, we highlight their presence in the dataset during exploratory analysis as future investigation that weighs their predictive benefit to a solution may be warranted to increase execution times and system resource demands.

In [11]:
# Identify the categorical column names
cat_cols = [w for w in s_df.columns if 'cat' in w]

# Identify the continuous valued column names
cont_cols = [w for w in s_df.columns if 'cont' in w]

# Filter dataframes into categorical features  and continuous features
s_cat_df = s_df.select(*cat_cols)
s_cont_df = s_df.select(*cont_cols)

# Plot the distribution of categorical feature values
cat_df = s_cat_df.toPandas()
fig, ax = plt.subplots(figsize=(8,3))
sns.distplot(cat_df.nunique(), bins = 100, kde=False, rug=False)
ax.set_title('Categorical Features')
ax.set_ylabel('Quantity')
ax.set_xlabel('Number of Unique Values in a Column')
display(fig)

In [12]:
print("Statistical details of the distribution:")
print(cat_df.nunique().describe())

print("\nCategorical features with more than two categories:", )
print(cat_df[cat_df.columns[np.where(cat_df.nunique() > 2)]].nunique())

print("\nData for the categorical features plot:")
print(cat_df.nunique().value_counts())

Spark inferred 14 columns to be of type `double`, all of which are named `contXX`, where `XX` is a unique numeric identifier.  We thus presume these features to be continuous numeric data and plot each to examine its distribution of data.  The visualizations highlight a few interesting characteristics of these columns.  First, all of the plots show that the each column contains values that are all positive and bound between zero and one.  This is convenient for us as we will not need to take corrective action to either center or scale the data.  It is noteworthy, however, that many of our continuous features are not normally distributed.  This isn't necessarily a problem, but could have consequences downstream depending on our solution selection.

In [14]:
# Plot the continuous-valued columns
fig, axes = plt.subplots(5, 3, figsize=(12, 8), sharex=True)
for ax, feature in zip(axes.flat, cont_cols):
    sns.distplot(s_df.select(feature).toPandas(), ax=ax, kde=False, rug=False)
    ax.tick_params(labelsize=7)
    ax.set_xlabel(feature, fontsize=9)

# Blank out the last graph
axes[4][2].set_xticks([])
axes[4][2].set_yticks([])

fig.suptitle('Continuous Feature Distributions', fontsize=12)
display(fig)

In [15]:
# Examine the continuously valued data.
s_cont_df.describe().show(truncate=False, vertical=True)

The last column we need to explore is our outcome variable, `loss`.  The variable looks to be a continuous-valued positive number, ranging from 0.67 to 121,012.25.  The plot below displays the heavy-tailed distribution of data.  The variable has a very wide range of values with some high outliers given that the mean is greater than the median.  We may want to consider filtering out the extremeley high valued data points and treating them as outliers or or using a log-transform to make them closer to the same scale. We lean towards the latter since we do not have greater insight into the dataset to justify grooming the data, however our ultimate algorithm choice will drive our decision.

In [17]:
# Plot the outcome variable, loss
fig, ax = plt.subplots(figsize=(12,4))
sns.distplot(s_df.select('loss').toPandas(), bins = 100, kde=False, rug=False)
ax.set_title('Distribution of the Outcome Variable')
ax.set_ylabel('Number of Claims')
ax.set_xlabel('Loss')
mean = s_df.select(F.mean("loss")).collect()
median = s_df.approxQuantile("loss", [0.5], 0.05)
ax.axvline(mean[0]["avg(loss)"], color='red', linestyle='--')
ax.axvline(median[0], color='green', linestyle='--')
plt.legend({'Mean':mean,'Median':median})
display(fig)

#### Preliminarily identify features for use

The column labels and values for the dataset features unfortunately do not provide clues on the meaning of the data contained therein:  they appear deliberately obfuscated.  While this is not a hindrance to designing a predictive machine learning model, it will prevent us from making domain-based decisions on which features to include or how to design interaction terms.  We will need to rely on mathematically-based dimensionality reduction techniques for any model implementation selected. 

We still need to understand the presence or absence of multicollinearity amongst our continuous features though.  Highly correlated features can increase standard error in regression solutions, so we need to know if we should consider future corrective action.  The heatmap below visualizes the correlation matrix for continuous features.  We observe a few instances here that are rather high in magnitude, most notably between `cont1` and `cont9`, with a correlation coefficient of 0.92.  We note this occurrence as we will likely want to design a solution that mitigates this multicollinearity.

In [19]:
# Convert each row's continuous valued features to a vector 
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=s_cont_df.columns, outputCol=vector_col)
df_vector = assembler.transform(s_cont_df).select(vector_col)

# Calculate the correlation matrix
corr_df = Correlation.corr(df_vector, vector_col).toPandas()
dense_matrix = corr_df.iloc[0]['pearson(corr_features)']
rows = dense_matrix.toArray().tolist()
pdf = pd.DataFrame(rows, columns=s_cont_df.columns, index=s_cont_df.columns)

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(pdf, dtype=np.bool))
fig, ax = plt.subplots(figsize=(8, 6))
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(pdf, mask=mask, cmap=cmap, vmax=.9, center=0, square=True, linewidths=.5, cbar_kws={"shrink": .5})
ax.set_title('Correlation Matrix of Continuous Valued Features')
display(fig)

In [20]:
print("Correlation Matrix Values")
pdf

Unnamed: 0,cont1,cont2,cont3,cont4,cont5,cont6,cont7,cont8,cont9,cont10,cont11,cont12,cont13,cont14
cont1,1.0,-0.08518,-0.445431,0.367549,-0.02523,0.758315,0.367384,0.361163,0.929912,0.808551,0.59609,0.614225,0.53485,0.056688
cont2,-0.08518,1.0,0.455861,0.038693,0.191427,0.015864,0.048187,0.137468,-0.032729,0.063526,0.116824,0.10625,0.023335,-0.045584
cont3,-0.445431,0.455861,1.0,-0.341633,0.089417,-0.349278,0.097516,-0.185432,-0.417054,-0.325562,0.025271,0.006111,-0.418203,-0.039592
cont4,0.367549,0.038693,-0.341633,1.0,0.163748,0.220932,-0.115064,0.52874,0.328961,0.283294,0.120927,0.130453,0.179342,0.017445
cont5,-0.02523,0.191427,0.089417,0.163748,1.0,-0.14981,-0.249344,0.009015,-0.088202,-0.064967,-0.151548,-0.148217,-0.082915,-0.021638
cont6,0.758315,0.015864,-0.349278,0.220932,-0.14981,1.0,0.658918,0.437437,0.797544,0.883351,0.773745,0.785144,0.815091,0.042178
cont7,0.367384,0.048187,0.097516,-0.115064,-0.249344,0.658918,1.0,0.142042,0.384343,0.492621,0.747108,0.742712,0.288395,0.022286
cont8,0.361163,0.137468,-0.185432,0.52874,0.009015,0.437437,0.142042,1.0,0.452658,0.336588,0.302381,0.315904,0.476402,0.043539
cont9,0.929912,-0.032729,-0.417054,0.328961,-0.088202,0.797544,0.384343,0.452658,1.0,0.785697,0.608,0.626656,0.642028,0.074154
cont10,0.808551,0.063526,-0.325562,0.283294,-0.064967,0.883351,0.492621,0.336588,0.785697,1.0,0.702896,0.713812,0.707876,0.041808


#### Store the data, inclusive of any transformations/corrections, for further analysis and scalable processing

We choose to store our initial CSV dataset as a parquet file. This should provide us with several advantages moving forward:
  * It will eliminate any future overhead of reading in CSV files. 
  * It will greatly reduce the size of data we store on disk and, consequently, reduce network shuffling when we perform distributed processing
  * It is well suited to what we envision as a batch operation for the business:  We do not anticipate updates to this model to be done with streaming data.  Thus, parquet is a smarter choice than avro.  

We confirm the savings in storage resources immediately:  whereas our CSV initially was initially 0.065 GB, the new parquet file is 0.01 GB.  The new parquet files require only 15% of the storage that the original CSV file did.

In [22]:
# Write the CSV file to disk in Parquet format
s_df.write.format("parquet").save(unpacked_data_dir+"train.parquet")

# Print training filename and size
sum = 0
for item in dbutils.fs.ls(unpacked_data_dir + "train.parquet"):
  sum += item.size
print("* Summed Size of Parquet Files:", sum / 2**30, "GB")

# Feature engineering

As indicated in our analysis above, the column names are coded and their meaning obscured. This has both advantages and disadvantages: we are unlikely to be biased in dropping variables that we believe have nothing to do with claim value but may have influence, but on the other hand our model may be crowded with multiple variables that do little to explain variance in the outcome variable `loss`. In practical terms, we will therefore not be dropping any columns - but we will be performing feature engineering to improve the performance of any model.

For the categorical variables described above, depending on the algorithms we explore we have several options. 

First is one-hot encoding - where each categorical value is given a new column and a binary representation for presence or absence. This increases the dimensionality of the dataset exponentially, however, and works best with dimensionality reduction or regularization. There are a variety of methods to accomplish this, which we will explore in the next section. 

Second is Breiman’s method (for use in decision tree algorithms) - with the number of possible categorical values reaching 326 for some of our variables, Breiman’s method will help us find the best split for the categorical attribute without evaluating all possible subsets. In essence, Breiman’s method takes all possible categorical values and sorts them by average `y` value. So if there are 10 possible categorical values for one variable, Breiman’s method makes the splits based on sorting the average `loss` per category. This method of partitioning based on the mean behavior in the `y` value drastically reduces the computation needed. We will explore this in the next section as well.

Finally, we could eliminate the correlated features that were identified above, but we will address this during modelling with dimensionality reduction techniques or regression regularization.

#### sklearn Feature Engineering

In [26]:
# Read the training data into a pandas dataframe
df_train = pd.read_csv("/dbfs/user/john.boudreaux@ischool.berkeley.edu/FINAL_PROJECT/train.csv")

# Create a dataframe, filtered to remove the unique identifier and the loss columns
df_train_x = df_train.loc[:, (df_train.columns != 'loss') & (df_train.columns != 'id')]

# Because the only data we have labels for is the training dataset, split it into test/train datasets
x_train, x_test, y_train, y_test = train_test_split(df_train_x, df_train.loss, test_size=0.2, random_state=random_seed)

# Log transform the outcome variable.
y_train_log = np.log(y_train)
y_test_log = np.log(y_test)


################################ LINEAR / LASSO TRANSFORMATIONS ################################ 

# Perform one-hot encoding on categorical features for Lasso & Linear Regression.
categorical_cols = df_train.columns[['cat' in val for val in df_train.columns.values]]
categorical_transformer = skPipeline(steps=[('ohe', OneHotEncoder(sparse=False, handle_unknown='ignore'))])

# Any features that do not have a specific transformer, passthrough
preprocessor = ColumnTransformer(transformers=
                                 [('categorical', categorical_transformer, categorical_cols)],
                                 remainder="passthrough")

########################## RANDOM FOREST REGRESSOR TRANSFORMATIONS #############################

# Class to transform categorical data using Brieman's Method 
class MyEncoder(BaseEstimator, TransformerMixin):
  
  def __init__(self, columns):
    self.columns = columns
    self.encoders = dict.fromkeys(columns, None)
    
  def transform(self, X, y=None):
    # Using the dictionary of Brieman values, convert categorical entries to numbers
    res = pd.DataFrame()
    for column in X:
      if column != 'y':      
        res[column] = X[column].map(self.encoders[column])
    return res
  
  def fit(self, X, y = None):
    # Build the dictionary of Brieman values for each column's unique categorical values
    y_median = np.median(X['y'])   # For any key we may see that we have not seen, set its value to the median of y
    for column in X:
      xag = X.groupby([column])['y']
      dd = defaultdict(lambda: y_median)
      for name, group in xag:
        dd[name] = np.median(group)
      self.encoders[column] = dd.copy()
      dd = None
    return self

# To transform using Brieman's method, we need the y variable to be passed as part of the data to the 
# pre-processing transformer.  We will remove it once Brieman's method has been applied.
x_all = x_train.copy()
x_all['y']  = y_train
    
mergedlist = ['y']
mergedlist.extend(categorical_cols)

randforest_preprocessor = skPipeline(steps = [('brieman', MyEncoder(mergedlist))])

# Fit and transform the column transformer for Linear/Lasso (e.g., column feature engineering) based on the training data.
df_train_x_preprocessed = preprocessor.fit_transform(x_train)

# Fit and transform the transformer for Random Forest 
df_train_x_rfpreprocessed = randforest_preprocessor.fit_transform(x_all)

# Print out raw data dataframe shape and transformed data dataframe shapes
print("                        Raw Dataframe Size:", x_train.shape)
print(" OLS/Ridge/Lasso, Post-Feature Engineering:", df_train_x_preprocessed.shape)
print("   Random Forest, Post-Feature Engineering:", df_train_x_rfpreprocessed.shape)

### PySpark Feature Engineering

In [28]:
# convenience function for one-hot encoding
def stageCreator(inputDf):
  stages = []
  cat_cols = [w for w in s_df.columns if 'cat' in w]
  cont_cols = [w for w in s_df.columns if 'cont' in w]

  for cc in cat_cols:
    stringIndexer = StringIndexer(inputCol = cc, outputCol = cc + '_index')
    encoder = OneHotEncoderEstimator(inputCols = [stringIndexer.getOutputCol()], outputCols = [cc + "_vec"])
    stages += [stringIndexer, encoder]

  assembler_inputs = [c + "_vec" for c in cat_cols] + cont_cols

  assembler = VectorAssembler(inputCols = assembler_inputs, outputCol = "features")
  stages += [assembler]
  
  return stages

In [29]:
# for ease with working with pyspark API
s_df = s_df.withColumn('label', s_df.loss)

s_train, s_test = s_df.randomSplit([0.8, 0.2], seed=random_seed)

linear_pipe_encoding_stages = stageCreator(s_train)

# while we could run these pipeline steps for each linear/ridge/lasso model, 
# we can also pre-run them and have them set as an input dataFrame for each to save time

lin_pipeline = Pipeline(stages = linear_pipe_encoding_stages).fit(s_train)
s_df_train_onehot = lin_pipeline.transform(s_train)
s_df_test_onehot = lin_pipeline.transform(s_test)


While we did attempt to write a true pipeline step in the PySpark API to include, there were some particularly tricky aspects that prevented our group from getting them done in time. The first implementation on a private cluster (1 master node, 1 worker) took 19 hours to encode due to joins occuring per column to keep consistent with the API of having a single transformer for a single column.

In order to ensure that we had some sort of pipeline to continue a real apples-to-apples comparison between PySpark and Sklearn, we use our Breiman encoded data from our Pandas dataframe and parallelize it into a Spark dataframe.

In [31]:
# create a pyspark df of our Breiman encoding

# need the loss variable
df_train_temp = df_train_x_rfpreprocessed
df_train_temp['loss'] = y_train

s_df_train_rf = spark.createDataFrame(df_train_temp)
s_df_train_rf = s_df_train_rf.withColumn('label', s_df_train_rf.loss)

### Algorithm Exploration
Given that we will retain the entire feature set due to its obfuscated meaning and the fact that we will be predicting MAE, a continuous value, we pair a form of dimensionality reduction with four regression-based approaches to initially explore with our dataset.  

**Linear Models:  Ordinary Least Squares (OLS) , Ridge, and Lasso**

We first begin by using an **OLS** regression approach as it provides a simple, but often very effective means by which to predict continuous valued outcomes and guide more nuanced regression approaches (ISLR Chapter 3).  In addition, in its basic form and many of its advanced forms, it is a scalable approach to large data sets.  In fact, Spark's scalable machine learning library, MLLIB, provides native support for the algorithm and many of its permutations, making it an optimized choice for large data set learning and prediction.  


We pair the initial OLS approach with principal component analysis (PCA) to address the large feature space resulting from our inability to apply domain knowledge to downselect features and our use of one-hot encoding for the 116 categorical variables.  Recall from our feature engineering that the application of one-hot encoding expanded our number of feature columns from 130 to 1133 and left us with a highly sparse dataset.  This *curse of dimensionality* most often introduces noise into a model's predictive abilities and PCA is a statistical aproach that will help reduce our feature set to those that provide the strongest signal for our outcome (ISLR Chapter 6).  To preliminarily understand the impact of reduction on the algorithm, we will perform a gridsearch on the optimal number of principal components to include in the regression.

To build on OLS, we will then examine if we see improved predictive performance with a **Ridge Regression** approach.  Ridge regression will introduce L2 regularization to our base OLS approach, providing a means by which we can reduce potential negative effects of the mild multicollinearity we identified in the EDA section and by which we can trade a small increase in bias for a larger decrease in variance due to the large number of features in our one-hot encoded dataset (ISLR Chapter 6).  We will similarly apply **Lasso Regression** to the dataset to see if L1 regularization is more performant.  For this algorithm, we will remove PCA preprocessing and allow the L1 penalty to self-identify the features that matter most.  Lasso regression will essentially remove features it deems uninformative to our outcome variable by zeroing the associated feature's coefficient (ISLR Chapter 6). For both the Ridge and Lasso Regression approaches, we will use gridsearch to examine several alpha values and gain a cursory understanding of the performance we might expect to see should we fine-tune the approaches.

** Tree Models:  Random Forest Regression **

Finally, we plan to vet how **Random Forest Regression** performs with our dataset.  Random Forest, an ensembling approach to decision trees, is highly desirable given our dataset.  Besides being an iterative algorithm and innately scalable, it allows us to almost naturally explore interaction terms in our data thru the sequential splits in each tree:  given the obfuscated nature of our data, this is advantageous since we were not able to apply domain knowledge to the dataset to derive or explore such possible interactions.  In addition, because a random forest is made up of many trees, grown independently, with only a subset of columns, the algorithm also attempts to mitigate against multicollinearity, a possible problem we noted.  Finally, from an implementation perspective, the algorithm only requires us to tune a handful of hyperparameters, making it an approach we should be able to fine tune with confidence.  

To explore the algorithm, we will apply Breiman's method to transform the categorical variables into numeric values.  This should provide an advantage over one-hot encoding which greatly exploded the number of columns in our dataset.  To explore the variation we may see with tuning, we will gridsearch on max_depth.

** Baseline Expectations ** 

To establish our expectations for these algorithm choices, we take the median value of the outcome variable, MAE, in the dataset.  With a value of **2115.57**, this is our crudest baseline estimate.  We also examine the the Kaggle leaderboard to understand the baseline results of a fine-tuned model:  the result is **1109.70772** (https://www.kaggle.com/c/allstate-claims-severity/leaderboard).  These two values establish a watermark by which we can understand the results of our algorithm exploration.


** Results ** 

Using a cross-validated gridsearch on a single hyperparamter for each approach, we record our best results as follows:
  - OLS Regression:  1303.017143
  - Ridge Regression:  1303.080524
  - Lasso Regression:  1296.568151
  - Random Forest Regression:  1212.194926
  
All of the algorithms far outperform our crude estimate of a mean and Random Forest Regression, by far, outperforms the other algorithms.  The plots below depict the performance of each approach.  Based on this data, we will choose to fine tune and implement a Random Forest Regression solution since it gave the best MAE on a limited hyperparameter sweep.

#### Spark Algorithm Exploration

In [34]:
# define helper functions for extracting models from pyspark cross-validated model objects

def extract_crossval_info_ridgelasso(cvModelObj):
  # first extract the mean of the MAE's
  # extracts the regularization parameter
  maes = []
  folds = []
  params = []
  
  i = 1
  for fold in cvModelObj.subModels:
    for model in fold:
      folds.append(i)
      maes.append(model.stages[-1].summary.meanAbsoluteError)
      params.append(model.stages[-1]._java_obj.getRegParam())
    i += 1
  
  df = pd.DataFrame({'fold' : folds, 'mae' : maes, 'param' : params})
  
  return(df)

def extract_crossval_info_linear(cvModelObj):
  # first extract the mean of the MAE's
  # extracts the PCA component number
  maes = []
  folds = []
  params = []
  
  i = 1
  for fold in cvModelObj.subModels:
    for model in fold:
      folds.append(i)
      maes.append(model.stages[-1].summary.meanAbsoluteError)
      params.append(model.stages[-2]._java_obj.getK())
    i += 1
  
  df = pd.DataFrame({'fold' : folds, 'mae' : maes, 'param' : params})
  
  return(df)

def create_lineplot(cvModelObj, title, param, type_model):
  # creates a line plot for the average MAE for each parameter value
  
  # pull out appropriate parameter from functions defined above
  if type_model == 'linear':
    info = extract_crossval_info_linear(cvModelObj)
  elif type_model == 'ridgelasso':
    info = extract_crossval_info_ridgelasso(cvModelObj)
    
  grouped = info.groupby('param').mean()
  
  #plotting
  ret_plot = sns.lineplot(x = grouped.index, y = grouped.mae)
  ret_plot.set_title(title)
  ret_plot.set_ylabel('Mean Absolute Error')
  ret_plot.set_xlabel(param)
  
  # hard code axis values for easy comparison
  ret_plot.set_ylim(1200, 1700)
  
  return ret_plot

In [35]:
# linear regression 

linreg_stages = []

# if we wanted to include the full pipeline, we would change datasource to s_df
# and uncomment next line
# linreg_stages = stageCreator(s_train_small)
pca_stage = ps_PCA(inputCol = "features", outputCol = "pcaFeatures")
linreg_stages.append(pca_stage)
linreg_stage = ps_linreg(featuresCol = pca_stage.getOutputCol(), labelCol = 'label', maxIter = 10)
linreg_stages.append(linreg_stage)

pipelineLinReg = Pipeline(stages = linreg_stages)

paramGrid = ParamGridBuilder() \
    .addGrid(pca_stage.k, np.random.randint(500, size = 10).tolist()) \
    .build()

evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = "mae")

crossval = CrossValidator(estimator=pipelineLinReg,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          collectSubModels=True)

cvModel_linear = crossval.fit(s_df_train_onehot)


In [36]:
# linear regression plotting
display(create_lineplot(cvModel_linear, "Linear Regression with PCA", "Number of PCA features", 'linear'))

In [37]:
# ridge regression
ridge_stages = []

# if we wanted to include the full pipeline, we would change datasource to s_df
# and uncomment next line

# ridge_stages = stageCreator(s_train)
ridge_stage = ps_linreg(featuresCol = "features", labelCol = 'label', elasticNetParam=0.0)
ridge_stages.append(ridge_stage)

pipelineRidge = Pipeline(stages = ridge_stages)

paramGrid = ParamGridBuilder() \
    .addGrid(ridge_stage.regParam,  np.random.uniform(low=0.0001, high=.1, size=20).tolist()) \
    .build()

evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = "mae")

crossval = CrossValidator(estimator=pipelineRidge,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          collectSubModels=True)

cvModel_ridge = crossval.fit(s_df_train_onehot)

In [38]:
# ridge regression plotting
display(create_lineplot(cvModel_ridge, "Ridge Regression", "Alpha", "ridgelasso"))

In [39]:
# lasso regression

lasso_stages = []
# lasso_stages = stageCreator(s_train)
lasso_stage = ps_linreg(featuresCol = "features", labelCol = 'label', elasticNetParam=1.0)
lasso_stages.append(lasso_stage)

pipelineLasso = Pipeline(stages = lasso_stages)

paramGrid = ParamGridBuilder() \
    .addGrid(lasso_stage.regParam,  np.random.uniform(low=0.0001, high=.1, size=10).tolist()) \
    .build()

evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = "mae")

crossval = CrossValidator(estimator=pipelineLasso,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          collectSubModels = True)

cvModel_lasso = crossval.fit(s_df_train_onehot)

In [40]:
# lasso regression plotting
display(create_lineplot(cvModel_lasso, "Lasso Regression", "Alpha", "ridgelasso"))

In [41]:
# random forest
# Identify the categorical column names
cat_cols = [w for w in s_df_train_rf.columns if 'cat' in w]

# Identify the continuous valued column names
cont_cols = [w for w in s_df_train_rf.columns if 'cont' in w]

# first need features in a vector
relevant_cols = cat_cols + cont_cols
vectorizer = VectorAssembler(inputCols = relevant_cols, outputCol = "features")

# now define rf
rf_model = ps_rf(featuresCol = 'features', labelCol = 'label')

pipelinerf = Pipeline(stages = [vectorizer, rf_model])

# parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf_model.maxDepth, np.random.randint(low=2, high=20, size = 10).tolist())\
    .build()

evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = "mae")

crossval = CrossValidator(estimator=pipelinerf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          collectSubModels = True)

cvModel_rf = crossval.fit(s_df_train_rf)


In [42]:
print("Best Max Depth: " + str(cvModel_rf.bestModel.stages[-1].getOrDefault('maxDepth')))

In [43]:
rf_maes = cvModel_rf.avgMetrics
rf_params = [list(cvModel_rf.getEstimatorParamMaps()[i].values())[0] for i in range(len(cvModel_rf.getEstimatorParamMaps()))]

#plotting
ret_plot = sns.lineplot(x = rf_params, y = rf_maes)
ret_plot.set_title("Random Forest Regression")
ret_plot.set_ylabel('Mean Absolute Error')
ret_plot.set_xlabel("Max Depth")

# hard code axis values for easy comparison
ret_plot.set_ylim(1200, 1700)

display(ret_plot)

#### sklearn Algorithm Exploration

In [45]:
# Calculate the median of the outcome variable as a baseline.
print("==>  Median MAE for training dataset:", df_train.loss.median())

# Define a pipeline for Linear Regression, using PCA for dimensionality reduction 
pipeline_lr = skPipeline(steps=[('preprocess', preprocessor), 
                                ('pca', PCA(n_components=2)),
                                ('classifier', LinearRegression())])

pipeline_ridge = skPipeline(steps=[('preprocess', preprocessor),
                                   ('pca', PCA(n_components=350)),
                                   ('classifier', linear_model.Ridge())])

# Define a pipeline for Lasso Regression using L1 regularization for dimensionality reduction
pipeline_lasso = skPipeline(steps=[('preprocess', preprocessor),
                                   ('classifier', Lasso())])

# Define a pipeline for Random Forest using Brieman's method for transforming categorical features.
pipeline_rf = skPipeline(steps=[('preprocess', randforest_preprocessor), 
                                ('classifier', RandomForestRegressor(max_depth=2, random_state=random_seed, n_estimators=10))])

# Build a list of our sklearn pipelines
pipelines = [pipeline_rf, pipeline_lr, pipeline_lasso, pipeline_ridge]
pipeline_names = ['Random Forest Regressor', 'Linear Regression', 'Lasso Regression', 'Ridge Regression']

# Define hyperparameter ranges for gridsearch
hyperparameter_grid = {'Linear Regression': {'pca__n_components': np.random.randint(500, size = 10)},
                       'Lasso Regression': {'classifier__alpha': np.random.uniform(low=0.0001, high=.1, size=10)},
                       'Ridge Regression': {'classifier__alpha': np.random.uniform(low=0.0001, high=.1, size=10)},
                       'Random Forest Regressor': {'classifier__max_depth': np.random.randint(low=2, high=20, size = 10)}
                      }

results_params = []
results_scores = []

# For each pipeline, find the best hyperparameters using gridsearch and the mean absolute error metric
for i, p in enumerate(pipelines):
    grid = GridSearchCV(p, cv=3, param_grid=hyperparameter_grid[pipeline_names[i]], scoring='neg_mean_absolute_error')
    if pipeline_names[i] == 'Random Forest Regressor':
      grid.fit(x_all, y_train)
    else:
      grid.fit(x_train, y_train)
    print("==>  %s Best: %f using %s" % (pipeline_names[i], grid.best_score_, grid.best_params_))
    results_params.append(grid.cv_results_['params'])
    results_scores.append(grid.cv_results_['mean_test_score'])

In [46]:
# Plot the results of our algorithm exploration.
fig, ax1 = plt.subplots(1, 4, figsize=(12, 4), sharey = True)

pca_n = [d['pca__n_components'] for d in results_params[1]]
sns.lineplot(x=pca_n, y=abs(results_scores[1]), ax=ax1[0])
ax1[0].set_title('OLS Regression')
ax1[0].set_xlabel('PCA Components')

alphas = [d['classifier__alpha'] for d in results_params[3]]
sns.lineplot(x=alphas, y=abs(results_scores[3]), ax=ax1[1])
ax1[1].set_title('Ridge Regression')
ax1[1].set_xlabel('Alpha')

alphas = [d['classifier__alpha'] for d in results_params[2]]
sns.lineplot(x=alphas, y=abs(results_scores[2]), ax=ax1[2])
ax1[2].set_title('Lasso Regression')
ax1[2].set_xlabel('Alpha')

max_depths_n = [d['classifier__max_depth'] for d in results_params[0]]
sns.lineplot(x=max_depths_n, y=abs(results_scores[0]), ax=ax1[3])
ax1[3].set_title('Random Forest Regression')
ax1[3].set_ylabel('Mean Absolute Error')
ax1[3].set_xlabel('Max Depth')

display(fig)

# Algorithm Implementation

Since the random forest algorithm performed best in our basic hyperparameter tuning, we decided to do further exploration of how this algorithm works. The building block of a random forest is a decision tree which creates binary splits to subset the data into groups that minimize error (in our case, mean absolute error). In a most general procedure, we consider all possible split criteria for all possible input variables. So in our case for a sample dataset, we have `cat1`, `cat2`, and `cont_1` which represent two categorical variables and one continuous variable used to predict some outcome variable `y`.

In [49]:
# mini data definition
from pyspark.sql.types import *
sample_schema = StructType([
  StructField('cat1', IntegerType(), True),
  StructField('cat2', IntegerType(), True),
  StructField('cont_1', FloatType(), True),
  StructField('y', FloatType(), True)
])

sample_df = spark.createDataFrame([
            (1, 0, 0.97, 14.2),
            (1, 1, 0.77, 13.4),
            (0, 1, 0.81, 11.1),
            (0, 0, 0.55, 9.8),
            (1, 2, 0.25, 10.1)
], schema = sample_schema)

sample_df.show()

An important note that since we are using MAE as our error metric, we need to use medians rather than means as a baseline since this parameter is optimal for MAE estimation. Our median `y` value of the whole set is `11.1`, so this is our baseline prediction leading to a MAE of `((14.2 - 11.1) + (13.4 - 11.1) + (11.1 - 11.1) + (11.1 - 9.8) + (11.1 - 10.1)) / 5 = 1.54`.

Now, we want to evaluate all possible ways to make a binary split on the data on a basis of the input parameters. There are two different types of inputs we might have- categorical or numerical. Numerical values have potentially lots of possible split points for making greather than/ less than comparisons which makes them more computationally difficult; most code implementations will have a cutoff on the number of possible splits in the data they they might consider. Categorical variables can be compared on a binary basis of if they are a particular category or not. We can also use Brieman's method to replace the categories with a relevant aggregation of the outcome variable grouped by the categories of the inputs; for instance, we can replace `cat1` for values of `cat1 = 1` with the median value of `y` when `cat = 1` ( in this case, `13.8`). Simlilarly, we can do the same for when `cat1=0`. Brieman's method gives us the advantage of being able to make numerical comparisons in addition to reducing the effective number of decisions that need to be made. In the case of binary categorical variables this won't make a difference, but for non-binary it certainly will. 

Using Breiman's method, we can transform our data into the following:


_Source for median being optimal for MAE, with derivation: https://medium.com/@gennadylaptev/median-and-mae-3e85f92df2d7)_

_Source for Breiman's method: Async lecture section 12.8 for UC Berkeley Data Science w261_

In [51]:
# need to make user-defined function for aggregating medians
def median(values_list):
    med = np.median(values_list)
    return float(med)
udf_median = F.udf(median, FloatType())

br_df = sample_df

# note: this method does not appear to scale well
for i in range(1, 3):
  cat_var_str = 'cat' + str(i)
  tmp_cat_var_str = cat_var_str + "_"
  br_var_str = 'br_cat' + str(i)

#   print("Performing for " + cat_var_str)
  group_df = br_df.groupBy([cat_var_str])
  df_grouped = group_df.agg(udf_median(F.collect_list('y')).alias('median'))
  df_grouped = df_grouped.withColumnRenamed(cat_var_str, tmp_cat_var_str)

  br_df = br_df.join(df_grouped, F.col(cat_var_str) == F.col(tmp_cat_var_str), how = 'left')\
              .withColumnRenamed('median', br_var_str).drop(tmp_cat_var_str)
  
br_df = br_df.drop('cat1').drop('cat2')
br_df.show()

So, Breiman's method now employed, we can move on to calculating the MAE of making every possible split in the data from our current node which contains all of the data. For our first categorical variable, all of the possible split points are considered in the calculations below:

> **Variable: `cat_1` (Breiman encoded)**
> 
> split point 1: $$ brcat1 \leq 13.4$$
>
> for the values that fit criteria (TRUE) we have 3 total data points with a median of `13.4`: 
> $$y_0 = 13.4,  y_2 = 10.1, y_3 = 14.2$$
>
> for the values that do not fit criteria (FALSE) we have 2 total data points with a median of `10.45`:
> $$y_1 = 11.1, y_4 = 9.8$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{n} ( \sum |\hat{y} - y|\_{TRUE} +  \sum|\hat{y} -y|\_{FALSE})
> $$
> $$ 
> MAE = \frac{1}{5} ((|13.4 - 13.4| +  |10.1-13.4| + |14.2-13.4|) +(|11.1-10.45| + |9.8-10.45|))
> $$
> $$ 
> MAE = \frac{1}{5} (0 +  3.3 + 0.8 + 0.65 + 0.65)
> $$
> $$ 
> MAE = \frac{1}{5} (5.4) = 1.08
> $$
>
> **Variable: `cat_2` (Breiman encoded)**
> 
> split point 1: $$ brcat2 < 12$$
>
> for the values that fit criteria (TRUE) we have 1 total data points with a median of `10.1`: 
> $$y_1 = 10.1$$
>
> for the values that do not fit criteria (FALSE) we have 4 total data points with a median of `12.25`:
> $$y_0 = 13.4, y_1 = 11.1, y_3 = 14.2, y_4 = 9.8$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{5} ((|10.1 - 10.1|) +  (|13.4-12.25| + |11.1-12.25| +(|14.2-12.25| + |9.8-12.25|))
> $$
> $$ 
> MAE = 1.38
> $$
> 
> split point 2: $$ brcat2 < 12$$
>
> for the values that fit criteria (TRUE) we have 3 total data points with a median of `10.1`: 
> $$y_2 = 10.1, y_3 = 14.2, y_4 = 9.8$$
>
> for the values that do not fit criteria (FALSE) we have 2 total data points with a median of `12.25`:
> $$y_0 = 13.4, y_1 = 11.1$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{5} ((|10.1 - 10.1|) +  (|14.2-10.1| + |14.2-9.8| +(|13.4-12.25| + |11.1-12.25|))
> $$
> $$ 
> MAE = 2.16
> $$
>
> **Variable: `cont_1`**
> 
> split point 1: $$ cont1 < 0.55$$
>
> for the values that fit criteria (TRUE) we have 1 total data points with a median of `10.1`: 
> $$y_2 = 10.1$$
>
> for the values that do not fit criteria (FALSE) we have 4 total data points with a median of `12.25`:
> $$y_0 = 13.4, y_1 = 11.1, y_3 = 14.2, y_4 = 9.8$$
>
> We notice this is an equivalent split to `br_cat2 < 12`. Therefore, 
> $$ 
> MAE = 13.4
> $$
> 
> split point 2: $$ cont1 < 0.77$$
>
> for the values that do not fit criteria (TRUE) we have 2 total data points with a median of `9.95`:
> $$y_2 = 10.1, y_4 = 9.8$$
>
> for the values that fit criteria (FALSE) we have 3 total data points with a median of `13.4`: 
> $$y_0 = 13.4, y_1 = 11.1, y_3 = 14.2$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{5} ((|13.4-13.4| + |13.4-11.1| + |13.4-14.2|) +(|10.1-9.95| + |9.8-9.95|))
> $$
> $$ 
> MAE = 0.882
> $$
>
> split point 3: $$ cont1 < 0.81$$
>
> for the values that do not fit criteria (TRUE) we have 3 total data points with a median of `10.1`:
> $$y_0 = 13.4, y_2 = 10.1, y_4 = 9.8$$
>
> for the values that fit criteria (FALSE) we have 2 total data points with a median of `12.65`: 
> $$y_1 = 11.1, y_3 = 14.2$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{5} ((|11.1-12.65| + |14.2-12.65|) + (|13.4-10.1| + |10.1-10.1| + |9.8-10.1|))
> $$
> $$ 
> MAE = 1.34
> $$
>
> split point 4: $$ cont1 < 0.97$$
>
> for the values that do not fit criteria (TRUE) we have 4 total data points with a median of `10.6`:
> $$y_0 = 13.4, y_1 = 11.1, y_2 = 10.1, y_4 = 9.8$$
>
> for the values that fit criteria (FALSE) we have 1 total data points with a median of `14.2`: 
> $$y_3 = 14.2$$
>
> Median Absolute Error (MAE) of split point: 
> $$ 
> MAE = \frac{1}{5} ((|14.2-14.2|) + (|13.4-10.6| + |11.1-10.6| + |10.1-10.6| + |9.8-10.6|))
> $$
> $$ 
> MAE = 0.92
> $$

We see that splitting the data on `cont_1 < 0.77` provides child nodes that minimize MAE compared to all other split nodes, so this will be our choice of how to subset our data. We can continue this process for the two child nodes we created through this process theoretically until we have leaf nodes with just one example each. We note that this is a greedy algorithm at each stage. This would give us a decision tree that minimizes MAE for our training set (but may not generalize well).

Bringing this back to random forests, random forests are composed of many different decision trees (usually hundreds at a time). The main idea of doing this is to create lots of weak predictors which on an individual basis may not perform well, but when aggregated together perform very well. Each decision tree should be generated such that it is different than other trees, and there are a variety of ways to do that.

We construct each tree with a subset of the observations in the data, and might consider limited variables at each node for further splitting. Additionally, we typically cut off the tree to control for overfitting of individual trees; this can be done through pruning with an alpha parameter after the tree has been fully built out. We can also implement simpler methods that work towards the same purpose by putting a minimum number of data points that must be in a node to consider further splitting or specifying maximum depth. By controlling the proportion of data in each tree, features to be considered in split points, and number of split points to have overall, we can control the "randomness" of each tree in our forest. We can also tune for how many trees to be in the forest as an additional hyperparameter.

Random forests are great for distributed computing because they have a lot of levels of parallelization. First, each tree does not depend on any other tree; these can be trained independently. Additionally, the calculation of the next best split point can be parallelized, since the MAE calculation for each considered split point depends only on the data included in the node, not other split points being considered. We notice a big change in performance with using trees with MAE as an evaluation criteria rather than mean squared error. MSE minimizes variance in each child node, which can be calculated without a sort unlike medians.

#### sklearn Algorithm Implementation

In [56]:
# Define a pipeline for Random Forest using Brieman's method for transforming categorical features.
pipeline_rf = skPipeline(steps=[('preprocess', randforest_preprocessor), 
                                ('classifier', RandomForestRegressor(random_state=random_seed))])

# Define hyperparameter ranges for gridsearch
randomforest_grid = {'classifier__max_depth': [5, 9, 13, 15],
                     'classifier__n_estimators': [100, 200, 300, 400],
                     'classifier__max_features' : ['sqrt', 'log2', round((1/3) * df_train_x_rfpreprocessed.shape[1])]
                    }

# Execute gridsearch
grid = GridSearchCV(pipeline_rf, cv=10, param_grid=randomforest_grid, scoring='neg_mean_absolute_error')
grid.fit(x_all, y_train)
print("==>  Best: %f using %s" % (grid.best_score_, grid.best_params_))

In [57]:
# Using the best fit model, generate predictions for the test dataset
preds = grid.predict(x_test)
score = np.mean(np.abs(y_test - preds))
print("Final score on test dataset:", score)

In [58]:
fig, ax = plt.subplots(figsize=(5,3))
diff = y_test - preds
sns.distplot(diff, bins = 100, kde=False, rug=False)
ax.set_title('Distribution of Residuals')
ax.set_ylabel('Frequency')
ax.set_xlabel('Residual')
display(fig)

### Pyspark Algorithm Implementation

In [60]:
# random forest
# Identify the categorical column names
cat_cols = [w for w in s_df_train_rf.columns if 'cat' in w]

# Identify the continuous valued column names
cont_cols = [w for w in s_df_train_rf.columns if 'cont' in w]

# first need features in a vector
relevant_cols = cat_cols + cont_cols
vectorizer = VectorAssembler(inputCols = relevant_cols, outputCol = "features")

# now define rf
rf_model = ps_rf(featuresCol = 'features', labelCol = 'label')

pipelinerf = Pipeline(stages = [vectorizer, rf_model])

# parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf_model.maxDepth, [5, 9, 13, 15])\
    .addGrid(rf_model.numTrees, [100, 200, 300, 400])\
    .addGrid(rf_model.featureSubsetStrategy, ['sqrt', 'log2', 'onethird'])\
    .build()

evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = "mae")

crossval = CrossValidator(estimator=pipelinerf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=10,
                          collectSubModels = False)

cvModel_rf_final = crossval.fit(s_df_train_rf)

In [61]:
# get training MAE
evaluator.evaluate(cvModel_rf_final.transform(s_df_train_rf))


In [62]:
# create spark version of test data
x_all = x_test.copy()
x_all['y']  = y_test
    
mergedlist = ['y']
mergedlist.extend(categorical_cols)

randforest_preprocessor = skPipeline(steps = [('brieman', MyEncoder(mergedlist))])

# Fit and transform the transformer for Random Forest 
df_test_x_rfpreprocessed = randforest_preprocessor.fit_transform(x_all)

# need the loss variable
df_test_temp = df_test_x_rfpreprocessed
df_test_temp['loss'] = y_test

s_df_test_rf = spark.createDataFrame(df_test_temp)
s_df_test_rf = s_df_test_rf.withColumn('label', s_df_test_rf.loss)

In [63]:
# get MAE on test set
evaluator.evaluate(cvModel_rf_final.transform(s_df_test_rf))

In [64]:
cvModel_rf_final.bestModel.stages[-1].extractParamMap()

From Pyspark, our best model parameters were `numTrees = 400`, `maxDepth = 15`, `featureSubsetStrategy = 'onethird'`. These number of trees and maximum features are the same as sklearn, but the maximum depth came back slightly different.

# Conclusions

Our first objective in this project was to explore machine learning algorithms to predict insurance claim losses. We found the random forest algorithm performed very near to the leaderboard results in Kaggle. It is clear that this algorithm design is ideal for this type of data: it allows us to explore multiple reactions between features in a random way (given our feature space is obscured), as well as being innately scalable. 

Our sklearn model had a MAE of 1131 on the training data, and 1293 on the test data. Our PySpark model had a MAE of 1019 on the training data, and 1192 on the test data. Although the kaggle leaderboard uses a different dataset for its own use, a MAE of 1293 would be around rank 2495 of 3045, and a MAE of 1192 would be 2140 of 3045 entries (a difference of about 350 places).

## Spark vs. sklearn

Our second objective of this project was to compare the two pipelines of Spark and sklearn - the distributed library and the python machine learning powerhouse. 

On the surface they are quite similar (their imports even share names), but examination in greater detail reveals significant differences. 

#### Structure

Both sklearn (through pandas) and Spark use DataFrame structures for data storage and manipulation. In sklearn the entire DataFrame is used for training, but in Spark all features have to be packed into a single column - extracting each row of values into a vector. Spark therefore trains off of only one column of data, which contains multiple columns of data - coding this aspect is not complex, but requires extra thought.

#### Scalability 

In terms of scalability, theoretically, Spark has the clear advantage. Sklearn has fantastic performance if (and only if) the data fits into memory. Python and sklearn do non-distributed in-memory processing and with small data sets (megabytes, perhaps up to a few gigabytes) we are in fact better off using sklearn. Spark allows for a comprehensive set of tools specifically tailored towards big data - one framework to ingest, process, apply machine learning algorithms, query, and distribute computing power. With rapidly changing dataset, Spark also allows for streaming; machine learning in real time. 

Practically speaking, we did not experience a significant difference in training time between Spark and sklearn, though we expected to see vast improvements in performance with Spark. But as we will discuss below, it's hard to know what to associate with our model design and what to pin on shared clusters.

#### Random Forest Algorithm

In terms of our algorithm selection - Random Forest - it's important to understand that under the hood, Spark and sklearn are slightly different in how they calculate feature importance. The documentation for sklearn states it is using “an optimized version of the CART algorithm”, and while it is not explicitly mentioned in the documentation, it has been inferred that Spark is using ID3 with CART. Examining these algorithms in detail is beyond the scope of this project, but it is useful to delineate briefly the small differences in how they each calculate feature importance, as it may affect model performance (especially at scale).

For each decision tree, Scikit-learn calculates a nodes importance using Gini Importance. The importance for each feature on a decision tree is then calculated as the importance of feature i and the importance of node j normalized to a value between 0 and 1 and divided by the sum of all feature importance values. The final feature importance, at the Random Forest level, is it’s average over all the trees. 

For each decision tree, Spark calculates a feature’s importance by summing the gain, scaled by the number of samples passing through the node - the importance of feature i, the number of samples reaching node j, and the impurity of node j. To calculate the final feature importance at the Random Forest level, first the feature importance for each tree is normalized in relation to the tree, then feature importance values from each tree are summed and normalized.

#### Practical Considerations

In implementing our algorithm in Databricks we faced some practical hurdles. The distributed framework added overhead; we faced this when running the PCA and linear regression took nearly as much time as running all our initial pipelines in sklearn. The PySpark pipelines did not function as smoothly for doing the same transformations as we did in sklearn - going through a whole looping process to create all the individual column transformations was a time-intensive process. In trying to implement Breiman encoding in PySpark we hit a roadblock -  attempting literal SQL joins 131 times for each categorical variable was just not feasible. This implementation can be seen in our demo data set, where the compute concerns were not nearly as high.

The choice of our evaluation metric, though logical, created an additional headache as calculating the median requires a sort and an additional pass through the data (which is not true with the mean). 

One further differentiation between sklearn and Spark is the ability to visualize the data. In this aspect sklearn has the upper hand. With support for pandas and matplotlib we can visualize results, verify assumptions, and use scipy functions as part of the machine learning workflow. 

#### Final Thoughts

Overall, Spark is an execution engine optimized to handle the workload of distributed data engineering, while sklearn is perhaps better suited to data science after your data is ingested, processed, and transformed. We found that Spark was not optimized for algorithm exploration - the API was not intuitive to compare individual model performance in the context of cross-validation. Ultimately, there is no reason not to incorporate both for different use cases inside the same environment.

For this type of data in the real world - insurance claims - it's likely the model would be retrained on a fairly regular basis, but not concurrently with the ingestion of new data. It's hard to imagine the predictions changing drastically with the addition of a new training example. However, the training time could be a factor. Training the model took upwards of 18 hours, which in this case might not be an issue as the prediction model might be more static than dynamic. Real time results would be necessary on the prediction side - not necessarily the training side.

In our experience, the use of Databricks was both a blessing and a curse. It was simple to set up the envirnment and infrastructure for distributed computing, but difficult to gauge actual performance and time complexity due to the shared nature of the compute clusters. Our algorithms took over 12 hours to run in both environments, but it is hard to know what to attribute to shared resources and what to attribute to model design.

#### Time Comparison for Key Analysis Portions
(times taken manually from this notebook)

In [68]:
# create data
spark_onehot = 19.37
sk_onehot = 54/60

spark_pca_linear = 60*1.22
spark_ridge = 2.54 * 60
spark_lasso = 48.62
spark_rf = 22.32
sk_all = 1.51 * 60

sk_cv = 12.48
spark_cv = 1.71 * 24

fig, ax = plt.subplots(figsize=(5,3))

cats = ['Spark OHE', 'Sklean OHE']
vals = [spark_onehot, sk_onehot]

sns.barplot(x = cats, y = vals)
ax.set_title("Time for One Hot Encoding Mechanism")
ax.set_ylabel("Time (minutes)")
display(fig)




In [69]:
fig, ax = plt.subplots(figsize=(8,3))
cats = ['Spark PCA+Linear', 'Spark Lasso', 'Spark Ridge', 'Spark RF', 'Sklean All Pipelines']
vals = [spark_pca_linear, spark_lasso, spark_ridge, spark_rf, sk_all]

sns.barplot(x = cats, y = vals)
ax.set_title("Time for Modeling Pipeline")
ax.set_ylabel("Time (minutes)")
display(fig)

In [70]:
fig, ax = plt.subplots(figsize=(5,3))
cats = ['Spark Tuning', 'Sklearn Tuning']
vals = [spark_cv, sk_cv]
sns.barplot(x = cats, y = vals)
ax.set_title("Time for Model Hypterparameter Tuning")
ax.set_ylabel("Time (hours)")
display(fig)

# Application of Course Concepts

### Feature Engineering & Data Transformation

Feature engineering and data transformation were course concepts that played a pivotal role in this project.  As is often understood in the field of data science, preparing the data for machine learning can and often should dominate the data scientist's time since the models we produce are only as good as the data we use to train and validate them.  Put pointedly, garbage in will result in garbage out.  

The AllState dataset presented a few challenges in this domain.  As we discussed, the data itself appeared to have been encoded to mask the meaning of the variables and values provided.  This eliminated any domain-specific feature engineering we would have investigated.  However, the dataset did include 116 categorical variables, or features, which required transformation in order to serve as inputs to the suite of regression algorithms we explored.  

We implemented two approaches to transforming these varaibles.  For use with the OLS, Ridge, and Lasso Regression algorithms, we applied a **one-hot encoding** (OHE) to transform the data.  OHE was specifically selected over a simple numeric label encoding to avoid any implicit reasoning by the algorithm that the numeric encoding implied an order amongst the values.  Using OHE to essentially create a new binary feature for each value of each categoricalvariable, we allowed each to serve as an indicator only of how that binary flag affected the outcome variable.  These binary features, held in a Dataframe as columns, were then **assembled as a vector** for machine learning with Spark (Spark, The Definitive Guide, Chapter 25, Preprocessing and Feature Engineering).

For the Random Forest Regression algorithm, we applied **Breiman's method** to transform the categorical variables.  To perform this transformation, we assign each unique value of a given feature to the median outcome of all observations with that unique value.  We selected to take the median instead of the mean because of our selected outcome measure, MAE. (Asychronous Session 12.8)

### Scalability

While we discussed this concept in our conclusion, it also deserves a mention here. 

Our discussion of Spark vs. sklearn is essentially a discussion of scalability and time complexity, and is analagous to the "machine learning at scale" class as a whole. If the dataset is small enough to fit in memory on a local machine, and the model can be trained once, sklearn is a viable tool. If, however, the dataset is large, or growing, or streaming, and the computational power needed to access the data and derive any meaning from the data is greater than one machine can handle - distributed computing through Spark is the only solution. Spark can also handle smaller datasets, which makes it more versatile if less intuitive. In our case, the dataset was small enough to work with both frameworks and to compare the differences. However, we did not see the differences we expected to see - likely due to the use of Databricks and shared clusters.

In the course of this project, we were naive in our initial assumptions, namely that Spark wouldn't take as long as sklearn. However, we did not take into account the nature of shared clusters on Databricks. It is difficult to compare true performance given this factor - as discussed in our conclusion.

### Tradeoffs in Model Selection

In machine learning, we are always looking for the model that will minimize error, reduce complexity, and generalize the best. In selecting a model, we automatically introduce bias: the difference between the predicted values and the true values. If our training data contains a lot of noise, our model will overfit the data due to high variance: the difference between the model's predicted value and the average model prediction. With high bias we are prone to 'underfit' the data (capturing none of the underlying pattern), and with high variance we are prone to 'overfit' the data (capturing noise along with the underlying pattern). Both render our model useless. A simple model with few parameters may have high bias and low variance, whereas a complex model with many parameters may have low bias and high variance. Therefore we must find a balance to neither overfit nor underfit our data - as a model cannot be both more complex and less complex simultaneously. In order to minimize both bias and variance, it is necessary to 'decompose' these sources of error to choose the best model. 

Practically speaking, bias-variance decomposition involves first bootstrapping the training data to simulate multiple training sets, and then fitting a model many times to the bootstrapped data to get an average model. Next, we would apply a learning algorithm to each replicated set of training data. In this case it is important to hold back some extra training data and compute the predicted values using this held back data. The bias and variance can then be plotted based on models differing in complexity to find the optimum balance.

The Random Forest algorithm minimizes bias and variance as it is bootstrapped by design.

Decision Trees on their own have extremely low bias as they tend to overfit the training data. Each “prediction” made with one Decision Tree on our data set would be one entry in the "loss" column. This results in high variance and bad results on unseen data. While this individual tree would be overfit to the training data with large error, bagging (Bootstrap Aggregating) takes advantage of the fact that a high number of uncorrelated errors would average out to zero. Bagging creates multiple trees with random samples from the training data (with replacement). This is the "Random Forest", and the tradeoffs for model complexity - and the sources of bias and variance - come from hyperparameters like the number of trees, number of splits, number of features per tree, etc. 

Had we chosen linear regression instead of Random Forest, we would have further options to reduce bias and variance: regularization. This is discussed below in OLS assumptions.

Sources: https://towardsdatascience.com/understanding-the-bias-variance-tradeoff-165e6942b229, https://people.eecs.berkeley.edu/~jrs/189/lec/12.pdf, https://towardsdatascience.com/random-forests-and-the-bias-variance-tradeoff-3b77fee339b4, ILS book

### Model Assumptions

#### Linear Models
Ordinary least squares (OLS) regression is a very flexible method that involves a few assumptions. To perform linear regression, numerical data is needed, meaning numeric encodings are needed for any categorical fields. In our example, we have done that through one-hot encoding. At this stage, you can run the linear regression just fine but there are further assumptions needed to prove that the coefficients estimated by OLS are unbiased. There are 4 key assumptions in order to prove this:
* The relationship between the input variables and output variables is actually linear in nature, with some disturbance. In other words, the data can actually be modeled with `y = b * x + b_0 + error`.
  * For our data, when we train our machine learning models we assume this to be true. In reality, this is probably a generous assumption because there are likely some factors that have different types of effects. For instance, we could easily imagine the appropriate representation of the data to be a squared variable instead of the normal variable. If we don't explicily encode this into our inputs, we inherently violate the OLS assumptions.
* We have random sampling of our data from the relevant population at large.
  * Our team is skeptical that this is a truly random subset of data, given that Allstate had to clear this dataset to be usable to the public. There is likely some subset of claims that they were more inclined to expose. That being said, with a large sample size the effects of this are (hopefully) negligible.
* The input variables each have some variance (not all the same value)
  * All of our categorical variables have at least 2 categories in the data, and our numerical predictors have many different values. This assumption checks out with no problems.
* No matter the value of our inputs, we should expect the random noise around the outcome variable to be the same.
  * This can be tested by plotting out a simple scatterplot of our residuals against any one of our input variables. An alternative approach is to look at our actual vs predicted plot. Given that this isn't a perfect straight line, we can assess that it is highly unlikely that this assumption holds true.
  
So, in short, we're pretty sure we are in violation of having unbiased estimates for our coefficients. This would definitely matter if the main purpose of our model was inference and understanding- however, with a dataset that is completely anonymized this is not so much a concern, because we don't know what any of our inputs really mean anyways. In a real world use case, we would really want to know the purposes of the model to asses how problematic this is.

While not a core assumption of OLS, another item that can trip up a linear model are highly correlated input features. The estimates of the coefficients of highly correlated features will have high variance, but on aggregate will be close to the overall effect of those variables. In other words, the model knows that the correlated features have some degree of effect, but will be unsure specifically where to attribute the effects to each of them.

_(Reference for OLS: Jeffrey Woodlridge, Introductory Econometrics, 4th Edition, Chapter 2)_

#### Lasso + Ridge

One way to combat this last shortcoming of OLS, highly correlated features, is to use regularization (either L1 or L2) to reduce the effects of any given variable. L1 tends to shrink nonrelevant variables to have a coefficient of zero, whereas L2 reduces the overall magnitude of all of the coeficients to be closer to zero (but not necessarily any one to actual zero). The difference in behavior stems from the fact that lasso regression's regularization term is the sum of the absolute value of coefficients, whereas the ridge term is the sum of squared coefficients. While the equations generated from ridge regression (L2) and lasso regression (L1) look very similar in practice to OLS, the assumptions are slightly different. The input data for these algorithms needs to be standardized to the same relative scale. The rationale for this is that if the data is not scaled, the algorithms will naturally penalize inputs with different magnitudes not uniformly and some variables may dominate the regularization. By centering and scaling, the data are all on the same relative scales and we have a better apples-to-apples comparison where the penalties are applied uniformly. With our data being inherently bounded between 0 and 1 for continuous values, we were fine to proceed without doing further processing on this step.

_(Reference for Lasso, Ridge: Hastie et all, The Elements of Statistical Learning, 2nd Edition, Section 3.4.1)_

#### Tree-based algorithms

As an alternative approach, we could use decision trees or aggregations of trees (random forest or gradient boosted trees) in order to predict our outcome. These are nice because they have relatively few assumptions. They can work with categorical and real-valued data, although the treatment for each of these varies slightly in execution slightly. 

For real-valued data, the algorithm will look to find test all possible split points in the input to find the one that reduces variance in the child nodes. Thus, all that is required that our data is able to be sorted to make this calculation. Note that no standardization or scaling is needed; this only depends on the order of the data, whatever that order is. The main situation where this can't be done is with missing values. However there is an easy fix- you can create a new column for whether the value is missing or not missing, and then fill in the missing values of the original data with a value. The tree algorithm now has an additional split decision variable it can make to ask if the knowledge that data is missing helps predict the outcome correctly.

The missing encoding is one example of how the tree deals with categorical features. In general, missingness isn't such a big deal for categorical variables, because for a categorical variable of `n` categories, we can simply encode missing values as the `n + 1` category. If a feature is treated as a nonordered category for the decision, we make binary decisions of one category vs. all others. In the sitation of a ranked category, we can encode these with the appropriate ranking and treat these as numeric. Note that you could alternatively one-hot encode the categorical variables into many different binary variables, as this boils down to making more or less the same comparison. Unlike other algorithms, this is not a necessary step and can hurt interpretability. In our data, we chose to do Breiman encoding, which encodes categorical variables with an aggregate statistic (medians in our case) per category. This gives the advantage of giving fewer optimal split points for comparison to the algorithm.

Note that adding features potentially with something like one hot encoding makes more binary decisions required to get to the same effective decision. We can adjust how much these effects are present by manipulating tree depth or the number of features considered at each split node, but overall it's probably a better decision to not increase your dimensionality artificially. 

Additionally, we can reduce the time-complexity to train a tree-based model by giving it less possible comparisons to make in the input variables. This could be done by any of the following:
* Reducing the features that are fed into the model (either via dimensionality reduction like PCA, or feature selection like lasso)
* Reducing the possible split points of our features (for instance, binning any real-valued inputs)

_(Reference for Trees: Hastie et all, The Elements of Statistical Learning, 2nd Edition, Section 9.2)_

_(Reference for effects of One Hot Encoding: https://roamanalytics.com/2016/10/28/are-categorical-variables-getting-lost-in-your-random-forests/)_

### Commutative and Associative Properties

In general, an operation (denoted as `op`) is commutative if the following holds true for some values `a` `b` and `c`:

`a op b = b op a`

An operation is associative if the following holds true:

`a op (b op c) = (a op b) op c`.

In words, this means that the commutative property says we don't need to worry about the order that we do an operation between two values, and the associative property means that we don't have to worry about how we group them. Addition and multiplication are both good examples of operations that have this property. These properties are useful when looking into mapreduce operations, since commutative and associative operations can be more effectively parallelized. If we can break down an operation into parts that have these properties and parts that do not, we can more effectively parallelize functions. For instance, a mean can be thought of as a sum component followed followed by a division. The sum can be parrallelized since it has these two properties, and then the division must happen at a single node. 

One place where we expect that we saw there effects of this in our analysis was the conscious decision to use MAE as a metric rather than MSE. With MSE, we could have been replacing means instead of medians in most of our data. Our random forest implementation required the calculation of a lot of different medians which was a computationally intensive task, requiring a sort for each one. If we had chosen MSE, we could have been leveraging the fact that a sum is both communative and associative and can be done completely in a map step of mapreduce to improve our performance.


_(Reference: Konwinski et all, Learning Spark, Chapter 6)_

_W261 Spring 2020 Final Project Team 1_

John Boudreaux, Sarah Danzi, Alex West