In [1]:
import torch

# Select the compute device: the GPU when PyTorch can see CUDA, otherwise the CPU
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

Using device: cuda


In [2]:
import json
import os

import matplotlib.pyplot as plt
import nltk
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
from PIL import Image
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.spice.spice import Spice
from rouge_score import rouge_scorer
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from torchvision import models


# NLTK resources used by this notebook:
#   - "punkt": tokenizer models for word_tokenize
#   - "punkt_tab": NOTE(review) newer NLTK releases appear to load the tokenizer
#     from this package instead of "punkt"; requesting both keeps either version
#     working (an unknown package id only logs an error) - confirm for your version
#   - "wordnet": corpus that meteor_score uses for synonym matching
for nltk_package in ("punkt", "punkt_tab", "wordnet"):
    nltk.download(nltk_package)

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\dimas\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [3]:
print(f"Is CUDA supported by this system? {torch.cuda.is_available()}")
print(f"CUDA version: {torch.version.cuda}")

# Only query the CUDA runtime when a device exists: torch.cuda.current_device()
# raises on a CPU-only machine, which would break "Run All" even though the
# rest of the notebook falls back to the CPU.
cuda_id = torch.cuda.current_device() if torch.cuda.is_available() else None
if cuda_id is None:
    print("No CUDA device available; running on CPU.")
else:
    print(f"ID of current CUDA device: {cuda_id}")
    print(f"Name of current CUDA device: {torch.cuda.get_device_name(cuda_id)}")

Is CUDA supported by this system? True
CUDA version: 12.1
ID of current CUDA device: 0
Name of current CUDA device: NVIDIA GeForce RTX 4050 Laptop GPU


In [4]:
# Read the annotation CSV and print its first rows to inspect the columns
csv_file = "./dataset/data_caption/flickr_annotations_30k.csv"
df = pd.read_csv(csv_file)
print(df.head(n=5))

                                                 raw               sentids  \
