### Initial package imports

In [None]:
import pandas as pd
import scipy.stats as st
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from typing import Dict, List, Union
import random
import warnings

# Silence every RuntimeWarning and FutureWarning raised from here on.
# NOTE: these filters are process-wide, not limited to a particular routine
# (e.g. GARCH optimisation), so they also hide such warnings from pandas/numpy.
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

## Data Loading and Initial Preparation

In [None]:
def load_and_prepare_data(indices_path='Data/Cleaned_Indices_Assignment1.csv', 
                          interest_rate_path='Data/ECB_Data_10yr_Treasury_bond.csv',
                          start_date_str='2012-01-04'):
    """Load the index and bond-yield CSV files and build the base DataFrame.

    Steps: read both files, parse their ``Date`` columns, left-join the yield
    data onto the index data, drop rows without a 10-year yield, keep rows on
    or after ``start_date_str``, index by date (sorted ascending) and add the
    simple daily return of each equity index.

    Parameters:
    - indices_path: ';'-separated CSV with a ``Date`` column (dd-mm-YYYY) and
      the ``*_Closing`` index columns.
    - interest_rate_path: ','-separated CSV with a ``Date`` column
      (YYYY-mm-dd) and the 10-year yield column.
    - start_date_str: first date (inclusive) to keep.

    Returns:
    - DataFrame indexed by ``Date`` with the extra columns
      ``C_S&P500_Returns``, ``C_Dax40_Returns`` and ``C_Nikkei_Returns``.

    Side effect: sets the pandas float display format to six decimals.
    """
    yield_col = 'Yield curve spot rate, 10-year maturity - Government bond'
    pd.set_option('display.float_format', '{:.6f}'.format)

    # Read the data
    main_df = pd.read_csv(indices_path, sep=';')
    interest_rate_bond_df = pd.read_csv(interest_rate_path, sep=',')

    # Convert date columns to datetime format (the two files use different layouts)
    main_df['Date'] = pd.to_datetime(main_df['Date'], format='%d-%m-%Y')
    interest_rate_bond_df['Date'] = pd.to_datetime(interest_rate_bond_df['Date'], format='%Y-%m-%d')

    # Left join keeps every index date; dates without a yield become NaN ...
    main_df = pd.merge(main_df, interest_rate_bond_df, on='Date', how='left')

    # ... and are removed here, together with everything before the start date
    main_df = main_df.dropna(axis=0, subset=[yield_col])
    main_df = main_df[main_df['Date'] >= start_date_str]

    # Set Date as index and sort
    main_df = main_df.set_index('Date').sort_index()

    # Calculate initial returns (needed before portfolio metrics)
    for col in ['S&P500_Closing', 'Dax40_Closing', 'Nikkei_Closing']:
        return_col = f'C_{col.replace("_Closing", "")}_Returns'
        returns = main_df[col].pct_change()
        # The first row has no predecessor; define its return as 0.
        # The value is set on the standalone Series *before* it is stored:
        # `main_df[col].iloc[0] = ...` is chained assignment, which is not
        # guaranteed to write through to the frame.
        if len(returns) > 0:
            returns.iloc[0] = 0.0
        main_df[return_col] = returns

    return main_df

# Build the baseline DataFrame once, using the default file paths and start date.
main_df_initial = load_and_prepare_data()

## Portfolio Configuration

In [None]:
# Target allocation of the portfolio, as fractions of the total investment
weights_dict = {'S&P500': 0.4, 'DAX40': 0.3, 'NIKKEI': 0.15, 'EU-BOND': 0.15}

# Same weights as a vector, in the fixed order S&P500, DAX40, NIKKEI, EU-BOND
weights_array = np.array(
    [weights_dict[asset_name] for asset_name in ('S&P500', 'DAX40', 'NIKKEI', 'EU-BOND')]
)

# Total amount invested on the start date: 10 million euros
starting_investment = 10_000_000

# Part of the investment placed in the bond on the start date
interest_bond_initial = starting_investment * weights_dict['EU-BOND']

# First day of the portfolio
start_date = pd.to_datetime('2012-01-04')

def get_initial_invested_amounts(df, starting_investment, weights_dict, start_date):
    """Split the starting investment over the four assets on the start date.

    The S&P500 and NIKKEI allocations are divided by the ``USD/EUR`` and
    ``JPY/EUR`` values found in the ``start_date`` row of ``df``; the DAX40 and
    EU-BOND allocations are returned as-is.

    Returns a list ordered [S&P500, DAX40, NIKKEI, EU-BOND].
    """
    fx_row = df.loc[start_date]
    usd_rate = float(fx_row['USD/EUR'])
    jpy_rate = float(fx_row['JPY/EUR'])

    def allocation(asset_key):
        # Amount assigned to one asset before any currency conversion
        return starting_investment * weights_dict[asset_key]

    return [
        allocation('S&P500') / usd_rate,
        allocation('DAX40'),
        allocation('NIKKEI') / jpy_rate,
        allocation('EU-BOND'),
    ]

# Amounts held per asset on the start date, ordered [S&P500, DAX40, NIKKEI, EU-BOND];
# printed as a quick sanity check.
invested_amounts_initial = get_initial_invested_amounts(main_df_initial, starting_investment, weights_dict, start_date)
print("Initial Invested Amounts (Local Currency):")
print(invested_amounts_initial)

## Function Definitions

### Portfolio Calculation Functions

In [None]:
def calculate_bond_metrics(df, interest_bond_initial, start_date=None):
    """Calculate the daily bond value, profit/loss and daily accrual rate.

    The bond position starts at ``interest_bond_initial`` on ``start_date`` and
    is compounded on every later row with
    ``daily_rate = ((yield + 1.5) / 365) * (7 / 5) / 100``
    (yield in percent, plus a 1.5 point credit spread, scaled by 7/5 so that
    the rows present in the data carry the accrual of a full calendar week).

    Parameters:
    - df: DataFrame whose index contains ``start_date`` and which holds the
      'Yield curve spot rate, 10-year maturity - Government bond' column.
    - interest_bond_initial: bond value on the start date.
    - start_date: first day of the position. When omitted, the module-level
      ``start_date`` is used, which keeps two-argument calls working.

    Returns:
    - A copy of ``df`` with the columns ``Interest_Bond``,
      ``Interest_Bond_Profit``, ``Interest_Bond_Loss`` and
      ``Interest_Bond_daily_rate``. If the start date is not in the index a
      warning is printed and the unmodified copy is returned.

    Note: remaining NaNs in the *whole* frame (not only the new columns) are
    back-filled and then replaced by 0, as before.
    """
    yield_col = 'Yield curve spot rate, 10-year maturity - Government bond'
    days_per_annum = 365
    df_copy = df.copy()  # never modify the caller's frame

    if start_date is None:
        # Backward compatibility: older callers rely on the module-level value
        start_date = globals().get('start_date')

    # Index.get_loc raises KeyError for a missing label, so test membership
    # first; otherwise the warning below could never be reached.
    if start_date is None or start_date not in df_copy.index:
        print("Warning: Start date not found in DataFrame index for bond calculation.")
        return df_copy

    start_idx_loc = df_copy.index.get_loc(start_date)
    n_rows = len(df_copy)

    # Adding 1.5% credit risk spread, adjust rate for weekends/holidays (7/5)
    yields = df_copy[yield_col].to_numpy(dtype=float)
    daily_rates = (((yields + 1.5) / days_per_annum) * (7/5)) / 100

    # Value path: initial value on the start row, then compounded row by row.
    # cumprod multiplies left to right, i.e. exactly value[i-1] * (1 + rate[i]).
    growth = np.concatenate(([float(interest_bond_initial)], 1.0 + daily_rates[start_idx_loc + 1:]))
    interest_bond_vector = np.full(n_rows, np.nan)
    interest_bond_vector[start_idx_loc:] = np.cumprod(growth)

    # Daily change in value; zero on the start row, undefined before it
    interest_bond_profit_vector = np.full(n_rows, np.nan)
    interest_bond_profit_vector[start_idx_loc] = 0.0
    interest_bond_profit_vector[start_idx_loc + 1:] = np.diff(interest_bond_vector[start_idx_loc:])

    interest_bond_loss_vector = -interest_bond_profit_vector
    interest_bond_loss_vector[start_idx_loc] = 0.0  # avoid a negative zero

    # Rows before the start date carry no position
    daily_rates[:start_idx_loc] = np.nan

    # Add vectors to the dataframe
    df_copy['Interest_Bond'] = interest_bond_vector
    df_copy['Interest_Bond_Profit'] = interest_bond_profit_vector
    df_copy['Interest_Bond_Loss'] = interest_bond_loss_vector
    df_copy['Interest_Bond_daily_rate'] = daily_rates

    # Rows before the start date take the start-date values; anything still
    # missing becomes 0. (`fillna(method=...)` is deprecated in pandas.)
    df_copy.bfill(inplace=True)
    df_copy.fillna(0.0, inplace=True)

    return df_copy

In [None]:
def calculate_investment_values(df, invested_amounts, start_date):
    """Calculate the daily value of each investment from its return series.

    Each equity position starts at its invested amount on ``start_date`` and
    is grown on later rows by the cumulative product of ``1 + return``.

    Parameters:
    - df: DataFrame with the ``C_*_Returns`` columns and, optionally, the
      ``Interest_Bond`` column.
    - invested_amounts: sequence [S&P500, DAX40, NIKKEI, EU-BOND]; the last
      entry is not used here.
    - start_date: index label of the first day of the position.

    Returns:
    - A copy of ``df`` with ``SP500_Investment``, ``DAX40_Investment``,
      ``NIKKEI_Investment`` and ``EU_BOND_Investment``. If ``start_date`` is
      not in the index a warning is printed and the copy is returned with the
      three equity columns left as NaN.
    """
    df_copy = df.copy()
    invested_amount_SP500, invested_amount_DAX40, invested_amount_NIKKEI, _ = invested_amounts

    # (investment column, return column, initial amount) per equity position
    positions = [
        ('SP500_Investment', 'C_S&P500_Returns', invested_amount_SP500),
        ('DAX40_Investment', 'C_Dax40_Returns', invested_amount_DAX40),
        ('NIKKEI_Investment', 'C_Nikkei_Returns', invested_amount_NIKKEI),
    ]

    # Initialize columns
    for inv_col, _ret_col, _amount in positions:
        df_copy[inv_col] = np.nan

    if start_date not in df_copy.index:
        print("Warning: Start date not found for setting initial investment values.")
        return df_copy

    # Set initial investment values at the start date
    for inv_col, _ret_col, amount in positions:
        df_copy.loc[start_date, inv_col] = amount

    # Position of the start date; identical for every asset, so look it up once
    start_idx_loc = df_copy.index.get_loc(start_date)
    later_rows = df_copy.index[start_idx_loc + 1:]

    for inv_col, ret_col, _amount in positions:
        if ret_col not in df_copy.columns:
            print(f"Warning: Return column {ret_col} not found. Skipping {inv_col}.")
            continue

        initial_value = df_copy.loc[start_date, inv_col]

        # Growth factor relative to the start date, from the day after it
        cumulative_factor = (1 + df_copy[ret_col].iloc[start_idx_loc+1:]).cumprod()
        df_copy.loc[later_rows, inv_col] = initial_value * cumulative_factor

        # Gaps caused by NaN returns keep the last known value; rows before
        # the start date become 0. Assign the result back to the column:
        # `df[col].fillna(..., inplace=True)` acts on a temporary object and
        # is not guaranteed to change the frame.
        df_copy[inv_col] = df_copy[inv_col].ffill().fillna(0.0)

    # EU Government Bond value is taken from the 'Interest_Bond' column
    if 'Interest_Bond' in df_copy.columns:
        df_copy['EU_BOND_Investment'] = df_copy['Interest_Bond']
    else:
        print("Warning: 'Interest_Bond' column not found for EU_BOND_Investment.")
        # No bond data: an all-NaN column forward-filled and zero-filled is simply 0
        df_copy['EU_BOND_Investment'] = 0.0

    return df_copy

