In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import scipy.stats as stats

from scipy.linalg import svd
import statsmodels.api as sm

In [2]:
def save_matrix(mat, path, ages=range(35,65+1), years=range(2015,2022+1)):
    df = pd.DataFrame(mat, columns=[f"Age {i}" for i in ages], index=[f'Year {i}' for i in years])
    df.to_csv(path, sep=',', index=True, encoding='utf-8')

def plot_matrix(X, Y, Z, title="Taux de mortalité", save=None):
    X, Y = np.meshgrid(X, Y)
    
    # Create a 3D plot
    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')
    
    # Plot the surface
    surface = ax.plot_surface(X, Y, Z, cmap='plasma', edgecolor='none')
    
    # Add a color bar which maps values to colors
    fig.colorbar(surface, shrink=0.5, aspect=10, pad=0.1)
    
    # Set titles and labels
    ax.set_title(title)
    ax.set_xlabel('Age', labelpad=15)
    ax.set_ylabel('Année', labelpad=15)
    ax.set_zlabel('Taux', labelpad=15)

    ax.tick_params(axis='z', labelsize=8, pad=5)
    
    ax.view_init(elev=40, azim=120) 
    
    if save is not None:
        plt.savefig(save, bbox_inches='tight')

    # Show the plot
    plt.show()

def plot_matrix2(X, Y, Z, n, title="Taux de mortalité"):
    X, Y = np.meshgrid(X, Y)
    
    # Create a 3D plot
    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')
    
    # Plot the full matrix with a standard colormap
    surface_full = ax.plot_surface(X, Y, Z, cmap='gray', edgecolor='none', alpha=0.8)
    
    # Plot the last n rows with a different colormap
    X_last_n = X[-n:, :]
    Y_last_n = Y[-n:, :]
    Z_last_n = Z[-n:, :]
    
    surface_highlight = ax.plot_surface(X_last_n, Y_last_n, Z_last_n, cmap='plasma', edgecolor='none')
    
    # Add a color bar which maps values to colors
    fig.colorbar(surface_highlight, shrink=0.5, aspect=10, pad=0.1)
    
    # Set titles and labels
    ax.set_title(title)
    ax.set_xlabel('Age', labelpad=15)
    ax.set_ylabel('Année', labelpad=15)
    ax.set_zlabel('Taux', labelpad=15)

    ax.tick_params(axis='z', labelsize=8, pad=5)
    
    ax.view_init(elev=40, azim=120)
    
    # Show the plot
    plt.show()

def cal_ax(log_mt):
    return np.mean(log_mt, axis=0)

def cal_hax(log_mt,ax):
    return log_mt - ax # broadcasting

def cal_bx_kt_s(hax):
    # Perform Singular Value Decomposition
    U, S, Vt = svd(hax, full_matrices=False)
    
    # First left singular vector, first singular value, and first right singular vector
    k = U[:, 0]
    s1 = S[0]
    bx = Vt[0, :]
    
    # Scale the first left singular vector by the first singular value
    kt = np.array(k * s1)
    
    return bx, kt

def lee_carter(mt):
    log_mt = np.log(mt)
    ax = cal_ax(log_mt)
    hax = cal_hax(log_mt, ax)
    bx, kt = cal_bx_kt_s(hax)

    return [ax, bx, kt]

def rand_walk_drift_forecast(kt, n):
    # Calculate the drift (average change per time step)
    c = (kt[-1] - kt[0]) / (len(kt) - 1)
    
    # Generate an array of future time steps
    steps = np.arange(1, n + 1)
    
    # Calculate the forecast using vectorized operations
    forecast = kt[-1] + c * steps
    
    return forecast

def forecast_mt(ax,bx,f_kt):
    # Convert inputs to NumPy arrays for vectorized operations
    ax = np.array(ax)
    bx = np.array(bx)
    f_kt = np.array(f_kt)

    return np.exp(ax[np.newaxis, :] + f_kt[:, np.newaxis] * bx)

def forecast_lee_carter(mt,n):
    ax, bx, kt = lee_carter(mt)
    f_kt = rand_walk_drift_forecast(kt,n)
    forecasts = forecast_mt(ax, bx, f_kt)

    return forecasts

def get_ref_qbrut(filename):
    with open(f"HMD_inputs/{filename}.txt", "r") as f:
        lines = f.readlines()
        columns = lines[2].split()
        data = [l.split() for l in lines[3:]]
    df = pd.DataFrame(data, columns=columns)

    df = df.astype({
        'Year': 'int',
        'Age': 'str',  # Keep as string to handle "110+" replacement
        'mx': 'float',
        'qx': 'float',
        'ax': 'float',
        'lx': 'int',
        'dx': 'int',
        'Lx': 'int',
        'Tx': 'int',
        'ex': 'float'
    })

    # Replace "110+" with 110 in the 'Age' column
    df['Age'] = df['Age'].replace('110+', 110).astype(int)
    df['mx'] = df["mx"].replace(0, 1e-6)

    mx_matrix = df.pivot(index='Year', columns='Age', values='mx').to_numpy()

    pivot_df = df.pivot(index='Age', columns='Year', values='mx')
    pivot_df.index.name = None
    pivot_df.to_csv(f"HMD_inputs/matrices/{filename}.csv")

    ages = np.arange(pivot_df.index.min(), pivot_df.index.max()+1)
    years = np.arange(pivot_df.columns.min(), pivot_df.columns.max()+1)

    return ages, years, mx_matrix

