In [5]:
import math
from datetime import datetime
from calendar import monthrange
from typing import Callable, Sequence, Union, Tuple
import warnings

warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
from pandas_datareader import data as pdr
from dateutil.relativedelta import relativedelta

WatchDate = Union[str, datetime]
FOMCDates = Sequence[Union[str, datetime]]
OHLCLoader = Callable[[str], pd.DataFrame]

In [6]:
def _normalize_watch_date(watch_date: WatchDate) -> datetime:
    if isinstance(watch_date, str):
        return datetime.strptime(watch_date, "%Y-%m-%d")
    elif isinstance(watch_date, datetime):
        return watch_date
    else:
        raise ValueError("watch_date must be 'YYYY-MM-DD' string or datetime")


def _normalize_fomc_dates(fomc_dates: FOMCDates) -> list[datetime]:
    if all(isinstance(d, datetime) for d in fomc_dates):
        res = list(fomc_dates)
    elif all(isinstance(d, str) for d in fomc_dates):
        res = [datetime.strptime(d, "%Y-%m-%d") for d in fomc_dates]
    else:
        raise ValueError(
            "fomc_dates must be list of datetime or list of 'YYYY-MM-DD' strings"
        )
    return sorted(res)


def _starting_no_fomc_month(
    watch_date: datetime, fomc_dates: list[datetime]
) -> Tuple[int, int]:
    """
    원래 FOMC.starting_no_fomc_month 와 동일한 로직.
    watch_date 이전 구간에서 'No FOMC'인 첫 달을 찾는다.
    """
    fomc_ym = [
        d.strftime("%Y-%m")
        for d in fomc_dates
        if d.strftime("%Y-%m") <= watch_date.strftime("%Y-%m")
    ]
    if not fomc_ym:
        raise ValueError("fomc_dates가 지나치게 과거/미래만 포함하는 것 같습니다.")

    target = watch_date
    while target.strftime("%Y-%m") >= fomc_ym[0]:
        if target.strftime("%Y-%m") not in fomc_ym:
            return target.year, target.month
        target = target - relativedelta(months=1)

    raise ValueError(
        "Starting No-FOMC month not found. FOMC 일정 리스트를 확인해 주세요."
    )


def _ending_no_fomc_month(
    watch_date: datetime, fomc_dates: list[datetime], num_upcoming: int
) -> Tuple[int, int]:
    """
    원래 FOMC.ending_no_fomc_month 로직.
    watch_date 이후 num_upcoming 개의 회의를 포함한 뒤,
    그 다음 'No FOMC' 달을 찾는다.
    """
    fomc_ym_fwd = [
        d.strftime("%Y-%m") for d in fomc_dates if d >= watch_date
    ]
    if not fomc_ym_fwd:
        raise ValueError("watch_date 이후 FOMC 일정이 없습니다.")

    target = watch_date
    fomc_counter = 0
    ending_no_fomc = None

    while target.strftime("%Y-%m") <= fomc_ym_fwd[-1]:
        ym = target.strftime("%Y-%m")
        if ym in fomc_ym_fwd:
            fomc_counter += 1
        else:
            ending_no_fomc = target
            if fomc_counter >= num_upcoming:
                break
        target = target + relativedelta(months=1)

    if fomc_counter < num_upcoming:
        raise ValueError(
            f"num_upcoming={num_upcoming} 회의를 충족하지 못했습니다. fomc_dates 범위를 늘려야 합니다."
        )
    if ending_no_fomc is None:
        raise ValueError("Ending No-FOMC month not found. 일정 리스트를 확인해 주세요.")

    return ending_no_fomc.year, ending_no_fomc.month


def _generate_month_list(
    watch_date: datetime, fomc_dates: list[datetime], num_upcoming: int
) -> list[str]:
    y0, m0 = _starting_no_fomc_month(watch_date, fomc_dates)
    y1, m1 = _ending_no_fomc_month(watch_date, fomc_dates, num_upcoming)

    month_index = pd.date_range(
        start=f"{y0}-{m0:02d}", end=f"{y1}-{m1:02d}", freq="MS"
    )
    return [d.strftime("%Y-%m") for d in month_index]


