In [1]:
import numpy as np
from scipy.optimize import minimize
from scipy.stats import norm, chi2
import pandas as pd
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
from arch import arch_model
from scipy.stats import jarque_bera, shapiro
import os 
import numdifftools as nd
import statsmodels.api as sm

In [1651]:
def return_process(return_data: pd.DataFrame) -> pd.DataFrame:
    """
    We need monthly returns at the end of each month

    :param return_data: DataFrame with date as index, stocks as columns and close price as values
    """

    return_data.index = pd.to_datetime(return_data.index, format='%Y%m%d')
    return_data = return_data.sort_index()
    return_data = return_data.ffill()
    last_ = return_data.resample('ME').last()
    first_ = return_data.resample('ME').first()
    last_ = last_.sort_index()
    first_ = first_.sort_index()
    monthly_return = (last_ - first_) / first_ 
    monthly_return = monthly_return.ffill()
    monthly_return = monthly_return.dropna(axis=0, how='all')
    monthly_return = monthly_return.dropna(axis=1, how='any')
    lower = monthly_return.quantile(0.01)
    upper = monthly_return.quantile(0.99)
    monthly_return = monthly_return.clip(lower=lower, upper=upper, axis=1)

    daily_return = return_data.pct_change()
    daily_return = daily_return.ffill()
    daily_return = daily_return.dropna(axis=0, how='all')
    daily_return = daily_return.dropna(axis=1, how='any')
    lower = monthly_return.quantile(0.01)
    upper = monthly_return.quantile(0.99)
    daily_return = daily_return.clip(lower=lower, upper=upper, axis=1)

    return monthly_return, daily_return

In [None]:
def match_data(returns: pd.DataFrame, factors_df: pd.DataFrame) -> pd.DataFrame:
    """
    Match the data of returns and factors

    :param returns: DataFrame with date as index, stocks as columns and return rates as values
    :param factors_df: DataFrame with date as index, factors as columns and factor values as values
    :return: DataFrame with same index
    """

    returns = returns.loc[(returns.index >= factors_df.index[0]) & (returns.index <= factors_df.index[-1])]
    factors_df = factors_df.loc[(factors_df.index >= returns.index[0]) & (factors_df.index <= returns.index[-1])]

    returns, factors_df = returns.align(factors_df, join='outer', axis=0)
    returns = returns.ffill()
    factors_df = factors_df.ffill()
    returns = returns.dropna()
    factors_df = factors_df.dropna()
    return returns, factors_df

In [None]:
def regression(returns: pd.Series, factors: pd.DataFrame):
    """
    Perform regression of returns on factors

    :param returns: (n, ) Series with date as index and stock returns as values
    :param factors: (n, k) DataFrame with date as index and factors as columns, 1 period lagged
    :return: tuple of regression results
    """
    X = factors.reset_index(drop=True)
    y = returns.reset_index(drop=True)
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    summary_df = pd.DataFrame({
        'coef': model.params,
        'p_value': model.pvalues
    })
    summary_df = summary_df.iloc[1:]

    significant = summary_df[summary_df['p_value'] < 0.05]
    return significant

In [None]:
def residual_analysis(residuals: np.ndarray, lags=20) -> pd.DataFrame:
    """
    Perform residual analysis

    :param residuals: array of residuals
    :param lags: number of lags for the tests
    :return: tuple of mean, variance and skewness
    """
    # autocorrelation test
    lb_result = acorr_ljungbox(residuals, lags=[lags], return_df=True).iloc[0]
    
    # ARCH test
    arch_test = acorr_ljungbox(residuals**2, lags=[lags], return_df=True).iloc[0]

    # normality test
    shap = shapiro(residuals)
    jb = jarque_bera(residuals)

    result_df = pd.DataFrame({
        'Test': ['Ljung-Box', 'ARCH', 'Shapiro-Wilk', 'Jarque-Bera'],
        'Stat': [lb_result['lb_stat'], arch_test['lb_stat'], shap[0], jb[0]],
        'p-value': [lb_result['lb_pvalue'], arch_test['lb_pvalue'], shap[1], jb[1]]
    })
    return result_df

