# Day 5 - Part 3: DL로 모멘텀 전략 개선하기

## 학습 목표
- 딥러닝으로 "다음 달 수익률" 예측하기
- 모멘텀 전략과 DL 전략 **공정 비교**하기
- Out-of-Sample(OOS) 테스트로 과적합 방지하기

## 핵심 아이디어
- **기본 모멘텀**: 12개월 수익률 상위 30개 중 → Top 5 선택
- **DL 오버레이**: 같은 30개 후보를 DL로 예측 → 예측 상위 5 선택
- **공정 비교**: 같은 후보군, 같은 선택 수, 같은 백테스트 조건

In [None]:
import backtrader as bt
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# PyTorch (DL)
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler

print("라이브러리 임포트 완료!")
print(f"   PyTorch 버전: {torch.__version__}")

## 1. 데이터 준비 (15개 종목으로 간소화)

In [None]:
# 실습용 15개 종목
tickers = {
    '005930.KS': '삼성전자',
    '000660.KS': 'SK하이닉스',
    '035420.KS': 'NAVER',
    '051910.KS': 'LG화학',
    '006400.KS': '삼성SDI',
    '005380.KS': '현대차',
    '012330.KS': '현대모비스',
    '055550.KS': '신한지주',
    '068270.KS': '셀트리온',
    '207940.KS': '삼성바이오로직스',
    '000270.KS': '기아',
    '035720.KS': '카카오',
    '028260.KS': '삼성물산',
    '009150.KS': '삼성전기',
    '086790.KS': '하나금융지주'
}

print(f"{len(tickers)}개 종목으로 실습")

## 2. 데이터 다운로드

In [None]:
start_date = '2019-01-01'
end_date = '2023-12-31'

all_data = {}

print("\n데이터 다운로드...\n")

for ticker, name in tickers.items():
    try:
        data = yf.download(ticker, start=start_date, end=end_date, progress=False)
        if not data.empty and len(data) > 300:
            all_data[ticker] = data
            print(f"{name:15s}: {len(data):4d}일")
    except:
        print(f"{name:15s}: 오류")

print(f"\n{len(all_data)}개 종목 준비 완료")

## 3. 피처 생성 (비전공자용 최소 6개)

In [None]:
def create_features(data):
    """
    각 종목의 피처 생성
    
    피처:
    1. mom_20: 20일 모멘텀
    2. mom_6m: 6개월 모멘텀
    3. mom_12m: 12개월 모멘텀
    4. sma20_ratio: 20일 이평선 대비 비율
    5. rsi14: RSI(14일)
    6. vol_20: 20일 변동성
    """
    df = data.copy()
    close = df['Close']
    
    # 1. 모멘텀 (수익률)
    df['mom_20'] = close.pct_change(20) * 100
    df['mom_6m'] = close.pct_change(126) * 100  # 6개월 ≈ 126일
    df['mom_12m'] = close.pct_change(252) * 100  # 12개월 ≈ 252일
    
    # 2. 이동평균 비율
    sma_20 = close.rolling(20).mean()
    df['sma20_ratio'] = (close / sma_20 - 1) * 100
    
    # 3. RSI
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = -delta.where(delta < 0, 0).rolling(14).mean()
    rs = gain / loss
    df['rsi14'] = 100 - (100 / (1 + rs))
    
    # 4. 변동성 (20일 표준편차)
    df['vol_20'] = close.pct_change().rolling(20).std() * 100
    
    # 5. 목표 변수: 다음 달(21일) 수익률
    df['target'] = close.shift(-21).pct_change(21) * 100
    
    # NaN 제거
    df = df.dropna()
    
    return df

# 각 종목의 피처 생성
feature_data = {}

print("\n피처 생성 중...\n")

for ticker, data in all_data.items():
    features = create_features(data)
    if len(features) > 0:
        feature_data[ticker] = features
        print(f"{tickers[ticker]:15s}: {len(features):4d}행")

print(f"\n피처 생성 완료")

# 피처 확인
sample_ticker = list(feature_data.keys())[0]
print(f"\n샘플 피처 (마지막 5행):")
print(feature_data[sample_ticker][['mom_20', 'mom_6m', 'mom_12m', 'rsi14', 'target']].tail())

## 4. 학습/테스트 데이터 분할

- **학습**: 2020-01-01 ~ 2023-10-31 (10개월)
- **테스트**: 2023-11-01 ~ 2023-12-31 (2개월 OOS)

In [None]:
train_end = '2023-10-31'
test_start = '2023-11-01'

