### Part-I: Theoretical formating

## Part -II. Implementation

In [107]:
# required packages

import numpy as np
import pandas as pd
from scipy.linalg import cholesky
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
import csv

### II-I. csv files creations with dummy values


In [108]:
# Rates Parameters Data
rates_data = {
    "Currency": ["USD", "EUR", "GBP", "CHF"],
    "a": [0.2, 0.2, 0.2, 0.2],
    "sigma": [0.05, 0.05, 0.05, 0.05]
}
rates_df = pd.DataFrame(rates_data)
rates_df.to_csv("rates.csv", index=False)


In [109]:
rate_data = pd.read_csv("rates.csv")
rate_data

Unnamed: 0,Currency,a,sigma
0,USD,0.2,0.05
1,EUR,0.2,0.05
2,GBP,0.2,0.05
3,CHF,0.2,0.05


In [110]:
# FX Parameters Data

fx_data = {
    "Pair": ["EUR/USD", "GBP/USD", "CHF/USD"],
    "Initial FX": [1.1, 1.3, 0.9],
    "eta": [0.05, 0.05, 0.05]
}
fx_df = pd.DataFrame(fx_data)
fx_df.to_csv("fx.csv", index=False)


In [111]:
fx_data = pd.read_csv("fx.csv")
fx_data

Unnamed: 0,Pair,Initial FX,eta
0,EUR/USD,1.1,0.05
1,GBP/USD,1.3,0.05
2,CHF/USD,0.9,0.05


In [112]:
def create_yield_curve_csv(file_name):
    data = [
        # USD Data
        {"Currency": "USD", "T": 90, "ZeroCouponRate": 0.03},
        {"Currency": "USD", "T": 180, "ZeroCouponRate": 0.031},
        {"Currency": "USD", "T": 365, "ZeroCouponRate": 0.032},
        {"Currency": "USD", "T": 730, "ZeroCouponRate": 0.034},
        {"Currency": "USD", "T": 1825, "ZeroCouponRate": 0.035},
        {"Currency": "USD", "T": 3650, "ZeroCouponRate": 0.04},
        # EUR Data
        {"Currency": "EUR", "T": 90, "ZeroCouponRate": 0.02},
        {"Currency": "EUR", "T": 180, "ZeroCouponRate": 0.022},
        {"Currency": "EUR", "T": 365, "ZeroCouponRate": 0.025},
        {"Currency": "EUR", "T": 730, "ZeroCouponRate": 0.028},
        {"Currency": "EUR", "T": 1825, "ZeroCouponRate": 0.03},
        {"Currency": "EUR", "T": 3650, "ZeroCouponRate": 0.035},
        # GBP Data
        {"Currency": "GBP", "T": 90, "ZeroCouponRate": 0.035},
        {"Currency": "GBP", "T": 180, "ZeroCouponRate": 0.036},
        {"Currency": "GBP", "T": 365, "ZeroCouponRate": 0.037},
        {"Currency": "GBP", "T": 730, "ZeroCouponRate": 0.039},
        {"Currency": "GBP", "T": 1825, "ZeroCouponRate": 0.04},
        {"Currency": "GBP", "T": 3650, "ZeroCouponRate": 0.045},
        # CHF Data
        {"Currency": "CHF", "T": 90, "ZeroCouponRate": 0.01},
        {"Currency": "CHF", "T": 180, "ZeroCouponRate": 0.011},
        {"Currency": "CHF", "T": 365, "ZeroCouponRate": 0.012},
        {"Currency": "CHF", "T": 730, "ZeroCouponRate": 0.014},
        {"Currency": "CHF", "T": 1825, "ZeroCouponRate": 0.015},
        {"Currency": "CHF", "T": 3650, "ZeroCouponRate": 0.02},
    ]
    

    with open(file_name, mode="w", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=["Currency", "T", "ZeroCouponRate"])
        writer.writeheader()  
        writer.writerows(data)  