In [1655]:
def garch_order(residuals: np.ndarray) -> tuple:
    """
    GARCH order selection using AIC
    
    """
    residuals = residuals 
    best_aic = np.inf
    
    best_order = (0, 0)
    for p in range(1, 3):
        for q in range(0, 3):
            if p == 0 and q == 0:
                continue
            model = arch_model(residuals, vol='Garch', p=p, q=q, o=0, rescale=False)
            results = model.fit(disp='off')
            if results.aic < best_aic:
                best_aic = results.aic
                best_order = (p, q)
    return best_order

In [None]:
def GARCH_M_FUNC(params: np.array, returns: pd.Series, factors: pd.DataFrame, p: int=1, q: int=1) -> pd.DataFrame:
    """
    GARCH-M function
    :param params: array of parameters
    :param returns: (n, ) Series with date as index and stock returns as values
    :param factors: (n, k) DataFrame with date as index and factors as columns, 1 period lagged
    :param p: order of GARCH
    :param q: order of ARCH
    :return: tuple of residuals and conditional variance
    """
    n = len(returns)
    k = factors.shape[1]

    alpha = params[0]
    beta = params[1:k+1]
    lambd = params[k+1]
    omega = params[k+2]
    a = params[k+3: k+3+p]
    b = params[k+3+p: k+3+p+q]
    if omega + sum(a) + sum(b) > 0.99:
        return None, None
    
    # todo:  simply fill the first p and q values which are not accurate.
    eps = np.zeros(n)
    mean_return = returns.mean()
    for t in range(max(p, q)):
        eps[t] = returns.iloc[t] - mean_return
    h = np.ones(n) * np.var(returns)
    for t in range(max(p,q), n):
        arch_sum = np.sum([a[i] * eps[t - i - 1]**2 for i in range(p)]) if p > 0 else 0
        garch_sum = np.sum([b[j] * h[t - j - 1] for j in range(q)]) if q > 0 else 0
        h[t] = omega + arch_sum + garch_sum

        h[t] = np.clip(h[t], 1e-6, 1e6)
        if k < 1:
            eps[t] = returns.iloc[t] - alpha - lambd * np.sqrt(h[t])
        else:
            eps[t] = returns.iloc[t] - alpha - np.dot(beta, factors.iloc[t]) - lambd * np.sqrt(h[t])
        eps[t] = np.clip(eps[t], -1e6, 1e6) 
    
    eps = eps[max(p,q):]
    h = h[max(p,q):]

    return eps, h

In [1657]:
from statsmodels.tools.numdiff import approx_hess

def compute_delta_tstats(params, likelihood_func, returns, factors_df, p, q):
    """
    Fast t-stat computation via Hessian (delta method)
    :param params: fitted parameters
    :param likelihood_func: log-likelihood function
    :return: std_errors, t_stats, p_values
    """
    try:
        hessian = approx_hess(params, lambda p_: likelihood_func(p_, returns, factors_df, p, q))
        cov_matrix = np.linalg.pinv(hessian)
        raw_vars = np.diag(cov_matrix)
        fallback = np.nanmedian(raw_vars[raw_vars > 0]) if np.any(raw_vars > 0) else 1e-4
        raw_vars = np.where(~np.isfinite(raw_vars) | (raw_vars < 1e-12), fallback, raw_vars)
        std_errors = np.sqrt(raw_vars)
        std_errors = np.clip(std_errors, 1e-6, 1e6)
        t_stats = params / std_errors
        t_stats = np.clip(t_stats, -100, 100)
        p_values = 2 * (1 - norm.cdf(np.abs(t_stats)))

        return std_errors, t_stats, p_values
    except Exception as e:
        print("Delta t-stat estimation failed:", e)
        return None, None, None

In [None]:
def garch_m_likelihood(params: np.ndarray, returns: pd.Series, factors: pd.DataFrame, p: int=1, q: int=1) -> float:
    """
    GARCH-M likelihood function
    :param params: array of parameters
    :param returns: (n, ) Series with date as index and stock returns as values
    :param factors: (n, k) DataFrame with date as index and factors as columns, 1 period lagged
    :param p: order of GARCH
    :param q: order of ARCH
    :return: log-likelihood
    """
    eps, h = GARCH_M_FUNC(params, returns, factors, p, q)
    # add punishment to make sure the model is stationary
    if eps is None:
        return 1e6
    loglik = 0
    for t in range(0, len(eps)):
        eps_sq = eps[t]**2
        eps_sq_div_ht = eps_sq / max(h[t], 1e-6)   
        eps_sq_div_ht = min(eps_sq_div_ht, 1e6)           
        log_ht = np.log(max(h[t], 1e-6))                   

        loglik += 0.5 * (log_ht + eps_sq_div_ht)
    return loglik

