# Final assesment

In [5]:
import pandas as pd
import numpy as np
from numpy import random as rdm
wd = r"C:/Documentos/University/Econometrics/Thesis/"
#wd = r"C:/Users/Data Science/Downloads/Thesis/"

# Simulation Class

In [40]:
class Simulation:
    def __init__(self, size, start_age, max_age, num_simulations, value_upfront, seed):
        """
        Initialize the population simulator with multiple populations
        
        Args:
            size (int): Number of individuals per population (n in the paper)
            start_age (int): Starting age of the population (x in the paper, default Dutch retirement age is 67)
            max_age (int): Maximum age to simulate
            num_simulations (int): Number of populations to simulate
            value_upfront (float): Value each member pays to join the GSA (v_N_i(0) in the paper)
            seed (int): random seed
        """
        # Initialize the Population parameters
        self.size = size  # n in the paper
        self.start_age = start_age  # x in the paper
        self.max_age = max_age
        self.num_simulations = num_simulations
        
        self.timeline = self.max_age - self.start_age
        # L_x+t in the paper (equation 3) - tracks surviving members
        self.alive = np.zeros((self.num_simulations, self.size, self.timeline), dtype=int)
        
        # Initialize portfolio values
        self.value_upfront = value_upfront  # v_N_i(0) in the paper
        # v_N_i(t) in the paper - nominal account value for each member
        self.v_N_i = self.alive.copy()
        # V_N(t) in the paper - total nominal portfolio value
        self.V_N = self.alive[:,0].copy()
        # B_i(t) in the paper (equation 7) - nominal cashflows
        self.cashflows = self.alive.copy()
        
        # Initialize random generator
        self.rng = rdm.default_rng(seed)
        
        # Initialize survival probabilities array
        self.survival_probabilities = None
        
        # Initialize variables for real values
        self.v_R_i = None  # Real account value for each member
        self.V_R = None    # Real total portfolio value
        self.B_R_i = None  # Real cashflows (equation 20)
        
        # Stability tolerance parameters (section 1.6)
        self.epsilon_1 = 0.1  # Nominal lower tolerance
        self.epsilon_2 = 0.1  # Nominal upper tolerance
        self.epsilon_R_1 = 0.15  # Real lower tolerance
        self.epsilon_R_2 = 0.15  # Real upper tolerance
        self.epsilon_P_1 = 0.1  # Cumulative real lower tolerance
        self.epsilon_P_2 = 0.1  # Cumulative real upper tolerance
        
    def set_stability_tolerances(self, epsilon_1, epsilon_2, epsilon_R_1, epsilon_R_2, epsilon_P_1, epsilon_P_2):
        """
        Set the tolerance levels for stability checks as defined in section 1.6 of the paper.
        
        Args:
            epsilon_1 (float): Nominal lower tolerance (equation 21)
            epsilon_2 (float): Nominal upper tolerance (equation 21)
            epsilon_R_1 (float): Real lower tolerance (equation 22)
            epsilon_R_2 (float): Real upper tolerance (equation 22)
            epsilon_P_1 (float): Cumulative real lower tolerance (equation 23)
            epsilon_P_2 (float): Cumulative real upper tolerance (equation 23)
        """
        self.epsilon_1 = epsilon_1
        self.epsilon_2 = epsilon_2
        self.epsilon_R_1 = epsilon_R_1
        self.epsilon_R_2 = epsilon_R_2
        self.epsilon_P_1 = epsilon_P_1
        self.epsilon_P_2 = epsilon_P_2
        
    def set_survival_probabilities(self, survival_probs):
        """
        Set the age-based survival probabilities (p_x+t in the paper, equation 4)
        
        Args:
            survival_probs (np.ndarray): Array of shape (n, 2) where each row contains 
                [age, probability] pairs
        """
        # Check if the provided probabilities start at start_age
        if survival_probs[0, 0] != self.start_age:
            # Find the index where age >= start_age
            start_idx = np.searchsorted(survival_probs[:, 0], self.start_age)
            survival_probs = survival_probs[start_idx:]
        
        # Ensure there are enough probabilities for the timeline-1 transitions
        if len(survival_probs) < self.timeline - 1:
            raise ValueError("Insufficient survival probabilities for the simulation timeline.")
        
        # Convert survival probabilities to float64
        self.survival_probabilities = survival_probs.astype(np.float64)
        
    def run_mortality_simulation(self):
        """
        Run the population simulation using vectorized binomial sampling and cumulative product.
        Implements equation (5) from the paper: L_x+t ~ Bin(L_x+t-1, p_x+t-1)
        """
        if self.survival_probabilities is None:
            raise ValueError("Survival probabilities have not been set.")
        
        # Extract relevant survival probabilities (timeline-1 entries)
        relevant_probs = self.survival_probabilities[:self.timeline-1, 1]
        
        # Generate all survival trials upfront using binomial distribution (equation 5)
        # Shape: num_simulations x size x timeline-1
        trials = self.rng.binomial(n=1, p=relevant_probs, size=(self.num_simulations, self.size, self.timeline-1))
        
        # Initialize the alive array with ones at t=0 (all members start alive)
        self.alive[..., 0] = 1
        
        # Compute cumulative product of trials across the timeline dimension
        # This ensures individuals remain dead after their first death
        cumulative_trials = np.ones((self.num_simulations, self.size, self.timeline), dtype=int)
        cumulative_trials[..., 1:] = trials
        self.alive = np.cumprod(cumulative_trials, axis=2)
    
    def calculate_cashflows_and_portfolio(self, interest_rates, annuity_discount):
        """
        Calculate cash flows and update portfolio values according to the paper's model.
        
        Args:
            interest_rates (np.ndarray): R_t in the paper (equation 11) - 1D array of interest rates
            annuity_discount (float): discount rate r for annuity calculation (equation 8-9)
        """
        # Calculate annuity factors (ä_x+t in equation 8)
        annuity_factors = self.calculate_apv(annuity_discount)
        
        # Ensure annuity factors are positive and non-zero
        annuity_factors = np.maximum(annuity_factors, 1e-10)
        
        timeline = self.timeline
        num_sim = self.num_simulations
        size = self.size

        # Initialize portfolio values at t=0 (equation: v_N_i(0) = value_upfront)
        self.v_N_i[..., 0] = self.alive[..., 0] * self.value_upfront

        for t in range(timeline - 1):
            # Current portfolio values for each member at time t
            current_portfolio = self.v_N_i[..., t]
            
            # Ensure positive portfolio values (avoid negative balances)
            current_portfolio = np.maximum(current_portfolio, 0)

            # Calculate cashflows B_i(t) using equation (7)
            # B_i(t) = I{T_i > t} * v_N_i(t) / ä_x+t
            B_t = (current_portfolio / annuity_factors[t]) * self.alive[..., t]
            
            # Ensure cashflows do not exceed the portfolio value and are non-negative
            B_t = np.minimum(B_t, current_portfolio)
            B_t = np.maximum(B_t, 0)
            self.cashflows[..., t] = B_t

            # Subtract cashflow and apply interest rate to the remaining balance
            remaining_after_cashflow = current_portfolio - B_t
            remaining_after_cashflow = np.maximum(remaining_after_cashflow, 0)  # Prevent negative values
            
            R_t = interest_rates[t]
            temp = remaining_after_cashflow * (1 + R_t)

            # Determine deceased members between t and t+1
            deceased_mask = (self.alive[..., t] == 1) & (self.alive[..., t+1] == 0)
            
            # Calculate total remaining funds from deceased members (after cashflow and return)
            total_deceased = np.sum(temp * deceased_mask, axis=1, keepdims=True)
            
            # Count survivors (denominator of equation 13)
            survivors = np.sum(self.alive[..., t+1], axis=1, keepdims=True)
            survivors_safe = np.maximum(survivors, 1e-10)  # Avoid division by zero

            # Compute longevity credit (λ(t+1) in equation 13)
            longevity_credit = total_deceased / survivors_safe
            longevity_credit = np.maximum(longevity_credit, 0)  # Ensure non-negative

            # Update portfolio values for survivors: add longevity credit
            self.v_N_i[..., t+1] = (temp + longevity_credit * self.alive[..., t+1]) * self.alive[..., t+1]

        # Set final period cashflows (distribute remaining portfolio)
        t = timeline - 1
        current_portfolio = self.v_N_i[..., t]
        current_portfolio = np.maximum(current_portfolio, 0)
        self.cashflows[..., t] = current_portfolio * self.alive[..., t]

        # Update total portfolio (V_N(t) from equation 10)
        for t in range(timeline):
            self.V_N[:, t] = np.sum(self.v_N_i[:, :, t], axis=1)
    
    def real_cashflows(self, inflation_rates):
        """
        Convert nominal values to real terms using inflation rates (equation 1 and 20).
        
        Args:
            inflation_rates (np.ndarray): 1D array of I_t = CPI_t/CPI_0 for each t (equation 2).
        """
        # Reshape for broadcasting
        inflation_3d = inflation_rates[np.newaxis, np.newaxis, :]
        inflation_2d = inflation_rates[np.newaxis, :]

        # Equation (1): V_R(t) = V_N(t) / I_t
        self.v_R_i = self.v_N_i / inflation_3d  # Real account value for each member
        self.V_R = self.V_N / inflation_2d      # Real total portfolio value
        
        # Equation (20): B_R_i(t) = B_i(t) / I_t
        self.B_R_i = self.cashflows / inflation_3d  # Real cashflows

    def check_stability(self, max_age=None):
        """
        Check what percentage of simulations meet stability requirements defined in equations (21), (22), and (23)
        up to a specified age.
        
        Args:
            max_age (int, optional): Maximum age until which stability is checked. 
                                    If None, checks until the end of simulation.
        
        Returns:
            tuple: (nominal_stable_pct, real_stable_pct, cumulative_real_stable_pct)
                - nominal_stable_pct: Percentage of simulations where nominal cashflows remain within tolerance
                - real_stable_pct: Percentage of simulations where real cashflows remain within tolerance
                - cumulative_real_stable_pct: Percentage of simulations where cumulative real cashflows remain within tolerance
        """
        if self.B_R_i is None:
            raise ValueError("Real cashflows have not been calculated. Run real_cashflows first.")
        
        # Convert max_age to corresponding time index if provided
        if max_age is not None:
            if max_age < self.start_age:
                raise ValueError(f"max_age must be at least {self.start_age}")
            # Calculate the timeline index corresponding to max_age
            t_max = min(max_age - self.start_age + 1, self.timeline)
        else:
            t_max = self.timeline
        
        # Get initial cashflows (t=0) for each simulation and member
        initial_cashflows = self.cashflows[..., 0]
        
        # Calculate bounds for stability checks
        # Equation (21): B_i(t) ∈ [(1-ε_1)B_i(0), (1+ε_2)B_i(0)]
        lower_bound_nominal = (1 - self.epsilon_1) * initial_cashflows
        upper_bound_nominal = (1 + self.epsilon_2) * initial_cashflows
        
        # Equation (22): B_R_i(t) ∈ [(1-ε_R_1)B_i(0), (1+ε_R_2)B_i(0)]
        lower_bound_real = (1 - self.epsilon_R_1) * initial_cashflows
        upper_bound_real = (1 + self.epsilon_R_2) * initial_cashflows
        
        # Check stability only up to t_max
        nominal_stable = np.ones((self.num_simulations, self.size), dtype=bool)
        real_stable = np.ones((self.num_simulations, self.size), dtype=bool)
        
        # For each time period up to t_max, check if cashflows remain within bounds
        for t in range(t_max):
            # For nominal cashflows: check if within bounds AND the person is alive
            current_nominal = self.cashflows[..., t]
            alive_mask = self.alive[..., t] == 1
            
            # Only consider stability violations for living individuals
            nominal_in_bounds = ((current_nominal >= lower_bound_nominal) & 
                                (current_nominal <= upper_bound_nominal))
            
            # Consider criteria met for deceased individuals (avoid counting them in violations)
            nominal_valid = nominal_in_bounds | ~alive_mask
            
            # If ANY time period violates stability, mark the simulation-member pair as unstable
            nominal_stable = nominal_stable & nominal_valid
            
            # Same process for real cashflows
            current_real = self.B_R_i[..., t]
            real_in_bounds = ((current_real >= lower_bound_real) & 
                            (current_real <= upper_bound_real))
            real_valid = real_in_bounds | ~alive_mask
            real_stable = real_stable & real_valid
        
        # Calculate percentage of simulation-members that maintained stability
        # First, filter to only consider individuals who were alive at t=0
        initially_alive = self.alive[..., 0] == 1
        
        # Calculate percentages
        total_initially_alive = np.sum(initially_alive)
        nominal_stable_count = np.sum(nominal_stable & initially_alive)
        real_stable_count = np.sum(real_stable & initially_alive)
        
        nominal_stable_pct = (nominal_stable_count / total_initially_alive) * 100 if total_initially_alive > 0 else 0
        real_stable_pct = (real_stable_count / total_initially_alive) * 100 if total_initially_alive > 0 else 0
        
        # For cumulative stability check (equation 23)
        # Only consider cashflows up to t_max
        cum_real_cashflows = np.sum(self.B_R_i[..., :t_max], axis=2)
        initial_value = self.value_upfront
        lower_bound_cum = (1 - self.epsilon_P_1) * initial_value
        upper_bound_cum = (1 + self.epsilon_P_2) * initial_value
        
        # Check if cumulative cashflows are within bounds for initially alive members
        cumulative_real_stable = ((cum_real_cashflows >= lower_bound_cum) & 
                                (cum_real_cashflows <= upper_bound_cum))
        
        cumulative_real_stable_count = np.sum(cumulative_real_stable & initially_alive)
        cumulative_real_stable_pct = (cumulative_real_stable_count / total_initially_alive) * 100 if total_initially_alive > 0 else 0
        
        return nominal_stable_pct, real_stable_pct, cumulative_real_stable_pct
    
    def get_live_people(self) -> np.ndarray:
        """
        Calculate number of live individuals at each time period for all simulations.
        Returns L_x+t from equation (3) summed over all members.
        
        Returns:
            np.ndarray: Shape (timeline, num_simulations)
        """
        return self.alive.sum(axis=1).T
    
    def get_empirical_survival_probability(self) -> np.ndarray:
        """
        Calculate empirical survival probabilities (ˆp_x+t in equation 6) for each simulation at every time t.
        
        Returns:
            np.ndarray: Shape (timeline, num_simulations)
        """
        live = self.get_live_people()  # (timeline, simulations)
        empirical_p = np.zeros_like(live, dtype=np.float64)
        
        # Compute p(t) = L(t+1)/L(t) with division-by-zero protection (equation 6)
        np.divide(live[1:], live[:-1], out=empirical_p[:-1], where=live[:-1] != 0)
        return empirical_p
    
    def calculate_apv(self, discount_rate: float) -> np.ndarray:
        """
        Calculate the Actuarial Present Value (ä_x+t in equation 8).
        
        Args:
            discount_rate (float): Discount rate r for APV calculation
            
        Returns:
            np.ndarray: Annuity factors for each time period
        """
        if self.survival_probabilities is None:
            raise ValueError("Survival probabilities not set.")
        
        # Extract survival probabilities p_x+t
        p = self.survival_probabilities[:self.timeline-1, 1]
        timeline = self.timeline
        annuity_factors = np.zeros(timeline)
        
        # Calculate ä_x+t according to equation (8)
        for t in range(timeline):
            max_j = timeline - t
            # Calculate j_p_x+t (probability of surviving j more years at age x+t)
            jp = np.ones(max_j)
            for j in range(1, max_j):
                if t + j - 1 < len(p):
                    jp[j] = jp[j-1] * p[t + j - 1]
                else:
                    jp[j] = 0.0
                    
            # Calculate discount factors 1/(1+r)^j
            discount_factors = 1 / (1 + discount_rate) ** np.arange(max_j)
            
            # Calculate annuity factor as sum of discounted survival probabilities
            annuity_factors[t] = np.sum(jp * discount_factors)
            
        return annuity_factors
        

    def relevant_metrics(self):
        """
        Compute relevant metrics across simulations and return two DataFrames.
        Means are calculated only over living members using vectorized operations.
        
        Returns:
            tuple: 
                - pd.DataFrame: Metrics over time with mean and variance of payments, portfolios, and people alive.
                - pd.DataFrame: Aggregated sum of cashflows with mean, variance, and original value upfront.
        """
        timeline = self.timeline
        index = pd.Index([self.start_age + t for t in range(timeline)], name='Age')
        
        # Precompute live_people array (timeline x num_simulations)
        live_people = self.get_live_people()  # Shape (timeline, num_simulations)
        
        # Initialize lists to collect data for the first DataFrame
        metrics_data = []
        for t in range(timeline):
            # Get number of alive people for current time t across simulations
            alive_t = live_people[t, :]
            mean_alive = np.mean(alive_t)
            var_alive = np.var(alive_t, ddof=1) if len(alive_t) > 1 else 0
            
            # Get alive mask for current time t (shape: num_simulations x size)
            alive_mask = self.alive[:, :, t]  # 1 for alive, 0 for dead
            
            # Get data for current time t
            cashflows_t = self.cashflows[:, :, t]
            cashflows_real_t = self.B_R_i[:, :, t] if self.B_R_i is not None else None
            portfolio_personal_t = self.v_N_i[:, :, t]
            portfolio_total_t = self.V_N[:, t]
            
            # Get real portfolio data if available
            portfolio_personal_real_t = self.v_R_i[:, :, t] if self.v_R_i is not None else None
            portfolio_total_real_t = self.V_R[:, t] if self.V_R is not None else None
            
            # Calculate means for each simulation, only considering alive members
            # For each simulation, sum the values for alive members and divide by count of alive members
            
            # Sum of cashflows for alive members in each simulation
            cashflow_sums = np.sum(cashflows_t * alive_mask, axis=1)
            # Count of alive members in each simulation
            alive_counts = np.sum(alive_mask, axis=1)
            # Safe division (avoid divide by zero)
            alive_counts_safe = np.maximum(alive_counts, 1)
            # Mean cashflow per alive member in each simulation
            sim_means_nom = cashflow_sums / alive_counts_safe
            
            # Mean and variance across simulations
            mean_payment_nom = np.mean(sim_means_nom)
            var_payment_nom = np.var(sim_means_nom, ddof=1) if len(sim_means_nom) > 1 else 0
            
            # Same approach for real cashflows if available
            if cashflows_real_t is not None:
                cashflow_real_sums = np.sum(cashflows_real_t * alive_mask, axis=1)
                sim_means_real = cashflow_real_sums / alive_counts_safe
                mean_payment_real = np.mean(sim_means_real)
                var_payment_real = np.var(sim_means_real, ddof=1) if len(sim_means_real) > 1 else 0
            else:
                mean_payment_real = None
                var_payment_real = None
            
            # Same approach for personal portfolio (nominal)
            portfolio_sums = np.sum(portfolio_personal_t * alive_mask, axis=1)
            sim_means_personal = portfolio_sums / alive_counts_safe
            mean_port_personal = np.mean(sim_means_personal)
            var_port_personal = np.var(sim_means_personal, ddof=1) if len(sim_means_personal) > 1 else 0
            
            # Same approach for personal portfolio (real) if available
            if portfolio_personal_real_t is not None:
                portfolio_real_sums = np.sum(portfolio_personal_real_t * alive_mask, axis=1)
                sim_means_personal_real = portfolio_real_sums / alive_counts_safe
                mean_port_personal_real = np.mean(sim_means_personal_real)
                var_port_personal_real = np.var(sim_means_personal_real, ddof=1) if len(sim_means_personal_real) > 1 else 0
            else:
                mean_port_personal_real = None
                var_port_personal_real = None
            
            # Total portfolio is already summed over all members (nominal)
            mean_port_total = np.mean(portfolio_total_t)
            var_port_total = np.var(portfolio_total_t, ddof=1) if len(portfolio_total_t) > 1 else 0
            
            # Total portfolio (real) if available
            if portfolio_total_real_t is not None:
                mean_port_total_real = np.mean(portfolio_total_real_t)
                var_port_total_real = np.var(portfolio_total_real_t, ddof=1) if len(portfolio_total_real_t) > 1 else 0
            else:
                mean_port_total_real = None
                var_port_total_real = None
            
            # Append the row data
            metrics_data.append([
                mean_payment_nom, mean_payment_real,
                var_payment_nom, var_payment_real,
                mean_port_personal, mean_port_personal_real,
                var_port_personal, var_port_personal_real,
                mean_port_total, mean_port_total_real,
                var_port_total, var_port_total_real,
                mean_alive, var_alive
            ])
        
        # Create the first DataFrame
        columns_df1 = [
            'mean_payment_nominal', 'mean_payment_real',
            'var_payment_nominal', 'var_payment_real',
            'mean_portfolio_personal_nominal', 'mean_portfolio_personal_real',
            'var_portfolio_personal_nominal', 'var_portfolio_personal_real',
            'mean_portfolio_total_nominal', 'mean_portfolio_total_real',
            'var_portfolio_total_nominal', 'var_portfolio_total_real',
            'mean_people_alive', 'var_people_alive'
        ]
        df1 = pd.DataFrame(metrics_data, index=index, columns=columns_df1)
        
        # Calculate for the second DataFrame
        # For total sums, we want to include all payments regardless of alive status
        sum_nominal = self.cashflows.sum(axis=(1, 2))
        sum_real = self.B_R_i.sum(axis=(1, 2)) if self.B_R_i is not None else None
        
        mean_sum_nom = np.mean(sum_nominal)
        var_sum_nom = np.var(sum_nominal, ddof=1) if len(sum_nominal) > 1 else 0
        
        if sum_real is not None:
            mean_sum_real = np.mean(sum_real)
            var_sum_real = np.var(sum_real, ddof=1) if len(sum_real) > 1 else 0
        else:
            mean_sum_real = None
            var_sum_real = None
        
        value_upfront_total = self.size * self.value_upfront  # Total upfront per simulation
        
        # Create the second DataFrame
        data_df2 = [
            [mean_sum_nom, mean_sum_real],
            [var_sum_nom, var_sum_real],
            [value_upfront_total, value_upfront_total]
        ]
        df2 = pd.DataFrame(
            data_df2,
            index=['Mean', 'Variance', 'Value Upfront'],
            columns=['Nominal', 'Real']
        )
        
        return df1, df2

    def percentiles(self):
        """
        Compute percentiles for various metrics across simulations and return DataFrames in a dictionary.
        
        Returns:
            dict: A dictionary of DataFrames containing percentiles for different metrics.
                Keys include 'year_income_nominal', 'year_income_nominal_alive',
                'year_income_real', 'year_income_real_alive', 'total_cashflow_nominal',
                and 'total_cashflow_real'.
        """
        percentiles = np.arange(0, 101, 5)
        timeline = self.timeline
        ages = [self.start_age + t for t in range(timeline)]
        dfs = {}
        
        # Initialize lists to hold percentile data for each time period
        year_income_nominal = []
        year_income_nominal_alive = []
        year_income_real = []
        year_income_real_alive = []
        
        for t in range(timeline):
            # Nominal year income (all members, including dead)
            data_nom = self.cashflows[:, :, t].flatten()
            perc_nom = np.percentile(data_nom, percentiles)
            year_income_nominal.append(perc_nom)
            
            # Nominal year income given alive (only alive members)
            alive_mask = self.alive[:, :, t].flatten().astype(bool)
            data_nom_alive = data_nom[alive_mask]
            if data_nom_alive.size > 0:
                perc_nom_alive = np.percentile(data_nom_alive, percentiles)
            else:
                perc_nom_alive = np.full(len(percentiles), np.nan)
            year_income_nominal_alive.append(perc_nom_alive)
            
            # Real year income (all members, including dead if real data exists)
            if self.B_R_i is not None:
                data_real = self.B_R_i[:, :, t].flatten()
                perc_real = np.percentile(data_real, percentiles)
            else:
                perc_real = np.full(len(percentiles), np.nan)
            year_income_real.append(perc_real)
            
            # Real year income given alive (only alive members if real data exists)
            if self.B_R_i is not None:
                data_real_alive = self.B_R_i[:, :, t].flatten()[alive_mask]
                if data_real_alive.size > 0:
                    perc_real_alive = np.percentile(data_real_alive, percentiles)
                else:
                    perc_real_alive = np.full(len(percentiles), np.nan)
            else:
                perc_real_alive = np.full(len(percentiles), np.nan)
            year_income_real_alive.append(perc_real_alive)
        
        # Create DataFrames for year income metrics
        columns = [f'{p}%' for p in percentiles]
        index = pd.Index(ages, name='Age')
        
        dfs['year_income_nominal'] = pd.DataFrame(year_income_nominal, index=index, columns=columns)
        dfs['year_income_nominal_alive'] = pd.DataFrame(year_income_nominal_alive, index=index, columns=columns)
        dfs['year_income_real'] = pd.DataFrame(year_income_real, index=index, columns=columns) if self.B_R_i is not None else pd.DataFrame()
        dfs['year_income_real_alive'] = pd.DataFrame(year_income_real_alive, index=index, columns=columns) if self.B_R_i is not None else pd.DataFrame()
        
        # Non-time metrics: total cashflows (sum over all time periods)
        if self.cashflows is not None:
            sum_nominal = self.cashflows.sum(axis=2).flatten()
            perc_total_nom = np.percentile(sum_nominal, percentiles)
            dfs['total_cashflow_nominal'] = pd.DataFrame([perc_total_nom], columns=columns, index=['Total'])
        else:
            dfs['total_cashflow_nominal'] = pd.DataFrame()
        
        if self.B_R_i is not None:
            sum_real = self.B_R_i.sum(axis=2).flatten()
            perc_total_real = np.percentile(sum_real, percentiles)
            dfs['total_cashflow_real'] = pd.DataFrame([perc_total_real], columns=columns, index=['Total'])
        else:
            dfs['total_cashflow_real'] = pd.DataFrame()
        
        return dfs

