In [1]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from transformers import RobertaModel
from transformers import RobertaTokenizer
from transformers import AdamW
from sklearn.metrics import f1_score

In [2]:
# Load the CSV file
def load_data(file_path):
    df = pd.read_csv(file_path)
    texts = df["text"].tolist()
    labels = df[["anger", "fear", "joy", "sadness", "surprise"]].values
    return texts, labels

# Load train and test data
train_file = "Emotion_data/public_data_test/track_a/train/eng.csv"
test_file = "Emotion_data/public_data_test/track_a/dev/eng.csv"
human_pred = "Emotion_data/eng_test_50 labels.csv"


train_texts, train_labels = load_data(train_file)
test_texts, test_labels = load_data(test_file)
h_texts, h_labels = load_data(human_pred)

# Initialize tokenizer
tokenizer = RobertaTokenizer.from_pretrained("roberta-large")

# Find the max token length dynamically
def find_max_length(texts, tokenizer):
    tokenized_texts = [tokenizer.tokenize(text) for text in texts]
    return max(len(tokens) for tokens in tokenized_texts)

max_length = find_max_length(train_texts + test_texts, tokenizer)  # Find max length from both train & test

print(f"Dynamic max length: {max_length}")

# Tokenize with dynamic max_length
def tokenize_texts(texts, tokenizer, max_length):
    return tokenizer(
        texts,
        max_length=max_length,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )

train_encodings = tokenize_texts(train_texts, tokenizer, max_length)
test_encodings = tokenize_texts(test_texts, tokenizer, max_length)
h_encodings = tokenize_texts(h_texts, tokenizer, max_length)


Dynamic max length: 107


In [3]:
class EmotionDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return {
            "input_ids": self.encodings["input_ids"][idx],
            "attention_mask": self.encodings["attention_mask"][idx],
            "labels": torch.tensor(self.labels[idx], dtype=torch.float),
        }

# Create datasets
train_dataset = EmotionDataset(train_encodings, train_labels)
test_dataset = EmotionDataset(test_encodings, test_labels)
h_dataset = EmotionDataset(h_encodings, h_labels)


In [4]:
class RobertaClass(torch.nn.Module):
    def __init__(self, num_labels=5):
        super(RobertaClass, self).__init__()
        self.roberta = RobertaModel.from_pretrained("roberta-large")
        self.dropout = torch.nn.Dropout(0.5)

         # Additional fully connected layers
        self.fc1 = torch.nn.Linear(1024, 512)  
        self.fc2 = torch.nn.Linear(512, num_labels)
        #self.classifier = torch.nn.Linear(1024, num_labels)  # 1024 is the output dimension of Roberta

    def forward(self, input_ids, attention_mask):
        # Get the output from Roberta
        output = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        
        # We only care about the [CLS] token output for classification (i.e., first token in the sequence)
        cls_output = output[0][:, 0, :]  # (batch_size, hidden_size) - [CLS] token representation
        
        # Apply dropout for regularization
        cls_output = self.dropout(cls_output)
        
        # Pass through the classifier to get the logits (raw output scores for each class)
        #logits = self.classifier(cls_output)

        # Pass through the first fully connected layer
        x = torch.nn.ReLU()(self.fc1(cls_output))
        
        # Apply dropout after the fully connected layer
        x = self.dropout(x)
        
        # Final output layer for classification
        logits = self.fc2(x)
        
        return logits


In [None]:
#with scheduler and early stopping
from transformers import get_linear_schedule_with_warmup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

epochs = 20

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

# Model, optimizer, and criterion initialization
model = RobertaClass(num_labels=5)
model.to(device)

optimizer = AdamW(model.parameters(), lr=1e-5, eps=1e-8, weight_decay=1e-5)  # Adjusted learning rate (1e-5 decay)
criterion = torch.nn.BCEWithLogitsLoss()

# Scheduler for learning rate decay
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1) #step_size =2
warmup_ratio = 0.1  # 10% of total training as warmup
total_steps = len(train_loader) * epochs
warmup_steps = int(total_steps * warmup_ratio)

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

# Data loaders (batch size increased)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Early stopping variables
best_loss = float('inf')
epochs = 20


