In [40]:
import yfinance as yf
import pandas as pd
import os
import numpy as np


# data prep

In [41]:
def get_sp500_adjusted_close(start_date: str, end_date: str) -> pd.DataFrame:
    """
    Retrieve daily adjusted closing prices for all S&P 500 constituents 
    between the given start and end dates using yfinance.
    Data is cached locally in /data folder for efficiency.

    Parameters:
        start_date (str): Start date in 'YYYY-MM-DD' format.
        end_date (str): End date in 'YYYY-MM-DD' format.

    Returns:
        pd.DataFrame: A DataFrame with dates as index and tickers as columns, 
                      containing adjusted closing prices.
    """
    # Define cache path
    os.makedirs("data", exist_ok=True)
    cache_path = f"data/sp500_adjclose_{start_date}_to_{end_date}.csv"

    # If cache exists, load and return
    if os.path.exists(cache_path):
        print(f"Loading cached data from {cache_path}")
        return pd.read_csv(cache_path, index_col=0, parse_dates=True)

    print("Cache not found. Downloading data from yfinance...")

    # Get the latest list of S&P 500 constituents from Wikipedia
    table = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
    sp500_table = table[0]
    tickers = sp500_table['Symbol'].tolist()
    
    # Replace dot with dash for yfinance compatibility (e.g., BRK.B -> BRK-B)
    tickers = [ticker.replace('.', '-') for ticker in tickers]

    # Download data
    data = yf.download(
        tickers,
        start=start_date,
        end=end_date,
        progress=False,
        group_by='ticker',
        auto_adjust=False,
        threads=True
    )

    # Extract only adjusted close prices
    adj_close_df = pd.DataFrame({
        ticker: data[ticker]['Adj Close']
        for ticker in tickers
        if (ticker in data.columns.get_level_values(0)) and ('Adj Close' in data[ticker])
    })

    # Save to cache
    adj_close_df.to_csv(cache_path)
    print(f"Data saved to {cache_path}")

    return adj_close_df

sp500 = get_sp500_adjusted_close('2010-01-01', '2025-04-14')
sp500.index=pd.to_datetime(sp500.index)
sp500=sp500.pct_change().iloc[1:,:]

# sp500 daily simplre return
sp500.head()

Loading cached data from data/sp500_adjclose_2010-01-01_to_2025-04-14.csv


Unnamed: 0_level_0,MMM,AOS,ABT,ABBV,ACN,ADBE,AMD,AES,AFL,A,...,WMB,WTW,WDAY,WYNN,XEL,XYL,YUM,ZBRA,ZBH,ZTS
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2010-01-05,-0.006264,-0.012777,-0.00808,,0.00618,0.016446,0.001031,-0.010241,0.029009,-0.010862,...,0.012494,-0.002242,,0.060819,-0.01186,,-0.00342,-0.001744,0.031656,
2010-01-06,0.014182,0.000454,0.005553,,0.010631,-0.002122,-0.014418,-0.010348,0.008785,-0.003554,...,0.032449,0.016099,,-0.013117,0.00192,,-0.007149,-0.007687,-0.000323,
2010-01-07,0.000717,0.002951,0.008284,,-0.000935,-0.019405,-0.010449,0.000747,0.010733,-0.001296,...,-0.007083,-0.002948,,0.021356,-0.004312,,-0.000288,-0.025,0.02294,
2010-01-08,0.007047,0.014709,0.005113,,-0.003978,-0.005422,-0.004224,0.028359,-0.010018,-0.000325,...,0.008025,-0.001109,,-0.007165,0.000481,,0.000288,-0.00325,-0.021004,
2010-01-11,-0.004032,-0.004683,0.005086,,-0.00094,-0.013083,-0.030753,0.006531,0.02631,0.00065,...,-0.006192,0.009988,,-0.00324,0.00962,,0.017281,0.003261,0.0221,


# 1. percentile method

