In [1]:
import pandas as pd
import numpy as np

ibnr_file = "inputs/IBNR_Allocation_v1.15.xlsx"
mbe_file = "inputs/ABE&MBE_v1.13_202503_v4.xlsm"

In [2]:
ibnr_data = pd.read_excel(
        ibnr_file,
        sheet_name="IBNR Reserves Calculation",
        skiprows=10,
        engine="openpyxl",
    )

In [3]:
def get_ultimate_premiums(mbe_file:str=mbe_file)-> dict:
    ultimates = {}
    lobs = ["ENOFF",
    "ENONS",
    "ENCON",
    "ENOTH",
    "PRTOT",
    "LIIND",
    "LIMLT",
    "LISHT",
    "DCFRO",
    "DCMAR",
    "DCBON",
    "DCFIL",
    "DCPRO",
    "DCENR",
    "DCSPE",
    "DCLIA"]
    excel_file = pd.ExcelFile(mbe_file)
    for lob in lobs:
        # Read values from N13 to N21 (Excel rows 13 to 21, column N) for each LOB sheet
        sheet = pd.read_excel(excel_file, sheet_name=f"Summary_{lob}", header=None)
        # Excel rows are 1-indexed, so rows 13 to 21 are indices 12 to 20
        ultimates[lob] = sheet.iloc[12:21, 13].values  # column N is index 13
    return ultimates

def get_ultimates_df() -> pd.DataFrame:
    ultimates = get_ultimate_premiums()
    years = [i+2017 for i in range(len(ultimates["ENOFF"]))] # Assuming all LOBs have the same number of years
    return pd.DataFrame(ultimates, index=years)

ultimates_df = get_ultimates_df()

In [4]:
import_cols =[
    "Policy reference", 
    "UWY",
    "New LOB",
    "gep",
    "BE LR Attr", "BE LR Attr+Large",
    "ClmI Attr",
    "ClmI Large",
    "SIBNR",
    "Initial IBNR",
    "negative IBNR by UWY*LOB",
    "weights for non negatives",
    "allocation of negative IBNR",
    "Final IBNR",
    "Initial IBNR",
    "negative IBNR by UWY*LOB",
    "weights for non negatives",
    "allocation of negative IBNR",
    "Final IBNR",
    "IBNR Stat earned Attr+Large"]

data = ibnr_data[import_cols]

clm_i_large = pd.pivot_table(
            data,
            index=["UWY"],
            columns=["New LOB"],
            values=["ClmI Large"],
            aggfunc="sum",
        )

clm_i_attr = pd.pivot_table(
            data,
            index=["UWY"],
            columns=["New LOB"],
            values=["ClmI Attr"],
            aggfunc="sum",
        )

ultimate_lr_data = pd.read_excel(
    ibnr_file,
    sheet_name="Inputs from other files",
    skiprows=24,
    engine="openpyxl"
)

unearned_lr_data = pd.read_excel(
    mbe_file,
    sheet_name="PAD",
    skiprows=5,
    engine="openpyxl"
)

unearned_lr_data = unearned_lr_data[["LOB", "Mean", "Standard \nDeviation", "Variance"]]
unearned_lr_data.columns = ["LOB", "Mean", "Standard Deviation", "Variance"]

# Select only the relevant columns
ultimate_lr_data = ultimate_lr_data[["LOB", "UWY", "BE LR Attr", "BE LR Attr+Large"]]
# Example: select row where LOB == "ENTOT" and UWY == 2025
# ultimate_lr_data.loc[(ultimate_lr_data["LOB"] == "ENTOT") & (ultimate_lr_data["UWY"] == 2025)]

# unearned_lr_data[(unearned_lr_data["LOB"] == "ENOFF")]["Standard Deviation"].values[0]


In [5]:
class LRData:
    def __init__(self, data):
        self.data = data

    def __getitem__(self, key):
        lob, year = key
        return self.data[(self.data["LOB"] == lob) & (self.data["UWY"] == year)]

    def __call__(self, lob, year):
        return self.data[(self.data["LOB"] == lob) & (self.data["UWY"] == year)]

