Importing Libariries to work on the project
Setting the Directory

In [1]:
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Loading the file to do  Modelling

In [31]:
df = spark.read.csv('data/insurance.csv',inferSchema=True,header=True)

Dataset Description

age: age of primary beneficiary

sex: insurance contractor gender, female, male

bmi: Body mass index, providing an understanding of body, weights that are relatively high or low relative to height, objective index of body weight (kg / m ^ 2) using the ratio of height to weight, ideally 18.5 to 24.9

children: Number of children covered by health insurance / Number of dependents

smoker: Smoking

region: the beneficiary's residential area in the US, northeast, southeast, southwest, northwest.

charges: Individual medical costs billed by health insurance - Target variable

In [32]:
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [33]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



Checking for Null values in the dataset

In [5]:
for col in df.columns:
    print("no. of cells in column", col, "with null values:", df.filter(df[col].isNull()).count())

no. of cells in column age with null values: 0
no. of cells in column sex with null values: 0
no. of cells in column bmi with null values: 0
no. of cells in column children with null values: 0
no. of cells in column smoker with null values: 0
no. of cells in column region with null values: 0
no. of cells in column charges with null values: 0


No null values in the dataset

In [6]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,1338,39.20702541106129,14.049960379216147,18,64
sex,1338,,,female,male
bmi,1338,30.663396860986538,6.098186911679012,15.96,53.13
children,1338,1.0949177877429,1.205492739781914,0,5
smoker,1338,,,no,yes
region,1338,,,northeast,southwest
charges,1338,13270.422265141257,12110.011236693992,10043.249,9991.03765


Encoding variables for model

In [35]:
#Label encoder
from pyspark.ml.feature import StringIndexer
indexed = df
for col in ["sex","smoker","region"]:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col+"_encoded")
    indexed = stringIndexer.fit(indexed).transform(indexed)
indexed.show()

+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_encoded|smoker_encoded|region_encoded|
+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|           1.0|           1.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|           0.0|           0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|           0.0|           0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|           0.0|           2.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|           0.0|           2.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        1.0|           0.0|           0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        1.0|           0.0|           0.0|


In [36]:
#One hot encoder
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="region_encoded",outputCol="region_vec",dropLast=True)
encoded = encoder.transform(indexed)
encoded.show()

+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+-------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_encoded|smoker_encoded|region_encoded|   region_vec|
+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+-------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|           1.0|           1.0|(3,[1],[1.0])|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|           0.0|           0.0|(3,[0],[1.0])|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|           0.0|           0.0|(3,[0],[1.0])|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|           0.0|           2.0|(3,[2],[1.0])|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|           0.0|           2.0|(3,[2],[1.0])|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        1.0|           0.0|    

Extracting the X and Y variables

In [38]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["age","sex_encoded","bmi","children","smoker_encoded","region_vec"],
                            outputCol="features")
feature_vec=assembler.transform(encoded).select('features','charges')
feature_vec.columns

['features', 'charges']

In [39]:
feature_vec.select("features")

DataFrame[features: vector]

###### Split the data into train and test sets


In [40]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.75,.25],seed=0)

In [47]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol = 'features',labelCol='charges',seed=0)

In [61]:
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 31, num = 3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 16, num = 3)]) \
    .build()

In [65]:
evaluator = RegressionEvaluator( labelCol='charges', metricName='r2')

In [66]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import   RegressionEvaluator
cv = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)



In [67]:

cvModel = cv.fit(train_data)

In [68]:
#Best Model Params
score_params_list = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
max(score_params_list,key=lambda item:item[0])

(0.8479122579306009,
 {Param(parent='RandomForestRegressor_4545b460e98f24f1afa3', name='numTrees', doc='Number of trees to train (>= 1).'): 20,
  Param(parent='RandomForestRegressor_4545b460e98f24f1afa3', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 16})

In [71]:
predictions = cvModel.transform(test_data)
evaluator.evaluate(predictions) 

0.8145303265904511

From the Above Randonm Forest model we can see that R2  using pyspark is 81.45 and R2 for same model using Sklearn is 82.66 for the dataset 