In [None]:
def robust_covariance(params: np.ndarray, returns: pd.Series, factors_df: pd.DataFrame, p: int=1, q: int=1) -> np.ndarray:
    """
    Compute robust covariance matrix for GARCH-M model
    :param params: array of parameters
    :param returns: (n, ) Series with date as index and stock returns as values
    :param factors_df: (n, k) DataFrame with date as index and factors as columns, 1 period lagged
    :param p: order of GARCH
    :param q: order of ARCH
    :return: covariance matrix of parameters
    """

    hess_fn = nd.Hessian(lambda p_: garch_m_likelihood(p_, returns, factors_df, p, q))
    hessian = hess_fn(params)
    hessian_inv = np.linalg.pinv(hessian)

    grad_fn = nd.Gradient(lambda p_: garch_m_likelihood(p_, returns, factors_df, p, q))
    score = grad_fn(params).reshape(-1, 1)
    S = score @ score.T
    cov_matrix = hessian_inv @ S @ hessian_inv
    return cov_matrix

In [1660]:
def wald_joint_test(params, cov_matrix, idx_list):
    sub_params = params[idx_list]
    sub_cov = cov_matrix[np.ix_(idx_list, idx_list)]
    sub_cov_inv = np.linalg.pinv(sub_cov)
    W = sub_params.T @ sub_cov_inv @ sub_params
    p_value = 1 - chi2.cdf(W, df=len(idx_list))
    return W, p_value

In [None]:
def GARCHM(returns: pd.Series, factors_df: pd.DataFrame, p: int = 1, q: int = 1) -> tuple:
    """
    GARCH-M model with multiple factors

    :param returns: DataFrame with date as index, stocks as columns and close price as values
    :param factors_df: DataFrame with date as index, factors as columns and factor values as values
    :param p: order of GARCH
    :param q: order of ARCH
    :return: DataFrame with parameters, std_errors, t_stats, p_values and significant, and DataFrame with residual analysis
    """

    k = factors_df.shape[1]
    garch_params = [0.1] + [0.1/p if p != 0 else 0] * p + [0.7/(q) if q != 0 else 0] * q
    mean_return = returns.mean()
    init_params = np.array([mean_return] + [0.1]*k + [0.1] + garch_params)
    bounds = [(None, None)] + [(None, None)] * k + [(None, None)] + [(0, None)] + [(0, 1)] * (p + q)
    
    # GARCH-M model
    res = minimize(
        garch_m_likelihood,
        init_params,
        args=(returns, factors_df, p, q),
        bounds=bounds,
        method='SLSQP',
        options={'disp': True}
    )
    params = res.x

    # Significance test
    # std_errors, t_stats, p_values = compute_delta_tstats(
    #     params, garch_m_likelihood, returns, factors_df, p, q
    # )

   
    # try:
    #     hess_fn = nd.Hessian(lambda p_: garch_m_likelihood(p_, returns, factors_df, p, q))
    #     hessian = hess_fn(params)
    
    #     if not np.all(np.isfinite(hessian)):
    #         raise ValueError("Hessian contains nan or inf.")

    #     try:
    #         cov_matrix = np.linalg.pinv(hessian)
    #     except np.linalg.LinAlgError:
    #         raise ValueError("Hessian is too singular for pinv.")

    #     print("pre cov matrix", cov_matrix)
    #     cov_matrix = np.where(np.isnan(cov_matrix), 1e-6, cov_matrix) 
    #     cov_matrix = np.clip(cov_matrix, 1e-6, 1e6)
    #     print("COV MATRIX", cov_matrix)
    #     std_errors = np.sqrt(np.diag(cov_matrix))
    
    # except Exception as e:
    #     print("Switching to fallback: eigenvalue-regularized Hessian")
    #     hess_fn = nd.Hessian(lambda p_: garch_m_likelihood(p_, returns, factors_df, p, q))
    #     hessian = hess_fn(params)
    #     eigval, eigvec = np.linalg.eigh(hessian)
    #     eigval[eigval < 1e-6] = 1e-6
    #     hessian_reg = eigvec @ np.diag(eigval) @ eigvec.T

    #     cov_matrix = np.linalg.inv(hessian_reg)
    #     print("pre cov matrix", cov_matrix)
    #     cov_matrix = np.where(np.isnan(cov_matrix), 1e-6, cov_matrix) # fix unstable cov matrix
    #     cov_matrix = np.clip(cov_matrix, 1e-6, 1e6)
    #     print("COV MATRIX", cov_matrix)
    #     std_errors = np.sqrt(np.diag(cov_matrix))
    cov_matrix = robust_covariance(params, returns, factors_df, p, q)
    cov_matrix = np.where(np.isnan(cov_matrix), 1e-6, cov_matrix) # fix unstable cov matrix
    cov_matrix = np.clip(cov_matrix, 1e-6, 1e6)
    std_errors = np.sqrt(np.diag(cov_matrix))
    std_errors = np.clip(std_errors, 1e-6, 1e6)
    t_stats = params / std_errors
    p_values = 2 * (1 - norm.cdf(np.abs(t_stats)))
    

    sector = ['intercept'] + ['factor'] * k + ['garch_m'] +\
             ['garch'] * (p + q + 1)
    # Print results
    param_names = (
    ['alpha'] + factors_df.columns.tolist() +
    ['lambda', 'omega'] +
    [f'a_{i+1}' for i in range(p)] +
    [f'b_{j+1}' for j in range(q)])


    results_df = pd.DataFrame({
        'param': param_names,
        'value': params,
        'std_error': std_errors,
        't_stat': t_stats,
        'p_value': p_values,
        'significant': p_values < 0.05,
        'sector': sector
    })

    eps, h = GARCH_M_FUNC(params, returns, factors_df, p, q)

    if eps is None:
        print("Model is not stationary.")
        return None, None
    else:
        # residual analysis
        print("Residual analysis...")
        h = np.clip(h, 1e-6, 1e6) # avoid zero and inf
        standardized_residuals = np.clip(eps / np.sqrt(h), -1e6, 1e6)
        dignose_result = residual_analysis(standardized_residuals)

    return results_df, dignose_result 

