In [None]:
"""
ML-Driven Yield Curve Forecasting

This script:
    -Loads and cleans yield curve data from Excel files
    -Creates features: lagged rates, rolling statistics, and partial derivatives w.r.t.
    time and maturity taken from a SmoothBivariateSpline fit of rate = f(time, maturity)
    -Trains the Ridge, Random Forest and XGBoost machine learning models
    -Makes multi-day forecasts of future yield curves
"""

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import SmoothBivariateSpline
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

In [None]:
#DATA LOADING AND CLEANING

def combine_dataframes(filename_1 : str, filename_2 : str, sheetName : str, skipRows : int, rename_startcol_1 : str, rename_loopcol_1 : str, rename_startcol_2 : str, rename_loopcol_2 : str) -> pd.DataFrame:
    """
    Reads the same sheet from two Excel workbooks, harmonises their column
    names and stacks the rows of the second file underneath the first.

    In each file the column called ``rename_startcol_*`` becomes 'Date'. Every
    column whose label then starts with ``rename_loopcol_*`` is renamed to half
    of its zero-based column position, formatted with one decimal place
    (position 1 -> '0.5', position 2 -> '1.0', ...). Columns matching neither
    rule keep their label; non-string labels are left untouched instead of
    raising AttributeError.

    Args:
        filename_1: Path to the first Excel file (its rows come first)
        filename_2: Path to the second Excel file (its rows are appended)
        sheetName: Which sheet to read from both files
        skipRows: How many rows to skip at the top of each sheet
        rename_startcol_1: What the date column is called in file 1
        rename_loopcol_1: Label prefix of the maturity columns in file 1
        rename_startcol_2: What the date column is called in file 2
        rename_loopcol_2: Label prefix of the maturity columns in file 2

    Returns:
        Combined dataframe with a fresh 0..n-1 row index
    """

    def _load_and_rename(filename: str, date_label: str, maturity_prefix: str) -> pd.DataFrame:
        #Reads one workbook and applies both renaming rules in a single pass
        frame = pd.read_excel(filename, sheet_name=sheetName, skiprows=skipRows)
        frame = frame.rename(columns={date_label: 'Date'})

        #The new label depends on the column position, counting the Date column as position 0
        position_labels = {
            label: f"{position / 2:.1f}"
            for position, label in enumerate(frame.columns)
            if isinstance(label, str) and label.startswith(maturity_prefix)
        }
        return frame.rename(columns=position_labels)

    older = _load_and_rename(filename_1, rename_startcol_1, rename_loopcol_1)
    newer = _load_and_rename(filename_2, rename_startcol_2, rename_loopcol_2)

    #ignore_index gives the stacked frame one continuous row index
    return pd.concat([older, newer], ignore_index=True)

