In [1]:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

import yfinance as yf
import numpy as np
import pandas as pd


def get_ta_data(data):
    from ta import add_all_ta_features
    df = data

    # 열 이름에서 'AAPL'을 제거
    df.columns = [col[0] for col in df.columns]

    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    # df.set_index('Datetime', inplace=True)
    adf = add_all_ta_features(
        df,
        open="Open", high="High", low="Low", close="Close", volume="Volume"
    )
    adf = adf.drop(
        ['trend_psar_down', 'trend_psar_up'], axis=1
    )
    adf.dropna(inplace=True)
    return adf

data = yf.download('AAPL', start='2023-01-01')

_X = data[['Open', 'High', 'Low', 'Close', 'Volume']]
X = get_ta_data(_X)
y = np.where(X['Close'].shift(-5) > X['Close'], 1, -1)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)

param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [10, 20, 30, 40, 50],
}

grid_search = GridSearchCV(model, param_grid, cv=5, scoring='accuracy')
grid_search.fit(X_train, y_train)

print(f'Best Parameters: {grid_search.best_params_}')
model = RandomForestClassifier(**grid_search.best_params_)

cvs = cross_val_score(model, X_train, y_train, cv=5, scoring='f1')

print(f'Acc Scores: {cvs}')
print(f'Acc Scores Mean: {cvs.mean()}')



  data = yf.download('AAPL', start='2023-01-01')
[*********************100%***********************]  1 of 1 completed
  self._psar[i] = high2


Best Parameters: {'max_depth': 20, 'n_estimators': 100}
Acc Scores: [0.84955752 0.84033613 0.83050847 0.88333333 0.82142857]
Acc Scores Mean: 0.8450328071831702


In [6]:
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score

from ta import add_all_ta_features
import yfinance as yf
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

# =========================
# 1. TA feature 생성 함수
# =========================
def get_ta_data(data: pd.DataFrame) -> pd.DataFrame:
    df = data.copy()

    # 멀티인덱스 컬럼 처리: ('AAPL','Open') -> 'Open' 으로 변경
    if isinstance(df.columns, pd.MultiIndex):
        # MultiIndex의 경우: level 0은 티커명, level 1은 실제 컬럼명
        # 두 레벨 모두 확인해서 올바른 레벨 선택
        level_0 = df.columns.get_level_values(0).unique()
        level_1 = df.columns.get_level_values(1).unique()
        
        # level 1에 'Open', 'High' 등이 있으면 level 1 사용
        if any(col in level_1 for col in ['Open', 'High', 'Low', 'Close', 'Volume']):
            df.columns = df.columns.get_level_values(1)
        # level 0에 있으면 level 0 사용 (드문 경우)
        elif any(col in level_0 for col in ['Open', 'High', 'Low', 'Close', 'Volume']):
            df.columns = df.columns.get_level_values(0)
        else:
            # 둘 다 아니면 level 1 사용 (기본값)
            df.columns = df.columns.get_level_values(1)
    else:
        # 일반 Index인데 튜플 형태인 경우 처리
        if len(df.columns) > 0 and isinstance(df.columns[0], tuple):
            df.columns = [col[1] if len(col) > 1 else col[0] for col in df.columns]

    # 필요한 컬럼만 선택
    required_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    
    # 컬럼이 존재하는지 확인
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        # 디버깅 정보 출력
        print(f"디버깅: 컬럼 타입 = {type(df.columns)}")
        print(f"디버깅: 원본 컬럼 = {list(data.columns)}")
        print(f"디버깅: 변환 후 컬럼 = {list(df.columns)}")
        raise ValueError(f"필요한 컬럼을 찾을 수 없습니다: {missing_cols}. 사용 가능한 컬럼: {list(df.columns)}")
    
    df = df[required_cols]

    adf = add_all_ta_features(
        df,
        open="Open", high="High", low="Low", close="Close", volume="Volume"
    )

    # 원래 코드에 있던 PSAR 컬럼 제거 (존재할 때만)
    drop_cols = [c for c in ['trend_psar_down', 'trend_psar_up'] if c in adf.columns]
    if drop_cols:
        adf = adf.drop(drop_cols, axis=1)

    adf.dropna(inplace=True)
    return adf

# =========================
# 2. 데이터 다운로드 & 전처리
# =========================
data = yf.download('AAPL', start='2023-01-01')

X_df = get_ta_data(data)

# 레이블: 5일 뒤 종가가 지금보다 높으면 1, 아니면 0
y = np.where(X_df['Close'].shift(-5) > X_df['Close'], 1, 0)

# shift로 생긴 NaN 제거
mask = ~pd.isna(y)
X_df = X_df[mask]
y = y[mask]

X = X_df.values.astype(np.float32)
y = y.astype(int)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train).astype(np.float32)
X_test_scaled = scaler.transform(X_test).astype(np.float32)

input_dim = X_train_scaled.shape[1]