def _generate_contract_list_boj(
    month_list: list[str],
    prefix: str = "TOA3M",   # 실제 쓰는 티커 prefix로 바꿔 쓰세요. 예: "JTA"
    year_fmt: str = "YYYY",   # "YY" -> 25, "YYYY" -> 2025
) -> list[str]:
    """
    BOJ용 3M TONA 선물 티커 생성.
    H: March, M: June, U: September, Z: December
    - 1,2,3월  -> H (3월 분기)
    - 4,5,6월  -> M (6월 분기)
    - 7,8,9월  -> U (9월 분기)
    - 10,11,12월 -> Z (12월 분기)
    """
    quarter_code = {
        3: "H",
        6: "M",
        9: "U",
        12: "Z",
    }

    contract_list = []
    for ym in month_list:
        y, m = ym.split("-")
        y_i, m_i = int(y), int(m)

        # 해당 월이 속한 분기 마지막 달
        if m_i <= 3:
            q_month = 3
        elif m_i <= 6:
            q_month = 6
        elif m_i <= 9:
            q_month = 9
        else:
            q_month = 12

        code = quarter_code[q_month]

        if year_fmt == "YY":
            y_str = str(y_i)[-2:]
        else:
            y_str = str(y_i)

        # 예: prefix="JTA" 이면 JTAH25 형태
        symbol = f"{prefix}{code}{y_str}"
        contract_list.append(symbol)

    return contract_list


def _generate_meeting_list(
    month_list: list[str], fomc_dates: list[datetime]
) -> list[str]:
    """
    각 month(YYYY-MM)에 해당 월에 FOMC가 있으면 그 날짜(YYYY-MM-DD), 없으면 'No FOMC'
    """
    res = []
    for ym in month_list:
        yy, mm = ym.split("-")
        yy_i, mm_i = int(yy), int(mm)
        matches = [
            d.strftime("%Y-%m-%d")
            for d in fomc_dates
            if d.year == yy_i and d.month == mm_i
        ]
        res.append(matches[0] if matches else "No FOMC")
    return res


def _generate_order_list(
    watch_date: datetime, month_list: list[str], meeting_list: list[str]
) -> list[int]:
    """
    원래 FOMC.generate_order_list 와 동일한 로직 구현.
    month_list / meeting_list 기준으로,
    watch_date를 기준으로 과거/미래 회의에 음수/양수 번호를 달고,
    회의가 없는 달은 0.
    """
    calc_yr, calc_mn = watch_date.year, watch_date.month
    calc_ym = f"{calc_yr}-{calc_mn:02d}"

    try:
        idx = next(i for i, m in enumerate(month_list) if m == calc_ym)
    except StopIteration:
        raise ValueError("watch_date가 month_list 범위 밖입니다.")

    # watch_date가 회의 있는 달인 경우, 이미 회의가 끝난 상태면 과거로 취급
    if meeting_list[idx] == "No FOMC" or (
        meeting_list[idx] != "No FOMC"
        and datetime.strptime(meeting_list[idx], "%Y-%m-%d") <= watch_date
    ):
        bwd = meeting_list[: idx + 1]
        bwd.reverse()
        fwd = meeting_list[idx + 1 :]
    else:
        bwd = meeting_list[:idx]
        bwd.reverse()
        fwd = meeting_list[idx:]

    # forward: upcoming 회의
    fomc_order_fwd = []
    cnt = 1
    for date_str in fwd:
        if date_str == "No FOMC":
            fomc_order_fwd.append(0)
        else:
            fomc_order_fwd.append(cnt)
            cnt += 1

    # backward: past 회의
    fomc_order_bwd = []
    cnt = -1
    for date_str in bwd:
        if date_str == "No FOMC":
            fomc_order_bwd.append(0)
        else:
            fomc_order_bwd.append(cnt)
            cnt -= 1
    fomc_order_bwd.reverse()

    return fomc_order_bwd + fomc_order_fwd


def build_fomc_calendar_summary(
    watch_date: WatchDate,
    fomc_dates: FOMCDates,
    num_upcoming: int,
) -> pd.DataFrame:
    """
    기존 FOMC 클래스의 핵심 정보를 하나의 DataFrame으로 반환.
    index: YYYY-MM
    columns: ['Contract', 'Meeting', 'Order']
    """

    wd = _normalize_watch_date(watch_date)
    fd = _normalize_fomc_dates(fomc_dates)

    month_list = _generate_month_list(wd, fd, num_upcoming)
    contract_list = _generate_contract_list_boj(month_list)
    meeting_list = _generate_meeting_list(month_list, fd)
    order_list = _generate_order_list(wd, month_list, meeting_list)

    summary = pd.DataFrame(
        {
            "Contract": contract_list,
            "Meeting": meeting_list,
            "Order": order_list,
        },
        index=month_list,
    )
    summary.index.name = "YYYY-MM"

    return summary

