In [1]:
import pandas as pd
import numpy as np

In [2]:
#read feather file
df = pd.read_feather('sp100_sequences_pct_change.feather')

In [3]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [4]:
df_test = df[df['Date'] > '2018-01-01']


In [5]:
df_test.shape
df_test.describe()
df_test.info()
df_test.columns
print(df_test.head(5))
print(df_test.tail(5))

<class 'pandas.core.frame.DataFrame'>
Index: 181857 entries, 4528 to 573484
Columns: 1003 entries, Ticker to Date t
dtypes: datetime64[ns](1), float64(1001), object(1)
memory usage: 1.4+ GB
     Ticker       Date  Date t-1001  Date t-1000  Date t-999  Date t-998  \
4528   AAPL 2018-01-02     0.005235     0.019899    0.020077   -0.005580   
4529   AAPL 2018-01-03     0.019899     0.020077   -0.005580   -0.024501   
4530   AAPL 2018-01-04     0.020077    -0.005580   -0.024501    0.015536   
4531   AAPL 2018-01-05    -0.005580    -0.024501    0.015536    0.004444   
4532   AAPL 2018-01-08    -0.024501     0.015536    0.004444    0.008468   

      Date t-997  Date t-996  Date t-995  Date t-994  Date t-993  Date t-992  \
4528   -0.024501    0.015536    0.004444    0.008468   -0.018177    0.008112   
4529    0.015536    0.004444    0.008468   -0.018177    0.008112   -0.079927   
4530    0.004444    0.008468   -0.018177    0.008112   -0.079927   -0.011353   
4531    0.008468   -0.018177    0

In [6]:
# list of calculations for each ticker
violations = np.zeros(len(df_test.Ticker.unique()))
# saves the dates of the violations for each ticker
Violation_dates = []

data_len = df_test.groupby('Ticker').size()
print(data_len)

Ticker
AAPL     1821
ABBV     1821
ABT      1821
ACN      1821
ADBE     1821
AIG      1821
AMD      1821
AMGN     1821
AMT      1821
AMZN     1821
AVGO     1821
AXP      1821
BA       1821
BAC      1821
BK       1821
BKNG     1821
BLK      1821
BMY      1821
BRK-B    1821
C        1821
CAT      1821
CHTR     1821
CL       1821
CMCSA    1821
COF      1821
COP      1821
COST     1821
CRM      1821
CSCO     1821
CVS      1821
CVX      1821
DE       1821
DHR      1821
DIS      1821
DUK      1821
EMR      1821
FDX      1821
GD       1821
GE       1821
GILD     1821
GM       1821
GOOG     1821
GOOGL    1821
GS       1821
HD       1821
HON      1821
IBM      1821
INTC     1821
INTU     1821
ISRG     1821
JNJ      1821
JPM      1821
KO       1821
LIN      1821
LLY      1821
LMT      1821
LOW      1821
MA       1821
MCD      1821
MDLZ     1821
MDT      1821
MET      1821
META     1821
MMM      1821
MO       1821
MRK      1821
MS       1821
MSFT     1821
NEE      1821
NFLX     1821
NKE      1821

In [7]:
quantiles_to_test = [0.001, 0.01, 0.02, 0.05, 0.1]
violations_dict = {}
violation_dates_dict = {}

tickers = sorted(df_test.Ticker.unique())

for q in quantiles_to_test:
    violations = []
    violation_dates = []
    
    for ticker in tickers:
        df_ticker = df_test[df_test['Ticker'] == ticker].copy()
        dates = df_ticker['Date'].copy()
        target_y = df_ticker.iloc[:, 1002].to_numpy()  # shape (n_days,)
        feature_data = df_ticker.iloc[:, 2:1001].to_numpy()  # shape (n_days, 999)

        # Compute historical VaR (quantile) **per day**
        q_values = np.quantile(feature_data, q, axis=1)  # shape (n_days,)

        mask = target_y < q_values  # violations on each day

        violations.append(np.sum(mask))
        violation_dates.append(dates[mask].tolist())

    violations_df = pd.DataFrame({'violations': violations}, index=tickers)
    violations_df['violations'] = violations_df['violations'].astype(int)

    violations_dict[q] = violations_df
    violation_dates_dict[q] = violation_dates


## Statistical Tests for Performance Comparison for VaR
### 1. Binomial Test
### 2. Unconditional Coverage Test (Kupiec Test)
### 3. Conditional Coverage Test (Christoffersen Test)
### 4. Dynamic Quantile Test (DQT)

