## Methodology: Portfolio-Weighted Mahalanobis Distance and Shock Decomposition (Covariance Matrix Formulation)

This analysis assesses market shocks from the perspective of a specific portfolio. We define a portfolio-weighted Mahalanobis distance using raw asset returns ($\mathbf{R}_t$) and the historical EWMA covariance matrix ($\mathbf{\Sigma}_{t-1}$), incorporating explicit portfolio weights ($\mathbf{W}$). This distance is then decomposed into a "weighted volatility shock component" (representing the shock if assets were uncorrelated, viewed through portfolio weights) and a "weighted correlation shock component." A separate section will detail how the correlation component links back to the underlying correlation matrix ($\mathbf{\Lambda}_{t-1}$) and standardized returns ($\mathbf{z}_t$) for enhanced interpretability.

### 1. Data Acquisition and Portfolio Definition

* **Asset Selection**: A portfolio of $N$ assets is defined.
* **Portfolio Weights ($\mathbf{w}$)**: A vector of portfolio weights $\mathbf{w} = [w_1, w_2, \dots, w_N]^T$ is defined, where $\sum_{i=1}^{N} w_i = 1$. Let $\mathbf{W}$ be an $N \times N$ diagonal matrix with the weights $w_i$ on its diagonal.
* **Data Download**: Historical daily closing prices ($P_{i,t}$) are downloaded.
* **Return Calculation**: Daily logarithmic returns $\mathbf{R}_t = [R_{1,t}, \dots, R_{N,t}]^T$ are calculated:
$$
R_{i,t} = \ln\left(\frac{P_{i,t}}{P_{i,t-1}}\right)
$$

### 2. EWMA Parameter Estimation (for day $t-1$)

An Exponentially Weighted Moving Average (EWMA) approach is used for parameters based on data up to day $t-1$.

* **EWMA Covariance Matrix ($\mathbf{\Sigma}_{t-1}$)**: The $N \times N$ EWMA covariance matrix of asset returns.
* **EWMA Variances ($\sigma_{i,t-1}^2$)**: The EWMA variance for each asset $i$ is $\sigma_{i,t-1}^2 = (\mathbf{\Sigma}_{t-1})_{ii}$. Let $\mathbf{D}_{t-1}^2$ be the $N \times N$ diagonal matrix of these variances (i.e., $\mathbf{D}_{t-1}^2 = \text{diag}(\mathbf{\Sigma}_{t-1})$). Thus, $\mathbf{D}_{t-1}^{-2}$ is a diagonal matrix with $1/\sigma_{i,t-1}^2$ on its diagonal. The matrix of standard deviations is $\mathbf{D}_{t-1} = \sqrt{\mathbf{D}_{t-1}^2}$.

### 3. Portfolio-Weighted Mahalanobis Distance ($M_{w,t}^2$)

For each day $t$, assuming mean daily returns $\boldsymbol{\mu} \approx \mathbf{0}$:

* **Total Portfolio-Weighted Mahalanobis Distance ($M_{w,t}^2$)**: This metric measures the "unusualness" of the raw asset returns $\mathbf{R}_t$ when viewed through the lens of the portfolio weights $\mathbf{W}$ and scaled by the full historical covariance structure $\mathbf{\Sigma}_{t-1}$. It is defined as:
$$
M_{w,t}^2 = \mathbf{R}_t^T \mathbf{W} \mathbf{\Sigma}_{t-1}^{-1} \mathbf{W} \mathbf{R}_t
$$
A high value of $M_{w,t}^2$ indicates that the observed joint returns, as weighted by the portfolio structure, represent a significant deviation from historical patterns of volatility and co-movement.

### 4. Decomposition of Portfolio-Weighted Mahalanobis Distance ($M_{w,t}^2$)

The total weighted shock $M_{w,t}^2$ can be decomposed as follows:

<ul>
    <li><strong>Weighted Volatility Shock Component ($VS_{w,t}$):</strong>
        <ul>
            <li><strong>Intuition</strong>: This component represents the portion of the total shock that can be attributed to the magnitude of individual asset returns, scaled by their portfolio weights and individual historical volatilities, <em>as if the assets were uncorrelated</em>.</li>
            <li><strong>Formulation</strong>: It is the portfolio-weighted Mahalanobis distance calculated using only the diagonal part of the covariance matrix ($\mathbf{D}_{t-1}^2$, representing variances only and zero covariances):
            $$
            VS_{w,t} = \mathbf{R}_t^T \mathbf{W} (\mathbf{D}_{t-1}^2)^{-1} \mathbf{W} \mathbf{R}_t = \mathbf{R}_t^T \mathbf{W} \mathbf{D}_{t-1}^{-2} \mathbf{W} \mathbf{R}_t
            $$
            </li>
            <li><strong>Interpretation</strong>: Expanding this, $VS_{w,t} = \sum_{i=1}^{N} w_i^2 \left(\frac{R_{i,t}}{\sigma_{i,t-1}}\right)^2$. This shows it represents the sum of squared portfolio-weighted standardized returns. A large $VS_{w,t}$ indicates that significant contributions to the shock came from large individual asset returns relative to their volatilities, amplified by their squared portfolio weights.</li>
        </ul>
    </li>
    <li><strong>Weighted Correlation Shock Component ($CS_{w,t}$):</strong>
        <ul>
            <li><strong>Intuition</strong>: This component quantifies the portion of the total shock that arises specifically because asset returns <em>are</em> correlated (i.e., their covariances are non-zero), rather than moving independently. It measures the impact of the historical inter-asset co-movements (captured by the off-diagonal elements of $\mathbf{\Sigma}_{t-1}$) on the total shock, beyond what individual variances explain.</li>
            <li><strong>Formulation</strong>: It is the difference between the total weighted Mahalanobis distance (which uses the full covariance matrix $\mathbf{\Sigma}_{t-1}$) and the weighted volatility shock component (which effectively assumes zero covariances):
            $$
            CS_{w,t} = M_{w,t}^2 - VS_{w,t}
            $$
            Substituting the expressions:
            $$
            CS_{w,t} = \mathbf{R}_t^T \mathbf{W} \mathbf{\Sigma}_{t-1}^{-1} \mathbf{W} \mathbf{R}_t - \mathbf{R}_t^T \mathbf{W} \mathbf{D}_{t-1}^{-2} \mathbf{W} \mathbf{R}_t
            $$
            This can be factored as:
            $$
            CS_{w,t} = \mathbf{R}_t^T \mathbf{W} \left(\mathbf{\Sigma}_{t-1}^{-1} - \mathbf{D}_{t-1}^{-2}\right) \mathbf{W} \mathbf{R}_t
            $$
            </li>
            <li><strong>Brief Interpretation</strong>: A large positive $CS_{w,t}$ indicates that historical co-movements (correlations) amplified the overall portfolio-weighted shock. A large negative $CS_{w,t}$ suggests they dampened it. The detailed connection to the correlation matrix $\mathbf{\Lambda}_{t-1}$ and standardized returns is explored in Section 5.</li>
        </ul>
    </li>
</ul>

### 5. Interpreting the Weighted Correlation Shock Component ($CS_{w,t}$)

