In [23]:
%%capture
!pip install gradio safetensors yfinance optuna

In [24]:
import gradio as gr
import plotly.graph_objects as go

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
import yfinance as yf
from safetensors.torch import save_file

import optuna

In [25]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [26]:
# -------------------------------
# Model and Dataset Classes
# -------------------------------
class LSTMTimeSeries(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, output_size=1):
        super(LSTMTimeSeries, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = self.fc(out)
        return out

In [27]:
class SPYDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        return self.X[index], self.y[index]

In [28]:
# -------------------------------
# Helper Functions
# -------------------------------
def create_sequences(data, window_size=30, target_step=1):
    X, y = [], []
    num_samples = len(data)
    for i in range(num_samples - window_size - target_step + 1):
        X_seq = data[i : i + window_size]
        y_seq = data[i + window_size + target_step - 1]
        X.append(X_seq)
        y.append(y_seq[3])  # Only predict 'Close' (index = 3)
    return np.array(X), np.array(y)

In [29]:
def load_data_for_tuning(ticker, start_date, end_date,
                         sequence_length=30,
                         prediction_days=1,
                         val_ratio=0.1, test_ratio=0.1):
    """
    Fetch data, create sequences, and split into train/val/test sets.
    val_ratio and test_ratio are fractions of the entire dataset used for validation and test.
    """
    # Fetch data
    spy_df = yf.download(ticker, start=start_date, end=end_date)
    spy_df.reset_index(inplace=True)

    features = ['Open', 'High', 'Low', 'Close']
    data = spy_df[features].values

    # Scale data
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    # Create sequences
    X_all, y_all = create_sequences(data_scaled, window_size=sequence_length, target_step=prediction_days)

    # Train/val/test split
    dataset_size = len(X_all)
    val_size = int(val_ratio * dataset_size)
    test_size = int(test_ratio * dataset_size)
    train_size = dataset_size - val_size - test_size

    X_train, X_val, X_test = X_all[:train_size], X_all[train_size:train_size+val_size], X_all[train_size+val_size:]
    y_train, y_val, y_test = y_all[:train_size], y_all[train_size:train_size+val_size], y_all[train_size+val_size:]

    return X_train, y_train, X_val, y_val, X_test, y_test, scaler, data

In [30]:
# -------------------------------
# Objective Function for Optuna
# -------------------------------
def objective(trial, ticker, start_date, end_date,
              sequence_length=30,
              prediction_days=1,
              n_epochs=10,
              val_ratio=0.1,
              test_ratio=0.1):

    """
    Objective function that Optuna will call multiple times,
    each time sampling different hyperparameters.
    """

    # -----------------------------
    # 1) Suggest hyperparameters
    # -----------------------------
    hidden_size = trial.suggest_int("hidden_size", 32, 256, step=32)
    num_layers = trial.suggest_int("num_layers", 1, 3)
    lr = trial.suggest_float("lr", 1e-4, 0.1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])

    # -----------------------------
    # 2) Load data
    # -----------------------------
    X_train, y_train, X_val, y_val, X_test, y_test, scaler, data = load_data_for_tuning(
        ticker, start_date, end_date,
        sequence_length=sequence_length,
        prediction_days=prediction_days,
        val_ratio=val_ratio,
        test_ratio=test_ratio
    )

    # -----------------------------
    # 3) Create DataLoaders
    # -----------------------------
    train_dataset = SPYDataset(X_train, y_train)
    val_dataset   = SPYDataset(X_val, y_val)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # -----------------------------
    # 4) Define model, loss, opt
    # -----------------------------
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    input_size = 4  # ['Open', 'High', 'Low', 'Close']
    output_size = 1

    model = LSTMTimeSeries(input_size, hidden_size, num_layers, output_size).to(device)

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # -----------------------------
    # 5) Train
    # -----------------------------
    model.train()
    for epoch in range(n_epochs):
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device).unsqueeze(1)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

    # -----------------------------
    # 6) Validation
    # -----------------------------
    model.eval()
    val_predictions, val_actuals = [], []
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch)
            val_predictions.append(outputs.cpu().numpy())
            val_actuals.append(y_batch.numpy().reshape(-1, 1))

    val_predictions = np.vstack(val_predictions)
    val_actuals = np.vstack(val_actuals)

    # Invert scaling for 'Close'
    pred_close_scaled = np.zeros((len(val_predictions), data.shape[1]))
    act_close_scaled = np.zeros((len(val_actuals), data.shape[1]))

    # Remember: data.shape[1] = 4
    pred_close_scaled[:, 3] = val_predictions.flatten()
    act_close_scaled[:, 3] = val_actuals.flatten()

    pred_close = scaler.inverse_transform(pred_close_scaled)[:, 3]
    act_close  = scaler.inverse_transform(act_close_scaled)[:, 3]

    rmse_val = float(np.sqrt(np.mean((pred_close - act_close) ** 2)))

    return rmse_val

