In [1]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt 
from arch import arch_model
from scipy.stats import norm
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import optuna
import logging
import warnings
import random
import networkx as nx
from caviar import *
logging.getLogger().setLevel(logging.WARNING)
optuna.logging.set_verbosity(optuna.logging.WARNING)
warnings.filterwarnings("ignore", category=FutureWarning)

# Setting random seeds
seed = 1
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Setting the CuDNN in deterministic mode
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# If multithreading or multiprocessing is used, make sure that these also use the same random seeds
torch.set_num_threads(1)

In [2]:
# Import data
data = pd.read_excel('estimate_data.xlsx')

# Exclude date columns
numeric_cols = data.columns.difference(['date'])

des_data=data.iloc[:, 1:21]
# Calculate descriptive statistics
desc_stats = des_data.describe()
# Calculate skewness and kurtosis
skewness = des_data.apply(skew)
kurtosis_values = des_data.apply(kurtosis)
# Calculate the Jarque-Bera statistic
jb_stats = des_data.apply(lambda x: jarque_bera(x)[0])
# Combine all statistics into a new DataFrame
all_stats = pd.concat([desc_stats, skewness, kurtosis_values, jb_stats], axis=1)
# Create a new DataFrame to store all the stats
stats_df = pd.DataFrame()
# Add columns for each statistic
stats_df = pd.DataFrame({
    'mean': desc_stats.loc['mean'],
    'std': desc_stats.loc['std'],
    'min': desc_stats.loc['min'],
    '25%': desc_stats.loc['25%'],
    '50%': desc_stats.loc['50%'],
    '75%': desc_stats.loc['75%'],
    'max': desc_stats.loc['max'],
    'skewness': skewness,
    'kurtosis': kurtosis_values,
    'Jarque-Bera': jb_stats
})
# Reset the index so that the variable name is a column
stats_df.reset_index(inplace=True)
stats_df.rename(columns={'index': 'Variable'}, inplace=True)
# Export results to excel
stats_df.to_excel('summary_sta.xlsx', index=False)


# Apply standardisation
scaler = StandardScaler()
data[numeric_cols] = scaler.fit_transform(data[numeric_cols])

# View Results
print(data.head())
# Convert ‘date_x’ to date format and rename to ‘date’
data['date'] = pd.to_datetime(data['date'])

vari_list = data.columns[1:]
print(vari_list)
print(data)

        date       AUD       EUR       GBP       BRL       CAD       CNY  \
0 2018-01-03  0.334863 -0.362698 -0.962935  0.667124 -0.495991 -0.530372   
1 2018-01-04  0.334211  0.656456  0.227988  0.509819  0.240629  0.542035   
2 2018-01-05  0.074953 -0.456648  0.306200 -0.001248  2.406126  0.239796   
3 2018-01-08 -0.403445 -1.234715  0.057436 -0.039856 -0.519592 -0.503176   
4 2018-01-09 -0.304518 -0.955438 -0.623592 -0.553009 -0.613347 -1.587930   

        INR       JPY       KRW  ...       ZAR       BTC       ETH       XRP  \
0 -0.145050 -0.129262 -0.635381  ...  0.888510  0.195459  1.586825  3.628685   
1  0.543494 -0.776700  0.317166  ...  0.283870  0.108466 -0.228345  0.292872   
2  0.150080 -0.612152  0.285386  ... -0.238303  2.589548  0.280511 -0.718810   
3 -0.440428  0.194247 -1.074947  ... -0.367659 -2.547256  3.853138 -0.502358   
4 -0.832109  0.857014 -0.421764  ...  0.066096 -0.799821  0.295780 -1.401472   

        BNB      DOGE       ADA        TRX       XLM      LINK

In [3]:
# Create dataframe to store results
var_df = pd.DataFrame()

# Cycle through the VaR of individual currency exchange rates
for col in vari_list:
    returns = data[col]*100
    caviar_model = CaviarModel(0.05,'asymmetric', 'RQ')
    caviar_model.fit(returns)
    returns_VaR_predicted = caviar_model.predict(returns)/100
    returns_VaR_fitted, returns_VaR_forecast = returns_VaR_predicted[:-1], returns_VaR_predicted[-1]
    var_df[col + '_VaR'] = returns_VaR_fitted
