In [14]:
#libraries will be imported wherever required

In [102]:
from pyspark.sql import SparkSession

In [103]:
# Create (or reuse) the Spark session that every later cell reads from.
session_builder = SparkSession.builder.appName('insurance_dataset')
spark = session_builder.getOrCreate()

In [104]:
# Load the insurance CSV: the first row supplies the column names and Spark
# infers each column's type.
# NOTE(review): absolute, machine-specific path - the notebook only runs where
# this exact file exists.
csv_path = 'D://M. Tech in Data Science & Machine Learning//Big Data Analytics//Sem_Prep//insurance//insurance.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)
# Preview the first two rows without truncating cell values.
df.show(2, truncate=False)

+---+------+-----+--------+------+---------+---------+
|age|sex   |bmi  |children|smoker|region   |charges  |
+---+------+-----+--------+------+---------+---------+
|19 |female|27.9 |0       |yes   |southwest|16884.924|
|18 |male  |33.77|1       |no    |southeast|1725.5523|
+---+------+-----+--------+------+---------+---------+
only showing top 2 rows



In [105]:
# Total number of rows in the dataset.
df.count()

1338

In [106]:
# Total number of columns in the dataset.
len(df.columns)

7

In [107]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [108]:
# Print every column name next to its Spark data type.
for col_name, col_type in df.dtypes:
    print('column Name:', col_name, ' ', 'column datatype:', col_type)

column Name: age   column datatype: int
column Name: sex   column datatype: string
column Name: bmi   column datatype: double
column Name: children   column datatype: int
column Name: smoker   column datatype: string
column Name: region   column datatype: string
column Name: charges   column datatype: double


In [109]:
# Statistical summary of the numeric columns (the data is collected into a
# pandas DataFrame first).
df.toPandas().describe()

Unnamed: 0,age,bmi,children,charges
count,1338.0,1338.0,1338.0,1338.0
mean,39.207025,30.663397,1.094918,13270.422265
std,14.04996,6.098187,1.205493,12110.011237
min,18.0,15.96,0.0,1121.8739
25%,27.0,26.29625,0.0,4740.28715
50%,39.0,30.4,1.0,9382.033
75%,51.0,34.69375,2.0,16639.912515
max,64.0,53.13,5.0,63770.42801


In [110]:
from pyspark.sql.functions import isnan, col, count, max, min, when

In [111]:
# Count missing values per column.
# NaN is only a meaningful value for floating-point columns, so isnan() is
# applied to those alone; every other column is checked for NULL only.
# Calling isnan() on a string column relies on an implicit string-to-double
# cast, which is expected to raise for values such as 'female' when ANSI SQL
# mode is enabled.
float_cols = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}


def _missing_count(c):
    """Return an aggregate column counting the NULL (and, for floats, NaN) rows of column `c`."""
    is_missing = col(c).isNull()
    if c in float_cols:
        is_missing = is_missing | isnan(col(c))
    # when() yields a non-null value only for missing rows, so count() tallies them.
    return count(when(is_missing, c)).alias(c)


df.select([_missing_count(c) for c in df.columns]).show()

+---+---+---+--------+------+------+-------+
|age|sex|bmi|children|smoker|region|charges|
+---+---+---+--------+------+------+-------+
|  0|  0|  0|       0|     0|     0|      0|
+---+---+---+--------+------+------+-------+



In [112]:
#There are no null values present

In [113]:
#Bucketizer

In [114]:
from pyspark.ml.feature import Bucketizer

In [115]:
# Number of rows per distinct age; the displayed rows are not sorted by age.
df.groupby('age').count().show()

+---+-----+
|age|count|
+---+-----+
| 31|   27|
| 53|   28|
| 34|   26|
| 28|   28|
| 27|   28|
| 26|   28|
| 44|   27|
| 22|   28|
| 47|   29|
| 52|   29|
| 40|   27|
| 20|   29|
| 57|   26|
| 54|   28|
| 48|   29|
| 19|   68|
| 64|   22|
| 41|   27|
| 43|   27|
| 37|   25|
+---+-----+
only showing top 20 rows



In [116]:
# Largest age in the dataset (used to choose the upper bucket boundary below).
df.agg({'age':'max'}).show()

+--------+
|max(age)|
+--------+
|      64|
+--------+



In [117]:
# Smallest age in the dataset (used to choose the lower bucket boundary below).
df.agg({'age':'min'}).show()

+--------+
|min(age)|
+--------+
|      18|
+--------+



In [118]:
# Ages range from 18 to 64 (see the min/max cells above), so these splits give
# four buckets: [18, 30), [30, 40), [40, 50) and [50, 64]. Bucketizer's last
# bucket includes its upper bound, so age 64 is still covered.
split = [18, 30, 40, 50, 64]
bucketizer = (
    Bucketizer()
    .setSplits(split)
    .setInputCol('age')
    .setOutputCol('age_group')
)
# Adds the bucket index as a new 'age_group' column; df itself is left unchanged.
df1 = bucketizer.transform(df)

In [119]:
# Peek at the first three bucket indices, untruncated.
age_groups = df1.select('age_group')
age_groups.show(3, truncate=False)

+---------+
|age_group|
+---------+
|0.0      |
|0.0      |
|0.0      |
+---------+
only showing top 3 rows



