In [1]:
from IPython.display import Image
from IPython.core.display import HTML 
# Show the banner image, referenced by its remote URL, as this cell's output.
Image(url= "https://miro.medium.com/max/810/0*6rCBTaD7N99u8qRT")

In [2]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists, otherwise start one
# registered under the application name 'insurance'.
spark = (
    SparkSession.builder
    .appName('insurance')
    .getOrCreate()
)

In [3]:
from pyspark.ml.regression import LinearRegression

In [4]:
# One-time setup if PySpark is not installed (run before the imports above): %pip install pyspark

In [5]:
# Load the insurance CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
dataset = spark.read.csv(
    "insurance.csv",
    header=True,
    inferSchema=True,
)

In [6]:
# Print a text-table preview of the loaded DataFrame (show() defaults to the first 20 rows).
dataset.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [7]:
# Print each column's name and the type inferred while reading the CSV.
dataset.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [9]:
# One StringIndexer per categorical column: each maps the string values of
# its input column to numeric indices written to the paired output column.
indexers = [
    StringIndexer(inputCol=source_col, outputCol=indexed_col)
    for source_col, indexed_col in (
        ("sex", "Sex2"),
        ("smoker", "smoker2"),
        ("region", "region2"),
    )
]

In [10]:
from pyspark.ml import Pipeline

In [11]:
# Chain the indexers into a single pipeline, fit it on the full dataset and
# apply it to the same data so the indexed columns are appended.
pipeline = Pipeline(stages=indexers)
DF6 = (
    pipeline
    .fit(dataset)
    .transform(dataset)
)

# Preview the original columns alongside the new indexed columns.
DF6.show()

+---+------+------+--------+------+---------+-----------+----+-------+-------+
|age|   sex|   bmi|children|smoker|   region|    charges|Sex2|smoker2|region2|
+---+------+------+--------+------+---------+-----------+----+-------+-------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924| 1.0|    1.0|    2.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523| 0.0|    0.0|    0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462| 0.0|    0.0|    0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061| 0.0|    0.0|    1.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552| 0.0|    0.0|    1.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216| 1.0|    0.0|    0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896| 1.0|    0.0|    0.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056| 1.0|    0.0|    1.0|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107| 0.0|    0.0|    3.0|
| 60|female| 25.84|       0|    no|northwest|28923.1

In [12]:
# Columns excluded from modelling: the raw string columns (replaced by their
# indexed versions) plus "children" and "region2", which are not used as features.
drop_list = ["sex", "smoker", "region", "children", "region2"]
# Dropping by name keeps every remaining column in its original order.
data = DF6.drop(*drop_list)

In [13]:
# Preview the columns that remain after dropping the unused ones.
data.show()

+---+------+-----------+----+-------+
|age|   bmi|    charges|Sex2|smoker2|
+---+------+-----------+----+-------+
| 19|  27.9|  16884.924| 1.0|    1.0|
| 18| 33.77|  1725.5523| 0.0|    0.0|
| 28|  33.0|   4449.462| 0.0|    0.0|
| 33|22.705|21984.47061| 0.0|    0.0|
| 32| 28.88|  3866.8552| 0.0|    0.0|
| 31| 25.74|  3756.6216| 1.0|    0.0|
| 46| 33.44|  8240.5896| 1.0|    0.0|
| 37| 27.74|  7281.5056| 1.0|    0.0|
| 37| 29.83|  6406.4107| 0.0|    0.0|
| 60| 25.84|28923.13692| 1.0|    0.0|
| 25| 26.22|  2721.3208| 0.0|    0.0|
| 62| 26.29| 27808.7251| 1.0|    1.0|
| 23|  34.4|   1826.843| 0.0|    0.0|
| 56| 39.82| 11090.7178| 1.0|    0.0|
| 27| 42.13| 39611.7577| 0.0|    1.0|
| 19|  24.6|   1837.237| 0.0|    0.0|
| 52| 30.78| 10797.3362| 1.0|    0.0|
| 23|23.845| 2395.17155| 0.0|    0.0|
| 56|  40.3|  10602.385| 0.0|    0.0|
| 30|  35.3|  36837.467| 0.0|    1.0|
+---+------+-----------+----+-------+
only showing top 20 rows



In [14]:
# Pack the four numeric predictor columns into one vector column named
# "feature", the input format expected by the regression estimator.
assembler = VectorAssembler(
    inputCols=["age", "bmi", "Sex2", "smoker2"],
    outputCol="feature",
)

In [15]:
# Append the assembled "feature" vector column to the prepared DataFrame.
output = assembler.transform(data)

In [16]:
# Keep only what the model needs: the feature vector and the "charges" label.
# NOTE(review): "finnal_df" is a misspelling of "final_df"; the name is kept
# because later cells reference it.
finnal_df=output.select("feature","charges")

In [17]:
# 70/30 train/test split. A fixed seed makes the split repeatable for the same
# input data, so the fitted coefficients and evaluation metrics below can be
# reproduced on a fresh Restart & Run All instead of changing on every run.
train_data, test_data = finnal_df.randomSplit([0.7, 0.3], seed=42)

In [18]:
# Preview the training split (feature vector and charges label).
train_data.show()

