In [None]:
# Step 1: Import Libraries
import os
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from nltk.translate.bleu_score import corpus_bleu
# from nltk.translate.meteor_score import single_meteor_score
from gensim.models import KeyedVectors
from torch.nn.utils.rnn import pad_sequence

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision

from transformers import AutoTokenizer
os.environ["TOKENIZERS_PARALLELISM"] = "false"

from tqdm.notebook import trange, tqdm

from torch.distributions import Categorical

torch.backends.cuda.matmul.allow_tf32 = True

In [None]:
import os
import nltk
from nltk.corpus import wordnet

!unzip /usr/share/nltk_data/corpora/wordnet.zip -d /usr/share/nltk_data/corpora/



In [None]:
!pip install -U nltk rouge-score

In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer


In [None]:
pip install transformers

In [None]:
from transformers import ViTModel, ViTFeatureExtractor

In [None]:
# Load the pre-trained ViT model and its matching feature extractor
# (both come from the same "google/vit-base-patch16-224-in21k" checkpoint).
vit_model = ViTModel.from_pretrained("google/vit-base-patch16-224-in21k")
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")

In [None]:
# Step 2: Load Dataset
# Root of the Kaggle input dataset; captions.csv is read from this directory.
work_directory = "/kaggle/input/deep-learning-ic-dataset/"
data_path = os.path.join(work_directory, "captions.csv")
data = pd.read_csv(data_path)

In [None]:
from pathlib import Path
# Scratch directory for the exported split CSVs; exist_ok avoids an error on re-run.
temp_directory = Path('../temp')
temp_directory.mkdir(exist_ok=True)

In [None]:
# Preview the first rows of the captions table.
data.head(5)

In [None]:
# Prefix every entry of the 'filepath' column with the dataset root.
data['filepath'] = data['filepath'].apply(lambda x: os.path.join(work_directory, x))

In [None]:
# Split into train and validation sets (80/20, fixed seed for reproducibility).
# NOTE(review): the split is per row; if captions.csv holds several captions per image,
# the same image could end up in both splits - verify against the dataset.
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)

In [None]:
# Save the split datasets for easier access later (optional)
train_csv_path = os.path.join(temp_directory, "train_captions.csv")
val_csv_path = os.path.join(temp_directory, "val_captions.csv")

In [None]:
# Write both splits to disk without the pandas index column.
train_data.to_csv(train_csv_path, index=False)
val_data.to_csv(val_csv_path, index=False)

In [None]:
class CustomImageCaptionDataset(Dataset):
    """Dataset yielding ``(image, caption)`` pairs described by a data frame.

    Args:
        data_frame: table with a ``filepath`` column (path of the image file)
            and a ``caption`` column (caption text for that image).
        transform: optional callable applied to the loaded RGB ``PIL.Image``.
            When ``None`` (the default) the PIL image is returned unchanged.
    """

    def __init__(self, data_frame, transform=None):
        self.data = data_frame
        # The argument used to be accepted and silently dropped; store it so
        # that a caller who passes a transform actually gets it applied.
        self.transform = transform

    def __len__(self):
        """Return the number of (image, caption) rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load the image at positional index ``idx`` and return ``(image, caption)``."""
        row = self.data.iloc[idx]
        image_path = row['filepath']  # Use the filepath column directly
        caption = row['caption']

        # Open inside a context manager so the file handle is released right
        # away; convert() returns an independent, fully loaded RGB image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return image, caption

In [None]:
# Side length used by the torchvision transform pipeline defined below.
image_size = 128

In [None]:
# Transforms
# NOTE(review): val_transform is not passed to either dataset below, and the
# training/eval loops preprocess images with feature_extractor instead, so
# this pipeline (and image_size) appears unused in the visible code - confirm.

val_transform = transforms.Compose([
    transforms.Resize(image_size),
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create datasets (no transform supplied, so samples are raw RGB PIL images)
train_dataset = CustomImageCaptionDataset(
    data_frame=train_data,

)

val_dataset = CustomImageCaptionDataset(
    data_frame=val_data,
)


In [None]:
# Number of (image, caption) pairs per DataLoader batch.
batch_size = 32

In [None]:
# We'll use a pre-built Tokenizer for the BERT Model
# https://towardsdatascience.com/bert-explained-state-of-the-art-language-model-for-nlp-f8b21a9b6270
# Only the tokenizer (vocabulary and special tokens) of distilbert-base-uncased is loaded here.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

In [None]:
def collate_fn(batch):
    """Regroup a list of ``(image, caption)`` samples into batch-level containers.

    No tokenization or tensor conversion happens here; images and captions are
    passed through untouched so the training loop can preprocess them.

    Args:
        batch: non-empty sequence of ``(image, caption)`` pairs.

    Returns:
        ``(images, captions)`` where ``images`` is a tuple of the images and
        ``captions`` is a list of the raw caption strings, both in batch order.
    """
    images, captions = zip(*batch)
    return images, list(captions)  # Return captions as a list of raw strings


In [None]:
# Data Loaders
# Training batches are reshuffled every epoch; validation order is kept fixed.
# collate_fn keeps the images and caption strings as plain Python containers.
data_loader_train = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
)

data_loader_val = DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)

In [None]:
# Pull the first validation batch for the inspection cells that follow.
dataiter = next(iter(data_loader_val))
test_images, test_captions = dataiter