In [None]:
def read_data(stock_list=None) -> tuple:
    """
    read stocks returns and factors data
    :param stock_list: list of stocks. If None, read all stocks from ../data/raw/
    :return: DataFrame of returns and factors
    """

    factors = pd.read_csv('../data/hml.csv', index_col=0, header=0)

    factors = factors.drop("AAPL", axis=1)
    factors.index = pd.to_datetime(factors.index, format='%Y-%m-%d')
    
    lower = factors.quantile(0.01)
    upper = factors.quantile(0.99)
    factors = factors.clip(lower=lower, upper=upper, axis=1) # delete abnormal numbers
    
    # return of stock
    if stock_list is None:
        df_returns = pd.DataFrame()
        stocks = []
        for dirpath, dirnames, filenames in os.walk('../data/raw/'):
            for filename in filenames:
                if filename.endswith('.csv'):
                    stocks.append(filename.split('.')[0])
                    file_path = os.path.join(dirpath, filename)
                    df = pd.read_csv(file_path, index_col=0, header=0)
                    df.index = pd.to_datetime(df.index, format='%Y%m%d')
                    df = df[['close']]
                    df_returns = pd.concat([df_returns, df], axis=1, join='outer')
        df_returns.columns = stocks
    else:
        df_returns = pd.DataFrame()
        for stock in stock_list:
            file_path = os.path.join('../data/raw/', stock + '.csv')
            df = pd.read_csv(file_path, index_col=0, header=0)
            df.index = pd.to_datetime(df.index, format='%Y%m%d')
            df = df[['close']]
            df_returns = pd.concat([df_returns, df], axis=1, join='outer')
        df_returns.columns = stock_list


    monthly_return, daily_return = return_process(df_returns)
    
    
    # match the data
    daily_return, factors_df = match_data(daily_return, factors)
    
    return daily_return, factors_df

