In [None]:
import re
import unicodedata

def canonicalize_text(text):
    text = re.sub(r'[\d\W_]+', ' ', text)

    text = ''.join(
        c for c in unicodedata.normalize('NFD', text)
        if unicodedata.category(c) != 'Mn'
    )

    text = text.lower()

    text = text.strip()

    return text

In [None]:
from transformers import BertTokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
import nltk
from nltk.tree import Tree
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch
from tqdm import tqdm

def read_ptb_tree(tree_string):
    return Tree.fromstring(tree_string)

def extract_sentence_and_label(tree):
    label = (tree.label())

    words = tree.leaves()
    sentence = ' '.join(words)

    return sentence, label

def read_file(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            tree = read_ptb_tree(line.strip())
            sentence, label = extract_sentence_and_label(tree)
            data.append({'sentence': sentence, 'label': label})
    return data

In [None]:
class SST5_Dataset(Dataset):
    def __init__(self, file_path):
        self.data = [
            (
                tokenizer(
                    canonicalize_text(row['sentence']),
                    add_special_tokens=True,
                    max_length=512,  
                    padding='max_length', 
                    truncation=True,      
                    return_tensors="pt"   
                ), 
                int(row['label']) if isinstance(row['label'], str) else row['label']
            )
            for row in read_file(file_path)
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        X,y = self.data[idx]
        input_ids = X['input_ids'].squeeze()
        attention_mask = X['attention_mask'].squeeze()
        label = torch.tensor(y, dtype=torch.long)
        return input_ids, attention_mask, y

In [None]:
train_path = '/kaggle/input/treeset/train.txt'
test_path = '/kaggle/input/treeset/test.txt'
val_path = '/kaggle/input/treeset/dev.txt'

trainset = SST5_Dataset(train_path)
testset = SST5_Dataset(test_path)
valset = SST5_Dataset(val_path)

Sampling data

In [None]:
# from torch.utils.data import Subset
# import random

# train_indices = random.sample(range(len(trainset)), 100)
# test_indices = random.sample(range(len(testset)), 20)
# val_indices = random.sample(range(len(valset)), 20)

# trainset = Subset(trainset, train_indices)
# testset = Subset(testset, test_indices)
# valset = Subset(valset, val_indices)

In [None]:
!pip install loguru -qU

In [None]:
import torch
import torch.nn.functional as F
import torch.nn as nn

# # Chọn task cần thực hiện (ở đây chỉ dùng sst)
# def model_prediction(model, batch, task_name = 'default'):
#     return {
#         'default': lambda: model(*batch),
#         'sst': lambda: model.predict_sentiment(*batch),
#         # 'para': lambda: model.predict_paraphrase(*batch),
#         # 'sts': lambda: model.predict_similarity(*batch),
#     }[task_name]()

# Lớp này kế thừa từ object
# Đây là định nghĩa hàm regularization được thêm vào để tăng độ mượt model
class AdversarialReg(object):
    def __init__(self, model,
                epsilon: float = 1e-5,
                lambda_: float = 1,
                eta: float = 1e-3,
                sigma: float = 1e-5,
                K: int = 1):
        # gọi hàm khởi tạo của lớp cha
        super(AdversarialReg, self).__init__()
        self.embed_backup = {} # từ điển lưu trữ các tham số embedding ban đầu (sau cần khôi phục)
        self.grad_backup = {} # từ điển lưu trữ các gradient ban đầu (sau khôi phục được)
        self.model = model
        self.epsilon = epsilon # mức độ thay đổi các tham số
        self.lambda_ = lambda_ # hệ số điều chỉnh tính toán hàm mất mát
        self.eta = eta # learning rate
        self.sigma = sigma # độ mạnh của noise được thêm vào 
        self.K= K # iterations

    # Lưu các gradient được dùng tính toán (để sau khôi phục lại)
    def save_gradients(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad: #kiểm tra param có được tính toán trong backpropagation k
                if param.grad == None: # chưa được tính toán
                    self.grad_backup[name] = None
                else:
                    self.grad_backup[name] = param.grad.clone()

    # lọc những tham số tgia tính toán là emb_name và lưu lại
    def save_embeddings(self, emb_name):
        # print(emb_name, type(emb_name))
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                self.embed_backup[name] = param.data.clone()

    # khôi phục các gradient tính toán 
    def restore_gradient(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                assert name in self.grad_backup #đảm bảo tham số tồn tại
                param.grad = self.grad_backup[name] #khôi phục giá trị

    # khôi phục những tham số tính toán emb_name
    def restore_embeddings(self, emb_name): 
        
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                assert name in self.embed_backup
                param.data = self.embed_backup[name]
        self.embed_backup = {}

    #thêm nhiễu vào các tham số
    def generate_noise(self, emb_name):
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                noise = param.data.new(param.size()).normal_(0, 1) * self.sigma
                param.data.add_(noise)

    # điều chỉnh tham số nhúng sau khi bị change mà đảm bảo không vượt quá ngưỡng xđ vởi epsilon
    def project(self, param_name, param_data):
        change = param_data - self.embed_backup[param_name]
        change = torch.clamp(change, min = -self.epsilon, max = self.epsilon)
        return self.embed_backup[param_name] + change

    # điều chỉnh tham số nhúng theo hướng gradient ascent (not GD)
    def emb_ascent(self, emb_name):
         for name, param in self.model.named_parameters():
             if param.requires_grad and emb_name in name:
                 norm = torch.norm(param.grad, p = float('inf'))
                 if norm != 0 and not torch.isnan(norm):
                     param.data.add_(self.eta * (param.grad / norm))
                     param.data = self.project(name, param.data)

    # Tính toán KL divergence đối xứng -> sử dụng trong quá trình train
    def symmetric_kl(self, inputs, target):
        loss = F.kl_div(F.log_softmax(inputs, dim = -1),
                       F.log_softmax(target, dim = -1),
                       reduction = 'batchmean',
                       log_target = True)
        loss += F.kl_div(F.log_softmax(target, dim = -1),
                        F.log_softmax(inputs, dim = -1),
                        reduction = 'batchmean',
                        log_target = True)
        return loss

    # Một cách tính toán D_KL tương tự -> dùng cho việc đánh giá hiệu suất (val, test?)
    def symmetric_kl_check(self, inputs, target, reduce = True):
        epsilon = 1e-6
        bs = inputs.size(0)
        p = F.log_softmax(inputs, 1).exp()
        y = F.log_softmax(target, 1).exp()
        rp = -(1.0 / (p + epsilon) - 1 + epsilon).detach().log()
        ry = -(1.0 / (y + epsilon) - 1 + epsilon).detach().log()

        if reduce:
            return ((p * (rp - ry) * 2).sum() / bs)
        else:
            return ((p * (rp - ry) * 2).sum())

    def max_loss_reg(self, batch, emb_name): #embedding or embedding.
        emb_name = 'embedding'
        # print(emb_name, type(emb_name))
        input_ids, attention_mask = batch
        self.model.eval() #chuyển model sang chế độ đánh giá, tắt dropout
        
        logits = self.model(input_ids, attention_mask)  # tính toán f ban đầu
        # print(logits, type(logits))

        self.save_gradients()
        self.save_embeddings(emb_name)

        self.generate_noise(emb_name)

        for _ in range(self.K):
            self.model.zero_grad()

            adv_logits = self.model(input_ids, attention_mask) # Tính toán f sau khi có thêm nhiễu
            # print(adv_logits, type(adv_logits))
            adv_loss = self.symmetric_kl(adv_logits, logits)

            adv_loss.backward()
            self.emb_ascent(emb_name)

        self.restore_gradient()

        adv_logits = self.model(input_ids, attention_mask) #model_prediction(self.model, batch, task_name)
        adv_loss = self.symmetric_kl(adv_logits, logits.detach())

        self.restore_embeddings(emb_name)
        self.model.train()

        return self.lambda_ * adv_loss
    

In [None]:
# Định nghĩa lớp momentum Bregman proximal point để tránh tham số bị cập nhật quá mức
class MBPP(object):
    def __init__(self, model, beta: float = 0.995, mu: float = 1):
        self.model = model
        self.beta = beta
        self.mu = mu
        self.theta_state = {} #lưu trữ giá trị của tham số tại từng bước cập nhật

        # Cập nhật theta_0 bằng tham số model sau khi pre-trained
        for name, param in self.model.named_parameters():
            self.theta_state[name] = param.data.clone()

    # Cập nhật tất cả tham số theo quy tắc 
    def apply_momentum(self, named_parameters):
        for name, param in self.model.named_parameters():
            self.theta_state[name] = (1 - self.beta) * param.data.clone() + self.beta * self.theta_state[name]

    # Tính D_Breg
    def bregman_divergence(self, batch, logits):
        input_ids, attention_mask = batch
        # chuyển logits thành xác suất cho từng lớp bằng softmax
        theta_prob = F.softmax(logits, dim = -1) 

        # sao lưu tham số hiện tại của mô hình (param.data) vào param_bak
        # thay thế bằng tham số lưu ở theta_state để tính toán mà k mất tham số gốc
        param_bak = {}
        for name, param in self.model.named_parameters():
            param_bak[name] = param.data.clone()
            param.data = self.theta_state[name]

        with torch.no_grad():
            logits = self.model(input_ids, attention_mask) #model_prediction(self.model, batch, taskname)
            theta_til_prob = F.softmax(logits, dim = -1).detach()

        # khôi phục lại tham số 
        for name, param in self.model.named_parameters():
            param.data = param_bak[name]

        #torch.cuda.empty_cache()

        # Tính toán 
        l_s = F.kl_div(theta_prob.log(), theta_til_prob, reduction = 'batchmean') + \
        F.kl_div(theta_til_prob.log(), theta_prob, reduction = 'batchmean')

        return self.mu * l_s

In [None]:
class UnSup_BERT(nn.Module):
    def __init__(self, bert, is_unsup_train=True):
        super(UnSup_BERT, self).__init__()

        self.bert = bert
        self.dropout = nn.Dropout(0.3)
        self.is_unsup_train = is_unsup_train

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=False)
        pooled = outputs['pooler_output']

        if not self.is_unsup_train:
            return pooled

        return self.dropout(pooled)

In [None]:
from transformers import BertTokenizer, BertModel
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BERT_MLP(nn.Module):
    def __init__(self, cl_bert, num_labels):
        super(BERT_MLP, self).__init__()
        
        # BERT + supCL
        self.bert = cl_bert

        # Frozen BERT
        # for para in self.bert.parameters():
        #     para.requires_grad = False
        
        # MLP
        self.mlp = nn.Sequential(
            nn.Linear(768, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, num_labels)
        )
    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        logits = self.mlp(outputs)
        return logits


bert = BertModel.from_pretrained('bert-base-uncased')
cl_bert = UnSup_BERT(bert, is_unsup_train=True)
cl_bert.load_state_dict(torch.load('/kaggle/input/unsupcse-bert/pytorch/default/1/best_model_uncl.pth'))

model = BERT_MLP(cl_bert,num_labels=5)
model.to(device)


# model_path = '/kaggle/input/smart/pytorch/default/1/bert-smart-sst-5ep.pth'
# model.load_state_dict(torch.load(model_path))

In [None]:
from loguru import logger
from torch.cuda.amp import GradScaler, autocast

def train_one_epoch(model, data_loader, optimizer, device):
    model.train()
    total_loss = 0

    scaler = GradScaler()
    pgd = AdversarialReg(model) 
    mbpp = MBPP(model)

    for input_ids, attention_mask, labels in tqdm(data_loader, desc="Training"):
        input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)

        optimizer.zero_grad()

        with autocast():
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = F.cross_entropy(outputs, labels)
        scaler.scale(loss).backward(retain_graph=True)

        # Backpropagation cho adversarial loss
        with autocast():
            adv_loss = pgd.max_loss_reg((input_ids, attention_mask), outputs)
        scaler.scale(adv_loss).backward()

        # Backpropagation cho Bregman divergence
        with autocast():
            breg_div = mbpp.bregman_divergence((input_ids, attention_mask), outputs)
        scaler.scale(breg_div).backward()

        scaler.step(optimizer)  # Thay thế optimizer.step()
        scaler.update() 
        # optimizer.step()
        mbpp.apply_momentum(model.named_parameters())
        total_loss += loss.item()
        
    avg_loss = total_loss / len(data_loader)
    logger.info(f'Training loss: {avg_loss:.4f}')
    return avg_loss



# Hàm dùng để tính toán loss ,accuracy của validation và test
def eval_one_epoch(model, data_loader, device):
    model.eval() 
    total_loss = 0
    correct_predictions = 0

    with torch.no_grad():  
        for input_ids, attention_mask, labels in tqdm(data_loader, desc="Evaluating"):
            input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)
            
            # Tiến hành dự đoán
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = F.cross_entropy(outputs, labels)
            total_loss += loss.item()

            # Tính số lượng dự đoán chính xác
            preds = torch.argmax(outputs, dim=1)
            correct_predictions += torch.sum(preds == labels).item()

    avg_loss = total_loss / len(data_loader)  
    accuracy = correct_predictions / len(data_loader.dataset)  
    logger.info(f'Validation loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}')
    return avg_loss, accuracy


# Bước chạy tổng hợp train, val, test
def train(model, trainset, valset, device, num_epochs=5, batch_size=8, learning_rate=1e-5, save_dir='/kaggle/working/'):
    # chuẩn bị sẵn dữ liệu cho từng bước
    train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True) #xáo trộn data (chỉ cần cho train)
    val_loader = DataLoader(valset, batch_size=batch_size)

    # khởi tạo 1 bộ tối ưu hóa từ thư viện có sẵn
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    best_val_loss = float('inf')
    best_epoch = 0

    best_model_path = f"{save_dir}best_model.pth"
    
    # Đánh giá quá từng epoch
    for epoch in range(num_epochs):
        torch.cuda.empty_cache()
        logger.info(f'Epoch {epoch + 1}/{num_epochs}')
        
        train_loss = train_one_epoch(model, train_loader, optimizer, device)

        val_loss, val_accuracy = eval_one_epoch(model, val_loader, device)


        if val_loss < best_val_loss:
            logger.info(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}. Saving model...")
            best_val_loss = val_loss
            best_epoch = epoch
            
            torch.save(model.state_dict(), best_model_path)
            logger.info(f"Model saved to {best_model_path}")
            
    logger.info(f"Training completed. Best validation loss: {best_val_loss:.4f} at epoch {best_epoch + 1}.")
        
        

