In [None]:
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from transformers import BertTokenizer, BertModel, BartTokenizer, BartModel, RobertaTokenizer, RobertaModel
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader

In [21]:
def get_token_index(tokens, start):
    """Return the index of the token whose character span contains ``start``.

    Character spans are rebuilt by assuming the tokens are laid out one after
    another with exactly one separator character between them. A token covers
    the offsets from its first character up to and including the position
    right after its last character.

    Args:
        tokens: Sequence of token strings.
        start: Character offset to locate.

    Returns:
        The position of the first matching token, or -1 when no span matches.
    """
    left = 0
    for position, word in enumerate(tokens):
        right = left + len(word)
        if not (start < left or start > right):
            return position
        # Skip the single separator that follows this token.
        left = right + 1

    return -1

In [22]:
def preprocessing(input_file, output_file):
    """Flatten an aspect-annotated JSON file into one record per aspect term.

    The input is a JSON list whose items are read through the keys
    'sentence' and 'aspect_terms'; every aspect term is read through
    'from', 'to', 'polarity' and 'term'. For each aspect term one record with
    the keys 'tokens', 'polarity', 'aspect_term' and 'index' is produced, and
    the full list is written to ``output_file`` as indented JSON.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
    """
    with open(input_file, 'r', encoding='utf-8') as source:
        records = json.load(source)

    flattened = []
    for record in records:
        words = record['sentence'].split()

        for annotation in record['aspect_terms']:
            char_from = int(annotation['from'])
            # The end offset is parsed (so a malformed value still raises)
            # but it is not used any further.
            int(annotation['to'])
            token_index = get_token_index(words, char_from)

            flattened.append({
                'tokens': words,
                'polarity': annotation['polarity'],
                'aspect_term': annotation['term'].split(),
                'index': token_index,
            })

    with open(output_file, 'w', encoding='utf-8') as target:
        json.dump(flattened, target, indent=4, ensure_ascii=False)

In [23]:
# Build the per-aspect files for both splits:
# train.json -> train_task_2.json and val.json -> val_task_2.json.
for split_name in ('train', 'val'):
    preprocessing(f'{split_name}.json', f'{split_name}_task_2.json')

In [24]:
class ABSADataset(Dataset):
    """Dataset yielding BERT embeddings for a sentence and its aspect term.

    Every record of ``data`` is read through the keys "tokens",
    "aspect_term" and "polarity". The embeddings are computed on the fly in
    ``__getitem__`` with gradients disabled.
    """

    def __init__(self, data, label_map, bert_model="bert-base-uncased", max_len=35):
        """
        Args:
            data (list): Records with 'tokens', 'aspect_term' and 'polarity'.
            label_map (dict): Maps each polarity string to its integer class id.
            bert_model (str): Name of the pretrained BERT checkpoint to load.
            max_len (int): Length every tokenized input is padded/truncated to.
        """
        self.max_len = max_len
        self.label_map = label_map
        self.data = data
        self.tokenizer = BertTokenizer.from_pretrained(bert_model)
        self.bert_model = BertModel.from_pretrained(bert_model)

    def __len__(self):
        """Number of (sentence, aspect term) records."""
        return len(self.data)

    def _encode(self, text):
        """Tokenize ``text`` into fixed-length PyTorch tensors."""
        return self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt",
        )

    def __getitem__(self, idx):
        """Return ``(sentence_embedding, aspect_embedding, label)`` for record ``idx``.

        ``sentence_embedding`` is the encoder's last hidden state with the
        leading batch axis squeezed away; ``aspect_embedding`` keeps the batch
        axis and holds only the hidden state at position 0; ``label`` is a
        scalar long tensor.
        """
        record = self.data[idx]
        sentence_text = " ".join(record["tokens"])
        aspect_text = " ".join(record["aspect_term"])
        label_id = self.label_map[record["polarity"]]

        sentence_inputs = self._encode(sentence_text)
        aspect_inputs = self._encode(aspect_text)

        # No gradients are needed: the encoder output is used as fixed features.
        with torch.no_grad():
            sentence_hidden = self.bert_model(**sentence_inputs).last_hidden_state
            aspect_hidden = self.bert_model(**aspect_inputs).last_hidden_state

        sentence_embedding = sentence_hidden.squeeze(0)
        aspect_embedding = aspect_hidden[:, 0, :]
        label = torch.tensor(label_id, dtype=torch.long)
        return sentence_embedding, aspect_embedding, label