In [7]:
def _normalize_ohlc(ohlc_df: pd.DataFrame, symbol: str, loader_name: str) -> pd.DataFrame:
    """
    기존 FedWatch.get_fff_history 와 동일한 검증 + Date index 강제.
    """
    if not isinstance(ohlc_df, pd.DataFrame):
        raise ValueError(
            f"'{loader_name}' did not return a pandas DataFrame for {symbol}."
        )
    if "Close" not in ohlc_df.columns:
        raise ValueError(
            f"'{loader_name}' did not return a DataFrame with 'Close' column for {symbol}."
        )

    # Date 가 index 또는 column에 있어야 함
    if "Date" not in ohlc_df.columns and ohlc_df.index.name != "Date":
        raise ValueError(
            f"'{loader_name}' did not return a DataFrame with 'Date' as index or column for {symbol}."
        )

    # index가 Date인 경우
    if ohlc_df.index.name == "Date":
        if pd.to_datetime(ohlc_df.index, errors="coerce").notna().all():
            ohlc_df.index = pd.to_datetime(ohlc_df.index, format="%Y-%m-%d")
        else:
            raise ValueError(
                f"'{loader_name}' returned non-convertible Date index for {symbol}."
            )

    # 그렇지 않으면 column 'Date'를 index로
    if ohlc_df.index.name != "Date":
        if "Date" in ohlc_df.columns and pd.to_datetime(
            ohlc_df["Date"], errors="coerce"
        ).notna().all():
            ohlc_df["Date"] = pd.to_datetime(ohlc_df["Date"], format="%Y-%m-%d")
            ohlc_df.set_index("Date", inplace=True)
        else:
            raise ValueError(
                f"'{loader_name}' returned non-convertible 'Date' column for {symbol}."
            )

    return ohlc_df


def _add_price_data(
    summary: pd.DataFrame,
    watch_date: datetime,
    ohlc_loader: OHLCLoader,
    loader_name: str,
) -> pd.DataFrame:
    """
    기존 FedWatch.add_price_data 와 동일.
    summary 에 Pstart, Pavg, Pend 열을 추가해서 반환.
    """
    p_start, p_avg, p_end = [], [], []

    watch_month = watch_date.strftime("%Y-%m")
    watch_date_str = watch_date.strftime("%Y-%m-%d")

    for ym, row in summary.iterrows():
        contract_symbol = row["Contract"]
        contract_month = ym
        month_type = row["Meeting"]

        ohlc = _normalize_ohlc(
            ohlc_loader(contract_symbol), contract_symbol, loader_name
        )

        # 미만기 계약
        if contract_month >= watch_month:
            p_avg_i = ohlc[ohlc.index <= watch_date_str].iloc[-1]["Close"]
            p_avg.append(p_avg_i)
        else:
            # 만기 지난 계약 → 그 달 마지막 거래일 기준
            yyyy_mm = datetime.strptime(contract_month, "%Y-%m")
            last_day = monthrange(yyyy_mm.year, yyyy_mm.month)[1]
            last_day_str = datetime(
                yyyy_mm.year, yyyy_mm.month, last_day
            ).strftime("%Y-%m-%d")
            p_avg_i = ohlc[ohlc.index <= last_day_str].iloc[-1]["Close"]
            p_avg.append(p_avg_i)

        if month_type == "No FOMC":
            p_start.append(p_avg_i)
            p_end.append(p_avg_i)
        else:
            p_start.append(0.0)
            p_end.append(0.0)

    summary = summary.copy()
    summary["Pstart"] = p_start
    summary["Pavg"] = p_avg
    summary["Pend"] = p_end
    return summary