In [None]:
def calculate_portfolio_metrics(df, starting_investment, start_date):
    """Derive the portfolio value in EUR and its daily change, loss and return.

    Works on a copy of ``df``. Missing input columns are created as 0 (with a
    printed warning) and NaNs in existing input columns are replaced by 0.
    On ``start_date`` the portfolio value is overwritten with
    ``starting_investment`` and change, loss and return are set to 0.
    """
    frame = df.copy()

    # Make sure every input of the valuation exists and holds no NaN
    for col in ('SP500_Investment', 'USD/EUR', 'DAX40_Investment',
                'NIKKEI_Investment', 'JPY/EUR', 'EU_BOND_Investment'):
        if col in frame.columns:
            frame[col] = frame[col].fillna(0.0)
        else:
            print(f"Warning: Required column {col} missing for portfolio metrics.")
            frame[col] = 0.0

    # Foreign positions are multiplied by their exchange-rate column
    sp500_in_eur = frame['SP500_Investment'] * frame['USD/EUR']
    nikkei_in_eur = frame['NIKKEI_Investment'] * frame['JPY/EUR']
    frame['Portfolio_Value_EUR'] = (
        sp500_in_eur + frame['DAX40_Investment'] + nikkei_in_eur + frame['EU_BOND_Investment']
    )

    start_in_index = start_date in frame.index
    if start_in_index:
        # The first day is pinned to the amount actually invested
        frame.loc[start_date, 'Portfolio_Value_EUR'] = starting_investment

    derived_cols = ['Portfolio_Change_EUR', 'Portfolio_loss', 'Portfolio_Daily_Returns']
    frame['Portfolio_Change_EUR'] = frame['Portfolio_Value_EUR'].diff()
    frame['Portfolio_loss'] = -frame['Portfolio_Change_EUR']
    frame['Portfolio_Daily_Returns'] = frame['Portfolio_Value_EUR'].pct_change()

    if start_in_index:
        # No change, loss or return on the day the portfolio is opened
        frame.loc[start_date, derived_cols] = 0.0

    # diff/pct_change leave NaN in the first row; treat those as 0 as well
    frame.fillna({name: 0.0 for name in derived_cols}, inplace=True)

    return frame

In [None]:
def recalculate_dependent_columns(df, starting_investment, weights_dict, start_date, interest_bond_initial):
    """Recalculate every derived column after the raw inputs were changed.

    Intended for use after a stress event modified closing prices, yields or
    exchange rates. In order: equity returns, bond metrics, investment values
    and portfolio metrics are rebuilt on a copy of ``df``.

    Parameters:
    - df: DataFrame with the (possibly stressed) raw columns.
    - starting_investment: total amount invested on the start date.
    - weights_dict: portfolio weights keyed by asset name.
    - start_date: first day of the portfolio.
    - interest_bond_initial: bond value on the start date.

    Returns:
    - A new DataFrame with all dependent columns recomputed.
    """
    df_recalc = df.copy()

    # 1. Recalculate Returns based on potentially stressed closing prices
    print("Recalculating returns...")
    for col in ['S&P500_Closing', 'Dax40_Closing', 'Nikkei_Closing']:
        return_col = f'C_{col.replace("_Closing", "")}_Returns'
        if col not in df_recalc.columns:
            print(f"Warning: Column {col} not found for return recalculation.")
            continue
        returns = df_recalc[col].pct_change()
        # The first row has no predecessor; define its return as 0. Set it on
        # the standalone Series before storing it: `df[col].iloc[0] = ...` is
        # chained assignment and is not guaranteed to reach the frame.
        if len(returns) > 0:
            returns.iloc[0] = 0.0
        df_recalc[return_col] = returns

    # 2. Recalculate Bond Metrics based on potentially stressed yield
    # NOTE: start_date is not forwarded here; calculate_bond_metrics resolves
    # the start date itself (from the module-level `start_date`).
    print("Recalculating bond metrics...")
    df_recalc = calculate_bond_metrics(df_recalc, interest_bond_initial)

    # 3. Recalculate Investment Values based on new returns and bond value
    print("Recalculating investment values...")
    # Initial amounts depend on the (possibly stressed) start-date exchange rates
    invested_amounts = get_initial_invested_amounts(df_recalc, starting_investment, weights_dict, start_date)
    df_recalc = calculate_investment_values(df_recalc, invested_amounts, start_date)

    # 4. Recalculate Portfolio Metrics based on new investment values and exchange rates
    print("Recalculating portfolio metrics...")
    df_recalc = calculate_portfolio_metrics(df_recalc, starting_investment, start_date)

    print("Recalculation complete.")
    return df_recalc

### VaR and ES Calculation Functions

In [None]:
def VaR(alpha, r= 0, s= 1, df= 0):
    """
    Get the VaR of the normal or student-t model.
    Assumes VaR is for LOSSES (positive value).

    Parameters:
    - alpha: confidence level, e.g. 0.99
    - r: mean of the loss distribution
    - s: standard deviation of the loss distribution
    - df: degrees of freedom of the Student-t model; 0 selects the normal model

    Returns:
    - r + s * z_alpha for the normal model. For the Student-t model the
      quantile is rescaled so that the distribution has standard deviation s.
      NaN is returned when df <= 2, because the variance of the t-distribution
      is not finite there.
    """
    if df == 0:
        # Normal distribution: VaR = E[Loss] + std(Loss) * Z_alpha
        return r + s * st.norm.ppf(alpha)

    # Check before dividing: df == 2 would otherwise raise ZeroDivisionError
    # in df / (df - 2) instead of yielding NaN.
    if df <= 2:
        return np.nan

    # Student-t distribution
    dVaR0 = st.t.ppf(alpha, df= df)
    # A standard t variable has variance df/(df-2); scale it to volatility s
    dS2t = df/(df-2)
    c = s / np.sqrt(dS2t)
    return r + c*dVaR0

In [None]:
def ES(alpha, r= 0, s= 1, df= 0):
    """
    Expected Shortfall of LOSSES under the normal or Student-t model.

    alpha is the confidence level, r the mean loss and s the loss standard
    deviation. df = 0 selects the normal model; any other value is the number
    of degrees of freedom of the t-model. For the t-model NaN is returned
    when df <= 2 (no finite variance to scale with).
    """
    tail_prob = 1 - alpha

    if df == 0:
        # Normal: ES = mu + sigma * pdf(z_alpha) / (1 - alpha)
        z_alpha = st.norm.ppf(alpha)
        return r + s * (st.norm.pdf(z_alpha) / tail_prob)

    t_alpha = st.t.ppf(alpha, df= df)
    if df <= 2:
        # Covers df <= 1 (ES undefined) and 1 < df <= 2 (variance undefined)
        return np.nan

    # ES of a standard t variable ...
    es_standard = st.t.pdf(t_alpha, df= df)*((df + t_alpha**2)/(df-1)) / tail_prob
    # ... rescaled so that the distribution has standard deviation s
    scale = s / np.sqrt(df/(df-2))
    return r + scale*es_standard

### Risk Calculation Method Functions (Historical, Var/Cov, Multi-day)

In [None]:
def calculate_daily_loss_variables(time_window, current_date):
    """Summarise the 'Portfolio_loss' values of a time window.

    Returns a dict with the given date plus the mean and the (population)
    standard deviation of the losses, both computed while ignoring NaNs.
    """
    losses = time_window['Portfolio_loss']
    mean_loss = np.nanmean(losses)
    std_loss = np.nanstd(losses)
    return {
        "Date": current_date,
        "Portfolio_mean_loss": mean_loss,
        "Portfolio_std_loss": std_loss,
    }

In [None]:
def calculate_var_cov(window, current_date, vAlpha, mean_loss, portfolio_std_loss, df=0):
    """
    Variance-covariance VaR and ES of LOSSES for every level in vAlpha.

    mean_loss and portfolio_std_loss parameterise the loss distribution;
    df = 0 selects the normal model, any other value a Student-t model with
    that many degrees of freedom. ``window`` is accepted but not used.

    Returns a dict with 'Date' and two arrays keyed 'VaR <label>' and
    'ES <label>', where <label> is "Normal" or "T<df>".
    """
    dist_label = f"T{df}" if df != 0 else "Normal"

    var_per_alpha = [VaR(level, mean_loss, portfolio_std_loss, df=df) for level in vAlpha]
    es_per_alpha = [ES(level, mean_loss, portfolio_std_loss, df=df) for level in vAlpha]

    result = {'Date': current_date}
    result[f'VaR {dist_label}'] = np.array(var_per_alpha)
    result[f'ES {dist_label}'] = np.array(es_per_alpha)
    return result

