In [340]:
# Import library yang dibutuhkan
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

In [341]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim  # Add optim here
from collections import Counter
from torchvision import transforms
from torch.nn.utils.rnn import pad_sequence
import itertools
import numpy as np

Mac GPU Accelerator check

In [342]:
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")

tensor([1.], device='mps:0')


In [343]:
# Define device
device = torch.device("mps")

In [360]:
class Vocabulary:
    def __init__(self, freq_threshold=5):
        # freq_threshold: Batas frekuensi minimum kata yang harus ada dalam vocabulary
        # Kata yang frekuensinya di bawah threshold akan digantikan dengan token <UNK>
        self.freq_threshold = freq_threshold
        
        # Kamus untuk memetakan indeks ke kata (contoh: 0 -> "<PAD>", 1 -> "<SOS>")
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        
        # Kamus untuk memetakan kata ke indeks (contoh: "<PAD>" -> 0, "<SOS>" -> 1)
        self.stoi = {v: k for k, v in self.itos.items()}

    def __len__(self):
        # Mengembalikan jumlah kata dalam vocabulary
        return len(self.itos)

    def build_vocabulary(self, sentences):
        # Membuat vocabulary berdasarkan frekuensi kemunculan kata dalam kalimat
        # Menghitung frekuensi kata dari setiap kalimat
        frequencies = Counter(itertools.chain(*[sentence.lower().split() for sentence in sentences]))
        idx = 4  # Indeks dimulai dari 4 karena indeks 0-3 sudah dipakai untuk <PAD>, <SOS>, <EOS>, <UNK>
        
        # Memasukkan kata-kata yang frekuensinya lebih besar atau sama dengan freq_threshold ke dalam vocabulary
        for word, freq in frequencies.items():
            if freq >= self.freq_threshold:
                self.stoi[word] = idx  # Menambahkan kata ke kamus string-to-index
                self.itos[idx] = word  # Menambahkan indeks ke kamus index-to-string
                idx += 1  # Increment indeks
        
        # Pastikan token <UNK> ada dalam vocabulary
        if "<UNK>" not in self.stoi:
            self.stoi["<UNK>"] = 3
            self.itos[3] = "<UNK>"

    def numericalize(self, text):
        # Mengubah kalimat (teks) menjadi daftar angka berdasarkan vocabulary
        # Jika kata tidak ada dalam vocabulary, akan digantikan dengan token <UNK>
        tokenized_text = text.lower().split()
        return [
            self.stoi.get(word, self.stoi["<UNK>"])  # Mengambil indeks kata, atau <UNK> jika kata tidak ada
            for word in tokenized_text
        ]


In [365]:
# Path data dan file captions
data_dir = 'Images'  # Direktori gambar
captions_file = os.path.join(data_dir, 'captions.txt')  # File captions

# List untuk menyimpan path gambar dan caption
image_paths, captions = [], []

# Membaca dan memproses file captions.txt
with open(captions_file, 'r') as file:
    for line in file:
        parts = line.strip().split('\t')
        if len(parts) == 2:
            image_path, caption = parts
            image_paths.append(os.path.join(data_dir, image_path))  # Gabungkan path gambar
            captions.append(caption)  # Simpan caption
        else:
            print(f"Skipping improperly formatted line: {line}")

# Membangun vocabulary dari captions
vocab = Vocabulary(freq_threshold=2)  # Threshold kata muncul minimal 2 kali
vocab.build_vocabulary(captions)  # Bangun vocabulary

# Tentukan panjang maksimal caption (misalnya, panjang maksimum dari caption yang ada)
max_length = max(len(caption.split()) for caption in captions)  # Tentukan panjang maksimal

# Tokenisasi caption
captions_tokenized = [
    [vocab.stoi["<SOS>"]] + vocab.numericalize(caption) + [vocab.stoi["<EOS>"]]  # Tokenisasi dengan <SOS> dan <EOS>
    for caption in captions
]

print("Sample tokenized captions:", captions_tokenized[:5])  # Tampilkan 5 caption tokenized pertama


Sample tokenized captions: [[1, 4, 5, 6, 7, 8, 2], [1, 9, 10, 11, 12, 13, 11, 14, 15, 16, 2], [1, 17, 6, 18, 19, 12, 13, 11, 14, 10, 20, 21, 15, 16, 2], [1, 4, 22, 12, 23, 4, 20, 21, 2], [1, 9, 24, 15, 25, 2]]


In [366]:
# Definisikan transformasi untuk gambar
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Kelas untuk dataset gambar dan caption
class ImageCaptionDataset(Dataset):
    def __init__(self, image_paths, captions, transform=None):
        self.image_paths = image_paths
        self.captions = captions
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Membaca gambar
        image = Image.open(self.image_paths[idx]).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Mendapatkan caption dan mengubahnya menjadi tensor
        caption = self.captions[idx]
        caption = torch.tensor(caption, dtype=torch.long)  # Ubah ke tensor

        return image, caption

# Misalnya, captions_tokenized sudah berisi caption yang sudah diproses
# Membuat dataset
dataset = ImageCaptionDataset(image_paths=image_paths, captions=captions_tokenized, transform=transform)

