In [1]:
import matplotlib.pyplot as plt # plotting
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
#from pyspark.sql import Row
from pyspark.sql import SQLContext
#from pyspark import SparkFiles
from pyspark.sql.types import *
import datetime
import pyspark.sql.functions as f
#from sklearn.model_selection import StratifiedShuffleSplit
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1599211381522_0148,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
#get SQL context for the current Spark session to run SQL read
sqlContext = SQLContext(sc)

*<font size="5"> Replace ADLS_NAME in the cell below with the ADLS Storage name you just created </font>*

In [3]:
# Location of training data
ins_train_file_loc = "abfs://data@<ADLS NAME>.dfs.core.windows.net/car_insurance_claim.csv"
# Set model storage directory path. This is where models will be saved. Replace the <ADLS_NAME> with the actual ADLS Name
CatModelLoc = "abfs://models@<ADLS_NAME>.dfs.core.windows.net/CatMod/"; # The last backslash is needed;
IntModelLoc = "abfs://models@<ADLS_NAME>.dfs.core.windows.net/IntMod/"
PipelineLoc = "abfs://models@<ADLS_NAME>.dfs.core.windows.net/PipelineMod/"

In [4]:
# Let's make sure that we got the locations correct
print(ins_train_file_loc)

abfs://data@dlinsure.dfs.core.windows.net/car_insurance_claim.csv


**<font size="11">READ THE CSV AND CLEAN THE DATA </font>**

*<font size="5">we use sqlcontext to read the file to infer the schema as it is in the csv</font>*


In [5]:
ClaimData = sqlContext.read.csv(ins_train_file_loc, header=True, inferSchema=True)