def clean_up_data(data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean up the raw yield data to make it ready for analysis.

    Works on a copy, so the frame passed in is left untouched and the
    function can safely be called again on the same input.

    Steps:
        1. Parse the 'Date' column to datetimes and make it the index.
        2. Drop every row whose LAST column is NaN (the original author's
           assumption is that such rows are empty throughout).
        3. Drop the FIRST column if more than 25% of its remaining values
           are NaN. Only the first column is checked.

    Args:
        data: Raw yield curve data with a 'Date' column and maturity columns

    Returns:
        New dataframe indexed by date
    """
    #assign() returns a new frame, which keeps the caller's data unchanged
    cleaned = data.assign(Date=pd.to_datetime(data['Date'])).set_index('Date')

    #Remove all rows where the last column is NaN
    cleaned = cleaned.dropna(subset=[cleaned.columns[-1]])

    #isna().mean() is the fraction of missing values (NaN for an empty frame, which fails the test)
    first_column = cleaned.columns[0]
    if cleaned[first_column].isna().mean() > 0.25:
        cleaned = cleaned.drop(columns=[first_column])

    return cleaned

def modify_data_for_modelling(data: pd.DataFrame) -> pd.DataFrame:
    """
    Reshapes a wide yield table into one row per (time, maturity) observation.

    The input has dates as its index (named 'Date') and one column per
    maturity. The output measures time as whole days elapsed since the
    earliest date in the index.

    Args:
        data: Wide format yield data (dates x maturities)

    Returns:
        Long format data with columns: Time, Maturity, Rate
    """
    #Days elapsed since the earliest date, computed while the date index is still available
    first_date = data.index.min()
    days_since_start = (data.index - first_date).days

    #Swap the date index for a plain row index and put the elapsed days in front
    wide = data.reset_index().drop(columns=['Date'])
    wide.insert(loc=0, column='Time', value=days_since_start)

    #Each maturity column becomes its own set of rows
    long_data = wide.melt(id_vars=['Time'], var_name='Maturity', value_name='Rate')

    #Maturity labels arrive as column names, so convert them to numbers
    long_data['Maturity'] = long_data['Maturity'].astype(float)
    return long_data

def sort_chronologically(data: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy of the data ordered by 'Time' first and 'Maturity' second.
    """
    ordering = ["Time", "Maturity"]
    return data.sort_values(by=ordering).copy()

In [None]:
#YIELD SURFACE MODELING

def model_multivariate_function(values, s: float) -> SmoothBivariateSpline:
    """
    Fits a smooth surface interest rate = f(time, maturity) through scattered points.

    Args:
        values: Tuple of exactly three sequences (time_points, maturity_points, rate_values)
        s: Smoothing factor handed to SmoothBivariateSpline

    Returns:
        The fitted SmoothBivariateSpline
    """
    #Explicit three-way unpacking rejects tuples of the wrong length
    time_points, maturity_points, rate_points = values
    return SmoothBivariateSpline(time_points, maturity_points, rate_points, s=s)

def test_multivariate_model(model: SmoothBivariateSpline, values) -> None:
    """
    Tests the multivariate model by calculating and printing RMSE and R² metrics.
    """
    #Unpack the input tuple
    timeValues, maturityValues, rateValues = values
    
    #Get the predicted rates from the model
    predicted_rates = model.ev(timeValues, maturityValues)

    #Calculate RMSE and R²
    rmse = np.sqrt(mean_squared_error(rateValues, predicted_rates))
    r2 = r2_score(rateValues, predicted_rates)
    print(f"Root Mean Squared Error (RMSE) for multivariate model: {rmse}")
    print(f"R-squared (R²) for multivariate model: {r2}")

def calculate_partial_derivatives(model: SmoothBivariateSpline, data, first_derivative: str, second_derivative: str):
    """
    Evaluates both first-order partial derivatives of the surface at every row of data.

    Args:
        model: Fitted smooth surface
        data: Points where the derivatives are evaluated
        first_derivative: Column holding the surface's first variable (time)
        second_derivative: Column holding the surface's second variable (maturity)

    Returns:
        Two results from model.ev: (derivative w.r.t. first variable,
        derivative w.r.t. second variable)
    """
    #Both derivatives are taken at the same coordinates
    first_coords = data[first_derivative].values
    second_coords = data[second_derivative].values

    #dx=1 differentiates once in the first variable, dy=1 once in the second
    slope_in_first = model.ev(first_coords, second_coords, dx=1, dy=0)
    slope_in_second = model.ev(first_coords, second_coords, dx=0, dy=1)

    return slope_in_first, slope_in_second

def add_partial_derivatives(data: pd.DataFrame, partial_derivatives) -> pd.DataFrame:
    """
    Writes the two partial-derivative arrays into the given DataFrame.

    The frame is modified in place and the same object is returned.

    Args:
        data: Frame that receives the new columns
        partial_derivatives: Pair (time_derivatives, maturity_derivatives)
    """
    time_slopes, maturity_slopes = partial_derivatives

    derivative_columns = {
        'Partial_Derivative_Time': time_slopes,
        'Partial_Derivative_Maturity': maturity_slopes,
    }
    for column_name, column_values in derivative_columns.items():
        data[column_name] = column_values

    return data

In [None]:
#CREATING FEATURES FOR MACHINE LEARNING

def get_lagged_data(data: pd.DataFrame, lag: int, column: str) -> pd.DataFrame:
    """
    Returns a copy of the data with an extra column "<column>_lag_<lag>".

    The shift is done separately for each 'Maturity' value, so a row receives
    the value found `lag` rows earlier among the rows of the same maturity.
    The first `lag` rows of each maturity are NaN.

    Args:
        data: Input dataframe (must contain 'Maturity' and `column`)
        lag: How many rows back to look within a maturity
        column: Which column to create the lagged version of

    Returns:
        New dataframe with the lagged column added
    """
    shifted_within_maturity = data.groupby(["Maturity"])[column].shift(lag)
    return data.assign(**{f"{column}_lag_{lag}": shifted_within_maturity})

def get_rolling_mean(data: pd.DataFrame, window: int, column: str) -> pd.DataFrame:
    """
    Returns a copy of the data with an extra column "<column>_rolling_mean_<window>".

    For every row the new value is the mean of the previous `window` values of
    `column` for the SAME maturity, excluding the row's own value. The first
    row of each maturity is NaN because it has no earlier value.

    The one-row shift is applied inside each maturity group. Shifting after
    the groupby would move values across maturities whenever rows of
    different maturities are interleaved (e.g. sorted by Time, Maturity), and
    would leak the current row's own value into a neighbouring row.

    Args:
        data: Input dataframe (must contain 'Maturity' and `column`)
        window: How many earlier rows to average over
        column: Which column to calculate the average for

    Returns:
        New dataframe with the rolling mean column added
    """
    rolling_data = data.copy()
    rolling_data[f"{column}_rolling_mean_{window}"] = (
        rolling_data.groupby("Maturity")[column]
        .transform(lambda rates: rates.rolling(window, min_periods=1).mean().shift(1))
    )
    return rolling_data

def get_rolling_std(data: pd.DataFrame, window: int, column: str) -> pd.DataFrame:
    """
    Returns a copy of the data with an extra column "<column>_rolling_std_<window>".

    For every row the new value is the standard deviation of the previous
    `window` values of `column` for the SAME maturity, excluding the row's own
    value. At least two earlier values are needed, so the first two rows of
    each maturity are NaN.

    The one-row shift is applied inside each maturity group. Shifting after
    the groupby would move values across maturities whenever rows of
    different maturities are interleaved (e.g. sorted by Time, Maturity).

    Args:
        data: Input dataframe (must contain 'Maturity' and `column`)
        window: How many earlier rows to look at
        column: Which column to calculate volatility for

    Returns:
        New dataframe with the rolling standard deviation column added
    """
    rolling_data = data.copy()
    rolling_data[f"{column}_rolling_std_{window}"] = (
        rolling_data.groupby("Maturity")[column]
        .transform(lambda rates: rates.rolling(window, min_periods=2).std().shift(1))
    )
    return rolling_data

def remove_na_rows(data: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy containing only the rows that have no NaN in any column.

    The lagged and rolling feature columns are NaN at the start of each
    maturity's history, so those rows are discarded here.
    """
    row_is_complete = data.notna().all(axis=1)
    return data[row_is_complete].copy()

In [None]:
#PREPARE DATA FOR MACHINE LEARNING

def get_X(data: pd.DataFrame, feature_columns: list[str]) -> pd.DataFrame:
    """
    Builds the feature matrix X: the chosen columns, ordered by 'Time'.

    'Time' must be one of the feature columns. The original row labels are
    kept so the target can later be aligned to the same order.
    """
    features = data[feature_columns]
    return features.sort_values(by="Time").copy()

def get_y(data: pd.DataFrame, target_column: str, X_sorted_df : pd.DataFrame) -> pd.Series:
    """
    Builds the target vector y, ordered to match the rows of the sorted X.

    Rows are looked up by label using the index of X_sorted_df, so y[i]
    belongs to the same observation as X_sorted_df row i.
    """
    row_order = X_sorted_df.index
    return data.loc[row_order, target_column].copy()

def split_data(X: pd.DataFrame, y: pd.Series, train_size: float) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """
    Cuts X and y at one position into a leading training part and a trailing test part.

    No shuffling takes place: when the rows are ordered by time, the model is
    trained on earlier data and tested on later data.

    Args:
        X: Features
        y: Target variable
        train_size: What fraction to use for training (e.g. 0.8 = 80%)

    Returns:
        (X_train, X_test, y_train, y_test)
    """
    #Number of leading rows that go to training (fraction rounded down)
    cut = int(len(X) * train_size)

    X_train, X_test = X.iloc[:cut].copy(), X.iloc[cut:].copy()
    y_train, y_test = y.iloc[:cut].copy(), y.iloc[cut:].copy()

    return X_train, X_test, y_train, y_test

def consistent_indexes(data: pd.DataFrame, X_sorted : pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy of data whose rows are selected and ordered by the index labels of X_sorted.
    """
    wanted_rows = X_sorted.index
    return data.loc[wanted_rows].copy()

In [None]:
#MACHINE LEARNING MODELS

def ridge_regression_model(X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, alpha: float):
    """
    Trains a Ridge Regression model on the training set with the specified
    alpha, then prints its RMSE and R² on the test set.

    Works well with linear relationships.

    Args:
        X_train, y_train: Training data
        X_test, y_test: Test data
        alpha: Regularization strength (higher = simpler model)

    Returns:
        Trained Ridge model
    """
    #Pass alpha through; Ridge() on its own would silently use the library default
    model = Ridge(alpha=alpha)
    model.fit(X_train, y_train)
    predictions_ridge = model.predict(X_test)

    #Score on the held-out data
    rmse_ridge = np.sqrt(mean_squared_error(y_test, predictions_ridge))
    r2_ridge = r2_score(y_test, predictions_ridge)
    print(f"RMSE for ridge: {rmse_ridge:.6f}, R²: {r2_ridge:.4f}")
    return model

def random_forest_regression_model(X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, n_estimators: int, random_state: int):
    """
    Fits a Random Forest regressor on the training set and prints its RMSE
    and R² on the test set.

    Works well at capturing non-linear interactions between features.

    Args:
        X_train, y_train: Training data
        X_test, y_test: Test data
        n_estimators: How many trees to build
        random_state: Random seed for reproducible results

    Returns:
        Trained Random Forest model
    """
    forest = RandomForestRegressor(n_estimators=n_estimators, random_state=random_state)
    forest.fit(X_train, y_train)

    #Score on the held-out data
    test_predictions = forest.predict(X_test)
    test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))
    test_r2 = r2_score(y_test, test_predictions)
    print(f"Random Forest RMSE: {test_rmse:.4f}, R²: {test_r2:.4f}")

    return forest

