In [1]:
%%bash
pip install pyspark

if [[ ! -f ./train.csv ]]; then 
   wget https://raw.githubusercontent.com/aatishsuman/health-insurance-cross-sell-prediction/main/data/train.csv
fi

if [[ ! -f ./test.csv ]]; then 
   wget https://raw.githubusercontent.com/aatishsuman/health-insurance-cross-sell-prediction/main/data/test.csv  
fi

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=82c553c9e06665aefae0472453a059dd7bdc2e68b9846b31506214c681eaeaca
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


--2020-11-15 01:03:06--  https://raw.githubusercontent.com/aatishsuman/health-insurance-cross-sell-prediction/main/data/train.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21432357 (20M) [text/plain]
Saving to: ‘train.csv’

     0K .......... .......... .......... .......... ..........  0% 3.85M 5s
    50K .......... .......... .......... .......... ..........  0% 8.85M 4s
   100K .......... .......... .......... .......... ..........  0% 4.80M 4s
   150K .......... .......... .......... .......... ..........  0% 20.2M 3s
   200K .......... .......... .......... .......... ..........  1% 6.20M 3s
   250K .......... .......... .......... .......... ..........  1% 26.7M 3s
   300K .......... .......... .......... .......... ..........  1% 23.8M 3s
   350K ...

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.ml import feature, Pipeline, regression, classification, evaluation, tuning
import numpy as np
import pandas as pd

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [3]:
train = spark.read.csv('train.csv', inferSchema=True, header=True)
test = spark.read.csv('test.csv', inferSchema=True, header=True)

print(train.toPandas().shape, test.toPandas().shape)

(381109, 12) (127037, 11)


In [4]:
train.toPandas().head()

Unnamed: 0,id,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage,Response
0,1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217,1
1,2,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183,0
2,3,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27,1
3,4,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203,0
4,5,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39,0


In [5]:
numerical_columns=['Age', 'Region_Code', 'Annual_Premium', 'Policy_Sales_Channel', 'Vintage']
categorical_columns=['Gender', 'Driving_License', 'Previously_Insured', 'Vehicle_Age', 'Vehicle_Damage', 'Response']

In [6]:
feature_engineering_pipe = Pipeline(stages=[feature.StringIndexerModel.from_labels(['Male', 'Female'], inputCol='Gender', outputCol='Gender_Feature'), 
                                            feature.StringIndexerModel.from_labels(['< 1 Year', '1-2 Year', '> 2 Years'], inputCol='Vehicle_Age', outputCol='Vehicle_Age_Feature'), 
                                            feature.StringIndexerModel.from_labels(['No', 'Yes'], inputCol='Vehicle_Damage', outputCol='Vehicle_Damage_Feature')])
train_xformed = feature_engineering_pipe.fit(train).transform(train).select(numerical_columns + 
                                                                            ['Driving_License', 'Previously_Insured', 'Response', 
                                                                             fn.col('Gender_Feature').alias('Gender'), 
                                                                             fn.col('Vehicle_Age_Feature').alias('Vehicle_Age'), 
                                                                             fn.col('Vehicle_Damage_Feature').alias('Vehicle_Damage')])

In [7]:
train_xformed.toPandas().head()

Unnamed: 0,Age,Region_Code,Annual_Premium,Policy_Sales_Channel,Vintage,Driving_License,Previously_Insured,Response,Gender,Vehicle_Age,Vehicle_Damage
0,44,28.0,40454.0,26.0,217,1,0,1,0.0,2.0,1.0
1,76,3.0,33536.0,26.0,183,1,0,0,0.0,1.0,0.0
2,47,28.0,38294.0,26.0,27,1,0,1,0.0,2.0,1.0
3,21,11.0,28619.0,152.0,203,1,1,0,0.0,0.0,0.0
4,29,41.0,27496.0,152.0,39,1,1,0,1.0,0.0,0.0


In [8]:
def get_mse(features):
  train_df, validation_df = train_xformed.randomSplit([0.9, 0.1], 42)
  pipe = Pipeline(stages=[feature.VectorAssembler(inputCols=features, outputCol='features'), regression.LinearRegression(labelCol='Response')])
  evaluator = evaluation.RegressionEvaluator(labelCol='Response', metricName='mse')
  model = pipe.fit(train_df)
  return evaluator.evaluate(model.transform(validation_df)), dict(zip(model.stages[-2].getInputCols(), model.stages[-1].summary.pValues))

def get_stepwise_pred_list():
  predictors = numerical_columns + categorical_columns[:-1]
  while (len(predictors) > 1):
    initial_mse, p_values = get_mse(predictors) 
    predictors = list(sorted(p_values, key=p_values.get, reverse=True))
    predictors.pop(0)
    mse, p_values = get_mse(predictors)
    predictors = list(sorted(p_values, key=p_values.get, reverse=True))
    if (mse >= initial_mse):
      return predictors[::-1]
  return predictors[::-1]

