# 1. Import libraries & define global variables

In [9]:
import sys
print(sys.executable)

%pip install pyarrow xlsxwriter

c:\Users\bakan\AppData\Local\Programs\Python\Python313\python.exe
Note: you may need to restart the kernel to use updated packages.


In [10]:
from dataclasses import dataclass, field
from joblib import delayed, Parallel
from time import perf_counter
from typing import Self

from tqdm import tqdm

import numpy as np
import polars as pl
import pandas as pd


N_MONTHS = 60 # the first 5 years
MU_NONSMOKER, MU_SMOKER = 0.03, 0.09 

N_POLICYHOLDERS = 1_000
BENEFIT_AMOUNT_FOR_ALL_POLICYHOLDERS = 100_000
AGE = 25 # doesn't really matter in this scenario
PERCENT_SMOKER = 0.20
MAX_LIFESPAN = 1200  # 100years, doesn't really matter in this scenario

N_SIMULATIONS = 1_000

# 2. Create helper classes & functions

In [11]:
def get_time(t: float, dp: int | None = 1) -> tuple[float, str]:
    if t < 60:
        return np.around(t, dp), "seconds"
    if t < 60 * 60:
        return np.around(t / 60, dp), "minutes"
    if t < 60 * 60 * 24:
        return np.around(t / (60 * 60), dp), "hours"

    return np.around(t / (60 * 60 * 24), dp), "days"


class ExponentialMortality:
    def __init__(self, mu: float, max_lifespan: float|None=np.inf) -> None:
        self.__mu = mu 
        self.__max_lifespan = max_lifespan  # Mass point at max lifespan.
        
    @property
    def mu(self) -> float:
        return self.__mu
    
    @property
    def mu_monthly(self) -> float:
        return self.__mu / 12.0
    
    @property
    def max_lifespan(self) -> float:
        return self.__max_lifespan
    
    def copy(self) -> Self:
        return ExponentialMortality(mu=self.mu, max_lifespan=self.max_lifespan)

    def kpx(self, k: float, x: float) -> float:
        return np.exp(-self.mu_monthly*k) if x+k < self.max_lifespan else 0
    
    def kqx(self, k: float, x: float) -> float:
        return 1 - self.kpx(k=k, x=x)

    def prob_Kx_equals_k(self, k: float, x: float) -> float:
        return self.kpx(k=k, x=x) * self.kqx(k=1, x=x+k)
    
    def discrete_remaining_mortality_probs(self, x: float, t_horizon: int|float, limit: int|None=120) -> np.ndarray:
        """Generates the probability of deaths in each discrete year at and beyond age x."""
        t_horizon_floor = int(min(t_horizon, limit))
        return np.array([self.prob_Kx_equals_k(k=float(k), x=x) for k in range(t_horizon_floor+1)])
    
    def simulate_remaining_death_year(self, x: float, seed: int|None=42) -> int:
        """Generates the # integer years left for the person to live."""
        if x >= self.max_lifespan:
            return 0  # They're already dead!
        
        t_horizon = self.max_lifespan-x
        discrete_mortality_probs = self.discrete_remaining_mortality_probs(x=x, t_horizon=t_horizon)
        probs_scaled = discrete_mortality_probs / discrete_mortality_probs.sum()  # to ensure scaled probs add up to one.
        
        # Set a random seed for replicability.
        rng  = np.random.default_rng(seed)
        return rng.choice(discrete_mortality_probs.size, p=probs_scaled)
    
    
@dataclass(frozen=True, slots=True)
class Policyholder:
    """An immutable policyholder record."""
    id_: int
    age: float
    mortality: ExponentialMortality = field(repr=False)
    benefit: float

    def __post_init__(self) -> None:
        # Copy `mortality` *after* the object is created.
        # Because the dataclass is frozen, we must use `object.__setattr__`.
        object.__setattr__(self, "mortality", self.mortality.copy())
        
    def copy(self) -> Self:
        return Policyholder(id_=self.id_, age=self.age, mortality=self.mortality.copy(), benefit=self.benefit)
    
    def num_integer_years_to_death(self, seed: int|None=42) -> int:
        return self.mortality.simulate_remaining_death_year(x=self.age, seed=seed)
    
    def integer_age_of_death(self, seed: int|None=42) -> int:
        return int(self.num_integer_years_to_death(seed=seed)+self.age)

    