## Life Table

In [7]:
life_table = pd.read_excel(wd + "Data/life_tables.xlsx")
table = life_table[life_table['Year'] == 2022]
table['px'] = 1 - table.loc[: ,'qx']
true_survival = table.loc[:,['Age', 'px']].to_numpy()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  table['px'] = 1 - table.loc[: ,'qx']


## Interest and inflation

In [8]:
interet_rate = pd.read_csv(wd + "Data/(03-01-25)_Market_interest_rates_(Month).csv")
CPI_table = pd.read_excel(wd + "Data/CPI_netherlands.xlsx", sheet_name="Monthly")
CPI_yearly = CPI_table.groupby(CPI_table['observation_date'].dt.year, as_index=False).first()
interest_rate_Netherlands = interet_rate.loc[(interet_rate["sub item"] == "10 year") &
                                             (interet_rate["region/currency"] == "Netherlands") ][["Period", "waarde"]]

interest_rate_Netherlands["Period"] = pd.to_datetime(interest_rate_Netherlands["Period"], format="%Y-%m")
interest_rate_Netherlands = interest_rate_Netherlands.sort_values(by="Period", ascending=False).reset_index().drop("index", axis =1)
annuity = interest_rate_Netherlands.mean()


# Forecast

In [9]:
from arch import arch_model
from scipy.stats import t
from scipy.stats import Normal

