## Objective is to predict the Claim_Amount using the machine learning model in pyspark.

### Update Path with SPARK_HOME and PYLIB environment variables

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

### Initialize Spark

In [2]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Application Name").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

### Load all dependent libararies here

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml import regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd

## Read & Understand the Data

### Defining Schema to given data

### Create a dataframe for given data

In [4]:
inputData = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("ignoreTrailingWhiteSpace", "true")\
            .option("ignoreLeadingWhiteSpace", "true")\
            .option("quote", '"')\
            .load("file:///home/1843B39/CUTe_7322c/sample_set.csv")


In [5]:
inputData.printSchema()

root
 |-- Row_ID: integer (nullable = true)
 |-- Household_ID: integer (nullable = true)
 |-- Vehicle: integer (nullable = true)
 |-- Calendar_Year: integer (nullable = true)
 |-- Model_Year: integer (nullable = true)
 |-- Blind_Make: string (nullable = true)
 |-- Blind_Model: string (nullable = true)
 |-- Blind_Submodel: string (nullable = true)
 |-- Cat1: string (nullable = true)
 |-- Cat2: string (nullable = true)
 |-- Cat3: string (nullable = true)
 |-- Cat4: string (nullable = true)
 |-- Cat5: string (nullable = true)
 |-- Cat6: string (nullable = true)
 |-- Cat7: string (nullable = true)
 |-- Cat8: string (nullable = true)
 |-- Cat9: string (nullable = true)
 |-- Cat10: string (nullable = true)
 |-- Cat11: string (nullable = true)
 |-- Cat12: string (nullable = true)
 |-- OrdCat: string (nullable = true)
 |-- Var1: double (nullable = true)
 |-- Var2: double (nullable = true)
 |-- Var3: double (nullable = true)
 |-- Var4: double (nullable = true)
 |-- Var5: double (nullable = true

### Verify summary of the dataframe (how many rows and columns)

In [6]:
inputData.toPandas().describe(include = "all").transpose()

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
Row_ID,1000000.0,,,,6583190.0,3806670.0,1.0,3283080.0,6582030.0,9875290.0,13184200.0
Household_ID,1000000.0,,,,4122830.0,2249240.0,1.0,2180650.0,4253850.0,6277600.0,7542060.0
Vehicle,1000000.0,,,,1.89136,1.17046,1.0,1.0,2.0,2.0,24.0
Calendar_Year,1000000.0,,,,2006.05,0.812612,2005.0,2005.0,2006.0,2007.0,2007.0
Model_Year,1000000.0,,,,1999.31,5.20987,1981.0,1996.0,2000.0,2003.0,2008.0
Blind_Make,1000000.0,72.0,K,125618.0,,,,,,,
Blind_Model,1000000.0,1206.0,K.7,45232.0,,,,,,,
Blind_Submodel,1000000.0,2512.0,K.7.3,12606.0,,,,,,,
Cat1,1000000.0,11.0,B,304921.0,,,,,,,
Cat2,1000000.0,4.0,C,447076.0,,,,,,,


In [7]:
inputData.count(), len(inputData.columns)

(1000000, 35)

In [8]:
inputData.show(5)

+--------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----------+----------+----------+----------+----------+----------+----------+----------+-----+----------+----------+----------+----------+------------+
|  Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|      Var1|      Var2|      Var3|      Var4|      Var5|      Var6|      Var7|      Var8|NVCat|    NVVar1|    NVVar2|    NVVar3|    NVVar4|Claim_Amount|
+--------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----------+----------+----------+----------+----------+----------+----------+----------+-----+----------+----------+----------+----------+------------+
|11133137|     6690958|      1|         2006|      1

###  As the '?' describes the unknown data in each column, lets identify how many columns exists with '?' (NAs)

In [9]:
# Display NA values prior to replacing ?
inputData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in inputData.columns]).toPandas().transpose()

