In [25]:
import nltk
import os
import zipfile

# Keep NLTK resources in a writable directory and add it to NLTK's search path.
nltk_data_dir = "/kaggle/working/nltk_data"
corpora_path = os.path.join(nltk_data_dir, "corpora")
os.makedirs(corpora_path, exist_ok=True)
nltk.data.path.append(nltk_data_dir)

# Fetch the packages used by the METEOR metric into that directory.
for _package in ("wordnet", "omw-1.4"):
    nltk.download(_package, download_dir=nltk_data_dir)

# Unzip (Kaggle downloads zip instead of extracting)
def unzip_corpus(zip_path, extract_to):
    """Extract every member of the archive at ``zip_path`` into the directory ``extract_to``."""
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_to)

# Unpack both downloaded archives in place, next to their .zip files.
for _zip_file in (f"{corpora_path}/wordnet.zip", f"{corpora_path}/omw-1.4.zip"):
    unzip_corpus(_zip_file, corpora_path)


[nltk_data] Downloading package wordnet to
[nltk_data]     /kaggle/working/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /kaggle/working/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [26]:
import os
import random
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Install dependencies (if not already installed)
!pip install nltk rouge-score transformers --quiet

import nltk
nltk.download('wordnet')
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer

from transformers import ViTModel, AutoTokenizer, pipeline, GPT2Model
from transformers import AutoProcessor, AutoModelForVision2Seq
from transformers.image_utils import load_image

