In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
import pandas_ta as ta
from sklearn.model_selection import KFold
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import BaggingClassifier, RandomForestClassifier
from sklearn.metrics import matthews_corrcoef, accuracy_score, precision_score
from tqdm import tqdm
import warnings

warnings.filterwarnings('ignore')

In [2]:
file_path = '/Users/jaewonkim/Desktop/퀀트랩/lastsnp.csv'
full_df = pd.read_csv(file_path, parse_dates=['Date'])

print(f"'{file_path}' 파일 로딩 성공!")
print(f"초기 데이터 형태: {full_df.shape}")

tickers = full_df['Ticker'].unique().tolist()
print(f"\n{len(tickers)}개 종목의 메타데이터(섹터, 시가총액)를 yfinance에서 가져옵니다...")
metadata = {}
for ticker in tqdm(tickers, desc="Fetching Metadata"):
    try:
        info = yf.Ticker(ticker).info
        metadata[ticker] = {
            'Sector': info.get('sector', 'N/A'),
            'Market_Cap': info.get('marketCap', np.nan)
        }
    except Exception as e:
        metadata[ticker] = {'Sector': 'N/A', 'Market_Cap': np.nan}

metadata_df = pd.DataFrame.from_dict(metadata, orient='index').reset_index().rename(columns={'index': 'Ticker'})
full_df = pd.merge(full_df, metadata_df, on='Ticker', how='left')

full_df.dropna(subset=['Sector', 'Market_Cap'], inplace=True)
full_df = full_df[full_df['Sector'] != 'N/A']

print("\n메타데이터 병합 완료!")
print(f"최종 데이터 형태: {full_df.shape}")

'/Users/youngjaekim/Desktop/퀀트랩/lastsnp.csv' 파일 로딩 성공!
초기 데이터 형태: (556129, 8)

373개 종목의 메타데이터(섹터, 시가총액)를 yfinance에서 가져옵니다...


Fetching Metadata: 100%|██████████████████████| 373/373 [03:56<00:00,  1.57it/s]



메타데이터 병합 완료!
최종 데이터 형태: (556033, 10)


In [3]:
def feature_engineering(df):
    df.sort_values(by=['Ticker', 'Date'], inplace=True)
    
    # 논문 정의에 따른 '일중 수익률' 계산: (종가 / 시가) - 1
    df['Intraday_Return'] = (df['Close'] / df['Open']) - 1
    
    # 섹터별 평균 일중 수익률 및 비정상 수익률 (Abnormal Return)
    sector_return = df.groupby(['Date', 'Sector'])['Intraday_Return'].transform('mean')
    df['Abnormal_Return'] = df['Intraday_Return'] - sector_return
    
    # EMA 피처 (45개)
    print("EMA 피처 45개를 생성합니다...")
    short_windows = [1, 3, 5, 7, 9, 11, 13, 15, 17]
    long_windows = [3, 5, 7, 9, 11, 13, 15, 17, 19]
    combinations = []
    for s in short_windows:
        for l in long_windows:
            if l >= s + 2:
                combinations.append((s, l))
    for s, l in tqdm(combinations, desc="Generating 45 EMA Features"):
        ema_short = df.groupby('Ticker')['Close'].transform(lambda x: ta.ema(x, length=s))
        ema_long = df.groupby('Ticker')['Close'].transform(lambda x: ta.ema(x, length=l))
        df[f'EMA_{l}_{s}'] = (ema_short - ema_long) / ema_long

    # VIX 피처 (원본)
    df['VIX_20'] = ta.ema(df['VIX_Close'], length=20)
    df['VIX_100'] = ta.ema(df['VIX_Close'], length=100)
    df['VIX_Feature'] = df['VIX_20'] - df['VIX_100']

    # 개별 주식 변동성 피처
    df['Stock_Vol_5'] = df.groupby('Ticker')['Abnormal_Return'].transform(lambda x: x.ewm(span=5).std())
    df['Stock_Vol_20'] = df.groupby('Ticker')['Abnormal_Return'].transform(lambda x: x.ewm(span=20).std())
    df['Stock_Unique_Volatility'] = df['Stock_Vol_5'] - df['Stock_Vol_20']

    return df.dropna()

processed_df = feature_engineering(full_df.copy())
print("\n피처 엔지니어링 완료.")

EMA 피처 45개를 생성합니다...


Generating 45 EMA Features: 100%|███████████████| 45/45 [00:17<00:00,  2.58it/s]



피처 엔지니어링 완료.


