Sales by Policy Type and Month : This table would contain the total sales for each policy type and each month. It would be used to analyse the performance of different policy types over time. 

In [0]:
%sql create or replace temp view vw_gold_sales_by_policy_type_and_month
as 
select 
p.policy_type,
month(p.start_date) as purchase_month,
count(month(p.start_date)) as sale_month,
sum(premium) as total_premium,
current_timestamp() as updated_timestamp
from silverlayer.policy p
group by p.policy_type,month(p.start_date) having p.policy_type is not null;

%sql select * from vw_gold_sales_by_policy_type_and_month

policy_type,purchase_month,sale_month,total_premium,updated_timestamp
Health,4,32,3520677,2024-01-10T19:42:07.847Z
Auto,6,34,3398964,2024-01-10T19:42:07.847Z
Auto,3,25,2691985,2024-01-10T19:42:07.847Z
Auto,12,32,3908371,2024-01-10T19:42:07.847Z
Health,10,33,2858976,2024-01-10T19:42:07.847Z
Auto,11,23,2663283,2024-01-10T19:42:07.847Z
Health,9,21,2219249,2024-01-10T19:42:07.847Z
Auto,1,23,2252191,2024-01-10T19:42:07.847Z
Health,6,11,1096513,2024-01-10T19:42:07.847Z
Health,2,30,3267096,2024-01-10T19:42:07.847Z


In [0]:
spark.sql("MERGE INTO goldenlayer.sales_by_policy_type_month as t using vw_gold_sales_by_policy_type_and_month as s on t.policy_type = s.policy_type when matched then update set t.purchase_month = s.purchase_month,t.sale_month = s.sale_month,t.total_premium = s.total_premium,t.updated_timestamp = current_timestamp() when not matched then insert (policy_type,purchase_month,sale_month,total_premium,updated_timestamp) values (s.policy_type,s.purchase_month,s.sale_month,s.total_premium,current_timestamp())")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql select * from goldenlayer.sales_by_policy_type_month

policy_type,purchase_month,sale_month,total_premium,updated_timestamp
Health,4,32,3520677,2024-01-10T19:48:09.035Z
Auto,6,34,3398964,2024-01-10T19:48:09.035Z
Auto,3,25,2691985,2024-01-10T19:48:09.035Z
Auto,12,32,3908371,2024-01-10T19:48:09.035Z
Health,10,33,2858976,2024-01-10T19:48:09.035Z
Auto,11,23,2663283,2024-01-10T19:48:09.035Z
Health,9,21,2219249,2024-01-10T19:48:09.035Z
Auto,1,23,2252191,2024-01-10T19:48:09.035Z
Health,6,11,1096513,2024-01-10T19:48:09.035Z
Health,2,30,3267096,2024-01-10T19:48:09.035Z


Claims by Policy Type and Status : This table would contain the number and amount of claims by policy type and claim status. It would monitor the claims process and identify any trends or issues.  

In [0]:
%sql create or replace temp view vw_gold_claims_by_policy_type_and_status
as 
select 
P.policy_type,
C.claim_status,
count(*) as total_claims,
sum(C.claim_amount) as total_claim_amount,
current_timestamp() as updated_timestamp
from silverlayer.Policy P
inner join silverlayer.Claim C
on P.policy_id = C.policy_id
group by policy_type,claim_status having P.policy_type is not null

%sql select * from vw_gold_claims_by_policy_type_and_status

In [0]:
spark.sql("MERGE INTO goldenlayer.sales_by_policy_type_and_status as t using vw_gold_claims_by_policy_type_and_status as s on t.policy_type = s.policy_type when matched then update set t.claim_status = s.claim_status,t.total_claims = s.total_claims,t.total_claim_amount = s.total_claim_amount,t.updated_timestamp = current_timestamp() when not matched then insert (policy_type,claim_status,total_claims,total_claim_amount,updated_timestamp) values (s.policy_type,s.claim_status,s.total_claims,s.total_claim_amount,current_timestamp())")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql select * from goldenlayer.sales_by_policy_type_and_status

policy_type,claim_status,total_claims,total_claim_amount,updated_timestamp
Auto,Pending,55,6649455,2024-01-10T19:53:48.866Z
Health,Denied,95,8935970,2024-01-10T19:53:48.866Z
Auto,Completed,850,85743100,2024-01-10T19:53:48.866Z
Health,In Progress,835,78298935,2024-01-10T19:53:48.866Z
Health,Completed,745,73734630,2024-01-10T19:53:48.866Z
Health,Pending,75,9006675,2024-01-10T19:53:48.866Z
Auto,In Progress,795,76829555,2024-01-10T19:53:48.866Z
Auto,Denied,55,5989300,2024-01-10T19:53:48.866Z


Analyze the claim data based on policy type like AVG, MAX, MIN, Count of claim 

In [0]:
%sql
create or replace temp view vw_gold_claims_analysis
as 
select 
P.policy_type,
avg(C.claim_amount) as avg_claim_amt,
max(C.claim_amount) as max_claim_amt,
min(C.claim_amount) as min_claim_amt,
count(*) as total_claims,
current_timestamp() as updated_timestamp  
from silverlayer.Policy P
inner join silverlayer.Claim C
on P.policy_id = C.policy_id
group by policy_type having P.policy_type is not null

%sql select * from vw_gold_claims_analysis

In [0]:
spark.sql("MERGE INTO goldenlayer.claims_analysis as t using vw_gold_claims_analysis as s on t.policy_type = s.policy_type when matched then update set t.avg_claim_amt = s.avg_claim_amt,t.max_claim_amt = s.max_claim_amt,t.min_claim_amt = s.min_claim_amt,t.total_claims = s.total_claims,t.updated_timestamp = current_timestamp() when not matched then insert (policy_type,avg_claim_amt,max_claim_amt,min_claim_amt,total_claims,updated_timestamp) values (s.policy_type,s.avg_claim_amt,s.max_claim_amt,s.min_claim_amt,s.total_claims,current_timestamp())")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql select * from goldenlayer.claims_analysis

policy_type,avg_claim_amt,max_claim_amt,min_claim_amt,total_claims,updated_timestamp
Health,97129,199995,2188,1750,2024-01-10T19:59:20.962Z
Auto,99835,197403,2283,1755,2024-01-10T19:59:20.962Z