In [42]:
def tail_risk_percentile(weights: np.array, significance_level: float = 0.05, data: pd.DataFrame =sp500):
    """
    使用时记得将weights的顺序与data中columns的顺序对其
    """
    assert weights.shape[0] == len(data.columns), "Dimension mismatch"

    # use 0 to fill all the missing returns of individual stocks
    # assuming there is no profit/loss for the stock at that day
    data = data.fillna(0)

    # use broadcasting rule
    portfolio_ret = np.sum(np.array(data) * weights, axis=1)

    significance_level = significance_level * 100
    var = np.percentile(portfolio_ret, significance_level)

    index = np.where(portfolio_ret <= var)
    es = np.mean(portfolio_ret[index])

    return var, es


In [43]:
# test

stocks = list(sp500.columns)
weights = np.ones(len(stocks)) / len(stocks)

np.isclose(sum(weights), 1), len(stocks)

(np.True_, 503)

In [44]:
var, es = tail_risk_percentile(weights)
var, es

(np.float64(-0.015777328381941257), np.float64(-0.02582830163225229))

# 2. assume normal distribution

In [45]:
from scipy import stats

def tail_risk_normal(weights: np.array, mu: np.array, sigma: np.array, 
                       significance_level: float = 0.05):
    """
    assume:
    w @ r is Normal(w.T @ mu, w.T @ sigma @ w)

    weights: shape (n,)
    mu: shape (n, 1)
    sigma: shape (n, n)

    """
    assert weights.shape[0] == mu.shape[0] == sigma.shape[0] == sigma.shape[1], "Dimension mismatch"

    n = mu.shape[0]
    mu = mu.reshape(n, 1)
    w = weights.reshape(n, 1)

    # Portfolio mean and standard deviation
    portfolio_mean = w.T @ mu
    portfolio_std = np.sqrt(w.T @ sigma @ w)
    
    # Value at Risk
    var = stats.norm.ppf(significance_level) * portfolio_std + portfolio_mean
    
    # Expected Shortfall
    es = -1 * stats.norm.pdf(stats.norm.ppf(significance_level)) / significance_level
    es = es * portfolio_std + portfolio_mean
    
    var = var.flatten()[0]
    es = es.flatten()[0]
    return var, es

In [46]:
# test

mu = np.array(sp500.mean())
sigma = np.array(sp500.cov())

tail_risk_normal(weights, mu, sigma)

(np.float64(-0.01841526282980817), np.float64(-0.02327093800432498))

In [47]:
# from scipy import stats
# import numpy as np

# def tail_risk_lognormal(weights: np.array, mu: np.array, sigma: np.array, 
#                         significance_level: float = 0.05):
#     """
#     assume:
#     log(1 + r) is Normal(mu, sigma)
#     注意这里mu和sigma对应的是log ret, 而不是ret

#     then we have:
#     log(1 + w @ r) is approximately Normal(w.T @ mu, w.T @ sigma @ w)
    
#     Parameters:
#     weights: shape (n,)
#     mu: shape (n, 1) - mean of log returns
#     sigma: shape (n, n) - covariance of log returns
#     significance_level: probability level (e.g., 0.05 for 95% confidence)
    
#     Returns:
#     var: Value at Risk
#     es: Expected Shortfall
#     """
#     assert weights.shape[0] == mu.shape[0] == sigma.shape[0] == sigma.shape[1], "Dimension mismatch"

#     n = mu.shape[0]
#     mu = mu.reshape(n, 1)
#     w = weights.reshape(n, 1)

#     # Portfolio parameters for log-normal distribution
#     portfolio_mu = w.T @ mu
#     portfolio_var = w.T @ sigma @ w
    
#     # For log-normal, quantile is exp(mu + sigma * norm.ppf(p))
#     log_quantile = portfolio_mu + np.sqrt(portfolio_var) * stats.norm.ppf(significance_level)
#     var = np.exp(log_quantile) - 1
    
#     # Expected Shortfall for log-normal
#     z_alpha = stats.norm.ppf(significance_level)
#     es = (np.exp(portfolio_mu + 0.5 * portfolio_var) * 
#               stats.norm.cdf(z_alpha - np.sqrt(portfolio_var)) / 
#               significance_level - 1)
    
#     # Convert to scalars
#     var = var.item()
#     es = es.item()
    
#     return var, es

In [48]:
# #test