def XGB_boost_model(X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, n_estimators: int, learning_rate: float, max_depth: int, random_state: int):
    """
    Trains an XGBoost Regression model on the training set with the specified
    parameters, then prints its RMSE and R² on the test set.

    Works well with non-linear patterns.

    Args:
        X_train, y_train: Training data
        X_test, y_test: Test data
        n_estimators: How many trees to build
        learning_rate: How much each tree contributes
        max_depth: How complex individual trees can be
        random_state: Random seed

    Returns:
        Trained XGBoost model
    """
    #max_depth must be forwarded too; leaving it out trains with the library default depth
    model_xgb = xgb.XGBRegressor(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        random_state=random_state,
    )
    model_xgb.fit(X_train, y_train)
    predictions_xgb = model_xgb.predict(X_test)

    #Score on the held-out data
    rmse_xgb = np.sqrt(mean_squared_error(y_test, predictions_xgb))
    r2_xgb = r2_score(y_test, predictions_xgb)
    print(f"RMSE for xgb: {rmse_xgb:.4f}, R²: {r2_xgb:.4f}")
    return model_xgb

In [None]:
#FULL MODEL TESTING PIPELINE FUNCTIONS

def clean_and_set_features(filename_1: str, filename_2: str, sheetName: str):
    """
    Complete data preparation pipeline: load, clean, reshape, sort, add features.

    Feature columns added to the long-format data:
        Rate_lag_1, Rate_lag_5, Rate_lag_20, Rate_rolling_mean_5, Rate_rolling_std_20

    Args:
        filename_1: Path to the first Excel file
        filename_2: Path to the second Excel file
        sheetName: Sheet to read from both files

    Returns:
        Dataframe with all features and no rows containing NaN
    """
    #Load both workbooks and bring them into one long, time-ordered table
    raw = combine_dataframes(
        filename_1=filename_1,
        filename_2=filename_2,
        sheetName=sheetName,
        skipRows=4,
        rename_startcol_1='Unnamed: 0',
        rename_loopcol_1='Unnamed',
        rename_startcol_2='Refresh',
        rename_loopcol_2='Refresh',
    )
    features = sort_chronologically(modify_data_for_modelling(clean_up_data(raw)))

    #Lagged rates, added one column at a time
    for lag in (1, 5, 20):
        features = get_lagged_data(features, lag=lag, column='Rate')

    #Rolling statistics
    features = get_rolling_mean(features, window=5, column='Rate')
    features = get_rolling_std(features, window=20, column='Rate')

    #The lagged/rolling columns are NaN at the start of each maturity's history
    return remove_na_rows(features)

