# Stock Price Prediction using LSTM with PyTorch

In [None]:

# Import necessary libraries
import pandas as pd
import numpy as np
from google.colab import drive
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

# Attach Google Drive to this Colab session so files on it can be read by path
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


In [None]:

# Read the SPY price file from the mounted Drive
# (had to make a shortcut from "Shared with Me" to MyDrive).
# NOTE(review): the following cells read the 'Date', 'Price' and
# 'Type of Price' columns from this frame -- confirm the file provides them.
spy_csv_path = '/content/drive/MyDrive/cs3244_data/ETFs/spy.us.txt'
sp500 = pd.read_csv(filepath_or_buffer=spy_csv_path)

# Group the DataFrame by Date and Type of Price
grouped = sp500.groupby(['Date', 'Type of Price'])

# Mean price of each (Date, Type of Price) group, broadcast back onto the rows
# of that group so the result lines up row-for-row with sp500.
group_mean_price = grouped['Price'].transform('mean')

# Replace every 'high' and 'low' price with the average for its date and type.
# This is done as one masked, vectorised assignment rather than a Python loop
# over the rows, so the cost no longer grows with a per-row .loc write.
# Rows for which no group mean is available (for example a missing Date,
# which groupby leaves out of its groups) are left untouched.
is_high_or_low = sp500['Type of Price'].isin(['high', 'low'])
replace_mask = is_high_or_low & group_mean_price.notna()
sp500.loc[replace_mask, 'Price'] = group_mean_price[replace_mask]

# Collapse each 'high' row that is immediately followed by a 'low' row into a
# single 'average' row; every other row is copied through unchanged.
#
# The output rows are collected in a plain list and the DataFrame is built
# once at the end: calling pd.concat inside the loop copies the whole
# accumulated frame for every input row, which is quadratic in the row count.
#
# NOTE(review): pairing is by adjacency only -- the 'low' row's Date is not
# compared with the 'high' row's Date, and the merged row takes the Date of
# the 'high' row. Confirm the file is ordered so that this is intended.
dates = sp500['Date'].tolist()
prices = sp500['Price'].tolist()
price_types = sp500['Type of Price'].tolist()
row_count = len(sp500)

merged_rows = []
i = 0
while i < row_count:
    next_is_low = i + 1 < row_count and price_types[i + 1] == 'low'
    if price_types[i] == 'high' and next_is_low:
        # Midpoint of the high/low pair, labelled 'average'
        merged_rows.append((dates[i], (prices[i] + prices[i + 1]) / 2, 'average'))
        i += 2  # Skip the next row (low), it has been consumed
    else:
        # Unpaired 'high' rows and all other rows are kept as they are
        merged_rows.append((dates[i], prices[i], price_types[i]))
        i += 1

new_sp500 = pd.DataFrame(merged_rows, columns=['Date', 'Price', 'Type of Price'])

# Replace the original sp500 DataFrame with the modified one
sp500 = new_sp500


In [None]:

# Rescale the 'Price' column into the range [-1, 1].
# The fitted scaler is kept so predictions can be mapped back to prices later.
scaler = MinMaxScaler(feature_range=(-1, 1))
raw_prices = sp500['Price'].values
# MinMaxScaler works on 2-D input, hence the single-column reshape
sp500['Price'] = scaler.fit_transform(raw_prices.reshape(-1, 1))


In [None]:

# Define a PyTorch Dataset class for stock prices
class StockDataset(Dataset):
    """Sliding-window dataset over a one-dimensional series of values.

    Item ``i`` is the pair ``(window, target)`` where ``window`` is
    ``data[i:i + seq_length]`` and ``target`` is the value that follows it,
    ``data[i + seq_length]``. Both are returned as ``float32`` tensors.

    Args:
        data: Sized, sliceable sequence of numbers (this file passes a NumPy
            array of prices).
        seq_length: Number of consecutive values in each input window.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """

    def __init__(self, data, seq_length=10):
        if seq_length < 1:
            raise ValueError(f"seq_length must be at least 1, got {seq_length}")
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        # A series shorter than one window plus its target yields no samples.
        # Clamp at zero: a negative value would make len() raise ValueError.
        return max(0, len(self.data) - self.seq_length)

    def __getitem__(self, index):
        """Return the ``(window, target)`` tensors for sample ``index``.

        Negative indices count from the end. An ``IndexError`` is raised for
        indices outside ``[-len(self), len(self))`` instead of silently
        returning an empty or mismatched window.
        """
        sample_count = len(self)
        if index < 0:
            index += sample_count
        if not 0 <= index < sample_count:
            raise IndexError("StockDataset index out of range")

        x = self.data[index:index + self.seq_length]
        y = self.data[index + self.seq_length]
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

# Build the sliding-window dataset over the scaled prices and wrap it in a
# shuffling DataLoader that yields mini-batches of 64 windows.
seq_length = 10
data = sp500['Price'].values
dataset = StockDataset(data=data, seq_length=seq_length)
dataloader = DataLoader(dataset=dataset, shuffle=True, batch_size=64)


In [None]:

# Define the LSTM model in PyTorch
class LSTMStockPredictor(nn.Module):
    """Single-layer LSTM whose last time step is fed to a linear layer.

    ``forward`` accepts two kinds of input:

    * **One sequence** -- any tensor whose element count equals
      ``len(input_seq) * input_size`` (e.g. shape ``(seq_len,)`` when
      ``input_size`` is 1). The LSTM is started from ``self.hidden_cell`` and
      the resulting state is stored back into ``self.hidden_cell``, so callers
      reset that attribute between independent sequences. Returns a tensor of
      shape ``(output_size,)``.
    * **A batch of sequences** -- anything else, read as
      ``(batch, seq_len)`` (with ``input_size`` 1) or
      ``(batch, seq_len, input_size)``. Every sequence starts from a zero
      state and ``self.hidden_cell`` is neither read nor modified. Returns
      shape ``(batch,)`` when ``output_size`` is 1, otherwise
      ``(batch, output_size)``.

    A ``(batch, 1)`` input with ``input_size`` 1 cannot be told apart from a
    single sequence and is treated as one sequence.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Width of the LSTM hidden state.
        output_size: Number of values predicted per sequence.
    """

    def __init__(self, input_size=1, hidden_layer_size=100, output_size=1):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        # (h_0, c_0) for the single-sequence path: one layer, batch of one
        self.hidden_cell = (torch.zeros(1, 1, self.hidden_layer_size),
                            torch.zeros(1, 1, self.hidden_layer_size))

    def forward(self, input_seq):
        leading_len = len(input_seq)
        input_size = self.lstm.input_size

        if input_seq.numel() == leading_len * input_size:
            # Single sequence: lay it out as (seq_len, batch=1, input_size)
            lstm_out, self.hidden_cell = self.lstm(
                input_seq.view(leading_len, 1, -1), self.hidden_cell)
            predictions = self.linear(lstm_out.view(leading_len, -1))
            # Only the prediction made after the final time step is returned
            return predictions[-1]

        # Batch of sequences: the LSTM was built without batch_first, so it
        # expects (seq_len, batch, input_size) -- move the batch axis second.
        batch_size = leading_len
        lstm_in = (input_seq.reshape(batch_size, -1, input_size)
                   .transpose(0, 1)
                   .contiguous())
        # The windows in a batch are independent, so each starts from zeros.
        # new_zeros keeps the state on the input's device and dtype.
        zero_state = (input_seq.new_zeros(1, batch_size, self.hidden_layer_size),
                      input_seq.new_zeros(1, batch_size, self.hidden_layer_size))
        lstm_out, _ = self.lstm(lstm_in, zero_state)
        predictions = self.linear(lstm_out[-1])  # (batch, output_size)
        # Drop a trailing size-1 axis so the result lines up with (batch,) labels
        return predictions.squeeze(-1)


In [None]:

# Create the network, then the Adam optimizer over its parameters and the
# mean-squared-error criterion used to train it.
model = LSTMStockPredictor()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
loss_function = nn.MSELoss()


In [None]:

# Training the LSTM model
epochs = 100

# Fail early with a clear message: with no batches the loop body never runs
# and there would be no loss to report.
if len(dataloader) == 0:
    raise ValueError("dataloader yields no batches; not enough data to train on")

# Re-running this cell after the evaluation cell would otherwise leave the
# model in eval mode.
model.train()

for epoch in range(epochs):
    epoch_loss_sum = 0.0
    samples_seen = 0

    # NOTE(review): `seq` is a whole mini-batch from the DataLoader, i.e. a
    # (batch, seq_length) tensor; the model's forward must accept that shape.
    for seq, labels in dataloader:
        optimizer.zero_grad()
        # Start every step from a zero LSTM state so no state (or autograd
        # graph) is carried over from the previous step.
        model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
                             torch.zeros(1, 1, model.hidden_layer_size))

        y_pred = model(seq)
        single_loss = loss_function(y_pred, labels)
        single_loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the mean
        batch_len = len(labels)
        epoch_loss_sum += single_loss.item() * batch_len
        samples_seen += batch_len

    if epoch % 10 == 0:
        # Report the mean loss over the epoch rather than only the last batch
        print(f'Epoch {epoch} Loss: {epoch_loss_sum / samples_seen}')


In [None]:

# Evaluate the model (example for demonstration)
# NOTE: this runs on the same windows the model was trained on; there is no
# held-out split in this notebook, so the output is not a test-set score.
model.eval()
test_predictions = []

# Walk the dataset in its natural order (no shuffling) so that prediction k
# belongs to dataset item k instead of to a random window.
eval_loader = DataLoader(dataset, batch_size=dataloader.batch_size, shuffle=False)

with torch.no_grad():
    for seq, labels in eval_loader:
        model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
                             torch.zeros(1, 1, model.hidden_layer_size))
        y_pred = model(seq)
        # A batch produces several predictions; .item() only works on a
        # one-element tensor, so flatten and append them all.
        test_predictions.extend(y_pred.reshape(-1).tolist())

# Rescale the predictions back to original range
predictions_rescaled = scaler.inverse_transform(np.array(test_predictions).reshape(-1, 1))
print("Predictions rescaled:", predictions_rescaled[:5])  # Display a few predictions