# sp500_log = np.log(sp500 + 1)
# mu = np.array(sp500_log.mean())
# sigma = np.array(sp500_log.cov())
# tail_risk_lognormal(weights, mu, sigma)

# 3. Chebyshev Bound and Worst-Case VaR (based on mu and sigma upper and lower bound)

In [49]:
## using cvxpy

import numpy as np
from scipy import stats
import cvxpy as cp
import time


def worst_case_var(returns:pd.DataFrame,
                   confidence_level:float = 0.05, 
                   n_bootstrap:int = 1000):
    """
    use bootstraping to gte confidence intervals for stock mean_returns and covariacne matrix

    """
    # bootstraping to get intervals for mean and covariance
    #################################################################################
    #################################################################################
    # 1. Load historical returns data
    returns = np.array(returns.fillna(0))
    n_assets = returns.shape[1]
    T = returns.shape[0]

    # 2. Bootstrap sampling
    print(f'begin bootstarping with confidence level: {confidence_level}, number of iter: {n_bootstrap}')
    start_time_bootstrap = time.time()

    bootstrap_means = np.zeros((n_bootstrap, n_assets))
    bootstrap_covs = np.zeros((n_bootstrap, n_assets, n_assets))

    for b in range(n_bootstrap):
        # Sample with replacement
        indices = np.random.choice(T, size=T, replace=True)
        bootstrap_sample = returns[indices, :]
        
        # Calculate mean and covariance
        bootstrap_means[b] = np.mean(bootstrap_sample, axis=0)
        bootstrap_covs[b] = np.cov(bootstrap_sample, rowvar=False)


    # 3. Calculate confidence intervals (95%)
    alpha = confidence_level
    mu_lower = np.percentile(bootstrap_means, alpha/2*100, axis=0)
    mu_upper = np.percentile(bootstrap_means, (1-alpha/2)*100, axis=0)

    Sigma_lower = np.zeros((n_assets, n_assets))
    Sigma_upper = np.zeros((n_assets, n_assets))
    for i in range(n_assets):
        for j in range(n_assets):
            Sigma_lower[i,j] = np.percentile(bootstrap_covs[:, i, j], alpha/2*100)
            Sigma_upper[i,j] = np.percentile(bootstrap_covs[:, i, j], (1-alpha/2)*100)
    
    end_time_bootstrap = time.time()

    print(f"bootstrap running time: {end_time_bootstrap-start_time_bootstrap}")

    #################################################################################
    #################################################################################
    

    #Optimization section starts here
    #################################################################################
    #################################################################################
    # 4. Set up and solve the SDP for worst-case VaR
    epsilon = 0.05  # For 95% VaR
    y_epsilon = np.sqrt((1-epsilon)/epsilon)

    # Define variables
    Q_plus = cp.Variable((n_assets, n_assets), symmetric=True)
    Q_minus = cp.Variable((n_assets, n_assets), symmetric=True)
    u_plus = cp.Variable(n_assets)
    u_minus = cp.Variable(n_assets)
    v = cp.Variable()

    # Initial portfolio weights
    # Equal weight initial portfolio
    w = cp.Parameter(n_assets)
    w.value = np.ones(n_assets) / n_assets  
    w_col = cp.reshape(w, (n_assets, 1))

    mat = [[Q_plus - Q_minus, w_col/2], 
                [w_col.T/2, cp.reshape(v, (1, 1))]]

    # Constraints
    constraints = [
        Q_plus >= 0,
        Q_minus >= 0,
        u_plus >= 0,
        u_minus >= 0,
        cp.bmat(mat) >> 0,
        # cp.lambda_min(mat) >= 0,
        u_minus - u_plus == w
    ]

    # Objective function
    objective = cp.Minimize(
        cp.trace(Q_plus @ Sigma_upper) - cp.trace(Q_minus @ Sigma_lower) + 
        y_epsilon**2 * v + u_plus.T @ mu_upper - u_minus.T @ mu_lower
    )

    # Solve the problem
    problem = cp.Problem(objective, constraints)
    problem.solve(solver=cp.SCS)

    # The worst-case VaR for the current portfolio
    worst_case_var = problem.value

    #################################################################################
    #################################################################################

    return worst_case_var


