In [None]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.interpolate import interp1d
from scipy.stats import norm
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from scipy.optimize import differential_evolution
from scipy.integrate import quad


class HullWhiteModel:
    def __init__(self, T, R_market, tau=0.25, dt=0.01, T_max=30.0, M=1000):
        self.T = T
        self.R_market = R_market
        self.P_market = np.exp(-R_market * T)
        self.tau = tau
        self.dt = dt
        self.T_max = T_max
        self.M = M
        self.N = int(T_max / dt)
        self.time_grid = np.linspace(0, T_max, self.N + 1)
        self.initialize_f_curve()


    """def initialize_f_curve(self):
        self.f_spline = CubicSpline(self.T, self.R_market)
        self.df_spline = self.f_spline.derivative()
"""

    def initialize_f_curve(self):
        logP_spline = CubicSpline(self.T, np.log(self.P_market))
        self.f_spline = lambda t: -logP_spline.derivative()(t)
        self.df_spline = lambda t: -logP_spline.derivative(nu=2)(t)

    
    def f0(self, t):
        """Taux forward instantané f(0,t) interpolé"""
        return self.f_spline(t)

    def df0(self, t):
        """Dérivée du taux forward instantané f'(0,t)"""
        return self.df_spline(t)
    

    def theta(self, t, a, sigma):
        """Fonction theta(t) du modèle Hull-White"""
        return self.df0(t) + a * self.f0(t) + (sigma ** 2) / (2 * a ) * (1 - np.exp(-2 * a * t))
    
    def P0(self, t):
        """Prix de l'obligation zéro-coupon au temps 0 pour maturité t"""
        return np.exp(-self.f0(t) * t)

    def P0_vectorized(self, t_array):
        return np.array([self.P0(ti) for ti in t_array])




    def B(self, T, a):
        return (1 - np.exp(-a * T)) / a

    def A(self, T, a, sigma):
        B_T = self.B(T, a)
        f_vals = self.f0(T)  # ✅ nouveau : utilise l'interpolation spline
        term1 = B_T * f_vals
        term2 = (sigma ** 2 / (4 * a)) * (1 - np.exp(-2 * a * T)) * B_T ** 2
        return self.P0(T) * np.exp(term1 - term2)  # ✅ utilise self.P0(T)


    def calibrate_r0(self, A_T, B_T):
        mask = B_T > 1e-5
        numerateur = np.log(A_T[mask]) - np.log(self.P_market[mask])
        return np.mean(numerateur / B_T[mask])

    def black_cap_price(self, T_cap, sigma_cap, P_interp_func):
        N_caplets = int(T_cap / self.tau)
        caplet_dates = np.arange(self.tau, T_cap + 1e-8, self.tau)
        P_Ti = P_interp_func(caplet_dates)
        P_Ti_plus = P_interp_func(caplet_dates + self.tau)
        F = (P_Ti / P_Ti_plus - 1) / self.tau #formule sur les feuilles, même formule que l'euribor3M
        K = F # ATM
        cap_price = 0.0
        for i in range(N_caplets):
            T_i = caplet_dates[i]
            F_i = F[i]
            K_i = K[i]
            P_i1 = P_Ti_plus[i]
            d1 = (np.log(F_i / K_i) + 0.5 * sigma_cap ** 2 * T_i) / (sigma_cap * np.sqrt(T_i))
            d2 = d1 - sigma_cap * np.sqrt(T_i)
            caplet_price = self.tau * P_i1 * (F_i * norm.cdf(d1) - K_i * norm.cdf(d2))
            cap_price += caplet_price
        return cap_price
        


    




    def black_swaption_price(self, T_exp, T_swap, sigma_swaption, P_interp_func):
        fixed_leg_dates = np.arange(T_exp + self.tau, T_exp + T_swap + 1e-8, self.tau)
        P_T = P_interp_func(T_exp)
        P_T_k = P_interp_func(fixed_leg_dates)
        annuity = self.tau * np.sum(P_T_k)
        swap_rate = (P_T - P_T_k[-1]) / annuity
        K = swap_rate # ATM
        d1 = (np.log(swap_rate / K) + 0.5 * sigma_swaption ** 2 * T_exp) / (sigma_swaption * np.sqrt(T_exp))
        d2 = d1 - sigma_swaption * np.sqrt(T_exp)
        price = annuity * (swap_rate * norm.cdf(d1) - K * norm.cdf(d2)) #swaption payer
        #price = annuity * (K * norm.cdf(-d2) - swap_rate * norm.cdf(-d1)) #swaption receiver, dans les deux cas on trouve la même calibration
        return price

    def objective_caps(self, params):
        a, sigma = params
        if a <= 0 or sigma <= 0:
            return 1e6
        B_T = self.B(self.T, a)
        A_T = self.A(self.T, a, sigma)
        r0 = self.calibrate_r0(A_T, B_T)
        P_model = A_T * np.exp(-B_T * r0)
        R_model = -np.log(P_model) / self.T
        P_interp_func = interp1d(self.T, np.exp(-R_model * self.T), kind="linear", fill_value="extrapolate")
        cap_prices_model = np.array([
        self.black_cap_price(T_cap, vol, P_interp_func)
        for T_cap, vol in zip(self.cap_maturities, self.cap_vols)
        ])
        return np.mean((cap_prices_model - self.cap_prices_market) ** 2)
    




    def objective_swaptions(self, params):
        a, sigma = params
        if a <= 0 or sigma <= 0:
            return 1e6
        B_T = self.B(self.T, a)
        A_T = self.A(self.T, a, sigma)
        r0 = self.calibrate_r0(A_T, B_T)
        P_model = A_T * np.exp(-B_T * r0)
        R_model = -np.log(P_model) / self.T
        P_interp_func = interp1d(self.T, np.exp(-R_model * self.T), kind="linear", fill_value="extrapolate")
        swaption_prices_model = np.array([
        self.black_swaption_price(T_exp, T_swap, vol, P_interp_func)
        for T_exp, T_swap, vol in zip(self.expiries_swap, self.tenors_swap, self.swaption_vols)
        ])
        return np.mean((swaption_prices_model - self.swaption_prices_market) ** 2) #on va chercher à minimiser l'erreur quadratique moyenne



    def objective_joint(self, params, w1=0.5, w2=0.5):
        a, sigma = params
        if a <= 0 or sigma <= 0:
            return 1e6

        B_T = self.B(self.T, a)
        A_T = self.A(self.T, a, sigma)
        r0 = self.calibrate_r0(A_T, B_T)
        P_model = A_T * np.exp(-B_T * r0)
        R_model = -np.log(P_model) / self.T
        P_interp_func = interp1d(self.T, np.exp(-R_model * self.T), kind="linear", fill_value="extrapolate")

        # Prix modèle pour caplets
        cap_prices_model = np.array([
            self.black_cap_price(T_cap, vol, P_interp_func)
            for T_cap, vol in zip(self.cap_maturities, self.cap_vols)
        ])
        mse_caps = np.mean((cap_prices_model - self.cap_prices_market) ** 2)

        # Prix modèle pour swaptions
        swaption_prices_model = np.array([
            self.black_swaption_price(T_exp, T_swap, vol, P_interp_func)
            for T_exp, T_swap, vol in zip(self.expiries_swap, self.tenors_swap, self.swaption_vols)
        ])
        mse_swaps = np.mean((swaption_prices_model - self.swaption_prices_market) ** 2)

        return w1 * mse_caps + w2 * mse_swaps
    


    def calibrate_by_caps(self, cap_maturities, cap_vols, cap_prices_market, initial_guess=[0.2, 0.01]):
        self.cap_maturities = cap_maturities
        self.cap_vols = cap_vols
        self.cap_prices_market = cap_prices_market
        res = minimize(self.objective_caps, initial_guess, bounds=[(1e-4, 1), (1e-4, 0.5)])
        self.a, self.sigma = res.x
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)
        return self.a, self.sigma, self.r0
    

    def calibrate_by_caps_global(self, cap_maturities, cap_vols, cap_prices_market, bounds=[(0.005, 0.5), (0.005, 0.1)]):
        self.cap_maturities = cap_maturities
        self.cap_vols = cap_vols
        self.cap_prices_market = cap_prices_market

        result = differential_evolution(self.objective_caps, bounds, strategy='best1bin', maxiter=1000, tol=1e-6, polish=True)
        
        self.a, self.sigma = result.x
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)

        return self.a, self.sigma, self.r0

    


    def calibrate_by_swaptions(self, expiries, tenors, vols, swaption_prices_market, initial_guess=[0.2, 0.01]):
        self.expiries_swap = expiries
        self.tenors_swap = tenors
        self.swaption_vols = vols
        self.swaption_prices_market = swaption_prices_market
        res = minimize(self.objective_swaptions, initial_guess, bounds=[(1e-4, 1), (1e-4, 0.5)])
        self.a, self.sigma = res.x
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)
        return self.a, self.sigma, self.r0
    

    def calibrate_by_swaptions_global(self, expiries, tenors, vols, swaption_prices_market, bounds=[(0.005, 0.5), (0.005, 0.1)]):
        self.expiries_swap = expiries
        self.tenors_swap = tenors
        self.swaption_vols = vols
        self.swaption_prices_market = swaption_prices_market
        res = differential_evolution(self.objective_swaptions, bounds, strategy='best1bin', maxiter=1000, tol=1e-6, polish=True)
        self.a, self.sigma = res.x
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)
        return self.a, self.sigma, self.r0
    
    


    

    def calibrate_joint(self, cap_maturities, cap_vols, cap_prices_market,
                    expiries, tenors, vols, swaption_prices_market,
                    w1=0.5, w2=0.5, initial_guess=[0.1, 0.01]):
    
        # Stocker les données
        self.cap_maturities = cap_maturities
        self.cap_vols = cap_vols
        self.cap_prices_market = cap_prices_market

        self.expiries_swap = expiries
        self.tenors_swap = tenors
        self.swaption_vols = vols
        self.swaption_prices_market = swaption_prices_market

        # Minimisation de la fonction objectif conjointe
        obj = lambda params: self.objective_joint(params, w1=w1, w2=w2)
        res = minimize(obj, initial_guess, bounds=[(1e-4, 1), (1e-4, 0.5)])
        self.a, self.sigma = res.x

        # Calcul final de r0
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)

        return self.a, self.sigma, self.r0
    

    def calibrate_joint_global(self, cap_maturities, cap_vols, cap_prices_market,
                    expiries, tenors, vols, swaption_prices_market,
                    w1=0.5, w2=0.5, bounds=[(0.005, 0.5), (0.005, 0.1)]):
    
        # Stocker les données
        self.cap_maturities = cap_maturities
        self.cap_vols = cap_vols
        self.cap_prices_market = cap_prices_market

        self.expiries_swap = expiries
        self.tenors_swap = tenors
        self.swaption_vols = vols
        self.swaption_prices_market = swaption_prices_market

        # Minimisation de la fonction objectif conjointe
        obj = lambda params: self.objective_joint(params, w1=w1, w2=w2)
        res = differential_evolution(obj, bounds, strategy='best1bin', maxiter=1000, tol=1e-6, polish=True)
        self.a, self.sigma = res.x

        # Calcul final de r0
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        self.r0 = self.calibrate_r0(A_T, B_T)

        return self.a, self.sigma, self.r0



    def plot_curve_fit(self):
        B_T = self.B(self.T, self.a)
        A_T = self.A(self.T, self.a, self.sigma)
        P_model = A_T * np.exp(-B_T * self.r0)
        R_model = -np.log(P_model) / self.T
        plt.figure(figsize=(10, 5))
        plt.plot(self.T, self.R_market, 'o-', label="Taux marché")
        plt.plot(self.T, R_model, 's--', label="Taux Hull-White")
        plt.xlabel("Maturité (années)")
        plt.ylabel("Taux")
        plt.title("Courbe des taux : marché vs Hull-White")
        plt.legend()
        plt.grid(True)
        plt.show()






