In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
import xgboost as xgb
import matplotlib.pyplot as plt

In [2]:
# Load and preprocess the dataset
df = pd.read_csv('SP500_with_indicators_^GSPC.csv').dropna()
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)

# Create lagged features
def create_lagged_features(data, lag=30):
    """
    Generate lagged features for time series prediction.
    """
    df_lagged = data.copy()
    for i in range(1, lag + 1):
        df_lagged[f'Lag_{i}'] = df_lagged['Adj Close'].shift(i)
    df_lagged.dropna(inplace=True)  # Drop rows with NaN values (due to lagging)
    return df_lagged

# Apply lagged features
lagged_df = create_lagged_features(df[['Adj Close']], lag=30)

# Prepare data for modeling
X = lagged_df.drop(columns=['Adj Close']).values
y = lagged_df['Adj Close'].values

# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [None]:
# Time Series Cross-Validation 
tscv = TimeSeriesSplit(n_splits=5)

# Initialize Ridge Regression (L2 Regularization)
alpha = 0.5
ridge_model = Ridge(alpha=alpha)

# Initialize XGBoost
xgb_model = xgb.XGBRegressor(objective='reg:squarederror', random_state=42)

# Define the parameter grid
param_grid = {
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.1, 0.3],
    'n_estimators': [50, 100, 200],
    'subsample': [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0]
}

# Pre-tune XGBoost hyperparameters using GridSearchCV on full training set
split_idx = int(len(X_scaled) * 0.8)
X_train_full, y_train_full = X_scaled[:split_idx], y[:split_idx]
xgb_model = xgb.XGBRegressor(objective='reg:squarederror', random_state=42)
grid_search = GridSearchCV(
    estimator=xgb_model,
    param_grid=param_grid,
    scoring='neg_mean_squared_error',
    cv=3,  # Use 3-fold cross-validation for hyperparameter tuning
    verbose=1,
    n_jobs=-1
)
grid_search.fit(X_train_full, y_train_full)
best_params = grid_search.best_params_
print("\nBest Parameters for XGBoost:", best_params)

# Use the best hyperparameters for XGBoost in the cross-validation loop
optimized_xgb_model = xgb.XGBRegressor(objective='reg:squarederror', random_state=42, **best_params)

