# 📈 LSTM Stock Price Prediction Pipeline

This notebook presents an end-to-end pipeline for forecasting the next day's **end-of-day stock prices** using LSTM models.
The data spans 5 years; the pipeline covers preprocessing, technical indicator enrichment, normalization, LSTM training, prediction, evaluation, and feature importance analysis.


In [None]:


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os, random, yaml, logging

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

import tensorflow as tf

from data_loader import load_stock_data
from indicators import add_indicators
from preprocessing import normalize_features, df_to_windowed_df, windowed_df_to_date_X_y
from model import build_lstm_model
from evaluation import plot_predictions, compute_metrics, compute_directional_accuracy
from feature_analysis import feature_drop_analysis

# Configure the root logger for the notebook: INFO level, records prefixed
# with a timestamp and the level name.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


In [None]:

# Read the pipeline settings from the YAML configuration file.
with open("config/config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

# Pin the hash seed variable and every random number generator used by the
# notebook (stdlib, NumPy, TensorFlow) to one fixed value for reproducibility.
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
for seed_rng in (random.seed, np.random.seed, tf.random.set_seed):
    seed_rng(SEED)


In [None]:

all_results = {}

# Every artifact produced below is written under output/; create the folder
# up front so the first to_csv/savefig call cannot fail when it is missing.
os.makedirs("output", exist_ok=True)

for ticker in config["tickers"]:
    print(f"\n=== Processing {ticker} ===")

    # --- Data preparation ---
    df = load_stock_data(config["paths"]["csv_folder"], [ticker])[0]
    df = add_indicators(df)
    # Supervised target: the Close of the following row. The final row has no
    # successor, so its NaN target is removed by dropna together with any
    # other incomplete rows.
    df["Target"] = df["Close"].shift(-1)
    df.dropna(inplace=True)
    # Close is excluded from the model inputs; Target is the label.
    feature_cols = [c for c in df.columns if c not in ["Close", "Target"]]
    df_scaled, scaler_X, scaler_y = normalize_features(df.drop(columns=["Close"]), target_col="Target")

    # --- Windowing and positional split: first 90% train, next 6% validation,
    # remaining 4% test ---
    window_size = config["lstm"]["window_size"]
    windowed_df = df_to_windowed_df(df_scaled, window_size, target_col="Target")
    dates, X, y = windowed_df_to_date_X_y(windowed_df, window_size)
    q_90 = int(len(dates) * 0.90)
    q_96 = int(len(dates) * 0.96)
    X_train, X_val, X_test = X[:q_90], X[q_90:q_96], X[q_96:]
    y_train, y_val, y_test = y[:q_90], y[q_90:q_96], y[q_96:]
    dates_train, dates_val, dates_test = dates[:q_90], dates[q_90:q_96], dates[q_96:]

    # --- Training ---
    params = config["lstm"]
    model = build_lstm_model(window_size, X.shape[2], params["learning_rate"])
    history = model.fit(
        X_train, y_train,
        epochs=params["epochs"],
        batch_size=params["batch_size"],
        validation_data=(X_val, y_val),
        verbose=1
    )

    # --- Predictions, mapped back through the target scaler ---
    y_train_pred = scaler_y.inverse_transform(model.predict(X_train)).flatten()
    y_val_pred = scaler_y.inverse_transform(model.predict(X_val)).flatten()
    y_test_pred = scaler_y.inverse_transform(model.predict(X_test)).flatten()

    # Close series keyed by 'YYYY-MM-DD' strings, used to look up a base
    # price for each window date.
    df_close_full = df["Close"].copy()
    df_close_full.index = pd.to_datetime(df_close_full.index).strftime('%Y-%m-%d')

    def reconstruct(dates_subset, preds, true_returns):
        """Add each date's Close (forward-filled where missing) to both series.

        Returns a ``(base + preds, base + true_returns)`` tuple.

        NOTE(review): Target is Close.shift(-1), presumably already a price
        level after inverse scaling, yet the values are treated here as
        offsets from Close - confirm this is intended.
        """
        # Series.ffill() replaces fillna(method='ffill'), which recent pandas
        # releases deprecate.
        base = df_close_full.reindex(dates_subset).ffill().values
        return base + preds, base + true_returns

    train_pred, y_train_orig = reconstruct(dates_train, y_train_pred, scaler_y.inverse_transform(y_train.reshape(-1, 1)).flatten())
    val_pred, y_val_orig = reconstruct(dates_val, y_val_pred, scaler_y.inverse_transform(y_val.reshape(-1, 1)).flatten())
    test_pred, y_test_orig = reconstruct(dates_test, y_test_pred, scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten())

    # Save predictions
    pred_df = pd.DataFrame({
        "Date": dates_test,
        "Actual": y_test_orig,
        "Predicted": test_pred
    })
    pred_df.to_csv(f"output/predictions_{ticker}.csv", index=False)

    # Plot and save
    plt.figure(figsize=(14,6))
    plt.plot(dates_test, y_test_orig, label='Actual Price')
    plt.plot(dates_test, test_pred, label='Predicted Price')
    plt.xticks(rotation=45)
    plt.title(f"LSTM Test Set: Actual vs Predicted Prices ({ticker})")
    plt.xlabel("Date")
    plt.ylabel("Close Price")
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"output/plot_{ticker}.png")
    # Close the figure so figures do not accumulate across tickers.
    plt.close()

    # --- Test-set metrics ---
    rmse, mae = compute_metrics(y_test_orig, test_pred)
    dir_acc = compute_directional_accuracy(y_test_orig, test_pred)
    print(f"{ticker} Test RMSE: {rmse:.4f}, MAE: {mae:.4f}, Directional Accuracy: {dir_acc:.4f}")

    # Feature drop analysis and save
    results_df = feature_drop_analysis(
        df, feature_cols, window_size,
        X, y, scaler_y,
        dates_test, y_test_orig,
        rmse, mae,
        q_90, q_96,
        params["learning_rate"], params["epochs"], params["batch_size"]
    )
    results_df.to_csv(f"output/feature_drop_{ticker}.csv", index=False)

    # Save summary; results_df is kept so later cells can style and
    # summarize the feature-drop table without recomputing it.
    all_results[ticker] = {
        "rmse": rmse,
        "mae": mae,
        "dir_acc": dir_acc,
        "results_df": results_df
    }