In [4]:
# EMA 피처 이산화 (섹터별)
ema_cols = [col for col in processed_df.columns if 'EMA_' in col]
for col in tqdm(ema_cols, desc="Binning EMA features"):
    processed_df[col] = processed_df.groupby(['Date', 'Sector'])[col].transform(
        lambda x: pd.qcut(x, 5, labels=False, duplicates='drop')
    )

# VIX 피처 이산화 (30일 롤링 윈도우)
vix_daily = processed_df[['Date', 'VIX_Feature']].drop_duplicates().set_index('Date').sort_index()
def assign_rolling_quintile(series):
    try:
        bins = pd.qcut(series, 5, retbins=True, duplicates='drop')[1]
        return pd.cut([series.iloc[-1]], bins=bins, labels=False, include_lowest=True)[0]
    except (ValueError, IndexError): return np.nan
vix_daily['VIX_quintile'] = vix_daily['VIX_Feature'].rolling(window=30, min_periods=5).apply(assign_rolling_quintile, raw=False)
processed_df = pd.merge(processed_df, vix_daily[['VIX_quintile']], on='Date', how='left')

# 기타 피처 이산화 (시가총액, 고유 변동성)
for col in ['Market_Cap', 'Stock_Unique_Volatility']:
     processed_df[col] = processed_df.groupby('Date')[col].transform(
        lambda x: pd.qcut(x, 5, labels=False, duplicates='drop')
    )

# 타겟 변수 생성 및 데이터 분할
processed_df.dropna(inplace=True)
processed_df['Target'] = (processed_df['Abnormal_Return'] > 0).astype(int)

train_df = processed_df[processed_df['Date'] < '2020-01-01'].copy()
test_df = processed_df[processed_df['Date'] >= '2020-01-01'].copy()
print(f"\n학습 데이터: {train_df.shape}, 테스트 데이터: {test_df.shape}")

Binning EMA features: 100%|█████████████████████| 45/45 [24:31<00:00, 32.71s/it]



학습 데이터: (6487225, 65), 테스트 데이터: (1260336, 65)


In [7]:
# 성능 이슈로 n_splits=10-->5, n_estimators=500-->50
def ssfi_feature_selection(df, features):
    sample_weights = np.abs(df['Abnormal_Return'])
    y = df['Target']
    cv = KFold(n_splits=5, shuffle=False)
    feature_scores = {}
    for feature in tqdm(features, desc="SSFI Feature Selection"):
        X = df[[feature]].astype(int)
        scores = []
        for train_idx, val_idx in cv.split(X):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            sw_train, sw_val = sample_weights.iloc[train_idx], sample_weights.iloc[val_idx]
            
            model = BaggingClassifier(DecisionTreeClassifier(), n_estimators=50, random_state=42, n_jobs=-1)
            model.fit(X_train, y_train, sample_weight=sw_train)
            preds = model.predict(X_val)
            scores.append(matthews_corrcoef(y_val, preds, sample_weight=sw_val))
        feature_scores[feature] = np.mean(scores)
    return sorted(feature_scores.items(), key=lambda x: x[1], reverse=True)[:10]

ema_features = [col for col in train_df.columns if 'EMA_' in col]
top_10_features = ssfi_feature_selection(train_df, ema_features)
selected_features = [f[0] for f in top_10_features]
print("\n상위 10개 피처:", selected_features)

SSFI Feature Selection: 100%|████████████████| 45/45 [1:43:22<00:00, 137.82s/it]


상위 10개 피처: ['EMA_3_1', 'EMA_5_1', 'EMA_7_1', 'EMA_9_1', 'EMA_11_1', 'EMA_13_1', 'EMA_15_1', 'EMA_5_3', 'EMA_17_1', 'EMA_19_1']





In [8]:
# Sector 정수 인코딩
train_df['Sector_encoded'] = pd.factorize(train_df['Sector'])[0]
test_df['Sector_encoded'] = pd.factorize(test_df['Sector'])[0]

# 피처셋 정의
additional_features = ['Sector_encoded', 'Market_Cap', 'VIX_quintile', 'Stock_Unique_Volatility']
train_features = train_df[selected_features + additional_features]
test_features = test_df[selected_features + additional_features]

# 학습/테스트 데이터 컬럼 맞춤
train_labels, test_labels = train_features.align(test_features, join='inner', axis=1, fill_value=0)

# 최종 데이터셋 준비
X_train_primary = train_labels[selected_features]
X_test_primary = test_labels[selected_features]
X_train_add = train_labels[[col for col in additional_features if col in train_labels.columns]]
X_test_add = test_labels[[col for col in additional_features if col in test_labels.columns]]
y_train = train_df.loc[train_labels.index, 'Target']
y_test = test_df.loc[test_labels.index, 'Target']
sw_train = np.abs(train_df.loc[train_labels.index, 'Abnormal_Return'])
sw_test = np.abs(test_df.loc[test_labels.index, 'Abnormal_Return'])

