In [1]:
# 시작하기: 퍼사드 임포트
from qdl.facade import QDL
import pandas as pd
import numpy as np
from qdl import validator

In [2]:
q = QDL()
# 팩터 리스트 정의
factors = ["market_equity", "be_me", "ope_be", "at_gr1", "ret_12_1", "ret_1_0"]

wide_market_equity = q.load_char(
    country="usa",
    vintage="1972-",
    char="market_equity",
    date_col="eom")

# 루프 돌면서 wide DataFrame 생성
for factor in factors:
    globals()[f"wide_{factor}"] = q.load_char(
        country="usa",
        vintage="1972-",
        char=factor,
        date_col="eom"
    )

# 확인 (각 DataFrame shape 출력)
for factor in factors:
    df = globals()[f"wide_{factor}"]
    print(f"wide_{factor}: {df.shape}")

wide_market_equity: (638, 38200)
wide_be_me: (638, 38200)
wide_ope_be: (638, 38200)
wide_at_gr1: (638, 38200)
wide_ret_12_1: (638, 38200)
wide_ret_1_0: (638, 38200)


In [3]:
# 특성(Chars) 데이터셋 (와이드): size_grp
wide_size_grp = q.load_char(
    country="usa",
    vintage="1972-",
    char="size_grp",
    date_col = "eom")

# 특성(Chars) 데이터셋 (와이드): 초과수익률 
wide_ret_exc = q.load_char(
    country="usa",
    vintage="1972-",
    char="ret_exc",
    date_col="eom")


In [4]:
# 결과 저장용 딕셔너리
top_portfolios = {}
middle_portfolios = {}
bottom_portfolios = {}

for factor in factors:
    print(f"=== {factor} 포트폴리오 분류 시작 ===")
    
    # 1. non-micro 마스크
    non_micro_mask = ~wide_size_grp.isin(["micro", "nano"])
    
    # 2. non-micro 데이터만 추출
    wide_df = globals()[f"wide_{factor}"]
    wide_non_micro = wide_df.where(non_micro_mask)
    
    # 3. cross-sectional rank (non-micro 기준)
    xs_rank = wide_non_micro.rank(axis=1, method="min", pct=True)
    
    high_mask = xs_rank >= 0.666
    low_mask  = xs_rank <= 0.333
    
    non_micro_top    = xs_rank.where(high_mask)
    non_micro_bottom = xs_rank.where(low_mask)
    non_micro_middle = xs_rank.where(~high_mask & ~low_mask)
    
    # 4. 분위수 cutoff (non-micro만 기준)
    q_low  = wide_non_micro.quantile(0.333, axis=1)
    q_high = wide_non_micro.quantile(0.666, axis=1)
    
    # 5. micro 데이터만 추출
    micro_mask = wide_size_grp.isin(["micro", "nano"])
    wide_micro = wide_df.where(micro_mask)
    
    # 6. micro 포트폴리오 분류 (non-micro cutoff 기준 사용)
    micro_top    = wide_micro.where(wide_micro.ge(q_high, axis=0))
    micro_bottom = wide_micro.where(wide_micro.le(q_low, axis=0))
    micro_middle = wide_micro.where(
        (wide_micro.gt(q_low, axis=0)) & (wide_micro.lt(q_high, axis=0))
    )
    
    # 7. non-micro + micro 합치기 (최종 포트폴리오)
    top_portfolios[factor]    = non_micro_top.combine_first(micro_top)
    middle_portfolios[factor] = non_micro_middle.combine_first(micro_middle)
    bottom_portfolios[factor] = non_micro_bottom.combine_first(micro_bottom)
    
    print(f"=== {factor} 완료 ===\n")

=== market_equity 포트폴리오 분류 시작 ===
=== market_equity 완료 ===

=== be_me 포트폴리오 분류 시작 ===
=== be_me 완료 ===

=== ope_be 포트폴리오 분류 시작 ===
=== ope_be 완료 ===

=== at_gr1 포트폴리오 분류 시작 ===
=== at_gr1 완료 ===

=== ret_12_1 포트폴리오 분류 시작 ===