def calculate_historical_var_es(window, current_date, vAlpha):
    """
    Calculate VaR and ES using historical simulation method for LOSSES.

    Parameters:
    - window: DataFrame with a 'Portfolio_loss' column (NaNs are ignored)
    - current_date: label stored under 'Date' in the result
    - vAlpha: confidence level(s) in (0, 1); a scalar, list, tuple or array

    Returns:
    - dict with 'Date', 'VaR Historical' and 'ES Historical'. VaR is the
      empirical alpha-percentile of the losses; ES is the mean of all losses
      greater than or equal to that VaR. Both arrays have one entry per level
      and are NaN when the window holds no valid losses.
    """
    # Coerce first: with a plain list, `vAlpha * 100` would repeat the list
    # 100 times instead of converting the levels to percentages.
    alphas = np.atleast_1d(np.asarray(vAlpha, dtype=float))

    # Extract portfolio loss values from the window
    historical_losses = window['Portfolio_loss'].dropna()

    if len(historical_losses) == 0:
        return {
            'Date': current_date,
            'VaR Historical': np.full(len(alphas), np.nan),
            'ES Historical': np.full(len(alphas), np.nan)
        }

    # Sort losses in ascending order (higher losses are larger positive numbers)
    sorted_losses = np.sort(historical_losses)

    # Calculate VaR for all alpha levels at once
    var_hist = np.percentile(sorted_losses, alphas * 100)

    # ES is the mean of losses greater than or equal to VaR
    es_hist = []
    for var_level in var_hist:
        tail_losses = sorted_losses[sorted_losses >= var_level]
        es_hist.append(tail_losses.mean() if len(tail_losses) > 0 else np.nan)

    return {
        'Date': current_date,
        'VaR Historical': np.array(var_hist),
        'ES Historical': np.array(es_hist)
    }

def calculate_multiday_risk(main_df_indexed, vAlpha, interval, sample_size):
    """
    Calculate multi-day VaR and ES using the historical simulation method.
    Returns VaR/ES for LOSSES.

    Two estimates are produced for every confidence level:
    - "Reg": percentile / tail mean of the rolling `interval`-day losses,
      taken over an expanding window that starts after the first
      `sample_size` rows and includes the current row.
    - "Sqrt": 1-day historical VaR/ES from the preceding `sample_size` rows,
      multiplied by sqrt(interval).

    Parameters:
    - main_df_indexed: DataFrame with DatetimeIndex and a 'Portfolio_loss' column
    - vAlpha: Confidence levels (numpy array; it is multiplied by 100 below)
    - interval: Number of days for the multi-day calculation (e.g., 5 or 10)
    - sample_size: Rolling window size for daily VaR calculation (used for sqrt rule)

    Returns:
    - var_multi_df, es_multi_df: DataFrames with multi-day VaR and ES, each
      also holding the realised 'Actual_Loss_{interval}d' column.
      Column names end in int(alpha*100), e.g. 'VaR_5d_Hist_Reg_99'.
      When there is not enough data, two all-NaN frames are returned that
      each contain both the VaR and the ES columns.
    """
    # Not enough rows to leave anything after the initial sample
    if sample_size >= len(main_df_indexed):
        print(f"Warning: sample_size ({sample_size}) >= data length ({len(main_df_indexed)}). Cannot calculate multi-day risk.")
        empty_df = pd.DataFrame(index=main_df_indexed.index)
        for alpha in vAlpha:
             empty_df[f'VaR_{interval}d_Hist_Reg_{int(alpha*100)}'] = np.nan
             empty_df[f'VaR_{interval}d_Hist_Sqrt_{int(alpha*100)}'] = np.nan
             empty_df[f'ES_{interval}d_Hist_Reg_{int(alpha*100)}'] = np.nan
             empty_df[f'ES_{interval}d_Hist_Sqrt_{int(alpha*100)}'] = np.nan
        empty_df[f'Actual_Loss_{interval}d'] = np.nan
        return empty_df, empty_df.copy()
        
    # Filter data for the period we want to analyze (excluding initial sample)
    analysis_start_date = main_df_indexed.index[sample_size]
    time_window_multi = main_df_indexed[main_df_indexed.index >= analysis_start_date].copy()
    
    # Calculate rolling sum of losses over the interval
    loss_col = f'Portfolio_loss_{interval}d'
    time_window_multi[loss_col] = time_window_multi['Portfolio_loss'].rolling(window=interval).sum()
    
    # Drop NaNs created by rolling sum (the first interval-1 rows of the analysis period)
    multi_day_losses_df = time_window_multi.dropna(subset=[loss_col]).copy()
    
    if multi_day_losses_df.empty:
        print(f"Warning: No valid {interval}-day losses found after rolling sum and dropna.")
        # Return empty DataFrames with expected columns
        empty_df = pd.DataFrame(index=main_df_indexed.index[sample_size:])
        for alpha in vAlpha:
             empty_df[f'VaR_{interval}d_Hist_Reg_{int(alpha*100)}'] = np.nan
             empty_df[f'VaR_{interval}d_Hist_Sqrt_{int(alpha*100)}'] = np.nan
             empty_df[f'ES_{interval}d_Hist_Reg_{int(alpha*100)}'] = np.nan
             empty_df[f'ES_{interval}d_Hist_Sqrt_{int(alpha*100)}'] = np.nan
        empty_df[f'Actual_Loss_{interval}d'] = np.nan
        return empty_df, empty_df.copy()
        
    # --- Historical Multi-Day VaR/ES (Regular Method) ---
    var_reg_list = []
    es_reg_list = []
    # Use expanding window for multi-day historical simulation:
    # row i-1 is estimated from the first i multi-day losses (itself included)
    for i in range(1, len(multi_day_losses_df) + 1):
        current_losses = multi_day_losses_df[loss_col].iloc[:i]
        if current_losses.empty:
             var_vals = np.full(len(vAlpha), np.nan)
             es_vals = np.full(len(vAlpha), np.nan)
        else:
            sorted_losses = np.sort(current_losses.dropna())
            if len(sorted_losses) == 0:
                 var_vals = np.full(len(vAlpha), np.nan)
                 es_vals = np.full(len(vAlpha), np.nan)
            else:
                # VaR: empirical percentile; ES: mean of the losses at or above it
                var_vals = np.percentile(sorted_losses, vAlpha * 100)
                es_vals = []
                for j, alpha in enumerate(vAlpha):
                    losses_above_var = sorted_losses[sorted_losses >= var_vals[j]]
                    es_val = losses_above_var.mean() if len(losses_above_var) > 0 else np.nan
                    es_vals.append(es_val)
        var_reg_list.append(var_vals)
        es_reg_list.append(es_vals)
        
    # Add results to DataFrame (one column per confidence level)
    for k, alpha in enumerate(vAlpha):
        multi_day_losses_df[f'VaR_{interval}d_Hist_Reg_{int(alpha*100)}'] = [res[k] for res in var_reg_list]
        multi_day_losses_df[f'ES_{interval}d_Hist_Reg_{int(alpha*100)}'] = [res[k] for res in es_reg_list]

    # --- Historical Multi-Day VaR/ES (Sqrt Rule) ---
    # Calculate rolling 1-day historical VaR/ES first;
    # the window for row i holds the sample_size rows before it (row i excluded)
    daily_var_hist = []
    daily_es_hist = []
    daily_dates = []
    for i in range(sample_size, len(main_df_indexed)):
        window = main_df_indexed.iloc[i - sample_size:i]
        current_date = main_df_indexed.index[i]
        hist_res = calculate_historical_var_es(window, current_date, vAlpha)
        daily_var_hist.append(hist_res['VaR Historical'])
        daily_es_hist.append(hist_res['ES Historical'])
        daily_dates.append(current_date)
        
    daily_risk_dict = {}
    for k, alpha in enumerate(vAlpha):
        daily_risk_dict[f'VaR_1d_Hist_{int(alpha*100)}'] = [res[k] for res in daily_var_hist]
        daily_risk_dict[f'ES_1d_Hist_{int(alpha*100)}'] = [res[k] for res in daily_es_hist]
        
    daily_risk_df = pd.DataFrame(daily_risk_dict, index=pd.Index(daily_dates, name='Date'))

    # Merge daily risk with multi-day losses df (matched on the date index)
    multi_day_losses_df = multi_day_losses_df.merge(daily_risk_df, left_index=True, right_index=True, how='left')

    # Apply sqrt rule: multi-day risk = 1-day risk * sqrt(number of days)
    for alpha in vAlpha:
        alpha_perc = int(alpha*100)
        multi_day_losses_df[f'VaR_{interval}d_Hist_Sqrt_{alpha_perc}'] = multi_day_losses_df[f'VaR_1d_Hist_{alpha_perc}'] * np.sqrt(interval)
        multi_day_losses_df[f'ES_{interval}d_Hist_Sqrt_{alpha_perc}'] = multi_day_losses_df[f'ES_1d_Hist_{alpha_perc}'] * np.sqrt(interval)

    # Prepare output DataFrames
    var_cols = [f'VaR_{interval}d_Hist_Reg_{int(a*100)}' for a in vAlpha] + \
               [f'VaR_{interval}d_Hist_Sqrt_{int(a*100)}' for a in vAlpha]
    es_cols = [f'ES_{interval}d_Hist_Reg_{int(a*100)}' for a in vAlpha] + \
              [f'ES_{interval}d_Hist_Sqrt_{int(a*100)}' for a in vAlpha]
    
    var_multi_df = multi_day_losses_df[var_cols].copy()
    es_multi_df = multi_day_losses_df[es_cols].copy()
    # Add the actual loss column for plotting/backtesting
    var_multi_df[f'Actual_Loss_{interval}d'] = multi_day_losses_df[loss_col]
    es_multi_df[f'Actual_Loss_{interval}d'] = multi_day_losses_df[loss_col]

    return var_multi_df, es_multi_df

### Risk Calculation Method Functions (EWMA, FHS)

In [None]:
def compute_ewma_volatility(
    returns: pd.DataFrame, 
    lambdas: Union[List[float], None] = None
) -> Dict[str, pd.DataFrame]:
    """
    Compute EWMA volatility for each risk factor using different lambda values.

    The variance recursion is
    ``var[t] = lambda * var[t-1] + (1 - lambda) * return[t-1]**2``,
    started at the sample variance of the whole series, so the value at row t
    only uses returns up to row t-1.

    Parameters:
    - returns: DataFrame of returns, one column per risk factor. Non-numeric
      entries and NaNs are treated as 0.
    - lambdas: decay factors; defaults to [0.94, 0.97].

    Returns:
    - dict mapping 'lambda_<value>' to a DataFrame of volatilities with the
      same index and columns as ``returns``.

    Raises:
    - TypeError: if ``returns`` is not a DataFrame.
    """
    if not isinstance(returns, pd.DataFrame):
        raise TypeError("returns must be a pandas DataFrame")

    # A fresh list per call instead of a shared mutable default argument
    if lambdas is None:
        lambdas = [0.94, 0.97]

    # Nothing to smooth: return empty frames instead of failing on row 0
    if returns.empty:
        return {
            f'lambda_{lambda_}': pd.DataFrame(index=returns.index, columns=returns.columns, dtype=float)
            for lambda_ in lambdas
        }

    # Ensure returns are numeric and handle potential NaNs
    returns_numeric = returns.apply(pd.to_numeric, errors='coerce').fillna(0.0)

    # Quantities that do not depend on lambda are computed once
    squared_returns = returns_numeric.to_numpy(dtype=float) ** 2
    initial_var = returns_numeric.var().to_numpy()
    n_obs = squared_returns.shape[0]

    results = {}
    for lambda_ in lambdas:
        ewma_var = np.zeros_like(squared_returns)
        ewma_var[0] = initial_var

        # Loop through time to apply EWMA variance formula
        for t in range(1, n_obs):
            ewma_var[t] = lambda_ * ewma_var[t-1] + (1 - lambda_) * squared_returns[t-1]

        # Convert variance to volatility (standard deviation) and store it
        results[f'lambda_{lambda_}'] = pd.DataFrame(
            np.sqrt(ewma_var),
            index=returns.index,
            columns=returns.columns
        )

    return results

