# BDDA - Question 2
## Insurance Dataset

### Hadoop Commands
### 1. Start Hadoop
#### ./allstart.sh
### 2. Copy File From Local to Hadoop
#### hadoop fs -copyFromLocal insurance.csv
### 3. Open Jupyter Notebook
#### pysparknb

### Import Libraries

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Insurance').getOrCreate()

In [4]:
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

In [5]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import time

### Import Data

In [6]:
df = spark.read.csv('insurance.csv' , header=True , inferSchema=True)

### Data Exploration

In [7]:
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [8]:
df.columns

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

In [9]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [10]:
df.head()

Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924)

In [11]:
df.tail(2)

[Row(age=21, sex='female', bmi=25.8, children=0, smoker='no', region='southwest', charges=2007.945),
 Row(age=61, sex='female', bmi=29.07, children=0, smoker='yes', region='northwest', charges=29141.3603)]

In [12]:
df.describe().show()

+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|summary|               age|   sex|               bmi|         children|smoker|   region|           charges|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|  count|              1338|  1338|              1338|             1338|  1338|     1338|              1338|
|   mean| 39.20702541106129|  null|30.663396860986538|  1.0949177877429|  null|     null|13270.422265141257|
| stddev|14.049960379216147|  null| 6.098186911679012|1.205492739781914|  null|     null|12110.011236693992|
|    min|                18|female|             15.96|                0|    no|northeast|         1121.8739|
|    max|                64|  male|             53.13|                5|   yes|southwest|       63770.42801|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+



### Feature Engineering

In [17]:
# Being that sex, smoker, region and region are strings and regression algorithms requires numbers
# We replace the strings with numerical values using StringIndexers
# This is esentially encoding

index = [StringIndexer(inputCol="sex" , outputCol="sex_indexed"),
        StringIndexer(inputCol="smoker" , outputCol="smoker_indexed"),
        StringIndexer(inputCol="region" , outputCol="region_indexed") 
        ]

In [23]:
# Initialise the pipeline which will run the StringIndexers and updated columns will be stored to df_new
pipe = Pipeline(stages=index)
df_new = pipe.fit(df).transform(df)

In [24]:
df_new.show()

+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_indexed|smoker_indexed|region_indexed|
+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|           1.0|           2.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|           0.0|           0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|           0.0|           0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|           0.0|           1.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|           0.0|           1.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        1.0|           0.0|           0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        1.0|           0.0|           0.0|


In [25]:
df_new.columns

['age',
 'sex',
 'bmi',
 'children',
 'smoker',
 'region',
 'charges',
 'sex_indexed',
 'smoker_indexed',
 'region_indexed']

In [26]:
df_new.head()

Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924, sex_indexed=1.0, smoker_indexed=1.0, region_indexed=2.0)

In [27]:
df_new.tail(2)

[Row(age=21, sex='female', bmi=25.8, children=0, smoker='no', region='southwest', charges=2007.945, sex_indexed=1.0, smoker_indexed=0.0, region_indexed=2.0),
 Row(age=61, sex='female', bmi=29.07, children=0, smoker='yes', region='northwest', charges=29141.3603, sex_indexed=1.0, smoker_indexed=1.0, region_indexed=1.0)]

In [30]:
df_new.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)
 |-- sex_indexed: double (nullable = false)
 |-- smoker_indexed: double (nullable = false)
 |-- region_indexed: double (nullable = false)



In [31]:
### Combine features together using vector assembler
### Here we combine Age, BMI, Sex & Smoker and save the resultant vector into the compiled_features column

assemble = VectorAssembler(inputCols=["age" , "bmi" , "sex_indexed" , "smoker_indexed"] , outputCol="compiled_features")

In [33]:
### Apply VectorAssembler and transform the data

comp = assemble.transform(df_new)

In [34]:
### Create a new dataframe with the compiled features and the charges

updated_df = comp.select("compiled_features" , "charges")

### Split the updated dataset into Train & Test

In [35]:
train_df, test_df = updated_df.randomSplit([0.7,0.3])

### Model - Linear Regression

In [37]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="compiled_features" , labelCol="charges")

In [38]:
regressor = lr.fit(train_df)

In [39]:
predict = regressor.evaluate(test_df)

In [40]:
###  Evaluation of the Model

from pyspark.ml.evaluation import RegressionEvaluator

In [41]:
evaluated_result = RegressionEvaluator(labelCol="charges" , predictionCol="predict")

In [45]:
predict.predictions.show(10)

+--------------------+-----------+------------------+
|   compiled_features|    charges|        prediction|
+--------------------+-----------+------------------+
|[18.0,27.28,1.0,1.0]| 18223.4512|26265.840463866676|
| [18.0,28.5,0.0,0.0]|   1712.227|1901.3848606212432|
| [18.0,30.4,0.0,0.0]|   3481.868|2526.9150313715127|
|[18.0,33.155,1.0,...| 2207.69745|3737.3047941785844|
|[18.0,33.77,0.0,0.0]|  1725.5523|3636.4080184391005|
|[18.0,36.85,1.0,1.0]| 36149.4835|29416.537166014095|
|[18.0,37.29,1.0,0.0]|  2219.4451|5098.6559815745695|
|[18.0,38.28,1.0,0.0]|14133.03775| 5424.590123176027|
|[18.0,39.16,1.0,0.0]|  1633.0444| 5714.309360155099|
|[18.0,41.14,0.0,0.0]|  1146.7966|6062.8066281388365|
+--------------------+-----------+------------------+
only showing top 10 rows



In [49]:
print("Coefficients: %s" % str(regressor.coefficients))
print("Intercepts: %s" % str(regressor.intercept))

Coefficients: [267.28964290098287,329.2264056580375,303.37101521917685,24462.74080292906]
Intercepts: -12292.781272850518


In [51]:
trainingSummary = regressor.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RSME: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

numIterations: 1
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
| 3921.9106663305474|
| -9844.042556123706|
| 1941.0898117830516|
| 2115.5320711547615|
|-10333.068190311815|
|-10132.128664068507|
|  12195.06516540077|
| 1617.2207345545444|
| 1586.0762760170303|
|  962.0967253097765|
| 1515.0347206873907|
| 1368.0650662544358|
| 1448.2296731115266|
| 1117.6716315100707|
| -9751.267614737333|
|   807.464812579191|
|  589.4536028165965|
| 1166.4226005054052|
|  712.7936705223924|
| -6.258437825692681|
+-------------------+
only showing top 20 rows

RSME: 6190.550997
r2: 0.754463


### RSME = 6190.5509
### R2 = 0.754