MemoryError: Unable to allocate 186. MiB for an array with shape (38200, 638) and data type float64

In [None]:
# 날짜별로 포트폴리오에 포함된 주식 수 카운트

# 수정된 포트폴리오 저장 dict
filtered_top_portfolios = {}
filtered_middle_portfolios = {}
filtered_bottom_portfolios = {}

for factor in factors:
    print(f"=== {factor.upper()} 포트폴리오 필터링 ===")
    
    # 원래 포트폴리오 불러오기
    top_df    = top_portfolios[factor]
    middle_df = middle_portfolios[factor]
    bottom_df = bottom_portfolios[factor]
    
    # 날짜별 종목 수
    top_count    = top_df.notna().sum(axis=1)
    middle_count = middle_df.notna().sum(axis=1)
    bottom_count = bottom_df.notna().sum(axis=1)
    
    # 5개 미만인 날짜 찾기
    invalid_dates = (top_count < 5) | (middle_count < 5) | (bottom_count < 5)
    
    if invalid_dates.any():
        print(f"{factor}: ⚠️ {invalid_dates.sum()} 개 월 제외")
        
        # 해당 월 전체를 NaN 처리 (top/middle/bottom 모두 제외)
        top_df.loc[invalid_dates, :]    = float("nan")
        middle_df.loc[invalid_dates, :] = float("nan")
        bottom_df.loc[invalid_dates, :] = float("nan")
    else:
        print(f"{factor}: ✅ 모든 월에서 5개 이상 유지")
    
    # 필터링된 결과 저장
    filtered_top_portfolios[factor]    = top_df
    filtered_middle_portfolios[factor] = middle_df
    filtered_bottom_portfolios[factor] = bottom_df

    # 제외된 월의 날짜와 포트폴리오 종목 수 출력
for factor in factors:
    print(f"\n=== {factor.upper()} 제외된 월 상세 ===")
    
    # 원래 포트폴리오 불러오기
    top_df    = top_portfolios[factor]
    middle_df = middle_portfolios[factor]
    bottom_df = bottom_portfolios[factor]
    
    # 날짜별 종목 수
    top_count    = top_df.notna().sum(axis=1)
    middle_count = middle_df.notna().sum(axis=1)
    bottom_count = bottom_df.notna().sum(axis=1)
    
    # 5개 미만인 날짜
    invalid_dates = (top_count < 5) | (middle_count < 5) | (bottom_count < 5)
    
    if invalid_dates.any():
        invalid_info = pd.DataFrame({
            "Top_count": top_count[invalid_dates],
            "Middle_count": middle_count[invalid_dates],
            "Bottom_count": bottom_count[invalid_dates],
        })
        print(invalid_info)
    else:
        print("✅ 제외된 월 없음")

In [None]:
# 마스크 저장용 dict
top_masks = {}
middle_masks = {}
bottom_masks = {}

for factor in factors:
    print(f"=== {factor.upper()} 마스크 생성 ===")
    
    # 최종 포트폴리오 불러오기 (필터링된 버전 쓰셔도 됨)
    top_df    = filtered_top_portfolios[factor]
    middle_df = filtered_middle_portfolios[factor]
    bottom_df = filtered_bottom_portfolios[factor]
    
    # notna() → 불리언 마스크 생성
    top_masks[factor]    = top_df.notna()
    middle_masks[factor] = middle_df.notna()
    bottom_masks[factor] = bottom_df.notna()

In [None]:
# Top=Short 인 팩터들
top_as_short = ["market_equity", "at_gr1", "ret_1_0"]

# 결과 저장 dict
short_ports = {}
long_ports = {}

for factor in factors:
    print(f"=== {factor.upper()} 롱/숏 포트폴리오 계산 ===")
    
    top_mask    = top_masks[factor].shift(1)     # forward-looking 방지
    bottom_mask = bottom_masks[factor].shift(1)
    
    if factor in top_as_short:
        # Top = Short, Bottom = Long
        short_ports[factor] = -1 * wide_ret_exc.where(top_mask)
        long_ports[factor]  =      wide_ret_exc.where(bottom_mask)
    else:
        # Top = Long, Bottom = Short
        short_ports[factor] = -1 * wide_ret_exc.where(bottom_mask)
        long_ports[factor]  =      wide_ret_exc.where(top_mask)

