In [75]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder


from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, classification_report


import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn.functional as F

from tqdm import tqdm 


from transformers import BertTokenizer, BertModel



In [169]:
df = pd.read_csv('data/training.csv')


label_encoder = LabelEncoder()
df['label'] = label_encoder.fit_transform(df['label'])
df.head()
df = df[:1000] #for testing
# df.describe()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [184]:
def get_bert_embeddings_batch(texts, model, tokenizer, batch_size=32):
    """
    Get BERT embeddings for a batch of texts
    
    Args:
        texts (list): List of texts to embed
        model: Pretrained BERT model
        tokenizer: BERT tokenizer
        batch_size: Number of texts to process at once
    
    Returns:
        numpy.ndarray: Embeddings for all input texts
    """
    all_embeddings = []
    
    # Process in batches
    for i in tqdm(range(0, len(texts), batch_size)):
        batch_embeddings = []
        batch = texts[i:i+batch_size]
        
        # Tokenize the batch
        inputs = tokenizer(
            batch, 
            return_tensors="pt", 
            padding="max_length", 
            truncation=True, 
            max_length=64
        ).to(device)
        
        # Get embeddings
        with torch.no_grad():
            outputs = model(**inputs)
    
        embeddings = outputs.last_hidden_state
        # print(embeddings.shape)
        # embeddings = torch.mean(embeddings, dim=1) # this is to mean pool
        
        all_embeddings.append(embeddings.cpu().numpy())
        # all_embeddings.append(batch_embeddings)
    
    # Concatenate all batches
    return np.concatenate(all_embeddings, axis=0)
    # return all_embeddings

In [185]:
def batch_labels(y, batch_size):
    """
    Batches flat labels `y` into chunks of size `batch_size`.
    
    Args:
        y: Flat tensor/list of labels (shape [N_total])
        batch_size: Desired batch size
    
    Returns:
        List of label batches (each shape [batch_size]), except possibly last
    """
    if isinstance(y, list):
        y = torch.tensor(y)
    
    # Split into batches
    num_samples = len(y)
    num_batches = (num_samples + batch_size - 1) // batch_size  # Ceiling division
    
    batched_y = []
    for i in range(num_batches):
        start = i * batch_size
        end = start + batch_size
        batched_y.append(y[start:end])
    
    return batched_y

In [186]:
# Vectorizer = CountVectorizer(ngram_range=(1,2), stop_words='english', min_df=20, max_features = 500)

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model_embed = BertModel.from_pretrained('bert-base-uncased')
model_embed = model_embed.to(device)

sentences = df['sentence'].tolist()  # Convert to list
embeddings = get_bert_embeddings_batch(sentences, model_embed, tokenizer)

# Add embeddings back to DataFrame (each embedding is a 768-dim vector)
# embedding_columns = [f'bert_{i}' for i in range(768)]

# def preprocess_text(sentences, tokenizer, max_length=64):
#     encoded = tokenizer(
#         sentences.tolist(),
#         padding="max_length",
#         truncation=True,
#         max_length=max_length,
#         return_tensors="pt"
#     )
#     return encoded["input_ids"], encoded["attention_mask"]

# input_ids, attention_mask = preprocess_text(df["sentence"], tokenizer)


# x = Vectorizer.fit_transform(df['sentence']).toarray()


y = df['label'].values
# y_batched = batch_labels(y, 32)



100%|██████████| 32/32 [00:21<00:00,  1.49it/s]


In [177]:
x_train, x_test, y_train, y_test = train_test_split(embeddings, y, test_size=0.5, random_state=0) 

# x_train, x_test, y_train, y_test = train_test_split(
#     embeddings,  # List of tensors (each is a batch)
#     y_batched,          # Corresponding list of label tensors
#     test_size=0.5,
#     random_state=0,
#     shuffle=True  # Shuffle the batches (not sequences within batches)
# )


X_train_tensor = torch.tensor(x_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(x_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# model = LogisticRegression(max_iter=1000, random_state=0)
# model = MLPClassifier(hidden_layer_sizes=(100,), activation='relu', solver='adam', max_iter=500, random_state=42)
# model.fit(x_train, y_train)

In [178]:
class SentimentDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = SentimentDataset(X_train_tensor, y_train_tensor)
test_dataset = SentimentDataset(X_test_tensor, y_test_tensor)

# Create DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [87]:
class MLP(nn.Module):
    def __init__(self, input_dim):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)  
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 3)  # 3 classes (negative, neutral, positive)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)  # No activation for final layer (CrossEntropyLoss expects raw logits)
        return x

