In [2]:
import torch
import torchvision
from torch import nn
from torchvision import transforms
import os
import re
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
import seaborn as sns
import random
import math
from math import sqrt
from PIL import Image
import requests
from io import BytesIO
# from tqdm.auto import tqdm
from tqdm.notebook import tqdm
# from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.cuda.amp import GradScaler, autocast
from transformers import AutoTokenizer
import torch.nn.functional as F
import copy
from difflib import SequenceMatcher
from timm.models.layers import DropPath, to_2tuple, trunc_normal_
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import contextlib
import string
from nltk.tokenize import word_tokenize
from sklearn.metrics.pairwise import cosine_similarity
import timm
from IPython.display import display
from pycocotools.coco import COCO
import shutil
import warnings
# Suppress FutureWarning
warnings.simplefilter(action='ignore', category=FutureWarning)

try:
    from rouge_score import rouge_scorer
    import pycocoevalcap
except:
    print("[INFO] Couldn't find ... installing it.")
    !pip install rouge_score
    !pip install pycocoevalcap
    from pycocoevalcap.bleu.bleu import Bleu
    from pycocoevalcap.meteor.meteor import Meteor
    from pycocoevalcap.rouge.rouge import Rouge
    from pycocoevalcap.cider.cider import Cider

# try:
#     import gradio as gr
# except:
#     print("[INFO] Couldn't find gradio... installing it.")
#     !pip install gradio
#     import gradio as gr

# try:
#     from torchinfo import summary
# except:
#     print("[INFO] Couldn't find torchinfo... installing it.")
#     !pip install -q torchinfo
#     from torchinfo import summary

try:
    from helper_functions import download_data, set_seeds
except:
    # Get the going_modular scripts
    print("[INFO] Couldn't find going_modular or helper_functions scripts... downloading them from GitHub.")
    !git clone https://github.com/mrdbourke/pytorch-deep-learning
    !mv pytorch-deep-learning/helper_functions.py . # get the helper_functions.py script
    !rm -rf pytorch-deep-learning
    from helper_functions import download_data, set_seeds

ModuleNotFoundError: No module named 'matplotlib'

In [None]:
def preprocess_caption(text):
    """Normalize a raw caption and wrap it in sequence markers.

    The text is lower-cased, every character that is neither a word character
    nor whitespace is removed, runs of whitespace are collapsed to one space,
    and the result is surrounded by the literal markers ``<bos>`` and ``<eos>``.

    Args:
        text: Raw caption string.

    Returns:
        The cleaned caption formatted as ``"<bos> {caption} <eos>"``.
    """
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    text = re.sub(r'\s+', ' ', text).strip()
    return f"<bos> {text} <eos>"

In [None]:
def create_data_flickr(input_directory):
    """Build the Flickr caption table from ``captions.txt``.

    Args:
        input_directory: Folder containing ``captions.txt`` and the
            ``Images/flickr30k_images`` image folder.

    Returns:
        DataFrame with columns ``image`` (full image path) and ``caption``
        (caption passed through ``preprocess_caption``), indexed from 0.
    """
    image_root = os.path.join(input_directory, "Images/flickr30k_images")

    # Read the two-column file; malformed lines are skipped by pandas.
    raw = pd.read_csv(
        os.path.join(input_directory, "captions.txt"),
        sep=',',
        header=None,
        names=['image', 'caption'],
        on_bad_lines='skip',
    )

    # The first row is treated as the header line and dropped, then incomplete rows.
    data = raw.iloc[1:].dropna()

    # Prefix every file name with the image folder and renumber the rows.
    data = data.assign(
        image=data['image'].apply(lambda name: os.path.join(image_root, name))
    ).reset_index(drop=True)

    # Clean the captions and add the <bos>/<eos> markers.
    data['caption'] = data['caption'].apply(preprocess_caption)
    return data


def create_data_coco(input_directory="/kaggle/input/coco-2017-dataset"):
    """Build caption tables for the COCO 2017 train/val splits and list the test images.

    Args:
        input_directory: Folder containing ``train2017``, ``val2017``,
            ``test2017`` and ``annotations``.

    Returns:
        ``(train_df, val_df, test_images_list)``: the two DataFrames have one
        row per caption with columns ``image`` and ``caption`` (captions passed
        through ``preprocess_caption``); the list holds the paths of the files
        in ``test2017`` and is empty when that folder does not exist.
    """

    def caption_frame(annotation_file, image_dir):
        # One row per (image, caption) pair of a single COCO split.
        coco = COCO(annotation_file)
        rows = [
            {"image": f"{image_dir}/{coco.loadImgs(image_id)[0]['file_name']}", "caption": ann['caption']}
            for image_id in coco.getImgIds()
            for ann in coco.loadAnns(coco.getAnnIds(imgIds=image_id))
        ]
        return pd.DataFrame(rows)

    test_images = os.path.join(input_directory, "test2017")  # the test split has no captions

    train_df = caption_frame(
        os.path.join(input_directory, "annotations/captions_train2017.json"),
        os.path.join(input_directory, "train2017"),
    )
    val_df = caption_frame(
        os.path.join(input_directory, "annotations/captions_val2017.json"),
        os.path.join(input_directory, "val2017"),
    )

    # The test split only contributes image paths.
    if os.path.exists(test_images):
        test_images_list = [os.path.join(test_images, name) for name in os.listdir(test_images)]
    else:
        test_images_list = []

    # Clean the captions and add the <bos>/<eos> markers.
    for frame in (train_df, val_df):
        frame['caption'] = frame['caption'].apply(preprocess_caption)

    return train_df, val_df, test_images_list

