<a href="https://colab.research.google.com/github/hanupratap/Approximate-Divider/blob/main/SP500_Prediction_(CNNLSTM_SARIMA).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler

In [None]:
def get_sp500_data(start='2010-01-01'):
    """Download S&P500 data"""
    df = yf.download('^GSPC', start=start)
    return df

# SARIMA Model Prediction

In [None]:
rmse = np.sqrt(mean_squared_error(test_data[:100], predictions))

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
TESTING_UNTIL = 100 # limiting iterations

# Split data into training and test sets
train_size = int(len(data) * 0.8)
train_data = data[:train_size]
test_data = data[train_size:]

# Fit the model on training data
model = SARIMAX(train_data, order=(1, 1, 1), seasonal_order=(1, 1, 0, 12))
sarima_result = model.fit(disp=False)

# Predict one step ahead iteratively for the test set

predictions = []
for i in range(min(TESTING_UNTIL,len(test_data))):
    if i % 10 == 0:  # Log progress every 10 iterations
          print(f"Iteration {i}/{len(test_data)}: Predicting for {test_data.index[i]}")
    model_forecast = sarima_result.get_forecast(steps=1)
    pred = model_forecast.predicted_mean.iloc[0]
    predictions.append(pred)
    sarima_result = sarima_result.append(test_data.iloc[i:i+1])

# Calculate performance metrics
rmse = np.sqrt(mean_squared_error(test_data[:TESTING_UNTIL], predictions))
mape = mean_absolute_percentage_error(test_data, predictions)

print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")
print(f"Mean Absolute Percentage Error (MAPE): {mape:.2%}")

# Plot actual vs predicted
plt.figure(figsize=(12, 6))
plt.plot(test_data.index, test_data, label="Actual", color="blue")
plt.plot(test_data.index, predictions, label="Predicted", color="orange")
plt.title("S&P 500 t+1 Predictions")
plt.xlabel("Date")
plt.ylabel("Price")
plt.legend()
plt.show()


Iteration 0/783: Predicting for 2021-12-31 00:00:00
Iteration 10/783: Predicting for 2022-01-14 00:00:00
Iteration 20/783: Predicting for 2022-01-28 00:00:00
Iteration 30/783: Predicting for 2022-02-11 00:00:00
Iteration 40/783: Predicting for 2022-02-25 00:00:00
Iteration 50/783: Predicting for 2022-03-11 00:00:00
Iteration 60/783: Predicting for 2022-03-25 00:00:00
Iteration 70/783: Predicting for 2022-04-08 00:00:00
Iteration 80/783: Predicting for 2022-04-22 00:00:00
Iteration 90/783: Predicting for 2022-05-06 00:00:00


ValueError: Found input variables with inconsistent numbers of samples: [783, 100]

# CNN LSTM Model Prediction

In [None]:
def get_device():
  """Get the appropriate device for M1 Mac, CUDA, or CPU"""
  if torch.backends.mps.is_available():
      return torch.device("mps")
  elif torch.cuda.is_available():
      return torch.device("cuda")
  else:
      return torch.device("cpu")

