### BDDA Assignment: Divik Mathur - 015031

## Prediction of Medical Cost using insurance.csv dataset

### Command to copy csv file from local system to hdfs 

hdfs dfs -copyFromLocal /home/ashok/insurance.csv

### Project Details

Business Problem - To predict the medical expenses using the personal details of the patient such as age, sex, bmi, children, smoker, region and charges.

Age - Age of primary beneficiary
Sex - Gender (male/female)
bmi - Body mass index
Children - Number of children
Smoker - Smoking
Region - Beneficiary's residential area (northeast, southeast, southwest, northwest)
Charges - Individual medical costs billed by health insurance

### Importing necessary libraries and reading the csv file

In [1]:
#Importing libraries

import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

In [11]:
from pyspark.ml.regression import LinearRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import time

In [5]:
#Reading the file

df=spark.read.csv("insurance.csv", sep=",", header=True, inferSchema=True)

### About the data

In [6]:
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [7]:
df.columns

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

In [8]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [9]:
df.describe().show()

+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|summary|               age|   sex|               bmi|         children|smoker|   region|           charges|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|  count|              1338|  1338|              1338|             1338|  1338|     1338|              1338|
|   mean| 39.20702541106129|  null|30.663396860986538|  1.0949177877429|  null|     null|13270.422265141257|
| stddev|14.049960379216147|  null| 6.098186911679012|1.205492739781914|  null|     null|12110.011236693992|
|    min|                18|female|             15.96|                0|    no|northeast|         1121.8739|
|    max|                64|  male|             53.13|                5|   yes|southwest|       63770.42801|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+



If we look at the data, it contains both the categorical and numerical columns. We are going to leave the numerical features alone and only change the categorical columns.

### Indexing categorical columns and creating Pipeline

In [13]:
from pyspark.ml.feature import StringIndexer

In [15]:
indexers = [StringIndexer(inputCol=column, outputCol=column+'_index').fit(df) for column in list(set(df.columns)-set(['bmi, children, charges']))]

In [18]:
#Creating pipeline

pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)
df_r.show()

+---+------+------+--------+------+---------+-----------+---------+--------------+------------+-------------+---------+---------+------------+
|age|   sex|   bmi|children|smoker|   region|    charges|bmi_index|children_index|region_index|charges_index|age_index|sex_index|smoker_index|
+---+------+------+--------+------+---------+-----------+---------+--------------+------------+-------------+---------+---------+------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|    412.0|           0.0|         2.0|        340.0|      1.0|      1.0|         1.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|    283.0|           1.0|         0.0|        358.0|      0.0|      0.0|         0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|     32.0|           3.0|         0.0|        891.0|     17.0|      0.0|         0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|    130.0|           0.0|         1.0|        500.0|     30.0|      0.0|         0.0|

The resulting dataframe now contains both the categorical columns and the ones we have indexed. The Linear regression in Spark accepts a dataframe of a feature column and label column. Next we transform our dataset into a dataframe of features and labels. Our features column will contain all the features we are going to use and our label column will be the 'charges'.

### VectorAssembler

In [17]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors

In [30]:
assembler = VectorAssembler(
    inputCols = ['age', 'sex_index', 'bmi', 'children', 'smoker_index', 'region_index'],
    outputCol = 'features')
output = assembler.transform(df_r)
df_cleaned = output.select('features', 'charges')
df_cleaned.show(5)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|[19.0,1.0,27.9,0....|  16884.924|
|[18.0,0.0,33.77,1...|  1725.5523|
|[28.0,0.0,33.0,3....|   4449.462|
|[33.0,0.0,22.705,...|21984.47061|
|[32.0,0.0,28.88,0...|  3866.8552|
+--------------------+-----------+
only showing top 5 rows



Let's define the model and then fit our data.

### Linear Regression 

In [31]:
#Defining and fitting the model

from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(maxIter=5, regParam=0.0, labelCol='charges', solver='normal')
mymodel = lr.fit(df_cleaned)
predictions = mymodel.transform(df_cleaned)
predictions.show()

+--------------------+-----------+------------------+
|            features|    charges|        prediction|
+--------------------+-----------+------------------+
|[19.0,1.0,27.9,0....|  16884.924| 25802.49362767054|
|[18.0,0.0,33.77,1...|  1725.5523| 3501.911813837087|
|[28.0,0.0,33.0,3....|   4449.462| 6765.039296791185|
|[33.0,0.0,22.705,...|21984.47061|3452.5446916117853|
|[32.0,0.0,28.88,0...|  3866.8552| 5246.678399141309|
|[31.0,1.0,25.74,0...|  3756.6216|3833.1199178268707|
|[46.0,1.0,33.44,1...|  8240.5896|10721.234107062717|
|[37.0,1.0,27.74,3...|  7281.5056| 7706.011707635729|
|[37.0,0.0,29.83,2...|  6406.4107| 8283.617482287782|
|[60.0,1.0,25.84,0...|28923.13692|11565.789119552786|
|[25.0,0.0,26.22,0...|  2721.3208|3051.3627231821047|
|[62.0,1.0,26.29,0...| 27808.7251|35834.504770083484|
|[23.0,0.0,34.4,0....|   1826.843| 5010.487957030264|
|[56.0,1.0,39.82,0...| 11090.7178| 14937.39364966564|
|[27.0,0.0,42.13,0...| 39611.7577|31967.711145230838|
|[19.0,0.0,24.6,1....|   183

In [32]:
#Evaluating the model using rmse value

evaluator = RegressionEvaluator(labelCol='charges')
rmse = evaluator.evaluate(predictions,{evaluator.metricName:'rmse'})

np.sqrt(rmse), rmse

(77.78257121402335, 6050.128384664614)