### Combined df

In [10]:
series_combined = pd.merge(interest_rate_Netherlands, CPI_table, left_on="Period", right_on="observation_date", how="left")
series_combined = series_combined.drop("observation_date", axis=1)
series_combined.set_index("Period", inplace=True)
series_combined.sort_index(inplace=True)
series_combined = series_combined.asfreq('MS')
series_combined.rename(columns={"waarde": "InterestRate", "NLDCPIALLMINMEI" : "CPI"}, inplace=True)
series_combined = series_combined.interpolate(method='linear', limit_direction='both')
#series_combined["CPI"] = series_combined["CPI"]/series_combined["CPI"].iloc[0]

series_combined["CPI"] = series_combined["CPI"].diff(1)
series_combined.fillna(0, inplace=True)
series_combinede = series_combined.copy()
#series_combined["CPI"] = series_combined["CPI"] * 100
series_combined["InterestRate"] = series_combined["InterestRate"].diff(1)
series_combined.fillna(0, inplace=True)

#### Forecast parameters

In [11]:
min_age = 67
max_age = 110
duration_gsa = max_age - min_age

forecast_steps = duration_gsa * 12

## VECM Forecast

In [12]:
from statsmodels.tsa.vector_ar.vecm import VECM, select_coint_rank

# Step 1: Test for Cointegration using Johansen Test
rank_test = select_coint_rank(
    series_combinede, 
    det_order=-1,  # No deterministic terms
    k_ar_diff=12   # Max lag for testing
)
print(f"Cointegration Rank: {rank_test.rank}")


