### Objective 

#### Setting up pyspark 

In [1]:
# Import packages
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf

# Setup pyspark sc
conf = SparkConf().setAll([('spark.executor.memory', '2g')])
sc =SparkContext(conf=conf)
sqlContext = SQLContext(sc)

### Dataset 

Command to copy data file into HADOOP

### Data read and analysis

In [2]:
# data loading
data = sqlContext.read.format('com.databricks.spark.csv').\
        options(header='true', inferschema='true').\
        load('Attachment_1635667446.csv')

In [3]:
# columns schema of the data
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



##### Patients count by various categories 

In [4]:
# Count by sex
data.groupBy('sex').count().show()

+------+-----+
|   sex|count|
+------+-----+
|female|  662|
|  male|  676|
+------+-----+



In [5]:
# Count by dependent childrens
data.groupBy('children').count().show()

+--------+-----+
|children|count|
+--------+-----+
|       1|  324|
|       3|  157|
|       5|   18|
|       4|   25|
|       2|  240|
|       0|  574|
+--------+-----+



In [6]:
# Count by smoker type
data.groupBy('smoker').count().show()

+------+-----+
|smoker|count|
+------+-----+
|    no| 1064|
|   yes|  274|
+------+-----+



In [7]:
# Count by region
data.groupBy('region').count().show()

+---------+-----+
|   region|count|
+---------+-----+
|northwest|  325|
|southeast|  364|
|northeast|  324|
|southwest|  325|
+---------+-----+



### Data Transformation 

In [8]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# String Indexing
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data) for column in list(set(data.columns)-set(['Provider_Id, Provider_Zip_Code, Total_Discharges, Average_Covered_Charges, Average_Total_Payments, Average_Medicare_Payments'])) ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(data).transform(data)
df_r.show(5)

+---+------+------+--------+------+---------+-----------+---------+------------+-------------+---------+------------+---------+--------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_index|smoker_index|charges_index|bmi_index|region_index|age_index|children_index|
+---+------+------+--------+------+---------+-----------+---------+------------+-------------+---------+------------+---------+--------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|      1.0|         1.0|        340.0|    412.0|         2.0|      1.0|           0.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|      0.0|         0.0|        358.0|    283.0|         0.0|      0.0|           1.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|      0.0|         0.0|        891.0|     32.0|         0.0|     17.0|           3.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|      0.0|         0.0|        500.0|    130.0|         1.0|     30.0|           0.0|

In [10]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Feature assembler
assembler = VectorAssembler(
    inputCols=["age","children","bmi","region_index", "smoker_index", "sex_index"], outputCol="features")
output = assembler.transform(df_r)
output.select('features', 'charges').show(10)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|[19.0,0.0,27.9,2....|  16884.924|
|[18.0,1.0,33.77,0...|  1725.5523|
|[28.0,3.0,33.0,0....|   4449.462|
|[33.0,0.0,22.705,...|21984.47061|
|[32.0,0.0,28.88,1...|  3866.8552|
|[31.0,0.0,25.74,0...|  3756.6216|
|[46.0,1.0,33.44,0...|  8240.5896|
|[37.0,3.0,27.74,1...|  7281.5056|
|[37.0,2.0,29.83,3...|  6406.4107|
|[60.0,0.0,25.84,1...|28923.13692|
+--------------------+-----------+
only showing top 10 rows



### Data Splitting 

In [11]:
# data split for modelling
splits = output.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

### Model Training 

In [12]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Model training
lr = LinearRegression(maxIter=5, regParam=0.0, labelCol='charges', solver="normal")
mymodel = lr.fit(train_df)

In [13]:
# Model training summary
trainingSummary = mymodel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 5969.818040
r2: 0.759403


### Model Predictions 

In [14]:
# predictions on the test set using trained model
predictions = mymodel.transform(test_df)
predictions.show(5)

+---+------+------+--------+------+---------+----------+---------+------------+-------------+---------+------------+---------+--------------+--------------------+------------------+
|age|   sex|   bmi|children|smoker|   region|   charges|sex_index|smoker_index|charges_index|bmi_index|region_index|age_index|children_index|            features|        prediction|
+---+------+------+--------+------+---------+----------+---------+------------+-------------+---------+------------+---------+--------------+--------------------+------------------+
| 18|female| 21.66|       0|   yes|northeast|14283.4594|      1.0|         1.0|        267.0|    128.0|         3.0|      0.0|           0.0|[18.0,0.0,21.66,3...| 24453.53104470655|
| 18|female|28.215|       0|    no|northeast|2200.83085|      1.0|         0.0|        501.0|     89.0|         3.0|      0.0|           0.0|[18.0,0.0,28.215,...|2319.2883886873988|
| 18|female|30.115|       0|    no|northeast|21344.8467|      1.0|         0.0|        479

### Model Accuracy Measurement 

In [15]:
evaluator = RegressionEvaluator(labelCol="charges")
rmse = evaluator.evaluate(predictions,{evaluator.metricName:"rmse" })

import numpy as np
np.sqrt(rmse), rmse

(79.18602899510151, 6270.427188013057)

In [16]:
print("R Squared (R2) on test data = %g" % evaluator.evaluate(predictions,{evaluator.metricName:"r2" }))

R Squared (R2) on test data = 0.724998


### Conclusion 