In [179]:
class SentimentLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, num_layers=2, output_dim=3):
        super(SentimentLSTM, self).__init__()
        """
        Args:
            input_dim: Dimension of input features (768 for BERT embeddings)
            hidden_dim: LSTM hidden state dimension
            num_layers: Number of LSTM layers
            output_dim: Number of classes (3 for sentiment)
        """
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,  # Set to True for bidirectional LSTM
            dropout=0.2 if num_layers > 1 else 0  # Dropout between layers
        )
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
        # Optional: Initialize dropout
        self.dropout = nn.Dropout(0.3)
        
    def forward(self, x):
        # BERT embeddings come in as (batch_size, 768)
        # LSTM expects (batch_size, seq_len, input_size)
        # So we add a sequence length dimension of 1
        # x = x.unsqueeze(1)  # Now shape (batch_size, 1, 768) This is only for when the input is not of sequences, but represents an embedding of the whole input.
        
        # LSTM layer
        lstm_out, (hidden, cell) = self.lstm(x)
        
        # Get the final hidden state
        # For multi-layer LSTM, we take the last layer's hidden state
        final_hidden = hidden[-1]  # Shape (batch_size, hidden_dim)
        
        # Optional dropout
        final_hidden = self.dropout(final_hidden)
        
        # Fully connected layer
        out = self.fc(final_hidden)
        
        return out

# Initialize the model


In [180]:
input_dim = 768  # BERT base embedding dimension
model = SentimentLSTM(input_dim=input_dim, hidden_dim=128, num_layers=2)
# model = MLP(input_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

SentimentLSTM(
  (lstm): LSTM(768, 128, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (fc): Linear(in_features=128, out_features=3, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [181]:
class CustomLoss(nn.Module):
    def __init__(self):
        super(CustomLoss, self).__init__()

    def forward(self, y_pred, y_true):
        """
        Custom loss function for sentiment analysis.

        Args:
            y_pred: Predicted logits (before softmax), shape: (batch_size, num_classes)
            y_true: True labels (LongTensor), shape: (batch_size,)

        Returns:
            loss (Tensor): Loss value to minimize
        """
        # Convert logits to probabilities
        y_pred_prob = torch.softmax(y_pred, dim=1)  # Now differentiable

        # Compute weighted sum of class indices for expected class prediction
        y_pred_expected = torch.sum(y_pred_prob * torch.arange(y_pred.shape[1], device=y_pred.device), dim=1)

        # Ensure y_true is float for computation
        y_true = y_true.float()

        # Compute Mean Absolute Error (MAE) on probabilities (differentiable)
        mae = torch.mean(torch.abs(y_pred_expected - y_true))

        # Compute the loss: 1 - custom score
        loss = 1 - 0.5 * (2 - mae)

        return loss

In [182]:
criterion = CustomLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch) 
        loss.backward() 
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}')

Epoch [1/5], Loss: 0.2600
Epoch [2/5], Loss: 0.2533
Epoch [3/5], Loss: 0.2504
Epoch [4/5], Loss: 0.2323
Epoch [5/5], Loss: 0.1927


In [None]:
# model.score(x_test,y_test)

0.6507669102235107

In [183]:
model.eval()
all_preds, all_labels = [], []

total_loss = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch) 
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        total_loss += loss.item()

accuracy = accuracy_score(all_labels, all_preds)
print(f'Accuracy: {accuracy:.4f}')
print(classification_report(all_labels, all_preds, target_names=label_encoder.classes_))

print(total_loss/len(test_loader))

Accuracy: 0.5460
              precision    recall  f1-score   support

    negative       0.00      0.00      0.00       131
     neutral       0.51      0.88      0.64       219
    positive       0.66      0.54      0.59       150

    accuracy                           0.55       500
   macro avg       0.39      0.47      0.41       500
weighted avg       0.42      0.55      0.46       500

0.2503979839384556


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [85]:
torch.save(model.state_dict(), "sentiment_model.pth") #save the model

In [86]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


test_df = pd.read_csv("data/test.csv")

sentences_inference = test_df['sentence'].tolist()
x_inference = get_bert_embeddings_batch(sentences_inference, model_embed, tokenizer)

# x_inference = Vectorizer.fit_transform(test_df['sentence']).toarray()
X_inference_tensor = torch.tensor(x_inference, dtype=torch.float32).to(device)
test_dataset = TensorDataset(X_inference_tensor)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

input_dim = x_inference.shape[1]
model = MLP(input_dim)  # Ensure this matches your trained model
model.load_state_dict(torch.load("sentiment_model.pth", map_location=device))
model.to(device)
model.eval()  # Set to evaluation mode

predictions = []
with torch.no_grad():
    for batch in test_loader:
        X_batch = batch[0].to(device)  # Extract the input batch
        outputs = model(X_batch)  # Get logits
        _, predicted_labels = torch.max(outputs, dim=1)  # Get predicted class indices

        predictions.extend(predicted_labels.cpu().numpy())  # Convert to list

label_map = {0: "negative", 1: "neutral", 2: "positive"}

pred_labels = [label_map[p] for p in predictions]

output_df = pd.DataFrame({"id": test_df["id"], "label": pred_labels})
output_df.to_csv("predictions.csv", index=False)




100%|██████████| 374/374 [00:28<00:00, 13.18it/s]
  model.load_state_dict(torch.load("sentiment_model.pth", map_location=device))