0  ["Two young guys with shaggy hair look at thei...       [0, 1, 2, 3, 4]   
1  ["Several men in hard hats are operating a gia...       [5, 6, 7, 8, 9]   
2  ["A child in a pink dress is climbing up a set...  [10, 11, 12, 13, 14]   
3  ["Someone in a blue shirt and hat is standing ...  [15, 16, 17, 18, 19]   
4  ["Two men, one in a gray shirt, one in a black...  [20, 21, 22, 23, 24]   

   split        filename  img_id  
0  train  1000092795.jpg       0  
1  train    10002456.jpg       1  
2  train  1000268201.jpg       2  
3  train  1000344755.jpg       3  
4  train  1000366164.jpg       4  


In [5]:
# Dataset and DataLoader
class Flickr30kDataset(Dataset):
    """Flickr30k (image, caption) pairs built from an annotation CSV.

    The CSV is read with pandas. Two columns are used: ``filename`` (image file
    name, joined onto ``root_dir``) and ``raw`` (all captions of that image,
    stored as a JSON list of strings). Every caption becomes its own sample, so
    an image with five captions contributes five items.

    Args:
        csv_file: Path to the annotation CSV.
        root_dir: Directory that contains the image files.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.data = self._create_image_caption_pairs()

    @staticmethod
    def _parse_captions(raw):
        """Return the list of caption strings stored in one ``raw`` cell."""
        try:
            parsed = json.loads(raw)
        except ValueError:
            parsed = None
        if isinstance(parsed, list):
            return [str(item) for item in parsed]
        # Not a JSON list: keep the legacy comma-separated interpretation.
        return raw.split(",")

    def _create_image_caption_pairs(self):
        """Flatten the annotation table into ``(filename, caption)`` tuples.

        The ``raw`` cell is decoded as JSON rather than split on commas, because
        captions themselves contain commas ("Two men, one in a gray shirt, ...")
        and a plain split would cut them into fragments that still carry the
        list brackets and quotes.
        """
        data = []
        for img_name, raw in zip(
            self.annotations["filename"], self.annotations["raw"]
        ):
            for caption in self._parse_captions(raw):
                caption = caption.strip()
                if caption:  # skip empty entries; they carry no training signal
                    data.append((img_name, caption))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, tokens)`` for sample ``idx``.

        ``image`` is the RGB image after ``transform`` (if any); ``tokens`` is
        the lower-cased caption split with ``word_tokenize``.

        Raises:
            Exception: whatever PIL raised while opening the image; the path is
                printed first so the offending file can be identified.
        """
        img_name, caption = self.data[idx]
        img_path = os.path.join(self.root_dir, img_name)
        try:
            # The context manager closes the file handle as soon as the pixel
            # data has been copied by convert().
            with Image.open(img_path) as img:
                image = img.convert("RGB")
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            raise

        if self.transform:
            image = self.transform(image)

        tokens = word_tokenize(caption.lower())

        return image, tokens


# Image preprocessing for the dataset: resize to 256x256, then convert to a tensor
_preprocessing_steps = [transforms.Resize((256, 256)), transforms.ToTensor()]
transform = transforms.Compose(_preprocessing_steps)

# Build the dataset object and split it into train and validation subsets
csv_file = "./dataset/data_caption/flickr_annotations_30k.csv"  # Replace with the path to your CSV file
root_dir = (
    "./dataset/data_image/flickr30k-images/"  # Replace with the path to your image directory
)

dataset = Flickr30kDataset(csv_file=csv_file, root_dir=root_dir, transform=transform)

# Split by image rather than by (image, caption) pair. Each image appears once
# per caption in dataset.data, so a pair-level split would place the same image
# in both subsets and the validation scores would be measured on images the
# model has already trained on. Sorting makes the split reproducible, since set
# iteration order is not stable between kernel runs.
image_names = sorted({img_name for img_name, _ in dataset.data})
_, val_image_names = train_test_split(image_names, test_size=0.2, random_state=42)
val_image_names = set(val_image_names)

train_indices = [
    i for i, (img_name, _) in enumerate(dataset.data) if img_name not in val_image_names
]
val_indices = [
    i for i, (img_name, _) in enumerate(dataset.data) if img_name in val_image_names
]
train_set = torch.utils.data.Subset(dataset, train_indices)
val_set = torch.utils.data.Subset(dataset, val_indices)

# The DataLoaders are created further down in this cell, after collate_fn is
# defined: captions are variable-length token lists, which the default collate
# function cannot batch.

# Build the vocabulary
vocab = {"<PAD>": 0, "<UNK>": 1}  # Start the vocab with the special tokens

# Collect tokens from exactly the captions the dataset serves, tokenized the
# same way as Flickr30kDataset.__getitem__, so every training token has an id.
corpus_tokens = set()
for _, caption in dataset.data:
    corpus_tokens.update(word_tokenize(caption.lower()))

# sorted() gives each token a stable id; iterating the raw set would assign
# different ids on every kernel restart and invalidate saved weights.
for idx, token in enumerate(sorted(corpus_tokens), 2):
    vocab[token] = idx


# Convert caption tokens into a tensor of vocabulary indices
def tokens_to_tensor(tokens, vocab):
    """Map each token to its id in ``vocab`` and return a 1-D ``torch.long`` tensor.

    Tokens that are not keys of ``vocab`` are mapped to the id of ``"<UNK>"``.
    """
    indices = []
    for token in tokens:
        key = token if token in vocab else "<UNK>"
        indices.append(vocab[key])
    return torch.tensor(indices, dtype=torch.long)


# Batch collate function
def collate_fn(batch):
    """Turn a list of ``(image, tokens)`` samples into two batched tensors.

    Captions are converted to vocabulary ids with ``tokens_to_tensor`` and
    right-padded with the ``"<PAD>"`` id up to the longest caption in the batch.

    Returns:
        ``(images, captions_padded)``: the images stacked along a new first
        dimension, and a ``torch.long`` tensor with one padded row per caption.
    """
    images, captions = zip(*batch)
    longest = max(map(len, captions))

    rows = []
    for tokens in captions:
        ids = tokens_to_tensor(tokens, vocab).tolist()
        padding = [vocab["<PAD>"]] * (longest - len(tokens))
        rows.append(ids + padding)

    images = torch.stack(images, dim=0)
    captions_padded = torch.tensor(rows, dtype=torch.long)
    return images, captions_padded


# DataLoader worker processes are started with "spawn" on Windows and must
# re-import the dataset class and collate_fn. Objects defined inside a notebook
# are not importable from the child processes, so iterating with num_workers > 0
# fails there. Load in the main process on Windows and keep 4 workers elsewhere.
num_workers = 0 if os.name == "nt" else 4

train_loader = DataLoader(
    train_set,
    batch_size=32,
    shuffle=True,
    num_workers=num_workers,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_set,
    batch_size=32,
    shuffle=False,
    num_workers=num_workers,
    collate_fn=collate_fn,
)

In [None]:
# DFEN model
class ImageEnhance(nn.Module):
    """Image encoder: backbone stages followed by a feature re-weighting step.

    The backbone's children are split into three sequential stages (children
    0-4, child 5, child 6). The final feature map is averaged over its spatial
    dimensions, projected to ``feature_dim`` values, and multiplied element-wise
    by softmax weights produced by a two-layer MLP over that same vector.

    The original implementation flattened the whole ``(C, H, W)`` map into
    ``nn.Linear(512, ...)``, which only works if ``C * H * W == 512`` and
    otherwise raises a shape error. Pooling plus projection removes that
    dependency on the backbone width and on the input resolution.

    NOTE(review): with a torchvision ResNet, child 6 is ``layer3``, so ``layer4``
    is never used even though the attribute is named ``conv5`` - confirm this is
    the intended depth.

    Args:
        resnet_model: Backbone whose children 5 and 6 are iterable containers of
            layers (as in torchvision ResNets).
        feature_dim: Size of the returned feature vector. Defaults to 512, the
            width the original attention MLP was hard-coded to.
    """

    def __init__(self, resnet_model, feature_dim=512):
        super().__init__()
        self.resnet = resnet_model
        stages = list(resnet_model.children())
        self.conv3 = nn.Sequential(*stages[:5])
        self.conv4 = nn.Sequential(*stages[5])
        self.conv5 = nn.Sequential(*stages[6])
        # Maps the pooled backbone channels onto the fixed attention width.
        self.project = nn.Linear(self._out_channels(self.conv5), feature_dim)
        self.attention = nn.Sequential(
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Linear(256, feature_dim),
            nn.Softmax(dim=1),
        )

    @staticmethod
    def _out_channels(stage):
        """Return ``out_channels`` of the last ``nn.Conv2d`` registered in ``stage``."""
        convs = [m for m in stage.modules() if isinstance(m, nn.Conv2d)]
        if not convs:
            raise ValueError("backbone stage contains no nn.Conv2d layer")
        return convs[-1].out_channels

    def forward(self, x):
        """Encode a batch of images ``(B, 3, H, W)`` into ``(B, feature_dim)``."""
        conv3_features = self.conv3(x)
        conv4_features = self.conv4(conv3_features)
        conv5_features = self.conv5(conv4_features)
        # Global average pooling: (B, C, H', W') -> (B, C), independent of H, W.
        pooled = conv5_features.mean(dim=(2, 3))
        features = self.project(pooled)
        attention_weights = self.attention(features)
        enhanced_features = attention_weights * features
        return enhanced_features


class TextEnhance(nn.Module):
    """LSTM caption decoder conditioned on an image feature vector.

    The projected image feature is fed to the LSTM as the first time step,
    followed by the embeddings of ``captions[:, :-1]``. The output at step ``t``
    therefore depends only on the image and on ``captions[:, :t]`` and can be
    trained directly against ``captions[:, t]``.

    The original implementation ignored ``features`` and produced step ``t``
    from token ``t`` itself, so the cross-entropy target was visible in the
    input and the image never influenced the prediction.

    Args:
        vocab_size: Number of entries in the vocabulary (output logits per step).
        embed_size: Width of the token embeddings and of the LSTM input.
        hidden_size: Width of the LSTM hidden state.
        feature_size: Width of the incoming image feature vector.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, feature_size=512):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Brings the image feature to the same width as a token embedding so it
        # can be used as the first LSTM input.
        self.feature_proj = nn.Linear(feature_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
        # Not used in forward(): the raw logits are returned. Kept so the
        # attribute remains available to existing code.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, captions):
        """Return logits of shape ``(B, T, vocab_size)`` for ``captions`` of shape ``(B, T)``.

        Args:
            features: Image features; flattened to ``(B, feature_size)``.
            captions: ``torch.long`` token ids, ``(B, T)``.
        """
        image_step = self.feature_proj(features.flatten(1)).unsqueeze(1)
        # Drop the last token: it is only ever a target, never an input.
        word_steps = self.embedding(captions[:, :-1])
        inputs = torch.cat([image_step, word_steps], dim=1)
        lstm_out, _ = self.lstm(inputs)
        outputs = self.fc(lstm_out)
        return outputs


class DFEN(nn.Module):
    """Captioning model that chains an ``ImageEnhance`` encoder and a ``TextEnhance`` decoder.

    Args:
        resnet_model: Backbone handed to ``ImageEnhance``.
        vocab_size, embed_size, hidden_size: Passed through to ``TextEnhance``.
    """

    def __init__(self, resnet_model, vocab_size, embed_size, hidden_size):
        super().__init__()
        self.image_enhance = ImageEnhance(resnet_model)
        self.text_enhance = TextEnhance(vocab_size, embed_size, hidden_size)

    def forward(self, images, captions):
        """Encode ``images``, then return the decoder output for ``captions``."""
        return self.text_enhance(self.image_enhance(images), captions)


# Initialise the model with a ResNet backbone and the remaining hyper-parameters
resnet_model = models.resnet50()
vocab_size = len(vocab)
embed_size, hidden_size = 256, 512

model = DFEN(resnet_model, vocab_size, embed_size, hidden_size)
model = model.to(device)

In [None]:
# Model training
# Padding positions are excluded from the loss: captions are right-padded with
# <PAD> in every batch, and scoring those positions would reward the model for
# predicting padding and dilute the gradient from real tokens.
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for i, (images, captions) in enumerate(train_loader):
        images = images.to(device)
        captions = captions.to(device)

        # Forward pass
        # NOTE(review): outputs[:, t] is scored against captions[:, t]. That is
        # only a meaningful objective if the model's output at step t does not
        # depend on captions[:, t] - confirm against the decoder's forward().
        outputs = model(images, captions)
        loss = criterion(outputs.reshape(-1, vocab_size), captions.reshape(-1))

        # Backward pass and optimisation step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 100 == 0:
            print(
                f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}"
            )

In [None]:
# Evaluate the model
def evaluate_model(model, val_loader, pad_idx=0):
    """Score the model's predictions on ``val_loader`` with five caption metrics.

    The model is called with the reference captions as input, so the scores
    describe teacher-forced predictions, not free-running generation. Tokens
    are compared as stringified vocabulary ids.

    Fixes over the previous version:
      * the ROUGE scorer is bound to ``rouge`` instead of ``rouge_scorer``;
        assigning to ``rouge_scorer`` inside the function made that name local
        and raised ``UnboundLocalError`` before the module could be used;
      * METEOR receives token lists, as ``meteor_score`` expects;
      * padded positions are removed from reference and hypothesis;
      * CIDEr and SPICE are computed once over all samples. CIDEr's n-gram
        weighting is derived from the whole reference set passed in a call, so
        scoring one caption at a time is degenerate, and each SPICE call is
        expensive.

    Args:
        model: Callable as ``model(images, captions)`` returning logits of
            shape ``(B, T, vocab_size)``.
        val_loader: Iterable of ``(images, captions)`` batches.
        pad_idx: Token id used for padding; those reference positions are
            skipped.

    Returns:
        dict with keys ``"bleu"``, ``"meteor"``, ``"rouge"``, ``"cider"`` and
        ``"spice"``. The same values are printed.

    Raises:
        ValueError: if ``val_loader`` yields no non-padding tokens at all.
    """
    model.eval()
    bleu_scores = []
    meteor_scores = []
    rouge_scores = []
    references_by_id = {}
    hypotheses_by_id = {}

    # Built once; it holds no per-sample state.
    rouge = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)

    with torch.no_grad():
        for images, captions in val_loader:
            images = images.to(device)
            captions = captions.to(device)

            outputs = model(images, captions)
            predicted_captions = torch.argmax(outputs, dim=2)

            for ref_ids, hyp_ids in zip(
                captions.cpu().tolist(), predicted_captions.cpu().tolist()
            ):
                # Keep only the positions that hold a real reference token.
                kept = [(r, h) for r, h in zip(ref_ids, hyp_ids) if r != pad_idx]
                if not kept:
                    continue

                reference = [str(r) for r, _ in kept]
                hypothesis = [str(h) for _, h in kept]
                reference_text = " ".join(reference)
                hypothesis_text = " ".join(hypothesis)

                # BLEU
                bleu_scores.append(sentence_bleu([reference], hypothesis))

                # METEOR (pre-tokenized inputs)
                meteor_scores.append(meteor_score([reference], hypothesis))

                # ROUGE-L
                rouge_scores.append(
                    rouge.score(reference_text, hypothesis_text)["rougeL"].fmeasure
                )

                # Collected for the corpus-level CIDEr / SPICE pass below.
                sample_id = len(references_by_id)
                references_by_id[sample_id] = [reference_text]
                hypotheses_by_id[sample_id] = [hypothesis_text]

    if not bleu_scores:
        raise ValueError("val_loader produced no non-padding captions to evaluate")

    # Average of the per-sample metrics
    avg_bleu = sum(bleu_scores) / len(bleu_scores)
    avg_meteor = sum(meteor_scores) / len(meteor_scores)
    avg_rouge = sum(rouge_scores) / len(rouge_scores)

    # CIDEr and SPICE: compute_score returns (corpus average, per-sample scores)
    avg_cider, _ = Cider().compute_score(references_by_id, hypotheses_by_id)
    avg_spice, _ = Spice().compute_score(references_by_id, hypotheses_by_id)

    print(
        f"BLEU: {avg_bleu:.4f}, METEOR: {avg_meteor:.4f}, ROUGE: {avg_rouge:.4f}, CIDEr: {avg_cider:.4f}, SPICE: {avg_spice:.4f}"
    )

    return {
        "bleu": avg_bleu,
        "meteor": avg_meteor,
        "rouge": avg_rouge,
        "cider": avg_cider,
        "spice": avg_spice,
    }

In [None]:
def show_predictions(model, val_loader, vocab, num_examples=5):
    """Plot up to ``num_examples`` validation images with predicted and reference captions.

    The model is called with the reference captions as input, so the displayed
    predictions are teacher-forced.

    Fixes over the previous version:
      * the function returns as soon as enough examples were shown; the old
        ``break`` only left the inner loop, so the model was still run on every
        remaining batch of the loader;
      * predicted tokens are only shown for positions where the reference holds
        a real token, so output at padded positions is no longer displayed.

    Args:
        model: Callable as ``model(images, captions)`` returning logits of
            shape ``(B, T, vocab_size)``.
        val_loader: Iterable of ``(images, captions)`` batches.
        vocab: Mapping from token to id; must contain ``"<PAD>"``.
        num_examples: Maximum number of figures to draw.
    """
    model.eval()
    pad_idx = vocab["<PAD>"]
    # Reverse lookup built once instead of scanning the vocab for every token.
    index_to_word = {index: word for word, index in vocab.items()}
    examples_shown = 0

    with torch.no_grad():
        for images, captions in val_loader:
            if examples_shown >= num_examples:
                return

            images = images.to(device)
            captions = captions.to(device)

            outputs = model(images, captions)
            predicted_captions = torch.argmax(outputs, dim=2)

            for i in range(images.size(0)):
                if examples_shown >= num_examples:
                    return

                ref_ids = captions[i].cpu().tolist()
                hyp_ids = predicted_captions[i].cpu().tolist()

                # Convert ids to words, skipping the padded tail of the caption
                kept = [(r, h) for r, h in zip(ref_ids, hyp_ids) if r != pad_idx]
                reference = [index_to_word.get(r, "<UNK>") for r, _ in kept]
                hypothesis = [index_to_word.get(h, "<UNK>") for _, h in kept]

                # Show the image with both captions; (C, H, W) -> (H, W, C)
                image = images[i].cpu().permute(1, 2, 0).numpy()
                plt.imshow(image)
                plt.title(
                    f"Prediction: {' '.join(hypothesis)}\nGround Truth: {' '.join(reference)}"
                )
                plt.axis("off")
                plt.show()

                examples_shown += 1


def idx_to_word(idx, vocab):
    """Return the first word in ``vocab`` whose index equals ``idx``, or ``"<UNK>"`` if none does."""
    matching_words = (word for word, index in vocab.items() if index == idx)
    return next(matching_words, "<UNK>")


# Show a few predictions next to their ground-truth captions from the validation set
show_predictions(model, val_loader, vocab, 5)