best_predictors = get_stepwise_pred_list()
best_predictors

['Vehicle_Damage',
 'Vehicle_Age',
 'Previously_Insured',
 'Driving_License',
 'Policy_Sales_Channel',
 'Annual_Premium',
 'Age']

In [9]:
# getting baseline
train_xformed.groupBy('Response').count().withColumn('class percentage', fn.col('count') / train_xformed.count()).show()

+--------+------+-------------------+
|Response| count|   class percentage|
+--------+------+-------------------+
|       1| 46710|0.12256336113815208|
|       0|334399|  0.877436638861848|
+--------+------+-------------------+



In [14]:
# using best predictors
train_df, validation_df = train_xformed.randomSplit([0.9, 0.1], 42)
svc_pipe = Pipeline(stages=[feature.VectorAssembler(inputCols=best_predictors, outputCol='features'), classification.LinearSVC(labelCol='Response')])
svc_model = svc_pipe.fit(train_df)
train_predictions = svc_model.transform(train_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='Response', metricName='areaUnderPR')
validation_predictions = svc_model.transform(validation_df)
print('Validation PR AUC: {}'.format(evaluator.evaluate(validation_predictions)))

accuracy = validation_predictions.where('Response == prediction').count() / validation_df.count()
print('Validation accuracy {}'.format(accuracy))

Validation PR AUC: 0.3064039665524769
Validation accuracy 0.8777809580807161


In [None]:
# paramGrid = tuning.ParamGridBuilder().addGrid(svc.maxIter, [100, 500]).addGrid(svc.regParam, [0, 0.001, 0.01, 0.1]).build()
# Validation PR AUC: 0.2947909030465189
# Best params: [maxIter: 100, regParam: 0.1]

# paramGrid = tuning.ParamGridBuilder().addGrid(svc.maxIter, [100, 500]).addGrid(svc.regParam, [0.1, 1, 10, 100]).build()
# Validation PR AUC: 0.25585751136031026
# Best params: [maxIter: 100, regParam: 1.0]

In [18]:
%%time
svc = classification.LinearSVC(labelCol='Response')
svc_pipe_cross_val = Pipeline(stages=[feature.VectorAssembler(inputCols=best_predictors, outputCol='features'), svc])
paramGrid = tuning.ParamGridBuilder().addGrid(svc.maxIter, [100, 500]).addGrid(svc.regParam, [0.1, 1, 10, 100]).build()
crossval = tuning.CrossValidator(estimator=svc_pipe_cross_val, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)
svc_model_cross_val = crossval.fit(train_df)
print('Validation PR AUC: {}'.format(evaluator.evaluate(svc_model_cross_val.transform(validation_df))))
print('Best params: [maxIter: {}, regParam: {}]'.format(svc_model_cross_val.bestModel.stages[-1].getMaxIter(), svc_model_cross_val.bestModel.stages[-1].getRegParam()))

Validation PR AUC: 0.25585751136031026
Best params: [maxIter: 100, regParam: 1.0]
CPU times: user 2.66 s, sys: 538 ms, total: 3.19 s
Wall time: 21min 37s


In [20]:
# using best params
svc_pipe = Pipeline(stages=[feature.VectorAssembler(inputCols=best_predictors, outputCol='features'), classification.LinearSVC(labelCol='Response', regParam=0.1)])
svc_model = svc_pipe.fit(train_df)
train_predictions = svc_model.transform(train_df)
validation_predictions = svc_model.transform(validation_df)
print('Validation PR AUC: {}'.format(evaluator.evaluate(validation_predictions)))

accuracy = validation_predictions.where('Response == prediction').count() / validation_df.count()
print('Validation accuracy {}'.format(accuracy))

Validation PR AUC: 0.2947907524573733
Validation accuracy 0.8781452473263771


In [17]:
# using best predictors + Gender
svc_pipe = Pipeline(stages=[feature.VectorAssembler(inputCols=best_predictors + ['Gender'], outputCol='features'), classification.LinearSVC(labelCol='Response')])
svc_model = svc_pipe.fit(train_df)
train_predictions = svc_model.transform(train_df)
validation_predictions = svc_model.transform(validation_df)
print('Validation PR AUC: {}'.format(evaluator.evaluate(validation_predictions)))

accuracy = validation_predictions.where('Response == prediction').count() / validation_df.count()
print('Validation accuracy {}'.format(accuracy))

Validation PR AUC: 0.26827374897861456
Validation accuracy 0.8768962556269678