In [9]:
# 모델 파라미터 정의
rf_params = {
    'n_estimators': 200, 
    'max_features': 0.5, 
    'min_weight_fraction_leaf': 0.001,
    'random_state': 42,
    'n_jobs': -1
}

# 1. Primary Model 학습
print("Primary Model 학습 중...")
primary_model = RandomForestClassifier(**rf_params)
primary_model.fit(X_train_primary, y_train, sample_weight=sw_train)
primary_preds_train = primary_model.predict(X_train_primary)

# Meta Target 생성
meta_target_train = (primary_preds_train == y_train).astype(int)

# 2. Meta Model 1 (Regimes Only) 학습
print("Meta Model 1 학습 중...")
meta_model_1 = RandomForestClassifier(**rf_params)
meta_model_1.fit(X_train_add, meta_target_train, sample_weight=sw_train)

# 3. Meta Model 2 (Regimes + X) 학습
print("Meta Model 2 학습 중...")
meta_model_2 = RandomForestClassifier(**rf_params)
meta_model_2.fit(train_labels, meta_target_train, sample_weight=sw_train)

# 4. Non-Meta Model 학습
print("Non-Meta Model 학습 중...")
non_meta_model = RandomForestClassifier(**rf_params)
non_meta_model.fit(train_labels, y_train, sample_weight=sw_train)

print("\n모든 모델 학습 완료.")

Primary Model 학습 중...
Meta Model 1 학습 중...
Meta Model 2 학습 중...
Non-Meta Model 학습 중...

모든 모델 학습 완료.


In [10]:
def evaluate_model(y_true, y_pred, sample_weight):
    if len(y_true) == 0: return {}
    mcc = matthews_corrcoef(y_true, y_pred, sample_weight=sample_weight)
    accuracy = accuracy_score(y_true, y_pred, sample_weight=sample_weight)
    precision_1 = precision_score(y_true, y_pred, sample_weight=sample_weight, pos_label=1, zero_division=0)
    precision_0 = precision_score(y_true, y_pred, sample_weight=sample_weight, pos_label=0, zero_division=0)
    return {'Matthews': mcc, 'Accuracy': accuracy, 'Precision 1': precision_1, 'Precision 0': precision_0}

print("--- OOS 평가 결과 ---")
primary_preds_test = primary_model.predict(X_test_primary)
primary_results = evaluate_model(y_test, primary_preds_test, sw_test)
print("\nPrimary Model 결과:\n", pd.Series(primary_results))

non_meta_preds_test = non_meta_model.predict(test_labels)
non_meta_results = evaluate_model(y_test, non_meta_preds_test, sw_test)
print("\nNon-Meta Model 결과:\n", pd.Series(non_meta_results))

meta_preds_1 = meta_model_1.predict(X_test_add)
meta_preds_2 = meta_model_2.predict(test_labels)

# Meta Model 1
y_test_meta1 = y_test[meta_preds_1 == 1]
primary_preds_meta1 = pd.Series(primary_preds_test, index=y_test.index)[meta_preds_1 == 1]
sw_test_meta1 = sw_test[meta_preds_1 == 1]
meta1_results = evaluate_model(y_test_meta1, primary_preds_meta1, sw_test_meta1)
print("\nMeta Model 1 (필터링 후) 결과:\n", pd.Series(meta1_results))

# Meta Model 2
y_test_meta2 = y_test[meta_preds_2 == 1]
primary_preds_meta2 = pd.Series(primary_preds_test, index=y_test.index)[meta_preds_2 == 1]
sw_test_meta2 = sw_test[meta_preds_2 == 1]
meta2_results = evaluate_model(y_test_meta2, primary_preds_meta2, sw_test_meta2)
print("\nMeta Model 2 (필터링 후) 결과:\n", pd.Series(meta2_results))

--- OOS 평가 결과 ---

Primary Model 결과:
 Matthews       0.767809
Accuracy       0.883777
Precision 1    0.874151
Precision 0    0.893912
dtype: float64

Non-Meta Model 결과:
 Matthews       0.782201
Accuracy       0.891004
Precision 1    0.882483
Precision 0    0.899912
dtype: float64

Meta Model 1 (필터링 후) 결과:
 Matthews       0.767809
Accuracy       0.883777
Precision 1    0.874151
Precision 0    0.893912
dtype: float64

Meta Model 2 (필터링 후) 결과:
 Matthews       0.786959
Accuracy       0.893332
Precision 1    0.882827
Precision 0    0.904426
dtype: float64
