Install Java, the Spark distribution (built for Hadoop 3.2) and `findspark`, which are required to run PySpark in Google Colab

In [None]:
# Install a headless Java 8 JDK (the JAVA_HOME set in the next cell points at it); apt output is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# NOTE(review): this commented-out wget names only a file, not a download URL, so re-enabling it would most likely not fetch Spark.
# !wget -q spark-3.0.2-bin-hadoop3.2.tgz
# Unpack the Spark 3.0.2 (Hadoop 3.2) build kept on Google Drive into the current directory.
# Assumes Drive is already mounted at /content/drive and the working directory is /content
# (SPARK_HOME in the next cell is /content/spark-3.0.2-bin-hadoop3.2) — TODO confirm.
!tar xf /content/drive/MyDrive/job-a-thon/spark-3.0.2-bin-hadoop3.2.tgz
# findspark is used in the next cell to initialise PySpark from SPARK_HOME.
!pip install -q findspark

# PySpark 

In [None]:
import os

# Point the environment at the JDK and the unpacked Spark distribution,
# then let findspark initialise PySpark from that SPARK_HOME.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.2-bin-hadoop3.2",
)

import findspark

findspark.init()
findspark.find()  # displayed: the Spark home findspark resolved

'/content/spark-3.0.2-bin-hadoop3.2'

In [None]:
from pyspark.sql import SparkSession

# A single local Spark session for the notebook; the Spark UI listens on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Packages required

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import mean as _mean
from pyspark.sql.types import IntegerType,DoubleType
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

# Train and Test dataset location 

In [None]:
# All competition files live in one Google Drive folder; change DATA_DIR to relocate them.
DATA_DIR = '/content/drive/MyDrive/job-a-thon'
PATH_TO_SAMPLE_SUBMISSION = f'{DATA_DIR}/sample_submission.csv'
PATH_TO_TEST = f'{DATA_DIR}/test.csv'
PATH_TO_TRAIN = f'{DATA_DIR}/train.csv'

# Data Preprocessing

In [None]:
# Read all three files with a header row and let Spark infer column types.
csv_options = {'header': True, 'inferSchema': True}
train = spark.read.csv(PATH_TO_TRAIN, **csv_options)
test = spark.read.csv(PATH_TO_TEST, **csv_options)
sample = spark.read.csv(PATH_TO_SAMPLE_SUBMISSION, **csv_options)
train.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- Region_Code: integer (nullable = true)
 |-- Accomodation_Type: string (nullable = true)
 |-- Reco_Insurance_Type: string (nullable = true)
 |-- Upper_Age: integer (nullable = true)
 |-- Lower_Age: integer (nullable = true)
 |-- Is_Spouse: string (nullable = true)
 |-- Health Indicator: string (nullable = true)
 |-- Holding_Policy_Duration: string (nullable = true)
 |-- Holding_Policy_Type: double (nullable = true)
 |-- Reco_Policy_Cat: integer (nullable = true)
 |-- Reco_Policy_Premium: double (nullable = true)
 |-- Response: integer (nullable = true)



`feature_columns` collects the names of the feature columns created at the various stages of data exploration; the list is passed to the `VectorAssembler` at the end.

In [None]:
# Names of model input columns; later cells append to this list and it is
# finally handed to the VectorAssembler.
feature_columns = list()

# Train : Preview

In [None]:
import pandas as pd

# First five rows, transposed so each column of the wide frame becomes a row.
preview_rows = train.take(5)
pd.DataFrame(preview_rows, columns=train.columns).T

Unnamed: 0,0,1,2,3,4
ID,1,2,3,4,5
City_Code,C3,C5,C5,C24,C8
Region_Code,3213,1117,3732,4378,2190
Accomodation_Type,Rented,Owned,Owned,Owned,Rented
Reco_Insurance_Type,Individual,Joint,Individual,Joint,Individual
Upper_Age,36,75,32,52,44
Lower_Age,36,22,32,48,44
Is_Spouse,No,No,No,No,No
Health Indicator,X1,X2,,X1,X2
Holding_Policy_Duration,14+,,1.0,14+,3.0


## Checking for missing values in train and test (fraction of null/NaN values per column)

In [None]:
# Per-column fraction of rows that are null or NaN; count(lit(1)) is the row count.
missing_fraction_exprs = [
    (count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1))).alias(c)
    for c in train.columns
]
amount_missing_train = train.select(missing_fraction_exprs)
amount_missing_train.show()

+---+---------+-----------+-----------------+-------------------+---------+---------+---------+------------------+-----------------------+-------------------+---------------+-------------------+--------+
| ID|City_Code|Region_Code|Accomodation_Type|Reco_Insurance_Type|Upper_Age|Lower_Age|Is_Spouse|  Health Indicator|Holding_Policy_Duration|Holding_Policy_Type|Reco_Policy_Cat|Reco_Policy_Premium|Response|
+---+---------+-----------+-----------------+-------------------+---------+---------+---------+------------------+-----------------------+-------------------+---------------+-------------------+--------+
|0.0|      0.0|        0.0|              0.0|                0.0|      0.0|      0.0|      0.0|0.2297669116780001|     0.3979992924806415| 0.3979992924806415|            0.0|                0.0|     0.0|
+---+---------+-----------+-----------------+-------------------+---------+---------+---------+------------------+-----------------------+-------------------+---------------+----------

In [None]:
# Per-column fraction of null/NaN values in the test split.
test_row_count = count(lit(1))
amount_missing_test = test.select(*[
    (count(when(isnan(name) | col(name).isNull(), name)) / test_row_count).alias(name)
    for name in test.columns
])
amount_missing_test.show()

+---+---------+-----------+-----------------+-------------------+---------+---------+---------+-------------------+-----------------------+-------------------+---------------+-------------------+
| ID|City_Code|Region_Code|Accomodation_Type|Reco_Insurance_Type|Upper_Age|Lower_Age|Is_Spouse|   Health Indicator|Holding_Policy_Duration|Holding_Policy_Type|Reco_Policy_Cat|Reco_Policy_Premium|
+---+---------+-----------+-----------------+-------------------+---------+---------+---------+-------------------+-----------------------+-------------------+---------------+-------------------+
|0.0|      0.0|        0.0|              0.0|                0.0|      0.0|      0.0|      0.0|0.23054345333639073|    0.39454253611556983|0.39454253611556983|            0.0|                0.0|
+---+---------+-----------+-----------------+-------------------+---------+---------+---------+-------------------+-----------------------+-------------------+---------------+-------------------+



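Imputing missing values: a missing `Health Indicator` becomes `'Unknown'`, and missing `Holding_Policy_Type` / `Holding_Policy_Duration` values become 0.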

In [None]:
# Fill value per column for the fillna step.
missing_value_column = {
    # A missing indicator becomes its own category.
    'Health Indicator': 'Unknown',
    # Presumably "no holding policy" — confirm with the data description.
    'Holding_Policy_Type': 0.0,
    # Holding_Policy_Duration is a string column, so this int is expected to be
    # cast to '0' by Spark (the duration mapping has a matching '0' key).
    'Holding_Policy_Duration': 0,
}

In [None]:
# Apply the same per-column fill values to both splits.
train, test = [frame.fillna(missing_value_column) for frame in (train, test)]

# Feature Exploration

## Age of the customer
**Upper_Age** and 
**Lower_Age**

In [None]:
# Gap between the upper and lower age on a policy (the Individual group-by
# below shows it is always 0 for individual policies).
age_gap = col('Upper_Age') - col('Lower_Age')
train = train.withColumn('Age_diff', age_gap)
test = test.withColumn('Age_diff', age_gap)
feature_columns.extend(['Age_diff', 'Upper_Age', 'Lower_Age'])

# Understanding the dataset

In [None]:
# Row counts for each insurance type / spouse flag / response combination.
(train
    .groupBy(['Reco_Insurance_Type', 'Is_Spouse', 'Response'])
    .count()
    .show())

+-------------------+---------+--------+-----+
|Reco_Insurance_Type|Is_Spouse|Response|count|
+-------------------+---------+--------+-----+
|              Joint|       No|       0| 1407|
|              Joint|      Yes|       0| 6370|
|         Individual|       No|       1| 9640|
|         Individual|       No|       0|30896|
|              Joint|       No|       1|  517|
|              Joint|      Yes|       1| 2052|
+-------------------+---------+--------+-----+



In [None]:
# Mean of every numeric column, split by Response, for joint policies only.
joint_policies = train.where(col('Reco_Insurance_Type') == 'Joint')
joint_policies.groupby('Response').mean().show()

+--------+------------------+------------------+------------------+-----------------+------------------------+--------------------+------------------------+-------------+------------------+
|Response|           avg(ID)|  avg(Region_Code)|    avg(Upper_Age)|   avg(Lower_Age)|avg(Holding_Policy_Type)|avg(Reco_Policy_Cat)|avg(Reco_Policy_Premium)|avg(Response)|     avg(Age_diff)|
+--------+------------------+------------------+------------------+-----------------+------------------------+--------------------+------------------------+-------------+------------------+
|       1|   25533.349552355|1698.2335539120281|52.294277929155314|41.51810042818217|      1.7816270922537953|  16.661346827559363|       21994.75204359671|          1.0| 10.77617750097314|
|       0|25082.185804294717|1728.1735887874502|52.751832326089755|42.45814581458146|      1.7914362864857913|  14.952166645235952|      21949.095460974746|          0.0|10.293686511508295|
+--------+------------------+------------------+--

In [None]:
# Mean of every numeric column, split by Response, for individual policies only.
individual_policies = train.where(col('Reco_Insurance_Type') == 'Individual')
individual_policies.groupby('Response').mean().show()

+--------+------------------+------------------+-----------------+-----------------+------------------------+--------------------+------------------------+-------------+-------------+
|Response|           avg(ID)|  avg(Region_Code)|   avg(Upper_Age)|   avg(Lower_Age)|avg(Holding_Policy_Type)|avg(Reco_Policy_Cat)|avg(Reco_Policy_Premium)|avg(Response)|avg(Age_diff)|
+--------+------------------+------------------+-----------------+-----------------+------------------------+--------------------+------------------------+-------------+-------------+
|       1|25587.823547717842|1745.5951244813277|42.98226141078838|42.98226141078838|      1.4030082987551866|  16.337033195020748|      12220.413692946058|          1.0|          0.0|
|       0| 25478.65257638529| 1732.827874158467|42.83509192128431|42.83509192128431|      1.3814733298808908|  14.646426721905748|      12192.527382185397|          0.0|          0.0|
+--------+------------------+------------------+-----------------+--------------

**Region_Code** 

Target encoding: each `Region_Code` is replaced by its response rate in train (the percentage of rows with `Response == 1`).

In [None]:
# Target encoding of Region_Code: 100 * (positive responses / rows) per region
# in train. `sum`/`count` here are the pyspark.sql.functions versions brought in
# by the star import, not the Python builtins.
# NOTE(review): the rate is computed on all of train, including each row's own
# label, so this feature leaks the target; an out-of-fold encoding would give a
# more honest validation score.
grouped_region = (
    train.groupby('Region_Code')
    .agg(sum(col('Response')), count(col('Response')))
    .withColumn('Percentage_Response',
                (col('sum(Response)') / col('count(Response)')) * 100)
)
region_rate = grouped_region.select('Region_Code', 'Percentage_Response')
train = train.join(region_rate, on='Region_Code', how='left')
# Regions that never occur in train get a null rate from this left join.
test = test.join(region_rate, on='Region_Code', how='left')

In [None]:
feature_columns += ['Percentage_Response']

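Impute missing values: regions that appear only in test get no response rate from the join and are filled with the train mean below.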

In [None]:
# Re-check train after the join: per-column fraction of null/NaN values.
amount_missing_train = train.select([
    (count(when(isnan(column_name) | col(column_name).isNull(), column_name)) / count(lit(1)))
    .alias(column_name)
    for column_name in train.columns
])
amount_missing_train.show()

+-----------+---+---------+-----------------+-------------------+---------+---------+---------+----------------+-----------------------+-------------------+---------------+-------------------+--------+--------+-------------------+
|Region_Code| ID|City_Code|Accomodation_Type|Reco_Insurance_Type|Upper_Age|Lower_Age|Is_Spouse|Health Indicator|Holding_Policy_Duration|Holding_Policy_Type|Reco_Policy_Cat|Reco_Policy_Premium|Response|Age_diff|Percentage_Response|
+-----------+---+---------+-----------------+-------------------+---------+---------+---------+----------------+-----------------------+-------------------+---------------+-------------------+--------+--------+-------------------+
|        0.0|0.0|      0.0|              0.0|                0.0|      0.0|      0.0|      0.0|             0.0|                    0.0|                0.0|            0.0|                0.0|     0.0|     0.0|                0.0|
+-----------+---+---------+-----------------+-------------------+---------+-

In [None]:
# Re-check test after the join: per-column fraction of null/NaN values.
n_rows = count(lit(1))
null_or_nan_share = [
    (count(when(isnan(field) | col(field).isNull(), field)) / n_rows).alias(field)
    for field in test.columns
]
amount_missing_test = test.select(null_or_nan_share)
amount_missing_test.show()

+-----------+---+---------+-----------------+-------------------+---------+---------+---------+----------------+-----------------------+-------------------+---------------+-------------------+--------+--------------------+
|Region_Code| ID|City_Code|Accomodation_Type|Reco_Insurance_Type|Upper_Age|Lower_Age|Is_Spouse|Health Indicator|Holding_Policy_Duration|Holding_Policy_Type|Reco_Policy_Cat|Reco_Policy_Premium|Age_diff| Percentage_Response|
+-----------+---+---------+-----------------+-------------------+---------+---------+---------+----------------+-----------------------+-------------------+---------------+-------------------+--------+--------------------+
|        0.0|0.0|      0.0|              0.0|                0.0|      0.0|      0.0|      0.0|             0.0|                    0.0|                0.0|            0.0|                0.0|     0.0|0.012795230451731254|
+-----------+---+---------+-----------------+-------------------+---------+---------+---------+-------------

In [None]:
# Pull the per-row region response rates to the driver as plain Python floats.
# NOTE(review): no later cell shown here uses this list.
percentage_response = [float(row[0]) for row in train.select('Percentage_Response').collect()]

In [None]:
# Fill the region response rate for test regions unseen in train with the
# train-wide mean of that feature.
df_stats = train.select(_mean(col('Percentage_Response')).alias('mean')).collect()
# Bound to a descriptive name instead of `mean`: the star import of
# pyspark.sql.functions provides a `mean` function, and rebinding that name to
# a float would shadow it for every later cell.
percentage_response_mean = df_stats[0]['mean']
test = test.fillna({'Percentage_Response': percentage_response_mean})

## Reco Policy Cat

In [None]:
# Target encoding of Reco_Policy_Cat: 100 * (positive responses / rows) per
# recommended policy category in train (pyspark `sum`/`count` from the star import).
# NOTE(review): computed on all of train, so the feature includes each row's own label.
grouped_Reco_Policy_cat = (
    train.groupby('Reco_Policy_Cat')
    .agg(sum(col('Response')), count(col('Response')))
    .withColumn('Percentage_Response_Reco_Policy_Cat',
                (col('sum(Response)') / col('count(Response)')) * 100)
)
reco_policy_rate = grouped_Reco_Policy_cat.select('Reco_Policy_Cat', 'Percentage_Response_Reco_Policy_Cat')
train = train.join(reco_policy_rate, on='Reco_Policy_Cat', how='left')
test = test.join(reco_policy_rate, on='Reco_Policy_Cat', how='left')

# Categories that appear only in test get a null rate from the left join.
# Fill them with the train mean, as is done for Percentage_Response, so the
# VectorAssembler (which errors on nulls by default) does not fail on test.
reco_policy_rate_mean = train.select(_mean(col('Percentage_Response_Reco_Policy_Cat'))).first()[0]
test = test.fillna({'Percentage_Response_Reco_Policy_Cat': reco_policy_rate_mean})

In [None]:
feature_columns += ['Percentage_Response_Reco_Policy_Cat']

# City Code

In [None]:
# City-level features from train: number of distinct regions per city, and the
# percentage of positive responses per city (target encoding; pyspark
# `sum`/`count`/`countDistinct` come from the star import).
groupedTrain = (
    train.groupBy('City_Code')
    .agg(countDistinct('Region_Code').alias('City_Wise_Unique_Region_Code_Count'),
         count('Region_Code'),
         sum('Response'))
    .withColumn('Percentage_Response_City_Code',
                (col('sum(Response)') / col('count(Region_Code)')) * 100)
)
city_features = groupedTrain.select('City_Code', 'City_Wise_Unique_Region_Code_Count', 'Percentage_Response_City_Code')
train = train.join(city_features, on='City_Code', how='left')
test = test.join(city_features, on='City_Code', how='left')

# Cities present only in test get nulls for both features from the left join;
# impute them with the train means so the VectorAssembler never sees nulls.
city_feature_means = train.select(
    _mean(col('City_Wise_Unique_Region_Code_Count')).alias('City_Wise_Unique_Region_Code_Count'),
    _mean(col('Percentage_Response_City_Code')).alias('Percentage_Response_City_Code'),
).first().asDict()
test = test.fillna(city_feature_means)

feature_columns.append('Percentage_Response_City_Code')
feature_columns.append('City_Wise_Unique_Region_Code_Count')

## Holding_Policy_Type

In [None]:
# Holding_Policy_Type is numeric (presumably 0.0-4.0 after filling missing
# values with 0.0); cast it to string and relabel '0.0'..'4.0' as 'cat_0'..'cat_4'.
mapping = {f'{code}.0': f'cat_{code}' for code in range(5)}
train, test = [
    frame.withColumn("Holding_Policy_Type", frame["Holding_Policy_Type"].cast(StringType()))
         .replace(to_replace=mapping, subset=['Holding_Policy_Type'])
    for frame in (train, test)
]

# Holding_Policy_Duration

In [None]:
# Holding_Policy_Duration is read as a string column ('1.0' ... '14.0', '14+',
# plus '0' for filled-in missing values); relabel each value as a category,
# with '14+' becoming 'cat_15'.
mapping = {'0': 'cat_0'}
mapping.update({f'{n}.0': f'cat_{n}' for n in range(1, 15)})
mapping['14+'] = 'cat_15'
train, test = [
    frame.withColumn("Holding_Policy_Duration", frame["Holding_Policy_Duration"].cast(StringType()))
         .replace(to_replace=mapping, subset=['Holding_Policy_Duration'])
    for frame in (train, test)
]

# OneHotEncoder

In [None]:
# Columns that are dummy-encoded (one 0/1 column per category) further below.
categoricalColumns = [
    'Accomodation_Type',
    'Is_Spouse',
    'Health Indicator',
    'Holding_Policy_Type',
    'Holding_Policy_Duration',
]

In [None]:
feature_columns.extend(['Reco_Policy_Premium'])

In [None]:
# Dummy-encode each categorical column: one 0/1 integer column per category
# seen in train, dropping the last category in sorted order as the reference
# level. Categories come from train only, so a test value never seen in train
# gets 0 in every dummy column.
for catCol in categoricalColumns:
    categories = sorted(row[0] for row in train.select(catCol).distinct().collect())
    for category in categories[:-1]:
        new_column_name = catCol + '_' + str(category)
        # Native Spark expression instead of a Python UDF: no per-row Python
        # round trip, and `category` is captured as a literal right here rather
        # than through a lambda that looks up the loop variable. A null input
        # compares as unknown and falls through to 0, as with the old UDF.
        indicator = when(col(catCol) == category, 1).otherwise(0)
        train = train.withColumn(new_column_name, indicator)
        test = test.withColumn(new_column_name, indicator)
        feature_columns.append(new_column_name)

Vector Assembler and min max scaler

In [None]:
# Assemble all feature columns into one vector and min-max scale it.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")

pipeline = Pipeline(stages=[assembler, scaler])
# Fit the min/max statistics on train only and apply that same fitted model to
# test. Fitting a second pipeline on test (as before) scaled the two splits with
# different statistics, so the model scored test features on a different scale
# than it was trained on.
scaler_model = pipeline.fit(train)
train = scaler_model.transform(train)
test = scaler_model.transform(test)

Model training (the model used below is a gradient-boosted tree classifier)

In [None]:
# The classifier and evaluator below read the target from a column named "label".
train = train.withColumnRenamed(existing="Response", new="label")

## Class weights

In [None]:
# NOTE(review): this whole cell is commented out, so on a fresh kernel neither
# `weightedDataset` nor the 'classWeightCol' column exists at this point.
# Intended weighting: class-0 rows get weight p and class-1 rows get 1 - p,
# where p is the fraction of rows with label 1.
# balancingRatio = train.filter(col('label') == 1).count() / train.count()
# calculateWeights = udf(lambda x: 1 * balancingRatio if x == 0 else (1 * (1.0 - balancingRatio)), DoubleType())
# weightedDataset = train.withColumn('classWeightCol', calculateWeights('label'))

In [None]:
# NOTE(review): commented out, so `trainingData` / `testData` are undefined on a
# fresh kernel unless another cell defines them.
# (trainingData, testData) = weightedDataset.randomSplit([0.8, 0.2])

Gradient-Boosted Tree Classifier

In [None]:
from pyspark.ml.classification import GBTClassifier

# The weighting and split cells above are commented out, so this cell used to
# raise NameError on a fresh kernel. Build its inputs here: class-0 rows get
# weight p and class-1 rows get 1 - p (p = share of label 1), which gives both
# classes the same total weight.
balancingRatio = train.filter(col('label') == 1).count() / train.count()
weightedDataset = train.withColumn(
    'classWeightCol',
    when(col('label') == 0, balancingRatio).otherwise(1.0 - balancingRatio),
)
# Fixed seeds so the hold-out split, the model and the AUC below are reproducible.
(trainingData, testData) = weightedDataset.randomSplit([0.8, 0.2], seed=42)

gbt = GBTClassifier(maxIter=10, labelCol="label", featuresCol="features_scaled",
                    weightCol='classWeightCol', seed=42)
gbtModel = gbt.fit(trainingData)
predictions = gbtModel.transform(testData)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC of the hold-out predictions.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print(f"Test Area Under ROC: {auc}")

Test Area Under ROC: 0.7454930582300876


# Fit the model on the entire train dataset

# Downsampling the majority class

In [None]:
# Downsample the majority class (label 0) towards a roughly 1:1 class balance,
# then attach class weights as in the hold-out experiment.
major_df = train.filter(col("label") == 0)
minor_df = train.filter(col("label") == 1)
major_count = major_df.count()
minor_count = minor_df.count()
ratio = int(major_count / minor_count)
print("ratio: {}".format(ratio))
# Sample with the exact minority/majority fraction instead of 1/int(ratio):
# truncating the ratio (e.g. 3.4 -> 3) keeps more majority rows than intended,
# and a ratio below 1 made 1/ratio divide by zero. Seeded for reproducibility.
sampled_majority_df = major_df.sample(False, min(1.0, minor_count / major_count), seed=42)
combined_train = sampled_majority_df.union(minor_df)
balancingRatio = combined_train.filter(col('label') == 1).count() / combined_train.count()
# Class-0 rows get weight p, class-1 rows get 1 - p (native expression, no Python UDF).
weightedDataset = combined_train.withColumn(
    'classWeightCol',
    when(col('label') == 0, balancingRatio).otherwise(1.0 - balancingRatio),
)

ratio: 3


In [None]:
# Refit the GBT on the downsampled, weighted train set and score the test split.
gbtModel = gbt.fit(dataset=weightedDataset)
gbt_cv_predictions = gbtModel.transform(dataset=test)

In [None]:
# Write the ID / Response submission file.
# NOTE(review): `prediction` is the hard 0.0/1.0 class label. If the leaderboard
# is scored by ROC AUC (as the hold-out check above is), submitting the
# positive-class probability would likely score better — confirm against the
# competition rules and sample_submission format.
gbt_cv_predictions = gbt_cv_predictions.withColumnRenamed('prediction', 'Response')
dff = gbt_cv_predictions.select(['ID', 'Response']).toPandas()
dff.to_csv(path_or_buf='submission_final_GBT_With_downsampling.csv', index=False)

Cross-validation (currently disabled: the cell below is entirely commented out)

In [None]:
# NOTE(review): cross-validation is disabled — every line below is commented out.
# If re-enabled, it rebinds `gbt` and `evaluator`, and `cvModel` is not used by
# any later cell shown here.
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# paramGrid = (ParamGridBuilder()
#              .addGrid(gbt.maxDepth, [2, 4, 6])
#              .addGrid(gbt.maxBins, [20, 30])
#              .addGrid(gbt.maxIter, [10, 15])
#              .build())
# evaluator = BinaryClassificationEvaluator()
# gbt = GBTClassifier(maxIter=10, labelCol="label", featuresCol="features_scaled",weightCol='classWeightCol')
# cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# # Run cross validations.
# cvModel = cv.fit(weightedDataset)