def train_and_test_ridge_rf_xgb(data: pd.DataFrame):
    """
    Trains Ridge, Random Forest and XGBoost on the first 80% of the rows
    (ordered by time) and prints each model's scores on the remaining 20%.

    The yield surface is fitted on the training rows only; its partial
    derivatives are then evaluated on both the training and the test rows and
    added as two extra features.

    Returns:
        (ridge_model, rf_model, xgb_model)
    """
    base_features = ['Time', 'Maturity', 'Rate_lag_1', 'Rate_lag_5', 'Rate_lag_20', 'Rate_rolling_mean_5', 'Rate_rolling_std_20']

    #Time-ordered features and target, split without shuffling
    X = get_X(data, feature_columns=base_features)
    y = get_y(data, target_column='Rate', X_sorted_df=X)
    X_train, X_test, y_train, y_test = split_data(X, y, train_size=0.8)

    #Fit the surface rate = f(time, maturity) on the training rows
    surface_points = (
        X_train['Time'].values,
        X_train['Maturity'].values,
        y_train.values
    )
    smoothing = len(surface_points[2])/150 #s found by trial and error
    surface = model_multivariate_function(values=surface_points, s=smoothing)
    test_multivariate_model(surface, surface_points)

    #Surface derivatives become two extra feature columns in each split
    X_train = add_partial_derivatives(X_train, calculate_partial_derivatives(surface, X_train, 'Time', 'Maturity'))
    X_test = add_partial_derivatives(X_test, calculate_partial_derivatives(surface, X_test, 'Time', 'Maturity'))

    #Train and score the three models
    ridge_model = ridge_regression_model(X_train, y_train, X_test, y_test, alpha=1.0)
    rf_model = random_forest_regression_model(X_train, y_train, X_test, y_test, n_estimators=100, random_state=42)
    xgb_model = XGB_boost_model(X_train, y_train, X_test, y_test, n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42)

    return ridge_model, rf_model, xgb_model