# =========================
# 3. PyTorch 모델 정의
# =========================
class MLP(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 1),  # binary classification (logit)
        )

    def forward(self, x):
        # 출력: (batch,) 로 맞춰줌
        return self.net(x).squeeze(-1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# =========================
# 4. 학습/평가 유틸 함수
# =========================
def train_one_fold(model, train_loader, val_loader, device,
                   epochs=30, lr=1e-3, patience=5):
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    best_val_loss = float('inf')
    best_state = None
    patience_cnt = 0

    for epoch in range(1, epochs + 1):
        # ----- Train -----
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            xb = xb.to(device)
            yb = yb.to(device)

            optimizer.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * xb.size(0)

        train_loss /= len(train_loader.dataset)

        # ----- Validation -----
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.to(device)
                yb = yb.to(device)
                logits = model(xb)
                loss = criterion(logits, yb)
                val_loss += loss.item() * xb.size(0)

        val_loss /= len(val_loader.dataset)

        print(f"Epoch [{epoch:02d}] "
              f"Train Loss: {train_loss:.4f}  Val Loss: {val_loss:.4f}")

        # Early stopping 체크
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = model.state_dict()
            patience_cnt = 0
        else:
            patience_cnt += 1
            if patience_cnt >= patience:
                print("Early stopping!")
                break

    # 가장 좋은 상태로 복구
    if best_state is not None:
        model.load_state_dict(best_state)

    return model


def predict_proba(model, loader, device):
    model.eval()
    probs_list = []
    with torch.no_grad():
        for xb, _ in loader:
            xb = xb.to(device)
            logits = model(xb)
            probs = torch.sigmoid(logits)
            probs_list.append(probs.cpu().numpy())
    return np.concatenate(probs_list, axis=0)

# =========================
# 5. K-Fold Cross Validation
# =========================
kf = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 64

fold_f1_scores = []

for fold, (tr_idx, val_idx) in enumerate(kf.split(X_train_scaled), 1):
    print(f"\n===== Fold {fold} =====")

    X_tr = X_train_scaled[tr_idx]
    y_tr = y_train[tr_idx]
    X_val = X_train_scaled[val_idx]
    y_val = y_train[val_idx]

    # Torch Tensor로 변환
    X_tr_tensor = torch.tensor(X_tr, dtype=torch.float32)
    y_tr_tensor = torch.tensor(y_tr, dtype=torch.float32)  # BCEWithLogits → float
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32)

    train_ds = TensorDataset(X_tr_tensor, y_tr_tensor)
    val_ds = TensorDataset(X_val_tensor, y_val_tensor)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    model = MLP(input_dim).to(device)

    # 한 fold 학습
    model = train_one_fold(model, train_loader, val_loader, device,
                           epochs=30, lr=1e-3, patience=5)

    # validation에 대한 F1 스코어
    val_loader_for_pred = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
    val_probs = predict_proba(model, val_loader_for_pred, device)
    val_pred = (val_probs > 0.5).astype(int)

    f1 = f1_score(y_val, val_pred)
    fold_f1_scores.append(f1)
    print(f"Fold {fold} F1 Score: {f1:.4f}")

print("\n===== Cross-Validation 결과 =====")
print("F1 Scores:", fold_f1_scores)
print("F1 Mean:", np.mean(fold_f1_scores))

# =========================
# 6. 최종 모델 (train 전체로 학습 후 test 평가)
# =========================
# train 내부에서 다시 80/20으로 나눠서 val 사용
X_tr_all, X_val_all, y_tr_all, y_val_all = train_test_split(
    X_train_scaled, y_train, test_size=0.2, random_state=42, stratify=y_train
)

X_tr_all_t = torch.tensor(X_tr_all, dtype=torch.float32)
y_tr_all_t = torch.tensor(y_tr_all, dtype=torch.float32)
X_val_all_t = torch.tensor(X_val_all, dtype=torch.float32)
y_val_all_t = torch.tensor(y_val_all, dtype=torch.float32)

train_ds_all = TensorDataset(X_tr_all_t, y_tr_all_t)
val_ds_all = TensorDataset(X_val_all_t, y_val_all_t)

train_loader_all = DataLoader(train_ds_all, batch_size=batch_size, shuffle=True)
val_loader_all = DataLoader(val_ds_all, batch_size=batch_size, shuffle=False)

final_model = MLP(input_dim).to(device)
final_model = train_one_fold(final_model, train_loader_all, val_loader_all, device,
                             epochs=30, lr=1e-3, patience=5)

# Test 세트 평가
X_test_t = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.float32)

test_ds = TensorDataset(X_test_t, y_test_t)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

test_probs = predict_proba(final_model, test_loader, device)
test_pred = (test_probs > 0.5).astype(int)

test_f1 = f1_score(y_test, test_pred)
print("\n===== Test 성능 =====")
print(f"Test F1 Score: {test_f1:.4f}")


  data = yf.download('AAPL', start='2023-01-01')
[*********************100%***********************]  1 of 1 completed
  self._psar[i] = high2


Using device: cpu

===== Fold 1 =====
Epoch [01] Train Loss: 0.6917  Val Loss: 0.6746
Epoch [02] Train Loss: 0.6766  Val Loss: 0.6644
Epoch [03] Train Loss: 0.6696  Val Loss: 0.6572
Epoch [04] Train Loss: 0.6709  Val Loss: 0.6544
Epoch [05] Train Loss: 0.6586  Val Loss: 0.6539
Epoch [06] Train Loss: 0.6648  Val Loss: 0.6550
Epoch [07] Train Loss: 0.6509  Val Loss: 0.6495
Epoch [08] Train Loss: 0.6437  Val Loss: 0.6382
Epoch [09] Train Loss: 0.6349  Val Loss: 0.6345
Epoch [10] Train Loss: 0.6159  Val Loss: 0.6276
Epoch [11] Train Loss: 0.6102  Val Loss: 0.6289
Epoch [12] Train Loss: 0.5905  Val Loss: 0.6203
Epoch [13] Train Loss: 0.5783  Val Loss: 0.6151
Epoch [14] Train Loss: 0.5655  Val Loss: 0.6179
Epoch [15] Train Loss: 0.5686  Val Loss: 0.6119
Epoch [16] Train Loss: 0.5478  Val Loss: 0.6079
Epoch [17] Train Loss: 0.5308  Val Loss: 0.5979
Epoch [18] Train Loss: 0.5002  Val Loss: 0.5958
Epoch [19] Train Loss: 0.5057  Val Loss: 0.5936
Epoch [20] Train Loss: 0.5127  Val Loss: 0.5729
Ep