In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import pandas as pd



In [0]:
claims_df = spark.read.json(path="/FileStore/tables/claims.json", multiLine=True)

In [0]:
display(claims_df.take(5))

ClaimID,Claim_Logged_On,Claim_Processed_On,Claim_Rejected,authorities_contacted,bodily_injuries,collision_type,incident_city,incident_date,incident_location,incident_severity,incident_state,incident_type,injury,number_of_vehicles_involved,police_report_available,property,vehicle,witnesses,Claim Year,Claim Month,processing_delay
6600116,10-08-2017 19:45,,Y,Other,2,Rear Collision,Columbus,29-07-2017 11:55,4862 Lincoln Hwy,Total Loss,WV,Single Vehicle Collision,9250.0,1,?,6000.0,1800.0,1,2017,8,
2400349,18-05-2017 11:40,,N,Police,2,Front Collision,Northbend,16-05-2017 19:41,1515 Embaracadero St,Total Loss,VA,Single Vehicle Collision,2000.0,1,?,5500.0,2000.0,2,2017,5,
7301700,24-01-2018 23:16,26-01-2018 17:32,N,Ambulance,2,Front Collision,Springfield,08-01-2018 07:55,7459 Flute St,Total Loss,OH,Single Vehicle Collision,4250.0,1,?,5500.0,3350.0,2,2018,1,2.0
72497,12-03-2018 21:06,,N,,2,?,Northbend,12-03-2018 01:50,2878 Britain Hwy,Minor Damage,WV,Vehicle Theft,2000.0,1,?,5500.0,5700.0,1,2018,3,
17284390,26-02-2018 22:28,27-03-2018 23:29,N,Police,2,Rear Collision,Northbrook,25-02-2018 13:50,2430 MLK Ave,Minor Damage,NC,Single Vehicle Collision,2700.0,1,NO,5500.0,5099.99,2,2018,2,29.0


In [0]:
# Add new columns for year and month of claim (claim_logged_on)

claims_df = claims_df.withColumn('Claim Year', year(to_timestamp('Claim_Logged_On', 'dd-MM-yyyy HH:mm'))).\
        withColumn('Claim Month',month(to_timestamp('Claim_Logged_On', 'dd-MM-yyyy HH:mm')))

display(claims_df.take(5))

ClaimID,Claim_Logged_On,Claim_Processed_On,Claim_Rejected,authorities_contacted,bodily_injuries,collision_type,incident_city,incident_date,incident_location,incident_severity,incident_state,incident_type,injury,number_of_vehicles_involved,police_report_available,property,vehicle,witnesses,Claim Year,Claim Month,processing_delay
6600116,10-08-2017 19:45,,Y,Other,2,Rear Collision,Columbus,29-07-2017 11:55,4862 Lincoln Hwy,Total Loss,WV,Single Vehicle Collision,9250.0,1,?,6000.0,1800.0,1,2017,8,
2400349,18-05-2017 11:40,,N,Police,2,Front Collision,Northbend,16-05-2017 19:41,1515 Embaracadero St,Total Loss,VA,Single Vehicle Collision,2000.0,1,?,5500.0,2000.0,2,2017,5,
7301700,24-01-2018 23:16,26-01-2018 17:32,N,Ambulance,2,Front Collision,Springfield,08-01-2018 07:55,7459 Flute St,Total Loss,OH,Single Vehicle Collision,4250.0,1,?,5500.0,3350.0,2,2018,1,2.0
72497,12-03-2018 21:06,,N,,2,?,Northbend,12-03-2018 01:50,2878 Britain Hwy,Minor Damage,WV,Vehicle Theft,2000.0,1,?,5500.0,5700.0,1,2018,3,
17284390,26-02-2018 22:28,27-03-2018 23:29,N,Police,2,Rear Collision,Northbrook,25-02-2018 13:50,2430 MLK Ave,Minor Damage,NC,Single Vehicle Collision,2700.0,1,NO,5500.0,5099.99,2,2018,2,29.0


In [0]:
# Total claims by year and months

total_claim_df = claims_df.groupby('Claim Year','Claim Month').agg(F.count('ClaimID').alias('Total Claims'))
display(total_claim_df.take(5))

Claim Year,Claim Month,Total Claims
2017,3,28
2017,8,38
2017,10,38
2018,1,63
2018,3,64


In [0]:
# Total invalid claims by year and months

# Added new column to calculate delay between claim logged in and proccesed time
claims_df = claims_df.withColumn('processing_delay',datediff(to_date("Claim_Processed_On",'dd-MM-yyyy HH:mm'), to_date("Claim_Logged_On",'dd-MM-yyyy HH:mm')))

# specifying conditions for invalid claims
invalid_claim_cond1 = 'police_report_available == "NO"'
invalid_claim_cond2 = '( \
                       ( (incident_severity == Major Damage) | (incident_severity == Total Loss) ) | \
                       ( (injury > property) & (injury > vehicle) ) \
                       ) & \
                       (processing_delay > 2)'
invalid_claim_cond3 = 'property > 1500'
invalid_claim_cond4  = 'Claim_Rejected == "N"'


invalid_claims_df = claims_df.filter( (invalid_claim_cond1 or invalid_claim_cond2 or invalid_claim_cond3) and (invalid_claim_cond4) ).groupby('Claim Year','Claim Month').agg(F.count('ClaimID').alias('Total Invalid claims'))


display(invalid_claims_df.take(5))

Claim Year,Claim Month,Total Invalid claims
2017,3,20
2017,8,28
2017,10,27
2018,1,48
2018,3,43


In [0]:
# Join both metrics to get the final claim metric

claims_metric = total_claim_df.join(invalid_claims_df, ['Claim Year','Claim Month']).select('Claim Year', 'Claim Month', 'Total claims', 'Total Invalid claims')

display(claims_metric)

Claim Year,Claim Month,Total claims,Total Invalid claims
2017,3,28,20
2017,8,38,28
2017,10,38,27
2018,1,63,48
2018,3,64,43
2018,8,69,54
2017,7,38,26
2018,5,69,51
2016,10,2,1
2017,12,77,51
