In [None]:
# Mount Google Drive at /content/MyDrive/ so the dataset and checkpoint
# paths used below (all rooted at /content/MyDrive/MyDrive/...) resolve.
from google.colab import drive
drive.mount('/content/MyDrive/')

# Notebook shell commands (IPython "!" syntax, not plain Python).
# Upgrade transformers to the latest release available on PyPI.
!pip install --upgrade transformers
# Force a clean reinstall of sympy.
# NOTE(review): presumably works around a broken/incompatible preinstalled
# sympy in the Colab image - confirm this is still needed.
!pip install --upgrade --force-reinstall sympy

import os
import json
import torch
import random
import pandas as pd
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image, UnidentifiedImageError
from transformers import GPT2Tokenizer, GPT2LMHeadModel, get_scheduler
from tqdm import tqdm
from nltk.translate.bleu_score import corpus_bleu
from sklearn.model_selection import train_test_split

# Paths (all under the Drive mount point /content/MyDrive/)
image_folder = '/content/MyDrive/MyDrive/M.Sc Data Science/Sem 4/CV/flickr30k_images/flickr30k_images/flickr30k_images'
captions_json = '/content/MyDrive/MyDrive/M.Sc Data Science/Sem 4/CV/flickr30k_images/flickr_captions.json'
model_save_path = '/content/MyDrive/MyDrive/M.Sc Data Science/Sem 4/CV/best_full_caption_model'
os.makedirs(model_save_path, exist_ok=True)

# Hyperparameters
max_length = 30        # fixed token length every caption is padded/truncated to
batch_size = 16
num_epochs = 5
learning_rate = 5e-5
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load captions: the JSON maps each image file name to a list of captions.
# The encoding is given explicitly so the result does not depend on the
# platform's default locale.
with open(captions_json, 'r', encoding='utf-8') as f:
    captions_data = json.load(f)

# Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# GPT-2 ships without a padding token; reuse EOS so fixed-length padding works.
# Consequence: padded positions carry the EOS id.
tokenizer.pad_token = tokenizer.eos_token

# Dataset
class ImageCaptionDataset(Dataset):
    """Dataset yielding one (image, caption) training example per item.

    Args:
        image_folder: Directory that contains the image files.
        samples: List of ``(img_name, caption)`` pairs.
        tokenizer: Callable tokenizer; its result must expose
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        transform: Callable applied to the loaded PIL image.
        max_length: Optional fixed token length for padding/truncation.
            When ``None`` (the default) the module-level ``max_length``
            is used, which preserves the original behaviour.
    """

    def __init__(self, image_folder, samples, tokenizer, transform, max_length=None):
        self.image_folder = image_folder
        self.samples = samples  # list of (img_name, caption)
        self.tokenizer = tokenizer
        self.transform = transform
        self.max_length = max_length

    def __len__(self):
        """Return the number of (image, caption) pairs."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, input_ids, attention_mask, caption)`` for ``idx``.

        ``image`` is the transformed image, ``input_ids`` and
        ``attention_mask`` are the tokenizer outputs with the leading batch
        dimension removed, and ``caption`` is the raw caption string.
        """
        img_name, caption = self.samples[idx]
        img_path = os.path.join(self.image_folder, img_name)

        try:
            # The context manager closes the underlying file right away;
            # convert() returns an independent in-memory copy.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert("RGB")
        except (OSError, UnidentifiedImageError) as err:
            # Best effort: keep the run alive with a black placeholder, but
            # report it so bad/missing files do not go unnoticed.
            import warnings
            warnings.warn(
                f"Could not read image {img_path!r} ({err}); "
                "using a black placeholder image instead."
            )
            image = Image.new('RGB', (224, 224), (0, 0, 0))

        image = self.transform(image)

        # Fall back to the module-level setting when no explicit length was given.
        token_limit = max_length if self.max_length is None else self.max_length
        tokens = self.tokenizer(caption, return_tensors='pt', padding='max_length',
                                truncation=True, max_length=token_limit)
        input_ids = tokens['input_ids'].squeeze(0)
        attention_mask = tokens['attention_mask'].squeeze(0)

        return image, input_ids, attention_mask, caption

