Cell 1. Imports

In [1]:
import numpy as np
import pandas as pd

from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.covariance import LedoitWolf

import warnings
warnings.filterwarnings("ignore")

Cell 2. 설정값

In [2]:
# Run-wide settings. Built with keyword syntax; key order matches the sections below.
CFG = dict(
    # Input workbooks
    EMBEDDING_PATH="data/kospidaq_embeddings_KR_FinBERT.xlsx",
    RETURN_PATH="data/report_return_mapping.xlsx",
    ADJ_CLOSE_PATH="data/adj_close_wide_2014_2026.xlsx",
    MCAP_PATH="data/market_cap_2014_2025.xlsx",

    # Ridge regression
    TRAIN_END_DATE="2020-12-31",
    RIDGE_ALPHA=10.0,
    TARGET_RETURN="log_return_0",   # option A: target fixed to log_return_0

    # Prior / Black-Litterman
    COV_WINDOW=252,
    MIN_COVERAGE=0.90,     # 90% of the 252-day window
    LAMBDA=2.5,            # risk aversion (Pi = lambda * Sigma * w_mkt)
    TAU=0.05,

    # Backtest
    LONG_ONLY=True,        # if True, negative weights are clipped to 0 and renormalized
    START_DATE="2016-01-01",
    END_DATE="2025-12-30", # only safe up to the end of the market-cap data (2025-12-30)
)

# Names of the return columns: log_return_0 ... log_return_10.
RETURN_COLS = ["log_return_%d" % k for k in range(11)]


Cell 3-1. 데이터 로드 & 리포트 데이터 결합/집계 (Ridge 전에 임베딩 평균)

In [3]:
def load_wide_price_and_mcap(adj_close_path, mcap_path):
    """Read the wide adjusted-close and market-cap workbooks.

    Each workbook is read with its first column as the index. The index is
    converted with ``pd.to_datetime``, the column labels are cast to ``str`` and
    left-padded with zeros to six characters, and the rows are sorted by index.

    Parameters
    ----------
    adj_close_path : path of the adjusted-close workbook.
    mcap_path : path of the market-cap workbook.

    Returns
    -------
    tuple
        ``(px, mc)``: the price frame first, the market-cap frame second.
    """
    def _read_wide(path):
        # Same normalisation for both workbooks: datetime index, 6-char codes, sorted rows.
        wide = pd.read_excel(path, index_col=0)
        wide.index = pd.to_datetime(wide.index)
        wide.columns = wide.columns.astype(str).str.zfill(6)
        return wide.sort_index()

    price_frame = _read_wide(adj_close_path)
    mcap_frame = _read_wide(mcap_path)
    return price_frame, mcap_frame


def load_and_prepare_report_panel(embedding_path, return_path, return_cols):
    """Load report embeddings and returns, then average them per (date, ticker_code).

    The two workbooks are concatenated column-wise, so they are assumed to list
    the same reports in the same row order; only the row counts are checked.

    Parameters
    ----------
    embedding_path : workbook providing ``date``, ``ticker`` and ``embedding_*`` columns.
    return_path : workbook providing ``ticker_code`` and the return columns.
    return_cols : list of return column names to carry over and average.

    Returns
    -------
    tuple
        ``(df_agg, embedding_cols)``: one row per (date, ticker_code) holding the
        mean of every embedding and return column, sorted by date then
        ticker_code, plus the list of embedding column names.

    Raises
    ------
    ValueError
        If the two workbooks have different numbers of rows.
    """
    embed_raw = pd.read_excel(embedding_path)
    ret_raw = pd.read_excel(return_path)

    # Row-alignment sanity check (identical ordering is a precondition, not verified).
    n_embed, n_ret = len(embed_raw), len(ret_raw)
    if n_embed != n_ret:
        raise ValueError(f"행 수 불일치: embed={n_embed} ret={n_ret}")

    embedding_cols = [name for name in embed_raw.columns if name.startswith("embedding_")]

    # date comes only from the embedding sheet; ticker_code/returns only from the return sheet.
    left = embed_raw[["date", "ticker"] + embedding_cols]
    right = ret_raw[["ticker_code"] + return_cols]
    panel = pd.concat([left, right], axis=1)
    panel["date"] = pd.to_datetime(panel["date"])
    panel["ticker_code"] = panel["ticker_code"].astype(str).str.zfill(6)

    # Average both embeddings and returns within each (date, ticker_code) cell.
    keys = ["date", "ticker_code"]
    mean_spec = dict.fromkeys([*embedding_cols, *return_cols], "mean")
    averaged = panel.groupby(keys, as_index=False).agg(mean_spec)
    df_agg = averaged.sort_values(["date", "ticker_code"]).reset_index(drop=True)
    return df_agg, embedding_cols


