### The goal of this notebook is to predict whether a customer would be interested in Vehicle Insurance 

##### The data is as follows:

* `id` — Unique ID for the customer
* `Gender` — Gender of the customer
* `Age` —  Age of the customer
* `Driving_License` — 0 : Customer does not have DL, 1 : Customer already has DL
* `Region_Code` — Unique code for the region of the customer
* `Previously_Insured` — 1 : Customer already has Vehicle Insurance, 0 : Customer doesn't have Vehicle Insurance
* `Vehicle_Age` — Age of the Vehicle
* `Vehicle_Damage` — Yes : Customer got his/her vehicle damaged in the past. No : Customer didn't get his/her vehicle damaged in the past.
* `Annual_Premium` — The amount customer needs to pay as premium in the year
* `Policy_Sales_Channel` — Anonymized code for the channel used to reach the customer, i.e. different agents, over mail, over phone, in person, etc.
* `Vintage` — Number of days the customer has been associated with the company
* `Response` — 1 : Customer is interested, 0 : Customer is not interested


In [54]:
## Set up the SparkSession.
## A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables,
## cache tables, and read Parquet files.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Distributed big data project / PySpark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Set up SparkSession  [doc](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession)

In [55]:
spark

### Loading Data

In [56]:
## loading the data 
df = spark.read.load("/home/charles/M1/distribured-bigdata-system/project/data/train.csv",
                     format="csv",
                     sep=",", 
                     inferSchema="true",
                     header="true")
df.show(5)
df.printSchema()

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|  

### Data exploration

In [57]:
df = df.dropna(how='any') # first, drop every row that contains at least one null value

In [58]:
# The id column is only a customer identifier and is not relevant for prediction, so drop it.
# Note that Policy_Sales_Channel is also left out of the selection below.

df = df.select('Gender',
               'Age',
               'Driving_License',
               'Region_Code',
               'Previously_Insured',
               'Vehicle_Age',
               'Vehicle_Damage',
               'Annual_Premium',
               'Vintage',
               'Response')

cols = df.columns
df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Driving_License: integer (nullable = true)
 |-- Region_Code: double (nullable = true)
 |-- Previously_Insured: integer (nullable = true)
 |-- Vehicle_Age: string (nullable = true)
 |-- Vehicle_Damage: string (nullable = true)
 |-- Annual_Premium: double (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Response: integer (nullable = true)



##### Perform descriptive analytics

In [59]:
df.select('Age',
          'Annual_Premium',
          'Vintage').describe().show()

+-------+------------------+------------------+------------------+
|summary|               Age|    Annual_Premium|           Vintage|
+-------+------------------+------------------+------------------+
|  count|            381109|            381109|            381109|
|   mean|38.822583565331705|30564.389581458323|154.34739667654136|
| stddev|15.511611018095321|17213.155056980126|  83.6713036265871|
|    min|                20|            2630.0|                10|
|    max|                85|          540165.0|               299|
+-------+------------------+------------------+------------------+



In [60]:
df.select('Gender',
          'Driving_License',
          'Region_Code',
          'Previously_Insured',
          'Vehicle_Age',
          'Vehicle_Damage'
         ).describe().show()

+-------+------+-------------------+------------------+------------------+-----------+--------------+
|summary|Gender|    Driving_License|       Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|
+-------+------+-------------------+------------------+------------------+-----------+--------------+
|  count|381109|             381109|            381109|            381109|     381109|        381109|
|   mean|  null| 0.9978693759528114|26.388807401557035|0.4582101183650871|       null|          null|
| stddev|  null|0.04610954420779957|13.229888025788474| 0.498251198887226|       null|          null|
|    min|Female|                  0|               0.0|                 0|   1-2 Year|            No|
|    max|  Male|                  1|              52.0|                 1|  > 2 Years|           Yes|
+-------+------+-------------------+------------------+------------------+-----------+--------------+



### Extracting, transforming and selecting features [doc](https://spark.apache.org/docs/latest/ml-features.html)

In [61]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# categoricalColumns lists the features treated as categorical (strings or numeric codes)
categoricalColumns = ['Gender',
                      'Driving_License',
                      'Region_Code',
                      'Previously_Insured',
                      'Vehicle_Age',
                      'Vehicle_Damage',
                     ]

stages = [] # empty list

for categoricalCol in categoricalColumns :
    
    # StringIndexer encodes a string column of labels to a column of label indices
    # If the input column is numeric, StringIndexer casts it to string and indexes the string values
    stringIndexer = StringIndexer(
        inputCol = categoricalCol, 
        outputCol = categoricalCol + '_Index'
    ) # indexer
    
    # One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a 
    # single one-value indicating the presence of a specific feature value from among the set of all feature values.
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "_classVec"]
    )
    
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'Response', outputCol = 'label') 

stages += [label_stringIdx]
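
The indexing and encoding steps above can be sketched in plain Python. This is only an illustration of the idea, not Spark's implementation: it assumes the default settings (indices ordered by descending frequency, last category dropped by the one-hot encoder), the alphabetical tie-break is my assumption, and the helper names `string_index` and `one_hot` as well as the toy values are made up.

```python
from collections import Counter

def string_index(values):
    """Map each distinct value to an index, most frequent value first.
    Ties are broken alphabetically (an assumption of this sketch)."""
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def one_hot(index, num_categories):
    """Encode an index as a vector of length num_categories - 1.
    The last index is represented by the all-zeros vector."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

# Toy values, invented for illustration (not taken from train.csv).
vehicle_age = ["1-2 Year", "< 1 Year", "1-2 Year", "> 2 Years", "1-2 Year", "< 1 Year"]

mapping = string_index(vehicle_age)
encoded = [one_hot(mapping[v], len(mapping)) for v in vehicle_age]

print(mapping)     # index assigned to each category
print(encoded[0])  # most frequent category
print(encoded[3])  # least frequent category: all zeros
```

Dropping the last category keeps the encoded columns from summing to a constant, at the cost of the least frequent value being represented only implicitly.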

In [62]:
numericCols = ['Age','Annual_Premium','Vintage']

assemblerInputs = [c + "_classVec" for c in categoricalColumns] + numericCols

# VectorAssembler is a transformer that combines a given list of columns into a single vector column.
# VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = 'features')

stages += [assembler]

### Pipeline [doc](https://spark.apache.org/docs/latest/ml-pipeline.html)

In [63]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = stages)

pipelineModel = pipeline.fit(df)

df = pipelineModel.transform(df)

selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Driving_License: integer (nullable = true)
 |-- Region_Code: double (nullable = true)
 |-- Previously_Insured: integer (nullable = true)
 |-- Vehicle_Age: string (nullable = true)
 |-- Vehicle_Damage: string (nullable = true)
 |-- Annual_Premium: double (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Response: integer (nullable = true)



In [64]:
df.show(5)

+-----+--------------------+------+---+---------------+-----------+------------------+-----------+--------------+--------------+-------+--------+
|label|            features|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Vintage|Response|
+-----+--------------------+------+---+---------------+-----------+------------------+-----------+--------------+--------------+-------+--------+
|  1.0|(61,[0,1,2,54,57,...|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|    217|       1|
|  0.0|(61,[0,1,10,54,55...|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|    183|       0|
|  1.0|(61,[0,1,2,54,57,...|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|     27|       1|
|  0.0|(61,[0,1,11,56,58...|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       2861
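
The `features` vectors above have size 61. A quick back-of-the-envelope check of that width, assuming the one-hot encoder drops one category per column and that the distinct-value counts below are right (they are read off the outputs above, e.g. `Region_Code` ranging from 0.0 to 52.0, not computed from the data):

```python
# Assumed number of distinct values per categorical column.
distinct_values = {
    "Gender": 2,
    "Driving_License": 2,
    "Region_Code": 53,        # assumes every code from 0.0 to 52.0 occurs
    "Previously_Insured": 2,
    "Vehicle_Age": 3,
    "Vehicle_Damage": 2,
}
numeric_cols = ["Age", "Annual_Premium", "Vintage"]

# Each categorical column contributes (distinct values - 1) slots.
one_hot_width = sum(n - 1 for n in distinct_values.values())
total_width = one_hot_width + len(numeric_cols)

print(one_hot_width, total_width)
```

Under these assumptions the total matches the size shown in the `features` column.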

In [65]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)

### Classification 

#### Decision Tree Classifier  [doc](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier)

In [66]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 6)

dtModel = dt.fit(train)

predictions = dtModel.transform(test)

predictions.select('Age', 'Previously_Insured', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+------------------+-----+----------------+----------+--------------------+
|Age|Previously_Insured|label|   rawPrediction|prediction|         probability|
+---+------------------+-----+----------------+----------+--------------------+
| 20|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 21|                 0|  0.0|   [474.0,102.0]|       0.0|[0.82291666666666...|
| 22|                 0|  0.0|[16904.0,2816.0]|       0.0|[0.85720081135902...|
| 22|                 0|  0.0|[16904.0,2816.0]|       0.0|[0.85720081135902...|
| 22|                 0|  0.0|[16904.0,2
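
In the rows above, each `probability` looks like `rawPrediction` divided by its sum, which is consistent with the raw prediction of a decision tree holding per-class counts at the leaf a row falls into. A small check using the two value pairs visible in the output (the helper name `normalize` is made up):

```python
def normalize(raw_counts):
    """Turn per-class counts into class probabilities."""
    total = sum(raw_counts)
    return [count / total for count in raw_counts]

first_leaf = normalize([474.0, 102.0])
second_leaf = normalize([16904.0, 2816.0])

print(first_leaf[0])   # compare with 0.82291666666666...
print(second_leaf[0])  # compare with 0.85720081135902...
```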

In [67]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [68]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8258442149484794
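
To make the metric concrete: the area under the ROC curve can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counted as half. The sketch below computes it that way on toy data. It is not how `BinaryClassificationEvaluator` is implemented, and the function name and the data are invented.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correct ranking
    return wins / (len(positives) * len(negatives))

# Toy labels and scores, invented for illustration.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)  # 0.75: three of the four positive/negative pairs are ordered correctly
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.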


### Optimisation using Cross-Validation [doc](https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation)

In [69]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(dt.impurity, ['gini','entropy'])
             .addGrid(dt.maxDepth, [5, 6, 10])
             .addGrid(dt.minInfoGain, [0.0, 0.2, 0.3])
             .addGrid(dt.minInstancesPerNode, [1, 2])
            ).build()


cv = CrossValidator(estimator=dt,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)

# Run cross-validation, and choose the best set of parameters.
cvmodel = cv.fit(train)

# Make predictions on test data. cvmodel uses the best model found.
pred = cvmodel.transform(test)

print("Test Area Under ROC: " + str(evaluator.evaluate(pred, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8288756236113918
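
The cost of this search grows with the product of the grid sizes. A rough count for the grid above, assuming one model is fitted per parameter combination and per fold, plus one final refit of the best combination on the whole training set:

```python
from itertools import product

# Same values as the decision tree grid above.
grid = {
    "impurity": ["gini", "entropy"],
    "maxDepth": [5, 6, 10],
    "minInfoGain": [0.0, 0.2, 0.3],
    "minInstancesPerNode": [1, 2],
}
num_folds = 5

combinations = list(product(*grid.values()))
fold_fits = len(combinations) * num_folds
total_fits = fold_fits + 1  # plus the final refit with the best parameters

print(len(combinations), fold_fits, total_fits)
```

Every value added to one of the lists multiplies the number of fits, so small grids are worth starting with.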


### Random Forest Classifier [doc](https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier)

In [70]:
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

rf_model = rf.fit(train)

# Make predictions.
rf_predictions = rf_model.transform(test)

# Select example rows to display.
rf_predictions.select('Age', 'Previously_Insured', 'label', 'rawPrediction', 'prediction', 'probability').show(30)


+---+------------------+-----+--------------------+----------+--------------------+
|Age|Previously_Insured|label|       rawPrediction|prediction|         probability|
+---+------------------+-----+--------------------+----------+--------------------+
| 20|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 21|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 21|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 21|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 21|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 21|                 0|  0.0|[86.8677847545440...|       0.0|[0.86867784754544...|
| 21|                 0|  0.0|[86.8677847545440...|       0.0|[0.86867784754544...|
| 22|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457815...|
| 22|                 0|  0.0|[87.1189224578159...|       0.0|[0.87118922457

In [71]:
print("Test Area Under ROC: " + str(evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8029106459018723


### Optimisation using cross validation 

In [72]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.impurity, ['gini','entropy'])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .addGrid(rf.numTrees, [20, 100, 200])
            ).build()


# Reuse the BinaryClassificationEvaluator defined earlier (rf_evaluator was never defined).
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvmodel = cv.fit(train)
pred = cvmodel.transform(test)

print("Test Area Under ROC: " + str(evaluator.evaluate(pred, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8488420995052295
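
For a random forest the cost of the grid depends on `numTrees` as well as on the number of combinations. A rough estimate for the grid above, assuming one forest is fitted per combination and per fold and ignoring the final refit:

```python
from itertools import product

# Same values as the random forest grid above.
impurities = ["gini", "entropy"]
max_depths = [5, 10, 15]
num_trees_options = [20, 100, 200]
num_folds = 5

forest_fits = 0
trees_trained = 0
for _impurity, _depth, num_trees in product(impurities, max_depths, num_trees_options):
    forest_fits += num_folds              # one forest per fold
    trees_trained += num_folds * num_trees

print(forest_fits, trees_trained)
```

This counts trees only; deeper trees also take longer to train, so the `maxDepth = 15` combinations likely dominate the run time.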
