###  Pos B. LSTM RNN Model (Multi-feature OHLCV with Relative Strength Index (RSI))

In [32]:
# %pip install protobuf==3.20.3

import yfinance as yf
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense
import time
from datetime import datetime, timedelta

def get_lstm_func(ticker="AAPL", seq_len=20, test_size=20, epochs=100, batch_size=5):
    """Forecast the next (t+1) closing price of ``ticker`` with a stacked LSTM.

    Downloads roughly one calendar year of daily data, adds a rolling-mean RSI
    feature, min-max scales the features, trains a two-layer LSTM on sliding
    windows of ``seq_len`` rows and predicts the close that follows the most
    recent window.

    Args:
        ticker: Symbol passed to ``yf.download``.
        seq_len: Number of consecutive rows in each input window.
        test_size: Number of most recent samples excluded from training.
            They are not evaluated inside this function.
        epochs: Training epochs passed to ``model.fit``.
        batch_size: Batch size passed to ``model.fit``.

    Returns:
        The predicted closing price as a Python float rounded to 2 decimals.

    Raises:
        ValueError: If no data is returned, a required column is missing, or
            there are too few rows to build at least one training sample.
    """
    # === Date Setup for 1-Year Window ===
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)

    # === Data Collection ===
    df = yf.download(ticker, start=start_date.strftime("%Y-%m-%d"), end=end_date.strftime("%Y-%m-%d"))
    if df is None or df.empty:
        raise ValueError(f"No price data returned for {ticker}.")

    # Some yfinance versions return (field, ticker) MultiIndex columns even for
    # a single ticker; keep only the field level so df["Close"] is a Series.
    if isinstance(df.columns, pd.MultiIndex):
        df = df.copy()
        df.columns = df.columns.get_level_values(0)

    price_columns = ["Close", "Open", "High", "Low", "Volume"]
    missing = [col for col in price_columns if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns for {ticker}: {missing}")

    # === RSI Computation ===
    def compute_rsi(series, period=14):
        """RSI from simple rolling means of gains and losses over ``period`` rows."""
        delta = series.diff()
        gain = delta.clip(lower=0)
        loss = -delta.clip(upper=0)
        avg_gain = gain.rolling(period).mean()
        avg_loss = loss.rolling(period).mean()
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    df["RSI"] = compute_rsi(df["Close"])
    # Drops the RSI warm-up rows as well as any row with a missing value.
    df = df.dropna()

    # A training sample needs seq_len rows of history plus one target row, and
    # test_size samples are held back from training.
    if len(df) <= seq_len + test_size:
        raise ValueError(
            f"Not enough data for {ticker}: {len(df)} usable rows, "
            f"need more than {seq_len + test_size}."
        )

    # === Feature Scaling ===
    features = price_columns + ["RSI"]
    scaler = MinMaxScaler()
    scaled = scaler.fit_transform(df[features])
    # Separate scaler for the target so the prediction can be inverse-transformed alone.
    target_scaler = MinMaxScaler()
    target = target_scaler.fit_transform(df[["Close"]])

    # === Sequence Preparation ===
    X, y = [], []
    for i in range(seq_len, len(scaled)):
        X.append(scaled[i - seq_len:i])
        y.append(target[i])
    X, y = np.array(X), np.array(y)

    # === Train/Test Split ===
    # Slice by explicit length: X[:-0] would be empty when test_size == 0.
    n_train = len(X) - test_size
    X_train, y_train = X[:n_train], y[:n_train]

    # === Model Definition ===
    model = Sequential()
    model.add(LSTM(64, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(LSTM(64))
    model.add(Dense(1))
    model.compile(optimizer="adam", loss="mse")

    # === Model Training ===
    model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)

    # === t+1 Closing Price Prediction ===
    # The most recent seq_len rows form the window whose target is the next close.
    last_sequence = scaled[-seq_len:].reshape(1, seq_len, len(features))
    pred_scaled = model.predict(last_sequence, verbose=0)
    predicted_price = target_scaler.inverse_transform(pred_scaled)[0][0]

    # Convert the NumPy scalar to a plain float before rounding.
    return round(float(predicted_price), 2)

###  Pos C. Transformer Neural Network Model

In [33]:
from tensorflow.keras import layers, Model
import tensorflow as tf

# === Transformer Model Definition ===
class TransformerModel(layers.Layer):
    """Transformer-encoder regressor that maps a sequence to a single value.

    The input is summed with a fixed sinusoidal positional encoding, passed
    through dropout and ``num_transformer_blocks`` encoder blocks, flattened
    and projected to one output by a dense layer.

    There is no learned input projection: the positional encoding has shape
    ``(1, seq_length, embed_dim)`` and is added by broadcasting, so the last
    input axis must be either 1 or ``embed_dim``.

    Args:
        embed_dim: Feature width used inside the encoder blocks.
        num_heads: Number of attention heads per block.
        ff_dim: Width of the position-wise feed-forward layer in each block.
        seq_length: Number of time steps covered by the positional encoding.
        num_transformer_blocks: Number of stacked encoder blocks.
        dropout_rate: Dropout rate used throughout the model.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, seq_length, num_transformer_blocks, dropout_rate=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.seq_length = seq_length
        self.num_transformer_blocks = num_transformer_blocks
        # Stored under its own name so the positional_encoding() method is not
        # shadowed by a tensor attribute on the instance.
        self.pos_encoding = self.positional_encoding(seq_length)
        self.transformer_blocks = [
            self.transformer_block(embed_dim, num_heads, ff_dim, dropout_rate)
            for _ in range(num_transformer_blocks)
        ]
        self.dropout_1 = layers.Dropout(dropout_rate)
        self.flatten = layers.Flatten()
        self.dense = layers.Dense(1)

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Tensor whose first two axes are (batch, seq_length) and
                whose last axis is 1 or ``embed_dim``.
            training: When true, dropout is active. Defaults to False so that
                a plain ``model(x)`` call performs deterministic inference;
                training code should pass ``training=True`` explicitly.

        Returns:
            Tensor of shape (batch, 1).
        """
        # Broadcasting expands the leading 1 of pos_encoding over the batch
        # and, for single-feature inputs, the feature axis up to embed_dim.
        x = inputs + self.pos_encoding
        x = self.dropout_1(x, training=training)
        for block in self.transformer_blocks:
            x = block(x, training=training)
        x = self.flatten(x)
        return self.dense(x)

    def transformer_block(self, embed_dim, num_heads, ff_dim, dropout_rate):
        """Build one encoder block as a functional Keras ``Model``.

        The block applies self-attention with a residual connection and layer
        normalisation, followed by a two-layer position-wise feed-forward
        network (kernel-size-1 convolutions) with its own residual connection
        and layer normalisation.
        """
        inputs = layers.Input(shape=(None, embed_dim))
        attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, dropout=dropout_rate)(inputs, inputs)
        attention = layers.Dropout(dropout_rate)(attention)
        attention = layers.LayerNormalization(epsilon=1e-6)(inputs + attention)
        outputs = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(attention)
        outputs = layers.Dropout(dropout_rate)(outputs)
        outputs = layers.Conv1D(filters=embed_dim, kernel_size=1)(outputs)
        outputs = layers.Dropout(dropout_rate)(outputs)
        outputs = layers.LayerNormalization(epsilon=1e-6)(attention + outputs)
        return Model(inputs=inputs, outputs=outputs)

    def positional_encoding(self, seq_length):
        """Return the sinusoidal positional encoding, shape (1, seq_length, embed_dim).

        Even feature indices receive the sine and odd indices the cosine of
        ``pos / 10000 ** (2 * (i // 2) / embed_dim)``.
        """
        pos = tf.cast(tf.range(seq_length)[:, tf.newaxis], dtype=tf.float32)
        i = tf.cast(tf.range(self.embed_dim)[tf.newaxis, :], dtype=tf.float32)
        # Float base keeps the whole expression in float32.
        angle_rads = pos / tf.pow(10000.0, 2 * (i // 2) / tf.cast(self.embed_dim, tf.float32))
        angle_rads = tf.where(tf.math.equal(i % 2, 0), tf.sin(angle_rads), tf.cos(angle_rads))
        return angle_rads[tf.newaxis, ...]

# === Modularized Transformer Prediction Function ===
def get_transformers_func(ticker="AAPL", seq_length=20, epochs=100, batch_size=64):
    """Forecast the next (t+1) closing price of ``ticker`` with ``TransformerModel``.

    Downloads roughly one calendar year of daily closes, min-max scales them,
    trains the model on sliding windows of ``seq_length`` closes with a custom
    training loop, and predicts the close following the most recent window.

    Args:
        ticker: Symbol passed to ``yf.download``.
        seq_length: Number of consecutive closes in each input window.
        epochs: Number of passes over the training windows.
        batch_size: Number of windows per gradient step.

    Returns:
        The predicted closing price as a Python float rounded to 2 decimals.

    Raises:
        ValueError: If no 'Close' data is returned or there are not more than
            ``seq_length`` non-missing closes.
    """
    # === Data Collection (1-Year) ===
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)
    df = yf.download(ticker, start=start_date.strftime("%Y-%m-%d"), end=end_date.strftime("%Y-%m-%d"))
    if df is None or df.empty or "Close" not in df.columns:
        raise ValueError(f"No 'Close' price data returned for {ticker}.")

    # === Data Preparation ===
    # dropna() keeps NaNs out of the scaler; reshape handles both a Series and
    # a single-column DataFrame.
    closing_prices = df["Close"].dropna().to_numpy(dtype="float64").reshape(-1, 1)
    if len(closing_prices) <= seq_length:
        raise ValueError(
            f"Not enough data for {ticker}: {len(closing_prices)} closes, "
            f"need more than {seq_length}."
        )
    scaler = MinMaxScaler()
    scaled_prices = scaler.fit_transform(closing_prices)

    # Window i covers rows [i, i + seq_length); its target is row i + seq_length.
    n_samples = len(scaled_prices) - seq_length
    X = np.asarray(
        [scaled_prices[i:i + seq_length] for i in range(n_samples)],
        dtype=np.float32,
    )
    y = scaled_prices[seq_length:].astype(np.float32)

    # === Model Hyperparameters ===
    embed_dim = 32
    num_heads = 2
    ff_dim = 32
    num_transformer_blocks = 2
    dropout_rate = 0.1
    learning_rate = 0.001

    # === Model Initialisation ===
    model = TransformerModel(embed_dim, num_heads, ff_dim, seq_length, num_transformer_blocks, dropout_rate)
    loss_fn = tf.keras.losses.MeanSquaredError()
    optimizer = tf.keras.optimizers.Adam(learning_rate)

    @tf.function
    def train_step(inputs, targets):
        with tf.GradientTape() as tape:
            predictions = model(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss

    # === Model Training ===
    # Step by batch_size up to len(X) so the final partial batch, which holds
    # the most recent windows, is trained on instead of being dropped.
    for _ in range(epochs):
        for start_idx in range(0, len(X), batch_size):
            end_idx = start_idx + batch_size
            train_step(X[start_idx:end_idx], y[start_idx:end_idx])

    # === t+1 Closing Price Prediction ===
    # Use the last seq_length closes. X[-1] stops one row earlier and its
    # target is the already-known final close, not the next day.
    last_seq = scaled_prices[-seq_length:].astype(np.float32).reshape(1, seq_length, 1)
    # training=False disables dropout so the forecast is deterministic.
    pred_scaled = model(last_seq, training=False)
    predicted_price = scaler.inverse_transform(pred_scaled.numpy().reshape(-1, 1))[0][0]

    return round(float(predicted_price), 2)

In [34]:
def build_prediction_df(tickers, model_func, n_days=30):
    """Build a frame of recent closes plus one forecast row per ticker.

    For each ticker the last ``n_days`` non-missing closing prices from about
    one calendar year of data are kept, and the value returned by
    ``model_func(ticker)`` is appended on the next business day after the
    last observed date.

    Args:
        tickers: Iterable of ticker symbols; also the column order of the result.
        model_func: Callable taking a ticker and returning its predicted close.
        n_days: Number of most recent closes to keep per ticker.

    Returns:
        DataFrame indexed by "Date" with one column per requested ticker.
        Skipped tickers keep an all-NaN column. Dates are the union of all
        tickers' dates, so no ticker loses rows through index alignment.

    Raises:
        ValueError: If no ticker yielded usable data.
    """
    tickers = list(tickers)

    # The download window is the same for every ticker; compute it once.
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)
    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d")

    series_by_ticker = {}
    for ticker in tickers:
        # Get 1 year of Price Data
        df = yf.download(ticker, start=start_str, end=end_str)

        # Skip if Close Price data is missing
        if df is None or "Close" not in df.columns:
            print(f"Skipping {ticker} due to missing 'Close' prices.")
            continue

        close = df["Close"]
        # With (field, ticker) MultiIndex columns, df["Close"] is a
        # one-column DataFrame; reduce it to a Series.
        if isinstance(close, pd.DataFrame):
            close = close.iloc[:, 0]
        close = close.dropna()
        if close.empty:
            print(f"Skipping {ticker} due to missing 'Close' prices.")
            continue

        # Get last `n_days` actual prices
        recent_prices = close.tail(n_days).copy()

        # Ensure we have enough recent prices
        if len(recent_prices) < n_days:
            print(f"Skipping {ticker} due to insufficient data (<{n_days} rows).")
            continue

        # Make sure it's a datetime index
        recent_prices.index = pd.to_datetime(recent_prices.index)

        # Predict t+1 Closing Price
        predicted_price = model_func(ticker)

        # The forecast belongs to the next trading session; a business-day
        # offset avoids landing on a weekend after a Friday close.
        # Exchange holidays are not accounted for.
        predicted_date = recent_prices.index[-1] + pd.offsets.BDay(1)

        # Append the forecast to the observed series
        recent_prices.loc[predicted_date] = float(predicted_price)
        series_by_ticker[ticker] = recent_prices.sort_index()

    if not series_by_ticker:
        raise ValueError("No valid Stock data was found for any Ticker.")

    # Build the frame once from all series, then restore the requested column
    # order, including skipped tickers.
    final_df = pd.DataFrame(series_by_ticker).reindex(columns=tickers)
    final_df.index.name = "Date"
    return final_df