X_train_list = []
y_train_list = []
X_test_list = []
y_test_list = []

feature_cols = ['mom_20', 'mom_6m', 'mom_12m', 'sma20_ratio', 'rsi14', 'vol_20']

for ticker, df in feature_data.items():
    # 학습 데이터
    train_df = df[df.index <= train_end]
    if len(train_df) > 0:
        X_train_list.append(train_df[feature_cols].values)
        y_train_list.append(train_df['target'].values)
    
    # 테스트 데이터
    test_df = df[df.index >= test_start]
    if len(test_df) > 0:
        X_test_list.append(test_df[feature_cols].values)
        y_test_list.append(test_df['target'].values)

# 세로로 쌓기 (모든 종목 통합)
X_train = np.vstack(X_train_list)
y_train = np.concatenate(y_train_list)
X_test = np.vstack(X_test_list)
y_test = np.concatenate(y_test_list)

print(f"데이터 분할 완료")
print(f"   학습: {X_train.shape[0]:,}행 × {X_train.shape[1]}개 피처")
print(f"   테스트: {X_test.shape[0]:,}행 × {X_test.shape[1]}개 피처")

## 5. 데이터 정규화

In [None]:
# StandardScaler로 정규화
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# PyTorch 텐서로 변환
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1)

print("정규화 완료")
print(f"   학습 데이터: {X_train_tensor.shape}")
print(f"   테스트 데이터: {X_test_tensor.shape}")

## 6. 간단한 신경망 모델

In [None]:
class SimpleNN(nn.Module):
    """
    간단한 3층 신경망
    - 입력: 6개 피처
    - 은닉층 1: 32개 뉴런
    - 은닉층 2: 16개 뉴런
    - 출력: 1개 (다음 달 수익률 예측)
    """
    def __init__(self, input_size=6):
        super(SimpleNN, self).__init__()
        
        self.layers = nn.Sequential(
            nn.Linear(input_size, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Dropout(0.2),
            
            nn.Linear(16, 1)
        )
    
    def forward(self, x):
        return self.layers(x)

# 모델 생성
model = SimpleNN(input_size=6)
print("모델 생성 완료")
print(f"\n{model}")

## 7. 모델 학습

In [None]:
# 학습 설정
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 50

print(f"\n{'='*60}")
print(f"모델 학습 시작 (총 {epochs}회)")
print(f"{'='*60}\n")

# 학습 루프
train_losses = []

for epoch in range(epochs):
    model.train()
    
    # Forward
    predictions = model(X_train_tensor)
    loss = criterion(predictions, y_train_tensor)
    
    # Backward
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    train_losses.append(loss.item())
    
    # 10 에폭마다 출력
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1:3d}/{epochs}: Loss = {loss.item():.4f}")

print(f"\n학습 완료!")

# 학습 곡선
plt.figure(figsize=(10, 5))
plt.plot(train_losses, linewidth=2)
plt.title('학습 곡선 (Training Loss)', fontsize=14, fontweight='bold')
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss (MSE)', fontsize=12)
plt.grid(alpha=0.3)
plt.show()

## 8. OOS 테스트

In [None]:
# 테스트 예측
model.eval()
with torch.no_grad():
    test_predictions = model(X_test_tensor).numpy().flatten()

# 성능 지표
mse = np.mean((test_predictions - y_test) ** 2)
mae = np.mean(np.abs(test_predictions - y_test))
corr = np.corrcoef(test_predictions, y_test)[0, 1]

print(f"\n{'='*60}")
print(f"OOS 테스트 성능")
print(f"{'='*60}")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}%")
print(f"상관계수: {corr:.4f}")
print(f"{'='*60}\n")

# 예측 vs 실제 산점도
plt.figure(figsize=(10, 6))
plt.scatter(y_test, test_predictions, alpha=0.5, s=20)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', linewidth=2)
plt.xlabel('실제 수익률 (%)', fontsize=12)
plt.ylabel('예측 수익률 (%)', fontsize=12)
plt.title('예측 vs 실제 (OOS)', fontsize=14, fontweight='bold')
plt.grid(alpha=0.3)
plt.show()

## 9. 백테스트 준비: 예측값 저장

In [None]:
# 각 종목의 테스트 기간 예측값 저장
predictions_dict = {}
idx = 0

for ticker, df in feature_data.items():
    test_df = df[df.index >= test_start]
    
    if len(test_df) > 0:
        n = len(test_df)
        predictions_dict[ticker] = {
            'dates': test_df.index,
            'actual': test_df['target'].values,
            'predicted': test_predictions[idx:idx+n],
            'mom_12m': test_df['mom_12m'].values  # 기본 모멘텀
        }
        idx += n