In [None]:
# Custom Dataset
class ImageCaptionDataset(Dataset):
    """Dataset yielding ``(image, token_ids)`` pairs from a caption table.

    Args:
        dataframe: Table whose first column holds image paths and whose second
            column holds caption strings (accessed positionally).
        image_transform: Optional callable applied to the RGB PIL image. When
            ``None`` the PIL image is returned untransformed.
        tokenizer: Callable tokenizer used to encode the caption; required for
            indexing.
        max_length: Length captions are padded/truncated to.
    """

    def __init__(self, dataframe, image_transform=None, tokenizer=None, max_length=30):
        self.data = dataframe
        self.image_transform = image_transform
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of (image, caption) rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, token_ids)`` where ``token_ids`` is the tokenizer's
            ``input_ids`` tensor with the leading batch dimension removed.

        Raises:
            ValueError: If the dataset was built without a tokenizer.
        """
        if self.tokenizer is None:
            # Fail with a clear message instead of "'NoneType' object is not callable".
            raise ValueError("ImageCaptionDataset requires a tokenizer to encode captions.")

        img_path = self.data.iloc[idx, 0]
        caption = self.data.iloc[idx, 1]

        # The context manager closes the file handle; convert() returns an
        # independent, fully loaded image.
        with Image.open(img_path) as img_file:
            image = img_file.convert("RGB")
        if self.image_transform is not None:
            image = self.image_transform(image)

        tokenized = self.tokenizer(
            caption,
            padding="max_length",  # pad to max_length
            truncation=True,       # truncate to max_length
            max_length=self.max_length,
            return_tensors="pt",   # return PyTorch tensors
        )

        # Drop the batch dimension added by return_tensors="pt".
        token_ids = tokenized["input_ids"].squeeze(0)

        return image, token_ids

max_length = 40   # captions are padded/truncated to this many tokens
image_size = 224  # side length of the centre crop

# Worker count for the DataLoaders. os.cpu_count() may return None, which
# DataLoader does not accept, so fall back to in-process loading.
NUM_WORKERS = os.cpu_count() or 0

# Image preprocessing.
# NOTE(review): no mean/std normalization is applied before the pretrained
# encoder — confirm this is intentional.
image_transforms = transforms.Compose([
    transforms.Resize(256),             # resize the shorter side to 256, keeping aspect ratio
    transforms.CenterCrop(image_size),  # centre crop to a fixed square size
    transforms.ToTensor(),              # convert to a tensor
])

# Load the data: Flickr30k is used as the test set, COCO 2017 for train/val.
input_directory = "/kaggle/input/flickr30k"
data_test = create_data_flickr(input_directory=input_directory)

input_directory = "/kaggle/input/coco-2017-dataset/coco2017"
data_train, data_val, test_images_coco = create_data_coco(input_directory=input_directory)

# Initialize the tokenizer and register the caption markers used by preprocess_caption.
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.add_special_tokens({
    'bos_token': '<bos>',
    'eos_token': '<eos>',
    'pad_token': '<pad>'
})

# Datasets
train_dataset = ImageCaptionDataset(data_train, image_transform=image_transforms, tokenizer=tokenizer, max_length=max_length)
val_dataset = ImageCaptionDataset(data_val, image_transform=image_transforms, tokenizer=tokenizer, max_length=max_length)
test_dataset = ImageCaptionDataset(data_test, image_transform=image_transforms, tokenizer=tokenizer, max_length=max_length)

# DataLoaders. prefetch_factor is only valid with worker processes, so it is
# passed only when NUM_WORKERS > 0.
test_loader_extra = {"prefetch_factor": 4} if NUM_WORKERS > 0 else {}
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=NUM_WORKERS, pin_memory=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True, **test_loader_extra)

In [None]:
# Token Embedding
class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is multiplied by ``sqrt(embedding_dim)``."""

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.scale = sqrt(embedding_dim)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, tokens):
        """Return the scaled embeddings of the integer ids in ``tokens``."""
        looked_up = self.embedding(tokens)
        return looked_up * self.scale
        