In [None]:
# 결과 저장 dict
factor_returns_ew = {}
factor_answers_ew = {}

for factor in factors:
    print(f"=== {factor.upper()} EW 팩터 수익률 계산 ===")
    
    # 1. 직접 계산 (short/long은 앞에서 만든 dict 사용)
    short_port = short_ports[factor]
    long_port  = long_ports[factor]
    
    short_ret = short_port.mean(axis=1)
    long_ret  = long_port.mean(axis=1)
    
    factor_ret_ew = short_ret + long_ret   # factor = short + long
    factor_returns_ew[factor] = factor_ret_ew
    
    # 2. reference 불러오기
    factor_answer = q.load_factors(
        country="usa",
        dataset="factor",
        weighting="ew",
        factors=[factor],
    )
    factor_answers_ew[factor] = factor_answer
    
    # 3. 그래프 비교
    ax = factor_ret_ew.cumsum().plot(label=f"{factor} (user)")
    factor_answer.loc["2020-01-31":, :].cumsum().plot(ax=ax, label=f"{factor} (ref)")
    ax.set_title("EW Factor: user vs reference")
    ax.legend()

In [None]:
# 결과 저장 dict
validation_reports = {}

def _fmt(x):
    return f"{x:.3f}" if x is not None else "nan"

for factor in factors:
    print(f"\n=== {factor.upper()} EW Factor 검증 ===")
    
    # 1. user factor (Series → DataFrame)
    user_factor = pd.DataFrame(factor_returns_ew[factor])
    user_factor.columns = [factor]
    
    # 2. reference factor
    ref_factor = factor_answers_ew[factor]
    
    # 🔎 길이 및 날짜 범위 체크
    print(f"[DEBUG] user_factor: {user_factor.dropna().shape[0]}개 "
          f"({user_factor.dropna().index.min()} ~ {user_factor.dropna().index.max()})")
    print(f"[DEBUG] ref_factor : {ref_factor.dropna().shape[0]}개 "
          f"({ref_factor.dropna().index.min()} ~ {ref_factor.dropna().index.max()})")
    
    # 3. validator 실행
    report = validator.validate_factor(
        user=user_factor,
        reference=ref_factor,
        return_plot=True,
        plot_title=f"{factor} factor(ew): user vs reference",
    )
    
    # 4. 결과 출력
    print("관측치수:", report.n_obs, 
          "시작일:", report.date_start, 
          "종료일:", report.date_end)
    print(
        "mse:", _fmt(report.mse),
        "rmse:", _fmt(report.rmse),
        "mae:", _fmt(report.mae),
        "corr:", _fmt(report.corr),
    )
    
    # 5. dict에 저장
    validation_reports[factor] = report


VW

In [None]:
# Top=Short 인 팩터들
top_as_short = ["market_equity", "at_gr1", "ret_1_0"]

# === 결과 저장 dict ===
short_ports_vw = {}
long_ports_vw = {}

# === VW 가중치 계산 함수 ===
def get_portfolio_vw_weights(me_df, mask_shifted):
    """
    me_df         : wide_{factor} (factor별 와이드 ME 데이터)
    mask_shifted  : t-1 시점 포트폴리오 마스크
    """
    port_me = me_df.where(mask_shifted)                 # 포트 ME만 남김
    return port_me.div(port_me.sum(axis=1), axis=0)     # Σ포트 ME로 나눠서 정규화