print(var_df)

Generating 1 best initial betas out of 1...
Optimizing...
when m = 1
Update 0: 25.1099239498436
Update 1: 11.062978099067212
Update 2: 11.062963595016875
Update 3: 11.062963595016875
Final loss: 11.062963595016875
Time taken(s): 1.37
Generating 1 best initial betas out of 1...
Optimizing...
when m = 1
Update 0: 35.61201491184071
Update 1: 10.78063549070473
Update 2: 10.78062869004228
Update 3: 10.780628686913094
Update 4: 10.780628686913094
Final loss: 10.780628686913094
Time taken(s): 1.34
Generating 1 best initial betas out of 1...
Optimizing...
when m = 1
Update 0: 35.65654983410919
Update 1: 10.686460813602059
Update 2: 10.686416470637313
Update 3: 10.686416470637313
Final loss: 10.686416470637313
Time taken(s): 1.90
Generating 1 best initial betas out of 1...
Optimizing...
when m = 1
Update 0: 12.345178653170079
Update 1: 10.69050069927374
Update 2: 10.504257345694162
Update 3: 10.50420327743734
Update 4: 10.504203277437341
Final loss: 10.50420327743734
Time taken(s): 2.26
Generat

In [4]:
data = pd.concat([data, var_df], axis=1)
data 

Unnamed: 0,date,AUD,EUR,GBP,BRL,CAD,CNY,INR,JPY,KRW,...,ZAR_VaR,BTC_VaR,ETH_VaR,XRP_VaR,BNB_VaR,DOGE_VaR,ADA_VaR,TRX_VaR,XLM_VaR,LINK_VaR
0,2018-01-03,0.334863,-0.362698,-0.962935,0.667124,-0.495991,-0.530372,-0.145050,-0.129262,-0.635381,...,-1.496603,-0.965267,-0.921753,-0.734290,-0.911929,-0.922160,-0.891838,-0.213160,-0.630663,-0.841068
1,2018-01-04,0.334211,0.656456,0.227988,0.509819,0.240629,0.542035,0.543494,-0.776700,0.317166,...,-1.483572,-0.935005,-1.066232,-1.079114,-1.024626,-0.850855,-1.215702,-0.847719,-1.228322,-0.786722
2,2018-01-05,0.074953,-0.456648,0.306200,-0.001248,2.406126,0.239796,0.150080,-0.612152,0.285386,...,-1.452218,-0.897536,-1.031376,-1.074541,-1.101181,-0.876938,-1.201077,-3.730683,-1.288684,-1.338418
3,2018-01-08,-0.403445,-1.234715,0.057436,-0.039856,-0.519592,-0.503176,-0.440428,0.194247,-1.074947,...,-1.435233,-1.117968,-0.984176,-1.070366,-2.199838,-1.409290,-1.361778,-3.403580,-1.292055,-1.395540
4,2018-01-09,-0.304518,-0.955438,-0.623592,-0.553009,-0.613347,-1.587930,-0.832109,0.857014,-0.421764,...,-1.430869,-1.393252,-1.440291,-1.057428,-2.494720,-1.713391,-1.503924,-4.612560,-1.280444,-1.943484
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1695,2024-10-17,0.951873,-0.738979,0.114249,-0.262937,-0.312019,-0.100826,-0.059821,-0.465327,-0.985952,...,-1.401500,-1.103762,-0.984157,-0.807831,-0.972969,-0.964342,-0.904629,-0.296820,-0.680892,-0.839340
1696,2024-10-18,-0.031487,0.371217,0.195929,-0.007935,-0.208785,1.029334,0.088750,0.689939,0.227609,...,-1.384633,-1.061084,-0.917953,-0.789106,-0.977637,-0.942883,-0.915365,-0.267755,-0.666286,-0.828467
1697,2024-10-21,-0.898234,-0.698279,-0.567894,-0.402437,-0.721574,-0.854682,-0.096920,-1.048106,-1.280341,...,-1.367547,-1.041412,-0.876093,-0.775332,-0.940681,-1.004704,-0.897413,-0.257154,-0.670018,-0.826495
1698,2024-10-22,0.437467,-0.300448,-0.145256,0.238558,0.304633,-0.186105,0.088737,-0.740440,0.018631,...,-1.363666,-1.034738,-0.825253,-0.754175,-0.893332,-1.028174,-0.890601,-0.237957,-0.657851,-0.794404