[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [None]:
'''
Same as Part-A
'''
class CustomImageCaptionDataset(Dataset):
    """Image/caption pairs described by a CSV file.

    The CSV must provide a 'filename' column (image file name inside
    ``images_folder``) and a 'caption' column (reference text).

    Each item is a tuple ``(image, tokens, caption)``:
      * ``image``   - the RGB image, passed through ``transform`` when one is given;
      * ``tokens``  - a tensor of token ids padded/truncated to ``max_length`` when a
                      tokenizer is given, otherwise the raw caption;
      * ``caption`` - the raw caption from the CSV.
    """

    def __init__(self, csv_path, images_folder, transform=None, tokenizer=None, max_length=30):
        self.df = pd.read_csv(csv_path)
        self.images_folder = images_folder
        self.transform = transform
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Fail early if either required column is absent.
        columns = self.df.columns
        if not ('filename' in columns and 'caption' in columns):
            raise ValueError("CSV must have 'filename' and 'caption' columns.")

    def __len__(self):
        # One sample per CSV row.
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        caption = record['caption']

        picture = Image.open(os.path.join(self.images_folder, record['filename'])).convert("RGB")
        if self.transform:
            picture = self.transform(picture)

        # Without a tokenizer the caption itself stands in for the token tensor.
        if not self.tokenizer:
            return picture, caption, caption

        token_ids = self.tokenizer.encode(
            caption,
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
        )
        return picture, torch.tensor(token_ids), caption


In [None]:
'''
This function randomly blacks out a given percentage of 16×16 patches in an image. It works by:
Dividing the image into a grid of 16×16 patches
Randomly selecting patches to mask (based on the percentage input)
Setting selected patches to black (zero values)
Returning the partially occluded image
Useful for testing model robustness to missing visual data.
'''

def occlude_image(image: np.ndarray, mask_percentage: float, patch_size: int = 16, rng=None) -> np.ndarray:
    """Return a copy of ``image`` with a random subset of square patches set to zero.

    The image is divided into a grid of ``patch_size`` x ``patch_size`` patches
    (rows/columns that do not fill a whole patch at the bottom/right edge are
    never masked). ``int(total_patches * mask_percentage / 100)`` patches are
    chosen at random and blacked out.

    Args:
        image: Array of shape (H, W) or (H, W, C). It is not modified.
        mask_percentage: Share of patches to mask, in the range [0, 100].
        patch_size: Side length of each patch in pixels (default 16).
        rng: Optional ``random.Random`` instance used to pick the patches, for
            reproducible masks. When omitted, the module-level ``random``
            generator is used.

    Returns:
        A new array with the same shape and dtype as ``image``.

    Raises:
        ValueError: If ``mask_percentage`` is outside [0, 100] or ``patch_size``
            is not positive.
    """
    # A negative percentage would turn the slice below into "all but the last k"
    # patches, silently masking almost the whole image.
    if not 0 <= mask_percentage <= 100:
        raise ValueError(f"mask_percentage must be between 0 and 100, got {mask_percentage}")
    if patch_size <= 0:
        raise ValueError(f"patch_size must be positive, got {patch_size}")

    img = image.copy()
    height, width = img.shape[:2]  # works for both (H, W) and (H, W, C)
    num_patches_h = height // patch_size
    num_patches_w = width // patch_size

    total_patches = num_patches_h * num_patches_w
    num_to_mask = int(total_patches * (mask_percentage / 100.0))

    patch_indices = [(i, j) for i in range(num_patches_h) for j in range(num_patches_w)]
    shuffler = rng if rng is not None else random
    shuffler.shuffle(patch_indices)

    for i, j in patch_indices[:num_to_mask]:
        y1 = i * patch_size
        x1 = j * patch_size
        # Indexing only the two spatial axes zeroes every channel, if any.
        img[y1:y1 + patch_size, x1:x1 + patch_size] = 0

    return img

In [None]:
'''
This cell evaluates the model on test set 
'''

device = 'cuda' if torch.cuda.is_available() else 'cpu'


def _occlude_batch(images, level, mean=0.5, std=0.5):
    """Occlude every image of a normalized batch and return them as a list of tensors.

    Each CHW tensor is un-normalized with ``mean``/``std``, converted to a uint8
    HWC array, passed through ``occlude_image`` at ``level`` percent, and then
    normalized back with the same ``mean``/``std``. The returned tensors are
    float32, CHW, and live on the CPU.
    """
    occluded = []
    for img_tensor in images:
        # 1) un-normalize to uint8 HWC
        np_img = np.transpose(img_tensor.cpu().numpy(), (1, 2, 0))
        np_img = np.clip(np_img * std + mean, 0, 1)
        # Round instead of truncating: float error in the normalize/un-normalize
        # round trip can otherwise push a pixel value down by one level.
        np_img = np.rint(np_img * 255).astype(np.uint8)

        # 2) occlude
        occluded_np = occlude_image(np_img, level)

        # 3) re-normalize back to CHW float32
        occluded_np = occluded_np.astype(np.float32) / 255.0
        occluded_np = np.clip(occluded_np, 0, 1)
        occluded_np = (occluded_np - mean) / std
        occluded.append(torch.from_numpy(np.transpose(occluded_np, (2, 0, 1))))
    return occluded


def evaluate_on_occluded_images(model, dataloader, device, occlusion_levels, tokenizer):
    """Caption occluded images with greedy decoding and score them against the references.

    Args:
        model: Captioning model called as ``model(images, token_ids)`` and returning
            logits indexed as ``[batch, position, vocab]``; must expose
            ``max_seq_length``.
        dataloader: Yields ``(images, tokens, raw_captions)`` batches. Images are
            expected to be normalized with mean 0.5 / std 0.5 per channel, which is
            what the un-normalization step assumes.
        device: Device the model and the occluded batches are moved to.
        occlusion_levels: Iterable of occlusion percentages to evaluate.
        tokenizer: Provides ``eos_token_id`` (used as the start token) and ``decode``.

    Returns:
        dict mapping each occlusion level to its mean "BLEU", "METEOR" and
        "ROUGE-L" scores, plus a "details" key holding one dict per sample and
        level (captions, per-sample scores and the occluded image tensor).
    """
    model.to(device)
    model.eval()

    results = {}
    details_list = []  # To store per-sample details for analysis

    # None of these depend on the level or the batch, so build them once.
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    smooth = SmoothingFunction().method1
    start_token = tokenizer.eos_token_id
    num_batches = len(dataloader)

    for level in occlusion_levels:
        print(f"Evaluating occlusion level: {level}")
        bleu_scores = []
        meteor_scores_ = []
        rouge_l_scores = []

        with torch.no_grad():
            # The pre-tokenized captions are not needed for generation. They are left
            # untouched because they are plain strings when the dataset has no
            # tokenizer and cannot be moved to a device.
            for batch_count, (images, _tokens, raw_captions) in enumerate(dataloader, start=1):
                occluded_images = _occlude_batch(images, level)
                occluded_batch = torch.stack(occluded_images).to(device)

                # Generate predictions (greedy decoding)
                batch_size = occluded_batch.size(0)
                generated = torch.full((batch_size, 1), start_token, dtype=torch.long, device=device)

                for _ in range(model.max_seq_length - 1):
                    outputs = model(occluded_batch, generated)
                    next_token = outputs[:, -1, :].argmax(dim=-1).unsqueeze(1)
                    generated = torch.cat([generated, next_token], dim=1)

                # Compute metrics and save details for analysis
                for i in range(batch_size):
                    pred_caption = tokenizer.decode(generated[i], skip_special_tokens=True)
                    ref_caption = raw_captions[i]
                    bleu = sentence_bleu([ref_caption.split()], pred_caption.split(), smoothing_function=smooth)
                    m_score = meteor_score([ref_caption.split()], pred_caption.split())
                    r_score = scorer.score(ref_caption, pred_caption)['rougeL'].fmeasure

                    bleu_scores.append(bleu)
                    meteor_scores_.append(m_score)
                    rouge_l_scores.append(r_score)
                    details_list.append({
                        "occlusion_level": level,
                        "original_caption": ref_caption,
                        "generated_caption": pred_caption,
                        "BLEU": bleu,
                        "METEOR": m_score,
                        "ROUGE-L": r_score,
                        "occluded_image": occluded_images[i]  # store corresponding occluded image tensor
                    })

                print(f"  Processed batch {batch_count}/{num_batches} at occlusion level {level}")

        results[level] = {
            "BLEU": np.mean(bleu_scores),
            "METEOR": np.mean(meteor_scores_),
            "ROUGE-L": np.mean(rouge_l_scores)
        }

        print(f"Completed evaluation for occlusion level {level}\n")

    results["details"] = details_list
    return results


In [None]:
'''
Same as Part-A
'''
class ImageCaptionModel(nn.Module):
    """Captioning model that feeds projected ViT tokens, followed by caption embeddings, to GPT-2.

    ``forward`` returns vocabulary logits for the caption positions only.
    """

    def __init__(self,
                 encoder_model_name="google/vit-base-patch16-224",
                 decoder_model_name="gpt2",
                 max_seq_length=50):
        super().__init__()
        self.max_seq_length = max_seq_length

        # Pretrained backbones.
        self.encoder = ViTModel.from_pretrained(encoder_model_name)
        self.decoder = GPT2Model.from_pretrained(decoder_model_name)

        vit_width = self.encoder.config.hidden_size
        gpt_width = self.decoder.config.n_embd
        self.vocab_size = self.decoder.config.vocab_size

        # Newly initialised layers: encoder width -> decoder width, decoder width -> vocabulary.
        self.encoder_proj = nn.Linear(vit_width, gpt_width)
        self.fc_out = nn.Linear(gpt_width, self.vocab_size)

    def forward(self, images, captions):
        """Return logits of shape (batch, caption_length, vocab_size) for ``captions`` given ``images``."""
        # Encode the images and map the tokens into the decoder's embedding width.
        image_tokens = self.encoder_proj(self.encoder(images).last_hidden_state)
        prefix_len = image_tokens.size(1)

        token_embeds = self.decoder.wte(captions)

        # One position id per image token followed by one per caption token.
        total_len = prefix_len + captions.size(1)
        positions = torch.arange(0, total_len, dtype=torch.long, device=captions.device)
        positions = positions.unsqueeze(0).repeat(images.size(0), 1)

        hidden = self.decoder(
            inputs_embeds=torch.cat([image_tokens, token_embeds], dim=1),
            position_ids=positions,
        ).last_hidden_state

        # Drop the image-token positions; only caption positions are scored.
        return self.fc_out(hidden[:, prefix_len:, :])

In [None]:
'''
Same as Part-A
'''
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def zero_shot_captioning(image_path: str,
                         model_name: str = "HuggingFaceTB/SmolVLM-256M-Instruct",
                         token: str = None) -> str:
    """Caption the image at ``image_path`` with a vision-to-text model, without fine-tuning.

    The processor and model named by ``model_name`` are loaded on every call
    (``token`` is forwarded to both loaders), the image is paired with a
    "Describe this image." chat prompt, and up to 50 new tokens are generated.

    Returns:
        The decoded text generated after the prompt.
    """
    processor = AutoProcessor.from_pretrained(model_name, token=token)
    model = AutoModelForVision2Seq.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        _attn_implementation="eager",
        token=token,
    )
    model = model.to(DEVICE)

    picture = Image.open(image_path).convert("RGB")

    # Single user turn: the image followed by the text instruction.
    user_turn = {
        "role": "user",
        "content": [
            {"type": "image"},
            {"type": "text", "text": "Describe this image."},
        ],
    }
    prompt = processor.apply_chat_template([user_turn], add_generation_prompt=True)
    inputs = processor(text=[prompt.strip()], images=[picture], return_tensors="pt", padding=True).to(DEVICE)

    # Keep only the tokens produced after the prompt.
    prompt_len = inputs['input_ids'].shape[1]
    new_token_ids = model.generate(**inputs, max_new_tokens=50)[:, prompt_len:]
    return processor.batch_decode(new_token_ids, skip_special_tokens=True)[0]

