# 1. Global Setup

## 1.1 - Packages

### 1.1.1 - Installations

In [0]:
# Installs the third-party packages this notebook depends on.
# Only `sparse` carries a version pin; every other package resolves to whatever
# version pip selects at run time.
# NOTE(review): consider pinning the remaining packages so reruns are reproducible.

# Core Data Handling
%pip install pandas --quiet
%pip install numpy --quiet
%pip install openpyxl --quiet

# Modeling and Statistical Analysis
%pip install statsmodels --quiet
%pip install pygam --quiet

# Actuarial Modelling
%pip install chainladder --quiet
%pip install sparse==0.15.5 --quiet  # Newer version conflicts with chainladder

# Performance and Parallel Processing
%pip install swifter --quiet
%pip install joblib --quiet
%pip install tqdm --quiet

### 1.1.2 - Imports

In [0]:
# Core Libraries
import pandas as pd
import numpy as np
from decimal import Decimal
from datetime import datetime
import itertools
import warnings
from dateutil.relativedelta import relativedelta

# Visualisation
import matplotlib.pyplot as plt

# Statistical Modelling
import statsmodels.api as sm
import statsmodels.formula.api as smf
from pygam import PoissonGAM, GAM, s, f, te

# Actuarial Modelling
import chainladder as cl

# Performance and Parallel Processing
from tqdm import tqdm
from joblib import Parallel, delayed

# Table updating functions
from delta.tables import DeltaTable
from pyspark.sql.functions import col

### 1.1.3 - Global Parameters

In [0]:
# Modelling Parameters

# Balance date for this run and its ISO string form; the string is interpolated
# into the SQL date filters of the claim and exposure queries.
last_day_previous_month = datetime(2025, 5, 31)
latest_balance_date_str = last_day_previous_month.strftime('%Y-%m-%d')

# development_term caps dev_month and acc_month_start floors acc_month when
# model_data is filtered. The remaining settings are not referenced in this
# notebook's own cells - presumably they are read by the modelling template
# that is %run later (TODO confirm).
development_term = 24
development_period_end = 24
last_n_month_lognormal = 12
exclude_last_n_month = 6
n_periods = 12
acc_month_start = pd.to_datetime('2017-01-01')

# Month-end dates ('YYYY-MM-DD' strings) from 2019-01-31 up to the balance date
# less exclude_last_n_month months.
valuation_dates = (
    pd.date_range(
        start="2019-01-31",
        end=last_day_previous_month - relativedelta(months=exclude_last_n_month),
        freq='ME',
    )
    .strftime('%Y-%m-%d')
    .tolist()
)

# Indexation
# NOTE(review): cpi_file_path is assigned here but is not read by any cell visible in
# this notebook; the CPI series below is loaded from a table instead. Confirm whether
# the path is still needed (e.g. by the modelling template).
cpi_file_path = '/Volumes/actuaries_prd/general/ibnr/enrichment/cpi.csv'

# Load the CPI series into pandas with three columns: Date, cpi, and a quarter label
# built as <year>'Q'<quarter number> from the Date column.
cpi_by_quarter = spark.sql(f"""
    SELECT 
        Date,
        CPI as cpi,
        CONCAT(YEAR(Date), 'Q', QUARTER(Date)) AS quarter
    FROM actuaries_prd.reference_data.abs_quarterly_cpi
""").toPandas()

In [0]:
# Registry of per-product settings and datasets, keyed by product name.
# Each value is a dict filled in via update_product_config.
# Note: re-running this cell resets the registry to empty.
product_configs = {}

def update_product_config(config_dict, product_name, input_product_short=None, input_claim_data=None, input_expo_data=None, input_gwp_data=None, input_model_data=None, input_main_level=None, input_sub_levels=None):
    """Add a product to ``config_dict`` or update the entry it already has.

    The entry for ``product_name`` is created as an empty dict when missing.
    Each ``input_*`` argument that is not ``None`` is written under the key
    named after it without the ``input_`` prefix (``product_short``,
    ``claim_data``, ``expo_data``, ``gwp_data``, ``model_data``,
    ``main_level``, ``sub_levels``). Arguments left as ``None`` do not touch
    any value already stored.

    ``config_dict`` is modified in place; nothing is returned.
    """
    entry = config_dict.setdefault(product_name, {})

    # Listed in the order the keys should appear in a freshly created entry.
    candidates = (
        ("product_short", input_product_short),
        ("claim_data", input_claim_data),
        ("expo_data", input_expo_data),
        ("gwp_data", input_gwp_data),
        ("model_data", input_model_data),
        ("main_level", input_main_level),
        ("sub_levels", input_sub_levels),
    )
    for field, supplied in candidates:
        # Identity check against None so that DataFrames (and falsy values
        # such as 0 or '') are still stored.
        if supplied is not None:
            entry[field] = supplied