In [None]:
# Convert one PIL image to a tensor using ViTFeatureExtractor
inputs = feature_extractor(images=[test_images[1]], return_tensors="pt")  # Process a single image

# Extract the image tensor
image_tensor = inputs["pixel_values"][0]  # Shape: [C, H, W]

# The extractor normalizes pixel values, so they are not guaranteed to lie in
# the [0, 1] range imshow expects for floats. Rescale for display only, the
# same way the later visualisation cells do (make_grid with normalize=True).
display_image = torchvision.utils.make_grid(image_tensor, normalize=True)

# Visualize the image
plt.figure(figsize=(3, 3))
plt.imshow(display_image.permute(1, 2, 0).numpy())  # Permute to [H, W, C] for visualization
plt.show()

# Print the corresponding caption
caption = test_captions[1]
print(caption)


In [None]:
# Size of the tokenizer vocabulary (later used as the decoder's num_emb).
tokenizer.vocab_size

In [None]:
# Tokenize the sample captions into one padded batch of PyTorch tensors.
tokens = tokenizer(test_captions, padding=True, truncation=True, return_tensors="pt")

In [None]:
# Attention mask produced by the tokenizer; the training loop treats
# positions where it equals 0 as padding.
tokens['attention_mask']

In [None]:
# Keep the first caption's ids for decoding below and display the whole id batch.
token_ids = tokens['input_ids'][0]
tokens['input_ids']

In [None]:
# Round-trip check: decode the ids of the first caption back to text.
print(tokenizer.decode(token_ids))

In [None]:
# Look up which token string vocabulary index 1 maps to.
tokenizer.decode(1)

In [None]:
class TokenDrop(nn.Module):
    """Randomly replace tokens in a batch of token indices with a blank token.

    The first column of every sequence (the start token) and every occurrence
    of ``eos_token`` are never replaced; all other positions are replaced
    independently with probability ``prob``.

    Args:
        prob (float): probability of replacing each eligible token.
        blank_token (int): index written in place of a dropped token
            (default 103 - presumably [MASK] in the BERT uncased vocab; confirm).
        eos_token (int): index of the end-of-sequence token that must be kept
            (default 102 - presumably [SEP]; confirm).
    """

    def __init__(self, prob=0.1, blank_token=103 , eos_token=102):
        # Initialise nn.Module bookkeeping; without it parameters(), to(),
        # repr() and friends fail on this object.
        super().__init__()
        self.prob = prob
        self.eos_token = eos_token
        self.blank_token = blank_token

    def forward(self, sample):
        """Return a copy of ``sample`` (batch x seq index tensor) with tokens dropped."""
        # Bernoulli mask where 1 means "replace this token". Build the
        # probability tensor as float explicitly so an integer prob also works.
        drop_probs = torch.full(sample.shape, float(self.prob), device=sample.device)
        mask = torch.bernoulli(drop_probs).long()

        # Only replace if the token is not the eos token
        can_drop = (sample != self.eos_token).long()
        mask = mask * can_drop

        # Do not replace the start-of-sequence tokens (first column)
        mask[:, 0] = 0

        replace_with = torch.full_like(sample, self.blank_token).long()

        # Keep the original token where mask == 0, use the blank where mask == 1
        sample_out = (1 - mask) * sample + mask * replace_with

        return sample_out

In [None]:
def extract_patches(image_tensor, patch_size=16):
    """Cut a batch of images into flattened, non-overlapping square patches.

    Args:
        image_tensor: 4-D tensor unpacked as (batch, channels, height, width).
        patch_size: side length of each square patch; also used as the stride.

    Returns:
        Tensor of shape (batch, num_patches, channels * patch_size * patch_size).
    """
    # Unpacking four values also rejects inputs that are not 4-D.
    batch, channels, _height, _width = image_tensor.size()
    patch_dim = channels * patch_size * patch_size

    # Unfold with kernel == stride yields one column per patch:
    # (batch, patch_dim, num_patches)
    columns = torch.nn.Unfold(kernel_size=patch_size, stride=patch_size)(image_tensor)

    # Move the patch axis in front of the feature axis
    return columns.transpose(1, 2).reshape(batch, -1, patch_dim)

# sinusoidal positional embeds
class SinusoidalPosEmb(nn.Module):
    """Fixed sinusoidal embedding of scalar positions.

    For each input value the output holds ``dim // 2`` sine features followed
    by ``dim // 2`` cosine features, with frequencies decaying geometrically
    from 1 down to 1/10000.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Embed positions ``x``; a new axis is inserted at dim 1 for the features."""
        half = self.dim // 2
        # Log-spacing between consecutive frequencies
        log_step = math.log(10000) / (half - 1)
        freqs = torch.exp(torch.arange(half, device=x.device) * -log_step)
        # Outer product: every position times every frequency
        angles = x.unsqueeze(1) * freqs.unsqueeze(0)
        return torch.cat([angles.sin(), angles.cos()], dim=-1)
    
    