# testing

In [None]:
test_data = sp500.iloc[:, :100]
test_data = test_data.iloc[0:251, :]
test_data

Unnamed: 0_level_0,MMM,AOS,ABT,ABBV,ACN,ADBE,AMD,AES,AFL,A,...,CBRE,CDW,COR,CNC,CNP,CF,CRL,SCHW,CHTR,CVX
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2023-04-13,0.017055,0.004999,0.015287,0.011251,0.005321,0.024088,-0.002599,0.009087,0.004850,0.021104,...,0.013030,0.005955,0.011849,0.032450,0.001304,-0.000259,0.020499,-0.008282,0.012672,0.013367
2023-04-14,-0.003580,0.011605,0.003378,-0.006761,-0.021103,0.001531,-0.003692,-0.026625,-0.001358,-0.014722,...,0.000989,-0.011891,-0.001494,-0.018599,-0.010094,0.000519,-0.009290,-0.013983,-0.017634,0.002034
2023-04-17,0.009549,0.006704,0.005387,-0.002537,0.002614,-0.003848,-0.020490,-0.021319,0.005437,0.007112,...,0.020757,0.010484,0.001496,0.009255,0.007895,0.012065,0.023860,0.039393,0.013734,-0.008815
2023-04-18,-0.004776,0.014355,-0.003540,-0.009865,-0.000072,-0.000979,-0.001001,0.003699,0.004807,-0.008489,...,-0.004288,0.007252,0.000179,0.003348,-0.013381,-0.037816,-0.012131,0.023309,-0.008290,-0.002340
2023-04-19,-0.001788,-0.009629,0.078157,0.010151,-0.000214,0.006781,0.001782,0.003276,0.000747,-0.000503,...,0.003751,-0.132272,-0.017502,-0.031191,0.009262,-0.008793,-0.015290,0.028704,0.001019,0.000938
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-04-05,0.005412,0.004590,0.009899,0.012507,0.007656,-0.004085,0.027679,-0.003308,0.012565,0.019669,...,0.007123,0.010493,0.004875,0.010132,-0.001779,-0.012416,0.009734,0.007568,-0.013789,0.005663
2024-04-08,0.009887,-0.006511,-0.006115,-0.001176,-0.003604,-0.001732,-0.003051,0.012168,-0.003278,0.002359,...,0.003849,-0.003644,-0.011759,0.002748,0.006061,-0.062507,0.007191,0.006677,0.020786,-0.002042
2024-04-09,0.007397,-0.005749,0.017372,0.002002,0.007987,0.017077,0.005180,-0.000546,-0.023138,0.020352,...,-0.003212,-0.000039,-0.007447,-0.003700,0.002126,0.016669,0.008748,-0.001244,-0.005823,0.004526
2024-04-10,0.000216,-0.010871,-0.011295,-0.006524,-0.026294,-0.010821,-0.021314,-0.033352,-0.003006,-0.021981,...,-0.050624,-0.027414,-0.003228,-0.006326,-0.032178,-0.002835,-0.037566,-0.016325,-0.031717,0.004136


In [63]:
mu = np.array(test_data.mean())
sigma = np.array(test_data.cov())
stocks = list(test_data.columns)
weights = weights = np.ones(len(stocks)) / len(stocks)

var, es = tail_risk_normal(weights, mu, sigma)

# print(f'Estimatation of mu: {mu}')
# print(f'Estimation of sigma: ')

print(f'VaR using Normal distribution: {var}')
print(f'ES using Normal distribution: {es}')

VaR using Normal distribution: -0.01227622887034068
ES using Normal distribution: -0.015592368926353236


In [64]:
var_worst = worst_case_var(test_data)

print(f'worst-case VaR: {var_worst}')

begin bootstarping with confidence level: 0.05, number of iter: 1000
bootstrap running time: 1.0460007190704346
worst-case VaR: 0.04521922434005129




In [65]:
val_data = sp500.iloc[:, :100]
val_data = val_data.iloc[-251:, :]

tail_risk_percentile(weights, data=val_data)

(np.float64(-0.014702465226598857), np.float64(-0.02590801909833544))