for factor in factors:
    print(f"=== {factor.upper()} VW 포트폴리오 계산 ===")
    
    # 1. wide_{factor} ME 데이터 불러오기
    me_df = globals()[f"wide_{factor}"]
    
    # 2. 마스크 불러오기 (t-1 기준으로 shift 적용)
    top_mask_shifted    = top_masks[factor].shift(1)
    middle_mask_shifted = middle_masks[factor].shift(1)
    bottom_mask_shifted = bottom_masks[factor].shift(1)
    
    # 3. VW 가중치 계산
    top_weights    = get_portfolio_vw_weights(me_df.shift(1), top_mask_shifted)
    middle_weights = get_portfolio_vw_weights(me_df.shift(1), middle_mask_shifted)
    bottom_weights = get_portfolio_vw_weights(me_df.shift(1), bottom_mask_shifted)
    
    # 4. ret_exc × weight → 포트폴리오 수익률
    top_ret    = (wide_ret_exc * top_weights).sum(axis=1)
    middle_ret = (wide_ret_exc * middle_weights).sum(axis=1)
    bottom_ret = (wide_ret_exc * bottom_weights).sum(axis=1)
    
    # 5. 팩터 방향 적용 (Top=Short or Bottom=Short)
    if factor in top_as_short:
        short_ports_vw[factor] = top_ret
        long_ports_vw[factor]  = bottom_ret
    else:
        short_ports_vw[factor] = bottom_ret
        long_ports_vw[factor]  = top_ret

# === VW 가중치 계산 함수 ===
def get_portfolio_vw_weights(me_df, mask_shifted):
    """
    me_df         : wide_{factor} (factor별 와이드 ME 데이터)
    mask_shifted  : t-1 시점 포트폴리오 마스크
    """
    port_me = me_df.where(mask_shifted)                 # 포트 ME만 남김
    return port_me.div(port_me.sum(axis=1), axis=0)     # Σ포트 ME로 나눠서 정규화


# === 결과 저장 dict ===
top_weights_vw = {}
middle_weights_vw = {}
bottom_weights_vw = {}

for factor in factors:
    print(f"=== {factor.upper()} VW 가중치 계산 ===")
    
    # (1) wide_{factor} ME 데이터 불러오기
    me_df = wide_market_equity
    
    # (2) 포트폴리오 마스크 (t-1 기준 shift)
    top_mask_shifted    = top_masks[factor].shift(1)
    middle_mask_shifted = middle_masks[factor].shift(1)
    bottom_mask_shifted = bottom_masks[factor].shift(1)
    
    # (3) VW 가중치 계산
    top_weights_vw[factor]    = get_portfolio_vw_weights(me_df.shift(1), top_mask_shifted)
    middle_weights_vw[factor] = get_portfolio_vw_weights(me_df.shift(1), middle_mask_shifted)
    bottom_weights_vw[factor] = get_portfolio_vw_weights(me_df.shift(1), bottom_mask_shifted)

# === 포트폴리오별 VW 수익률 계산 함수 ===
def get_portfolio_vw_return(ret_df, weights):
    return (ret_df * weights).sum(axis=1)   # 종목별 ret × weight 합


# === 결과 저장 dict ===
top_rets_vw = {}
middle_rets_vw = {}
bottom_rets_vw = {}

for factor in factors:
    print(f"=== {factor.upper()} VW 수익률 계산 ===")
    
    # (1) 포트폴리오별 가중치 불러오기
    top_w    = top_weights_vw[factor]
    middle_w = middle_weights_vw[factor]
    bottom_w = bottom_weights_vw[factor]
    
    # (2) 수익률 계산 (ret_exc × weight)
    top_rets_vw[factor]    = get_portfolio_vw_return(wide_ret_exc, top_w)
    middle_rets_vw[factor] = get_portfolio_vw_return(wide_ret_exc, middle_w)
    bottom_rets_vw[factor] = get_portfolio_vw_return(wide_ret_exc, bottom_w)

    # 결과 저장 dict
factor_returns_vw = {}
factor_answers_vw = {}

for factor in factors:
    print(f"=== {factor.upper()} VW 팩터 수익률 계산 ===")
    
    # (1) 포트폴리오 수익률 불러오기
    top_ret    = top_rets_vw[factor]
    bottom_ret = bottom_rets_vw[factor]
    
    # (2) 팩터 방향 결정 (Top=Short 여부)
    if factor in ["market_equity", "at_gr1", "ret_1_0"]:
        factor_ret_vw = bottom_ret - top_ret   # 높은 값 short
    else:
        factor_ret_vw = top_ret - bottom_ret   # 낮은 값 short
    
    # (3) 저장
    factor_returns_vw[factor] = factor_ret_vw
    
    # (4) reference factor 불러오기
    factor_answer = q.load_factors(
        country="usa",
        dataset="factor",
        weighting="vw",
        factors=[factor],
    )
    factor_answers_vw[factor] = factor_answer
    
    # (5) 그래프 비교
    ax = factor_ret_vw.cumsum().plot(label=f"{factor} (user)")
    factor_answer.loc["2020-01-31":, :].cumsum().plot(ax=ax, label=f"{factor} (ref)")
    ax.set_title("VW Factor: user vs reference")
    ax.legend()