# Define a decoder module for the Transformer architecture
class Decoder(nn.Module):
    """Transformer decoder mapping token ids plus encoder memory to vocabulary logits.

    Args:
        num_emb: vocabulary size (number of embeddings and output logits).
        hidden_size: model width used by the embeddings and decoder layers.
        num_layers: number of stacked ``nn.TransformerDecoderLayer`` blocks.
        num_heads: attention heads per layer.
    """

    def __init__(self, num_emb, hidden_size=768, num_layers=3, num_heads=4):
        super().__init__()

        # Token embedding table
        self.embedding = nn.Embedding(num_emb, hidden_size)

        # Parameter-free sinusoidal position encoding
        self.pos_emb = SinusoidalPosEmb(hidden_size)

        # Stack of batch-first decoder layers (dropout disabled)
        layer = nn.TransformerDecoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=4 * hidden_size,
            dropout=0.0,
            batch_first=True,
        )
        self.decoder_layers = nn.TransformerDecoder(layer, num_layers=num_layers)

        # Projection from hidden states to vocabulary logits
        self.fc_out = nn.Linear(hidden_size, num_emb)

    def forward(self, input_seq, encoder_output, input_padding_mask=None, encoder_padding_mask=None):
        """Return logits of shape (batch, seq, num_emb) for ``input_seq``.

        Args:
            input_seq: (batch, seq) tensor of token indices.
            encoder_output: encoder memory attended to by the decoder layers.
            input_padding_mask: optional mask forwarded as ``tgt_key_padding_mask``.
            encoder_padding_mask: optional mask forwarded as ``memory_key_padding_mask``.
        """
        token_embs = self.embedding(input_seq)
        batch, seq_len, width = token_embs.shape

        # Positions 0..seq_len-1 share one encoding across the whole batch
        positions = torch.arange(seq_len, device=input_seq.device)
        pos_embs = self.pos_emb(positions).reshape(1, seq_len, width).expand(batch, seq_len, width)
        decoder_in = token_embs + pos_embs

        # True strictly above the diagonal: a position may not attend to later ones
        all_pairs = torch.ones(seq_len, seq_len, device=input_seq.device)
        causal_mask = torch.triu(all_pairs, 1).bool()

        hidden = self.decoder_layers(
            tgt=decoder_in,
            memory=encoder_output,
            tgt_mask=causal_mask,
            tgt_key_padding_mask=input_padding_mask,
            memory_key_padding_mask=encoder_padding_mask,
        )
        return self.fc_out(hidden)


    