# Collapse the per-ticker metrics into one table, write it to CSV and show it
# inline.
summary_rows = []
for symbol, metrics in all_results.items():
    summary_rows.append({
        "Ticker": symbol,
        "RMSE": metrics["rmse"],
        "MAE": metrics["mae"],
        "Directional Accuracy": metrics["dir_acc"],
    })
summary_df = pd.DataFrame(summary_rows)
summary_df.to_csv("output/summary_metrics.csv", index=False)
display(summary_df)


In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from evaluation import compute_metrics, compute_directional_accuracy
from data_loader import load_stock_data
from indicators import add_indicators
from preprocessing import normalize_features, df_to_windowed_df, windowed_df_to_date_X_y
from model import build_lstm_model
from feature_analysis import feature_drop_analysis
import yaml

# Ensure output directory exists
os.makedirs("output", exist_ok=True)

# Load config
with open("config/config.yaml", "r") as f:
    config = yaml.safe_load(f)

all_results = {}

for ticker in config["tickers"]:
    print(f"\n=== Processing {ticker} ===")

    # --- Data preparation ---
    df = load_stock_data(config["paths"]["csv_folder"], [ticker])[0]
    df = add_indicators(df)
    # Supervised target: the Close of the following row; the final row has no
    # successor and is removed by dropna together with other incomplete rows.
    df["Target"] = df["Close"].shift(-1)
    df.dropna(inplace=True)

    # Close is excluded from the model inputs; Target is the label.
    feature_cols = [c for c in df.columns if c not in ["Close", "Target"]]
    df_scaled, scaler_X, scaler_y = normalize_features(df.drop(columns=["Close"]), target_col="Target")

    window_size = config["lstm"]["window_size"]
    windowed_df = df_to_windowed_df(df_scaled, window_size, target_col="Target")
    dates, X, y = windowed_df_to_date_X_y(windowed_df, window_size)

    # Positional split: first 90% train, next 6% validation, remaining 4% test.
    q_90 = int(len(dates) * 0.90)
    q_96 = int(len(dates) * 0.96)

    X_train, X_val, X_test = X[:q_90], X[q_90:q_96], X[q_96:]
    y_train, y_val, y_test = y[:q_90], y[q_90:q_96], y[q_96:]
    dates_train, dates_val, dates_test = dates[:q_90], dates[q_90:q_96], dates[q_96:]

    params = config["lstm"]
    model = build_lstm_model(window_size, X.shape[2], params["learning_rate"])
    model.fit(
        X_train, y_train,
        epochs=params["epochs"],
        batch_size=params["batch_size"],
        validation_data=(X_val, y_val),
        verbose=1
    )

    # Map the model outputs back through the target scaler.
    y_train_pred = scaler_y.inverse_transform(model.predict(X_train)).flatten()
    y_val_pred = scaler_y.inverse_transform(model.predict(X_val)).flatten()
    y_test_pred = scaler_y.inverse_transform(model.predict(X_test)).flatten()

    # Close series keyed by 'YYYY-MM-DD' strings, used to look up a base
    # price for each window date.
    df_close_full = df["Close"].copy()
    df_close_full.index = pd.to_datetime(df_close_full.index).strftime('%Y-%m-%d')

    def reconstruct(dates_subset, preds, true_returns):
        """Add each date's Close (forward-filled where missing) to both series.

        Returns a ``(base + preds, base + true_returns)`` tuple.

        NOTE(review): Target is Close.shift(-1), presumably already a price
        level after inverse scaling, yet the values are treated here as
        offsets from Close - confirm this is intended.
        """
        base = df_close_full.reindex(dates_subset).ffill().values
        return base + preds, base + true_returns

    train_pred, y_train_orig = reconstruct(dates_train, y_train_pred, scaler_y.inverse_transform(y_train.reshape(-1, 1)).flatten())
    val_pred, y_val_orig = reconstruct(dates_val, y_val_pred, scaler_y.inverse_transform(y_val.reshape(-1, 1)).flatten())
    test_pred, y_test_orig = reconstruct(dates_test, y_test_pred, scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten())

    # Persist the test-set predictions; later cells reload this file.
    pred_df = pd.DataFrame({
        "Date": dates_test,
        "Actual": y_test_orig,
        "Predicted": test_pred
    })
    pred_df.to_csv(f"output/predictions_{ticker}.csv", index=False)

    plt.figure(figsize=(14, 6))
    plt.plot(dates_test, y_test_orig, label='Actual Price')
    plt.plot(dates_test, test_pred, label='Predicted Price')
    plt.xticks(rotation=45)
    plt.title(f"LSTM Test Set: Actual vs Predicted Prices ({ticker})")
    plt.xlabel("Date")
    plt.ylabel("Close Price")
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"output/plot_{ticker}.png")
    # Close the figure so figures do not accumulate across tickers.
    plt.close()

    rmse, mae = compute_metrics(y_test_orig, test_pred)
    dir_acc = compute_directional_accuracy(y_test_orig, test_pred)
    print(f"{ticker} Test RMSE: {rmse:.4f}, MAE: {mae:.4f}, Directional Accuracy: {dir_acc:.4f}")

    results_df = feature_drop_analysis(
        df, feature_cols, window_size,
        X, y, scaler_y,
        dates_test, y_test_orig,
        rmse, mae,
        q_90, q_96,
        params["learning_rate"], params["epochs"], params["batch_size"]
    )
    results_df.to_csv(f"output/feature_drop_{ticker}.csv", index=False)

    # The feature-drop table is stored alongside the metrics because the
    # plotting, styling and summary cells read all_results[ticker]["results_df"].
    all_results[ticker] = {
        "rmse": rmse,
        "mae": mae,
        "dir_acc": dir_acc,
        "results_df": results_df
    }

