<h2>Allstate Claim Severity Kaggle Project</h2>

This goal of this project is to be able to predict the cost (and hence severity) of a given insurance claim for Allstate insurance company.

The dataset and project is outlined in this Kaggle project below:
- https://www.kaggle.com/c/allstate-claims-severity/data

whereas the mean absolute error is the evaluation metric.



**_Data Structure_**

Each row in this dataset represents an insurance claim. The values in 'loss' column should be predicted. Variables prefaced with 'cat' are categorical, while those prefaced with 'cont' are continuous.

<h3>Data Analysis</h3>

In [3]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/allstate/"))

path,name,size
dbfs:/FileStore/tables/allstate/allstate_test.csv,allstate_test.csv,45715862
dbfs:/FileStore/tables/allstate/allstate_train.csv,allstate_train.csv,70025339


In [4]:
#Let us first load the data

dataset = (spark.read
 .option("header","true")
 .option("inferSchema","true")
 .csv("dbfs:/FileStore/tables/allstate/allstate_train.csv")
 .cache()
             )

In [5]:
print("Data set is a '%.5e' by '%i' matrix \n" 
      %(dataset.count(),len(dataset.columns)))         #Check the size of the dta matrix
print(dataset.columns,"\n")                            


We have medium sized labeled data with the following data structure:
* "id, 116 categorical, 14 continuous data and the loss (as label)"

In [7]:
dataset_nonull = dataset.dropDuplicates().na.drop()            #Remove any duplicates and null content

print("After removing null and replicate entries the data set is a '%.5e' by '%i' matrix \n" 
      %(dataset.count(),len(dataset.columns)))

* The dataset size doesn't change after dropping duplicates and null content. Hence, we deem this dataset as a clean one

In [9]:
#Let us take a quick look at the categorical data
dataset.select(dataset.columns[1:117]).toPandas()[0:10]

Unnamed: 0,cat1,cat2,cat3,cat4,cat5,cat6,cat7,cat8,cat9,cat10,cat11,cat12,cat13,cat14,cat15,cat16,cat17,cat18,cat19,cat20,cat21,cat22,cat23,cat24,cat25,cat26,cat27,cat28,cat29,cat30,cat31,cat32,cat33,cat34,cat35,cat36,cat37,cat38,cat39,cat40,...,cat77,cat78,cat79,cat80,cat81,cat82,cat83,cat84,cat85,cat86,cat87,cat88,cat89,cat90,cat91,cat92,cat93,cat94,cat95,cat96,cat97,cat98,cat99,cat100,cat101,cat102,cat103,cat104,cat105,cat106,cat107,cat108,cat109,cat110,cat111,cat112,cat113,cat114,cat115,cat116
0,A,B,A,B,A,A,A,A,B,A,B,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,...,D,B,B,D,D,B,D,C,B,D,B,A,A,A,A,A,D,B,C,E,A,C,T,B,G,A,A,I,E,G,J,G,BU,BC,C,AS,S,A,O,LB
1,A,B,A,A,A,A,A,A,B,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,...,D,B,B,D,D,A,B,C,B,D,B,A,A,A,A,A,D,D,C,E,E,D,T,L,F,A,A,E,E,I,K,K,BI,CQ,A,AV,BM,A,O,DP
2,A,B,A,A,B,A,A,A,B,B,B,B,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,...,D,B,B,B,D,B,D,C,B,B,B,A,A,A,A,A,D,D,C,E,E,A,D,L,O,A,B,E,F,H,F,A,AB,DK,A,C,AF,A,I,GK
3,B,B,A,B,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,...,D,B,B,D,D,D,B,C,B,D,B,A,A,A,A,A,D,D,C,E,E,D,T,I,D,A,A,E,E,I,K,K,BI,CS,C,N,AE,A,O,DJ
4,A,B,A,B,A,A,A,A,B,B,A,B,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,...,D,B,D,B,D,B,B,C,B,B,C,A,A,A,B,H,D,B,D,E,E,A,P,F,J,A,A,D,E,K,G,B,H,C,C,Y,BM,A,K,CK
5,A,B,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,...,D,B,D,B,D,B,B,C,B,B,B,A,A,A,A,A,D,D,D,E,C,A,P,J,D,A,A,E,E,H,F,B,BI,CS,A,AS,AE,A,K,DJ
6,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,...,D,B,B,D,D,B,D,C,B,B,B,A,A,A,A,A,D,D,D,E,C,A,P,J,A,A,C,E,E,H,F,B,BI,DK,A,J,AF,A,K,DJ
7,A,B,A,B,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,B,A,A,A,B,A,A,A,A,A,A,A,A,...,D,B,D,B,D,A,B,C,B,D,C,D,A,A,A,A,C,B,C,E,A,C,T,H,C,A,A,K,F,F,I,G,BI,EB,G,AH,Y,A,P,LO
8,A,B,B,B,B,A,A,A,B,B,B,B,B,A,A,B,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,B,A,B,A,A,...,D,B,D,B,B,B,B,C,B,D,D,A,A,B,A,A,D,C,C,E,C,D,T,C,Q,A,C,H,F,G,M,K,BI,BC,C,K,AX,A,Q,IE
9,A,B,A,A,B,B,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,B,B,B,...,D,B,B,B,B,B,B,C,B,D,C,D,A,A,B,H,D,B,C,E,A,C,T,F,D,A,F,K,H,G,J,G,BU,DW,A,U,S,J,O,LY


