In [1]:
import pandas as pd
from datetime import datetime
import os
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from functions import *
from scipy.optimize import minimize

In [31]:
# Stock universe: the 3 tickers with the most observations.
# Wider alternative (10 most-observed tickers):
#   REE, SAM, HAP, GMD, GIL, TMS, SAV, DHA, MHC, HAS
ticker_list = ["REE", "SAM", "HAP"]

# Threshold on the absolute daily return, keyed by exchange floor.
limits = dict(hose=0.07, hnx=0.1, upcom=0.15)


In [32]:
# Load the merged price table; build it from the raw CafeF exports on the first run
# and cache it as data/stock_data.csv so later runs skip the merge.

merged_path = os.path.join("data", "stock_data.csv")

if os.path.isfile(merged_path):
    merged_df = pd.read_csv(merged_path, index_col=None).assign(
        date=lambda df: pd.to_datetime(df["date"])
    )
else:
    # One raw export per floor, concatenated in this order
    raw_files = {
        "hnx": "CafeF.HNX.Upto31.07.2025.csv",
        "hose": "CafeF.HSX.Upto31.07.2025.csv",
        "upcom": "CafeF.UPCOM.Upto31.07.2025.csv",
        "index": "CafeF.INDEX.Upto06.08.2025.csv",
    }
    column_names = {
        "<Ticker>": "ticker",
        "<DTYYYYMMDD>": "date",
        "<Open>": "open",
        "<High>": "high",
        "<Low>": "low",
        "<Close>": "close",
        "<Volume>": "volume",
    }

    # Read, tag with the floor and rename columns
    frames = [
        pd.read_csv(os.path.join("data", filename))
        .assign(floor=floor)
        .rename(columns=column_names)
        for floor, filename in raw_files.items()
    ]

    # Merge and clean data
    # UPCOM has missing tickers for some reason
    merged_df = (
        pd.concat(frames, axis=0)
        .reset_index(drop=True)
        .dropna(subset=["ticker"])
        .assign(
            # Parse to datetime64 so this branch yields the same dtype as the cached
            # branch above (previously this branch produced Python `date` objects).
            date=lambda df: pd.to_datetime(df["date"].astype(str), format="%Y%m%d")
        )
    )
    merged_df.to_csv(merged_path, index=False)  # Save merged data to save time in future runs


# Data cleaning and merging

# Simple and log returns are computed per ticker on the date-sorted close prices
data = (
    merged_df[["date", "ticker", "floor", "close"]]
    .sort_values(["ticker", "date"])
    .assign(
        returns=lambda df: df.groupby("ticker")["close"].pct_change(),
        log_returns_pct=lambda df: np.log(df["close"] / df.groupby("ticker")["close"].shift(1)) * 100,
    )
)

# Keep 3-character tickers only (eliminates ETFs and indices) and attach the floor's
# return threshold. `.assign` returns a new frame, which avoids writing a column onto
# a `.loc` slice (chained-assignment warning in the previous version).
data = data.loc[data["ticker"].str.len() == 3].assign(
    limit=lambda df: df["floor"].map(limits)
)

outliers = data.loc[data["returns"].abs() > data["limit"]]
clean_df = data.drop(outliers.index)  # Remove outliers
print(f"% of observations removed: {round((len(outliers)/len(data))*100, 2)}%")

# NOTE: try out different samples of stocks
pivoted_df = clean_df.pivot_table(values="returns", index="date", columns="ticker")  # Pivot data for better usability
pivoted_df = pivoted_df[ticker_list].dropna()  # keep dates where every selected ticker has a return

display(pivoted_df.describe())
train_df, test_df = split_train_test(pivoted_df)

% of observations removed: 1.05%


ticker,REE,SAM,HAP
count,5951.0,5951.0,5951.0
mean,0.001091,0.000699,0.00077
std,0.021411,0.023733,0.024894
min,-0.069971,-0.069999,-0.069963
25%,-0.009689,-0.01162,-0.012434
50%,0.0,0.0,0.0
75%,0.011761,0.012037,0.012855
max,0.069962,0.069919,0.069927