# Transforms for images: resize to the 224x224 encoder input and normalise
# each channel (the usual ImageNet statistics for torchvision pretrained models).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])

# Prepare all samples (img_name, caption)
all_samples = [(img, cap) for img, caps in captions_data.items() for cap in caps]

# Train-validation split, done per IMAGE rather than per (image, caption) pair.
# Every image has several captions, so splitting the pairs would put captions
# of the same image in both sets and validation would be measured on images
# already seen during training.
train_images, val_images = train_test_split(list(captions_data), test_size=0.1, random_state=42)
train_samples = [(img, cap) for img in train_images for cap in captions_data[img]]
val_samples = [(img, cap) for img in val_images for cap in captions_data[img]]

# Create Datasets and DataLoaders
train_dataset = ImageCaptionDataset(image_folder, train_samples, tokenizer, transform)
val_dataset = ImageCaptionDataset(image_folder, val_samples, tokenizer, transform)

# Never request more worker processes than the machine has CPU cores;
# oversubscribing slows loading down and triggers a DataLoader warning.
num_workers = min(18, os.cpu_count() or 1)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# Encoder: EfficientNet-B0 without classifier head.
# The weights enum replaces the deprecated `pretrained=True` flag and selects
# the same ImageNet checkpoint.
encoder_cnn = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
encoder_cnn.classifier = nn.Identity()
encoder_cnn.to(device)
encoder_cnn.eval()  # frozen encoder: never put back in train mode, not in the optimizer

# Decoder: GPT-2 LM head model
decoder = GPT2LMHeadModel.from_pretrained('gpt2')
decoder.resize_token_embeddings(len(tokenizer))
decoder.to(device)

# Projector: project encoder features to GPT-2 embedding dimension
projector = nn.Linear(1280, decoder.config.n_embd).to(device)

# Optimizer and Scheduler: only the projector and the decoder are trained.
optimizer = torch.optim.AdamW(list(projector.parameters()) + list(decoder.parameters()), lr=learning_rate)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_loader) * num_epochs  # one scheduler step per batch
)
# NOTE: pad and EOS share one id, so this criterion also ignores EOS targets.
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Training loop
def _caption_loss(images, input_ids, attention_mask):
    """Return the mean next-token loss for one batch already on `device`.

    The decoder input is [image embedding, caption token embeddings], so the
    logit at sequence position i is the prediction for caption token i.

    The pad token is the EOS token in this notebook, so a criterion that
    ignores `pad_token_id` (like the module-level `loss_fn`) also ignores
    every EOS target and the decoder never learns to end a caption. Here the
    attention mask decides what is scored instead: all real tokens plus the
    first padding position, which holds the EOS id. Later padding is ignored.
    """
    with torch.no_grad():  # the encoder is frozen
        image_features = encoder_cnn(images)

    image_embeddings = projector(image_features).unsqueeze(1)  # (batch, 1, embd_dim)
    caption_embeddings = decoder.transformer.wte(input_ids)    # (batch, seq_len, embd_dim)
    decoder_inputs = torch.cat([image_embeddings, caption_embeddings], dim=1)

    # The last position has no target, so its logits are dropped.
    logits = decoder(inputs_embeds=decoder_inputs).logits[:, :-1, :]

    # A target is scored when it is the first token or when the token before
    # it is a real (non-pad) token.
    scored = torch.cat(
        [torch.ones_like(attention_mask[:, :1]), attention_mask[:, :-1]], dim=1
    ).bool()
    labels = input_ids.masked_fill(~scored, -100)

    return nn.functional.cross_entropy(
        logits.reshape(-1, logits.size(-1)), labels.reshape(-1), ignore_index=-100
    )


best_val_loss = float('inf')