# Positional Encoding
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch-first sequence, then applies dropout.

    Args:
        embedding_dim: Size of the last dimension of the input (even or odd).
        dropout: Dropout probability applied after the addition.
        maxlen: Number of positions for which encodings are precomputed.
    """

    def __init__(self, embedding_dim, dropout=0.1, maxlen=100):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(-torch.arange(0, embedding_dim, 2) * math.log(10000) / embedding_dim)
        pos = torch.arange(0, maxlen).unsqueeze(1)
        pos_embedding = torch.zeros(maxlen, embedding_dim)
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        # For an odd embedding_dim there is one fewer cosine column than sine
        # columns, so only the matching frequencies are used.
        pos_embedding[:, 1::2] = torch.cos(pos * den[: embedding_dim // 2])
        pos_embedding = pos_embedding.unsqueeze(0)  # (1, maxlen, embedding_dim)
        self.dropout = nn.Dropout(p=dropout)
        # Buffer: follows the module across devices but is not a trainable parameter.
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, x):
        """Add the encodings of the first ``x.size(1)`` positions to ``x`` and apply dropout."""
        return self.dropout(x + self.pos_embedding[:, :x.size(1), :])

class ImageCaptionModel(nn.Module):
    """Image-captioning network: a timm vision encoder feeding a Transformer decoder.

    The encoder's feature map is flattened into a token sequence, projected to
    ``embedding_dim`` and used as the memory of an ``nn.TransformerDecoder``
    that predicts caption tokens.
    """

    def __init__(
        self,
        model_name,
        embedding_dim=768,
        vocab_size=5000,
        num_heads_decoder=8,
        num_transformer_decoder_layers=6,
        ffn_dim=2048,
        dropout_decoder=0.1,
    ):
        """
        Initialize the ImageCaptionModel.

        Args:
            model_name: Name passed to ``timm.create_model`` to build the
                pretrained vision encoder (created with ``num_classes=0``).
            embedding_dim: Dimension of embeddings.
            vocab_size: Size of the vocabulary.
            num_heads_decoder: Number of attention heads in the decoder.
            num_transformer_decoder_layers: Number of layers in the decoder.
            ffn_dim: Dimension of the feed-forward network in the decoder.
            dropout_decoder: Dropout rate in the decoder.
        """
        super().__init__()
        # Kept so the architecture can be recorded alongside checkpoints.
        self.model_name = model_name

        # Vision encoder
        self.encoder = timm.create_model(model_name, pretrained=True, num_classes=0)
        self.input_dim = self.encoder.num_features
        self.linear_proj = nn.Linear(self.input_dim, embedding_dim)

        # Token embedding
        self.token_embedding = TokenEmbedding(vocab_size, embedding_dim)

        # Positional encoding
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout=dropout_decoder)

        # Transformer decoder (sequence-first layout: batch_first is not set)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embedding_dim,
            nhead=num_heads_decoder,
            dim_feedforward=ffn_dim,
            dropout=dropout_decoder,
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_transformer_decoder_layers)

        # Linear layer projecting decoder states to vocabulary logits
        self.generator = nn.Linear(embedding_dim, vocab_size)

    def encode(self, img):
        """
        Encode an image batch into a sequence of projected visual tokens.

        Args:
            img: Image tensor; it is moved to the encoder's device.

        Returns:
            memory: Tensor of shape [batch_size, seq_len, embedding_dim].

        Raises:
            ValueError: If the encoder output is neither 3-D nor 4-D.
        """
        img = img.to(next(self.encoder.parameters()).device)  # ensure data is on the encoder's device
        features = self.encoder.forward_features(img)

        if features.dim() == 4:
            # Assumes a channels-last map [batch, H, W, C], as the original
            # permute/flatten sequence did — TODO confirm for other encoders.
            features = features.flatten(1, 2)  # [batch, H*W, C]
        elif features.dim() != 3:
            raise ValueError(
                f"Unsupported encoder feature shape {tuple(features.shape)}; expected 3-D or 4-D."
            )
        # 3-D outputs are presumed to already be [batch, seq_len, C] token sequences.

        # Project features to the embedding dimension
        memory = self.linear_proj(features)  # [batch, seq_len, embedding_dim]
        return memory

    def decode(self, tokens, memory, tgt_mask, tgt_padding_mask=None):
        """
        Decode the target sequence using the Transformer decoder.

        Args:
            tokens: Target sequence tokens of shape [batch_size, tgt_seq_len].
            memory: Encoded image features of shape [batch_size, num_patches, embedding_dim].
            tgt_mask: Causal mask for the target sequence.
            tgt_padding_mask: Padding mask for the target sequence.

        Returns:
            decoder_output: Sequence-first tensor of shape
                [tgt_seq_len, batch_size, embedding_dim].
        """
        # Token embedding + positional encoding (batch-first at this point)
        tgt_emb = self.token_embedding(tokens)
        tgt_emb = self.positional_encoding(tgt_emb)

        decoder_output = self.decoder(
            tgt_emb.permute(1, 0, 2),  # [tgt_seq_len, batch_size, embed_dim]
            memory.permute(1, 0, 2),   # [num_patches, batch_size, embed_dim]
            tgt_mask=tgt_mask,
            tgt_key_padding_mask=tgt_padding_mask,
        )
        return decoder_output

    def forward(self, img, tokens, tgt_mask, tgt_padding_mask=None):
        """
        Forward pass for the model.

        Args:
            img: Input image tensor of shape [batch_size, 3, H, W].
            tokens: Target sequence tokens of shape [batch_size, tgt_seq_len].
            tgt_mask: Causal mask for the target sequence.
            tgt_padding_mask: Padding mask for the target sequence.

        Returns:
            logits: Predicted logits of shape [batch_size, tgt_seq_len, vocab_size].
        """
        memory = self.encode(img)
        decoded = self.decode(tokens, memory, tgt_mask, tgt_padding_mask)
        logits = self.generator(decoded.permute(1, 0, 2))  # back to batch-first, project to vocabulary
        return logits

    def generate_square_subsequent_mask(self, seq_len, device):
        """Return a [seq_len, seq_len] float causal mask.

        Entries on and below the diagonal are 0.0; entries above it are -inf,
        so a position cannot attend to later positions.
        """
        return torch.triu(
            torch.full((seq_len, seq_len), float('-inf'), device=device), diagonal=1
        )

    def create_mask(self, tgt, pad_token_id, device):
        """Build the causal attention mask and the padding mask for ``tgt``.

        Args:
            tgt: Token ids of shape [batch_size, tgt_seq_len].
            pad_token_id: Id marking padded positions.
            device: Device on which the masks are created.

        Returns:
            ``(attention_mask, padding_mask)``: both float additive masks
            (0.0 = attend, -inf = blocked). The padding mask has the shape of
            ``tgt``.
        """
        tgt_seq_len = tgt.shape[1]
        attention_mask = self.generate_square_subsequent_mask(tgt_seq_len, device=device)
        # A float key-padding mask is ADDED to the attention scores, so padded
        # positions must carry -inf. The previous 0/1 float mask boosted padding
        # instead of hiding it. Float is used to match the causal mask's dtype.
        # A row cannot be fully masked as long as position 0 is a non-pad token.
        padding_mask = torch.zeros(tgt.shape, dtype=torch.float, device=device).masked_fill(
            (tgt == pad_token_id).to(device), float('-inf')
        )
        return attention_mask, padding_mask

    def calculate_accuracy(self, logits, targets, pad_token_id=0):
        """
        Calculate token-level accuracy over non-padding positions.

        Args:
            logits: Predicted logits of shape [batch_size, tgt_seq_len, vocab_size].
            targets: Target tokens of shape [batch_size, tgt_seq_len].
            pad_token_id: Id of the padding token to exclude (defaults to 0
                for backward compatibility).

        Returns:
            accuracy: Fraction of non-padding targets predicted correctly, or
                0.0 when every target is padding.
        """
        predictions = logits.argmax(dim=-1)
        valid = targets != pad_token_id
        num_valid = valid.sum().item()
        if num_valid == 0:
            return 0.0
        num_correct = ((predictions == targets) & valid).sum().item()
        return num_correct / num_valid

    def train_epoch(self, dataloader, optimizer, criterion, tokenizer, device, clip_norm):
        """Run one training epoch with mixed precision and gradient clipping.

        Args:
            dataloader: Yields ``(image_tensor, target_sequence)`` batches.
            optimizer: Optimizer updating this model's parameters.
            criterion: Loss taking flattened logits and flattened targets.
            tokenizer: Provides ``pad_token_id``.
            device: Device the batches are moved to.
            clip_norm: Maximum gradient norm.

        Returns:
            ``(avg_loss, avg_accuracy)`` averaged over batches.
        """
        self.train()
        total_loss = 0.0
        total_accuracy = 0.0
        scaler = GradScaler()

        for image_tensor, target_sequence in tqdm(dataloader, desc="Training", unit="batch"):
            image_tensor, target_sequence = image_tensor.to(device), target_sequence.to(device)
            # Teacher forcing: the decoder sees tokens [0, n-1) and predicts tokens [1, n).
            decoder_input = target_sequence[:, :-1]
            decoder_target = target_sequence[:, 1:]

            tgt_mask, tgt_padding_mask = self.create_mask(decoder_input, tokenizer.pad_token_id, device)
            optimizer.zero_grad()

            with autocast():
                # Call the module (not .forward) so registered hooks run.
                logits = self(image_tensor, decoder_input, tgt_mask, tgt_padding_mask)
                loss = criterion(logits.reshape(-1, logits.size(-1)), decoder_target.reshape(-1))

            scaler.scale(loss).backward()
            # Unscale before clipping so clip_norm applies to the true gradient norm.
            scaler.unscale_(optimizer)
            nn.utils.clip_grad_norm_(self.parameters(), clip_norm)
            scaler.step(optimizer)
            scaler.update()

            total_loss += loss.item()
            total_accuracy += self.calculate_accuracy(
                logits.detach(), decoder_target, tokenizer.pad_token_id
            )

        num_batches = max(len(dataloader), 1)  # avoid division by zero on an empty loader
        return total_loss / num_batches, total_accuracy / num_batches

    def validate_epoch(self, dataloader, criterion, tokenizer, device):
        """Evaluate on ``dataloader`` without gradient tracking.

        Returns:
            ``(avg_loss, avg_accuracy)`` averaged over batches.
        """
        self.eval()
        total_loss = 0.0
        total_accuracy = 0.0

        with torch.no_grad():
            for image_tensor, target_sequence in tqdm(dataloader, desc="Validation", unit="batch"):
                image_tensor, target_sequence = image_tensor.to(device), target_sequence.to(device)

                # Prepare shifted inputs/targets for the decoder
                tgt_input = target_sequence[:, :-1]
                tgt_output = target_sequence[:, 1:]
                tgt_mask, tgt_padding_mask = self.create_mask(tgt_input, tokenizer.pad_token_id, device)

                logits = self(image_tensor, tgt_input, tgt_mask, tgt_padding_mask)
                loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_output.reshape(-1))

                total_loss += loss.item()
                # Use self, not the notebook-global `model`.
                total_accuracy += self.calculate_accuracy(logits, tgt_output, tokenizer.pad_token_id)

        num_batches = max(len(dataloader), 1)  # avoid division by zero on an empty loader
        return total_loss / num_batches, total_accuracy / num_batches

    def plot_loss_curves(self, train_losses, val_losses, train_accuracies, val_accuracies):
        """Plot per-epoch accuracy and loss curves, one figure each."""
        panels = (
            (train_accuracies, val_accuracies, "Train accuracy", "Validation accuracy", "Accuracy", "Accuracy Curves"),
            (train_losses, val_losses, "Train Loss", "Validation Loss", "Loss", "Loss Curves"),
        )
        for train_values, val_values, train_label, val_label, ylabel, title in panels:
            plt.figure(figsize=(10, 5))
            plt.plot(train_values, label=train_label)
            plt.plot(val_values, label=val_label)
            plt.xlabel("Epochs")
            plt.ylabel(ylabel)
            plt.title(title)
            plt.legend()
            plt.show()

In [None]:
set_seeds()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Path of the previously uploaded model in the read-only Input directory
input_model_path = "/kaggle/input/model/pytorch/default/1/swin_transformerdecoder.pth"

# Path where checkpoints are written in the Output (Working) directory
output_model_path = "/kaggle/working/swin_transformerdecoder.pth"

# Seed the Working directory with the uploaded model so training can resume.
# An existing working checkpoint is never overwritten: re-running this cell
# after training would otherwise replace newer progress with the older upload.
if os.path.exists(output_model_path):
    print(f"Existing checkpoint kept at {output_model_path}")
elif os.path.exists(input_model_path):
    print("Copying model from Input to Working directory...")
    shutil.copy(input_model_path, output_model_path)
    print(f"Model copied to {output_model_path}")
else:
    print("No model found in Input. Training from scratch...")

# Initialize model, optimizer, and scheduler
def initialize_model_and_optimizers(checkpoint_path, tokenizer, device,
                                    model_name = "swin_large_patch4_window7_224",
                                    embedding_dim=512,
                                    vocab_size=None,
                                    num_heads_decoder=8,
                                    num_transformer_decoder_layers=6,
                                    ffn_dim=2048,
                                    dropout_decoder=0.1):
    """Create the model, optimizer and scheduler, resuming from a checkpoint when one exists.

    When ``checkpoint_path`` exists, the architecture hyper-parameters stored
    in the checkpoint override the keyword arguments, and the model, optimizer
    and scheduler states plus the metric histories are restored from it.

    Args:
        checkpoint_path: Path of the checkpoint file to resume from.
        tokenizer: Tokenizer; ``len(tokenizer)`` is the default vocabulary size.
        device: Device the model is placed on and the checkpoint is mapped to.
        model_name, embedding_dim, vocab_size, num_heads_decoder,
        num_transformer_decoder_layers, ffn_dim, dropout_decoder:
            Architecture settings used when training from scratch.

    Returns:
        ``(model, optimizer, scheduler, start_epoch, train_losses, val_losses,
        train_accuracies, val_accuracies)``.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer)

    hparams = {
        "model_name": model_name,
        "embedding_dim": embedding_dim,
        "vocab_size": vocab_size,
        "num_heads_decoder": num_heads_decoder,
        "num_transformer_decoder_layers": num_transformer_decoder_layers,
        "ffn_dim": ffn_dim,
        "dropout_decoder": dropout_decoder,
    }

    checkpoint = None
    if os.path.exists(checkpoint_path):
        print("Checkpoint found. Loading model and optimizer...")
        # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
        checkpoint = torch.load(checkpoint_path, map_location=device)
        # The stored architecture takes precedence over the arguments.
        hparams = {key: checkpoint[key] for key in hparams}
    else:
        print("No checkpoint found. Training from scratch...")

    model = ImageCaptionModel(**hparams).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = CosineAnnealingLR(optimizer, T_max=10)

    if checkpoint is None:
        # Fresh run: epochs are numbered from 1 and the histories start empty.
        return model, optimizer, scheduler, 1, [], [], [], []

    # Restore the training state
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])

    start_epoch = checkpoint["epoch"] + 1
    train_losses = checkpoint["train_losses"]
    val_losses = checkpoint["val_losses"]
    train_accuracies = checkpoint["train_accuracies"]
    val_accuracies = checkpoint["val_accuracies"]

    print(f"Resuming training from epoch {start_epoch}.")
    return model, optimizer, scheduler, start_epoch, train_losses, val_losses, train_accuracies, val_accuracies