In [None]:
#FORECASTING FUNCTIONS

def train_models_all_data(data: pd.DataFrame, feature_columns):
    """
    Fits Ridge, Random Forest and XGBoost on the whole dataset (no held-out part).

    Args:
        data: Dataframe containing the feature columns and the 'Rate' target
        feature_columns: Names of the columns used as features

    Returns:
        (ridge_model, rf_model, xgb_model), all fitted
    """
    X = get_X(data, feature_columns)
    y = get_y(data, 'Rate', X)

    #Same hyperparameters for every run; the models are fitted in this order
    estimators = (
        Ridge(alpha=1.0),
        RandomForestRegressor(n_estimators=100, random_state=42),
        xgb.XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42),
    )
    for estimator in estimators:
        estimator.fit(X, y)

    ridge_model, rf_model, xgb_model = estimators
    return ridge_model, rf_model, xgb_model

def create_future_dataframe(past_data: pd.DataFrame, multivariate_model: SmoothBivariateSpline, days_ahead: int) -> pd.DataFrame:
    """
    Builds the working table for the day-by-day forecast.

    The table holds the rows of the last 20 distinct times found in past_data
    (their 'Rate' column renamed to 'Predicted_Rate'), followed by one empty
    row per (future time, maturity) pair. Future times are last_time + 1 up to
    last_time + days_ahead. Columns that exist only in past_data are NaN on
    the future rows. The surface derivatives are evaluated for every row.

    Args:
        past_data: Historical data to build on
        multivariate_model: Smooth surface model
        days_ahead: How many days to forecast

    Returns:
        Dataframe for forecasting, ordered by Time then Maturity
    """
    last_time = past_data['Time'].max()
    maturities = past_data['Maturity'].unique()

    #One row per future day and maturity, with no rate yet
    future_times = np.array([last_time + offset for offset in range(1, days_ahead + 1)])
    future_rows = [(time, maturity) for time in future_times for maturity in maturities]
    future_grid = sort_chronologically(pd.DataFrame(future_rows, columns=['Time', 'Maturity']))
    future_grid['Predicted_Rate'] = np.nan

    #Rows of the last 20 distinct times in past_data, used as context for lags and rolling statistics
    recent_times = past_data['Time'].unique()[-20:]
    recent_history = sort_chronologically(past_data[past_data['Time'].isin(recent_times)])
    recent_history = recent_history.rename(columns={"Rate": "Predicted_Rate"})

    #History first, future rows after it
    combined = pd.concat([recent_history, future_grid], ignore_index=True)
    combined = sort_chronologically(combined)

    #Evaluate the surface derivatives for every row
    derivatives = calculate_partial_derivatives(multivariate_model, combined, 'Time', 'Maturity')
    return add_partial_derivatives(combined, derivatives)