* The categorical data have been obfuscated. Hence it is not possible to make contextual deductions from the set.
* Also, potentially , there are more distinct categories for data after cat109

In [11]:
#Let us now take a quick look at the continuous data
dataset.select(dataset.columns[117:132]).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
cont1,188318,0.4938613645642003,0.1876401764138863,1.6E-5,0.984975
cont2,188318,0.507188356179494,0.20720173860981372,0.001149,0.862654
cont3,188318,0.4989184507216597,0.20210460819343745,0.002634,0.944251
cont4,188318,0.4918123025892123,0.21129221269283555,0.176921,0.954297
cont5,188318,0.4874277287832749,0.20902682854450408,0.281143,0.983674
cont6,188318,0.4909445337355028,0.20527256983553044,0.012683,0.997162
cont7,188318,0.4849702050680173,0.17845016396070937,0.069503,1.0
cont8,188318,0.4864373158699439,0.19937045456133265,0.23688,0.9802
cont9,188318,0.485506319895067,0.18166017135075585,8.0E-5,0.9954


* The continuous data has already been normalized and have positive values
* Loss values have a wide range and is potentially skewed, needs further investigation

In [13]:
import matplotlib.pyplot as plt
from math import log

npts = 10000
loss_samples = dataset.select('loss').take(npts)
yy = [float(y[0]) for y in loss_samples]
logy = [log(y) for y in yy]
f, axes = plt.subplots(1,2)
f.tight_layout()
axes[0].hist(yy, bins=30, log=True)
axes[0].set_title('log-Histogram of loss')
axes[1].hist(logy, bins=30, log=False)
axes[1].set_title('Histogram of log(loss)')
display(f)

* As expected the loss values have a lognormal distribution tendency and would respond well for log transformed linear regression

In [15]:
from pyspark.sql.functions import log

dataset = dataset.filter(dataset.loss > 1)                       #remove any data that would have negative log values
dataset = dataset.withColumn("log_loss",log(dataset.loss))
dataset.select("loss","log_loss").show(5)
dataset.select(dataset.columns[131:133]).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loss,188317,3037.3538109676856,2904.085466237326,5.25,121012.25
log_loss,188317,7.685310779734525,0.8116564065242151,1.6582280766035324,11.70364705912391


In [16]:
#Let us look at the correlation between the continuous variables and the loss values

features = dataset.columns[117:132]
print('%s:   %19s\t %s\t    ' % ('feature','Loss','LogLoss'))

for feature in features:
  print("Correlation of %6s : %6.3f\t %6.3f: " %(feature,dataset.stat.corr(feature, 'loss'),dataset.stat.corr(feature, 'log_loss')))


* There doesn't seem to be a strong correlation between individual variables to loss . 
* cont2, cont3 and cont7 are the ones most strongly correlated to loss.

In [18]:
#plotting the continuous data against loss might give us further insight

