In [1]:
# import python spark library
from pyspark.sql import SparkSession
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

#ML Logisitic Regression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [2]:
# Creating a spark session. It can take some time first time at least
spark=SparkSession.builder.appName('recommender').getOrCreate()

In [3]:
df= spark.read.csv('insurance.csv', header = True,inferSchema=True)

In [4]:
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [5]:
df.cache()

DataFrame[age: int, sex: string, bmi: double, children: int, smoker: string, region: string, charges: double]

In [6]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [7]:
df.groupby('sex').count().show()

+------+-----+
|   sex|count|
+------+-----+
|female|  662|
|  male|  676|
+------+-----+



In [8]:
df.select(["age","charges"]).show()

+---+-----------+
|age|    charges|
+---+-----------+
| 19|  16884.924|
| 18|  1725.5523|
| 28|   4449.462|
| 33|21984.47061|
| 32|  3866.8552|
| 31|  3756.6216|
| 46|  8240.5896|
| 37|  7281.5056|
| 37|  6406.4107|
| 60|28923.13692|
| 25|  2721.3208|
| 62| 27808.7251|
| 23|   1826.843|
| 56| 11090.7178|
| 27| 39611.7577|
| 19|   1837.237|
| 52| 10797.3362|
| 23| 2395.17155|
| 56|  10602.385|
| 30|  36837.467|
+---+-----------+
only showing top 20 rows



In [9]:
df.corr('age','charges')

0.299008193330648

In [10]:
df.corr('bmi','charges')

0.19834096883362903

In [11]:
df.corr('children','charges')

0.06799822684790492

In [12]:
vc=VectorAssembler(
    inputCols=['age',"bmi",'children','charges'],
    outputCol='numeric')
dx=vc.transform(df).select('numeric')


In [13]:
dx.show()