def logit(x):
    return np.log(x/(1-x))

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Lee Carter

In [None]:
filenames = ["HMD_IRL_F", "HMD_IRL_M", "HMD_UK_F", "HMD_UK_M"]

for filename in filenames:
    ages, years, mx_matrix = get_ref_qbrut(filename)

    plot_matrix(ages, years, mx_matrix, title=f"Taux bruts {filename}")

    nb_years_to_forecast = 2050 - years[-1]
    forecast_mx = forecast_lee_carter(mx_matrix, nb_years_to_forecast)
    full_forecast = np.vstack((mx_matrix, forecast_mx))
    projection_years = np.arange(years[0], 2050+1)

    plot_matrix(ages, projection_years, full_forecast, title=f"Taux bruts projetés {filename}")
    plot_matrix2(ages, projection_years, full_forecast, nb_years_to_forecast, title=f"Taux bruts projetés {filename}")

    projected_df = pd.DataFrame(full_forecast.T, index=ages, columns=projection_years)
    projected_df.to_csv(f"HMD_inputs/matrices/projected_{filename}.csv")

# Brass

In [None]:
def check_normality(residuals, name):
    # Histogram
    plt.figure(figsize=(10, 6))
    plt.hist(residuals, bins=30, density=True, alpha=0.6, color='g', label='Residuals')
    mu, std = np.mean(residuals), np.std(residuals)
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = stats.norm.pdf(x, mu, std)
    plt.plot(x, p, 'k', linewidth=2, label='Normal distribution fit')
    plt.title('Histogram of Residuals with Normal Distribution Fit')
    plt.legend()
    plt.show()

    # Shapiro-Wilk Test
    shapiro_test = stats.shapiro(residuals)
    print(f'Shapiro-Wilk Test: W={shapiro_test.statistic}, p-value={shapiro_test.pvalue}')

    # Kolmogorov-Smirnov Test
    ks_test = stats.kstest(residuals, 'norm', args=(mu, std))
    print(f'Kolmogorov-Smirnov Test: statistic={ks_test.statistic}, p-value={ks_test.pvalue}')

    # Anderson-Darling Test
    anderson_test = stats.anderson(residuals, dist='norm')
    print(f'Anderson-Darling Test: statistic={anderson_test.statistic}')
    print('Critical values for Anderson-Darling Test:', anderson_test.critical_values)

    # Q-Q Plot
    fig, ax = plt.subplots(figsize=(6, 6))
    stats.probplot(residuals, dist="norm", plot=ax)
    ax.annotate(f'KS p-value: {ks_test.pvalue:.4f}', xy=(0.05, 0.95), xycoords='axes fraction',
        fontsize=10, ha='left', va='top', bbox=dict(facecolor='white', alpha=0.7))

    # Customize labels and title in French
    ax.get_lines()[1].set_label('Droite de référence')  # Set the reference line label
    ax.get_lines()[0].set_label('Données')  # Set the data points label

    ax.set_xlabel('Quantiles théoriques')
    ax.set_ylabel('Quantiles des données')
    ax.set_title(f'BRASS: Q-Q Plot {name}')

    # Show legend
    ax.legend(loc="lower right")

    plt.savefig(f"images/BRASS_{name}_qqplot.png", bbox_inches='tight')

    # Display the plot
    plt.show()

filenames = ["IRL_F", "IRL_M", "UK_F", "UK_M"]

projection_years = np.arange(2022, 2050 + 1)