In [368]:
from torch.nn.utils.rnn import pack_padded_sequence

# Fungsi custom collate untuk melakukan padding pada caption
def collate_fn(batch):
    # Memisahkan gambar dan caption dari batch
    images, captions = zip(*batch)
    
    # Stack gambar menjadi tensor
    images = torch.stack(images, 0)
    
    # Ubah setiap caption menjadi tensor
    captions = [torch.tensor(caption) for caption in captions]
    
    # Padding caption dalam batch ke panjang maksimum di dalam batch
    padded_captions = pad_sequence(captions, batch_first=True, padding_value=vocab.stoi["<PAD>"])
    
    # Menyimpan panjang asli dari setiap caption (untuk digunakan dalam model, misalnya untuk mask atau pemrosesan)
    caption_lengths = torch.tensor([len(caption) for caption in captions])

    return images, padded_captions, caption_lengths

# Update DataLoader untuk menggunakan collate_fn custom
dataloader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

# Testing DataLoader untuk memastikan fungsi berjalan dengan benar
images, captions_batch, _ = next(iter(dataloader))
print("Ukuran batch gambar:", images.size())  # Menampilkan ukuran tensor gambar
print("Batch caption:", captions_batch)      # Menampilkan batch caption setelah padding

Ukuran batch gambar: torch.Size([4, 3, 224, 224])
Batch caption: tensor([[   1,  224,  427,   27,    9,  379,    2,    0,    0],
        [   1,  192,   10,   94,  259,  219,    2,    0,    0],
        [   1, 1354,   40,   59,    2,    0,    0,    0,    0],
        [   1,  226,   99,   37,    6,  779,   27,  863,    2]])


  captions = [torch.tensor(caption) for caption in captions]


In [None]:
class CNNEncoder(nn.Module):
    def __init__(self, embed_size):
        super(CNNEncoder, self).__init__()
        # Layer konvolusi pertama untuk ekstraksi fitur dari gambar (RGB)
        self.conv = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1)
        
        # Fully connected layer untuk mengubah hasil ekstraksi fitur menjadi vektor berdimensi embed_size
        self.fc = nn.Linear(16 * 112 * 112, embed_size)  # Flatten hasil dari konvolusi ke vektor

    def forward(self, images):
        # Proses gambar melalui layer konvolusi
        x = self.conv(images)
        
        # Flatten hasil konvolusi menjadi vektor satu dimensi
        x = x.view(x.size(0), -1)  # Mengubah dimensi ke (batch_size, -1)
        
        # Proses hasil flatten ke dalam fully connected layer untuk menghasilkan fitur gambar
        x = self.fc(x)
        return x

class LSTMDecoder(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(LSTMDecoder, self).__init__()
        # Embedding layer untuk mengubah kata menjadi representasi vektor berdimensi embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        
        # LSTM untuk memproses urutan kata (caption) dan menghasilkan prediksi kata berikutnya
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)
        
        # Fully connected layer untuk mengubah output LSTM menjadi prediksi kata
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions):
        # Ubah caption ke dalam bentuk embedding vektor
        embeddings = self.embed(captions)
        
        # Gabungkan fitur gambar dengan embedding caption pada dimensi pertama
        # features.unsqueeze(1) menambahkan dimensi baru untuk fitur gambar
        embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)
        
        # Proses urutan (fitur gambar + kata-kata) melalui LSTM
        lstm_out, _ = self.lstm(embeddings)
        
        # Ubah output LSTM menjadi prediksi kata
        outputs = self.fc(lstm_out)  # Output berupa prediksi kata untuk setiap waktu langkah
        return outputs


In [None]:
# Inisialisasi parameter model
embed_size = 256  # Ukuran vektor embedding untuk kata-kata dalam caption
hidden_size = 256  # Ukuran hidden state untuk LSTM
vocab_size = len(vocab)  # Jumlah kata dalam vocabulary (jumlah total kata unik)

# Inisialisasi encoder (CNNEncoder)
encoder = CNNEncoder(embed_size)  # Encoder ini mengubah gambar menjadi representasi vektor

# Inisialisasi decoder (LSTMDecoder)
decoder = LSTMDecoder(embed_size, hidden_size, vocab_size)  # Decoder ini menghasilkan urutan kata berdasarkan fitur gambar

In [None]:
encoder.to(device)

CNNEncoder(
  (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (fc): Linear(in_features=200704, out_features=256, bias=True)
)

In [350]:
decoder.to(device)

LSTMDecoder(
  (embed): Embedding(1948, 256)
  (lstm): LSTM(256, 256)
  (fc): Linear(in_features=256, out_features=1948, bias=True)
)

In [None]:
from torch.optim.lr_scheduler import StepLR

# Definisikan fungsi loss untuk pelatihan
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])  # Loss function mengabaikan padding token
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=0.001)  # Optimizer menggunakan Adam

# Jumlah epoch pelatihan
num_epochs = 5

# Scheduler untuk mengatur learning rate, dengan step_size=2 dan pengurangan 0.5 setiap 2 epoch
scheduler = StepLR(optimizer, step_size=2, gamma=0.5)