for epoch in range(num_epochs):
    decoder.train()
    projector.train()
    total_train_loss = 0.0

    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training")
    for images, input_ids, attention_mask, _ in loop:
        loss = _caption_loss(images.to(device), input_ids.to(device), attention_mask.to(device))

        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

        batch_loss = loss.item()
        total_train_loss += batch_loss
        loop.set_postfix(loss=batch_loss)

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation
    decoder.eval()
    projector.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for images, input_ids, attention_mask, _ in val_loader:
            val_loss = _caption_loss(images.to(device), input_ids.to(device), attention_mask.to(device))
            total_val_loss += val_loss.item()

    avg_val_loss = total_val_loss / len(val_loader)
    print(f"\nEpoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

    # Save best model (one file per improving epoch)
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # NOTE(review): the tokenizer object is pickled into the checkpoint,
        # which forces `weights_only=False` when loading it again. Kept so
        # existing loaders that read this key continue to work.
        torch.save({
            'projector_state_dict': projector.state_dict(),
            'decoder_state_dict': decoder.state_dict(),
            'tokenizer': tokenizer,
            'epoch': epoch + 1
        }, os.path.join(model_save_path, f'best_caption_model_epoch{epoch+1}.pt'))
        print("✅ Best model saved.\n")

    # Generate sample caption (beam search) from first val image
    sample_img, _, _, _ = val_dataset[0]
    sample_img = sample_img.unsqueeze(0).to(device)

    with torch.no_grad():
        image_embedding = projector(encoder_cnn(sample_img)).unsqueeze(1)

        # Prompt with the image embedding only. Training never places a BOS
        # token between the image and the caption, so adding one here would
        # condition generation on a sequence the decoder was not trained on.
        output = decoder.generate(
            inputs_embeds=image_embedding,
            attention_mask=torch.ones(image_embedding.shape[:2], dtype=torch.long, device=device),
            max_new_tokens=max_length,
            num_beams=5,
            early_stopping=True,
            bos_token_id=tokenizer.bos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id
        )
    caption = tokenizer.decode(output[0], skip_special_tokens=True)
    print("🖼️ Sample Generated Caption (Beam Search):", caption)

# Final BLEU evaluation on 100 validation images
# Each distinct image is captioned once and scored against ALL of its
# reference captions from `captions_data`. Iterating the caption samples
# directly would caption the same image repeatedly and score each
# prediction against a single reference.
decoder.eval()
projector.eval()
y_true, y_pred = [], []
seen_images = set()

with torch.no_grad():
    for idx, (img_name, ref_caption) in enumerate(val_dataset.samples):
        if len(y_pred) >= 100:
            break
        if img_name in seen_images:
            continue
        seen_images.add(img_name)

        image = val_dataset[idx][0].unsqueeze(0).to(device)
        image_embedding = projector(encoder_cnn(image)).unsqueeze(1)

        # Prompt with the image embedding only: the training sequences are
        # [image, caption tokens...] with no BOS token in between.
        output = decoder.generate(
            inputs_embeds=image_embedding,
            attention_mask=torch.ones(image_embedding.shape[:2], dtype=torch.long, device=device),
            max_new_tokens=max_length,
            num_beams=5,
            early_stopping=True,
            bos_token_id=tokenizer.bos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id
        )
        pred_caption = tokenizer.decode(output[0], skip_special_tokens=True)

        # Fall back to the sample's own caption if the image is not in the map.
        references = captions_data.get(img_name) or [ref_caption]

        y_pred.append(pred_caption.split())
        y_true.append([ref.split() for ref in references])

# corpus_bleu takes, per hypothesis, a list of tokenised references.
bleu = corpus_bleu(y_true, y_pred) if y_pred else 0.0
print(f"\n📊 Final BLEU Score (Beam Search): {bleu:.4f}")



Fine-tuning

In [None]:
import os
import json
import torch
import random
import pandas as pd
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image, UnidentifiedImageError
from transformers import GPT2Tokenizer, GPT2LMHeadModel, get_scheduler
from tqdm import tqdm
from nltk.translate.bleu_score import corpus_bleu
from sklearn.model_selection import train_test_split


# --- Paths ---
image_folder = '/content/MyDrive/MyDrive/CV/Images/flickr30k_images'
captions_json = '/content/MyDrive/MyDrive/CV/flickr_captions.json'

# Checkpoint of the previous training run that fine-tuning starts from
original_model_load_path = '/content/MyDrive/MyDrive/best_full_caption_model/best_caption_model_epoch4.pt'

# Folder that receives the fine-tuned checkpoints (created if it is missing)
finetuned_model_save_folder = '/content/MyDrive/MyDrive/best_full_caption_model/fine_tuned_best_model/'
os.makedirs(finetuned_model_save_folder, exist_ok=True)


# Hyperparameters for fine-tuning
max_length = 30                 # fixed token length every caption is padded/truncated to
batch_size = 16
num_epochs_finetune = 5         # increase to fine-tune for more epochs
learning_rate_finetune = 2e-5   # lower than the initial run's 5e-5
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- Load captions and tokenizer ---
# The JSON maps each image file name to a list of captions. The encoding is
# given explicitly so the result does not depend on the platform's locale.
with open(captions_json, 'r', encoding='utf-8') as f:
    captions_data = json.load(f)

tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# GPT-2 ships without a padding token; reuse EOS so fixed-length padding works.
# Consequence: padded positions carry the EOS id.
tokenizer.pad_token = tokenizer.eos_token

class ImageCaptionDataset(Dataset):
    """Dataset yielding one (image, caption) training example per item.

    Args:
        image_folder: Directory that contains the image files.
        samples: List of ``(img_name, caption)`` pairs.
        tokenizer: Callable tokenizer; its result must expose
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        transform: Callable applied to the loaded PIL image.
        max_length: Optional fixed token length for padding/truncation.
            When ``None`` (the default) the module-level ``max_length``
            is used, which preserves the original behaviour.
    """

    def __init__(self, image_folder, samples, tokenizer, transform, max_length=None):
        self.image_folder = image_folder
        self.samples = samples  # list of (img_name, caption)
        self.tokenizer = tokenizer
        self.transform = transform
        self.max_length = max_length

    def __len__(self):
        """Return the number of (image, caption) pairs."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, input_ids, attention_mask, caption)`` for ``idx``.

        ``image`` is the transformed image, ``input_ids`` and
        ``attention_mask`` are the tokenizer outputs with the leading batch
        dimension removed, and ``caption`` is the raw caption string.
        """
        img_name, caption = self.samples[idx]
        img_path = os.path.join(self.image_folder, img_name)

        try:
            # The context manager closes the underlying file right away;
            # convert() returns an independent in-memory copy.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert("RGB")
        except (OSError, UnidentifiedImageError) as err:
            # Best effort: keep the run alive with a black placeholder, but
            # report it so bad/missing files do not go unnoticed.
            import warnings
            warnings.warn(
                f"Could not read image {img_path!r} ({err}); "
                "using a black placeholder image instead."
            )
            image = Image.new('RGB', (224, 224), (0, 0, 0))

        image = self.transform(image)

        # Fall back to the module-level setting when no explicit length was given.
        token_limit = max_length if self.max_length is None else self.max_length
        tokens = self.tokenizer(caption, return_tensors='pt', padding='max_length',
                                truncation=True, max_length=token_limit)
        input_ids = tokens['input_ids'].squeeze(0)
        attention_mask = tokens['attention_mask'].squeeze(0)
        return image, input_ids, attention_mask, caption

# Image preprocessing: resize to the 224x224 encoder input and normalise each
# channel (the usual ImageNet statistics for torchvision pretrained models).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])

all_samples = [(img, cap) for img, caps in captions_data.items() for cap in caps]

# Split per IMAGE rather than per (image, caption) pair. Every image has
# several captions, so splitting the pairs would put captions of the same
# image in both sets and validation would run on images seen in training.
train_images, val_images = train_test_split(list(captions_data), test_size=0.1, random_state=42)
train_samples = [(img, cap) for img in train_images for cap in captions_data[img]]
val_samples = [(img, cap) for img in val_images for cap in captions_data[img]]

train_dataset = ImageCaptionDataset(image_folder, train_samples, tokenizer, transform)
val_dataset = ImageCaptionDataset(image_folder, val_samples, tokenizer, transform)

# Never request more worker processes than the machine has CPU cores;
# oversubscribing slows loading down and triggers a DataLoader warning.
num_workers = min(18, os.cpu_count() or 1)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# --- Encoder (frozen) ---
# The weights enum replaces the deprecated `pretrained=True` flag and selects
# the same ImageNet checkpoint.
encoder_cnn = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
encoder_cnn.classifier = nn.Identity()
encoder_cnn.to(device)
encoder_cnn.eval()

# --- Load pre-trained Decoder and Projector weights ---
# Build the same architecture as the initial run, then overwrite its weights
# with the saved checkpoint.
decoder = GPT2LMHeadModel.from_pretrained('gpt2')
decoder.resize_token_embeddings(len(tokenizer))
projector = nn.Linear(1280, decoder.config.n_embd)

print(f" Loading model from: {original_model_load_path}")
# SECURITY NOTE: weights_only=False unpickles arbitrary objects (the
# checkpoint stores the tokenizer object). Only load checkpoint files you
# created yourself.
checkpoint = torch.load(original_model_load_path, map_location=device, weights_only=False)
decoder.load_state_dict(checkpoint['decoder_state_dict'])
projector.load_state_dict(checkpoint['projector_state_dict'])

decoder.to(device)
projector.to(device)

# --- Optimizer and Scheduler (fresh state for fine-tuning) ---
optimizer = torch.optim.AdamW(list(projector.parameters()) + list(decoder.parameters()), lr=learning_rate_finetune)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_loader) * num_epochs_finetune  # one scheduler step per batch
)
# NOTE: pad and EOS share one id, so this criterion also ignores EOS targets.
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# --- Evaluation Function ---
def evaluate_model(encoder, decoder, projector, data_loader, tokenizer, device, max_length):
    """Caption every image in `data_loader` with beam search and compute BLEU.

    Args:
        encoder: Image encoder; called on an image batch to get features.
        decoder: Language model exposing `generate` and `transformer.wte`.
        projector: Module mapping encoder features to decoder embeddings.
        data_loader: Iterable of `(images, _, _, captions)` batches.
        tokenizer: Tokenizer used to decode generated ids.
        device: Device the image batches are moved to.
        max_length: Maximum number of tokens generated per caption.

    Returns:
        A tuple `(generated_captions, reference_captions, bleu_score)`:
        the decoded caption strings, a parallel list of one-element lists
        holding each reference caption string, and the `corpus_bleu` score
        computed on whitespace-split tokens.
    """
    encoder.eval()
    decoder.eval()
    projector.eval()

    generated_captions = []
    reference_captions = []

    print("\nStarting evaluation...")
    with torch.no_grad():
        for images, _, _, original_captions in tqdm(data_loader, desc="Generating captions for evaluation"):
            images = images.to(device)
            image_embeddings = projector(encoder(images)).unsqueeze(1)

            # Prompt with the image embedding only. The training loop in this
            # file feeds [image embedding, caption tokens...] with no BOS
            # token in between, so adding one here would condition generation
            # on a sequence the decoder was never trained on.
            outputs = decoder.generate(
                inputs_embeds=image_embeddings,
                attention_mask=torch.ones(image_embeddings.shape[:2], dtype=torch.long, device=device),
                max_new_tokens=max_length,
                num_beams=5,
                early_stopping=True,
                bos_token_id=tokenizer.bos_token_id,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id
            )

            generated_captions.extend(
                tokenizer.decode(output, skip_special_tokens=True) for output in outputs
            )
            # corpus_bleu expects a list of references per hypothesis.
            reference_captions.extend([cap] for cap in original_captions)

    # Whitespace-tokenise hypotheses and references for BLEU.
    tokenized_generated_captions = [cap.split() for cap in generated_captions]
    tokenized_reference_captions = [[ref.split() for ref in refs] for refs in reference_captions]

    bleu_score = corpus_bleu(tokenized_reference_captions, tokenized_generated_captions)
    return generated_captions, reference_captions, bleu_score

