In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_curve
import seaborn as sns
from sklearn.model_selection import ParameterGrid
import torch
import torch.nn as nn
import torch.optim as optim
from pandas_datareader import data as pdr
from scipy.special import expit


### Goal:

The goal of this project is to predict the next-day direction (up/down) of the S&P 500 index using both:

+ A manually implemented feedforward neural network (FNN)

+ A PyTorch-based Long Short-Term Memory (LSTM) model

### Data:

+ Market Index: S&P 500 (^GSPC) via Yahoo Finance.

+ Volatility: CBOE Volatility Index (^VIX)

+ Macroeconomic Indicators from FRED:

    + Consumer Price Index (CPI)

    + Industrial Production (INDPRO)

    + 10-Year Treasury Yield (GS10)

    + University of Michigan Consumer Sentiment (UMCSENT)

In [None]:
# 1. Download and prepare data
#symbols = ["^GSPC", "^DJI", "^DJU", "^IXIC", "^RUT"]
symbols = ["^GSPC"]
data = yf.download(symbols, start="2000-01-01", end="2024-01-01",auto_adjust=True)["Close"]
data = data.dropna()

[*********************100%***********************]  5 of 5 completed


### Feature Engineering:

+ For each index:

    + Returns: 1-day, 10-days, 30-days,

    + Moving Averages: 10-day, 30-day

    + Volatility: 10-day  and 30-days rolling std dev

    + Momentum: price differences over 10 and 30 days

+ Technical Indicators:

    + RSI (Relative Strength Index)

    + MACD (Moving Average Convergence Divergence)

    + Bollinger Bands + Bollinger percentage

+ Macroeconomic & Volatility Features:

    + Log returns of CPI, Industrial Production, etc.

    + Daily % change in VIX

The final feature matrix includes over 50 features per day.

In [None]:
# 2. Feature Engineering
features = pd.DataFrame(index=data.index)
for col in data.columns:
    features[f'{col}_return_1d'] = data[col].pct_change()
    # features[f'{col}_return_5d'] = data[col].pct_change(5)
    features[f'{col}_return_10d'] = data[col].pct_change(10)
    features[f'{col}_return_30d'] = data[col].pct_change(30)
    #features[f'{col}_ma_5'] = data[col].rolling(5).mean()
    features[f'{col}_ma_10'] = data[col].rolling(10).mean()
    features[f'{col}_ma_30'] = data[col].rolling(30).mean()
    #features[f'{col}_vol_5'] = data[col].rolling(5).std()
    features[f'{col}_vol_10'] = data[col].rolling(10).std()
    features[f'{col}_vol_30'] = data[col].rolling(30).std()
    #features[f'{col}_momentum_5d'] = data[col] - data[col].shift(5)
    features[f'{col}_momentum_10d'] = data[col] - data[col].shift(10)
    features[f'{col}_momentum_30d'] = data[col] - data[col].shift(30)

# Technical Indicators 
sp500 = data["^GSPC"]
# RSI
delta = sp500.diff()
gain = delta.clip(lower=0).rolling(14).mean()
loss = -delta.clip(upper=0).rolling(14).mean()
rs = gain / loss
features['rsi'] = 100 - (100 / (1 + rs))
# MACD
ema12 = sp500.ewm(span=12, adjust=False).mean()
ema26 = sp500.ewm(span=26, adjust=False).mean()
features['macd'] = ema12 - ema26
# Bollinger Bands
rolling_mean = sp500.rolling(20).mean()
rolling_std = sp500.rolling(20).std()
features['boll_upper'] = rolling_mean + 2 * rolling_std
features['boll_lower'] = rolling_mean - 2 * rolling_std
features['boll_pct'] = (sp500 - features['boll_lower']) / (features['boll_upper'] - features['boll_lower'])

fred_symbols = {'cpi': 'CPIAUCSL', 'ind_prod': 'INDPRO', '10yr_treasury': 'GS10','consumer_sentiment': 'UMCSENT'}
for name, code in fred_symbols.items():
    fred_data = pdr.DataReader(code, 'fred', start="2000-01-01", end="2024-01-01")
    fred_data = np.log(fred_data).diff()  # log returns
    fred_data = fred_data.rename(columns={code: f'{name}_chg'})
    features = features.join(fred_data, how='left')