class PivotDf:
    def __init__(self, df):
        self.df = df

    def _get_index(self, lob):
        return [(i,j) for (i,j) in self.df.columns if j == lob][0]
        
    def __call__(self, lob, year):
        return float(self.df[self._get_index(lob)].loc[year])



In [6]:

class SubclassUWY:
    def __init__(self, lob, year, ultimate_lr_attr, ultimate_lr_large_attr, incurred_lr_attr, incurred_lr_large, variance_weighted_credibility=1):
        self.lob = lob
        self.year = year
        self.ultimate_lr_attr = ultimate_lr_attr
        self.ultimate_lr_large = ultimate_lr_large_attr 
        self.incurred_lr_attr = incurred_lr_attr
        self.incurred_lr_large = incurred_lr_large
        self.credibity_factor = variance_weighted_credibility

        self.lr_attr = max(0, self.ultimate_lr_attr - self.incurred_lr_attr)
        self.lr_large = max(0, self.ultimate_lr_large - self.incurred_lr_large)

        self.mu_log = self.calculate_mu_log()
        self.sigma_log = self.calculate_sigma_log()

        self.samples = None # Placeholder for samples; calculate using self.run_montecarlo()
        self.aggregates = None  # Placeholder for aggregates, if needed later

    def calculate_mu_log(self):
        try:
            mu_log = np.log(self.lr_large/np.sqrt(1+self.credibity_factor/(self.lr_large)**2))
        except Exception as e:
            print(f"Error in calculating mu_log for lob: {self.lob}, year: {self.year}, setting to 0: {e}")
            mu_log = 0
        return mu_log
    
    def calculate_sigma_log(self):
        try:
            sigma_log = np.sqrt(np.log(1 + self.credibity_factor/(self.lr_large)**2))
        except Exception as e:
            print(f"Error in calculating sigma_log for lob: {self.lob}, year: {self.year}, setting to 0: {e}")
            sigma_log = 0
        return sigma_log
    
    def run_montecarlo(self, n_samples=10000):
        """Run Monte Carlo simulation to generate samples based on the log-normal distribution."""
        # Ensure mu_log and sigma_log are not zero to avoid division by zero errors
        mu = self.mu_log
        sigma = self.sigma_log
        
        if mu == sigma == 0:
            print(f"Warning: mu_log or sigma_log is zero for {self.lob} in {self.year}. Returning empty samples.")
            return np.array([])
        
        # Generate samples from a log-normal distribution
        self.samples = np.random.lognormal(mean=mu, sigma=sigma, size=n_samples)
        self.calculate_aggregates()
        pass
    
    def calculate_aggregates(self):
        """Calculate aggregates based on the generated samples."""
        if self.samples is None:
            print(f"Warning: No samples generated for {self.lob} in {self.year}. Cannot calculate aggregates.")
            return
        
        try:
            aggregates = {
                "mean": np.mean(self.samples),
                "std_dev": np.std(self.samples),
                "median": np.median(self.samples),
                "95th_percentile": np.percentile(self.samples, 95),
                "all_percentiles": np.percentile(self.samples, np.arange(1, 101)) 
            }
        except Exception as e:
            print(f"Error in calculating aggregates for {self.lob} in {self.year}: {e}")
        
    def __repr__(self):
        return (f"SubclassUWY(lob={self.lob}, year={self.year}, "
                f"ultimate_lr_attr={self.ultimate_lr_attr}, "
                f"ultimate_lr_large={self.ultimate_lr_large}, "
                f"incurred_lr_attr={self.incurred_lr_attr}, "
                f"incurred_lr_large={self.incurred_lr_large}, "
                f"credibity_factor={self.credibity_factor})")
    
    def __dict__(self):
        return {
            "lob": self.lob,
            "year": self.year,
            "ultimate_lr_attr": self.ultimate_lr_attr,
            "ultimate_lr_large": self.ultimate_lr_large,
            "incurred_lr_attr": self.incurred_lr_attr,
            "incurred_lr_large": self.incurred_lr_large,
            "credibity_factor": self.credibity_factor,
            "lr_attr": self.lr_attr,
            "lr_large": self.lr_large,
            "mu_log": self.mu_log,
            "sigma_log": self.sigma_log
        }
    