import numpy as np
import matplotlib.pyplot as plt

features = dataset.columns[117:131]
nfeatures = len(features)
rowlength = 3
lastrowlength = nfeatures % rowlength
nrows = int(np.ceil(float(nfeatures)/rowlength))

npts = 400

f, axes_all = plt.subplots(nrows, rowlength, sharey=True, figsize=(10,10))
f.tight_layout()
loss_samples = dataset.select('loss').take(npts)
yy = [y for y in loss_samples]

for irow in range(nrows):
  if (irow == nrows-1):
    thisrowlength = lastrowlength
  else:
    thisrowlength = rowlength
  first = rowlength*irow
  last = min(rowlength*(irow+1),nfeatures)
  feats = features[first:last]
  for iplot in range(thisrowlength):
    data = dataset.select(feats[iplot]).take(npts)
    xx = [x for x in data]    
    axes_all[irow][iplot].scatter(xx,yy,s=10,alpha=0.4)
    axes_all[irow][iplot].set_xlabel(feats[iplot], fontsize='medium')
    axes_all[irow][iplot].set_ylabel('loss')
    axes_all[irow][iplot].get_yaxis().set_ticks([])
display(f)

* There is again no significant trend between continuous variables and the loss data
* cont2 variable seems to have been binned and has discrete values

<h3>Data Preparation</h3>

In [21]:
#Let us first convert the categorical data into numerical. Here, the most frequent category gets the lowest index. 

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

df = dataset
PL1 = Pipeline(stages=[
    StringIndexer(inputCol=c, outputCol='{}_Ind'.format(c)).setHandleInvalid("error") for c in df.columns if c.startswith("cat") 
])

df_indexed = PL1.fit(df).transform(df)

In [22]:
#Let us check the feature size of each categorical data column
nCategory ={}
headers = df_indexed.schema.names

for h in headers:  
  if h.endswith("_Ind"):  nCategory[h] = df_indexed.select(h).distinct().count()
print(nCategory)


The categorical data after cat109 indeed have more levels with cat116 having the mamimum feature size of 326. This knowledge will be useful while setting up the random forest regressor.

In [24]:
# We'll need a list of headers for the columns we would like to assemble into a vector 
assemblerInputs =[]

for h in headers:  
  if h.endswith("_Ind") or h.startswith("cont"): assemblerInputs.append(h)      


In [25]:
#Time to assemble the features vector and isolate the related columns for training

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols= assemblerInputs,
    outputCol="features")

dataVec = assembler.transform(df_indexed).select("features","loss")
dataVec.show(5)


In [26]:
#Let us split the training data into two. 

trainVec, testVec = dataVec.randomSplit(
  [0.8, 0.2],                    # 80-20 split
  seed= 5)                       # For reproducibility


<h3>Modeling</h3>

We'll start setting up a baseline. Here, the baseline prediction will be the mean of the population.

In [28]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = dataVec.select(
    _mean(col('loss')).alias('mean'),
    _stddev(col('loss')).alias('std')
).collect()

mean = float(df_stats[0]['mean'])
sigma = float(df_stats[0]['std'])

print("Mean: %.2f, Standard Deviation: %.2f" %(mean,sigma))

In [29]:
from pyspark.sql.functions import lit

baselineDF = trainVec.withColumn("prediction", lit(mean))
baselineDF.select("loss","prediction").show(5)

In [30]:
#Let us setup our evaluation metrics for all the models to be trained

from pyspark.ml.evaluation import RegressionEvaluator

FOMs = []

def get_eval_metrics(model_name, predictDF, label_col, prediction_col):
  
  evaluator = RegressionEvaluator(labelCol=label_col, predictionCol=prediction_col)
  mae = evaluator.evaluate(predictDF, {evaluator.metricName: "mae"})
  rms = evaluator.evaluate(predictDF, {evaluator.metricName: "rmse"})
  r2 = evaluator.evaluate(predictDF, {evaluator.metricName: "r2"})
  print("%s MAE: %.3f, RMS: %.3f, R2: %.3f" % (model_name,mae,rms,r2))

  return [model_name,mae,rms,r2]