In [None]:
# Binomial test for the number of violations
# We will use the binomial test to check if the number of violations is significantly different from what we would expect under the null hypothesis.
from scipy.stats import binomtest

alphas = [0.01, 0.02, 0.05, 0.1]
data_len = df_test.groupby('Ticker').size().reindex(tickers).to_numpy()

for q in quantiles_to_test:
    print(f"\n### Results for quantile = {q} ###")
    p = q
    violations = violations_dict[q]
    
    non_rejections_left = [0] * len(alphas)
    non_rejections_right = [0] * len(alphas)

    for a_idx, alpha in enumerate(alphas):
        for i in range(len(data_len)):
            n = data_len[i]
            k = violations.iloc[i, 0]

            # Left-sided test
            pval_left = binomtest(k, n, p=p, alternative='less').pvalue
            if pval_left > alpha:
                non_rejections_left[a_idx] += 1

            # Right-sided test
            pval_right = binomtest(k, n, p=p, alternative='greater').pvalue
            if pval_right > alpha:
                non_rejections_right[a_idx] += 1

    # Print results for this quantile
    for a_idx, alpha in enumerate(alphas):
        print(f"Alpha: {alpha}")
        print(f"  Left-sided non-rejections: {non_rejections_left[a_idx]} / {len(data_len)}")
        print(f"  Right-sided non-rejections: {non_rejections_right[a_idx]} / {len(data_len)}")



### Results for quantile = 0.001 ###
Alpha: 0.01
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 75 / 101
Alpha: 0.02
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 49 / 101
Alpha: 0.05
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 25 / 101
Alpha: 0.1
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 25 / 101

### Results for quantile = 0.01 ###
Alpha: 0.01
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 77 / 101
Alpha: 0.02
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 67 / 101
Alpha: 0.05
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 48 / 101
Alpha: 0.1
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 35 / 101

### Results for quantile = 0.02 ###
Alpha: 0.01
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections: 75 / 101
Alpha: 0.02
  Left-sided non-rejections: 101 / 101
  Right-sided non-rejections:

In [9]:
# Unconditional coverage test
from scipy.stats import chi2
import numpy as np
import pandas as pd

# --- Preliminaries ---
# Ensure the tickers order is consistent between your violation calculations and the n_obs counts.
tickers = sorted(df_test.Ticker.unique())
# Recompute observation count per ticker in the same order:
data_len = df_test.groupby('Ticker').size().reindex(tickers).to_numpy()

# Define the list of quantiles (each quantile q sets the target violation rate p0 = q)
quantiles_to_test = [0.001, 0.01, 0.02, 0.05, 0.1]
# alphas for the test decision
alphas = [0.01, 0.02, 0.05, 0.1]

# We'll use the violations_dict created previously.
# (If you did not store the number of observations along with violations,
#  we rely on the data_len computed above.)

# --- Kupiec Unconditional Coverage Test ---
for q in quantiles_to_test:
    print(f"\n### Unconditional Coverage (Kupiec) Test for quantile = {q} ###")
    p0 = q  # target violation probability under H0
    df_viols = violations_dict[q]  # DataFrame of violation counts per ticker
    # Initialize counters for non-rejections for each alpha level:
    non_rejections_uc = [0] * len(alphas)
    
    for a_idx, alpha in enumerate(alphas):
        # Loop over each ticker (using the same order as data_len and violations_df)
        for i in range(len(data_len)):
            n = data_len[i]
            x = df_viols.iloc[i, 0]  # violation count for ticker i
            # Empirical violation rate:
            p_hat = x / n if n > 0 else 0
            
            # Handle edge cases: if x==0 or x==n, the ratio terms become 0/0.
            # By convention, if x==0, set the corresponding log term to 0;
            # likewise for x==n.
            if x == 0:
                # When x==0, p_hat==0. Use the limit value:
                LR_uc = -2 * np.log( ((1-p0)**n) / (1**n) )
            elif x == n:
                LR_uc = -2 * np.log( (p0**n) / (1**n) )
            else:
                LR_uc = -2 * ( x * np.log(p0 / p_hat) + (n - x) * np.log((1-p0) / (1-p_hat)) )
            
            # p-value from chi-square distribution with 1 degree of freedom:
            pval = 1 - chi2.cdf(LR_uc, df=1)
            if pval > alpha:
                non_rejections_uc[a_idx] += 1
    
    # Print summary results for this quantile:
    for a_idx, alpha in enumerate(alphas):
        print(f"Alpha: {alpha}")
        print(f"  Non-rejections: {non_rejections_uc[a_idx]} / {len(data_len)}")