def _checkpoint_hparams(model):
    """Read the architecture hyper-parameters to store in a checkpoint from the model itself."""
    first_layer = model.decoder.layers[0]
    return {
        # The encoder name is only known if the model recorded it; otherwise
        # fall back to the previously hard-coded value.
        "model_name": getattr(model, "model_name", "swin_large_patch4_window7_224"),
        "embedding_dim": model.generator.in_features,
        "vocab_size": model.generator.out_features,
        "num_heads_decoder": first_layer.self_attn.num_heads,
        "num_transformer_decoder_layers": len(model.decoder.layers),
        "ffn_dim": first_layer.linear1.out_features,
        "dropout_decoder": first_layer.dropout.p,
    }


def train_model(model, criterion, optimizer, scheduler, start_epoch, train_loader, val_loader, tokenizer, device, checkpoint_path, epochs, clip_norm, train_losses, val_losses, train_accuracies, val_accuracies):
    """Train and validate for epochs ``start_epoch..epochs``, checkpointing after each one.

    Args:
        model: Model exposing ``train_epoch`` and ``validate_epoch``, with
            ``decoder`` (``nn.TransformerDecoder``) and ``generator``
            (``nn.Linear``) sub-modules.
        criterion: Loss function forwarded to the epoch methods.
        optimizer, scheduler: Optimizer and LR scheduler; the scheduler is
            stepped once per epoch.
        start_epoch: First epoch number to run (inclusive).
        train_loader, val_loader: Data loaders for the two phases.
        tokenizer: Forwarded to the epoch methods.
        device: Device forwarded to the epoch methods.
        checkpoint_path: File the checkpoint is written to after every epoch.
        epochs: Last epoch number to run (inclusive).
        clip_norm: Gradient-clipping norm forwarded to ``train_epoch``.
        train_losses, val_losses, train_accuracies, val_accuracies: Metric
            histories; they are appended to in place.

    Returns:
        ``(train_losses, val_losses, train_accuracies, val_accuracies)``.
    """
    for epoch in range(start_epoch, epochs + 1):
        print(f"\nEpoch {epoch}/{epochs}")
        train_loss, train_accuracy = model.train_epoch(train_loader, optimizer, criterion, tokenizer, device, clip_norm)
        val_loss, val_accuracy = model.validate_epoch(val_loader, criterion, tokenizer, device)

        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_accuracy:.4f} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_accuracy:.4f}")

        scheduler.step()

        # Save a checkpoint after every epoch. The architecture fields are read
        # from the model so they always describe what was actually trained.
        checkpoint = {
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "scheduler_state_dict": scheduler.state_dict(),
            "train_losses": train_losses,
            "val_losses": val_losses,
            "train_accuracies": train_accuracies,
            "val_accuracies": val_accuracies,
            **_checkpoint_hparams(model),
        }
        torch.save(checkpoint, checkpoint_path)
        print(f"Checkpoint saved at {checkpoint_path}")

    return train_losses, val_losses, train_accuracies, val_accuracies