# Build one row of headline metrics per ticker, write the table to CSV and
# show it inline.
metric_rows = []
for symbol, scores in all_results.items():
    row = {"Ticker": symbol}
    row["RMSE"] = scores["rmse"]
    row["MAE"] = scores["mae"]
    row["Directional Accuracy"] = scores["dir_acc"]
    metric_rows.append(row)
summary_df = pd.DataFrame(metric_rows)
summary_df.to_csv("output/summary_metrics.csv", index=False)
print("\n✅ Summary saved to output/summary_metrics.csv")
display(summary_df)


In [None]:

# Example: Plot for the first ticker in config
if config["tickers"]:
    ticker = config["tickers"][0]
    results = all_results[ticker]
    # The feature-drop table is not used by this plot. Rebind it only when the
    # entry carries one, so a metrics-only entry does not abort the cell with
    # a KeyError before anything is drawn.
    if "results_df" in results:
        results_df = results["results_df"]
    # Plot from the predictions file saved for this ticker rather than from
    # whatever in-memory state the training loop left behind.
    pred_df = pd.read_csv(f"output/predictions_{ticker}.csv")
    dates_test = pred_df["Date"]
    y_test_orig = pred_df["Actual"]
    test_pred = pred_df["Predicted"]

    sns.set(style="whitegrid")
    plt.figure(figsize=(14,6))
    sns.lineplot(x=dates_test, y=y_test_orig, label='Actual Price')
    sns.lineplot(x=dates_test, y=test_pred, label='Predicted Price')
    plt.xticks(rotation=45)
    plt.title(f"🔍 LSTM Test Set: Actual vs Predicted Prices ({ticker})")
    plt.xlabel("Date")
    plt.ylabel("Close Price")
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:

import pandas as pd

def style_feature_table(df):
    """Return the styled view of a feature-drop results table.

    The two increase columns get a red background gradient, all four metric
    columns are formatted to four decimals (increases with an explicit sign),
    the minimum RMSE/MAE cells are highlighted light green and the maximum
    increase cells salmon.
    """
    increase_cols = ['RMSE Increase', 'MAE Increase']
    number_formats = {
        'RMSE': '{:.4f}', 'MAE': '{:.4f}',
        'RMSE Increase': '{:+.4f}', 'MAE Increase': '{:+.4f}'
    }

    styler = df.style.background_gradient(cmap='Reds', subset=list(increase_cols))
    styler = styler.format(number_formats)
    styler = styler.highlight_min(subset=['RMSE', 'MAE'], color='lightgreen')
    return styler.highlight_max(subset=list(increase_cols), color='salmon')

# Example: Style feature table for the first ticker
if config["tickers"]:
    ticker = config["tickers"][0]
    results_df = all_results[ticker]["results_df"]
    styled_results_df = style_feature_table(results_df)
    # A bare expression is only auto-rendered when it is the last top-level
    # statement of a cell; inside this `if` it has to be displayed explicitly.
    display(styled_results_df)


In [None]:

# Summarize the feature-drop table of the first ticker: mean, best (min) and
# worst (max) of each metric column.
if config["tickers"]:
    ticker = config["tickers"][0]
    results_df = all_results[ticker]["results_df"]

    metric_cols = ["RMSE", "MAE", "RMSE Increase", "MAE Increase"]
    summary_stats = {
        "Metric": metric_cols,
        "Mean": [results_df[col].mean() for col in metric_cols],
        "Best (Min)": [results_df[col].min() for col in metric_cols],
        "Worst (Max)": [results_df[col].max() for col in metric_cols],
    }

    summary_df = pd.DataFrame(summary_stats)

    # Only the numeric columns get a format; "Metric" holds labels.
    styled_summary = summary_df.style.format({
        "Mean": "{:.4f}",
        "Best (Min)": "{:.4f}",
        "Worst (Max)": "{:.4f}"
    }).set_caption("📊 Feature Drop Summary Statistics")

    # A bare expression is only auto-rendered when it is the last top-level
    # statement of a cell; inside this `if` it has to be displayed explicitly.
    display(styled_summary)


In [None]:

# Add this cell after your metrics calculation and before saving results