def print_product_configs_summary(config_dict):
    """Print a compact, human-readable overview of every product entry.

    For each product a header line is printed, followed by one line per
    stored key. DataFrame values are summarised by their shape; any other
    value is printed as-is.
    """
    def _render(key, value):
        # DataFrames are reduced to their shape so the summary stays short.
        if isinstance(value, pd.DataFrame):
            return f"  - {key}: DataFrame with shape {value.shape}"
        return f"  - {key}: {value}"

    for product in config_dict:
        print(f"\n Product: {product}")
        for key, value in config_dict[product].items():
            print(_render(key, value))

# 2. Product

## 2.1 - Private Motor

### 2.1.1 - APP Parameters

In [0]:
# Product Name
# `product` is interpolated into the claim query's anzo_super_class filter and is the
# key this product is registered under in product_configs. `product_short` is
# lower-cased to build the output table name in the Application Input section.
product = 'Private Motor'
product_short = 'PM'

# This is the level that the adequacy is calculated at
main_level = 'channel' 

# This is the level at which the assumptions need to be set in the APP
sub_levels = ['claim_type']

### 2.1.2 - Claim Data

In [0]:
# Monthly claim movements aggregated by accident month, observation month,
# development month, claim type, product group and channel.
#   * dev_month counts months from loss month to observation month (1-based) and is
#     floored at 1; obs_month is floored at the accident month in the same way.
#   * claim_type falls back to 'NAF' when the cause-of-loss code has no mapping.
#   * channel is derived from cell_name / policy_branch_code.
# The WHERE clause filters on columns of the LEFT JOINed tables g and c
# (loss_date, anzo_super_class, incident_description), so rows without a match in
# those tables are dropped.
# The optional attributes Distribution_Area, ReferenceProductCode_Ext and
# claims_category are all compared through coalesce(<col>, 'NULL') so that rows where
# the attribute is missing are kept rather than silently removed by a NULL comparison.
# NOTE(review): the subquery h is DISTINCT on (PolicyNumber, ReferenceProductCode_Ext);
# a policy with more than one product code would repeat claim rows - confirm this
# cannot happen.
claim_data = spark.sql(f"""
    SELECT 
        DATE_TRUNC('MM', g.loss_date) AS acc_month, 
        GREATEST(DATE_TRUNC('MM', a.observation_year_month),DATE_TRUNC('MM', g.loss_date)) AS obs_month,
        greatest((YEAR(a.observation_year_month) - YEAR(g.loss_date)) * 12 + MONTH(a.observation_year_month) - MONTH(g.loss_date) + 1,1 ) AS dev_month,
        CASE WHEN f.claim_code_mapped IS NULL THEN 'NAF' ELSE f.claim_code_mapped END AS claim_type,
        g.ANZO_Super_Class AS product_group,
        CASE WHEN (g.cell_name IS NULL and coalesce(right(g.policy_branch_code, 2), 'FI') = 'FI') then 'Growth' 
             WHEN (g.cell_name IS NULL AND right(g.policy_branch_code, 10) = 'MotorTrade') then 'Merc'
             WHEN g.cell_name = 'Private Motor' THEN 'Direct' 
             ELSE g.cell_name  END AS channel,
        SUM(a.new_claims_count) AS claim_count, 
        SUM(a.net_claims_incurred_movement_amount_gst_excl) AS net_claim_incurred,
        SUM(a.gross_claims_incurred_movement_amount_gst_excl) as gross_claim_incurred,
        SUM(a.claim_recoveries_movement_amount_gst_excl) as recoveries
    FROM 
        cds_prd.cds.claim_claim_transactionmonth_financialcounts a 
    LEFT JOIN
        cds_prd.rds.claim_claim_transactiondaily_financialcounts_detail g on a.claim_fkey = g.claim_origin_key
    LEFT JOIN 
        ids_prd.ref.ref_cause_of_loss c ON a.cause_of_loss_fkey = c.origin_key
    LEFT JOIN
        actuaries_prd.general.pm_claimtype_mapping f on c.claim_code = f.claim_code
    LEFT JOIN 
        (select distinct PolicyNumber,ReferenceProductCode_Ext FROM staging_prd.gw.pc_policyperiod) h ON g.Policy_Number = h.PolicyNumber
    WHERE 
        YEAR(g.loss_date) >=2017
        AND g.anzo_super_class = '{product}'
        AND a.observation_year_month <= '{latest_balance_date_str}'
        AND g.loss_date <= '{latest_balance_date_str}'
        AND c.incident_description is not null
        AND coalesce(Distribution_Area, 'NULL') <> 'Motorcycle'
        AND (REPLACE(g.account_number, '||', '') not in ('AVMYTESLA', '56PORSCHN', '56PORSCHR','1Q0009136','1Q0009137','1Q0009138','1Q0009139','1Q0009140','1Q0009142','1Q0009162','1Q0009168','1Q0009178','40STELLA') or g.account_number is null)
        AND coalesce(ReferenceProductCode_Ext, 'NULL') <>'CVT'
        AND coalesce(claims_category, 'NULL') <>'Catastrophe'
    GROUP BY 
        all
    ORDER BY 
        all
    """).toPandas()