Cell 3-2. Ridge 학습 + 예측값(pred_return) 생성 → View 패널(df_view)

In [4]:
def ridge_fit_predict_panel(df_agg, embedding_cols, target_return, train_end_date, alpha):
    """Fit a StandardScaler + Ridge pipeline on the early sample and predict every row.

    Rows whose target is NaN are dropped. Rows dated on or before
    ``train_end_date`` form the training split; rows dated after it form the
    test split. Predictions for training rows are in-sample, because the model
    is fitted on those same rows. Callers that need strictly out-of-sample
    views should restrict themselves to dates after ``train_end_date``.

    Parameters
    ----------
    df_agg : DataFrame with a ``date`` column, the embedding columns and the target.
    embedding_cols : list of feature column names.
    target_return : name of the target column.
    train_end_date : last date (inclusive) of the training split.
    alpha : Ridge regularisation strength.

    Returns
    -------
    tuple
        ``(df_view, model, metrics)``. ``df_view`` has the columns ``date``,
        ``ticker_code`` and ``pred_return``. ``metrics`` holds train/test R^2,
        MSE and sample counts. The test metrics are NaN when no row falls
        after ``train_end_date``.

    Raises
    ------
    ValueError
        If no row with a valid target falls on or before ``train_end_date``.
    """
    df_tmp = df_agg.dropna(subset=[target_return]).copy()

    train_end_date = pd.to_datetime(train_end_date)
    train_mask = df_tmp["date"] <= train_end_date
    test_mask = df_tmp["date"] > train_end_date

    if not train_mask.any():
        # Fail with a clear message instead of an opaque error from inside the estimator.
        raise ValueError(
            f"no training rows with a valid '{target_return}' on or before {train_end_date.date()}"
        )

    X_train = df_tmp.loc[train_mask, embedding_cols].values
    y_train = df_tmp.loc[train_mask, target_return].values
    X_test = df_tmp.loc[test_mask, embedding_cols].values
    y_test = df_tmp.loc[test_mask, target_return].values

    model = Pipeline([
        ("scaler", StandardScaler()),
        ("ridge", Ridge(alpha=alpha)),
    ])
    model.fit(X_train, y_train)

    # Rows in neither split (e.g. a missing date) keep a NaN prediction.
    df_tmp["pred_return"] = np.nan
    y_pred_train = model.predict(X_train)
    df_tmp.loc[train_mask, "pred_return"] = y_pred_train

    # An empty test split is legitimate (train_end_date at or after the last report);
    # the estimator and the metric functions cannot take zero-row input, so skip them.
    has_test = len(y_test) > 0
    if has_test:
        y_pred_test = model.predict(X_test)
        df_tmp.loc[test_mask, "pred_return"] = y_pred_test
        test_r2 = r2_score(y_test, y_pred_test)
        test_mse = mean_squared_error(y_test, y_pred_test)
    else:
        test_r2 = float("nan")
        test_mse = float("nan")

    # Evaluation metrics are returned alongside the predictions.
    metrics = {
        "train_r2": r2_score(y_train, y_pred_train),
        "test_r2": test_r2,
        "train_mse": mean_squared_error(y_train, y_pred_train),
        "test_mse": test_mse,
        "n_train": len(y_train),
        "n_test": len(y_test),
    }

    df_view = df_tmp[["date", "ticker_code", "pred_return"]].copy()
    return df_view, model, metrics