Cointegration Rank: 1


In [13]:

# Step 2: Fit VECM
rank = rank_test.rank  # Use the detected rank
vecm = VECM(
    series_combinede,
    k_ar_diff=12,         # Lag order
    coint_rank=rank,      # Cointegration rank from test
    deterministic="ci"    # Include constant in cointegration
)
vecm_fit = vecm.fit()

# Step 3: Extract VECM Residuals
residualsv = pd.DataFrame(vecm_fit.resid)


residualsv.rename(columns={0:"InterestRate", 1:"CPI"}, inplace=True)
print(residualsv.head())

   InterestRate       CPI
0      0.068883  0.112042
1      0.136027 -0.263986
2     -0.118063  0.236357
3      0.090628 -0.203470
4     -0.147223 -0.426507


In [14]:

# Step 4: Fit GARCH Models on Residuals
# CPI Residuals
garch_cpi = arch_model(residualsv["CPI"], mean="Zero", vol="GARCH", p=1, q=1, dist="t")
garch_cpi_fit = garch_cpi.fit(update_freq=0, disp="off")

# Interest Rate Residuals
garch_ir = arch_model(residualsv["InterestRate"], mean="Zero", vol="GARCH", p=1, q=1, dist="t")
garch_ir_fit = garch_ir.fit(update_freq=0, disp="off")