In [None]:
# Build the model, optimizer and scheduler (resuming from the working-directory
# checkpoint when it exists) together with the metric histories.
model, optimizer, scheduler, start_epoch, train_losses, val_losses, train_accuracies, val_accuracies = initialize_model_and_optimizers(
    checkpoint_path=output_model_path, tokenizer=tokenizer, device=device
)

In [None]:
# Cross-entropy over the vocabulary; padded target positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
# Train up to epoch `epochs` (inclusive), starting at `start_epoch`; the metric
# histories are extended and a checkpoint is written after every epoch.
train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    start_epoch=start_epoch,
    train_loader=train_loader,
    val_loader=val_loader,
    tokenizer=tokenizer,
    device=device,
    checkpoint_path=output_model_path,
    epochs=1,  # NOTE(review): when resuming from a checkpoint with epoch >= 1, no epoch runs — raise to continue training
    clip_norm=5.0,
    train_losses=train_losses,
    val_losses=val_losses,
    train_accuracies=train_accuracies,
    val_accuracies=val_accuracies,
)

In [None]:
# Hàm chuẩn hóa đầu vào
def normalize_caption(caption):
    """Return ``caption`` lower-cased, stripped of ASCII punctuation, and
    re-joined from its ``word_tokenize`` tokens with single spaces."""
    strip_punctuation = str.maketrans('', '', string.punctuation)
    cleaned = caption.lower().translate(strip_punctuation)
    return ' '.join(word_tokenize(cleaned))