In [33]:
def negative_log_likelihood(params, returns):
    """
    Gaussian QML objective of a GARCH(1,1) for a single return series.

    Args:
        params: (omega, alpha, beta). The first entry is ignored: omega is
            pinned to the sample variance of `returns`, so only alpha and
            beta influence the value.
        returns: 1-D np.ndarray of returns
    Return:
        Negative log-likelihood, or np.inf when the parameters are rejected
        or the variance recursion becomes non-finite / non-positive.
    """

    # Free omega is disabled on purpose: fixing it keeps the covariance from exploding
    _ignored_omega, alpha, beta = params
    sample_var = np.var(returns)
    omega = sample_var

    # Reject omega <= 0, negative alpha/beta, or alpha + beta (numerically) at or above 1
    rejected = any((omega <= 0, alpha < 0, beta < 0, alpha + beta >= 0.9999))
    if rejected:
        return np.inf

    n_obs = returns.size
    sigma2 = np.empty(n_obs)  # conditional variance path

    # Start from the sample variance, falling back to 1.0 when it is (near) zero
    sigma2[0] = sample_var if sample_var > 1e-12 else 1.0

    for t in range(1, n_obs):
        # Univariate GARCH(1,1) update
        candidate = omega + alpha*returns[t-1]**2 + beta*sigma2[t-1]
        sigma2[t] = candidate
        if not (np.isfinite(candidate) and candidate > 1e-16):
            return np.inf  # variance must stay finite and strictly positive

    per_obs_loglik = -0.5 * (np.log(2*np.pi) + np.log(sigma2) + (returns**2)/sigma2)

    return -np.sum(per_obs_loglik)


def univariate_garch(returns: np.ndarray, x0=(1e-6, 0.05, 0.9)):
    """
    Fit a univariate GARCH(1,1) by Gaussian QML.

    The series is demeaned and `negative_log_likelihood` is minimised with
    SLSQP under alpha, beta in [0, 1] and alpha + beta <= 0.999.

    Args:
        returns: array-like of returns for one asset
        x0: initial (omega, alpha, beta) passed to the optimiser
    Return:
        dict of DCC inputs
        "omega": intercept used in the variance recursion (sample variance of
                 the demeaned returns, see the note in the body)
        "alpha": fitted ARCH coefficient
        "beta": fitted GARCH coefficient
        "variances": conditional variance path, same length as `returns`
        "residuals": demeaned returns divided by the conditional std. deviation
        "success": ugarch.success flag reported by the optimiser
    """

    returns = np.asanyarray(returns).astype(float)
    returns = returns - np.mean(returns) # Demean return

    bounds = [
        (1e-12, None), # Must be positive
        (0.0, 1.0), # Can be semipositive
        (0.0, 1.0) # Can be semipositive
    ]
    constraints = (
        {
            "type":'ineq',
            "fun": lambda p: 0.999 - (p[1] + p[2]) # Ensure alpha + beta < 1
        },
    )

    ugarch = minimize(
        negative_log_likelihood, x0,
        args=(returns,),
        bounds=bounds,
        constraints=constraints,
        method="SLSQP"
    )

    # The objective ignores its first parameter and pins omega to the sample
    # variance of the (demeaned) returns, so ugarch.x[0] never moves away from
    # x0[0]. Rebuilding the variance path with ugarch.x[0] would therefore use an
    # intercept that alpha and beta were not fitted against; use the same omega
    # as the objective instead.
    # NOTE(review): proper variance targeting would be omega = (1 - alpha - beta) * var,
    # applied in both the objective and here - confirm the intended model.
    _, alpha, beta = ugarch.x
    sample_var = np.var(returns)
    omega = sample_var

    # Conditional variances and standardized residuals
    n_period = returns.size
    variances = np.empty(n_period)
    variances[0] = sample_var if sample_var > 1e-12 else 1.0 # Same start value as the objective
    for t in range(1, n_period):
        # Univariate GARCH
        variances[t] = omega + alpha*returns[t-1]**2 + beta*variances[t-1]

    resid_standardized = returns / np.sqrt(np.clip(variances, 1e-12, None)) # Standardize residuals

    return {
        "omega":omega,
        "alpha":alpha,
        "beta":beta,
        "variances": variances,
        "residuals":resid_standardized,
        "success":ugarch.success
    }