<p>This section details the connection of the Weighted Correlation Shock Component ($CS_{w,t}$) to the underlying EWMA correlation matrix ($\mathbf{\Lambda}_{t-1}$) and standardized asset returns ($\mathbf{z}_t$), providing a deeper understanding of its meaning.</p>
<ul>
    <li><strong>Key Result</strong>: The Weighted Correlation Shock Component, $CS_{w,t}$, initially defined in Section 4 using the covariance matrix $\mathbf{\Sigma}_{t-1}$ and raw returns $\mathbf{R}_t$, can be equivalently expressed in terms of standardized returns ($\mathbf{z}_t$), portfolio weights ($\mathbf{W}$), and the inverse of the historical EWMA correlation matrix ($\mathbf{\Lambda}_{t-1}^{-1}$) as follows:
    $$
    CS_{w,t} = \mathbf{z}_t^T \mathbf{W} (\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I}) \mathbf{W} \mathbf{z}_t
    $$
    In this form, $\mathbf{z}_t = \mathbf{D}_{t-1}^{-1}\mathbf{R}_t$ are the standardized returns, $\mathbf{W}$ is the diagonal matrix of portfolio weights, $\mathbf{\Lambda}_{t-1}$ is the EWMA correlation matrix, and $\mathbf{I}$ is the identity matrix. This expression explicitly highlights how $CS_{w,t}$ is driven by the interaction of the historical correlation structure with portfolio-weighted standardized returns.
    </li>
    <li><strong>Derivation of the Equivalent Expression for $CS_{w,t}$</strong>:
        <ul>
            <li>We start with the definition of $CS_{w,t}$ from Section 4:
            $$
            CS_{w,t} = \mathbf{R}_t^T \mathbf{W} \left(\mathbf{\Sigma}_{t-1}^{-1} - \mathbf{D}_{t-1}^{-2}\right) \mathbf{W} \mathbf{R}_t
            $$
            </li>
            <li>The term $(\mathbf{\Sigma}_{t-1}^{-1} - \mathbf{D}_{t-1}^{-2})$ isolates the impact of covariances. This term can be rewritten using the correlation matrix $\mathbf{\Lambda}_{t-1} = \mathbf{D}_{t-1}^{-1}\mathbf{\Sigma}_{t-1}\mathbf{D}_{t-1}^{-1}$ and the identity matrix $\mathbf{I}$ via the identity:
            $$
            \mathbf{\Sigma}_{t-1}^{-1} - \mathbf{D}_{t-1}^{-2} = \mathbf{D}_{t-1}^{-1}(\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I})\mathbf{D}_{t-1}^{-1}
            $$
            </li>
            <li>Substituting this identity into the expression for $CS_{w,t}$:
            $$
            CS_{w,t} = \mathbf{R}_t^T \mathbf{W} \left[ \mathbf{D}_{t-1}^{-1}(\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I})\mathbf{D}_{t-1}^{-1} \right] \mathbf{W} \mathbf{R}_t
            $$
            </li>
            <li>Next, we introduce standardized returns $\mathbf{z}_t = \mathbf{D}_{t-1}^{-1}\mathbf{R}_t$, which implies $\mathbf{R}_t = \mathbf{D}_{t-1}\mathbf{z}_t$. Since $\mathbf{D}_{t-1}$ is diagonal, $\mathbf{R}_t^T = \mathbf{z}_t^T \mathbf{D}_{t-1}$. Substituting these:
            $$
            CS_{w,t} = (\mathbf{D}_{t-1}\mathbf{z}_t)^T \mathbf{W} \mathbf{D}_{t-1}^{-1}(\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I})\mathbf{D}_{t-1}^{-1} \mathbf{W} (\mathbf{D}_{t-1}\mathbf{z}_t)
            $$
            </li>
            <li>This expression simplifies to:
            $$
            CS_{w,t} = \mathbf{z}_t^T (\mathbf{D}_{t-1} \mathbf{W} \mathbf{D}_{t-1}^{-1}) (\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I}) (\mathbf{D}_{t-1}^{-1} \mathbf{W} \mathbf{D}_{t-1}) \mathbf{z}_t
            $$
            </li>
            <li>Given that both $\mathbf{D}_{t-1}$ (diagonal matrix of $\sigma_{i,t-1}$) and $\mathbf{W}$ (diagonal matrix of $w_i$) are diagonal matrices, the products involving them simplify: $(\mathbf{D}_{t-1} \mathbf{W} \mathbf{D}_{t-1}^{-1}) = \mathbf{W}$ and $(\mathbf{D}_{t-1}^{-1} \mathbf{W} \mathbf{D}_{t-1}) = \mathbf{W}$.</li>
            <li>Applying these simplifications leads to the final equivalent expression stated in the key result:
            $$
            CS_{w,t} = \mathbf{z}_t^T \mathbf{W} (\mathbf{\Lambda}_{t-1}^{-1} - \mathbf{I}) \mathbf{W} \mathbf{z}_t
            $$
            </li>
        </ul>
    </li>
    <li><strong>Concluding Interpretation</strong>: This derived form explicitly shows that the Weighted Correlation Shock Component $CS_{w,t}$ quantifies how deviations of the historical correlation structure (represented by $\mathbf{\Lambda}_{t-1}^{-1}$) from an uncorrelated structure (represented by $\mathbf{I}$) interact with the portfolio-weighted standardized returns ($\mathbf{W}\mathbf{z}_t$).</li>
</ul>
<p>By plotting $M_{w,t}^2$, $VS_{w,t}$, and $CS_{w,t}$ over time, we analyze portfolio-specific shocks, separating the impact of weighted individual asset variances from the impact of their historical co-movements as captured by the covariance structure. The interpretation of $CS_{w,t}$ is enriched by understanding its direct link to the correlation matrix $\mathbf{\Lambda}_{t-1}$ and standardized returns.</p>

* **Add ptfl weights**
* **Add Shapley Value**
https://en.wikipedia.org/wiki/Shapley_value#:~:text=Shapley%20value%20regression%20is%20a,predictive%20power%20of%20the%20model.

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# --- Configuration ---
tickers = ['XLE', 'XLF', 'XLU', 'XLI', 'XLK', 'XLV', 'XLY', 'XLP', 'XLB']
start_date = '2004-01-01'
# Use a fixed end date for reproducibility if sharing output, or today for latest data
# end_date = pd.Timestamp.today().strftime('%Y-%m-%d')
end_date = '2025-05-27' # Using the date from your last output for consistency

HALFLIFE = 126 # Approximately 6 months (trading days)
MIN_PERIODS_COV = HALFLIFE # Minimum periods for EWMA covariance matrix

# --- Portfolio Weights ---
N_ASSETS_from_tickers = len(tickers)
# For this example, using an equally weighted portfolio.
w_portfolio_array = np.array([1/N_ASSETS_from_tickers] * N_ASSETS_from_tickers)
W_diag_matrix = np.diag(w_portfolio_array) # This is \mathbf{W}

# --- !! DIAGNOSTIC CONTROL !! ---
VERBOSE_DEBUG_LOOP = False # Keep False for cleaner summary output, True for deep dive
PRINT_SERIES_DEBUG = True # Set to False if output is too long, True for debugging NaN issues

# --- 1. Download Data ---
data = pd.DataFrame()
print("--- Data Acquisition and Initial Processing ---")
try:
    print(f"Attempting to download data for: {', '.join(tickers)}")
    print(f"Date range: {start_date} to {end_date}")
    _raw_data = yf.download(tickers, start=start_date, end=end_date, progress=False)

    if _raw_data.empty:
        raise ValueError("yf.download returned an empty DataFrame.")

    if isinstance(_raw_data.columns, pd.MultiIndex):
        data = _raw_data.xs('Close', level=0, axis=1)
    elif len(tickers) == 1 and 'Close' in _raw_data.columns:
        data = _raw_data[['Close']]
        data.columns = tickers
    elif all(ticker in _raw_data.columns for ticker in tickers) and len(tickers) == len(_raw_data.columns):
        data = _raw_data
    else:
        print("Downloaded data columns structure is unexpected. Raw columns:")
        print(_raw_data.columns)
        raise ValueError("Could not reliably extract 'Close' prices.")

    if data.empty or data.isnull().all().all():
        raise ValueError("Extracted 'Close' price data is empty or all NaN.")

    data = data[tickers]
    initial_rows_before_dropna = len(data)
    data = data.dropna(axis=0, how='any')
    print(f"Data after dropna(axis=0, how='any'). Shape: {data.shape}. Rows dropped: {initial_rows_before_dropna - len(data)}")

    if data.shape[0] < MIN_PERIODS_COV + 5:
         raise ValueError(f"Insufficient data rows ({data.shape[0]}) after cleaning for EWMA. Need at least {MIN_PERIODS_COV + 5} rows.")

except Exception as e:
    print(f"CRITICAL ERROR during data acquisition or initial processing: {e}")
    exit()

print(f"Data successfully processed. Final price data usable: from {data.index.min().strftime('%Y-%m-%d')} to {data.index.max().strftime('%Y-%m-%d')}")
print(f"Shape of final processed price data: {data.shape}")

# --- 2. Calculate Returns ---
print("\n--- Return Calculation ---")
returns = np.log(data / data.shift(1)).dropna()
print(f"Shape of returns data: {returns.shape}")
print(f"Returns data from {returns.index.min().strftime('%Y-%m-%d')} to {returns.index.max().strftime('%Y-%m-%d')}")

if returns.shape[0] < MIN_PERIODS_COV + 1:
    print(f"CRITICAL ERROR: Insufficient returns data for analysis (have {returns.shape[0]} days, need at least {MIN_PERIODS_COV + 1} days). Exiting.")
    exit()

N_ASSETS = returns.shape[1]
if N_ASSETS != len(w_portfolio_array):
    print(f"CRITICAL ERROR: Mismatch between number of assets from data ({N_ASSETS}) and portfolio weights length ({len(w_portfolio_array)}).")
    exit()
print(f"Number of assets (N_ASSETS): {N_ASSETS}")

# --- 3. EWMA Covariance Series Calculation ---
print("\n--- EWMA Covariance Series Calculation ---")
print(f"Calculating EWMA covariance series with halflife={HALFLIFE}, min_periods={MIN_PERIODS_COV}")
ewma_cov_full_series = returns.ewm(halflife=HALFLIFE, min_periods=MIN_PERIODS_COV).cov()
if ewma_cov_full_series.dropna(how='all').empty:
    print("CRITICAL ERROR: EWMA covariance series is entirely NaN after calculation. Check min_periods and data quality.")
    exit()
valid_ewma_cov_dates = ewma_cov_full_series.index.get_level_values(0).unique()
if valid_ewma_cov_dates.empty:
    print("CRITICAL ERROR: No valid dates found in EWMA covariance series index.")
    exit()
first_valid_cov_date_in_series = valid_ewma_cov_dates.min()
print(f"First date with non-NaN EWMA covariance data in series: {first_valid_cov_date_in_series.strftime('%Y-%m-%d')}")


