In [None]:
!pip install pyspark

In [1]:
#create spark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").getOrCreate()

In [2]:
# Read the file

# I have uploaded the csv file in same location as this notebook. You can also specify the path of this file
file_location = "medical_insurance.csv"

df= spark.read.csv(file_location,header=True,inferSchema=True)
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [3]:
# convert categorical in numerical features

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['sex','smoker','region'],outputCols=['sex_n','smoker_n','region_n'])
df= indexer.fit(df).transform(df)
df.show()

+---+------+------+--------+------+---------+-----------+-----+--------+--------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_n|smoker_n|region_n|
+---+------+------+--------+------+---------+-----------+-----+--------+--------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|  1.0|     1.0|     1.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|  0.0|     0.0|     0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|  0.0|     0.0|     0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|  0.0|     0.0|     2.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|  0.0|     0.0|     2.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|  1.0|     0.0|     0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|  1.0|     0.0|     0.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|  1.0|     0.0|     2.0|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|  0.0|     0.0|     3.0|
| 60|female| 25.

In [4]:
#creating new dataframe with required columns

df_r = df.select('charges','age','bmi','children','sex_n','smoker_n','region_n')
df_r.show()

+-----------+---+------+--------+-----+--------+--------+
|    charges|age|   bmi|children|sex_n|smoker_n|region_n|
+-----------+---+------+--------+-----+--------+--------+
|  16884.924| 19|  27.9|       0|  1.0|     1.0|     1.0|
|  1725.5523| 18| 33.77|       1|  0.0|     0.0|     0.0|
|   4449.462| 28|  33.0|       3|  0.0|     0.0|     0.0|
|21984.47061| 33|22.705|       0|  0.0|     0.0|     2.0|
|  3866.8552| 32| 28.88|       0|  0.0|     0.0|     2.0|
|  3756.6216| 31| 25.74|       0|  1.0|     0.0|     0.0|
|  8240.5896| 46| 33.44|       1|  1.0|     0.0|     0.0|
|  7281.5056| 37| 27.74|       3|  1.0|     0.0|     2.0|
|  6406.4107| 37| 29.83|       2|  0.0|     0.0|     3.0|
|28923.13692| 60| 25.84|       0|  1.0|     0.0|     2.0|
|  2721.3208| 25| 26.22|       0|  0.0|     0.0|     3.0|
| 27808.7251| 62| 26.29|       0|  1.0|     1.0|     0.0|
|   1826.843| 23|  34.4|       0|  0.0|     0.0|     1.0|
| 11090.7178| 56| 39.82|       0|  1.0|     0.0|     0.0|
| 39611.7577| 

In [5]:
# handle missing values using mean
# other methods can also be used such as fillna()

from pyspark.ml.feature import Imputer

imputer = Imputer(strategy='mean', inputCols=df_r.columns, outputCols=df_r.columns)
model = imputer.fit(df_r)
df_r = model.transform(df_r)

In [6]:
# Grouping independent variables

