In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import (
    BertTokenizerFast, BertForSequenceClassification,
    XLNetTokenizerFast, XLNetForSequenceClassification
)
import matplotlib.pyplot as plt


In [None]:
# Load the preprocessed dataset and carve it into stratified train/val/test splits.
df = pd.read_csv("processed_data.csv")

# Step 1: hold out 20% of all rows as the test set, stratified on the "label" column.
train_df, test_df = train_test_split(
    df,
    stratify=df["label"],
    test_size=0.2,
    random_state=42,
)

# Step 2: take 10% of the remaining rows as the validation set (again stratified).
# The right-hand side is evaluated before `train_df` is rebound, so the
# stratification column comes from the pre-split training frame.
train_df, val_df = train_test_split(
    train_df,
    stratify=train_df["label"],
    test_size=0.1,
    random_state=42,
)

print(f"Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")


In [None]:
# Tokenize data
class NewsDataset(Dataset):
    """
    Map-style dataset that tokenizes one text per lookup.

    Args:
        texts: sequence exposing ``.tolist()`` (e.g. a pandas Series) holding
            the raw texts.
        labels: sequence exposing ``.tolist()`` holding one label per text.
        tokenizer: callable invoked as
            ``tokenizer(text, truncation=..., padding=..., max_length=...,
            return_tensors="pt")`` whose result is indexable by
            ``"input_ids"`` and ``"attention_mask"``.
        max_length: length every example is truncated / padded to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=256):
        self.tokenizer = tokenizer
        self.max_length = max_length
        # Materialise as plain Python lists so items are addressed by position.
        self.texts = texts.tolist()
        self.labels = labels.tolist()

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """
        Tokenize example ``idx`` and return a dict with ``input_ids``,
        ``attention_mask`` and ``labels`` (a ``torch.long`` scalar tensor).
        """
        encoded = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        # The tokenizer output carries a leading batch axis of size 1; drop it
        # so the DataLoader can stack examples itself.
        item = {
            field: encoded[field].squeeze(0)
            for field in ("input_ids", "attention_mask")
        }
        item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item


In [None]:
# Build model
def build_model_and_tokenizer(model_type: str, num_labels: int):
    """
    Create the pretrained tokenizer and sequence-classification model for a backbone.

    Args:
        model_type: 'bert' (loads "bert-base-uncased") or
            'xlnet' (loads "xlnet-base-cased").
        num_labels: forwarded as ``num_labels`` to the model's
            ``from_pretrained`` call.

    Returns:
        Tuple ``(tokenizer, model)``.

    Raises:
        ValueError: if ``model_type`` is neither 'bert' nor 'xlnet'.
    """
    if model_type == "bert":
        # Uncased checkpoint: lowercasing matches its vocabulary.
        tokenizer = BertTokenizerFast.from_pretrained(
            "bert-base-uncased", do_lower_case=True
        )
        model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased", num_labels=num_labels
        )
    elif model_type == "xlnet":
        # Cased checkpoint: do not request lowercasing, otherwise the input
        # casing would disagree with the casing of the checkpoint's vocabulary.
        tokenizer = XLNetTokenizerFast.from_pretrained(
            "xlnet-base-cased", do_lower_case=False
        )
        model = XLNetForSequenceClassification.from_pretrained(
            "xlnet-base-cased", num_labels=num_labels
        )
    else:
        raise ValueError("Chỉ hỗ trợ 'bert' hoặc 'xlnet'")
    return tokenizer, model


In [None]:
# Training class
class Trainer:
    """
    Minimal fine-tuning loop with early stopping on validation loss.

    The model is called as
    ``model(input_ids=..., attention_mask=..., return_dict=True)`` and must
    return an object exposing ``.logits``; every batch must be a dict of
    tensors containing ``input_ids``, ``attention_mask`` and ``labels``.

    Args:
        model: network to train; moved to ``device`` on construction.
        optimizer: optimizer already bound to ``model``'s parameters.
        criterion: loss called as ``criterion(logits, labels)``.
        train_loader: loader iterated once per training epoch.
        val_loader: loader used for the per-epoch validation pass.
        device: device batches are moved to.
        epochs: maximum number of epochs.
        patience: number of consecutive epochs without a validation-loss
            improvement after which training stops.
        checkpoint_path: where the best weights are written. When omitted it
            is ``best_<model.config.model_type>.pt`` if the model exposes
            such a config attribute, else ``best_model.pt``. The path no
            longer depends on a notebook-level ``model_type`` global.
    """

    def __init__(self, model, optimizer, criterion,
                 train_loader, val_loader, device,
                 epochs=5, patience=1, checkpoint_path=None):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.criterion = criterion
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.device = device
        self.epochs = epochs
        self.patience = patience
        if checkpoint_path is None:
            # Derive the file name from the model itself instead of reading a
            # global variable that only exists inside the training-loop cell.
            backbone = getattr(getattr(model, "config", None), "model_type", None)
            checkpoint_path = f"best_{backbone or 'model'}.pt"
        self.checkpoint_path = checkpoint_path
        self.history = {
            "train_loss": [], "train_acc": [],
            "val_loss": [],   "val_acc": []
        }

    def _forward(self, batch):
        """Move ``batch`` to the device; return ``(logits, loss, labels)``."""
        batch = {k: v.to(self.device) for k, v in batch.items()}
        logits = self.model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            return_dict=True
        ).logits
        loss = self.criterion(logits, batch["labels"])
        return logits, loss, batch["labels"]

    def train_epoch(self):
        """
        Run one optimisation pass over ``train_loader``.

        Returns:
            ``(accuracy, mean_batch_loss)`` where accuracy is correct
            predictions divided by ``len(train_loader.dataset)``.
        """
        self.model.train()
        losses, correct = [], 0
        for batch in self.train_loader:
            self.optimizer.zero_grad()
            logits, loss, labels = self._forward(batch)
            loss.backward()
            self.optimizer.step()

            losses.append(loss.item())
            correct += (logits.argmax(dim=1) == labels).sum().item()

        return correct / len(self.train_loader.dataset), np.mean(losses)

    def eval_epoch(self, loader):
        """
        Evaluate the model on ``loader`` without tracking gradients.

        Returns:
            ``(accuracy, mean_batch_loss)`` where accuracy is correct
            predictions divided by ``len(loader.dataset)``.
        """
        self.model.eval()
        losses, correct = [], 0
        with torch.no_grad():
            for batch in loader:
                logits, loss, labels = self._forward(batch)
                losses.append(loss.item())
                correct += (logits.argmax(dim=1) == labels).sum().item()
        return correct / len(loader.dataset), np.mean(losses)

    def train(self):
        """
        Train for up to ``epochs`` epochs, checkpointing the weights with the
        lowest validation loss and stopping early after ``patience``
        consecutive non-improving epochs.

        Returns:
            The ``history`` dict of per-epoch train/val loss and accuracy.
            On return the model holds the best checkpointed weights, provided
            at least one checkpoint was written during this call.
        """
        best_loss = float("inf")
        patience_ctr = 0
        checkpoint_written = False

        for epoch in range(1, self.epochs+1):
            tr_acc, tr_loss = self.train_epoch()
            va_acc, va_loss = self.eval_epoch(self.val_loader)

            self.history["train_acc"].append(tr_acc)
            self.history["train_loss"].append(tr_loss)
            self.history["val_acc"].append(va_acc)
            self.history["val_loss"].append(va_loss)

            print(f"[{epoch}/{self.epochs}] "
                  f"Train loss={tr_loss:.4f}, acc={tr_acc:.4f} | "
                  f"Val loss={va_loss:.4f}, acc={va_acc:.4f}")

            if va_loss < best_loss:
                best_loss = va_loss
                patience_ctr = 0
                torch.save(self.model.state_dict(), self.checkpoint_path)
                checkpoint_written = True
            else:
                patience_ctr += 1
                # `>=` so that patience=N stops after N bad epochs, not N+1.
                if patience_ctr >= self.patience:
                    print("Early stopping")
                    break

        # Restore the best weights, but only if this run actually wrote them:
        # otherwise we would load a stale file from an earlier run or crash
        # on a missing one (e.g. when the validation loss was NaN throughout).
        if checkpoint_written:
            self.model.load_state_dict(
                torch.load(self.checkpoint_path, map_location=self.device)
            )
        return self.history


In [None]:
# Plot helper
def plot_history(history, title_prefix=""):
    """
    Draw the training curves stored in ``history``.

    Two separate 5x3 figures are shown, in this order: loss, then accuracy.
    Each figure overlays the train and validation series and is titled
    ``"<title_prefix> Loss"`` / ``"<title_prefix> Accuracy"``.

    Args:
        history: mapping with the keys ``train_loss``, ``val_loss``,
            ``train_acc`` and ``val_acc``, each a per-epoch sequence.
        title_prefix: text placed in front of each figure title.
    """
    panels = (("loss", "Loss"), ("acc", "Accuracy"))
    for metric, heading in panels:
        plt.figure(figsize=(5,3))
        for split in ("train", "val"):
            series_name = f"{split}_{metric}"
            plt.plot(history[series_name], label=series_name)
        plt.title(f"{title_prefix} {heading}")
        plt.legend()
        plt.show()


In [None]:
# Training
# Seed used before each backbone is built and trained; same value as the
# `random_state` of the data splits.
SEED = 42

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_labels = train_df["label"].nunique()

# Per-backbone training history and test metrics, keyed by model type.
results = {}
for model_type in ["bert", "xlnet"]:
    print("\n" + "="*30)
    print(f"Training & Eval: {model_type.upper()}")

    # Re-seed per backbone so that the freshly initialised parts of the model
    # and the DataLoader shuffle order do not depend on what ran earlier in
    # the kernel. (Full bit-for-bit determinism on GPU is not guaranteed by
    # seeding alone.)
    torch.manual_seed(SEED)

    tokenizer, model = build_model_and_tokenizer(model_type, num_labels)

    # DataLoader: only the training split is shuffled.
    train_ds = NewsDataset(train_df["text"], train_df["label"], tokenizer)
    val_ds   = NewsDataset(val_df["text"],   val_df["label"],   tokenizer)
    test_ds  = NewsDataset(test_df["text"],  test_df["label"],  tokenizer)
    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)
    val_loader   = DataLoader(val_ds,   batch_size=16, shuffle=False)
    test_loader  = DataLoader(test_ds,  batch_size=16, shuffle=False)

    # Optimizer & loss
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
    criterion = torch.nn.CrossEntropyLoss()

    # Train
    trainer = Trainer(
        model, optimizer, criterion,
        train_loader, val_loader,
        device, epochs=5, patience=1
    )
    history = trainer.train()

    # Plot
    plot_history(history, title_prefix=model_type.upper())

    # Test set: evaluated once, after training has finished.
    test_acc, test_loss = trainer.eval_epoch(test_loader)
    print(f"Test {model_type}: loss={test_loss:.4f}, acc={test_acc:.4f}")

    results[model_type] = {
        "history": history,
        "test_acc": test_acc,
        "test_loss": test_loss
    }


In [None]:
# Inference engine
class InferenceEngine:
    """
    Thin wrapper that classifies a single text with a fine-tuned model.

    Args:
        model: classifier whose call result exposes ``.logits``; moved to
            ``device`` on construction.
        tokenizer: callable turning a text into a mapping of tensors that is
            unpacked as keyword arguments into ``model``.
        device: device used for the model and the tokenized inputs.
    """

    def __init__(self, model, tokenizer, device):
        self.device = device
        self.tokenizer = tokenizer
        self.model = model.to(device)

    def predict(self, text, max_length=256):
        """
        Classify ``text``.

        The text is truncated / padded to ``max_length`` tokens. The return
        values are extracted with ``.item()``, so exactly one prediction is
        expected (i.e. a single text, not a batch).

        Returns:
            ``(predicted_class_index, softmax_probability_of_that_class)``.
        """
        encoded = self.tokenizer(
            text,
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        model_inputs = {}
        for name, tensor in encoded.items():
            model_inputs[name] = tensor.to(self.device)

        # Switch off dropout etc. before scoring.
        self.model.eval()
        with torch.no_grad():
            scores = self.model(**model_inputs).logits
            probabilities = F.softmax(scores, dim=1)
            confidence, label = probabilities.max(dim=1)
        return label.item(), confidence.item()

# Reload each backbone's checkpoint and print predictions for a few test rows.
for model_type in results:
    print("\n--- Inference with", model_type.upper(), "---")
    # Build the tokenizer and model with a single call; calling the builder
    # twice would load the pretrained weights twice and throw one copy away.
    tokenizer, model = build_model_and_tokenizer(model_type, num_labels)
    # map_location lets a checkpoint saved on one device load on another
    # (e.g. trained on GPU, inspected on CPU).
    model.load_state_dict(
        torch.load(f"best_{model_type}.pt", map_location=device)
    )
    engine = InferenceEngine(model, tokenizer, device)

    # First 10 rows of the test split.
    samples = test_df.head(10)
    for _, row in samples.iterrows():
        p, c = engine.predict(row["text"])
        print(f"Text[:50]: {row['text'][:50]:50s} | True: {row['label']} → Pred: {p} (conf={c:.2f})")
