In [1]:
!pip install -q datasets torch transformers


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from transformers import AutoTokenizer, AutoModel

In [3]:
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

PyTorch version: 2.5.1+cu121
CUDA available: True


In [4]:
file_path = '/kaggle/input/restaurant-reviews/final_data.txt'

In [5]:
def read_file(file_path):
    """Return the full UTF-8 text of ``file_path``.

    This is a best-effort reader: any failure is reported on stdout and an
    empty string is returned instead of raising.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The file content, or ``""`` when the file is missing or unreadable.
    """
    text = ""
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            text = handle.read()
    except FileNotFoundError:
        print(f"Cant find file: {file_path}")
    except Exception as err:
        print(f"Error: {err}")
    return text

def split_aspect(aspects):
    """Parse an annotation line such as ``"{FOOD, positive}, {PRICE, negative}"``.

    The line is cut on ``'}, {'``, braces are removed and every chunk is split
    on ``', '``. Chunks that do not yield exactly two fields are ignored.

    Args:
        aspects: Raw annotation string.

    Returns:
        A list of ``[aspect, sentiment]`` lists with surrounding whitespace
        stripped, in the order they appear in the input.
    """
    parsed = []
    for chunk in aspects.split('}, {'):
        fields = chunk.replace('{', '').replace('}', '').split(', ')
        if len(fields) != 2:
            # Malformed chunk (missing or extra fields): skip it.
            continue
        name, polarity = fields
        parsed.append([name.strip(), polarity.strip()])
    return parsed

def list_data(file):
    """Turn the raw annotation text into one record per (sentence, aspect).

    The text is split into blocks on blank lines. Each block must have at
    least three lines; the second line is used as the sentence and the third
    as the aspect annotation (the first line is not used). For every kept
    block, the annotated pairs are emitted first, followed by a ``"none"``
    record for each of the four core aspects that was not annotated.

    Args:
        file: Whole content of the annotation file as one string.

    Returns:
        A list of dicts with keys ``"sentence"``, ``"aspect"``, ``"sentiment"``.
    """
    known_aspects = ("AMBIENCE", "PRICE", "FOOD", "SERVICE")
    records = []

    for block in file.split('\n\n'):
        if block.strip() == "":
            continue
        rows = block.split("\n")
        if len(rows) < 3:
            continue

        sentence = rows[1]
        pairs = split_aspect(rows[2])

        # NOTE(review): blocks with fewer than two parsed pairs are dropped, so
        # reviews annotated with a single aspect never reach the dataset.
        # Confirm this is intended and not a mix-up with the pair length.
        if len(pairs) < 2:
            continue

        mentioned = {name for name, _ in pairs}

        records.extend(
            {"sentence": sentence, "aspect": name, "sentiment": polarity}
            for name, polarity in pairs
        )
        # Only the four core aspects are padded with "none"; any other aspect
        # name present in the data gets no "none" rows.
        records.extend(
            {"sentence": sentence, "aspect": name, "sentiment": "none"}
            for name in known_aspects
            if name not in mentioned
        )

    return records

def label_encoder(label):
    """Map a sentiment label to its integer class id.

    The label is normalised before lookup (surrounding whitespace and a
    trailing period removed, lower-cased) so that noisy annotations such as
    ``"positive."`` are not silently encoded as class 0 ("none").

    Args:
        label: Sentiment label, expected to be one of ``"positive"``,
            ``"negative"``, ``"neutral"`` or ``"none"``.

    Returns:
        1 for positive, 2 for negative, 3 for neutral, and 0 for "none",
        for any unrecognised label, or for a non-string input.
    """
    label_map = {"positive": 1, "negative": 2, "neutral": 3, "none": 0}
    if not isinstance(label, str):
        return 0
    normalized = label.strip().rstrip('.').rstrip().lower()
    return label_map.get(normalized, 0)

def transform_data(file):
    """Build the modelling DataFrame from the raw annotation text.

    Args:
        file: Whole content of the annotation file as one string.

    Returns:
        A DataFrame with columns ``sentence``, ``aspect``, ``sentiment_id``
        and ``sentiment`` (one row per record produced by ``list_data``).
        When no record could be parsed, a message is printed and an empty
        frame with the same columns is returned.
    """
    records = list_data(file)

    if len(records) == 0:
        print("No valid data found in the file!")
        return pd.DataFrame({"sentence": [], "aspect": [], "sentiment_id": [], "sentiment": []})

    # Column order matters for the exported CSV, so the dict is built explicitly.
    columns = {
        "sentence": [rec['sentence'] for rec in records],
        "aspect": [rec["aspect"] for rec in records],
        "sentiment_id": [label_encoder(rec["sentiment"]) for rec in records],
        "sentiment": [rec["sentiment"] for rec in records],
    }
    return pd.DataFrame(columns)

In [6]:
data_content = read_file(file_path)

df = transform_data(data_content)

# Some annotations carry a trailing period ('positive.'). Clean the text label
# first, then re-derive the numeric id from the cleaned label so that the
# 'sentiment' and 'sentiment_id' columns cannot disagree for those rows.
df['sentiment'] = df['sentiment'].replace('positive.', 'positive')
df['sentiment_id'] = df['sentiment'].map(label_encoder)
print(df)

df.to_csv('transformed_data.csv', index=False, encoding='utf-8')

                                                sentence    aspect  \