In [None]:
def filter_returns(returns: pd.DataFrame, ewma_vol: dict) -> dict:
    """
    Standardize returns by their EWMA volatility, once per lambda key.

    For every entry of ``ewma_vol`` the returns are divided by the matching
    volatility on the rows both frames share. Zero volatilities, non-numeric
    returns and missing values all end up as 0 in the result.
    """
    numeric_returns = returns.apply(pd.to_numeric, errors='coerce')

    def standardize(vol_frame):
        # Keep only the rows present in both frames
        vol_common, ret_common = vol_frame.align(numeric_returns, join='inner', axis=0)
        # A zero volatility would divide by zero; mark it as missing instead
        nonzero_vol = vol_common.replace(0, np.nan)
        ratio = ret_common / nonzero_vol
        return ratio.fillna(0.0)

    return {key: standardize(vol_frame) for key, vol_frame in ewma_vol.items()}

In [None]:
def filtered_historical_simulation_multivariate(filtered_returns_dict: dict, ewma_vol_dict: dict,
                                              n_simulations: int = 10000, random_seed: int = None, 
                                              weights: np.ndarray = None, vAlpha=None) -> dict:
    """
    Perform Filtered Historical Simulation for a multi-asset portfolio for different lambda values.
    Calculates 1-day VaR and ES of LOSSES.

    For every lambda key, standardized residuals are resampled with
    replacement (independently per asset), rescaled by the last row of the
    EWMA volatility frame and combined with ``weights``.

    Parameters:
    - filtered_returns_dict: 'lambda key' -> DataFrame of standardized returns
    - ewma_vol_dict: 'lambda key' -> DataFrame of EWMA volatilities
    - n_simulations: number of simulated scenarios
    - random_seed: if given, seeds numpy's global random generator
    - weights: portfolio weights, one per asset column; without weights the
      results are NaN
    - vAlpha: confidence level(s); defaults to [0.95, 0.99]

    Returns:
    - dict 'lambda key' -> {'VaR': array, 'ES': array}, one entry per level
    """
    if random_seed is not None:
        np.random.seed(random_seed)

    # Resolve the confidence levels up front. They used to be assigned only
    # after the empty-data branch below had already read them, which raised
    # UnboundLocalError.
    if vAlpha is None:
        alphas = np.array([0.95, 0.99])
    else:
        alphas = np.atleast_1d(np.asarray(vAlpha, dtype=float))

    results = {}

    # Process each lambda value
    for lambda_key in filtered_returns_dict.keys():
        filtered_returns = filtered_returns_dict[lambda_key]
        ewma_vol = ewma_vol_dict[lambda_key]

        if filtered_returns.empty or ewma_vol.empty:
            print(f"Warning: Empty filtered returns or EWMA vol for {lambda_key}. Skipping FHS.")
            results[lambda_key] = {'VaR': np.full(len(alphas), np.nan), 'ES': np.full(len(alphas), np.nan)}
            continue

        assets = filtered_returns.columns
        n_assets = len(assets)

        # Get the latest volatility forecast
        sigma_t_vector = ewma_vol.iloc[-1].values

        # Initialize simulated return matrix (n_simulations x n_assets)
        sim_returns_assets = np.zeros((n_simulations, n_assets))

        # Sample from standardized residuals for each asset
        for i, asset in enumerate(assets):
            z_asset = filtered_returns[asset].dropna().values
            if len(z_asset) == 0:
                print(f"Warning: No residuals to sample for asset {asset} in {lambda_key}. Using zeros.")
                z_star = np.zeros(n_simulations)
            else:
                z_star = np.random.choice(z_asset, size=n_simulations, replace=True)

            # Rescale with the latest volatility forecast
            sim_returns_assets[:, i] = sigma_t_vector[i] * z_star

        # Calculate simulated portfolio returns
        if weights is not None:
            portfolio_simulated_returns = sim_returns_assets @ weights
        else:
            print("Warning: FHS called without weights. Returning NaNs.")
            portfolio_simulated_returns = np.full(n_simulations, np.nan)

        # Losses are the negated returns
        sorted_sim_losses = np.sort(-portfolio_simulated_returns)

        # VaR: percentile of the simulated losses; ES: mean of losses >= VaR
        var_fhs = []
        es_fhs = []
        for alpha in alphas:
            var_val = np.percentile(sorted_sim_losses, alpha * 100)
            losses_above_var = sorted_sim_losses[sorted_sim_losses >= var_val]
            es_val = losses_above_var.mean() if len(losses_above_var) > 0 else np.nan
            var_fhs.append(var_val)
            es_fhs.append(es_val)

        results[lambda_key] = {'VaR': np.array(var_fhs), 'ES': np.array(es_fhs)}

    return results

In [None]:
def filtered_historical_simulation_multiday(
    filtered_returns_dict: dict,
    ewma_vol_dict: dict,
    lambda_key: str,
    n_days: int = 1,
    n_simulations: int = 10000,
    random_seed: int = None,
    weights: np.ndarray = None,
    vAlpha = np.array([0.95, 0.99])
) -> dict:
    """
    N-day VaR and ES of LOSSES from a Filtered Historical Simulation.

    For the chosen lambda key, every asset gets n_simulations paths of n_days
    residuals drawn independently with replacement. Each draw is scaled by
    the asset's latest EWMA volatility (held constant over the horizon), the
    daily values are summed per path, and the asset totals are combined with
    ``weights``. Returns {'VaR': array, 'ES': array} with one entry per level
    in vAlpha; all NaN when the data is missing/empty or no weights are given.
    If random_seed is given, numpy's global random generator is seeded.
    """
    if random_seed is not None:
        np.random.seed(random_seed)

    std_residuals = filtered_returns_dict.get(lambda_key)
    vol_frame = ewma_vol_dict.get(lambda_key)

    inputs_unusable = (
        std_residuals is None or vol_frame is None or std_residuals.empty or vol_frame.empty
    )
    if inputs_unusable:
        print(f"Warning: Missing or empty data for {lambda_key} in FHS multiday. Returning NaNs.")
        nan_levels = np.full(len(vAlpha), np.nan)
        return {'VaR': nan_levels, 'ES': np.full(len(vAlpha), np.nan)}

    assets = std_residuals.columns
    # Last available volatility forecast, one value per asset
    latest_vol = vol_frame.iloc[-1].values

    # Step 1: draw all residual paths, asset by asset
    draws_per_asset = []
    for asset in assets:
        pool = std_residuals[asset].dropna().values
        if len(pool) == 0:
            print(f"Warning: No residuals for asset {asset} in {lambda_key}. Using zeros.")
            draws = np.zeros((n_simulations, n_days))
        else:
            draws = np.random.choice(pool, size=(n_simulations, n_days), replace=True)
        draws_per_asset.append(draws)

    # Step 2: scale by volatility and add up the days of each path
    asset_horizon_returns = np.zeros((n_simulations, len(assets)))
    for position, draws in enumerate(draws_per_asset):
        asset_horizon_returns[:, position] = (draws * latest_vol[position]).sum(axis=1)

    # Step 3: portfolio return per path
    if weights is None:
        print("Warning: FHS multiday called without weights. Returning NaNs.")
        portfolio_horizon_returns = np.full(n_simulations, np.nan)
    else:
        portfolio_horizon_returns = asset_horizon_returns @ weights

    # Step 4: risk measures on the losses (negated returns)
    ordered_losses = np.sort(-portfolio_horizon_returns)
    var_levels, es_levels = [], []
    for alpha in vAlpha:
        threshold = np.percentile(ordered_losses, alpha * 100)
        tail = ordered_losses[ordered_losses >= threshold]
        var_levels.append(threshold)
        es_levels.append(tail.mean() if len(tail) > 0 else np.nan)

    return {'VaR': np.array(var_levels), 'ES': np.array(es_levels)}

### Risk Calculation Method Functions (GARCH)

In [None]:
# Maximum likelihood estimation of GARCH(1,1) parameters
def garch_likelihood(params, returns):
    """Negative Gaussian log-likelihood of a GARCH(1,1) model.

    Args:
        params: Sequence ``(omega, alpha, beta)``.
        returns: Sequence of returns, accessed by integer position.

    Returns:
        The negative log-likelihood accumulated over observations 1..T-1
        (observation 0 only seeds the variance recursion), or ``np.inf`` when
        the parameters are inadmissible or a conditional variance turns out
        non-positive.
    """
    omega, alpha, beta = params

    # Inadmissible parameters get an infinite objective so the optimiser avoids them.
    if omega <= 0 or alpha < 0 or beta < 0 or alpha + beta >= 1:
        return np.inf

    n_obs = len(returns)
    cond_var = np.zeros(n_obs)

    # Seed the recursion with the unconditional variance; fall back to the
    # sample variance, then to a tiny positive constant.
    seed = omega / (1 - alpha - beta)
    if seed <= 0:
        seed = np.var(returns)
        if seed <= 0:
            seed = 1e-6
    cond_var[0] = seed

    log_two_pi = np.log(2 * np.pi)
    neg_log_lik = 0

    for t in range(1, n_obs):
        current = omega + alpha * returns[t - 1] ** 2 + beta * cond_var[t - 1]
        if current <= 0:
            # A non-positive variance makes the Gaussian density undefined.
            return np.inf
        cond_var[t] = current

        # Accumulate the negated contribution directly (normality assumed).
        neg_log_lik += 0.5 * (log_two_pi + np.log(current) + returns[t] ** 2 / current)

    return neg_log_lik

