In [71]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset


In [73]:
# Read the raw CSV and list the column names it provides
df = pd.read_csv('stock_data.csv')
df.columns.tolist()

['Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Adj Close',
 'Volume',
 'neutral-count-finbert',
 'positive-count-finbert',
 'negative-count-finbert',
 'average-confidence-finbert',
 'average-neutral-score-gemini',
 'average-positive-score-gemini',
 'average-negative-score-gemini',
 'prediction-label',
 'ticker',
 'number-employees',
 'date_object']

In [116]:
# Load the dataset
# Single source of truth for the model inputs; previously this list was
# copy-pasted three times and could silently drift out of sync.
FEATURE_COLUMNS = [
    'Open',
    'High',
    'Low',
    'Close',
    'Adj Close',
    'Volume',
    'neutral-count-finbert',
    'positive-count-finbert',
    'negative-count-finbert',
    'average-confidence-finbert',
    'average-neutral-score-gemini',
    'average-positive-score-gemini',
    'average-negative-score-gemini',
]
TARGET_COLUMN = 'prediction-label'

df = pd.read_csv('stock_data.csv')

# Keep only META rows and only the columns the model consumes, THEN drop
# missing values: dropping NaNs before the column selection would discard rows
# because of gaps in columns that are never used (e.g. 'number-employees').
# drop=True gives a clean 0..n-1 index without leaving a stray 'index' column.
# NOTE(review): the windows built later assume rows are in chronological
# order; the CSV is not sorted here -- TODO confirm the file is already ordered.
df = (
    df.loc[df['ticker'] == 'META', FEATURE_COLUMNS + [TARGET_COLUMN]]
    .dropna()
    .reset_index(drop=True)
)

# Scale the features to [0, 1]
# NOTE(review): the scaler is fit on every row, including rows that end up in
# the test split further down, so test-set min/max leak into training.
# Consider fitting on the training rows only and calling transform() on the rest.
scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(df[FEATURE_COLUMNS])

# Create a new DataFrame with scaled features
df_scaled = pd.DataFrame(scaled_features, columns=FEATURE_COLUMNS)

# Add the (unscaled) target label back as the LAST column; create_sequences
# treats the final column as the label and everything before it as features.
# to_numpy() makes the assignment positional instead of relying on index alignment.
df_scaled[TARGET_COLUMN] = df[TARGET_COLUMN].to_numpy()