# --- Main Calculation Loop ---
print("\n--- Main Calculation Loop for Portfolio-Weighted Mahalanobis Distance & Components ---")
mah_dist_weighted_sq_rigorous = pd.Series(index=returns.index, dtype=float, name="M2_Weighted_Rigorous")
vol_shock_weighted_contrib_rigorous = pd.Series(index=returns.index, dtype=float, name="VS_Weighted_Rigorous")
corr_shock_weighted_contrib_rigorous = pd.Series(index=returns.index, dtype=float, name="CS_Weighted_Rigorous")

processed_days_count = 0
skipped_days_summary = {
    "key_error_cov_lookup": 0, "type_error_sigma_lookup":0, "other_exc_sigma_lookup": 0,
    "nan_inf_Sigma_t_minus_1": 0, "non_pos_variance": 0,
    "ill_conditioned_Sigma": 0, "singular_Sigma_inversion": 0, "nan_inf_M2_result": 0
}
total_loop_iterations = 0
loop_start_index_val = 0
try:
    loop_start_date_for_t = returns.index[returns.index.get_loc(first_valid_cov_date_in_series) + 1]
    loop_start_index_val = returns.index.get_loc(loop_start_date_for_t)
except KeyError:
    print(f"CRITICAL ERROR: First valid EWMA cov date {first_valid_cov_date_in_series} or next day not in returns index. Alignment issue.")
    exit()

if len(returns.index) > loop_start_index_val :
    print(f"Looping from returns.index position {loop_start_index_val} to {len(returns.index) - 1}.")

    for i in range(loop_start_index_val, len(returns.index)):
        total_loop_iterations += 1
        t_date = returns.index[i]
        t_minus_1_date = returns.index[i-1]

        if VERBOSE_DEBUG_LOOP and total_loop_iterations % 250 == 0 :
            print(f"  Verbose: Processing t_date: {t_date.strftime('%Y-%m-%d')}")

        R_t_vector = returns.loc[t_date].values.reshape(N_ASSETS, 1) # Ensure column vector

        Sigma_t_minus_1 = np.array([])
        try:
            Sigma_t_minus_1_df_from_lookup = ewma_cov_full_series.loc[t_minus_1_date]
            if not isinstance(Sigma_t_minus_1_df_from_lookup, pd.DataFrame):
                skipped_days_summary["type_error_sigma_lookup"] += 1; continue
            Sigma_t_minus_1 = Sigma_t_minus_1_df_from_lookup.reindex(index=tickers, columns=tickers).values
        except KeyError:
            skipped_days_summary["key_error_cov_lookup"] += 1; continue
        except Exception as e:
            skipped_days_summary["other_exc_sigma_lookup"] += 1; continue

        if Sigma_t_minus_1.size == 0:
             skipped_days_summary["other_exc_sigma_lookup"] += 1; continue
        if np.isnan(Sigma_t_minus_1).any() or np.isinf(Sigma_t_minus_1).any():
            skipped_days_summary["nan_inf_Sigma_t_minus_1"] += 1; continue

        # Check condition number of Sigma_t_minus_1
        cond_Sigma = np.linalg.cond(Sigma_t_minus_1)
        if cond_Sigma > 1e10: # Threshold for ill-conditioning
            if VERBOSE_DEBUG_LOOP: print(f"  DEBUG: {t_date.strftime('%Y-%m-%d')}: Sigma_t-1 ill-conditioned (cond={cond_Sigma:.2e}). Skipping.")
            skipped_days_summary["ill_conditioned_Sigma"] += 1
            continue

        # Invert Sigma_t_minus_1
        Sigma_t_minus_1_inv = np.array([])
        try:
            Sigma_t_minus_1_inv = np.linalg.inv(Sigma_t_minus_1)
        except np.linalg.LinAlgError:
            if VERBOSE_DEBUG_LOOP: print(f"  DEBUG: {t_date.strftime('%Y-%m-%d')}: Singular Sigma_t-1 (LinAlgError). Skipping.")
            skipped_days_summary["singular_Sigma_inversion"] += 1
            continue
        except Exception as e:
            if VERBOSE_DEBUG_LOOP: print(f"  DEBUG: {t_date.strftime('%Y-%m-%d')}: Exception during Sigma_t-1 inversion: {e}. Skipping.")
            skipped_days_summary["singular_Sigma_inversion"] += 1
            continue
        if Sigma_t_minus_1_inv.size == 0:
            skipped_days_summary["singular_Sigma_inversion"] += 1; continue

        # --- CALCULATIONS USING COVARIANCE MATRIX DIRECTLY ---
        # M_w_t^2 = R_t^T @ W @ Sigma_inv @ W @ R_t
        M_w_t_sq_val = (R_t_vector.T @ W_diag_matrix @ Sigma_t_minus_1_inv @ W_diag_matrix @ R_t_vector).item()

        if np.isnan(M_w_t_sq_val) or np.isinf(M_w_t_sq_val):
            if VERBOSE_DEBUG_LOOP: print(f"  DEBUG: {t_date.strftime('%Y-%m-%d')}: M_w_t_sq_val is NaN or Inf. Skipping.")
            skipped_days_summary["nan_inf_M2_result"] += 1
            continue
        mah_dist_weighted_sq_rigorous.loc[t_date] = M_w_t_sq_val

        # Calculate D_t_minus_1_sq (diagonal matrix of variances from Sigma_t-1)
        diag_Sigma_t_minus_1 = np.diag(Sigma_t_minus_1) # Variances sigma_i^2
        if np.any(diag_Sigma_t_minus_1 <= 1e-12): # Check for non-positive variance again, crucial for D_inv
            if VERBOSE_DEBUG_LOOP: print(f"  DEBUG: {t_date.strftime('%Y-%m-%d')}: Non-positive variance in diag_Sigma_t-1 for D_inv. Skipping.")
            skipped_days_summary["non_pos_variance"] += 1
            continue
        D_t_minus_1_sq_inv_diag_elements = 1.0 / diag_Sigma_t_minus_1 # These are 1/sigma_i^2
        D_t_minus_1_sq_inv = np.diag(D_t_minus_1_sq_inv_diag_elements) # This is D_{t-1}^{-2}

        # VS_w_t = R_t^T @ W @ D_t-1^-2 @ W @ R_t
        VS_w_t_val = (R_t_vector.T @ W_diag_matrix @ D_t_minus_1_sq_inv @ W_diag_matrix @ R_t_vector).item()
        vol_shock_weighted_contrib_rigorous.loc[t_date] = VS_w_t_val

        # CS_w_t = M_w_t^2 - VS_w_t
        CS_w_t_val = M_w_t_sq_val - VS_w_t_val
        corr_shock_weighted_contrib_rigorous.loc[t_date] = CS_w_t_val
        # --- END OF MODIFIED CALCULATIONS ---

        processed_days_count += 1
else:
    print(f"CRITICAL WARNING: Not enough data to run the main calculation loop. Returns length={len(returns.index)}, loop_start_index_val={loop_start_index_val}")

print(f"\nLoop finished. Total iterations attempted in loop: {total_loop_iterations}")
print(f"Total days successfully processed and values stored: {processed_days_count}")
print("Summary of reasons for skipping days during calculation:")
for reason, count in skipped_days_summary.items():
    if count > 0:
        print(f"  - Skipped due to '{reason}': {count} days")

print("\n--- Rigorous Series before dropna() ---")
def print_series_info(series, name):
    if not series.empty:
        print(f"{name}: Length={len(series)}, Non-NaNs={series.count()} (NaNs={series.isna().sum()})")
        if PRINT_SERIES_DEBUG and series.count() > 0 :
            print(f"  Head of {name} (non-NaN):\n{series.dropna().head()}")
            print(f"  Tail of {name} (non-NaN):\n{series.dropna().tail()}")
        elif PRINT_SERIES_DEBUG:
            print(f"  {name} contains no non-NaN values to show head/tail.")
    else:
        print(f"{name} is empty.")

print_series_info(mah_dist_weighted_sq_rigorous, "mah_dist_weighted_sq_rigorous")
print_series_info(vol_shock_weighted_contrib_rigorous, "vol_shock_weighted_contrib_rigorous")
print_series_info(corr_shock_weighted_contrib_rigorous, "corr_shock_weighted_contrib_rigorous")

print("\n--- Final Data Preparation for Plotting ---")
mah_dist_sq_final = mah_dist_weighted_sq_rigorous.dropna()
vol_shock_contrib_final = vol_shock_weighted_contrib_rigorous.dropna()
corr_shock_contrib_final = corr_shock_weighted_contrib_rigorous.dropna()

common_valid_index = pd.Index([])
if not mah_dist_sq_final.empty:
    temp_idx = mah_dist_sq_final.index
    if not vol_shock_contrib_final.empty:
        temp_idx = temp_idx.intersection(vol_shock_contrib_final.index)
    if not corr_shock_contrib_final.empty:
        temp_idx = temp_idx.intersection(corr_shock_contrib_final.index)
    common_valid_index = temp_idx
    print(f"Length of common valid index for plotting: {len(common_valid_index)}")
else:
    print("Weighted Mahalanobis distance series (M_w2_final) is empty after dropna. Cannot plot.")

if common_valid_index.empty:
    print("\nCRITICAL WARNING: No common dates for plotting after dropna(), or M_w2_final is empty.")
    if mah_dist_sq_final.empty: print("  Weighted Mahalanobis distance final series is empty.")
    if vol_shock_contrib_final.empty: print("  Weighted Volatility shock final series is empty.")
    if corr_shock_contrib_final.empty: print("  Weighted Correlation shock final series is empty.")
    exit()