estimating the model parameters. The scale of y is 0.02087. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or by setting rescale=False.



### VECM engine

In [15]:
forecastsv = []
last_obs = series_combined.values[-vecm_fit.k_ar:]
garch_shocks = []

for _ in range(forecast_steps):
    # 1. VECM Forecast
    # Pass None for exog_fc when the model has no exogenous variables
    fc = vecm_fit.predict(steps=1, exog_fc=None)
    
    # 2. Simulate GARCH Shocks
    # CPI Shock
    cpi_var = garch_cpi_fit.forecast(horizon=1, reindex=False).variance.iloc[-1, 0]
    eps_cpi = np.sqrt(cpi_var) * t.rvs(garch_cpi_fit.params["nu"])
    
    # Interest Rate Shock
    ir_var = garch_ir_fit.forecast(horizon=1, reindex=False).variance.iloc[-1, 0]
    eps_ir = np.sqrt(ir_var) * t.rvs(garch_ir_fit.params["nu"])
    
    # Store the shocks
    shock_vector = np.array([eps_cpi, eps_ir])
    garch_shocks.append(shock_vector)
    
    # Combine Forecasts and Shocks - ensure proper array handling
    # Convert fc to numpy array if it's not already
    fc_array = np.array(fc).flatten()  # Ensure it's a flat array
    
    # Add the shocks to the forecast
    full_fc = fc_array + shock_vector
    forecastsv.append(full_fc)
    
    # Update VECM history for next iteration
    # Remove oldest observation and add the new forecast
    last_obs = np.vstack([last_obs[1:], full_fc.reshape(1, -1)])
    
    # If VECM model requires updating with new data, do it here
    # This might involve re-estimating the model or updating internal state

