# Data Preparation

In [0]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used by the whole notebook.
spark = (
    SparkSession.builder
    .appName('myproj')
    .getOrCreate()
)
# Load the training CSV: the first row is the header, column types are inferred.
data = spark.read.csv(
    '/FileStore/tables/train.csv',
    header=True,
    inferSchema=True,
)
# Print the inferred schema.
data.printSchema()

In [0]:
# Preview the loaded rows to look for numerical values stored in string columns.
data.show()

# Check Missing Value

In [0]:
# Check Missing Value
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
# Keep the target column plus the ten candidate feature columns.
df = data.select(
    [
        'Response', 'Gender', 'Age', 'Driving_License', 'Region_Code',
        'Previously_Insured', 'Vehicle_Age', 'Vehicle_Damage',
        'Annual_Premium', 'Policy_Sales_Channel', 'Vintage',
    ]
)
# One aggregate expression per column: count the rows where that column is null.
null_counters = [
    count(when(isnull(name), name)).alias(name)
    for name in df.columns
]
na_report = df.select(null_counters)
# dimensions of the dataframe
print("Number of Rows: ",df.count() ,"   Number of Columns: ", len(df.columns))
na_report.show()

In [0]:
# No missing values in the dataset

In [0]:
# Summary statistics of the numeric variables, used to spot possible outliers.
df.select('Age','Annual_Premium','Vintage').describe().show()

# Check Outliers

###### Calculate Lower Bound and Upper Bound

In [0]:
# Quartiles of each numeric column, keyed by column name:
# {column: {"q1": <25th percentile>, "q3": <75th percentile>}}.
# The last argument of approxQuantile is the relative error (0 here).
quantiles = {}
for name in ['Age','Annual_Premium','Vintage']:
    q1_q3 = df.approxQuantile(name, [0.25, 0.75], 0)
    quantiles[name] = dict(zip(["q1", "q3"], q1_q3))
quantiles

In [0]:
# Extend every column's entry with its outlier fences:
# lower_bound = q1 - 1.5 * IQR and upper_bound = q3 + 1.5 * IQR.
for stats in quantiles.values():
    iqr = stats['q3'] - stats['q1']
    stats['lower_bound'] = stats['q1'] - (iqr * 1.5)
    stats['upper_bound'] = stats['q3'] + (iqr * 1.5)
print(quantiles)

###### Drop Outliers

In [0]:
# Select Age: 0-85, Annual_Premium: 1912.5-61892.5, Vintage: 0-444.5
import pyspark.sql.functions as f
# Append one 0/1 flag column per numeric feature, named "<column>_out":
# 0 when the value lies between the column's lower and upper bound, 1 otherwise.
outlier_flags = []
for name in ['Age','Annual_Premium','Vintage']:
    bounds = quantiles[name]
    inside = f.col(name).between(bounds['lower_bound'], bounds['upper_bound'])
    outlier_flags.append(f.when(inside, 0).otherwise(1).alias(name+"_out"))
df_clean = df.select("*", *outlier_flags)
df_clean.show(10)

In [0]:
from pyspark.sql.functions import col
# Number of outlier flags raised on each row (0 means no flagged column).
outlier_total = col("Age_out") + col("Annual_Premium_out") + col("Vintage_out")
df_clean_1 = df_clean.withColumn("outliers", outlier_total)
df_clean_1.show()

In [0]:
# dropping outliers
# Build the filtered frame straight from df_clean so this cell can be re-run.
# Filtering df_clean_1 on its own `outliers` column and then selecting that
# column away made a second run fail on the missing column.
no_outlier = (col("Age_out") + col("Annual_Premium_out") + col("Vintage_out")) == 0
df_clean_1 = df_clean.filter(no_outlier).select(
    ['Response',  'Gender',  'Age',  'Driving_License',  'Region_Code',  'Previously_Insured', 'Vehicle_Age', 'Vehicle_Damage', 'Annual_Premium', 'Policy_Sales_Channel','Vintage']
)
# Summary statistics of the numeric columns after the outlier rows are removed.
df_clean_1.select('Age','Annual_Premium','Vintage').describe().show()

In [0]:
# Proportion of rows dropped by the outlier filter
import numpy as np
# Each count() launches a Spark job, so count each frame exactly once.
rows_before = df_clean.count()
rows_after = df_clean_1.count()
# NOTE: keep np.round here. The builtin `round` is presumably shadowed by the
# earlier `from pyspark.sql.functions import *`.
print("proportion of the lost Rows: ",np.round((rows_before-rows_after)/rows_before,4))

# Descriptive Analysis and Visualization