create_yield_curve_csv("yield_curves.csv")


In [113]:
yield_data = pd.read_csv("yield_curves.csv")
yield_data

Unnamed: 0,Currency,T,ZeroCouponRate
0,USD,90,0.03
1,USD,180,0.031
2,USD,365,0.032
3,USD,730,0.034
4,USD,1825,0.035
5,USD,3650,0.04
6,EUR,90,0.02
7,EUR,180,0.022
8,EUR,365,0.025
9,EUR,730,0.028


In [114]:
# Define the correlation matrix csv file
size = 7
correlation_value = 0.30

correlation_matrix = np.full((size, size), correlation_value)
np.fill_diagonal(correlation_matrix, 1)
correlation_df = pd.DataFrame(correlation_matrix)
# Save to CSV
correlation_df.to_csv("correlation_matrix.csv", index=False, header=False)

In [115]:
corr_matrix = pd.read_csv("correlation_matrix.csv", header = None).values
corr_matrix

array([[1. , 0.3, 0.3, 0.3, 0.3, 0.3, 0.3],
       [0.3, 1. , 0.3, 0.3, 0.3, 0.3, 0.3],
       [0.3, 0.3, 1. , 0.3, 0.3, 0.3, 0.3],
       [0.3, 0.3, 0.3, 1. , 0.3, 0.3, 0.3],
       [0.3, 0.3, 0.3, 0.3, 1. , 0.3, 0.3],
       [0.3, 0.3, 0.3, 0.3, 0.3, 1. , 0.3],
       [0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 1. ]])

### II-II. Jointly Simulating all drivers

In [116]:
class DriverSimulator:
    def __init__(self, correlation_matrix):
        # Load CSV files
        self.rates_df = pd.read_csv("rates.csv")  
        self.fx_df = pd.read_csv("fx.csv") 

        # Extract parameters
        self.rate_reversions = self.rates_df["a"].values  # Mean-reversion (a)
        self.rate_volatilities = self.rates_df["sigma"].values  # Volatility (σ)
        self.fx_volatilities = self.fx_df["eta"].values  # FX volatility (η)

        # Correlation matrix and Cholesky decomposition
        self.correlation_matrix = correlation_matrix
        self.cholesky_matrix = np.linalg.cholesky(correlation_matrix)

    def compute_variances(self, t1, t2):
        # Rate driver variances
        rate_variances = [
            (sigma**2 / (2 * a)) * (1 - np.exp(-2 * a * (t2 - t1)))
            for sigma, a in zip(self.rate_volatilities, self.rate_reversions)
        ]

        # FX driver variances
        fx_variances = [
            eta**2 * (t2 - t1)
            for eta in self.fx_volatilities
        ]

        # Combine rate and FX standard deviations
        return np.concatenate([np.sqrt(rate_variances), np.sqrt(fx_variances)])

    def simulate_drivers(self, t1, t2, num_paths):
        
        # Compute standard deviations
        variances = self.compute_variances(t1, t2)
        variance_matrix = np.diag(variances)

        # Generate independent normal variables
        Z = np.random.normal(0, 1, (len(variances), num_paths))

        # Apply variance and Cholesky matrix for correlation
        drivers = variance_matrix @ self.cholesky_matrix @ Z
        
        return drivers


### Statistical tests on drivers (test variances )

In [117]:
#Statistical test on variances

# Theoretical variance (between times 0 and 1)
rates_anal_var = ((1 - np.exp(-2 * 0.2 * (1 - 0))) * 0.05**2) / (2 * 0.2)
fx_anal_var = (1 - 0) * (0.05**2)

simulator = DriverSimulator(corr_matrix)
P = simulator.simulate_drivers(0, 1, 100)

# simulated variance
sim_var = np.var(P[0])
sim_var_fx = np.var(P[6])

print("rates variance compare = ", (rates_anal_var, sim_var))
print("fx variance compare = ", (fx_anal_var, sim_var_fx))