Cell 3-3. Prior(Σ, Π) 계산: 252일 + 90% + Ledoit–Wolf + 시총 스냅샷

In [5]:
def compute_log_returns(px_wide):
    """Return the row-over-row log change of every column, ``log(p_t / p_{t-1})``.

    The first row has no predecessor and is therefore NaN.
    """
    previous_row = px_wide.shift(1)
    price_ratio = px_wide / previous_row
    return np.log(price_ratio)


def compute_prior_at_date(view_date, px, mc, window=252, min_coverage=0.90, risk_aversion=2.5):
    """Build the equilibrium prior (Sigma, Pi, w_mkt) as of ``view_date``.

    Steps:
      1. Take the ``window`` return rows ending at ``view_date``.
      2. Keep tickers with at least ``ceil(window * min_coverage)`` valid returns
         in that window and a non-missing market cap on ``view_date``.
      3. Estimate Sigma with Ledoit-Wolf shrinkage on the rows where every kept
         ticker has a return.
      4. Set ``w_mkt`` to the market-cap weights and
         ``Pi = risk_aversion * Sigma @ w_mkt``.

    Parameters
    ----------
    view_date : date of the snapshot (anything ``pd.to_datetime`` accepts).
    px : wide price frame (dates x tickers).
    mc : wide market-cap frame (dates x tickers).
    window : number of return rows in the estimation window.
    min_coverage : minimum share of the window a ticker must have valid returns for.
    risk_aversion : lambda in ``Pi = lambda * Sigma * w_mkt``.

    Returns
    -------
    dict or None
        ``{"tickers_univ", "Sigma", "Pi", "w_mkt"}``, or ``None`` when the date is
        missing from ``px``/``mc``, the history is shorter than ``window``, fewer
        than two tickers qualify, too few complete rows remain, or the total
        market cap is not a positive finite number.
    """
    view_date = pd.to_datetime(view_date)

    # Both a market-cap snapshot and a price row are required on the view date.
    if view_date not in mc.index or view_date not in px.index:
        return None

    end_loc = px.index.get_loc(view_date)
    start_loc = end_loc - window + 1
    if start_loc < 0:
        return None

    # Returns are only needed inside the window, so compute them on the window's
    # prices (plus one leading row for the first difference) rather than on the
    # full history on every call.
    px_win = px.iloc[max(start_loc - 1, 0):end_loc + 1]
    rets = compute_log_returns(px_win)
    ret_win = rets.iloc[len(rets) - window:]

    # A zero price yields a +/-inf log return. notna() would count it as a valid
    # observation and the covariance fit would then fail, so treat it as missing.
    ret_win = ret_win.replace([np.inf, -np.inf], np.nan)

    min_obs = int(np.ceil(window * min_coverage))
    valid_obs = ret_win.notna().sum(axis=0)
    tickers_cov = valid_obs[valid_obs >= min_obs].index

    mcap_t = mc.loc[view_date]
    tickers_mcap = mcap_t.dropna().index

    tickers_univ = sorted(set(tickers_cov) & set(tickers_mcap))
    if len(tickers_univ) < 2:
        return None

    # Only rows where every ticker in the universe has a return are used, so the
    # joint sample can be much smaller than any single ticker's coverage.
    X = ret_win[tickers_univ].dropna(axis=0, how="any")
    if len(X) < min_obs:
        return None

    Sigma = LedoitWolf().fit(X.values).covariance_

    mcap_vec = mc.loc[view_date, tickers_univ].values.astype(float)
    total_mcap = np.nansum(mcap_vec)
    if not np.isfinite(total_mcap) or total_mcap <= 0:
        # Cannot form market weights; the caller skips dates that return None.
        return None
    w_mkt = mcap_vec / total_mcap

    Pi = risk_aversion * (Sigma @ w_mkt)

    return {
        "tickers_univ": tickers_univ,
        "Sigma": Sigma,
        "Pi": Pi,
        "w_mkt": w_mkt,
    }