# Initialize CNN
class ConvNet(nn.Module):
    def __init__(self, input_length):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv1d(1, 4, kernel_size=5, stride=1, padding=2)
        self.bn1 = nn.BatchNorm1d(4)
        self.conv2 = nn.Conv1d(4, 8, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(8)
        self.conv3 = nn.Conv1d(8, 16, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm1d(16)
        self.fc = nn.Linear(input_length * 32, 1)

    def forward(self, x):
        x = torch.relu(self.bn1(self.conv1(x)))
        x = torch.relu(self.bn2(self.conv2(x)))
        x = torch.relu(self.bn3(self.conv3(x)))
        x = x.view(x.size(0), -1)  # Flatten
        return self.fc(x)


Fitting 3 folds for each of 108 candidates, totalling 324 fits


In [None]:
# Cross-validation evaluation for all models
ridge_mae, xgb_mae, cnn_mae = [], [], []
ridge_rmse, xgb_rmse, cnn_rmse = [], [], []
ridge_r2, xgb_r2, cnn_r2 = [], [], []
fold = 1

for train_idx, test_idx in tscv.split(X_scaled):
    X_train, X_test = X_scaled[train_idx], X_scaled[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Ridge Regression
    ridge_model.fit(X_train, y_train)
    ridge_pred = ridge_model.predict(X_test)
    ridge_mae.append(mean_absolute_error(y_test, ridge_pred))
    ridge_rmse.append(np.sqrt(mean_squared_error(y_test, ridge_pred)))
    ridge_r2.append(r2_score(y_test, ridge_pred))

    # XGBoost
    optimized_xgb_model.fit(X_train, y_train)
    xgb_pred = optimized_xgb_model.predict(X_test)
    xgb_mae.append(mean_absolute_error(y_test, xgb_pred))
    xgb_rmse.append(np.sqrt(mean_squared_error(y_test, xgb_pred)))
    xgb_r2.append(r2_score(y_test, xgb_pred))

    # CNN
    X_train_cnn = torch.tensor(X_train.reshape(X_train.shape[0], 1, X_train.shape[1]), dtype=torch.float32)
    y_train_cnn = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    X_test_cnn = torch.tensor(X_test.reshape(X_test.shape[0], 1, X_test.shape[1]), dtype=torch.float32)
    y_test_cnn = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

    cnn_model = ConvNet(input_length=X_train.shape[1])
    criterion = nn.MSELoss()
    optimizer = optim.Adam(cnn_model.parameters(), lr=0.001)

    # Train CNN
    cnn_model.train()
    for epoch in range(10):  # Use fewer epochs per fold for efficiency
        optimizer.zero_grad()
        outputs = cnn_model(X_train_cnn)
        loss = criterion(outputs, y_train_cnn)
        loss.backward()
        optimizer.step()

    # Evaluate CNN
    cnn_model.eval()
    with torch.no_grad():
        cnn_pred = cnn_model(X_test_cnn).numpy().flatten()
    cnn_mae.append(mean_absolute_error(y_test, cnn_pred))
    cnn_rmse.append(np.sqrt(mean_squared_error(y_test, cnn_pred)))
    cnn_r2.append(r2_score(y_test, cnn_pred))

    print(f"Fold {fold}:")
    print(f"  Ridge - MAE={ridge_mae[-1]:.2f}, RMSE={ridge_rmse[-1]:.2f}, R2={ridge_r2[-1]:.4f}")
    print(f"  XGBoost - MAE={xgb_mae[-1]:.2f}, RMSE={xgb_rmse[-1]:.2f}, R2={xgb_r2[-1]:.4f}")
    print(f"  CNN - MAE={cnn_mae[-1]:.2f}, RMSE={cnn_rmse[-1]:.2f}, R2={cnn_r2[-1]:.4f}")
    fold += 1

# Average metrics across folds
print("\nCross-Validation Results (Average):")
print(f"Ridge - MAE: {np.mean(ridge_mae):.2f}, RMSE: {np.mean(ridge_rmse):.2f}, R2: {np.mean(ridge_r2):.4f}")
print(f"XGBoost - MAE: {np.mean(xgb_mae):.2f}, RMSE: {np.mean(xgb_rmse):.2f}, R2: {np.mean(xgb_r2):.4f}")
print(f"CNN - MAE: {np.mean(cnn_mae):.2f}, RMSE: {np.mean(cnn_rmse):.2f}, R2: {np.mean(cnn_r2):.4f}")

In [None]:
# Final Evaluation on Test Set
# Train-test split for final evaluation
split_idx = int(len(X_scaled) * 0.8)
X_train, X_test = X_scaled[:split_idx], X_scaled[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]

# Ridge Regression
ridge_model.fit(X_train, y_train)
ridge_pred = ridge_model.predict(X_test)

# XGBoost
optimized_xgb_model.fit(X_train, y_train)
xgb_pred = optimized_xgb_model.predict(X_test)

# CNN
X_train_cnn = torch.tensor(X_train.reshape(X_train.shape[0], 1, X_train.shape[1]), dtype=torch.float32)
y_train_cnn = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_cnn = torch.tensor(X_test.reshape(X_test.shape[0], 1, X_test.shape[1]), dtype=torch.float32)
y_test_cnn = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

cnn_model = ConvNet(input_length=X_train.shape[1])
criterion = nn.MSELoss()
optimizer = optim.Adam(cnn_model.parameters(), lr=0.001)

# Train CNN
cnn_model.train()
epochs = 20
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = cnn_model(X_train_cnn)
    loss = criterion(outputs, y_train_cnn)
    loss.backward()
    optimizer.step()

# Evaluate CNN
cnn_model.eval()
with torch.no_grad():
    cnn_pred = cnn_model(X_test_cnn).numpy().flatten()


In [None]:
# Final evaluation metrics
ridge_mae = mean_absolute_error(y_test, ridge_pred)
ridge_rmse = np.sqrt(mean_squared_error(y_test, ridge_pred))
ridge_r2 = r2_score(y_test, ridge_pred)

xgb_mae = mean_absolute_error(y_test, xgb_pred)
xgb_rmse = np.sqrt(mean_squared_error(y_test, xgb_pred))
xgb_r2 = r2_score(y_test, xgb_pred)

cnn_mae = mean_absolute_error(y_test, cnn_pred)
cnn_rmse = np.sqrt(mean_squared_error(y_test, cnn_pred))
cnn_r2 = r2_score(y_test, cnn_pred)

# 1% Error Check
ridge_accuracy = [abs((pred - actual) / actual) <= 0.01 for pred, actual in zip(ridge_pred, y_test)]
ridge_accuracy_rate = sum(ridge_accuracy) / len(ridge_accuracy) * 100

xgb_accuracy = [abs((pred - actual) / actual) <= 0.01 for pred, actual in zip(xgb_pred, y_test)]
xgb_accuracy_rate = sum(xgb_accuracy) / len(xgb_accuracy) * 100

cnn_accuracy = [abs((pred - actual) / actual) <= 0.01 for pred, actual in zip(cnn_pred, y_test)]
cnn_accuracy_rate = sum(cnn_accuracy) / len(cnn_accuracy) * 100

# Print final evaluation metrics
print("\nFinal Evaluation on Test Set:")
print(f"Ridge - Accuracy (within 1% error): {ridge_accuracy_rate:.2f}%")
print(f"Ridge - MAE: {ridge_mae:.2f}, RMSE: {ridge_rmse:.2f}, R2: {ridge_r2:.4f}")
print(f"XGBoost - Accuracy (within 1% error): {xgb_accuracy_rate:.2f}%")
print(f"XGBoost - MAE: {xgb_mae:.2f}, RMSE: {xgb_rmse:.2f}, R2: {xgb_r2:.4f}")
print(f"CNN - Accuracy (within 1% error): {cnn_accuracy_rate:.2f}%")
print(f"CNN - MAE: {cnn_mae:.2f}, RMSE: {cnn_rmse:.2f}, R2: {cnn_r2:.4f}")


In [None]:
# Visualization
plt.figure(figsize=(15, 20))

# Ridge Regression: Actual vs Predicted
plt.subplot(4, 1, 1)
plt.plot(range(len(y_test)), y_test, label='Actual', color='blue')
plt.plot(range(len(ridge_pred)), ridge_pred, label='Ridge Predicted', color='red')
plt.title(f'Ridge Regression (L2 Regularization): Actual vs Predicted Prices')
plt.legend()
plt.grid(True)

# XGBoost: Actual vs Predicted
plt.subplot(4, 1, 2)
plt.plot(range(len(y_test)), y_test, label='Actual', color='blue')
plt.plot(range(len(xgb_pred)), xgb_pred, label='XGBoost Predicted', color='green')
plt.title(f'XGBoost: Actual vs Predicted Prices')
plt.legend()
plt.grid(True)

# CNN: Actual vs Predicted
plt.subplot(4, 1, 3)
plt.plot(range(len(y_test)), y_test, label='Actual', color='blue')
plt.plot(range(len(cnn_pred)), cnn_pred, label='CNN Predicted', color='orange')
plt.title(f'CNN: Actual vs Predicted Prices')
plt.legend()
plt.grid(True)

# 1% Error Accuracy for All Models
plt.subplot(4, 1, 4)
ridge_accuracy_plot = [1 if abs((p - a) / a) <= 0.01 else 0 for p, a in zip(ridge_pred, y_test)]
xgb_accuracy_plot = [1 if abs((p - a) / a) <= 0.01 else 0 for p, a in zip(xgb_pred, y_test)]
cnn_accuracy_plot = [1 if abs((p - a) / a) <= 0.01 else 0 for p, a in zip(cnn_pred, y_test)]

plt.plot(ridge_accuracy_plot, label='Ridge Accuracy (1% error)', color='red')
plt.plot(xgb_accuracy_plot, label='XGBoost Accuracy (1% error)', color='green')
plt.plot(cnn_accuracy_plot, label='CNN Accuracy (1% error)', color='orange')
plt.axhline(y=1, color='blue', linestyle='--', label='Within 1% Error')
plt.title('Prediction Accuracy (1 = within 1% error, 0 = outside 1% error)')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()
