In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
adityajn105_flickr30k_path = kagglehub.dataset_download('adityajn105/flickr30k')

print('Data source import complete.')


Data source import complete.


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import os, pickle, torch, torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from tqdm import tqdm



In [None]:
if torch.cuda.is_available():
    print("GPU is running and it's name is: ", torch.cuda.get_device_name(0))
else:
    print('CPU is running..')

CPU is running..


In [None]:
# now first understand the Assignment (understood the problem)


def find_image_dir():
    # Common Kaggle root
    base_input = '/kaggle/input'

    # Walk through the input directory to find where the images actually are
    for root, dirs, files in os.walk(base_input):
        # Look for the folder containing a high volume of jpg files
        if len([f for f in files if f.endswith('.jpg')]) > 1000:
            return root
    return None


# IMAGE_DIR = find_image_dir() # Original line, replaced to use correct path
IMAGE_DIR = adityajn105_flickr30k_path # Use the path provided by kagglehub
OUTPUT_FILE = 'flickr30k_features.pkl'

if IMAGE_DIR:
    print(f" Found images at: {IMAGE_DIR}")
else:
    raise FileNotFoundError(
        "Could not find the Flickr30k image directory. Please ensure the dataset is added to the notebook."
    )


# --- THE DATASET CLASS ---
class FlickrDataset(Dataset):
    def __init__(self, img_dir, transform):
        self.img_names = [
            f for f in os.listdir(img_dir)
            if f.endswith(('.jpg', '.jpeg'))
        ]
        self.transform = transform
        self.img_dir = img_dir

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, name)
        img = Image.open(img_path).convert('RGB')
        return self.transform(img), name


# --- REMAINDER OF THE PIPELINE (AS BEFORE) ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = nn.Sequential(*list(model.children())[:-1])  # Feature vector only
model = nn.DataParallel(model).to(device)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        (0.485, 0.456, 0.406),
        (0.229, 0.224, 0.225)
    )
])

dataset = FlickrDataset(IMAGE_DIR, transform)
loader = DataLoader(dataset, batch_size=128, num_workers=4,)

features_dict = {}

with torch.no_grad():
    for imgs, names in tqdm(loader, desc="Extracting Features"):
        feats = model(imgs.to(device)).view(imgs.size(0), -1)
        for i, name in enumerate(names):
            features_dict[name] = feats[i].cpu().numpy()

with open(OUTPUT_FILE, 'wb') as f:
    pickle.dump(features_dict, f)

print(f"Success! {len(features_dict)} images processed and saved to {OUTPUT_FILE}")

 Found images at: /root/.cache/kagglehub/datasets/adityajn105/flickr30k/versions/1


Extracting Features: 0it [00:00, ?it/s]

Success! 0 images processed and saved to flickr30k_features.pkl





In [None]:
CAPTIONS_FILE = "/kaggle/input/flickr30k/captions.txt"

In [None]:
with open(CAPTIONS_FILE, 'r') as f:
    for i in range(5):
        print(f.readline())


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/flickr30k/captions.txt'

In [None]:
import re
from collections import defaultdict

def clean_caption(caption):
    caption = caption.lower()
    caption = re.sub(r"[^a-z\s]", "", caption)
    caption = caption.strip()
    return f"<start> {caption} <end>"

# Dictionary: image_name -> list of captions
image_captions = defaultdict(list)

with open(CAPTIONS_FILE, 'r') as f:
    next(f)  # skip header line: image,caption
    for line in f:
        line = line.strip()
        if not line:
            continue

        image_name, caption = line.split(',', 1)  # split ONLY first comma
        caption = clean_caption(caption)
        image_captions[image_name].append(caption)


In [None]:
for image, captions in image_captions.items():
    print("first image : ",image,", and there captions: ")
    for c in captions:
        print(c)
    break

In [None]:
from collections import Counter

# Count all words in the dataset
word_counter = Counter()

for captions in image_captions.values():
    for caption in captions:
        words = caption.split()
        word_counter.update(words)


In [None]:
print(len(word_counter))

In [25]:
special_tokens = ["<pad>", "<start>", "<end>", "<unk>"]


In [26]:
word2idx = {} # making a dictionary where word is assigned a unique number
idx2word = {} # index to word

# Add special tokens first
for idx, token in enumerate(special_tokens):
    word2idx[token] = idx
    idx2word[idx] = token

# Add remaining words
idx = len(special_tokens)
for word in word_counter:
    if word not in word2idx:
        word2idx[word] = idx
        idx2word[idx] = word
        idx += 1


NameError: name 'word_counter' is not defined

In [None]:
vocab_size = len(word2idx)
print("Vocabulary size:", vocab_size)


In [None]:
print(idx2word[17]) # here

In [None]:
# Convert captions to sequences of numbers
image_caption_sequences = {}

