In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score
import os
import numpy as np
import pandas as pd

# Define the RNN model with LSTM
class SentimentRNN(nn.Module):
    """Single-layer LSTM classifier over sequences of token indices.

    Pipeline: embedding lookup -> LSTM -> dropout on the final hidden
    state -> linear projection to ``output_dim`` logits.

    Args:
        input_dim: Vocabulary size. Only used when ``pretrained_embeddings``
            is ``None``.
        embedding_dim: Width of the embedding vectors. Only used when
            ``pretrained_embeddings`` is ``None``; otherwise the width is
            taken from the pretrained matrix.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output logits (classes).
        pretrained_embeddings: Optional 2-D float tensor
            ``(vocab_size, dim)`` used to initialise the embedding layer.
            The weights stay trainable (``freeze=False``).
        dropout: Dropout probability applied to the final hidden state
            before the linear layer.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, pretrained_embeddings=None, dropout=0.5):
        super().__init__()

        # If pretrained embeddings are provided, use them
        if pretrained_embeddings is not None:
            self.embedding = nn.Embedding.from_pretrained(pretrained_embeddings, freeze=False)
            # Trust the matrix, not the argument: a mismatch between the two
            # would otherwise only surface as a shape error inside the LSTM.
            embedding_dim = self.embedding.embedding_dim
        else:
            self.embedding = nn.Embedding(input_dim, embedding_dim)  # Embedding layer

        # nn.LSTM's own `dropout` argument only acts *between* stacked layers;
        # with a single layer it does nothing except emit a warning, so it is
        # not passed here. Regularisation is handled by self.dropout below.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)  # LSTM layer
        self.fc = nn.Linear(hidden_dim, output_dim)  # Fully connected layer
        self.dropout = nn.Dropout(dropout)  # Dropout to avoid overfitting

    def forward(self, x):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            x: Integer tensor of token indices, ``(batch, seq_len)``
               (the LSTM is built with ``batch_first=True``).
        """
        embedded = self.embedding(x)  # (batch, seq_len, embedding_dim)
        _, (h_n, _) = self.lstm(embedded)  # h_n: (num_layers, batch, hidden_dim)
        out = self.dropout(h_n[-1])  # Final hidden state of the last layer
        return self.fc(out)  # Project to class logits

# Hyperparameters

# --- Model shape ---
EMBEDDING_DIM, HIDDEN_DIM = 100, 256  # embedding vector width, LSTM hidden-state size
NUM_CLASSES = 3  # Negative, neutral, positive sentiment
DROPOUT = 0.5  # dropout probability

# --- Optimisation ---
LEARNING_RATE = 0.001
EPOCHS = 10
BATCH_SIZE = 64

# --- Input ---
MAX_LEN = 128  # presumably the maximum sequence length in tokens

# Load GloVe embeddings from a local text file (the file must be downloaded beforehand)
def load_glove_embeddings(glove_file_path, embedding_dim=100):
    """Parse a GloVe text file into a ``{word: vector}`` dictionary.

    Each useful line has the form ``<word> <v1> ... <v_embedding_dim>``.
    Lines are split from the right so that the last ``embedding_dim``
    fields are always the vector, even if the token itself contains a
    space. Blank lines, lines with too few fields, and lines whose vector
    fields are not numeric are skipped instead of aborting the whole load.

    Args:
        glove_file_path: Path to the UTF-8 encoded GloVe text file.
        embedding_dim: Number of vector components expected per line.

    Returns:
        dict mapping each word to a 1-D ``float32`` NumPy array of length
        ``embedding_dim``.

    Raises:
        OSError: If the file cannot be opened.
    """
    embeddings = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            fields = line.rstrip().rsplit(' ', embedding_dim)
            if len(fields) != embedding_dim + 1:
                continue  # blank line or wrong number of components
            word, *components = fields
            if not word:
                continue
            try:
                vector = np.asarray(components, dtype='float32')
            except ValueError:
                continue  # non-numeric component: not a valid embedding line
            embeddings[word] = vector
    return embeddings

# Load GloVe embeddings
# Both the file name and the loader follow EMBEDDING_DIM so the vectors always
# match the width used for the embedding matrix (make sure the file is downloaded).
glove_path = f'glove/glove.6B.{EMBEDDING_DIM}d.txt'  # Path to the GloVe file
glove_embeddings = load_glove_embeddings(glove_path, embedding_dim=EMBEDDING_DIM)

# Tokenization and indexing
PAD_TOKEN = '<pad>'  # reserved vocabulary entry: always index 0, all-zero embedding row