mah_dist_plot = mah_dist_sq_final.reindex(common_valid_index)
vol_shock_plot = vol_shock_contrib_final.reindex(common_valid_index).fillna(0)
corr_shock_plot = corr_shock_contrib_final.reindex(common_valid_index).fillna(0)

print(f"\nFinal series lengths for plotting: M_w^2={len(mah_dist_plot)}, VS_w={len(vol_shock_plot)}, CS_w={len(corr_shock_plot)}")

if len(mah_dist_plot) == 0 :
    print("\nCRITICAL ERROR: No data to plot. Weighted Mahalanobis distance plot series is empty.")
    exit()

# --- Plotting ---
print("\n--- Plotting Results (Portfolio-Weighted, Covariance Formulation) ---")
fig, axs = plt.subplots(3, 1, figsize=(15, 12), sharex=True)

axs[0].plot(mah_dist_plot.index, mah_dist_plot, label=r'$M_{w,t}^2$', color='blue', linewidth=1)
axs[0].set_title(f'Portfolio-Weighted Mahalanobis Distance ($M_{{w,t}}^2 = \mathbf{{R}}_t^T \mathbf{{W}} \mathbf{{\Sigma}}_{{t-1}}^{{-1}} \mathbf{{W}} \mathbf{{R}}_t$) - EWMA Half-life: {HALFLIFE} days')
axs[0].set_ylabel(r'$M_{w,t}^2$')
axs[0].grid(True, linestyle=':', alpha=0.7)
if not mah_dist_plot.empty:
    threshold_m2 = mah_dist_plot.quantile(0.99)
    axs[0].axhline(threshold_m2, color='red', linestyle='--', linewidth=1, label=f'99th Percentile ({threshold_m2:.2f})')
axs[0].legend()

axs[1].plot(vol_shock_plot.index, vol_shock_plot, label=r'$VS_{w,t}$', color='green', linewidth=1)
axs[1].set_title(f'Weighted Volatility Shock ($VS_{{w,t}} = \mathbf{{R}}_t^T \mathbf{{W}} \mathbf{{D}}_{{t-1}}^{{-2}} \mathbf{{W}} \mathbf{{R}}_t$)')
axs[1].set_ylabel(r'$VS_{w,t}$')
axs[1].grid(True, linestyle=':', alpha=0.7)
if not vol_shock_plot.empty:
    threshold_vs = vol_shock_plot.quantile(0.99)
    axs[1].axhline(threshold_vs, color='red', linestyle='--', linewidth=1, label=f'99th Percentile ({threshold_vs:.2f})')
axs[1].legend()

axs[2].plot(corr_shock_plot.index, corr_shock_plot, label=r'$CS_{w,t}$', color='purple', linewidth=1)
axs[2].set_title(f'Weighted Correlation Shock ($CS_{{w,t}} = M_{{w,t}}^2 - VS_{{w,t}}$)')
axs[2].set_ylabel(r'$CS_{w,t}$')
axs[2].grid(True, linestyle=':', alpha=0.7)
if not corr_shock_plot.empty:
    threshold_cs_upper = corr_shock_plot.quantile(0.99)
    threshold_cs_lower = corr_shock_plot.quantile(0.01)
    axs[2].axhline(threshold_cs_upper, color='red', linestyle='--', linewidth=1, label=f'99th Percentile ({threshold_cs_upper:.2f})')
    axs[2].axhline(threshold_cs_lower, color='orange', linestyle='--', linewidth=1, label=f'1st Percentile ({threshold_cs_lower:.2f})')
axs[2].legend()

for ax in axs:
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.YearLocator(base=2))
    ax.xaxis.set_minor_locator(mdates.MonthLocator(bymonth=[1,7]))

fig.autofmt_xdate()
plt.xlabel('Date')
plt.tight_layout()
print("Displaying plot...")
plt.show()

print("\n--- Analysis Complete ---")
print(f"Note: Calculations use lagged (t-1) EWMA covariance (Sigma_t-1) and variances (D_t-1^2) for evaluating returns R_t.")
print(f"Portfolio weights used (w_portfolio_array): {w_portfolio_array}")

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# from scipy.stats import chi2 # Removed as using empirical quantile

# --- Configuration ---
tickers = ['XLE', 'XLF', 'XLU', 'XLI', 'XLK', 'XLV', 'XLY', 'XLP', 'XLB']
start_date = '2004-01-01'
end_date = pd.Timestamp.today().strftime('%Y-%m-%d') # Default to today
# end_date = '2025-05-27' # Or use a fixed end date for consistency

HALFLIFE = 126 # Approximately 6 months (trading days)
MIN_PERIODS_COV = HALFLIFE

# --- Portfolio Weights ---
N_ASSETS_from_tickers = len(tickers)
w_portfolio_array = np.array([1/N_ASSETS_from_tickers] * N_ASSETS_from_tickers) # Equally weighted
W_diag_matrix = np.diag(w_portfolio_array)

# --- Attribution Analysis Configuration ---
# For the detailed TABLE printout of top N shock days
NUM_SHOCK_DAYS_FOR_TABLE_DETAIL = 3
# For the new BAR CHART, we will plot latest day + up to 3 latest shock days
NUM_RECENT_SHOCK_DAYS_FOR_BAR_CHART = 3

TRAILING_WEEK_DAYS = 5
TRAILING_MONTH_DAYS = 21
SHOCK_EVENT_QUANTILE = 0.99 # Identify top 1% of Mahalanobis distances as shocks

# --- !! DIAGNOSTIC CONTROL !! ---
VERBOSE_DEBUG_LOOP = False
PRINT_SERIES_DEBUG = False # Set to True for more detailed series printouts if debugging

# --- 1. Download Data ---
data = pd.DataFrame()
print("--- Data Acquisition and Initial Processing ---")
try:
    print(f"Attempting to download data for: {', '.join(tickers)}")
    print(f"Date range: {start_date} to {end_date}")
    _raw_data = yf.download(tickers, start=start_date, end=end_date, progress=False)

    if _raw_data.empty:
        raise ValueError("yf.download returned an empty DataFrame.")

    if isinstance(_raw_data.columns, pd.MultiIndex):
        data = _raw_data.xs('Close', level=0, axis=1)
    elif len(tickers) == 1 and 'Close' in _raw_data.columns:
        data = _raw_data[['Close']]; data.columns = tickers
    elif all(ticker in _raw_data.columns for ticker in tickers) and len(tickers) == len(_raw_data.columns):
        data = _raw_data
    else:
        print("Downloaded data columns structure is unexpected. Raw columns:")
        print(_raw_data.columns)
        raise ValueError("Could not reliably extract 'Close' prices.")

    if data.empty or data.isnull().all().all():
        raise ValueError("Extracted 'Close' price data is empty or all NaN.")

    data = data[tickers]
    initial_rows_before_dropna = len(data)
    data = data.dropna(axis=0, how='any')
    print(f"Data after dropna. Shape: {data.shape}. Rows dropped: {initial_rows_before_dropna - len(data)}")

    if data.shape[0] < MIN_PERIODS_COV + 5:
         raise ValueError(f"Insufficient data rows ({data.shape[0]}) after cleaning. Need {MIN_PERIODS_COV + 5}.")

except Exception as e:
    print(f"CRITICAL ERROR during data acquisition: {e}"); exit()

print(f"Data successfully processed. Usable: {data.index.min().strftime('%Y-%m-%d')} to {data.index.max().strftime('%Y-%m-%d')}. Shape: {data.shape}")

# --- 2. Calculate Returns ---
print("\n--- Return Calculation ---")
returns = np.log(data / data.shift(1))
num_obs_lost_to_shift = returns.iloc[:,0].isna().sum()
returns = returns.dropna()
print(f"Log returns. Shape: {returns.shape}. Initial NaNs removed: {num_obs_lost_to_shift} day(s).")
print(f"Returns data: {returns.index.min().strftime('%Y-%m-%d')} to {returns.index.max().strftime('%Y-%m-%d')}")
if returns.shape[0] < MIN_PERIODS_COV + 1: print(f"CRITICAL ERROR: Insufficient returns data ({returns.shape[0]} days). Need {MIN_PERIODS_COV + 1}."); exit()

N_ASSETS = returns.shape[1]
if N_ASSETS != len(w_portfolio_array): print(f"CRITICAL ERROR: Asset count mismatch ({N_ASSETS} vs {len(w_portfolio_array)} weights)."); exit()
print(f"Number of assets (N_ASSETS): {N_ASSETS}")

# --- 3. EWMA Covariance Series Calculation ---
print("\n--- EWMA Covariance Series Calculation ---")
ewma_cov_full_series = returns.ewm(halflife=HALFLIFE, min_periods=MIN_PERIODS_COV).cov()
if ewma_cov_full_series.dropna(how='all').empty: print("CRITICAL ERROR: EWMA covariance series is entirely NaN."); exit()
valid_ewma_cov_dates = ewma_cov_full_series.index.get_level_values(0).unique()
if valid_ewma_cov_dates.empty: print("CRITICAL ERROR: No valid dates in EWMA covariance series index."); exit()
first_valid_cov_date_in_series = valid_ewma_cov_dates.min()
print(f"First date for $\Sigma_{{t-1}}$ availability: {first_valid_cov_date_in_series.strftime('%Y-%m-%d')}")

