In [None]:
!pip install torch --upgrade

Collecting torch
  Downloading torch-2.9.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.8.93 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.8.93-py3-none-manylinux2010_x86_64.manylinux_2_12_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-runtime-cu12==12.8.90 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-cupti-cu12==12.8.90 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cublas-cu12==12.8.4.1 (from torch)
  Downloading nvidia_cublas_cu12-12.8.4.1-py3-none-manylinux_2_27_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cufft-cu12==11.3.3.83 (from torch)
  Downloading nvidia_cufft_cu12-11.3.3.83-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-curand-cu12==10.3.9.90 (from torch)
  

In [None]:
import os, math, random, re, time

from pathlib import Path
from typing import List

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [None]:
# Code hàm set seed
# Mục đích: đảm bảo được tính nhất quán trong quá trình training (ví dụ: đảm bảo các lần xáo dữ liệu đề như nhau)
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
# Hàm tokenize
def simple_tokenize(text):
    text = text.lower()
    # Xử lý loại bỏ các ký tự đặc biệt như dấu câu, khoảng trắng dư thừa
    tokens = re.findall(r"[a-z0-9]+", text)
    # Nếu tokens rỗng => trả về ["<empty>"]
    # Ngược lại, trả về tokens
    return tokens if tokens else ["<empty>"]

In [None]:
class Vocab:
    def __init__(self, min_freq=2, max_size=50000, specials=None):
        # Tạo một dictionary để đếm tần suất xuất hiện của token trong câu (key: token str, value: freq int)
        self.freqs = {}
        # Tạo một list để map ID về token (dùng để decode)
        self.itos = []
        # Tạo một dictionary để map token với ID (dùng để encode)
        self.stoi = {}
        # Giảm lượng vocab bằng cách loại bỏ các token xuất hiện quá ít trong câu
        # Các token hiếm có thể gây ra noise trong quá trình huấn luyện
        # Tốn bộ nhớ khi được đem đi tính "attention"
        self.min_freq = min_freq
        self.max_size = max_size
        # Giới hạn kích thước vocab tối đa
        # Tạo list các token đặc biệt
        # <pad>: thêm vào tăng độ dài câu văn
        # <unk>: các token lạ, không có trong bộ vocab
        # <cls>: đánh dấu câu văn được dùng cho bài toán phân loại
        self.specials = ["<pad>", "<unk>", "<cls>"]

    # Đếm tần suất xuất hiện của các token trong câu
    def add_token(self, token):
        self.freqs[token] = self.freqs.get(token, 0) + 1

    # Xây dựng bộ vocab
    # Sau khi add_token từ bộ dataset, cần phải build để tạo stoi/itos
    # Nếu mà không build => quá trình encode sẽ bị lỗi
    def build(self):
        self.itos = list(self.specials)
        for i, sp in enumerate(self.specials):
            self.stoi[sp] = i
        # Sắp xếp tần suất xuất hiện tăng dần
        # items = sorted([(t, f) for t, f in self.freqs.items() if t not in self.speicals])
        # Sắp xếp tần suất xuất hiện giảm dần
        items = sorted([(t, f) for t, f in self.freqs.items() if t not in self.specials],
                       key=lambda x: (-x[1], x[0]))
        for tok, freq in items:
            if freq < self.min_freq or len(self.itos) >= self.max_size:
                continue
            self.stoi[tok] = len(self.itos)
            self.itos.append(tok)

    # Encode (tạo các input dạng số cho model)
    def encode(self, tokens):
        unk = self.stoi.get("<unk>", 1)
        return [self.stoi.get(token, unk) for token in tokens]

    # Hai phương thức trả về ID của <pad> và <cls>
    # static (Vocab.pad_idx())
    # property (vocab = Vocab) => vocab.build() NO => YES vocab.pad_idx
    @property
    def pad_idx(self): return self.stoi["<pad>"]
    @property
    def cls_idx(self): return self.stoi["<cls>"]

    def __len__(self):
        return len(self.itos)