0      ăn rất ngon được phục vụ chu đáo với 2 cô chú ...      FOOD   
1      ăn rất ngon được phục vụ chu đáo với 2 cô chú ...   SERVICE   
2      ăn rất ngon được phục vụ chu đáo với 2 cô chú ...     PRICE   
3      ăn rất ngon được phục vụ chu đáo với 2 cô chú ...  AMBIENCE   
4      quán này khá đông khách vào buổi tối nên phải ...      FOOD   
...                                                  ...       ...   
39755  quán cơm 418 này có không gian hơi bị rộng lại...     PRICE   
39756  tài xế chở vào đây ăn trưa trên đường đi công ...      FOOD   
39757  tài xế chở vào đây ăn trưa trên đường đi công ...  AMBIENCE   
39758  tài xế chở vào đây ăn trưa trên đường đi công ...     PRICE   
39759  tài xế chở vào đây ăn trưa trên đường đi công ...   SERVICE   

       sentiment_id sentiment  
0                 1  positive  
1                 1  positive  
2                 1  positive  
3                 0      none  

In [7]:
def check_sentiment_per_aspect_crosstab(df):
    """Print how sentiment labels are distributed within each aspect.

    Two tables are printed: absolute counts (with a "Total" margin row and
    column) and row-wise percentages rounded to two decimals.

    Args:
        df: DataFrame with ``aspect`` and ``sentiment`` columns.
    """
    aspects, sentiments = df['aspect'], df['sentiment']

    counts = pd.crosstab(aspects, sentiments, margins=True, margins_name="Total")
    print("sentiment distribution by aspect (crosstab):", counts, sep="\n")

    # normalize='index' makes every aspect row sum to 1 before scaling to percent.
    row_percent = pd.crosstab(aspects, sentiments, normalize='index').mul(100).round(2)
    print("\n% sentiment by aspect:", row_percent, sep="\n")

check_sentiment_per_aspect_crosstab(df)

sentiment distribution by aspect (crosstab):
sentiment  negative  neutral   none  positive  Total
aspect                                              
AMBIENCE        883      135   4434      4443   9895
DELIVERY         27        2      0       149    178
FOOD           1283      946    582      7087   9898
PRICE          1065      567   4423      3839   9894
SERVICE        1277      171   3872      4575   9895
Total          4535     1821  13311     20093  39760

% sentiment by aspect:
sentiment  negative  neutral   none  positive
aspect                                       
AMBIENCE       8.92     1.36  44.81     44.90
DELIVERY      15.17     1.12   0.00     83.71
FOOD          12.96     9.56   5.88     71.60
PRICE         10.76     5.73  44.70     38.80
SERVICE       12.91     1.73  39.13     46.24


In [8]:
from sklearn.model_selection import train_test_split

