In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt




In [None]:
def vasicek(r0, K, theta, sigma, T, dt):
    """ Simulate interest rate path by the Vasicek model.
    r0 : float : initial interest rate
    K : float : speed of mean reversion
    theta : float : long-term mean interest rate
    sigma : float : volatility of interest rate
    T : float : total time
    dt : float : time step size
    """
    num_steps = int(T / dt)
    rates = np.zeros(num_steps)
    rates[0] = r0
    for t in range(1, num_steps):
        dr = K * (theta - rates[t-1]) * dt + sigma * np.random.normal() * np.sqrt(dt)
        rates[t] = rates[t-1] + dr
    return rates

# Parameters
r0 = 0.05  # initial interest rate
K = 0.3   # speed of mean reversion
theta = 0.05  # long-term mean interest rate
sigma = 0.02  # volatility of interest rate
T = 10  # total time (years)
dt = 0.01  # time step in years

# Simulation
rates_vasicek = vasicek(r0, K, theta, sigma, T, dt)

# Plotting
plt.plot(np.linspace(0, T, int(T/dt)), rates_vasicek)
plt.title('Interest Rate Simulation by Vasicek Model')
plt.xlabel('Time (years)')
plt.ylabel('Interest Rate')
plt.show()

In [None]:
def cir(r0, K, theta, sigma, T, dt):
    """ Simulate interest rate path by the CIR model.
    """
    num_steps = int(T / dt)
    rates = np.zeros(num_steps)
    rates[0] = r0
    for t in range(1, num_steps):
        dr = K * (theta - rates[t-1]) * dt + sigma * np.sqrt(rates[t-1]) * np.random.normal() * np.sqrt(dt)
        rates[t] = max(0, rates[t-1] + dr)  # ensure non-negativity
    return rates

# Simulation
rates_cir = cir(r0, K, theta, sigma, T, dt)

# Plotting
plt.plot(np.linspace(0, T, int(T/dt)), rates_cir)
plt.title('Interest Rate Simulation by CIR Model')
plt.xlabel('Time (years)')
plt.ylabel('Interest Rate')
plt.show()


In [None]:
def hull_white(r0, K, theta_t, sigma, T, dt):
    """ Simulate interest rate path by the Hull-White model.
    theta_t : function : time-dependent mean reversion level
    """
    num_steps = int(T / dt)
    rates = np.zeros(num_steps)
    rates[0] = r0
    for t in range(1, num_steps):
        theta = theta_t(t * dt)  # time-dependent theta
        dr = K * (theta - rates[t-1]) * dt + sigma * np.random.normal() * np.sqrt(dt)
        rates[t] = rates[t-1] + dr
    return rates

# Example theta_t function
def theta_t(t):
    return 0.05 + 0.01 * np.sin(t)

# Simulation
rates_hw = hull_white(r0, K, theta_t, sigma, T, dt)

# Plotting
plt.plot(np.linspace(0, T, int(T/dt)), rates_hw)
plt.title('Interest Rate Simulation by Hull-White Model')
plt.xlabel('Time (years)')
plt.ylabel('Interest Rate')
plt.show()


In [None]:
def hjm_forward_rate_simulation(forward_rates, volatilities, T, dt):
    """ A very simplified HJM forward rate simulation.
    forward_rates : array : initial forward rates
    volatilities : array : volatilities for each rate
    """
    num_steps = int(T / dt)
    num_rates = len(forward_rates)
    rate_paths = np.zeros((num_rates, num_steps))
    rate_paths[:, 0] = forward_rates

    for t in range(1, num_steps):
        dW = np.random.normal(size=num_rates) * np.sqrt(dt)  # Brownian increments
        drift = -0.5 * volatilities**2 * dt  # Simplified drift term
        diffusion = volatilities * dW
        rate_paths[:, t] = rate_paths[:, t-1] + drift + diffusion

    return rate_paths

# Example usage
forward_rates = np.linspace(0.02, 0.05, 10)  # 10 different forward rates from 2% to 5%
volatilities = np.linspace(0.01, 0.03, 10)  # increasing volatility
T = 5  # 5 years
dt = 0.01  # time step

rate_paths = hjm_forward_rate_simulation(forward_rates, volatilities, T, dt)