print(f"{len(predictions_dict)}개 종목 예측 완료")

## 10. 전략 A: 기본 모멘텀 (12개월 수익률)

In [None]:
class MomentumOnlyStrategy(bt.Strategy):
    """
    기본 모멘텀 전략
    - 월말마다 12개월 수익률 상위 5개 선택
    """
    params = (
        ('rebalance_dates', None),
        ('top_n', 5),
    )
    
    def __init__(self):
        self.returns = {}
        for d in self.datas:
            self.returns[d._name] = (d.close / d.close(-252) - 1) * 100
        
        self.rebalance_dates = [pd.Timestamp(d) for d in self.params.rebalance_dates]
        self.next_rebalance_idx = 0
    
    def next(self):
        current_date = pd.Timestamp(self.datas[0].datetime.date(0))
        
        if self.next_rebalance_idx < len(self.rebalance_dates):
            if current_date >= self.rebalance_dates[self.next_rebalance_idx]:
                self.rebalance()
                self.next_rebalance_idx += 1
    
    def rebalance(self):
        ranking = {}
        for d in self.datas:
            try:
                ret = self.returns[d._name][0]
                if not np.isnan(ret):
                    ranking[d._name] = (d, ret)
            except:
                continue
        
        sorted_stocks = sorted(ranking.items(), key=lambda x: x[1][1], reverse=True)
        top_n = [item[1][0] for item in sorted_stocks[:self.params.top_n]]
        
        # 포지션 정리 및 재구성
        for d in self.datas:
            if self.getposition(d).size > 0 and d not in top_n:
                self.close(d)
        
        target_value = self.broker.getvalue() / self.params.top_n
        for d in top_n:
            target_shares = int(target_value / d.close[0])
            diff = target_shares - self.getposition(d).size
            if diff > 0:
                self.buy(data=d, size=diff)
            elif diff < 0:
                self.sell(data=d, size=-diff)

## 11. 전략 B: DL 오버레이 (예측 기반)

In [None]:
class DLOverlayStrategy(bt.Strategy):
    """
    DL 오버레이 전략
    - 월말마다 DL 예측 상위 5개 선택
    """
    params = (
        ('rebalance_dates', None),
        ('predictions_dict', None),
        ('top_n', 5),
    )
    
    def __init__(self):
        self.rebalance_dates = [pd.Timestamp(d) for d in self.params.rebalance_dates]
        self.next_rebalance_idx = 0
    
    def next(self):
        current_date = pd.Timestamp(self.datas[0].datetime.date(0))
        
        if self.next_rebalance_idx < len(self.rebalance_dates):
            if current_date >= self.rebalance_dates[self.next_rebalance_idx]:
                self.rebalance()
                self.next_rebalance_idx += 1
    
    def rebalance(self):
        current_date = pd.Timestamp(self.datas[0].datetime.date(0))
        
        ranking = {}
        for d in self.datas:
            ticker = d._name
            if ticker in self.params.predictions_dict:
                pred_data = self.params.predictions_dict[ticker]
                # 현재 날짜에 가장 가까운 예측값 찾기
                idx = np.argmin(np.abs((pred_data['dates'] - current_date).days))
                pred_return = pred_data['predicted'][idx]
                ranking[ticker] = (d, pred_return)
        
        sorted_stocks = sorted(ranking.items(), key=lambda x: x[1][1], reverse=True)
        top_n = [item[1][0] for item in sorted_stocks[:self.params.top_n]]
        
        # 포지션 정리 및 재구성
        for d in self.datas:
            if self.getposition(d).size > 0 and d not in top_n:
                self.close(d)
        
        target_value = self.broker.getvalue() / self.params.top_n
        for d in top_n:
            target_shares = int(target_value / d.close[0])
            diff = target_shares - self.getposition(d).size
            if diff > 0:
                self.buy(data=d, size=diff)
            elif diff < 0:
                self.sell(data=d, size=-diff)

## 12. 공정 비교 백테스트

In [None]:
# 테스트 기간 월말 날짜
test_dates = pd.date_range(start=test_start, end='2023-12-31', freq='M')

print(f"테스트 기간 리밸런싱: {len(test_dates)}회")

# 전략 A: 기본 모멘텀
cerebro_a = bt.Cerebro()
cerebro_a.broker.setcash(100_000_000)
cerebro_a.broker.setcommission(commission=0.001)