In [5]:
# Define rolling window functions
def create_sliding_windows(data, window_size=340, step_size=170):
    train_windows = []
    val_windows = []
    test_windows = []
    
    for i in range(0, len(data) - window_size + 1, step_size):
        window = data.iloc[i:i + window_size]
        train_windows.append(window.iloc[:130])
        val_windows.append(window.iloc[130:170])
        test_windows.append(window.iloc[170:])
    
    return train_windows, val_windows, test_windows

# Create training set, validation set and test set window
train_windows, val_windows, test_windows = create_sliding_windows(data, window_size=340, step_size=170)

# Print the number of windows
train_window_count = len(train_windows)
val_window_count = len(val_windows)
test_window_count = len(test_windows)

# Print the start and end positions of each window
train_window_start_end = [(window.index[0], window.index[-1]) for window in train_windows]
val_window_start_end = [(window.index[0], window.index[-1]) for window in val_windows]
test_window_start_end = [(window.index[0], window.index[-1]) for window in test_windows]

(train_window_count, val_window_count, test_window_count, train_window_start_end, val_window_start_end, test_window_start_end)

(9,
 9,
 9,
 [(0, 129),
  (170, 299),
  (340, 469),
  (510, 639),
  (680, 809),
  (850, 979),
  (1020, 1149),
  (1190, 1319),
  (1360, 1489)],
 [(130, 169),
  (300, 339),
  (470, 509),
  (640, 679),
  (810, 849),
  (980, 1019),
  (1150, 1189),
  (1320, 1359),
  (1490, 1529)],
 [(170, 339),
  (340, 509),
  (510, 679),
  (680, 849),
  (850, 1019),
  (1020, 1189),
  (1190, 1359),
  (1360, 1529),
  (1530, 1699)])

In [6]:
#Select CUDA or CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [7]:
class QuantileRegressionModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout_prob):
        super(QuantileRegressionModel, self).__init__()
        # Define the hidden layer
        self.hidden_layers = nn.ModuleList([nn.Linear(input_dim, hidden_dim) if i == 0 else nn.Linear(hidden_dim, hidden_dim) for i in range(num_layers)])
        self.output_layer = nn.Linear(hidden_dim, output_dim) # Define the output layer
        self.activation = nn.ReLU()   # Use the ReLU activation function
        self.dropout = nn.Dropout(dropout_prob)  # Use Dropout to prevent overfitting
    
    def forward(self, x):
        for layer in self.hidden_layers:
            x = self.activation(layer(x))  # Activation function applied to each hidden layer
            x = self.dropout(x)  # Apply Dropout
        x = self.output_layer(x)  # Final output layer
        return x


# Define the objective function
def quantile_loss(y_true, y_pred, tau=0.05):
    diff = y_true - y_pred
    loss = torch.where(diff < 0, (tau - 1) * diff, tau * diff)
    return torch.mean(loss)

def combined_loss(y_true, y_pred, model, lambda1=0.001, lambda2=0.001):
    quantile = quantile_loss(y_true, y_pred)
    l1_reg = lambda1 * sum(torch.sum(torch.abs(param)) for param in model.parameters())
    l2_reg = lambda2 * sum(torch.sum(param**2) for param in model.parameters())
    return quantile + l1_reg + l2_reg

In [8]:
#1. initial setup
# List of names of currencies
currencies =  vari_list
best_model_state = {}  # Save the state and hyperparameters of the best model
VaR_values = {}  # Save VaR values for each currency at different time periods