### Unconditional Coverage (Kupiec) Test for quantile = 0.001 ###
Alpha: 0.01
  Non-rejections: 75 / 101
Alpha: 0.02
  Non-rejections: 49 / 101
Alpha: 0.05
  Non-rejections: 49 / 101
Alpha: 0.1
  Non-rejections: 25 / 101

### Unconditional Coverage (Kupiec) Test for quantile = 0.01 ###
Alpha: 0.01
  Non-rejections: 82 / 101
Alpha: 0.02
  Non-rejections: 71 / 101
Alpha: 0.05
  Non-rejections: 67 / 101
Alpha: 0.1
  Non-rejections: 48 / 101

### Unconditional Coverage (Kupiec) Test for quantile = 0.02 ###
Alpha: 0.01
  Non-rejections: 78 / 101
Alpha: 0.02
  Non-rejections: 75 / 101
Alpha: 0.05
  Non-rejections: 60 / 101
Alpha: 0.1
  Non-rejections: 48 / 101

### Unconditional Coverage (Kupiec) Test for quantile = 0.05 ###
Alpha: 0.01
  Non-rejections: 76 / 101
Alpha: 0.02
  Non-rejections: 68 / 101
Alpha: 0.05
  Non-rejections: 54 / 101
Alpha: 0.1
  Non-rejections: 46 / 101

### Unconditional Coverage (Kupiec) Test for quantile = 0.1 ###
Alpha: 0.01
  Non-rejections: 68 / 101
Alpha: 0.02

In [10]:
#conditional coverage test

from scipy.stats import chi2
import numpy as np
import pandas as pd

# Ensure consistent ordering for tickers:
tickers = sorted(df_test.Ticker.unique())

# Define the list of quantiles to test; each quantile q sets the target violation rate p0 = q.
quantiles_to_test = [0.001, 0.01, 0.02, 0.05, 0.1]
# Significance levels for our tests:
alphas = [0.01, 0.02, 0.05, 0.1]

print("\n### Conditional Coverage (Christoffersen) Test ###")
for q in quantiles_to_test:
    print(f"\n### Results for quantile = {q} ###")
    p0 = q  # target violation probability under H0
    
    # Count non-rejections for each alpha (i.e. tickers that do not reject the conditional coverage null)
    non_rejections_cc = [0] * len(alphas)
    
    for a_idx, alpha in enumerate(alphas):
        # Loop over each ticker:
        for ticker in tickers:
            # Select data for the ticker:
            df_ticker = df_test[df_test['Ticker'] == ticker].copy()
            # Extract target series (realized returns/losses) and historical features.
            target_y = df_ticker.iloc[:, 1002].to_numpy()   # shape (T,)
            feature_data = df_ticker.iloc[:, 2:1001].to_numpy()  # shape (T, number_of_features)
            
            # Compute the daily quantile (VaR) from the historical distribution.
            q_values = np.quantile(feature_data, q, axis=1)  # shape (T,)
            # Violation indicator: 1 if realized return < VaR, else 0.
            mask = target_y < q_values  
            X = mask.astype(int)
            n = len(X)
            if n < 2:
                # Not enough observations to compute transitions; skip this ticker.
                continue
            
            # Total violations:
            x = np.sum(X)
            p_hat = x / n  if n > 0 else 0
            
            # --- Compute Unconditional Coverage Statistic LR_uc ---
            # Handle extreme cases: if p_hat is 0 or 1, use the limit values.
            if p_hat == 0 or p_hat == 1:
                if x == 0:
                    LR_uc = -2 * n * np.log(1 - p0)
                else:
                    LR_uc = -2 * n * np.log(p0)
            else:
                LR_uc = -2 * ( x * np.log(p0 / p_hat) + (n - x) * np.log((1 - p0) / (1 - p_hat)) )
            
            # --- Compute Independence (Transition) Test Statistic LR_ind ---
            # Calculate transition counts from time t-1 to t.
            X_prev = X[:-1]
            X_next = X[1:]
            n_00 = np.sum((X_prev == 0) & (X_next == 0))
            n_01 = np.sum((X_prev == 0) & (X_next == 1))
            n_10 = np.sum((X_prev == 1) & (X_next == 0))
            n_11 = np.sum((X_prev == 1) & (X_next == 1))
            
            # Estimate transition probabilities:
            if (n_00 + n_01) > 0:
                p01 = n_01 / (n_00 + n_01)
            else:
                p01 = 0.0
            if (n_10 + n_11) > 0:
                p11 = n_11 / (n_10 + n_11)
            else:
                p11 = 0.0
            
            # Log-likelihood under the alternative:
            logL1 = 0.0
            if (n_00 + n_01) > 0:
                if n_01 > 0:
                    logL1 += n_01 * np.log(p01)
                if n_00 > 0:
                    logL1 += n_00 * np.log(1 - p01)
            if (n_10 + n_11) > 0:
                if n_11 > 0:
                    logL1 += n_11 * np.log(p11)
                if n_10 > 0:
                    logL1 += n_10 * np.log(1 - p11)
            
            # Log-likelihood under the null (constant violation probability p_hat):
            # (If p_hat is 0 or 1 the logL0 is set to 0; such extreme cases are rarely encountered.)
            if p_hat == 0 or p_hat == 1:
                logL0 = 0.0
            else:
                logL0 = (n_01 + n_11) * np.log(p_hat) + (n_00 + n_10) * np.log(1 - p_hat)
            
            LR_ind = -2 * (logL0 - logL1)
            
            # --- Conditional Coverage Test Statistic ---
            LR_cc = LR_uc + LR_ind
            # Under H0, LR_cc ~ chi-square with 2 degrees of freedom.
            pval = 1 - chi2.cdf(LR_cc, df=2)
            
            if pval > alpha:
                non_rejections_cc[a_idx] += 1
        
        # Print summary results for the current alpha:
        print(f"Alpha: {alpha}")
        print(f"  Non-rejections: {non_rejections_cc[a_idx]} / {len(tickers)}")



