In [23]:
#!pip install -q tensorflow plotly scikit-learn requests python-dotenv


In [24]:
import os
from dotenv import load_dotenv
import requests
import pandas as pd
import numpy as np
from datetime import datetime
import plotly.graph_objects as go
import tensorflow as tf
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import GRU, Dense # type: ignore
from tensorflow.keras.callbacks import EarlyStopping # type: ignore
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Determinism
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)


In [25]:
# Load environment variables
load_dotenv()
API_KEY = os.getenv("API_KEY")
if not API_KEY:
    raise ValueError("API_KEY not found in .env file.")


In [26]:
symbols_text = input("Enter comma-separated symbols (e.g., AAPL,MSFT,GOOGL): ")
symbols = [s.strip().upper() for s in symbols_text.split(",") if s.strip()]

window = int(input("Enter GRU Lookback Window (hours, e.g., 60): "))
future_steps = int(input("Enter Forecast Steps (hours, e.g., 5): "))


Enter comma-separated symbols (e.g., AAPL,MSFT,GOOGL): JPM
Enter GRU Lookback Window (hours, e.g., 60): 200
Enter Forecast Steps (hours, e.g., 5): 5


In [27]:
def prep_data(prices: np.ndarray, window: int):
    if len(prices) <= window:
        return np.empty((0, window, 1)), np.empty((0, 1)), None
    scaler = MinMaxScaler()
    scaled = scaler.fit_transform(prices.reshape(-1,1).astype(np.float32))
    X, y = [], []
    for i in range(window, len(scaled)):
        X.append(scaled[i-window:i])
        y.append(scaled[i])
    X = np.array(X, dtype=np.float32)
    y = np.array(y, dtype=np.float32).reshape(-1, 1)
    return X, y, scaler

def build_gru_model(window: int):
    model = Sequential([
        GRU(16, return_sequences=False, input_shape=(window, 1)),
        Dense(1)
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.005), loss='mse')
    return model

def evaluate_model(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
    accuracy = 100 - mape
    return {
        "MSE": mse,
        "RMSE": rmse,
        "MAE": mae,
        "R2": r2,
        "MAPE%": mape,
        "Total Accuracy%": accuracy
    }

def train_gru(symbol: str, prices: np.ndarray, window: int, epochs: int = 8, batch_size: int = 32):
    X, y, scaler = prep_data(prices, window)
    if X.shape[0] == 0:
        return None, None, None
    model = build_gru_model(window)
    es = EarlyStopping(monitor="loss", patience=2, restore_best_weights=True)
    model.fit(X, y, epochs=epochs, batch_size=batch_size, verbose=0, callbacks=[es])
    y_pred = model.predict(X, verbose=0)
    metrics = evaluate_model(scaler.inverse_transform(y), scaler.inverse_transform(y_pred))
    return model, scaler, metrics

def gru_forecast(symbol: str, prices: np.ndarray, model, scaler, window: int, steps: int):
    last_seq = prices[-window:].astype(np.float32)
    forecast = []
    for _ in range(steps):
        scaled = scaler.transform(last_seq.reshape(-1, 1))
        pred = model.predict(scaled.reshape(1, window, 1), verbose=0)
        pred_val = float(scaler.inverse_transform(pred)[0][0])
        forecast.append(pred_val)
        last_seq = np.append(last_seq[1:], pred_val)
    return np.array(forecast, dtype=np.float32)


In [28]:
def fetch_historical_data(symbol: str, size: int = 4000):
    try:
        res = requests.get(
            f"https://api.twelvedata.com/time_series?symbol={symbol}&interval=1h&outputsize={size}&apikey={API_KEY}"
        ).json()
        if "values" not in res:
            return pd.DataFrame(columns=["datetime", "close"])
        df = pd.DataFrame(res["values"])
        df["datetime"] = pd.to_datetime(df["datetime"])
        df["close"] = pd.to_numeric(df["close"], errors="coerce")
        return df.sort_values("datetime").reset_index(drop=True)
    except Exception:
        return pd.DataFrame(columns=["datetime", "close"])

def fetch_price(symbol: str):
    try:
        res = requests.get(f"https://api.twelvedata.com/price?symbol={symbol}&apikey={API_KEY}").json()
        return float(res.get("price", None)) if res.get("price", None) is not None else None
    except Exception:
        return None


In [29]:
gru_models = {}
scalers = {}
metrics_dict = {}

for symbol in symbols:
    print(f"\nProcessing {symbol}...")
    df = fetch_historical_data(symbol)
    if df.empty:
        print(f"No historical data for {symbol}.")
        continue

    latest = fetch_price(symbol)
    if latest is not None and df["datetime"].iloc[-1] < pd.Timestamp.now():
        df = pd.concat([df, pd.DataFrame({"datetime":[pd.Timestamp.now()], "close":[latest]})], ignore_index=True)

    prices = df['close'].dropna().values.astype(np.float32)

    if len(prices) <= window:
        print(f"Not enough data for {symbol}. Showing last prices only.")
        forecast = np.array([float(prices[-1])] * future_steps) if len(prices) > 0 else np.array([0.0]*future_steps)
    else:
        model, scaler, metrics = train_gru(symbol, prices, window)
        if model is None:
            forecast = np.array([float(prices[-1])] * future_steps)
        else:
            gru_models[symbol] = model
            scalers[symbol] = scaler
            metrics_dict[symbol] = metrics
            forecast = gru_forecast(symbol, prices, model, scaler, window, future_steps)

    plot_df = df.tail(300)
    future_time = pd.date_range(plot_df["datetime"].iloc[-1], periods=future_steps+1, freq="H")[1:]

    fig = go.Figure()

# Actual prices
    fig.add_trace(go.Scatter(
     x=plot_df["datetime"], y=plot_df["close"],
     mode="lines+markers",
     name=f"{symbol} Price",
     line=dict(width=3, color="green"),
     marker=dict(size=5)
))

# GRU Forecast
    fig.add_trace(go.Scatter(
     x=future_time, y=forecast,
     mode="lines+markers",
     name="GRU Forecast",
     line=dict(width=3, dash="dash", color="red"),
     marker=dict(size=5)
))

# Highlight latest actual price
    fig.add_trace(go.Scatter(
     x=[plot_df["datetime"].iloc[-1]],
     y=[plot_df["close"].iloc[-1]],
     mode="markers+text",
     marker=dict(size=10, color="blue"),
     text=["Latest"],
     textposition="top right",
     name="Latest Price"
))

# Highlight first forecast point
    fig.add_trace(go.Scatter(
     x=[future_time[0]],
     y=[forecast[0]],
     mode="markers+text",
     marker=dict(size=10, color="orange"),
     text=["Forecast Start"],
     textposition="bottom right",
     name="Forecast Start"
))

# Layout improvements
    fig.update_layout(
     width=1000,
     height=600,
     title=f"{symbol} — Price & GRU Forecast",
     xaxis_title="Datetime",
     yaxis_title="Price",
     hovermode="x unified",
     legend=dict(x=0.01, y=0.99, bgcolor="rgba(255,255,255,0.1)"),
     template="plotly_white"
)

    fig.show()


Processing JPM...



Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.


'H' is deprecated and will be removed in a future version, please use 'h' instead.