# Convert lists to numpy arrays for easier handling
forecasts_array = np.array(forecastsv)
garch_shocks_array = np.array(garch_shocks)

In [16]:
forecast_dates = pd.date_range(
    start=series_combined.index[-1] + pd.DateOffset(months=1),
    periods=forecast_steps,
    freq="MS"
)


In [17]:

forecast_dfvecm = pd.DataFrame(
    np.vstack(forecastsv),
    index=forecast_dates,
    columns=[ "CPI_Forecast", "InterestRate_Forecast"]
)

#### Forecast Exporting

In [18]:
forecast_exportvecm = forecast_dfvecm.copy()

CPI_0 = 131.480000000000000
i_0 = 2.567
forecast_exportvecm["CPI_Forecast"] = forecast_exportvecm["CPI_Forecast"].cumsum() + CPI_0
#forecast_exportvecm["InterestRate_Forecast"] = forecast_exportvecm["InterestRate_Forecast"].cumsum() + i_0
#forecast_exportvecm.to_excel(wd + "Data/forecastvecm.xlsx", index=True)

# Test Simulation

In [19]:
from scipy.stats import skewnorm

def rsn(n, mean=0, sd=1, skew=0):
    """Generate a negatively skewed distribution with specified mean, standard deviation, and skewness."""
    a = -skew  # Negative `a` creates left skew (right tail is thinner)
    delta = a / np.sqrt(1 + a**2)   
    loc = mean - sd * delta * np.sqrt(2 / np.pi)
    scale = sd * np.sqrt(1 - 2 * delta**2 / np.pi)
    return skewnorm.rvs(a, loc=loc, scale=scale, size=n)