In [None]:
def prepare_data(window_size=10):
  # Download data
  df = yf.download('^GSPC', start='2010-01-01')

  # Calculate features
  df['Returns'] = df['Close'].pct_change()
  df['High_Low_Ratio'] = (df['High'] / df['Low']) - 1
  df['Open_Close_Ratio'] = (df['Close'] / df['Open']) - 1
  df['Volume_Change'] = df['Volume'].pct_change()
  df['MA5_Ratio'] = (df['Close'] / df['Close'].rolling(window=5).mean()) - 1
  df['MA20_Ratio'] = (df['Close'] / df['Close'].rolling(window=20).mean()) - 1
  df['Volatility'] = df['Returns'].rolling(window=20).std()

  # Select features
  features = ['Returns', 'High_Low_Ratio', 'Open_Close_Ratio',
              'Volume_Change', 'MA5_Ratio', 'MA20_Ratio', 'Volatility']

  # Drop NaN values
  df = df.dropna()

  # Prepare target (next day's return)
  df['Target'] = df['Returns'].shift(-1)
  df = df.dropna()

  # Scale features
  scaler = MinMaxScaler()
  data = scaler.fit_transform(df[features + ['Target']])

  # Create sequences
  X, y = [], []
  for i in range(len(data) - window_size):
      X.append(data[i:(i + window_size), :-1])  # All features except Target
      y.append(data[i + window_size, -1])      # Target

  # Convert to arrays
  X, y = np.array(X), np.array(y)

  # Train-test split
  train_size = int(len(X) * 0.8)
  X_train = X[:train_size]
  y_train = y[:train_size]
  X_test = X[train_size:]
  y_test = y[train_size:]

  return X_train, y_train, X_test, y_test, scaler, df['Close']


In [None]:
class StockDataset(Dataset):
  def __init__(self, X, y):
      self.X = torch.FloatTensor(X)
      self.y = torch.FloatTensor(y)

  def __len__(self):
      return len(self.X)

  def __getitem__(self, idx):
      return self.X[idx], self.y[idx]


In [None]:
class SimpleCNNLSTM(nn.Module):
  def __init__(self, input_dim):
      super(SimpleCNNLSTM, self).__init__()

      # One CNN layer
      self.conv = nn.Conv1d(input_dim, 32, kernel_size=3, padding=1)

      # One LSTM layer
      self.lstm = nn.LSTM(32, 32, batch_first=True)

      # One dense layer
      self.fc = nn.Linear(32, 1)

  def forward(self, x):
      # CNN
      x = x.transpose(1, 2)
      x = torch.relu(self.conv(x))
      x = x.transpose(1, 2)

      # LSTM
      x, _ = self.lstm(x)
      x = x[:, -1, :]  # Take last output

      # Dense
      x = self.fc(x)
      return x

In [None]:
def train_model(model, train_loader, epochs=50):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y.unsqueeze(1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}')


In [None]:
def plot_results(actuals, predictions, dates=None, theme='seaborn-v0_8-paper'):
    """Create comprehensive visualization of results"""
    plt.style.use(theme)

    fig = plt.figure(figsize=(20, 15))

    # 1. Time Series Plot
    ax1 = plt.subplot(3, 2, 1)
    ax1.plot(actuals, label='Actual', color='blue', alpha=0.7)
    ax1.plot(predictions, label='Predicted', color='red', alpha=0.7)
    ax1.set_title('S&P500 Price - Actual vs Predicted')
    ax1.set_xlabel('Time')
    ax1.set_ylabel('Price')
    ax1.legend()
    ax1.grid(True)

    # 2. Scatter Plot with Regression Line
    ax2 = plt.subplot(3, 2, 2)
    sns.regplot(x=actuals, y=predictions, scatter_kws={'alpha':0.5},
                line_kws={'color': 'red'}, ax=ax2)
    ax2.set_title('Actual vs Predicted - Regression Plot')
    ax2.set_xlabel('Actual Price')
    ax2.set_ylabel('Predicted Price')

    # 3. Prediction Error Distribution
    ax3 = plt.subplot(3, 2, 3)
    errors = predictions - actuals
    sns.histplot(errors, kde=True, ax=ax3)
    ax3.set_title('Prediction Error Distribution')
    ax3.set_xlabel('Prediction Error')
    ax3.set_ylabel('Frequency')

    # 4. Prediction Error vs Actual Price
    ax4 = plt.subplot(3, 2, 4)
    ax4.scatter(actuals, errors, alpha=0.5)
    ax4.axhline(y=0, color='r', linestyle='--')
    ax4.set_title('Prediction Error vs Actual Price')
    ax4.set_xlabel('Actual Price')
    ax4.set_ylabel('Prediction Error')

    # 5. Direction Prediction Accuracy
    ax5 = plt.subplot(3, 2, 5)
    direction_true = np.diff(actuals) > 0
    direction_pred = np.diff(predictions) > 0
    direction_accuracy = (direction_true == direction_pred)
    ax5.plot(direction_accuracy, label='Correct Direction', color='green', alpha=0.7)
    ax5.set_title('Direction Prediction Accuracy')
    ax5.set_xlabel('Time')
    ax5.set_ylabel('Direction Accuracy')
    ax5.legend()
    ax5.grid(True)

    # 6. Rolling Direction Accuracy
    ax6 = plt.subplot(3, 2, 6)
    rolling_accuracy = pd.Series(direction_accuracy).rolling(window=20).mean()
    ax6.plot(rolling_accuracy, label='20-day Rolling Accuracy', color='purple', alpha=0.7)
    ax6.axhline(y=0.5, color='r', linestyle='--', label='Random Guess')
    ax6.set_title('20-day Rolling Direction Accuracy')
    ax6.set_xlabel('Time')
    ax6.set_ylabel('Accuracy')
    ax6.legend()
    ax6.grid(True)

    plt.tight_layout()
    plt.show()