# Loop pelatihan utama
for epoch in range(num_epochs):
    encoder.train()  # Set encoder ke mode pelatihan
    decoder.train()  # Set decoder ke mode pelatihan
    
    # Iterasi melalui batch dari dataloader
    for images, captions, lengths in dataloader:
        images = images.to(device)  # Pindahkan batch gambar ke device (misal: GPU)
        captions = captions.to(device)  # Pindahkan batch caption ke device
        
        # Forward pass
        features = encoder(images)  # Proses gambar melalui encoder untuk mendapatkan fitur
        outputs = decoder(features, captions[:, :-1])  # LSTMDecoder menerima fitur dan caption tanpa token akhir

        # Target adalah caption tanpa token awal
        targets = captions[:, 1:]
        outputs = outputs[:, :targets.shape[1], :]  # Sesuaikan ukuran output dan target

        # Hitung loss
        loss = criterion(outputs.contiguous().view(-1, vocab_size), targets.contiguous().view(-1))

        # Backward pass dan optimasi dengan clipping gradien
        optimizer.zero_grad()  # Bersihkan gradien sebelumnya
        loss.backward()  # Backpropagation untuk menghitung gradien
        torch.nn.utils.clip_grad_norm_(encoder.parameters(), 1.0)  # Batasi gradien encoder
        torch.nn.utils.clip_grad_norm_(decoder.parameters(), 1.0)  # Batasi gradien decoder
        optimizer.step()  # Update parameter model
    
    # Update learning rate setelah tiap epoch sesuai dengan scheduler
    scheduler.step()  
    
    # Tampilkan loss setiap epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


  captions = [torch.tensor(caption) for caption in captions]


Epoch [1/5], Loss: 4.5618
Epoch [2/5], Loss: 3.4393
Epoch [3/5], Loss: 3.2844
Epoch [4/5], Loss: 4.3584
Epoch [5/5], Loss: 3.0963


In [378]:
import torch
import torchvision.transforms as transforms
from PIL import Image

# Function to preprocess the image
def preprocess_image(image_path, device):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize to match encoder input size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    image = Image.open(image_path).convert("RGB")
    image = transform(image).unsqueeze(0).to(device)  # Add batch dimension and move to GPU
    return image

# Function to generate a caption
def generate_caption(image_path, encoder, decoder, vocab, device, max_length=20):
    encoder.eval()
    decoder.eval()
    
    # Move models to the device (GPU)
    encoder = encoder.to(device)
    decoder = decoder.to(device)
    
    # Preprocess the image and move it to GPU
    image = preprocess_image(image_path, device)
    
    # Extract features from the image
    with torch.no_grad():
        features = encoder(image)  # Features will now be on the same device as the model
    
    # Initialize the input to the decoder with the <SOS> token
    caption = []
    input_word = torch.tensor([vocab.stoi["<SOS>"]]).unsqueeze(0).to(device)  # Move <SOS> token to device
    
    # Generate words one-by-one
    # Generate words one-by-one
    for _ in range(max_length):
        with torch.no_grad():
            # Pass the features to the decoder
            output, _ = decoder.lstm(decoder.embed(input_word), (features.unsqueeze(0), torch.zeros_like(features).unsqueeze(0)))
            output = decoder.fc(output.squeeze(1))

            # Debugging: Print the output probabilities and the predicted word
            print(f"Output logits at step {_}: {output}")
            predicted = output.argmax(1)  # Get the index of the best word
            print(f"Predicted word index: {predicted.item()}")

        # Convert index to word
        word = vocab.itos[predicted.item()]
        if word == "<EOS>":
            break
        caption.append(word)
        
        # Set the input for the next step as the predicted word
        input_word = predicted.unsqueeze(0)
    
    # Join the words to form the final caption
    return " ".join(caption)

# Example usage
image_path = "test/test.jpg"  # Replace with the path to your image
device = torch.device("mps")  # Check if GPU is available
caption = generate_caption(image_path, encoder, decoder, vocab, device)
print("Generated Caption:", caption)

Output logits at step (tensor([[[-0.7616, -0.7616,  0.0000,  0.0000, -0.7616, -0.0000,  0.0000,
           0.0000, -0.0000,  0.0000,  0.0000, -0.0000, -0.7616,  0.7616,
           0.0000,  0.0000,  0.0000,  0.0000, -0.0000,  0.0000,  0.0000,
           0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,
           0.0000,  0.0000,  0.0000,  0.7616, -0.7616, -0.0000, -0.0000,
           0.7616, -0.7616,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,
           0.7616,  0.0000, -0.0000,  0.7616,  0.0000,  0.0000,  0.7616,
           0.0000, -0.7616,  0.7616,  0.0000,  0.7616, -0.0000,  0.7616,
           0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.7616, -0.0000,
           0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.7616,  0.0000,
           0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.7616,  0.0000,
           0.0000,  0.7616,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,
           0.7616,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000, -0.7616,
           0.0000,  0.7616, 

In [None]:
# Kalo caption kek gini, mending ga dibaca... 
# Sistem 🤖 : Sabar icik bos