+--------------------+
|             numeric|
+--------------------+
|[19.0,27.9,0.0,16...|
|[18.0,33.77,1.0,1...|
|[28.0,33.0,3.0,44...|
|[33.0,22.705,0.0,...|
|[32.0,28.88,0.0,3...|
|[31.0,25.74,0.0,3...|
|[46.0,33.44,1.0,8...|
|[37.0,27.74,3.0,7...|
|[37.0,29.83,2.0,6...|
|[60.0,25.84,0.0,2...|
|[25.0,26.22,0.0,2...|
|[62.0,26.29,0.0,2...|
|[23.0,34.4,0.0,18...|
|[56.0,39.82,0.0,1...|
|[27.0,42.13,0.0,3...|
|[19.0,24.6,1.0,18...|
|[52.0,30.78,1.0,1...|
|[23.0,23.845,0.0,...|
|[56.0,40.3,0.0,10...|
|[30.0,35.3,0.0,36...|
+--------------------+
only showing top 20 rows



In [14]:
abc=df.toPandas()

In [15]:
abc.head(2)

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523


In [16]:
from pyspark.sql.functions import isnan, when, count, col
def get_null_value_count(data):
    data.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in data.columns]).show()

In [17]:
get_null_value_count(df)

+---+---+---+--------+------+------+-------+
|age|sex|bmi|children|smoker|region|charges|
+---+---+---+--------+------+------+-------+
|  0|  0|  0|       0|     0|     0|      0|
+---+---+---+--------+------+------+-------+



In [18]:
df.dtypes

[('age', 'int'),
 ('sex', 'string'),
 ('bmi', 'double'),
 ('children', 'int'),
 ('smoker', 'string'),
 ('region', 'string'),
 ('charges', 'double')]

In [19]:
 cat_cols=[c[0] for c in df.dtypes if c[1]=="string"]

In [20]:
cat_cols

['sex', 'smoker', 'region']

In [21]:
len(cat_cols)

3

In [22]:
cat_cols=cat_cols[0:3]

In [23]:
ohe_cols=[c+"_ohe" for c in cat_cols]
ohe_cols

['sex_ohe', 'smoker_ohe', 'region_ohe']

In [24]:
cat_cols_si= [c+"_index" for c in cat_cols]
print (cat_cols_si)

['sex_index', 'smoker_index', 'region_index']


In [25]:
cat_cols_ohe= [c+"_ohe" for c in cat_cols]
print (cat_cols_ohe)

['sex_ohe', 'smoker_ohe', 'region_ohe']


In [26]:
num_cols=[c[0] for c in df.dtypes if c[1]!="string"]
num_cols

['age', 'bmi', 'children', 'charges']

In [27]:
df.groupby('region').count().show()

+---------+-----+
|   region|count|
+---------+-----+
|northwest|  325|
|southeast|  364|
|northeast|  324|
|southwest|  325|
+---------+-----+



In [28]:
df.groupby('region').count().orderBy("count", ascending=False).first()[0]

'southeast'

In [29]:
[[i, df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]

[['age', 18],
 ['sex', 'male'],
 ['bmi', 32.3],
 ['children', 0],
 ['smoker', 'no'],
 ['region', 'southeast'],
 ['charges', 1639.5631]]

# String To label

In [30]:
lable_To_integer=StringIndexer(
                inputCol="sex",
                outputCol="label"
                )

model=lable_To_integer.fit(df.select("sex"))
model.transform(df.select('sex')).take(10)

[Row(sex='female', label=1.0),
 Row(sex='male', label=0.0),
 Row(sex='male', label=0.0),
 Row(sex='male', label=0.0),
 Row(sex='male', label=0.0),
 Row(sex='female', label=1.0),
 Row(sex='female', label=1.0),
 Row(sex='female', label=1.0),
 Row(sex='male', label=0.0),
 Row(sex='female', label=1.0)]

In [31]:
## Pipeline
pipe = Pipeline(
                stages=[
                    StringIndexer(
                        inputCol="sex",
                        outputCol="label"
                    ),
                    StringIndexer(
                        inputCols=cat_cols,
                        outputCols=cat_cols_si
                    ),
                    OneHotEncoder(
                        inputCols=cat_cols_si,
                        outputCols=ohe_cols
                    ),
                    VectorAssembler(
                        inputCols=cat_cols_si+num_cols,
                        outputCol="ass_features"
                    ),
                    StandardScaler(
                        inputCol="ass_features",
                        outputCol="features"
                    )
                ]
    )

In [32]:
cat_cols

['sex', 'smoker', 'region']

In [33]:
model=pipe.fit(df)

df_trans=model.transform(df)

In [34]:
print (df_trans.columns)

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges', 'label', 'sex_index', 'smoker_index', 'region_index', 'sex_ohe', 'smoker_ohe', 'region_ohe', 'ass_features', 'features']


In [35]:
len(df.columns)

7

In [36]:
len(df_trans.columns)

16

In [37]:
pipe = Pipeline(
                stages=[
                    StringIndexer(
                        inputCol="sex",
                        handleInvalid='keep',
                        outputCol="label"
                    ),
                    StringIndexer(
                        inputCols=cat_cols,
                        handleInvalid='keep',
                        outputCols=cat_cols_si
                    ),
                    OneHotEncoder(
                        inputCols=cat_cols_si,
                        outputCols=ohe_cols
                    ),
                    VectorAssembler(
                        inputCols=cat_cols_si+num_cols,
                        outputCol="ass_features"
                    ),
                    StandardScaler(
                        inputCol="ass_features",
                        outputCol="features"
                    ),
                    LogisticRegression(
                        featuresCol='features',
                        labelCol='label',
                        maxIter=30
                    )
                ]
    )

In [38]:
train, test=df.randomSplit([0.8,0.2], seed=12345)

In [39]:
train.count()

1094

In [40]:
test.count()

244

# Training of model

In [42]:
import time
start=time.time()
lrModel=pipe.fit(train)
end=time.time()
(end-start)/60

0.09182878732681274

# Prediction

In [43]:
predictions=lrModel.transform(test)

In [44]:
predictions.columns

['age',
 'sex',
 'bmi',
 'children',
 'smoker',
 'region',
 'charges',
 'label',
 'sex_index',
 'smoker_index',
 'region_index',
 'sex_ohe',
 'smoker_ohe',
 'region_ohe',
 'ass_features',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [45]:
predictions.select("label","prediction","rawprediction","probability").show(5)

+-----+----------+--------------------+--------------------+
|label|prediction|       rawprediction|         probability|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[41.8780403573704...|[0.99999998834081...|
|  0.0|       0.0|[47.7358238849391...|[0.99999999966107...|
|  0.0|       0.0|[48.3279019699841...|[0.99999999896768...|
|  0.0|       0.0|[52.8889430630610...|[0.99999999996651...|
|  1.0|       1.0|[16.6545370723594...|[5.34709890089073...|
+-----+----------+--------------------+--------------------+
only showing top 5 rows



# Evaluation Of Model

In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

evaluator.evaluate(predictions)

0.999593578540947

In [47]:
evaluator.getMetricName()

'areaUnderROC'

In [48]:
pipe.getStages()

[StringIndexer_ac546bd52e06,
 StringIndexer_3faaa8909f79,
 OneHotEncoder_a9e752012ef7,
 VectorAssembler_68f58c593701,
 StandardScaler_8581eacddb8d,
 LogisticRegression_f4a7ce0473ec]

# Model Tuning

In [49]:
lr=pipe.getStages()[5]

In [50]:
grid=(ParamGridBuilder().addGrid(lr.regParam, [0.01, 1, 2.0, 5.0]) \
      .addGrid(lr.elasticNetParam, [0.0,0.5,1.0]) \
      .addGrid(lr.maxIter,[1,20]).build()
     
     )

In [51]:
evaluator=BinaryClassificationEvaluator()

cv=CrossValidator(estimator =pipe, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3
              )

In [52]:
cvModel=cv.fit(train)

In [53]:
predictions=cvModel.transform(test)

In [54]:
predictions.select("label","prediction","rawprediction","probability").show(5)

+-----+----------+--------------------+--------------------+
|label|prediction|       rawprediction|         probability|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[3.59802814146375...|[0.97904159241010...|
|  0.0|       0.0|[3.73357652048825...|[0.97909461607386...|
|  0.0|       0.0|[3.76192468590858...|[0.97927015645051...|
|  0.0|       0.0|[3.82105163096855...|[0.97755671922479...|
|  1.0|       1.0|[-0.3607175977514...|[0.02060544476922...|
+-----+----------+--------------------+--------------------+
only showing top 5 rows



In [55]:

evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

evaluator.evaluate(predictions)

1.0