# Hàm loại bỏ các reference rỗng hoặc không hợp lệ
def clean_references(reference_captions):
    """Return the captions that still contain text after whitespace is stripped."""
    kept = []
    for candidate in reference_captions:
        if not candidate.strip():
            continue  # empty or whitespace-only reference
        kept.append(candidate)
    return kept

# Hàm sinh caption
def generate_caption(model, image_tensor, tokenizer, max_length=35):
    """Greedily decode one caption per image in ``image_tensor``.

    Args:
        model: Captioning model exposing ``encode``, ``decode``,
            ``generate_square_subsequent_mask`` and ``generator``.
        image_tensor: Batch of preprocessed images.
        tokenizer: Tokenizer providing ``bos_token_id``, ``eos_token_id`` and
            ``decode``.
        max_length: Maximum number of decoding steps.

    Returns:
        List with one normalized caption string per image.
    """
    model.eval()
    # Run on the device the model lives on instead of relying on a notebook global.
    run_device = next(model.parameters()).device
    image_tensor = image_tensor.to(run_device)
    eos_id = tokenizer.eos_token_id

    # no_grad: inference must not build an autograd graph.
    with torch.no_grad(), torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):  # mixed precision
        memory = model.encode(image_tensor)
        batch_size = memory.shape[0]
        generated_tokens = torch.full(
            (batch_size, 1), tokenizer.bos_token_id, dtype=torch.long, device=run_device
        )
        # Tracks which sequences have already produced <eos>.
        finished = torch.zeros(batch_size, dtype=torch.bool, device=run_device)

        for _ in range(max_length):
            tgt_mask = model.generate_square_subsequent_mask(generated_tokens.size(1), device=run_device)
            decoder_output = model.decode(generated_tokens, memory, tgt_mask=tgt_mask)
            # decode() is sequence-first: the last step's states are decoder_output[-1].
            next_token = model.generator(decoder_output[-1, :, :]).argmax(dim=-1)
            # Finished sequences keep emitting <eos> instead of new words.
            next_token = next_token.masked_fill(finished, eos_id)
            finished = finished | (next_token == eos_id)

            if finished.all():
                break

            generated_tokens = torch.cat([generated_tokens, next_token.unsqueeze(1)], dim=1)

    captions = []
    for sequence in generated_tokens.tolist():
        token_ids = sequence[1:]  # drop the leading <bos>
        if eos_id in token_ids:
            token_ids = token_ids[:token_ids.index(eos_id)]  # cut at the first <eos>
        captions.append(tokenizer.decode(token_ids, skip_special_tokens=True))

    return [normalize_caption(caption) for caption in captions]  # normalize the captions