rates variance compare =  (0.0020604997122772545, 0.0018794574104388703)
fx variance compare =  (0.0025000000000000005, 0.002569157476045232)


In [119]:
class RatesSimulator:
    def __init__(self, driver_simulator, initial_rates, time_grid):
        self.driver_simulator = driver_simulator
        self.initial_rates = initial_rates
        self.time_grid = time_grid
        self.currencies = list(initial_rates.keys())
        self.num_currencies = len(self.currencies)

    def compute_phi(self, currency, t1, t2):
        try:
            rates_row = self.driver_simulator.rates_df[self.driver_simulator.rates_df["Currency"] == currency]
            fx_row = self.driver_simulator.fx_df[self.driver_simulator.fx_df["Pair"].str.contains(currency)]
            
            if rates_row.empty or fx_row.empty:
                raise ValueError(f"Missing data for currency {currency}")
            
            # Extract parameters
            sigma = rates_row["sigma"].values[0]
            eta = fx_row["eta"].values[0]
            a = rates_row["a"].values[0]
            phi = (sigma * 0.05 * eta * (1 - np.exp(-a * (t2 - t1)))) / a
            return phi
        except Exception as e:
            print(f"Error computing phi for {currency}: {e}")
            raise

    def simulate_rates(self, num_paths):
        """Simulate interest rate paths for all currencies."""
        try:
            rate_paths = {currency: np.zeros((len(self.time_grid), num_paths)) for currency in self.currencies}
            for currency in self.currencies:
                rate_paths[currency][0, :] = self.initial_rates[currency]

            for t_idx in range(1, len(self.time_grid)):
                t1 = self.time_grid[t_idx - 1]
                t2 = self.time_grid[t_idx]

                drivers = self.driver_simulator.simulate_drivers(t1, t2, num_paths)

                for c_idx, currency in enumerate(self.currencies):
                    row = self.driver_simulator.rates_df[self.driver_simulator.rates_df["Currency"] == currency]
                    a = row["a"].values[0]
                    sigma = row["sigma"].values[0]

                    prev_rates = rate_paths[currency][t_idx - 1, :]
                    mean_reversion = np.exp(-a * (t2 - t1)) * prev_rates
                    volatility_scale = (sigma / a) * (1 - np.exp(-a * (t2 - t1)))
                    volatility_component = volatility_scale * drivers[c_idx, :]
                    phi_drift = self.compute_phi(currency, t1, t2)

                    rate_paths[currency][t_idx, :] = mean_reversion + volatility_component + phi_drift

            return rate_paths
        except Exception as e:
            print(f"Error in simulate_rates: {e}")
            raise

    def get_rate(self, currency, t_idx):
        """Get the simulated rate for a given currency at time index t_idx."""
        try:
            rate_paths = self.simulate_rates(num_paths=1)  # Simulate for 1 path
            rate_at_t = rate_paths[currency][t_idx, 0]  # Get the rate for the given time index
            return rate_at_t
        except Exception as e:
            print(f"Error getting rate for {currency} at time {t_idx}: {e}")
            raise