# List of DataFrames storing the results of each currency's predictions
results_df_list = []
# Data set characteristics
input_dim = len(vari_list)-1
output_dim = 1

for currency in currencies:
    # Define the objective function for Bayesian optimisation
    def objective(trial):
        # Define the search space for hyperparameters here
        hidden_dim = trial.suggest_int('hidden_dim', 1, 100)
        num_layers = trial.suggest_int('num_layers', 1, 1)
        dropout_prob = trial.suggest_float('dropout_prob', 0.1, 0.6)
        learning_rate = trial.suggest_loguniform('learning_rate', 1e-6, 1e-1)
        lambda1 = trial.suggest_loguniform('lambda1', 1e-6, 1e-3)
        lambda2 = trial.suggest_loguniform('lambda2', 1e-6, 1e-3)

        # Create rolling windows
        train_windows, val_windows, test_windows = create_sliding_windows(data, window_size=340, step_size=170)

        val_losses = []
        test_losses = []
        all_predictions = []

        for train_window, val_window, test_window in zip(train_windows, val_windows, test_windows):
            model = QuantileRegressionModel(input_dim, hidden_dim, output_dim, num_layers, dropout_prob).to(device)
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)

            # Training and validation sets
            Y_col = currency
            X_cols = [c for c in currencies if c != currency]
            train_X = torch.tensor(train_window[X_cols].values, dtype=torch.float32).to(device)
            train_Y = torch.tensor(train_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)
            val_X = torch.tensor(val_window[X_cols].values, dtype=torch.float32).to(device)
            val_Y = torch.tensor(val_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)

            # Test sets
            test_X_cols = [f'{c}_VaR' for c in currencies if c != currency]
            test_Y_col = f'{currency}_VaR'
            test_X = torch.tensor(test_window[test_X_cols].values, dtype=torch.float32).to(device)
            test_Y = torch.tensor(test_window[test_Y_col].values, dtype=torch.float32).view(-1, 1).to(device)

            model.train()
            for epoch in range(70):
                optimizer.zero_grad()
                predictions = model(train_X)
                loss = combined_loss(train_Y, predictions, model, lambda1, lambda2)
                loss.backward()
                optimizer.step()

            model.eval()
            with torch.no_grad():
                val_predictions = model(val_X)
                val_loss = combined_loss(val_Y, val_predictions, model, lambda1, lambda2)
                val_losses.append(val_loss.item())

                test_predictions = model(test_X)
                test_loss = combined_loss(test_Y, test_predictions, model, lambda1, lambda2)
                test_losses.append(test_loss.item())
                test_predictions_array = test_predictions.detach().cpu().numpy()
                all_predictions.append(test_predictions_array)

        # Save states and hyperparameters of the best model
        best_model_state[currency] = {
            'state_dict': model.state_dict(),
            'params': {
                'hidden_dim': hidden_dim,
                'num_layers': num_layers,
                'dropout_prob': dropout_prob,
                'learning_rate': learning_rate,
                'lambda1': lambda1,
                'lambda2': lambda2
            }
        }
        VaR_values[currency] = test_Y.cpu().numpy()  # Assuming test_Y is the VaR value

        # Return average verification loss
        return sum(val_losses) / len(val_losses)

    # Create an Optuna study object for hyperparameter searching
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=50)  # Make 50 attempts

    # Output the optimal result
    print(f"Best parameters for {currency}: {study.best_params}")
    print(f"Best validation loss for {currency}: {study.best_value}")

    # Retrain the model using the found optimal hyperparameters
    best_params = study.best_params

    # Create rolling windows
    train_windows, val_windows, test_windows = create_sliding_windows(data, window_size=340, step_size=170)

    val_losses = []
    test_losses = []
    all_predictions = []

    for train_window, val_window, test_window in zip(train_windows, val_windows, test_windows):
        # Model instantiation
        model = QuantileRegressionModel(input_dim, best_params['hidden_dim'], output_dim, best_params['num_layers'], best_params['dropout_prob']).to(device)
        optimizer = optim.Adam(model.parameters(), lr=best_params['learning_rate'])

        # Extract X and Y for training, validation and test sets
        Y_col = currency
        X_cols = [c for c in currencies if c != currency]
        train_X = torch.tensor(train_window[X_cols].values, dtype=torch.float32).to(device)
        train_Y = torch.tensor(train_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)
        val_X = torch.tensor(val_window[X_cols].values, dtype=torch.float32).to(device)
        val_Y = torch.tensor(val_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)
        test_X_cols = [f'{c}_VaR' for c in currencies if c != currency]
        test_Y_col = f'{currency}_VaR'
        test_X = torch.tensor(test_window[test_X_cols].values, dtype=torch.float32).to(device)
        test_Y = torch.tensor(test_window[test_Y_col].values, dtype=torch.float32).view(-1, 1).to(device)

        # Training models
        model.train()
        for epoch in range(70):  # 70 rounds of training
            optimizer.zero_grad()
            predictions = model(train_X)
            loss = combined_loss(train_Y, predictions, model, best_params['lambda1'], best_params['lambda2'])
            loss.backward()
            optimizer.step()
          

        # Validate the model
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_X)
            val_loss = combined_loss(val_Y, val_predictions, model, best_params['lambda1'], best_params['lambda2'])
            val_losses.append(val_loss.item())

            # Test model performance
            test_predictions = model(test_X)
            test_loss = combined_loss(test_Y, test_predictions, model, best_params['lambda1'], best_params['lambda2'])
            test_losses.append(test_loss.item())
            test_predictions_array = test_predictions.detach().cpu().numpy()
            all_predictions.append(test_predictions_array)

    # Use np.concatenate to stitch together all predictions
    all_predictions_combined = np.concatenate(all_predictions, axis=0)

    # Print the spliced result and its shape
    print(f"All predictions for {currency}:", all_predictions_combined)
    print(f"Shape of all_predictions_combined for {currency}:", all_predictions_combined.shape)

    # Output results
    best_model_state[currency]['val_losses_nn'] = val_losses
    best_model_state[currency]['test_losses_nn'] = test_losses
    print(f"Average Validation Loss for {currency}: {sum(val_losses) / len(val_losses)}")
    print(f"Average Test Loss for {currency}: {sum(test_losses) / len(test_losses)}")

        # Create lists storing predicted values and dates
    CoVaR_list = []
    dates_list = []

    # Extract predicted values and dates for each test window
    for test_window, test_predictions in zip(test_windows, all_predictions):
        dates = test_window['date']  # 获取当前窗口的日期
        dates_list.extend(dates)
        CoVaR_list.extend(test_predictions.flatten())  # Expand and add predictions to the list

    # Print a list of dates and their lengths
    print(f"Length of dates_list for {currency}:", len(dates_list))
    print(f"Length of CoVaR_list for {currency}:", len(CoVaR_list))

    # Create a DataFrame with dates and projected values
    CoVaR_df = pd.DataFrame({
        'date': dates_list,
        f'{currency}_CoVaR': CoVaR_list
    })

    # :: Ensure consistency in date format of raw and projected data
    CoVaR_df['date'] = pd.to_datetime(CoVaR_df['date'])

    # Add results to results list
    results_df_list.append(CoVaR_df)