def _fill_price_data(
    summary: pd.DataFrame,
) -> pd.DataFrame:
    """
    기존 FedWatch.fill_price_data 로직.
    FOMC 달의 Pstart/Pend 0 값을 앞/뒤 달과 회의 날짜로 보간.
    """
    p_start = summary["Pstart"].to_numpy().astype(float)
    p_avg = summary["Pavg"].to_numpy().astype(float)
    p_end = summary["Pend"].to_numpy().astype(float)

    # forward
    for i in range(1, len(p_avg) - 1):
        if p_start[i] == 0.0 and p_end[i - 1] != 0.0:
            p_start[i] = p_end[i - 1]
        if p_end[i] == 0.0 and p_start[i + 1] != 0.0:
            p_end[i] = p_start[i + 1]

    # backward
    month_index = summary.index.to_list()
    meeting_list = summary["Meeting"].to_list()

    for i in range(len(p_avg) - 2, 0, -1):
        if p_end[i] == 0.0:
            p_end[i] = p_start[i + 1]

        if p_start[i] == 0.0:
            meeting_date = datetime.strptime(meeting_list[i], "%Y-%m-%d")
            days_no = monthrange(meeting_date.year, meeting_date.month)[1]
            m = days_no - meeting_date.day + 1
            n = days_no - m
            p_start[i] = (p_avg[i] - m / (m + n) * p_end[i]) / (n / (m + n))

    summary = summary.copy()
    summary["Pstart"] = p_start
    summary["Pavg"] = p_avg
    summary["Pend"] = p_end
    return summary


def _generate_binary_hike_info(
    summary: pd.DataFrame,
    num_upcoming: int,
    step_bp: int = 5,   # Fed는 25, BOJ는 10으로 사용
) -> pd.DataFrame:
    """
    각 회의별 H0/H1(두 개의 step_bp 시나리오)와 P0/P1(각 시나리오의 확률)을 계산.
    """
    df = summary.copy()
    df = df[(df["Order"] > 0) & (df["Order"] <= num_upcoming)]

    # 내재금리 변화(단위: bp)
    df["Change"] = ((100 - df["Pend"]) - (100 - df["Pstart"])) / step_bp * 100

    df["H0"] = df["Change"].apply(lambda x: int(math.trunc(x) * step_bp))
    df["H1"] = df["Change"].apply(
        lambda x: int(math.trunc(x) * step_bp + step_bp * np.sign(x))
    )

    df["P0"] = df["Change"].apply(
        lambda x: 1 - (abs(x) - math.trunc(abs(x)))
    )
    df["P1"] = df["Change"].apply(
        lambda x: (abs(x) - math.trunc(abs(x)))
    )
    return df