from pyspark.ml.feature import VectorAssembler
features = VectorAssembler(inputCols=['age','bmi','children','sex_n','smoker_n','region_n'],outputCol='Independent Features')
features = features.transform(df_r)
features.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[19.0,27.9,0.0,1....|
|[18.0,33.77,1.0,0...|
|[28.0,33.0,3.0,0....|
|[33.0,22.705,0.0,...|
|[32.0,28.88,0.0,0...|
|[31.0,25.74,0.0,1...|
|[46.0,33.44,1.0,1...|
|[37.0,27.74,3.0,1...|
|[37.0,29.83,2.0,0...|
|[60.0,25.84,0.0,1...|
|[25.0,26.22,0.0,0...|
|[62.0,26.29,0.0,1...|
|[23.0,34.4,0.0,0....|
|[56.0,39.82,0.0,1...|
|[27.0,42.13,0.0,0...|
|[19.0,24.6,1.0,0....|
|[52.0,30.78,1.0,1...|
|[23.0,23.845,0.0,...|
|[56.0,40.3,0.0,0....|
|[30.0,35.3,0.0,0....|
+--------------------+
only showing top 20 rows



In [7]:
final_data=features.select('Independent Features','charges')
final_data.show()

+--------------------+-----------+
|Independent Features|    charges|
+--------------------+-----------+
|[19.0,27.9,0.0,1....|  16884.924|
|[18.0,33.77,1.0,0...|  1725.5523|
|[28.0,33.0,3.0,0....|   4449.462|
|[33.0,22.705,0.0,...|21984.47061|
|[32.0,28.88,0.0,0...|  3866.8552|
|[31.0,25.74,0.0,1...|  3756.6216|
|[46.0,33.44,1.0,1...|  8240.5896|
|[37.0,27.74,3.0,1...|  7281.5056|
|[37.0,29.83,2.0,0...|  6406.4107|
|[60.0,25.84,0.0,1...|28923.13692|
|[25.0,26.22,0.0,0...|  2721.3208|
|[62.0,26.29,0.0,1...| 27808.7251|
|[23.0,34.4,0.0,0....|   1826.843|
|[56.0,39.82,0.0,1...| 11090.7178|
|[27.0,42.13,0.0,0...| 39611.7577|
|[19.0,24.6,1.0,0....|   1837.237|
|[52.0,30.78,1.0,1...| 10797.3362|
|[23.0,23.845,0.0,...| 2395.17155|
|[56.0,40.3,0.0,0....|  10602.385|
|[30.0,35.3,0.0,0....|  36837.467|
+--------------------+-----------+
only showing top 20 rows



In [8]:
# using different regression to predict the charges

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

train_data1,test_data1 = final_data.randomSplit([0.7,0.3])
reg1 = LinearRegression(featuresCol='Independent Features',labelCol='charges')
reg1 = reg1.fit(train_data1)

train_data2,test_data2 = final_data.randomSplit([0.8,0.2])
reg2 = DecisionTreeRegressor(featuresCol='Independent Features',labelCol='charges')
reg2 = reg2.fit(train_data2)

train_data3,test_data3 = final_data.randomSplit([0.75,0.25])
reg3 = RandomForestRegressor(featuresCol='Independent Features',labelCol='charges')
reg3 = reg3.fit(train_data3)

train_data4,test_data4 = final_data.randomSplit([0.7,0.3])
reg4 = GBTRegressor(featuresCol='Independent Features',labelCol='charges')
reg4 = reg4.fit(train_data4)

In [9]:
prediction1= reg1.transform(test_data1)
prediction2= reg2.transform(test_data2)
prediction3= reg3.transform(test_data3)
prediction4= reg4.transform(test_data4)

In [10]:
prediction1.show()

+--------------------+-----------+-------------------+
|Independent Features|    charges|         prediction|
+--------------------+-----------+-------------------+
|(6,[0,1],[18.0,23...|  1121.8739|-330.02259335166855|
|(6,[0,1],[18.0,30...|  1131.5066| 1831.9407667152955|
|(6,[0,1],[18.0,33...|  1135.9407| 2827.1302499207213|
|(6,[0,1],[18.0,33...|  1135.9407| 2827.1302499207213|
|(6,[0,1],[18.0,33...|  1135.9407| 2827.1302499207213|
|(6,[0,1],[18.0,34...|   1137.011|  3067.348401039275|
|(6,[0,1],[18.0,37...|  1141.4451| 4062.5378842447008|
|(6,[0,1],[18.0,43...|  1149.3959|  5847.015578268229|
|(6,[0,1],[21.0,31...|16586.49771| 2869.2599085564234|
|(6,[0,1],[21.0,35...|  1532.4697|   4276.25193653651|
|(6,[0,1],[23.0,26...|  1815.8759|  1970.790621903905|
|(6,[0,1],[23.0,41...|  1837.2819|  6775.153644274937|
|(6,[0,1],[24.0,32...|  1981.5819|  3940.895929128774|
|(6,[0,1],[24.0,35...|  1986.9334|  5141.986684721529|
|(6,[0,1],[25.0,25...|  2137.6536|  2239.095212112923|
|(6,[0,1],

In [11]:
prediction2.show()

+--------------------+-----------+------------------+
|Independent Features|    charges|        prediction|
+--------------------+-----------+------------------+
|(6,[0,1],[18.0,23...|  1121.8739|2244.0867916878287|
|(6,[0,1],[18.0,33...|  1135.9407|2244.0867916878287|
|(6,[0,1],[18.0,33...|  1136.3994|2244.0867916878287|
|(6,[0,1],[18.0,34...|   1137.011|2244.0867916878287|
|(6,[0,1],[21.0,23...|  1515.3449|2244.0867916878287|
|(6,[0,1],[21.0,31...|16586.49771|2244.0867916878287|
|(6,[0,1],[21.0,35...|  1532.4697|2244.0867916878287|
|(6,[0,1],[23.0,26...|  1815.8759| 4159.283093898305|
|(6,[0,1],[23.0,32...|  1824.2854| 4159.283093898305|
|(6,[0,1],[23.0,41...|  1837.2819| 4159.283093898305|
|(6,[0,1],[26.0,35...|  2322.6218| 4159.283093898305|
|(6,[0,1],[27.0,23...|   2483.736| 4159.283093898305|
|(6,[0,1],[27.0,32...|  2497.0383| 4159.283093898305|
|(6,[0,1],[27.0,33...|  2498.4144| 4159.283093898305|
|(6,[0,1],[27.0,33...|  2498.4144| 4159.283093898305|
|(6,[0,1],[29.0,27...|  2867

In [12]:
prediction3.show()

+--------------------+---------+------------------+
|Independent Features|  charges|        prediction|
+--------------------+---------+------------------+
|(6,[0,1],[18.0,23...|1121.8739|4194.5605486038985|
|(6,[0,1],[18.0,33...|1136.3994| 4275.955495224611|
|(6,[0,1],[18.0,34...| 1137.011| 4275.955495224611|
|(6,[0,1],[18.0,53...|1163.4627| 4046.029042507579|
|(6,[0,1],[21.0,23...|1515.3449|4194.5605486038985|
|(6,[0,1],[21.0,35...|1532.4697|  4405.32764687757|
|(6,[0,1],[22.0,33...|1674.6323| 4789.059006211372|
|(6,[0,1],[23.0,26...|1815.8759| 5454.963199831549|
|(6,[0,1],[23.0,41...|1837.2819|4678.9854130993945|
|(6,[0,1],[24.0,32...|1981.5819|  5306.79277452311|
|(6,[0,1],[24.0,35...|1986.9334| 5204.409513170433|
|(6,[0,1],[24.0,35...|1986.9334| 5204.409513170433|
|(6,[0,1],[27.0,32...|2497.0383| 5963.233564952707|
|(6,[0,1],[28.0,38...|2689.4954| 5655.646929697834|
|(6,[0,1],[29.0,27...|2867.1196| 6293.394055818048|
|(6,[0,1],[29.0,27...|2867.1196| 6293.394055818048|
|(6,[0,1],[3

In [13]:
prediction4.show()

+--------------------+-----------+------------------+
|Independent Features|    charges|        prediction|
+--------------------+-----------+------------------+
|(6,[0,1],[18.0,23...|  1121.8739| 3591.357079092081|
|(6,[0,1],[18.0,23...|  1121.8739| 3591.357079092081|
|(6,[0,1],[18.0,30...|  1131.5066| 2954.810251888125|
|(6,[0,1],[18.0,33...|  1135.9407| 2627.391181942676|
|(6,[0,1],[18.0,33...|  1136.3994|2800.3069443445006|
|(6,[0,1],[18.0,33...|  1136.3994|2800.3069443445006|
|(6,[0,1],[18.0,37...|  1141.4451|1846.1374119077925|
|(6,[0,1],[18.0,43...|  1149.3959| 1140.298539008653|
|(6,[0,1],[18.0,53...|  1163.4627| 1140.298539008653|
|(6,[0,1],[21.0,31...|16586.49771|2788.6360426399474|
|(6,[0,1],[21.0,35...|  1532.4697|1993.5980935096966|
|(6,[0,1],[21.0,36...|  1534.3045|1993.5980935096966|
|(6,[0,1],[22.0,33...|  1674.6323|2831.4629705226153|
|(6,[0,1],[25.0,25...|  2137.6536| 4276.751995631693|
|(6,[0,1],[26.0,35...|  2322.6218|3279.2997092839614|
|(6,[0,1],[27.0,23...|   248

In [14]:
#calcuating the performance metrics

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="charges", predictionCol="prediction")

lr_r2 = evaluator.evaluate(prediction1, {evaluator.metricName: "r2"})
lr_mae = evaluator.evaluate(prediction1, {evaluator.metricName: "mae"})
lr_mse = evaluator.evaluate(prediction1, {evaluator.metricName: "mse"})

dt_r2 = evaluator.evaluate(prediction2, {evaluator.metricName: "r2"})
dt_mae = evaluator.evaluate(prediction2, {evaluator.metricName: "mae"})
dt_mse = evaluator.evaluate(prediction2, {evaluator.metricName: "mse"})

rf_r2 = evaluator.evaluate(prediction3, {evaluator.metricName: "r2"})
rf_mae = evaluator.evaluate(prediction3, {evaluator.metricName: "mae"})
rf_mse = evaluator.evaluate(prediction3, {evaluator.metricName: "mse"})

gbt_r2 = evaluator.evaluate(prediction4, {evaluator.metricName: "r2"})
gbt_mae = evaluator.evaluate(prediction4, {evaluator.metricName: "mae"})
gbt_mse = evaluator.evaluate(prediction4, {evaluator.metricName: "mse"})

print("LinearRegression:")
print("R2:", lr_r2)
print("MAE:", lr_mae)
print("MSE:", lr_mse)
print()

print("DecisionTreeRegressor:")
print("R2:", dt_r2)
print("MAE:", dt_mae)
print("MSE:", dt_mse)
print()

print("RandomForestRegressor:")
print("R2:", rf_r2)
print("MAE:", rf_mae)
print("MSE:", rf_mse)
print()

print("GBTRegressor:")
print("R2:", gbt_r2)
print("MAE:", gbt_mae)
print("MSE:", gbt_mse)
print()

LinearRegression:
R2: 0.7557687132844965
MAE: 4080.5628564864965
MSE: 35040412.91237266

DecisionTreeRegressor:
R2: 0.8507958391320941
MAE: 2612.1047420373457
MSE: 20981606.673856802

RandomForestRegressor:
R2: 0.8455019380189843
MAE: 3037.7464596595105
MSE: 21892610.213931773

GBTRegressor:
R2: 0.9119648604977716
MAE: 2146.4694067961077
MSE: 13717099.060869226