# Hàm đánh giá trên tập test
def evaluate_on_test_set(model, test_loader, tokenizer):
    """Score generated captions against the references in ``test_loader``.

    Every dataset row is scored as its own sample: the reference is that row's
    caption and the candidate is the caption generated for that row's image.

    NOTE(review): rows that share an image are therefore scored separately,
    each with a single reference, rather than pooled into one multi-reference
    entry per image.

    Args:
        model: Captioning model passed to ``generate_caption``.
        test_loader: Unshuffled loader yielding ``(images, token_ids)`` batches.
        tokenizer: Tokenizer used to decode the reference token ids.

    Returns:
        Dict with BLEU-1..4, METEOR, ROUGE-L and CIDEr scores.

    Raises:
        ValueError: If no sample with a non-empty reference was found.
    """
    # Local imports so the scorers are always bound, whichever branch of the
    # notebook's install guard ran.
    from pycocoevalcap.bleu.bleu import Bleu
    from pycocoevalcap.meteor.meteor import Meteor
    from pycocoevalcap.rouge.rouge import Rouge
    from pycocoevalcap.cider.cider import Cider

    model.eval()
    start_time = time.time()

    # Initialize scorers (the names avoid shadowing the imported rouge_scorer module)
    bleu = Bleu(4)
    meteor = Meteor()
    rouge = Rouge()
    cider = Cider()

    # Ground-truth references and generated results, keyed by sample index
    gts = {}
    res = {}

    with torch.no_grad(), torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
        for batch_idx, (images, token_ids) in enumerate(tqdm(test_loader, desc="Evaluating test set")):
            images = images.to(device, non_blocking=True)

            # Generate captions
            predicted_captions = generate_caption(model, images, tokenizer)

            for idx, (predicted_caption, reference_ids) in enumerate(zip(predicted_captions, token_ids.tolist())):
                global_idx = batch_idx * test_loader.batch_size + idx

                # Decode the WHOLE reference sequence as one caption. Iterating
                # over the row decoded every token as a separate "reference".
                reference_text = tokenizer.decode(reference_ids, skip_special_tokens=True)
                reference_captions = clean_references([normalize_caption(reference_text)])

                if not reference_captions:
                    continue

                gts[str(global_idx)] = reference_captions
                res[str(global_idx)] = [predicted_caption]

    if not gts:
        raise ValueError("No test samples with a non-empty reference caption were found.")

    # Calculate scores
    bleu_scores, _ = bleu.compute_score(gts, res)
    meteor_score, _ = meteor.compute_score(gts, res)
    rouge_score, _ = rouge.compute_score(gts, res)
    cider_score, _ = cider.compute_score(gts, res)

    # Collect results
    scores = {
        "BLEU-1": bleu_scores[0],
        "BLEU-2": bleu_scores[1],
        "BLEU-3": bleu_scores[2],
        "BLEU-4": bleu_scores[3],
        "METEOR": meteor_score,
        "ROUGE-L": rouge_score,
        "CIDEr": cider_score
    }

    print(f"Evaluation completed in {time.time() - start_time:.2f}s")
    return scores

