In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName('Insurance')
    .getOrCreate()
)

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Load the insurance CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
dataset = spark.read.csv(
    "Insurance.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first rows of the raw data to sanity-check the load.
dataset.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [None]:
# Print the inferred column names and types to confirm the schema inference.
dataset.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [None]:
# One StringIndexer per categorical column; each maps the string labels to
# numeric indices in a new "<name>2" column.
# The input column names match the schema exactly ("sex", not "Sex"), so the
# lookup does not rely on Spark's default case-insensitive column resolution.
# The output name "Sex2" is kept as-is because later cells refer to it.
indexers = [
    StringIndexer(inputCol="sex", outputCol="Sex2"),
    StringIndexer(inputCol="smoker", outputCol="smoker2"),
    StringIndexer(inputCol="region", outputCol="region2"),
]

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Chain the indexers into a single pipeline, learn the label mappings from the
# dataset, then append the encoded columns to the same rows.
pipeline = Pipeline(stages=indexers)
fitted_pipeline = pipeline.fit(dataset)
DF6 = fitted_pipeline.transform(dataset)

# Preview the frame with the encoded columns added.
DF6.show()

+---+------+------+--------+------+---------+-----------+----+-------+-------+
|age|   sex|   bmi|children|smoker|   region|    charges|Sex2|smoker2|region2|
+---+------+------+--------+------+---------+-----------+----+-------+-------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924| 1.0|    1.0|    2.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523| 0.0|    0.0|    0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462| 0.0|    0.0|    0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061| 0.0|    0.0|    1.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552| 0.0|    0.0|    1.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216| 1.0|    0.0|    0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896| 1.0|    0.0|    0.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056| 1.0|    0.0|    1.0|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107| 0.0|    0.0|    3.0|
| 60|female| 25.84|       0|    no|northwest|28923.1

In [None]:
# Columns excluded from the modelling frame: the raw categorical strings plus
# "children" and the encoded region, which are not used as features below.
drop_list = ["sex", "smoker", "region", "children", "region2"]
data = DF6.select(*(name for name in DF6.columns if name not in drop_list))

In [None]:
# Preview the frame after the excluded columns were removed.
data.show()

+---+------+-----------+----+-------+
|age|   bmi|    charges|Sex2|smoker2|
+---+------+-----------+----+-------+
| 19|  27.9|  16884.924| 1.0|    1.0|
| 18| 33.77|  1725.5523| 0.0|    0.0|
| 28|  33.0|   4449.462| 0.0|    0.0|
| 33|22.705|21984.47061| 0.0|    0.0|
| 32| 28.88|  3866.8552| 0.0|    0.0|
| 31| 25.74|  3756.6216| 1.0|    0.0|
| 46| 33.44|  8240.5896| 1.0|    0.0|
| 37| 27.74|  7281.5056| 1.0|    0.0|
| 37| 29.83|  6406.4107| 0.0|    0.0|
| 60| 25.84|28923.13692| 1.0|    0.0|
| 25| 26.22|  2721.3208| 0.0|    0.0|
| 62| 26.29| 27808.7251| 1.0|    1.0|
| 23|  34.4|   1826.843| 0.0|    0.0|
| 56| 39.82| 11090.7178| 1.0|    0.0|
| 27| 42.13| 39611.7577| 0.0|    1.0|
| 19|  24.6|   1837.237| 0.0|    0.0|
| 52| 30.78| 10797.3362| 1.0|    0.0|
| 23|23.845| 2395.17155| 0.0|    0.0|
| 56|  40.3|  10602.385| 0.0|    0.0|
| 30|  35.3|  36837.467| 0.0|    1.0|
+---+------+-----------+----+-------+
only showing top 20 rows



In [None]:
# Pack the four predictor columns into a single vector column named "feature",
# which is the input layout Spark ML estimators expect.
assembler = VectorAssembler(
    inputCols=["age", "bmi", "Sex2", "smoker2"],
    outputCol="feature",
)

In [None]:
# Apply the assembler: adds the "feature" vector column to every row of `data`.
output = assembler.transform(data)

In [None]:
# Keep only what the regressor needs: the feature vector and the label.
finnal_df = output.select(
    "feature",
    "charges",
)

In [None]:
# 70/30 train/test split. The seed is fixed so that the split -- and therefore
# the fitted coefficients and the reported r2 -- are the same on every
# Restart & Run All instead of changing from run to run.
train_data, test_data = finnal_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Preview the training split (feature vector and label).
train_data.show()

+--------------------+-----------+
|             feature|    charges|
+--------------------+-----------+
|[18.0,15.96,0.0,0.0]|  1694.7964|
|[18.0,17.29,0.0,1.0]| 12829.4551|
|[18.0,21.47,0.0,0.0]|  1702.4553|
|[18.0,21.565,0.0,...|13747.87235|
|[18.0,21.66,1.0,1.0]| 14283.4594|
|[18.0,21.78,0.0,0.0]|11884.04858|
|[18.0,23.085,0.0,...| 1704.70015|
|[18.0,23.32,0.0,0.0]|  1711.0268|
|[18.0,23.75,0.0,0.0]|  1705.6245|
|[18.0,25.08,1.0,0.0]|  2196.4732|
|[18.0,25.175,0.0,...|15518.18025|
|[18.0,26.125,0.0,...| 1708.92575|
|[18.0,26.18,0.0,0.0]|  2304.0022|
|[18.0,26.315,1.0,...| 2198.18985|
|[18.0,27.28,1.0,1.0]| 18223.4512|
|[18.0,27.36,0.0,1.0]| 17178.6824|
|[18.0,28.215,1.0,...| 2200.83085|
|[18.0,28.31,0.0,0.0]|11272.33139|
| [18.0,28.5,0.0,0.0]|   1712.227|
|[18.0,29.165,1.0,...|7323.734819|
+--------------------+-----------+
only showing top 20 rows



In [None]:
# Summary statistics (count, mean, stddev, min, max) for the training split.
train_data.describe().show()

+-------+------------------+
|summary|           charges|
+-------+------------------+
|  count|               936|
|   mean|12999.827788748924|
| stddev|11932.382452084874|
|    min|         1131.5066|
|    max|       60021.39897|
+-------+------------------+



In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Configure the linear regression estimator: predict "charges" from the
# assembled "feature" vector.
lm = LinearRegression(
    labelCol="charges",
    featuresCol="feature",
)

In [None]:
# Fit the model on the training split only; the test split stays unseen.
regressor = lm.fit(
    train_data,
)

In [None]:
# One coefficient per entry of the feature vector, in assembler input order.
coefficient = regressor.coefficients
print("The coefficients of the model are : %a" % (coefficient,))

The coefficients of the model are : DenseVector([269.6497, 276.0702, 159.8231, 24122.8753])


In [None]:
# Constant term of the fitted linear model.
intercept = regressor.intercept
print("The Intercept of the model is : %f" % (intercept,))

The Intercept of the model is : -10840.355189


In [None]:
# Score the held-out test split and show the first 40 rows, with the actual
# "charges" next to the model's "prediction".
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show(n=40)



+--------------------+-----------+-------------------+
|             feature|    charges|         prediction|
+--------------------+-----------+-------------------+
|[18.0,20.79,1.0,0.0]|  1607.5101| -87.33913075220335|
|[18.0,22.99,0.0,0.0]|  1704.5681| 360.19219656077075|
|[18.0,23.21,0.0,0.0]|  1121.8739| 420.92763756006934|
|[18.0,24.09,1.0,0.0]|  2201.0971|  823.6924842372864|
|[18.0,25.46,0.0,0.0]|  1708.0014|  1042.085556871085|
|[18.0,26.73,1.0,0.0]|  1615.7667| 1552.5177762288768|
|[18.0,30.03,0.0,0.0]|  1720.3537| 2303.7263085383474|
|[18.0,30.115,1.0,...| 2203.47185| 2487.0153570590046|
|[18.0,30.115,1.0,...| 21344.8467| 2487.0153570590046|
|[18.0,31.13,1.0,0.0]|  1621.8827| 2767.2265962148595|
|[18.0,31.68,0.0,1.0]| 34303.1672| 26882.117418151647|
|[18.0,33.66,0.0,0.0]|  1136.3994| 3305.8610850267814|
| [18.0,34.1,0.0,0.0]|   1137.011| 3427.3319670253823|
| [18.0,35.2,0.0,0.0]|    1727.54|  3731.009172021879|
|[18.0,36.85,1.0,0.0]|  1629.8335|  4346.348062196641|
|[18.0,37.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluator that compares the "prediction" column against the "charges" label.
evaluation = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="charges",
)

In [None]:
# Coefficient of determination on the test predictions. The metric is chosen
# through a per-call param map, so the evaluator object itself is not modified.
r2 = evaluation.evaluate(
    pred_results.predictions,
    {evaluation.metricName: "r2"},
)
print("r2: %.3f" % (r2,))

r2: 0.711