# === Données : taux zéro-coupon ===
curve = pd.read_excel("euribor3m_curve.xlsx", sheet_name="Sheet1")
T = curve["maturity"].values
R_market = curve["rate"].values / 100

# === Données : vols de caplets ===
caps = pd.read_excel("cap_floor_volatility.xlsx", index_col=0)
cap_maturities = np.array([int(label.replace("Yr", "")) for label in caps.index])
cap_vols = caps["ATM"].values / 100

# === Données : vols de swaptions ===
swaption_df = pd.read_excel("volatility_cube_swaptions_ATM.xlsx", index_col=0)
expiry_labels = swaption_df.index.values
tenor_labels = swaption_df.columns.values

def parse_maturity_label(label):
    """Convertit un label type '6Mo', '1Yr', '10Yr' en nombre d'années."""
    if 'Mo' in label:
        return int(label.replace('Mo', '')) / 12
    elif 'Yr' in label:
        return int(label.replace('Yr', ''))
    else:
        raise ValueError(f"Maturité non reconnue : {label}")

expiries, tenors, vols = [], [], []
for i, expiry in enumerate(expiry_labels):
    for j, tenor in enumerate(tenor_labels):
        expiries.append(parse_maturity_label(expiry))
        tenors.append(parse_maturity_label(tenor))
        vols.append(swaption_df.iloc[i, j] / 100)