# Evaluate the model on the test loader and print the metric dictionary.
test_scores = evaluate_on_test_set(model, test_loader, tokenizer)
print("Test set evaluation scores:", test_scores)

In [None]:
# Qualitative check: pick five random rows of the test table (repeats are
# possible), display each image and compare the generated caption with the
# reference captions.
rand = [random.randint(0, len(data_test) - 1) for _ in range(5)]

for i in rand:
    row = data_test.iloc[i]
    image_path = row['image']  # Image path
    actual_caption = row['caption']  # Ground-truth caption of this row

    # Load and display the image with its caption as the title
    plt.figure(figsize=(10, 5))
    img = Image.open(image_path)
    plt.imshow(img)
    plt.axis("off")
    plt.title(f"Actual: {actual_caption}", fontsize=12, color="green")
    plt.show()

    # Transform the image into a single-item batch on the target device
    img = img.convert("RGB")
    image_tensor = image_transforms(img).unsqueeze(0).to(device)

    # Generate caption (generate_caption returns a list with one caption per image)
    predicted_caption = generate_caption(model, image_tensor, tokenizer, max_length=35)

    # Fetch every caption in the test table that belongs to the same image
    reference_captions = data_test[data_test['image'] == image_path]['caption'].tolist()

    # Display captions
    print("Predicted Caption:", predicted_caption)
    if reference_captions:
        print("Reference Captions:")
        for ref in reference_captions:
            print("-", ref)
    else:
        print("No reference captions found for this image.")

    print("\n" + "=" * 50 + "\n")

In [3]:
from transformers import AutoTokenizer 
import os

# Rebuild the tokenizer exactly as it was configured for training: GPT-2 plus
# the <bos>/<eos>/<pad> special tokens. Saving the plain GPT-2 tokenizer would
# omit those ids and leave pad_token_id undefined.
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.add_special_tokens({
    'bos_token': '<bos>',
    'eos_token': '<eos>',
    'pad_token': '<pad>'
})

# Create the target folder if it does not exist yet
os.makedirs("tokenizer", exist_ok=True)

# Save the tokenizer
tokenizer.save_pretrained("tokenizer")

print("✅ Tokenizer đã được lưu vào thư mục 'tokenizer'")


  from .autonotebook import tqdm as notebook_tqdm
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


✅ Tokenizer đã được lưu vào thư mục 'tokenizer'


In [2]:
pip install transformers


Collecting transformersNote: you may need to restart the kernel to use updated packages.

  Downloading transformers-4.48.3-py3-none-any.whl.metadata (44 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2024.11.6-cp310-cp310-win_amd64.whl.metadata (41 kB)
Collecting tokenizers<0.22,>=0.21 (from transformers)
  Downloading tokenizers-0.21.0-cp39-abi3-win_amd64.whl.metadata (6.9 kB)
Downloading transformers-4.48.3-py3-none-any.whl (9.7 MB)
   ---------------------------------------- 0.0/9.7 MB ? eta -:--:--
   - -------------------------------------- 0.3/9.7 MB ? eta -:--:--
   --- ------------------------------------ 0.8/9.7 MB 3.3 MB/s eta 0:00:03
   --- ------------------------------------ 0.8/9.7 MB 3.3 MB/s eta 0:00:03
   ----- ---------------------------------- 1.3/9.7 MB 1.4 MB/s eta 0:00:06
   ---------- ----------------------------- 2.6/9.7 MB 2.4 MB/s eta 0:00:03
   --------------- ------------------------ 3.7/9.7 MB 2.8 MB/s eta 0:00:03
   -------------


[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import torchvision.models as models

# Instantiate torchvision's swin_v2_s with the "IMAGENET1K_V1" weights and print its module tree.
# NOTE(review): this rebinds the notebook-global name `model`, which other cells
# use for the captioning model — confirm this cell is only run in a separate session.
model = models.swin_v2_s(weights="IMAGENET1K_V1")
print(model)


SwinTransformer(
  (features): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 96, kernel_size=(4, 4), stride=(4, 4))
      (1): Permute()
      (2): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
    )
    (1): Sequential(
      (0): SwinTransformerBlockV2(
        (norm1): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
        (attn): ShiftedWindowAttentionV2(
          (qkv): Linear(in_features=96, out_features=288, bias=True)
          (proj): Linear(in_features=96, out_features=96, bias=True)
          (cpb_mlp): Sequential(
            (0): Linear(in_features=2, out_features=512, bias=True)
            (1): ReLU(inplace=True)
            (2): Linear(in_features=512, out_features=3, bias=False)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
        (norm2): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
        (mlp): MLP(
          (0): Linear(in_features=96, out_features=384, bias=True)
          (1): GELU(approximate='