def fit_univariate_garch(df:pd.DataFrame):
    """
    Run `univariate_garch` on every column of `df` and stack the results.

    Args:
        df: pd.DataFrame with one return series per column. Should contain no NA
    Return:
        (variance_mtrx, residual_mtrx, params): two arrays shaped like `df`
        holding conditional variances and standardized residuals, and a dict
        {column: {"omega", "alpha", "beta", "success"}}
    """

    reported_keys = ("omega", "alpha", "beta", "success")

    n_period, n_asset = df.shape
    variance_mtrx = np.zeros((n_period, n_asset)) # conditional variances, one column per asset
    residual_mtrx = np.zeros((n_period, n_asset)) # standardized residuals, one column per asset
    params = {}

    for position, ticker in enumerate(df.columns):
        fit = univariate_garch(df[ticker].values)
        variance_mtrx[:, position] = fit["variances"]
        residual_mtrx[:, position] = fit["residuals"]
        params[ticker] = {key: fit[key] for key in reported_keys}

    return variance_mtrx, residual_mtrx, params # D_t^2, eps_t, params

# DCC estimation
def dcc_NLL(params, residuals):
    '''
    Negative correlation log-likelihood (up to a constant) of a DCC(1,1).

    Args:
        params: (alpha, beta) of the DCC recursion
        residuals: T x M array of standardized residuals
    Return:
        Sum over t of 0.5 * (log|R_t| + e_t' R_t^{-1} e_t), or np.inf when the
        parameters are rejected or some R_t is singular / has a non-positive determinant.
    '''
    alpha, beta = params
    if any((alpha < 0, beta < 0, alpha + beta >= 0.9999)): # conditions
        return np.inf

    n_period, _ = residuals.shape

    # Unconditional correlation matrix of the standardized residuals
    S = np.corrcoef(residuals.T)
    intercept = (1-alpha-beta)*S # constant term of the recursion

    Q = S.copy() # Q_0 = S
    NLL = 0.0
    previous = None # residual of the preceding period; None at t = 0
    for current in residuals:
        if previous is not None:
            # DCC update driven by the previous period's residual
            Q = intercept + alpha*np.outer(previous, previous) + beta*Q

        # Rescale Q_t to unit diagonal; the clip guards against division by zero
        inv_std = 1.0 / np.sqrt(np.clip(np.diag(Q), 1e-12, None))
        R_t = Q * inv_std[:, None] * inv_std[None, :]

        try:
            sign, logdet = np.linalg.slogdet(R_t) # sign and natural log of det(R_t)
            if sign <= 0:
                return np.inf
            quadratic = current @ np.linalg.solve(R_t, current) # e' R_t^{-1} e
        except np.linalg.LinAlgError:
            return np.inf

        # Correlation log-likelihood contribution up to constant
        NLL += 0.5 * (logdet + quadratic)
        previous = current

    return NLL


def fit_dcc(residuals, x0=(0.05, 0.9)):
    """
    Fit DCC(1,1) by minimizing the negative correlation log-likelihood (`dcc_NLL`).

    Args:
        residuals: T x M array-like of standardized residuals
        x0: initial (alpha, beta) for the optimiser. It must satisfy
            alpha + beta < 0.9999, otherwise `dcc_NLL` is infinite at the start.
            The default equals the start point that was previously hard-coded
            in the `minimize` call (the old default, (0.2, 0.95), was never
            used and violated the constraint).
    Return:
        dict with
        "a", "b": fitted DCC parameters
        "Qt": list of T matrices Q_t from the recursion (Q_0 = S)
        "Rt": list of T matrices R_t, i.e. Q_t rescaled to unit diagonal
        "S": correlation matrix of the residuals
        "success": dcc.success flag reported by the optimiser
    """

    residuals = np.asarray(residuals)
    bounds = [
        (1e-8, 0.999999),
        (1e-8, 0.999999)
    ] # NOTE: optimize this part

    constraints = (
        {
            "type":"ineq",
            "fun": lambda p: 0.9999 - (p[0] + p[1]) # alpha + beta below 1
        },
    )

    dcc = minimize(
        dcc_NLL,
        x0, # honour the caller's starting point instead of a hard-coded one
        args=(residuals, ),
        method="SLSQP",
        bounds=bounds,
        constraints=constraints
    )

    alpha, beta = dcc.x

    # Reconstruct Q_t and R_t paths with the fitted parameters
    n_period, _ = residuals.shape
    S = np.corrcoef(residuals.T)
    Q = S.copy()
    Q_list, R_list = [], []
    for t in range(n_period):
        if t > 0:
            prev_resid = residuals[t-1]
            Q = (1-alpha-beta)*S + alpha*np.outer(prev_resid, prev_resid) + beta*Q
        # Rescale to unit diagonal; the clip guards against division by zero
        inv_std = 1.0 / np.sqrt(np.clip(np.diag(Q), 1e-12, None))
        R = Q * inv_std[:, None] * inv_std[None, :]
        Q_list.append(Q.copy())
        R_list.append(R)

    return {
        "a":alpha,
        "b":beta,
        "Qt":Q_list,
        "Rt":R_list,
        "S":S,
        "success":dcc.success
    }