# Merge all company forecasts into the original dataset
for result_df in results_df_list:
    data = pd.merge(data, result_df, on='date', how='left')

# View the merged dataset
data = data.dropna()


Best parameters for AUD: {'hidden_dim': 13, 'num_layers': 1, 'dropout_prob': 0.5963102318903262, 'learning_rate': 0.037551726250202234, 'lambda1': 1.0562885651388624e-06, 'lambda2': 2.633742007580341e-05}
Best validation loss for AUD: 0.0625670241812865
All predictions for AUD: [[-2.3660636]
 [-2.3406763]
 [-2.2862048]
 ...
 [-2.795355 ]
 [-2.862184 ]
 [-2.7913663]]
Shape of all_predictions_combined for AUD: (1530, 1)
Average Validation Loss for AUD: 0.06434721044368213
Average Test Loss for AUD: 0.13548284148176512
Length of dates_list for AUD: 1530
Length of CoVaR_list for AUD: 1530
Best parameters for EUR: {'hidden_dim': 5, 'num_layers': 1, 'dropout_prob': 0.5168687055994572, 'learning_rate': 0.021302264902341942, 'lambda1': 1.3703793972945754e-05, 'lambda2': 3.6158297689014615e-05}
Best validation loss for EUR: 0.06782035446829265
All predictions for EUR: [[-3.2522812]
 [-3.090265 ]
 [-3.0561602]
 ...
 [-2.1219125]
 [-2.1490436]
 [-2.1201363]]