# Forward-fill to match daily frequency
features[[f'{k}_chg' for k in fred_symbols]] = features[[f'{k}_chg' for k in fred_symbols]].fillna(method='ffill')

# Add VIX (market volatility)
vix = yf.download("^VIX", start="2000-01-01", end="2024-01-01",auto_adjust=True)["Close"].pct_change()
vix.name = "vix_return"
features = features.join(vix, how='left')

  features[[f'{k}_chg' for k in fred_symbols]] = features[[f'{k}_chg' for k in fred_symbols]].fillna(method='ffill')
[*********************100%***********************]  1 of 1 completed


### Target Variable
+ Binary classification:

    + 1: Next-day return of S&P 500 > 0

    + 0: Otherwise

### Preprocessing
+ NaN handling: Forward fill for macro data; full row drop where necessary

+ Standardization: All features scaled using StandardScaler

In [4]:
# 3. Target variable
# Drop rows with NaNs before creating the target
features = features.dropna()

# Create target AFTER all features are complete
threshold = 0.01
future_return = data["^GSPC"].pct_change(5).shift(-5)
future_return = future_return.reindex(features.index)  # align with features

# Define target only where both future return and features are available
features['target'] = np.where(future_return > threshold, 1,
                       np.where(future_return < -threshold, 0, np.nan))
features = features.dropna(subset=['target'])  # drop unlabeled rows

# 4. Prepare data
assert not features.drop(columns='target').isnull().any().any(), "There are still NaNs in the features"
X = features.drop(columns='target').values
y = features['target'].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 5. Walk-forward evaluation
split_size = int(len(X_scaled) * 0.8)
X_train, X_test = X_scaled[:split_size], X_scaled[split_size:]
y_train, y_test = y[:split_size], y[split_size:]

### Models:
+ Manual Feedforward Neural Network (FNN)
    
    + Implemented from scratch

    + Architecture: 10 hidden layers with [4096, 2048, 1024, 512, 256, 128, 64, 32, 16, 8] neurons

    + Activation: ReLU + Sigmoid (output)

    + Regularization: Dropout (0.2), L2 regularization

    + Optimizer: Adam (manual implementation)

    + Early Stopping based on validation loss

+ PyTorch LSTM Model

    + LSTM layers with 256 hidden units and 4 layers

    + Dense + Sigmoid output layer

    + Dropout (0.3) between layers

    + Class imbalance handled using pos_weight in BCEWithLogitsLoss

    + Optimized with Adam

    + Trained using mini-batch time series windows ( 30-day lookback)

    + Includes early stopping