# --- Training loop ---
def _caption_loss(images, input_ids, attention_mask):
    """Return the mean next-token loss for one batch already on `device`.

    The decoder input is [image embedding, caption token embeddings], so the
    logit at sequence position i is the prediction for caption token i.

    The pad token is the EOS token in this notebook, so a criterion that
    ignores `pad_token_id` (like the module-level `loss_fn`) also ignores
    every EOS target and the decoder never learns to end a caption. Here the
    attention mask decides what is scored instead: all real tokens plus the
    first padding position, which holds the EOS id. Later padding is ignored.
    """
    with torch.no_grad():  # the encoder is frozen
        image_features = encoder_cnn(images)

    image_embeddings = projector(image_features).unsqueeze(1)
    caption_embeddings = decoder.transformer.wte(input_ids)
    decoder_inputs = torch.cat([image_embeddings, caption_embeddings], dim=1)

    # The last position has no target, so its logits are dropped.
    logits = decoder(inputs_embeds=decoder_inputs).logits[:, :-1, :]

    # A target is scored when it is the first token or when the token before
    # it is a real (non-pad) token.
    scored = torch.cat(
        [torch.ones_like(attention_mask[:, :1]), attention_mask[:, :-1]], dim=1
    ).bool()
    labels = input_ids.masked_fill(~scored, -100)

    return nn.functional.cross_entropy(
        logits.reshape(-1, logits.size(-1)), labels.reshape(-1), ignore_index=-100
    )