class SubclassUWYFactory:
    def __init__(self, 
            premiums_ultimate_df, 
            incurred_attr_df:PivotDf,
            incurred_large_df:PivotDf,
            ultimate_lr_data: pd.DataFrame = ultimate_lr_data, 
            unearned_lr_data: pd.DataFrame = unearned_lr_data
            ):
        
        self._years = premiums_ultimate_df.index.tolist()
        self._lobs = premiums_ultimate_df.columns.tolist()
        
        self.premiums_ultimate_df = premiums_ultimate_df
        self.incurred_attr_df = incurred_attr_df
        self.incurred_large_df = incurred_large_df
        self.ultimate_lr_data = ultimate_lr_data
        self.unearned_lr_data = unearned_lr_data
        
        self.incurred_lr_attr = self.calculate_incurred_lr_attr()
        self.incurred_lr_large = self.calculate_incurred_lr_large()

    
    def calculate_incurred_lr_attr(self) -> dict:
        # self.incurred_lr_attr.loc[("ENOFF", 2019)].values[0]
        loss_ratios = [] 
        for year in self._years:
            for lob in self._lobs:
                # Calculate the incurred loss ratio for each LOB and year
                incurred_attr = self.incurred_attr_df(lob, year)
                premiums_ultimate = self.premiums_ultimate_df.loc[year, lob]
                
                if premiums_ultimate != 0:
                    incurred_lr_attr = incurred_attr / premiums_ultimate
                else:
                    incurred_lr_attr = np.nan
                loss_ratios.append({"LOB": lob, "UWY": year, "Incurred LR Attr": incurred_lr_attr})
        return pd.DataFrame(loss_ratios).set_index(["LOB", "UWY"])
        
        
    def calculate_incurred_lr_large(self):
        # self.incurred_lr_attr.loc[("ENOFF", 2019)].values[0]
        loss_ratios = [] 
        for year in self._years:
            for lob in self._lobs:
                # Calculate the incurred loss ratio for each LOB and year
                incurred = self.incurred_large_df(lob, year)
                premiums_ultimate = self.premiums_ultimate_df.loc[year, lob]
                
                if premiums_ultimate != 0:
                    incurred_lr = incurred / premiums_ultimate
                else:
                    incurred_lr = np.nan
                loss_ratios.append({"LOB": lob, "UWY": year, "Incurred LR Large": incurred_lr})
        return pd.DataFrame(loss_ratios).set_index(["LOB", "UWY"])
        
    def create_subclass_uwy(self) -> list[SubclassUWY]:
        subclasses = []
        for lob in self._lobs:
            for year in self._years:
                ultimate_lr_attr = self.ultimate_lr_data.loc[
                    (self.ultimate_lr_data["LOB"] == lob) & 
                    (self.ultimate_lr_data["UWY"] == year), 
                    "BE LR Attr"
                ].values[0]
                ultimate_lr_large_attr = self.ultimate_lr_data.loc[
                    (self.ultimate_lr_data["LOB"] == lob) & 
                    (self.ultimate_lr_data["UWY"] == year), 
                    "BE LR Attr+Large"
                ].values[0]
                unearned_std_deviation = unearned_lr_data[(unearned_lr_data["LOB"] == lob)]["Standard Deviation"].values[0]
                unearned_mean = unearned_lr_data[(unearned_lr_data["LOB"] == lob)]["Mean"].values[0]
                variance_weighted_credibility = (ultimate_lr_large_attr*unearned_std_deviation/unearned_mean)**2
                subclasses.append(
                    SubclassUWY(
                        lob=lob,
                        year=year,
                        ultimate_lr_attr = ultimate_lr_attr,
                        ultimate_lr_large_attr =ultimate_lr_large_attr,
                        incurred_lr_attr = self.incurred_lr_attr.loc[(lob, year)].values[0],
                        incurred_lr_large = self.incurred_lr_large.loc[(lob, year)].values[0],
                        variance_weighted_credibility = variance_weighted_credibility
                    )
                )
        return subclasses
    