# === Modèle temporaire pour pricing de marché ===
model_temp = HullWhiteModel(T, R_market)  # initialize_f_curve est appelé dans __init__

# === Calcul des prix de marché des caps (via Black) ===
cap_prices_market = np.array([
    model_temp.black_cap_price(T_cap, vol, model_temp.P0_vectorized)
    for T_cap, vol in zip(cap_maturities, cap_vols)
])

# === Calcul des prix de marché des swaptions (via Black) ===
swaption_prices_market = np.array([
    model_temp.black_swaption_price(T_exp, T_swap, vol, model_temp.P0)
    for T_exp, T_swap, vol in zip(expiries, tenors, vols)
])

# === Calibration par caplets ===
model_caps = HullWhiteModel(T, R_market)
a_cap, sigma_cap, r0_cap = model_caps.calibrate_by_caps_global(cap_maturities, cap_vols, cap_prices_market)
print(f"Calibration par CAPLETS : a = {a_cap:.6f}, sigma = {sigma_cap:.6f}, r0 = {r0_cap:.6f}")
model_caps.plot_curve_fit()

# === Calibration par swaptions ===
"""model_swaps = HullWhiteModel(T, R_market)
a_swp, sigma_swp, r0_swp = model_swaps.calibrate_by_swaptions_global(expiries, tenors, vols, swaption_prices_market)
print(f"Calibration par SWAPTIONS : a = {a_swp:.6f}, sigma = {sigma_swp:.6f}, r0 = {r0_swp:.6f}")
model_swaps.plot_curve_fit()

# === Calibration conjointe ===
model_joint = HullWhiteModel(T, R_market)
a_joint, sigma_joint, r0_joint = model_joint.calibrate_joint_global(
    cap_maturities, cap_vols, cap_prices_market,
    expiries, tenors, vols, swaption_prices_market,
    w1=0.5, w2=0.5
)
print(f"Calibration conjointe : a = {a_joint:.6f}, sigma = {sigma_joint:.6f}, r0 = {r0_joint:.6f}")
model_joint.plot_curve_fit()"""