Shape of all_predictions_combined for 

In [9]:
# Define the linear regression model
class LinearRegressionModel(nn.Module):
    def __init__(self, input_dim):
        super(LinearRegressionModel, self).__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        return self.linear(x)

input=len(vari_list)-1
for currency in currencies:
    # Create rolling windows
    train_windows, val_windows, test_windows = create_sliding_windows(data, window_size=340, step_size=170)
    
    val_losses_lr = []
    test_losses_lr = []
    all_predictions = []
    for train_window, val_window, test_window in zip(train_windows, val_windows, test_windows):
        # Instantiated models
        model = LinearRegressionModel(input)
        optimizer = optim.Adam(model.parameters(), lr=0.01)
        # Extract X and Y for training, validation and test sets
        Y_col = currency
        X_cols = [c for c in currencies if c != currency]
        train_X = torch.tensor(train_window[X_cols].values, dtype=torch.float32).to(device)
        train_Y = torch.tensor(train_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)
        val_X = torch.tensor(val_window[X_cols].values, dtype=torch.float32).to(device)
        val_Y = torch.tensor(val_window[Y_col].values, dtype=torch.float32).view(-1, 1).to(device)
        test_X_cols = [f'{c}_VaR' for c in currencies if c != currency]
        test_Y_col = f'{currency}_VaR'
        test_X = torch.tensor(test_window[test_X_cols].values, dtype=torch.float32).to(device)
        test_Y = torch.tensor(test_window[test_Y_col].values, dtype=torch.float32).view(-1, 1).to(device)

        # Training models
        model.train()
        for epoch in range(70):  # 100 rounds of training
            optimizer.zero_grad()
            predictions = model(train_X)
            loss = combined_loss(train_Y, predictions, model,lambda1=0.001, lambda2=0.001)
            loss.backward()
            optimizer.step()
        # Validate the model
        model.eval()
        with torch.no_grad():
            val_predictions = model(val_X)
            val_loss_lr = combined_loss(val_Y, val_predictions, model,lambda1=0.001, lambda2=0.001)
            val_losses_lr.append(val_loss_lr.item())
            

            # Test model performance
            test_predictions = model(test_X)
            test_loss_lr = combined_loss(test_Y, test_predictions, model,lambda1=0.001, lambda2=0.001)
            test_losses_lr.append(test_loss_lr.item())
            test_predictions_array = test_predictions.detach().cpu().numpy()
            all_predictions.append(test_predictions_array)

    # Use np.concatenate to stitch together all predictions
    all_predictions_combined = np.concatenate(all_predictions, axis=0)
    best_model_state[currency]['val_losses_lr'] = val_losses_lr
    best_model_state[currency]['test_losses_lr'] = test_losses_lr


In [35]:
def diebold_mariano_test(errors_nn, errors_lr):
    # Ensure that both error arrays have the same length
    min_len = min(len(errors_nn), len(errors_lr))
    errors_nn = errors_nn[:min_len]
    errors_lr = errors_lr[:min_len]
    errors_nn = np.array(errors_nn)
    errors_lr = np.array(errors_lr)

    # Calculate the average error
    mean_nn = np.mean(errors_nn)
    mean_lr = np.mean(errors_lr)
    
    # Differences in calculation errors
    error_diff = errors_nn - errors_lr
    
    # Calculate the variance of the difference (using the whole series)
    var_diff = np.var(error_diff, ddof=1)
    
    # Calculate DM statistics
    DM_statistic = (mean_nn - mean_lr) / np.sqrt(var_diff / min_len)
    
    # Calculate the p-value
    p_value = 2 * (1 - norm.cdf(abs(DM_statistic)))
    
    return DM_statistic, p_value