Cell 3-4. View 정합성(교집합) + BL posterior + weights

In [6]:
def make_view_at_date(df_view, view_date):
    """Return the view rows dated exactly ``view_date``, or ``None`` if there are none.

    The result is a new frame: ``ticker_code`` is cast to ``str`` and zero-padded
    to six characters, and the rows are ordered by ``ticker_code``. ``df_view``
    itself is not modified.
    """
    target = pd.to_datetime(view_date)
    same_day = df_view.loc[df_view["date"].eq(target)]
    if same_day.empty:
        return None

    padded_codes = same_day["ticker_code"].astype(str).str.zfill(6)
    return same_day.assign(ticker_code=padded_codes).sort_values("ticker_code")


def align_prior_and_view(prior_dict, view_tmp):
    """Restrict the prior and the views to their common tickers, in one shared order.

    The views are first reduced to one value per ticker: rows with a NaN
    ``pred_return`` are dropped and duplicate ``ticker_code`` rows are averaged.
    ``Q`` is then read off in exactly the order used to slice ``Sigma``, ``Pi``
    and ``w_mkt``, so every returned array has the same length and ordering.

    Parameters
    ----------
    prior_dict : dict with ``tickers_univ``, ``Sigma``, ``Pi`` and ``w_mkt``.
    view_tmp : DataFrame with ``ticker_code`` and ``pred_return`` columns.

    Returns
    -------
    dict or None
        ``None`` when fewer than two tickers are shared. Otherwise a dict with
        ``tickers`` (object array, sorted), ``Sigma``, ``Pi``, ``w_mkt``, ``Q``,
        and the sorted lists ``missing_in_prior`` (viewed but not in the prior)
        and ``missing_in_view`` (in the prior but without a usable view).
    """
    tickers_univ = list(prior_dict["tickers_univ"])
    Sigma = np.asarray(prior_dict["Sigma"])
    Pi = np.asarray(prior_dict["Pi"])
    w_mkt = np.asarray(prior_dict["w_mkt"])

    # One view per ticker. Without this, repeated ticker rows would make Q longer
    # than the sliced Sigma/Pi and misalign the two.
    q_by_ticker = (
        view_tmp.dropna(subset=["pred_return"])
                .groupby("ticker_code")["pred_return"]
                .mean()
    )

    prior_set = set(tickers_univ)
    view_set = set(q_by_ticker.index)

    common = sorted(prior_set & view_set)
    if len(common) < 2:
        return None

    # Positions of the common tickers inside the prior's own ordering.
    position = {ticker: i for i, ticker in enumerate(tickers_univ)}
    idx = np.array([position[ticker] for ticker in common], dtype=int)

    Sigma_c = Sigma[np.ix_(idx, idx)]
    Pi_c = Pi[idx]
    w_c = w_mkt[idx]

    # Pull the views in the same `common` order used for the prior slices.
    Q = q_by_ticker.reindex(common).to_numpy(dtype=float)
    tickers_common = np.array(common, dtype=object)

    missing_in_prior = sorted(view_set - prior_set)
    missing_in_view = sorted(prior_set - view_set)

    return {
        "tickers": tickers_common,
        "Sigma": Sigma_c,
        "Pi": Pi_c,
        "w_mkt": w_c,
        "Q": Q,
        "missing_in_prior": missing_in_prior,
        "missing_in_view": missing_in_view,
    }