### Conditional Coverage (Christoffersen) Test ###

### Results for quantile = 0.001 ###
Alpha: 0.01
  Non-rejections: 67 / 101
Alpha: 0.02
  Non-rejections: 61 / 101
Alpha: 0.05
  Non-rejections: 40 / 101
Alpha: 0.1
  Non-rejections: 40 / 101

### Results for quantile = 0.01 ###
Alpha: 0.01
  Non-rejections: 56 / 101
Alpha: 0.02
  Non-rejections: 51 / 101
Alpha: 0.05
  Non-rejections: 45 / 101
Alpha: 0.1
  Non-rejections: 38 / 101

### Results for quantile = 0.02 ###
Alpha: 0.01
  Non-rejections: 46 / 101
Alpha: 0.02
  Non-rejections: 38 / 101
Alpha: 0.05
  Non-rejections: 26 / 101
Alpha: 0.1
  Non-rejections: 21 / 101

### Results for quantile = 0.05 ###
Alpha: 0.01
  Non-rejections: 36 / 101
Alpha: 0.02
  Non-rejections: 25 / 101
Alpha: 0.05
  Non-rejections: 12 / 101
Alpha: 0.1
  Non-rejections: 11 / 101

### Results for quantile = 0.1 ###
Alpha: 0.01
  Non-rejections: 36 / 101
Alpha: 0.02
  Non-rejections: 26 / 101
Alpha: 0.05
  Non-rejections: 16 / 101
Alpha: 0.1
  Non-rejection

In [11]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import chi2

# Define the number of lags to include in the DQ test regression:
num_lags = 4

# List of quantiles (each quantile q sets the nominal violation rate for VaR)
quantiles_to_test = [0.001, 0.01, 0.02, 0.05, 0.1]
# Significance levels to check
alphas = [0.01, 0.02, 0.05, 0.1]

# Use a sorted list of tickers for consistency:
tickers = sorted(df_test.Ticker.unique())

