In [1]:
!pip install transformers

!pip install nltk

!python -m nltk.downloader punkt

!pip install pycocoevalcap

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
Collecting pycocoevalcap
  Downloading pycocoevalcap-1.2-py3-none-any.whl.metadata (3.2 kB)
Downloading pycocoevalcap-1.2-py3-none-any.whl (104.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.3/104.3 MB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: pycocoevalcap
Successfully installed pycocoevalcap-1.2


In [1]:
import torch
import torch.nn as nn
import math
from transformers import ViTModel
import os
from tqdm import tqdm
from process_data import create_dataloaders

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
class ViT_VN_Transformer(nn.Module):
    def __init__(self, vocab_size, embed_dim=512, num_heads=8, num_layers=4, max_len=40, unfreeze_layers=2):
        super(ViT_VN_Transformer, self).__init__()
        
        # ENCODER: Pre-trained ViT (Google)
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224")
        
        # Đóng băng (Freeze) toàn bộ
        for param in self.vit.parameters():
            param.requires_grad = False

        if unfreeze_layers > 0:
            layers_to_train = self.vit.encoder.layer[-unfreeze_layers:]
            
            for layer in layers_to_train:
                for param in layer.parameters():
                    param.requires_grad = True
            
            for param in self.vit.layernorm.parameters():
                param.requires_grad = True

        # Kích thước đầu ra của ViT Base là 768
        self.vit_hidden_size = 768
        

        # Cầu nối (Bridge): Chuyển từ 768 (ViT) -> 512 (Decoder)
        self.feature_proj = nn.Linear(self.vit_hidden_size, embed_dim)
        
        
        # --- 2. DECODER: Transformer thuần ---
        self.embed_dim = embed_dim
        self.max_len = max_len
        
        # Embedding chữ: Biến ID số thành vector
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        
        # Positional Encoding: Giúp model biết thứ tự từ (trước/sau)
        # Ở đây dùng Learnable Positional Embedding cho đơn giản và hiệu quả
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embed_dim))
        
        # Khối Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embed_dim, 
            nhead=num_heads, 
            batch_first=True, # Quan trọng: input shape là (Batch, Seq, Dim)
            dim_feedforward=2048,
            dropout=0.3
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        
        # Lớp đầu ra: Biến vector 512 thành xác suất của từng từ trong từ điển
        self.fc_out = nn.Linear(embed_dim, vocab_size)
        
        self.dropout = nn.Dropout(0.1)

    def get_tgt_mask(self, size):
        # Tạo mask tam giác để che tương lai
        # Model không được nhìn thấy từ thứ 2 khi đang đoán từ thứ 1
        mask = torch.triu(torch.ones(size, size) * float('-inf'), diagonal=1)
        return mask

    def forward(self, images, captions):
        """
        Hàm này dùng để TRAINING
        images: (Batch, 3, 224, 224)
        captions: (Batch, Seq_Len) - Bao gồm cả <bos> và <eos>
        """
        device = images.device
        
        # --- A. Encode Ảnh ---
        vit_output = self.vit(pixel_values=images).last_hidden_state
        visual_features = self.feature_proj(vit_output)
        
        # --- B. Embed Text ---
        # captions shape: (Batch, Seq_Len)
        seq_len = captions.size(1)
        
        # Biến chữ thành vector + Cộng vị trí
        tgt_emb = self.embedding(captions) * math.sqrt(self.embed_dim)
        tgt_emb = tgt_emb + self.pos_embedding[:, :seq_len, :]
        tgt_emb = self.dropout(tgt_emb)
        
        # --- C. Masking ---
        # Tạo mask che tương lai (Causal Mask)
        tgt_mask = self.get_tgt_mask(seq_len).to(device)
        
        # Padding Mask (để model không quan tâm đến số 0 ở cuối câu)
        # Giả sử pad_idx = 0. Tạo mask True ở nơi có padding
        tgt_padding_mask = (captions == 0).to(device) 
        
        # --- D. Decode ---
        # memory: là visual_features
        # tgt: là caption embeddings
        output = self.decoder(
            tgt=tgt_emb, 
            memory=visual_features, 
            tgt_mask=tgt_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=None # ViT không có padding
        )
        
        # --- E. Dự đoán ---
        # Output: (Batch, Seq_Len, Vocab_Size)
        prediction = self.fc_out(output)
        
        return prediction

    def generate(self, images, bos_idx, eos_idx, max_len=40):
        """
        Hàm này dùng để SUY LUẬN (INFERENCE/TEST)
        Chạy vòng lặp sinh từng từ một.
        """
        self.eval()
        device = images.device
        batch_size = images.size(0)
        
        # 1. Encode ảnh (Chỉ làm 1 lần)
        with torch.no_grad():
            vit_output = self.vit(pixel_values=images).last_hidden_state
            visual_features = self.feature_proj(vit_output) # (B, 197, 512)
            
        # 2. Khởi tạo câu bắt đầu bằng <bos>
        # Input hiện tại: [BOS]
        generated = torch.full((batch_size, 1), bos_idx, dtype=torch.long).to(device)
        
        # 3. Vòng lặp sinh từ
        for _ in range(max_len):
            seq_len = generated.size(1)
            
            # Embed input hiện tại
            tgt_emb = self.embedding(generated) * math.sqrt(self.embed_dim)
            tgt_emb = tgt_emb + self.pos_embedding[:, :seq_len, :]
            
            # Đưa vào Decoder
            # Lưu ý: Lúc generate ta không cần mask che tương lai vì ta chưa có tương lai
            output = self.decoder(tgt=tgt_emb, memory=visual_features)
            
            # Lấy output của từ cuối cùng
            last_token_output = output[:, -1, :] # (Batch, Dim)
            
            # Dự đoán từ tiếp theo (Logits -> Argmax)
            logits = self.fc_out(last_token_output)
            next_token = logits.argmax(dim=-1).unsqueeze(1) # (Batch, 1)
            
            # Nối vào câu
            generated = torch.cat((generated, next_token), dim=1)
            
            # (Tùy chọn) Nếu muốn tối ưu tốc độ:
            # Kiểm tra xem tất cả các mẫu trong batch đã gặp <eos> chưa để break sớm
            # Nhưng để đơn giản code thì cứ chạy hết max_len cũng được.
            
        return generated