def black_litterman_posterior(Pi, Sigma, Q, Omega, tau=0.05):
    """Black-Litterman posterior mean for absolute views on every asset (P = I).

    With ``A = tau * Sigma`` the textbook expression

        mu = (A^-1 + Omega^-1)^-1 (A^-1 Pi + Omega^-1 Q)

    is algebraically equal to

        mu = Pi + A (A + Omega)^-1 (Q - Pi)

    The second form is the one evaluated here. It needs a single linear solve
    and no explicit matrix inverse, so it also works when ``tau * Sigma`` is
    singular, as long as ``A + Omega`` is invertible.

    Parameters
    ----------
    Pi : (N,) prior (equilibrium) expected returns.
    Sigma : (N, N) covariance matrix.
    Q : (N,) view returns, one per asset.
    Omega : (N, N) view-uncertainty covariance.
    tau : scalar scaling the prior covariance.

    Returns
    -------
    numpy.ndarray
        (N,) posterior expected returns.

    Raises
    ------
    ValueError
        If ``Pi`` and ``Q`` do not have the same shape.
    numpy.linalg.LinAlgError
        If ``tau * Sigma + Omega`` is singular.
    """
    Pi = np.asarray(Pi, dtype=float)
    Q = np.asarray(Q, dtype=float)
    if Pi.shape != Q.shape:
        raise ValueError(f"Pi and Q must have the same shape, got {Pi.shape} and {Q.shape}")

    Sigma_t = tau * np.asarray(Sigma, dtype=float)
    Omega = np.asarray(Omega, dtype=float)

    # Tilt the prior towards the views by the share of total uncertainty that
    # belongs to the prior.
    tilt = np.linalg.solve(Sigma_t + Omega, Q - Pi)
    mu_bl = Pi + Sigma_t @ tilt
    return mu_bl


def mv_weights_from_mu_sigma(mu, Sigma, risk_aversion=2.5, long_only=True):
    """Turn expected returns into normalised mean-variance portfolio weights.

    The raw solution is the unconstrained optimum ``w = Sigma^-1 mu / risk_aversion``.
    It is then rescaled into an investable portfolio. The rescaling is a
    backtesting convention, not part of the Black-Litterman derivation.

    * ``long_only=True``: negative weights are clipped to zero and the rest is
      scaled to sum to 1. If nothing positive remains, equal weights are returned.
    * ``long_only=False``: weights are scaled to sum to 1 when the net sum is
      positive. When the net sum is zero, near zero or negative, they are
      scaled by gross exposure (sum of absolute weights) instead. Dividing by a
      negative net sum would flip the sign of every position.

    Parameters
    ----------
    mu : (N,) expected returns.
    Sigma : (N, N) covariance matrix.
    risk_aversion : scalar divisor applied to the raw solution.
    long_only : whether to clip short positions.

    Returns
    -------
    numpy.ndarray
        (N,) portfolio weights.
    """
    w = np.linalg.solve(Sigma, mu) / risk_aversion

    if long_only:
        w = np.maximum(w, 0.0)
        total = w.sum()
        if total <= 0:
            # Every raw weight was non-positive: fall back to equal weights.
            return np.ones_like(w) / len(w)
        return w / total

    net = w.sum()
    if net > 0 and not np.isclose(net, 0):
        return w / net

    # Net exposure is non-positive or negligible: keep each position's direction
    # and normalise the gross exposure to 1 (epsilon guards an all-zero vector).
    return w / (np.abs(w).sum() + 1e-12)


Cell 3-5. 백테스트(매일 반복): “t에서 신호 생성 → t+1 수익률에 적용” (look-ahead 방지)