# Build covariance matrix H_t and forecast

def build_covmatrix(var_matrix, R_list):
    '''
    Combine conditional variances and correlations into covariance matrices.

    Args:
        var_matrix: T x M matrix of conditional variances from Univariate GARCH
        R_list: List of correlation matrices from DCC, indexed by period
    Return:
        List of T matrices H_t = D_t * R_t * D_t, where D_t is the diagonal
        matrix of conditional standard deviations at period t
    '''
    n_period, _ = var_matrix.shape

    def _cov_at(t):
        # diagonal matrix of conditional standard deviation
        D = np.diag(np.sqrt(var_matrix[t, :]))
        return D @ R_list[t] @ D # Conditional covariance matrix

    return [_cov_at(t) for t in range(n_period)]


def forecast_dcc_multi_step(
        h_last, r_last, garch_params,
        eps_last, Q_last, dcc_params, S,
        horizon=20
    ):
    """
    Forecast conditional covariance matrices 1..horizon steps ahead under DCC-GARCH(1,1).

    Args:
        h_last : (M,) last conditional variances
        r_last : (M,) last observed returns
        garch_params : dict of {asset: {'omega','alpha','beta'}}; entries are
            matched to positions of h_last / r_last by iteration order
        eps_last : (M,) last standardized residuals
        Q_last : (M,M) last Q matrix from DCC recursion
        dcc_params : dict {'a':..., 'b':...}
        S : (M,M) unconditional correlation matrix of eps
        horizon : int, number of steps ahead

    Returns
        H_path : list of (M,M) covariance forecasts
        h_path : (horizon, M) variance forecasts
        R_path : list of (M,M) correlation forecasts
        Q_path : list of (M,M) Q matrices
    """
    n_asset = len(h_last)
    a, b = dcc_params["a"], dcc_params["b"]

    def _unit_diagonal(q_matrix):
        # Rescale a Q matrix to unit diagonal (clip avoids division by zero)
        scale = np.sqrt(np.clip(np.diag(q_matrix), 1e-12, None))
        return q_matrix / np.outer(scale, scale)

    # ---------- Step 1: variance forecasts, one asset at a time ----------
    h_path = np.empty((horizon, n_asset))
    for col, spec in enumerate(garch_params.values()):
        omega, alpha, beta = spec['omega'], spec['alpha'], spec['beta']

        # 1-step-ahead uses the realised last return
        h_path[0, col] = omega + alpha * (r_last[col]**2) + beta * h_last[col]

        # Further steps: r^2 is replaced by its expectation h
        persistence = alpha + beta
        for step in range(1, horizon):
            h_path[step, col] = omega + persistence * h_path[step-1, col]

    # ---------- Step 2: correlation forecasts ----------
    # 1-step-ahead uses the realised last standardized residual
    Q_next = (1 - a - b) * S + a * np.outer(eps_last, eps_last) + b * Q_last
    Q_path = [Q_next.copy()]
    R_path = [_unit_diagonal(Q_next)]

    # Steps 2..H: expected recursion with E[eps eps'] ~ S
    for _ in range(1, horizon):
        Q_next = (1 - a - b) * S + a * S + b * Q_path[-1]   # simplifies to S + b*(Q_{k-1}-S)
        Q_path.append(Q_next.copy())
        R_path.append(_unit_diagonal(Q_next))

    # ---------- Step 3: H_k = D_k R_k D_k ----------
    std_diagonals = (np.diag(np.sqrt(h_path[k, :])) for k in range(horizon))
    H_path = [D_k @ R_path[k] @ D_k for k, D_k in enumerate(std_diagonals)]

    return H_path, h_path, R_path, Q_path

