In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler

# Download Tesla price history (daily bars for the most recent 5 years).
ticker = "TSLA"
raw_data = yf.download(ticker, period="5y", interval="1d")
if raw_data is None or raw_data.empty:
    # Fail here with a clear message instead of an obscure scaler error later.
    raise RuntimeError(f"No price data returned for {ticker!r}; check the ticker and network access")

# Keep only the closing price. Rows with a missing close are dropped because
# MinMaxScaler passes NaN through, which would turn the training loss into NaN.
data = raw_data[['Close']].dropna()
if data.empty:
    raise RuntimeError(f"Price data for {ticker!r} contains no valid closing prices")
print(data.head())

# Normalize the closing prices into the [0, 1] range.
# NOTE(review): the scaler is fit on the full series, including the most recent
# period; fit it on the training portion only if a leakage-free evaluation is
# required.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)

def create_sequences(dataset, time_step=60):
    """Build sliding-window samples from the first column of a 2-D array.

    Sample ``i`` uses rows ``i .. i + time_step - 1`` of column 0 as input and
    row ``i + time_step`` of column 0 as the target, so an array with ``n``
    rows yields ``n - time_step`` samples.

    Args:
        dataset: 2-D array-like of shape (n_rows, n_columns); only column 0 is
            read.
        time_step: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays with shapes
        ``(n_samples, time_step)`` and ``(n_samples,)``. When there are not
        enough rows for a single sample, both arrays are empty but keep those
        shapes.
    """
    dataset = np.asarray(dataset)
    # Every start index whose target row (i + time_step) still exists is a
    # valid sample; the last valid start is len(dataset) - time_step - 1.
    n_samples = len(dataset) - time_step
    if n_samples <= 0:
        return (
            np.empty((0, time_step), dtype=dataset.dtype),
            np.empty((0,), dtype=dataset.dtype),
        )

    X = np.stack([dataset[i:i + time_step, 0] for i in range(n_samples)])
    # Copy so the targets do not alias the caller's array.
    y = dataset[time_step:, 0].copy()
    return X, y

time_step = 60  # number of past closing prices fed to the model per sample
X, y = create_sequences(scaled_data, time_step)

# Convert to PyTorch tensors and append a trailing feature/target axis.
X = torch.from_numpy(X).float().unsqueeze(-1)  # shape: (samples, timesteps, features)
y = torch.from_numpy(y).float().unsqueeze(-1)  # shape: (samples, 1)

dataset = TensorDataset(X, y)

# Chronological 80/20 split. Samples are ordered by time, so the first 80% are
# used for training and the most recent 20% for testing. A random split would
# interleave test days with training days, which makes the test score
# optimistic and prevents plotting the test set as one contiguous date range.
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset = torch.utils.data.Subset(dataset, range(train_size))
test_dataset = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))

batch_size = 32
# Training batches are shuffled; the incomplete final batch is dropped so every
# optimizer step sees a full batch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
# Evaluation keeps chronological order and every sample (no drop_last).
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Recurrent layers; batch_first=True means inputs are (batch, seq, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Output layer mapping the last hidden state to the prediction.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the network on a batch of sequences.

        Args:
            x: Tensor of shape (batch, seq_length, input_size).

        Returns:
            Tensor of shape (batch, output_size) computed from the LSTM output
            at the final time step.
        """
        # Zero initial hidden and cell states, shape (num_layers, batch, hidden_size).
        # new_zeros allocates them directly with x's device and dtype, so the
        # model also works after being cast (e.g. .double()) or moved to a GPU.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        # LSTM forward pass: out has shape (batch, seq_length, hidden_size).
        out, _ = self.lstm(x, (h0, c0))

        # Predict from the output of the last time step only.
        return self.fc(out[:, -1, :])

# Hyperparameters: one input feature and one predicted value per sample,
# two stacked LSTM layers with 50 hidden units each.
input_size, output_size = 1, 1
hidden_size, num_layers = 50, 2

# Pick the compute device (use the GPU when one is available).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model and place it on the chosen device.
model = LSTMModel(input_size, hidden_size, num_layers, output_size)
model = model.to(device)

# Mean-squared-error loss, optimized with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 20

for epoch in range(num_epochs):
    # Switch to training mode at the start of every epoch.
    model.train()
    train_losses = []

    for batch_X, batch_y in train_loader:
        # Move the mini-batch to the device the model lives on.
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        # Forward pass and loss for this mini-batch.
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)

        # Clear stale gradients, backpropagate, and update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())

    # Report the mean mini-batch loss for the epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {np.mean(train_losses):.6f}")

# Evaluate on the test set without tracking gradients.
model.eval()
predictions = []
actuals = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        outputs = model(batch_X)
        predictions.append(outputs.cpu().numpy())
        actuals.append(batch_y.numpy())

if not predictions:
    # np.concatenate fails with an unhelpful message on an empty list.
    raise RuntimeError("test_loader produced no batches; nothing to evaluate")

predictions = np.concatenate(predictions, axis=0)
actuals = np.concatenate(actuals, axis=0)

# Undo the normalization to recover actual prices.
predictions = scaler.inverse_transform(predictions)
actuals = scaler.inverse_transform(actuals)

# Recover the date of every evaluated sample. test_loader is not shuffled, so
# the evaluated samples are the leading entries of test_dataset.indices, and
# the target of sample i is row i + time_step of `data`. Taking the trailing
# rows of `data` instead would label the points with dates that do not belong
# to the samples that were actually evaluated.
sample_idx = np.asarray(test_dataset.indices[:len(actuals)])
order = np.argsort(sample_idx)  # plot in chronological order
test_data = data.iloc[sample_idx[order] + time_step]

# Plot actual vs. predicted closing prices.
plt.figure(figsize=(12,6))
plt.plot(test_data.index, actuals[order], label='Test Actual')
plt.plot(test_data.index, predictions[order], label='Test Prediction')
plt.xlabel('Date')
plt.ylabel('Tesla Close Price')
plt.legend()
plt.show()