In [None]:
# GARCH(1,1) parameter estimation using MLE
def estimate_garch_params(returns):
    """Fit GARCH(1,1) parameters to a return series by maximum likelihood.

    Args:
        returns: Object exposing ``dropna().values`` (e.g. a pandas Series).

    Returns:
        The optimiser's solution ``[omega, alpha, beta]`` when SLSQP reports
        success and ``alpha + beta < 1``; otherwise ``None`` (also returned
        when fewer than 10 non-missing observations are available).
    """
    sample = returns.dropna().values
    if len(sample) < 10:
        print("Warning: Insufficient data for GARCH estimation.")
        return None

    # Starting point: small omega relative to the sample variance, persistent beta.
    start = [np.var(sample) * 0.01, 0.1, 0.85]

    # omega strictly positive; alpha and beta each inside [0, 0.999].
    param_bounds = [(1e-7, None), (0, 0.999), (0, 0.999)]

    def stationarity_margin(p):
        # Non-negative exactly when alpha + beta <= 1.
        return 1 - p[1] - p[2]

    fit = minimize(
        garch_likelihood,
        x0=start,
        args=(sample,),
        method='SLSQP',
        bounds=param_bounds,
        constraints={'type': 'ineq', 'fun': stationarity_margin},
        options={'disp': False, 'ftol': 1e-7},
    )

    if fit.success and fit.x[1] + fit.x[2] < 1:
        return fit.x

    print(f"GARCH Optimization failed or non-stationary: {fit.message}")
    return None

In [None]:
# Calculate the GARCH(1,1) volatility forecast
def forecast_garch_volatility(params, returns):
    """One-step-ahead GARCH(1,1) volatility forecast.

    Args:
        params: ``(omega, alpha, beta)`` or ``None``. With ``None`` the sample
            standard deviation of the non-missing returns is returned instead.
        returns: Object exposing ``dropna()`` (e.g. a pandas Series).

    Returns:
        Square root of the forecast variance, or ``np.nan`` when there are no
        non-missing returns.
    """
    if params is None:
        print("Warning: GARCH params not available, using sample std dev for forecast.")
        return np.std(returns.dropna())

    omega, alpha, beta = params
    obs = returns.dropna().values
    if len(obs) < 1:
        print("Warning: No returns data for GARCH forecast.")
        return np.nan

    # Seed: unconditional variance when the process is stationary, otherwise
    # the sample variance; floor at a tiny positive number.
    if (1 - alpha - beta) > 0:
        sigma2 = omega / (1 - alpha - beta)
    else:
        sigma2 = np.var(obs)
    sigma2 = np.float64(sigma2)
    if sigma2 <= 0:
        sigma2 = np.float64(1e-6)

    # Roll the recursion through the sample; only the latest variance is needed.
    for shock in obs[:-1]:
        candidate = omega + alpha * shock ** 2 + beta * sigma2
        if candidate <= 0:
            candidate = sigma2  # carry the previous variance forward
        sigma2 = candidate

    next_var = omega + alpha * obs[-1] ** 2 + beta * sigma2
    if next_var <= 0:
        next_var = sigma2

    return np.sqrt(next_var)

In [None]:
# Calculate constant conditional correlation matrix
def calculate_ccc_matrix(returns_df):
    """Return the sample correlation matrix of the fully observed rows.

    Rows containing any missing value are dropped first. When nothing is left,
    or fewer than two columns exist, a warning is printed and an identity
    matrix sized to the number of columns is returned (``np.array([[]])`` when
    there are no columns at all).
    """
    complete_rows = returns_df.dropna()
    has_enough = (not complete_rows.empty) and complete_rows.shape[1] >= 2

    if has_enough:
        # pandas computes the pairwise correlations; hand back the raw ndarray.
        return complete_rows.corr().values

    print("Warning: Insufficient data for correlation matrix.")
    n_cols = returns_df.shape[1]
    if n_cols > 0:
        return np.identity(n_cols)
    return np.array([[]])

In [None]:
# Calculate the GARCH-CCC covariance matrix forecast
def forecast_garch_ccc_covariance(returns_df, garch_params_dict, corr_matrix):
    """Build the covariance forecast ``D @ R @ D`` from per-asset volatilities.

    Args:
        returns_df: DataFrame with one return column per asset.
        garch_params_dict: Mapping from column name to GARCH parameters; a
            missing key yields ``None``, which is passed straight on to
            ``forecast_garch_volatility``.
        corr_matrix: Correlation matrix ``R`` ordered like ``returns_df.columns``.

    Returns:
        ``D @ R @ D`` where ``D`` is the diagonal matrix of volatility forecasts.
    """
    # One volatility forecast per column, in column order.
    sigmas = np.array(
        [
            forecast_garch_volatility(garch_params_dict.get(name), returns_df[name])
            for name in returns_df.columns
        ],
        dtype=float,
    )

    scaling = np.diag(sigmas)
    return scaling @ corr_matrix @ scaling

In [None]:
# Calculate portfolio variance and volatility from covariance matrix
def calculate_portfolio_variance_volatility(weights, cov_matrix):
    """Return ``(variance, volatility)`` of a portfolio.

    The variance is the quadratic form ``w' C w``, floored at zero so that the
    square root is always defined; the volatility is its square root.
    """
    quadratic_form = weights.T @ cov_matrix @ weights
    # Numerical noise (or a non-PSD matrix) can push this slightly below zero.
    floored_variance = max(quadratic_form, 0)
    return floored_variance, np.sqrt(floored_variance)

In [None]:
# Calculate VaR and ES using GARCH-CCC forecast (assuming normality)
def calculate_garch_ccc_var_es(weights, cov_matrix_forecast, vAlpha):
    """Zero-mean normal VaR and ES from a forecast covariance matrix.

    Args:
        weights: Portfolio weights.
        cov_matrix_forecast: Forecast covariance matrix of the assets.
        vAlpha: Iterable of confidence levels.

    Returns:
        Dict with keys ``'VaR'`` and ``'ES'``, each an array aligned with
        ``vAlpha`` (losses expressed as positive numbers).
    """
    sigma_p = calculate_portfolio_variance_volatility(weights, cov_matrix_forecast)[1]

    def risk_pair(level):
        # VaR = sigma * z_alpha ; ES = sigma * phi(z_alpha) / (1 - alpha)
        var_level = sigma_p * st.norm.ppf(level)
        es_level = sigma_p * st.norm.pdf(st.norm.ppf(level)) / (1 - level)
        return var_level, es_level

    pairs = [risk_pair(level) for level in vAlpha]

    return {
        'VaR': np.array([pair[0] for pair in pairs]),
        'ES': np.array([pair[1] for pair in pairs]),
    }

### Backtesting Functions

In [None]:
def calculate_violations(actual_losses, var_predictions):
    """Flag the dates on which the realised loss exceeded the predicted VaR.

    Both Series are first restricted to their common index (inner join); the
    result is a boolean Series on that shared index.
    """
    aligned = actual_losses.align(var_predictions, join='inner')
    losses, thresholds = aligned[0], aligned[1]
    return losses > thresholds

def backtest_var(violations, alpha):
    """Summarise VaR violations per calendar year plus an 'Overall' row.

    Args:
        violations: Boolean Series whose index exposes ``.year``.
        alpha: VaR confidence level; the expected violation probability is
            ``1 - alpha``.

    Returns:
        DataFrame indexed by year (and a final 'Overall' row) with actual and
        expected violation counts, observation counts and rates in percent.
        An empty frame with the same columns is returned for empty input.
    """
    if violations.empty:
        print("Warning: No violations data to backtest.")
        return pd.DataFrame(columns=['Actual Violations', 'Expected Violations', 'Total Observations', 'Violation Rate (%)', 'Expected Rate (%)'])

    tail_probability = 1 - alpha
    nominal_rate = tail_probability * 100

    flagged = pd.DataFrame({'Violations': violations, 'Year': violations.index.year})
    per_year = flagged.groupby('Year')['Violations']
    hits = per_year.sum()
    n_obs = per_year.count()

    summary = pd.DataFrame({
        'Actual Violations': hits,
        'Expected Violations': n_obs * tail_probability,
        'Total Observations': n_obs,
        'Violation Rate (%)': (hits / n_obs) * 100,
        'Expected Rate (%)': nominal_rate
    })

    # Pool all years into one closing row.
    total_hits = summary['Actual Violations'].sum()
    total_obs = summary['Total Observations'].sum()
    pooled_rate = (total_hits / total_obs) * 100 if total_obs > 0 else 0
    summary.loc['Overall'] = [total_hits, total_obs * tail_probability, total_obs, pooled_rate, nominal_rate]

    return summary

def backtest_es(actual_losses, violations, es_predictions):
    """Compare realised losses on violation days with the predicted ES.

    Args:
        actual_losses: Series of realised losses.
        violations: Boolean Series marking VaR violations.
        es_predictions: Series of predicted Expected Shortfall values.
        The three Series are restricted to their common index; that index must
        expose ``.year`` (e.g. a DatetimeIndex) when it is non-empty.

    Returns:
        DataFrame indexed by year plus a final 'Overall' row, with the columns
        'Avg Actual Shortfall', 'Avg Predicted ES' and 'Violation Count',
        computed over violation days only. An empty frame with those columns is
        returned when the inputs share no index labels or no violation occurred.
    """
    summary_columns = ['Avg Actual Shortfall', 'Avg Predicted ES', 'Violation Count']

    # Series.align accepts a single `other` and returns two objects, so it
    # cannot align three Series at once. An inner-join concat keeps exactly the
    # labels present in all three inputs.
    results_df = pd.concat(
        {
            'Actual_Loss': actual_losses,
            'Violation': violations,
            'Predicted_ES': es_predictions,
        },
        axis=1,
        join='inner',
    )

    if results_df.empty:
        print("Warning: No violations data for ES backtest.")
        return pd.DataFrame(columns=summary_columns)

    results_df['Year'] = results_df.index.year

    # Keep violation days only; the cast guards against a non-bool mask dtype.
    violation_data = results_df[results_df['Violation'].astype(bool)].copy()

    if violation_data.empty:
        print("Info: No violations occurred for ES backtest.")
        return pd.DataFrame(columns=summary_columns)

    # Yearly averages over violation days.
    by_year = violation_data.groupby('Year')
    summary = pd.DataFrame({
        'Avg Actual Shortfall': by_year['Actual_Loss'].mean(),
        'Avg Predicted ES': by_year['Predicted_ES'].mean(),
        'Violation Count': by_year.size()
    })

    # Pooled row across all years.
    summary.loc['Overall'] = [
        violation_data['Actual_Loss'].mean(),
        violation_data['Predicted_ES'].mean(),
        violation_data.shape[0],
    ]

    return summary