for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(input_ids, attention_mask)
        
        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()  # Update the learning rate based on the scheduler

        total_loss += loss.item()


    # Print training loss
    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_loader)}")

    # Save the best model based on loss
    if total_loss < best_loss:
        best_loss = total_loss
        torch.save(model.state_dict(), "best_model.pt")

# Optionally load the best model for evaluation
model.load_state_dict(torch.load("best_model.pt"))


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 1/20, Loss: 0.6363277403433198
Epoch 2/20, Loss: 0.5387684138696318
Epoch 3/20, Loss: 0.3958054899394168
Epoch 4/20, Loss: 0.29588748979775203
Epoch 5/20, Loss: 0.23549828310147186
Epoch 6/20, Loss: 0.17889912131463173
Epoch 7/20, Loss: 0.1383543774334854
Epoch 8/20, Loss: 0.10760627398428889
Epoch 9/20, Loss: 0.0775480948690045
Epoch 10/20, Loss: 0.06120404525191626
Epoch 11/20, Loss: 0.0494941747693528
Epoch 12/20, Loss: 0.038422551237563524
Epoch 13/20, Loss: 0.03258763900769584
Epoch 14/20, Loss: 0.027689932121469966
Epoch 15/20, Loss: 0.02182590588994561
Epoch 16/20, Loss: 0.019883289672111327
Epoch 17/20, Loss: 0.01701484100807006
Epoch 18/20, Loss: 0.0159812408130659
Epoch 19/20, Loss: 0.014804120380781332
Epoch 20/20, Loss: 0.01409222355456969


  model.load_state_dict(torch.load("best_model.pt"))


<All keys matched successfully>

In [24]:
# Evaluation
model.eval()
all_preds = []
all_labels = []
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        
        # Forward pass
        outputs = model(input_ids, attention_mask)
        
        # Apply sigmoid and threshold to get binary predictions
        #preds = (torch.sigmoid(outputs) > 0.5).float()  # Threshold at 0.5 for multi-label
        thresholds = torch.tensor([0.45, 0.5, 0.5, 0.5, 0.5]).to(device)  # Lower threshold for Label 0
        preds = (torch.sigmoid(outputs) > thresholds).float()

        
        # Collect predictions and labels for F1 score
        all_preds.append(preds.cpu().numpy())
        all_labels.append(labels.cpu().numpy())

# Flatten lists and calculate F1 score
all_preds = np.vstack(all_preds)  # Shape: (num_samples, num_labels)
all_labels = np.vstack(all_labels)  # Shape: (num_samples, num_labels)

# Calculate F1 score for each label individually
f1_per_label = f1_score(all_labels, all_preds, average=None)  # F1 score for each label
print("F1 Score per label:")
for idx, score in enumerate(f1_per_label):
    print(f"Label {idx} F1 Score: {score:.4f}")


F1 Score per label:
Label 0 F1 Score: 0.7742
Label 1 F1 Score: 0.8000
Label 2 F1 Score: 0.7407
Label 3 F1 Score: 0.7812
Label 4 F1 Score: 0.7213


In [25]:
# Evaluation with human predictions
model.eval()
all_preds = []
all_labels = []
test_loss = 0.0
correct = 0
total = 0

h_loader = DataLoader(h_dataset, batch_size=8, shuffle=False)

with torch.no_grad():
    for batch in h_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        
        # Forward pass
        outputs = model(input_ids, attention_mask)
        
        # Apply sigmoid and threshold to get binary predictions
        preds = (torch.sigmoid(outputs) > 0.5).float()  # Threshold at 0.5 for multi-label
        
        # Collect predictions and labels for F1 score
        all_preds.append(preds.cpu().numpy())
        all_labels.append(labels.cpu().numpy())

# Flatten lists and calculate F1 score
all_preds = np.vstack(all_preds)  # Shape: (num_samples, num_labels)
all_labels = np.vstack(all_labels)  # Shape: (num_samples, num_labels)

# Calculate F1 score for each label individually
f1_per_label = f1_score(all_labels, all_preds, average=None)  # F1 score for each label
print("F1 Score per label:")
for idx, score in enumerate(f1_per_label):
    print(f"Label {idx} F1 Score: {score:.4f}")


F1 Score per label:
Label 0 F1 Score: 0.5882
Label 1 F1 Score: 0.6122
Label 2 F1 Score: 0.7619
Label 3 F1 Score: 0.5714
Label 4 F1 Score: 0.5000
