In [None]:
import datetime
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
import spacy
import re
from sklearn.metrics import precision_recall_fscore_support

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/MyDrive/

In [None]:
cd CodaBench_Sem_Eval

In [None]:
 cd val

In [None]:
# GPU/CPU Device Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load Data
train = pd.read_csv('../public_data_test/track_a/train/eng.csv')
val = pd.read_csv('../public_data_test/track_a/dev/eng.csv')
test = pd.read_csv('../public_data_test/track_a/test/eng.csv')
emotions = ["anger", "fear", "joy", "sadness", "surprise"]

# Initialize BERT Tokenizer & Model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased').to(device)
nlp = spacy.load("en_core_web_sm")

# Define POS Embeddings
POS_VOCAB = {pos: idx for idx, pos in enumerate(nlp.pipe_labels["tagger"])}
pos_embedding_layer = nn.Embedding(len(POS_VOCAB), 16).to(device)

In [None]:
# Adversarial Training Function (FGSM)
def fgsm_attack(inputs, epsilon, gradients):
    sign_grad = gradients.sign()
    perturbed_inputs = inputs + epsilon * sign_grad
    return torch.clamp(perturbed_inputs, 0, 1)

# Define Model (BERT + POS Embeddings)
class EmotionClassifier(nn.Module):
    def __init__(self, num_classes=5):
        super(EmotionClassifier, self).__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(768 + 16, num_classes)  # BERT + POS Embedding
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, attention_mask, pos_tags):
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state[:, 0, :]  # Shape: (batch_size, 768)
        pos_embeds = pos_embedding_layer(pos_tags).mean(dim=1)  # Aggregate across sequence length, Shape: (batch_size, 16)

        combined = torch.cat((bert_output, pos_embeds), dim=1)  # Now both tensors are (batch_size, 768 + 16)
        output = self.fc(self.dropout(combined))
        return self.softmax(output)

# Initialize Model
model = EmotionClassifier().to(device)

# Optimizer & Learning Rate Scheduling
optimizer = optim.AdamW(model.parameters(), lr=2e-5, weight_decay=1e-2)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# Early Stopping Criteria
class EarlyStopping:
    def __init__(self, patience=3):
        self.patience = patience
        self.counter = 0
        self.best_loss = float('inf')

    def check(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
        return self.counter >= self.patience

early_stopping = EarlyStopping()

# Dynamic Thresholding Function
def dynamic_thresholding(preds, labels, thresholds):
    final_preds = torch.zeros_like(preds)
    for i, threshold in enumerate(thresholds):
        final_preds[:, i] = (preds[:, i] > threshold).float()
    return final_preds

# Tokenization & POS Tagging
def preprocess(texts):
    input_ids, attention_masks, pos_tags = [], [], []

    for text in texts:
        encoded = tokenizer(text, padding='max_length', truncation=True, return_tensors='pt', max_length=32)
        pos_sequence = [POS_VOCAB.get(token.pos_, 0) for token in nlp(text)]

        # Ensure POS sequence matches max_length (BERT tokenization)
        if len(pos_sequence) < 32:
            pos_sequence += [0] * (32 - len(pos_sequence))  # Pad with zeros
        else:
            pos_sequence = pos_sequence[:32]  # Truncate if too long

        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])
        pos_tags.append(torch.tensor(pos_sequence, dtype=torch.long))  # Ensure correct dtype

    return torch.cat(input_ids), torch.cat(attention_masks), torch.stack(pos_tags)


# Prepare Data
train_texts, val_texts = train['text'].tolist(), val['text'].tolist()
train_labels, val_labels = torch.tensor(train[emotions].values), torch.tensor(val[emotions].values)
train_inputs, train_masks, train_pos = preprocess(train_texts)
val_inputs, val_masks, val_pos = preprocess(val_texts)

# Dataloaders
train_data = TensorDataset(train_inputs, train_masks, train_pos, train_labels)
val_data = TensorDataset(val_inputs, val_masks, val_pos, val_labels)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = DataLoader(val_data, batch_size=16, shuffle=False)

# Training Loop
best_thresholds = [0.5] * len(emotions)
for epoch in range(10):
    model.train()
    for batch in train_loader:
        input_ids, attention_mask, pos_tags, labels = [x.to(device) for x in batch]
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask, pos_tags)
        loss = nn.BCELoss()(outputs, labels.float())
        loss.backward()
        optimizer.step()

    # Validation Step
    model.eval()
    val_loss, all_preds, all_labels = 0, [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids, attention_mask, pos_tags, labels = [x.to(device) for x in batch]
            outputs = model(input_ids, attention_mask, pos_tags)
            loss = nn.BCELoss()(outputs, labels.float())
            val_loss += loss.item()
            all_preds.append(outputs.cpu())
            all_labels.append(labels.cpu())

    scheduler.step(val_loss)

    # Dynamic Thresholding Calculation
    all_preds, all_labels = torch.cat(all_preds), torch.cat(all_labels)
    best_thresholds = [0.5] * len(emotions)  # Placeholder for future tuning
    final_preds = dynamic_thresholding(all_preds, all_labels, best_thresholds)

    # Compute Metrics
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, final_preds, average='macro')
    print(f"Epoch {epoch+1}: Val Loss {val_loss:.4f}, F1 Score {f1:.4f}")

    if early_stopping.check(val_loss):
        print("Early stopping triggered.")
        break

In [None]:
from sklearn.metrics import classification_report

In [None]:
y_pred = final_preds.numpy()
y_test = all_labels.numpy()

# Generate and print the classification report
print("\nFinal Validation Performance with Best Thresholds:")
print(classification_report(
    y_test,
    y_pred,
    target_names=emotions
))