In [120]:
#String Indexer

In [121]:
from pyspark.ml.feature import StringIndexer

In [122]:
# Collect the names of all string-typed columns and report each one.
indexers = [name for name, dtype in df.dtypes if dtype == 'string']
for name in indexers:
    print('column Name:', name, ' ', 'column datatype:', 'string')

column Name: sex   column datatype: string
column Name: smoker   column datatype: string
column Name: region   column datatype: string


In [123]:
# Display the string column names gathered in the previous cell.
indexers

['sex', 'smoker', 'region']

In [124]:
# Index every string column listed in `indexers`, writing each result to a
# '<column>_index' column. Deriving both lists from `indexers` keeps this cell
# in sync with the dtype scan above instead of repeating the names by hand.
indexer = StringIndexer(
    inputCols=indexers,
    outputCols=[name + '_index' for name in indexers],
)
# Fit the label-to-index mapping on df1 and apply it to the same frame.
df2 = indexer.fit(df1).transform(df1)

In [125]:
# Drop the original string columns now that their indexed versions exist,
# then preview the first two rows untruncated.
raw_string_cols = ('sex', 'smoker', 'region')
df2 = df2.drop(*raw_string_cols)
df2.show(2, truncate=False)

+---+-----+--------+---------+---------+---------+------------+------------+
|age|bmi  |children|charges  |age_group|sex_index|smoker_index|region_index|
+---+-----+--------+---------+---------+---------+------------+------------+
|19 |27.9 |0       |16884.924|0.0      |1.0      |1.0         |2.0         |
|18 |33.77|1       |1725.5523|0.0      |0.0      |0.0         |0.0         |
+---+-----+--------+---------+---------+---------+------------+------------+
only showing top 2 rows



In [126]:
#Vector assembler

In [127]:
# List the columns available for feature assembly.
df2.columns

['age',
 'bmi',
 'children',
 'charges',
 'age_group',
 'sex_index',
 'smoker_index',
 'region_index']

In [128]:
from pyspark.ml.feature import VectorAssembler

In [129]:
# Every column except the label 'charges' goes into the feature vector.
feature_cols = [
    'age',
    'bmi',
    'children',
    'age_group',
    'sex_index',
    'smoker_index',
    'region_index',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
# Adds a single vector column named 'features' alongside the existing columns.
df3 = assembler.transform(df2)

In [130]:
# Show the assembled feature vectors (first 20 rows, long values truncated).
df3.select('features').show()

+--------------------+
|            features|
+--------------------+
|[19.0,27.9,0.0,0....|
|(7,[0,1,2],[18.0,...|
|(7,[0,1,2],[28.0,...|
|[33.0,22.705,0.0,...|
|[32.0,28.88,0.0,1...|
|[31.0,25.74,0.0,1...|
|[46.0,33.44,1.0,2...|
|[37.0,27.74,3.0,1...|
|[37.0,29.83,2.0,1...|
|[60.0,25.84,0.0,3...|
|(7,[0,1,6],[25.0,...|
|[62.0,26.29,0.0,3...|
|(7,[0,1,6],[23.0,...|
|[56.0,39.82,0.0,3...|
|(7,[0,1,5],[27.0,...|
|[19.0,24.6,1.0,0....|
|[52.0,30.78,1.0,3...|
|(7,[0,1,6],[23.0,...|
|[56.0,40.3,0.0,3....|
|[30.0,35.3,0.0,1....|
+--------------------+
only showing top 20 rows



In [131]:
#Train Test Split

In [132]:
# 70/30 train/test split with a fixed seed (2) so the split is repeatable.
# The split is taken from df3, the frame that carries the assembled 'features'
# column; the name `df4` is never defined in this notebook and raises NameError
# on a fresh kernel.
train1, test1 = df3.randomSplit([0.7, 0.3], seed=2)
# Report the size of each partition.
print(train1.count())
print(test1.count())

934
404


In [133]:
#Model building

In [156]:
from pyspark.ml.regression import LinearRegression

In [157]:
# Linear regression predicting 'charges' from the assembled 'features' vector,
# fitted on the training partition only.
lr = LinearRegression(
    labelCol='charges',
    featuresCol='features',
)
lr_model = lr.fit(train1)

In [158]:
# Keep a handle on the summary attached to the fitted model (lr_model was fit on train1).
lr_model_summary=lr_model.summary

In [159]:
# These metrics come from the summary attached to the fitted model, i.e. they
# describe the fit on train1.
# NOTE(review): test1 is never evaluated in this notebook.
training_summary = lr_model.summary
print("MSE: ", training_summary.meanSquaredError)
print("MAE: ", training_summary.meanAbsoluteError)
print("R-squared: ", training_summary.r2)

MSE:  35093323.19303611
MAE:  4000.731256008842
R-squared:  0.742671792971815


In [160]:
# Report the fitted weights (one per entry of the feature vector) and the intercept.
coefficients = lr_model.coefficients
intercept = lr_model.intercept
print("Coefficients: ", coefficients)
print('')
print("Intercept: ", intercept)

Coefficients:  [214.40936066883455,309.51243449333765,506.94517505593484,400.5024706197547,87.91822204562507,23602.01575211057,332.8722363707116]

Intercept:  -11293.799062682676


In [None]:
#---------------------------------------------------------END---------------------------------------------------#