Unnamed: 0,0
Row_ID,0
Household_ID,0
Vehicle,0
Calendar_Year,0
Model_Year,0
Blind_Make,0
Blind_Model,0
Blind_Submodel,0
Cat1,0
Cat2,0


In [10]:
inputData.select([count(when(col(c) == "?", c)).alias(c) for c in inputData.columns]).toPandas().transpose()

Unnamed: 0,0
Row_ID,0
Household_ID,0
Vehicle,0
Calendar_Year,0
Model_Year,0
Blind_Make,653
Blind_Model,653
Blind_Submodel,653
Cat1,1995
Cat2,369621


### Replace all '?' values in the result data frame as NAs.

In [11]:
for c in inputData.columns:
    inputData = inputData.withColumn(c, when(inputData[c] == "?", float('nan')).otherwise(inputData[c]))

In [12]:
# Display NA values after replacing
inputData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in inputData.columns]).show()

+------+------------+-------+-------------+----------+----------+-----------+--------------+----+------+----+------+------+----+------+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|  Cat2|Cat3|  Cat4|  Cat5|Cat6|  Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|Var1|Var2|Var3|Var4|Var5|Var6|Var7|Var8|NVCat|NVVar1|NVVar2|NVVar3|NVVar4|Claim_Amount|
+------+------------+-------+-------------+----------+----------+-----------+--------------+----+------+----+------+------+----+------+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|     0|           0|      0|            0|         0|       653|        653|           653|1995|369621| 325|426569|426969|1995|543322| 278|   0|  314| 2316| 2134|   568|   0|   0|   0|   0|   0|   0|   0|   0|    0|     0|   

###  Remove all the columns where the no. of rows with '?' exceeds 35% for that column.

In [2]:
#inputData = inputData.drop('Cat2', "Cat4", "Cat5", "Cat7")

### Fill remaining Null Values with Zeroes in the entire dataset, if any.

In [13]:
for c in inputData.columns:
    inputData = inputData.withColumn(c, when(isnan(inputData[c]) | col(c).isNull(), 0).otherwise(inputData[c]))

In [14]:
# Check if na values are replaced
inputData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in inputData.columns]).show()

+------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|Var1|Var2|Var3|Var4|Var5|Var6|Var7|Var8|NVCat|NVVar1|NVVar2|NVVar3|NVVar4|Claim_Amount|
+------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|     0|           0|      0|            0|         0|         0|          0|             0|   0|   0|   0|   0|   0|   0|   0|   0|   0|    0|    0|    0|     0|   0|   0|   0|   0|   0|   0|   0|   0|    0|     0|     0|     0|     0|           0|


### Derive a new column Vehicle_age = Current Year - Model year

In [15]:
inputData = inputData.withColumn("Vehicle_age", inputData.Calendar_Year - inputData.Model_Year)

In [16]:
inputData.show(5)

+-----------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----------+----------+----------+----------+----------+----------+----------+----------+-----+----------+----------+----------+----------+------------+-----------+
|     Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|      Var1|      Var2|      Var3|      Var4|      Var5|      Var6|      Var7|      Var8|NVCat|    NVVar1|    NVVar2|    NVVar3|    NVVar4|Claim_Amount|Vehicle_age|
+-----------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----------+----------+----------+----------+----------+----------+----------+----------+-----+----------+----------+----------+----------+------------+-----------+
|1.1133

### Extract all the non-zero target records from the dataset(> 0) into a new dataframe and verify the number of rows.

In [17]:
inputData_non_zero = inputData.filter(inputData.Claim_Amount > 0.0)

In [18]:
inputData_non_zero.count()

7249

In [19]:
### Remove the columns Row_ID, Household_ID and Vehicle from the original dataframe.
inputData_non_zero = inputData_non_zero.drop('Row_ID', 'Household_ID', 'Vehicle')
inputData_non_zero.dtypes