In [31]:
baseline_FOM = get_eval_metrics("Baseline (avg)", baselineDF,"loss","prediction")
FOMs.append(baseline_FOM)

* The errors are bad as expected. Let us try to fit a bit more complicated models to get better results.

In [33]:
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

linR = LinearRegression(maxIter=25, regParam=0.01, solver="normal", featuresCol='features', labelCol='loss')
linModel = linR.fit(trainVec)

In [34]:
predictDF_lin = linModel.transform(testVec)
predictDF_lin.select("loss","prediction").show(5)

In [35]:
linear_FOM = get_eval_metrics("Linear Regression", predictDF_lin,"loss","prediction")
FOMs.append(linear_FOM)

* The results are significantly better but let us keep exploring
* Maybe we can make use of the lognormal distribution of the loss data

In [37]:
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(family="gaussian", link="log", labelCol='loss', featuresCol='features', maxIter=25, regParam = 0.01, linkPredictionCol= 'linkPrediction')

GLRmodel = glr.fit(trainVec)


In [38]:
predictDF_log = GLRmodel.transform(testVec)
predictDF_log.select("loss","prediction",'linkPrediction').show(5)


In [39]:
log_FOM = get_eval_metrics("Log Transform", predictDF_log,"loss","prediction")
FOMs.append(log_FOM)

* We see this model can explain the variance in the dataset better than the linear regression. Yet its MAE is worse. My guess is this model is optimized for RMS and hence has a better performance there. Yet, there are not that many parameters to tune over here and there is also no other option to change the loss function to at this high level.

* Up next, we'll check the performance of the Random Forest Regression on our data. Since the parameter settings for RFR are less straightforward, we'll do a hyper parameter tuning

In [41]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator 
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel

maxBins  = 326                        # Max num of branches to be created >= max num of levels in a category
seed     = None                       # Keeping seeds random
maxDepth    = [10,15]                 # Branches won't be created further than this level
minInfoGain = [0.2,0.5,1]             # Branches won't be created if their info gain is less than this
numTrees  = [20]                      # Number of trees in the forest to reduce overfitting
numFolds  = 10                        # Number of groups to divide the training data for cross-validation

rfr = (RandomForestRegressor(featuresCol='features')
      .setLabelCol("loss")                  
      .setMaxBins(maxBins)
      .setSeed(seed) 
)

evaluator = RegressionEvaluator(labelCol="loss", predictionCol="prediction")

paramGrid = ParamGridBuilder() \
      .addGrid(rfr.numTrees, numTrees) \
      .addGrid(rfr.maxDepth, maxDepth) \
      .addGrid(rfr.minInfoGain, minInfoGain) \
      .build()

cv = CrossValidator() \
      .setEstimator(rfr) \
      .setEvaluator(evaluator) \
      .setEstimatorParamMaps(paramGrid) \
      .setNumFolds(numFolds)

cvModel = cv.fit(trainVec) 

In [42]:
predictDF_rfr = cvModel.transform(testVec)
predictDF_rfr.select("loss","prediction").show(5)

In [43]:
rfr_FOM = get_eval_metrics("Random Forest", predictDF_rfr,"loss","prediction")

In [44]:
bestModel = cvModel.bestModel          #get the best model from cross-validation
bestModel.extractParamMap()            #get the default and set values for all parameters
#bestModel.explainParam("minInfoGain")  #get the default and set values for a single parameter

Using the bestmodel parameters to avoid rerunning crossvalidation each time

In [46]:
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel

maxBins     = 326                     # Max num of branches to be created >= max num of levels in a category
seed        = 42                      # Setting a seed for reproducibility
maxDepth    = 10                      # Branches won't be created further than this level
minInfoGain = 0.5                     # Branches won't be created if their info gain is less than this
numTrees    = 20                      # Number of trees in the forest to reduce overfitting

rfr = (RandomForestRegressor(featuresCol='features')
      .setLabelCol("loss")                  
      .setMaxBins(maxBins)
      .setSeed(seed) 
      .setMaxDepth(maxDepth)
      .setMinInfoGain(minInfoGain)
      .setNumTrees(numTrees)
)

RFRmodel = rfr.fit(trainVec)