for image, captions in image_captions.items():
    seq_list = []
    for caption in captions:
        seq = []
        for word in caption.split():
            seq.append(word2idx.get(word, word2idx["<unk>"]))
        seq_list.append(seq)
    image_caption_sequences[image] = seq_list


In [None]:
img = list(image_caption_sequences.keys())[0]
print("Image:", img)

print("Text caption:")
print(image_captions[img][0])

print("Sequence:")
print(image_caption_sequences[img][0])


In [None]:
import pickle

with open("flickr30k_features.pkl", "rb") as f:
    image_features = pickle.load(f)


In [None]:
with open("vocab.pkl", "wb") as f:
    pickle.dump({
        "word2idx": word2idx,
        "idx2word": idx2word
    }, f)


In [None]:
print(type(image_features))
print(len(image_features))


In [None]:
from sklearn.model_selection import train_test_split

all_images = list(image_caption_sequences.keys())

# First split: Train + Temp (Val + Test)
train_images, temp_images = train_test_split(
    all_images,
    test_size=0.3,      # 30% for val + test
    random_state=42
)

# Second split: Validation + Test
val_images, test_images = train_test_split(
    temp_images,
    test_size=2/3,      # 20% test, 10% val
    random_state=42
)

print(len(train_images), len(val_images), len(test_images))


In [None]:
print(type(image_features))
print(type(image_caption_sequences))
print(len(train_images))


In [None]:
class CustomDataset(Dataset):
    def __init__(self, image_features, caption_sequences, image_names):
        self.image_features = image_features
        self.caption_sequences = caption_sequences
        self.image_names = image_names

        # Build a flat list of (image, caption) pairs
        self.data = []
        for img in image_names:
            for seq in caption_sequences[img]:
                self.data.append((img, seq))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name, caption = self.data[idx]

        img_feat = torch.tensor(self.image_features[img_name], dtype=torch.float32)

        caption = torch.tensor(caption, dtype=torch.long)

        # Input and target captions
        input_caption = caption[:-1]
        target_caption = caption[1:]

        return img_feat, input_caption, target_caption


In [None]:
from torch.nn.utils.rnn import pad_sequence

PAD_IDX = word2idx["<pad>"]

def collate_fn(batch):
    img_feats, inputs, targets = zip(*batch)

    img_feats = torch.stack(img_feats)

    inputs = pad_sequence(inputs, batch_first=True, padding_value=PAD_IDX)
    targets = pad_sequence(targets, batch_first=True, padding_value=PAD_IDX)

    return img_feats, inputs, targets


In [None]:
from torch.utils.data import DataLoader

# Train dataset & loader
train_dataset = CustomDataset(
    image_features,
    image_caption_sequences,
    train_images
)

train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,          # shuffle ONLY for training
    collate_fn=collate_fn
)

# Validation dataset & loader
val_dataset = CustomDataset(
    image_features,
    image_caption_sequences,
    val_images
)

val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,         # no shuffle for validation
    collate_fn=collate_fn
)

# Test dataset & loader
test_dataset = CustomDataset(
    image_features,
    image_caption_sequences,
    test_images
)

test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,         # no shuffle for test
    collate_fn=collate_fn
)


In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim=2048, hidden_dim=512):
        super().__init__()
        self.fc = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()

    def forward(self, image_features):
        # image_features: (batch_size, 2048)
        x = self.fc(image_features)
        x = self.relu(x)
        return x  # (batch_size, hidden_dim)


In [None]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512,dropout=0.3):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)

        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            batch_first=True
        )

        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, captions, encoder_hidden):
        """
        captions: (batch, seq_len)
        encoder_hidden: (batch, hidden_dim)
        """

        embeddings = self.embedding(captions)
        embeddings = self.dropout(embeddings)

        h0 = encoder_hidden.unsqueeze(0)   # (1, batch, hidden_dim)
        c0 = torch.zeros_like(h0)

        outputs, _ = self.lstm(embeddings, (h0, c0))
        outputs = self.dropout(outputs)

        outputs = self.fc(outputs)  # (batch, seq_len, vocab_size)
        return outputs


In [None]:
class ImageCaptioningModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super().__init__()

        self.encoder = Encoder(2048, hidden_dim)
        self.decoder = Decoder(
            vocab_size=vocab_size,
            embed_dim=embed_dim,
            hidden_dim=hidden_dim
        )

    def forward(self, image_features, captions):
        enc_out = self.encoder(image_features)
        outputs = self.decoder(captions, enc_out)
        return outputs


In [None]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0

    for img_feats, captions_in, captions_out in dataloader:
        img_feats = img_feats.to(device)
        captions_in = captions_in.to(device)
        captions_out = captions_out.to(device)

        optimizer.zero_grad()

        outputs = model(img_feats, captions_in)
        # outputs: (batch_size, seq_len, vocab_size)

        loss = criterion(
            outputs.reshape(-1, outputs.size(-1)),
            captions_out.reshape(-1)
        )

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)