[('Calendar_Year', 'double'),
 ('Model_Year', 'double'),
 ('Blind_Make', 'string'),
 ('Blind_Model', 'string'),
 ('Blind_Submodel', 'string'),
 ('Cat1', 'string'),
 ('Cat2', 'string'),
 ('Cat3', 'string'),
 ('Cat4', 'string'),
 ('Cat5', 'string'),
 ('Cat6', 'string'),
 ('Cat7', 'string'),
 ('Cat8', 'string'),
 ('Cat9', 'string'),
 ('Cat10', 'string'),
 ('Cat11', 'string'),
 ('Cat12', 'string'),
 ('OrdCat', 'string'),
 ('Var1', 'double'),
 ('Var2', 'double'),
 ('Var3', 'double'),
 ('Var4', 'double'),
 ('Var5', 'double'),
 ('Var6', 'double'),
 ('Var7', 'double'),
 ('Var8', 'double'),
 ('NVCat', 'string'),
 ('NVVar1', 'double'),
 ('NVVar2', 'double'),
 ('NVVar3', 'double'),
 ('NVVar4', 'double'),
 ('Claim_Amount', 'double'),
 ('Vehicle_age', 'double')]

In [20]:
def getCorrDetails(inputData):
    dfColumns = ["Column1", "Column2", "Correlation"]
    correlationDF = pd.DataFrame(columns=dfColumns)
    columns1 = [d[0] for d in inputData.dtypes if d[1] != 'string']
    columns2 = [d[0] for d in inputData.dtypes if d[1] != 'string']
    if(len(columns1) > 1):
        for col1 in columns1:
            for col2 in columns2:
                if col1 != col2:
                    corrValue = inputData.stat.corr(col1, col2)
                    tempDF = pd.DataFrame([[col1, col2, corrValue]], columns=dfColumns)
                    correlationDF = correlationDF.append(tempDF)
            columns2.remove(col1)
    correlationDF = correlationDF.sort_values(by = "Correlation", ascending=False) 
    return correlationDF

### Check Correlation of numeric attributes

In [31]:
# Run correlation of numeric columns
print(getCorrDetails(inputData_non_zero))

### Create two new dataframes
1. train_DF, For the data where Calendar_Year = 2005 and 2006
2. test_DF, For the data where Calendar_Year = 2007

### Train & Validation Split for data

In [22]:
train_DF = inputData_non_zero.filter((inputData_non_zero.Calendar_Year != 2007.0))
test_DF = inputData_non_zero.filter((inputData_non_zero.Calendar_Year == 2007.0))

### Drop Calendar_year in train & test as train & test has distinct values

In [25]:
train_DF = train_DF.drop("Calendar_Year")
test_DF = test_DF.drop("Calendar_Year")

In [26]:
train_DF.count()

4644

In [27]:
test_DF.count()

2605

### Separate into Categorical and Continuous attributes.

In [32]:
numColumns = [d[0] for d in train_DF.dtypes if d[1] != 'string']
catColumns = [d[0] for d in train_DF.dtypes if d[1] == 'string']

In [33]:
labelColumn = "Claim_Amount"
numColumns.remove(labelColumn)
print("Numeric Columns : ", numColumns)
print("Categorical Columns : ", catColumns)
print("Target Columns : ", labelColumn)


('Numeric Columns : ', ['Model_Year', 'Var1', 'Var2', 'Var3', 'Var4', 'Var5', 'Var6', 'Var7', 'Var8', 'NVVar1', 'NVVar2', 'NVVar3', 'NVVar4', 'Vehicle_age'])
('Categorical Columns : ', ['Blind_Make', 'Blind_Model', 'Blind_Submodel', 'Cat1', 'Cat2', 'Cat3', 'Cat4', 'Cat5', 'Cat6', 'Cat7', 'Cat8', 'Cat9', 'Cat10', 'Cat11', 'Cat12', 'OrdCat', 'NVCat'])
('Target Columns : ', 'Claim_Amount')


