In [1]:
#importing python spark library
from pyspark.sql import SparkSession

In [2]:
#Creating a spark session
spark= SparkSession.builder.appName('Ans2').getOrCreate()

In [3]:
#importing all the tools which will be required in data prepretion and creating layers 
import numpy as np
from pyspark.ml.feature import StringIndexer , OneHotEncoder
from pyspark.ml.feature import MinMaxScaler , StandardScaler , VectorAssembler

In [4]:
from pyspark.ml import Pipeline

In [5]:
#importing the LinearRegression Model and crossValidator for model tunning.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [6]:
#importing the RegressionEvaluator for evaluating our model.
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
#importing functions
from pyspark.sql.functions import isnan,when,count,col

In [8]:
#reading the file
df=spark.read.csv('insurance.csv', header=True,inferSchema=True)

In [9]:
#showing the data
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [68]:
#all the Columns
df.columns

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

In [69]:
#data type in each columns 
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [12]:
#understanding Charges column 
df.select("charges").show(10)

+-----------+
|    charges|
+-----------+
|  16884.924|
|  1725.5523|
|   4449.462|
|21984.47061|
|  3866.8552|
|  3756.6216|
|  8240.5896|
|  7281.5056|
|  6406.4107|
|28923.13692|
+-----------+
only showing top 10 rows



In [13]:
#no of rowa
df.count()

1338

In [14]:
#checking of Columns have any unique values
df.groupby('charges').count().show()

+-----------+-----+
|    charges|count|
+-----------+-----+
|  9361.3268|    1|
|   2494.022|    1|
|   7441.501|    1|
| 11353.2276|    1|
|   7954.517|    1|
|  1705.6245|    1|
|  2523.1695|    1|
| 2803.69785|    1|
| 8017.06115|    1|
| 12231.6136|    1|
| 10560.4917|    1|
|  1163.4627|    1|
|45702.02235|    1|
|  15170.069|    1|
| 3279.86855|    1|
| 1632.03625|    1|
| 17748.5062|    1|
|36910.60803|    1|
| 12949.1554|    1|
|   1743.214|    1|
+-----------+-----+
only showing top 20 rows



In [15]:
#checking corelation between Bmi and Charges
df.corr('bmi','charges')

0.19834096883362903

In [16]:
#checking corelation between Bmi and age 
df.corr('bmi','age')

0.1092718815485351

In [70]:
#checking corelation between children and age 
df.corr('children','age')

0.04246899855884959

In [71]:
#checking corelation between children and bmi 
df.corr('children','bmi')

0.012758900820673989

In [None]:
# no two columns has a significant corelation that we exclude.

In [17]:
#checking null values in charges
df.select('*').where(df.charges.isNull()).count()

0

In [18]:
#checking null value in each calumns with the help of a loop
def get_null_value_count(data):
    data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()

In [19]:
#NO null value in any culumns. 
get_null_value_count(df)

+---+---+---+--------+------+------+-------+
|age|sex|bmi|children|smoker|region|charges|
+---+---+---+--------+------+------+-------+
|  0|  0|  0|       0|     0|     0|      0|
+---+---+---+--------+------+------+-------+



In [20]:
#converting string columns to numeric with the help of string indexer
indexer=StringIndexer(inputCols=["sex","smoker","region"],outputCols=["sex_indexed","smoker_indexed","region_indexed"])
df_r=indexer.fit(df).transform(df)   
df_r.show()

+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_indexed|smoker_indexed|region_indexed|
+---+------+------+--------+------+---------+-----------+-----------+--------------+--------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|           1.0|           2.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|           0.0|           0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|           0.0|           0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|           0.0|           1.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|           0.0|           1.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        1.0|           0.0|           0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        1.0|           0.0|           0.0|


In [21]:
#joining all the coulums other than target colmun.
featureassembler=VectorAssembler(inputCols=['age','sex_indexed','bmi','children','smoker_indexed','region_indexed'],outputCol="Independent Features")
output=featureassembler.transform(df_r)