In [None]:
# 결과 저장 dict
validation_reports = {}

def _fmt(x):
    return f"{x:.3f}" if x is not None else "nan"

for factor in factors:
    print(f"\n=== {factor.upper()} VW Factor 검증 ===")
    
    # 1. user factor (Series → DataFrame)
    user_factor = pd.DataFrame(factor_returns_vw[factor])
    user_factor.columns = [factor]
    
    # 2. reference factor
    ref_factor = factor_answers_vw[factor]
    
    # 🔎 길이 및 날짜 범위 체크
    print(f"[DEBUG] user_factor: {user_factor.dropna().shape[0]}개 "
          f"({user_factor.dropna().index.min()} ~ {user_factor.dropna().index.max()})")
    print(f"[DEBUG] ref_factor : {ref_factor.dropna().shape[0]}개 "
          f"({ref_factor.dropna().index.min()} ~ {ref_factor.dropna().index.max()})")
    
    # 3. validator 실행
    report = validator.validate_factor(
        user=user_factor,
        reference=ref_factor,
        return_plot=True,
        plot_title=f"{factor} factor(vw): user vs reference",
    )
    
    # 4. 결과 출력
    print("관측치수:", report.n_obs, 
          "시작일:", report.date_start, 
          "종료일:", report.date_end)
    print(
        "mse:", _fmt(report.mse),
        "rmse:", _fmt(report.rmse),
        "mae:", _fmt(report.mae),
        "corr:", _fmt(report.corr),
    )
    
    # 5. dict에 저장
    validation_reports[factor] = report


VW_cap

In [None]:
# Top=Short 인 팩터들
top_as_short = ["market_equity", "at_gr1", "ret_1_0"]

# === 결과 저장 dict ===
short_ports_vw = {}
long_ports_vw = {}

# === Capped VW 가중치 계산 함수 ===
def get_vw_cap_weights(me_df, mask_shifted, cap=0.8):
    """
    me_df        : wide_market_equity (전체 종목 ME, t-1 기준)
    mask_shifted : t-1 시점 포트폴리오 마스크
    cap          : 개별 weight 상한 (기본=0.8)
    """
    # 포트 ME만 추출
    port_me = me_df.where(mask_shifted)
    
    # 기본 VW weight (포트 내부 합 = 1)
    weights = port_me.div(port_me.sum(axis=1), axis=0)
    
    # cap 적용 후 normalize
    capped = weights.clip(upper=cap)
    capped_norm = capped.div(capped.sum(axis=1), axis=0)  # 합 다시 1
    
    return capped_norm


# === 결과 저장 dict ===
top_weights_vw_cap = {}
middle_weights_vw_cap = {}
bottom_weights_vw_cap = {}

for factor in factors:
    print(f"=== {factor.upper()} VW_CAP 가중치 계산 ===")
    
    # (1) 항상 market_equity 기준으로 가중치 계산
    me_df = wide_market_equity
    
    # (2) 포트폴리오 마스크 (t-1 기준 shift)
    top_mask_shifted    = top_masks[factor].shift(1)
    middle_mask_shifted = middle_masks[factor].shift(1)
    bottom_mask_shifted = bottom_masks[factor].shift(1)
    
    # (3) VW Cap 가중치 계산
    top_weights_vw_cap[factor]    = get_vw_cap_weights(me_df.shift(1), top_mask_shifted, cap=0.8)
    middle_weights_vw_cap[factor] = get_vw_cap_weights(me_df.shift(1), middle_mask_shifted, cap=0.8)
    bottom_weights_vw_cap[factor] = get_vw_cap_weights(me_df.shift(1), bottom_mask_shifted, cap=0.8)

