In [1]:
import torch
import torch.nn as nn
import math

# TRANSFORMER WITH WORD-BASED TOKENIZER


In [2]:
class TransformerPhonemeEmbedding(nn.Module):
    def __init__(self, vocab_size: int, d_model: int, max_len: int = 512, padding_idx: int = 0, dropout_rate: float = 0.1):
        """
        Khởi tạo lớp Embedding phù hợp với Transformer.

        Args:
            vocab_size (int): Kích thước từ điển của tất cả các âm vị.
            d_model (int): Kích thước cuối cùng của vector đầu ra (kích thước ẩn của Transformer).
                           Phải chia hết cho 4 nếu dùng phương pháp Concatenate.
            max_len (int): Chiều dài tối đa của câu.
            padding_idx (int): Chỉ số của token PAD (thường là 0).
            dropout_rate (float): Tỷ lệ Dropout.
        """
        super().__init__()
        
        # 1. Phoneme Embedding Setup
        # Kích thước embedding cho mỗi thành phần âm vị
        # Giả sử ta dùng Concatenate, mỗi thành phần sẽ có kích thước d_model / 4
        assert d_model % 4 == 0, "d_model phải chia hết cho 4 cho Concatenate."
        self.phoneme_embed_dim = d_model // 4
        
        self.onset_embed = nn.Embedding(vocab_size, self.phoneme_embed_dim, padding_idx=padding_idx)
        self.medial_embed = nn.Embedding(vocab_size, self.phoneme_embed_dim, padding_idx=padding_idx)
        self.nucleus_embed = nn.Embedding(vocab_size, self.phoneme_embed_dim, padding_idx=padding_idx)
        self.coda_embed = nn.Embedding(vocab_size, self.phoneme_embed_dim, padding_idx=padding_idx)
        
        self.d_model = d_model
        
        # 2. Positional Encoding Setup (Được học)
        # Tốt hơn là dùng Positional Encoding cố định (như Transformer gốc)
        self.pos_encoder = self._get_fixed_positional_encoding(d_model, max_len)
        
        # 3. Dropout
        self.dropout = nn.Dropout(dropout_rate)


    def _get_fixed_positional_encoding(self, d_model: int, max_len: int) -> nn.Parameter:
        """
        Tạo Positional Encoding cố định (dạng sin/cos) theo kiến trúc Transformer gốc.
        """
        # Tạo ma trận PE (max_len, d_model)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # Thêm một chiều Batch, biến thành Parameter để nó được lưu trữ (nhưng không được học)
        pe = pe.unsqueeze(0) 
        return nn.Parameter(pe, requires_grad=False)
        

    def forward(self, input_tensor: torch.Tensor) -> torch.Tensor:
        """
        Args:
            input_tensor: Tensor có shape (Batch_Size, Seq_Len, 4).
                          4 cột là ID của Onset, Medial, Nucleus, Coda.
                          
        Returns:
            Tensor có shape (Batch_Size, Seq_Len, d_model) sẵn sàng cho Decoder/Encoder.
        """
        B, L, _ = input_tensor.shape
        
        # 1. PHONEME EMBEDDING (Học Biểu diễn Âm vị)
        
        # Tách input tensor thành 4 tensor riêng biệt (chỉ số 0, 1, 2, 3)
        onset_ids = input_tensor[..., 0]
        medial_ids = input_tensor[..., 1]
        nucleus_ids = input_tensor[..., 2]
        coda_ids = input_tensor[..., 3]
        
        # Lấy Embedding cho từng thành phần
        onset_embedded = self.onset_embed(onset_ids)      
        medial_embedded = self.medial_embed(medial_ids)    
        nucleus_embedded = self.nucleus_embed(nucleus_ids)  
        coda_embedded = self.coda_embed(coda_ids)          
        
        # CONCATENATE 4 vector lại (B, L, d_model)
        phoneme_embedding = torch.cat(
            (onset_embedded, medial_embedded, nucleus_embedded, coda_embedded), 
            dim=-1 
        )
        
        # 2. POSITIONAL ENCODING (Thêm Vị trí)
        # Lấy Positional Encoding cho chiều dài hiện tại L
        # 
        positional_encoding = self.pos_encoder[:, :L, :]
        
        # Cộng Positional Encoding vào Phoneme Embedding
        output = phoneme_embedding + positional_encoding
        
        # 3. DROPOUT
        return self.dropout(output)

In [None]:
import torch

def generate_square_subsequent_mask(sz, device):
    """
    Tạo mask hình tam giác vuông để che các từ tương lai.
    Input: sz (độ dài câu tóm tắt)
    Output: Tensor (sz, sz) chứa 0 và -inf
    """
    mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