In [7]:
factory = SubclassUWYFactory(
    premiums_ultimate_df=ultimates_df,
    incurred_attr_df=PivotDf(clm_i_attr),
    incurred_large_df=PivotDf(clm_i_large))

factory.premiums_ultimate_df
factory._lobs

sublobs = factory.create_subclass_uwy()

  mu_log = np.log(self.lr_large/np.sqrt(1+self.credibity_factor/(self.lr_large)**2))
  sigma_log = np.sqrt(np.log(1 + self.credibity_factor/(self.lr_large)**2))
  mu_log = np.log(self.lr_large/np.sqrt(1+self.credibity_factor/(self.lr_large)**2))
  mu_log = np.log(self.lr_large/np.sqrt(1+self.credibity_factor/(self.lr_large)**2))
  sigma_log = np.sqrt(np.log(1 + self.credibity_factor/(self.lr_large)**2))


In [8]:
for sublob in sublobs:
    sublob.run_montecarlo(n_samples=100000)

In [101]:
from dataclasses import dataclass

@dataclass
class IBNR_BE_earned:
    year: int
    lob: str
    gep: float
    incurred: float

    _type: str

    def __post_init__(self):
        if self._type not in ("attr", "large"):
            raise ValueError(f"_type must be 'attr' or 'large', got {self._type}") 

    def calculate_ibnr(self, loss_ratio: float) -> float:
        """Calculate the IBNR based on the incurred amount and loss ratio."""
        if loss_ratio <= 0:
            raise ValueError("Loss ratio must be greater than zero")
        return self.incurred * loss_ratio