In [None]:
def validate_one_epoch(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0


    with torch.no_grad():
        for img_feats, captions_in, captions_out in dataloader:
            img_feats = img_feats.to(device)
            captions_in = captions_in.to(device)
            captions_out = captions_out.to(device)

            outputs = model(img_feats, captions_in)

            loss = criterion(
                outputs.view(-1, outputs.size(-1)),
                captions_out.view(-1)
            )

            total_loss += loss.item()

    return total_loss / len(dataloader)


In [None]:
# 1 Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2 Vocabulary size
vocab_size = len(word2idx)

# 3  Model
model = ImageCaptioningModel(
    vocab_size=vocab_size,
    embed_dim=256
    ,
    hidden_dim=512
)
model = model.to(device)

# 4  Loss function
criterion = nn.CrossEntropyLoss(ignore_index=word2idx["<pad>"])

# 5 Optimizer
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=2e-4,
    weight_decay=1e-5   # L2 regularization
)



In [None]:
### from tqdm import tqdm
import copy

num_epochs = 25
patience = 3
min_delta = 0.001

best_val_loss = float("inf")
patience_counter = 0
best_model_state = None

train_losses = []
val_losses = []

for epoch in tqdm(range(num_epochs), desc="Training Epochs"):

    train_loss = train_one_epoch(
        model, train_loader, optimizer, criterion, device
    )

    val_loss = validate_one_epoch(
        model, val_loader, criterion, device
    )

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    tqdm.write(
        f"Epoch [{epoch+1}/{num_epochs}] "
        f"Train Loss: {train_loss:.4f} | "
        f"Val Loss: {val_loss:.4f}"
    )

    # -------- Early Stopping Logic --------
    if best_val_loss - val_loss > min_delta:
        best_val_loss = val_loss
        patience_counter = 0
        best_model_state = copy.deepcopy(model.state_dict())
        torch.save(best_model_state, "best_model.pth")
    else:
        patience_counter += 1



    if patience_counter >= patience:
        tqdm.write("Early stopping triggered!")
        break

# Load best model
model.load_state_dict(best_model_state)


In [None]:
import matplotlib.pyplot as plt

epochs = range(1, len(train_losses) + 1)

plt.figure(figsize=(8, 5))
plt.plot(epochs, train_losses, label="Training Loss")
plt.plot(epochs, val_losses, label="Validation Loss")

plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training vs Validation Loss")
plt.legend()
plt.grid(True)

plt.show()


In [None]:
def greedy_caption(
    model,
    image_feature,
    word2idx,
    idx2word,
    max_len=20,
    device="cpu"
):
    model.eval()

    # Add batch dimension
    image_feature = image_feature.unsqueeze(0).to(device)

    # Encode image â†’ initial hidden state
    with torch.no_grad():
        encoder_hidden = model.encoder(image_feature)

    # Start token
    current_word = torch.tensor([[word2idx["<start>"]]], device=device)

    caption = []
    hidden = None  # important for LSTM state passing

    for _ in range(max_len):

        with torch.no_grad():
            outputs, hidden = model.decoder(
                current_word,
                encoder_hidden,
                hidden
            )

        # Last timestep logits
        logits = outputs[:, -1, :]

        # Pick highest probability word
        predicted_idx = logits.argmax(dim=-1).item()
        predicted_word = idx2word[predicted_idx]

        # Stop if end token
        if predicted_word == "<end>":
            break

        caption.append(predicted_word)

        # Feed prediction back as next input
        current_word = torch.tensor([[predicted_idx]], device=device)

    return " ".join(caption)


In [None]:
import torch.nn.functional as F

def beam_search_caption(
    model,
    image_feature,
    word2idx,
    idx2word,
    beam_width=3,
    max_len=20,
    device="cpu"
):
    model.eval()

    image_feature = image_feature.unsqueeze(0).to(device)

    # Encode image once
    with torch.no_grad():
        encoder_hidden = model.encoder(image_feature)

    # Each beam: (sequence, score)
    sequences = [([word2idx["<start>"]], 0.0)]

    for _ in range(max_len):
        all_candidates = []

        for seq, score in sequences:

            # Stop expanding if <end> already generated
            if seq[-1] == word2idx["<end>"]:
                all_candidates.append((seq, score))
                continue

            caption_tensor = torch.tensor(
                [seq], dtype=torch.long, device=device
            )

            with torch.no_grad():
                outputs = model.decoder(caption_tensor, encoder_hidden)

            logits = outputs[:, -1, :]
            log_probs = F.log_softmax(logits, dim=-1)

            topk_log_probs, topk_idxs = log_probs.topk(beam_width)

            for i in range(beam_width):
                next_word = topk_idxs[0][i].item()
                next_score = score + topk_log_probs[0][i].item()

                # repetition penalty
                if next_word in seq:
                    next_score -= 1.5

                candidate = (seq + [next_word], next_score)
                all_candidates.append(candidate)

        sequences = sorted(
            all_candidates, key=lambda x: x[1], reverse=True
        )[:beam_width]

    best_seq = sequences[0][0]

    # Convert to words
    caption = []
    for idx in best_seq[1:]:
        word = idx2word[idx]
        if word == "<end>":
            break
        caption.append(word)

    return " ".join(caption)