# === 포트폴리오 수익률 계산 함수 ===
def get_portfolio_return(ret_df, weights):
    return (ret_df * weights).sum(axis=1)


# === 결과 저장 dict ===
top_rets_vw_cap = {}
middle_rets_vw_cap = {}
bottom_rets_vw_cap = {}

for factor in factors:
    print(f"=== {factor.upper()} VW_CAP 포트폴리오 수익률 계산 ===")
    
    # (1) factor별 가중치 불러오기
    top_w    = top_weights_vw_cap[factor]
    middle_w = middle_weights_vw_cap[factor]
    bottom_w = bottom_weights_vw_cap[factor]
    
    # (2) 수익률 계산 (ret_exc × weight)
    top_rets_vw_cap[factor]    = get_portfolio_return(wide_ret_exc, top_w)
    middle_rets_vw_cap[factor] = get_portfolio_return(wide_ret_exc, middle_w)
    bottom_rets_vw_cap[factor] = get_portfolio_return(wide_ret_exc, bottom_w)

    # 결과 저장 dict
factor_returns_vw_cap = {}
factor_answers_vw_cap = {}

for factor in factors:
    print(f"=== {factor.upper()} VW_CAP 팩터 수익률 계산 ===")
    
    # (1) 포트폴리오 수익률 불러오기
    top_ret    = top_rets_vw_cap[factor]
    bottom_ret = bottom_rets_vw_cap[factor]
    
    # (2) 팩터 방향 결정 (Top=Short 여부)
    if factor in top_as_short:
        factor_ret_vw_cap = bottom_ret - top_ret   # 높은 값 short
    else:
        factor_ret_vw_cap = top_ret - bottom_ret   # 낮은 값 short
    
    # (3) 저장
    factor_returns_vw_cap[factor] = factor_ret_vw_cap
    
    # (4) reference factor 불러오기
    factor_answer = q.load_factors(
        country="usa",
        dataset="factor",
        weighting="vw_cap",
        factors=[factor],
    )
    factor_answers_vw_cap[factor] = factor_answer
    
    # (5) 그래프 비교
    ax = factor_ret_vw_cap.cumsum().plot(label=f"{factor} (user)")
    factor_answer.loc["2020-01-31":, :].cumsum().plot(ax=ax, label=f"{factor} (ref)")
    ax.set_title("VW_cap Factor: user vs reference")
    ax.legend()


In [None]:
# 결과 저장 dict
validation_reports = {}

def _fmt(x):
    return f"{x:.3f}" if x is not None else "nan"

for factor in factors:
    print(f"\n=== {factor.upper()} VW_cap Factor 검증 ===")
    
    # 1. user factor (Series → DataFrame)
    user_factor = pd.DataFrame(factor_returns_vw_cap[factor])
    user_factor.columns = [factor]
    
    # 2. reference factor
    ref_factor = factor_answers_vw_cap[factor]
    
    # 🔎 길이 및 날짜 범위 체크
    print(f"[DEBUG] user_factor: {user_factor.dropna().shape[0]}개 "
          f"({user_factor.dropna().index.min()} ~ {user_factor.dropna().index.max()})")
    print(f"[DEBUG] ref_factor : {ref_factor.dropna().shape[0]}개 "
          f"({ref_factor.dropna().index.min()} ~ {ref_factor.dropna().index.max()})")
    
    # 3. validator 실행
    report = validator.validate_factor(
        user=user_factor,
        reference=ref_factor,
        return_plot=True,
        plot_title=f"{factor} factor(vw_cap): user vs reference",
    )
    
    # 4. 결과 출력
    print("관측치수:", report.n_obs, 
          "시작일:", report.date_start, 
          "종료일:", report.date_end)
    print(
        "mse:", _fmt(report.mse),
        "rmse:", _fmt(report.rmse),
        "mae:", _fmt(report.mae),
        "corr:", _fmt(report.corr),
    )
    
    # 5. dict에 저장
    validation_reports[factor] = report