### 2.1.3 - Exposure Data

In [0]:
# Earned premium and exposure by accident month and channel.
#   * policy_transformed: builds a policy key and an account key per policy-history row
#     and ranks rows per policy by DATE_EFFECT descending; the final join keeps rn = 1.
#   * expo_data_raw: premium rows with exp_start between 2017-01-01 and the balance
#     date, with raw channel codes relabelled (unlisted codes pass through unchanged).
#   * Final select: drops policies whose account is in the exclusion list; policies
#     with no account match are kept (p.Account IS NULL).
# The account exclusion list repeats the one used in the claim query - keep the two
# in sync.
# NOTE(review): the channel labels produced here must match the channel values from
# the claim query (which passes cell_name through) for the later merge on channel to
# attach exposure - confirm the two vocabularies line up.
expo_data = spark.sql(f"""
WITH policy_transformed AS (
    SELECT DISTINCT 
        CONCAT(COMPANY_CODE, POLICY_BR, POLICY_NO, POLICY_TYP) AS Policy, 
        CONCAT(ACCOUNT_BR, ACCOUNT_NO) AS Account,
        ROW_NUMBER() OVER (PARTITION BY COMPANY_CODE, POLICY_BR, POLICY_NO, POLICY_TYP ORDER BY DATE_EFFECT DESC) AS rn
    FROM STAGING_PRD.EVO.tal_evo_polh_01
    WHERE ACCOUNT_BR IS NOT NULL AND ACCOUNT_NO IS NOT NULL
),
expo_data_raw AS (
    SELECT  
        DATE_TRUNC('MM', exp_start) AS acc_month,
        CASE 
            WHEN channel = 'ANZ' THEN 'ANZ'
            WHEN channel = 'BD' THEN 'Broker Distribution'
            WHEN channel = 'ELDERS' THEN 'Elders'
            WHEN channel = 'FIOTHER' THEN 'FI Other'
            WHEN channel = 'M TRADE' THEN 'Motor Trades'
            WHEN channel = 'DIRECT' THEN 'Direct'
            WHEN channel in ('AUSPOST', 'KOGAN') THEN 'Growth'
            WHEN channel = 'MERCEDE' THEN 'Merc'
            ELSE channel
        END AS channel,
        F_policyno AS policyno,
        earnprem AS earnprem,
        exposure AS exposure
    FROM actuarial_onprem_sqlserver.dbo.fact_mpa_prem
    WHERE exp_start >= '2017-01-01' AND exp_start <= '{latest_balance_date_str}'
)
SELECT e.acc_month, e.channel, SUM(e.earnprem) AS earnprem, SUM(e.exposure) AS exposure
FROM expo_data_raw e
LEFT JOIN policy_transformed p 
    ON e.policyno = p.Policy AND p.rn = 1
WHERE p.Account IS NULL OR p.Account NOT IN ('AVMYTESLA', '56PORSCHN', '56PORSCHR','1Q0009136','1Q0009137',
                      '1Q0009138','1Q0009139','1Q0009140','1Q0009142','1Q0009162',
                      '1Q0009168','1Q0009178','40STELLA')
GROUP BY e.acc_month, e.channel
order by 1,2
""").toPandas()

In [0]:
%sql
-- Ad-hoc look at the raw premium/exposure fact table that feeds expo_data.
-- No later cell visible in this notebook refers to this cell's result.
-- NOTE(review): unbounded SELECT * - consider removing or adding a LIMIT before scheduled runs.
select * from actuarial_onprem_sqlserver.dbo.fact_mpa_prem

### 2.1.4 - GWP Data