In [5]:
# 6. Manual Neural Network (unchanged from previous step)
class ManualNeuralNet:
    def __init__(self, input_dim, layer_sizes, output_dim=1, dropout_rate=0.2, lambda_reg=0.01):
        self.weights = []
        self.biases = []
        self.activations = []
        self.dropout_rate = dropout_rate
        self.lambda_reg = lambda_reg
        self.t = 0
        self.m, self.v = {}, {}

        layer_dims = [input_dim] + layer_sizes + [output_dim]
        for i in range(len(layer_dims) - 1):
            W = np.random.randn(layer_dims[i], layer_dims[i+1]) * 0.01
            b = np.zeros((1, layer_dims[i+1]))
            self.weights.append(W)
            self.biases.append(b)
            self.m[f"W{i}"] = np.zeros_like(W)
            self.m[f"b{i}"] = np.zeros_like(b)
            self.v[f"W{i}"] = np.zeros_like(W)
            self.v[f"b{i}"] = np.zeros_like(b)

    def sigmoid(self, z):
        #return 1 / (1 + np.exp(-z))
        return expit(z) 
    
    def relu(self, z):
        return np.maximum(0, z)

    def relu_derivative(self, z):
        return (z > 0).astype(float)

    def forward(self, X, training=True):
        self.Zs, self.As = [], [X]
        for i in range(len(self.weights) - 1):
            Z = self.As[-1] @ self.weights[i] + self.biases[i]
            A = self.relu(Z)
            if training:
                A *= np.random.binomial(1, 1 - self.dropout_rate, size=A.shape)
            self.Zs.append(Z)
            self.As.append(A)
        Z_final = self.As[-1] @ self.weights[-1] + self.biases[-1]
        A_final = self.sigmoid(Z_final)
        self.Zs.append(Z_final)
        self.As.append(A_final)
        return A_final

    def compute_loss(self, Y_hat, Y):
        m = Y.shape[0]
        reg = self.lambda_reg * sum(np.sum(np.square(w)) for w in self.weights) / (2 * m)
        return -np.sum(Y * np.log(Y_hat + 1e-8) + (1 - Y) * np.log(1 - Y_hat + 1e-8)) / m + reg

    def backward(self, X, Y, lr=0.01, beta1=0.9, beta2=0.999, epsilon=1e-8):
        m = Y.shape[0]
        self.t += 1
        grads_W, grads_b = [], []

        dZ = self.As[-1] - Y.reshape(-1, 1)
        for i in reversed(range(len(self.weights))):
            dW = self.As[i].T @ dZ / m + self.lambda_reg * self.weights[i] / m
            db = np.sum(dZ, axis=0, keepdims=True) / m
            grads_W.insert(0, dW)
            grads_b.insert(0, db)

            if i > 0:
                dA = dZ @ self.weights[i].T
                dZ = dA * self.relu_derivative(self.Zs[i-1])

        for i in range(len(self.weights)):
            self.m[f"W{i}"] = beta1 * self.m[f"W{i}"] + (1 - beta1) * grads_W[i]
            self.v[f"W{i}"] = beta2 * self.v[f"W{i}"] + (1 - beta2) * (grads_W[i] ** 2)
            self.m[f"b{i}"] = beta1 * self.m[f"b{i}"] + (1 - beta1) * grads_b[i]
            self.v[f"b{i}"] = beta2 * self.v[f"b{i}"] + (1 - beta2) * (grads_b[i] ** 2)
            m_hat_W = self.m[f"W{i}"] / (1 - beta1 ** self.t)
            v_hat_W = self.v[f"W{i}"] / (1 - beta2 ** self.t)
            m_hat_b = self.m[f"b{i}"] / (1 - beta1 ** self.t)
            v_hat_b = self.v[f"b{i}"] / (1 - beta2 ** self.t)
            self.weights[i] -= lr * m_hat_W / (np.sqrt(v_hat_W) + epsilon)
            self.biases[i] -= lr * m_hat_b / (np.sqrt(v_hat_b) + epsilon)

    def train(self, X, Y, epochs=200, lr=0.001, early_stopping_rounds=5    ):
        self.losses = []
        best_loss = float('inf')
        rounds_no_improve = 0
        for epoch in range(epochs):
            Y_hat = self.forward(X)
            loss = self.compute_loss(Y_hat, Y)
            self.losses.append(loss)
            self.backward(X, Y, lr=lr)
            if loss < best_loss:
                best_loss = loss
                rounds_no_improve = 0
            else:
                rounds_no_improve += 1
            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {loss:.4f}")
            if rounds_no_improve >= early_stopping_rounds:
                print(f"Early stopping at epoch {epoch}")
                break

    def predict(self, X, threshold=0.5):
        return (self.forward(X, training=False) > threshold).astype(int)
    

In [6]:
# Train manual network
manual_nn = ManualNeuralNet(input_dim=X_train.shape[1], layer_sizes=[4096, 2048, 1024, 512, 256, 128, 64, 32, 16, 8], output_dim=1)
manual_nn.train(X_train, y_train, epochs=200, lr=0.001)

# ROC and evaluation for manual net
y_probs = manual_nn.forward(X_test, training=False).flatten()
fpr, tpr, thresholds = roc_curve(y_test, y_probs)
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
y_pred = (y_probs > optimal_threshold).astype(int)
accuracy = np.mean(y_pred == y_test)
print("Manual NN Test Accuracy:", round(accuracy * 100, 2), "%")

Epoch 0, Loss: 1974.0851
Epoch 10, Loss: 1972.2591
Epoch 20, Loss: 1970.4058
Epoch 30, Loss: 1968.4792
Epoch 40, Loss: 1966.3775
Epoch 50, Loss: 1963.8743
Epoch 60, Loss: 1960.1602
Epoch 70, Loss: 1955.3205
Early stopping at epoch 73
Manual NN Test Accuracy: 62.27 %


In [None]:
# 7. PyTorch LSTM
class LSTMNet(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, num_layers=4, dropout=0.3, bidirectional=True):
        super(LSTMNet, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout, bidirectional=bidirectional)
        direction_factor = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * direction_factor, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        return self.fc(out)
    