In [20]:
GSA_TEST = Simulation(size=1000, start_age=min_age, max_age=max_age,
                      num_simulations=1000, value_upfront=120_000, seed=42)
GSA_TEST.set_survival_probabilities(true_survival)

### Liquidity Shocks

In [21]:
GSA_TEST.run_mortality_simulation()

series_for_simulation = forecast_dfvecm.groupby(forecast_dfvecm.index.year).first()
series_for_simulation = series_for_simulation.sort_index(ascending=False).reset_index()

inflation_forecast = ((series_for_simulation["CPI_Forecast"].cumsum() + CPI_0)/CPI_0)
inflation_forecast = inflation_forecast.values
skew_you = rsn(len(series_for_simulation['InterestRate_Forecast']), mean=1, sd=0.5, skew=1.2)
return_rate = (series_for_simulation['InterestRate_Forecast'] + skew_you)/100
return_rate = return_rate.values 

inflation_forecast = inflation_forecast[:-2]

inflation_forecast = np.insert(inflation_forecast, 0, 1.0)  
return_rate = np.insert(return_rate, 0, i_0/100)
print(max(return_rate))

0.026921561040976694


In [22]:
annuity = interest_rate_Netherlands.mean().values[1]/100
print(annuity)
GSA_TEST.calculate_cashflows_and_portfolio(return_rate[:-2], annuity)
GSA_TEST.real_cashflows(inflation_forecast)

0.02367699300699301


In [23]:
results_time, results = GSA_TEST.relevant_metrics()

In [24]:
pmfs = GSA_TEST.percentiles()

# 80/20 Split simulation

### Stock data

In [25]:
stock_data = pd.read_csv(wd + "Data/AEX Historical Data.csv")
stock_data["Date"] = pd.to_datetime(stock_data["Date"])
stock_data = stock_data.set_index("Date")
stock_data = stock_data.sort_index()
stock_data = stock_data[["Price"]]

## ARIMA GARCH forecast

In [26]:
from statsmodels.tsa.arima.model import ARIMA


# 1. Data preparation
price_series = stock_data['Price']
# Calculate returns (typically used for GARCH modeling)
returns = price_series.pct_change().dropna() * 100  # In percentage

# 2. Fit ARIMA model to price series
# Starting with a simple ARIMA(1,1,1) model - you might need to tune this
arima_model = ARIMA(price_series, order=(0, 2, 1))
arima_fit = arima_model.fit()
print(arima_fit.summary())

# 3. Get residuals from ARIMA model for GARCH fitting
arima_resid = arima_fit.resid.dropna()

# 4. Fit GARCH model to residuals
# Using GARCH(1,1) with t-distribution for errors
garch_model = arch_model(arima_resid, vol='GARCH', p=2, q=2, dist='t')
garch_fit = garch_model.fit(disp='off')
print(garch_fit.summary())

# 5. Forecast function
def forecast_arima_garch(arima_fit, garch_fit, steps=30, last_obs=None):
    forecasts = []
    forecast_variance = []
    
    # Get the last observations if not provided
    if last_obs is None:
        last_obs = price_series.values
    
    # Make a copy to avoid modifying the original
    current_data = np.copy(last_obs)
    
    for _ in range(steps):
        # ARIMA forecast (one step ahead)
        arima_forecast = arima_fit.forecast(steps=1)
        arima_mean = arima_forecast[0]
        
        # GARCH forecast (one step ahead variance)
        garch_variance = garch_fit.forecast(horizon=1).variance.iloc[-1, 0]
        
        # Simulate shock from t-distribution
        t_shock = t.rvs(garch_fit.params["nu"]) * np.sqrt(garch_variance)
        
        # Combine ARIMA forecast with GARCH shock
        full_forecast = arima_mean + t_shock
        
        # Store forecasts
        forecasts.append(full_forecast)
        forecast_variance.append(garch_variance)
        
        # Update data for next iteration (add new forecast)
        current_data = np.append(current_data[1:], full_forecast)
        
        # Update ARIMA model (optional - can be computationally intensive)
        # This would be more accurate but slower:
        arima_model = ARIMA(current_data, order=(0,2,1))
        arima_fit = arima_model.fit()
    
    return np.array(forecasts), np.array(forecast_variance)


forecasts_ret, forecast_variance_ret = forecast_arima_garch(arima_fit, garch_fit, steps=forecast_steps)

# 7. Calculate confidence intervals
confidence_level = 0.95
t_critical = t.ppf((1 + confidence_level) / 2, garch_fit.params["nu"])
lower_bound = forecasts_ret - t_critical * np.sqrt(forecast_variance_ret)
upper_bound = forecasts_ret + t_critical * np.sqrt(forecast_variance_ret)

# 8. Prepare forecast dates for plotting





  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


                               SARIMAX Results                                
Dep. Variable:                  Price   No. Observations:                  287
Model:                 ARIMA(0, 2, 1)   Log Likelihood               -1309.469
Date:                Sat, 03 May 2025   AIC                           2622.939
Time:                        19:07:56   BIC                           2630.244
Sample:                    01-01-2001   HQIC                          2625.867
                         - 11-01-2024                                         