In [None]:
class TextClsDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=64, add_cls=True):
        # Tạo list lưu các văn bản
        self.texts = texts
        # Tạo list lưu các label của text
        self.labels = labels
        # Lưu bộ vocab
        self.vocab = vocab
        # Khai báo độ dài tối đa cho phép của một văn bản
        self.max_len = max_len
        self.add_cls = add_cls

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # Lấy text và label tại idx, để xử lý từng sample cụ thể
        text = self.texts[idx]
        label = self.labels[idx]
        # Text ban đầu: "Tôi yêu pizza"
        # Tokenize text
        tokens = simple_tokenize(text)
        # => ["tôi", "yêu", "pizza"]
        # Thêm token <cls> vào đầu văn bản, vì đây là bài toán phân loại text
        if self.add_cls: tokens = ["<cls>"] + tokens
        # => ["<cls>", "tôi", "yêu", "pizza"]

        # Encode token thành ID
        ids = self.vocab.encode(tokens)[:self.max_len] # Giới hạn độ dài văn bản
        # Tạo một list attention mask
        # Ý nghĩa: sẽ ra hiệu cho model biết được là token nào sẽ được áp phép "attention"
        # Với <pad> => mô hình không được phép áp phép "attention"
        attn = [1] * len(ids)
        # Output: attn sẽ là [1, 1, ..., 1] cho token + [0, 0, ..., 0] cho các token <pad>
        while len(ids) < self.max_len:
            ids.append(self.vocab.pad_idx)
            attn.append(0)

        return torch.tensor(ids), torch.tensor(attn), torch.tensor(label)

In [None]:
# Hàm collate giúp chúng ta tự động gom nhiều mẫu dữ liệu lại thành một batch để đưa vào train mô hình một lần
def collate_fn(batch):
    ids = torch.stack([b[0] for b in batch])
    attn = torch.stack([b[1] for b in batch])
    labels = torch.stack([b[2] for b in batch])
    return ids, attn, labels

In [None]:
# Thiết kế kiến trúc Multi-Head Self-Attention - thành phần cốt lõi của Transformer Encoder
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        # Lưu tổng chiều embedding (D)
        self.d_model = d_model
        # Lưu số head (H), xử lý multi-head (song song)
        self.num_heads = num_heads
        # Tính chiều của mỗi head = (d_h = D / H)
        self.d_head = d_model // num_heads

        # Khai báo các Layer Linear cho Q, K, V
        # Layer Linear sẽ được sử dụng để project input thành Query (Q), tương tự cho Key (K) và Value (V)
        self.W_q = nn.Linear(d_model, d_model, bias=False)
        self.W_k = nn.Linear(d_model, d_model, bias=False)
        self.W_v = nn.Linear(d_model, d_model, bias=False)
        # Đây là các ma trận parameters sẽ được đưa vào huấn luyện => giúp model học cách biến đổi input

        # Layer project output sau khi input đã được tổng hợp từ các head (d_head)
        self.W_o = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout) # Giúp tránh overfit

    def forward(self, x, attn_mask):
        # Lấy ra shape của input
        # B: Batch size
        # L: Sequence Length (độ dài của chuỗi sau khi padding)
        # D: Embedding Dimension (số chiều của vector biểu diễn mỗi token)
        B, L, D = x.shape
        H = self.num_heads
        d_h = self.d_head

        # 1. Project x thành Q qua W_q
        # 2. Reshape từ B, L, D sang (B, L, H, d_h) chuyển thành (B, H, L, d_h) (chuẩn bị cho bước tính muti-head self-attention)
        # => Chia D thành H head, mỗi head có d_h chiều
        Q = self.W_q(x).view(B, L, H, d_h).transpose(1, 2)
        # Tương tự cho K và V
        K = self.W_k(x).view(B, L, H, d_h).transpose(1, 2)
        V = self.W_v(x).view(B, L, H, d_h).transpose(1, 2)

        # Tính attention score
        scores = torch.matmul(Q, K.transpose(-2, -1))/math.sqrt(d_h)

        mask = attn_mask.unsqueeze(1).unsqueeze(2)
        scores = scores.masked_fill(mask==0, float("-inf"))
        attn = F.softmax(scores, dim=-1) # Biến scores thành xác suất
        attn = self.dropout(attn) # Tránh overfit

        # Tính output: attn @ V - nhân weights với value
        out = torch.matmul(attn, V).transpose(1, 2).contiguous().view(B, L, D)
        # Project output cuối qua W_o để tổng hợp thông tin từ multi-head
        out = self.W_o(out)
        return out