DM_results = {}
for currency in currencies:
    DM_statistic, p_value = diebold_mariano_test( best_model_state[currency]['val_losses_nn'], best_model_state[currency]['val_losses_lr'])
    DM_results[currency] = {'DM Statistic': DM_statistic, 'p-value': p_value}

df_DM_results = pd.DataFrame.from_dict(DM_results, orient='index')

# Rename columns
df_DM_results.columns = ['DM Statistic', 'p-value']

# Add the currency name as the first column to the data frame
df_DM_results.reset_index(inplace=True)
df_DM_results.rename(columns={'index': 'currencies'}, inplace=True)
df_DM_results = df_DM_results.T
df_DM_results.to_excel('DM_test_95.xlsx', index=False)

In [39]:
# Define functions to calculate risk spillovers
def calculate_spillover_effects(j, currency, model_state, VaR_values):
    # Extract model weights
    best_params = model_state[currency]['params']
    model = QuantileRegressionModel(input_dim, best_params['hidden_dim'], output_dim, best_params['num_layers'], best_params['dropout_prob']).to(device)
    model.load_state_dict(model_state[currency]['state_dict'])
    
    # Get linear layer weights for the model
    linear_weights = np.array(model.output_layer.weight.data.cpu().numpy())

    # Get intermediate layer weights and biases for the model
    hidden_weights = [layer.weight.data.cpu().numpy() for layer in model.hidden_layers]
    hidden_biases = [layer.bias.data.cpu().numpy() for layer in model.hidden_layers]

    # Initialise arrays storing risk spillage effects
    effects = [0] * len(currencies)

    for i, target in enumerate (currencies):
        if i<j :
            effect = 0
            for m in range(len(hidden_weights[0])):
                wh_m = np.array(hidden_weights[0][m,])
                bh_m = np.array(hidden_biases[0][m,])
                VaRvector = np.array([VaR_values[key] for key in VaR_values if key != target])
                input_sum = np.dot(wh_m, VaRvector)+bh_m
                if input_sum.all() > 0:
                    effect += linear_weights[0,m]*input_sum*wh_m[i]
                effect = np.abs(effect)
            effects[i] = effect
        else:
            if i > j:
                effect = 0
                p = i-1
                for m in range(len(hidden_weights[0])):
                    wh_m = np.array(hidden_weights[0][m,])
                    bh_m = np.array(hidden_biases[0][m,])
                    VaRvector = np.array([VaR_values[key] for key in VaR_values if key != target])
                    input_sum = np.dot(wh_m, VaRvector)+bh_m
                    if input_sum.all() > 0:
                        effect += linear_weights[0,m]*input_sum*wh_m[p]
                    effect = np.abs(effect)
                effects[i] = effect
            else:
                effects[i] = 0

    return effects

# Store risk spillover matrix at all points in time
spillover_matrix = []

# Traversing each point in time
for t in range(len(data)):
    VaR_values_t = {currency: data.iloc[t][f'{currency}_VaR'] for currency in currencies}
    spillover_matrix_t = np.zeros((len(currencies), len(currencies)))

    for j, target_currency in enumerate(currencies):
        spillover_matrix_t[j] = calculate_spillover_effects(j, target_currency, best_model_state, VaR_values_t)
    
    spillover_matrix.append(spillover_matrix_t)
    print(t)


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
27

In [40]:
# Export CoVaR estimation results
data.to_csv('outputdata.csv', index=False)

# Export the estimation of the adjacency matrix
spillover_matrix=np.array(spillover_matrix)
np.save('spillover_matrix',spillover_matrix) 
