In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import yfinance as yf
import pickle

# Tickers whose full price history is downloaded for training.
tickers = [
    "AAPL",   # Apple
    "MSFT",   # Microsoft
    "GOOGL",  # Alphabet (Google)
]

# Wider candidate universe, currently disabled:
#   TSLA, AMZN, META, NVDA, NFLX, AMD, INTC, PYPL, DIS, PEP, KO, XOM, PFE
#   (on top of the three tickers above).

# Download the OHLCV history of every ticker and tag each row with its symbol.
ohlcv_columns = ["Open", "High", "Low", "Close", "Volume"]
df_list = []
for ticker in tickers:
    stock = yf.Ticker(ticker)
    temp = stock.history(period="max")[ohlcv_columns]
    temp["Ticker"] = ticker
    df_list.append(temp)

# Stack the per-ticker frames into one long frame. The index is not reset,
# so index values may repeat from one ticker to the next.
df = pd.concat(df_list)

# Work in log space: Close becomes log(1 + price); no min-max scaling is applied.
df["Close"] = np.log1p(df["Close"])

# Rolling mean / standard deviation of the log close, computed within each ticker.
# Columns are created in the order MA_10, MA_20, STD_10, STD_20 because later code
# converts the frame into a positional feature matrix.
for prefix, statistic in (("MA", "mean"), ("STD", "std")):
    for window in (10, 20):
        df[f"{prefix}_{window}"] = df.groupby("Ticker")["Close"].transform(
            lambda closes, w=window, stat=statistic: getattr(closes.rolling(window=w), stat)()
        )

# 🔹 RSI 지표 추가
def compute_rsi(data, window=14):
    """Return a simple-moving-average Relative Strength Index for a series.

    Args:
        data: pandas Series of values (this notebook passes the log-transformed close).
        window: number of observations in each rolling average.

    Returns:
        A Series aligned with ``data`` holding ``100 - 100 / (1 + RS)``, where RS is
        the rolling mean of upward moves divided by the rolling mean of downward
        moves. The first ``window - 1`` entries are NaN because the rolling window
        is not yet full; a window with neither gains nor losses yields NaN (0 / 0).
    """
    change = data.diff()
    # Keep only the upward moves; everything else (including the leading NaN) becomes 0.
    upward = change.where(change > 0, 0)
    # Keep only the downward moves and flip their sign so losses are positive.
    downward = -change.where(change < 0, 0)
    mean_gain = upward.rolling(window=window).mean()
    mean_loss = downward.rolling(window=window).mean()
    relative_strength = mean_gain / mean_loss
    return 100 - (100 / (1 + relative_strength))

# Momentum indicators, computed per ticker on the log-transformed close.
df["RSI_14"] = df.groupby("Ticker")["Close"].transform(lambda x: compute_rsi(x))
df["MACD"] = df.groupby("Ticker")["Close"].transform(lambda x: x.ewm(span=12, adjust=False).mean() - x.ewm(span=26, adjust=False).mean())
df["MACD_Signal"] = df.groupby("Ticker")["MACD"].transform(lambda x: x.ewm(span=9, adjust=False).mean())

# Volume features: rolling means of the raw volume plus log(1 + volume).
df["Volume_MA_10"] = df.groupby("Ticker")["Volume"].transform(lambda x: x.rolling(window=10).mean())
df["Volume_MA_20"] = df.groupby("Ticker")["Volume"].transform(lambda x: x.rolling(window=20).mean())
df["Log_Volume"] = np.log1p(df["Volume"])

# Drop rows with undefined indicators (rolling warm-up, 0/0 in the RSI, infinities).
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.dropna(inplace=True)

# Remember which ticker every surviving row belongs to BEFORE the label column is
# dropped, so that windows can be cut per ticker further down.
row_tickers = df["Ticker"].to_numpy()

# Keep only the model features (Close + indicators).
df.drop(columns=["Open", "High", "Low", "Volume", "Ticker"], errors='ignore', inplace=True)

