In [2]:
import numpy as np
import pandas as pd
import math

from scipy.optimize import minimize, Bounds, fmin_cg
from skopt import gp_minimize
from deap import base, creator, tools, algorithms
import random

from matplotlib import pyplot as plt
import seaborn as sns

import yfinance as yf
import pandas_datareader as pdr
import pandas_market_calendars as mcal

import datetime


In [8]:
#S&P 500 ticker
SnP_componet_ticker = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0].Symbol
Russell_1000_componet_ticker = pd.read_html('https://en.wikipedia.org/wiki/Russell_1000_Index')[3].Symbol
SnP600_componet_ticker = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_600_companies')[0].Symbol

def final_ticker(T_list):
    return list(set(T_list))

index_banch_mart = ["SPY"]

tickers = final_ticker(SnP_componet_ticker)

def get_stock_data(tickers_list, start_date, end_date):
    
    all_tickers = list(dict.fromkeys(tickers_list))
    
    data = yf.download(all_tickers, start=start_date, end=end_date)['Close']
    if isinstance(data, pd.DataFrame):
        data = data.dropna(axis=1)
        available_tickers = list(data.columns)
    else:
        available_tickers = all_tickers
    
    return data, available_tickers

end_date = datetime.date.today()
start_date = end_date - datetime.timedelta(days=756*3) #3 years analysis


df,tickers = get_stock_data(tickers,start_date,end_date)

df_BM,_ = get_stock_data(index_banch_mart, start_date,end_date)
number_of_stocks = df.shape[1]

[*********************100%***********************]  503 of 503 completed 503 completed

2 Failed downloads:
['BF.B']: YFPricesMissingError('possibly delisted; no price data found  (1d 2018-12-19 -> 2025-03-05)')
[*********************100%***********************]  503 of 503 completed['BRK.B']: YFTzMissingError('possibly delisted; no timezone found')
[*********************100%***********************]  1 of 1 completed


S&P 500 is weighted by free-float market capitalization.

In [23]:
def get_market_caps(tickers):
    market_caps = {}
    for ticker in tickers:
        try:
            stock = yf.Ticker(ticker)
            market_cap = stock.info.get('marketCap', None)
            if market_cap:
                market_caps[ticker] = market_cap
        except:
            pass
    return market_caps

weights = {ticker: 1/len(tickers) for ticker in tickers}

# Main idea:
Take advantage of the mismatch between the volatility of index and it's constituent.
$$
\sigma_\mathrm{index}^2 = \sum_{i} w_i^2\sigma_i^2 + 2\sum_{i \neq j} \rho_{ij}w_i w_j\sigma_i\sigma_j,
$$

$$
\sigma_\mathrm{weighted \ stock} = \sum_{i} w_i\sigma_i.
$$
where $\rho_{ij}$ is the correalation between stock $i$, $j$.

Note 
$$
\sigma_\mathrm{index} \leq \sigma_\mathrm{weighted \ stock}.
$$

Trading logic:

If $\sigma_\mathrm{index} \leq \sigma_\mathrm{weighted \ stock}$ and $\rho_{ij}$ is low -> Short index volatility, long stock volatilities.

If $\sigma_\mathrm{index} \approx \sigma_\mathrm{weighted \ stock}$ and $\rho_{ij}$ is hight -> Long index volatility, short stock volatilities.

In [None]:
stock_returns = df.pct_change().dropna()
index_returns = df_BM.pct_change().dropna()

stock_vols = stock_returns.rolling(window=20).std() * np.sqrt(252)  # Annualized
index_vol = index_returns.rolling(window=20).std() * np.sqrt(252)  # Annualized

In [32]:
def calculate_implied_correlation(index_vol_series, stock_vols_df, weights=None):
    common_idx = index_vol_series.index.intersection(stock_vols_df.index)
    index_vol_aligned = index_vol_series.loc[common_idx]
    stock_vols_aligned = stock_vols_df.loc[common_idx]
    
    n = stock_vols_aligned.shape[1]
    
    if weights is None:
        weights = {col: 1/n for col in stock_vols_aligned.columns}
    
    weight_array = np.array([weights.get(col, 1/n) for col in stock_vols_aligned.columns])
    
    weighted_vol_squared_sum = np.zeros(len(common_idx))
    weighted_vol_sum_squared = np.zeros(len(common_idx))
    
    for i, date in enumerate(common_idx):
        vols = stock_vols_aligned.loc[date].values
        weighted_vol_squared_sum[i] = np.sum((weight_array**2) * (vols**2))
        weighted_vol_sum = np.sum(weight_array * vols)
        weighted_vol_sum_squared[i] = weighted_vol_sum**2
    
    index_vol_squared = index_vol_aligned.values**2
    
    numerator = index_vol_squared - weighted_vol_squared_sum
    denominator = weighted_vol_sum_squared - weighted_vol_squared_sum
    
    implied_corr = pd.Series(numerator / denominator, index=common_idx)
    return implied_corr.clip(-1, 1)