def split_data(df, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """Split ``df`` into train / validation / test frames, stratified by label.

    The split is done in two stages with a fixed seed: first ``train_ratio``
    of the rows go to train, then the remainder is divided between validation
    and test in the proportion ``val_ratio : test_ratio``.

    NOTE(review): rows are split individually. If several rows share the same
    sentence (one row per aspect), they may land in different splits - confirm
    whether a sentence-level split is wanted.

    Args:
        df: DataFrame with a ``sentiment_id`` column used for stratification.
        train_ratio: Fraction of rows assigned to the training set.
        val_ratio: Relative share of the held-out rows used for validation.
        test_ratio: Relative share of the held-out rows used for testing.

    Returns:
        Tuple ``(train_df, val_df, test_df)``.
    """
    seed = 42

    train_df, holdout_df = train_test_split(
        df,
        train_size=train_ratio,
        random_state=seed,
        stratify=df['sentiment_id'],
    )

    # Re-express the validation share relative to the held-out part only.
    val_share = val_ratio / (val_ratio + test_ratio)
    val_df, test_df = train_test_split(
        holdout_df,
        train_size=val_share,
        random_state=seed,
        stratify=holdout_df['sentiment_id'],
    )

    return train_df, val_df, test_df

train_df, val_df, test_df = split_data(df)

check_sentiment_per_aspect_crosstab(train_df)

sentiment distribution by aspect (crosstab):
sentiment  negative  neutral   none  positive  Total
aspect                                              
AMBIENCE        705      107   3573      3544   7929
DELIVERY         21        2      0       122    145
FOOD           1026      760    471      5710   7967
PRICE           843      461   3497      3046   7847
SERVICE        1033      127   3107      3653   7920
Total          3628     1457  10648     16075  31808

% sentiment by aspect:
sentiment  negative  neutral   none  positive
aspect                                       
AMBIENCE       8.89     1.35  45.06     44.70
DELIVERY      14.48     1.38   0.00     84.14
FOOD          12.88     9.54   5.91     71.67
PRICE         10.74     5.87  44.56     38.82
SERVICE       13.04     1.60  39.23     46.12


In [9]:
import torch
from torch.utils.data import Dataset
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")

class CustomDataset(Dataset):
    """Torch dataset yielding one tokenised (sentence, aspect) pair per row.

    Args:
        df: DataFrame with ``sentence``, ``aspect`` and ``sentiment_id`` columns.
        tokenizer: Tokenizer object exposing ``encode_plus``.
        max_length: Length every encoded example is padded / truncated to.
    """

    def __init__(self, df, tokenizer, max_length=256):
        self.df, self.tokenizer, self.max_length = df, tokenizer, max_length

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        record = self.df.iloc[idx]
        sentence = record['sentence']
        aspect = record['aspect']
        label = record['sentiment_id']

        # The aspect is appended to the sentence as plain text.
        # NOTE(review): "[SEP]" is written literally into the string - confirm it
        # is a special token for the chosen tokenizer. Also, with truncation=True
        # on a single string, a sentence longer than max_length presumably loses
        # the trailing aspect - verify. Any change here must be mirrored wherever
        # inference inputs are built.
        tokens = self.tokenizer.encode_plus(
            f"{sentence} [SEP] {aspect}",
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )

        # return_tensors='pt' yields a leading batch dimension of 1; drop it so
        # the DataLoader can stack examples itself.
        sample = {key: tokens[key].squeeze(0) for key in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(label, dtype=torch.long)
        return sample

train_dataset = CustomDataset(train_df, tokenizer)
val_dataset = CustomDataset(val_df, tokenizer)
test_dataset = CustomDataset(test_df, tokenizer)

config.json:   0%|          | 0.00/557 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/895k [00:00<?, ?B/s]

bpe.codes:   0%|          | 0.00/1.14M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/3.13M [00:00<?, ?B/s]

In [10]:
import pytorch_lightning as pl
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

class ABSADataModule(pl.LightningDataModule):
    """Lightning data module wrapping the train / validation / test frames.

    Args:
        train_df: Training rows.
        val_df: Validation rows.
        test_df: Test rows.
        tokenizer_name: Hugging Face tokenizer id loaded in ``__init__``.
        max_length: Sequence length passed on to ``CustomDataset``.
        batch_size: Batch size used by all three dataloaders.
    """

    def __init__(self, train_df, val_df, test_df, tokenizer_name="vinai/phobert-base", max_length=256, batch_size=8):
        super().__init__()
        self.train_df, self.val_df, self.test_df = train_df, val_df, test_df
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
        self.max_length = max_length
        self.batch_size = batch_size

    def _build_dataset(self, frame):
        """Wrap one DataFrame in a ``CustomDataset`` sharing this module's tokenizer."""
        return CustomDataset(frame, self.tokenizer, self.max_length)

    def _build_loader(self, dataset, shuffle):
        """Create a DataLoader with this module's batch size and four workers."""
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle, num_workers=4)

    def setup(self, stage=None):
        """Create the three datasets; ``stage`` is accepted but not used."""
        self.train_dataset = self._build_dataset(self.train_df)
        self.val_dataset = self._build_dataset(self.val_df)
        self.test_dataset = self._build_dataset(self.test_df)

    def train_dataloader(self):
        """Shuffled loader over the training set."""
        return self._build_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Ordered loader over the validation set."""
        return self._build_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        """Ordered loader over the test set."""
        return self._build_loader(self.test_dataset, shuffle=False)

In [11]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
from torch.optim import AdamW 
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
from torch.optim.lr_scheduler import ReduceLROnPlateau 

class ABSAModel(pl.LightningModule):
    """Transformer encoder plus MLP head for aspect-based sentiment classification.

    Each example is one (sentence, aspect) pair. The hidden state of the first
    token is fed to a three-layer MLP that predicts one of ``num_labels``
    classes; the id-to-name mapping is stored in ``self.sentiment_map``
    (0=none, 1=positive, 2=negative, 3=neutral).

    Args:
        model_name: Hugging Face model id loaded with ``AutoModel`` / ``AutoTokenizer``.
        num_labels: Number of output classes.
        class_weights: Optional per-class weight tensor given to ``CrossEntropyLoss``.
        learning_rate: Learning rate of the classifier-head parameter group.
        bert_learning_rate: Learning rate of the encoder parameter group.
        freeze_bert_layers: If True, only the last encoder layers (and the
            pooler, when present) stay trainable; otherwise the whole encoder
            is fine-tuned.
        hidden_dropout_prob: Dropout probability used in the classifier head.
    """

    def __init__(self,
                 model_name="vinai/phobert-base",
                 num_labels=4,
                 class_weights=None,
                 learning_rate=2e-5,
                 bert_learning_rate=2e-6,
                 freeze_bert_layers=True,
                 hidden_dropout_prob=0.3):
        super().__init__()

        # class_weights is intentionally kept out of hparams (it is a tensor)
        # and stored as a plain attribute instead.
        self.save_hyperparameters("model_name", "num_labels", "learning_rate", "bert_learning_rate", "freeze_bert_layers", "hidden_dropout_prob")
        self.class_weights = class_weights

        self.bert_model = AutoModel.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        if freeze_bert_layers:
            num_layers_to_unfreeze = 4
            # The message is derived from the constant so it cannot drift from
            # what is actually unfrozen below.
            print(f"Freezing BERT layers except the last {num_layers_to_unfreeze} encoder layers (and the pooler, if present).")
            for param in self.bert_model.parameters():
                param.requires_grad = False
            for layer in self.bert_model.encoder.layer[-num_layers_to_unfreeze:]:
                for param in layer.parameters():
                    param.requires_grad = True
            # NOTE(review): forward() reads last_hidden_state rather than the
            # pooler output, so these pooler parameters do not appear to take
            # part in the loss - confirm whether unfreezing them is needed.
            if hasattr(self.bert_model, 'pooler') and self.bert_model.pooler is not None:
                for param in self.bert_model.pooler.parameters():
                    param.requires_grad = True
        else:
            print("Fine-tuning all BERT layers.")
            for param in self.bert_model.parameters():
                param.requires_grad = True

        self.classifier = nn.Sequential(
            nn.Dropout(hidden_dropout_prob),
            nn.Linear(self.bert_model.config.hidden_size, 512),
            nn.ReLU(),
            nn.Dropout(hidden_dropout_prob),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(hidden_dropout_prob),
            nn.Linear(256, num_labels)
        )

        self.criterion = torch.nn.CrossEntropyLoss(weight=self.class_weights)

        # Per-batch results collected during an epoch; cleared in the
        # corresponding on_*_epoch_end hook.
        self.val_outputs = []
        self.test_outputs = []

        self.aspects = ["AMBIENCE", "PRICE", "FOOD", "SERVICE"]
        self.sentiments = ["positive", "negative", "neutral"]
        self.sentiment_map = {1: "positive", 2: "negative", 3: "neutral", 0: "none"}

    def forward(self, input_ids, attention_mask):
        """Return class logits for a batch of token ids and attention masks."""
        bert_output = self.bert_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Hidden state of the first token is used as the sequence summary.
        pooled_output = bert_output.last_hidden_state[:, 0, :]
        return self.classifier(pooled_output)

    def _evaluate_batch(self, batch):
        """Run one batch and return ``(loss, logits, predictions, labels)``."""
        labels = batch['labels']
        logits = self(batch['input_ids'], batch['attention_mask'])
        loss = self.criterion(logits, labels)
        preds = torch.argmax(logits, dim=1)
        return loss, logits, preds, labels

    @staticmethod
    def _epoch_scores(outputs):
        """Concatenate per-batch predictions and return ``(accuracy, weighted F1)``."""
        preds = torch.cat([x['preds'] for x in outputs]).detach().cpu().numpy()
        labels = torch.cat([x['labels'] for x in outputs]).detach().cpu().numpy()
        return accuracy_score(labels, preds), f1_score(labels, preds, average='weighted')

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        loss, _, _, _ = self._evaluate_batch(batch)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and stash predictions for the epoch metrics."""
        loss, _, preds, labels = self._evaluate_batch(batch)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
        self.val_outputs.append({'val_loss': loss, 'preds': preds, 'labels': labels})
        return {'val_loss': loss}

    def on_validation_epoch_end(self):
        """Log ``val_accuracy`` and ``val_f1`` over all validation batches."""
        if not self.val_outputs:
            return
        accuracy, f1 = self._epoch_scores(self.val_outputs)
        self.log('val_accuracy', accuracy, prog_bar=True)
        self.log('val_f1', f1, prog_bar=True)
        self.val_outputs.clear()

    def test_step(self, batch, batch_idx):
        """Stash loss, predictions, labels and logits of one test batch."""
        loss, logits, preds, labels = self._evaluate_batch(batch)
        self.test_outputs.append({'test_loss': loss, 'preds': preds, 'labels': labels, 'logits': logits})
        return {'test_loss': loss}

    def on_test_epoch_end(self):
        """Log ``test_loss`` (mean of batch losses), ``test_accuracy`` and ``test_f1``."""
        if not self.test_outputs:
            return
        test_loss = torch.stack([x['test_loss'] for x in self.test_outputs]).mean()
        accuracy, f1 = self._epoch_scores(self.test_outputs)
        self.log('test_loss', test_loss)
        self.log('test_accuracy', accuracy)
        self.log('test_f1', f1)
        self.test_outputs.clear()

    def configure_optimizers(self):
        """Build AdamW with separate encoder / head learning rates and a plateau scheduler.

        Only parameters with ``requires_grad=True`` are optimised. The
        scheduler reduces the learning rates when ``val_f1`` stops improving.
        """
        bert_params = []
        classifier_params = []
        print("\nConfiguring Optimizer Parameter Groups:")
        for name, param in self.named_parameters():
            if not param.requires_grad:
                continue
            if 'bert_model' in name:
                bert_params.append(param)
            else:
                classifier_params.append(param)

        print(f"  Found {len(bert_params)} parameters in BERT group (requires_grad=True).")
        print(f"  Found {len(classifier_params)} parameters in Classifier group (requires_grad=True).")

        optimizer_grouped_parameters = [
            {
                'params': bert_params,
                'lr': self.hparams.bert_learning_rate
            },
            {
                'params': classifier_params,
                'lr': self.hparams.learning_rate
            }
        ]

        optimizer = AdamW(optimizer_grouped_parameters)

        scheduler_config = {
            'scheduler': ReduceLROnPlateau(
                optimizer,
                mode='max',       # val_f1 should increase
                factor=0.1,
                patience=3,
                min_lr=1e-7,
                verbose=True
            ),
            'monitor': 'val_f1',
            'interval': 'epoch',
            'frequency': 1
        }

        print(f"\nOptimizer: AdamW")
        print(f"  Classifier Learning Rate: {self.hparams.learning_rate}")
        print(f"  BERT Learning Rate: {self.hparams.bert_learning_rate}")
        print(f"Scheduler: ReduceLROnPlateau (monitoring '{scheduler_config['monitor']}')\n")

        return [optimizer], [scheduler_config]

    def save_model(self, filepath):
        """Save weights, hyperparameters, tokenizer and label metadata to ``filepath``.

        The file is written with ``torch.save`` and contains pickled Python
        objects (tokenizer, hparams) in addition to tensors.
        """
        save_dict = {
            'model_state_dict': self.state_dict(),
            'hyperparameters': self.hparams,
            'class_weights': self.class_weights,
            'tokenizer': self.tokenizer,
            'sentiment_map': self.sentiment_map,
            'aspects': self.aspects,
            'sentiments': self.sentiments
        }
        torch.save(save_dict, filepath)
        print(f"Model and associated info saved to {filepath}")

    @staticmethod
    def load_model(filepath, device='cpu'):
        """Rebuild an ``ABSAModel`` from a file written by ``save_model``.

        Args:
            filepath: Path of the saved checkpoint.
            device: Device the tensors and the model are mapped to.

        Returns:
            The restored model, in eval mode and moved to ``device``.
        """
        # SECURITY: weights_only=False unpickles arbitrary Python objects.
        # Only load checkpoints that come from a trusted source.
        try:
            checkpoint = torch.load(filepath, map_location=device, weights_only=False)
        except Exception as exc:
            # Report the real cause instead of swallowing it, then try the
            # restricted loader as a last resort.
            print(f"Warning: Failed to load with weights_only=False ({exc}). Trying weights_only=True. Tokenizer might need manual reloading.")
            checkpoint = torch.load(filepath, map_location=device, weights_only=True)

        hparams = checkpoint['hyperparameters']

        model = ABSAModel(
            model_name=hparams.get('model_name', "vinai/phobert-base"),
            num_labels=hparams.get('num_labels', 4),
            learning_rate=hparams.get('learning_rate', 2e-5),
            bert_learning_rate=hparams.get('bert_learning_rate', 2e-6),
            freeze_bert_layers=hparams.get('freeze_bert_layers', True),
            hidden_dropout_prob=hparams.get('hidden_dropout_prob', 0.3),
            # The same class weights must be passed so the loss module is
            # built the same way as when the state dict was saved.
            class_weights=checkpoint.get('class_weights', None)
        )

        model.load_state_dict(checkpoint['model_state_dict'])

        # Prefer the tokenizer stored in the checkpoint; otherwise reload it by name.
        if 'tokenizer' in checkpoint:
            model.tokenizer = checkpoint['tokenizer']
        else:
            print("Tokenizer not found in checkpoint, reloading from model name...")
            model.tokenizer = AutoTokenizer.from_pretrained(model.hparams.model_name)

        model.sentiment_map = checkpoint.get('sentiment_map', {1: "positive", 2: "negative", 3: "neutral", 0: "none"})
        model.aspects = checkpoint.get('aspects', ["AMBIENCE", "PRICE", "FOOD", "SERVICE"])
        model.sentiments = checkpoint.get('sentiments', ["positive", "negative", "neutral"])

        model.eval()
        model.to(device)
        print(f"Model loaded successfully from {filepath} to {device}")
        return model

In [12]:
# # Load checkpoint
# checkpoint_path = ""
# loaded_model = ABSAModel.load_from_checkpoint(checkpoint_path)

#trainer.fit(model, data_module, ckpt_path=checkpoint_path)

In [13]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, LearningRateMonitor
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer, AdamW

from sklearn.utils.class_weight import compute_class_weight

SEED = 42
NUM_LABELS = 4
USE_GPU = torch.cuda.is_available()
LEARNING_RATE = 3e-5
BERT_LEARNING_RATE = 1e-5

# Seed Python, NumPy and torch (and dataloader workers) so that weight
# initialisation, shuffling and dropout are repeatable between runs.
pl.seed_everything(SEED, workers=True)

# Balanced class weights computed from the training labels.
class_labels = train_df['sentiment_id'].unique()
class_labels.sort()
# The loss expects one weight per output class; fail early if a class is
# missing from the training split.
assert len(class_labels) == NUM_LABELS, (
    f"expected {NUM_LABELS} classes in train_df, found {len(class_labels)}: {class_labels}"
)
class_weights = compute_class_weight('balanced', classes=class_labels, y=train_df['sentiment_id'])
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to('cuda' if USE_GPU else 'cpu')  # move to the training device

data_module = ABSADataModule(train_df, val_df, test_df, tokenizer_name="vinai/phobert-base", batch_size=8)
data_module.setup()

model = ABSAModel(
    model_name="vinai/phobert-base",
    num_labels=NUM_LABELS,
    class_weights=class_weights_tensor,
    learning_rate=LEARNING_RATE,
    bert_learning_rate=BERT_LEARNING_RATE,
    freeze_bert_layers=True,
    hidden_dropout_prob=0.3
)

# Stop when validation F1 has not improved for 5 epochs.
early_stopping_callback = EarlyStopping(
    monitor='val_f1',
    patience=5,
    mode='max',
    verbose=True
)

# Keep only the checkpoint with the best validation F1.
checkpoint_callback = ModelCheckpoint(
    monitor='val_f1',
    filename='best-model-{epoch:02d}-{val_f1:.4f}',
    save_top_k=1,
    mode='max',
    verbose=True
)

lr_monitor = LearningRateMonitor(logging_interval='step')

trainer = pl.Trainer(
    max_epochs=50,
    accelerator='gpu' if USE_GPU else 'cpu',
    devices=1,
    # fp16 mixed precision is only requested when a GPU is available;
    # plain fp32 is used on CPU.
    precision='16-mixed' if USE_GPU else '32-true',
    log_every_n_steps=10,
    enable_progress_bar=True,
    logger=True,
    callbacks=[early_stopping_callback, checkpoint_callback, lr_monitor]
)

trainer.fit(model, data_module)

pytorch_model.bin:   0%|          | 0.00/543M [00:00<?, ?B/s]

Freezing BERT layers except the last two.

Configuring Optimizer Parameter Groups:
  Found 66 parameters in BERT group (requires_grad=True).
  Found 6 parameters in Classifier group (requires_grad=True).

Optimizer: AdamW
  Classifier Learning Rate: 3e-05
  BERT Learning Rate: 1e-05
Scheduler: ReduceLROnPlateau (monitoring 'val_f1')





Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

In [14]:
save_path = './absa_model.pth'
model.save_model(save_path)

Model and associated info saved to ./absa_model.pth


In [15]:
trainer.test(model, datamodule=data_module)

Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss': 1.1790134906768799,
  'test_accuracy': 0.8910965919494629,
  'test_f1': 0.8904407024383545}]