def tokenize_and_create_embeddings(texts, glove_embeddings, max_features=1000,
                                   word_to_index=None, max_len=None):
    """Turn raw texts into padded token-id sequences plus an embedding matrix.

    The model looks token ids up in an embedding table and runs an LSTM over
    them, so each text must become a *sequence of vocabulary indices* - not a
    bag-of-words count vector. Tokens outside the vocabulary are dropped,
    sequences are truncated to ``max_len`` tokens and left-padded with index 0
    so that the final LSTM step always lands on a real token.

    Args:
        texts: Sized iterable of strings (e.g. a pandas Series).
        glove_embeddings: Mapping from word to its pretrained vector.
        max_features: Vocabulary size cap used when a vocabulary is fitted.
        word_to_index: Optional existing vocabulary to reuse (pass the one
            returned for the training split when encoding any other split, so
            ids mean the same thing everywhere). It must map ``PAD_TOKEN`` to 0.
            When ``None``, a vocabulary is fitted on ``texts``.
        max_len: Sequence length of the output; defaults to ``MAX_LEN``.

    Returns:
        Tuple ``(X, word_to_index, embedding_matrix)`` where ``X`` is an
        ``int64`` array of shape ``(len(texts), max_len)``, ``word_to_index``
        includes the padding entry, and ``embedding_matrix`` has one row per
        ``word_to_index`` entry (zeros for padding and for words that have no
        pretrained vector).
    """
    if max_len is None:
        max_len = MAX_LEN

    vectorizer = CountVectorizer(max_features=max_features, stop_words='english')
    if word_to_index is None:
        vectorizer.fit(texts)
        word_to_index = {PAD_TOKEN: 0}
        for idx, word in enumerate(vectorizer.get_feature_names_out(), start=1):
            word_to_index[word] = idx
    # The analyzer applies the same lowercasing / token pattern / stop-word
    # filtering that was used to build the vocabulary; it needs no fitting.
    analyzer = vectorizer.build_analyzer()

    X = np.zeros((len(texts), max_len), dtype=np.int64)
    for row, text in enumerate(texts):
        token_ids = [word_to_index[tok] for tok in analyzer(text) if tok in word_to_index][:max_len]
        if token_ids:
            X[row, -len(token_ids):] = token_ids  # left-pad with zeros

    # Initialize embedding matrix with zeros; row 0 (padding) stays zero
    embedding_matrix = np.zeros((len(word_to_index), EMBEDDING_DIM))
    for word, idx in word_to_index.items():
        vector = glove_embeddings.get(word)
        if vector is not None:
            embedding_matrix[idx] = vector

    return X, word_to_index, embedding_matrix

# Prepare data
import kagglehub

# Download latest version
path = kagglehub.dataset_download("kazanova/sentiment140")

print("Path to dataset files:", path)

file_path = os.path.join(path, "training.1600000.processed.noemoticon.csv")

# The CSV has no header row, so column names are supplied explicitly.
columns = ['target', 'id', 'date', 'flag', 'user', 'text']

full_data = pd.read_csv(file_path, encoding='latin-1', names=columns)

data = full_data[['target', 'text']].copy()
data.rename(columns={"target": "label"}, inplace=True)

train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

# Fit the vocabulary on the training split only, then reuse it for the test
# split: re-fitting on the test texts would assign different ids to the same
# words and make the test inputs meaningless to the trained model.
X_train, word_to_index, embedding_matrix = tokenize_and_create_embeddings(train_data['text'], glove_embeddings)
X_test, _, _ = tokenize_and_create_embeddings(test_data['text'], glove_embeddings, word_to_index=word_to_index)

# Convert to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.long)

# Raw labels 0 / 2 / 4 are remapped to contiguous class ids 0 / 1 / 2 as
# required by CrossEntropyLoss; any other raw label raises KeyError.
label_to_class = {0: 0, 2: 1, 4: 2}
y_train_tensor = torch.tensor(train_data['label'].apply(lambda x: label_to_class[x]).values, dtype=torch.long)
y_test_tensor = torch.tensor(test_data['label'].apply(lambda x: label_to_class[x]).values, dtype=torch.long)

# Dataset and DataLoader
class SentimentDataset(Dataset):
    """Index-aligned pairing of inputs and labels for use with a DataLoader.

    ``texts`` and ``labels`` are stored as given; sample ``i`` is the pair
    ``(texts[i], labels[i])`` and the dataset length is ``len(texts)``.
    """

    def __init__(self, texts, labels):
        self.texts, self.labels = texts, labels

    def __len__(self):
        # The number of samples is defined by the inputs container.
        return len(self.texts)

    def __getitem__(self, item):
        # Return the (input, label) pair stored at position `item`.
        return self.texts[item], self.labels[item]

# Wrap the tensors in datasets; only the training data is shuffled.
train_dataset = SentimentDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = SentimentDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Initialize the RNN model with pre-trained embeddings
input_dim = len(word_to_index)
embedding_matrix_tensor = torch.tensor(embedding_matrix, dtype=torch.float32)

# Pick the device first and move the model onto it *before* building the
# optimizer, as the torch.optim documentation recommends, so the optimizer is
# constructed over the parameters that will actually be trained.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SentimentRNN(
    input_dim=input_dim,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=NUM_CLASSES,
    pretrained_embeddings=embedding_matrix_tensor,
    dropout=DROPOUT,  # use the configured hyperparameter rather than the class default
)
model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# Train the model

# Run EPOCHS passes over train_loader, reporting the mean per-batch loss and
# the training accuracy after each pass.
for epoch in range(EPOCHS):
    model.train()
    running_loss, correct_predictions, total_predictions = 0.0, 0, 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Standard optimisation step: clear gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Bookkeeping for the epoch summary.
        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)  # index of the largest logit per sample
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)

    epoch_loss = running_loss / len(train_loader)  # average over batches
    epoch_accuracy = correct_predictions / total_predictions
    print(f'Epoch {epoch+1}/{EPOCHS}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

# Evaluate the model
# eval() switches dropout off; no_grad() skips gradient tracking during inference.
model.eval()
y_pred, y_true = [], []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)  # predicted class = largest logit

        # Collect on the CPU so scikit-learn can consume the results.
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

accuracy = accuracy_score(y_true, y_pred)
print(f'Test Accuracy: {accuracy:.4f}')

Path to dataset files: /Users/marius/.cache/kagglehub/datasets/kazanova/sentiment140/versions/2




KeyboardInterrupt: 