In [2]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark
     

In [3]:
import pandas as pd

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
     

In [5]:
df = spark.read.csv("insurance.csv", inferSchema=True,
                   header=True)


In [6]:
df.show(5)


+---+------+------+--------+------+---------+-----------+
|age|gender|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
+---+------+------+--------+------+---------+-----------+
only showing top 5 rows



In [8]:
df.count()

1338

In [9]:
len(df.columns)


7

In [10]:
df.printSchema()


root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [11]:
df.describe().show()


+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|summary|               age|gender|               bmi|         children|smoker|   region|           charges|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|  count|              1338|  1338|              1338|             1338|  1338|     1338|              1338|
|   mean| 39.20702541106129|  null|30.663396860986538|  1.0949177877429|  null|     null|13270.422265141257|
| stddev|14.049960379216147|  null| 6.098186911679012|1.205492739781914|  null|     null|12110.011236693992|
|    min|                18|female|             15.96|                0|    no|northeast|         1121.8739|
|    max|                64|  male|             53.13|                5|   yes|southwest|       63770.42801|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+



In [12]:
df.head(5)

[Row(age=19, gender='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924),
 Row(age=18, gender='male', bmi=33.77, children=1, smoker='no', region='southeast', charges=1725.5523),
 Row(age=28, gender='male', bmi=33.0, children=3, smoker='no', region='southeast', charges=4449.462),
 Row(age=33, gender='male', bmi=22.705, children=0, smoker='no', region='northwest', charges=21984.47061),
 Row(age=32, gender='male', bmi=28.88, children=0, smoker='no', region='northwest', charges=3866.8552)]

In [13]:
df.corr('age', 'charges')

0.299008193330648

In [14]:
df.corr('bmi', 'charges')

0.19834096883362903

In [15]:
df.columns

['age', 'gender', 'bmi', 'children', 'smoker', 'region', 'charges']

In [16]:
from pyspark.ml.feature import StringIndexer

In [17]:
indexer=StringIndexer(inputCol= "gender",outputCol='gender_cat')
indexed=indexer.fit(df).transform(df)

In [18]:
indexer=StringIndexer(inputCol= "smoker",outputCol='smoker_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [19]:
indexer=StringIndexer(inputCol= "region",outputCol='region_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [20]:
indexed.show()

+---+------+------+--------+------+---------+-----------+----------+----------+----------+
|age|gender|   bmi|children|smoker|   region|    charges|gender_cat|smoker_cat|region_cat|
+---+------+------+--------+------+---------+-----------+----------+----------+----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|       1.0|       1.0|       2.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|       0.0|       0.0|       0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|       0.0|       0.0|       0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|       0.0|       0.0|       1.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|       0.0|       0.0|       1.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|       1.0|       0.0|       0.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|       1.0|       0.0|       0.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|       1.0|       0.0|       1.0|

In [21]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler


In [22]:
indexed.columns

['age',
 'gender',
 'bmi',
 'children',
 'smoker',
 'region',
 'charges',
 'gender_cat',
 'smoker_cat',
 'region_cat']

In [23]:
assembler= VectorAssembler(inputCols=['age',
 'bmi',
 'children',
 'gender_cat',
 'smoker_cat',
 'region_cat'],outputCol='features')

In [24]:
assembler

VectorAssembler_8f15db0e99ce

In [25]:
output= assembler.transform(indexed)

In [26]:
output

DataFrame[age: int, gender: string, bmi: double, children: int, smoker: string, region: string, charges: double, gender_cat: double, smoker_cat: double, region_cat: double, features: vector]

In [27]:
output.show()

+---+------+------+--------+------+---------+-----------+----------+----------+----------+--------------------+
|age|gender|   bmi|children|smoker|   region|    charges|gender_cat|smoker_cat|region_cat|            features|
+---+------+------+--------+------+---------+-----------+----------+----------+----------+--------------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|       1.0|       1.0|       2.0|[19.0,27.9,0.0,1....|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|       0.0|       0.0|       0.0|[18.0,33.77,1.0,0...|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|       0.0|       0.0|       0.0|[28.0,33.0,3.0,0....|
| 33|  male|22.705|       0|    no|northwest|21984.47061|       0.0|       0.0|       1.0|[33.0,22.705,0.0,...|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|       0.0|       0.0|       1.0|[32.0,28.88,0.0,0...|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|       1.0|       0.0|       0.0|[31.0,25.74,0.

In [28]:
output.select('features','charges').show(5)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|[19.0,27.9,0.0,1....|  16884.924|
|[18.0,33.77,1.0,0...|  1725.5523|
|[28.0,33.0,3.0,0....|   4449.462|
|[33.0,22.705,0.0,...|21984.47061|
|[32.0,28.88,0.0,0...|  3866.8552|
+--------------------+-----------+
only showing top 5 rows



In [29]:
#final data consist of features and label which is crew.
final_data=output.select('features','charges')


In [31]:
#splitting data into train and test
train_data,test_data=final_data.randomSplit([0.7,0.3])

In [32]:
train_data.describe().show()

+-------+-----------------+
|summary|          charges|
+-------+-----------------+
|  count|              921|
|   mean|13193.85903837024|
| stddev|12214.06918763569|
|    min|        1121.8739|
|    max|      63770.42801|
+-------+-----------------+



In [33]:
test_data.describe().show()

+-------+------------------+
|summary|           charges|
+-------+------------------+
|  count|               417|
|   mean|13439.522341534763|
| stddev|11889.726845508414|
|    min|         1136.3994|
|    max|       52590.82939|
+-------+------------------+



In [68]:
test_data.show(5)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|(6,[0,1],[18.0,33...|  1136.3994|
|(6,[0,1],[18.0,34...|  1137.4697|
|(6,[0,1],[21.0,31...|16586.49771|
|(6,[0,1],[21.0,36...|  1534.3045|
|(6,[0,1],[22.0,33...|  1674.6323|
+--------------------+-----------+
only showing top 5 rows



In [34]:
#import LinearRegression library
from pyspark.ml.regression import LinearRegression

In [53]:
#creating an object of class LinearRegression
#object takes features and label as input arguments
lr=LinearRegression(featuresCol='features',labelCol='charges')

In [54]:
#pass train_data to train model
trained_model = lr.fit(train_data)

In [55]:
trained_model

LinearRegressionModel: uid=LinearRegression_a679b3886331, numFeatures=6

In [56]:
#evaluating model trained for Rsquared error
results = trained_model.evaluate(train_data)

In [57]:
print('Rsquared Error :',results.r2)

Rsquared Error : 0.7527121515764923


In [58]:
print(results.meanSquaredError)

36851207.642538935


In [59]:
print(results.rootMeanSquaredError)

6070.519552932758


In [60]:
print(results.meanAbsoluteError)

4073.636494051271


In [61]:
#testing Model on unlabeled data from the test data
#create unlabeled data from test_data
#testing model on unlabeled data
unlabeled_data=test_data.select('features')
unlabeled_data.show(5)


+--------------------+
|            features|
+--------------------+
|(6,[0,1],[18.0,33...|
|(6,[0,1],[18.0,34...|
|(6,[0,1],[21.0,31...|
|(6,[0,1],[21.0,36...|
|(6,[0,1],[22.0,33...|
+--------------------+
only showing top 5 rows



In [62]:
predictions=trained_model.transform(unlabeled_data)
predictions.show()


+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|(6,[0,1],[18.0,33...| 2936.534998771416|
|(6,[0,1],[18.0,34...|3176.3432892221554|
|(6,[0,1],[21.0,31...| 2882.802282623221|
|(6,[0,1],[21.0,36...| 4698.493624607387|
|(6,[0,1],[22.0,33...|3995.4161750796884|
|(6,[0,1],[23.0,41...| 6786.688100691323|
|(6,[0,1],[24.0,32...|3959.5943643142236|
|(6,[0,1],[24.0,35...| 5158.635816567918|
|(6,[0,1],[25.0,25...| 2263.025425776321|
|(6,[0,1],[28.0,38...| 6868.425209813906|
|(6,[0,1],[33.0,30...| 5716.862539475549|
|(6,[0,1],[34.0,34...| 7206.318031211746|
|(6,[0,1],[40.0,41...|11072.818554956166|
|(6,[0,1],[41.0,33...| 8793.858053895043|
|(6,[0,1],[49.0,35...|11562.528623449301|
|(6,[0,1],[49.0,36...|11870.853568314536|
|(6,[0,1],[52.0,34...|11782.862524959097|
|(6,[0,1],[53.0,31...|11182.560057053144|
|(6,[0,1],[55.0,33...|12208.746389712376|
|(6,[0,1],[58.0,36...|13936.446688341099|
+--------------------+------------

In [65]:
# Get summary of the model
summary = trained_model.summary
print('R2 score:', summary.r2)
print('MAE score:', summary.meanAbsoluteError)
print('MSE score:', summary.meanSquaredError)

R2 score: 0.7527121515764923
MAE score: 4073.636494051271
MSE score: 36851207.642538935