class LiabilityPortfolio:
    def __init__(self, policyholders: list[Policyholder], seed: int|None=42) -> None:
        self.__policyholders = [policyholder.copy() for policyholder in policyholders]
        self.__seed = seed
    
    @property
    def policyholders(self) -> list[Policyholder]:
        return self.__policyholders
    
    @property
    def n_policyholders(self) -> int:
        return len(self.policyholders)
    
    @property
    def seed(self) -> int:
        return self.__seed
    
    @property
    def death_benefits(self) -> np.ndarray:
        return np.array([policyholder.benefit for policyholder in self.policyholders])
    
    @property
    def num_integer_years_to_death(self) -> np.ndarray:
        # We need to vary the inner seed to ensure that numbers change for similarly distributed policyholders.
        # We thus do a seed*i where i represents the location of the policyholder[i] in the array.
        return np.array([policyholder.num_integer_years_to_death(seed=self.seed*i)
                         for i, policyholder in enumerate(self.policyholders)])
    
    @property
    def smoker_mask(self) -> np.ndarray:
        return np.array([p.mortality.mu == MU_SMOKER for p in self.policyholders])

    @property
    def nonsmoker_mask(self) -> np.ndarray:
        return ~self.smoker_mask
    
    def __project_results(self, n_years: int|None=None, weights: np.ndarray|None=None) -> np.ndarray:
        """return the policyholders' time to death and the dying number for each n_years given the time scope n_years and the various benefits weights"""
        if n_years is None:
            # Just take the maximum remaining lifetime of the policyholder allowed to live the longest.
            max_death_time = int(np.max([int(policyholder.mortality.max_lifespan-policyholder.age)
                                         for policyholder in self.policyholders]))
        else:
            max_death_time = n_years
        
        death_times = self.num_integer_years_to_death
        
        if weights is None:
            return np.bincount(death_times, minlength=max_death_time+1)[:max_death_time+1]
        
        return np.bincount(death_times, minlength=max_death_time+1, weights=weights)[:max_death_time+1]
    
    def __project_results_by_mask(self, mask: np.ndarray, n_years: int|None=None,
                              weights: np.ndarray|None=None) -> np.ndarray:
    # Handle empty group
        if not mask.any():
            return np.zeros(1, dtype=float if weights is not None else int)

        if n_years is None:
            max_death_time = int(np.max([
                int(p.mortality.max_lifespan - p.age)
                for p in np.array(self.policyholders, dtype=object)[mask]
            ]))
        else:
            max_death_time = n_years

        death_times = self.num_integer_years_to_death[mask]
        if weights is None:
            return np.bincount(death_times, minlength=max_death_time+1)[:max_death_time+1]
        return np.bincount(death_times, minlength=max_death_time+1, weights=weights[mask])[:max_death_time+1]
    
    def get_num_deaths_in_remaining_integer_years(self, n_years: int|None=None) -> np.ndarray:
        return {
        "Smoker": self.__project_results(self.smoker_mask, n_years=n_years),
        "Non-Smoker": self.__project_results(self.nonsmoker_mask, n_years=n_years),
    }
    
    def get_losses_in_remaining_integer_years(self, n_years: int|None=None) -> np.ndarray:
        return {
        "Smoker": self.__project_results_by_mask(self.smoker_mask, n_years=n_years, weights=self.death_benefits),
        "Non-Smoker": self.__project_results_by_mask(self.nonsmoker_mask, n_years=n_years, weights=self.death_benefits),
    }
    
    # def get_projections_df(self, n_years: int, scenario: list[int]|None=0) -> pl.DataFrame:      
    #     num_deaths = self.get_num_deaths_in_remaining_integer_years(n_years=n_years)
    #     n_alive_end = self.n_policyholders - np.cumsum(num_deaths)
    #     n_alive_start = np.r_[self.n_policyholders, n_alive_end[:-1]]
    #     dollar_losses = self.get_losses_in_remaining_integer_years(n_years=n_years)
    #     return pl.DataFrame({"Month": range(n_years+1), "Scenario #": [scenario]*(n_years+1),
    #                          "# Alive at Start": n_alive_start, "# Deaths": num_deaths,
    #                          "# Alive at End": n_alive_end,
    #                          "Loss ($)": dollar_losses,
    #                         })
    def get_projections_df(self, n_years: int, scenario: int|None=0) -> pl.DataFrame:
        """
        Return a long-form projection split into 'Smoker' and 'Non-Smoker'.
        Columns: Month, Scenario #, Group, # Alive at Start, # Deaths, # Alive at End, Loss ($)
        """
        # deaths & losses per group
        deaths_sm = self.__project_results_by_mask(self.smoker_mask,     n_years=n_years)
        deaths_ns = self.__project_results_by_mask(self.nonsmoker_mask,  n_years=n_years)
        losses_sm = self.__project_results_by_mask(self.smoker_mask,     n_years=n_years, weights=self.death_benefits)
        losses_ns = self.__project_results_by_mask(self.nonsmoker_mask,  n_years=n_years, weights=self.death_benefits)

        # initial counts
        n_sm = int(self.smoker_mask.sum())
        n_ns = int(self.nonsmoker_mask.sum())

        def build(label: str, initial_n: int, d: np.ndarray, L: np.ndarray) -> pl.DataFrame:
            n_end   = initial_n - np.cumsum(d)
            n_start = np.r_[initial_n, n_end[:-1]]
            return pl.DataFrame({
                "Month": range(n_years+1),
                "Scenario #": [scenario]*(n_years+1),
                "Group": [label]*(n_years+1),
                "# Alive at Start": n_start,
                "# Deaths": d,
                "# Alive at End": n_end,
                "Loss ($)": L,
            })

        df_sm = build("Smoker", n_sm, deaths_sm, losses_sm)
        df_ns = build("Non-Smoker", n_ns, deaths_ns, losses_ns)
        return pl.concat([df_sm, df_ns], how="vertical")

    