def plot_violations(violations, dates, title):
    """Draw a marker at y=1 for every VaR violation along the date axis.

    Args:
        violations: Series of violation flags indexed by date.
        dates: Date collection exposing ``.empty``, ``.min()`` and ``.max()``;
            it fixes the x-axis range.
        title: Figure title.

    Prints a warning and returns without plotting when either input is empty.
    """
    if violations.empty or dates.empty:
        print(f"Warning: Cannot plot violations for '{title}' - data is empty.")
        return

    plt.figure(figsize=(15, 4))

    marker_style = dict(markersize=4, alpha=0.7, label='Violation')
    hits = violations[violations == True]
    if hits.empty:
        # Nothing to mark: draw an empty series so the legend entry still exists.
        plt.plot([], [], 'ro', **marker_style)
    else:
        plt.plot(hits.index, np.ones(len(hits)), 'ro', **marker_style)

    # The x-axis spans the whole period, not just the violation dates.
    plt.xlim(dates.min(), dates.max())
    plt.ylim(-0.1, 1.1)
    plt.title(title)
    plt.xlabel('Date')
    plt.ylabel('Violation (1=Yes)')
    plt.yticks([0, 1])
    plt.legend()
    plt.grid(axis='y', linestyle='--')
    plt.show()

### Stress Testing Functions

In [None]:
def apply_stress(df, column, start_idx, duration, magnitude, stress_type='percentage'):
    """Return a copy of ``df`` with a shock applied to a run of rows in one column.

    Args:
        df (pd.DataFrame): Source data; it is copied, never modified.
        column (str): Column to shock.
        start_idx (int): Position of the first shocked row.
        duration (int): Number of consecutive rows to shock (clipped at the end
            of the frame).
        magnitude (float): Shock size.
        stress_type (str): 'percentage' multiplies by ``1 + magnitude``;
            'absolute' adds ``magnitude``.

    Returns:
        pd.DataFrame: The shocked copy (an unshocked copy when the requested
        window is empty).

    Raises:
        ValueError: If ``stress_type`` is neither 'percentage' nor 'absolute'
            and the window is non-empty.
    """
    stressed = df.copy()
    stop = min(start_idx + duration, len(stressed))
    window_labels = stressed.index[start_idx:stop]

    if window_labels.empty:
        print(f"Warning: No indices found for stress period starting at {start_idx}.")
        return stressed

    baseline = stressed.loc[window_labels, column]

    shock_rules = {
        'percentage': lambda values: values * (1 + magnitude),
        'absolute': lambda values: values + magnitude,
    }
    if stress_type not in shock_rules:
        raise ValueError("stress_type must be 'percentage' or 'absolute'")

    stressed.loc[window_labels, column] = shock_rules[stress_type](baseline)
    print(f"Applied {stress_type} stress ({magnitude:.2f}) to {column} from {window_labels[0]} to {window_labels[-1]}")
    return stressed

def stress_equity(df_original, repetitions=5, max_duration=4):
    """Apply random percentage shocks to the equity index closing prices.

    Each repetition draws, in this order, a column, a duration in
    ``[1, max_duration]``, a start position and a magnitude, then delegates to
    ``apply_stress``. The input frame is copied and returned unshocked when it
    has no more than ``max_duration`` rows.
    """
    stressed = df_original.copy()
    n_rows = len(stressed)

    if n_rows <= max_duration:
        print("DataFrame too small for stress testing.")
        return stressed

    index_columns = ['S&P500_Closing', 'Dax40_Closing', 'Nikkei_Closing']
    shock_sizes = [-0.4, -0.2, 0.2, 0.4]

    print("\n--- Applying Equity Stress ---")
    for _ in range(repetitions):
        target = random.choice(index_columns)
        span = random.randint(1, max_duration)
        # randint is inclusive, so the window always fits inside the frame.
        first_row = random.randint(0, n_rows - span)
        shock = random.choice(shock_sizes)
        stressed = apply_stress(stressed, target, first_row, span, shock, 'percentage')

    return stressed

def stress_currency(df_original, repetitions=5, max_duration=4):
    """Apply random percentage shocks to the exchange-rate columns.

    Each repetition draws, in this order, a currency column, a duration in
    ``[1, max_duration]``, a start position and one of that currency's
    magnitudes, then delegates to ``apply_stress``. The input frame is copied
    and returned unshocked when it has no more than ``max_duration`` rows.
    """
    stressed = df_original.copy()

    # Candidate shock sizes per currency pair.
    shocks_by_pair = {
        'USD/EUR': [-0.1, 0.1], 
        'JPY/EUR': [-0.2, 0.2]
    }
    pairs = list(shocks_by_pair.keys())
    n_rows = len(stressed)

    if n_rows <= max_duration:
        print("DataFrame too small for stress testing.")
        return stressed

    print("\n--- Applying Currency Stress ---")
    for _ in range(repetitions):
        pair = random.choice(pairs)
        span = random.randint(1, max_duration)
        first_row = random.randint(0, n_rows - span)
        shock = random.choice(shocks_by_pair[pair])
        stressed = apply_stress(stressed, pair, first_row, span, shock, 'percentage')

    return stressed

def stress_commodity(df_original, repetitions=5, max_duration=4):
    """Commodity stress placeholder: report that nothing is done and return a copy.

    ``repetitions`` and ``max_duration`` are accepted for signature parity with
    the other stress functions but are not used.
    """
    notices = (
        "\n--- Applying Commodity Stress (Placeholder) ---",
        "Stress Test Info: No commodity columns found. Skipping commodity stress test.",
    )
    for notice in notices:
        print(notice)
    return df_original.copy()

def stress_interest_rate(df_original, repetitions=5, max_duration=4):
    """Apply random absolute shifts to the 10-year government bond yield column.

    Each repetition draws, in this order, a duration in ``[1, max_duration]``,
    a start position and a magnitude, then delegates to ``apply_stress``. An
    unshocked copy is returned when the frame has no more than ``max_duration``
    rows or when the yield column is missing.
    """
    stressed = df_original.copy()
    yield_column = 'Yield curve spot rate, 10-year maturity - Government bond'
    shifts = [-3.0, -2.0, 2.0, 3.0]
    n_rows = len(stressed)

    # The size check comes first, so a tiny frame reports "too small" even if
    # the column is also missing.
    if n_rows <= max_duration:
        print("DataFrame too small for stress testing.")
        return stressed

    if yield_column not in stressed.columns:
        print(f"Stress Test Info: Column '{yield_column}' not found. Skipping interest rate stress test.")
        return stressed

    print("\n--- Applying Interest Rate Stress ---")
    for _ in range(repetitions):
        span = random.randint(1, max_duration)
        first_row = random.randint(0, n_rows - span)
        shift = random.choice(shifts)
        stressed = apply_stress(stressed, yield_column, first_row, span, shift, 'absolute')

    return stressed

### Main Analysis Runner Function