In [4]:
import torch
import torch.nn as nn
# Import các module bạn đã viết trước đó
# from models.transformer.embedding import TransformerPhonemeEmbedding
# from models.decoder import PhonemeDecoder

class ViSeq2SeqTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, max_len, device, dropout=0.1):
        super().__init__()
        
        self.device = device
        
        # 1. EMBEDDING & ENCODER
        # Embedding cho Source (Văn bản gốc)
        self.src_embedding = TransformerPhonemeEmbedding(vocab_size, d_model, max_len, padding_idx=0, dropout_rate=dropout)
        
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        # 2. DECODER (Đã bao gồm Embedding cho Target bên trong class PhonemeDecoder ta thiết kế trước đó)
        # Lưu ý: Chúng ta cần sửa lại PhonemeDecoder một chút để nó nhận embedding từ bên ngoài hoặc tự tạo.
        # Để tiện nhất, ta khai báo Embedding Target riêng ở đây cho đồng bộ.
        self.tgt_embedding = TransformerPhonemeEmbedding(vocab_size, d_model, max_len, padding_idx=0, dropout_rate=dropout)
        
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        # 3. GENERATOR HEADS (4 đầu ra)
        self.onset_head = nn.Linear(d_model, vocab_size)
        self.medial_head = nn.Linear(d_model, vocab_size)
        self.nucleus_head = nn.Linear(d_model, vocab_size)
        self.coda_head = nn.Linear(d_model, vocab_size)

    def create_padding_mask(self, tensor):
        """Tạo mask cho vị trí padding (Onset == 0)"""
        # tensor: (Batch, Seq_Len, 4) -> Mask: (Batch, Seq_Len)
        return (tensor[..., 0] == 0)

    def forward(self, src, tgt):
        """
        src: (Batch, Src_Len, 4)
        tgt: (Batch, Tgt_Len, 4) - Lưu ý: Đây là Decoder Input (đã bỏ <eos>)
        """
        
        # --- BƯỚC 1: TẠO MASK ---
        # Mask che padding cho Source và Target
        src_padding_mask = self.create_padding_mask(src).to(self.device)
        tgt_padding_mask = self.create_padding_mask(tgt).to(self.device)
        
        # Mask che tương lai cho Target (Causal Mask)
        tgt_seq_len = tgt.shape[1]
        tgt_mask = generate_square_subsequent_mask(tgt_seq_len, self.device)
        
        # --- BƯỚC 2: ENCODER ---
        # Embed Source
        src_emb = self.src_embedding(src) # (Batch, Src_Len, D_Model)
        
        # Qua Encoder
        memory = self.transformer_encoder(
            src=src_emb, 
            src_key_padding_mask=src_padding_mask
        )
        
        # --- BƯỚC 3: DECODER ---
        # Embed Target
        tgt_emb = self.tgt_embedding(tgt) # (Batch, Tgt_Len, D_Model)
        
        # Qua Decoder
        # memory là output của encoder
        dec_output = self.transformer_decoder(
            tgt=tgt_emb,
            memory=memory,
            tgt_mask=tgt_mask,                   # Che tương lai
            tgt_key_padding_mask=tgt_padding_mask, # Che padding của target
            memory_key_padding_mask=src_padding_mask # Che padding của memory (source)
        )
        
        # --- BƯỚC 4: DỰ ĐOÁN (4 Nhánh) ---
        logits_onset = self.onset_head(dec_output)
        logits_medial = self.medial_head(dec_output)
        logits_nucleus = self.nucleus_head(dec_output)
        logits_coda = self.coda_head(dec_output)
        
        return logits_onset, logits_medial, logits_nucleus, logits_coda

In [None]:
class PhonemeLoss(nn.Module):
    def __init__(self, padding_idx=0):
        super().__init__()
        # ignore_index=padding_idx để không tính loss cho các token padding
        self.criterion = nn.CrossEntropyLoss(ignore_index=padding_idx)
    
    def forward(self, outputs, targets):
        """
        outputs: Tuple (logit_onset, logit_medial, logit_nucleus, logit_coda)
                 Mỗi cái có shape (Batch, Seq_Len, Vocab_Size)
                 
        targets: Tensor (Batch, Seq_Len, 4) - Ground Truth Label
        """
        p_onset, p_medial, p_nucleus, p_coda = outputs
        
        # Tách target ra 4 phần tương ứng
        t_onset = targets[..., 0]
        t_medial = targets[..., 1]
        t_nucleus = targets[..., 2]
        t_coda = targets[..., 3]
        
        # Tính Loss cho từng phần
        # CrossEntropy yêu cầu input (Batch, Class, Seq) hoặc flatten
        # Ta reshape: (Batch * Seq_Len, Vocab_Size) vs (Batch * Seq_Len)
        
        vocab_size = p_onset.shape[-1]
        
        loss_onset = self.criterion(p_onset.reshape(-1, vocab_size), t_onset.reshape(-1))
        loss_medial = self.criterion(p_medial.reshape(-1, vocab_size), t_medial.reshape(-1))
        loss_nucleus = self.criterion(p_nucleus.reshape(-1, vocab_size), t_nucleus.reshape(-1))
        loss_coda = self.criterion(p_coda.reshape(-1, vocab_size), t_coda.reshape(-1))
        
        # Tổng Loss
        total_loss = loss_onset + loss_medial + loss_nucleus + loss_coda
        
        return total_loss