# --- Main Calculation Loop ---
print("\n--- Main Calculation Loop for Metrics & Daily Contributions ---")
mah_dist_series = pd.Series(index=returns.index, dtype=float, name="MahalanobisDistance")
vol_shock_series = pd.Series(index=returns.index, dtype=float, name="VolatilityShock")
corr_shock_series = pd.Series(index=returns.index, dtype=float, name="CorrelationShock")
contrib_M_daily_df = pd.DataFrame(index=returns.index, columns=tickers, dtype=float)
contrib_VS_daily_df = pd.DataFrame(index=returns.index, columns=tickers, dtype=float)
contrib_CS_daily_df = pd.DataFrame(index=returns.index, columns=tickers, dtype=float)

processed_days_count = 0; skipped_days_summary = {k:0 for k in ["no_lagged_cov","type_err_sigma","oth_exc_sigma","nan_inf_sigma","nan_sigma_retrieved","non_pos_var","ill_cond_sigma","sing_sigma_inv","nan_inf_m"]}
total_loop_iterations = 0; last_successful_calc_date = None; loop_start_index_val = 0
try:
    first_t_minus_1_idx_loc = returns.index.get_loc(first_valid_cov_date_in_series)
    loop_start_index_val = first_t_minus_1_idx_loc + 1
    if loop_start_index_val >= len(returns.index): raise ValueError("Not enough data points after first valid covariance date for loop.")
    print(f"INFO: EWMA warm-up uses returns up to {returns.index[loop_start_index_val-1].strftime('%Y-%m-%d')}.")
    print(f"INFO: Calculations start for t_date = {returns.index[loop_start_index_val].strftime('%Y-%m-%d')}.")
except KeyError: print(f"CRITICAL ERROR: Date alignment issue. {first_valid_cov_date_in_series} not in returns index."); exit()

if len(returns.index) > loop_start_index_val :
    print(f"Looping {len(returns.index) - loop_start_index_val} iterations.")
    for i in range(loop_start_index_val, len(returns.index)):
        total_loop_iterations += 1; t_date = returns.index[i]; t_minus_1_date = returns.index[i-1]
        R_t_vector = returns.loc[t_date].values.reshape(N_ASSETS, 1)
        Sigma_t_minus_1_current = np.array([])
        try:
            if t_minus_1_date not in valid_ewma_cov_dates: skipped_days_summary["no_lagged_cov_data"] +=1; continue
            Sigma_t_minus_1_df_from_lookup = ewma_cov_full_series.loc[t_minus_1_date]
            if not isinstance(Sigma_t_minus_1_df_from_lookup, pd.DataFrame): skipped_days_summary["type_err_sigma"] += 1; continue
            if Sigma_t_minus_1_df_from_lookup.isnull().values.any(): skipped_days_summary["nan_sigma_retrieved"] += 1; continue
            Sigma_t_minus_1_current = Sigma_t_minus_1_df_from_lookup.reindex(index=tickers, columns=tickers).values
        except KeyError: skipped_days_summary["key_error_cov_lookup"] += 1; continue
        except Exception: skipped_days_summary["oth_exc_sigma"] += 1; continue
        if Sigma_t_minus_1_current.size == 0: skipped_days_summary["oth_exc_sigma"] += 1; continue
        if np.isnan(Sigma_t_minus_1_current).any() or np.isinf(Sigma_t_minus_1_current).any(): skipped_days_summary["nan_inf_sigma"] += 1; continue
        cond_Sigma = np.linalg.cond(Sigma_t_minus_1_current);
        if cond_Sigma > 1e12: skipped_days_summary["ill_cond_sigma"] += 1; continue
        try: Sigma_t_minus_1_inv_current = np.linalg.inv(Sigma_t_minus_1_current)
        except Exception: skipped_days_summary["sing_sigma_inv"] += 1; continue
        if Sigma_t_minus_1_inv_current.size == 0: skipped_days_summary["sing_sigma_inv"] += 1; continue
        M_val = (R_t_vector.T @ W_diag_matrix @ Sigma_t_minus_1_inv_current @ W_diag_matrix @ R_t_vector).item()
        if np.isnan(M_val) or np.isinf(M_val) or M_val < 0: skipped_days_summary["nan_inf_m"] += 1; continue
        mah_dist_series.loc[t_date] = M_val
        diag_Sigma_t_minus_1_current = np.diag(Sigma_t_minus_1_current)
        if np.any(diag_Sigma_t_minus_1_current <= 1e-12):
            skipped_days_summary["non_pos_var"] += 1; vol_shock_series.loc[t_date]=np.nan; corr_shock_series.loc[t_date]=np.nan
            contrib_M_daily_df.loc[t_date]=np.full(N_ASSETS,np.nan); contrib_VS_daily_df.loc[t_date]=np.full(N_ASSETS,np.nan); contrib_CS_daily_df.loc[t_date]=np.full(N_ASSETS,np.nan)
            last_successful_calc_date = t_date; processed_days_count +=1; continue
        D_t_minus_1_sq_inv_current = np.diag(1.0 / diag_Sigma_t_minus_1_current)
        VS_val = (R_t_vector.T @ W_diag_matrix @ D_t_minus_1_sq_inv_current @ W_diag_matrix @ R_t_vector).item()
        if np.isnan(VS_val) or np.isinf(VS_val) or VS_val < 0: vol_shock_series.loc[t_date]=np.nan; corr_shock_series.loc[t_date]=np.nan
        else: vol_shock_series.loc[t_date] = VS_val; CS_val = M_val - VS_val; corr_shock_series.loc[t_date] = CS_val
        A_M_shock_day = W_diag_matrix @ Sigma_t_minus_1_inv_current @ W_diag_matrix; v_M_shock_day = A_M_shock_day @ R_t_vector
        C_M_k_array = R_t_vector.flatten() * v_M_shock_day.flatten(); contrib_M_daily_df.loc[t_date] = C_M_k_array
        std_devs_t_minus_1_current = np.sqrt(diag_Sigma_t_minus_1_current)
        if np.any(std_devs_t_minus_1_current <= 1e-8): C_VS_k_array = np.full(N_ASSETS, np.nan)
        else: C_VS_k_array = (w_portfolio_array**2) * ((R_t_vector.flatten() / std_devs_t_minus_1_current)**2)
        contrib_VS_daily_df.loc[t_date] = C_VS_k_array
        if not (np.isnan(C_M_k_array).any() or np.isnan(C_VS_k_array).any()): contrib_CS_daily_df.loc[t_date] = C_M_k_array - C_VS_k_array
        else: contrib_CS_daily_df.loc[t_date] = np.full(N_ASSETS, np.nan)
        last_successful_calc_date = t_date; processed_days_count += 1
else: print(f"CRITICAL WARNING: Loop condition not met.")
print(f"\nLoop finished. Iterations: {total_loop_iterations}. Successfully processed days: {processed_days_count}")
if last_successful_calc_date: print(f"Last date calculations stored: {last_successful_calc_date.strftime('%Y-%m-%d')}")
else: print("No days successfully processed.")
for reason, count in skipped_days_summary.items():
    if count > 0: print(f"  - Skipped due to '{reason}': {count} days")

def print_series_info_debug(series, name):
    if isinstance(series, pd.Series):
        if PRINT_SERIES_DEBUG and not series.dropna().empty : print(f"  Tail of {name} (non-NaN):\n{series.dropna().tail()}")
    elif isinstance(series, pd.DataFrame):
        if PRINT_SERIES_DEBUG and not series.dropna(how='all').empty: print(f"  Tail of {name} (first col, non-NaN):\n{series[series.columns[0]].dropna().tail()}")
print("\n--- Populated Series (Tail Samples if PRINT_SERIES_DEBUG=True) ---")
print_series_info_debug(mah_dist_series, "mah_dist_series (full)")
print_series_info_debug(contrib_M_daily_df, "contrib_M_daily_df (full)")

print("\n--- Final Data Prep for Plotting & Attribution ---")
mah_dist_plot_final = mah_dist_series.dropna()
vol_shock_plot_final = vol_shock_series.reindex(mah_dist_plot_final.index).fillna(0)
corr_shock_plot_final = corr_shock_series.reindex(mah_dist_plot_final.index).fillna(0)
valid_contrib_dates = mah_dist_plot_final.index
contrib_M_daily_df = contrib_M_daily_df.loc[contrib_M_daily_df.index.isin(valid_contrib_dates)].dropna(how='all')
contrib_VS_daily_df = contrib_VS_daily_df.loc[contrib_VS_daily_df.index.isin(valid_contrib_dates)].dropna(how='all')
contrib_CS_daily_df = contrib_CS_daily_df.loc[contrib_CS_daily_df.index.isin(valid_contrib_dates)].dropna(how='all')
print(f"Length of final Mahalanobis Distance plot series: {len(mah_dist_plot_final)}")
if not mah_dist_plot_final.empty: print(f"  Plot data from: {mah_dist_plot_final.index.min().strftime('%Y-%m-%d')} to {mah_dist_plot_final.index.max().strftime('%Y-%m-%d')}")
else: print("No data to plot for time series after all processing.")