In [34]:
## Columns with different categories in train & test
for c in catColumns:
    print(c, train_DF.select(c).distinct().count(), test_DF.select(c).distinct().count())

('Blind_Make', 48, 47)
('Blind_Model', 502, 421)
('Blind_Submodel', 875, 681)
('Cat1', 11, 11)
('Cat2', 4, 4)
('Cat3', 7, 6)
('Cat4', 4, 4)
('Cat5', 4, 4)
('Cat6', 6, 6)
('Cat7', 5, 5)
('Cat8', 4, 3)
('Cat9', 2, 2)
('Cat10', 4, 4)
('Cat11', 7, 7)
('Cat12', 6, 6)
('OrdCat', 7, 8)
('NVCat', 15, 15)


### Build Pipeline for pre-processing stages

1. Combine numeric columns into single vector column using vector assembler
2. Scale numeric features using StandardScalr
3. Convert Categorical to numeric using One Hot Encoder, String Indexer and combine them using vector assembler

In [35]:
numAssembler = VectorAssembler(inputCols=numColumns, outputCol="numFeatures")
numScaler = StandardScaler(inputCol="numFeatures", outputCol="numScaled", withMean=True, withStd=True)

In [36]:
catIndexer = [StringIndexer(inputCol=catColumn, outputCol="{0}_Indexed".format(catColumn), handleInvalid="skip") for catColumn in catColumns]
catEncoder = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_Encoded".format(indexer.getInputCol())) for indexer in catIndexer]
catAssembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in catEncoder], outputCol="catFeatures")

In [37]:
numCatAssembler = VectorAssembler(inputCols=['numScaled', 'catFeatures'], outputCol="features")

In [38]:
preProcessingStages = [numAssembler] + [numScaler] + catIndexer + catEncoder + [catAssembler] + [numCatAssembler] 

## Model Building & Evaluation

### Build Linear Regression Model

In [39]:
# Function to evaluation model
evaluator = RegressionEvaluator(labelCol=labelColumn, predictionCol="prediction") 
metricColumns = ["Model", "Data", "R_Square", "RMSE", "MSE"]
modelMetrics = pd.DataFrame(columns=metricColumns)

In [40]:
def getModelMetrics(model, data, dataSetType, modelName, modelMetrics):
    predsAndLabel = model.transform(data).select("prediction", labelColumn)
    r2 = evaluator.evaluate(predsAndLabel, {evaluator.metricName : "r2"})
    rmse = evaluator.evaluate(predsAndLabel, {evaluator.metricName : "rmse"})
    mse = evaluator.evaluate(predsAndLabel, {evaluator.metricName : "mse"})
    tempDF = pd.DataFrame([[modelName, dataSetType, r2, rmse, mse]], columns=metricColumns)
    modelMetrics = modelMetrics.append(tempDF)
    return modelMetrics

In [41]:
lr = regression.LinearRegression(maxIter=10, labelCol=labelColumn, featuresCol="features")
lrPipeline = Pipeline(stages=preProcessingStages + [lr])
lrPipelineModel = lrPipeline.fit(train_DF)

In [42]:
print(getModelMetrics(lrPipelineModel, train_DF, "Train", "LR_Model", modelMetrics))
print(getModelMetrics(lrPipelineModel, test_DF, "Train", "LR_Model", modelMetrics))

      Model   Data  R_Square        RMSE            MSE
0  LR_Model  Train  0.186468  410.993799  168915.902635
      Model   Data  R_Square        RMSE            MSE
0  LR_Model  Train -0.401938  372.488507  138747.687981


### 16. Display the model summary.

In [43]:
lrModelSummary = lrPipelineModel.stages[-1].summary
lrModelSummary.r2

0.1864677745206953

In [44]:
objectiveHistory = lrModelSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
0.5
0.421569780567
0.417363411713
0.410537027443
0.409700676214
0.408030391178
0.407609824535
0.407361552439
0.407158795379
0.406861693104
0.40676611274