In [25]:
class ABSA(nn.Module):
    """Aspect-aware attention classifier over pre-computed word embeddings.

    A unidirectional GRU encodes the word embeddings, its layer-normalised
    hidden states are concatenated with the aspect embedding to score every
    position, and the attention-weighted sum of hidden states is passed
    through a small feed-forward head that produces class logits.

    Args:
        embedding_dim (int): Size of each input word embedding.
        hidden_size (int): Size of the GRU hidden state.
        aspect_dim (int): Size of the aspect embedding.
        num_lstm_layers (int): Number of stacked GRU layers.
        dropout (float): Dropout probability applied before the output layer.
        num_classes (int): Number of output logits (defaults to the 4 sentiment
            classes: positive, negative, neutral, conflict).
    """

    def __init__(self, embedding_dim=768, hidden_size=128, aspect_dim=768, num_lstm_layers=1, dropout=0.5,
                 num_classes=4):
        super(ABSA, self).__init__()
        self.hidden_size = hidden_size

        # GRU over the word embeddings; unidirectional, so it emits hidden_size features per step.
        self.gru = nn.GRU(embedding_dim, hidden_size, num_layers=num_lstm_layers, batch_first=True)
        # Scores each position from [GRU hidden state ; aspect embedding].
        self.attention = nn.Linear(hidden_size + aspect_dim, 1, bias=False)

        # Transformation layer applied to the attended representation.
        self.fc_hidden = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LayerNorm(hidden_size)

        # Final output layer producing one logit per class.
        self.fc_output = nn.Linear(hidden_size, num_classes)

    def forward(self, word_embeddings, aspect_embeddings):
        """Compute class logits.

        Args:
            word_embeddings: Tensor of shape [batch_size, seq_len, embedding_dim].
            aspect_embeddings: Tensor of shape [batch_size, 1, aspect_dim]; a
                2-D [batch_size, aspect_dim] tensor is also accepted.

        Returns:
            Unnormalised logits of shape [batch_size, num_classes].
        """
        _, seq_len, _ = word_embeddings.shape

        # Allow a flat [batch_size, aspect_dim] aspect vector by adding the sequence axis.
        if aspect_embeddings.dim() == 2:
            aspect_embeddings = aspect_embeddings.unsqueeze(1)

        # GRU processing followed by layer normalisation: [batch_size, seq_len, hidden_size]
        gru_out, _ = self.gru(word_embeddings)
        gru_out = self.norm(gru_out)

        # Broadcast the aspect embedding to every position: [batch_size, seq_len, aspect_dim]
        aspect_repeated = aspect_embeddings.expand(-1, seq_len, -1)

        # [batch_size, seq_len, hidden_size + aspect_dim]
        att_input = torch.cat([gru_out, aspect_repeated], dim=-1)

        # Aspect-aware attention scores, normalised across the sequence axis: [batch_size, seq_len, 1]
        att_weights = torch.tanh(self.attention(att_input))
        att_weights = torch.softmax(att_weights, dim=1)

        # Weighted sum of the GRU hidden states: [batch_size, hidden_size]
        weighted_sum = torch.sum(gru_out * att_weights, dim=1)

        # Feed-forward head: [batch_size, hidden_size]
        transformed_features = self.fc_hidden(weighted_sum)
        transformed_features = self.activation(transformed_features)
        transformed_features = self.dropout(transformed_features)

        # Logits (no softmax is applied here): [batch_size, num_classes]
        return self.fc_output(transformed_features)

