In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("IFD") \
        .config("spark.some.config.option", "Project") \
        .getOrCreate()
    return spark

In [3]:
spark=init_spark()

23/03/21 11:40:47 WARN Utils: Your hostname, Bhanus-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.168 instead (on interface en0)
23/03/21 11:40:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/21 11:40:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/21 11:40:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
data = spark.read.format("csv").option("header", "true").load("../dataset/vehicle_insurance_claim_fraud_data.csv")

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType

In [8]:
# Split the AgeOfPolicyHolder column into two columns, representing the minimum and maximum age values
data = data.withColumn("AgeOfPolicyHolder_Min", split(col("AgeOfPolicyHolder"), " ")[0].cast("int"))
data = data.withColumn("AgeOfPolicyHolder_Max", split(col("AgeOfPolicyHolder"), " ")[2].cast("int"))

# Compute the average age range for each row in the AgeOfPolicyHolder column
avg_age_range = data.select(avg((col("AgeOfPolicyHolder_Min") + col("AgeOfPolicyHolder_Max")) / 2)).collect()[0][0]

# Add a new column that copies the Age column value when it is not 0, or the average age range otherwise
data = data.withColumn("New_Age", when(col("Age") == 0, avg_age_range).otherwise(col("Age")))

In [9]:
# Convert the columns having numerical values to Double type
numerical_cols = ['WeekOfMonth', 'WeekOfMonthClaimed', 'Age', 'New_Age', 'FraudFound_P', 'RepNumber', 'Deductible', 'DriverRating', 'Year']
for col in numerical_cols:
    data = data.withColumn(col, data[col].cast(DoubleType()))

In [10]:
categorical_cols = ['Month', 'DayOfWeek', 'Make', 'AccidentArea', 'DayOfWeekClaimed',
       'MonthClaimed', 'Sex', 'MaritalStatus', 'Fault', 'PolicyType',
       'VehicleCategory', 'VehiclePrice', 'Days_Policy_Accident',
       'Days_Policy_Claim', 'PastNumberOfClaims', 'AgeOfVehicle',
       'AgeOfPolicyHolder', 'PoliceReportFiled', 'WitnessPresent', 'AgentType',
       'NumberOfSuppliments', 'AddressChange_Claim', 'NumberOfCars',
       'BasePolicy']

In [11]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed", handleInvalid="keep") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_indexed", outputCol=col+"_encoded") for col in categorical_cols]
pipeline = Pipeline(stages=indexers + encoders)

In [12]:
training, test = data.randomSplit([0.8, 0.2], seed=42)

In [13]:
pipelineModel = pipeline.fit(training)
training_encoded = pipelineModel.transform(training).drop(*categorical_cols, *["PolicyNumber"])
test_encoded = pipelineModel.transform(test).drop(*categorical_cols, *["PolicyNumber"])

23/03/21 11:43:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [15]:
feature_cols=training_encoded.columns

In [16]:
feature_cols.remove('FraudFound_P')

In [17]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler

In [18]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")
train_df = assembler.transform(training_encoded).select("features", "FraudFound_P")

In [19]:
rf = RandomForestClassifier(labelCol="FraudFound_P", featuresCol="features")

In [48]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [2, 5, 10]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .addGrid(rf.minInfoGain, [0.0, 0.05]) \
    .build()

In [49]:
evaluator = MulticlassClassificationEvaluator(labelCol="FraudFound_P", predictionCol="prediction", metricName="accuracy")

In [50]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=42)

In [51]:
cvModel = cv.fit(train_df)

23/03/21 12:03:21 WARN DAGScheduler: Broadcasting large task binary with size 1027.6 KiB
23/03/21 12:03:21 WARN DAGScheduler: Broadcasting large task binary with size 1383.9 KiB
23/03/21 12:03:22 WARN DAGScheduler: Broadcasting large task binary with size 1812.1 KiB
23/03/21 12:03:22 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/03/21 12:03:23 WARN DAGScheduler: Broadcasting large task binary with size 1635.8 KiB
23/03/21 12:03:25 WARN DAGScheduler: Broadcasting large task binary with size 1014.3 KiB
23/03/21 12:03:25 WARN DAGScheduler: Broadcasting large task binary with size 1340.9 KiB
23/03/21 12:03:26 WARN DAGScheduler: Broadcasting large task binary with size 1741.0 KiB
23/03/21 12:03:26 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/03/21 12:03:30 WARN DAGScheduler: Broadcasting large task binary with size 1250.5 KiB
23/03/21 12:03:30 WARN DAGScheduler: Broadcasting large task binary with size 1583.6 KiB
23/03/21 12:03:31 WARN DAGS

23/03/21 12:06:16 WARN DAGScheduler: Broadcasting large task binary with size 1806.8 KiB
23/03/21 12:06:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/03/21 12:06:18 WARN DAGScheduler: Broadcasting large task binary with size 1662.8 KiB
23/03/21 12:06:20 WARN DAGScheduler: Broadcasting large task binary with size 1028.8 KiB
23/03/21 12:06:20 WARN DAGScheduler: Broadcasting large task binary with size 1370.3 KiB
23/03/21 12:06:21 WARN DAGScheduler: Broadcasting large task binary with size 1777.0 KiB
23/03/21 12:06:21 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/03/21 12:06:22 WARN DAGScheduler: Broadcasting large task binary with size 1005.8 KiB
23/03/21 12:06:24 WARN DAGScheduler: Broadcasting large task binary with size 1298.7 KiB
23/03/21 12:06:25 WARN DAGScheduler: Broadcasting large task binary with size 1645.3 KiB
23/03/21 12:06:26 WARN DAGScheduler: Broadcasting large task binary with size 2024.8 KiB
23/03/21 12:06:41 WARN DAGS

In [52]:
test_df = assembler.transform(test_encoded).select("features", "FraudFound_P")

In [53]:
print(evaluator.evaluate(cvModel.transform(train_df)))
print(evaluator.evaluate(cvModel.transform(test_df)))

23/03/21 12:08:21 WARN DAGScheduler: Broadcasting large task binary with size 1703.5 KiB
                                                                                

0.9425621168118749


23/03/21 12:08:22 WARN DAGScheduler: Broadcasting large task binary with size 1703.5 KiB


0.9457671957671958


In [54]:
prediction=cvModel.transform(test_df)

In [55]:
from pyspark.mllib.evaluation import  MulticlassMetrics

In [56]:
evaluator.setMetricName('weightedPrecision')
precision = evaluator.evaluate(prediction)
print("Precision = %s" % precision)

23/03/21 12:08:28 WARN DAGScheduler: Broadcasting large task binary with size 1703.5 KiB


Precision = 0.94871033934568


In [57]:
evaluator.setMetricName('weightedRecall')
recall = evaluator.evaluate(prediction)
print("Recall = %s" % recall)

23/03/21 12:08:32 WARN DAGScheduler: Broadcasting large task binary with size 1703.5 KiB


Recall = 0.9457671957671958


In [58]:
evaluator.setMetricName('f1')
f1_score = evaluator.evaluate(prediction)
print("F1 Score = %s" % f1_score)

23/03/21 12:08:33 WARN DAGScheduler: Broadcasting large task binary with size 1703.5 KiB


F1 Score = 0.9200527300867437


23/03/21 17:14:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 414729 ms exceeds timeout 120000 ms
23/03/21 17:14:39 WARN SparkContext: Killing executors is not supported by current scheduler.