class IBNR_BE_earned_Fabric:
    def __init__(self, ibnr_data: pd.DataFrame, loss_ratios: list[SubclassUWY]):
        self.ibnr_data = ibnr_data
        self.loss_ratios = loss_ratios

        self.pol_ref = ibnr_data["Policy reference"].unique()

        # self._run_sanity_checks()

    def _run_sanity_checks(self):
        if not isinstance(self.ibnr_data, pd.DataFrame):
            raise TypeError("ibnr_data must be a pandas DataFrame")
        if not isinstance(self.loss_ratios, list):
            raise TypeError("loss_ratios must be a list of SubclassUWY objects")
        if not all(isinstance(lr, SubclassUWY) for lr in self.loss_ratios):
            raise TypeError("All items in loss_ratios must be SubclassUWY instances")
        if not all(year in self._years for year in self.ibnr_data["UWY"].unique()):
            raise ValueError("Years in ibnr_data do not match the expected years")
        if not all(lob in self._lobs for lob in self.ibnr_data["New LOB"].unique()):
            raise ValueError("LOBs in ibnr_data do not match the expected LOBs")
        if not len(self.pol_ref) == len(self.ibnr_data.shape[0]):
            raise ValueError("Policy references in ibnr_data are not unique")

    def get_BE_attr(self):
        return self.ibnr_data[["Policy reference", 
            "UWY",
            "New LOB",
            "gep",
            "ClmI Attr"]
            ].dropna().rename(columns={"Policy reference":"policy", "ClmI Attr": "ClmI", "New LOB": "lob"})

    def get_BE_large(self):
        return self.ibnr_data[["Policy reference", 
            "UWY",
            "New LOB",
            "gep",
            "ClmI Large"]
            ].dropna().rename(columns={"Policy reference":"policy", "ClmI Large": "ClmI", "New LOB": "lob"})

    def get_lr_simulation_n(self, simulation_n: int) -> dict:
        """Get the loss ratios for a specific simulation number."""
        return {f"{sublob.lob}_{sublob.year}": sublob.samples[simulation_n] for sublob in self.loss_ratios}
    
    def calculate_ibnr_be(self, type: str = "large", simulation_n: int = 0) -> list[IBNR_BE_earned]:
        if type not in ("attr", "large"):
            raise ValueError(f"Type must be 'attr' or 'large', got {type}")
        if simulation_n < 0 or simulation_n >= len(self.loss_ratios[0].samples):
            raise ValueError(f"Simulation number must be between 0 and {len(self.loss_ratios[0].samples) - 1}, got {simulation_n}")
        
        loss_ratios = self.get_lr_simulation_n(simulation_n)
        if not loss_ratios:
            raise ValueError(f"No loss ratios found for simulation number {simulation_n}")

        df = self.get_BE_attr().copy() if type == "attr" else self.get_BE_large().copy()
        
        df["loss_ratio"] = df.apply(lambda row: loss_ratios.get(f"{row['lob']}_{int(row['UWY'])}", 0), axis=1).fillna(0)

        df["Initial IBNR"] = df.apply(lambda row: row["gep"] * row["loss_ratio"] - row["ClmI"], axis=1)
        
        # Calculate negative IBNR by UWY*LOB
        negative_ibnr = df[df["Initial IBNR"] < 0].groupby(["UWY", "lob"])["Initial IBNR"].sum()
        total_positive_ibnr = df[df["Initial IBNR"] >= 0].groupby(["UWY", "lob"])["Initial IBNR"].sum()
        df["Negative IBNR by UWY*LOB"] = df.apply(
            lambda row: negative_ibnr.get((row["UWY"], row["lob"]), 0), axis=1
        )
        df["weights for non negatives"] = df.apply(
            lambda row: row["Initial IBNR"] / total_positive_ibnr.get((row["UWY"], row["lob"]), 1) if row["Initial IBNR"] > 0 else 0, 
            axis=1
        )
        df["allocation of negative IBNR"] = df.apply(
            lambda row: row["Negative IBNR by UWY*LOB"] * row["weights for non negatives"], axis=1
        )
        df["Final IBNR"] = df.apply(lambda row: max(row["Initial IBNR"] + row["allocation of negative IBNR"], 0), axis=1)
        return df


In [102]:
ibnr_be_factory = IBNR_BE_earned_Fabric(
    ibnr_data=data,
    loss_ratios=sublobs
)
# ibnr_be_earned_list = ibnr_be_factory.create_ibnr_be_earned()

In [103]:
ibnr_be_factory.get_BE_attr(), ibnr_be_factory.get_BE_large()
res = ibnr_be_factory.calculate_ibnr_be()
res[(res["UWY"] == 2018) & (res["lob"] == "DCENR")]
# res[res["LOB_UWY"] == "DCENR_2018"]

Unnamed: 0,policy,UWY,lob,gep,ClmI,loss_ratio,Initial IBNR,Negative IBNR by UWY*LOB,weights for non negatives,allocation of negative IBNR,Final IBNR
1030,REN18008983A,2018.0,DCENR,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1267,REN17006187B,2018.0,DCENR,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1356,REN18008237B,2018.0,DCENR,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [104]:
sum(res["Initial IBNR"]), sum(res["ClmI"]), sum(res["gep"]), sum(res["Final IBNR"])

(104847039.90933922, 247362100.8419571, 1022038612.5299488, 252981443.4747605)

In [91]:
loss_ratios = ibnr_be_factory.get_lr_simulation_n(0)
loss_ratios.get("ENOFF_2025")
loss_ratios.get("DCENR_2018", 0)

np.float64(nan)

In [79]:
res.to_excel("IBNR_BE_earned.xlsx", index=False)

In [114]:
import math

sum(float(item.samples[0]) for item in sublobs if item.samples is not None and len(item.samples) > 0 and not math.isnan(float(item.samples[0])))*100/10440

0.4236712244260839