def calculate_realized_correlation(returns, window=20):
    avg_correlations = []
    dates = []
    
    # Step through time, computing the average correlation at each point
    for i in range(window, len(returns)):

        window_returns = returns.iloc[i-window:i]
        
        # Calculate correlation matrix
        corr_matrix = window_returns.corr().values
        
        # Get upper triangle (excluding diagonal)
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
        upper_triangle = corr_matrix[mask]
        
        # Calculate average correlation
        avg_corr = np.nanmean(upper_triangle)
        avg_correlations.append(avg_corr)
        dates.append(returns.index[i])
    
    return pd.Series(avg_correlations, index=dates)

# Calculate correlations
implied_corr = calculate_implied_correlation(index_vol.iloc[:, 0], stock_vols, weights)
realized_corr = calculate_realized_correlation(stock_returns)

# Make sure they're aligned
common_idx = implied_corr.index.intersection(realized_corr.index)
implied_corr = implied_corr.loc[common_idx]
realized_corr = realized_corr.loc[common_idx]


In [36]:
realized_corr

2019-01-22    0.596614
2019-01-23    0.595163
2019-01-24    0.574804
2019-01-25    0.503962
2019-01-28    0.372128
                ...   
2025-02-26    0.116221
2025-02-27    0.111575
2025-02-28    0.111188
2025-03-03    0.125135
2025-03-04    0.124430
Length: 1538, dtype: float64

In [None]:
# Step 3: Build trading strategy
threshold = 0.1
# Generate dispersion signal - 1 when implied > realized + threshold
dispersion_signal = ((implied_corr - realized_corr) > threshold).astype(int)

# Initialize P&L series
dispersion_pnl = pd.Series(0.0, index=dispersion_signal.index[:-1])

# Calculate P&L - fixing the error in your code
for i in range(len(dispersion_signal) - 1):
    current_date = dispersion_signal.index[i]
    next_date = dispersion_signal.index[i+1]
    
    # Important fix: use .iloc[i] to get the scalar value, not a Series
    signal_value = dispersion_signal.iloc[i]
    
    if signal_value == 1:
        # We're in a dispersion trade
        dispersion_pnl.loc[current_date] = implied_corr.loc[current_date] - realized_corr.loc[next_date]
    else:
        dispersion_pnl.loc[current_date] = 0

# Calculate cumulative P&L
cum_pnl = dispersion_pnl.cumsum()

# Visualize results
plt.figure(figsize=(14, 10))

# Plot 1: Implied vs Realized Correlation
plt.subplot(3, 1, 1)
plt.plot(implied_corr, label='Implied Correlation')
plt.plot(realized_corr, label='Realized Correlation')
plt.title('Implied vs Realized Correlation')
plt.legend()
plt.grid(True)

# Plot 2: Dispersion Trading Signal
plt.subplot(3, 1, 2)
plt.plot(dispersion_signal)
plt.title('Dispersion Trading Signal (1 = Trade)')
plt.grid(True)

# Plot 3: Cumulative P&L
plt.subplot(3, 1, 3)
plt.plot(cum_pnl)
plt.title('Cumulative P&L')
plt.grid(True)

plt.tight_layout()
plt.show()

# Calculate performance metrics
total_trades = dispersion_signal.sum()
winning_trades = (dispersion_pnl > 0).sum()
win_rate = winning_trades / total_trades if total_trades > 0 else 0
avg_win = dispersion_pnl[dispersion_pnl > 0].mean() if winning_trades > 0 else 0
avg_loss = dispersion_pnl[dispersion_pnl < 0].mean() if (dispersion_pnl < 0).sum() > 0 else 0
profit_factor = abs(dispersion_pnl[dispersion_pnl > 0].sum() / dispersion_pnl[dispersion_pnl < 0].sum()) if dispersion_pnl[dispersion_pnl < 0].sum() != 0 else float('inf')

print(f"Dispersion Trading Backtest Results:")
print(f"Total Trades: {total_trades}")
print(f"Win Rate: {win_rate:.2%}")
print(f"Average Win: {avg_win:.4f}")
print(f"Average Loss: {avg_loss:.4f}")
print(f"Profit Factor: {profit_factor:.2f}")
print(f"Total P&L: {cum_pnl.iloc[-1]:.4f}")