for filename in filenames:
    df_base = pd.read_csv(f"./HMD_inputs/matrices/HMD_{filename}.csv", index_col=0)
    df_base.columns = df_base.columns.astype(int)
    df_projected_base = pd.read_csv(f"./HMD_inputs/matrices/projected_HMD_{filename}.csv", index_col=0)
    df_projected_base.columns = df_projected_base.columns.astype(int)

    df = pd.read_csv(f"./matrices/BRUT_{filename}.csv", index_col=0)
    df.columns = [int(col[4:]) for col in df.columns]
    df.index = [int(idx[5:]) for idx in df.index]
    df = df.T

    # Find common years and ages
    common_years = df.columns.intersection(df_base.columns)
    common_ages = df.index.intersection(df_base.index)

    # Filter dataframes to only include common years and ages
    qbrut_filtered = df.loc[common_ages, common_years].T.to_numpy()
    qbrut_base_filtered = df_base.loc[common_ages, common_years].T.to_numpy()
    qbrut_projected_base = df_projected_base.loc[common_ages, projection_years].T.to_numpy()

    def Brass(qx, qxbase):  
        # Create a mask for cases where qx is zero
        mask = (qx == 0)
        
        # Handle cases where logit is undefined (mask = True)
        # We will use valid indices where qx is non-zero
        valid_indices = ~mask
        
        # Perform logit transformation only on valid indices
        Yx = logit(qx[valid_indices])
        Yxbase = logit(qxbase[valid_indices])
        
        # Perform linear fitting
        a, b = np.polyfit(Yxbase.flatten(), Yx.flatten(), 1)


        # Linear regression using statsmodels
        X = sm.add_constant(Yxbase.flatten())  # Add an intercept term to the model
        model = sm.OLS(Yx.flatten(), X).fit()  # Fit the model
        print(a,b)
        print(model.summary())
        print(model.pvalues)
        
        return a, b

    a,b = Brass(qbrut_filtered, qbrut_base_filtered)
    print(f"{filename}: {a=}, {b=}")

    n_years = len(common_years)
    ncols = 3
    nrows = (n_years + ncols - 1) // ncols  # Compute the number of rows needed
    fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(ncols * 5, nrows * 4), sharey=True)

    # Flatten the axes array for easy iteration (in case of multiple rows)
    axes = axes.flatten()

    # Plotting
    for year_idx, (ax, year) in enumerate(zip(axes, common_years)):
        logit_qbrut = logit(qbrut_filtered[year_idx])
        ax.scatter(common_ages, logit_qbrut, color='blue', label='Logit des taux bruts')

        ax.plot(common_ages, a*logit(qbrut_base_filtered[year_idx]) + b, color='red', label='BRASS')

        # Perform linear regression
        valid_indices = qbrut_filtered[year_idx] != 0
        slope, intercept = np.polyfit(common_ages[valid_indices], logit_qbrut[valid_indices], 1)
        regression_line = slope * common_ages + intercept
        ax.plot(common_ages, regression_line, color='green', linestyle='--', label='Régression linéaire')

        pearson_test = stats.pearsonr(common_ages[valid_indices], logit_qbrut[valid_indices])
        print(f"{pearson_test=}")
        ax.annotate(f'p-value: {pearson_test.pvalue:.4e}', xy=(0.05, 0.95), xycoords='axes fraction',
            fontsize=10, ha='left', va='top', bbox=dict(facecolor='white', alpha=0.7))

        ax.set_xlabel('Age')
        ax.set_title(f"Année {year}")
        if year_idx == 0:
            ax.set_ylabel('Taux')
        if year_idx == n_years - 1:
            ax.legend(loc="lower right")
    # Hide any unused subplots
    for ax in axes[n_years:]:
        ax.axis('off')
    
    fig.suptitle(f"{filename}")
    # Adjust layout
    plt.tight_layout()

    plt.savefig(f"images/BRASS_{filename}_loglin.png", bbox_inches='tight')
    plt.show()

    residuals = (logit(qbrut_filtered) - (a*logit(qbrut_base_filtered)+ b))[qbrut_filtered != 0].flatten()
    check_normality(residuals, filename)

    n_years = len(projection_years)

    # Create a figure and a set of subplots
    fig, axes = plt.subplots(nrows=n_years//5, ncols=5, figsize=(25, 4*(n_years//5)), sharey=True)

    # Plotting
    for year_idx, (ax, year) in enumerate(zip(axes.flatten()[:n_years], projection_years)):
        ax.plot(common_ages, qbrut_projected_base[year_idx], color='red')
        ax.set_xlabel('Age')
        ax.set_title(f"Year {year}")
        if year_idx == 0:
            ax.set_ylabel('Taux')
        if year_idx == n_years - 1:
            ax.legend(loc='lower right')

    fig.suptitle(f"Taux bruts de ref projetés (Lee Carter)")
    # Adjust layout
    plt.tight_layout()
    plt.show()

    projected = sigmoid(a*logit(qbrut_projected_base) + b)
    plot_matrix(common_ages, projection_years, projected, title=f"BRASS: {filename}", save=f"images/BRASS_{filename}.png")

    all_ages = list(df_projected_base.index)
    qbrut_projected_base_all_ages = df_projected_base.loc[all_ages, projection_years].T.to_numpy()
    projected = sigmoid(a*logit(qbrut_projected_base_all_ages) + b)
    plot_matrix(all_ages, projection_years, projected, title=f"BRASS: {filename}", save=f"images/BRASS_{filename}_all.png")
    
    all_years = range(2015, 2050+1)
    qbrut_projected_base_all_ages = df_projected_base.loc[all_ages, all_years].T.to_numpy()
    projected = sigmoid(a*logit(qbrut_projected_base_all_ages) + b)
    save_matrix(projected, f"matrices/BRASS_{filename}_all.csv", ages=all_ages, years=range(2015, 2050+1))