In [16]:
loaded_model = ABSAModel.load_model(save_path, device='cpu')
print("Model loaded successfully")

Freezing BERT layers except the last two.
Model loaded successfully from ./absa_model.pth to cpu
Model loaded successfully


In [None]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report

def evaluate_absa_tasks(model, dataloader, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Evaluate the ABSA model on aspect identification and sentiment classification.

    Class id 0 means "aspect absent"; ids 1, 2, 3 are positive, negative and
    neutral. Aspect identification is scored as a binary task (absent vs.
    present) over all samples. Sentiment classification is scored only on
    samples where the aspect is present in both the gold label and the
    prediction.

    Args:
        model: Trained model, callable as ``model(input_ids, attention_mask)``
            and returning logits of shape (batch, num_classes).
        dataloader: Iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` tensors.
        device: Device to run the model on ('cuda' or 'cpu').

    Returns:
        dict with two entries:
        - 'aspect_identification': accuracy, precision / recall / F1 of the
          "present" class, weighted averages, confusion matrix and report.
        - 'sentiment_classification': accuracy, macro and weighted averages,
          confusion matrix, report and number of samples evaluated (all zero /
          empty plus a 'message' when no sample qualifies).
    """
    model.to(device)
    model.eval()

    label_chunks, pred_chunks = [], []
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            logits = model(input_ids, attention_mask)
            label_chunks.append(batch['labels'].cpu().numpy())
            pred_chunks.append(torch.argmax(logits, dim=1).cpu().numpy())

    y_true = np.concatenate(label_chunks) if label_chunks else np.array([], dtype=int)
    y_pred = np.concatenate(pred_chunks) if pred_chunks else np.array([], dtype=int)

    # --- task 1: aspect present (1) vs. absent (0) ---
    true_present = y_true != 0
    pred_present = y_pred != 0
    aspect_true_binary = true_present.astype(int)
    aspect_pred_binary = pred_present.astype(int)

    # --- task 2: sentiment, only where gold and prediction both say "present" ---
    both_present = true_present & pred_present
    sentiment_true_filtered = y_true[both_present]
    sentiment_pred_filtered = y_pred[both_present]

    results = {}

    # 1. Aspect metrics (binary classification: Absent=0, Present=1)
    aspect_accuracy = accuracy_score(aspect_true_binary, aspect_pred_binary)
    aspect_precision, aspect_recall, aspect_f1, _ = precision_recall_fscore_support(
        aspect_true_binary, aspect_pred_binary, average='binary', pos_label=1, zero_division=0
    )
    aspect_precision_w, aspect_recall_w, aspect_f1_w, _ = precision_recall_fscore_support(
        aspect_true_binary, aspect_pred_binary, average='weighted', zero_division=0
    )
    aspect_cm = confusion_matrix(aspect_true_binary, aspect_pred_binary, labels=[0, 1])
    # labels is passed explicitly so the two target names still line up when
    # only one of the two classes occurs in the evaluated data.
    aspect_cr = classification_report(
        aspect_true_binary,
        aspect_pred_binary,
        labels=[0, 1],
        target_names=['Absent (0)', 'Present (1)'],
        digits=4,
        zero_division=0
    )

    results['aspect_identification'] = {
        'accuracy': aspect_accuracy,
        'precision (present)': aspect_precision,
        'recall (present)': aspect_recall,
        'f1-score (present)': aspect_f1,
        'weighted_precision': aspect_precision_w,
        'weighted_recall': aspect_recall_w,
        'weighted_f1': aspect_f1_w,
        'confusion_matrix': aspect_cm.tolist(),
        'classification_report': aspect_cr
    }

    # 2. Sentiment metrics (multiclass: positive=1, negative=2, neutral=3)
    sentiment_labels = [1, 2, 3]
    if sentiment_true_filtered.size > 0:
        sentiment_accuracy = accuracy_score(sentiment_true_filtered, sentiment_pred_filtered)
        sentiment_precision_macro, sentiment_recall_macro, sentiment_f1_macro, _ = precision_recall_fscore_support(
            sentiment_true_filtered, sentiment_pred_filtered, average='macro', labels=sentiment_labels, zero_division=0
        )
        sentiment_precision_weighted, sentiment_recall_weighted, sentiment_f1_weighted, _ = precision_recall_fscore_support(
            sentiment_true_filtered, sentiment_pred_filtered, average='weighted', labels=sentiment_labels, zero_division=0
        )
        sentiment_cm = confusion_matrix(sentiment_true_filtered, sentiment_pred_filtered, labels=sentiment_labels)
        sentiment_cr = classification_report(
            sentiment_true_filtered,
            sentiment_pred_filtered,
            target_names=['positive (1)', 'negative (2)', 'neutral (3)'],
            labels=sentiment_labels,
            digits=4,
            zero_division=0
        )

        results['sentiment_classification'] = {
            'accuracy': sentiment_accuracy,
            'macro_precision': sentiment_precision_macro,
            'macro_recall': sentiment_recall_macro,
            'macro_f1': sentiment_f1_macro,
            'weighted_precision': sentiment_precision_weighted,
            'weighted_recall': sentiment_recall_weighted,
            'weighted_f1': sentiment_f1_weighted,
            'confusion_matrix': sentiment_cm.tolist(),
            'classification_report': sentiment_cr,
            'num_samples_evaluated': int(sentiment_true_filtered.size)
        }
    else:
        results['sentiment_classification'] = {
            'message': "No samples where both true and predicted aspects were present.",
            'accuracy': 0,
            'macro_precision': 0,
            'macro_recall': 0,
            'macro_f1': 0,
            'weighted_precision': 0,
            'weighted_recall': 0,
            'weighted_f1': 0,
            'confusion_matrix': [],
            'classification_report': "",
            'num_samples_evaluated': 0
        }

    return results

# --- print results beautifully ---
def print_absa_task_evaluation(results):
    """Pretty-print the dictionary returned by ``evaluate_absa_tasks``.

    Args:
        results: dict that may contain the keys 'aspect_identification' and
            'sentiment_classification'; a missing section is reported with a
            one-line notice instead of its metrics.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    def show_prf(metrics, p_key, r_key, f_key):
        # One precision / recall / F1 triplet, aligned like the other blocks.
        print(f"  Precision: {metrics[p_key]:.4f}")
        print(f"  Recall:    {metrics[r_key]:.4f}")
        print(f"  F1-Score:  {metrics[f_key]:.4f}")

    # ---- section 1: aspect identification ----
    print(heavy_rule)
    print("      Aspect Identification Evaluation (Binary: Absent/Present)")
    print(light_rule)
    if 'aspect_identification' not in results:
        print("No results for Aspect Identification.")
    else:
        aspect_res = results['aspect_identification']
        print(f"Accuracy: {aspect_res['accuracy']:.4f}")
        print("\nMetrics for 'Present' class (label=1):")
        show_prf(aspect_res, 'precision (present)', 'recall (present)', 'f1-score (present)')
        print("\nWeighted Averages:")
        show_prf(aspect_res, 'weighted_precision', 'weighted_recall', 'weighted_f1')
        print("\nConfusion Matrix ([Absent, Present] x [Absent, Present]):")
        print(np.array(aspect_res['confusion_matrix']))
        print("\nClassification Report:")
        print(aspect_res['classification_report'])

    # ---- section 2: sentiment classification ----
    print("\n" + heavy_rule)
    print(" Sentiment Classification Evaluation (Only for Present Aspects)")
    print(light_rule)
    if 'sentiment_classification' not in results:
        print("No results for Sentiment Classification.")
    else:
        sentiment_res = results['sentiment_classification']
        print(f"Number of samples evaluated (True & Pred != 'none'): {sentiment_res['num_samples_evaluated']}")
        if not sentiment_res['num_samples_evaluated'] > 0:
            print(sentiment_res['message'])
        else:
            print(f"Accuracy: {sentiment_res['accuracy']:.4f}")
            print("\nMacro Averages (positive, negative, neutral):")
            show_prf(sentiment_res, 'macro_precision', 'macro_recall', 'macro_f1')
            print("\nWeighted Averages (positive, negative, neutral):")
            show_prf(sentiment_res, 'weighted_precision', 'weighted_recall', 'weighted_f1')
            print("\nConfusion Matrix ([pos, neg, neu] x [pos, neg, neu]):")
            print(np.array(sentiment_res['confusion_matrix']))
            print("\nClassification Report:")
            print(sentiment_res['classification_report'])
    print(heavy_rule)

test_dataloader = data_module.test_dataloader() 
evaluation_results = evaluate_absa_tasks(model, test_dataloader)
print_absa_task_evaluation(evaluation_results) 

      Aspect Identification Evaluation (Binary: Absent/Present)
------------------------------------------------------------
Accuracy: 0.9394

Metrics for 'Present' class (label=1):
  Precision: 0.9482
  Recall:    0.9614
  F1-Score:  0.9547

Weighted Averages:
  Precision: 0.9391
  Recall:    0.9394
  F1-Score:  0.9392

Confusion Matrix ([Absent, Present] x [Absent, Present]):
[[1193  139]
 [ 102 2542]]

Classification Report:
              precision    recall  f1-score   support

  Absent (0)     0.9212    0.8956    0.9083      1332
 Present (1)     0.9482    0.9614    0.9547      2644

    accuracy                         0.9394      3976
   macro avg     0.9347    0.9285    0.9315      3976
weighted avg     0.9391    0.9394    0.9392      3976


 Sentiment Classification Evaluation (Only for Present Aspects)
------------------------------------------------------------
Number of samples evaluated (True & Pred != 'none'): 2542
Accuracy: 0.9245

Macro Averages (positive, negative, neu

In [18]:
import torch
from transformers import AutoTokenizer 
import pandas as pd 

def predict_review(review_text: str,
                   aspects_to_check: list,
                   model,
                   tokenizer,
                   device: str = 'cpu',
                   max_length: int = 256):
    """Predict the sentiment of each requested aspect for a single review.

    For every aspect, the review and the aspect name are joined as
    ``"<review> [SEP] <aspect>"``, tokenized, and passed through ``model``.
    The arg-max class id is translated to a label via ``model.sentiment_map``
    (falling back to ``"unknown"`` for ids missing from the map). Aspects
    predicted as ``'none'`` are omitted from the result.

    Args:
        review_text: Raw review text.
        aspects_to_check: Aspect names to query, e.g. ``["FOOD", "PRICE"]``.
            A single string is treated as one aspect rather than being
            iterated character by character.
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            that returns a logits tensor and exposes a ``sentiment_map``
            dict (class id -> label). Because ``.item()`` is applied to the
            arg-max over ``dim=1``, the logits must have a batch size of 1.
        tokenizer: Callable tokenizer returning a mapping with
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        device: Device the model and inputs are moved to.
        max_length: Length every input is padded / truncated to.

    Returns:
        A list of ``(aspect, sentiment)`` tuples, in the order the aspects
        were given, excluding aspects whose predicted label is ``'none'``.

    Side effects:
        ``model`` is switched to eval mode and moved to ``device``.
    """
    # Guard against the common mistake of passing "FOOD" instead of ["FOOD"].
    if isinstance(aspects_to_check, str):
        aspects_to_check = [aspects_to_check]

    model.eval()
    model.to(device)

    results = []

    with torch.no_grad():
        for aspect in aspects_to_check:
            # NOTE(review): the separator is inserted as literal text;
            # presumably this mirrors the input format used at training
            # time -- verify against the dataset class.
            input_text = f"{review_text} [SEP] {aspect}"

            # Call the tokenizer directly: `encode_plus` is deprecated in
            # favour of `tokenizer(...)`, which accepts the same arguments.
            encoded = tokenizer(
                input_text,
                add_special_tokens=True,
                max_length=max_length,      # pad / truncate to this length
                padding='max_length',
                truncation=True,
                return_tensors='pt'
            )

            input_ids = encoded['input_ids'].to(device)
            attention_mask = encoded['attention_mask'].to(device)

            logits = model(input_ids=input_ids, attention_mask=attention_mask)

            predicted_id = torch.argmax(logits, dim=1).item()
            predicted_sentiment = model.sentiment_map.get(predicted_id, "unknown")

            # Only report aspects the model considers present in the review.
            if predicted_sentiment != 'none':
                results.append((aspect, predicted_sentiment))

    return results

In [22]:
# Inference demo: reload the saved checkpoint and run `predict_review` on two
# sample reviews, printing the aspect/sentiment pairs the model finds.
model_path = './absa_model.pth'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")


def _report_predictions(review, pairs):
    """Print a review followed by its predicted (aspect, sentiment) pairs."""
    print(f"\nReview: \"{review}\"")
    print("Predicted Aspect-Sentiment pairs (excluding 'none'):")
    if pairs:
        for aspect, sentiment in pairs:
            print(f"  - {aspect}: {sentiment}")
    else:
        print("  No non-'none' sentiments predicted.")


try:
    loaded_model_info = ABSAModel.load_model(model_path, device=device)

    # `load_model` may hand back either a dict bundle or the model itself;
    # in both cases fall back to reloading the tokenizer by model name when
    # none is provided. `is None` is used instead of truthiness so that the
    # check does not depend on the tokenizer's `__len__`.
    if isinstance(loaded_model_info, dict):
        loaded_model = loaded_model_info['model']
        tokenizer = loaded_model_info.get('tokenizer')
        if tokenizer is None:
            print("Tokenizer not found in loaded dict, reloading...")
            tokenizer = AutoTokenizer.from_pretrained(loaded_model.hparams.model_name)
    else:
        loaded_model = loaded_model_info
        # A missing attribute and an attribute set to None are both treated
        # as "no tokenizer available".
        tokenizer = getattr(loaded_model, 'tokenizer', None)
        if tokenizer is None:
            print("Model loaded, but tokenizer not found directly. Reloading tokenizer...")
            tokenizer = AutoTokenizer.from_pretrained(loaded_model.hparams.model_name)

    print("Model and Tokenizer loaded successfully.")

    example_review = "Đồ ăn ở đây rất ngon, phục vụ nhiệt tình nhưng giá hơi cao và không gian khá ồn ào."
    example_review_2 = "Quán vắng, đồ ăn tạm được."
    aspects = ["FOOD", "SERVICE", "PRICE", "AMBIENCE"]
    max_len_inference = 256

    predictions = predict_review(
        review_text=example_review,
        aspects_to_check=aspects,
        model=loaded_model,
        tokenizer=tokenizer,
        device=device,
        max_length=max_len_inference
    )
    _report_predictions(example_review, predictions)

    predictions_2 = predict_review(
        review_text=example_review_2,
        aspects_to_check=aspects,
        model=loaded_model,
        tokenizer=tokenizer,
        device=device,
        max_length=max_len_inference
    )
    _report_predictions(example_review_2, predictions_2)

except FileNotFoundError:
    print(f"Error: Model file not found at {model_path}")
except Exception as e:
    # Keep the notebook running, but surface the full traceback for debugging.
    print(f"An error occurred during loading or prediction: {e}")
    import traceback
    traceback.print_exc()

Using device: cuda
Freezing BERT layers except the last two.
Model loaded successfully from ./absa_model.pth to cuda
Model and Tokenizer loaded successfully.

Review: "Đồ ăn ở đây rất ngon, phục vụ nhiệt tình nhưng giá hơi cao và không gian khá ồn ào."
Predicted Aspect-Sentiment pairs (excluding 'none'):
  - FOOD: positive
  - SERVICE: positive
  - PRICE: negative
  - AMBIENCE: negative

Review: "Quán vắng, đồ ăn tạm được."
Predicted Aspect-Sentiment pairs (excluding 'none'):
  - FOOD: neutral
  - AMBIENCE: neutral


In [24]:
# Sanity check: confirm the saved checkpoint file is present at this absolute
# path (the cell displays True/False as its output).
os.path.exists('/kaggle/working/absa_model.pth')

True