# --- Plotting Time Series with Plotly (Single Figure, 3 Subplots) ---
print("\n--- Plotting Interactive Time Series with Plotly ---")
if not mah_dist_plot_final.empty:
    fig_timeseries = make_subplots(rows=3, cols=1, shared_xaxes=True,
                                   subplot_titles=('Mahalanobis Distance',
                                                   'Weighted Correlation Shock Component',
                                                   'Weighted Volatility Shock Component'))

    # Subplot 1: Mahalanobis Distance
    fig_timeseries.add_trace(go.Scatter(x=mah_dist_plot_final.index, y=mah_dist_plot_final, mode='lines', name='Mahalanobis Distance', line=dict(color='blue', width=1)), row=1, col=1)
    m_dist_plot_threshold = mah_dist_plot_final.quantile(SHOCK_EVENT_QUANTILE)
    fig_timeseries.add_hline(y=m_dist_plot_threshold, line_dash="dash", line_color="red", annotation_text=f"{SHOCK_EVENT_QUANTILE*100:.0f}th Emp. Perc ({m_dist_plot_threshold:.2f})", row=1, col=1)
    fig_timeseries.update_yaxes(title_text="Mahalanobis Dist.", row=1, col=1)

    # Subplot 2: Correlation Shock Component
    if not corr_shock_plot_final.empty:
        fig_timeseries.add_trace(go.Scatter(x=corr_shock_plot_final.index, y=corr_shock_plot_final, mode='lines', name='Correlation Shock Comp.', line=dict(color='purple', width=1)), row=2, col=1)
        threshold_cs_upper_val = corr_shock_plot_final.quantile(SHOCK_EVENT_QUANTILE)
        threshold_cs_lower_val = corr_shock_plot_final.quantile(1-SHOCK_EVENT_QUANTILE)
        fig_timeseries.add_hline(y=threshold_cs_upper_val, line_dash="dash", line_color="red", annotation_text=f"{SHOCK_EVENT_QUANTILE*100:.0f}th Perc", row=2, col=1)
        fig_timeseries.add_hline(y=threshold_cs_lower_val, line_dash="dash", line_color="orange", annotation_text=f"{(1-SHOCK_EVENT_QUANTILE)*100:.0f}th Perc", row=2, col=1)
    fig_timeseries.update_yaxes(title_text="Correlation Shock", row=2, col=1)

    # Subplot 3: Volatility Shock Component
    if not vol_shock_plot_final.empty:
        fig_timeseries.add_trace(go.Scatter(x=vol_shock_plot_final.index, y=vol_shock_plot_final, mode='lines', name='Volatility Shock Comp.', line=dict(color='green', width=1)), row=3, col=1)
        threshold_vs_val = vol_shock_plot_final.quantile(SHOCK_EVENT_QUANTILE)
        fig_timeseries.add_hline(y=threshold_vs_val, line_dash="dash", line_color="red", annotation_text=f"{SHOCK_EVENT_QUANTILE*100:.0f}th Perc", row=3, col=1)
    fig_timeseries.update_yaxes(title_text="Volatility Shock", row=3, col=1)

    fig_timeseries.update_layout(height=900, title_text=f'Portfolio-Weighted Mahalanobis Distance & Decomposition (EWMA Half-life: {HALFLIFE} days)', legend_tracegroupgap=100)
    fig_timeseries.update_xaxes(title_text="Date", row=3, col=1)
    print("Displaying combined interactive time series plot...")
    fig_timeseries.show()
else:
    print("Mahalanobis Distance time series plot not generated as data is empty.")


# --- Shock Day Attribution Analysis ---
print("\n--- Shock Day Attribution Analysis ---")

if mah_dist_plot_final.empty:
    print("No Mahalanobis distance data available for shock day attribution.")
else:
    shock_threshold_value = mah_dist_plot_final.quantile(SHOCK_EVENT_QUANTILE)
    print(f"Identifying shock days where Mahalanobis Distance >= {shock_threshold_value:.4f} (top {(1-SHOCK_EVENT_QUANTILE)*100:.0f}% of observed values).")

    shock_days_series = mah_dist_plot_final[mah_dist_plot_final >= shock_threshold_value].sort_index(ascending=False)

    print(f"Number of shock days found: {len(shock_days_series)}")

    if not shock_days_series.empty:
        print("\nShock Dates (Mahalanobis Distance >= Threshold), most recent first (all identified):")
        for date_val, value in shock_days_series.items():
            print(f"  - {date_val.strftime('%Y-%m-%d')}: Mahalanobis Distance = {value:.4f}")

        # --- Detailed Attribution Table for Top N Recent Shock Days ---
        # This table remains as it provides a different level of detail (MD, VS, CS)
        print(f"\nGenerating detailed attribution table for the {min(NUM_SHOCK_DAYS_FOR_TABLE_DETAIL, len(shock_days_series))} most recent shock day(s):")
        for k_shock_day_table, (shock_date_table, m_scalar_val_table) in enumerate(shock_days_series.head(NUM_SHOCK_DAYS_FOR_TABLE_DETAIL).items()):
            print(f"\n--- Attribution Table for Shock Day: {shock_date_table.strftime('%Y-%m-%d')} (Mahalanobis Distance = {m_scalar_val_table:.4f}) ---")
            attribution_rows_for_table = []
            periods_to_analyze = [('Last Day', 1), ('Avg Last Week (abs)', TRAILING_WEEK_DAYS), ('Avg Last Month (abs)', TRAILING_MONTH_DAYS)]
            for period_label, num_days_in_period in periods_to_analyze:
                C_M_k_period, C_VS_k_period, C_CS_k_period = [np.full(N_ASSETS, np.nan)] * 3
                if shock_date_table not in contrib_M_daily_df.index: pass
                elif num_days_in_period == 1:
                    C_M_k_period = contrib_M_daily_df.loc[shock_date_table].values
                    C_VS_k_period = contrib_VS_daily_df.loc[shock_date_table].values
                    C_CS_k_period = contrib_CS_daily_df.loc[shock_date_table].values
                else:
                    try:
                        if shock_date_table not in contrib_M_daily_df.index: raise KeyError
                        idx_loc = contrib_M_daily_df.index.get_loc(shock_date_table)
                        start_idx = max(0, idx_loc - num_days_in_period + 1)
                        start_dt = contrib_M_daily_df.index[start_idx]; end_dt = shock_date_table
                        if start_dt > end_dt: raise ValueError("Start date after end date")
                        m_slice = contrib_M_daily_df.loc[start_dt:end_dt]; vs_slice = contrib_VS_daily_df.loc[start_dt:end_dt]; cs_slice = contrib_CS_daily_df.loc[start_dt:end_dt]
                        if not m_slice.empty: C_M_k_period = m_slice.abs().mean(skipna=True).values
                        if not vs_slice.empty: C_VS_k_period = vs_slice.abs().mean(skipna=True).values
                        if not cs_slice.empty: C_CS_k_period = cs_slice.abs().mean(skipna=True).values
                    except (KeyError, ValueError): pass
                attribution_rows_for_table.append({'Period': period_label, 'Contrib_M': C_M_k_period, 'Contrib_VS': C_VS_k_period, 'Contrib_CS': C_CS_k_period})
            shock_day_table_list = []
            for asset_idx, ticker_name in enumerate(tickers):
                rd = {'Asset': ticker_name}
                for r_detail in attribution_rows_for_table:
                    p = r_detail['Period']
                    def get_val(arr,ix): return arr[ix] if isinstance(arr,np.ndarray) and ix<len(arr) and not np.isnan(arr[ix]) else 'N/A'
                    rd[f'C_MD ({p})']=get_val(r_detail['Contrib_M'],asset_idx); rd[f'C_VS ({p})']=get_val(r_detail['Contrib_VS'],asset_idx); rd[f'C_CS ({p})']=get_val(r_detail['Contrib_CS'],asset_idx)
                shock_day_table_list.append(rd)
            shock_day_contrib_df = pd.DataFrame(shock_day_table_list).set_index('Asset')
            sort_col = 'C_MD (Last Day)'
            if sort_col in shock_day_contrib_df.columns:
                shock_day_contrib_df[sort_col] = pd.to_numeric(shock_day_contrib_df[sort_col], errors='coerce')
                shock_day_contrib_df = shock_day_contrib_df.reindex(shock_day_contrib_df[sort_col].abs().sort_values(ascending=False).index).fillna('N/A')
            print(shock_day_contrib_df.to_string(float_format="%.4f"))

        # --- NEW Consolidated Plotly Grouped Bar Chart for MD Contributions ---
        print(f"\n--- Plotly Grouped Bar Chart of MD Contributions for Specific Dates ---")

        bar_chart_data_list = []
        dates_for_bar_chart = {}

        # 1. Latest Observation Day
        if not contrib_M_daily_df.empty:
            latest_obs_date = contrib_M_daily_df.index[-1]
            dates_for_bar_chart[f"Latest Obs. ({latest_obs_date.strftime('%Y-%m-%d')})"] = contrib_M_daily_df.loc[latest_obs_date].values

        # 2. Three most recent shock days (if available and different from latest_obs_date)
        num_shock_days_added = 0
        for shock_date, m_val in shock_days_series.items():
            if num_shock_days_added >= NUM_RECENT_SHOCK_DAYS_FOR_BAR_CHART:
                break
            # Avoid duplicating latest_obs_date if it's also a shock day already plotted
            date_label = f"Shock Day ({shock_date.strftime('%Y-%m-%d')})"
            if date_label not in dates_for_bar_chart: # Ensure distinct columns for distinct scenarios
                 if shock_date in contrib_M_daily_df.index:
                    dates_for_bar_chart[date_label] = contrib_M_daily_df.loc[shock_date].values
                    num_shock_days_added +=1
                 elif latest_obs_date == shock_date and f"Latest Obs. ({latest_obs_date.strftime('%Y-%m-%d')})" in dates_for_bar_chart : #if latest obs was a shock day already counted
                    pass #already added
                 else: #shock date contributions not available
                    dates_for_bar_chart[date_label] = np.full(N_ASSETS, np.nan)
                    num_shock_days_added +=1


        if not dates_for_bar_chart:
            print("No data available for the consolidated contribution bar chart.")
        else:
            bar_df_data = {'Asset': tickers}
            for label, contrib_values in dates_for_bar_chart.items():
                bar_df_data[label] = contrib_values

            bar_df_plotly_consolidated = pd.DataFrame(bar_df_data).set_index('Asset')
            # Sort assets by max absolute contribution across the displayed scenarios for better visualization
            if not bar_df_plotly_consolidated.empty:
                 bar_df_plotly_consolidated['max_abs_contrib'] = bar_df_plotly_consolidated.abs().max(axis=1)
                 bar_df_plotly_consolidated = bar_df_plotly_consolidated.sort_values(by='max_abs_contrib', ascending=False).drop(columns=['max_abs_contrib'])

            fig_contrib_bar_consolidated = go.Figure()
            for period_col_name_bar in bar_df_plotly_consolidated.columns:
                fig_contrib_bar_consolidated.add_trace(go.Bar(
                    name=period_col_name_bar,
                    x=bar_df_plotly_consolidated.index,
                    y=bar_df_plotly_consolidated[period_col_name_bar]
                ))

            fig_contrib_bar_consolidated.update_layout(
                barmode='group',
                title_text=f'Asset Contributions to Mahalanobis Distance for Key Dates',
                xaxis_title="Asset",
                yaxis_title="Contribution to Mahalanobis Distance ($C_{M,k}$)",
                legend_title="Date/Scenario"
            )
            fig_contrib_bar_consolidated.show()

    else:
        print("No shock days met threshold for detailed attribution or plotting.")