In [None]:
def run_full_analysis(df_input, vAlpha=np.array([0.95, 0.99]), sample_size=500, 
                      degrees_of_freedom=[0, 3, 4, 5, 6], fhs_lambdas=[0.94, 0.97], 
                      fhs_simulations=1000, garch_weights=weights_array):
    """Run the rolling 1-day and multi-day VaR/ES calculations and backtest them.

    Works on a copy of ``df_input``. For every row position from ``sample_size``
    onward, the preceding ``sample_size`` rows form the estimation window (the
    current row itself is excluded) and Var/Cov and Historical VaR/ES are
    computed for that date. 5-day and 10-day results come from
    ``calculate_multiday_risk``. Finally every 1-day model and the 5-day
    "Hist_Reg" model are backtested; the 10-day results are computed and
    returned but not backtested here.

    Args:
        df_input: DataFrame holding at least a 'Portfolio_loss' column. Its index
            is used as the date axis; the backtest helpers call ``.year`` on it,
            so presumably a DatetimeIndex - TODO confirm. Further column
            requirements come from helpers defined elsewhere in the file.
        vAlpha: Confidence levels. Position ``i`` in this sequence is used to
            pick element ``i`` out of each stored per-date VaR/ES entry.
        sample_size: Rolling window length in rows.
        degrees_of_freedom: Values forwarded as ``df=`` to ``calculate_var_cov``;
            0 is labelled "Normal", any positive value "T<df>".
        fhs_lambdas: Not referenced in this function body (FHS section is a
            commented placeholder).
        fhs_simulations: Not referenced in this function body.
        garch_weights: Not referenced in this function body (GARCH-CCC section
            is a commented placeholder).

    Returns:
        Tuple ``(var_results_df, es_results_df, var_5d_df, es_5d_df, var_10d_df,
        es_10d_df, backtest_summaries)``. ``backtest_summaries`` maps a model
        name to a dict with keys like ``'VaR_95'`` / ``'ES_95'`` holding the
        summary frames. When ``sample_size >= len(df_input)`` seven empty dicts
        are returned instead (note: dicts, not DataFrames).
    """
    print(f"\n=== Starting Full Analysis for DataFrame ending {df_input.index[-1]} ===")
    df_analysis = df_input.copy()
    
    # --- 1. 1-Day VaR/ES Calculations (Var/Cov, Historical) ---
    print("Calculating 1-Day VaR/ES (Var/Cov, Historical)...")
    VaR_results_1d = []
    ES_results_1d = []
    dates_1d = []  # collected alongside the rows; not read again in this function
    
    # Ensure sample_size is valid
    if sample_size >= len(df_analysis):
         print(f"Warning: sample_size ({sample_size}) too large for DataFrame length ({len(df_analysis)}). Skipping rolling calculations.")
         # Return empty placeholders, one per element of the normal 7-tuple
         return {}, {}, {}, {}, {}, {}, {}
         
    for i in range(sample_size, len(df_analysis)):
        # Estimation window: the sample_size rows strictly before row i
        window = df_analysis.iloc[i - sample_size:i]
        current_date = df_analysis.index[i]
        dates_1d.append(current_date)
        
        # Helper defined elsewhere; only these two keys of its result are used here
        loss_stats = calculate_daily_loss_variables(window, current_date)
        mean_loss = loss_stats["Portfolio_mean_loss"]
        portfolio_std_loss = loss_stats["Portfolio_std_loss"]
        
        var_row = {'Date': current_date}
        es_row = {'Date': current_date}
        
        # Var/Cov Methods: one VaR/ES entry per distribution (Normal or Student-t label)
        for df_t in degrees_of_freedom:
            results = calculate_var_cov(window, current_date, vAlpha, mean_loss, portfolio_std_loss, df=df_t)
            dist_label = f"T{df_t}" if df_t > 0 else "Normal"
            var_row[f'VaR {dist_label}'] = results[f'VaR {dist_label}']
            es_row[f'ES {dist_label}'] = results[f'ES {dist_label}']
        
        # Historical Method
        hist_results = calculate_historical_var_es(window, current_date, vAlpha)
        var_row['VaR Historical'] = hist_results['VaR Historical']
        es_row['ES Historical'] = hist_results['ES Historical']
        
        VaR_results_1d.append(var_row)
        ES_results_1d.append(es_row)
        
    # One row per forecast date; 'Date' becomes the index
    var_results_df = pd.DataFrame(VaR_results_1d).set_index('Date') if VaR_results_1d else pd.DataFrame()
    es_results_df = pd.DataFrame(ES_results_1d).set_index('Date') if ES_results_1d else pd.DataFrame()
    
    # --- 2. Multi-Day VaR/ES (Historical - Regular & Sqrt) ---
    print("Calculating Multi-Day VaR/ES (Historical)...")
    var_5d_df, es_5d_df = calculate_multiday_risk(df_analysis, vAlpha, 5, sample_size)
    var_10d_df, es_10d_df = calculate_multiday_risk(df_analysis, vAlpha, 10, sample_size)
    
    # --- 3. FHS Calculations (Placeholder - Requires more integration) ---
    # print("Calculating 1-Day VaR/ES (FHS - Placeholder)...")
    # fhs_results = {} # Store FHS VaR/ES here
    # Add FHS calculation logic if needed, similar structure to GARCH below
    # Need to run EWMA, filter returns, then simulate within the loop or separately
    
    # --- 4. GARCH-CCC Calculations (Placeholder - Requires more integration) ---
    # print("Calculating 1-Day VaR/ES (GARCH-CCC - Placeholder)...")
    # garch_results = {} # Store GARCH VaR/ES here
    # Add GARCH estimation and forecasting logic if needed
    # Needs careful handling within the rolling window
    
    # --- 5. Backtesting ---
    print("Running Backtesting...")
    backtest_summaries = {}
    actual_losses_full = df_analysis['Portfolio_loss']
    
    # Backtest 1-Day Models
    for model_col in var_results_df.columns:
        # 'Date' was moved to the index above, so this is only a defensive guard
        if model_col == 'Date': continue
        # The ES column shares the model label: 'VaR X' -> 'ES X'
        es_col = model_col.replace('VaR', 'ES')
        if es_col not in es_results_df.columns:
             print(f"Skipping backtest for {model_col}, no matching ES column.")
             continue
             
        print(f"  Backtesting {model_col}...")
        model_backtest = {}
        for i, alpha in enumerate(vAlpha):
            alpha_perc = int(alpha * 100)
            # Extract VaR/ES series for this alpha: each cell is expected to be a
            # list/ndarray with one entry per alpha; anything else becomes NaN
            try:
                var_preds = var_results_df[model_col].apply(lambda x: x[i] if isinstance(x, (list, np.ndarray)) and len(x) > i else np.nan)
                es_preds = es_results_df[es_col].apply(lambda x: x[i] if isinstance(x, (list, np.ndarray)) and len(x) > i else np.nan)
            except Exception as e:
                 print(f"    Error extracting predictions for {model_col} alpha={alpha}: {e}")
                 continue
                 
            # Align actual losses with predictions (inner join on the date index)
            actual_losses, var_preds_aligned = actual_losses_full.align(var_preds, join='inner')
            _, es_preds_aligned = actual_losses_full.align(es_preds, join='inner')
            
            if actual_losses.empty or var_preds_aligned.empty:
                 print(f"    Skipping alpha={alpha}, no aligned data.")
                 continue
                 
            violations = calculate_violations(actual_losses, var_preds_aligned)
            var_summary = backtest_var(violations, alpha)
            es_summary = backtest_es(actual_losses, violations, es_preds_aligned)
            
            model_backtest[f'VaR_{alpha_perc}'] = var_summary
            model_backtest[f'ES_{alpha_perc}'] = es_summary
            # plot_violations(violations, violations.index, f'{model_col} Violations (alpha={alpha})')
            
        # Key the summaries by the bare model label (e.g. 'Historical', 'Normal')
        backtest_summaries[model_col.replace('VaR ', '')] = model_backtest
        
    # Backtest Multi-Day Models (Example for 5-day Historical Regular)
    # Add similar loops for other multi-day models if needed
    print("  Backtesting Multi-Day Historical (Example: 5d Reg)...")
    for alpha in vAlpha:
        alpha_perc = int(alpha*100)
        # Multi-day frames store one scalar column per alpha, named by percentage
        var_col_5d = f'VaR_5d_Hist_Reg_{alpha_perc}'
        es_col_5d = f'ES_5d_Hist_Reg_{alpha_perc}'
        actual_col_5d = 'Actual_Loss_5d'
        
        if var_col_5d in var_5d_df.columns and actual_col_5d in var_5d_df.columns:
            var_preds_5d = var_5d_df[var_col_5d]
            actual_losses_5d = var_5d_df[actual_col_5d]
            es_preds_5d = es_5d_df[es_col_5d] if es_col_5d in es_5d_df.columns else None
            
            actual_losses_5d_aligned, var_preds_5d_aligned = actual_losses_5d.align(var_preds_5d, join='inner')
            violations_5d = calculate_violations(actual_losses_5d_aligned, var_preds_5d_aligned)
            var_summary_5d = backtest_var(violations_5d, alpha)
            
            model_key = f'Historical_5d_Reg'
            if model_key not in backtest_summaries: backtest_summaries[model_key] = {}
            backtest_summaries[model_key][f'VaR_{alpha_perc}'] = var_summary_5d
            
            # The ES backtest is optional: it runs only when the ES column exists
            if es_preds_5d is not None:
                 _, es_preds_5d_aligned = actual_losses_5d.align(es_preds_5d, join='inner')
                 es_summary_5d = backtest_es(actual_losses_5d_aligned, violations_5d, es_preds_5d_aligned)
                 backtest_summaries[model_key][f'ES_{alpha_perc}'] = es_summary_5d
                 
            # plot_violations(violations_5d, violations_5d.index, f'{model_key} Violations (alpha={alpha})')
        else:
            print(f"    Skipping 5d Hist Reg alpha={alpha}, missing columns.")
            
    print("=== Full Analysis Complete ===")
    return var_results_df, es_results_df, var_5d_df, es_5d_df, var_10d_df, es_10d_df, backtest_summaries

## Initial Data Calculation and Baseline Analysis

In [None]:
# Perform initial calculations on the loaded data.
# The three helpers below are defined elsewhere in the notebook; each call takes the
# previous result, so the order of these statements matters.
print("Performing initial calculations on baseline data...")
main_df_calculated = calculate_bond_metrics(main_df_initial, interest_bond_initial)
main_df_calculated = calculate_investment_values(main_df_calculated, invested_amounts_initial, start_date)
main_df_calculated = calculate_portfolio_metrics(main_df_calculated, starting_investment, start_date)

# Quick visual check of the portfolio columns the later analysis relies on
print("\nBaseline Data Head with Portfolio Metrics:")
display(main_df_calculated[['Portfolio_Value_EUR', 'Portfolio_Change_EUR', 'Portfolio_loss', 'Portfolio_Daily_Returns']].head())

# Run the full analysis on the baseline data (all keyword defaults of run_full_analysis apply);
# the *_base names are reused by the display and comparison cells further down
var_1d_base, es_1d_base, var_5d_base, es_5d_base, var_10d_base, es_10d_base, backtest_base = run_full_analysis(main_df_calculated)

## Baseline Results Display

In [None]:
# 1-day results: each cell of 'VaR Historical' is expected to hold one value per alpha
# (position 0 and position 1 are read below); non-sequence cells are shown as NaN.
print("\n--- Baseline 1-Day VaR Results (Historical) ---")
if 'VaR Historical' in var_1d_base:
    # Extract 95% and 99% VaR from the arrays
    # NOTE(review): positions 0/1 correspond to 95%/99% only with the default vAlpha - confirm
    var_hist_95 = var_1d_base['VaR Historical'].apply(lambda x: x[0] if isinstance(x, (list, np.ndarray)) else np.nan)
    var_hist_99 = var_1d_base['VaR Historical'].apply(lambda x: x[1] if isinstance(x, (list, np.ndarray)) and len(x)>1 else np.nan)
    display(pd.DataFrame({'VaR 95%': var_hist_95, 'VaR 99%': var_hist_99}).tail())
else:
    print("Historical VaR not found in baseline results.")

# Multi-day results are stored as one scalar column per alpha, so they display directly
print("\n--- Baseline 5-Day VaR Results (Historical Regular) ---")
if 'VaR_5d_Hist_Reg_95' in var_5d_base:
    display(var_5d_base[['VaR_5d_Hist_Reg_95', 'VaR_5d_Hist_Reg_99', 'Actual_Loss_5d']].tail())
else:
    print("5-Day Historical Regular VaR not found.")

print("\n--- Baseline 10-Day VaR Results (Historical Regular) ---")
if 'VaR_10d_Hist_Reg_95' in var_10d_base:
    display(var_10d_base[['VaR_10d_Hist_Reg_95', 'VaR_10d_Hist_Reg_99', 'Actual_Loss_10d']].tail())
else:
    print("10-Day Historical Regular VaR not found.")

# backtest_base maps model label -> {'VaR_<pct>': summary frame, 'ES_<pct>': summary frame}
print("\n--- Baseline Backtest Summary (Historical VaR 99%) ---")
if 'Historical' in backtest_base and 'VaR_99' in backtest_base['Historical']:
    display(backtest_base['Historical']['VaR_99'])
else:
    print("Historical VaR 99% backtest summary not found.")

# Stress Testing

Apply stress scenarios and re-run the analysis to see the impact.

In [None]:
# Apply Stress Scenarios
# Each stress_* function returns a shocked copy, so main_df_calculated itself stays unchanged.
# The shocks are drawn with the `random` module, so results differ between runs unless a seed is set.
stressed_df_equity = stress_equity(main_df_calculated)
stressed_df_currency = stress_currency(main_df_calculated)
stressed_df_interest = stress_interest_rate(main_df_calculated)
# stressed_df_commodity = stress_commodity(main_df_calculated) # Placeholder

# Recalculate dependent columns for each stressed DataFrame
# (recalculate_dependent_columns is defined elsewhere; presumably it rebuilds the return and
# portfolio columns from the shocked inputs - verify against its definition)
print("\nRecalculating metrics for EQUITY stressed data...")
stressed_df_equity_recalc = recalculate_dependent_columns(stressed_df_equity, starting_investment, weights_dict, start_date, interest_bond_initial)

print("\nRecalculating metrics for CURRENCY stressed data...")
stressed_df_currency_recalc = recalculate_dependent_columns(stressed_df_currency, starting_investment, weights_dict, start_date, interest_bond_initial)

