In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.utils.data as data

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

# -----------------------
# Parameters
# -----------------------
# Data
CSV_PATH = "lstm/data/AAPL.csv"
TARGET_COL = "Close"
SEQ_LEN = 60
TEST_RATIO = 0.2
FORECAST_DAYS = 10

# Training
EPOCHS = 2000
BATCH_SIZE = 32
LEARNING_RATE = 0.001
PATIENCE = 10  # early-stopping patience (epochs without improvement)

# Outputs
MODEL_OUT = "aapl_lstm_pytorch.pt"

# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

# Seed both random number generators with the same fixed value.
np.random.seed(42)
torch.manual_seed(42)

def extract_dataset(df, test_size = 0.2, history_length = SEQ_LEN):
    """Split a series chronologically and cut each part into sliding windows.

    The first ``int(len(df) * (1 - test_size))`` rows form the training part
    and the remaining rows form the test part. Each part is windowed on its
    own, so no window spans the split point.

    Args:
        df: pandas object (Series or DataFrame) holding the observations in
            row order.
        test_size: fraction of rows assigned to the test part.
        history_length: number of consecutive rows in every input window.

    Returns:
        ``(train_x, train_y, test_x, test_y)`` as torch tensors. Row ``i`` of
        an ``*_x`` tensor holds rows ``i .. i + history_length - 1`` of the
        part, and row ``i`` of the matching ``*_y`` tensor holds row
        ``i + history_length``. A part with ``history_length`` rows or fewer
        yields zero windows.
    """
    train_size = int(len(df) * (1 - test_size))

    train = df[:train_size].reset_index(drop=True)
    test = df[train_size:].reset_index(drop=True)

    def portion_data(stream, history_length):
        """Build (windows, next-value) tensors from one contiguous part."""
        # Read the underlying array once rather than on every window.
        values = np.asarray(stream.values)
        n_windows = max(len(values) - history_length, 0)

        # Row i of `positions` is [i, i + 1, ..., i + history_length - 1];
        # indexing with it gathers every window in a single vectorised step.
        positions = np.arange(n_windows)[:, None] + np.arange(history_length)[None, :]
        windows = values[positions]
        targets = values[history_length:history_length + n_windows]

        # torch.tensor copies from a single ndarray, which avoids the slow
        # (and warned-about) list-of-ndarrays conversion path.
        return torch.tensor(windows), torch.tensor(targets)

    train_x, train_y = portion_data(train, history_length)
    test_x, test_y = portion_data(test, history_length)

    return train_x, train_y, test_x, test_y

# Stop immediately with an explicit error when the CSV is not on disk.
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"Nie znaleziono pliku: {CSV_PATH}")

# Keep only the target column, stored as float32.
df = pd.read_csv(CSV_PATH).loc[:, TARGET_COL].astype("float32")

# Chronological train/test split, each part cut into sliding windows.
X_train, y_train, X_test, y_test = extract_dataset(df)

# Shuffled mini-batches over the training windows.
# NOTE(review): the batch size is hard-coded to 8 here; the BATCH_SIZE
# constant is not used — confirm which value is intended.
train_windows = data.TensorDataset(X_train, y_train)
loader = DataLoader(train_windows, batch_size=8, shuffle=True)



In [None]:
# -----------------------
# LSTM model (PyTorch)
# -----------------------
class LSTMRegressor(nn.Module):
    """One LSTM layer followed by a linear layer with a single output unit.

    Args:
        input_size: number of features per time step fed to the LSTM.
        hidden_size1: size of the LSTM hidden state (and linear input).
    """

    def __init__(self, input_size=1, hidden_size1=64):
        super().__init__()
        # batch_first=True -> input layout is (batch, seq_len, input_size)
        self.lstm1 = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size1,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size1, out_features=1)

    def forward(self, x):
        """Apply the LSTM to ``x`` and project its full output to one value.

        The linear layer is applied to the LSTM's whole output tensor, so the
        result has one value per time step (last dimension of size 1), not
        only a value for the final step.
        """
        hidden_seq = self.lstm1(x)[0]
        return self.fc(hidden_seq)
    
# -----------------------
# 3) Build and train the model
# -----------------------
# Every window is a run of consecutive values from a single column, so the
# LSTM consumes one feature per time step (input_size=1). Building it with
# input_size=60 on 2-D batches made nn.LSTM read each shuffled mini-batch as
# one unbatched sequence, carrying hidden state across unrelated samples.
model = LSTMRegressor(input_size=1, hidden_size1=64).to(DEVICE)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)


def _predict_next(net, windows):
    """Return the prediction made after the final time step of each window.

    A 2-D input ``(batch, seq_len)`` gets a trailing feature axis so that
    nn.LSTM handles it as a batch of univariate sequences. The model emits
    one value per time step; only the last step is kept, giving ``(batch, 1)``.
    """
    if windows.dim() == 2:
        windows = windows.unsqueeze(-1)      # (batch, seq_len, 1)
    return net(windows)[:, -1, :]            # (batch, 1)


def _as_column(targets):
    """Reshape 1-D targets to ``(batch, 1)`` so they line up with predictions.

    Without this, MSELoss broadcasts ``(batch, 1)`` against ``(batch,)`` into a
    ``(batch, batch)`` grid and averages over every prediction/target pair.
    """
    return targets.unsqueeze(-1) if targets.dim() == 1 else targets


# Move the evaluation tensors to the target device once instead of on every
# report. `loader` keeps its own reference to the original tensors.
X_train = X_train.to(DEVICE)
X_test = X_test.to(DEVICE)
y_train = y_train.to(DEVICE)
y_test = y_test.to(DEVICE)

# NOTE(review): PATIENCE is declared in the config but this loop has no early
# stopping; it always runs the full EPOCHS — confirm whether that is intended.
for epoch in range(1, EPOCHS + 1):
    # --- training ---
    model.train()
    for xb, yb in loader:
        xb = xb.to(DEVICE)                   # (batch, seq_len)
        yb = _as_column(yb.to(DEVICE))       # (batch, 1)

        optimizer.zero_grad()
        preds = _predict_next(model, xb)     # (batch, 1)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()

    # --- periodic evaluation on the full train and test splits ---
    if epoch % 5 == 0:
        model.eval()
        with torch.no_grad():
            y_pred = _predict_next(model, X_train)
            train_rmse = torch.sqrt(criterion(y_pred, _as_column(y_train))).item()
            y_pred = _predict_next(model, X_test)
            test_rmse = torch.sqrt(criterion(y_pred, _as_column(y_test))).item()
        print("Epoch %d: train RMSE %.4f, test RMSE %.4f" % (epoch, train_rmse, test_rmse))