def _calc_cum_info(
    lead_size: np.ndarray,
    lag_size: np.ndarray,
    lead_prob: np.ndarray,
    lag_prob: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    두 회의(lead/lag)를 합성하여 누적 hike size 및 probability 계산
    (기존 calc_cum_info)
    """
    size_list = lead_size[:, np.newaxis] + lag_size
    prob_list = lead_prob[:, np.newaxis] * lag_prob

    size_flat = size_list.flatten()
    prob_flat = prob_list.flatten()
    unique_size, idx = np.unique(size_flat, return_inverse=True)
    unique_prob = np.bincount(idx, weights=prob_flat)
    return unique_size, unique_prob


def compute_fedwatch_prob_table(
    watch_date: WatchDate,
    fomc_dates: FOMCDates,
    num_upcoming: int,
    ohlc_loader: OHLCLoader,
    rate_cols: bool = True,
    watch_rate_range: Tuple[float, float] | None = None,
) -> pd.DataFrame:
    """
    FOMC/FedWatch 클래스를 쓰지 않고,
    같은 계산 구조로 CME FedWatch 스타일 확률 테이블을 만드는 함수.

    Parameters
    ----------
    watch_date : str or datetime
    fomc_dates : list[str or datetime]
        전체 FOMC 일정 (과거 + 미래).
    num_upcoming : int
        watch_date 이후 몇 회의까지 포함할지.
    ohlc_loader : callable
        symbol(str) -> OHLC DataFrame('Date' index, 'Close' 컬럼 포함)을 반환하는 함수.
        (tvDatafeed, yfinance 등 자유롭게 사용)
    rate_cols : bool
        True면 컬럼 이름을 실제 타깃금리 구간("5.25-5.50")으로 변환.
        False면 bp change(예: -25, 0, 25) 그대로 유지.
    watch_rate_range : (float, float) or None
        (하한, 상한) 현재 타깃금리. None이면 FRED(DFEDTARL/DFEDTARU)에서 조회.

    Returns
    -------
    fedwatch_df : pd.DataFrame
        index: MultiIndex(WatchDate, FOMCDate)
        columns: 개별 금리구간(또는 bp change)별 확률
    """

    wd = _normalize_watch_date(watch_date)
    fd = _normalize_fomc_dates(fomc_dates)

    # 1) FOMC 캘린더 요약
    summary = build_fomc_calendar_summary(wd, fd, num_upcoming)

    # 2) 월별 선물가격 (평균/시작/끝) 채우기
    loader_name = getattr(ohlc_loader, "__name__", "ohlc_loader")
    summary = _add_price_data(summary, wd, ohlc_loader, loader_name)
    summary = _fill_price_data(summary)

    # 3) 이진 H0/H1 + P0/P1 생성
    bin_df = _generate_binary_hike_info(summary, num_upcoming)

    # 4) watch_date 기준 현재 타깃금리 구간
    if rate_cols and watch_rate_range is None:
        try:
            if wd >= datetime(2008, 12, 16):
                ll = pdr.DataReader("DFEDTARL", "fred", start=wd, end=wd).iloc[0, 0]
                ul = pdr.DataReader("DFEDTARU", "fred", start=wd, end=wd).iloc[0, 0]
                watch_rate_range = (ll, ul)
            else:
                ll = ul = pdr.DataReader("DFEDTAR", "fred", start=wd, end=wd).iloc[0, 0]
                watch_rate_range = (ll, ul)
        except Exception as e:
            raise ValueError(
                "FRED에서 타깃금리 범위를 가져오지 못했습니다. "
                "watch_rate_range=(ll, ul)을 직접 지정해 주세요."
            ) from e

    # 5) 회의별 누적 hike size / prob 계산 (convolution)
    def _extract(group: pd.DataFrame) -> dict:
        return {
            "hike_size": np.array(group[["H0", "H1"]].values[0]),
            "hike_prob": np.array(group[["P0", "P1"]].values[0]),
        }

    grouped = bin_df.groupby("Meeting").apply(_extract)

    # 첫 회의
    first_meeting = grouped.index[0]
    ms = grouped.iloc[0]["hike_size"]
    mp = grouped.iloc[0]["hike_prob"]

    data = {"FOMCDate": first_meeting}
    for size, prob in zip(ms, mp):
        data[size] = [prob]

    fedwatch_df = pd.DataFrame(data).set_index("FOMCDate")

    lead_size, lead_prob = ms, mp

    # 이후 회의들
    for i in range(1, len(grouped)):
        meet_date = grouped.index[i]
        lag_size = grouped.iloc[i]["hike_size"]
        lag_prob = grouped.iloc[i]["hike_prob"]

        ms, mp = _calc_cum_info(lead_size, lag_size, lead_prob, lag_prob)

        data = {"FOMCDate": meet_date}
        for size, prob in zip(ms, mp):
            data[size] = [prob]

        fedwatch_df = pd.concat(
            [fedwatch_df, pd.DataFrame(data).set_index("FOMCDate")]
        ).fillna(0.0)

        lead_size, lead_prob = ms, mp

    fedwatch_df.sort_index(axis=1, inplace=True)

    # watch_date MultiIndex 추가
    fedwatch_df["WatchDate"] = wd.strftime("%Y-%m-%d")
    fedwatch_df.reset_index(inplace=True)
    fedwatch_df.set_index(["WatchDate", "FOMCDate"], inplace=True)

    # 6) 컬럼명을 실제 타깃금리 구간으로 변환 (선택)
    if rate_cols:
        ll, ul = watch_rate_range
        diff = ul - ll
        new_cols = []
        for c in fedwatch_df.columns:
            col_bp = float(c) / 100.0  # 25 → 0.25bp
            if diff == 0:
                new_name = f"{ll + col_bp:.2f}"
            else:
                new_name = f"{ll + col_bp:.2f}-{ul + col_bp:.2f}"
            new_cols.append(new_name)
        fedwatch_df.columns = new_cols

    return fedwatch_df

In [23]:
def compute_bojwatch_prob_table(
    watch_date: WatchDate,
    boj_meeting_dates: FOMCDates,
    num_upcoming: int,
    ohlc_loader: OHLCLoader,
    watch_rate_range: Tuple[float, float],
    prefix: str = "TOA3M",
    year_fmt: str = "YYYY",
    step_bp: int = 5,
    rate_cols: bool = True,
) -> pd.DataFrame:
    """
    3M TONA 선물(H/M/U/Z)로 BOJ 금리결정 확률 테이블 근사.

    Parameters
    ----------
    watch_date : str or datetime
    boj_meeting_dates : list[str or datetime]
        BOJ 통화정책결정회의 일정.
    num_upcoming : int
        watch_date 이후 몇 회의까지 포함할지.
    ohlc_loader : callable
        symbol(str) -> OHLC DataFrame('Date' index, 'Close' 컬럼 포함).
    watch_rate_range : (float, float)
        현재 BOJ 타깃금리 범위 (ll, ul).
        예: (-0.10, 0.00) 등.
    prefix, year_fmt : 3M TONA 티커 규칙에 맞게 조정.
    step_bp : bp 스텝 (BOJ는 10bp 권장)
    rate_cols : True면 "ll+Δ - ul+Δ" 형식으로 컬럼 이름 변환.
    """

    wd = _normalize_watch_date(watch_date)
    fd = _normalize_fomc_dates(boj_meeting_dates)

    # 1) 캘린더 요약 (month_list / Meeting / Order 생성)
    month_list = _generate_month_list(wd, fd, num_upcoming)
    contract_list = _generate_contract_list_boj(month_list, prefix=prefix, year_fmt=year_fmt)
    meeting_list = _generate_meeting_list(month_list, fd)
    order_list = _generate_order_list(wd, month_list, meeting_list)

    summary = pd.DataFrame(
        {
            "Contract": contract_list,
            "Meeting": meeting_list,
            "Order": order_list,
        },
        index=month_list,
    )
    summary.index.name = "YYYY-MM"

    # 2) 월별 선물가격 (Pstart/Pavg/Pend)
    loader_name = getattr(ohlc_loader, "__name__", "ohlc_loader")
    summary = _add_price_data(summary, wd, ohlc_loader, loader_name)
    summary = _fill_price_data(summary)

    # 3) 이진 H0/H1 + P0/P1 (step_bp 사용)
    bin_df = _generate_binary_hike_info(summary, num_upcoming, step_bp=step_bp)

    # 4) 회의별 누적 hike size / prob 계산 (FedWatch와 동일)
    def _extract(group: pd.DataFrame) -> dict:
        return {
            "hike_size": np.array(group[["H0", "H1"]].values[0]),
            "hike_prob": np.array(group[["P0", "P1"]].values[0]),
        }

    grouped = bin_df.groupby("Meeting").apply(_extract)

    first_meeting = grouped.index[0]
    ms = grouped.iloc[0]["hike_size"]
    mp = grouped.iloc[0]["hike_prob"]

    data = {"BOJDate": first_meeting}
    for size, prob in zip(ms, mp):
        data[size] = [prob]
    bojwatch_df = pd.DataFrame(data).set_index("BOJDate")

    lead_size, lead_prob = ms, mp

    for i in range(1, len(grouped)):
        meet_date = grouped.index[i]
        lag_size = grouped.iloc[i]["hike_size"]
        lag_prob = grouped.iloc[i]["hike_prob"]

        ms, mp = _calc_cum_info(lead_size, lag_size, lead_prob, lag_prob)

        data = {"BOJDate": meet_date}
        for size, prob in zip(ms, mp):
            data[size] = [prob]

        bojwatch_df = pd.concat(
            [bojwatch_df, pd.DataFrame(data).set_index("BOJDate")]
        ).fillna(0.0)

        lead_size, lead_prob = ms, mp

    bojwatch_df.sort_index(axis=1, inplace=True)

    # 5) WatchDate MultiIndex
    bojwatch_df["WatchDate"] = wd.strftime("%Y-%m-%d")
    bojwatch_df.reset_index(inplace=True)
    bojwatch_df.set_index(["WatchDate", "BOJDate"], inplace=True)

    # 6) 컬럼명을 실제 타깃금리 구간으로 변환 (선택)
    if rate_cols:
        ll, ul = watch_rate_range
        diff = ul - ll
        new_cols = []
        for c in bojwatch_df.columns:
            col_bp = float(c) / 100.0
            if diff == 0:
                new_name = f"{ll + col_bp:.2f}"
            else:
                new_name = f"{ll + col_bp:.2f}"
            new_cols.append(new_name)
        bojwatch_df.columns = new_cols

    return bojwatch_df

In [30]:
from tvDatafeed import TvDatafeed, Interval

tv = TvDatafeed()

boj_dates = [
    "2025-01-22",
    "2025-03-24",
    "2025-04-24",
    "2025-06-16",
    "2025-07-30",
    "2025-09-17",
    "2025-10-30",
    "2025-12-18",
    "2026-01-23",
    '2026-03-19',
    '2026-04-28',
    '2026-06-16',
    '2026-07-31',
    '2026-09-18',
    '2026-10-30',
    '2026-12-18',
]


def load_tona_3m(symbol: str) -> pd.DataFrame:
    """
    tvDatafeed에서 일본 3M TONA 선물(TOA3MYYYY)을 로드.

    Parameters
    ----------
    symbol : str
        예: 'TOA3M2025'

    Returns
    -------
    pd.DataFrame
        index : DatetimeIndex (name='Date')
        columns : ['Open', 'High', 'Low', 'Close', 'Volume']
    """
    df = tv.get_hist(
        symbol=symbol,
        exchange="OSE",
        interval=Interval.in_daily,
        n_bars=3000,   # 충분히 크게
    )

    if df is None or df.empty:
        raise ValueError(f"tvDatafeed returned no data for {symbol} (OSE)")

    # tvDatafeed는 index가 datetime이므로 그대로 사용
    df = df.copy()
    df.index.name = "Date"

    # 컬럼 표준화 (혹시 모를 케이스 대비)
    df.columns = [c.capitalize() for c in df.columns]

    if "Close" not in df.columns:
        raise ValueError(f"'Close' column not found in TOA3M data for {symbol}")

    return df

watch_date = "2025-12-13"
watch_rate_range = (0.5, 1.0)

bojwatch = compute_bojwatch_prob_table(
    watch_date=watch_date,
    boj_meeting_dates=boj_dates,
    num_upcoming=5,
    ohlc_loader=load_tona_3m,
    watch_rate_range=watch_rate_range,
    prefix="TOA3M",    # 실제 티커 규칙에 맞게 수정
    year_fmt="YYYY",
    step_bp=5,
)



In [31]:
bojwatch

Unnamed: 0_level_0,Unnamed: 1_level_0,0.50,0.55,0.60,0.65,0.70,0.75,0.80,0.85,0.90
WatchDate,BOJDate,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2025-12-13,2025-12-18,0.0,0.5,0.5,0.0,0.0,0.0,0.0,0.0,0.0
2025-12-13,2026-01-23,1.421085e-13,0.5,0.5,0.0,0.0,0.0,0.0,0.0,0.0
2025-12-13,2026-03-19,0.0,0.0,8.526513e-14,0.3,0.5,0.2,0.0,0.0,0.0
2025-12-13,2026-04-28,0.0,0.0,8.526513e-14,0.3,0.5,0.2,0.0,0.0,0.0
2025-12-13,2026-06-16,0.0,0.0,0.0,0.0,8.100187e-14,0.285,0.49,0.215,0.01


In [34]:
from tqdm import tqdm

def build_bojwatch_prob_time_series(
    start_date: str | datetime,
    end_date: str | datetime,
    boj_meeting_dates: list[str | datetime],
    num_upcoming: int,
    ohlc_loader: Callable[[str], pd.DataFrame],
    watch_rate_range: Tuple[float, float],
    step_bp: int = 10,
    prefix: str = "TOA3M",      # 여기서는 계약 리스트를 쓰지 않는다면 의미 없음
    year_fmt: str = "YYYY",     # (동일)
    rate_cols: bool = True,
    freq: str = "B",            # 'B' 영업일, 'D' 일간
    on_error: str = "skip",     # 'skip' or 'nan'  (nan은 자리 유지)
) -> pd.DataFrame:
    """
    BOJWatch 확률 테이블을 watch_date별로 반복 계산해 시계열 형태로 쌓는다.

    Returns
    -------
    ts_df : pd.DataFrame
        index: MultiIndex(WatchDate, FOMCDate)
        columns: 금리구간(or bp change) 확률
    """

    # 날짜 정규화
    start_dt = pd.to_datetime(start_date)
    end_dt = pd.to_datetime(end_date)
    if start_dt > end_dt:
        raise ValueError("start_date must be <= end_date")

    watch_dates = pd.date_range(start=start_dt, end=end_dt, freq=freq)

    results = []
    errors = []

    for wd in tqdm(watch_dates):
        wd_str = wd.strftime("%Y-%m-%d")
        try:
            df = compute_bojwatch_prob_table(
                watch_date=wd_str,
                boj_meeting_dates=boj_meeting_dates,
                num_upcoming=num_upcoming,
                ohlc_loader=ohlc_loader,
                watch_rate_range=watch_rate_range,
                prefix=prefix,
                year_fmt=year_fmt,
                step_bp=step_bp,
                rate_cols=rate_cols,
            )
            results.append(df)

        except Exception as e:
            if on_error == "skip":
                errors.append((wd_str, repr(e)))
                continue
            elif on_error == "nan":
                # nan 방식은 전체 컬럼 스키마를 알아야 하므로
                # 최소 1개라도 성공한 뒤에만 의미가 있음
                errors.append((wd_str, repr(e)))
                # 일단 placeholder 기록만 해두고, 아래에서 스키마 맞춰 추가
                results.append((wd_str, None))
            else:
                raise ValueError("on_error must be 'skip' or 'nan'") from e

    if not results:
        raise ValueError("No results. All watch_dates failed.")

    # on_error='nan' 처리: 결과 리스트에 (wd_str, None)이 섞여 있을 수 있음
    if on_error == "nan":
        # 성공한 첫 df 찾기
        first_df = next((x for x in results if isinstance(x, pd.DataFrame)), None)
        if first_df is None:
            raise ValueError("All watch_dates failed (nan mode).")
        cols = first_df.columns

        normalized = []
        for item in results:
            if isinstance(item, pd.DataFrame):
                normalized.append(item)
            else:
                wd_str, _none = item
                # FOMCDate 레벨을 맞추기 위해, first_df의 FOMCDate index를 그대로 사용
                tmp = first_df.copy()
                tmp.loc[:, :] = np.nan
                # WatchDate만 바꾸기: 인덱스가 MultiIndex(WatchDate, FOMCDate)이므로
                tmp = tmp.reset_index()
                tmp["WatchDate"] = wd_str
                tmp = tmp.set_index(["WatchDate", "FOMCDate"])
                tmp = tmp[cols]
                normalized.append(tmp)

        results = normalized

    ts_df = pd.concat(results).sort_index()
    ts_df.attrs["errors"] = errors  # 실패 로그 저장(원하면 확인)
    return ts_df

In [37]:
ts = build_bojwatch_prob_time_series(
    start_date="2025-10-01",
    end_date="2025-12-12",
    boj_meeting_dates=boj_dates,
    num_upcoming=3,
    ohlc_loader=load_tona_3m,
    watch_rate_range=watch_rate_range,
    step_bp=5,
    rate_cols=True,
    freq="B",
    on_error="skip",
)

 32%|███▏      | 17/53 [01:52<06:50, 11.42s/it]ERROR:tvDatafeed.main:Connection to remote host was lost.
ERROR:tvDatafeed.main:no data, please check the exchange and symbol
 66%|██████▌   | 35/53 [04:05<02:09,  7.17s/it]ERROR:tvDatafeed.main:Connection to remote host was lost.
ERROR:tvDatafeed.main:no data, please check the exchange and symbol
 70%|██████▉   | 37/53 [04:21<02:07,  7.99s/it]ERROR:tvDatafeed.main:Connection to remote host was lost.
ERROR:tvDatafeed.main:no data, please check the exchange and symbol
 74%|███████▎  | 39/53 [04:41<02:08,  9.20s/it]ERROR:tvDatafeed.main:Connection to remote host was lost.
ERROR:tvDatafeed.main:no data, please check the exchange and symbol
 94%|█████████▍| 50/53 [06:48<00:30, 10.24s/it]ERROR:tvDatafeed.main:Connection to remote host was lost.
ERROR:tvDatafeed.main:no data, please check the exchange and symbol
100%|██████████| 53/53 [07:03<00:00,  7.98s/it]


In [38]:
ts

Unnamed: 0_level_0,Unnamed: 1_level_0,0.50,0.60,0.65,0.55,0.70,0.75
WatchDate,BOJDate,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2025-10-01,2025-10-30,0.000000e+00,0.000000e+00,0.000,,,
2025-10-01,2025-12-18,0.000000e+00,8.000000e-01,0.200,,,
2025-10-01,2026-01-23,0.000000e+00,8.000000e-01,0.200,,,
2025-10-02,2025-10-30,0.000000e+00,0.000000e+00,0.000,,,
2025-10-02,2025-12-18,0.000000e+00,8.000000e-01,0.200,,,
...,...,...,...,...,...,...,...
2025-12-11,2026-01-23,1.563194e-13,4.500000e-01,0.000,0.55,0.00,0.000
2025-12-11,2026-03-19,0.000000e+00,1.094236e-13,0.385,0.00,0.48,0.135
2025-12-12,2025-12-18,0.000000e+00,5.000000e-01,0.000,0.50,0.00,0.000
2025-12-12,2026-01-23,1.421085e-13,5.000000e-01,0.000,0.50,0.00,0.000