In [None]:
# Triển khai kiến trúc Transformer Encoder
# Self-Attention + AddNorm + Feed Forward Neural Network + AddNorm

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # Khởi tạo lớp Multi-Head Self-Attention
        self.mha = MultiHeadSelfAttention(d_model, num_heads, dropout)
        # Ổn định quá trình huấn luyện (tránh gradient vanish/explode)
        # Giúp cho mô hình hội tụ nhanh hơn => Học nhanh hơn
        self.norm1 = nn.LayerNorm(d_model)
        # Feed Forward Neural Network
        # Thêm tính phi tuyến => giúp model học được các biểu diễn phức tạp sau khi các token đã được áp phép "attention"
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self,x,attn_mask):
        # tính Self-Attention: self.mha(x, attn_mask) – gọi lớp MHA để tạo representation mới (dùng Q, K, V từ x, áp dụng mask).
        x = self.norm1(x + self.dropout(self.mha(x,attn_mask)))
        # Tính FFN: self.ff(x) – project lên d_ff, ReLU, drop, project về d_model.
        x = self.norm2(x + self.dropout(self.ff(x)))
        return x

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        # Tạo một ma trận rỗng để lưu positional encoding cho mọi token từ vị trí 0 đến max_len - 1
        pe = torch.zeros(max_len, d_model)
        # Tạo vector vị trí: 0, 1, 2, ..., 511 => mỗi hàng đại diện cho một vị trí của token trong câu
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Tạo một dãy số giảm dần theo cấp số nhân => Tạo tần số khác nhau cho mỗi chiều vector
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0)/d_model))
        # Áp dụng sin/cos để đảm bảo mỗi vị trí là một vector độc nhất
        # Chiều chẵn => dùng sin
        pe[:,0::2] = torch.sin(position * div_term)
        # Chiều lẻ => dùng cos
        pe[:,1::2] = torch.cos(position * div_term)
        # pe là không có train (không tính gradient) => giống hằng số đã tính sẵn
        self.register_buffer("pe", pe.unsqueeze(0))

    # x = embedding(x) + positional_encoding(position)
    def forward(self,x):
        L = x.size(1)
        return x+self.pe[:,:L,:]