In [None]:
import os
import matplotlib.pyplot as plt
from PIL import Image

# Your existing code
img_name = test_images[2020]
img_feat = torch.tensor(image_features[img_name], dtype=torch.float32)

caption = beam_search_caption(
    model,
    img_feat,
    word2idx,
    idx2word,
    beam_width=3,
    device=device
)

print("Image:", img_name)
print("Beam Search Caption:")
print(caption)

# -------- SHOW IMAGE --------
img_path = os.path.join(IMAGE_DIR, img_name)
image = Image.open(img_path).convert("RGB")

plt.figure(figsize=(6, 6))
plt.imshow(image)
plt.axis("off")
plt.title(caption, fontsize=12)
plt.show()


In [None]:
from nltk.translate.bleu_score import corpus_bleu
from tqdm import tqdm
import random
import torch

def evaluate_bleu(
    model,
    image_features,
    image_captions,
    test_images,
    word2idx,
    idx2word,
    device,
    beam_width=5,
    max_samples=1000
):
    references = []
    hypotheses = []

    model.eval()

    # Use subset for speed
    sample_images = random.sample(
        test_images, min(max_samples, len(test_images))
    )

    for img_name in tqdm(sample_images, desc="Evaluating BLEU-4"):
        img_feat = torch.tensor(
            image_features[img_name], dtype=torch.float32
        ).to(device)

        # Generate caption
        pred_caption = beam_search_caption(
            model,
            img_feat,
            word2idx,
            idx2word,
            beam_width=beam_width,
            device=device
        )

        pred_tokens = pred_caption.split()
        hypotheses.append(pred_tokens)

        # Ground truth captions (multiple references)
        gt_captions = image_captions[img_name]
        gt_tokens = [
            cap.replace("<start>", "").replace("<end>", "").split()
            for cap in gt_captions
        ]

        references.append(gt_tokens)

    bleu4 = corpus_bleu(
        references,
        hypotheses,
        weights=(0.25, 0.25, 0.25, 0.25)
    )

    return bleu4


In [None]:
bleu4 = evaluate_bleu(
    model,
    image_features,
    image_captions,
    test_images,
    word2idx,
    idx2word,
    device,
    beam_width=5
)

print(f"BLEU-4 Score: {bleu4:.4f}")


In [None]:
from collections import Counter

def precision_recall_f1(pred_tokens, gt_tokens):
    pred_counter = Counter(pred_tokens)
    gt_counter = Counter(gt_tokens)

    common = pred_counter & gt_counter
    true_positive = sum(common.values())

    precision = true_positive / max(len(pred_tokens), 1)
    recall = true_positive / max(len(gt_tokens), 1)

    if precision + recall == 0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)

    return precision, recall, f1


In [None]:
from tqdm import tqdm
import random
import torch

def evaluate_prf(
    model,
    image_features,
    image_captions,
    test_images,
    word2idx,
    idx2word,
    device,
    beam_width=5,
    max_samples=500
):
    precisions, recalls, f1s = [], [], []

    model.eval()

    sample_images = random.sample(
        test_images, min(max_samples, len(test_images))
    )

    for img_name in tqdm(sample_images, desc="Evaluating Precision/Recall/F1"):
        img_feat = torch.tensor(
            image_features[img_name], dtype=torch.float32
        ).to(device)

        # Generate caption
        pred_caption = beam_search_caption(
            model,
            img_feat,
            word2idx,
            idx2word,
            beam_width=beam_width,
            device=device
        )

        pred_tokens = pred_caption.split()

        # Use first ground-truth caption
        gt_caption = image_captions[img_name][0]
        gt_tokens = gt_caption.replace("<start>", "").replace("<end>", "").split()

        p, r, f = precision_recall_f1(pred_tokens, gt_tokens)

        precisions.append(p)
        recalls.append(r)
        f1s.append(f)

    return (
        sum(precisions) / len(precisions),
        sum(recalls) / len(recalls),
        sum(f1s) / len(f1s),
    )


In [None]:
precision, recall, f1 = evaluate_prf(
    model,
    image_features,
    image_captions,
    test_images,
    word2idx,
    idx2word,
    device
)

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1-score:  {f1:.4f}")