In [6]:
ClaimData.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- BIRTH: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- HOMEKIDS: integer (nullable = true)
 |-- YOJ: integer (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- MSTATUS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- OCCUPATION: string (nullable = true)
 |-- TRAVTIME: integer (nullable = true)
 |-- BLUEBOOK: string (nullable = true)
 |-- TIF: integer (nullable = true)
 |-- CAR_TYPE: string (nullable = true)
 |-- OLDCLAIM: string (nullable = true)
 |-- CLM_FREQ: integer (nullable = true)
 |-- MVR_PTS: integer (nullable = true)
 |-- CLM_AMT: string (nullable = true)
 |-- CAR_AGE: integer (nullable = true)
 |-- CLAIM_FLAG: integer (nullable = true)

In [7]:
#Display the columns and see how the columns are processed
pd.DataFrame(ClaimData.take(5), columns=ClaimData.columns)

          ID      BIRTH  AGE  HOMEKIDS  YOJ    INCOME MSTATUS GENDER  \
0   63581743  16-Mar-39   60         0   11  $67,349     z_No      M   
1  132761049  21-Jan-56   43         0   11  $91,449     z_No      M   
2  921317019  18-Nov-51   48         0   11  $52,881     z_No      M   
3  727598473   5-Mar-64   35         1   10  $16,039      Yes    z_F   
4  450221861   5-Jun-48   51         0   14      None     Yes      M   

       EDUCATION     OCCUPATION  TRAVTIME  BLUEBOOK  TIF CAR_TYPE  OLDCLAIM  \
0            PhD   Professional        14  $14,230    11  Minivan   $4,461    
1  z_High School  z_Blue Collar        22  $14,940     1  Minivan       $0    
2      Bachelors        Manager        26  $21,970     1      Van       $0    
3  z_High School       Clerical         5   $4,010     4    z_SUV  $38,690    
4   <High School  z_Blue Collar        32  $15,440     7  Minivan       $0    

   CLM_FREQ  MVR_PTS CLM_AMT  CAR_AGE  CLAIM_FLAG  
0         2        3     $0        18   

In [8]:
#Make the data more readable
ClaimData_df=ClaimData.withColumnRenamed("YOJ","YearsOnJob")\
            .withColumnRenamed("TRAVTIME","Travel_time")\
            .withColumnRenamed("TIF","Time_In_Force")\
            .withColumnRenamed("OLDCLAIM","OLDCLAIM_AMT")\
            .withColumnRenamed("CLM_FREQ","CLAIM_FREQ")\
            .withColumnRenamed("CLM_AMT","TARGET_CLAIM") \
            .withColumnRenamed("CLAIM_FLAG","CRASH_FLAG")

In [9]:
#Display DUplicate Data
ClaimData_df.groupBy(ClaimData_df.columns)\
            .count()\
            .where(f.col('count') > 1)

DataFrame[ID: int, BIRTH: string, AGE: int, HOMEKIDS: int, YearsOnJob: int, INCOME: string, MSTATUS: string, GENDER: string, EDUCATION: string, OCCUPATION: string, Travel_time: int, BLUEBOOK: string, Time_In_Force: int, CAR_TYPE: string, OLDCLAIM_AMT: string, CLAIM_FREQ: int, MVR_PTS: int, TARGET_CLAIM: string, CAR_AGE: int, CRASH_FLAG: int, count: bigint]

**<fontsize='10'>we have made sure columns are readable. Now let us clean the data and make sure there are no nulls or unnecessary columns</font>**

In [10]:
#Clean the string data and remove unnecessary characters
ClaimData_df=ClaimData_df.withColumn("INCOME", f.regexp_replace(f.col("INCOME"), "[$#,]", ""))
ClaimData_df=ClaimData_df.withColumn("BLUEBOOK", f.regexp_replace(f.col("BLUEBOOK"), "[$#,]", ""))
ClaimData_df=ClaimData_df.withColumn("OLDCLAIM_AMT", f.regexp_replace(f.col("OLDCLAIM_AMT"), "[$#,]", ""))
ClaimData_df=ClaimData_df.withColumn("TARGET_CLAIM", f.regexp_replace(f.col("TARGET_CLAIM"), "[$#,]", ""))

In [11]:
ClaimData_df=ClaimData_df.withColumn("MSTATUS", f.regexp_replace(f.col("MSTATUS"), "z_", ""))
ClaimData_df=ClaimData_df.withColumn("GENDER", f.regexp_replace(f.col("GENDER"), "z_", ""))
ClaimData_df=ClaimData_df.withColumn("EDUCATION", f.regexp_replace(f.col("EDUCATION"), "[z_<]", ""))
ClaimData_df=ClaimData_df.withColumn("OCCUPATION", f.regexp_replace(f.col("OCCUPATION"), "z_", ""))
ClaimData_df=ClaimData_df.withColumn("CAR_TYPE", f.regexp_replace(f.col("CAR_TYPE"), "z_", ""))

In [12]:
#Trim to remove all extra whitespaces so casting to integer is easy
ClaimData_df=ClaimData_df.withColumn("INCOME", f.trim(ClaimData_df.INCOME))
ClaimData_df=ClaimData_df.withColumn("BLUEBOOK", f.trim(ClaimData_df.BLUEBOOK))
ClaimData_df=ClaimData_df.withColumn("OLDCLAIM_AMT", f.trim(ClaimData_df.OLDCLAIM_AMT))
ClaimData_df=ClaimData_df.withColumn("TARGET_CLAIM", f.trim(ClaimData_df.TARGET_CLAIM))

In [13]:
#Drop all null values from all rows
ClaimData_df = ClaimData_df.na.drop()

In [14]:
#Let's adjust the datatypes to make sure we assign the right dayatypes
ClaimData_df=ClaimData_df.withColumn('INCOME',ClaimData_df["INCOME"].cast(IntegerType()))
ClaimData_df=ClaimData_df.withColumn('BLUEBOOK',ClaimData_df["BLUEBOOK"].cast(IntegerType()))
ClaimData_df=ClaimData_df.withColumn('OLDCLAIM_AMT',ClaimData_df["OLDCLAIM_AMT"].cast(IntegerType()))
ClaimData_df=ClaimData_df.withColumn('TARGET_CLAIM',ClaimData_df["TARGET_CLAIM"].cast(IntegerType()))
ClaimData_df=ClaimData_df.withColumn('CRASH_FLAG',ClaimData_df["CRASH_FLAG"].cast(IntegerType()))

In [15]:
#check final schema
ClaimData_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- BIRTH: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- HOMEKIDS: integer (nullable = true)
 |-- YearsOnJob: integer (nullable = true)
 |-- INCOME: integer (nullable = true)
 |-- MSTATUS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- OCCUPATION: string (nullable = true)
 |-- Travel_time: integer (nullable = true)
 |-- BLUEBOOK: integer (nullable = true)
 |-- Time_In_Force: integer (nullable = true)
 |-- CAR_TYPE: string (nullable = true)
 |-- OLDCLAIM_AMT: integer (nullable = true)
 |-- CLAIM_FREQ: integer (nullable = true)
 |-- MVR_PTS: integer (nullable = true)
 |-- TARGET_CLAIM: integer (nullable = true)
 |-- CAR_AGE: integer (nullable = true)
 |-- CRASH_FLAG: integer (nullable = true)

In [16]:
#describe the numeric data to find any anomalies
numeric_features=[t[0] for t in ClaimData_df.dtypes if t[1]== 'int']
ClaimData_df.select(numeric_features).describe().toPandas().transpose()

                   0                    1                   2       3  \
summary        count                 mean              stddev     min   
ID              8091  4.975425108930911E8  2.85190360837908E8  246910   
AGE             8091   44.691138301816835   8.643031693932897      16   
HOMEKIDS        8091    0.739216413298727  1.1276799767862071       0   
YearsOnJob      8091   10.440489432703004    4.17340593998557       0   
INCOME          8091    57787.98207885305   44070.69398819117       0   
Travel_time     8091    33.61055493758497  15.821258478790469       5   
BLUEBOOK        8091   15173.859844271412   8071.717088459349    1500   
Time_In_Force   8091    5.333951303917933   4.128523345661906       1   
OLDCLAIM_AMT    8091   4027.5213199851687   8791.546766776568       0   
CLAIM_FREQ      8091   0.7911259424051416  1.1504286750162802       0   
MVR_PTS         8091   1.7210480781114819  2.1791064700240286       0   
TARGET_CLAIM    8091   1503.4407366209368   4631.68

In [17]:
#Let's drop that row that shows has negative car age
ClaimData_df=ClaimData_df.where(ClaimData_df.CAR_AGE!=-3)

*<fontsize='5'> Let us examine the data *</font> 

In [18]:
# Scatter and density plots
def plotScatterMatrix(df, plotSize, textSize):
    numeric_features=[t[0] for t in df.dtypes if t[1]== 'int']
    df=df.select(numeric_features).toPandas() # keep only numerical columns
    df = df[[col for col in df if df[col].nunique() > 1]] # keep columns where there are more than 1 unique values
    n = len(df.columns)
    columnNames = list(df)
    if n > 10: # reduce the number of columns for matrix inversion of kernel density plots
        columnNames = columnNames[:10]
    df = df[columnNames]
    ax = pd.scatter_matrix(df, alpha=0.75, figsize=[plotSize, plotSize])
    corrs = df.corr().values
    for i, j in zip(*plt.np.triu_indices_from(ax, k = 1)):
        v = ax[i, 0]
        v.yaxis.label.set_rotation(0)
        v.yaxis.label.set_ha('right')
        v.set_yticks(())
        h = ax[9, i]
        h.xaxis.label.set_rotation(90)    
        h.set_xticks(())
        ax[i, j].annotate('%.3f' % corrs[i, j], (0.8, 0.2), xycoords='axes fraction', ha='center', va='center', size=textSize)
    plt.suptitle('Scatter and Density Plot')
    plt.show()

In [19]:
plotScatterMatrix(ClaimData_df, 12, 15)



<fontsize='8'> we will keep all the features for now because they do not seem to be strongly correlated </font>

<fontsize='12'> *let us prepare the columds for features by converting the string type features to categorical vectors*</font>

In [20]:
y_udf=f.udf(lambda y: "No" if y==0 else "Yes", StringType())
new_df=ClaimData_df.withColumn("CRASH_FLAGG", y_udf('CRASH_FLAG')).drop("CRASH_FLAG")
ClaimData_df.checkpoint

<bound method DataFrame.checkpoint of DataFrame[ID: int, BIRTH: string, AGE: int, HOMEKIDS: int, YearsOnJob: int, INCOME: int, MSTATUS: string, GENDER: string, EDUCATION: string, OCCUPATION: string, Travel_time: int, BLUEBOOK: int, Time_In_Force: int, CAR_TYPE: string, OLDCLAIM_AMT: int, CLAIM_FREQ: int, MVR_PTS: int, TARGET_CLAIM: int, CAR_AGE: int, CRASH_FLAG: int]>

In [21]:
new_df=new_df.withColumnRenamed("CRASH_FLAGG","CRASH_FLAG")

In [48]:
numeric_features=[t[0] for t in new_df.dtypes if t[1]== 'int']
Categoric_features=[t[0] for t in new_df.dtypes if t[1]== 'string']
print(Categoric_features)
print(numeric_features)

['BIRTH', 'MSTATUS', 'GENDER', 'EDUCATION', 'OCCUPATION', 'CAR_TYPE', 'CRASH_FLAG']
['ID', 'AGE', 'HOMEKIDS', 'YearsOnJob', 'INCOME', 'Travel_time', 'BLUEBOOK', 'Time_In_Force', 'OLDCLAIM_AMT', 'CLAIM_FREQ', 'MVR_PTS', 'TARGET_CLAIM', 'CAR_AGE']

In [49]:
#Remove the target fields from feature list
numeric_features.remove('TARGET_CLAIM')
Categoric_features.remove('CRASH_FLAG')
#Also remove the features that we know are too unique to make a difference
Categoric_features.remove('BIRTH')
numeric_features.remove('ID')

In [50]:
stages = []

for categoricalCol in Categoric_features:
    stringIndexer = StringIndexer() \
                    .setInputCol (categoricalCol) \
                    .setOutputCol (categoricalCol + '_Index') \
                    .setHandleInvalid ("skip")
    stages += [stringIndexer]
        

stages

[StringIndexer_db9f603b42c6, StringIndexer_8dda92dd1d31, StringIndexer_f3eb3eb955ae, StringIndexer_45a5d5c76658, StringIndexer_db6a76972b36]

In [51]:
for categoricalCol in Categoric_features:
    encoder = OneHotEncoderEstimator() \
                .setInputCols ([categoricalCol + '_Index']) \
                .setOutputCols ([categoricalCol + "classVec"])
    stages += [encoder]
    
stages

[StringIndexer_db9f603b42c6, StringIndexer_8dda92dd1d31, StringIndexer_f3eb3eb955ae, StringIndexer_45a5d5c76658, StringIndexer_db6a76972b36, OneHotEncoderEstimator_8dc3813bca87, OneHotEncoderEstimator_4543c1e28006, OneHotEncoderEstimator_22868e3c800b, OneHotEncoderEstimator_6a5ec6d9539b, OneHotEncoderEstimator_0ea8e47e75b8]

In [52]:
label_stringIdx = StringIndexer(inputCol = 'CRASH_FLAG', outputCol = 'label')
stages += [label_stringIdx]

In [53]:
assemblerInputs = [c + "classVec" for c in Categoric_features] + numeric_features

In [55]:
assembler = VectorAssembler()\
            .setInputCols(assemblerInputs) \
            .setOutputCol("vec_features") 
stages += [assembler]

In [56]:
scaler = StandardScaler()\
         .setInputCol("vec_features") \
         .setOutputCol("features") 
stages += [scaler]

In [57]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(new_df)

testdf=pipelineModel.transform(new_df)

In [58]:
pipelineModel.write().overwrite().save(PipelineLoc)

<font size='5'> * now let us build and run the models to predict whether there is going to be a crash ('CRASH_FLAG') * </font>

In [59]:
#split the rows into 70% training and 30% testing sets
splits=testdf.randomSplit([0.7, 0.3], 2018)

In [60]:
train_df=splits[0]
test_df=splits[1]
print(train_df.count())
print(test_df.count())

5631
2459

In [61]:
#use Binomial Logistic regression to predict "CRASH_FLAG"

lr = LogisticRegression(featuresCol= 'features', labelCol='label', maxIter=10)
# Fit the model
lrModel = lr.fit(train_df)

b= np.sort(lrModel.coefficients)
plt.plot(b)
plt.ylabel('Beta Coefficients')
plt.show()

In [62]:
#run the model to predict and measure the accuracy of model
predictions=lrModel.transform(test_df)
predictions.select('label','features','rawPrediction','prediction','probability').toPandas().head(5)

   label                                           features  \
0    0.0  (2.04038299006, 2.01380935057, 0.0, 0.0, 2.640...   
1    0.0  (2.04038299006, 0.0, 2.0041819363, 0.0, 0.0, 0...   
2    1.0  (2.04038299006, 0.0, 2.0041819363, 0.0, 0.0, 0...   
3    0.0  (2.04038299006, 2.01380935057, 0.0, 0.0, 2.640...   
4    0.0  (2.04038299006, 0.0, 0.0, 2.20732455419, 0.0, ...   

                       rawPrediction  prediction  \
0  [0.353134344448, -0.353134344448]         0.0   
1    [2.11544048216, -2.11544048216]         0.0   
2  [0.206407057154, -0.206407057154]         0.0   
3    [2.18973547341, -2.18973547341]         0.0   
4    [2.26485516621, -2.26485516621]         0.0   

                         probability  
0   [0.587377442364, 0.412622557636]  
1     [0.89239487877, 0.10760512123]  
2   [0.551419338535, 0.448580661465]  
3   [0.899323958629, 0.100676041371]  
4  [0.905924231399, 0.0940757686007]

In [63]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

('Test Area Under ROC', 0.7563209593060258)

In [64]:
#Use random forest classifiers to predict CRASH_FLAG
RF = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = RF.fit(train_df)
predictions_rf = rfModel.transform(test_df)
predictions_rf.select('label', 'features','rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+--------------------+----------+--------------------+
|label|            features|       rawPrediction|prediction|         probability|
+-----+--------------------+--------------------+----------+--------------------+
|  0.0|(28,[0,1,4,9,12,1...|[13.5215237297247...|       0.0|[0.67607618648623...|
|  0.0|(28,[0,2,6,13,17,...|[16.5917439820685...|       0.0|[0.82958719910342...|
|  1.0|(28,[0,2,6,14,17,...|[10.1372360342537...|       0.0|[0.50686180171268...|
|  0.0|(28,[0,1,4,8,12,1...|[16.8987925076505...|       0.0|[0.84493962538252...|
|  0.0|(28,[0,3,6,13,17,...|[17.4124403360897...|       0.0|[0.87062201680448...|
|  0.0|(28,[0,1,2,5,12,1...|[10.9053031417032...|       0.0|[0.54526515708516...|
|  0.0|(28,[0,2,5,13,17,...|[15.8498946532182...|       0.0|[0.79249473266091...|
|  1.0|(28,[1,3,8,16,17,...|[16.4047413878919...|       0.0|[0.82023706939459...|
|  0.0|(28,[0,4,9,13,17,...|[16.0338304493869...|       0.0|[0.80169152246934...|
|  0.0|(28,[1,2,

In [65]:
print('Test Area Under ROC', evaluator.evaluate(predictions_rf))

('Test Area Under ROC', 0.7524760811327967)

*<font size ='8'> Now let us filter the data to pick all the rows that have had a crash to predict what their claim amount would be </font>*

In [66]:
tempdf = testdf.filter(testdf.CRASH_FLAG=="Yes")

In [67]:
splits=tempdf.randomSplit([0.7, 0.3], 2018)
train_df=splits[0]
test_df=splits[1]

<bold> Ideally, you would tune the Parameters to see which ones have the best impact and choose the best model. We're not doing that here since it takes time and is out of scope for this demonstration </bold>

In [68]:
#Run Linear Regression to predict 'TARGET_CLAIM'
claims = LinearRegression(featuresCol='features', labelCol='TARGET_CLAIM', maxIter=5, regParam=0.3)
claim_model = claims.fit(train_df)

b= np.sort(claim_model.coefficients)
plt.plot(b)
plt.ylabel('Beta Coefficients')
plt.show()

In [69]:
claim_pred = claim_model.transform(test_df)
claim_pred.select('CRASH_FLAG','TARGET_CLAIM','features','prediction').show()

+----------+------------+--------------------+------------------+
|CRASH_FLAG|TARGET_CLAIM|            features|        prediction|
+----------+------------+--------------------+------------------+
|       Yes|        5324|(28,[0,2,7,17,19,...| 6339.327476900963|
|       Yes|        6882|(28,[2,6,14,17,18...| 5134.534975649354|
|       Yes|        6907|(28,[0,1,2,5,12,1...|2949.2306128687014|
|       Yes|        1125|(28,[3,6,17,19,20...| 7667.561400494532|
|       Yes|        4259|(28,[1,2,6,15,17,...| 4565.712072418948|
|       Yes|       22244|(28,[3,5,16,17,19...| 7223.038489286063|
|       Yes|        3085|(28,[1,2,11,12,17...| 5076.875329152249|
|       Yes|        2091|(28,[1,2,11,12,17...| 5576.430390951745|
|       Yes|        6005|(28,[0,1,3,7,15,1...|4646.4551111062765|
|       Yes|       28706|(28,[1,4,9,13,17,...| 5258.461218819752|
|       Yes|        4373|(28,[1,3,7,12,17,...| 5382.225994645652|
|       Yes|       20883|(28,[2,5,16,17,19...|7112.5516035881255|
|       Ye

In [70]:
regevaluator = RegressionEvaluator()
print('Test Area Under ROC', regevaluator.evaluate(claim_pred))

('Test Area Under ROC', 5401.871214812071)

In [71]:
gbt = GBTRegressor(featuresCol='features', labelCol='TARGET_CLAIM', maxIter=5)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'TARGET_CLAIM', 'features').show(5)

+------------------+------------+--------------------+
|        prediction|TARGET_CLAIM|            features|
+------------------+------------+--------------------+
|  4759.80861919182|        5324|(28,[0,2,7,17,19,...|
|  3776.46193528764|        6882|(28,[2,6,14,17,18...|
|4938.9403517537285|        6907|(28,[0,1,2,5,12,1...|
|  6755.23963954104|        1125|(28,[3,6,17,19,20...|
|3432.6857360226772|        4259|(28,[1,2,6,15,17,...|
+------------------+------------+--------------------+
only showing top 5 rows

In [72]:
print('Test Area Under ROC', regevaluator.evaluate(gbt_predictions))

('Test Area Under ROC', 5647.310157530764)

In [73]:
#Persist the model to the containers to use them later
lrModel.write().overwrite().save(CatModelLoc)
gbt_model.write().overwrite().save(IntModelLoc)