best_val_loss = float('inf')

for epoch in range(num_epochs_finetune):
    decoder.train()
    projector.train()
    total_train_loss = 0.0

    loop = tqdm(train_loader, desc=f"Fine-tune Epoch {epoch+1}/{num_epochs_finetune} - Training")
    for images, input_ids, attention_mask, _ in loop:
        loss = _caption_loss(images.to(device), input_ids.to(device), attention_mask.to(device))

        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

        batch_loss = loss.item()
        total_train_loss += batch_loss
        loop.set_postfix(loss=batch_loss)

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation
    decoder.eval()
    projector.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for images, input_ids, attention_mask, _ in val_loader:
            val_loss = _caption_loss(images.to(device), input_ids.to(device), attention_mask.to(device))
            total_val_loss += val_loss.item()

    avg_val_loss = total_val_loss / len(val_loader)
    print(f"\nFine-tune Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

    # Save the best model into the fine-tuned checkpoint folder. The epoch
    # number continues from the epoch stored in the loaded checkpoint.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # NOTE(review): the tokenizer object is pickled into the checkpoint,
        # which forces `weights_only=False` when loading it again. Kept so
        # existing loaders that read this key continue to work.
        torch.save({
            'projector_state_dict': projector.state_dict(),
            'decoder_state_dict': decoder.state_dict(),
            'tokenizer': tokenizer,
            'epoch': checkpoint['epoch'] + epoch + 1,
            'best_val_loss': best_val_loss
        }, os.path.join(finetuned_model_save_folder, f'best_caption_model_finetuned_epoch{checkpoint["epoch"] + epoch + 1}.pt'))
        print(f" Best fine-tuned model saved to {finetuned_model_save_folder}.\n")

    # Generate a sample caption (beam search) for the first validation sample
    sample_img_tensor, _, _, sample_original_caption = val_dataset[0]
    sample_img_tensor = sample_img_tensor.unsqueeze(0).to(device)

    with torch.no_grad():
        image_embedding = projector(encoder_cnn(sample_img_tensor)).unsqueeze(1)

        # Prompt with the image embedding only. Training never places a BOS
        # token between the image and the caption, so adding one here would
        # condition generation on a sequence the decoder was not trained on.
        output = decoder.generate(
            inputs_embeds=image_embedding,
            attention_mask=torch.ones(image_embedding.shape[:2], dtype=torch.long, device=device),
            max_new_tokens=max_length,
            num_beams=5,
            early_stopping=True,
            bos_token_id=tokenizer.bos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id
        )
    generated_sample_caption = tokenizer.decode(output[0], skip_special_tokens=True)
    print(f" Sample Image Original Caption: {sample_original_caption}")
    print(f" Sample Generated Caption (Beam Search) (Fine-tuned): {generated_sample_caption}\n")


# --- Final Evaluation after Fine-tuning ---
# Caption the whole validation loader with the fine-tuned model and report
# the corpus BLEU score as a percentage.
print("\n--- Running Final Evaluation ---")
all_generated_captions, all_reference_captions, final_bleu_score = evaluate_model(
    encoder=encoder_cnn,
    decoder=decoder,
    projector=projector,
    data_loader=val_loader,
    tokenizer=tokenizer,
    device=device,
    max_length=max_length,
)

bleu_percent = final_bleu_score * 100
print(f"Final BLEU Score: {bleu_percent:.2f}%")

print("\n--- Fine-tuning Complete ---")