In [120]:
class FXSimulator:
    def __init__(self, rates_simulator, fx_data, time_grid):
        self.rates_simulator = rates_simulator
        self.fx_data = fx_data
        self.time_grid = time_grid
        self.fx_paths = {}

    def simulate_fx(self, num_paths):
        for _, row in self.fx_data.iterrows():
            pair = row['Pair']
            initial_fx = row['Initial FX']
            eta_ccy = row['eta']

            ccy, usd = pair.split('/')

            # Retrieve parameters for both currencies
            rates_df = self.rates_simulator.driver_simulator.rates_df
            params_ccy = rates_df[rates_df["Currency"] == ccy].iloc[0]
            params_usd = rates_df[rates_df["Currency"] == usd].iloc[0]

            a_ccy, sigma_ccy = params_ccy["a"], params_ccy["sigma"]
            a_usd, sigma_usd = params_usd["a"], params_usd["sigma"]

            # Initialize FX path
            fx_path = np.zeros((len(self.time_grid), num_paths))
            fx_path[0, :] = initial_fx 

            # Precompute rates for all currencies
            rate_paths = self.rates_simulator.simulate_rates(num_paths)

            # Simulate FX rates over the time grid
            for t_idx in range(1, len(self.time_grid)):
                t1 = self.time_grid[t_idx - 1]
                t2 = self.time_grid[t_idx]
                dt = t2 - t1

                # Get drivers
                drivers = self.rates_simulator.driver_simulator.simulate_drivers(t1, t2, num_paths)
                fx_driver = drivers[-1, :]
                rate_driver_usd = drivers[0, :]
                rate_driver_ccy = drivers[1, :]

                rate_usd_s = rate_paths[usd][t_idx - 1, :]
                rate_ccy_s = rate_paths[ccy][t_idx - 1, :]

                # Calculate D(t)
                d_t = (
                    (1 / a_ccy) * (
                        sigma_usd * fx_driver
                        - sigma_usd * rate_driver_usd
                        + (1 - np.exp(-a_usd * dt)) * rate_usd_s
                    )
                    - (1 / a_ccy) * (
                        sigma_ccy * fx_driver
                        - sigma_ccy * rate_driver_ccy
                        + (np.exp(-a_ccy * dt) - 1) * rate_ccy_s
                    )
                    - 0.5 * eta_ccy**2
                    + eta_ccy * fx_driver
                )

                # Update FX rate
                fx_path[t_idx, :] = fx_path[t_idx - 1, :] * np.exp(d_t)

            # Store the simulated path
            self.fx_paths[pair] = fx_path

    def get_fx_curve(self, pair, t_idx):
        
        if pair not in self.fx_paths:
            raise ValueError(f"FX path for pair {pair} not simulated. Call `simulate_fx` first.")
        return self.fx_paths[pair][t_idx, :]


In [122]:
#Functions to use

def BtT(a, t, T):
    """Calculate BtT using the formula provided."""
    return (np.exp(-a * (T - t)) - 1) / a


def AtT(currency, t, T):
    """Calculate AtT for the given currency, t, and T."""
    rates_df = pd.read_csv("rates.csv")
    
    row = rates_df[rates_df["Currency"] == currency]
    if row.empty:
        raise ValueError(f"Currency {currency} not found in rates.csv")
    
    a = row["a"].values[0]
    sigma = row["sigma"].values[0]

    a1 = sigma**2 / (2 * a**2)
    b1 = (1 - np.exp(-2 * a * (T - t))) / (2 * a)
    A = np.exp(a1 * (b1 + 2 * BtT(a, t, T) + (T - t)))
    
    return A


def ExpMinusphitT(currency, t, T):
    """Calculate exp(-phi) for the given currency, t, and T."""
    rates_df = pd.read_csv("rates.csv")
    
    row = rates_df[rates_df["Currency"] == currency]
    if row.empty:
        raise ValueError(f"Currency {currency} not found in rates.csv")
    
    a = row["a"].values[0]
    sigma = row["sigma"].values[0]

    ratio = (P0T(currency, T) * AtT(currency, 0, t)) / (P0T(currency, t) * AtT(currency, 0, T))
    phi = ratio * np.exp(BtT(a, 0, t) - BtT(a, 0, T))
    
    return phi