class LiabilitySimulator:
    def __init__(self, mortalities: tuple[ExponentialMortality, ExponentialMortality],
                 age: float,
                 n_policyholders: int,
                 percent_smoker: float,
                 benefit_amount: float,
                 n_years: int) -> None:
        self.__mortality_nonsmoker = mortalities[0].copy()
        self.__mortality_smoker = mortalities[1].copy()
        self.__age = age
        self.__n_policyholders = n_policyholders
        self.__percent_smoker = percent_smoker
        self.__benefit_amount = benefit_amount
        self.__n_years = n_years
    
    @property
    def simulator_inputs(self) -> dict[str, ExponentialMortality|float|int]:
        return {"mortality_nonsmoker": self.__mortality_nonsmoker, "mortality_smoker": self.__mortality_smoker,
                "age": self.__age, "n_policyholders": self.__n_policyholders, "percent_smoker": self.__percent_smoker,
                "benefit_amount": self.__benefit_amount, "n_years": self.__n_years,
               }
    
    
    def __generate_one_policyholder_at_time0(self, rng_uniform: float, id_: int|None=0) -> Policyholder:
        if rng_uniform <= self.__percent_smoker:
            return Policyholder(id_=id_, age=self.__age, mortality=self.__mortality_smoker, benefit=self.__benefit_amount)
        return Policyholder(id_=id_, age=self.__age, mortality=self.__mortality_nonsmoker, benefit=self.__benefit_amount)
        
    def __generate_policyholder_portfolio_at_time0(self, seed: int|None=42) -> list[Policyholder]:
        rng  = np.random.default_rng(seed=seed)
        random_uniform_array = rng.uniform(size=self.__n_policyholders)
        return [self.__generate_one_policyholder_at_time0(id_=i, rng_uniform=random_uniform_array[i])
                for i in range(self.__n_policyholders)
               ]
    
    def simulate_one(self, seed: int|None=42) -> LiabilityPortfolio:
        policyholders = self.__generate_policyholder_portfolio_at_time0(seed=seed)
        return LiabilityPortfolio(policyholders=policyholders, seed=seed)
    
    def simulate(self, n: int) -> list[LiabilityPortfolio]:                                
        return [self.simulate_one(seed=i) for i in tqdm(range(n), desc="Simulating liability portfolio - non-parallel")]
    
    def get_projections_df_across_scenarios(self, liability_portfolios: list[LiabilityPortfolio]) -> pl.DataFrame:
        n_simulations = len(liability_portfolios)
        dfs = Parallel(n_jobs=-1)(
            delayed(liability_portfolios[i].get_projections_df)(n_years=self.__n_years, scenario=i)
            for i in tqdm(range(n_simulations), desc="Packaging results")
        )
        return pl.concat(dfs, how="vertical").sort(by=["Month", "Scenario #"], descending=[False, False])