In [None]:
# Setup paths
CSV_PATH = "data/final_combined_ds_tokenized.csv"
VOCAB_PATH = "data/vocab_vi_underthesea.json"
DATA_ROOT = "data"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"DEVICE: {device}")

os.makedirs("model_ver5", exist_ok=True)

In [None]:
# Data Loading
print("Creating dataloaders...")
train_loader, val_loader, test_loader, train_ds, val_ds, test_ds = create_dataloaders(
    csv_path=CSV_PATH,
    vocab_path=VOCAB_PATH,
    data_root=DATA_ROOT,
    batch_size=64
)

vocab_size = len(train_ds.stoi)
pad_idx = train_ds.pad_idx
bos_idx = train_ds.bos_idx
eos_idx = train_ds.eos_idx
print(f"\nDATASET INFO:")
print(f"  Vocab size: {vocab_size}")
print(f"  Train batches: {len(train_loader)}")
print(f"  Val batches: {len(val_loader)}")
print(f"  Pad/BOS/EOS: {pad_idx}/{bos_idx}/{eos_idx}\n")


# Cấu hình Hyperparameters
VOCAB_SIZE = len(train_ds.stoi) # Lấy từ dataset đã tạo
EMBED_DIM = 512
NUM_HEADS = 8
NUM_LAYERS = 4                  # Model nhỏ gọn cho 30k ảnh
UNFREEZE_LAYERS = 2             # Mở khóa 2 lớp cuối của ViT
EPOCHS = 20                     # Train tiếng Việt từ đầu cần nhiều epoch hơn fine-tune


# Khởi tạo Model
model = ViT_VN_Transformer(
    vocab_size=VOCAB_SIZE,
    embed_dim=EMBED_DIM,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    unfreeze_layers=UNFREEZE_LAYERS,
    max_len=40
).to(device)

# 3. Loss Function & Optimizer
criterion = nn.CrossEntropyLoss(ignore_index=train_ds.pad_idx, label_smoothing=0.1)
vit_params = list(map(id, model.vit.parameters()))
base_params = filter(lambda p: id(p) not in vit_params, model.parameters())