In [None]:
'''
This cell evaluates the SmolVLM model on the test set at several occlusion levels.
'''

def evaluate_zero_shot_occluded(test_dataset, test_folder,
                                model_name: str = "HuggingFaceTB/SmolVLM-256M-Instruct",
                                token: str = None,
                                occlusion_levels=(0, 10, 50, 80)) -> dict:
    """Caption occluded test images zero-shot and score them against the references.

    Args:
        test_dataset: Object exposing a ``df`` DataFrame with 'filename' and
            'caption' columns; one row per test sample.
        test_folder: Directory containing the image files named in ``df``.
        model_name: Hugging Face model id loaded once for the whole run.
        token: Optional access token forwarded to the processor/model loaders.
        occlusion_levels: Percentages of patches to black out (immutable default).

    Returns:
        dict mapping each occlusion level to its mean "BLEU", "METEOR" and
        "ROUGE-L" scores, plus a "details" key with one dict per sample and level.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load processor and model once.
    print(f"Loading zero-shot SmolVLM model: {model_name} with attn_implementation='eager' ...")
    processor = AutoProcessor.from_pretrained(model_name, token=token)
    model = AutoModelForVision2Seq.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        _attn_implementation="eager",
        token=token
    ).to(device)
    model.eval()
    print("Model loaded.\n")

    # The chat prompt is identical for every image, so render it a single time.
    messages = [
        {"role": "user",
         "content": [
              {"type": "image"},
              {"type": "text", "text": "Describe this image."}
             ]
        }
    ]
    prompt_text = processor.apply_chat_template(messages, add_generation_prompt=True).strip()

    results = {}
    details_list = []
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    smooth = SmoothingFunction().method1
    num_samples = len(test_dataset)

    for level in occlusion_levels:
        print(f"Evaluating occlusion level: {level}%")
        bleu_scores = []
        meteor_scores = []
        rouge_l_scores = []

        for idx in range(num_samples):
            # Retrieve sample info from the dataset (single row lookup)
            row = test_dataset.df.iloc[idx]
            filename = row['filename']
            ref_caption = row['caption']
            image_path = os.path.join(test_folder, filename)

            # Load image and apply occlusion; the context manager releases the file handle.
            with Image.open(image_path) as raw_image:
                image_np = np.array(raw_image.convert("RGB"))
            occluded_np = occlude_image(image_np, level)
            occluded_image = Image.fromarray(occluded_np)

            inputs = processor(text=[prompt_text], images=[occluded_image], return_tensors="pt", padding=True).to(device)

            # Generate caption and keep only the tokens produced after the prompt
            generated_ids = model.generate(**inputs, max_new_tokens=50)
            generated_ids = generated_ids[:, inputs['input_ids'].shape[1]:]
            generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

            # Compute metrics (tokenized by whitespace)
            bleu = sentence_bleu([ref_caption.split()], generated_caption.split(), smoothing_function=smooth)
            meteor = meteor_score([ref_caption.split()], generated_caption.split())
            r_score = scorer.score(ref_caption, generated_caption)['rougeL'].fmeasure

            bleu_scores.append(bleu)
            meteor_scores.append(meteor)
            rouge_l_scores.append(r_score)

            details_list.append({
                "occlusion_level": level,
                "filename": filename,
                "original_caption": ref_caption,
                "generated_caption": generated_caption,
                "BLEU": bleu,
                "METEOR": meteor,
                "ROUGE-L": r_score
            })

            print(f"[{idx+1}/{num_samples}] Occlusion {level}% => BLEU: {bleu:.4f}, "
                  f"METEOR: {meteor:.4f}, ROUGE-L: {r_score:.4f}")

        avg_bleu = np.mean(bleu_scores)
        avg_meteor = np.mean(meteor_scores)
        avg_rouge = np.mean(rouge_l_scores)
        results[level] = {"BLEU": avg_bleu, "METEOR": avg_meteor, "ROUGE-L": avg_rouge}
        print(f"Completed evaluation for occlusion level {level}%")
        print(f"Average Scores -> BLEU: {avg_bleu:.4f}, METEOR: {avg_meteor:.4f}, ROUGE-L: {avg_rouge:.4f}\n")

    results["details"] = details_list
    return results

In [None]:
'''
Main setup cell: builds the test dataset/loader and the captioning model, then loads saved weights if present.
'''
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

# Define dataset paths (adjust if necessary)
dataset_root = "custom_captions_dataset"
test_csv = os.path.join(dataset_root, "test.csv")
test_folder = os.path.join(dataset_root, "test")

# Define transforms (must match training preprocessing)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])

# Initialize tokenizer (using GPT-2 tokenizer)
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token  # set pad token

test_dataset = CustomImageCaptionDataset(csv_path=test_csv, images_folder=test_folder,
                                           transform=transform, tokenizer=tokenizer, max_length=30)
# shuffle=False keeps the evaluation order identical to the CSV row order.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

model = ImageCaptionModel()
model.to(device)

# Define optimizer and loss (for completeness; not used here since model is pre-trained)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

weights_path = "model_weights.pt"

# Check if saved weights exist
if os.path.exists(weights_path):
    # NOTE(review): torch.load unpickles the file - only load checkpoints from a trusted source.
    model.load_state_dict(torch.load(weights_path, map_location=device))
    print(f"Loaded model weights from {weights_path}")
else:
    # This cell does not train anything, so say what actually happens
    # instead of claiming that training will start.
    print(f"WARNING: no saved weights found at {weights_path}. "
          "The model keeps its freshly initialised projection/output layers, "
          "so evaluation results will not be meaningful.")

Using device: cuda


Some weights of the model checkpoint at google/vit-base-patch16-224 were not used when initializing ViTModel: ['classifier.weight', 'classifier.bias']
- This IS expected if you are initializing ViTModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ViTModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Loaded model weights from model_weights.pt


In [None]:
## Custom starts here
# Evaluate on occluded images including baseline (0% occlusion)
occlusion_levels = [0, 10, 50, 80]
occlusion_results = evaluate_on_occluded_images(model, test_loader, device, occlusion_levels, tokenizer)

baseline = occlusion_results[0]
print("Baseline (no occlusion) metrics:", baseline)

# Compare every non-baseline level with the baseline. The levels are derived from
# occlusion_levels so the list only has to be edited in one place.
occluded_levels = [lvl for lvl in occlusion_levels if lvl != 0]
metric_names = ("BLEU", "METEOR", "ROUGE-L")
changes = {
    lvl: {
        f"{name}_change": occlusion_results[lvl][name] - baseline[name]
        for name in metric_names
    }
    for lvl in occluded_levels
}

print("\nOcclusion Evaluation Metrics and Changes:")
for lvl in occluded_levels:
    print(f"Occlusion {lvl}% => Metrics: {occlusion_results[lvl]}, Changes: {changes[lvl]}")

Evaluating occlusion level: 0
  Processed batch 1/58 at occlusion level 0
  Processed batch 2/58 at occlusion level 0
  Processed batch 3/58 at occlusion level 0
  Processed batch 4/58 at occlusion level 0
  Processed batch 5/58 at occlusion level 0
  Processed batch 6/58 at occlusion level 0
  Processed batch 7/58 at occlusion level 0
  Processed batch 8/58 at occlusion level 0
  Processed batch 9/58 at occlusion level 0
  Processed batch 10/58 at occlusion level 0
  Processed batch 11/58 at occlusion level 0
  Processed batch 12/58 at occlusion level 0
  Processed batch 13/58 at occlusion level 0
  Processed batch 14/58 at occlusion level 0
  Processed batch 15/58 at occlusion level 0
  Processed batch 16/58 at occlusion level 0
  Processed batch 17/58 at occlusion level 0
  Processed batch 18/58 at occlusion level 0
  Processed batch 19/58 at occlusion level 0
  Processed batch 20/58 at occlusion level 0
  Processed batch 21/58 at occlusion level 0
  Processed batch 22/58 at occlusi

  Processed batch 2/58 at occlusion level 80
  Processed batch 3/58 at occlusion level 80
  Processed batch 4/58 at occlusion level 80
  Processed batch 5/58 at occlusion level 80
  Processed batch 6/58 at occlusion level 80
  Processed batch 7/58 at occlusion level 80
  Processed batch 8/58 at occlusion level 80
  Processed batch 9/58 at occlusion level 80
  Processed batch 10/58 at occlusion level 80
  Processed batch 11/58 at occlusion level 80
  Processed batch 12/58 at occlusion level 80
  Processed batch 13/58 at occlusion level 80
  Processed batch 14/58 at occlusion level 80
  Processed batch 15/58 at occlusion level 80
  Processed batch 16/58 at occlusion level 80
  Processed batch 17/58 at occlusion level 80
  Processed batch 18/58 at occlusion level 80
  Processed batch 19/58 at occlusion level 80
  Processed batch 20/58 at occlusion level 80
  Processed batch 21/58 at occlusion level 80
  Processed batch 22/58 at occlusion level 80
  Processed batch 23/58 at occlusion level

In [None]:
# Save the per-sample captions of the custom model to a CSV file.
keys = ["occlusion_level", "original_caption", "generated_caption"]
# Explicit UTF-8 so captions with non-ASCII characters are written the same way
# on every platform (matches the SmolVLM export).
with open("occlusion_details_custom.csv", "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=keys)
    writer.writeheader()
    details = occlusion_results["details"]
    # Keep only the selected columns; the detail dicts also hold scores and image tensors.
    writer.writerows({k: detail[k] for k in keys} for detail in details)
print("Saved occlusion details to occlusion_details_custom.csv")

Saved occlusion details to occlusion_details_custom.csv


In [38]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Define dataset paths (adjust if necessary)
dataset_root = "/kaggle/input/captioning-dataset/custom_captions_dataset"
test_csv = os.path.join(dataset_root, "test.csv")
test_folder = os.path.join(dataset_root, "test")

# Define transforms (must match training preprocessing)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])

# Initialize tokenizer (using GPT-2 tokenizer)
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token  # set pad token

# The zero-shot evaluation reads only test_dataset.df (file names and captions) and
# opens the images itself, so transform/tokenizer are not applied to its inputs.
test_dataset = CustomImageCaptionDataset(csv_path=test_csv, images_folder=test_folder,
                                           transform=transform, tokenizer=tokenizer, max_length=30)

## SmolVLM starts here
occlusion_levels = [0, 10, 50, 80]
test_metrics_zero_shot = evaluate_zero_shot_occluded(
    test_dataset, test_folder,
    model_name="HuggingFaceTB/SmolVLM-256M-Instruct",
    token=None,
    occlusion_levels=occlusion_levels
)


baseline = test_metrics_zero_shot[0]
print("Baseline (no occlusion) metrics:", baseline)

# Compare every non-baseline level with the baseline. The levels are derived from
# occlusion_levels so the list only has to be edited in one place.
occluded_levels = [lvl for lvl in occlusion_levels if lvl != 0]
metric_names = ("BLEU", "METEOR", "ROUGE-L")
changes = {
    lvl: {
        f"{name}_change": test_metrics_zero_shot[lvl][name] - baseline[name]
        for name in metric_names
    }
    for lvl in occluded_levels
}

print("\nOcclusion Evaluation Metrics and Changes:")
for lvl in occluded_levels:
    print(f"Occlusion {lvl}% => Metrics: {test_metrics_zero_shot[lvl]}, Changes: {changes[lvl]}")

Loading zero-shot SmolVLM model: HuggingFaceTB/SmolVLM-256M-Instruct with attn_implementation='eager' ...


Some kwargs in processor config are unused and will not have any effect: image_seq_len. 


Model loaded.

Evaluating occlusion level: 0%
[1/928] Occlusion 0% => BLEU: 0.0129, METEOR: 0.2375, ROUGE-L: 0.2469
[2/928] Occlusion 0% => BLEU: 0.1299, METEOR: 0.3769, ROUGE-L: 0.3421
[3/928] Occlusion 0% => BLEU: 0.0146, METEOR: 0.2133, ROUGE-L: 0.2759
[4/928] Occlusion 0% => BLEU: 0.0169, METEOR: 0.3063, ROUGE-L: 0.2989
[5/928] Occlusion 0% => BLEU: 0.0172, METEOR: 0.2311, ROUGE-L: 0.2791
[6/928] Occlusion 0% => BLEU: 0.0149, METEOR: 0.2782, ROUGE-L: 0.2162
[7/928] Occlusion 0% => BLEU: 0.0494, METEOR: 0.2601, ROUGE-L: 0.3023
[8/928] Occlusion 0% => BLEU: 0.0321, METEOR: 0.2247, ROUGE-L: 0.2340
[9/928] Occlusion 0% => BLEU: 0.0000, METEOR: 0.1532, ROUGE-L: 0.1687
[10/928] Occlusion 0% => BLEU: 0.0280, METEOR: 0.1509, ROUGE-L: 0.3000
[11/928] Occlusion 0% => BLEU: 0.0890, METEOR: 0.4194, ROUGE-L: 0.3784
[12/928] Occlusion 0% => BLEU: 0.0170, METEOR: 0.2102, ROUGE-L: 0.2000
[13/928] Occlusion 0% => BLEU: 0.0146, METEOR: 0.2059, ROUGE-L: 0.3059
[14/928] Occlusion 0% => BLEU: 0.0161, M

In [39]:
# Save per-sample details to a CSV file.
import csv
# Columns exported for each evaluated sample.
keys = ["occlusion_level", "original_caption", "generated_caption"]
with open("/kaggle/working/occlusion_details_SmolVLM.csv", "w", newline="",  encoding="utf-8") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=keys)
    writer.writeheader()
    details = test_metrics_zero_shot["details"]
    # Project every record onto the exported columns and write them in one call.
    writer.writerows({column: record[column] for column in keys} for record in details)
print("Saved occlusion details to occlusion_details_SmolVLM.csv")

Saved occlusion details to occlusion_details_SmolVLM.csv