+--------------------+-----------+
|             feature|    charges|
+--------------------+-----------+
|[18.0,15.96,0.0,0.0]|  1694.7964|
|[18.0,21.47,0.0,0.0]|  1702.4553|
|[18.0,21.565,0.0,...|13747.87235|
|[18.0,21.66,1.0,1.0]| 14283.4594|
|[18.0,23.32,0.0,0.0]|  1711.0268|
|[18.0,24.09,1.0,0.0]|  2201.0971|
|[18.0,25.08,1.0,0.0]|  2196.4732|
|[18.0,25.175,0.0,...|15518.18025|
|[18.0,26.18,0.0,0.0]|  2304.0022|
|[18.0,26.73,1.0,0.0]|  1615.7667|
|[18.0,27.28,1.0,1.0]| 18223.4512|
|[18.0,28.215,1.0,...| 2200.83085|
|[18.0,28.31,0.0,0.0]|11272.33139|
| [18.0,28.5,0.0,0.0]|   1712.227|
|[18.0,29.165,1.0,...|7323.734819|
|[18.0,30.115,1.0,...| 2203.47185|
|[18.0,30.115,1.0,...| 21344.8467|
|[18.0,30.14,0.0,0.0]|  1131.5066|
|[18.0,30.305,1.0,...| 2203.73595|
| [18.0,30.4,0.0,0.0]|   3481.868|
+--------------------+-----------+
only showing top 20 rows



In [19]:
# Summary statistics (count, mean, stddev, min, max) for the training split;
# the output below covers only the numeric "charges" column.
train_data.describe().show()

+-------+------------------+
|summary|           charges|
+-------+------------------+
|  count|               927|
|   mean|13088.160894454144|
| stddev| 12155.83895065082|
|    min|         1131.5066|
|    max|       63770.42801|
+-------+------------------+



In [20]:
# Regression library
from pyspark.ml.regression import LinearRegression

In [21]:
# Independent variables come from the "feature" vector column;
# the dependent variable (label) is the "charges" column.
lm = LinearRegression(
    featuresCol="feature",
    labelCol="charges",
)

In [22]:
# Training: fit the linear regression on the training split.
regressor=lm.fit(train_data)

# Model form for the four assembled features (age, bmi, Sex2, smoker2):
# y = m1*x1 + m2*x2 + m3*x3 + m4*x4 + b

In [23]:
# Coefficients: the fitted model holds one weight per entry of the "feature"
# vector, in assembler order (age, bmi, Sex2, smoker2).
coefficient = regressor.coefficients
# The value is wrapped in a 1-tuple so it is always treated as the single format argument.
print("The coefficients of the model are : %a" % (coefficient,))

The coefficients of the model are : DenseVector([275.8824, 308.2065, 39.0301, 24392.2132])


In [24]:
# Intercept: the constant term b of the fitted regression equation.
intercept = regressor.intercept
print("The Intercept of the model is : %f" % (intercept,))

The Intercept of the model is : -12017.846096


In [25]:
# Score the fitted model on the held-out test split; the returned summary
# carries both the per-row predictions and the aggregate error metrics.
pred_results = regressor.evaluate(test_data)
# Show the first 40 rows of features, actual charges and predicted charges.
pred_results.predictions.show(n=40)



+--------------------+-----------+-------------------+
|             feature|    charges|         prediction|
+--------------------+-----------+-------------------+
|[18.0,17.29,0.0,1.0]| 12829.4551| 22669.141288549006|
|[18.0,20.79,1.0,0.0]|  1607.5101| -605.3190114113131|
|[18.0,21.78,0.0,0.0]|11884.04858|-339.22465263234517|
|[18.0,22.99,0.0,0.0]|  1704.5681|  33.70523221580879|
|[18.0,23.085,0.0,...| 1704.70015|  62.98485127413551|
|[18.0,23.21,0.0,0.0]|  1121.8739| 101.51066582456588|
|[18.0,23.75,0.0,0.0]|  1705.6245|  267.9421846824189|
|[18.0,25.46,0.0,0.0]|  1708.0014|  794.9753277322907|
|[18.0,26.125,0.0,...| 1708.92575|   999.932661140574|
|[18.0,26.315,1.0,...| 2198.18985|   1097.52199171766|
|[18.0,27.36,0.0,1.0]| 17178.6824| 25772.780908731584|
|[18.0,29.37,0.0,0.0]|  1719.4363|  2000.062806869717|
|[18.0,30.03,0.0,0.0]|  1720.3537| 2203.4791076959846|
|[18.0,31.35,1.0,0.0]|  4561.1885| 2649.3418018089487|
|[18.0,31.68,0.0,1.0]| 34303.1672| 27104.233059594415|
|[18.0,32.

In [26]:
# Model evaluation: score the fitted regression on the held-out test split

In [27]:
from IPython.display import Image
from IPython.core.display import HTML 
# Show the reference image for the MAE / MSE / RMSE / R-squared formulas (loaded from its remote URL).
Image(url= "https://4.bp.blogspot.com/-wG7IbjTfE6k/XGUvqm7TCVI/AAAAAAAAAZU/vpH1kuKTIooKTcVlnm1EVRCXLVZM9cPNgCLcBGAs/s1600/formula-MAE-MSE-RMSE-RSquared.JPG")

In [28]:
# Evaluate the model using metrics such as Mean Absolute Error (MAE),
# Root Mean Square Error (RMSE) and R-squared.
# The evaluator compares the "charges" label with the "prediction" column; no
# metric name is passed here, so the metric is selected when evaluate() is called.
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="charges", predictionCol="prediction")

In [29]:
# R-squared (coefficient of determination) on the test predictions; the metric
# is selected for this call only through the parameter map.
r2 = evaluation.evaluate(
    pred_results.predictions,
    {evaluation.metricName: "r2"},
)
print("r2: %.3f" % (r2,))

r2: 0.729


In [30]:
# Report the aggregate test-set metrics carried by the evaluation summary,
# one per line.
print(
    "RMSE: {}".format(pred_results.rootMeanSquaredError),
    "MSE: {}".format(pred_results.meanSquaredError),
    "R2: {}".format(pred_results.r2),
    sep="\n",
)

RMSE: 6242.696260847681
MSE: 38971256.60520162
R2: 0.7291837817259196