print("\nRecalculating metrics for INTEREST RATE stressed data...")
stressed_df_interest_recalc = recalculate_dependent_columns(stressed_df_interest, starting_investment, weights_dict, start_date, interest_bond_initial)

## Run Analysis on Stressed Data

In [None]:
# Run full analysis on each stressed dataset
# Note: Backtesting results on artificially stressed data might not be meaningful,
# but we run the full function for consistency.
# Suffixes: _eq = equity stress, _cur = currency stress, _ir = interest-rate stress.
# Each call unpacks the same 7-tuple as the baseline run.
var_1d_eq, es_1d_eq, var_5d_eq, es_5d_eq, var_10d_eq, es_10d_eq, backtest_eq = run_full_analysis(stressed_df_equity_recalc)
var_1d_cur, es_1d_cur, var_5d_cur, es_5d_cur, var_10d_cur, es_10d_cur, backtest_cur = run_full_analysis(stressed_df_currency_recalc)
var_1d_ir, es_1d_ir, var_5d_ir, es_5d_ir, var_10d_ir, es_10d_ir, backtest_ir = run_full_analysis(stressed_df_interest_recalc)

## Comparison of Baseline vs. Stressed Results

In [None]:
def extract_latest_var(var_df, model_col, alpha_idx=1): # Default to 99% VaR (index 1)
    """Return the most recent VaR value stored in ``var_df[model_col]``.

    Two cell layouts are handled:

    * vector cells (``list`` / ``np.ndarray`` holding one value per
      confidence level): element ``alpha_idx`` of the last row is returned;
    * scalar cells (e.g. the multi-day result columns, whose confidence
      level is part of the column name): the last row's value is returned.

    Args:
        var_df: DataFrame of VaR results, or ``None``.
        model_col: Name of the column to read.
        alpha_idx: Position of the wanted confidence level inside a vector
            cell (and inside the module-level ``vAlpha``). Pass ``None`` to
            state that the confidence level is already encoded in
            ``model_col``; the scalar cell is then returned as-is.

    Returns:
        The extracted value, or ``np.nan`` when the frame is missing/empty,
        the column does not exist, or the requested confidence level cannot
        be determined.
    """
    if var_df is None or var_df.empty or model_col not in var_df.columns:
        return np.nan
    try:
        latest_val = var_df[model_col].iloc[-1]
        is_vector = isinstance(latest_val, (list, np.ndarray))

        if alpha_idx is None:
            # The caller vouches that the column name fixes the confidence
            # level. Previously this path evaluated vAlpha[None], raised a
            # TypeError and silently produced NaN for every scalar column.
            # A vector cell stays ambiguous without an index.
            return np.nan if is_vector else latest_val

        if is_vector and len(latest_val) > alpha_idx:
            return latest_val[alpha_idx]

        # Not an indexable vector: accept the value only if the column name
        # itself carries the confidence level selected by alpha_idx.
        if f'_{int(vAlpha[alpha_idx]*100)}' in model_col:
            return latest_val
        return np.nan # Cannot determine correct alpha value
    except (IndexError, TypeError):
        return np.nan

# Scenario order used throughout this cell:
# baseline, equity stress, currency stress, interest-rate stress.

# Latest 1-day 99% historical VaR (default alpha index of extract_latest_var).
latest_var_base_1d, latest_var_eq_1d, latest_var_cur_1d, latest_var_ir_1d = (
    extract_latest_var(scenario_var, 'VaR Historical')
    for scenario_var in (var_1d_base, var_1d_eq, var_1d_cur, var_1d_ir)
)

# Latest 5-day 99% historical VaR (regular); alpha is part of the column name.
latest_var_base_5d, latest_var_eq_5d, latest_var_cur_5d, latest_var_ir_5d = (
    extract_latest_var(scenario_var, 'VaR_5d_Hist_Reg_99', alpha_idx=None)
    for scenario_var in (var_5d_base, var_5d_eq, var_5d_cur, var_5d_ir)
)

# Latest 10-day 99% historical VaR (regular); alpha is part of the column name.
latest_var_base_10d, latest_var_eq_10d, latest_var_cur_10d, latest_var_ir_10d = (
    extract_latest_var(scenario_var, 'VaR_10d_Hist_Reg_99', alpha_idx=None)
    for scenario_var in (var_10d_base, var_10d_eq, var_10d_cur, var_10d_ir)
)

# One row per scenario, one column per VaR horizon.
comparison_data = {
    'Scenario': [
        'Baseline',
        'Equity Stress',
        'Currency Stress',
        'Interest Rate Stress',
    ],
    '1-Day VaR 99% (Hist)': [
        latest_var_base_1d,
        latest_var_eq_1d,
        latest_var_cur_1d,
        latest_var_ir_1d,
    ],
    '5-Day VaR 99% (Hist Reg)': [
        latest_var_base_5d,
        latest_var_eq_5d,
        latest_var_cur_5d,
        latest_var_ir_5d,
    ],
    '10-Day VaR 99% (Hist Reg)': [
        latest_var_base_10d,
        latest_var_eq_10d,
        latest_var_cur_10d,
        latest_var_ir_10d,
    ],
}

comparison_df = pd.DataFrame(comparison_data)
comparison_df = comparison_df.set_index('Scenario')

print("\n--- Comparison of Latest 99% Historical VaR across Scenarios ---")
display(comparison_df.round(2))

### Stress Test Interpretation

Compare the VaR values in the table above:
*   **Equity Stress:** How much did the VaR increase when equity prices experienced sharp moves? This shows sensitivity to market crashes or rallies.
*   **Currency Stress:** How did VaR change when exchange rates moved significantly? This highlights the portfolio's FX risk.
*   **Interest Rate Stress:** What was the impact of large shifts in interest rates on VaR? This indicates sensitivity to changes in bond yields.

The magnitude of the change in VaR under each stress scenario reveals the portfolio's vulnerability to that specific risk factor.

In [None]:
# Optional: overlay the 1-day historical VaR (99%) time series of the
# baseline and of each stressed scenario on a single figure.
plt.figure(figsize=(14, 7))

# (results frame, legend label, extra line-style keywords) per scenario.
# The baseline passes no extra keywords so it keeps matplotlib's default style.
for scenario_frame, legend_label, style_kwargs in (
    (var_1d_base, 'Baseline VaR 99%', {}),
    (var_1d_eq, 'Equity Stress VaR 99%', {'linestyle': '--'}),
    (var_1d_cur, 'Currency Stress VaR 99%', {'linestyle': ':'}),
    (var_1d_ir, 'Interest Rate Stress VaR 99%', {'linestyle': '-.'}),
):
    if 'VaR Historical' not in scenario_frame:
        continue
    # Each cell is expected to hold one value per confidence level; take
    # index 1 and fall back to NaN for anything shorter or non-sequence.
    var_99_series = scenario_frame['VaR Historical'].apply(
        lambda cell: cell[1] if isinstance(cell, (list, np.ndarray)) and len(cell) > 1 else np.nan
    )
    var_99_series.plot(label=legend_label, alpha=0.7, **style_kwargs)

plt.title('Comparison of 1-Day 99% Historical VaR over Time')
plt.ylabel('VaR (EUR)')
plt.legend()
plt.grid(True)
plt.show()

### Original Portfolio Details (Reference)

#### Instruments:
- **S&P500**
- **DAX40**
- **NIKKEI**
- **EU Government Bond (10-year maturity, AAA-rated)**

#### Invested amount:
- **10,000,000 EURO**

#### Period:
- **01/01/2012 - 31/12/2022**

#### Weights:
- **S&P500**: 0.4  
- **DAX40**: 0.3  
- **NIKKEI**: 0.15  
- **EU Government Bond**: 0.15  

#### Measures:
- **Value at Risk (VaR)**: 1, 5, 10 days  
- **Expected Shortfall (ES)**  

## Appendix: Original Code Cells (Removed/Integrated)

In [None]:
# --- This cell's logic is now integrated into run_full_analysis ---
# loss_values = main_df['Portfolio_loss'].values
# ... (min, max, mean calculation) ...

In [None]:
# --- This cell's logic is replaced by the Baseline Results Display section ---
# var_results_df, es_results_df, var_5d, var_10d = main()
# print("VaR results")
# display(var_results_df.head())
# ...

In [None]:
# --- This cell's plotting logic can be adapted or is partially covered by backtesting plots ---
# fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 12), sharex=False)
# ... (Plotting 5d/10d VaR vs Actual Loss and Violations) ...

In [None]:
# --- This cell's plotting logic can be adapted or is partially covered by backtesting plots ---
# plt.figure(figsize=(12, 6))
# ... (Plotting Historical VaR/ES over time) ...

In [None]:
# --- Return-distribution cell; may be kept if the plots are still wanted ---
# NOTE(review): only the 2x2 axes grid is created here. The plotting body is
# elided below, so running this cell as-is yields an empty figure.
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(nrows=2, ncols=2, figsize=(15, 12))
# ... (Plotting return distributions vs Normal/T) ...

In [None]:
# --- This cell's plotting logic is integrated into backtesting or comparison plots ---
# def plot_var_es_vs_actual_given_actuals(...):
# ...

In [None]:
# --- This cell's logic is integrated into multi-day risk calculation ---
# def compute_actual_portfolio_returns(...):
# ...

In [None]:
# --- This cell's logic is integrated into run_full_analysis (FHS/EWMA part, currently placeholder) ---
# var_df, es_df = rolling_fhs_multiday_var_es(...)
# plot_var_es_vs_actual_given_actuals(...)

In [None]:
# --- This cell's logic is integrated into run_full_analysis (GARCH part, currently placeholder) ---
# def main_analysis(time_window_size):
# ... (Rolling GARCH calculation and plotting) ...

In [None]:
# --- Informational only: column names and first three rows of the loaded data ---
# Each heading and its payload go through one print call, newline-separated,
# which emits the same text as printing them one after the other.
print("Original DataFrame Columns:", main_df_initial.columns, sep="\n")
print("\nOriginal DataFrame Head:", main_df_initial.head(3), sep="\n")

In [None]:
# --- This cell checks mean return assumption, can be kept if desired ---
# main_df['Portfolio_Daily_Returns'] = ...
# plt.figure(figsize=(12, 6))
# ... (Plotting rolling mean returns) ...

In [None]:
# --- This cell's logic is now called within run_full_analysis ---
# run_backtesting(main_df, var_results_df, es_results_df)

In [None]:
# --- Comment cell, integrated into stress testing functions ---

In [None]:
# --- Comment cell, integrated into stress testing functions ---

In [None]:
# --- Comment cell, integrated into stress testing functions ---