In [None]:
def pre_training(return_: pd.Series, factors_df: pd.DataFrame) -> tuple:
    """
    Analyze the data before training and decide order of GARCH model
    :param return_: (n, )stock return data
    :param factors_df: (n, k)factors data
    :return: tuple of garch order , residual analysis and significant factors
    """

    # regression
    # beta, residuals = regression(return_, factors_df)

    # residual analysis
    test_result = residual_analysis(return_)
    

    # if test_result[test_result['Test'] == 'ARCH']['p-value'].values[0] > 0.1:
    #     return 0, 0, test_result, []

    # GARCH order selection
    p, q = garch_order(return_)
    print(f"Best GARCH order: {p}, {q}")

    # significant factors
    if len(factors_df) < 1: # when use pure garch with no factor 
        return p, q, test_result, []
    else:
        significant = regression(return_, factors_df)
        significant_factors = significant.index.tolist()
        return p, q, test_result, significant_factors

In [None]:
def training(return_: pd.Series, factors_df: pd.DataFrame, 
             p: int=1, q: int=1, backward=True) -> tuple:
    """
    Training function
    :param returns: DataFrame with date as index, stocks as columns and close price as values
    :param factors_df: DataFrame with date as index, factors as columns and factor values as values
    :param p: GARCH order
    :param q: GARCH order
    :param backward: whether to use backward selection. True, delete the insignificant factors and rerun. False, keep all factors

    :return:
    """

    # GARCH-M model
    results_df, diagnose_result = GARCHM(return_, factors_df, p, q)
    if results_df is None:
        return None, None
    if backward:
        # backward selection
        # check factors p-value
        significant_factors = results_df[results_df['significant']]['param'].tolist()
        significant_factors = [f for f in significant_factors if f in factors_df.columns]
        if len(significant_factors) == len(factors_df.columns):
            pass
        else:
            if len(significant_factors) == 0:
                new_factor_df = pd.DataFrame()
            else:
                new_factor_df = factors_df[significant_factors]

                # rerun the model with significant factors
                results_df, diagnose_result = GARCHM(return_, new_factor_df, p, q)


    return results_df, diagnose_result

In [None]:
def predict(returns: pd.Series, train_factors: pd.DataFrame, predict_factors: pd.DataFrame, model: pd.Series, p: int=1, q: int=1, predict_period=5, )-> pd.DataFrame:
    """
    Predict the future returns using GARCH-M model
    :param returns: (n, ) stock return data
    :param train_factors: (n, k) factors data in training period
    :param predict_factors: (m, k) factors data in prediction period
    :param model: params of factor + GARCH-M model
    :param p: GARCH order
    :param q: GARCH order
    :param predict_period: number of periods to predict
    :return: DataFrame with predicted returns and volatilities
    """
    model = model.values
    # get residuals and volatility
    eps, h = GARCH_M_FUNC(model, returns, train_factors, p, q)

    # predict
    k = train_factors.shape[1]
    alpha = model[0]
    beta = model[1:k + 1]
    lambd = model[k + 1]
    omega = model[k + 2]
    a = model[k + 3: k + 3 + p]
    b = model[k + 3 + p: k + 3 + p + q]

    forecasts = []
    volatilities = []
    
    eps_extended = list(eps)
    h_extended = list(h)
    
    for step in range(predict_period):
        arch_sum = np.sum([a[i] * eps_extended[-i - 1] ** 2 for i in range(p)]) if p > 0 else 0
        garch_sum = np.sum([b[j] * h_extended[-j - 1] for j in range(q)]) if q > 0 else 0
        h_next = omega + arch_sum + garch_sum
        h_next = max(h_next, 1e-6)

        factor_input = predict_factors.iloc[step] if step < len(predict_factors) else predict_factors.iloc[-1]

        r_next = alpha + (np.dot(beta, factor_input) if k > 0 else 0) + lambd * np.sqrt(h_next)
        forecasts.append(r_next)
        volatilities.append(h_next)
        # update for next iteration
        eps_extended.append(r_next - (alpha + (np.dot(beta, factor_input) if k > 0 else 0)))
        h_extended.append(h_next)
    return forecasts, volatilities