In [26]:
def train(train_dataloader, val_dataloader, model, device, num_epochs, lr):
    """Train ``model`` and evaluate it on the validation loader after every epoch.

    The loss is cross-entropy with label smoothing 0.1 and the optimizer is
    Adam with weight decay 1e-3. Whenever an epoch reaches a lower average
    validation loss than all previous epochs, the model's state dict is saved
    to 'ABSA.pth'. One summary line is printed per epoch.

    Args:
        train_dataloader: Iterable of ``(tokens, aspect, labels)`` batches used for training.
        val_dataloader: Iterable of ``(tokens, aspect, labels)`` batches used for evaluation.
        model: Module called as ``model(tokens, aspect)`` that returns class logits.
        device: Device the model and every batch are moved to.
        num_epochs: Number of passes over the training loader.
        lr: Learning rate for Adam.

    Returns:
        ``(train_loss_list, val_loss_list)``: per-epoch averages of the batch losses.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-3)

    def score_batch(batch):
        """Forward one batch; return (loss tensor, correct predictions, batch size)."""
        tokens, aspect, labels = (part.to(device) for part in batch)
        logits = model(tokens, aspect)
        batch_loss = criterion(logits, labels)
        batch_hits = (logits.max(dim=1).indices == labels).sum().item()
        return batch_loss, batch_hits, labels.size(0)

    train_history, val_history = [], []
    lowest_val_loss = float("inf")

    for epoch in range(num_epochs):
        # ---- optimisation pass ----
        model.train()
        loss_sum, hits, seen = 0, 0, 0
        for batch in train_dataloader:
            optimizer.zero_grad()
            batch_loss, batch_hits, batch_count = score_batch(batch)
            batch_loss.backward()
            optimizer.step()

            loss_sum += batch_loss.item()
            hits += batch_hits
            seen += batch_count

        train_acc = hits / seen
        avg_train_loss = loss_sum / len(train_dataloader)

        # ---- evaluation pass (no gradients) ----
        model.eval()
        loss_sum, hits, seen = 0, 0, 0
        with torch.no_grad():
            for batch in val_dataloader:
                batch_loss, batch_hits, batch_count = score_batch(batch)
                loss_sum += batch_loss.item()
                hits += batch_hits
                seen += batch_count

        val_acc = hits / seen
        avg_val_loss = loss_sum / len(val_dataloader)

        train_history.append(avg_train_loss)
        val_history.append(avg_val_loss)

        # Keep the weights of the epoch with the lowest validation loss so far.
        if avg_val_loss < lowest_val_loss:
            lowest_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'ABSA.pth')

        print(f"Epoch [{epoch+1}/{num_epochs}] - "
              f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f} | "
              f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return train_history, val_history

In [27]:
# Load the flattened training split.
with open('train_task_2.json', 'r', encoding='utf-8') as file:
    train_data = json.load(file)

# Distinct polarity labels in order of first appearance, plus length statistics
# (counted in whitespace tokens) for sentences and aspect terms.
polarities = list(dict.fromkeys(item['polarity'] for item in train_data))
token_lengths = [len(item['tokens']) for item in train_data]
aspect_lengths = [len(item['aspect_term']) for item in train_data]

# The 95th percentile is reported under the "Max ..." labels.
print("polarities:", polarities)
print("Max Sentence Length:", np.percentile(token_lengths, 95))
print("Max Aspect Length:", np.percentile(aspect_lengths, 95))

# Load the flattened validation split.
with open('val_task_2.json', 'r', encoding='utf-8') as file:
    val_data = json.load(file)


polarities: ['negative', 'positive', 'neutral', 'conflict']
Max Sentence Length: 35.0
Max Aspect Length: 3.0


In [28]:
# Configuration for the GRU + attention model trained on BERT embeddings.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ABSA()
label_map = {"positive": 0, "negative": 1, "neutral": 2, "conflict": 3}
num_epochs = 10
learning_rate = 1e-3
train_dataset = ABSADataset(train_data, label_map)
val_dataset = ABSADataset(val_data, label_map)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# Validation is not shuffled: the reported loss is a mean of per-batch means, so a
# fixed batch composition keeps it comparable across epochs for checkpoint selection.
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [23]:
# Train the GRU + attention model; the per-epoch average losses are returned.
train_loss_list, val_loss_list = train(
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
    model=model,
    device=device,
    num_epochs=num_epochs,
    lr=learning_rate,
)

Epoch [1/10] - Train Loss: 0.9647, Train Acc: 0.6673 | Val Loss: 0.9572, Val Acc: 0.6765
Epoch [2/10] - Train Loss: 0.8727, Train Acc: 0.7264 | Val Loss: 0.9555, Val Acc: 0.6550
Epoch [3/10] - Train Loss: 0.8241, Train Acc: 0.7491 | Val Loss: 0.9375, Val Acc: 0.6765
Epoch [4/10] - Train Loss: 0.7993, Train Acc: 0.7646 | Val Loss: 0.9748, Val Acc: 0.6523
Epoch [5/10] - Train Loss: 0.7880, Train Acc: 0.7649 | Val Loss: 0.9812, Val Acc: 0.6712
Epoch [6/10] - Train Loss: 0.7615, Train Acc: 0.7862 | Val Loss: 1.0056, Val Acc: 0.6631
Epoch [7/10] - Train Loss: 0.7433, Train Acc: 0.7855 | Val Loss: 0.9753, Val Acc: 0.6712
Epoch [8/10] - Train Loss: 0.7219, Train Acc: 0.8021 | Val Loss: 1.0137, Val Acc: 0.6442
Epoch [9/10] - Train Loss: 0.7139, Train Acc: 0.8075 | Val Loss: 0.9880, Val Acc: 0.6765
Epoch [10/10] - Train Loss: 0.7054, Train Acc: 0.8159 | Val Loss: 1.0412, Val Acc: 0.6658


# BERT Finetuning

In [82]:
class BERTDataset(torch.utils.data.Dataset):
    """Dataset that encodes (sentence, aspect term) pairs with the BERT tokenizer.

    Every record of ``data`` is read through the keys "tokens",
    "aspect_term" and "polarity".
    """

    def __init__(self, data, label_map, max_length=128):
        """
        Args:
            data (list): Records with 'tokens', 'aspect_term' and 'polarity'.
            label_map (dict): Maps each polarity string to its integer class id.
            max_length (int): Length every encoded pair is padded/truncated to.
        """
        self.max_length = max_length
        self.label_map = label_map
        self.data = data
        # The tokenizer is always the "bert-base-uncased" one.
        self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    def __len__(self):
        """Number of (sentence, aspect term) records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return a dict with "input_ids", "attention_mask" and "label" for record ``idx``."""
        record = self.data[idx]
        sentence_text = " ".join(record["tokens"])
        aspect_text = " ".join(record["aspect_term"])
        class_id = self.label_map[record["polarity"]]

        # Sentence and aspect are passed together as a text pair.
        encoding = self.tokenizer(
            sentence_text,
            aspect_text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )

        # squeeze(0) drops the leading batch axis added by return_tensors="pt".
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(class_id, dtype=torch.long),
        }

# Data loaders for BERT fine-tuning (these rebind the dataset/loader names).
train_dataset = BERTDataset(train_data, label_map)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = BERTDataset(val_data, label_map)
# Validation is not shuffled: the reported loss is a mean of per-batch means, so a
# fixed batch composition keeps it comparable across epochs for checkpoint selection.
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [83]:
class BERTfinetuned(nn.Module):
    """"bert-base-uncased" encoder followed by dropout and a linear classification head."""

    def __init__(self, num_labels=4):
        """
        Args:
            num_labels (int): Number of logits produced by the classification head.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the hidden state at sequence position 0."""
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the last hidden state is used as the sequence representation (CLS token).
        first_token = encoder_out.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(first_token))

In [84]:
# Fine-tune BERT end to end on the (sentence, aspect term) pairs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BERTfinetuned(num_labels=4).to(device)

# Optimizer and loss
optimizer = optim.Adam(model.parameters(), lr=2e-5)
criterion = nn.CrossEntropyLoss()

# Per-epoch average losses and the lowest validation loss seen so far.
train_loss_list, val_loss_list = [], []
best_val_loss = float("inf")

batch_keys = ("input_ids", "attention_mask", "label")

# Training loop
num_epochs = 3
for epoch in range(num_epochs):
    # ---- optimisation pass ----
    model.train()
    train_loss = train_correct = train_total = 0
    for batch in train_dataloader:
        optimizer.zero_grad()
        input_ids, attention_mask, labels = [batch[key].to(device) for key in batch_keys]

        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        predicted = logits.max(dim=1).indices  # predicted class per sample
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    train_acc = train_correct / train_total
    avg_train_loss = train_loss / len(train_dataloader)

    # ---- evaluation pass (no gradients) ----
    model.eval()
    val_loss = val_correct = val_total = 0
    with torch.no_grad():
        for batch in val_dataloader:
            input_ids, attention_mask, labels = [batch[key].to(device) for key in batch_keys]

            logits = model(input_ids, attention_mask)
            loss = criterion(logits, labels)

            val_loss += loss.item()
            predicted = logits.max(dim=1).indices  # predicted class per sample
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)

    val_acc = val_correct / val_total
    avg_val_loss = val_loss / len(val_dataloader)

    train_loss_list.append(avg_train_loss)
    val_loss_list.append(avg_val_loss)

    # Checkpoint whenever the validation loss improves.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), 'finetunedBERT.pth')

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")

Epoch [1/3] - Train Loss: 0.8751, Train Acc: 0.6440 | Val Loss: 0.7921, Val Acc: 0.6927
Epoch [2/3] - Train Loss: 0.5913, Train Acc: 0.7734 | Val Loss: 0.7573, Val Acc: 0.7224
Epoch [3/3] - Train Loss: 0.4228, Train Acc: 0.8406 | Val Loss: 0.7185, Val Acc: 0.7332


# Finetuning Pipeline

In [39]:
class SentimentDataset(torch.utils.data.Dataset):
    """Dataset that encodes (sentence, aspect term) pairs with a caller-supplied tokenizer.

    Every record of ``data`` is read through the keys "tokens",
    "aspect_term" and "polarity".
    """

    def __init__(self, data, label_map, tokenizer, max_length=128):
        """
        Args:
            data (list): Records with 'tokens', 'aspect_term' and 'polarity'.
            label_map (dict): Maps each polarity string to its integer class id.
            tokenizer: Callable invoked as ``tokenizer(sentence, aspect, ...)`` whose
                result provides "input_ids" and "attention_mask".
            max_length (int): Length every encoded pair is padded/truncated to.
        """
        self.max_length = max_length
        self.tokenizer = tokenizer
        self.label_map = label_map
        self.data = data

    def __len__(self):
        """Number of (sentence, aspect term) records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return a dict with "input_ids", "attention_mask" and "label" for record ``idx``."""
        record = self.data[idx]
        sentence_text = " ".join(record["tokens"])
        aspect_text = " ".join(record["aspect_term"])
        class_id = self.label_map[record["polarity"]]

        # Sentence and aspect are passed together as a text pair.
        encoding = self.tokenizer(
            sentence_text,
            aspect_text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )

        # squeeze(0) drops the leading batch axis added by return_tensors="pt".
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(class_id, dtype=torch.long),
        }

In [40]:
class BERTfinetuned(nn.Module):
    """"bert-base-uncased" encoder followed by dropout and a linear classification head.

    NOTE(review): a class with this name is already defined earlier in the
    notebook; this definition rebinds the name.
    """

    def __init__(self, num_labels=4):
        """
        Args:
            num_labels (int): Number of logits produced by the classification head.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the hidden state at sequence position 0."""
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the last hidden state is used as the sequence representation (CLS token).
        first_token = encoder_out.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(first_token))

In [None]:
class BARTClassifier(nn.Module):
    """Pretrained BART model followed by dropout and a linear classification head."""

    def __init__(self, num_labels, model_name='facebook/bart-base'):
        """
        Args:
            num_labels (int): Number of logits produced by the classification head.
            model_name (str): Name of the pretrained BART checkpoint to load.
        """
        super().__init__()
        self.bart = BartModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(self.bart.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the hidden state at sequence position 0."""
        model_out = self.bart(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the returned last hidden state is used as the sequence
        # representation (treated like a CLS token).
        first_position = model_out.last_hidden_state[:, 0, :]
        return self.fc(self.dropout(first_position))

In [None]:
class RoBERTaClassifier(nn.Module):
    """Pretrained RoBERTa encoder followed by dropout and a linear classification head."""

    def __init__(self, num_labels, model_name='roberta-base'):
        """
        Args:
            num_labels (int): Number of logits produced by the classification head.
            model_name (str): Name of the pretrained RoBERTa checkpoint to load.
        """
        super().__init__()
        self.roberta = RobertaModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(self.roberta.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the hidden state at sequence position 0."""
        encoder_out = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the last hidden state is used as the sequence
        # representation (treated like a CLS token).
        first_position = encoder_out.last_hidden_state[:, 0, :]
        return self.fc(self.dropout(first_position))


In [42]:
def finetune(model, tokenizer, path, num_epochs=3, lr=2e-5, batch_size=32):
    """Fine-tune ``model`` on the training split and checkpoint the best epoch.

    The data comes from the module-level ``train_data``, ``val_data`` and
    ``label_map``, wrapped in ``SentimentDataset`` with the given tokenizer.
    The model is moved to CUDA when available (CPU otherwise), optimised with
    Adam on a cross-entropy loss, and evaluated on the validation split after
    every epoch. Whenever the average validation loss improves, the model's
    state dict is saved to ``path``. One summary line is printed per epoch.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that returns logits.
        tokenizer: Tokenizer passed on to ``SentimentDataset``.
        path: File the best state dict is written to.
        num_epochs (int): Number of passes over the training split.
        lr (float): Learning rate for Adam.
        batch_size (int): Batch size for both data loaders.

    Returns:
        ``(train_loss_list, val_loss_list)``: per-epoch averages of the batch losses.
    """
    train_dataset = SentimentDataset(train_data, label_map, tokenizer)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataset = SentimentDataset(val_data, label_map, tokenizer)
    # Validation is not shuffled: the reported loss is a mean of per-batch means, so a
    # fixed batch composition keeps it comparable across epochs for checkpoint selection.
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Batches are sent to `device` below, so the model must live there as well;
    # this is done before the optimizer is created.
    model.to(device)

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    def run_epoch(dataloader, training):
        """Run one pass over ``dataloader``; return (average batch loss, accuracy)."""
        model.train(training)
        loss_sum, correct, total = 0.0, 0, 0
        with torch.set_grad_enabled(training):
            for batch in dataloader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                if training:
                    optimizer.zero_grad()

                logits = model(input_ids, attention_mask)
                loss = criterion(logits, labels)

                if training:
                    loss.backward()
                    optimizer.step()

                loss_sum += loss.item()
                _, predicted = torch.max(logits, dim=1)  # predicted class per sample
                correct += (predicted == labels).sum().item()
                total += labels.size(0)
        return loss_sum / len(dataloader), correct / total

    train_loss_list = []
    val_loss_list = []
    best_val_loss = float("inf")

    for epoch in range(num_epochs):
        avg_train_loss, train_acc = run_epoch(train_dataloader, training=True)
        avg_val_loss, val_acc = run_epoch(val_dataloader, training=False)

        train_loss_list.append(avg_train_loss)
        val_loss_list.append(avg_val_loss)

        # Checkpoint whenever the validation loss improves.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), path)

        print(f"Epoch [{epoch+1}/{num_epochs}] - "
              f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f} | "
              f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return train_loss_list, val_loss_list

In [None]:
# Fine-tune BERT through the shared pipeline; the best weights go to "checkBERT.pth".
bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BERTfinetuned(num_labels=4)
bert_model = bert_model.to(device)
bert_train_loss_list, bert_val_loss_list = finetune(
    model=bert_model,
    tokenizer=bert_tokenizer,
    path="checkBERT.pth",
)

In [46]:
# Fine-tune BART through the shared pipeline; the best weights go to "finetunedBART.pth".
bart_tokenizer = BartTokenizer.from_pretrained('facebook/bart-base')
bart_model = BARTClassifier(num_labels=4)
bart_model = bart_model.to(device)
bart_train_loss_list, bart_val_loss_list = finetune(
    model=bart_model,
    tokenizer=bart_tokenizer,
    path="finetunedBART.pth",
)

config.json:   0%|          | 0.00/1.72k [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to see activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


model.safetensors:   0%|          | 0.00/558M [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Epoch [1/3] - Train Loss: 1.1468, Train Acc: 0.5410 | Val Loss: 1.0562, Val Acc: 0.6415
Epoch [2/3] - Train Loss: 0.8396, Train Acc: 0.6829 | Val Loss: 0.8511, Val Acc: 0.6792
Epoch [3/3] - Train Loss: 0.6993, Train Acc: 0.7420 | Val Loss: 0.8215, Val Acc: 0.7116