In [None]:
torch.cuda.empty_cache()
train(model, trainset, valset, device)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np

def test_model(model_path, testset, device, batch_size=16):

    model = BERT_MLP(cl_bert, num_labels=5)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    model.to(device)
    
    test_loader = DataLoader(testset, batch_size=batch_size)
    
    all_preds = []
    all_labels = []
    total_loss = 0
    correct_predictions = 0
    
    with torch.no_grad():
        for input_ids, attention_mask, labels in tqdm(test_loader, desc="Testing"):
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)
            
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = F.cross_entropy(outputs, labels)
            total_loss += loss.item()
            
            # Get predictions
            preds = torch.argmax(outputs, dim=1)
            correct_predictions += torch.sum(preds == labels).item()
            
            # Store predictions and labels for metric calculation
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Calculate metrics
    accuracy = correct_predictions / len(testset)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, 
        all_preds, 
        average='weighted'
    )
    avg_loss = total_loss / len(test_loader)
    
    # Print metrics
    logger.info(f"Test Results:")
    logger.info(f"Average Loss: {avg_loss:.4f}")
    logger.info(f"Accuracy: {accuracy:.4f}")
    logger.info(f"Precision: {precision:.4f}")
    logger.info(f"Recall: {recall:.4f}")
    logger.info(f"F1 Score: {f1:.4f}")
    
    # Calculate and plot confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        cm, 
        annot=True, 
        fmt='d', 
        cmap='Blues',
        xticklabels=np.unique(all_labels),
        yticklabels=np.unique(all_labels)
    )
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.tight_layout()
    plt.show()
    
    metrics = {
        'loss': avg_loss,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'confusion_matrix': cm
    }
    
    return metrics

model_path = '/kaggle/working/best_model.pth'

metrics = test_model(
    model_path=model_path,
    testset=testset,
    device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)