In [None]:
def main_stock(return_: pd.Series, training_factors: pd.DataFrame, predict_factors: pd.DataFrame, predict_period=5) -> tuple:
    """
    train and predict

    :param return_: (n) stock return data
    :param training_factors: (n,) factor values in training period
    :param predict_factors: (m,) factor values in prediction period
    :param predict_period: the number of days to predict

    """
    print("Training...")
    p, q, test_result, significant_factors = pre_training(return_,training_factors) 
    if p == 0 and q == 0:
        print("No Arch effect")
        return None, None, None, None, None
    training_factors_stock = training_factors[significant_factors]
    model_result, diagnose_result = training(return_, training_factors_stock, p, q, backward=True)
    if model_result is None:
        return None, None, None, None, None
    else:
        # if the model is not converged, skip
        print(model_result)
        current_factors = model_result[model_result['sector'] == 'factor']['param'].tolist()

        # predict
        training_factors_stock = training_factors[current_factors]
        predict_factors_stock = predict_factors[current_factors]
        print("Predicting...")
        r_next, h_next = predict(return_, training_factors_stock, predict_factors_stock, model_result['value'], p, q, predict_period) 

        return r_next, h_next, model_result, test_result, diagnose_result

In [1667]:
def batch_predict(training_factors: pd.DataFrame, predict_factors: pd.DataFrame, returns: pd.DataFrame, predict_period=5):
    """
    predict the next month return of stocks
    :param training_factors: (n,) DataFrame with date as index, factors as columns and factor values as values
    :param predict_factors: (m,) DataFrame with date as index, factors as columns and factor values as values
    :param returns: (n,) DataFrame with date as index, stocks as columns and close price as values
    :param predict_period: the number of days to predict
    :return: (predict_period,) DataFrame with date as index, stocks as columns and close price as values
    """
    predict_returns = pd.DataFrame(columns=returns.columns, index=range(predict_period))
    predict_volatility = pd.DataFrame(columns=returns.columns, index=range(predict_period))
    training_info = {}
    valid_stocks = []
    for stock in returns.columns:
        print(f"Predicting {stock}...")
        return_ = returns[stock]
        r_next, h_next, model_result, test_result, diagnose_result = main_stock(return_, training_factors, predict_factors, predict_period)
        if model_result is None:
            print(f"Model for {stock} is not converged.")
            continue

        predict_returns[stock] = r_next
        predict_volatility[stock] = h_next
        training_info[stock] = {"model": model_result,
                                "test_result": test_result,
                                "diagnose_result": diagnose_result}
        valid_stocks.append(stock)
        print("Predicting finished.")


    return predict_returns, predict_volatility, training_info, valid_stocks

In [1668]:
def two_stage_garch_m(returns: pd.Series, factors_df: pd.DataFrame, p: int = 1, q: int = 1):
    """
    todo: try to separate factor model and GARCH-M and check if the result will be more stable
    Two-stage estimation of a GARCH-M model:
    1. OLS factor regression
    2. GARCH(p, q) on residuals
    3. Final regression: returns ~ factors + conditional volatility

    Returns:
        - final regression summary (with volatility term)
        - GARCH model summary
        - table with coefficients, t-stats, and p-values
    """

    # Step 1: OLS
    X1 = factors_df.copy()
    ols_model = sm.OLS(returns, X1).fit()
    residuals = ols_model.resid

    # Step 2:GARCH(p, q)
    am = arch_model(residuals, vol='Garch', p=p, q=q, dist='normal')
    garch_res = am.fit(disp="off")
    conditional_vol = garch_res.conditional_volatility

    # Step 3
    X2 = X1.copy()
    X2["volatility"] = conditional_vol
    final_model = sm.OLS(returns, X2).fit()

    summary_df = pd.DataFrame({
        "param": final_model.params.index,
        "value": final_model.params.values,
        "t_stat": final_model.tvalues,
        "p_value": final_model.pvalues,
        "significant": final_model.pvalues < 0.05
    })

    return final_model.summary(), garch_res.summary(), summary_df

