In [29]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split

# Define random_state for reproducibility
random_state = 42

# ----- Load Stock and Sentiment Data -----
stock_data = pd.read_csv('AAPL_trend.csv', parse_dates=['date'])
sentiment_data = pd.read_csv('sentiment_scores_title.csv', parse_dates=['date'])

# Merge sentiment scores with stock data on 'date'
stock_data['date'] = pd.to_datetime(stock_data['date'])
merged_data = pd.merge(stock_data, sentiment_data, on='date', how='left')

# ----- Define Date Range and Filter Stock Data -----
start_date = pd.to_datetime("2024-02-12")
end_date = pd.to_datetime("2025-02-19")
selected_stock_data = merged_data[(merged_data['date'] >= start_date) & (merged_data['date'] <= end_date)]

# ----- Feature Engineering for LSTM -----
X_days = 7  # Use previous 7 days of stock data
Y_days = 1  # Use 1 day of sentiment data

selected_stock_data = selected_stock_data.sort_values('date')
dates = selected_stock_data['date'].dt.strftime('%Y-%m-%d').tolist()
closing_values = selected_stock_data['closingValue'].values
sentiment_scores = selected_stock_data['sentiment_score'].values
trends = selected_stock_data['trend'].values

# Map trends to numerical labels
trend_to_label = {'increase': 0, 'decrease': 1, 'stable': 1}
labels_all = [trend_to_label[t] for t in trends]

features = []
labels = []

for i in range(max(X_days, Y_days), len(selected_stock_data)):
    stock_features = closing_values[i - X_days:i]
    sentiment_feature = sentiment_scores[i - Y_days:i]  # Use sentiment scores for the past Y_days

    feature_vector = np.concatenate([stock_features, sentiment_feature])
    features.append(feature_vector)
    labels.append(labels_all[i])

features = np.array(features)
labels = np.array(labels)
print(np.unique(labels, return_counts=True))

# ----- Train-Test Split with random_state -----
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=random_state, stratify=labels)

# ----- Define the LSTM Model -----
class StockTrendLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(StockTrendLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = self.fc(out)
        return out

# Prepare the data for LSTM
input_size = 1
X_train_seq = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test_seq = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_seq, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Initialize and train the model
model = StockTrendLSTM(input_size=1, hidden_size=16, num_layers=1, num_classes=3)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 30000
accuracies = []
for epoch in range(num_epochs):
    model.train()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if (epoch + 1) % 100 == 0:
        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            accuracy = (predicted == y_test_tensor).sum().item() / y_test_tensor.size(0)
            accuracies.append([epoch + 1, accuracy])
            print("Test Accuracy at epoch", epoch + 1, ":", accuracy)   

# Sort accuracies array by accuracy in descending order
accuracies = np.array(accuracies)
accuracies = accuracies[accuracies[:, 1].argsort()[::-1]]
print("Top 4 accuracies:")
for i in range(4):
    print(f"Epoch: {int(accuracies[i][0])}, Accuracy: {accuracies[i][1]}")


(array([0, 1]), array([145, 104]))
Test Accuracy at epoch 100 : 0.62
Test Accuracy at epoch 200 : 0.58
Test Accuracy at epoch 300 : 0.58
Test Accuracy at epoch 400 : 0.58
Test Accuracy at epoch 500 : 0.58
Test Accuracy at epoch 600 : 0.58
Test Accuracy at epoch 700 : 0.58
Test Accuracy at epoch 800 : 0.58
Test Accuracy at epoch 900 : 0.58
Test Accuracy at epoch 1000 : 0.58
Test Accuracy at epoch 1100 : 0.58
Test Accuracy at epoch 1200 : 0.58
Test Accuracy at epoch 1300 : 0.58
Test Accuracy at epoch 1400 : 0.58
Test Accuracy at epoch 1500 : 0.58
Test Accuracy at epoch 1600 : 0.58
Test Accuracy at epoch 1700 : 0.58
Test Accuracy at epoch 1800 : 0.58
Test Accuracy at epoch 1900 : 0.58
Test Accuracy at epoch 2000 : 0.58
Test Accuracy at epoch 2100 : 0.58
Test Accuracy at epoch 2200 : 0.58
Test Accuracy at epoch 2300 : 0.58
Test Accuracy at epoch 2400 : 0.58
Test Accuracy at epoch 2500 : 0.58
Test Accuracy at epoch 2600 : 0.58
Test Accuracy at epoch 2700 : 0.58
Test Accuracy at epoch 2800 :