def predict_next_day_rates(model, future_data: pd.DataFrame, feature_columns: list, day_to_predict: int) -> pd.DataFrame:
    """
    Predict rates for one specific day and prepare the features of the day after.

    The predictions are written to 'Predicted_Rate' for day_to_predict. If
    rows exist for day_to_predict + 1, their lag and rolling features are then
    filled from the known 'Predicted_Rate' values up to and including
    day_to_predict, separately for each maturity:
        Rate_lag_1          -> the rate just predicted
        Rate_lag_5          -> 5th most recent known rate
        Rate_lag_20         -> 20th most recent known rate
        Rate_rolling_mean_5 -> mean of the 5 most recent known rates
        Rate_rolling_std_20 -> standard deviation of the 20 most recent known rates

    future_data is modified in place and also returned.

    Args:
        model: Trained ML model (anything with a predict method)
        future_data: Dataframe with historical context and future periods
        feature_columns: Which features the model uses
        day_to_predict: Which day (time value) to make predictions for

    Returns:
        The same dataframe with predictions and updated lagged features

    Raises:
        IndexError: If a maturity has fewer than 20 known rates up to
            day_to_predict. Raised before any feature of that maturity is
            written.
    """
    #Rows belonging to the day being predicted
    is_current_day = future_data['Time'] == day_to_predict
    current_day_data = future_data.loc[is_current_day]

    #Generate and store predictions for all maturities on that day
    predicted_rates = model.predict(current_day_data[feature_columns])
    future_data.loc[is_current_day, 'Predicted_Rate'] = predicted_rates

    #Nothing more to do when the table has no rows for the following day
    next_day = day_to_predict + 1
    is_next_day = future_data['Time'] == next_day
    if not is_next_day.any():
        return future_data

    #Known rates up to the current day, including the predictions just stored.
    #Selected and ordered once here instead of once per maturity.
    history = future_data.loc[
        future_data['Time'] <= day_to_predict, ['Time', 'Maturity', 'Predicted_Rate']
    ].sort_values('Time', kind='stable')

    longest_lookback = 20
    maturities_today = current_day_data['Maturity'].to_numpy()
    for current_maturity, predicted_rate in zip(maturities_today, predicted_rates):
        next_day_mask = is_next_day & (future_data['Maturity'] == current_maturity)
        if not next_day_mask.any():
            continue

        valid_rates = history.loc[history['Maturity'] == current_maturity, 'Predicted_Rate'].dropna()
        if len(valid_rates) < longest_lookback:
            raise IndexError(
                f"Need at least {longest_lookback} known rates for maturity {current_maturity} "
                f"up to time {day_to_predict}; found {len(valid_rates)}."
            )

        #1-day lag is the rate just predicted
        future_data.loc[next_day_mask, 'Rate_lag_1'] = predicted_rate

        #5-day lag and 5-day rolling mean
        future_data.loc[next_day_mask, 'Rate_lag_5'] = valid_rates.iloc[-5]
        future_data.loc[next_day_mask, 'Rate_rolling_mean_5'] = valid_rates.iloc[-5:].mean()

        #20-day lag and 20-day rolling standard deviation
        future_data.loc[next_day_mask, 'Rate_lag_20'] = valid_rates.iloc[-20]
        future_data.loc[next_day_mask, 'Rate_rolling_std_20'] = valid_rates.iloc[-20:].std()

    return future_data

def predict_future_rates(ML_model, past_data: pd.DataFrame, future_data: pd.DataFrame, feature_columns, future_days: int) -> pd.DataFrame:
    """
    Forecasts one day at a time, feeding each day's predictions into the next day's features.

    Prediction starts at the last time present in past_data (offset 0) and
    continues up to that time + future_days. Because every step stores its
    predictions for the day it is given, the rates stored in future_data for
    the last historical day are replaced by the model's output as well; that
    first step is also what fills in the features of the first future day.

    Args:
        ML_model: Trained machine learning model
        past_data: Historical data
        future_data: Future periods set up with create_future_dataframe()
        feature_columns: List of feature column names
        future_days: How many days ahead to predict

    Returns:
        Dataframe with predictions for all future days. When future_days is
        negative no step runs and future_data is returned unchanged (this
        used to fail with UnboundLocalError).
    """
    #The last historical time is the first time handed to the one-day predictor
    first_future_time = past_data['Time'].max()

    #Each step returns the updated table, which is the input of the next step
    for offset in range(future_days + 1):
        future_data = predict_next_day_rates(
            ML_model, future_data, feature_columns, day_to_predict=first_future_time + offset
        )
    return future_data

In [None]:
#DISPLAY FORECAST

