<a href="https://colab.research.google.com/github/LeonardSchickedanz/StockPredictionModel/blob/master/StockPredictionModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Welcome to the Stock Price Prediction Model Repository!**

This Google Colab notebook helps you predict stock prices from a combination of time series data, quarterly financial reports, and macroeconomic indicators. The project uses a **Long Short-Term Memory (LSTM)** neural network to forecast the price of a single stock.

## **What You Can Do Here**

### **Optimize Hyperparameters**
Use **Optuna** to search automatically for well-performing hyperparameters for the LSTM model.

### **Train the Model**
Train the LSTM model using **Apple's stock data (AAPL)**. The model can incorporate various features, including historical stock prices, quarterly financial reports, and macroeconomic indicators.

### **Test the Model**
Evaluate the trained model on **Netflix's stock data (NFLX)** to assess how well it generalizes to unseen data.

## **Data Sources**
The data used in this project comes from two sources:
- **Yahoo Finance (`yfinance`)** for historical stock price data.
- **Alpha Vantage (`alpha_vantage`)** for fundamental company data from quarterly reports and for U.S. macroeconomic indicators.
  The quarterly reports are retrieved with this [repository](https://github.com/RomelTorres/alpha_vantage) by *RomelTorres*; the macroeconomic indicators are requested directly from the Alpha Vantage REST API.

## **Getting Started**
Simply follow the instructions above the code cells in order until you reach the **Model Usage Guide** at the end, where you can interact with the trained model and apply it to your own predictions.


Install the required packages and import them.


In [1]:
!pip install numpy alpha_vantage pandas requests yfinance scikit-learn scipy optuna torch plotly openpyxl

import numpy as np
import pandas as pd
import os
import requests
import yfinance as yf
from google.colab import drive
from alpha_vantage.fundamentaldata import FundamentalData
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy.stats import spearmanr, pearsonr
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
import json
from sklearn.linear_model import LinearRegression
import plotly.graph_objects as go
from datetime import datetime
import math
import time


### **Set Up Your Environment**
Ensure you have a GPU enabled in your Google Colab runtime for faster computation. You can do this by going to **Runtime -> Change runtime type -> Hardware accelerator** and selecting **GPU**.

In [2]:
if torch.cuda.is_available():
    print('GPU is available!')
    print('GPU Device:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using CPU instead.')


GPU is available!
GPU Device: Tesla T4




### **Using AlphaVantage Data**
If you want to fetch the data yourself, obtain an API key from [AlphaVantage](https://www.alphavantage.co/support/#api-key) and assign it to `API_KEY` in the next cell. The retrieved data is stored in the temporary file system of this Google Colab session, so it is lost when the session ends.

In this code cell, the data is fetched, cleaned, and then stored in the `data` folder. If you skip this step, the next code cell will automatically download the data from GitHub instead.
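
The indicator responses are read as a list of `date`/`value` entries under a `data` key. The following is a minimal sketch of that parsing step on a hard-coded payload, so it runs without an API key. The sample numbers, the `"."` missing-value marker, and the helper name `parse_indicator` are illustrative assumptions, not part of the notebook.

```python
# Illustrative payload: the keys "data", "date", and "value" match what the
# fetching code reads; the numbers and the "." marker are made up.
sample_payload = {
    "data": [
        {"date": "2024-03-01", "value": "5.33"},
        {"date": "2024-02-01", "value": "5.31"},
        {"date": "2024-01-01", "value": "."},  # hypothetical missing value
    ]
}

def parse_indicator(payload):
    """Return (date, value) pairs, oldest first; unparseable values become None."""
    rows = []
    for entry in payload.get("data", []):
        try:
            value = float(entry["value"])
        except (TypeError, ValueError):
            value = None  # keep the date, flag the value as missing
        rows.append((entry["date"], value))
    # ISO-formatted dates sort chronologically as plain strings
    return sorted(rows, key=lambda row: row[0])

rows = parse_indicator(sample_payload)
print(rows)
```

Converting values to floats at this stage and marking gaps explicitly makes it easier to decide later how missing observations should be filled.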



In [None]:
API_KEY = ''

In [None]:
PROJECT_URL = '/content/PredictStockPrice'

directory_economic_indicators_raw = f'{PROJECT_URL}/data/economic_indicators/raw'
directory_economic_indicators_processed = f'{PROJECT_URL}/data/economic_indicators/processed'
directory_tickers = f'{PROJECT_URL}/data/tickers'

for directory in [
    directory_economic_indicators_raw,
    directory_economic_indicators_processed,
    f'{directory_tickers}/AAPL',
    f'{directory_tickers}/NFLX',
]:
    os.makedirs(directory, exist_ok=True)

def api_raw_data_to_excel(ticker):
    api_d_fundamental_data = FundamentalData(key=API_KEY, output_format='pandas')
    d_quarterly_income, _ = api_d_fundamental_data.get_income_statement_quarterly(symbol=ticker)

    d_time_series = yf.download(ticker, start="2000-01-01", end="2024-12-17", interval="1d", auto_adjust=True)

    d_time_series.index = d_time_series.index.tz_localize(None) # remove timezone

    d_quarterly_income.to_excel(f'{directory_tickers}/{ticker}/d_quarterly_income_raw.xlsx', index=True)
    d_time_series.to_excel(f'{directory_tickers}/{ticker}/d_timeseries_raw.xlsx', index=True)

economic_indicators = (
    'd_real_gdp',
    'd_real_gdp_per_capita',
    'd_federal_funds_rate',
    'd_cpi',
    'd_inflation',
    'd_retail_sales',
    'd_durables',
    'd_unemployment',
    'd_nonfarm_payroll'
)

def economic_indicators_to_excel():
    main_url = 'https://www.alphavantage.co/query?function='
    api_key_url = f'&apikey={API_KEY}'

    url_list = (
        'REAL_GDP&interval=annual',
        'REAL_GDP_PER_CAPITA',
        'FEDERAL_FUNDS_RATE&interval=monthly',
        'CPI&interval=monthly',
        'INFLATION',
        'RETAIL_SALES',
        'DURABLES',
        'UNEMPLOYMENT',
        'NONFARM_PAYROLL'
    )

    dataframes = []
    for idx1, u in enumerate(url_list):
        try:
            url = f'{main_url}{u}{api_key_url}'
            r = requests.get(url, timeout=30)  # do not hang forever on a stalled connection
            r.raise_for_status()
            data = r.json()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for {u}: {e}")
            return

        if 'Information' in data and 'rate limit' in data['Information'].lower():
            print("OUT OF API REQUESTS")
            return
        if 'data' not in data:
            print(f"Unexpected response for {u}: {data}")
            return
        print("API request successful")

        # Collect the date/value pairs of every observation
        dates = [entry['date'] for entry in data['data']]
        values = [entry['value'] for entry in data['data']]

        assert len(dates) == len(values)

        df = pd.DataFrame({
            "date": dates,
            "value": values
        })
        dataframes.append(df)

        df.to_excel(f'{directory_economic_indicators_raw}/{economic_indicators[idx1]}_raw.xlsx', index=True)

def stretch_data(data, min_date, max_date, date_column='date'):
    data.set_index(date_column, inplace=True)
    data = data.resample('D').ffill()

    date_range = pd.date_range(start=min_date, end=max_date, freq='D')
    data = data.reindex(date_range)
    data = data.reset_index(names=['date'])

    data = data.iloc[::-1].reset_index(drop=True)
    return data

def date_to_unix_time_stamp(data, date_column='date'):
    data[date_column] = pd.to_datetime(data[date_column])
    data[date_column] = (data[date_column] - pd.Timestamp('1970-01-01')).dt.total_seconds()
    return data

def data_to_excel_main(ticker, call_economic_indicators_to_excel=False):
    if call_economic_indicators_to_excel:
        economic_indicators_to_excel()
    api_raw_data_to_excel(ticker)

    data_dict = {
        "real_gdp": pd.read_excel(f'{directory_economic_indicators_raw}/d_real_gdp_raw.xlsx', index_col=0),
        "real_gdp_per_capita": pd.read_excel(f'{directory_economic_indicators_raw}/d_real_gdp_per_capita_raw.xlsx', index_col=0),
        "federal_funds_rate": pd.read_excel(f'{directory_economic_indicators_raw}/d_federal_funds_rate_raw.xlsx', index_col=0),
        "cpi": pd.read_excel(f'{directory_economic_indicators_raw}/d_cpi_raw.xlsx', index_col=0),
        "inflation": pd.read_excel(f'{directory_economic_indicators_raw}/d_inflation_raw.xlsx', index_col=0),
        "retail_sales": pd.read_excel(f'{directory_economic_indicators_raw}/d_retail_sales_raw.xlsx', index_col=0),
        "durables": pd.read_excel(f'{directory_economic_indicators_raw}/d_durables_raw.xlsx', index_col=0),
        "unemployment": pd.read_excel(f'{directory_economic_indicators_raw}/d_unemployment_raw.xlsx', index_col=0),
        "nonfarm_payroll": pd.read_excel(f'{directory_economic_indicators_raw}/d_nonfarm_payroll_raw.xlsx', index_col=0),
        "quarterly_income": pd.read_excel(f'{directory_tickers}/{ticker}/d_quarterly_income_raw.xlsx', index_col=0),
        "time_series": pd.read_excel(f'{directory_tickers}/{ticker}/d_timeseries_raw.xlsx', index_col=0)
    }

    # CLEAN DATA
    d_quarterly_income = data_dict["quarterly_income"]
    d_time_series = data_dict["time_series"]

    d_time_series = d_time_series.iloc[2:].reset_index(drop=False)
    d_time_series.columns = ['date', 'close', 'high', 'low', 'open', 'volume']
    d_time_series = d_time_series[::-1].reset_index(drop=True)

    d_quarterly_income.drop(columns=['depreciation', 'reportedCurrency'], errors='ignore', inplace=True)
    d_quarterly_income.replace("None", np.nan, inplace=True)
    d_quarterly_income.fillna(0, inplace=True)
    d_quarterly_income.rename(columns={'fiscalDateEnding': 'date'}, inplace=True)

    data_dict["quarterly_income"] = d_quarterly_income
    data_dict["time_series"] = d_time_series

    for key, df in data_dict.items():
        if key not in ["quarterly_income", "time_series"]:
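            # str.lstrip('d_') strips any leading 'd' or '_' characters (turning
            # 'durables' into 'urables'); removeprefix only drops a literal 'd_' prefix.
            new_column_name = key.removeprefix('d_')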
            df.rename(columns={'value': new_column_name}, inplace=True)

    for key, df in data_dict.items():
        df['date'] = pd.to_datetime(df['date'])

    max_d = min(df['date'].max() for df in data_dict.values())
    min_d = max(df['date'].min() for df in data_dict.values())

    for key, df in data_dict.items():
        data_dict[key] = stretch_data(data=df, min_date=min_d, max_date=max_d)
        data_dict[key] = date_to_unix_time_stamp(data_dict[key])

    for key, df in data_dict.items():
        df.to_excel(f'{directory_economic_indicators_processed}/d_{key}.xlsx')

    data_dict["time_series"].to_excel(f'{directory_tickers}/{ticker}/d_timeseries.xlsx')
    data_dict["quarterly_income"].to_excel(f'{directory_tickers}/{ticker}/d_quarterly_income.xlsx')

if API_KEY != '':
    data_to_excel_main('AAPL', True)
    data_to_excel_main('NFLX', False)
    print("All folders and files have been successfully loaded!")
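
The `stretch_data` helper above brings lower-frequency series (quarterly, monthly, annual) onto a daily grid by carrying the latest known value forward. The following library-free sketch illustrates that forward-fill idea only; the function name `forward_fill_daily` and the sample values are assumptions for illustration, and unlike the notebook's helper it keeps the result in chronological order.

```python
from datetime import date, timedelta

def forward_fill_daily(observations, start, end):
    """Expand sparse {date: value} observations to one value per calendar day.

    Each day takes the most recent observation dated on or before it;
    days before the first observation get None.
    """
    ordered = sorted(observations.items(), key=lambda item: item[0])
    filled = {}
    last_value = None
    position = 0
    day = start
    while day <= end:
        # consume every observation dated on or before the current day
        while position < len(ordered) and ordered[position][0] <= day:
            last_value = ordered[position][1]
            position += 1
        filled[day] = last_value
        day += timedelta(days=1)
    return filled

# Made-up sparse series with two observations
sparse = {date(2024, 1, 1): 100.0, date(2024, 1, 4): 110.0}
daily = forward_fill_daily(sparse, date(2024, 1, 1), date(2024, 1, 6))
for day, value in daily.items():
    print(day, value)
```

Forward filling means each day only sees values dated on or before it. The release date of a report can still be later than its period-end date, which this sketch does not model.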


### **Downloading Model Files from GitHub**
Run the next cell to download pre-trained model files and hyperparameters from GitHub. This works whether or not you fetched the data manually in the previous step. If you skipped the data retrieval step, the GitHub version of the data will be used automatically, ensuring that the necessary files are available for training and testing.



In [3]:
GITHUB_USER = "LeonardSchickedanz"
GITHUB_REPO = "StockPredictionModel"
BRANCH = "master"

PROJECT_URL = "/content/PredictStockPrice"
os.makedirs(PROJECT_URL, exist_ok=True)

FOLDERS = ["model"]

GITHUB_API_URL = f"https://api.github.com/repos/{GITHUB_USER}/{GITHUB_REPO}/contents"

def download_github_folder(folder_name, local_folder):
    folder_url = f"{GITHUB_API_URL}/{folder_name}?ref={BRANCH}"
    response = requests.get(folder_url)

    if response.status_code == 200:
        files = response.json()
        os.makedirs(local_folder, exist_ok=True)

        for file in files:
            file_path = os.path.join(local_folder, file["name"])

            if file["type"] == "file":
                file_url = file["download_url"]
                # Quote both arguments so paths or URLs with spaces do not break the shell command
                os.system(f'wget -q -O "{file_path}" "{file_url}"')
            elif file["type"] == "dir":
                download_github_folder(f"{folder_name}/{file['name']}", file_path)
    else:
        print(f"error fetching {folder_name}: {response.status_code}")

if not os.path.exists(os.path.join(PROJECT_URL, "data")):
    FOLDERS.append("data")

for folder in FOLDERS:
    download_github_folder(folder, os.path.join(PROJECT_URL, folder))

print("All folders and files have been successfully loaded!")

All folders and files have been successfully loaded!


### **Loading and Processing Data**
In this step, previously stored data is loaded from Excel files, merged, and converted into a tensor for model training and evaluation.

- **Time series data** and additional economic indicators are loaded from the `data` folder.
- **Macroeconomic indicators** such as GDP, inflation, unemployment, and retail sales are included if enabled.
- **Quarterly income data** is loaded for the specified stock ticker if selected.

The functions for plotting the results are also defined in this cell.
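
The processed files store the `date` column as seconds since 1970-01-01, and the plotting code converts those values back to calendar dates. The following is a minimal standard-library sketch of that round trip. The helper names are hypothetical, and the explicit UTC timezone is an assumption chosen so that the result does not depend on the machine's local timezone.

```python
from datetime import datetime, timezone

def to_unix_seconds(iso_date):
    """Convert a 'YYYY-MM-DD' string to seconds since the Unix epoch (UTC midnight)."""
    moment = datetime.strptime(iso_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return moment.timestamp()

def from_unix_seconds(seconds):
    """Convert seconds since the Unix epoch back to a calendar date in UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()

stamp = to_unix_seconds("2024-12-16")
print(stamp, from_unix_seconds(stamp))
```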



In [4]:
# t_ = tensor
# d_ = raw data
# f_ = features

directory_economic_indicators_raw = f'{PROJECT_URL}/data/economic_indicators/raw'
directory_economic_indicators_processed = f'{PROJECT_URL}/data/economic_indicators/processed'
directory_tickers = f'{PROJECT_URL}/data/tickers'

def merge_dataframes(dataframes):
    d_merged = pd.concat(dataframes, axis=1)
    d_merged = d_merged.loc[:, ~d_merged.columns.duplicated()]
    if 'value' in d_merged.columns:
        d_merged = d_merged.drop(columns='value')

    return d_merged

def data_main(ticker, with_quarterly_income=True, with_econ=True, call_data_to_excel_main=False):
    # Regenerate the Excel files first (if requested) so the reads below see fresh data
    if call_data_to_excel_main:
        data_to_excel_main(ticker)

    data_dict = {
        "time_series": pd.read_excel(f'{directory_tickers}/{ticker}/d_timeseries.xlsx', index_col=0),
        "econ_real_gdp": pd.read_excel(f'{directory_economic_indicators_processed}/d_real_gdp.xlsx', index_col=0),
        "econ_real_gdp_per_capita": pd.read_excel(f'{directory_economic_indicators_processed}/d_real_gdp_per_capita.xlsx', index_col=0),
        "econ_federal_funds_rate": pd.read_excel(f'{directory_economic_indicators_processed}/d_federal_funds_rate.xlsx', index_col=0),
        "econ_cpi": pd.read_excel(f'{directory_economic_indicators_processed}/d_cpi.xlsx', index_col=0),
        "econ_inflation": pd.read_excel(f'{directory_economic_indicators_processed}/d_inflation.xlsx', index_col=0),
        "econ_retail_sales": pd.read_excel(f'{directory_economic_indicators_processed}/d_retail_sales.xlsx', index_col=0),
        "econ_durables": pd.read_excel(f'{directory_economic_indicators_processed}/d_durables.xlsx', index_col=0),
        "econ_unemployment": pd.read_excel(f'{directory_economic_indicators_processed}/d_unemployment.xlsx', index_col=0),
        "econ_nonfarm_payroll": pd.read_excel(f'{directory_economic_indicators_processed}/d_nonfarm_payroll.xlsx', index_col=0),
        "quarterly_income": pd.read_excel(f'{directory_tickers}/{ticker}/d_quarterly_income.xlsx', index_col=0)
    }

    dataframes = [data_dict["time_series"]]

    if with_econ:
        dataframes.extend(value for key, value in data_dict.items() if key.startswith("econ_"))
    if with_quarterly_income and "quarterly_income" in data_dict:
        dataframes.append(data_dict["quarterly_income"])

    d_combined = merge_dataframes(dataframes)

    d_combined.to_excel(f'{PROJECT_URL}/data/d_combined.xlsx') # for debugging
    t_combined = torch.tensor(d_combined.values).float()
    return t_combined

def plot(losses, test_losses, prediction, y_test, ticker, t_combined, look_back_days, forecast_horizon, alpha=0.2, with_baselines=True):
    plot_losses(losses, test_losses)

    d_time_series = pd.read_excel(f'{PROJECT_URL}/data/tickers/{ticker}/d_timeseries.xlsx', index_col=0)
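    # The stored timestamps are seconds since 1970-01-01 (UTC); convert them without
    # depending on the machine's local timezone.
    date = pd.to_datetime(d_time_series['date'], unit='s').dt.date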
    date = date[:len(prediction)]
    date = date[-len(y_test):]

    prediction = prediction[::-1]
    y_test = y_test[::-1]

    prices = t_combined[:, 1].cpu().numpy()
    baselines = {}

    avg_price_predictions = np.full(len(y_test), np.nan)
    naive_predictions = np.full(len(y_test), np.nan)
    linear_reg_predictions = np.full(len(y_test), np.nan)
    ema_predictions = np.full(len(y_test), np.nan)

    for i in range(len(y_test)):
        if i + look_back_days + forecast_horizon <= len(prices):
            avg_price_predictions[i] = np.mean(prices[i:i + look_back_days])
            naive_predictions[i] = prices[i + look_back_days - 1]

            x_train = np.arange(look_back_days).reshape(-1, 1)
            y_train = prices[i:i + look_back_days].astype(float)
            model = LinearRegression().fit(x_train, y_train)
            linear_reg_predictions[i] = model.predict(np.array([[look_back_days]]))[0]

            ema = prices[i]
            for j in range(1, look_back_days):
                ema = alpha * prices[i + j] + (1 - alpha) * ema
            ema_predictions[i] = ema

    baselines['average_price'] = avg_price_predictions
    baselines['naive_persistence'] = naive_predictions
    baselines['linear_regression'] = linear_reg_predictions
    baselines['exponential_moving_average'] = ema_predictions

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=date, y=prediction, mode='lines', name='Prediction', line=dict(color='red')))
    fig.add_trace(go.Scatter(x=date, y=y_test, mode='lines', name='Actual Price', line=dict(color='blue')))

    if with_baselines:
        fig.add_trace(go.Scatter(x=date, y=baselines['average_price'], mode='lines', name='Avg Price Baseline', line=dict(color='green', dash='dash')))
        fig.add_trace(go.Scatter(x=date, y=baselines['naive_persistence'], mode='lines', name='Naive Persistence', line=dict(color='orange', dash='dash')))
        fig.add_trace(go.Scatter(x=date, y=baselines['linear_regression'], mode='lines', name='Linear Regression', line=dict(color='purple', dash='dash')))
        fig.add_trace(go.Scatter(x=date, y=baselines['exponential_moving_average'], mode='lines', name='Exponential Moving Avg', line=dict(color='brown', dash='dash')))

    fig.update_layout(title=f'Stock Prediction for {ticker}', xaxis_title='Date', yaxis_title='Price', plot_bgcolor='white')
    fig.show()

def plot_losses(losses, test_losses):
    fig = go.Figure()

    fig.add_trace(go.Scatter(
        y=losses,
        mode='lines',
        name='Training Loss',
        line=dict(color='blue')
    ))
    fig.add_trace(go.Scatter(
        y=test_losses,
        mode='lines',
        name='Test Loss',
        line=dict(color='orange')
    ))

    fig.update_layout(
        title='Training and Test Loss Over Time',
        xaxis_title='Epoch',
        yaxis_title='Loss',
        legend=dict(x=0.85, y=0.95),
        template="plotly_white"
    )

    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='LightGray')
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='LightGray')

    fig.show()
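
The `plot` function above compares the LSTM prediction with simple baselines computed from each look-back window. The following plain-Python sketch covers three of them (window average, naive persistence, exponential moving average) for a single window. The helper name `window_baselines` and the sample prices are illustrative assumptions; the default `alpha=0.2` matches the default used in `plot`.

```python
def window_baselines(window, alpha=0.2):
    """Return simple forecasts derived from one look-back window of prices."""
    average = sum(window) / len(window)
    naive = window[-1]  # persistence: repeat the most recent price
    ema = window[0]     # seed the moving average with the oldest price
    for price in window[1:]:
        ema = alpha * price + (1 - alpha) * ema
    return {"average": average, "naive": naive, "ema": ema}

baselines = window_baselines([10.0, 12.0, 11.0, 13.0])
print(baselines)
```

A model that cannot beat naive persistence on the test ticker is usually not adding information beyond the last observed price, which is why such baselines are worth plotting alongside the prediction.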

Run the next cell to define the core functions for training, optimization, and testing.
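
The cell below turns the feature tensor into supervised samples: each input is a window of `look_back_days` consecutive rows, and the target is the closing price `forecast_horizon` rows after the window ends. The following plain-Python sketch shows that indexing and the chronological 80/20 split on a toy series. The helper name `make_windows` and the toy data are assumptions for illustration.

```python
def make_windows(prices, look_back_days, forecast_horizon):
    """Pair each look-back window with the price `forecast_horizon` steps after it ends."""
    inputs, targets = [], []
    for start in range(len(prices) - look_back_days - forecast_horizon):
        end = start + look_back_days
        inputs.append(prices[start:end])
        targets.append(prices[end + forecast_horizon])
    return inputs, targets

prices = list(range(10))  # stand-in for a chronologically ordered price series
inputs, targets = make_windows(prices, look_back_days=3, forecast_horizon=2)

# Chronological split: the first 80% of samples train, the rest test (no shuffling)
split_index = int(len(inputs) * 0.8)
train_inputs, test_inputs = inputs[:split_index], inputs[split_index:]
print(inputs[0], targets[0], len(train_inputs), len(test_inputs))
```

Splitting by position rather than at random keeps every test window later in time than the training windows, so the test set contains only later samples.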

In [21]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

def prepare_training_data(tensor, look_back_days=90, forecast_horizon=30, closed_price_column=1, with_scaler=True):
    tensor = torch.flip(tensor, [0])

    size_rows = tensor.size(0)
    x = []
    y = []

    scalers = []
    scaled_columns = []

    price_column = tensor[:, closed_price_column].reshape(-1, 1).cpu()

    if with_scaler:
        price_scaler = MinMaxScaler(feature_range=(0, 1))
        price_column_scaled = torch.tensor(
            price_scaler.fit_transform(price_column.cpu().numpy()),
            dtype=tensor.dtype
        )
    else:
        price_scaler = None
        price_column_scaled = price_column  # No scaling

    for col_idx in range(tensor.size(1)):
        column_data = tensor[:, col_idx].unsqueeze(1)

        if with_scaler:
            scaler = StandardScaler()
            if col_idx == closed_price_column:
                scaled_column = price_column_scaled
            else:
                scaler.fit(column_data.cpu().numpy())
                scaled_column = torch.tensor(
                    scaler.transform(column_data.cpu().numpy()),
                    dtype=tensor.dtype
                )
                scalers.append(scaler)
        else:
            scaled_column = column_data  # No scaling

        scaled_columns.append(scaled_column)

    scaled_tensor = torch.cat(scaled_columns, dim=1)

    # create sequences
    for idx in range(size_rows - look_back_days - forecast_horizon):
        x_block = scaled_tensor[idx:idx + look_back_days, :]
        y_value = scaled_tensor[idx + look_back_days + forecast_horizon, closed_price_column]
        x.append(x_block)
        y.append(y_value)

    x = torch.stack(x)
    y = torch.stack(y)

    # train test split
    split_ratio = 0.8
    split_index = int(len(x) * split_ratio)
    x_train = x[:split_index]
    y_train = y[:split_index]
    x_test = x[split_index:]
    y_test = y[split_index:]

    # reshape
    y_train = y_train.view(-1, 1)
    y_test = y_test.view(-1, 1)

    return x_train, x_test, y_train, y_test, scalers if with_scaler else None, price_scaler if with_scaler else None

def evaluate_prediction(actual, forecast):
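    # Detach and move to the CPU first; calling .numpy() on a CUDA tensor raises an error
    if isinstance(actual, torch.Tensor):
        actual = actual.detach().cpu().numpy()
    if isinstance(forecast, torch.Tensor):
        forecast = forecast.detach().cpu().numpy()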

    diff = actual - forecast

    mae = np.mean(np.abs(diff))
    mse = np.mean(diff ** 2)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs(diff / np.where(actual == 0, np.finfo(float).eps, actual))) * 100

    r_squared = 1 - (np.sum((actual - forecast) ** 2) / np.sum((actual - np.mean(actual)) ** 2))

    spearman_corr, _ = spearmanr(actual, forecast)
    pearson_corr, _ = pearsonr(actual, forecast)

    direction_true = np.sign(np.diff(actual))
    direction_pred = np.sign(np.diff(forecast))
    directional_accuracy = np.mean(direction_true == direction_pred)

    print("\n")
    print(f'Mean absolute error: {mae:.2f}')
    print(f'Mean squared error: {mse:.2f}')
    print(f'Root mean squared error: {rmse:.2f}')
    print(f'R-Squared: {r_squared:.2f}')
    print(f'Mean absolute percentage error: {mape:.2f}%')
    print(f"Spearman's correlation: {spearman_corr:.2f}")
    print(f"Pearson's correlation: {pearson_corr:.2f}")
    print(f"Directional Accuracy: {directional_accuracy * 100:.2f}%")

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_layers, output_size, dropout_rate):
        super().__init__()
        self.lstm_layers = nn.ModuleList()
        self.dropout = nn.Dropout(dropout_rate)

        self.lstm_layers.append(nn.LSTM(input_size=input_size, hidden_size=hidden_layers[0], batch_first=True))

        for i in range(1, len(hidden_layers)):
            self.lstm_layers.append(
                nn.LSTM(input_size=hidden_layers[i - 1], hidden_size=hidden_layers[i], batch_first=True))

        self.fc = nn.Linear(hidden_layers[-1], output_size)

        self.to(device)

    def forward(self, x):
        for lstm in self.lstm_layers:
            x, _ = lstm(x)
            x = self.dropout(x)

        x = x[:, -1, :]
        x = self.fc(x)
        return x

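# Note: a second `train_and_test` is defined further down in this cell; because it is
# defined later, that version is the one actually called.
def train_and_test(x_train, x_test, y_train, y_test, scalers, price_scaler, ticker, model, optimizer, criterion, model_name, t_combined, look_back_days, forecast_horizon, epochs=200):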
    start_time = time.time()

    model = model.to(device)
    criterion = criterion.to(device)

    train_dataset = torch.utils.data.TensorDataset(x_train, y_train)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)

    losses = []
    test_losses = []

    best_loss = float('inf')
    patience = 20
    no_improve = 0
    best_prediction = None

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            batch_pred = model(batch_x)
            loss = criterion(batch_pred, batch_y)
            epoch_loss += loss.item()
            loss.backward()
            optimizer.step()

        avg_loss = epoch_loss / len(train_loader)
        losses.append(avg_loss)

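        model.eval()
        with torch.no_grad():
            # Use the module-level `device`; reassigning `device` inside this function
            # would make it a local name and break the `model.to(device)` call above.
            x_test = x_test.to(device)
            y_test = y_test.to(device)
            y_pred_test = model(x_test)

            test_loss = criterion(y_pred_test, y_test)
            test_losses.append(test_loss.item())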

            if test_loss < best_loss:
                best_loss = test_loss
                best_prediction = y_pred_test.cpu().detach().numpy().flatten()
                no_improve = 0
            else:
                no_improve += 1

            if no_improve >= patience:
                print(f"EARLY STOPPING AT EPOCH {epoch}")
                break

        print(f"Epoch {epoch}: Train Loss = {avg_loss}, Test Loss = {test_loss.item()}")

    final_prediction = price_scaler.inverse_transform(best_prediction.reshape(-1, 1)).flatten()
    y_test_cpu = y_test.cpu().numpy()
    y_test_descaled = price_scaler.inverse_transform(y_test_cpu.reshape(-1, 1)).flatten()

    torch.save(model.state_dict(), f'{PROJECT_URL}/model/weights/{model_name}.pth')

    np_y_pred = np.array(final_prediction)
    df = pd.DataFrame(np_y_pred)
    os.makedirs(f"{PROJECT_URL}/model/output/{model_name}", exist_ok=True)
    df.to_csv(f"{PROJECT_URL}/model/output/{model_name}/prediction.csv", index=False)

    pd.DataFrame(losses).to_csv(f"{PROJECT_URL}/model/output/{model_name}/losses.csv", index=False)
    pd.DataFrame(test_losses).to_csv(f"{PROJECT_URL}/model/output/{model_name}/test_losses.csv", index=False)

    evaluate_prediction(y_test_descaled, final_prediction)

    end_time = time.time()
    print(f"The duration was {(end_time - start_time) // 60} minutes and {(end_time - start_time) % 60} seconds.")
    plot(losses, test_losses, final_prediction, y_test_descaled, ticker, t_combined, look_back_days, forecast_horizon)
    plot(losses, test_losses, final_prediction, y_test_descaled, ticker, t_combined, look_back_days, forecast_horizon, with_baselines=False)

def test_once(x_train, x_test, y_train, y_test, rest_scaler, price_scaler, test_ticker, model, optimizer, criterion, model_name, t_combined, look_back_days, forecast_horizon, call_data_to_excel_main=False):
    model = model.to(device)
    criterion = criterion.to(device)

    x_test = x_test.to(device)
    y_test = y_test.to(device)

    model.eval()

    with torch.no_grad():
        y_pred_test = model(x_test)

    prediction_descaled = price_scaler.inverse_transform(y_pred_test.cpu().numpy().reshape(-1, 1)).flatten()
    y_test_descaled = price_scaler.inverse_transform(y_test.cpu().numpy().reshape(-1, 1)).flatten()

    losses = pd.read_csv(f"{PROJECT_URL}/model/output/{model_name}/losses.csv")['0'].tolist()
    test_losses = pd.read_csv(f"{PROJECT_URL}/model/output/{model_name}/test_losses.csv")['0'].tolist()

    plot(losses, test_losses, prediction_descaled, y_test_descaled, test_ticker, t_combined, look_back_days, forecast_horizon)

    evaluate_prediction(y_test_descaled, prediction_descaled)

def optimize(model_name='all_features_scaled', with_quarterly_income=True, with_econ = True, with_scaler=True, trials=50):
    start_time = time.time()

    study = optuna.create_study(direction="minimize")  # minimize loss
    study.optimize(lambda trial: objective(trial=trial, with_quarterly_income=with_quarterly_income, with_econ=with_econ, with_scaler=with_scaler, model_name=model_name), n_trials=trials)

    best_params = study.best_params
    best_loss = study.best_value

    best_params['best_loss'] = best_loss
    file_path = f'{PROJECT_URL}/model/hyperparameters/{model_name}.json'
    # Create the folder that will hold the JSON file (model/hyperparameters)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, 'w') as f:
        json.dump(best_params, f, indent=4)

    elapsed = time.time() - start_time
    print(f"The duration was {int(elapsed // 60)} minutes and {elapsed % 60:.1f} seconds.")

def objective(trial, with_quarterly_income=True, with_econ=True, with_scaler=True, model_name='all_features_scaled'):

    num_layers = trial.suggest_int("num_layers", 3, 6)
    hidden_layers = [trial.suggest_int(f"hidden_size_{i}", 50, 1000) for i in range(num_layers)]
    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
    learning_rate = trial.suggest_float("learning_rate", 1e-3, 1e-2, log=True)
    look_back_days = trial.suggest_int("look_back_days", 20, 120)
    forecast_horizon = trial.suggest_int("forecast_horizon", 7, 60)

    input_size = 6
    if with_quarterly_income:
        input_size += 23
    if with_econ:
        input_size += 9

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = LSTMModel(input_size=input_size, hidden_layers=hidden_layers, output_size=1, dropout_rate=dropout_rate)
    model = model.to(device)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001)
    criterion = nn.MSELoss()

    ticker = 'AAPL'
    t_combined = data_main(ticker, with_quarterly_income, with_econ)
    x_train, x_test, y_train, y_test, rest_scaler, price_scaler = prepare_training_data(
        t_combined, with_scaler=with_scaler, look_back_days=look_back_days, forecast_horizon=forecast_horizon
    )

    x_train, x_test, y_train, y_test = x_train.to(device), x_test.to(device), y_train.to(device), y_test.to(device)

    print(f"Model device: {next(model.parameters()).device}")

    train_and_test(x_train, x_test, y_train, y_test, rest_scaler, price_scaler, ticker, model, optimizer, criterion, model_name, t_combined, look_back_days, forecast_horizon, epochs=10)

    with torch.no_grad():
        y_pred_test = model(x_test)
        loss = criterion(y_pred_test, y_test)

    return loss.item()

def train_and_test(x_train, x_test, y_train, y_test, scalers, price_scaler, ticker, model, optimizer, criterion, model_name,t_combined, look_back_days, forecast_horizon, epochs=200):
    start_time = time.time()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    x_train, x_test, y_train, y_test = x_train.to(device), x_test.to(device), y_train.to(device), y_test.to(device)

    train_dataset = torch.utils.data.TensorDataset(x_train.to(device), y_train.to(device))
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)


    losses = []
    test_losses = []

    best_loss = float('inf')
    patience = 30
    no_improve = 0
    best_prediction = None

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()

            batch_pred = model(batch_x)

            loss = criterion(batch_pred, batch_y)
            epoch_loss += loss.item()

            loss.backward()
            optimizer.step()

        avg_loss = epoch_loss / len(train_loader)
        losses.append(avg_loss)

        model.eval()
        with torch.no_grad():
            y_pred_test = model(x_test)
            # Keep the loss as a plain float so comparisons and logging do not hold on to tensors.
            test_loss = criterion(y_pred_test, y_test).item()
            test_losses.append(test_loss)

        print(f"Epoch {epoch}: Train Loss = {avg_loss}, Test Loss = {test_loss}")

        # Early stopping: remember the best prediction and stop after `patience` epochs without improvement.
        if test_loss < best_loss:
            best_loss = test_loss
            best_prediction = y_pred_test.cpu().numpy().flatten()
            no_improve = 0
        else:
            no_improve += 1

        if no_improve >= patience:
            print(f"EARLY STOPPING AT EPOCH {epoch}")
            break

    if scalers is not None:
        final_prediction = price_scaler.inverse_transform(best_prediction.reshape(-1, 1)).flatten()
        y_test_descaled = price_scaler.inverse_transform(y_test.cpu().numpy().reshape(-1, 1)).flatten()
    else:
        final_prediction = best_prediction
        y_test_descaled = y_test.cpu().numpy().flatten()

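    weights_path = f'{PROJECT_URL}/model/weights/{model_name}.pth'
    os.makedirs(os.path.dirname(weights_path), exist_ok=True)  # make sure the target folder exists
    # Note: these are the weights after the final epoch, not necessarily the epoch that produced best_prediction.
    torch.save(model.state_dict(), weights_path)
    print(f"Saved parameters for model: {weights_path}")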

    np_y_pred = np.array(final_prediction)
    df = pd.DataFrame(np_y_pred)
    os.makedirs(f"{PROJECT_URL}/model/output/{model_name}", exist_ok=True)
    df.to_csv(f"{PROJECT_URL}/model/output/{model_name}/prediction.csv", index=False)

    pd.DataFrame(losses).to_csv(f"{PROJECT_URL}/model/output/{model_name}/losses.csv", index=False)
    pd.DataFrame(test_losses).to_csv(f"{PROJECT_URL}/model/output/{model_name}/test_losses.csv", index=False)

    evaluate_prediction(y_test_descaled, final_prediction)

    elapsed = time.time() - start_time
    print(f"The duration was {int(elapsed // 60)} minutes and {elapsed % 60:.1f} seconds.")
    plot(losses, test_losses, final_prediction, y_test_descaled, ticker, t_combined, look_back_days, forecast_horizon)

def choose_hyperparameters_manually(
        model_name='all_features_scaled',
        with_quarterly_income=True,
        with_econ=True,
        num_layers=2,
        hidden_neurons_per_layer=None,
        dropout_rate=0.2,
        learning_rate=0.001,
        look_back_days=60,
        forecast_horizon=30
):
    input_size = 6  # timeseries features + date
    if with_quarterly_income:
        input_size += 23
    if with_econ:
        input_size += 9

    if hidden_neurons_per_layer is None:
        hidden_neurons_per_layer = [64] * num_layers

    if len(hidden_neurons_per_layer) != num_layers:
        raise ValueError(
            f"Expected {num_layers} hidden layers, but got {len(hidden_neurons_per_layer)} in hidden_neurons_per_layer")

    model = LSTMModel(
        input_size=input_size,
        hidden_layers=hidden_neurons_per_layer,
        output_size=1,
        dropout_rate=dropout_rate
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001)
    criterion = nn.MSELoss().to(device)

    return model, optimizer, criterion, look_back_days, forecast_horizon

def load_hyperparameters(model_name='all_features_scaled', with_quarterly_income=True, with_econ=True):
    file_path = f'{PROJECT_URL}/model/hyperparameters/{model_name}.json'
    with open(file_path, 'r') as f:
        best_params = json.load(f)

    input_size = 6
    if with_quarterly_income: input_size += 23
    if with_econ: input_size += 9
    hidden_layers = [best_params[f"hidden_size_{i}"] for i in range(best_params["num_layers"])]
    dropout_rate = best_params["dropout_rate"]
    learning_rate = best_params["learning_rate"]
    look_back_days = best_params["look_back_days"]
    forecast_horizon = best_params["forecast_horizon"]

    model = LSTMModel(
        input_size=input_size,
        hidden_layers=hidden_layers,
        output_size=1,
        dropout_rate=dropout_rate
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001)
    criterion = nn.MSELoss().to(device)

    return model, optimizer, criterion, look_back_days, forecast_horizon

def main(action: str, model_name: str):
    model_list = ('all_features_scaled', 'all_features_unscaled', 'timeseries_quarterly_reports_scaled', 'timeseries_econ_scaled')
    if model_name not in model_list:
        # Fail early; otherwise the feature flags below would be undefined.
        raise ValueError(f"Unknown model_name '{model_name}'. Choose one of {model_list}.")

    # Feature flags for each supported model variant.
    match model_name:
        case 'all_features_scaled': with_quarterly_income, with_econ, with_scaler = True, True, True
        case 'all_features_unscaled': with_quarterly_income, with_econ, with_scaler = True, True, False
        case 'timeseries_quarterly_reports_scaled': with_quarterly_income, with_econ, with_scaler = True, False, True
        case 'timeseries_econ_scaled': with_quarterly_income, with_econ, with_scaler = False, True, True

    match action:
        case 'optimize':
            optimize(model_name=model_name, with_quarterly_income=with_quarterly_income, with_econ=with_econ, with_scaler=with_scaler, trials=100)

        case 'train':
            ticker = 'AAPL'
            t_combined = data_main(ticker, with_quarterly_income = with_quarterly_income, with_econ = with_econ)
            model, optimizer, criterion, look_back_days, forecast_horizon = load_hyperparameters(model_name=model_name, with_quarterly_income=with_quarterly_income, with_econ=with_econ)
            #model, optimizer, criterion, look_back_days, forecast_horizon = choose_hyperparameters_manually(model_name=model_name, with_econ=with_econ, with_quarterly_income=with_quarterly_income,num_layers=3, hidden_neurons_per_layer=[543, 831, 486], dropout_rate=0.3434091618324021, learning_rate=0.0010729338976430665, look_back_days = 117, forecast_horizon = 16)
            x_train, x_test, y_train, y_test, scalers, price_scaler= prepare_training_data(t_combined, look_back_days, forecast_horizon)
            train_and_test(x_train, x_test, y_train, y_test, scalers, price_scaler, ticker, model, optimizer, criterion, model_name, t_combined, look_back_days, forecast_horizon, epochs=2)

        case 'test':
            ticker = 'NFLX'
            t_combined = data_main(ticker, with_quarterly_income = with_quarterly_income, with_econ = with_econ)
            model, optimizer, criterion, look_back_days, forecast_horizon = load_hyperparameters(model_name=model_name, with_quarterly_income = with_quarterly_income, with_econ=with_econ)
            print("model_name", model_name)
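            # weights_only=True restricts unpickling to tensors and plain containers, which is all a state_dict needs.
            model.load_state_dict(torch.load(f'{PROJECT_URL}/model/weights/{model_name}.pth', map_location=device, weights_only=True))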
            model.to(device)

            x_train, x_test, y_train, y_test, rest_scaler, price_scaler = prepare_training_data(t_combined, look_back_days, forecast_horizon)
            test_once(x_train, x_test, y_train, y_test, rest_scaler, price_scaler, ticker, model, optimizer, criterion, model_name, t_combined, look_back_days, forecast_horizon)

        case _:
            raise ValueError(f"Unknown action '{action}'. Choose 'optimize', 'train', or 'test'.")


Using device: cuda


# **Model Usage Guide**

## **Available Models**
You may choose between four different models:

- `all_features_scaled`: Uses time series data, quarterly reports, and macroeconomic data, all scaled.
- `all_features_unscaled`: Same as above but without scaling.
- `timeseries_quarterly_reports_scaled`: Uses time series and quarterly reports, both scaled.
- `timeseries_econ_scaled`: Uses time series data and macroeconomic data, both scaled.
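
Each name corresponds to two feature switches and a scaling flag that `main` sets before dispatching. As a rough sketch, the mapping can be restated as a dictionary, and the LSTM input width derived from the counts hard-coded in `load_hyperparameters` (6 time series columns including the date, 23 quarterly report columns, 9 macroeconomic columns). `MODEL_FLAGS` and `input_size_for` are illustrative names and not part of the notebook.

```python
# Illustrative restatement of the flags that main() assigns per model name.
MODEL_FLAGS = {
    "all_features_scaled": {"with_quarterly_income": True, "with_econ": True, "with_scaler": True},
    "all_features_unscaled": {"with_quarterly_income": True, "with_econ": True, "with_scaler": False},
    "timeseries_quarterly_reports_scaled": {"with_quarterly_income": True, "with_econ": False, "with_scaler": True},
    "timeseries_econ_scaled": {"with_quarterly_income": False, "with_econ": True, "with_scaler": True},
}


def input_size_for(model_name: str) -> int:
    """Number of input features per time step for the given model variant."""
    flags = MODEL_FLAGS[model_name]
    size = 6  # time series features + date
    if flags["with_quarterly_income"]:
        size += 23  # quarterly report features
    if flags["with_econ"]:
        size += 9  # macroeconomic features
    return size


for name in MODEL_FLAGS:
    print(name, input_size_for(name))
```

Scaling does not change the input width, so the scaled and unscaled variants with all features share the same size.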

## **Available Actions**
You can perform three different actions:

- `optimize`
- `train`
- `test`

---

## **Optimize**

Hyperparameter optimization is done using Optuna with a default of 100 trials. The best hyperparameters are automatically saved as a `.json` file in `model/hyperparameters` under the respective model name. The default stock is Apple.

### **Customization**
- Adjust the hyperparameter ranges in the `objective` function.
- Change the number of trials through the `trials` argument passed to `optimize` in the `main` function (`case 'optimize'`).
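
`load_hyperparameters` expects the saved file to contain the keys `num_layers`, one `hidden_size_i` per layer, `dropout_rate`, `learning_rate`, `look_back_days`, and `forecast_horizon`. The sketch below writes and reads back a file of that shape using only the standard library. The values are made up for illustration, and a temporary folder stands in for `model/hyperparameters`.

```python
import json
import os
import tempfile

# Made-up values; only the key names follow what load_hyperparameters reads.
example_params = {
    "num_layers": 2,
    "hidden_size_0": 128,
    "hidden_size_1": 64,
    "dropout_rate": 0.2,
    "learning_rate": 0.001,
    "look_back_days": 60,
    "forecast_horizon": 30,
}

# A temporary folder stands in for {PROJECT_URL}/model/hyperparameters.
with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "all_features_scaled.json")
    with open(path, "w") as f:
        json.dump(example_params, f, indent=2)
    with open(path, "r") as f:
        best_params = json.load(f)

# Same list comprehension as in load_hyperparameters: one width per LSTM layer.
hidden_layers = [best_params[f"hidden_size_{i}"] for i in range(best_params["num_layers"])]
print(hidden_layers)
```

If `num_layers` and the `hidden_size_i` keys disagree, the list comprehension raises a `KeyError`, so hand-edited files need one `hidden_size_i` entry for every layer.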

---

## **Train**

Training loads the saved hyperparameters for the chosen model and fits the model on Apple's stock (AAPL).

### **Customization**
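- Load the best hyperparameters automatically with `load_hyperparameters` (the default in `main`).
- Set hyperparameters manually with `choose_hyperparameters_manually` instead; the corresponding call is present in `main` as a commented-out line.
- Adjust the number of epochs through the `epochs` parameter of `train_and_test`. The function's default is 200, but the `train` case in `main` currently passes `epochs=2`.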
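
Training can end before `epochs` is reached: `train_and_test` stops once the test loss has not improved for 30 consecutive epochs (`patience = 30`). The following sketch isolates that rule from the training loop so it can be tried on a plain list of losses. `stopping_epoch` is a hypothetical helper, and the loss values are invented.

```python
def stopping_epoch(test_losses, patience=30):
    """Return the epoch index at which training would stop, or None if it runs to the end."""
    best_loss = float("inf")
    no_improve = 0
    for epoch, loss in enumerate(test_losses):
        if loss < best_loss:
            # New best loss: reset the counter of epochs without improvement.
            best_loss = loss
            no_improve = 0
        else:
            no_improve += 1
        if no_improve >= patience:
            return epoch
    return None


# Invented losses: improvement for three epochs, then a plateau.
losses = [0.9, 0.5, 0.4] + [0.45] * 5
print(stopping_epoch(losses, patience=3))  # prints 5
print(stopping_epoch(losses))              # prints None (fewer than 30 stale epochs)
```

With a short run such as `epochs=2`, the patience of 30 can never be reached, so early stopping only matters for longer training runs.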

---

## **Test**

A trained model is tested on Netflix's stock (NFLX).

### **Process**
- Loads the optimal hyperparameters automatically.
- Loads the trained model weights automatically.
- Runs the model once and displays results in the console.
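
The console results include the mean absolute error, mean squared error, root mean squared error, and R-squared. The notebook's `evaluate_prediction` is not shown in this section, so the sketch below uses the standard textbook definitions in plain Python; that the notebook computes them the same way is an assumption. It also illustrates why R-squared can be negative: this happens whenever the squared error of the predictions exceeds that of always predicting the mean of the true values.

```python
import math


def regression_metrics(y_true, y_pred):
    """Standard MAE, MSE, RMSE and R-squared for two equally long sequences."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    rmse = math.sqrt(mse)
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # error of the mean baseline
    ss_res = sum(e * e for e in errors)                 # error of the predictions
    r2 = 1.0 - ss_res / ss_tot
    return {"mae": mae, "mse": mse, "rmse": rmse, "r2": r2}


# Invented prices: the predictions move opposite to the true series.
y_true = [100.0, 110.0, 120.0]
y_pred = [120.0, 110.0, 100.0]
metrics = regression_metrics(y_true, y_pred)
print(metrics)
```

In this invented example the predictions are four times worse than the mean baseline in squared error, which gives an R-squared of -3.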

---

The files downloaded from GitHub already include saved hyperparameters and weights. Running `optimize` overwrites the hyperparameters, and running `train` overwrites the weights.


In [24]:
main(model_name='timeseries_econ_scaled', action='test')

model_name timeseries_econ_scaled



Mean absolute error: 276.16
Mean squared error: 96344.98
Root mean squared error: 310.39
R-Squared: -4.15
Mean absolute percentage error: 57.86%
Spearman's correlation: -0.28
Pearson's correlation: -0.11
Directional Accuracy: 34.20%