print("\n--- Analysis Complete ---")

# Claude Refactoring

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from functools import reduce

# Configuration
CONFIG = {
    'tickers': ['XLE', 'XLF', 'XLU', 'XLI', 'XLK', 'XLV', 'XLY', 'XLP', 'XLB'],
    'start_date': '2004-01-01',
    'end_date': pd.Timestamp.today().strftime('%Y-%m-%d'),
    'halflife': 126,
    'min_periods': 126,
    'shock_quantile': 0.99,
    'num_shock_days_table': 3,
    'num_shock_days_chart': 3,
    'trailing_periods': {'week': 5, 'month': 21}
}

# Basic utility functions
def is_valid_covariance(cov_matrix: np.ndarray, max_condition: float = 1e12) -> bool:
    """Check if covariance matrix is valid for inversion."""
    return (
        not np.isnan(cov_matrix).any() and
        not np.isinf(cov_matrix).any() and
        np.all(np.diag(cov_matrix) > 1e-12) and
        np.linalg.cond(cov_matrix) <= max_condition
    )

# Data acquisition and preprocessing
def download_data(tickers: list, start: str, end: str) -> pd.DataFrame:
    """Download and clean price data."""
    print(f"Downloading data for {len(tickers)} tickers from {start} to {end}")
    
    raw_data = yf.download(tickers, start=start, end=end, progress=False)
    
    if raw_data.empty:
        raise ValueError("No data downloaded")
    
    # Extract close prices
    if isinstance(raw_data.columns, pd.MultiIndex):
        data = raw_data.xs('Close', level=0, axis=1)
    elif len(tickers) == 1:
        data = raw_data[['Close']].rename(columns={'Close': tickers[0]})
    else:
        data = raw_data[tickers]
    
    # Clean and validate
    data = data.dropna()
    if len(data) < CONFIG['min_periods'] + 5:
        raise ValueError(f"Insufficient data: {len(data)} rows")
    
    print(f"Data shape: {data.shape}, range: {data.index.min():%Y-%m-%d} to {data.index.max():%Y-%m-%d}")
    return data

def calculate_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """Calculate log returns."""
    returns = np.log(prices / prices.shift(1)).dropna()
    print(f"Returns shape: {returns.shape}")
    return returns

def calculate_ewma_covariance(returns: pd.DataFrame, halflife: int, min_periods: int) -> pd.DataFrame:
    """Calculate EWMA covariance matrix series."""
    return returns.ewm(halflife=halflife, min_periods=min_periods).cov()

# Core risk metric calculations
def calculate_metrics_for_date(
    returns_t: np.ndarray, 
    cov_matrix: np.ndarray, 
    weights: np.ndarray
) -> tuple[float, float, float, np.ndarray, np.ndarray, np.ndarray]:
    """Calculate all metrics for a single date."""
    W_diag = np.diag(weights)
    cov_inv = np.linalg.inv(cov_matrix)
    
    # Mahalanobis distance
    mahal_dist = (returns_t.T @ W_diag @ cov_inv @ W_diag @ returns_t).item()
    
    # Volatility shock (using diagonal covariance)
    vol_inv = np.diag(1.0 / np.diag(cov_matrix))
    vol_shock = (returns_t.T @ W_diag @ vol_inv @ W_diag @ returns_t).item()
    
    # Correlation shock (residual)
    corr_shock = mahal_dist - vol_shock
    
    # Contributions
    A_matrix = W_diag @ cov_inv @ W_diag
    v_vector = A_matrix @ returns_t
    contrib_mahal = returns_t.flatten() * v_vector.flatten()
    
    std_devs = np.sqrt(np.diag(cov_matrix))
    contrib_vol = (weights**2) * ((returns_t.flatten() / std_devs)**2)
    contrib_corr = contrib_mahal - contrib_vol
    
    return mahal_dist, vol_shock, corr_shock, contrib_mahal, contrib_vol, contrib_corr

def compute_risk_metrics(returns: pd.DataFrame, cov_series: pd.DataFrame) -> dict[str, pd.Series]:
    """Compute all risk metrics efficiently."""
    tickers = returns.columns.tolist()
    n_assets = len(tickers)
    weights = np.full(n_assets, 1/n_assets)
    
    # Initialize result containers
    results = {
        'mahal_dist': pd.Series(index=returns.index, dtype=float),
        'vol_shock': pd.Series(index=returns.index, dtype=float),
        'corr_shock': pd.Series(index=returns.index, dtype=float),
        'contrib_mahal': pd.DataFrame(index=returns.index, columns=tickers, dtype=float),
        'contrib_vol': pd.DataFrame(index=returns.index, columns=tickers, dtype=float),
        'contrib_corr': pd.DataFrame(index=returns.index, columns=tickers, dtype=float)
    }
    
    valid_cov_dates = cov_series.index.get_level_values(0).unique()
    processed = 0
    
    for i in range(1, len(returns)):
        t_date = returns.index[i]
        t_minus_1 = returns.index[i-1]
        
        if t_minus_1 not in valid_cov_dates:
            continue
            
        try:
            cov_matrix = cov_series.loc[t_minus_1].reindex(
                index=tickers, columns=tickers
            ).values
            
            if not is_valid_covariance(cov_matrix):
                continue
                
            returns_t = returns.loc[t_date].values.reshape(-1, 1)
            
            metrics = calculate_metrics_for_date(returns_t, cov_matrix, weights)
            mahal_dist, vol_shock, corr_shock, contrib_m, contrib_v, contrib_c = metrics
            
            results['mahal_dist'].loc[t_date] = mahal_dist
            results['vol_shock'].loc[t_date] = vol_shock
            results['corr_shock'].loc[t_date] = corr_shock
            results['contrib_mahal'].loc[t_date] = contrib_m
            results['contrib_vol'].loc[t_date] = contrib_v
            results['contrib_corr'].loc[t_date] = contrib_c
            
            processed += 1
            
        except Exception:
            continue
    
    print(f"Successfully processed {processed} days")
    return {k: v.dropna() if isinstance(v, pd.Series) else v.dropna(how='all') 
            for k, v in results.items()}

# Business logic and analysis
def identify_shock_days(mahal_series: pd.Series, quantile: float) -> pd.Series:
    """Identify shock days above threshold."""
    threshold = mahal_series.quantile(quantile)
    shock_days = mahal_series[mahal_series >= threshold].sort_index(ascending=False)
    print(f"Found {len(shock_days)} shock days (threshold: {threshold:.4f})")
    return shock_days