In [31]:
# -------------------------------
# Run Optuna Tuning
# -------------------------------
def tune_hyperparams(ticker="SPY",
                     start_date="2020-01-24",
                     end_date="2025-01-31",
                     sequence_length=30,
                     prediction_days=1,
                     n_epochs=10,
                     val_ratio=0.1,
                     test_ratio=0.1,
                     n_trials=20):
    """
    Runs an Optuna study to tune hyperparameters of the LSTM model.
    Returns the best hyperparameters.
    """
    def optuna_objective(trial):
        return objective(
            trial,
            ticker=ticker,
            start_date=start_date,
            end_date=end_date,
            sequence_length=sequence_length,
            prediction_days=prediction_days,
            n_epochs=n_epochs,
            val_ratio=val_ratio,
            test_ratio=test_ratio
        )

    study = optuna.create_study(direction="minimize")
    study.optimize(optuna_objective, n_trials=n_trials)

    print("Best trial:", study.best_trial)
    print("Best RMSE:", study.best_value)
    print("Best hyperparameters:", study.best_params)
    return study.best_params

In [32]:
def get_or_train_model(train_loader, input_size, hidden_size, num_layers, output_size,
                       epochs, lr, model_file="lstm_model.safetensors"):
    """
    Tries to load a model from a safetensors file. If it does not exist (or fails to load),
    trains a new model using the provided training DataLoader and hyperparameters,
    then saves the model in safetensors format.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LSTMTimeSeries(input_size, hidden_size, num_layers, output_size).to(device)

    try:
        if os.path.exists(model_file):
            print(f"Loading existing model from {model_file}...")
            loaded_state_dict = load_file(model_file)
            model.load_state_dict(loaded_state_dict)
        else:
            raise FileNotFoundError
    except Exception as e:
        print("Model not found or failed to load; training new model...")
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)

        model.train()
        for epoch in range(epochs):
            epoch_loss = 0.0
            for X_batch, y_batch in train_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device).unsqueeze(1)

                optimizer.zero_grad()
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item() * X_batch.size(0)
            epoch_loss /= len(train_loader.dataset)
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.6f}")

        # Save the trained model as a safetensor
        state_dict_cpu = {k: v.cpu() for k, v in model.state_dict().items()}
        save_file(state_dict_cpu, model_file)
        print(f"New model saved as {model_file}")

    # Set model to evaluation mode for inference
    model.eval()
    return model

In [33]:
# -------------------------------
# Predict & Forecast Function
# -------------------------------
def predict_spy(ticker, start_date, end_date,
                prediction_days=1,
                sequence_length=30,
                epochs=10,
                forecast_days=5,
                # Optionally pass tuned params or use defaults
                hidden_size=64,
                num_layers=1,
                lr=1e-3,
                batch_size=32):
    """
    Main function to fetch data from Yahoo Finance, train the LSTM, and extend predictions to future days.
    Optionally uses hyperparameters from Optuna (if you supply them).
    """
    spy_df = yf.download(ticker, start=start_date, end=end_date)
    spy_df.reset_index(inplace=True)

    features = ['Open', 'High', 'Low', 'Close']
    data = spy_df[features].values

    # Scale data
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)

    # Create sequences
    X_all, y_all = create_sequences(data_scaled, window_size=sequence_length, target_step=prediction_days)

    # Train/test split (80/20)
    train_size = int(0.8 * len(X_all))
    X_train, X_test = X_all[:train_size], X_all[train_size:]
    y_train, y_test = y_all[:train_size], y_all[train_size:]

    # Datasets & Loaders
    train_dataset = SPYDataset(X_train, y_train)
    test_dataset  = SPYDataset(X_test,  y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)

    # Model setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    input_size = len(features)
    output_size = 1

    model = LSTMTimeSeries(input_size, hidden_size, num_layers, output_size).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Try to load an existing model; if not found, train a new one
    model = get_or_train_model(train_loader, input_size, hidden_size, num_layers,
                               output_size, epochs, lr, model_file="lstm_model.safetensors")

    # Training
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device).unsqueeze(1)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item() * X_batch.size(0)
        epoch_loss /= len(train_loader.dataset)
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.6f}")

    # Predictions on test set
    model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch)
            predictions.append(outputs.cpu().numpy())
            actuals.append(y_batch.numpy().reshape(-1, 1))

    predictions = np.vstack(predictions)
    actuals     = np.vstack(actuals)

    # Invert scaling (Close only)
    pred_close_scaled = np.zeros((len(predictions), data.shape[1]))
    act_close_scaled  = np.zeros((len(actuals), data.shape[1]))

    pred_close_scaled[:, 3] = predictions.flatten()
    act_close_scaled[:, 3]  = actuals.flatten()

    pred_close = scaler.inverse_transform(pred_close_scaled)[:, 3]
    act_close  = scaler.inverse_transform(act_close_scaled)[:, 3]

    mse  = np.mean((pred_close - act_close) ** 2)
    rmse = float(np.sqrt(mse))

    # Forecast future prices
    future_predictions = []
    last_sequence = data_scaled[-sequence_length:]  # Last available sequence

    with torch.no_grad():
        for _ in range(forecast_days):
            last_seq_tensor = torch.tensor(last_sequence, dtype=torch.float32).unsqueeze(0).to(device)
            next_scaled_price = model(last_seq_tensor).cpu().numpy()

            # Insert into a zero vector so we can invert scale
            next_scaled_price_full = np.zeros((1, data.shape[1]))
            next_scaled_price_full[0, 3] = next_scaled_price
            next_price = scaler.inverse_transform(next_scaled_price_full)[0, 3]

            future_predictions.append(next_price)

            # Shift sequence and add new predicted close
            next_sequence = np.roll(last_sequence, -1, axis=0)
            next_sequence[-1, 3] = next_scaled_price
            last_sequence = next_sequence

    # Combine predictions and future forecasts
    forecast_indices = np.arange(len(act_close), len(act_close) + forecast_days)
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=act_close, mode='lines', name='Actual Close'))
    fig.add_trace(go.Scatter(y=pred_close, mode='lines', name='Predicted Close'))
    fig.add_trace(go.Scatter(x=forecast_indices, y=future_predictions,
                             mode='lines', name='Forecasted Prices',
                             line=dict(dash='dot')))

    fig.update_layout(
        title=f'{ticker} Actual, Predicted, and Forecasted Close Prices',
        xaxis_title='Index',
        yaxis_title='Price'
    )

    # Make sure to move tensors to CPU before saving (safetensors works best with CPU tensors)
    state_dict_cpu = {k: v.cpu() for k, v in model.state_dict().items()}

    # Save the state dictionary to a file named "lstm_model.safetensors"
    save_file(state_dict_cpu, "lstm_model.safetensors")
    print("Model saved as lstm_model.safetensors")

    return fig, rmse

In [34]:
ticker = "SPY"
start_date="2021-01-24"
end_date="2025-02-09"
prediction_days=1

In [35]:
# Evaluate best hyperparameters
best_params = tune_hyperparams(
    ticker=ticker,
    start_date=start_date,
    end_date=end_date,
    sequence_length=30,
    prediction_days=prediction_days,
    n_epochs=10,
    val_ratio=0.1,
    test_ratio=0.1,
    n_trials=20
)
print(best_params)

[I 2025-02-09 21:04:23,802] A new study created in memory with name: no-name-b329c076-0b8a-471b-9e6c-7a978df002bf
[*********************100%***********************]  1 of 1 completed
[I 2025-02-09 21:04:25,225] Trial 0 finished with value: 130.75962542992391 and parameters: {'hidden_size': 192, 'num_layers': 1, 'lr': 0.06208813965888773, 'batch_size': 16}. Best is trial 0 with value: 130.75962542992391.
[*********************100%***********************]  1 of 1 completed
[I 2025-02-09 21:04:25,536] Trial 1 finished with value: 45.429668151885295 and parameters: {'hidden_size': 224, 'num_layers': 1, 'lr': 0.0003560959275959684, 'batch_size': 128}. Best is trial 1 with value: 45.429668151885295.
[*********************100%***********************]  1 of 1 completed
[I 2025-02-09 21:04:26,190] Trial 2 finished with value: 34.74052328356455 and parameters: {'hidden_size': 64, 'num_layers': 1, 'lr': 0.09641401676052264, 'batch_size': 32}. Best is trial 2 with value: 34.74052328356455.
[******

Best trial: FrozenTrial(number=16, state=1, values=[6.979470906815442], datetime_start=datetime.datetime(2025, 2, 9, 21, 4, 35, 387082), datetime_complete=datetime.datetime(2025, 2, 9, 21, 4, 36, 154991), params={'hidden_size': 192, 'num_layers': 1, 'lr': 0.0021489302541457334, 'batch_size': 64}, user_attrs={}, system_attrs={}, intermediate_values={}, distributions={'hidden_size': IntDistribution(high=256, log=False, low=32, step=32), 'num_layers': IntDistribution(high=3, log=False, low=1, step=1), 'lr': FloatDistribution(high=0.1, log=True, low=0.0001, step=None), 'batch_size': CategoricalDistribution(choices=(16, 32, 64, 128))}, trial_id=16, value=None)
Best RMSE: 6.979470906815442
Best hyperparameters: {'hidden_size': 192, 'num_layers': 1, 'lr': 0.0021489302541457334, 'batch_size': 64}
{'hidden_size': 192, 'num_layers': 1, 'lr': 0.0021489302541457334, 'batch_size': 64}


In [36]:
# Pass into prediction model
fig, rmse = predict_spy(
    ticker=ticker,
    start_date=start_date,
    end_date=end_date,
    prediction_days=prediction_days,
    sequence_length=30,
    epochs=10,
    forecast_days=5,
    hidden_size=best_params["hidden_size"],
    num_layers=best_params["num_layers"],
    lr=best_params["lr"],
    batch_size=best_params["batch_size"]
)

[*********************100%***********************]  1 of 1 completed


Model not found or failed to load; training new model...
Epoch [1/10], Loss: 0.031693
Epoch [2/10], Loss: 0.005383
Epoch [3/10], Loss: 0.001451
Epoch [4/10], Loss: 0.000808
Epoch [5/10], Loss: 0.000610
Epoch [6/10], Loss: 0.000585
Epoch [7/10], Loss: 0.000550
Epoch [8/10], Loss: 0.000539
Epoch [9/10], Loss: 0.000536
Epoch [10/10], Loss: 0.000530
New model saved as lstm_model.safetensors
Epoch [1/10], Loss: 0.000520
Epoch [2/10], Loss: 0.000520
Epoch [3/10], Loss: 0.000520
Epoch [4/10], Loss: 0.000520
Epoch [5/10], Loss: 0.000520
Epoch [6/10], Loss: 0.000520
Epoch [7/10], Loss: 0.000520
Epoch [8/10], Loss: 0.000520
Epoch [9/10], Loss: 0.000520
Epoch [10/10], Loss: 0.000520
Model saved as lstm_model.safetensors


In [37]:
# -------------------------------
# Gradio Interface
# -------------------------------
demo = gr.Interface(
    fn=predict_spy,
    inputs=[
        gr.Textbox(label="Enter Ticker Symbol", placeholder="e.g., SPY", value="SPY"),
        gr.Textbox(label="Start Date", value="2019-01-24"),
        gr.Textbox(label="End Date", value="2025-01-28"),
        gr.Slider(label="Prediction Days Ahead", minimum=1, maximum=30, value=1, step=1),
        gr.Slider(label="Sequence Length", minimum=10, maximum=200, value=60, step=5),
        gr.Slider(label="Number of Epochs", minimum=1, maximum=50, value=10, step=1),
        gr.Slider(label="Forecast Days", minimum=1, maximum=60, value=30, step=1)
    ],
    outputs=[
        gr.Plot(label="Predictions and Forecast"),
        gr.Number(label="RMSE")
    ],
    description=(
        "Enter a stock ticker symbol (e.g., SPY) to fetch historical data from Yahoo Finance. "
        "Adjust the date range, prediction days, sequence length, and training epochs. "
        "The model predicts and forecasts future prices."
    )
)

In [38]:
if __name__ == "__main__":
    # Run hyperparameter tuning first (optional)
    # Then pass them into predict_spy (for instance) in your actual usage.

    demo.launch()

Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://2f20ce5d00be2ef06b.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