# Define an Vision Encoder-Decoder module for the Transformer architecture
class VisionEncoderDecoder(nn.Module):
    """Couple an image encoder with a text decoder for captioning.

    Args:
        encoder: module called with the preprocessed image inputs as keyword
            arguments; its result must expose ``last_hidden_state``.
        decoder: module called with ``input_seq``, ``encoder_output`` and
            ``input_padding_mask``.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_image, target_seq, padding_mask):
        """Encode the images and decode ``target_seq`` against them.

        Args:
            input_image: mapping of already-preprocessed encoder inputs,
                unpacked as keyword arguments.
            target_seq: token indices fed to the decoder.
            padding_mask: mask over ``target_seq``; cast to bool before use.

        Returns:
            Whatever the decoder returns for these inputs.
        """
        memory = self.encoder(**input_image).last_hidden_state
        return self.decoder(
            input_seq=target_seq,
            encoder_output=memory,
            input_padding_mask=padding_mask.bool(),
        )

In [None]:
# Step size handed to the Adam optimizer created in the model-setup cell.
learning_rate = 1e-5

In [None]:
# Use the default CUDA device when a GPU is available, otherwise the CPU.
# (A hard-coded device index of 1 selects the *second* GPU and fails on
# single-GPU machines; the inference cell also uses plain "cuda".)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Embedding size of the decoder. The encoder output is fed to the decoder as
# memory without projection, so this must equal the encoder's hidden width.
hidden_size = 768

# Number of Transformer blocks for the (Encoder, Decoder); only the decoder
# entry is used below because the encoder is the pre-trained ViT.
num_layers = (6, 6)

# MultiheadAttention Heads
num_heads = 8

# Size of the patches (kept for reference; not used when building the model)
patch_size = 8

# Create model: the pre-trained ViT acts as the encoder
caption_model = VisionEncoderDecoder(
    encoder=vit_model,
    decoder=Decoder(num_emb=tokenizer.vocab_size, hidden_size=hidden_size,
                    num_layers=num_layers[1], num_heads=num_heads)
).to(device)

# Initialize the optimizer with above parameters
optimizer = optim.Adam(caption_model.parameters(), lr=learning_rate)

# Gradient scaler for mixed precision; only enabled when running on CUDA
scaler = torch.cuda.amp.GradScaler(enabled=device.type == "cuda")

# Define the loss function (per-token losses; padding is masked out by the caller)
loss_fn = nn.CrossEntropyLoss(reduction="none")

# Token-drop augmentation with a 50% replacement probability
td = TokenDrop(0.5)

# Initialize the training loss logger
training_loss_logger = []

In [None]:
# Count every scalar weight held by the model (encoder and decoder together)
num_model_params = sum(param.numel() for param in caption_model.parameters())

# "Juta" is Indonesian for "million"; the second number is the floor of the count in millions
print("Number of Model Parameters : %d or >%d Juta Params!" % (num_model_params, num_model_params//1e6))

In [None]:
# Maximum number of epochs (early stopping may end training sooner) and the
# per-epoch history lists filled by the training loop.
nepochs = 50
training_loss_logger = []
eval_loss_logger = []
eval_bleu_logger = []
eval_meteor_logger = []
eval_rouge_logger = []

In [None]:
# Initialize ROUGE scorer (ROUGE-1, ROUGE-2 and ROUGE-L, with stemming)
rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)


# Initialize metric loggers
# NOTE(review): the training loop appends to the eval_*_logger lists above, not
# to these three containers - they look unused in the visible code; confirm.
eval_bleu_scores = []
eval_meteor_scores = []
eval_rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}

In [None]:
#print(rouge_scores.keys())

In [None]:
# Captions are truncated to at most this many tokens when tokenized in the loop.
max_length = 25

In [None]:
# Define the EarlyStopping Class
class EarlyStopping:
    """Stop training when the monitored loss stops improving.

    Call the instance once per epoch with the current validation loss and the
    model. Whenever the loss improves by at least ``delta`` the model's
    ``state_dict`` is saved to ``path`` and the patience counter is reset;
    otherwise the counter grows and ``early_stop`` becomes True once it
    reaches ``patience``.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        verbose: print a message on every counter increase and checkpoint.
        delta: minimum decrease of the loss that counts as an improvement.
        path: file the best model's ``state_dict`` is written to.
    """

    def __init__(self, patience=5, verbose=False, delta=0.0, path='best_model.pt'):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
        self.path = path
        self.best_model_state = None

    def __call__(self, current_loss, model):
        """Record ``current_loss`` and checkpoint ``model`` if it is the best so far."""
        # NaN compares False against everything, so without this guard a NaN
        # loss would look like an improvement, overwrite the checkpoint and
        # poison best_loss for all later comparisons.
        is_nan = math.isnan(current_loss)
        improved = not is_nan and (
            self.best_loss is None or current_loss <= self.best_loss - self.delta
        )

        if improved:
            self.best_loss = current_loss
            self.save_checkpoint(model)
            self.counter = 0
            return

        self.counter += 1
        if self.verbose:
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, model):
        """Saves model when validation loss decreases."""
        torch.save(model.state_dict(), self.path)
        if self.verbose:
            print(f'Validation loss decreased. Saving model to {self.path}')

In [None]:
# Initialize Early Stopping with model checkpoint path
# (stop after 5 consecutive epochs without improvement of the validation loss)
early_stopping = EarlyStopping(patience=5, verbose=True, path='best_model.pt')

In [None]:
# Sanity check of the training-loss history before training starts
# (shows the number of entries and at most the first ten values).
print("Length of Training Loss Logger:", len(training_loss_logger))
print("Training Loss Logger Values:", training_loss_logger[:10])


In [None]:
# Train the captioning model for up to `nepochs` epochs. Each epoch:
#   1. runs one pass over the training loader with teacher forcing,
#   2. evaluates loss, BLEU, METEOR and ROUGE on the validation loader,
#   3. feeds the validation loss to early stopping.
# Objects that do not change between batches are created once up front.
pad_token_id = tokenizer.pad_token_id
smoothing_fn = SmoothingFunction().method1
# Mixed precision is only enabled on CUDA; elsewhere autocast is a no-op.
use_amp = device.type == "cuda"
nan_loss_seen = False

# Iterate over epochs
for epoch in trange(0, nepochs, leave=False, desc="Epoch"):
    # Set the model in training mode
    caption_model.train()

    # Track training loss for this epoch
    epoch_train_loss = 0.0
    num_train_batches = 0

    # Iterate over the training data loader
    for images, captions in tqdm(data_loader_train, desc="Training", leave=False):
        # Preprocess images using ViTFeatureExtractor
        inputs = feature_extractor(images=images, return_tensors="pt")
        inputs = {key: val.to(device) for key, val in inputs.items()}  # Send to device

        # Tokenize captions
        tokens = tokenizer(captions, padding=True, truncation=True, max_length=max_length, return_tensors="pt")
        token_ids = tokens['input_ids'].to(device)

        # True where the tokenizer added padding
        padding_mask = (tokens['attention_mask'] == 0).to(device)

        # Shift target sequence left by one; the freed last slot is filled
        # with the pad id so the loss mask below ignores it.
        bs = token_ids.size(0)
        pad_column = torch.full((bs, 1), pad_token_id, dtype=torch.long, device=device)
        target_ids = torch.cat((token_ids[:, 1:], pad_column), 1)

        # Token drop augmentation (currently disabled)
        #tokens_in = td(token_ids)
        tokens_in = token_ids

        # Forward pass AND loss live inside autocast so the cross-entropy is
        # evaluated in float32 rather than on raw half-precision logits.
        with torch.amp.autocast(device_type=device.type, enabled=use_amp):
            pred = caption_model(inputs, tokens_in, padding_mask=padding_mask)

            # Average the per-token loss over non-padding targets only
            loss_mask = (target_ids != pad_token_id).float()
            loss_vals = loss_fn(pred.transpose(1, 2), target_ids)
            loss = (loss_vals * loss_mask).sum() / loss_mask.sum()

        # Check for NaN BEFORE touching the weights, so the model stays
        # usable for inspection instead of being overwritten with NaNs.
        if torch.isnan(loss):
            print("Encountered NaN loss!")
            nan_loss_seen = True
            break

        # Backpropagation with gradient scaling; gradients are unscaled
        # before clipping so max_norm applies to their true magnitude.
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(caption_model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()

        # Accumulate loss for this epoch
        epoch_train_loss += loss.item()
        num_train_batches += 1

    if nan_loss_seen:
        # Stop training entirely so the cause can be inspected; nothing is
        # logged for the aborted epoch, keeping all loggers the same length.
        break

    # Log average training loss for the epoch
    avg_train_loss = epoch_train_loss / num_train_batches if num_train_batches > 0 else 0.0
    training_loss_logger.append(avg_train_loss)

    # Set the model in eval mode
    caption_model.eval()
    epoch_eval_loss = 0.0
    num_eval_batches = 0
    bleu_scores = []
    meteor_scores = []
    rouge1_scores = []
    rouge2_scores = []
    rougel_scores = []

    with torch.no_grad():
        # Iterate over the validation data loader
        for images, captions in tqdm(data_loader_val, desc="Eval", leave=False):

            # Preprocess images using ViTFeatureExtractor
            inputs = feature_extractor(images=images, return_tensors="pt")
            inputs = {key: val.to(device) for key, val in inputs.items()}  # Send to device

            # Tokenize captions
            tokens = tokenizer(captions, padding=True, truncation=True, max_length=max_length, return_tensors="pt")
            token_ids = tokens['input_ids'].to(device)
            # Convert attention mask to boolean padding mask
            padding_mask = (tokens['attention_mask'] == 0).to(device)

            # Shift target sequence (same construction as in training)
            bs = token_ids.size(0)
            pad_column = torch.full((bs, 1), pad_token_id, dtype=torch.long, device=device)
            target_ids = torch.cat((token_ids[:, 1:], pad_column), 1)

            with torch.amp.autocast(device_type=device.type, enabled=use_amp):
                # Forward pass with preprocessed inputs
                pred = caption_model(inputs, token_ids, padding_mask=padding_mask)

                # Compute the loss over non-padding targets
                loss_mask = (target_ids != pad_token_id).float()
                loss = (loss_fn(pred.transpose(1, 2), target_ids) * loss_mask).sum() / loss_mask.sum()

            # Accumulate validation loss for this epoch
            epoch_eval_loss += loss.item()
            num_eval_batches += 1

            # Decode teacher-forced predictions. Positions whose target is
            # padding are never trained, so blank them out; skip_special_tokens
            # then drops them instead of scoring arbitrary tokens.
            pred_ids = torch.argmax(pred, dim=-1)
            pred_ids = pred_ids.masked_fill(target_ids == pad_token_id, pad_token_id)
            predicted_captions = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)

            # Calculate metrics for each pair of prediction and ground truth
            for pred_caption, gt_caption in zip(predicted_captions, captions):
                # Tokenize ground truth and hypothesis with the model tokenizer
                reference = [tokenizer.tokenize(gt_caption.lower())]
                hypothesis = tokenizer.tokenize(pred_caption.lower())

                # BLEU Score
                bleu = sentence_bleu(reference, hypothesis, smoothing_function=smoothing_fn)
                bleu_scores.append(bleu)

                # METEOR Score
                meteor = meteor_score(reference, hypothesis)
                meteor_scores.append(meteor)

                # ROUGE Scores (computed on the raw caption strings)
                scores = rouge.score(gt_caption, pred_caption)
                rouge1_scores.append(scores['rouge1'].fmeasure)
                rouge2_scores.append(scores['rouge2'].fmeasure)
                rougel_scores.append(scores['rougeL'].fmeasure)

    # Log average validation loss and metric scores for the epoch
    avg_eval_loss = epoch_eval_loss / num_eval_batches if num_eval_batches > 0 else 0.0
    avg_bleu_score = np.mean(bleu_scores) if bleu_scores else 0.0
    avg_meteor_score = np.mean(meteor_scores) if meteor_scores else 0.0
    avg_rouge1 = np.mean(rouge1_scores) if rouge1_scores else 0.0
    avg_rouge2 = np.mean(rouge2_scores) if rouge2_scores else 0.0
    avg_rougeL = np.mean(rougel_scores) if rougel_scores else 0.0

    eval_loss_logger.append(avg_eval_loss)
    eval_bleu_logger.append(avg_bleu_score)
    eval_meteor_logger.append(avg_meteor_score)
    eval_rouge_logger.append({'rouge1': avg_rouge1, 'rouge2': avg_rouge2, 'rougeL': avg_rougeL})

    print(f"Epoch {epoch + 1}/{nepochs} - Avg Eval Loss: {avg_eval_loss:.4f} - "
          f"Avg BLEU: {avg_bleu_score:.4f} - Avg Meteor: {avg_meteor_score:.4f} - "
          f"Avg Rouge1: {avg_rouge1:.4f}, Rouge2: {avg_rouge2:.4f}, RougeL: {avg_rougeL:.4f}")

    # Early Stopping Check
    early_stopping(avg_eval_loss, caption_model)
    if early_stopping.early_stop:
        print("Early stopping triggered. Restoring the best model.")
        # Reload from the path the checkpoint was actually written to
        caption_model.load_state_dict(torch.load(early_stopping.path, map_location=device))
        break
   

In [None]:
# Save the model's state dict
# (this is the model as it stands after the training loop, which is the last
# epoch's weights unless early stopping restored the best checkpoint)
torch.save(caption_model.state_dict(), "caption_model_state_dict.pth")


In [None]:
# Save model and optimizer state dicts
torch.save({
    # One training loss is logged per completed epoch, so the logger length is
    # the number of epochs actually run (nepochs overstates it after early stopping).
    'epoch': len(training_loss_logger),
    'model_state_dict': caption_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    # Last logged average training loss; None if no epoch completed
    'loss': training_loss_logger[-1] if training_loss_logger else None,
}, "checkpoint.pth")


In [None]:
from IPython.display import FileLink
# Render a clickable link to the saved checkpoint in the notebook output.
FileLink(r'checkpoint.pth')


In [None]:
#eval_metrics_logger = {
    #"BLEU": avg_bleu,
    #"METEOR": avg_meteor,
    #"ROUGE-1": avg_rouge_1,
    #"ROUGE-2": avg_rouge_2,
    #"ROUGE-L": avg_rouge_l
#}

In [None]:
# Inspect the first caption of the last processed batch and its token round trip.
# NOTE: relies on `captions` and `token_ids` left over from the training/eval loop.
print("Sample caption:", captions[0])
print("Decoded:", tokenizer.decode(token_ids[0]))


In [None]:
# The three histories get one entry per epoch, so their lengths should match.
print(len(training_loss_logger), len(eval_loss_logger), len(eval_bleu_logger))

In [None]:
# Number of training samples.
len(train_dataset)

Training + Eval Loss

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Both loggers already hold one averaged loss value per epoch
avg_training_loss_logger = training_loss_logger
avg_eval_loss_logger = eval_loss_logger

# Eleven evenly spaced y-axis ticks spanning both curves. linspace is used
# instead of arange with a computed step because a step of zero (all values
# equal, e.g. after a single epoch) makes arange fail.
all_losses = list(avg_training_loss_logger) + list(avg_eval_loss_logger)
y_min, y_max = min(all_losses), max(all_losses)
y_ticks = np.linspace(y_min, y_max, 11)

# Create the plot
plt.figure(figsize=(10, 5))

# Plot average training loss per epoch (x axis is 1-based epoch number)
plt.plot(range(1, len(avg_training_loss_logger) + 1), avg_training_loss_logger,
         label="Training Loss", color='tab:blue')

# Plot average evaluation loss per epoch
plt.plot(range(1, len(avg_eval_loss_logger) + 1), avg_eval_loss_logger,
         label="Evaluation Loss", color='tab:orange')

# Apply custom y-ticks
plt.yticks(y_ticks)

# Add labels, title, legend, and grid
plt.title("Training and Evaluation Loss per Epoch")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)

# Show the plot
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Same curves as the previous cell, plotted against the 0-based epoch index.
# Eleven evenly spaced y-axis ticks; linspace (unlike arange with a computed
# step) also works when every logged value is identical.
y_min, y_max = min(min(training_loss_logger), min(eval_loss_logger)), max(max(training_loss_logger), max(eval_loss_logger))
y_ticks = np.linspace(y_min, y_max, 11)

# Create the plot
_ = plt.figure(figsize=(10, 5))

# Plot training loss
_ = plt.plot(training_loss_logger, label="Training Loss", color='tab:blue')

# Plot evaluation loss
_ = plt.plot(eval_loss_logger, label="Evaluation Loss", color='tab:orange')

# Apply custom y-ticks
_ = plt.yticks(y_ticks)

# Add labels, title, legend, and grid
_ = plt.title("Training and Evaluation Loss")
_ = plt.xlabel("Epochs")
_ = plt.ylabel("Loss")
_ = plt.legend()
_ = plt.grid(True)

# Show the plot
plt.show()


BLEU Score

In [None]:
# Eleven evenly spaced y-axis ticks between the lowest and highest BLEU value.
# linspace is used because arange with a computed step fails when all values
# are equal (step of zero), e.g. when BLEU stays at 0 or after a single epoch.
y_min, y_max = min(eval_bleu_logger), max(eval_bleu_logger)
y_ticks = np.linspace(y_min, y_max, 11)

# Average validation BLEU per epoch
_ = plt.figure(figsize=(10, 5))
_ = plt.plot(eval_bleu_logger[:])
_ = plt.title("Bleu Score")

# Apply custom y-ticks
_ = plt.yticks(y_ticks)
plt.show()

BLEU + METEOR Score

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Eleven evenly spaced y-axis ticks covering both metric curves. linspace
# (unlike arange with a computed step) also works when all values are equal.
y_min, y_max = min(min(eval_bleu_logger), min(eval_meteor_logger)), max(max(eval_bleu_logger), max(eval_meteor_logger))
y_ticks = np.linspace(y_min, y_max, 11)

# Create the plot
_ = plt.figure(figsize=(10, 5))

# Plot average validation BLEU per epoch
_ = plt.plot(eval_bleu_logger, label="Bleu", color='tab:blue')

# Plot average validation METEOR per epoch
_ = plt.plot(eval_meteor_logger, label="Meteor", color='tab:orange')

# Apply custom y-ticks
_ = plt.yticks(y_ticks)

# Add labels, title, legend, and grid
_ = plt.title("Bleu and Meteor Value")
_ = plt.xlabel("Epochs")
# These curves are metric scores, not losses
_ = plt.ylabel("Score")
_ = plt.legend()
_ = plt.grid(True)

# Show the plot
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Extract the per-epoch ROUGE averages from the list of dicts
rouge1_scores = [entry['rouge1'] for entry in eval_rouge_logger]
rouge2_scores = [entry['rouge2'] for entry in eval_rouge_logger]
rougeL_scores = [entry['rougeL'] for entry in eval_rouge_logger]

# Eleven evenly spaced y-axis ticks across all three curves. linspace
# (unlike arange with a computed step) also works when all values are equal.
all_rouge_scores = rouge1_scores + rouge2_scores + rougeL_scores
y_min, y_max = min(all_rouge_scores), max(all_rouge_scores)
y_ticks = np.linspace(y_min, y_max, 11)

# Create the plot
_ = plt.figure(figsize=(10, 5))

# Plot ROUGE scores
_ = plt.plot(rouge1_scores, label="ROUGE-1", color='tab:green')
_ = plt.plot(rouge2_scores, label="ROUGE-2", color='tab:red')
_ = plt.plot(rougeL_scores, label="ROUGE-L", color='tab:purple')

# Apply custom y-ticks
_ = plt.yticks(y_ticks)

# Add labels, title, legend, and grid
_ = plt.title("ROUGE Scores Across Epochs")
_ = plt.xlabel("Epochs")
_ = plt.ylabel("ROUGE Score")
_ = plt.legend()
_ = plt.grid(True)

# Show the plot
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Extract the per-epoch ROUGE averages from the list of dicts
rouge1_scores = [entry['rouge1'] for entry in eval_rouge_logger]
rouge2_scores = [entry['rouge2'] for entry in eval_rouge_logger]
rougeL_scores = [entry['rougeL'] for entry in eval_rouge_logger]

# Eleven evenly spaced y-axis ticks across every metric curve. linspace
# (unlike arange with a computed step) also works when all values are equal.
all_metrics = eval_bleu_logger + eval_meteor_logger + rouge1_scores + rouge2_scores + rougeL_scores
y_min, y_max = min(all_metrics), max(all_metrics)
y_ticks = np.linspace(y_min, y_max, 11)

# Create the plot
_ = plt.figure(figsize=(12, 6))

# Plot BLEU and Meteor scores
_ = plt.plot(eval_bleu_logger, label="BLEU", color='tab:blue')
_ = plt.plot(eval_meteor_logger, label="Meteor", color='tab:orange')

# Plot ROUGE scores
_ = plt.plot(rouge1_scores, label="ROUGE-1", color='tab:green')
_ = plt.plot(rouge2_scores, label="ROUGE-2", color='tab:red')
_ = plt.plot(rougeL_scores, label="ROUGE-L", color='tab:purple')

# Apply custom y-ticks
_ = plt.yticks(y_ticks)

# Add labels, title, legend, and grid
_ = plt.title("Evaluation Metrics Across Epochs")
_ = plt.xlabel("Epochs")
_ = plt.ylabel("Metric Value")
_ = plt.legend()
_ = plt.grid(True)

# Show the plot
plt.show()

In [None]:
# Take the first batch from the validation loader
# (the loader is not shuffled, so this is the same batch on every run)
dataiter = next(iter(data_loader_val))
# Unpack it into the images and their caption strings
test_images, test_captions = dataiter

In [None]:
# Choose an index within the batch
index = 5

# Preprocess with the same feature extractor the training and evaluation
# loops use, so the encoder sees this image resized and normalized exactly as
# it was during training (a hand-written transform with different statistics
# would silently shift the input distribution).
# Result shape: [1, C, H, W]
test_image = feature_extractor(images=test_images[index], return_tensors="pt")["pixel_values"]

In [None]:
# Show the single selected test image; make_grid with normalize=True rescales
# the preprocessed tensor so it can be displayed
plt.figure(figsize = (3,3))
out = torchvision.utils.make_grid(test_image, 1, normalize=True)
# Reorder [C, H, W] -> [H, W, C] for imshow
_ = plt.imshow(out.numpy().transpose((1, 2, 0)))
# Ground-truth caption of the selected image
print(test_captions[index])

In [None]:
# Start-Of-Sentence prompt that signals the network to start generating the
# caption. Every tokenized caption begins with the tokenizer's CLS token, so
# take its id from the tokenizer instead of hard-coding the number.
sos_token = torch.full((1, 1), tokenizer.cls_token_id, dtype=torch.long)

# Set the temperature for sampling during generation
# (logits are divided by this value before sampling; lower is more deterministic)
temp = 0.5

In [None]:
# Autoregressively sample a caption for `test_image`, one token at a time.
log_tokens = [sos_token]
caption_model.eval()

# Captions end with the tokenizer's SEP token; use it as End-Of-Caption marker
eos_token_id = tokenizer.sep_token_id

with torch.no_grad():
    # Encode the input image once (mixed precision only when running on CUDA)
    with torch.amp.autocast(device_type=device.type, enabled=device.type == "cuda"):
        image_embedding = caption_model.encoder(test_image.to(device)).last_hidden_state
    # The decoder below runs outside autocast in float32, so make sure the
    # memory it attends to has the same dtype.
    image_embedding = image_embedding.float()

    # Generate at most 50 tokens
    for i in range(50):
        input_tokens = torch.cat(log_tokens, 1)

        # Decode the tokens generated so far into next-token logits
        data_pred = caption_model.decoder(input_tokens.to(device), image_embedding)

        # Sample from the temperature-scaled distribution at the last position
        dist = Categorical(logits=data_pred[:, -1] / temp)
        next_tokens = dist.sample().reshape(1, 1)

        # Append the next predicted token to the sequence
        log_tokens.append(next_tokens.cpu())

        # Break the loop if the End-Of-Caption token is predicted
        if next_tokens.item() == eos_token_id:
            break

In [None]:
# Stack the sampled token ids into one [1, seq] tensor
generated_ids = torch.cat(log_tokens, 1)

# Decode the ids to text; special tokens (start/end markers) are dropped.
# decode() already returns one string, so no further joining is needed.
pred_text_strings = tokenizer.decode(generated_ids[0], skip_special_tokens=True)

# Final caption text
pred_text = pred_text_strings

In [None]:
# Show the selected test image again, this time next to the generated caption
plt.figure(figsize = (3, 3))
out = torchvision.utils.make_grid(test_image, 1, normalize=True)
_ = plt.imshow(out.numpy().transpose((1, 2, 0)))

# Print the predicted text
print(pred_text)

In [None]:
# Serialize the whole model object (not just its state_dict) to the Kaggle
# working directory. NOTE(review): loading such a file presumably needs the
# model classes to be importable at load time - confirm.
torch.save(caption_model, "/kaggle/working/ViT-Transformer_Decoder-BERT_Word_Embbed_V2.pt")

# Inference On Best Model

In [None]:
import torch
from transformers import AutoTokenizer, ViTFeatureExtractor
from PIL import Image
import matplotlib.pyplot as plt

# ===== 1. Load the Model ===== #
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the tokenizer and feature extractor first: the decoder's vocabulary
# size is read from the tokenizer when the architecture is rebuilt below.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")  # Replace if custom tokenizer
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")

# Rebuild the model architecture with the same hyper-parameters as in training.
# NOTE: vit_model, hidden_size, num_layers and num_heads come from earlier cells;
# the encoder object is shared with the training model and gets overwritten by
# the checkpoint weights loaded below.
caption_model = VisionEncoderDecoder(
    encoder=vit_model,
    decoder=Decoder(num_emb=tokenizer.vocab_size, hidden_size=hidden_size,
                    num_layers=num_layers[1], num_heads=num_heads)
).to(device)

# Restore the best weights saved by early stopping and switch to eval mode
caption_model.load_state_dict(torch.load("best_model.pt", map_location=device))
caption_model.eval()

# ===== 2. User Input: Load New Image ===== #
def load_and_preprocess_image(image_path):
    """
    Load an image and preprocess it for the model.

    Args:
        image_path: path of the image file to open.

    Returns:
        ``(pixel_values, image)``: the feature extractor's ``pixel_values``
        tensor moved to the global ``device``, and the RGB ``PIL.Image`` it
        was computed from (useful for display).
    """
    # Open inside a context manager so the file handle is released right away;
    # convert() returns an independent, fully loaded RGB image.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    return inputs["pixel_values"].to(device), image

# Prompt user for image input in Kaggle
# NOTE: input() blocks until a path is typed, so this cell cannot run unattended.
image_path = input("Please provide the path to the new image: ")  # Example: "../input/my-image.jpg"
# Preprocessed tensor for the model plus the original RGB image for display
image_tensor, original_image = load_and_preprocess_image(image_path)

# ===== 3. Generate Caption ===== #
def generate_caption(model, image_tensor, tokenizer, max_length=30, temp=0.7, sample=False):
    """
    Generate a caption for the input image using the trained model.

    Tokens are produced one at a time, starting from the tokenizer's CLS token,
    until the SEP token is produced or ``max_length`` tokens were generated.

    Args:
        model: object exposing ``encoder(pixel_values=...)`` (whose result has
            ``last_hidden_state``) and ``decoder(input_seq=..., encoder_output=...)``
            returning logits of shape (1, seq, vocab).
        image_tensor: preprocessed image batch of size 1; generation runs on
            the device this tensor lives on.
        tokenizer: provides ``cls_token_id``, ``sep_token_id`` and ``decode``.
        max_length: maximum number of tokens to generate.
        temp: temperature the logits are divided by. With the default greedy
            decoding a positive temperature does not change the result; it
            only matters when ``sample=True``.
        sample: when True, draw each token from the temperature-scaled
            distribution instead of taking the most likely one.

    Returns:
        The decoded caption string with special tokens removed.
    """
    # Run on the image's device instead of relying on a global `device`
    run_device = image_tensor.device
    generated = torch.tensor([[tokenizer.cls_token_id]], device=run_device)  # Start token

    with torch.no_grad():
        # Get image features using encoder (computed once, reused every step)
        image_embedding = model.encoder(pixel_values=image_tensor).last_hidden_state

        for _ in range(max_length):
            outputs = model.decoder(input_seq=generated, encoder_output=image_embedding)

            # Temperature-scaled logits of the last position
            logits = outputs[:, -1] / temp
            if sample:
                probs = torch.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)
            else:
                # Greedy decoding: pick the most likely token
                next_token = torch.argmax(logits, dim=-1).unsqueeze(1)

            # Append next token and stop if end token is generated
            generated = torch.cat((generated, next_token), dim=1)
            if next_token.item() == tokenizer.sep_token_id:  # End token
                break

    # Decode generated tokens
    caption = tokenizer.decode(generated[0], skip_special_tokens=True)
    return caption

# Generate the caption (default max_length and temperature of generate_caption)
caption = generate_caption(caption_model, image_tensor, tokenizer)

# ===== 4. Display Image and Caption ===== #
# Show the original (un-normalized) image with the caption as the figure title
plt.imshow(original_image)
plt.axis("off")
plt.title(f"Generated Caption: {caption}")
plt.show()