# Plotting
for i in range(len(forward_rates)):
    plt.plot(np.linspace(0, T, int(T/dt)), rate_paths[i, :], label=f'Rate {i+1}')
plt.title('HJM Forward Rate Simulation')
plt.xlabel('Time (years)')
plt.ylabel('Forward Rate')
plt.legend()
plt.show()


In [None]:
import numpy as np

def jump_diffusion_model(r0, K, theta, sigma, lambda_j, mu_j, sigma_j, T, dt):
    """ Simulate interest rate path with jumps.
    lambda_j : float : jump frequency per year
    mu_j : float : average jump size
    sigma_j : float : jump size volatility
    """
    num_steps = int(T / dt)
    rates = np.zeros(num_steps)
    rates[0] = r0
    for t in range(1, num_steps):
        jump = 0
        if np.random.rand() < lambda_j * dt:
            jump = np.random.normal(mu_j, sigma_j)
        dr = K * (theta - rates[t-1]) * dt + sigma * np.random.normal() * np.sqrt(dt) + jump
        rates[t] = rates[t-1] + dr
    return rates


In [None]:
# This is a conceptual representation. Actual implementation will depend on your specific model and data.
def bayesian_update(prior, likelihood, data):
    """
    Update the model's parameters using Bayesian inference.
    """
    posterior = (likelihood * data) / prior  # Simplified update rule
    return posterior


In [None]:
from sklearn.mixture import GaussianMixture

def fit_gaussian_mixture(data, components):
    model = GaussianMixture(n_components=components)
    model.fit(data.reshape(-1, 1))
    return model


In [None]:
from statsmodels.tsa.arima.model import ARIMA

def autoregressive_model(data, order):
    model = ARIMA(data, order=order)
    model_fit = model.fit()
    return model_fit


In [None]:
def monte_carlo_simulation(start_price, mu, sigma, T, dt, scenarios):
    num_steps = int(T / dt)
    results = np.zeros((scenarios, num_steps))
    for s in range(scenarios):
        prices = np.zeros(num_steps)
        prices[0] = start_price
        for t in range(1, num_steps):
            prices[t] = prices[t-1] * np.exp((mu - 0.5 * sigma**2) * dt + sigma * np.random.normal() * np.sqrt(dt))
        results[s] = prices
    return results


In [None]:
def markov_chain(states, transition_matrix, steps):
    current_state = np.random.choice(states)
    history = [current_state]
    for _ in range(steps):
        current_state = np.random.choice(states, p=transition_matrix[current_state])
        history.append(current_state)
    return history


In [None]:
def price_bond(face_value, coupon_rate, market_rate, periods):
    """
    Calculates the price of a traditional fixed-coupon bond using Discounted Cash Flow (DCF).
    
    Parameters:
    face_value (float): The bond's face value (e.g., 1000)
    coupon_rate (float): The bond's coupon rate (as a decimal, e.g., 0.05 for 5%)
    market_rate (float): The current market discount rate (as a decimal)
    periods (int): The number of periods until maturity
    
    Returns:
    float: Present value of the bond
    """
    cash_flows = np.array([(face_value * coupon_rate) / (1 + market_rate) ** t for t in range(1, periods + 1)])
    final_value = face_value / (1 + market_rate) ** periods  # Discounted face value
    return np.sum(cash_flows) + final_value

# Example Parameters
face_value = 1000  # Bond face value
coupon_rate = 0.05  # 5% annual coupon
market_rate = 0.04  # Market discount rate of 4%
periods = 10  # 10 years to maturity

# Compute Bond Price
bond_price = price_bond(face_value, coupon_rate, market_rate, periods)
print(f"Traditional Bond Price: ${bond_price:.2f}")