Covariance Type:                  opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
ma.L1         -0.9815      0.018    -53.822      0.000      -1.017      -0.946
sigma2       566.8199     41.709     13.590      0.000     485.072     648.568
Ljung-Box (L1) (Q):                   0.03   Jarque-

  arima_mean = arima_forecast[0]


In [27]:
# 10. Print forecast results
forecast_df_ret = pd.DataFrame({
    'Date': forecast_dates,
    'Forecast': forecasts_ret,
    'Lower_Bound': lower_bound,
    'Upper_Bound': upper_bound,
    'Variance': forecast_variance_ret
})
print(forecast_df_ret)

          Date     Forecast  Lower_Bound  Upper_Bound    Variance
0   2024-12-01   904.234631   840.903911   967.565352  694.585509
1   2025-01-01   905.812740   842.482019   969.143460  694.585509
2   2025-02-01   918.787465   855.456744   982.118185  694.585509
3   2025-03-01   873.078700   809.747980   936.409421  694.585509
4   2025-04-01   879.245932   815.915211   942.576652  694.585509
..         ...          ...          ...          ...         ...
511 2067-07-01  2431.380951  2368.050230  2494.711672  694.585509
512 2067-08-01  2389.665629  2326.334909  2452.996350  694.585509
513 2067-09-01  2363.300150  2299.969430  2426.630871  694.585509
514 2067-10-01  2399.357132  2336.026411  2462.687853  694.585509
515 2067-11-01  2336.326163  2272.995443  2399.656884  694.585509

[516 rows x 5 columns]


In [28]:
yearly_return = forecast_df_ret.groupby(forecast_df_ret['Date'].dt.year)['Forecast'].first().pct_change() 
print(yearly_return)
yearly_return = yearly_return.values
yearly_return[0] = 0
len(yearly_return)

Date
2024         NaN
2025    0.001745
2026   -0.050164
2027    0.159815
2028    0.143155
2029    0.189826
2030    0.038490
2031   -0.023553
2032   -0.028226
2033    0.105562
2034    0.044070
2035    0.094364
2036    0.082034
2037    0.031966
2038   -0.029194
2039    0.089028
2040    0.027484
2041   -0.037560
2042   -0.031110
2043   -0.008803
2044   -0.010990
2045   -0.048943
2046    0.025054
2047   -0.005835
2048    0.078002
2049    0.024523
2050    0.061980
2051    0.078828
2052    0.037302
2053    0.091119
2054   -0.063230
2055   -0.026843
2056    0.001110
2057    0.044558
2058   -0.002610
2059    0.048469
2060   -0.067060
2061    0.034122
2062   -0.045727
2063    0.039938
2064   -0.033761
2065   -0.006919
2066    0.018835
2067    0.032565
Name: Forecast, dtype: float64


44

## GSA with investment

In [63]:
investment_weights = (0.8, 0.2)
GSA_I = Simulation(size=2000, start_age=min_age, max_age=max_age,
                      num_simulations=2000, value_upfront=120_000, seed=42)
GSA_I.set_survival_probabilities(true_survival)
GSA_I.run_mortality_simulation()
ret_withi = return_rate[:-2] * investment_weights[0] + yearly_return[:-1] * investment_weights[1]
GSA_I.calculate_cashflows_and_portfolio(ret_withi, annuity)
GSA_I.real_cashflows(inflation_forecast)

In [37]:
results_timei, resultsi = GSA_I.relevant_metrics()
percentilesi = GSA_I.percentiles()

In [64]:
GSA_I.set_stability_tolerances(0.1, 1, 0.2, 2, 0.05, 0.05)
print(GSA_I.check_stability(90))

(np.float64(84.862775), np.float64(29.270425), np.float64(7.488849999999999))


In [None]:
percentilesi['year_income_nominal_alive'].to_excel(wd + "Data/percentiles_investment.xlsx")
percentilesi['year_income_real_alive'].to_excel(wd + "Data/percentiles_investment_real.xlsx")

In [None]:
percentilesi['total_cashflow_nominal'].to_excel(wd + "Data/total_percentiles_nominal.xlsx")
percentilesi['total_cashflow_real'].to_excel(wd + "Data/total_percentiles_real.xlsx")

In [None]:

results_timei.to_excel(wd + "Data/second_batch_ofresults.xlsx")
resultsi.to_excel(wd + "Data/totals_second_batch_ofresults.xlsx")

In [None]:
stabilities = {}

In [69]:

for i in [6000, 7000, 8000, 9000, 10000]:
    GSAn = Simulation(size=i, start_age=min_age, max_age=max_age,
                      num_simulations=1000, value_upfront=120_000, seed=42)
    GSAn.set_survival_probabilities(true_survival)
    GSAn.run_mortality_simulation()
    GSAn.calculate_cashflows_and_portfolio(ret_withi, annuity)
    GSAn.real_cashflows(inflation_forecast)
    GSAn.set_stability_tolerances(0.1, 1, 0.2, 2, 0.05, 0.05)
    stabilities[f"{i}"] = GSAn.check_stability(90)
    del GSAn
    print(i)

KeyboardInterrupt: 

In [67]:
print(stabilities)

{'1000': (np.float64(81.8637), np.float64(29.3287), np.float64(7.7527)), '2000': (np.float64(84.64735), np.float64(29.318699999999996), np.float64(7.4712)), '3000': (np.float64(86.575), np.float64(29.489666666666665), np.float64(7.493266666666666)), '4000': (np.float64(86.971575), np.float64(29.478749999999998), np.float64(7.22655)), '5000': (np.float64(88.26458000000001), np.float64(29.547240000000002), np.float64(7.102060000000001))}