In [47]:
predictDF_rfr = RFRmodel.transform(testVec)
predictDF_rfr.select("loss","prediction").show(5)

In [48]:
rfr_FOM = get_eval_metrics("Random Forest", predictDF_rfr,"loss","prediction")
FOMs.append(rfr_FOM)

In [49]:
  print( "%-20s %-10s %-10s %-10s"  %("Model","MAE","RMS","R2"))
  
  for [name,mae,rms,r2] in FOMs:
    print( "%-20s %-10.2f %-10.2f %-10.2f"  %(name,mae,rms,r2))

The random forest seems to be doing much better compared to the rest of the models, partly due to the hyper parameter tuning. 
Let us look into a bit more in detail to the model results.

In [51]:
npts = 1000
def sample_list(predictDF,npts):
  results = predictDF.select('loss', 'prediction').take(npts)
  loss    = [r['loss'] for r in results]
  abs_err = [r['prediction']-r['loss'] for r in results]
  rel_err = [abs(r['prediction']-r['loss'])/r['loss'] for r in results]  
  return loss,abs_err,rel_err
  
[loss_base,abs_err_base,rel_err_base] = sample_list(baselineDF,npts)
[loss_lin,abs_err_lin,rel_err_lin] = sample_list(predictDF_lin,npts)
[loss_log,abs_err_log,rel_err_log] = sample_list(predictDF_log,npts)
[loss_rfr,abs_err_rfr,rel_err_rfr] = sample_list(predictDF_rfr,npts)

In [52]:
f, axes = plt.subplots(1,3,figsize=(25,8))
plot_base = axes[0].scatter(loss_base,abs_err_base,s=8,c='red',linewidth=0,alpha=0.7)
plot_lin = axes[0].scatter(loss_lin,abs_err_lin,s=8,c='blue',linewidth=0,alpha=0.7)
axes[0].legend((plot_base,plot_lin),('Baseline','Linear'))
axes[0].set_xlabel('Loss')
axes[0].set_ylabel('Error')

plot_base = axes[1].scatter(loss_base,abs_err_base,s=8,c='red',linewidth=0,alpha=0.7)
plot_lin = axes[1].scatter(loss_lin,abs_err_lin,s=8,c='blue',linewidth=0,alpha=0.7)
plot_log = axes[1].scatter(loss_log,abs_err_log,s=8,c='green',linewidth=0,alpha=0.7)
axes[1].set_xlabel('Loss')
axes[1].set_ylabel('Error')
axes[1].legend((plot_base,plot_lin,plot_log),('Baseline','Linear','Log'))

plot_base = axes[2].scatter(loss_base,abs_err_base,s=8,c='red',linewidth=0,alpha=0.7)
plot_lin = axes[2].scatter(loss_lin,abs_err_lin,s=8,c='blue',linewidth=0,alpha=0.7)
plot_log = axes[2].scatter(loss_log,abs_err_log,s=8,c='green',linewidth=0,alpha=0.7)
plot_rfr = axes[2].scatter(loss_rfr,abs_err_rfr,s=8,c='magenta',linewidth=0,alpha=0.7)
axes[2].set_xlabel('Loss')
axes[2].set_ylabel('Error')
axes[2].legend((plot_base,plot_lin,plot_log,plot_rfr),('Baseline','Linear','Log','RFR'))

display(f)

* The reduction in variance is visible from linear to log-transform and to random forest regression

In [54]:
f, axes = plt.subplots(1,2,figsize=(10,6))

plot_base = axes[0].hist([rel_err_base,rel_err_lin,rel_err_log,rel_err_rfr], bins=20, log=False)
axes[0].set_xlabel('Rel Error')
axes[0].set_ylabel('Count')
axes[0].set_xlim(0,3)
#axes[0].set_ylim(0,50)
axes[0].legend(('Baseline','Linear','Log','RFR'))


plot_base = axes[1].hist([rel_err_base,rel_err_lin,rel_err_log,rel_err_rfr], bins=10, log=False)
axes[1].set_xlabel('Rel Error')
axes[1].set_ylabel('Count')
axes[1].set_xlim(3,18)
axes[1].set_ylim(0,60)
axes[1].legend(('Baseline','Linear','Log','RFR'))

display(f)