# Prepare LSTM sequences
time_steps = 30
X_lstm, y_lstm = [], []
for i in range(time_steps, len(X_scaled)):
    X_lstm.append(X_scaled[i - time_steps:i])
    y_lstm.append(y[i])
X_lstm = np.array(X_lstm)
y_lstm = np.array(y_lstm)

split_idx = int(len(X_lstm) * 0.8)
X_lstm_train, X_lstm_test = X_lstm[:split_idx], X_lstm[split_idx:]
y_lstm_train, y_lstm_test = y_lstm[:split_idx], y_lstm[split_idx:]

X_lstm_train_t = torch.tensor(X_lstm_train, dtype=torch.float32)
y_lstm_train_t = torch.tensor(y_lstm_train.reshape(-1, 1), dtype=torch.float32)
X_lstm_test_t = torch.tensor(X_lstm_test, dtype=torch.float32)
y_lstm_test_t = torch.tensor(y_lstm_test.reshape(-1, 1), dtype=torch.float32)

# Compute class weights
pos_weight_value = (y_lstm_train == 0).sum() / (y_lstm_train == 1).sum()
pos_weight = torch.tensor([pos_weight_value], dtype=torch.float32)

# Hyperparameter grid
#hyperparams_grid = {'hidden_dim': [128, 256], 'num_layers': [2,4], 'dropout': [0.2,0.3], 'lr': [0.0005, 0.001]}
hyperparams_grid = {'hidden_dim': [256], 'num_layers': [4], 'dropout': [0.3], 'lr': [0.001]}

best_acc = 0
best_model = None
best_params = None

for hp in ParameterGrid(hyperparams_grid):
    model = LSTMNet(
        input_dim=X_lstm.shape[2],
        hidden_dim=hp['hidden_dim'],
        num_layers=hp['num_layers'],
        dropout=hp['dropout'],
        bidirectional=True
    )
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = optim.Adam(model.parameters(), lr=hp['lr'])

    best_loss = float('inf')
    no_improve_count = 0

    for epoch in range(50):
        model.train()
        output = model(X_lstm_train_t)
        loss = criterion(output, y_lstm_train_t)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if loss.item() < best_loss:
            best_loss = loss.item()
            no_improve_count = 0
        else:
            no_improve_count += 1

        if no_improve_count >= 5:
            break

    model.eval()
    y_logits = model(X_lstm_test_t).detach().numpy().flatten()
    y_probs = 1 / (1 + np.exp(-y_logits))
    y_preds = (y_probs > 0.5).astype(int)
    acc = np.mean(y_preds == y_lstm_test)

    if acc > best_acc:
        best_acc = acc
        best_model = model
        best_params = hp

print("Best LSTM Accuracy:", round(best_acc * 100, 2), "%")
print("Best Params:", best_params)

best_model.eval()
y_logits = best_model(X_lstm_test_t).detach().numpy().flatten()
y_probs = 1 / (1 + np.exp(-y_logits))
y_lstm_pred = (y_probs > 0.5).astype(int)

# Classification report
print("Manual NN Report:")
print(classification_report(y_test, y_pred))
print("LSTM Report:")
print(classification_report(y_lstm_test, y_lstm_pred))
#



Best LSTM Accuracy: 54.46 %
Best Params: {'dropout': 0.3, 'hidden_dim': 256, 'lr': 0.001, 'num_layers': 4}
Manual NN Report:
              precision    recall  f1-score   support

         0.0       0.54      0.08      0.14       272
         1.0       0.63      0.96      0.76       441

    accuracy                           0.62       713
   macro avg       0.58      0.52      0.45       713
weighted avg       0.59      0.62      0.52       713

LSTM Report:
              precision    recall  f1-score   support

         0.0       0.43      0.56      0.48       269
         1.0       0.66      0.53      0.59       438

    accuracy                           0.54       707
   macro avg       0.55      0.55      0.54       707
weighted avg       0.57      0.54      0.55       707



+ The manual NN overfits to the dominant class (1) due to class imbalance.

+ The LSTM shows more balanced precision and recall but underperforms in overall accuracy.

+ Threshold tuning, feature selection, or switching to a multi-class problem may offer improvement.