In [0]:
# Policy count, risk count and GWP (sum of `Premium Excl. Tax`) by distribution channel
# and year-quarter of effectiveDt. Producer takes precedence over DistributorChannel
# when relabelling the channel (AUSPOST/KOGAN -> 'Growth', MERCEDESBENZ -> 'Merc').
# Rows must have flag_quote = 1 and then satisfy either
#   (a) endorsementType = 'Submission' with flag_bound = 1, or
#   (b) the renewal group on the last line of the WHERE clause.
# NOTE(review): inside group (b) AND binds tighter than OR, so it evaluates as
#   (Renewal AND flag_paid = 1 AND <not a non-ANZ GW row>) OR (PaidDate IS NOT NULL),
# i.e. any quoted row with a PaidDate qualifies regardless of endorsementType.
# Confirm this is the intended logic; add explicit parentheses either way.
gwp_data = spark.sql(f"""select count(distinct policynumber) as policy_count, count(*) as risk_count, sum(`Premium Excl. Tax`) as GWP, 
case when Producer in ('AUSPOST', 'KOGAN') then 'Growth'
     when Producer = 'MERCEDESBENZ' then 'Merc'
     when DistributorChannel in ('FI', 'FIOTHER') then 'FI Other' 
     when DistributorChannel ='BD' then 'Broker Distribution' 
     when DistributorChannel in ('M Trade', 'MotorTrade') then 'Motor Trades'
     when DistributorChannel = 'ELDERS' then 'Elders'
     else DistributorChannel end as DistributorChannel,
CONCAT(YEAR(effectiveDt), 'Q', CEILING(MONTH(effectiveDt) / 3.0))  as YearQuarter
FROM actuarial_onprem_sqlserver.dbo.PM_Portfolio_Growth
where flag_quote = 1  and ((endorsementType = 'Submission' and   flag_bound = 1) or 
  (endorsementType = 'Renewal' and flag_paid = 1  and if(`Source System Code` = 'GW' and DistributorChannel_detailed != 'ANZ', 1, 0) = 0 or PaidDate IS NOT NULL))
 group by all
 order by all""").toPandas()

### 2.1.5 - Collection

In [0]:
# Attach earned premium and exposure to the claim rows.
# Exposure is stamped with dev_month = 1 so the merge only matches claim rows in
# their first development month; every other claim row gets 0 after the fillna.
# Note: this adds the dev_month column to expo_data itself (same result on re-run).
expo_data['dev_month'] = 1
data = pd.merge(
    claim_data,
    expo_data,
    on=['acc_month', 'dev_month', 'channel'], # May need adjusting/parameterising - aggLevels is at channel + claim_type but expo_data only aggregated at channel level
    how='left',
    # Exposure must be unique per key; fail loudly instead of silently
    # duplicating claim rows if that ever stops being true.
    validate='many_to_one',
)
# Because claim rows are finer-grained than exposure (see the comment on `on=`),
# the same earnprem/exposure value is repeated on each matching claim row.
data[['earnprem', 'exposure']] = data[['earnprem', 'exposure']].fillna(0)

# Filter data input: keep development months within the modelling term and
# accident months from acc_month_start onwards.
# .copy() detaches the result from `data` so later column assignments on
# model_data behave predictably and do not raise SettingWithCopyWarning.
model_data = data[
    (data['dev_month'] <= development_term) &
    (data['acc_month'] >= acc_month_start)
].copy()

In [0]:
# Visual check of the filtered modelling input before it is registered for the product.
display(model_data)

In [0]:
# Register (or refresh) this product's settings and datasets in the shared
# registry, then print a compact summary to confirm what was stored.
update_product_config(
    config_dict=product_configs,
    product_name=product,
    input_product_short=product_short,
    input_claim_data=claim_data,
    input_expo_data=expo_data,
    input_gwp_data=gwp_data,
    input_model_data=model_data,
    input_main_level=main_level,
    input_sub_levels=sub_levels,
)
print_product_configs_summary(config_dict=product_configs)

# 3. Model Template

## 3.1 - Product Configurations

In [0]:
# Uncomment the next line to remove a product from the registry:
# product_configs.pop("Private Motor")
# Show what is currently registered.
print_product_configs_summary(product_configs)

## 3.2 - Model Loop

In [0]:
%run "./IBNR Modelling Template (Standardised)"

# 4. Diagnostics

In [0]:
# # Read the HTML file content
# with open("/Workspace/Shared/General/IBNR project/ibnr_modelling/temp_output.html", "r") as f:
#     html_content = f.read()

# # Display it in the notebook
# displayHTML(html_content)

# 5. Application Input

In [0]:
# Raw view of the registry. As the cell's last expression this renders the repr of
# every stored value, DataFrames included; print_product_configs_summary gives a
# shorter overview.
product_configs.items()

In [0]:
# One row per registered product: the product name, the table name derived from its
# lower-cased short code, and its main/sub levels (empty string when a key is absent).
app_config_rows = [
    dict(
        Product=name,
        Table=f"actuaries_prd.general.{cfg.get('product_short', '').lower()}_ultimates_new",
        Main_Level=cfg.get('main_level', ''),
        Sub_Levels=cfg.get('sub_levels', ''),
    )
    for name, cfg in product_configs.items()
]

# Spark DataFrame holding the rows to write
new_df = spark.createDataFrame(app_config_rows)

# Delta table that stores the per-product application configuration
target_table = DeltaTable.forName(spark, "actuaries_prd.general.ibnr_product_configs")

# Upsert keyed on Product: matching rows are overwritten, new products are inserted.
(
    target_table.alias("target")
    .merge(new_df.alias("source"), "target.Product = source.Product")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