In [None]:
def binomial_tree_bond(face_value, coupon_rate, up_factor, down_factor, risk_free_rate, periods, option_type=None):
    """
    Prices a bond using a binomial tree approach, allowing for call or put options.
    
    Parameters:
    face_value (float): The bond's face value
    coupon_rate (float): The bond's annual coupon rate
    up_factor (float): Upward movement factor for rates
    down_factor (float): Downward movement factor for rates
    risk_free_rate (float): Risk-free rate used for discounting
    periods (int): Number of periods for the binomial tree
    option_type (str): 'call' for callable bond, 'put' for putable bond, or None
    
    Returns:
    float: Price of the bond considering the binomial tree and embedded option (if any)
    """
    # Initialize tree with terminal bond values
    bond_tree = np.zeros((periods + 1, periods + 1))

    # Calculate future bond values at maturity
    for i in range(periods + 1):
        bond_tree[i, periods] = face_value + (face_value * coupon_rate)

    # Work backward through tree
    for j in range(periods - 1, -1, -1):
        for i in range(j + 1):
            expected_value = 0.5 * (bond_tree[i, j + 1] / (1 + risk_free_rate) + bond_tree[i + 1, j + 1] / (1 + risk_free_rate))

            if option_type == 'call':
                bond_tree[i, j] = min(face_value, expected_value)  # Callable bond logic
            elif option_type == 'put':
                bond_tree[i, j] = max(face_value, expected_value)  # Putable bond logic
            else:
                bond_tree[i, j] = expected_value  # Normal bond pricing

    return bond_tree[0, 0]  # Return root node value

# Example Parameters
up_factor = 1.1
down_factor = 0.9
risk_free_rate = 0.04
option_type = "call"  # Can be None, 'call', or 'put'

# Compute Bond Prices
binomial_bond_price = binomial_tree_bond(face_value, coupon_rate, up_factor, down_factor, risk_free_rate, periods, option_type)
print(f"Bond Price (Binomial Tree - {option_type} option): ${binomial_bond_price:.2f}")


In [None]:
# ==============================
# SECTION 3: DURATION & CONVEXITY CALCULATION
# ==============================

def calculate_duration(face_value, coupon_rate, market_rate, periods):
    """
    Computes Macaulay Duration and Modified Duration of a bond.
    
    Parameters:
    face_value (float): The bond's face value
    coupon_rate (float): The bond's coupon rate
    market_rate (float): The current market discount rate
    periods (int): Number of periods
    
    Returns:
    tuple: (Macaulay Duration, Modified Duration)
    """
    cash_flows = np.array([(face_value * coupon_rate) / (1 + market_rate) ** t for t in range(1, periods + 1)])
    weighted_times = np.array([t * cf for t, cf in enumerate(cash_flows, 1)])
    bond_price = np.sum(cash_flows) + face_value / (1 + market_rate) ** periods

    macaulay_duration = np.sum(weighted_times) / bond_price
    modified_duration = macaulay_duration / (1 + market_rate)

    return macaulay_duration, modified_duration

def calculate_convexity(face_value, coupon_rate, market_rate, periods):
    """
    Computes the convexity of a bond.
    
    Parameters:
    face_value (float): The bond's face value
    coupon_rate (float): The bond's coupon rate
    market_rate (float): The current market discount rate
    periods (int): Number of periods
    
    Returns:
    float: Convexity of the bond
    """
    cash_flows = np.array([(face_value * coupon_rate) / (1 + market_rate) ** t for t in range(1, periods + 1)])
    weighted_times = np.array([(t * (t + 1) * cf) for t, cf in enumerate(cash_flows, 1)])
    bond_price = np.sum(cash_flows) + face_value / (1 + market_rate) ** periods

    convexity = np.sum(weighted_times) / (bond_price * (1 + market_rate) ** 2)
    return convexity

# Compute Duration & Convexity
macaulay_duration, modified_duration = calculate_duration(face_value, coupon_rate, market_rate, periods)
convexity = calculate_convexity(face_value, coupon_rate, market_rate, periods)

print(f"Macaulay Duration: {macaulay_duration:.2f} years")
print(f"Modified Duration: {modified_duration:.2f} years")
print(f"Convexity: {convexity:.2f}")