In [1669]:
def rolling_predict(factors_df: pd.DataFrame, returns: pd.DataFrame, training_period = 24, predict_period=5):
    """
    rolling predict the next month return of stocks

    :param factors_df: (n, )DataFrame with date as index, factors as columns and factor values as values
    :param returns: (n, ) DataFrame with date as index, stocks as columns and close price as values
    :param training_period: the number of days to train
    :param fit_freq: the number of days to predict
    """
    dates = returns.iloc[training_period:].index
    predict_returns = pd.DataFrame(columns=returns.columns, index=dates)
    predict_volatility = pd.DataFrame(columns=returns.columns, index=dates)
    training_infos = {}
    for i in range(0, len(dates)):
        training_factors = factors_df.iloc[i:i + training_period]
        predict_factors = factors_df.iloc[i + training_period:i + training_period + predict_period]
        return_this = returns.iloc[i:i + training_period]
        next_returns, next_vol, current_training, valid_stocks = batch_predict(training_factors, predict_factors, return_this)

        predict_returns.iloc[i] = next_returns
        predict_volatility.iloc[i] = next_vol
        training_infos[dates[i]] = current_training 
        
    return predict_returns, predict_volatility, training_infos

In [None]:
def main(start_date='2020-01-01', end_date='2024-12-31', save_path='../data/results', window=24, pure_garchm=False, stock_list=None):
    """
    Main function

    :param start_date: the first prediction date
    :param end_date: the last prediction date
    :window: the training period ( natural days )
    :param pure_garchm: whether to use pure garch model
    :param stock_list: the list of stocks to predict
    :param fit_freq: the number of days to predict after training

    """

    # read data
    scale_ = 1  # to avoid training problems introduced by small scale
    returns, factors_df = read_data(stock_list=stock_list)
    returns = returns * scale_
    factors_df = factors_df * scale_
    # data begain date 
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    data_start_date = start_date - pd.DateOffset(days=window)

    workday_window = len(returns.loc[data_start_date: start_date])

    returns = returns.loc[data_start_date:end_date]
    factors_df = factors_df.loc[data_start_date:end_date]

    if pure_garchm:
        factors_df = pd.DataFrame()


    # rolling predict
    training_factors = factors_df.iloc[:workday_window]
    predict_factors = factors_df.iloc[workday_window:]
    predict_returns, predict_volatility, training_info, valid_stocks = batch_predict(training_factors, predict_factors, returns.iloc[:workday_window], predict_period=5)
    print(predict_returns)
    print(predict_volatility)
    print(valid_stocks)
    
    # predict_returns, predict_volatility, training_info = rolling_predict(factors_df, returns, training_period=workday_window)

    predict_returns = predict_returns / scale_
    predict_volatility = predict_volatility / (scale_**2)
    predict_returns = predict_returns.dropna(axis=1, how='all')
    predict_volatility = predict_volatility.dropna(axis=1, how='all') # means there is no Arch effect
    
    # save results
    predict_returns.to_csv(f'{save_path}/predict_returns.csv')
    predict_volatility.to_csv(f'{save_path}/predict_volatility.csv')
    

    for date_ in training_info.keys():
        for stock in training_info[date_].keys():
            date_str = date_.strftime('%Y%m%d')
            training_info[date_][stock]['model'].to_csv(f'{save_path}/details/training_info_{stock}_{date_str}.csv')
            training_info[date_][stock]['test_result'].to_csv(f'{save_path}/details/test_result_{stock}_{date_str}.csv')
            training_info[date_][stock]['diagnose_result'].to_csv(f'{save_path}/details/diagnose_result_{stock}_{date_str}.csv')

In [None]:
if __name__ == '__main__':
    main(start_date='2024-12-24', end_date='2024-12-31', save_path='../data/results', window=252, pure_garchm=False, stock_list=None)

Predicting CSCO...
Training...
Best GARCH order: 1, 1
Optimization terminated successfully    (Exit mode 0)
            Current function value: -594.9774295984893
            Iterations: 24
            Function evaluations: 197
            Gradient evaluations: 20
Residual analysis...
Optimization terminated successfully    (Exit mode 0)
            Current function value: -722.1702287032626
            Iterations: 70
            Function evaluations: 540
            Gradient evaluations: 68
Residual analysis...
    param      value  std_error        t_stat   p_value  significant  \
0   alpha  -0.178149   0.018733 -9.509746e+00  0.000000         True   
1     eqw   0.875903   0.003458  2.533167e+02  0.000000         True   
2  lambda  18.285025  19.032081  9.607475e-01  0.336679        False   
3   omega   0.000096   0.001000  9.560984e-02  0.923830        False   
4     a_1   0.002719   0.001000  2.719404e+00  0.006540         True   
5     b_1   0.000001   2.282693  4.808036e-07  1.0