In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from scipy.stats import norm
from scipy.linalg import cholesky

In [2]:
def get_returns_data(tickers, start_date, end_date):
    """
    주식 데이터를 다운로드하고 로그 수익률을 계산하는 함수
    """
    returns_data = []
    for ticker in tickers:
        df = yf.download(ticker, start=start_date, end=end_date, progress=False)
        df["Log_Returns"] = np.log(df["Close"] / df["Close"].shift(1))
        returns_data.append(df["Log_Returns"].dropna())
    returns_df = pd.concat(returns_data, axis=1, keys=tickers)
    return returns_df

In [3]:
def cholesky_decomposition(returns_dataframe):
    """
    공분산 행렬을 계산하고 촐레츠키 분해를 수행하는 함수
    """
    cov_matrix = returns_dataframe.cov().values
    cholesky_matrix = cholesky(cov_matrix, lower=True)
    return cholesky_matrix

In [4]:
def monte_carlo_simulation(cholesky_matrix, num_assets, num_simulations, time_horizon):
    """
    몬테카를로 시뮬레이션을 수행하는 함수
    """
    random_normals = np.random.normal(size=(num_assets, num_simulations, time_horizon))
    random_normals_reshaped = random_normals.reshape(num_assets, -1)

    correlated_normals_reshaped = np.dot(cholesky_matrix, random_normals_reshaped)
    correlated_normals = correlated_normals_reshaped.reshape(num_assets, num_simulations, time_horizon)
    
    return correlated_normals

In [5]:
def calculate_category_es(tickers, returns_dataframe, correlated_normals, portfolio_data, category_map, dt, time_horizon, conf_level):
    """
    카테고리별 ES를 계산하는 함수
    """
    categories_simulated_returns = {category: [] for category in category_map.values()}
    num_simulations = correlated_normals.shape[1]

    for i, ticker in enumerate(tickers):
        mu = returns_dataframe[ticker].mean()
        sigma = returns_dataframe[ticker].std()
        liquidity_horizon = portfolio_data[ticker][0]
        category = category_map[liquidity_horizon]

        simulated_paths = np.exp((mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * correlated_normals[i]).prod(axis=1) - 1
        categories_simulated_returns[category].extend(simulated_paths)

    es_values = []
    for category in sorted(categories_simulated_returns.keys()):
        returns_list = categories_simulated_returns[category]

        if len(returns_list) == 0:
            es_values.append(0)
            continue

        sorted_losses = np.sort(returns_list)
        cutoff_index = int(num_simulations * (1 - conf_level))
        
        if cutoff_index <= len(sorted_losses):
            es_category = -np.nanmean(sorted_losses[:cutoff_index])
        else:
            es_category = 0
        
        if np.isnan(es_category):
            es_category = 0
        
        es_values.append(es_category)
    
    return es_values

In [6]:
def calculate_portfolio_es(es_values, category_map):
    """
    포트폴리오 ES를 계산하는 함수
    """
    category_to_liquidity_horizon = {value: key for key, value in category_map.items()}
    sum_value = 0
    for i in range(1, 5):
        lh_diff = max(0, category_to_liquidity_horizon.get(i+1, 0) - category_to_liquidity_horizon.get(i, 0))
        es_value = es_values[i-1]
        sum_value += (es_value * np.sqrt(lh_diff))**2
    final_es = np.sqrt(sum_value)
    return final_es

In [7]:
def main():
    # 포트폴리오 데이터 입력
    portfolio_data = {
        "SPY": [10, 0.3804], "AAPL": [10, 0.1624], "GOOG": [10, 0.1002], "XLF": [10, 0.0628],
        "XLV": [10, 0.0576], "XLE": [10, 0.0505], "IBIT": [60, 0.0421], "TSLA": [10, 0.0374],
        "SOXX": [10, 0.0338], "XLI": [10, 0.0287], "XLP": [10, 0.0242], "NFTY": [20, 0.0200]
    }
    
    category_map = {10: 1, 20: 2, 60: 3, 120: 4, 250: 5}
    conf_level = 0.975
    start_date = "2024-01-01"
    end_date = "2024-12-31"
    num_simulations = 10000
    time_horizon = 10
    dt = 1 / 252

    tickers = list(portfolio_data.keys())
    returns_df = get_returns_data(tickers, start_date, end_date)
    cholesky_matrix = cholesky_decomposition(returns_df)
    correlated_normals = monte_carlo_simulation(cholesky_matrix, len(tickers), num_simulations, time_horizon)
    es_values = calculate_category_es(tickers, returns_df, correlated_normals, portfolio_data, category_map, dt, time_horizon, conf_level)
    es = calculate_portfolio_es(es_values, category_map)

    print(f"Portfolio Expected Shortfall is {es:.4f} in 97.5% confidence level")

In [8]:
if __name__ == "__main__":
    main()

Portfolio Expected Shortfall is 0.0044 in 97.5% confidence level