def create_attribution_table(
    shock_date: pd.Timestamp,
    contrib_data: dict[str, pd.DataFrame],
    tickers: list,
    periods: dict[str, int]
) -> pd.DataFrame:
    """Create attribution table for a shock day."""
    def get_period_contrib(df: pd.DataFrame, date: pd.Timestamp, days: int) -> np.ndarray:
        if date not in df.index:
            return np.full(len(tickers), np.nan)
        
        if days == 1:
            return df.loc[date].values
        
        try:
            idx = df.index.get_loc(date)
            start_idx = max(0, idx - days + 1)
            slice_data = df.iloc[start_idx:idx+1]
            return slice_data.abs().mean().values
        except:
            return np.full(len(tickers), np.nan)
    
    # Calculate contributions for different periods
    period_data = {}
    for period_name, days in [('Last Day', 1)] + [(f'Avg {k.title()}', v) for k, v in periods.items()]:
        for metric in ['mahal', 'vol', 'corr']:
            contrib_key = f'contrib_{metric}'
            if contrib_key in contrib_data:
                period_data[f'{metric}_{period_name}'] = get_period_contrib(
                    contrib_data[contrib_key], shock_date, days
                )
    
    # Create DataFrame
    table_data = []
    for i, ticker in enumerate(tickers):
        row = {'Asset': ticker}
        for col_name, values in period_data.items():
            row[col_name] = values[i] if not np.isnan(values[i]) else 'N/A'
        table_data.append(row)
    
    df = pd.DataFrame(table_data).set_index('Asset')
    
    # Sort by absolute contribution
    if 'mahal_Last Day' in df.columns:
        sort_col = pd.to_numeric(df['mahal_Last Day'], errors='coerce')
        df = df.reindex(sort_col.abs().sort_values(ascending=False).index)
    
    return df

# Visualization and reporting
def create_time_series_plot(metrics: dict[str, pd.Series], shock_quantile: float) -> go.Figure:
    """Create interactive time series plot."""
    fig = make_subplots(
        rows=3, cols=1, shared_xaxes=True,
        subplot_titles=['Mahalanobis Distance', 'Correlation Shock', 'Volatility Shock']
    )
    
    # Mahalanobis distance
    mahal = metrics['mahal_dist']
    threshold = mahal.quantile(shock_quantile)
    
    fig.add_trace(go.Scatter(
        x=mahal.index, y=mahal, mode='lines', 
        name='Mahalanobis Distance', line=dict(color='blue')
    ), row=1, col=1)
    
    fig.add_hline(
        y=threshold, line_dash="dash", line_color="red",
        annotation_text=f"{shock_quantile*100:.0f}th percentile ({threshold:.2f})",
        row=1, col=1
    )
    
    # Correlation shock
    corr_shock = metrics['corr_shock']
    fig.add_trace(go.Scatter(
        x=corr_shock.index, y=corr_shock, mode='lines',
        name='Correlation Shock', line=dict(color='purple')
    ), row=2, col=1)
    
    # Volatility shock
    vol_shock = metrics['vol_shock']
    fig.add_trace(go.Scatter(
        x=vol_shock.index, y=vol_shock, mode='lines',
        name='Volatility Shock', line=dict(color='green')
    ), row=3, col=1)
    
    fig.update_layout(
        height=900, 
        title=f'Portfolio Risk Metrics (EWMA Half-life: {CONFIG["halflife"]} days)',
        showlegend=True
    )
    
    return fig

def create_contribution_bar_chart(
    contrib_data: pd.DataFrame,
    shock_days: pd.Series,
    num_shock_days: int
) -> go.Figure:
    """Create grouped bar chart for contributions."""
    chart_data = {}
    
    # Latest observation
    if not contrib_data.empty:
        latest_date = contrib_data.index[-1]
        chart_data[f"Latest ({latest_date:%Y-%m-%d})"] = contrib_data.loc[latest_date]
    
    # Recent shock days
    for i, (shock_date, _) in enumerate(shock_days.head(num_shock_days).items()):
        if shock_date in contrib_data.index:
            chart_data[f"Shock ({shock_date:%Y-%m-%d})"] = contrib_data.loc[shock_date]
    
    if not chart_data:
        return go.Figure()
    
    # Create bar chart
    fig = go.Figure()
    
    for label, values in chart_data.items():
        fig.add_trace(go.Bar(name=label, x=values.index, y=values.values))
    
    fig.update_layout(
        barmode='group',
        title='Asset Contributions to Mahalanobis Distance',
        xaxis_title='Asset',
        yaxis_title='Contribution',
        legend_title='Date'
    )
    
    return fig

# Main orchestration
def main():
    """Main analysis pipeline."""
    try:
        # Data acquisition and processing
        prices = download_data(CONFIG['tickers'], CONFIG['start_date'], CONFIG['end_date'])
        returns = calculate_returns(prices)
        cov_series = calculate_ewma_covariance(returns, CONFIG['halflife'], CONFIG['min_periods'])
        
        # Risk metrics calculation
        print("\nCalculating risk metrics...")
        metrics = compute_risk_metrics(returns, cov_series)
        
        # Time series visualization
        print("\nCreating time series plot...")
        ts_fig = create_time_series_plot(metrics, CONFIG['shock_quantile'])
        ts_fig.show()
        
        # Shock day analysis
        print("\nAnalyzing shock days...")
        shock_days = identify_shock_days(metrics['mahal_dist'], CONFIG['shock_quantile'])
        
        if not shock_days.empty:
            # Attribution tables
            for i, (shock_date, mahal_val) in enumerate(shock_days.head(CONFIG['num_shock_days_table']).items()):
                print(f"\nShock Day {i+1}: {shock_date:%Y-%m-%d} (Mahalanobis: {mahal_val:.4f})")
                
                table = create_attribution_table(
                    shock_date, metrics, CONFIG['tickers'], CONFIG['trailing_periods']
                )
                print(table.to_string(float_format="%.4f"))
            
            # Contribution bar chart
            print("\nCreating contribution bar chart...")
            bar_fig = create_contribution_bar_chart(
                metrics['contrib_mahal'], shock_days, CONFIG['num_shock_days_chart']
            )
            bar_fig.show()
        
        print("\nAnalysis complete!")
        
    except Exception as e:
        print(f"Error in analysis: {e}")
        raise

if __name__ == "__main__":
    main()

In [None]:
from risk_data import get_factor_data
import xarray as xr
factor_data = get_factor_data()


In [None]:
ds = factor_data.sel(factor_name=CONFIG['tickers'], factor_name_1=CONFIG['tickers'], date=slice(CONFIG['start_date'], CONFIG['end_date']), vol_type=CONFIG['halflife'], corr_type=CONFIG['halflife'])
ret = ds['ret'].to_pandas()/10000

def pd_diag(ser: pd.Series) -> pd.DataFrame:
    return pd.DataFrame(np.diag(ser), index=ser.index, columns=ser.index)

def get_cov(vol: pd.Series, corr: pd.DataFrame) -> pd.DataFrame:
    """Construct covariance matrix from volatilities and correlations."""
    D = pd_diag(vol)
    return D @ corr @ D

ds_t = ds.sel(date=ds.date.max())
cov = get_cov(vol=ds_t['vol'].to_series(), 
              corr=ds_t['corr'].to_pandas())
cov
# ds.sel(date=ds.date.max())

In [None]:
returns2 = ds.ret.to_pandas()
cov_series2 = calculate_ewma_covariance(returns2, CONFIG['halflife'], CONFIG['min_periods'])
cov_series2

In [None]:
# Data acquisition and processing
prices = download_data(CONFIG['tickers'], CONFIG['start_date'], CONFIG['end_date'])
returns = calculate_returns(prices)
cov_series = calculate_ewma_covariance(returns, CONFIG['halflife'], CONFIG['min_periods'])
cov_series



In [None]:
    
    # Risk metrics calculation
    print("\nCalculating risk metrics...")
    metrics = compute_risk_metrics(returns, cov_series)
        
    # Time series visualization
    print("\nCreating time series plot...")
    ts_fig = create_time_series_plot(metrics, CONFIG['shock_quantile'])
    ts_fig.show()
        
    # Shock day analysis
    print("\nAnalyzing shock days...")
    shock_days = identify_shock_days(metrics['mahal_dist'], CONFIG['shock_quantile'])
        
        if not shock_days.empty:
            # Attribution tables
            for i, (shock_date, mahal_val) in enumerate(shock_days.head(CONFIG['num_shock_days_table']).items()):
                print(f"\nShock Day {i+1}: {shock_date:%Y-%m-%d} (Mahalanobis: {mahal_val:.4f})")
                
                table = create_attribution_table(
                    shock_date, metrics, CONFIG['tickers'], CONFIG['trailing_periods']
                )
                print(table.to_string(float_format="%.4f"))
            
    #         # Contribution bar chart
    #         print("\nCreating contribution bar chart...")
    #         bar_fig = create_contribution_bar_chart(
    #             metrics['contrib_mahal'], shock_days, CONFIG['num_shock_days_chart']
    #         )
    #         bar_fig.show()
        
    #     print("\nAnalysis complete!")
        
    # except Exception as e:
    #     print(f"Error in analysis: {e}")
    #     raise

In [None]:

xr.Dataset(metrics)