df_standardized = df.copy()
# Every feature except Close gets an additional log1p; Close stays as log1p(price).
# NOTE(review): pandas pairs the key columns (alphabetical order from
# `columns.difference`) with the value columns (original order) BY POSITION here,
# so the column labels no longer match their contents. The evaluation cell performs
# the identical assignment, so the model sees a consistent layout; fix both places
# together (e.g. index the right-hand side with the same column list) or neither.
df_standardized[df.columns.difference(["Close"])] =  np.log1p(df.drop(columns=["Close"]))
df_standardized["Close"] = df["Close"]
df_standardized1 = df_standardized.copy()

# Fail loudly if the second log1p produced NaN/inf (values <= -1); such rows would
# otherwise silently turn the training loss into NaN.
feature_matrix = df_standardized.to_numpy(dtype=np.float32)
if not np.isfinite(feature_matrix).all():
    raise ValueError("Non-finite values found in the feature matrix after log1p.")

# Sliding windows: 100 input steps -> next 100 log-close values.
sequence_length = 100
output_length = 100
window_span = sequence_length + output_length
close_col = df_standardized.columns.get_loc("Close")

X_parts, y_parts = [], []
X_train_parts, y_train_parts = [], []
X_val_parts, y_val_parts = [], []

# Windows are cut inside one ticker at a time. Cutting them over the concatenated
# frame would create samples whose input comes from one company and whose target
# comes from the next one.
for ticker in pd.unique(row_tickers):
    ticker_rows = feature_matrix[row_tickers == ticker]
    # +1 so the window that ends on the last available row is included.
    n_windows = len(ticker_rows) - window_span + 1
    if n_windows <= 0:
        continue  # not enough history for a single input/target pair

    X_ticker = np.stack([ticker_rows[s : s + sequence_length] for s in range(n_windows)])
    y_ticker = np.stack(
        [ticker_rows[s + sequence_length : s + window_span, close_col] for s in range(n_windows)]
    )
    X_parts.append(X_ticker)
    y_parts.append(y_ticker)

    # Chronological 80/20 split per ticker, so every ticker contributes its most
    # recent windows to validation (a single positional split over the stacked
    # data would validate on the tail of the last ticker only).
    # Consecutive windows overlap, so rows near the cut appear on both sides.
    cut = int(n_windows * 0.8)
    X_train_parts.append(X_ticker[:cut])
    y_train_parts.append(y_ticker[:cut])
    X_val_parts.append(X_ticker[cut:])
    y_val_parts.append(y_ticker[cut:])

if not X_parts:
    raise ValueError(
        f"No ticker has at least {window_span} usable rows; cannot build training windows."
    )

X = np.concatenate(X_parts)
y = np.concatenate(y_parts)
X_train, X_val = np.concatenate(X_train_parts), np.concatenate(X_val_parts)
y_train, y_val = np.concatenate(y_train_parts), np.concatenate(y_val_parts)

# Move the full train / validation sets to the training device as float32 tensors.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(device)