In [22]:
#single column for all the independent features
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[19.0,1.0,27.9,0....|
|[18.0,0.0,33.77,1...|
|[28.0,0.0,33.0,3....|
|[33.0,0.0,22.705,...|
|[32.0,0.0,28.88,0...|
|[31.0,1.0,25.74,0...|
|[46.0,1.0,33.44,1...|
|[37.0,1.0,27.74,3...|
|[37.0,0.0,29.83,2...|
|[60.0,1.0,25.84,0...|
|[25.0,0.0,26.22,0...|
|[62.0,1.0,26.29,0...|
|[23.0,0.0,34.4,0....|
|[56.0,1.0,39.82,0...|
|[27.0,0.0,42.13,0...|
|[19.0,0.0,24.6,1....|
|[52.0,1.0,30.78,1...|
|[23.0,0.0,23.845,...|
|[56.0,0.0,40.3,0....|
|[30.0,0.0,35.3,0....|
+--------------------+
only showing top 20 rows



In [23]:
#making a data set wuth two columns that are dependent and independent.
finalized_data=output.select("Independent Features","charges")

In [24]:
#showing final data. 
finalized_data.show()

+--------------------+-----------+
|Independent Features|    charges|
+--------------------+-----------+
|[19.0,1.0,27.9,0....|  16884.924|
|[18.0,0.0,33.77,1...|  1725.5523|
|[28.0,0.0,33.0,3....|   4449.462|
|[33.0,0.0,22.705,...|21984.47061|
|[32.0,0.0,28.88,0...|  3866.8552|
|[31.0,1.0,25.74,0...|  3756.6216|
|[46.0,1.0,33.44,1...|  8240.5896|
|[37.0,1.0,27.74,3...|  7281.5056|
|[37.0,0.0,29.83,2...|  6406.4107|
|[60.0,1.0,25.84,0...|28923.13692|
|[25.0,0.0,26.22,0...|  2721.3208|
|[62.0,1.0,26.29,0...| 27808.7251|
|[23.0,0.0,34.4,0....|   1826.843|
|[56.0,1.0,39.82,0...| 11090.7178|
|[27.0,0.0,42.13,0...| 39611.7577|
|[19.0,0.0,24.6,1....|   1837.237|
|[52.0,1.0,30.78,1...| 10797.3362|
|[23.0,0.0,23.845,...| 2395.17155|
|[56.0,0.0,40.3,0....|  10602.385|
|[30.0,0.0,35.3,0....|  36837.467|
+--------------------+-----------+
only showing top 20 rows



In [25]:
#splitting data set into test and train (20-80 )
train_data,test_data=finalized_data.randomSplit([0.8,0.2])
                          

In [27]:
#defining linearRegression columns. 
regressor=LinearRegression(featuresCol='Independent Features', labelCol='charges')

In [28]:
#using it on our data set
regressor=regressor.fit(train_data)

In [29]:
#finging the coefficients
regressor.coefficients

DenseVector([255.5946, 245.5816, 342.1488, 506.0454, 23660.7014, 242.5066])

In [30]:
#finding the intercept
regressor.intercept

-13100.273107330542

In [31]:
#predictions 
pred_results=regressor.evaluate(test_data)

In [32]:
#predicition values
pred_results.predictions.show()

+--------------------+-----------+-------------------+
|Independent Features|    charges|         prediction|
+--------------------+-----------+-------------------+
|(6,[0,2],[18.0,41...|  1146.7966|  5576.431682931072|
|(6,[0,2],[23.0,26...|  1815.8759| 1848.7679792329054|
|(6,[0,2],[23.0,41...|  1837.2819|  7117.859416029927|
|(6,[0,2],[27.0,23...|   2483.736| 1704.4191186636272|
|(6,[0,2],[36.0,29...|   4399.731|  6262.952853271419|
|(6,[0,2],[48.0,29...|   7789.635|  9330.088440293037|
|(6,[0,2],[50.0,25...|   8442.667|   8335.82300856892|
|(6,[0,2],[53.0,31...|27346.04207| 11172.607112637435|
|(6,[0,2],[58.0,49...| 11381.3254|  18510.03542621302|
|(6,[0,2],[59.0,26...|  11743.299| 11012.538372892059|
|(6,[0,2],[62.0,39...| 12982.8747|   16408.5954605477|
|(6,[0,2],[63.0,41...| 13405.3903|  17191.09923647921|
|[18.0,0.0,22.99,0...|  1704.5681|  93.95090150647047|
|[18.0,0.0,23.32,1...|  1711.0268| -14.61446610832536|
|[18.0,0.0,23.75,0...|  1705.6245|  353.9839854003494|
|[18.0,0.0

In [72]:
#Finding MAE, MSE and R*2
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError


(0.7274968489881224, 4229.10787793482, 39227258.07665372)

In [49]:
#building LinearRegression for cross validation 
lr = LinearRegression(labelCol="charges", featuresCol="Independent Features")

In [58]:

lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0])
             #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50,70,100,120,140,160,180,200])
             #  .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [59]:
#giving the columns which it need to tune
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="charges", metricName="rmse")

In [60]:
#Building cross validation Model
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 10)

In [61]:
#fitting train to above model
lrcvModel = lrcv.fit(train_data)

In [62]:
lrcvSummary = lrcvModel.bestModel.summary


In [63]:
print(lrcvSummary)

<pyspark.ml.regression.LinearRegressionTrainingSummary object at 0x7fce944324a8>


In [64]:
lrpredictions=lrcvModel.transform(test_data)

In [65]:
#Calculating RMSE
print('RMSE:', lrevaluator.evaluate(lrpredictions))

RMSE: 6262.315754225717