def compute_mape(y_true, y_pred):
    """Return the Mean Absolute Percentage Error, in percent.

    Both inputs are flattened to one dimension first, so a column vector of
    predictions pairs element-wise with a flat vector of true values.
    Entries whose true value is zero are skipped because their percentage
    error is undefined.

    Args:
        y_true: Array-like of observed values.
        y_pred: Array-like of predicted values with the same number of
            elements as ``y_true``.

    Returns:
        The MAPE as a percentage, or NaN when no entry has a non-zero true
        value (including empty input).

    Raises:
        ValueError: If the inputs do not contain the same number of elements.
    """
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    if y_true.size != y_pred.size:
        raise ValueError(
            f"y_true and y_pred must have the same number of elements, "
            f"got {y_true.size} and {y_pred.size}"
        )

    # Avoid division by zero
    mask = y_true != 0
    if not mask.any():
        # Nothing to average; return NaN explicitly instead of relying on
        # np.mean of an empty slice, which emits a RuntimeWarning.
        return np.nan
    return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100

# Example usage inside your ticker loop:
rmse, mae = compute_metrics(y_test_orig, test_pred)
dir_acc = compute_directional_accuracy(y_test_orig, test_pred)
mape = compute_mape(y_test_orig, test_pred)
print(f"{ticker} Test RMSE: {rmse:.4f}, MAE: {mae:.4f}, MAPE: {mape:.2f}%, Directional Accuracy: {dir_acc:.4f}")

# Save MAPE in all_results and summary
all_results[ticker] = {
    "rmse": rmse,
    "mae": mae,
    "mape": mape,
    "dir_acc": dir_acc,
    "results_df": results_df
}

# When saving summary:
# Only entries scored with compute_mape carry a "mape" key; the others are
# reported as NaN instead of aborting the export with a KeyError.
summary_df = pd.DataFrame([
    {
        "Ticker": t,
        "RMSE": v["rmse"],
        "MAE": v["mae"],
        "MAPE (%)": v.get("mape", np.nan),
        "Directional Accuracy": v["dir_acc"],
    }
    for t, v in all_results.items()
])
summary_df.to_csv("output/summary_metrics.csv", index=False)
import ipywidgets as widgets
from IPython.display import display