optimizer = torch.optim.AdamW([
    {'params': model.vit.parameters(), 'lr': 1e-5},  # Learning rate nhỏ cho Encoder
    {'params': base_params, 'lr': 1e-4}              # Learning rate chuẩn cho Decoder
], weight_decay=1e-4) # Thêm weight_decay để giảm overfitting

In [None]:
train_losses = []
val_losses = []
best_val_loss = float('inf')
PATIENCE = 3
patience_counter = 0

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    
    # TRAINING
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Train]")
    for images, captions, lengths in progress_bar:
        images = images.to(device)
        captions = captions.to(device)
        
        # Input cho model: Bỏ từ cuối cùng (<eos> hoặc padding cuối)
        # Target (Đáp án): Bỏ từ đầu tiên (<bos>)
        decoder_input = captions[:, :-1]
        targets = captions[:, 1:]
        
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(images, decoder_input) 
        # Output shape: (Batch, Seq_Len, Vocab)
        
        # Tính Loss
        # Flatten dữ liệu để tính CrossEntropy: (Batch * Seq_Len, Vocab)
        loss = criterion(outputs.reshape(-1, VOCAB_SIZE), targets.reshape(-1))
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        progress_bar.set_postfix({'loss': loss.item()})
        
    avg_train_loss = total_loss / len(train_loader)
    train_losses.append(avg_train_loss)
    
    # --- VALIDATION ---
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for images, captions, lengths in val_loader:
            images = images.to(device)
            captions = captions.to(device)
            
            decoder_input = captions[:, :-1]
            targets = captions[:, 1:]
            
            outputs = model(images, decoder_input)
            loss = criterion(outputs.reshape(-1, VOCAB_SIZE), targets.reshape(-1))
            total_val_loss += loss.item()
            
    avg_val_loss = total_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    
    print(f"Epoch {epoch+1}: Train Loss = {avg_train_loss:.4f} | Val Loss = {avg_val_loss:.4f}")


    # CHECKPOINTING
    # Save best model
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        
        torch.save(model.state_dict(), 'model_ver5/best_model.pth')
        print(f"  ✓ Best model saved! Val Loss: {avg_val_loss:.4f}")
    else:
        patience_counter += 1
        print(f"  ⚠ No improvement. Patience: {patience_counter}/{PATIENCE}")
    
    # EARLY STOPPING
    if patience_counter >= PATIENCE:
        print(f"\n⚠ Early stopping triggered after {epoch+1} epochs!")
        print(f"Best Val Loss: {best_val_loss:.4f}")
        break

print("Hoàn tất huấn luyện!")

In [None]:
from evaluate import generate_captions_for_dataset, compute_bleu_meteor, compute_cider
import json

BEST_MODEL_PATH = "model_5/best_model.pth"

model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device))

test_refs, test_hyps = generate_captions_for_dataset(
    model,
    test_ds,
    device,
    bos_idx=bos_idx,
    eos_idx=eos_idx,
    max_len=40,
    batch_size=64,
)

metrics = compute_bleu_meteor(test_refs, test_hyps)

try:
    cider_score = compute_cider(test_refs, test_hyps)
    metrics["CIDEr"] = cider_score
except Exception as e:
    print("Không tính được CIDEr, lỗi:", e)

print("ViT + Transformer:")
for k, v in metrics.items():
    print(f"{k}: {v:.4f}")


output_caption_file = "model_5/vit_transformer_generated_captions.json"
save_captions = {k: {"pred": v, "ref": test_refs.get(k, [])} for k, v in test_hyps.items()}
with open(output_caption_file, "w", encoding="utf-8") as f:
    json.dump(save_captions, f, indent=2, ensure_ascii=False)

In [None]:
# Lưu train và val losses vào file để vẽ đồ thị sau
with open("model_5/train_val_losses.txt", "w") as f:
    f.write("Train Losses:\n")
    f.write(",".join([str(loss) for loss in train_losses]) + "\n")
    f.write("Val Losses:\n")
    f.write(",".join([str(loss) for loss in val_losses]) + "\n")