def get_clean_forecasts(past_data: pd.DataFrame, forecasted_data: pd.DataFrame, start_date='2016-01-04') -> pd.DataFrame:
    """
    Converts forecasted rates back to the wide layout (dates x maturities).

    Only rows whose Time is at or after the last time in past_data are kept,
    so the last historical day is included in the output.

    Args:
        past_data: Historical data; only its largest 'Time' value is used
        forecasted_data: Long-format forecast with 'Time', 'Maturity' and a
            rate column ('Predicted_Rate' if present, otherwise 'Rate')
        start_date: Calendar date that the 'Time' day counts are measured
            from. Defaults to 4 Jan 2016, the value that was hard-coded before.

    Returns:
        Dataframe indexed by date with one column per maturity, labelled with
        one decimal place and ordered by increasing maturity
    """
    #Gets last time in the past data
    last_past_time = past_data['Time'].max()

    #Keeps the last historical day and everything after it
    future_data = forecasted_data[forecasted_data['Time'] >= last_past_time].copy()

    #Gets correct column name
    rate_column = 'Predicted_Rate' if 'Predicted_Rate' in future_data.columns else 'Rate'

    #Convert the day counts to actual dates.
    #NOTE(review): the "- 1" maps Time == 1 (not Time == 0) onto start_date. If Time
    #counts days since the first date in the data, every date is one day early - confirm.
    origin = pd.to_datetime(start_date)
    future_data['Time'] = origin + pd.to_timedelta(future_data['Time'] - 1, unit='days')

    #Converts maturities to columns
    wide_format = future_data.pivot(index='Time', columns='Maturity', values=rate_column)
    wide_format.columns.name = None
    wide_format.columns = [f"{col:.1f}" for col in wide_format.columns]

    #Sorts the columns by increasing maturity (numeric, not alphabetical)
    return wide_format[sorted(wide_format.columns, key=float)]

In [None]:
#GRAPH FORECAST

def plot_yield_curve_evolution(clean_forecast, num_curves=6):
    """
    Plot yield curves at different time points to show evolution.

    Args:
        clean_forecast: Wide forecast table (dates as index, one column per
            maturity whose label can be converted to float)
        num_curves: Maximum number of curves to draw. When the table has
            fewer dates than this, every date is drawn.
    """
    #Create figure
    fig, ax = plt.subplots(figsize=(12, 8))

    #Select evenly spaced dates. The step is at least 1: with fewer dates than
    #num_curves the integer division gives 0, and a slice step of 0 raises ValueError.
    dates = clean_forecast.index
    step = max(1, len(dates) // num_curves)
    selected_dates = dates[::step][:num_curves]

    #Convert maturity names from string to float
    maturities = [float(col) for col in clean_forecast.columns]

    #Plot each date as a separate yield curve
    for date in selected_dates:
        rates = clean_forecast.loc[date].values
        ax.plot(maturities, rates, marker='o', label=date.strftime('%Y-%m-%d'))

    #Plot configuration
    ax.set_xlabel('Maturity (Years)')
    ax.set_ylabel('Interest Rate (%)')
    ax.set_title('Forecasted Yield Curve Evolution')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

def plot_3d_yield_surface(clean_forecast, subsample=10):
    """
    Draws the forecast as a 3D surface: date x maturity -> interest rate.

    Args:
        clean_forecast: Wide forecast table (dates as index, one column per maturity)
        subsample: Keep every subsample-th date to thin out the surface
    """
    #3D axes on a fresh figure
    fig = plt.figure(figsize=(14, 10))
    ax = fig.add_subplot(111, projection='3d')

    #Thin out the dates for a cleaner picture
    thinned = clean_forecast.iloc[::subsample]

    #Dates become matplotlib date numbers, column labels become floats
    date_numbers = mdates.date2num(thinned.index)
    maturity_years = [float(label) for label in thinned.columns]

    #Grids have one row per maturity and one column per date, hence the transposed rates
    date_grid, maturity_grid = np.meshgrid(date_numbers, maturity_years)
    rate_grid = thinned.values.T

    surface = ax.plot_surface(date_grid, maturity_grid, rate_grid, cmap='viridis', alpha=0.8)

    #Axis labels and title
    ax.set_xlabel('Date')
    ax.set_ylabel('Maturity (Years)')
    ax.set_zlabel('Interest Rate (%)')
    ax.set_title('3D Yield Surface Forecast')

    #Show the date numbers as year-month
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))

    #Colour scale for the rates
    fig.colorbar(surface, shrink=0.5, aspect=5)
    plt.tight_layout()
    plt.show()

def plot_rate_time_series(clean_forecast, selected_maturities=[1.0, 2.0, 5.0, 10.0, 30.0, 40.0]):
    """
    Draws one forecast line per selected maturity over time.

    Maturities without a matching column (label formatted with one decimal
    place) are silently skipped.

    Args:
        clean_forecast: Wide forecast table (dates as index, one column per maturity)
        selected_maturities: Maturities, in years, to draw
    """
    fig, ax = plt.subplots(figsize=(14, 8))

    for years in selected_maturities:
        column_label = f"{years:.1f}"
        #Skip maturities that are not in the table
        if column_label not in clean_forecast.columns:
            continue
        ax.plot(clean_forecast.index, clean_forecast[column_label],
                label=f'{years}-Year', linewidth=2)

    #Labels, legend and grid
    ax.set_xlabel('Date')
    ax.set_ylabel('Interest Rate (%)')
    ax.set_title('Forecasted Interest Rate Time Series by Maturity')
    ax.legend()
    ax.grid(True, alpha=0.3)

    #A year-month tick every two months, rotated for readability
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