print("\n### Dynamic Quantile (DQ) Test ###")
for q in quantiles_to_test:
    print(f"\n### Results for quantile = {q} ###")
    
    # Initialize counters for non-rejections for each alpha
    non_rejections_dq = [0] * len(alphas)
    
    # Loop over each ticker
    for ticker in tickers:
        # Extract data for the ticker
        df_ticker = df_test[df_test['Ticker'] == ticker].copy()
        # target_y: realized returns/losses (assumed to be in column index 1002)
        target_y = df_ticker.iloc[:, 1002].to_numpy()  # shape (T,)
        # feature_data: historical data for VaR calculation (assumed to be columns 2 to 1000)
        feature_data = df_ticker.iloc[:, 2:1001].to_numpy()  # shape (T, features)
        
        # Compute the daily VaR quantile from the historical distribution
        q_values = np.quantile(feature_data, q, axis=1)  # shape (T,)
        
        # Hit indicator: 1 if realized return < VaR, else 0.
        Hit = (target_y < q_values).astype(int)
        # Construct the "hit residuals" e_t = Hit - q (the nominal quantile)
        e = Hit - q
        
        T_total = len(e)
        if T_total <= num_lags:
            continue  # Skip if not enough observations for lagged regressors
        
        # Build the regression sample: for t = num_lags to T_total-1,
        # regress e_t on a constant and the lagged values: e[t-1], ..., e[t-num_lags]
        y_reg = e[num_lags:]  # dependent variable; shape (T_total - num_lags,)
        
        # Construct design matrix with lagged e's:
        X_list = []
        for t in range(num_lags, T_total):
            # For time t, get the lags: e[t-1], ..., e[t-num_lags]
            lagged_vals = e[t-num_lags:t]
            X_list.append(lagged_vals)
        X = np.array(X_list)  # shape: (T_total - num_lags, num_lags)
        # Add a constant (intercept) to the design matrix
        X = sm.add_constant(X)  # now shape: (T_total - num_lags, num_lags + 1)
        
        # Run the OLS regression: e_t = constant + beta_1 * e[t-1] + ... + beta_{num_lags} * e[t-num_lags] + error
        model = sm.OLS(y_reg, X)
        results = model.fit()
        R2 = results.rsquared
        
        T_reg = len(y_reg)  # number of observations in regression
        # DQ statistic:
        dq_stat = T_reg * R2
        # Degrees of freedom: number of regressors (including constant)
        df_dq = num_lags + 1
        
        pval = 1 - chi2.cdf(dq_stat, df=df_dq)
        
        # For each significance level, if p-value > alpha, we do not reject H0
        for a_idx, alpha in enumerate(alphas):
            if pval > alpha:
                non_rejections_dq[a_idx] += 1
    
    # Print summary results for this quantile
    for a_idx, alpha in enumerate(alphas):
        print(f"Alpha: {alpha}")
        print(f"  Non-rejections: {non_rejections_dq[a_idx]} / {len(tickers)}")



### Dynamic Quantile (DQ) Test ###

### Results for quantile = 0.001 ###
Alpha: 0.01
  Non-rejections: 22 / 101
Alpha: 0.02
  Non-rejections: 22 / 101
Alpha: 0.05
  Non-rejections: 22 / 101
Alpha: 0.1
  Non-rejections: 22 / 101

### Results for quantile = 0.01 ###
Alpha: 0.01
  Non-rejections: 8 / 101
Alpha: 0.02
  Non-rejections: 7 / 101
Alpha: 0.05
  Non-rejections: 5 / 101
Alpha: 0.1
  Non-rejections: 5 / 101

### Results for quantile = 0.02 ###
Alpha: 0.01
  Non-rejections: 7 / 101
Alpha: 0.02
  Non-rejections: 6 / 101
Alpha: 0.05
  Non-rejections: 5 / 101
Alpha: 0.1
  Non-rejections: 4 / 101

### Results for quantile = 0.05 ###
Alpha: 0.01
  Non-rejections: 8 / 101
Alpha: 0.02
  Non-rejections: 7 / 101
Alpha: 0.05
  Non-rejections: 5 / 101
Alpha: 0.1
  Non-rejections: 4 / 101

### Results for quantile = 0.1 ###
Alpha: 0.01
  Non-rejections: 16 / 101
Alpha: 0.02
  Non-rejections: 12 / 101
Alpha: 0.05
  Non-rejections: 8 / 101
Alpha: 0.1
  Non-rejections: 6 / 101


In [12]:
import numpy as np
import pandas as pd

# List of quantile levels you want to test
quantiles_to_test = [0.001, 0.01, 0.02, 0.05, 0.1]

# Convert historical feature columns to a NumPy array (for speed)
hist_data = df_test.iloc[:, 2:1001].to_numpy()
# Extract realized outcomes (assumed to be in column 1002)
target = df_test.iloc[:, 1002].to_numpy()

for q in quantiles_to_test:
    # Compute the forecast (VaR) for each row at quantile q
    forecast = np.quantile(hist_data, q, axis=1)
    
    # Compute pinball loss for each observation
    losses = np.where(target < forecast,
                      (1 - q) * (forecast - target),
                      q * (target - forecast))
    
    # Average pinball loss over all observations
    avg_loss = np.mean(losses)
    print(f"Average Pinball Loss for quantile {q}: {avg_loss:.6f}")


Average Pinball Loss for quantile 0.001: 0.000183
Average Pinball Loss for quantile 0.01: 0.000811
Average Pinball Loss for quantile 0.02: 0.001277
Average Pinball Loss for quantile 0.05: 0.002310
Average Pinball Loss for quantile 0.1: 0.003511