# Example: Use the first ticker's predictions for simulation
if config["tickers"]:
    ticker = config["tickers"][0]
    pred_df = pd.read_csv(f"output/predictions_{ticker}.csv")
    dates_test = pred_df["Date"].tolist()
    y_test_orig = pred_df["Actual"].values
    test_pred = pred_df["Predicted"].values

    def run_simulation(threshold_pct):
        """Replay a threshold-based long/short strategy over the saved test predictions.

        For every day except the last, the relative change between that day's
        actual value and the next stored prediction is compared with
        ``threshold_pct``. A flat position is opened long above +threshold or
        short below -threshold; an open position is closed when the signal
        crosses the opposite threshold. Price differences are scaled by
        ``1 / margin`` when booked to the balance.

        The function plots the equity curve with trade markers, prints the
        final balance and trade counts, and displays the last ten rows of the
        trade log. It returns nothing.

        Args:
            threshold_pct: Minimum absolute predicted relative change
                (a fraction, e.g. 0.005) required to act.
        """
        import matplotlib.pyplot as plt
        import pandas as pd

        balance = 10000
        margin = 0.01
        leverage = 1 / margin
        position = 0  # 0 = flat, 1 = long, -1 = short
        entry_price = 0
        equity_curve = []
        trades = []
        # Only these actions realize a profit; "SELL (SHORT)" opens a position.
        closing_actions = ("SELL", "BUY (COVER)")

        # NOTE(review): the signal for day i uses the prediction stored at
        # index i + 1 - confirm against the date convention of the predictions
        # file that this does not use information from after day i.
        day_pairs = zip(y_test_orig[:-1], test_pred[1:])
        for i, (actual_today, predicted_tomorrow) in enumerate(day_pairs):
            predicted_change = (predicted_tomorrow - actual_today) / actual_today

            action = "HOLD"
            profit = 0

            if predicted_change > threshold_pct and position == 0:
                position = 1
                entry_price = actual_today
                action = "BUY"
            elif predicted_change < -threshold_pct and position == 0:
                position = -1
                entry_price = actual_today
                action = "SELL (SHORT)"
            elif predicted_change < -threshold_pct and position == 1:
                position = 0
                profit = (actual_today - entry_price) * leverage
                balance += profit
                action = "SELL"
            elif predicted_change > threshold_pct and position == -1:
                position = 0
                profit = (entry_price - actual_today) * leverage
                balance += profit
                action = "BUY (COVER)"

            # Equity = realized balance plus the unrealized result of any
            # position that is still open.
            equity = balance
            if position == 1:
                equity += (actual_today - entry_price) * leverage
            elif position == -1:
                equity += (entry_price - actual_today) * leverage

            equity_curve.append(equity)

            trades.append({
                "Day": dates_test[i],
                "Price": actual_today,
                "Predicted": predicted_tomorrow,
                "Predicted Change %": predicted_change,
                "Action": action,
                "Balance": balance,
                "Equity": equity,
                # Matching on the exact action keeps "SELL (SHORT)" entries
                # from being reported with a realized profit of 0.
                "Profit": profit if action in closing_actions else None
            })

        if not trades:
            # Fewer than two test points: there is nothing to plot, and
            # selecting the "Action" column of an empty frame would raise
            # a KeyError.
            print("Not enough test data to run the simulation.")
            return

        df_trades = pd.DataFrame(trades)
        plt.figure(figsize=(14,6))
        plt.plot(dates_test[:-1], equity_curve, label="Equity Curve")
        # Substring matching is intentional here: the markers cover both
        # opening and closing trades on each side.
        buy = df_trades[df_trades["Action"].str.contains("BUY")]
        sell = df_trades[df_trades["Action"].str.contains("SELL")]
        plt.scatter(buy["Day"], buy["Price"], color="green", marker="^", label="BUY")
        plt.scatter(sell["Day"], sell["Price"], color="red", marker="v", label="SELL")
        plt.title(f"💹 PnL Simulation (Threshold = {threshold_pct:.4f})")
        plt.xlabel("Date")
        plt.ylabel("Equity / Price")
        plt.xticks(rotation=45)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
        print(f"📊 Final Balance: {balance:.2f}")
        print(f"📝 Total Buys: {len(buy)}, Total Sells: {len(sell)}")
        display(df_trades.tail(10))

    # Re-run the simulation whenever the threshold slider changes.
    threshold_slider = widgets.FloatSlider(value=0.005, min=0.0005, max=0.02, step=0.0005, description='Threshold')
    ui = widgets.VBox([threshold_slider])
    out = widgets.interactive_output(run_simulation, {'threshold_pct': threshold_slider})
    display(ui, out)

In [None]:
# --- Evaluate each combination ---
# This section evaluates all possible combinations of technical indicators for LSTM model performance.
# For each combination:
# 1. Select the subset of features (indicators) to include.
# 2. Normalize the data and prepare windowed datasets for LSTM input.
# 3. Split the data into training, validation, and test sets.
# 4. Train the LSTM model using the selected features.
# 5. Predict the test set and calculate performance metrics (RMSE, MAE, Directional Accuracy).
# 6. Simulate PnL (Profit and Loss) using the predictions.
# 7. Log and store the results for each combination.
# Finally, the results are saved to a CSV file for further analysis.

In [None]:
import os
import yaml
import itertools
import pandas as pd
import numpy as np
import tensorflow as tf
import logging
from datetime import datetime

from data_loader import load_stock_data
from indicators import add_indicators
from preprocessing import normalize_features, df_to_windowed_df, windowed_df_to_date_X_y
from model import build_lstm_model
from evaluation import compute_metrics, compute_directional_accuracy, simulate_pnl

# --- Setup ---
logging.basicConfig(level=logging.INFO)

