In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import AutoTokenizer
import numpy as np
import pandas as pd

# Read the cleaned sentiment CSV into a DataFrame.
dataset = pd.read_csv("bitcoin_sentiments_21_24_cleaned.csv")

# Pull the three columns used downstream out as plain Python lists.
data_text = dataset['Cleaned_Description'].tolist()
dates = dataset['Date'].tolist()
labels = dataset['Sentiment_Category'].tolist()

# Tokenizer loaded from the ProsusAI/finbert checkpoint.
# Earlier alternative, kept for reference:
#   tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")

In [2]:
class SentimentDataset(Dataset):
    """Map-style dataset that tokenizes all texts once, at construction time.

    Args:
        texts: sequence of raw strings; handed to ``tokenizer`` in a single call.
        dates: per-sample date values, returned untouched with each item.
        labels: per-sample class ids; converted to a ``torch.long`` tensor on access.
        tokenizer: callable invoked as ``tokenizer(texts, truncation=True,
            padding="max_length", max_length=max_length, return_tensors="pt")``.
            Its result must be indexable by ``"input_ids"`` and ``"attention_mask"``.
        max_length: forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, texts, dates, labels, tokenizer, max_length=64):
        self.texts = texts
        self.dates = dates
        self.labels = labels

        # Tokenize the whole corpus up front so __getitem__ is a pure lookup.
        tokenizer_options = dict(
            truncation=True,
            padding="max_length",
            max_length=max_length,
            return_tensors="pt",
        )
        self.encodings = tokenizer(texts, **tokenizer_options)

    def __len__(self):
        """Number of samples, taken from the label sequence."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of model inputs plus its metadata.

        Keys: ``input_ids``, ``attention_mask``, ``label``, ``date``, ``text``.
        Other tokenizer outputs (e.g. token_type_ids) are not included.
        """
        encoded = self.encodings
        return {
            "input_ids": encoded["input_ids"][idx],
            "attention_mask": encoded["attention_mask"][idx],
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
            "date": self.dates[idx],
            "text": self.texts[idx],
        }


# Wrap the raw lists in the tokenizing dataset.
# NOTE(review): this rebinds `dataset`, which held the pandas DataFrame until now.
dataset = SentimentDataset(
    texts=data_text,
    dates=dates,
    labels=labels,
    tokenizer=tokenizer,
    max_length=64,
)

In [3]:
# Display the first sample to sanity-check the fields returned by __getitem__.
dataset[0]

{'input_ids': tensor([  101,  2978,  3597,  2378,  3976,  2003,  9530, 19454,  8524,  3436,
          2379,  1996, 13751, 23612,  8889,  2490, 28855, 14820,  2003,  3173,
         12154,  2682, 13751,  3429, 12376,  1060, 14536,  2453,  6149,  2896,
          2000, 13751, 10630,   102,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0]),
 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 'label': tensor(0),
 'date': '2021-11-05 04:42:00',
 'text': 'Bitcoin price is consolidating near the USD 62000 support Ethereum is holding gains above USD 4550 XRP might correct lower to USD 115'}

In [4]:
# Fixed seed for the split so train/val/test membership is identical on every
# run. Without it, each execution of this cell evaluates on different rows and
# the reported metrics / saved predictions cannot be reproduced.
SPLIT_SEED = 42

train_size = int(0.8 * len(dataset))  # 80% for training
val_size = int(0.1 * len(dataset))    # 10% for validation
test_size = len(dataset) - train_size - val_size  # Remainder (about 10%) for testing

# Split the dataset with a dedicated, seeded generator (leaves the global RNG untouched)
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create DataLoaders: only the training data is reshuffled each epoch
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
def load_pretrained_embeddings(glove_file, embedding_dim):
    """Parse a whitespace-separated word-vector text file into a dict.

    Each usable line is ``<word> <v1> ... <v_embedding_dim>``. Lines with any
    other number of whitespace-separated fields, or whose numeric fields
    cannot be parsed as floats, are reported with ``print`` and skipped.

    Args:
        glove_file: path of the UTF-8 text file to read.
        embedding_dim: number of vector components expected on each line.

    Returns:
        dict mapping each word to a ``float32`` numpy array of length
        ``embedding_dim``. A word appearing twice keeps its last vector.
    """
    expected_fields = embedding_dim + 1  # the word itself plus its components
    vectors = {}

    with open(glove_file, 'r', encoding='utf-8') as handle:
        line_no = 0
        for raw_line in handle:
            line_no += 1
            fields = raw_line.strip().split()

            # Wrong field count: header line, blank line, or a token containing spaces.
            if len(fields) != expected_fields:
                print(f"Skipping line {line_no}: expected {embedding_dim+1} elements, found {len(fields)}")
                continue

            token, *components = fields
            try:
                vectors[token] = np.asarray(components, dtype='float32')
            except ValueError as err:
                # A non-numeric component; nothing is stored for this line.
                print(f"Skipping line {line_no} due to parse error: {err}")

    return vectors

In [None]:
# Path of the word-vector text file; the name suggests the 840B-token, 300-d
# GloVe release — confirm the file on disk matches.
glove_file = "glove.840B.300d.txt"
# Must equal the number of components per line in glove_file: the loader skips
# every line whose field count is not embedding_dim + 1.
embedding_dim = 300
# word -> float32 vector lookup returned by load_pretrained_embeddings.
embeddings_index = load_pretrained_embeddings(glove_file, embedding_dim)

In [None]:
def build_embedding_matrix(vocab_size, embeddings_index, embedding_dim, word_to_idx=None):
    """Assemble a ``(vocab_size, embedding_dim)`` float32 embedding matrix.

    Row ``i`` receives the pretrained vector of the word mapped to index ``i``.
    Words missing from ``embeddings_index`` get a random normal vector
    (std 0.6) drawn from numpy's global RNG. Rows whose index does not occur
    in ``word_to_idx`` stay all-zero.

    Args:
        vocab_size: number of rows in the matrix.
        embeddings_index: mapping word -> vector of length ``embedding_dim``.
        embedding_dim: number of columns in the matrix.
        word_to_idx: mapping word -> row index. Previously this was read from
            an undeclared global, which made the function unusable unless the
            caller happened to define that global. When omitted, a
            module-level ``word_to_idx`` is still used if one exists.

    Returns:
        numpy.ndarray of shape ``(vocab_size, embedding_dim)``, dtype float32.

    Raises:
        NameError: if ``word_to_idx`` is neither passed nor defined globally.
        IndexError: if a row index lies outside the matrix (raised by numpy).
    """
    if word_to_idx is None:
        # Backward-compatible fallback to the global the original body relied on.
        word_to_idx = globals().get("word_to_idx")
        if word_to_idx is None:
            raise NameError(
                "build_embedding_matrix needs a word->index mapping: pass "
                "word_to_idx=... (no global 'word_to_idx' is defined)"
            )

    embedding_matrix = np.zeros((vocab_size, embedding_dim), dtype='float32')

    for word, i in word_to_idx.items():
        if word in embeddings_index:
            embedding_matrix[i] = embeddings_index[word]
        else:
            # Out-of-vocabulary word: random initialisation instead of zeros
            embedding_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim,))

    return embedding_matrix

In [5]:
# An earlier version fed the LSTM hidden states straight to the classifier; this version pools the LSTM outputs with an attention layer instead.

class LSTMModel(nn.Module):
    """Two-layer bidirectional LSTM classifier with attention pooling.

    Pipeline: embedding -> dropout -> BiLSTM -> one attention score per time
    step -> softmax over the sequence -> weighted sum of LSTM outputs ->
    linear classifier.

    Args:
        vocab_size: number of embedding rows (ignored when ``pretrained_emb``
            is given, since the table is then built from that matrix).
        embed_dim: embedding width when training embeddings from scratch.
            When ``pretrained_emb`` is given, its width is used instead so
            the LSTM input size always matches the embedding table.
        hidden_dim: LSTM hidden size per direction; the attention and
            classifier layers take ``hidden_dim * 2`` features because the
            LSTM is bidirectional.
        num_classes: number of output logits.
        pretrained_emb: optional 2-D float tensor ``[vocab, dim]`` used to
            initialise the embedding table.
        freeze_embedding: if True, pretrained embeddings are not updated.
    """

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=128, num_classes=3, pretrained_emb=None, freeze_embedding=False):
        super().__init__()

        # 1) Token embedding
        if pretrained_emb is not None:
            self.embedding = nn.Embedding.from_pretrained(
                embeddings=pretrained_emb,
                freeze=freeze_embedding,  # If True, embeddings are not updated
            )
            # Take the width from the matrix itself; relying on the embed_dim
            # default would give the LSTM a mismatching input size.
            embed_dim = self.embedding.embedding_dim
        else:
            self.embedding = nn.Embedding(vocab_size, embed_dim)

        # 2) An LSTM
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True)

        # One scalar attention score per time step
        self.attention = nn.Linear(hidden_dim * 2, 1, bias=False)

        # 3) Linear classifier over the pooled context vector
        self.classifier = nn.Linear(hidden_dim * 2, num_classes)

        # Dropout applied to the embedded tokens
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, input_ids, attention_mask=None):
        """Compute class logits.

        Args:
            input_ids: token ids, ``[batch_size, seq_len]``.
            attention_mask: optional ``[batch_size, seq_len]`` mask; positions
                equal to 0 are excluded from the attention pooling.

        Returns:
            Logits of shape ``[batch_size, num_classes]``.
        """
        # Embed tokens => [batch_size, seq_len, embed_dim]
        embedded = self.dropout(self.embedding(input_ids))

        # LSTM => [batch_size, seq_len, hidden_dim * 2]; final states are unused
        outputs, _ = self.lstm(embedded)

        attn_scores = self.attention(outputs)  # [batch_size, seq_len, 1]
        if attention_mask is not None:
            mask = attention_mask.unsqueeze(-1)  # [batch_size, seq_len, 1]
            # Use the most negative finite value rather than -inf: masked
            # positions still get weight 0 after softmax, but a row that is
            # entirely masked produces uniform weights instead of NaN.
            fill_value = torch.finfo(attn_scores.dtype).min
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)
        attn_weights = torch.softmax(attn_scores, dim=1)
        context_vector = torch.sum(outputs * attn_weights, dim=1)

        # Classifier => [batch_size, num_classes]
        return self.classifier(context_vector)

In [6]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The embedding table is sized to the tokenizer's vocabulary.
vocab_size = tokenizer.vocab_size

# GloVe initialisation is currently disabled. To try it, uncomment the lines
# below and pass the extra keyword arguments listed inside the constructor call.
# glove_file = "glove.840B.300d.txt"
# embedding_dim = 300
# embeddings_index = load_pretrained_embeddings(glove_file, embedding_dim)
# embedding_matrix_np = build_embedding_matrix(vocab_size, embeddings_index, embedding_dim)
# embedding_matrix = torch.FloatTensor(embedding_matrix_np)

model = LSTMModel(
    vocab_size=vocab_size,
    hidden_dim=128,
    num_classes=3,
    # GloVe variant: embed_dim=embedding_dim, pretrained_emb=embedding_matrix, freeze_embedding=True
)
model = model.to(device)

In [7]:
# Adam over every model parameter; the learning rate 5e-5 is hard-coded here.
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
# Cross-entropy between the model's raw logits and the integer class labels.
criterion = nn.CrossEntropyLoss()

In [8]:
# Train for a fixed number of epochs, validating every 5th epoch.
#
# The per-batch label tensor is called `batch_labels` on purpose: naming it
# `labels` would overwrite the notebook-level `labels` list, so re-running the
# dataset cell after training would silently build the dataset from one batch.
epochs = 100
model.train()

for epoch in range(epochs):
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        batch_labels = batch["label"].to(device)

        # Forward pass
        logits = model(input_ids, attention_mask=attention_mask)

        # Compute loss
        loss = criterion(logits, batch_labels)

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")

    # Validation
    if (epoch + 1) % 5 == 0:
        model.eval()  # Set model to evaluation mode (disables dropout)
        val_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():  # Disable gradient computation for validation
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                batch_labels = batch["label"].to(device)

                # Forward pass
                logits = model(input_ids, attention_mask=attention_mask)

                # Compute loss
                loss = criterion(logits, batch_labels)
                val_loss += loss.item()

                # Compute accuracy
                predictions = torch.argmax(logits, dim=-1)
                correct += (predictions == batch_labels).sum().item()
                total += batch_labels.size(0)

        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = correct / total

        print(f"Validation - Epoch [{epoch+1}/{epochs}], Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}")

        model.train()  # Switch back to training mode

Epoch [1/100], Loss: 1.0790
Epoch [2/100], Loss: 1.0352
Epoch [3/100], Loss: 0.9945
Epoch [4/100], Loss: 0.9730
Epoch [5/100], Loss: 0.9675
Validation - Epoch [5/100], Loss: 0.9177, Accuracy: 0.5600
Epoch [6/100], Loss: 0.9558
Epoch [7/100], Loss: 0.9398
Epoch [8/100], Loss: 0.9306
Epoch [9/100], Loss: 0.9183
Epoch [10/100], Loss: 0.9121
Validation - Epoch [10/100], Loss: 0.8992, Accuracy: 0.5855
Epoch [11/100], Loss: 0.8949
Epoch [12/100], Loss: 0.8856
Epoch [13/100], Loss: 0.8747
Epoch [14/100], Loss: 0.8687
Epoch [15/100], Loss: 0.8631
Validation - Epoch [15/100], Loss: 0.8487, Accuracy: 0.6091
Epoch [16/100], Loss: 0.8531
Epoch [17/100], Loss: 0.8497
Epoch [18/100], Loss: 0.8340
Epoch [19/100], Loss: 0.8310
Epoch [20/100], Loss: 0.8165
Validation - Epoch [20/100], Loss: 0.8752, Accuracy: 0.6145
Epoch [21/100], Loss: 0.8080
Epoch [22/100], Loss: 0.8013
Epoch [23/100], Loss: 0.7917
Epoch [24/100], Loss: 0.7812
Epoch [25/100], Loss: 0.7850
Validation - Epoch [25/100], Loss: 0.8280, Ac

In [9]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Collect predictions and metadata for every test sample, then report
# accuracy and macro-averaged precision / recall / F1.
y_true = []
y_pred = []
y_texts = []
y_dates = []

model.eval()
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        logits = model(input_ids, attention_mask=attention_mask)
        preds = torch.argmax(logits, dim=-1)

        # The ground-truth labels are only recorded, never fed to the model, so
        # they are read straight from the batch. This also avoids rebinding the
        # notebook-level `labels` list to a tensor.
        y_true.extend(batch["label"].cpu().numpy())
        y_pred.extend(preds.cpu().numpy())
        y_texts.extend(batch["text"])
        y_dates.extend(batch["date"])

# Compute metrics (macro average: every class weighs the same)
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Accuracy: 0.7221
Precision: 0.7293
Recall: 0.7089
F1 Score: 0.7139


In [10]:
# Save only the learned parameters (the state_dict), not the module object itself.
torch.save(model.state_dict(), "lstm_from_scratch.pth")

In [11]:
# One row per test sample: its date, raw text, true label and predicted label.
result = pd.DataFrame(
    dict(
        date=y_dates,
        text=y_texts,
        true_label=y_true,
        predicted_label=y_pred,
    )
)

In [12]:
# Write the prediction table to CSV, leaving out the DataFrame index column.
result.to_csv('lstm_from_scratch_preds.csv', index=False)