# Create sequences for LSTM input
def create_sequences(data, seq_length):
    """Slice a frame into overlapping fixed-length windows with next-row labels.

    The last column of ``data`` is treated as the label; every other column is
    a feature. Window ``i`` holds the feature rows ``i .. i + seq_length - 1``
    and is paired with the label found in row ``i + seq_length``.

    Args:
        data: pandas DataFrame whose final column is the label.
        seq_length: number of consecutive rows per window (must be >= 1).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays. ``X`` has shape
        ``(n_windows, seq_length, n_features)`` and ``y`` has shape
        ``(n_windows,)``, where ``n_windows = max(len(data) - seq_length, 0)``.
        When there are too few rows for a single window the arrays are empty
        but keep those ranks, so downstream shape checks fail clearly.

    Raises:
        ValueError: if ``seq_length`` is smaller than 1.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")

    # Convert once instead of going through .iloc on every iteration.
    features = data.iloc[:, :-1].to_numpy()
    labels = data.iloc[:, -1].to_numpy()

    n_windows = max(len(data) - seq_length, 0)
    if n_windows == 0:
        # np.array([]) would have shape (0,), losing the sequence/feature axes.
        empty_X = np.empty((0, seq_length, features.shape[1]), dtype=features.dtype)
        return empty_X, labels[:0].copy()

    X = np.stack([features[start:start + seq_length] for start in range(n_windows)])
    # Label for window i lives at row i + seq_length, i.e. a plain offset slice.
    y = labels[seq_length:].copy()
    return X, y

# Window size: each sample is built from SEQ_LENGTH consecutive rows
SEQ_LENGTH = 3

# Build the feature windows and their labels from the scaled frame
X, y = create_sequences(df_scaled, SEQ_LENGTH)

# Wrap the NumPy arrays as float32 tensors
X_tensor, y_tensor = (torch.tensor(array, dtype=torch.float32) for array in (X, y))
# Labels become a column vector of shape [n_samples, 1]
y_tensor = y_tensor.reshape(-1, 1)


In [118]:
# Display the scaled frame (scaled feature columns plus the unscaled 'prediction-label') as a sanity check
df_scaled

Unnamed: 0,Open,High,Low,Close,Adj Close,Volume,neutral-count-finbert,positive-count-finbert,negative-count-finbert,average-confidence-finbert,average-neutral-score-gemini,average-positive-score-gemini,average-negative-score-gemini,prediction-label
0,1.0,0.65291,1.0,1.0,1.0,0.0,0.5,1.0,0.0,0.0,0.333333,0.4,1.0,0.0
1,1.0,0.65291,1.0,1.0,1.0,0.0,0.0,0.0,1.0,0.829423,0.0,0.0,0.25,0.0
2,0.673569,0.330159,0.303299,0.49343,0.49343,0.122893,1.0,0.0,0.0,0.63401,0.166667,0.5,0.75,0.0
3,0.727743,1.0,0.643848,0.6289,0.6289,1.0,0.5,0.0,0.5,1.0,0.5,0.6,0.0,0.0
4,0.383713,0.258202,0.18804,0.23317,0.23317,0.121626,0.5,0.0,0.5,0.844531,0.833333,0.5,0.8125,0.0
5,0.0,0.0,0.0,0.0,0.0,0.202046,0.5,0.0,0.5,0.843527,1.0,1.0,0.75,1.0


In [140]:
# Hold out the last 20% of the samples for testing; shuffle=False keeps the
# original sample order so the test set is the tail of the data
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor,
    y_tensor,
    test_size=0.2,
    shuffle=False,
)

# Pair inputs with labels, then wrap each split in an ordered mini-batch loader
train_data, test_data = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)
train_loader, test_loader = (
    DataLoader(split, batch_size=32, shuffle=False)
    for split in (train_data, test_data)
)


In [142]:
class LSTMModel(nn.Module):
    """LSTM encoder followed by a linear head and a sigmoid.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the LSTM hidden state.
        output_size: number of outputs of the linear head.
        num_layers: number of stacked LSTM layers (default 1).
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()

        # Recurrent encoder; batch_first=True means inputs are laid out as
        # [batch, seq_len, input_size]
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Projection from the final hidden state to the output dimension
        self.fc = nn.Linear(hidden_size, output_size)

        # Squashes the projection into (0, 1) so it can be read as a probability
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return a tensor of shape [batch, output_size] with values in (0, 1)."""
        # Only the final hidden states are needed; the per-step outputs and the
        # cell states are discarded
        _, (final_hidden, _) = self.lstm(x)

        # final_hidden[-1] is the last layer's hidden state after the whole sequence
        top_layer_state = final_hidden[-1]
        return self.sigmoid(self.fc(top_layer_state))


In [144]:
# Hyperparameters
SEED = 42  # fixed seed so weight initialisation is repeatable between runs
# Features per time step, read from the prepared tensor instead of a literal
# so it cannot drift from the actual feature columns
input_size = X_tensor.shape[-1]
hidden_size = 50  # Number of LSTM units in each layer
output_size = 1  # Output size (1 for binary classification)
num_layers = 1  # Number of LSTM layers
# NOTE(review): the DataLoaders are built in an earlier cell with a literal
# batch_size=32; this variable is not read by them.
batch_size = 32
num_epochs = 100
learning_rate = 0.001

# Seed the global torch RNG before any parameters are created; without this
# every fresh run starts from different random weights.
torch.manual_seed(SEED)

# Instantiate the model
model = LSTMModel(input_size, hidden_size, output_size, num_layers)

# Loss function and optimizer
# BCELoss expects probabilities, which matches the sigmoid at the end of LSTMModel.forward
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [146]:
# Training loop
for epoch in range(num_epochs):
    model.train()

    # Accumulate a sample-weighted loss so the report reflects the whole epoch,
    # not just whichever mini-batch happened to come last.
    running_loss = 0.0
    samples_seen = 0

    # Loop over batches of data using train_loader
    for inputs, targets in train_loader:
        # Inputs are expected as [batch_size, seq_length, input_size]
        inputs = inputs.float()
        targets = targets.float().view(-1, 1)  # Ensure target shape is [batch_size, 1]

        # Forward pass
        optimizer.zero_grad()  # Clear gradients left over from the previous step
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # The criterion returns a per-batch mean, so weight it by the batch
        # size before summing (the last batch may be smaller).
        batch_len = targets.size(0)
        running_loss += loss.item() * batch_len
        samples_seen += batch_len

    # Print loss every 10 epochs
    if (epoch + 1) % 10 == 0:
        # An empty loader yields NaN here instead of a NameError on `loss`.
        epoch_loss = running_loss / samples_seen if samples_seen else float("nan")
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")


Epoch [10/100], Loss: 0.6532
Epoch [20/100], Loss: 0.5147
Epoch [30/100], Loss: 0.3380
Epoch [40/100], Loss: 0.1608
Epoch [50/100], Loss: 0.0607
Epoch [60/100], Loss: 0.0258
Epoch [70/100], Loss: 0.0143
Epoch [80/100], Loss: 0.0097
Epoch [90/100], Loss: 0.0075
Epoch [100/100], Loss: 0.0061


In [147]:
# Score the trained model on the held-out split
model.eval()  # switch the model to inference behaviour
correct, total = 0, 0

# No gradients are needed while scoring
with torch.no_grad():
    for inputs, targets in test_loader:
        targets = targets.float().view(-1, 1)
        inputs = inputs.float()

        # Threshold the predicted probability at 0.5 to get a hard 0/1 class
        outputs = model(inputs)
        predicted_class = outputs.ge(0.5).float()

        correct += predicted_class.eq(targets).sum().item()
        total += targets.size(0)

accuracy = correct / total
print(f"Accuracy on test data: {accuracy * 100:.2f}%")


Accuracy on test data: 0.00%