In [None]:
def learn_model(model_class, patience=100, max_epochs=500, lr=0.001,
                save_path="best_stock_predictor.pth"):
    """Train a model full-batch with early stopping on the validation RMSE.

    The function reads the notebook-level names ``device``, ``X_train_tensor``,
    ``y_train_tensor``, ``X_val_tensor`` and ``y_val_tensor``. Targets are expected
    to be log1p-transformed prices, because the price-space metrics below undo the
    transform with ``expm1``.

    Args:
        model_class: an ``nn.Module`` *instance* (not a class, despite the name);
            it is moved to ``device`` and trained in place.
        patience: number of consecutive epochs without validation-RMSE improvement
            after which training stops.
        max_epochs: upper bound on the number of epochs.
        lr: AdamW learning rate.
        save_path: file that receives the ``state_dict`` of the best epoch. Every
            call using the default overwrites the same file.

    Returns:
        The best (lowest) validation RMSE in log space, as a float.
    """
    # CUDA housekeeping is only valid when CUDA can be initialised:
    # torch.cuda.ipc_collect() initialises CUDA and raises on CPU-only machines.
    cuda_available = torch.cuda.is_available()
    if cuda_available:
        torch.cuda.empty_cache()

    model = model_class.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr)

    # Early-stopping state.
    best_val_rmse = float("inf")
    trigger_count = 0

    # Validation targets in price units and their mean, used to express the
    # validation error as a percentage of a typical price. Taken from the
    # validation tensor rather than the global `df`, which later cells rebind.
    val_prices = torch.expm1(y_val_tensor)
    avg_stock_price_exp = val_prices.mean().item()

    for epoch in range(max_epochs):
        # --- one full-batch optimisation step ---
        model.train()
        optimizer.zero_grad()
        predictions = model(X_train_tensor)
        loss = criterion(predictions, y_train_tensor)
        loss.backward()
        optimizer.step()

        # Training metrics are for logging only, so keep them out of autograd.
        with torch.no_grad():
            rmse = torch.sqrt(loss.detach().mean())
            mape = torch.mean(torch.abs((predictions.detach() - y_train_tensor) / y_train_tensor)) * 100

        # --- validation ---
        model.eval()
        with torch.no_grad():
            val_predictions = model(X_val_tensor)
            val_loss = criterion(val_predictions, y_val_tensor)
            val_rmse = torch.sqrt(val_loss.mean())
            val_mape = torch.mean(torch.abs((val_predictions - y_val_tensor) / y_val_tensor)) * 100

            # RMSE in price units: invert the log transform on predictions and
            # targets first, then measure the error. (Applying expm1 to the
            # log-space RMSE itself does not give a price error.)
            price_errors = torch.expm1(val_predictions) - val_prices
            rmse_exp = torch.sqrt(torch.mean(price_errors ** 2)).item()

        rmse_percentage = (rmse_exp / avg_stock_price_exp) * 100

        # Checkpoint on improvement, otherwise count towards early stopping.
        if val_rmse.item() < best_val_rmse:
            best_val_rmse = val_rmse.item()
            trigger_count = 0
            torch.save(model.state_dict(), save_path)
        else:
            trigger_count += 1

        if trigger_count >= patience:
            print(f"Early stopping at epoch {epoch+1} due to no improvement in Val RMSE")
            break

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{max_epochs}], RMSE: {rmse.item():.4f}, MAPE: {mape.item():.2f}%, "
                f"Val RMSE: {val_rmse.item():.4f} (Converted: {rmse_exp:.2f}), "
                f"Val RMSE %: {rmse_percentage:.2f}%, Val MAPE: {val_mape.item():.2f}%")

    # Drop the local references; the caller's own reference keeps the model alive.
    del model
    del optimizer
    if cuda_available:
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

    return best_val_rmse

## 순정 Transformer 모델

In [None]:
# Early-stopping bookkeeping at notebook (global) scope.
# NOTE(review): learn_model() keeps its own local best_val_rmse / trigger_count and
# receives patience as an argument, so nothing in the visible code reads these
# globals; they look like leftovers from an earlier inline training loop.
best_val_rmse = float("inf")
patience = 100
trigger_count = 0