In [None]:
# Classifier: Embedding => Positional Encoding => Một loạt các lớp TransformerEncoderLayer => AddNorm => Linear
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, num_classes, d_model=128, num_heads=4, num_layers=2, d_ff=256, max_len=128, pad_idx=0, dropout=0.1):
        super().__init__()
        # Input là sequence ID => chuyển thành vector trước khi thêm positional
        self.embed = nn.Embedding(vocab_size, d_model, padding_idx=pad_idx)
        # Thêm positional => thêm "vị trí" cho token trước khi qua lớp Encoder
        self.pos = PositionalEncoding(d_model, max_len=max_len)
        # Khai báo hàng loạt các lớp Transformer Encoder
        self.layers = nn.ModuleList(
            [TransformerEncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.norm = nn.LayerNorm(d_model)
        self.cls_head = nn.Linear(d_model, num_classes)

    def forward(self,ids,attn_mask):
        x = self.embed(ids)
        x = self.pos(x)
        for l in self.layers: x = l(x,attn_mask)
        x = self.norm(x)
        cls_rep = x[:,0,:]
        return self.cls_head(cls_rep)

In [None]:
# ------------------------------
# 7. Training
# ------------------------------
def train_one_epoch(model,loader,opt,device,criterion):
    model.train()
    total_loss,total,correct=0,0,0
    for ids,attn,labels in loader:
        ids,attn,labels=ids.to(device),attn.to(device),labels.to(device)
        opt.zero_grad()
        logits=model(ids,attn)
        loss=criterion(logits,labels)
        loss.backward()
        opt.step()
        total_loss+=loss.item()*ids.size(0)
        total+=ids.size(0)
        correct+=(logits.argmax(-1)==labels).sum().item()
    return total_loss/total, correct/total

@torch.no_grad()
def evaluate(model,loader,device,criterion):
    model.eval()
    total_loss,total,correct=0,0,0
    for ids,attn,labels in loader:
        ids,attn,labels=ids.to(device),attn.to(device),labels.to(device)
        logits=model(ids,attn)
        loss=criterion(logits,labels)
        total_loss+=loss.item()*ids.size(0)
        total+=ids.size(0)
        correct+=(logits.argmax(-1)==labels).sum().item()
    return total_loss/total, correct/total

# **Main**

In [None]:
!pip install kagglehub



In [None]:
import kagglehub

# Tải dataset
path = kagglehub.dataset_download("amananandrai/ag-news-classification-dataset")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'ag-news-classification-dataset' dataset.
Path to dataset files: /kaggle/input/ag-news-classification-dataset


In [None]:
train_df = pd.read_csv(os.path.join(path, "train.csv"), header=0, names=["class", "title", "desc"])
test_df = pd.read_csv(os.path.join(path, "test.csv"), header=0, names=["class", "title", "desc"])

In [None]:
# Tiền xử lý dữ liệu
def preprocess_agnews(df):
    # Ép kiểu int, trừ 1 để zero-based
    labels = df["class"].astype(int) - 1
    texts = (df["title"].astype(str) + " " + df["desc"].astype(str)).tolist()
    return texts, labels.tolist()

In [None]:
from sklearn.model_selection import train_test_split

X_train, y_train = preprocess_agnews(train_df)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, stratify=y_train, shuffle=True)
X_test, y_test = preprocess_agnews(test_df)

In [None]:
# Xây vocab từ dữ liệu train
vocab = Vocab(min_freq=5)
for sentences in X_train:
    for token in simple_tokenize(sentences):
        vocab.add_token(token)
vocab.build()

In [None]:
# Tạo các Dataloader
max_len = 64
batch_size=128

train_dataset = TextClsDataset(X_train, y_train, vocab, max_len)
val_dataset = TextClsDataset(X_val, y_train, vocab, max_len)
test_dataset = TextClsDataset(X_test, y_test, vocab, max_len)

train_loader = DataLoader(train_dataset, batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size, shuffle=True, collate_fn=collate_fn)

num_classes = 4

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Khởi tạo mô hình
transformer = TransformerClassifier(len(vocab), num_classes).to(device)
optimizer = torch.optim.AdamW(transformer.parameters(), lr=2e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
for epoch in range(3):
    train_loss, train_accuracy = train_one_epoch(transformer, train_loader, optimizer, device, criterion)
    val_loss, val_accuracy = evaluate(transformer, val_loader, device, criterion)
    print(f"Epoch {epoch}: train_acc={train_accuracy:.2f} val_acc={val_accuracy:.2f}")

Epoch 0: train_acc=0.82 val_acc=0.25
Epoch 1: train_acc=0.90 val_acc=0.25
Epoch 2: train_acc=0.92 val_acc=0.25


In [None]:
test_loss, test_accuracy = evaluate(transformer, test_loader, device, criterion)
print(f"test_acc={train_accuracy:.2f}")

test_acc=0.92


In [None]:
# ------------------------------
# 11. Inference / Testing
# ------------------------------

def predict(model, text, vocab, max_len=64, add_cls=True):
    model.eval()
    toks = simple_tokenize(text)
    if add_cls:
        toks = ["<cls>"] + toks
    ids = vocab.encode(toks)[:max_len]
    attn = [1] * len(ids)
    while len(ids) < max_len:
        ids.append(vocab.pad_idx)
        attn.append(0)

    ids = torch.tensor(ids).unsqueeze(0).to(device)
    attn = torch.tensor(attn).unsqueeze(0).to(device)

    with torch.no_grad():
        logits = model(ids, attn)
        pred = logits.argmax(-1).item()
    return pred

# Label mapping cho AG News
id2label = {
    0: "World",
    1: "Sports",
    2: "Business",
    3: "Sci/Tech"
}

# Một vài câu test
test_sentences = [
    "NASA launches new satellite to explore Mars surface",
    "The stock market crashed due to inflation fears",
    "Manchester United won the Premier League title",
    "President meets with world leaders to discuss climate change"
]

print("=== Transformer Predictions ===")
for sent in test_sentences:
    pred = predict(transformer, sent, vocab)
    print(f"Text: {sent}\n -> Predicted: {id2label[pred]}\n")


=== Transformer Predictions ===
Text: NASA launches new satellite to explore Mars surface
 -> Predicted: Sci/Tech

Text: The stock market crashed due to inflation fears
 -> Predicted: Business

Text: Manchester United won the Premier League title
 -> Predicted: Sports

Text: President meets with world leaders to discuss climate change
 -> Predicted: World

