In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so files
# stored on Drive can be read by path.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# **Loading Dataset**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("LinearRegression")
    .getOrCreate()
)

# Read the CSV from the mounted Drive: the first row supplies the column
# names and Spark infers each column's type from the data.
df = spark.read.csv(
    '/content/drive/My Drive/healthinsurance.csv',
    header=True,
    inferSchema=True,
)

# Preview the first five rows.
df.show(5)

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|  

In [None]:
# Print every column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Driving_License: integer (nullable = true)
 |-- Region_Code: double (nullable = true)
 |-- Previously_Insured: integer (nullable = true)
 |-- Vehicle_Age: string (nullable = true)
 |-- Vehicle_Damage: string (nullable = true)
 |-- Annual_Premium: double (nullable = true)
 |-- Policy_Sales_Channel: double (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Response: integer (nullable = true)



In [None]:
# Dimensions of the data as a (row count, column count) pair
row_count = df.count()
column_count = len(df.columns)
(row_count, column_count)

(381109, 12)

In [None]:
# Count the rows for each Response value to see how the label is distributed.
df.groupby('Response').count().show()

+--------+------+
|Response| count|
+--------+------+
|       1| 46710|
|       0|334399|
+--------+------+



In [None]:
# Show summary statistics for every column of the DataFrame.
df.describe().show()

+-------+------------------+------+------------------+--------------------+------------------+-------------------+-----------+--------------+------------------+--------------------+------------------+-------------------+
|summary|                id|Gender|               Age|     Driving_License|       Region_Code| Previously_Insured|Vehicle_Age|Vehicle_Damage|    Annual_Premium|Policy_Sales_Channel|           Vintage|           Response|
+-------+------------------+------+------------------+--------------------+------------------+-------------------+-----------+--------------+------------------+--------------------+------------------+-------------------+
|  count|            381109|381109|            381109|              381109|            381109|             381109|     381109|        381109|            381109|              381109|            381109|             381109|
|   mean|          190555.0|  null|38.822583565331705|  0.9978693759528114|26.388807401557035| 0.4582101183650871|  

In [None]:
from pyspark.sql import functions as F

# Build one aggregate per column that counts the rows where that column is
# null: when() produces a value only for null rows, and count() ignores the
# remaining (null) results.
null_count_exprs = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  0|     0|  0|              0|          0|                 0|          0|             0|             0|                   0|      0|       0|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+



In [None]:
# Sum the Response column within each Gender group.
df.groupBy("Gender").agg(F.sum("Response")).show()

+------+-------------+
|Gender|sum(Response)|
+------+-------------+
|Female|        18185|
|  Male|        28525|
+------+-------------+



In [None]:
from pyspark.sql.functions import col  # used by Method 3

# Selecting a few columns by their names, three different ways

# Method 1 - pass a list of column-name strings
df.select(['Gender', 'Age', 'Policy_Sales_Channel']).show(5, truncate=False)

# Method 2 - pass Column objects obtained by indexing the DataFrame
df.select(
    df["Age"],
    df["Gender"],
    df["Policy_Sales_Channel"],
).show(10, truncate=False)

# Method 3 - pass Column objects built with the SQL function col
df.select(
    col("Age"),
    col("Gender"),
    col("Policy_Sales_Channel"),
).show(5, truncate=False)

+------+---+--------------------+
|Gender|Age|Policy_Sales_Channel|
+------+---+--------------------+
|Male  |44 |26.0                |
|Male  |76 |26.0                |
|Male  |47 |26.0                |
|Male  |21 |152.0               |
|Female|29 |152.0               |
+------+---+--------------------+
only showing top 5 rows

+---+------+--------------------+
|Age|Gender|Policy_Sales_Channel|
+---+------+--------------------+
|44 |Male  |26.0                |
|76 |Male  |26.0                |
|47 |Male  |26.0                |
|21 |Male  |152.0               |
|29 |Female|152.0               |
|24 |Female|160.0               |
|23 |Male  |152.0               |
|56 |Female|26.0                |
|24 |Female|152.0               |
|32 |Female|152.0               |
+---+------+--------------------+
only showing top 10 rows

+---+------+--------------------+
|Age|Gender|Policy_Sales_Channel|
+---+------+--------------------+
|44 |Male  |26.0                |
|76 |Male  |26.0               

In [None]:
# Creating a subset of the DataFrame with filter: rows where 30 < Age < 45

# Option 1 - chain two filter calls, one per criterion
(
    df
    .filter(df["Age"] > 30)
    .filter(df["Age"] < 45)
    .show(5)
)

# Option 2 - a single filter that joins both criteria with AND (&)
df.filter(
    (df["Age"] > 30) & (df["Age"] < 45)
).show(5, truncate=False)

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
| 10|Female| 32|              1|        6.0|                 1|   < 1 Year|            No|       28771.0|               152.0|     80|       0|
| 13|Female| 41|              1|       15.0|                 1|   1-2 Year|            No|       31409.0|                14.0|    221|       0|
| 16|  Male| 37|              1|        6.0|                 0|   1-2 Year|           Yes|        2630.0|               156.0|    147|  

In [None]:
# Display the data without the Gender column. The result of drop() is not
# assigned, so df itself still contains Gender afterwards.
# Variant that drops two columns at once:
#df.drop('Gender','Vehicle_Age').show()
df.drop('Gender').show()

+---+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|       0|
|  5| 29|              1|       41.0|    

**String Indexer**

In [None]:
from pyspark.ml.feature import StringIndexer

# One StringIndexer per string column; each writes a numeric *_Index column.
indexer1 = StringIndexer(inputCol='Vehicle_Damage', outputCol='Vehicle_Damage_Index')
indexer2 = StringIndexer(inputCol='Vehicle_Age', outputCol='Vehicle_Age_Index')

# Fit each indexer on the current data and append its index column, in order.
for indexer in (indexer1, indexer2):
    df = indexer.fit(df).transform(df)

# Show the original string columns next to their indexed versions.
df.select(
    'Vehicle_Damage',
    'Vehicle_Damage_Index',
    'Vehicle_Age',
    'Vehicle_Age_Index',
).show(10)

+--------------+--------------------+-----------+-----------------+
|Vehicle_Damage|Vehicle_Damage_Index|Vehicle_Age|Vehicle_Age_Index|
+--------------+--------------------+-----------+-----------------+
|           Yes|                 0.0|  > 2 Years|              2.0|
|            No|                 1.0|   1-2 Year|              0.0|
|           Yes|                 0.0|  > 2 Years|              2.0|
|            No|                 1.0|   < 1 Year|              1.0|
|            No|                 1.0|   < 1 Year|              1.0|
|           Yes|                 0.0|   < 1 Year|              1.0|
|           Yes|                 0.0|   < 1 Year|              1.0|
|           Yes|                 0.0|   1-2 Year|              0.0|
|            No|                 1.0|   < 1 Year|              1.0|
|            No|                 1.0|   < 1 Year|              1.0|
+--------------+--------------------+-----------+-----------------+
only showing top 10 rows



**OneHotEncoder**

In [None]:
from pyspark.ml.feature import OneHotEncoder

# A single encoder handles both index columns, writing one *_OHE vector
# column per input.
OHE = OneHotEncoder(
    inputCols=['Vehicle_Damage_Index', 'Vehicle_Age_Index'],
    outputCols=['Vehicle_Damage_OHE', 'Vehicle_Age_OHE'],
)

# Fit the encoder, then append the encoded columns to df.
ohe_model = OHE.fit(df)
df = ohe_model.transform(df)

# Show each original column beside its index and its one-hot vector.
df.select(
    'Vehicle_Damage', 'Vehicle_Damage_Index', 'Vehicle_Damage_OHE',
    'Vehicle_Age', 'Vehicle_Age_Index', 'Vehicle_Age_OHE',
).show(5)

+--------------+--------------------+------------------+-----------+-----------------+---------------+
|Vehicle_Damage|Vehicle_Damage_Index|Vehicle_Damage_OHE|Vehicle_Age|Vehicle_Age_Index|Vehicle_Age_OHE|
+--------------+--------------------+------------------+-----------+-----------------+---------------+
|           Yes|                 0.0|     (1,[0],[1.0])|  > 2 Years|              2.0|      (2,[],[])|
|            No|                 1.0|         (1,[],[])|   1-2 Year|              0.0|  (2,[0],[1.0])|
|           Yes|                 0.0|     (1,[0],[1.0])|  > 2 Years|              2.0|      (2,[],[])|
|            No|                 1.0|         (1,[],[])|   < 1 Year|              1.0|  (2,[1],[1.0])|
|            No|                 1.0|         (1,[],[])|   < 1 Year|              1.0|  (2,[1],[1.0])|
+--------------+--------------------+------------------+-----------+-----------------+---------------+
only showing top 5 rows



**Vector Assembler**

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns that are combined, in this order, into the feature vector.
inputCols = [
    'Age',
    'Driving_License',
    'Region_Code',
    'Previously_Insured',
    'Vehicle_Age_Index',
    'Vehicle_Damage_Index',
    'Annual_Premium',
    'Policy_Sales_Channel',
    'Vintage',
]
print(inputCols)

# Append a single 'features' vector column built from the columns above.
vectorAssembler = VectorAssembler(outputCol='features', inputCols=inputCols)
output = vectorAssembler.transform(df)

# Display the features and the label column
output.select("features", "Response").show(truncate=False)

['Age', 'Driving_License', 'Region_Code', 'Previously_Insured', 'Vehicle_Age_Index', 'Vehicle_Damage_Index', 'Annual_Premium', 'Policy_Sales_Channel', 'Vintage']
+-----------------------------------------------+--------+
|features                                       |Response|
+-----------------------------------------------+--------+
|[44.0,1.0,28.0,0.0,2.0,0.0,40454.0,26.0,217.0] |1       |
|[76.0,1.0,3.0,0.0,0.0,1.0,33536.0,26.0,183.0]  |0       |
|[47.0,1.0,28.0,0.0,2.0,0.0,38294.0,26.0,27.0]  |1       |
|[21.0,1.0,11.0,1.0,1.0,1.0,28619.0,152.0,203.0]|0       |
|[29.0,1.0,41.0,1.0,1.0,1.0,27496.0,152.0,39.0] |0       |
|[24.0,1.0,33.0,0.0,1.0,0.0,2630.0,160.0,176.0] |0       |
|[23.0,1.0,11.0,0.0,1.0,0.0,23367.0,152.0,249.0]|0       |
|[56.0,1.0,28.0,0.0,0.0,0.0,32031.0,26.0,72.0]  |1       |
|[24.0,1.0,3.0,1.0,1.0,1.0,27619.0,152.0,28.0]  |0       |
|[32.0,1.0,6.0,1.0,1.0,1.0,28771.0,152.0,80.0]  |0       |
|[47.0,1.0,35.0,0.0,0.0,0.0,47576.0,124.0,46.0] |1       |
|[24.0,1.0,5

In [None]:
# Preview the first five rows of the assembled DataFrame, which now carries
# the features vector alongside the original columns.
output.show(5)

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------------+-----------------+------------------+---------------+--------------------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|Vehicle_Damage_Index|Vehicle_Age_Index|Vehicle_Damage_OHE|Vehicle_Age_OHE|            features|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------------+-----------------+------------------+---------------+--------------------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|                 0.0|              2.0|     (1,[0],[1.0])|      (2,[],[])|[44.0,1.0,28.0,0....|
|  2|  Male| 76|              1|        3.0|

In [None]:
from pyspark.ml.feature import StandardScaler

# Center every feature on its mean (withMean=True) but leave its spread
# untouched (withStd=False).
scaler = StandardScaler(
    inputCol='features',
    outputCol="scaledFeatures",
    withMean=True,
    withStd=False,
)

# Learn the per-feature means from the data, then add the centered vector.
scalerModel = scaler.fit(output)
scaledData = scalerModel.transform(output)
scaledData.select('scaledFeatures').show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaledFeatures                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[5.177416434667499,0.0021306240471885562,1.6111925984424893,-0.4582101183650871,1.4836122998932133,-0.4951234423747537,9889.610418541386,-86.0342946506107,62.65260332346244]|
|[37.1774164346675,0.0021306240471885562,-23.38880740155751,-0.4582101183650871,-0.5163877001067866,0.5048765576252463,2971.610418541386,-86.0342946506107,28.652603323462444]|
|[8.1774164346675,0.0021306240471885562,1.6111925984424893,-0.4582101183650871,1.4836122998932133,-0.4951234423747537,77

**PCA**

In [None]:
from pyspark.ml.feature import PCA

# Reduce the centered feature vector to its first two principal components.
# The input column is taken from the scaler so the two stages stay in sync.
pca = PCA(
    k=2,
    inputCol=scaler.getOutputCol(),
    outputCol="pcaFeatures",
)

model = pca.fit(scaledData)
transformed_feature = model.transform(scaledData)
transformed_feature.select('pcaFeatures').show(5, truncate=False)

+-----------------------------------------+
|pcaFeatures                              |
+-----------------------------------------+
|[-9889.640570062405,-62.68362679820971]  |
|[-2971.6432718595656,-28.666926921856515]|
|[-7729.641455922532,127.32342306780409]  |
|[1945.4048090866763,-48.656157152989714] |
|[3068.404007971356,115.36240755168731]   |
+-----------------------------------------+
only showing top 5 rows



**Split Train & Test**

In [None]:
# Randomly split the assembled data (the unscaled `output` frame) 70/30 into
# training and test sets; the fixed seed keeps the split repeatable.
split_weights = [0.7, 0.3]
trainingData, testData = output.randomSplit(split_weights, seed=5)

# Number of rows in the training split
trainingData.count()

266652

In [None]:
# Number of rows in the test split.
testData.count()

114457

**ML Algorithms**

**GBT (Gradient-Boosted Trees)**

In [None]:
from __future__ import print_function
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Train gradient-boosted trees on the training split, then score the test split.
gbt = GBTClassifier(featuresCol="features", labelCol="Response")
gbModel = gbt.fit(trainingData)
gb_predictions = gbModel.transform(testData)

# Show the schema of the prediction frame and a few predicted rows.
print(gb_predictions)
gb_predictions.select("prediction", "Response", "features").show(5)

# Accuracy: share of test rows whose prediction equals Response.
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol='Response',
    metricName='accuracy',
)
gbt_accuracy = multi_evaluator.evaluate(gb_predictions)
print('Gradient-boosted Trees Accuracy:', gbt_accuracy)

# Area under the ROC curve for the same predictions.
evaluator = BinaryClassificationEvaluator(
    labelCol='Response',
    metricName='areaUnderROC',
)
roc_gbt = evaluator.evaluate(gb_predictions)
print('GBT_ROC SCORE:', roc_gbt)

DataFrame[id: int, Gender: string, Age: int, Driving_License: int, Region_Code: double, Previously_Insured: int, Vehicle_Age: string, Vehicle_Damage: string, Annual_Premium: double, Policy_Sales_Channel: double, Vintage: int, Response: int, Vehicle_Damage_Index: double, Vehicle_Age_Index: double, Vehicle_Damage_OHE: vector, Vehicle_Age_OHE: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
+----------+--------+--------------------+
|prediction|Response|            features|
+----------+--------+--------------------+
|       0.0|       1|[47.0,1.0,28.0,0....|
|       0.0|       1|[56.0,1.0,28.0,0....|
|       0.0|       0|[76.0,1.0,28.0,0....|
|       0.0|       0|[25.0,1.0,35.0,1....|
|       0.0|       1|[51.0,1.0,28.0,0....|
+----------+--------+--------------------+
only showing top 5 rows

Gradient-boosted Trees Accuracy: 0.8786968031662545
GBT_ROC SCORE: 0.8528597869299667


**Random Forest**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Train a random forest (trees limited to depth 5) on the training split,
# then score the test split.
rf = RandomForestClassifier(labelCol='Response',
                            featuresCol='features',
                            maxDepth=5)
model = rf.fit(trainingData)
rf_predictions = model.transform(testData)

# Show the schema of the prediction frame and a few predicted rows.
print(rf_predictions)
rf_predictions.select("prediction", "Response", "features").show(5)

# Accuracy: share of test rows whose prediction equals Response.
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'Response', metricName = 'accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))

# Area under the ROC curve. This must be computed on rf_predictions;
# evaluating another model's prediction frame here would report that
# model's score under the Random Forest label.
evaluator = BinaryClassificationEvaluator(labelCol='Response', metricName='areaUnderROC')
roc_rf = evaluator.evaluate(rf_predictions)
print('RF_ROC SCORE:', roc_rf)

DataFrame[id: int, Gender: string, Age: int, Driving_License: int, Region_Code: double, Previously_Insured: int, Vehicle_Age: string, Vehicle_Damage: string, Annual_Premium: double, Policy_Sales_Channel: double, Vintage: int, Response: int, Vehicle_Damage_Index: double, Vehicle_Age_Index: double, Vehicle_Damage_OHE: vector, Vehicle_Age_OHE: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
+----------+--------+--------------------+
|prediction|Response|            features|
+----------+--------+--------------------+
|       0.0|       1|[47.0,1.0,28.0,0....|
|       0.0|       1|[56.0,1.0,28.0,0....|
|       0.0|       0|[76.0,1.0,28.0,0....|
|       0.0|       0|[25.0,1.0,35.0,1....|
|       0.0|       1|[51.0,1.0,28.0,0....|
+----------+--------+--------------------+
only showing top 5 rows

Random Forest classifier Accuracy: 0.8786968031662545
RF_ROC SCORE: 0.8528597869299667


**Logistic Regression**

In [None]:
from pyspark.ml.classification import LogisticRegression

# Train logistic regression on the training split, then score the test split.
lr = LogisticRegression(labelCol="Response", featuresCol="features")
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Response", "features").show(5)

# Accuracy: share of test rows whose prediction equals Response.
# The evaluator built here is the one that is actually used, rather than
# relying on whatever multi_evaluator an earlier cell happened to leave behind.
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Response", predictionCol="prediction", metricName="accuracy")
print('Logistic Regression Accuracy:', multi_evaluator.evaluate(lr_prediction))

# Area under the ROC curve. This must be computed on lr_prediction;
# evaluating another model's prediction frame here would report that
# model's score under the Logistic Regression label.
evaluator = BinaryClassificationEvaluator(labelCol='Response', metricName='areaUnderROC')
roc_LR = evaluator.evaluate(lr_prediction)
print('LR_ROC SCORE:', roc_LR)

+----------+--------+--------------------+
|prediction|Response|            features|
+----------+--------+--------------------+
|       0.0|       1|[47.0,1.0,28.0,0....|
|       0.0|       1|[56.0,1.0,28.0,0....|
|       0.0|       0|[76.0,1.0,28.0,0....|
|       0.0|       0|[25.0,1.0,35.0,1....|
|       0.0|       1|[51.0,1.0,28.0,0....|
+----------+--------+--------------------+
only showing top 5 rows

Logistic Regression Accuracy: 0.8786968031662545
LR_ROC SCORE: 0.8528597869299667


**Linear SVC**

In [None]:
from pyspark.ml.classification import LinearSVC

# Train a linear support vector classifier on the training split, then score
# the test split.
svm = LinearSVC(labelCol="Response", featuresCol="features")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "Response", "features").show(5)

# Accuracy comes from the multiclass (accuracy) evaluator. The binary
# evaluator measures area under ROC, so its result must not be stored as
# "accuracy".
svm_accuracy = multi_evaluator.evaluate(svm_prediction)
print('Accuracy of Support Vector Machine is:', svm_accuracy)

# Area under the ROC curve. This must be computed on svm_prediction;
# evaluating another model's prediction frame here would report that
# model's score under the LinearSVC label.
evaluator = BinaryClassificationEvaluator(labelCol='Response', metricName='areaUnderROC')
roc_LSVC = evaluator.evaluate(svm_prediction)
print('LinearSVC_ROC SCORE:', roc_LSVC)

+----------+--------+--------------------+
|prediction|Response|            features|
+----------+--------+--------------------+
|       0.0|       1|[47.0,1.0,28.0,0....|
|       0.0|       1|[56.0,1.0,28.0,0....|
|       0.0|       0|[76.0,1.0,28.0,0....|
|       0.0|       0|[25.0,1.0,35.0,1....|
|       0.0|       1|[51.0,1.0,28.0,0....|
+----------+--------+--------------------+
only showing top 5 rows

Accuracy of Support Vector Machine is: 0.8786968031662545
LinearSVC_ROC SCORE: 0.8528597869299667


**FM classifier**

In [None]:
from pyspark.ml.classification import FMClassifier

# Train a factorization machines classifier on the training split, then
# score the test split.
fm = FMClassifier(labelCol="Response", featuresCol="features", stepSize=0.001)
fm_model = fm.fit(trainingData)
fm_prediction = fm_model.transform(testData)
fm_prediction.select("prediction", "Response", "features").show(5)

# Accuracy comes from the multiclass (accuracy) evaluator and must be
# computed on this model's own predictions (fm_prediction).
fm_accuracy = multi_evaluator.evaluate(fm_prediction)
print('Accuracy of Factorization machines classifier is:', fm_accuracy)

# Area under the ROC curve. This must be computed on fm_prediction;
# evaluating another model's prediction frame here would report that
# model's score under the FMClassifier label.
evaluator = BinaryClassificationEvaluator(labelCol='Response', metricName='areaUnderROC')
roc_FM = evaluator.evaluate(fm_prediction)
print('FMClassifier_ROC SCORE:', roc_FM)

+----------+--------+--------------------+
|prediction|Response|            features|
+----------+--------+--------------------+
|       0.0|       1|[47.0,1.0,28.0,0....|
|       0.0|       1|[56.0,1.0,28.0,0....|
|       0.0|       0|[76.0,1.0,28.0,0....|
|       0.0|       0|[25.0,1.0,35.0,1....|
|       0.0|       1|[51.0,1.0,28.0,0....|
+----------+--------+--------------------+
only showing top 5 rows

Accuracy of Factorization machines classifier is: 0.8786968031662545
FMClassifier_ROC SCORE: 0.8528597869299667


**Accuracy**

In [None]:
# Accuracy of every model on the test split. Each line evaluates that
# model's own prediction frame.
print('Logistic Regression Accuracy:', multi_evaluator.evaluate(lr_prediction))
print('Accuracy of Factorization machines classifier is:', multi_evaluator.evaluate(fm_prediction))
print('Accuracy of Support Vector Machine is:', multi_evaluator.evaluate(svm_prediction))
print('Gradient-boosted Trees Accuracy:', multi_evaluator.evaluate(gb_predictions))
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))

Logistic Regression Accuracy: 0.8786968031662545
Accuracy of Factorization machines classifier is: 0.8786968031662545
Accuracy of Support Vector Machine is: 0.8786968031662545
Gradient-boosted Trees Accuracy: 0.8786968031662545
Random Forest classifier Accuracy: 0.8786968031662545


**ROC Score**

In [None]:
# Print the ROC score stored by each model's cell, one model per line.
roc_scores = [
    ('LR_ROC SCORE:', roc_LR),
    ('FMClassifier_ROC SCORE:', roc_FM),
    ('LinearSVC_ROC SCORE:', roc_LSVC),
    ('RF_ROC SCORE:', roc_rf),
    ('GBT_ROC SCORE:', roc_gbt),
]
for label, score in roc_scores:
    print(label, score)


LR_ROC SCORE: 0.8528597869299667
FMClassifier_ROC SCORE: 0.8528597869299667
LinearSVC_ROC SCORE: 0.8528597869299667
RF_ROC SCORE: 0.8528597869299667
GBT_ROC SCORE: 0.8528597869299667