# 🔹 Transformer 모델 정의
class GPTStockPredictor(nn.Module):
    """Transformer-encoder regressor that maps a feature window to a price path.

    Pipeline: per-step linear embedding -> ``nn.TransformerEncoder`` -> mean over
    the time axis -> linear head with ``output_days`` outputs.

    NOTE(review): no positional encoding is added before the encoder and the
    result is mean-pooled over time, so the model has no explicit notion of the
    order of the time steps.

    Args:
        input_dim: number of features per time step.
        embed_dim: width of the token embedding / encoder (``d_model``).
        num_heads: attention heads per encoder layer.
        ff_dim: hidden width of the encoder feed-forward sub-layer.
        num_layers: number of stacked encoder layers.
        output_days: number of values predicted per sample.
    """

    def __init__(self, input_dim=11, embed_dim=32, num_heads=8, ff_dim=128, num_layers=1, output_days=100):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=ff_dim),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(embed_dim, output_days)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_dim) to (batch, output_days)."""
        # The encoder is built with the default sequence-first layout, hence the permute.
        tokens = self.embedding(x).permute(1, 0, 2)
        encoded = self.transformer_encoder(tokens)
        # Average over the sequence axis to get one vector per sample.
        pooled = encoded.mean(dim=0)
        return self.fc(pooled)

# Build the baseline Transformer with a 100-value output head and train it.
# Note: the name `model_class` is bound to a model *instance*, which is what
# learn_model() expects (it calls .to(device) and .parameters() on its argument).
model_class = GPTStockPredictor(output_days=100).to(device)
learn_model(model_class, patience=100)



## CNN 레이어 추가 Transformer 모델

In [None]:
class CNN_GPTStockPredictor(nn.Module):
    """Multi-kernel 1D-CNN front end followed by a Transformer encoder.

    Three parallel ``Conv1d`` branches (kernel sizes 3, 5 and 7, padded so the
    sequence length is preserved) read the feature window, their outputs are
    concatenated along the channel axis and projected back to ``embed_dim``.
    The result goes through a linear embedding, a Transformer encoder, a mean
    over the time axis and a linear head with ``output_days`` outputs.

    Args:
        input_dim: number of features per time step (Conv1d input channels).
        embed_dim: channels per convolution branch and encoder width.
        num_heads: attention heads per encoder layer.
        ff_dim: hidden width of the encoder feed-forward sub-layer.
        num_layers: number of stacked encoder layers.
        output_days: number of values predicted per sample.
    """

    def __init__(self, input_dim=11, embed_dim=16, num_heads=8, ff_dim=64, num_layers=1, output_days=50):
        super().__init__()

        # Convolutional feature extractor: one branch per receptive-field size.
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=embed_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=input_dim, out_channels=embed_dim, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(in_channels=input_dim, out_channels=embed_dim, kernel_size=7, padding=3)
        # Projects the three concatenated branches back to embed_dim channels.
        self.conv_merge = nn.Linear(embed_dim * 3, embed_dim)

        # Transformer encoder operating on the merged convolution features.
        self.embedding = nn.Linear(embed_dim, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=ff_dim),
            num_layers=num_layers,
        )

        # Regression head.
        self.fc = nn.Linear(embed_dim, output_days)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_dim) to (batch, output_days)."""
        # Conv1d wants channels first: (batch, input_dim, seq_len).
        channels_first = x.permute(0, 2, 1)
        branches = [conv(channels_first) for conv in (self.conv1, self.conv2, self.conv3)]
        # (batch, 3 * embed_dim, seq_len) -> (batch, seq_len, 3 * embed_dim)
        stacked = torch.cat(branches, dim=1).permute(0, 2, 1)
        merged = self.conv_merge(stacked)  # (batch, seq_len, embed_dim)

        # The encoder uses the default sequence-first layout: (seq_len, batch, embed_dim).
        tokens = self.embedding(merged).permute(1, 0, 2)
        pooled = self.transformer_encoder(tokens).mean(dim=0)

        return self.fc(pooled)


In [None]:

# Build the CNN + Transformer variant with a 100-value output head and train it.
# NOTE(review): learn_model() writes its best weights to "best_stock_predictor.pth",
# the same file the baseline run writes. After this cell runs, that file holds
# CNN_GPTStockPredictor weights, while the evaluation cell loads it into a
# GPTStockPredictor - confirm which checkpoint is meant to be evaluated.
model_class = CNN_GPTStockPredictor(output_days=100).to(device)
learn_model(model_class, patience=100)




Epoch [10/500], RMSE: 2.7394, MAPE: 154.76%, Val RMSE: 3.4668 (Converted: 31.03), Val RMSE %: 74.11%, Val MAPE: 93.44%
Epoch [20/500], RMSE: 2.6170, MAPE: 149.60%, Val RMSE: 3.3076 (Converted: 26.32), Val RMSE %: 62.85%, Val MAPE: 88.27%


## RelativePositional 응용 모델

In [None]:
from torch.nn import functional as F

class RelativePositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    Despite the class name, the table is indexed by absolute position
    (0 .. max_len - 1): even channels hold sines and odd channels hold cosines
    of the position scaled by geometrically spaced frequencies.

    Args:
        embed_dim: size of the last dimension of the tensors passed to ``forward``.
            Odd values are supported.
        max_len: longest sequence the precomputed table can serve.
    """

    def __init__(self, embed_dim, max_len=5000):
        super().__init__()
        self.embed_dim = embed_dim
        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-np.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd embed_dim there is one cosine channel fewer than sine channels.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_dim // 2])
        # Registered as a buffer so it follows .to(device) / .cuda() / dtype casts
        # with the module. persistent=False keeps it out of state_dict, so the
        # checkpoint format is the same as when `pe` was a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(1), persistent=False)

    def forward(self, x):
        """Return ``x + pe[:seq_len]`` for ``x`` of shape (seq_len, batch, embed_dim).

        Raises:
            ValueError: if the sequence is longer than the precomputed table.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len={self.pe.size(0)} of the positional table."
            )
        return x + self.pe[:seq_len]


# 성능확인

In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import pickle
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error

# Select the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 🔹 Transformer 모델 클래스 정의 (학습한 모델과 동일한 구조)
class GPTStockPredictor(nn.Module):
    """Transformer-encoder regressor used to reload the trained checkpoint.

    This repeats the architecture defined in the training section (linear
    embedding -> Transformer encoder -> mean over time -> linear head) so that
    the saved ``state_dict`` keys line up. The only difference in this copy is
    the default ``output_days`` (50 here, 100 in the training definition);
    callers should pass ``output_days`` explicitly.

    Args:
        input_dim: number of features per time step.
        embed_dim: width of the token embedding / encoder (``d_model``).
        num_heads: attention heads per encoder layer.
        ff_dim: hidden width of the encoder feed-forward sub-layer.
        num_layers: number of stacked encoder layers.
        output_days: number of values predicted per sample.
    """

    def __init__(self, input_dim=11, embed_dim=32, num_heads=8, ff_dim=128, num_layers=1, output_days=50):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=ff_dim),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(embed_dim, output_days)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_dim) to (batch, output_days)."""
        # Sequence-first layout for the encoder, then average over the time axis.
        sequence_first = self.embedding(x).permute(1, 0, 2)
        summary = self.transformer_encoder(sequence_first).mean(dim=0)
        return self.fc(summary)

# Restore the best checkpoint written during training.
model = GPTStockPredictor(output_days=100).to(device)
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved from a GPU run can still be loaded on a CPU-only machine.
model.load_state_dict(torch.load("best_stock_predictor.pth", map_location=device))
model.eval()

# Download the full price history of the ticker used for the evaluation.
new_ticker = "Googl"
stock = yf.Ticker(new_ticker)
temp = stock.history(period="max")[["Open", "High", "Low", "Close", "Volume"]]
if temp.empty:
    raise ValueError(f"No price history returned for ticker {new_ticker!r}.")

# Work in log space, as in training: Close becomes log(1 + price); no min-max scaling.
df = temp.copy()
df["Close"] = np.log1p(df["Close"])

# Rolling mean / standard deviation of the log close.
df["MA_10"] = df["Close"].rolling(window=10).mean()
df["MA_20"] = df["Close"].rolling(window=20).mean()
df["STD_10"] = df["Close"].rolling(window=10).std()
df["STD_20"] = df["Close"].rolling(window=20).std()

# 🔹 RSI 계산
def compute_rsi(data, window=14):
    """Simple-moving-average RSI of ``data`` over ``window`` observations.

    Same formula as the training-time helper of the same name:
    ``100 - 100 / (1 + mean_gain / mean_loss)``, with both means taken over a
    rolling window. The first ``window - 1`` values are NaN, and a window with
    neither gains nor losses yields NaN (0 / 0).

    Args:
        data: pandas Series of values.
        window: rolling-window length.

    Returns:
        A Series aligned with ``data``.
    """
    step = data.diff()
    # Positive moves only (others become 0) / negative moves only, sign-flipped.
    gains = step.where(step > 0, 0)
    losses = -step.where(step < 0, 0)
    ratio = gains.rolling(window=window).mean() / losses.rolling(window=window).mean()
    return 100 - (100 / (1 + ratio))

# Momentum indicators on the log close (same definitions as in training).
df["RSI_14"] = compute_rsi(df["Close"], window=14)
df["MACD"] = df["Close"].ewm(span=12, adjust=False).mean() - df["Close"].ewm(span=26, adjust=False).mean()
df["MACD_Signal"] = df["MACD"].ewm(span=9, adjust=False).mean()

# Volume features: rolling means of the raw volume plus log(1 + volume).
df["Volume_MA_10"] = df["Volume"].rolling(window=10).mean()
df["Volume_MA_20"] = df["Volume"].rolling(window=20).mean()
df["Log_Volume"] = np.log1p(df["Volume"])

# Drop rows with undefined indicators (rolling warm-up, 0/0 in the RSI, infinities).
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.dropna(inplace=True)

# Legacy scaler object. Nothing below uses it (min-max scaling was removed from the
# pipeline) and nothing in the visible notebook writes scaler.pkl, so a missing file
# must not abort the evaluation.
# NOTE(review): pickle.load executes arbitrary code; only load files you created.
try:
    with open("scaler.pkl", "rb") as f:
        scaler = pickle.load(f)
except FileNotFoundError:
    scaler = None

# Feature columns, in the order the model was trained on.
train_columns = ['Close','MA_10', 'MA_20', 'STD_10', 'STD_20', 'RSI_14', 'MACD', 'MACD_Signal', 'Volume_MA_10', 'Volume_MA_20', 'Log_Volume']
print("훈련 데이터에 사용된 컬럼:", train_columns)

# Keep the log close aside, then restrict the frame to the training columns.
df_close = df[["Close"]].copy()
df = df[train_columns]
print("테스트 데이터 컬럼 확인:", df.columns.tolist())

# Same transform as in training: log1p on every feature except Close.
df_standardized = df.copy()
# NOTE(review): pandas pairs the key columns (alphabetical order from
# `columns.difference`) with the value columns (original order) BY POSITION here,
# so the column labels no longer match their contents. The training cell performs
# the identical assignment and the model was fitted on that layout, so it is kept
# as is; fix both places together or neither.
df_standardized[df.columns.difference(["Close"])] =  np.log1p(df.drop(columns=["Close"]))
df_standardized["Close"] = df_close["Close"]

# Input window = the 100 rows immediately before the last 100 evaluated rows.
# The most recent row is left out of the target, as in the original slicing.
sequence_length = 100
output_length = 100  # must equal the model's output_days

n_rows = len(df_standardized)
target_end = n_rows - 1
target_start = target_end - output_length
input_start = target_start - sequence_length
if input_start < 0:
    raise ValueError(
        f"Need at least {sequence_length + output_length + 1} cleaned rows, got {n_rows}."
    )

X_test = df_standardized.iloc[input_start:target_start].values
X_test = np.expand_dims(X_test, axis=0)  # add the batch dimension
X_test_tensor_2 = torch.tensor(X_test, dtype=torch.float32).to(device)

# Forecast the next `output_length` log prices.
with torch.no_grad():
    predicted = model(X_test_tensor_2).cpu().numpy().flatten()
if predicted.shape[0] != output_length:
    raise ValueError(
        f"Model returned {predicted.shape[0]} values but output_length is {output_length}."
    )

# Undo the log1p transform to get prices.
predicted_prices = np.expm1(predicted)

# Dates and actual prices are taken from the SAME cleaned rows the input window was
# cut from, so they stay aligned even if dropna() removed rows near the end of the
# history. The prices themselves are read from the untouched raw frame.
dates = df_standardized.index[target_start:target_end]
actual_prices = temp.loc[dates, "Close"].values

# Error metrics in price units.
rmse = np.sqrt(mean_squared_error(actual_prices, predicted_prices))
mape = mean_absolute_percentage_error(actual_prices, predicted_prices) * 100  # as a percentage

print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAPE: {mape:.2f}%")

# Plot actual vs predicted prices over the evaluated dates.
plt.figure(figsize=(12, 6))
plt.plot(dates, actual_prices, label="Actual Prices", marker="o", linestyle="dashed", color="blue")
plt.plot(dates, predicted_prices, label="Predicted Prices", marker="s", linestyle="solid", color="red")
plt.xlabel("Date")
plt.ylabel("Stock Price")
plt.title(f"{new_ticker} Stock Price Prediction vs Actual")
plt.legend()
plt.xticks(rotation=45)
plt.grid()
plt.show()