# 3. Simulate many

## 3.1. Set up the simulator

In [12]:
tic = perf_counter()
mortality_nonsmoker = ExponentialMortality(mu=MU_NONSMOKER, max_lifespan=MAX_LIFESPAN)
mortality_smoker = ExponentialMortality(mu=MU_SMOKER, max_lifespan=MAX_LIFESPAN)
liability_simulator = LiabilitySimulator(mortalities=(mortality_nonsmoker, mortality_smoker),
                                         age=AGE, n_policyholders=N_POLICYHOLDERS, 
                                         percent_smoker=PERCENT_SMOKER,
                                         benefit_amount=BENEFIT_AMOUNT_FOR_ALL_POLICYHOLDERS,
                                         n_years=N_MONTHS)
toc = perf_counter()
t, units = get_time(t=toc-tic, dp=2)
print(f"Time taken (setting up the simulator): {t} {units}.")


Time taken (setting up the simulator): 0.0 seconds.


## 3.1. Run simulation

In [13]:
tic = perf_counter()
liability_portfolios = liability_simulator.simulate(n=N_SIMULATIONS)
toc = perf_counter()
t, units = get_time(t=toc-tic, dp=2)
print(f"Time taken (creating {N_SIMULATIONS} simulations - non-parallel): {t} {units}.")


Simulating liability portfolio - non-parallel: 100%|██████████| 1000/1000 [00:02<00:00, 394.26it/s]

Time taken (creating 1000 simulations - non-parallel): 2.54 seconds.





## 3.2. Get results

In [14]:
tic = perf_counter()
df_liability_simulations = liability_simulator.get_projections_df_across_scenarios(
    liability_portfolios=liability_portfolios,
)


toc = perf_counter()
t, units = get_time(t=toc-tic, dp=2)
print(f"Time taken (packaging results for {N_SIMULATIONS} simulations): {t} {units}.")
display(df_liability_simulations)


df_smoker = df_liability_simulations.filter(pl.col("Group") == "Smoker")
df_nonsmoker = df_liability_simulations.filter(pl.col("Group") == "Non-Smoker")

df_smoker_pd    = pd.DataFrame(df_smoker.to_dicts())
df_nonsmoker_pd = pd.DataFrame(df_nonsmoker.to_dicts())

with pd.ExcelWriter("simulations_data.xlsx", engine="xlsxwriter") as writer:
    df_smoker_pd.to_excel(writer, sheet_name="Smoker", index=False)
    df_nonsmoker_pd.to_excel(writer, sheet_name="Non-Smoker", index=False)


Packaging results: 100%|██████████| 1000/1000 [01:17<00:00, 12.89it/s]


Time taken (packaging results for 1000 simulations): 1.37 minutes.


Month,Scenario #,Group,# Alive at Start,# Deaths,# Alive at End,Loss ($)
i64,i64,str,i64,i64,i64,f64
0,0,"""Non-Smoker""",807,0,807,0.0
0,0,"""Smoker""",193,0,193,0.0
0,1,"""Non-Smoker""",812,6,806,600000.0
0,1,"""Smoker""",188,5,183,500000.0
0,2,"""Smoker""",198,5,193,500000.0
…,…,…,…,…,…,…
60,997,"""Non-Smoker""",390,3,387,300000.0
60,998,"""Smoker""",74,2,72,200000.0
60,998,"""Non-Smoker""",381,6,375,600000.0
60,999,"""Smoker""",75,4,71,400000.0