for ticker, data in all_data.items():
    data_bt = bt.feeds.PandasData(
        dataname=data,
        name=ticker,
        fromdate=datetime.strptime(test_start, '%Y-%m-%d'),
        todate=datetime(2023, 12, 31)
    )
    cerebro_a.adddata(data_bt)

cerebro_a.addstrategy(MomentumOnlyStrategy, rebalance_dates=test_dates, top_n=5)
cerebro_a.addanalyzer(bt.analyzers.Returns, _name='returns')
cerebro_a.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro_a.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')

# 전략 B: DL 오버레이
cerebro_b = bt.Cerebro()
cerebro_b.broker.setcash(100_000_000)
cerebro_b.broker.setcommission(commission=0.001)

for ticker, data in all_data.items():
    data_bt = bt.feeds.PandasData(
        dataname=data,
        name=ticker,
        fromdate=datetime.strptime(test_start, '%Y-%m-%d'),
        todate=datetime(2023, 12, 31)
    )
    cerebro_b.adddata(data_bt)

cerebro_b.addstrategy(DLOverlayStrategy, rebalance_dates=test_dates, predictions_dict=predictions_dict, top_n=5)
cerebro_b.addanalyzer(bt.analyzers.Returns, _name='returns')
cerebro_b.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro_b.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')

# 실행
print(f"\n{'='*60}")
print(f"백테스트 비교 시작 (OOS: {test_start} ~ 2023-12-31)")
print(f"{'='*60}\n")

results_a = cerebro_a.run()
strat_a = results_a[0]
end_value_a = cerebro_a.broker.getvalue()

results_b = cerebro_b.run()
strat_b = results_b[0]
end_value_b = cerebro_b.broker.getvalue()

# 결과 비교
print(f"\n{'='*70}")
print(f"전략 비교 (OOS 테스트)")
print(f"{'='*70}")
print(f"{'항목':<30} {'기본 모멘텀':>15} {'DL 오버레이':>15}")
print(f"{'─'*70}")

metrics = [
    ('최종 자금 (만원)', 'end_value'),
    ('수익률 (%)', 'return'),
    ('샤프 비율', 'sharpe'),
    ('최대 낙폭 (%)', 'mdd')
]

data_a = {
    'end_value': end_value_a / 10000,
    'return': (end_value_a - 100_000_000) / 100_000_000 * 100,
    'sharpe': strat_a.analyzers.sharpe.get_analysis().get('sharperatio', 0),
    'mdd': strat_a.analyzers.drawdown.get_analysis()['max']['drawdown']
}

data_b = {
    'end_value': end_value_b / 10000,
    'return': (end_value_b - 100_000_000) / 100_000_000 * 100,
    'sharpe': strat_b.analyzers.sharpe.get_analysis().get('sharperatio', 0),
    'mdd': strat_b.analyzers.drawdown.get_analysis()['max']['drawdown']
}

for name, key in metrics:
    if key == 'end_value':
        print(f"{name:<30} {data_a[key]:>14,.0f} {data_b[key]:>14,.0f}")
    elif key in ['return', 'mdd']:
        print(f"{name:<30} {data_a[key]:>14.2f}% {data_b[key]:>14.2f}%")
    else:
        print(f"{name:<30} {data_a[key]:>15.3f} {data_b[key]:>15.3f}")

print(f"{'='*70}\n")

## 13. 결과 해석

### DL이 더 좋으면?
- 모델이 추가 정보를 잘 활용함
- 피처 엔지니어링이 효과적
- 단, 과적합 가능성 주의

### 기본 모멘텀이 더 좋으면?
- 단순한 신호도 충분히 효과적
- DL 모델이 아직 부족
- 더 많은 데이터/피처 필요

### 비슷하면?
- DL의 추가 복잡도가 불필요할 수 있음
- 수수료/비용 고려 시 단순한 게 유리

## 정리

**배운 내용**:
- 간단한 신경망으로 다음 달 수익률 예측
- 기본 모멘텀과 DL 오버레이의 **공정 비교**
- OOS 테스트로 과적합 방지
- 실전에서는 더 복잡한 모델과 피처 필요

**주의사항**:
- 2개월 OOS는 짧음 (워크-포워드 권장)
- 모델 복잡도와 과적합 트레이드오프
- 거래비용 고려 필수

**핵심 메시지**:
- DL은 만능이 아님
- 단순한 신호도 효과적일 수 있음
- 항상 OOS로 검증!