def P0T(currency, T):
    df = pd.read_csv("yield_curves.csv")
    currency_df = df[df['Currency'] == currency]
    tenors = currency_df['T'].values
    rates = currency_df['ZeroCouponRate'].values

    if T in tenors:
        zc_rate = rates[np.where(tenors == T)[0][0]]
    elif T < tenors.min():
        # Extrapolate using the nearest tenor
        zc_rate = rates[np.argmin(tenors)]
    elif T > tenors.max():
        # Extrapolate using the nearest tenor
        zc_rate = rates[np.argmax(tenors)]
    else:
        lower_idx = np.max(np.where(tenors < T))
        upper_idx = np.min(np.where(tenors > T))
        T1, zc1 = tenors[lower_idx], rates[lower_idx]
        T2, zc2 = tenors[upper_idx], rates[upper_idx]
        log_zc1, log_zc2 = np.log(zc1), np.log(zc2)
        log_zc = log_zc1 + (log_zc2 - log_zc1) * (T - T1) / (T2 - T1)
        zc_rate = np.exp(log_zc)

    discount_factor = np.exp(-zc_rate * T / 365)
    return discount_factor


In [125]:
class Market:
    def __init__(self, rates_file, yield_curves_file, rates_simulator, fx_data, time_grid):
    
        self.rates_simulator = rates_simulator
        self.fx_data = pd.read_csv(fx_data)  # Read the fx.csv file into a DataFrame
        self.rates_df = pd.read_csv(rates_file)  # For rate parameters (a, sigma)
        self.yield_curves_df = pd.read_csv(yield_curves_file)  # For Zero Coupon Rates
        self.time_grid = time_grid
        
        self.fx_simulator = FXSimulator(rates_simulator, self.fx_data, self.time_grid)

        # Validate the data
        self._validate_data()

    def _validate_data(self):
        
        required_rates_columns = ['Currency', 'a', 'sigma']
        for col in required_rates_columns:
            if col not in self.rates_df.columns:
                raise ValueError(f"Missing required column '{col}' in rates_df")

        required_yield_columns = ['Currency', 'T', 'ZeroCouponRate']
        for col in required_yield_columns:
            if col not in self.yield_curves_df.columns:
                raise ValueError(f"Missing required column '{col}' in yield_curves_df")

        required_fx_columns = ['Pair', 'Initial FX', 'eta']
        for col in required_fx_columns:
            if col not in self.fx_data.columns:
                raise ValueError(f"Missing required column '{col}' in fx_data")

    def get_zero_coupon(self, currency: str, t: int, T: int):
        
        A = AtT(currency, t, T)
        row = self.rates_df[self.rates_df["Currency"] == currency]
        if row.empty:
            raise ValueError(f"Currency {currency} not found in rates_df")
        a = row["a"].values[0]
        B = BtT(a, t, T)
        exp_minus_phi = ExpMinusphitT(currency, t, T)
        rate_t = self.rates_simulator.get_rate(currency, t)

        zero_coupon_price = A * np.exp(B * rate_t) * exp_minus_phi
        return zero_coupon_price

    def get_fx(self, currency: str, t: int):
       
        if currency not in self.fx_data['Pair'].values:
            raise ValueError(f"FX pair for {currency} not found in fx_data.")
        
        self.fx_simulator.simulate_fx(num_paths=1)
        
        # Fetch the initial FX rate from the fx_data for the given pair
        Fx_rate = self.fx_simulator.get_fx_curve(currency, t)
        fx_rate = np.mean(Fx_rate)
        return fx_rate


In [159]:
fx_simulator = FXSimulator(rates_simulator, fx_df, time_grid=[0, 1, 2, 3, 4])


market = Market(
    rates_file="rates.csv",
    yield_curves_file="yield_curves.csv",
    rates_simulator=rates_simulator,
    fx_data="fx.csv",  
    time_grid=[0, 1, 2, 3, 4]
)


test_get_zc = market.get_zero_coupon("USD", 90, 180)
fx_rate = market.get_fx("EUR/USD", 2)
print(f"FX Rate for EUR/USD at t=4: {fx_rate}")
print(f"Zero-coupon with mMaturity 180 days at 2: {test_get_zc}")

FX Rate for EUR/USD at t=4: 1.1848258409764396
Zero-coupon with mMaturity 180 days at 2: 0.8214168541203152