def forecast_dcc_one_step(residuals, dcc_fit):
    '''
    One-step-ahead DCC forecast from the last residual and the last Q matrix.

    Args:
        residuals: T x M array of standardized residuals
        dcc_fit: dict with keys "a", "b" and "Qt" (list of Q matrices)
    Return:
        (Q_forecast, R_forecast): the next Q matrix (not a true correlation
        matrix) and its unit-diagonal rescaling R
    '''
    a, b = dcc_fit["a"], dcc_fit["b"]
    Q_prev = dcc_fit["Qt"][-1]
    S = np.corrcoef(residuals.T) # unconditional correlation of the residuals
    last_eps = residuals[-1]

    Q_forecast = (1-a-b)*S + a*np.outer(last_eps, last_eps) + b*Q_prev

    # Rescale to unit diagonal; the clip guards against division by zero
    inv_std = 1.0 / np.sqrt(np.clip(np.diag(Q_forecast), 1e-12, None))
    R_forecast = Q_forecast * inv_std[:, None] * inv_std[None, :]

    return Q_forecast, R_forecast

def forecast_H_one_step(h_last, garch_params, r_last):
    '''
    One-step-ahead conditional variance forecast for every asset.

    Args:
        h_last: (M,) last conditional variances
        garch_params: dict of {asset: {"omega", "alpha", "beta"}}; entries are
            matched to positions of h_last / r_last by iteration order
        r_last: (M,) last observed returns
    Return:
        np.ndarray of length len(h_last) with omega + alpha*r^2 + beta*h per asset
    '''
    variance_forecast = np.empty(len(h_last))
    for position, spec in enumerate(garch_params.values()):
        shock_term = spec["alpha"]*(r_last[position]**2)
        variance_forecast[position] = spec["omega"] + shock_term + spec["beta"]*h_last[position]

    return variance_forecast


In [34]:
# Two-stage fit on the training window: per-stock GARCH(1,1), then DCC(1,1)
train_df = train_df.astype(float)
h_mat, eps_mat, garch_params = fit_univariate_garch(df=train_df) # conditional variances, standardized residuals, parameters
dcc = fit_dcc(residuals=eps_mat) # DCC(1,1) on the standardized residuals

# List of conditional covariance matrices, one per training period
covmatrix_list = build_covmatrix(var_matrix=h_mat, R_list=dcc["Rt"])

In [35]:

# One-step-ahead forecast: correlation from DCC, variances from GARCH, then H = D R D
Q_forecast, R_forecast = forecast_dcc_one_step(residuals=eps_mat, dcc_fit=dcc)
h_forecast = forecast_H_one_step(h_mat[-1], garch_params, train_df.values[-1])
D_forecast = np.diag(np.sqrt(h_forecast)) # diagonal matrix of forecast standard deviations
covmatrix_forecast = D_forecast @ R_forecast @ D_forecast

In [36]:
# Multi-step forecast from the last training observation (default horizon)

H_path, h_path, R_path, Q_path = forecast_dcc_multi_step(
    h_mat[-1],            # h_last
    train_df.values[-1],  # r_last
    garch_params,
    eps_last=eps_mat[-1],
    Q_last=dcc["Qt"][-1],
    dcc_params=dcc,
    S=dcc["S"],
)

In [43]:
# Feed the last matrix of the multi-step forecast path (H_path[-1]) and the training
# returns to the portfolio helper; the cell's value is displayed as its output.
# NOTE(review): minimum_variance_portfolio is not defined in this notebook (presumably
# provided by `from functions import *`) - confirm its signature and what it returns.
minimum_variance_portfolio(
    covariance_matrix=H_path[-1],
    data=train_df
)

{'REE': 0.3333333333333333,
 'SAM': 0.3333333333333333,
 'HAP': 0.3333333333333333}