In [7]:
def backtest_daily_bl(
    df_view,
    px,
    mc,
    start_date,
    end_date,
    window=252,
    min_coverage=0.90,
    lambd=2.5,
    tau=0.05,
    omega_scale=None,   # None -> use the cross-sectional variance of the day's views
    long_only=True
):
    """Daily Black-Litterman backtest: signal on date t, return realised on t+1.

    For every date that has views, a market-cap snapshot and a price row inside
    ``[start_date, end_date]`` (and a following price row), the function builds
    the prior, aligns it with that day's views, computes the posterior mean and
    the mean-variance weights, and applies the weights to the next row's returns.

    Per-asset log returns are converted to simple returns before weighting,
    because portfolio returns aggregate linearly in simple returns, not in log
    returns. Assets with a missing next-day return contribute zero.

    Parameters
    ----------
    df_view : DataFrame with ``date``, ``ticker_code`` and ``pred_return``.
    px : wide price frame (dates x tickers).
    mc : wide market-cap frame (dates x tickers).
    start_date, end_date : inclusive bounds on the signal date.
    window, min_coverage : passed to ``compute_prior_at_date``.
    lambd : risk aversion, used for both the prior and the weights.
    tau : prior-covariance scaling used in the posterior.
    omega_scale : scalar view variance (Omega = omega_scale * I). If None, the
        variance of the day's views is used, floored at 1e-6.
    long_only : passed to ``mv_weights_from_mu_sigma``.

    Returns
    -------
    tuple
        ``(nav_df, rec_df)``: NAV indexed by holding date, and one diagnostic
        record per traded signal date.
    """
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    # Asset log returns; row t+1 is what a position opened on t earns.
    ret = compute_log_returns(px)

    # Candidate signal dates: views, market caps and prices must all exist.
    # Index operations are used so membership does not depend on whether
    # Series.unique() yields numpy datetime64 or Timestamp scalars.
    view_dates = pd.DatetimeIndex(pd.to_datetime(df_view["date"]).dropna().unique())
    candidates = (
        view_dates.intersection(mc.index)
                  .intersection(px.index)
                  .unique()
                  .sort_values()
    )

    # Keep dates inside the requested range that still have a following row
    # (the last available day cannot be traded).
    last_loc = len(ret.index) - 1
    all_dates = [
        d for d in candidates
        if start_date <= d <= end_date
        and d in ret.index
        and ret.index.get_loc(d) < last_loc
    ]

    nav = 1.0
    nav_series = []
    daily_records = []

    for d in all_dates:
        prior = compute_prior_at_date(
            view_date=d,
            px=px,
            mc=mc,
            window=window,
            min_coverage=min_coverage,
            risk_aversion=lambd
        )
        if prior is None:
            continue

        view_tmp = make_view_at_date(df_view, d)
        if view_tmp is None:
            continue

        aligned = align_prior_and_view(prior, view_tmp)
        if aligned is None:
            continue

        tickers = aligned["tickers"]
        Sigma = aligned["Sigma"]
        Pi = aligned["Pi"]
        Q = aligned["Q"]

        N = len(Q)

        # Omega is a scalar multiple of the identity.
        if omega_scale is None:
            # Conservative default: the spread of the day's views. The floor stops a
            # zero variance from turning the views into certainties.
            q_var = float(np.var(Q))
            omega_val = q_var if q_var > 0 else 1e-6
        else:
            omega_val = float(omega_scale)

        Omega = np.eye(N) * omega_val

        mu_bl = black_litterman_posterior(Pi=Pi, Sigma=Sigma, Q=Q, Omega=Omega, tau=tau)
        w = mv_weights_from_mu_sigma(mu=mu_bl, Sigma=Sigma, risk_aversion=lambd, long_only=long_only)

        # Apply the weights to the next row's returns (t+1).
        d_loc = ret.index.get_loc(d)
        d_next = ret.index[d_loc + 1]
        r_next = ret.loc[d_next, tickers].to_numpy(dtype=float)

        # Weights act on simple returns; summing weighted log returns would bias the
        # NAV. NaN returns are ignored by nansum.
        port_simple = float(np.nansum(w * np.expm1(r_next)))
        nav *= 1.0 + port_simple
        # The log return is reported for continuity; it is undefined at or below -100%.
        port_ret = float(np.log1p(port_simple)) if port_simple > -1.0 else float("-inf")

        nav_series.append((d_next, nav))
        daily_records.append({
            "signal_date": d,
            "hold_date": d_next,
            "N": len(tickers),
            "omega_val": omega_val,
            "port_logret": port_ret,
            "nav": nav,
            "missing_in_prior_n": len(aligned["missing_in_prior"]),
            "missing_in_view_n": len(aligned["missing_in_view"]),
        })

    nav_df = pd.DataFrame(nav_series, columns=["date", "nav"]).set_index("date")
    rec_df = pd.DataFrame(daily_records)

    return nav_df, rec_df