In [None]:
#Load both workbooks and build the cleaned, feature-enriched dataset
data = clean_and_set_features(
    "GLC Nominal daily data_2016 to 2024.xlsx",
    "GLC Nominal daily data_2025 to present.xlsx",
    "4. spot curve",
)

#Score the three model types on an 80/20 time-ordered split
(
    ridge_model_train_and_test,
    rf_model_train_and_test,
    xgb_model_train_and_test,
) = train_and_test_ridge_rf_xgb(data)

#Every observation is used to fit the final yield surface
full_training_values = (data['Time'].values, data['Maturity'].values, data['Rate'].values)

#Fit the multivariate function interest rate = f(time, maturity)
multivariate_model = model_multivariate_function(
    values=full_training_values,
    s=len(full_training_values[2])/150,
)

#Evaluate the surface derivatives at every observation and store them as feature columns
partial_time, partial_maturity = calculate_partial_derivatives(multivariate_model, data, 'Time', 'Maturity')
data = add_partial_derivatives(data, (partial_time, partial_maturity))

#Fit the final models on all the data, derivative features included
ridge_model, rf_model, xgb_model = train_models_all_data(
    data,
    [
        'Time', 'Maturity',
        'Rate_lag_1', 'Rate_lag_5', 'Rate_lag_20',
        'Rate_rolling_mean_5', 'Rate_rolling_std_20',
        'Partial_Derivative_Time', 'Partial_Derivative_Maturity',
    ],
)

In [None]:
#Settings shared by every forecast below, defined once so the three runs cannot drift apart
FORECAST_HORIZON_DAYS = 365
FORECAST_FEATURE_COLUMNS = [
    'Time', 'Maturity',
    'Rate_lag_1', 'Rate_lag_5', 'Rate_lag_20',
    'Rate_rolling_mean_5', 'Rate_rolling_std_20',
    'Partial_Derivative_Time', 'Partial_Derivative_Maturity',
]

#Create the dataframe where the forecasts will be stored
future_df = create_future_dataframe(past_data=data, multivariate_model=multivariate_model, days_ahead=FORECAST_HORIZON_DAYS)

#Forecast the future using the machine learning models trained on all the data.
#Each model gets its own copy because the forecast fills the table in place.
future_using_ridge = predict_future_rates(ML_model=ridge_model, past_data=data, future_data=future_df.copy(), future_days=FORECAST_HORIZON_DAYS, feature_columns=FORECAST_FEATURE_COLUMNS)
future_using_rf = predict_future_rates(ML_model=rf_model, past_data=data, future_data=future_df.copy(), future_days=FORECAST_HORIZON_DAYS, feature_columns=FORECAST_FEATURE_COLUMNS)
future_using_xgb = predict_future_rates(ML_model=xgb_model, past_data=data, future_data=future_df.copy(), future_days=FORECAST_HORIZON_DAYS, feature_columns=FORECAST_FEATURE_COLUMNS)

In [None]:
#Reshape each model's forecast into the dates x maturities layout and print it

#Ridge
clean_forecast_ridge = get_clean_forecasts(past_data=data, forecasted_data=future_using_ridge)
print(clean_forecast_ridge)

#Random forest
clean_forecast_rf = get_clean_forecasts(past_data=data, forecasted_data=future_using_rf)
print(clean_forecast_rf)

#XGBoost
clean_forecast_xgb = get_clean_forecasts(past_data=data, forecasted_data=future_using_xgb)
print(clean_forecast_xgb)

In [None]:
#Ridge forecast: yield curves at several dates, then the 3D surface,
#then the time series for specific maturities
for draw_ridge_plot in (plot_yield_curve_evolution, plot_3d_yield_surface, plot_rate_time_series):
    draw_ridge_plot(clean_forecast_ridge)

In [None]:
#Random forest forecast: yield curves at several dates, then the 3D surface,
#then the time series for specific maturities
for draw_rf_plot in (plot_yield_curve_evolution, plot_3d_yield_surface, plot_rate_time_series):
    draw_rf_plot(clean_forecast_rf)

In [None]:
#XGBoost forecast: yield curves at several dates, then the 3D surface,
#then the time series for specific maturities
for draw_xgb_plot in (plot_yield_curve_evolution, plot_3d_yield_surface, plot_rate_time_series):
    draw_xgb_plot(clean_forecast_xgb)