In [None]:
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.formula.api import ols
import statsmodels.api as sm


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Example Backtesting Framework
def backtest_strategy(actual_rates, predicted_rates):
    """
    Simple backtesting framework comparing actual vs. predicted rates.
    
    Parameters:
    actual_rates (array-like): Historical actual interest rates
    predicted_rates (array-like): Predicted interest rates from the model
    
    Returns:
    pd.DataFrame: Error metrics table
    """
    errors = actual_rates - predicted_rates
    
    # Calculate error metrics
    mae = np.mean(np.abs(errors))
    mse = np.mean(errors ** 2)
    rmse = np.sqrt(mse)
    
    # Create a DataFrame for structured results
    results_df = pd.DataFrame({
        'Metric': ['Mean Absolute Error (MAE)', 'Mean Squared Error (MSE)', 'Root Mean Squared Error (RMSE)'],
        'Value': [mae, mse, rmse]
    })

    # Plot actual vs predicted rates
    plt.figure(figsize=(10, 6))
    plt.plot(actual_rates, label='Actual Rates', marker='o')
    plt.plot(predicted_rates, label='Predicted Rates', marker='x')
    plt.legend()
    plt.title('Backtesting: Actual vs. Predicted Interest Rates')
    plt.show()
    
    return results_df

# Sample Data
actual_rates = np.array([0.03, 0.035, 0.037, 0.04, 0.042])
predicted_rates = np.array([0.032, 0.034, 0.036, 0.041, 0.043])

# Run Backtest
backtest_results = backtest_strategy(actual_rates, predicted_rates)
print(backtest_results)


In [None]:
def adf_test(time_series):
    """
    Perform Augmented Dickey-Fuller test for stationarity.
    
    Parameters:
    time_series (array-like): The time series data
    
    Returns:
    pd.DataFrame: ADF test results
    """
    result = adfuller(time_series)
    
    adf_results = pd.DataFrame({
        'Test Statistic': [result[0]],
        'p-value': [result[1]],
        'Critical Values': [result[4]]
    })

    return adf_results

# Sensitivity Analysis Function
def sensitivity_analysis(base_value, sensitivity_range, factor):
    """
    Perform a simple sensitivity analysis.
    
    Parameters:
    base_value (float): The initial base value (e.g., bond price)
    sensitivity_range (array-like): Range of changes (e.g., interest rate changes)
    factor (float): Sensitivity factor (e.g., modified duration)
    
    Returns:
    pd.DataFrame: Sensitivity analysis results
    """
    analysis = {'Rate Change': [], 'Adjusted Value': []}
    
    for change in sensitivity_range:
        adjusted_value = base_value * (1 + factor * change)
        analysis['Rate Change'].append(change)
        analysis['Adjusted Value'].append(adjusted_value)
    
    return pd.DataFrame(analysis)

# Run ADF Test
adf_results = adf_test(actual_rates)
print("ADF Test Results:\n", adf_results)

# Run Sensitivity Analysis
sensitivity_results = sensitivity_analysis(1000, np.linspace(-0.01, 0.01, 5), -7.5)
print("Sensitivity Analysis Results:\n", sensitivity_results)


In [None]:
def calculate_aic_bic(time_series, lags):
    """
    Calculate AIC and BIC for model selection.
    
    Parameters:
    time_series (array-like): The time series data
    lags (int): Number of lags for the model
    
    Returns:
    pd.DataFrame: AIC and BIC values
    """
    model = sm.tsa.AutoReg(time_series, lags=lags).fit()
    
    return pd.DataFrame({
        'Model': [f'AR({lags})'],
        'AIC': [model.aic],
        'BIC': [model.bic]
    })

# ANOVA Table for Model Comparison
def anova_table(models_dict):
    """
    Creates an ANOVA-style table for comparing multiple models.
    
    Parameters:
    models_dict (dict): Dictionary where keys are model names and values are fitted models.
    
    Returns:
    pd.DataFrame: ANOVA-style table with model comparison.
    """
    model_names = []
    rss_values = []
    aic_values = []
    bic_values = []
    
    for model_name, model in models_dict.items():
        model_names.append(model_name)
        rss_values.append(np.sum(model.resid ** 2))  # Residual Sum of Squares
        aic_values.append(model.aic)
        bic_values.append(model.bic)
    
    return pd.DataFrame({
        'Model': model_names,
        'RSS': rss_values,
        'AIC': aic_values,
        'BIC': bic_values
    })

# Generate Models for Comparison
model1 = sm.tsa.AutoReg(actual_rates, lags=1).fit()
model2 = sm.tsa.AutoReg(actual_rates, lags=2).fit()

# Compare Models Using ANOVA Table
anova_results = anova_table({"AR(1)": model1, "AR(2)": model2})
print("ANOVA Model Comparison Table:\n", anova_results)