# Read the pipeline settings from the YAML configuration file.
with open("config/config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

# Fix the hash seed variable and the NumPy / TensorFlow generators.
os.environ['PYTHONHASHSEED'] = str(42)
for seed_rng in (np.random.seed, tf.random.set_seed):
    seed_rng(42)

# Load the first configured ticker and enrich it with indicators.
ticker = config['tickers'][0]
df = add_indicators(load_stock_data(config['paths']['csv_folder'], [ticker])[0])

# 14-row simple moving average of Close, added by hand.
df["SMA"] = df["Close"].rolling(window=14).mean()

# Target is the Close of the following row (T+1); incomplete rows are dropped.
df["Target"] = df["Close"].shift(-1)
df.dropna(inplace=True)
df_close_full = df["Close"].copy()
df_close_full.index = pd.to_datetime(df_close_full.index).strftime('%Y-%m-%d')

# Every non-empty subset of the indicator list, smallest subsets first.
all_indicators = ["RSI", "BB_upper", "BB_lower", "Momentum", "MACD", "SMA"]
all_combinations = [
    subset
    for size in range(1, len(all_indicators) + 1)
    for subset in itertools.combinations(all_indicators, size)
]

# --- Evaluate each combination ---
results = []
params = config['lstm']
window_size = params['window_size']

# Train and score one LSTM per indicator combination. A failing combination
# is logged and skipped so the remaining ones still run.
for idx, combo in enumerate(all_combinations):
    try:
        # Keras keeps global state for every model built in the process; with
        # one fresh model per combination, reset it so memory does not keep
        # growing across iterations.
        tf.keras.backend.clear_session()

        selected_features = list(combo)
        df_subset = df[["Target"] + selected_features].copy()

        df_scaled, _, scaler_y = normalize_features(df_subset, target_col="Target")
        windowed_df = df_to_windowed_df(df_scaled, window_size, target_col="Target")
        dates, X, y = windowed_df_to_date_X_y(windowed_df, window_size)

        # Same positional cut points as the main pipeline: first 90% for
        # training, last 4% for testing. The slice in between is not used
        # here because fit() is called without validation data.
        q_90 = int(len(dates) * 0.90)
        q_96 = int(len(dates) * 0.96)
        X_train, X_test = X[:q_90], X[q_96:]
        y_train, y_test = y[:q_90], y[q_96:]

        model = build_lstm_model(window_size, X.shape[2], params['learning_rate'])
        model.fit(X_train, y_train, epochs=params['epochs'], batch_size=params['batch_size'], verbose=0)

        # Map predictions and labels back through the target scaler.
        y_test_pred = scaler_y.inverse_transform(model.predict(X_test)).flatten()
        y_test_orig = scaler_y.inverse_transform(y_test.reshape(-1,1)).flatten()

        rmse, mae = compute_metrics(y_test_orig, y_test_pred)
        dir_acc = compute_directional_accuracy(y_test_orig, y_test_pred)

        pnl_result, _ = simulate_pnl(y_test_orig, y_test_pred, initial_cash=80000, ticker=ticker, output_folder="output")

        results.append({
            "Index": idx,
            "Features": ', '.join(selected_features),
            "RMSE": round(rmse, 4),
            "MAE": round(mae, 4),
            "Directional Accuracy": round(dir_acc, 4),
            "PnL Ratio": pnl_result.get("P&L Ratio", 0)
        })

        logging.info(f"✅ Combo {idx+1}/{len(all_combinations)} done: {selected_features}")

    except Exception as e:
        # Deliberate best-effort: keep going, but record the traceback so the
        # failure can be diagnosed afterwards.
        logging.warning(f"❌ Failed on combo {combo}: {e}", exc_info=True)

# Save results
results_df = pd.DataFrame(results)
if results_df.empty:
    # No combination completed: the frame has no "RMSE" column, so sorting by
    # it would raise a KeyError. Report it instead of exporting nothing.
    logging.warning("No feature combination completed; skipping results export.")
else:
    # Make sure the target folder exists before writing the ranking.
    os.makedirs("output", exist_ok=True)
    results_df.sort_values(by="RMSE", inplace=True)
    results_df.to_csv("output/lstm_combo_results.csv", index=False)
    print(results_df.head(10))


In [None]:
import tensorflow as tf
# Print the installed TensorFlow version, to record the environment the
# notebook was run with.
print(tf.__version__)