In [6]:
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm # Thư viện tạo thanh loading

# --- IMPORT CÁC MODULE CỦA BẠN ---
from text_sum_dataset import ViTextSumDataset
from collate_fn_phoneme import ViCollator
from vocabs.viword_vocab import ViWordVocab 
from configs.phoneme_config import Config

# Giả sử bạn đã lưu Model và Loss vào file models.py hoặc định nghĩa ngay bên trên
# from models import ViSeq2SeqTransformer, PhonemeLoss 
# (Nếu chưa tách file thì phải paste class Model và Loss vào đây trước)

def train():
    # 1. Khởi tạo Config
    config = Config()
    
    # 2. Khởi tạo Vocab & Dataset
    print("Đang xây dựng từ điển...")
    vocab_obj = ViWordVocab(config)
    
    # ⚠️ QUAN TRỌNG: Cập nhật VOCAB_SIZE vào config sau khi vocab đã chạy xong
    config.VOCAB_SIZE = len(vocab_obj.itos)
    print(f"Vocab Size: {config.VOCAB_SIZE}")

    # Cài đặt đường dẫn train
    config.path = config.TRAIN 
    print("Đang tải dữ liệu Train...")
    train_dataset = ViTextSumDataset(config, vocab_obj)

    # 3. Khởi tạo DataLoader
    collator = ViCollator(padding_idx=vocab_obj.padding_idx)
    train_loader = DataLoader(
        train_dataset, 
        batch_size=config.BATCH_SIZE, 
        shuffle=True, 
        num_workers=2, 
        collate_fn=collator
    )

    # 4. Khởi tạo Model
    print("Đang khởi tạo Model...")
    model = ViSeq2SeqTransformer(
        vocab_size=config.VOCAB_SIZE, # Lấy từ biến vừa cập nhật
        d_model=config.D_MODEL,
        nhead=config.N_HEAD,
        num_encoder_layers=config.NUM_ENCODER_LAYERS,
        num_decoder_layers=config.NUM_DECODER_LAYERS,
        dim_feedforward=config.DIM_FEEDFORWARD,
        max_len=config.MAX_LEN,
        device=config.DEVICE,
        dropout=config.DROPOUT
    ).to(config.DEVICE)

    # 5. Optimizer & Loss
    # padding_idx=0 để không tính loss cho phần đệm
    criterion = PhonemeLoss(padding_idx=vocab_obj.padding_idx)
    optimizer = torch.optim.Adam(model.parameters(), lr=config.LEARNING_RATE)

    # 6. TRAINING LOOP
    print("Bắt đầu huấn luyện...")
    model.train()
    
    for epoch in range(config.NUM_EPOCHS):
        # Tạo thanh loading bar cho Epoch hiện tại
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{config.NUM_EPOCHS}", unit="batch")
        
        total_loss = 0
        
        for batch in progress_bar:
            # Chuyển dữ liệu sang GPU/CPU
            src = batch["src"].to(config.DEVICE)             # (B, Src_Len, 4)
            tgt_input = batch["decoder_input"].to(config.DEVICE) # (B, Tgt_Len, 4)
            labels = batch["labels"].to(config.DEVICE)       # (B, Tgt_Len, 4)
            
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(src, tgt_input)
            
            # Tính Loss
            loss = criterion(outputs, labels)
            
            # Backward pass
            loss.backward()
            
            # Clip grad norm để tránh bùng nổ gradient (tùy chọn nhưng khuyên dùng cho Transformer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            optimizer.step()
            
            # Cập nhật thông tin lên thanh loading
            current_loss = loss.item()
            total_loss += current_loss
            progress_bar.set_postfix(loss=f"{current_loss:.4f}")
        
        # In loss trung bình của cả epoch
        avg_loss = total_loss / len(train_loader)
        print(f"Kết thúc Epoch {epoch+1} | Average Loss: {avg_loss:.4f}")
        
        torch.save(model.state_dict(), f"checkpoint_epoch_{epoch+1}.pt")

if __name__ == '__main__':
    train()

ModuleNotFoundError: No module named 'text_sum_dataset'