In [None]:
# Work in Progress
class CombinedLoss(nn.Module):
    def __init__(self, alpha=0.6, beta=0.4):
        super(CombinedLoss, self).__init__()
        self.alpha = alpha  # Weight for MSE
        self.beta = beta    # Weight for direction
        self.mse = nn.MSELoss()

    def forward(self, y_pred, y_true):
        # MSE Loss
        mse_loss = self.mse(y_pred, y_true)

        # Direction Loss
        pred_diff = y_pred[1:] - y_pred[:-1]
        true_diff = y_true[1:] - y_true[:-1]

        direction_match = torch.sign(pred_diff) * torch.sign(true_diff)
        direction_loss = -torch.mean(direction_match)

        # Combined loss
        total_loss = self.alpha * mse_loss + self.beta * direction_loss
        return total_loss

## Now Training the model

In [None]:
WINDOW_SIZE = 10
BATCH_SIZE = 32
EPOCHS = 50

# Prepare data
print("Loading and preparing data...")
X_train, y_train, X_test, y_test, scaler, close_prices = prepare_data(WINDOW_SIZE)

# Create data loader
train_dataset = StockDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Create and train model
model = SimpleCNNLSTM(input_dim=X_train.shape[2])
print("Training model...")
train_model(model, train_loader, EPOCHS)

# Make predictions
model.eval()
test_dataset = StockDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

with torch.no_grad():
    predictions = []
    actuals = []
    for batch_X, batch_y in test_loader:
        outputs = model(batch_X)
        predictions.extend(outputs.squeeze().numpy())
        actuals.extend(batch_y.numpy())

predictions = np.array(predictions)
actuals = np.array(actuals)

# Convert returns to prices
test_close = close_prices[-len(predictions)-1:-1].values
pred_prices = test_close * (1 + predictions)
actual_prices = test_close * (1 + actuals)

# Calculate errors
mse = np.mean((pred_prices - actual_prices) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(pred_prices - actual_prices))
mape = np.mean(np.abs((pred_prices - actual_prices) / actual_prices)) * 100

print("\nError Metrics:")
print(f"MSE: {mse:.2f}")
print(f"RMSE: {rmse:.2f}")
print(f"MAE: {mae:.2f}")
print(f"MAPE: {mape:.2f}%")

Loading and preparing data...


[*********************100%***********************]  1 of 1 completed


Training model...
Epoch [10/50], Loss: 0.0026
Epoch [20/50], Loss: 0.0025
Epoch [30/50], Loss: 0.0025
Epoch [40/50], Loss: 0.0024
Epoch [50/50], Loss: 0.0024

Error Metrics:
MSE: 61049.25
RMSE: 247.08
MAE: 183.40
MAPE: 2.54%