### Build Decision Tree regressor

In [45]:
dt = regression.DecisionTreeRegressor(maxDepth=6, labelCol=labelColumn,  featuresCol="features")
dtPipeline = Pipeline(stages=preProcessingStages + [dt])
dtPipelineModel = dtPipeline.fit(train_DF)

In [46]:
print(getModelMetrics(dtPipelineModel, train_DF, "Train", "DT_Model", modelMetrics))
print(getModelMetrics(dtPipelineModel, test_DF, "Train", "DT_Model", modelMetrics))

      Model   Data  R_Square        RMSE            MSE
0  DT_Model  Train  0.214941  403.737539  163004.000444
      Model   Data  R_Square        RMSE            MSE
0  DT_Model  Train -0.752854  416.506112  173477.341204


### Build Random Forest Regressor

In [47]:
rf = regression.RandomForestRegressor(numTrees=20, maxDepth=4, seed=42, labelCol=labelColumn, featuresCol="features")
rfPipeline = Pipeline(stages=preProcessingStages + [rf])
rfPipelineModel = rfPipeline.fit(train_DF)

In [48]:
print(getModelMetrics(rfPipelineModel, train_DF, "Train", "RF_Model", modelMetrics))
print(getModelMetrics(rfPipelineModel, test_DF, "Train", "RF_Model", modelMetrics))

      Model   Data  R_Square        RMSE            MSE
0  RF_Model  Train  0.136878  423.334916  179212.451185
      Model   Data  R_Square        RMSE            MSE
0  RF_Model  Train -0.086342  327.892698  107513.621243


### 18. Apply Cross validation technique and tune your models.

### Tuning LR Model

In [50]:
lrPrameterGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1,0.2])\
              .addGrid(lr.elasticNetParam, [0.2, 0.5])\
              .addGrid(lr.maxIter, [10, 15])\
              .build()
lrCrossVal = CrossValidator(estimator=lrPipeline,
                           estimatorParamMaps=lrPrameterGrid,
                           evaluator=evaluator,
                           numFolds=2,
                           parallelism=3)
lrCrossValModel = lrCrossVal.fit(train_DF)

In [52]:
print(getModelMetrics(lrCrossValModel.bestModel, train_DF, "Train", "LR_TUNED", modelMetrics))
print(getModelMetrics(lrCrossValModel.bestModel, test_DF, "Train", "LR_TUNED", modelMetrics))

      Model   Data  R_Square        RMSE           MSE
0  LR_TUNED  Train  0.186397  411.011655  168930.58062
      Model   Data  R_Square        RMSE            MSE
0  LR_TUNED  Train -0.384619  370.180605  137033.680393


In [55]:
print(lrCrossValModel.bestModel.stages[-1].summary.r2)

0.186397082459


### Tuning DT Model

In [56]:
paramGridDT = ParamGridBuilder().addGrid(dt.maxDepth, [4, 6, 8]).build()
dtCrossVal = CrossValidator(estimator=dtPipeline,
                            estimatorParamMaps=paramGridDT,
                            evaluator=evaluator,
                            numFolds=2,
                            parallelism=10) 
dtCrossValModel = dtCrossVal.fit(train_DF)

### 19. Compute both train and test error metrics for all the models.
### 20. Calculate Root Mean Squared Error (RMSE) and R-Square values.


In [57]:
print(getModelMetrics(dtCrossValModel.bestModel, train_DF, "Train", "DT_CV_Best", modelMetrics))
print(getModelMetrics(dtCrossValModel.bestModel, test_DF, "Test", "DT_CV_Best", modelMetrics))

        Model   Data  R_Square        RMSE            MSE
0  DT_CV_Best  Train  0.135341  423.711517  179531.449683
        Model  Data  R_Square        RMSE            MSE
0  DT_CV_Best  Test -0.403867  372.744737  138938.638834