In [0]:
# Check data balance
# Expose the cleaned frame to SQL. createOrReplaceTempView replaces the
# deprecated registerTempTable, and the query goes through the notebook's own
# `spark` session instead of the legacy `sqlContext` global.
df_clean_1.createOrReplaceTempView("dataclean1")
display(spark.sql("select * from dataclean1"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
1,Female,56,1,28.0,0,1-2 Year,Yes,32031.0,26.0,72
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80


In [0]:
# Row count per Response class, collected to pandas for display.
response_counts = df_clean_1.groupby('Response').count()
response_counts.toPandas()
# Should do resampling later

Unnamed: 0,Response,count
0,1,45155
1,0,325634


##### Resampling Dependent Variables

In [0]:
# Split the cleaned data by class: Response == 0 is the majority frame,
# Response == 1 the minority frame.
response = col("Response")
major_df = df_clean_1.filter(response == 0)
minor_df = df_clean_1.filter(response == 1)
# Whole number of times the minority class fits into the majority class.
n_major = major_df.count()
n_minor = minor_df.count()
ratio = int(n_major/n_minor)
print("ratio: {}".format(ratio))

In [0]:
a = range(ratio)
# Duplicate the minority rows: attach an array of `ratio` literals to every row,
# explode it so each row appears `ratio` times, then drop the helper column.
minor_copies = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
# Combine the duplicated minority rows with the untouched majority rows.
# `union` replaces the deprecated `unionAll`: both append by column position
# and keep duplicate rows.
oversampled_df = major_df.union(minor_copies)
# NOTE(review): if a train/test split is made from oversampled_df later, copies
# of the same minority row can land on both sides of the split. Confirm.
oversampled_df.show()

In [0]:
# Row count per Response class after oversampling, collected to pandas for display.
oversampled_counts = oversampled_df.groupby('Response').count()
oversampled_counts.toPandas()

Unnamed: 0,Response,count
0,0,325634
1,1,316085


In [0]:
# Register the oversampled data as the temp view "oversampleddf" so it can be
# queried with SQL. createOrReplaceTempView replaces the deprecated
# registerTempTable, and the query uses the notebook's own `spark` session.
oversampled_df.createOrReplaceTempView("oversampleddf")
display(spark.sql("select * from oversampleddf"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


# Visualization

In [0]:
# Distribution of Gender: displays the rows of the `oversampleddf` temp view.
# NOTE(review): the query does no grouping itself; presumably the chart is
# configured in the display widget. Confirm.
display(sqlContext.sql("select * from oversampleddf"))
# Result: Gender is nearly balanced.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Driving_License: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Resampling is not necessary here: the data might change after resampling, and we cannot drop the rows with 0 because of that effect.

In [0]:
# Distribution of Previously_Insured: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))
# The data set has more "1" values in this column, but resampling might affect
# the importance of the feature and influence the feature selection procedure.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Vehicle_Damage: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))
# The data in this column is almost balanced.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Region_Code: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))
# The distribution is very unbalanced, so grouping the region codes will be useful.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Policy_Sales_Channel: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))
# Grouping the channels can be used to reduce the number of features.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Age: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Annual_Premium: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Distribution of Vintage: displays the rows of the `oversampleddf` temp view.
display(sqlContext.sql("select * from oversampleddf"))
# Since most people in the dataset have a driving license, we might consider regrouping it or dropping the column.
# NOTE(review): this remark is about Driving_License, not Vintage; it looks misplaced. Confirm.

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
0,Male,76,1,3.0,0,1-2 Year,No,33536.0,26.0,183
0,Male,21,1,11.0,1,< 1 Year,No,28619.0,152.0,203
0,Female,29,1,41.0,1,< 1 Year,No,27496.0,152.0,39
0,Female,24,1,33.0,0,< 1 Year,Yes,2630.0,160.0,176
0,Male,23,1,11.0,0,< 1 Year,Yes,23367.0,152.0,249
0,Female,24,1,3.0,1,< 1 Year,No,27619.0,152.0,28
0,Female,32,1,6.0,1,< 1 Year,No,28771.0,152.0,80
0,Female,24,1,50.0,1,< 1 Year,No,48699.0,152.0,289
0,Female,41,1,15.0,1,1-2 Year,No,31409.0,14.0,221
0,Male,76,1,28.0,0,1-2 Year,Yes,36770.0,13.0,15


In [0]:
# Split on 'Response' into two frames, since we would like to know more about
# the relationships within the Response = 1 group.
response_flag = col("Response")
df_response_0 = oversampled_df.filter(response_flag == 0)
df_response_1 = oversampled_df.filter(response_flag == 1)

In [0]:
# Distribution of Previously_Insured (Response = 1)
# Register the Response = 1 rows as the temp view "dfresponse1".
# createOrReplaceTempView replaces the deprecated registerTempTable, and the
# query uses the notebook's own `spark` session.
df_response_1.createOrReplaceTempView("dfresponse1")
display(spark.sql("select * from dfresponse1"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27


In [0]:
# Distribution of Vehicle_Damage (Response = 1)
# (Re-)register the Response = 1 rows as the temp view "dfresponse1" so this
# cell also runs on its own. createOrReplaceTempView replaces the deprecated
# registerTempTable, and the query uses the notebook's own `spark` session.
df_response_1.createOrReplaceTempView("dfresponse1")
display(spark.sql("select * from dfresponse1"))

Response,Gender,Age,Driving_License,Region_Code,Previously_Insured,Vehicle_Age,Vehicle_Damage,Annual_Premium,Policy_Sales_Channel,Vintage
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,44,1,28.0,0,> 2 Years,Yes,40454.0,26.0,217
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27
1,Male,47,1,28.0,0,> 2 Years,Yes,38294.0,26.0,27


##### Grouping Categorical Variables

In [0]:
#Grouping Region_Code
from pyspark.sql.functions import *
from pyspark.sql.functions import when
# Bucket Region_Code into three groups:
#   below 22                        -> "First"
#   from 22 up to (not including) 30 -> "Second"
#   everything else                  -> "Third"
region_code = col("Region_Code")
Region_group = (
    when(region_code < 22.0, "First")
    .when(region_code < 30.0, "Second")
    .otherwise("Third")
)
# Start from the oversampled data, add the grouped column, drop the raw code.
data_1 = oversampled_df.withColumn('Region_group', Region_group).drop('Region_Code')
data_1.show()

In [0]:
# Grouping Policy_Sales_Channel into three buckets:
#   below 26                          -> "channel1"
#   from 26 up to (not including) 125 -> "channel2"
#   everything else                   -> "channel3"
# The third label was previously misspelled "cahnnel3".
sales_channel = col("Policy_Sales_Channel")
Channel_group = (
    when(sales_channel < 26, "channel1")
    .when(sales_channel < 125, "channel2")
    .otherwise("channel3")
)
# NOTE: this cell reads and overwrites data_1 and drops the source column, so
# it can only be run once per data_1; re-run the Region_Code cell first.
data_1 = data_1.withColumn('Channel_group', Channel_group)
data_1 = data_1.drop('Policy_Sales_Channel')
data_1.show()

### One-Hot Encoding

In [0]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [0]:
def _indexer_and_encoder(input_col, index_col, vec_col):
    """Build the StringIndexer / OneHotEncoder pair for one categorical column.

    The indexer reads ``input_col`` and writes ``index_col``; the encoder reads
    ``index_col`` and writes ``vec_col``. Returns ``(indexer, encoder)``.
    """
    indexer = StringIndexer(inputCol=input_col, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=vec_col)
    return indexer, encoder


# NOTE: 'regionnVec' (double "n") is the spelling the assembler input list uses too.
region_indexer, region_encoder = _indexer_and_encoder('Region_group', 'regionIndex', 'regionnVec')

channel_indexer, channel_encoder = _indexer_and_encoder('Channel_group', 'channelIndex', 'channelVec')

vehicle_age_indexer, vehicle_age_encoder = _indexer_and_encoder('Vehicle_Age', 'Vehicle_AgeIndex', 'Vehicle_AgeVec')

vehicle_damage_indexer, vehicle_damage_encoder = _indexer_and_encoder('Vehicle_Damage', 'Vehicle_DamageIndex', 'Vehicle_DamageVec')

gender_indexer, gender_encoder = _indexer_and_encoder('Gender', 'genderIndex', 'genderVec')

In [0]:
# Feature columns for the one-hot variant: encoded vectors for the categorical
# columns, raw values for the rest.
input_cols_OneHot = [
    'genderVec',
    'Age',
    'Driving_License',
    'regionnVec',
    'Previously_Insured',
    'Vehicle_AgeVec',
    'Vehicle_DamageVec',
    'Annual_Premium',
    'channelVec',
    'Vintage',
]
assembler_OneHot = VectorAssembler(outputCol='features', inputCols=input_cols_OneHot)

# Feature columns for the label-index variant: the StringIndexer outputs are
# used directly instead of the one-hot vectors.
input_cols_Label = [
    'genderIndex',
    'Age',
    'Driving_License',
    'regionIndex',
    'Previously_Insured',
    'Vehicle_AgeIndex',
    'Vehicle_DamageIndex',
    'Annual_Premium',
    'channelIndex',
    'Vintage',
]
assembler_Label = VectorAssembler(outputCol='features', inputCols=input_cols_Label)

In [0]:
from pyspark.ml import Pipeline
# Stages shared by both pipelines: one StringIndexer per categorical column.
indexer_stages = [
    region_indexer,
    channel_indexer,
    vehicle_age_indexer,
    vehicle_damage_indexer,
    gender_indexer,
]
# One-hot encoders, in the same column order as the indexers.
encoder_stages = [
    region_encoder,
    channel_encoder,
    vehicle_age_encoder,
    vehicle_damage_encoder,
    gender_encoder,
]

# One-hot variant: index -> encode -> assemble.
pipeline_OneHot = Pipeline(stages=indexer_stages + encoder_stages + [assembler_OneHot])

# Label-index variant: index -> assemble.
pipeline_Label = Pipeline(stages=indexer_stages + [assembler_Label])

# Fit both pipelines on data_1 ...
fit_model_OneHot = pipeline_OneHot.fit(data_1)
fit_model_Label = pipeline_Label.fit(data_1)

# ... and transform the same frame with each fitted model.
train_OneHot_results = fit_model_OneHot.transform(data_1)
train_Label_results = fit_model_Label.transform(data_1)

In [0]:
# Preview the output of the label-index pipeline (index columns plus the assembled 'features' vector).
train_Label_results.show()

# Feature Selection