In [None]:
import os, re, random, math, pickle
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


In [None]:
import os, pickle, torch, torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from tqdm import tqdm


def find_image_dir(base_input='/kaggle/input', min_images=1000,
                   extensions=('.jpg', '.jpeg')):
    """Return the first directory under ``base_input`` holding many images.

    Args:
        base_input: Root directory to search (defaults to the Kaggle input root).
        min_images: A directory qualifies when it directly contains MORE than
            this many image files.
        extensions: File suffixes counted as images; matched case-insensitively
            so ``.JPG`` / ``.jpeg`` files are counted as well.

    Returns:
        The path of the first qualifying directory, or ``None`` when no
        directory qualifies (including when ``base_input`` does not exist).
    """
    suffixes = tuple(ext.lower() for ext in extensions)

    for root, dirs, files in os.walk(base_input):
        # Sorting in place makes os.walk descend in a stable order, so the
        # "first" match is the same on every run.
        dirs.sort()
        n_images = sum(1 for f in files if f.lower().endswith(suffixes))
        if n_images > min_images:
            return root

    return None


# Resolve the image folder once and fail fast when the dataset is not mounted.
IMAGE_DIR = find_image_dir()
OUTPUT_FILE = 'flickr30k_features.pkl'

if not IMAGE_DIR:
    raise FileNotFoundError(
        "Could not find the Flickr30k image directory. Please ensure the dataset "
        "is added to the notebook."
    )

print(f" Found images at: {IMAGE_DIR}")


# --- THE DATASET CLASS ---
class FlickrDataset(Dataset):
    """Dataset yielding ``(transformed_image, file_name)`` for one image folder.

    Args:
        img_dir: Directory whose ``.jpg`` / ``.jpeg`` files (any letter case)
            make up the dataset.
        transform: Callable applied to each RGB ``PIL.Image`` before it is
            returned.
    """

    def __init__(self, img_dir, transform):
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order stable between runs.
        self.img_names = sorted(
            f for f in os.listdir(img_dir)
            if f.lower().endswith(('.jpg', '.jpeg'))
        )
        self.transform = transform
        self.img_dir = img_dir

    def __len__(self):
        """Number of image files found in ``img_dir``."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return ``(transform(image), name)``."""
        name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, name)
        # Image.open keeps the file open lazily; the context manager closes it
        # once convert() has produced an independent in-memory copy.
        with Image.open(img_path) as img:
            rgb = img.convert('RGB')
        return self.transform(rgb), name


# --- FEATURE EXTRACTION PIPELINE ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pretrained ResNet50 with its final child module dropped, so the network
# outputs a feature vector per image instead of class scores.
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = nn.Sequential(*list(model.children())[:-1])  # Feature vector only
model = nn.DataParallel(model).to(device)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        (0.485, 0.456, 0.406),
        (0.229, 0.224, 0.225)
    )
])

dataset = FlickrDataset(IMAGE_DIR, transform)
loader = DataLoader(dataset, batch_size=128, num_workers=4)

# Maps image file name -> 1-D numpy feature vector.
features_dict = {}

with torch.no_grad():
    for imgs, names in tqdm(loader, desc="Extracting Features"):
        feats = model(imgs.to(device)).view(imgs.size(0), -1)
        # Move the whole batch to host memory once instead of issuing one
        # device-to-host copy per image.
        batch_feats = feats.cpu().numpy()
        features_dict.update(zip(names, batch_feats))

with open(OUTPUT_FILE, 'wb') as f:
    pickle.dump(features_dict, f)

print(f"Success! {len(features_dict)} images processed and saved to {OUTPUT_FILE}")


**Find Dataset Files**

In [None]:
def find_file_in_kaggle_input(filename, base_input="/kaggle/input"):
    """Search ``base_input`` recursively for a file called ``filename``.

    Args:
        filename: Exact file name to look for (no directory part).
        base_input: Root directory to search (defaults to the Kaggle input root).

    Returns:
        Full path of the first match, or ``None`` when the file is not found
        (including when ``base_input`` does not exist).
    """
    for root, dirs, files in os.walk(base_input):
        # Sort in place so that, when several copies exist, the same one is
        # returned on every run.
        dirs.sort()
        if filename in files:
            return os.path.join(root, filename)
    return None

def find_image_dir():
    """Return the directory under /kaggle/input with the most JPEG files.

    Only files directly inside a directory are counted (``.jpg`` / ``.jpeg``,
    any letter case). On a tie the directory visited first wins; ``None`` is
    returned when no directory contains a JPEG at all.
    """
    winner, winner_count = None, 0
    for folder, _subdirs, filenames in os.walk("/kaggle/input"):
        n_jpegs = sum(
            1 for fname in filenames
            if fname.lower().endswith((".jpg", ".jpeg"))
        )
        if n_jpegs > winner_count:
            winner, winner_count = folder, n_jpegs
    return winner

# Locate the captions file and the image folder under /kaggle/input.
CAPTIONS_PATH = find_file_in_kaggle_input("captions.txt")
IMAGE_DIR = find_image_dir()

# Echo what was resolved before validating, so a failed lookup is visible.
print("CAPTIONS_PATH:", CAPTIONS_PATH)
print("IMAGE_DIR:", IMAGE_DIR)

# Both lookups return None on failure; stop here rather than fail later.
assert CAPTIONS_PATH is not None, "captions.txt not found. Make sure Flickr30k dataset is added."
assert IMAGE_DIR is not None, "Image directory not found. Make sure Flickr30k dataset is added."


**Load Captions & Clean Text**

In [None]:
# Read the captions table with pandas' default CSV settings, then print a
# quick summary: first rows, column names and row count.
df = pd.read_csv(CAPTIONS_PATH)
print(df.head())
print(df.columns)
print("Rows:", len(df))


In [None]:
def clean_caption(text: str) -> str:
    """Normalise a caption to lowercase a-z words separated by single spaces.

    The value is stringified and lowercased, every character that is neither
    a-z nor whitespace becomes a space, and whitespace runs are collapsed and
    trimmed.
    """
    lowered = str(text).lower()
    letters_only = re.sub(r"[^a-z\s]", " ", lowered)
    return re.sub(r"\s+", " ", letters_only).strip()

# Try to detect correct column names
# Common possibilities: ['image', 'caption'] or ['image_name', 'comment']
# When several columns match, the last matching one wins.
img_col = None
cap_col = None

for c in df.columns:
    if "image" in c.lower():
        img_col = c
    if "caption" in c.lower() or "comment" in c.lower() or "text" in c.lower():
        cap_col = c

assert img_col is not None and cap_col is not None, f"Could not detect image/caption columns from: {df.columns}"

df = df[[img_col, cap_col]].rename(columns={img_col: "image", cap_col: "caption"})

# Drop rows with a missing image name or caption first: clean_caption calls
# str() on its input, which would turn a NaN cell into the literal caption "nan".
df = df.dropna(subset=["image", "caption"]).reset_index(drop=True)

df["caption"] = df["caption"].apply(clean_caption)
# Wrap every caption in sentence-boundary markers.
df["caption"] = df["caption"].apply(lambda x: f"<start> {x} <end>")

print(df.head())


**Load Cached Features**

In [None]:
# Same file name the feature-extraction cell writes to (its OUTPUT_FILE).
FEATURES_PATH = "flickr30k_features.pkl"
assert os.path.exists(FEATURES_PATH), "Run feature extraction cell first to create flickr30k_features.pkl"

# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load a cache file that this notebook produced itself.
with open(FEATURES_PATH, "rb") as f:
    features_dict = pickle.load(f)

print("Loaded features for images:", len(features_dict))


In [None]:
# Keep only caption rows whose image name is a key of features_dict, then
# renumber the index from 0.
df = df[df["image"].isin(features_dict.keys())].reset_index(drop=True)
print("After filtering:", len(df))


**Build Vocabulary**

In [None]:
from collections import Counter

PAD_TOKEN = "<pad>"
UNK_TOKEN = "<unk>"

def build_vocab(captions, min_freq=3):
    """Build the vocabulary list from whitespace-tokenised captions.

    Args:
        captions: Iterable of caption strings.
        min_freq: Minimum number of occurrences for a word to be kept.

    Returns:
        A list starting with the four special tokens
        (``<pad>``, ``<unk>``, ``<start>``, ``<end>``) followed by every other
        word seen at least ``min_freq`` times, in first-seen order.
    """
    counter = Counter()
    for cap in captions:
        counter.update(cap.split())

    specials = [PAD_TOKEN, UNK_TOKEN, "<start>", "<end>"]
    # Counter keys are already unique, so only the special tokens can collide;
    # a set lookup avoids rescanning the growing list for every word.
    reserved = set(specials)
    return specials + [
        word for word, freq in counter.items()
        if freq >= min_freq and word not in reserved
    ]

# Vocabulary plus the two lookup tables (word -> id and id -> word).
vocab = build_vocab(df["caption"].tolist(), min_freq=3)
word2idx = {token: index for index, token in enumerate(vocab)}
idx2word = dict(enumerate(vocab))

# Ids of the special tokens, used throughout training and decoding.
pad_id, unk_id = word2idx[PAD_TOKEN], word2idx[UNK_TOKEN]
start_id, end_id = word2idx["<start>"], word2idx["<end>"]

print("Vocab size:", len(vocab))
print("pad_id:", pad_id, "start_id:", start_id, "end_id:", end_id)


**Convert captions to ids + pad/truncate**

In [None]:
def encode_caption(cap, word2idx):
    """Map each whitespace-separated word of ``cap`` to its id in ``word2idx``.

    Words missing from ``word2idx`` are encoded with the id of ``UNK_TOKEN``.
    """
    token_ids = []
    for word in cap.split():
        token_ids.append(word2idx.get(word, word2idx[UNK_TOKEN]))
    return token_ids

# Encode every caption, then choose the sequence length: the 95th percentile
# of caption lengths, clamped to the range [10, 40].
encoded = [encode_caption(caption, word2idx) for caption in df["caption"].tolist()]
lengths = list(map(len, encoded))
p95_len = int(np.percentile(lengths, 95))
max_len = min(max(p95_len, 10), 40)
print("Max_len used:", max_len)

def pad_or_trunc(seq, max_len, pad_id):
    """Return a new list of exactly ``max_len`` ids built from ``seq``.

    Shorter sequences are right-padded with ``pad_id``; sequences of
    ``max_len`` or more items are cut to their first ``max_len`` items.
    """
    missing = max_len - len(seq)
    if missing <= 0:
        return seq[:max_len]
    return seq + [pad_id] * missing

# A plain cut at max_len would drop the trailing <end> id of over-long
# captions, leaving the decoder no stop signal for them. Re-terminate those
# captions with end_id before handing them to pad_or_trunc.
df["cap_ids"] = [
    pad_or_trunc(
        x[:max_len - 1] + [end_id] if len(x) > max_len else x,
        max_len,
        pad_id,
    )
    for x in encoded
]


**Train/Val/Test Split**

In [None]:
# Split by IMAGE (not by caption row) so all captions of one image land in
# the same partition: 80% train / 10% validation / 10% test.
all_images = df["image"].unique().tolist()
random.seed(42)
random.shuffle(all_images)

n = len(all_images)
train_end, val_end = int(0.8*n), int(0.9*n)

train_imgs = set(all_images[:train_end])
val_imgs   = set(all_images[train_end:val_end])
test_imgs  = set(all_images[val_end:])

in_train = df["image"].isin(train_imgs)
in_val   = df["image"].isin(val_imgs)
in_test  = df["image"].isin(test_imgs)

train_df = df[in_train].reset_index(drop=True)
val_df   = df[in_val].reset_index(drop=True)
test_df  = df[in_test].reset_index(drop=True)

print(len(train_df), len(val_df), len(test_df))


**Dataset + Dataloader**

In [None]:
class CaptionDataset(Dataset):
    """Pairs each caption row with the cached feature vector of its image.

    ``data_df`` must provide an ``image`` and a ``cap_ids`` column;
    ``features_dict`` maps image names to feature arrays.
    """

    def __init__(self, data_df, features_dict):
        self.df = data_df
        self.features = features_dict

    def __len__(self):
        """One sample per caption row."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(feature, input_ids, target_ids, image_name)`` for row ``idx``.

        ``input_ids`` is the caption without its last id and ``target_ids``
        the caption without its first id (next-token prediction).
        """
        record = self.df.iloc[idx]
        image_name = record["image"]
        token_ids = torch.tensor(record["cap_ids"], dtype=torch.long)
        feature = torch.tensor(self.features[image_name], dtype=torch.float32)
        return feature, token_ids[:-1], token_ids[1:], image_name

# Only the training loader shuffles; validation and test keep row order.
# The test loader uses a fixed batch size of 64 instead of batch_size.
batch_size = 128
train_loader = DataLoader(CaptionDataset(train_df, features_dict), batch_size=batch_size, shuffle=True, num_workers=2)
val_loader   = DataLoader(CaptionDataset(val_df, features_dict), batch_size=batch_size, shuffle=False, num_workers=2)
test_loader  = DataLoader(CaptionDataset(test_df, features_dict), batch_size=64, shuffle=False, num_workers=2)


**Seq2Seq Model (Encoder + Decoder)**

In [None]:
class Encoder(nn.Module):
    """Projects an image feature vector to the decoder's hidden size.

    A single linear layer followed by ReLU.
    """

    def __init__(self, in_dim=2048, hidden_size=512):
        super().__init__()
        self.fc = nn.Linear(in_dim, hidden_size)
        self.relu = nn.ReLU()

    def forward(self, feat):
        """Map ``feat`` of shape [B, in_dim] to a tensor of shape [B, hidden_size]."""
        projected = self.fc(feat)
        return self.relu(projected)

class Decoder(nn.Module):
    """LSTM caption decoder: embedding -> dropout -> LSTM -> vocabulary logits.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        embed_dim: Size of the token embeddings.
        hidden_size: LSTM hidden size; must match the encoder output size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the token embeddings.
        pad_idx: Id of the padding token for the embedding layer. When left
            as ``None`` the module-level ``pad_id`` is used.
    """

    def __init__(self, vocab_size, embed_dim=256, hidden_size=512, num_layers=1, dropout=0.1, pad_idx=None):
        super().__init__()
        if pad_idx is None:
            # Backward-compatible default: fall back to the notebook global.
            pad_idx = pad_id
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embed_dim, hidden_size, num_layers=num_layers, batch_first=True, dropout=0.0)
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(hidden_size, vocab_size)

    def forward(self, x_tokens, h0):
        """
        x_tokens: [B, T] (teacher forcing input)
        h0: [B, hidden]
        returns logits: [B, T, vocab]
        """
        emb = self.dropout(self.embed(x_tokens))  # [B, T, E]

        # The LSTM expects an initial state of shape [num_layers, B, hidden].
        # Repeat the encoder state for every layer so num_layers > 1 works
        # (with a single layer this is simply h0.unsqueeze(0)).
        h0 = h0.unsqueeze(0).repeat(self.lstm.num_layers, 1, 1)
        c0 = torch.zeros_like(h0)

        out, _ = self.lstm(emb, (h0, c0))    # [B, T, hidden]
        logits = self.fc_out(out)            # [B, T, vocab]
        return logits

class Img2Caption(nn.Module):
    """Encoder-decoder captioner: image features initialise the LSTM decoder."""

    def __init__(self, vocab_size, in_dim=2048, hidden_size=512, embed_dim=256):
        super().__init__()
        self.encoder = Encoder(in_dim=in_dim, hidden_size=hidden_size)
        self.decoder = Decoder(
            vocab_size=vocab_size,
            embed_dim=embed_dim,
            hidden_size=hidden_size,
        )

    def forward(self, feat, x_tokens):
        """Return the decoder logits for ``x_tokens`` conditioned on ``feat``."""
        initial_hidden = self.encoder(feat)
        return self.decoder(x_tokens, initial_hidden)


**Training Setup**

In [None]:
# Seed PyTorch's global RNG before building the model so the random weight
# initialisation is the same on every run.
torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Img2Caption(vocab_size=len(vocab), hidden_size=512, embed_dim=256).to(device)

# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_id)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


**Training + Validation Loop**

In [None]:
def run_epoch(model, loader, train=True):
    """Run one pass over ``loader`` and return the mean loss per non-pad token.

    With ``train`` truthy the model is put in training mode and updated with
    the module-level ``optimizer`` (gradient norm clipped to 5.0); otherwise
    the model is put in eval mode and gradients are disabled. Uses the
    module-level ``criterion``, ``device`` and ``pad_id``.
    """
    model.train(bool(train))  # train(False) is what eval() does

    loss_sum = 0.0
    token_count = 0

    for feat, x, y, _ in tqdm(loader, leave=False):
        feat, x, y = feat.to(device), x.to(device), y.to(device)  # x, y: [B, T]

        if train:
            optimizer.zero_grad()

        with torch.set_grad_enabled(train):
            logits = model(feat, x)  # [B, T, V]
            vocab_dim = logits.size(-1)
            loss = criterion(logits.reshape(-1, vocab_dim), y.reshape(-1))

            if train:
                loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), 5.0)
                optimizer.step()

        # Weight each batch's mean loss by its number of non-pad targets so
        # the epoch average is per token rather than per batch.
        n_tokens = (y != pad_id).sum().item()
        loss_sum += loss.item() * n_tokens
        token_count += n_tokens

    return loss_sum / max(1, token_count)

# Fixed number of epochs; per-epoch losses are kept for the loss-curve plot.
epochs = 50
train_losses, val_losses = [], []

for ep in range(1, epochs+1):
    # One optimisation pass over the training set, then one evaluation pass
    # over the validation set.
    tr = run_epoch(model, train_loader, train=True)
    va = run_epoch(model, val_loader, train=False)
    train_losses.append(tr)
    val_losses.append(va)
    print(f"Epoch {ep}/{epochs} | train loss: {tr:.4f} | val loss: {va:.4f}")


**Plot Loss Curves**

In [None]:
# Training vs. validation loss per epoch, drawn on one set of axes.
fig, ax = plt.subplots(figsize=(7,4))
ax.plot(train_losses, label="train")
ax.plot(val_losses, label="val")
ax.set_title("Loss Curve")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.legend()
ax.grid(True)
plt.show()


**Token Decode**

In [None]:
def decode_tokens(token_ids):
    """Turn a sequence of token ids into a space-joined caption string.

    Ids are looked up in the module-level ``idx2word`` (unknown ids become
    ``UNK_TOKEN``). Decoding stops at the first ``<end>``; ``<start>`` and
    ``<pad>`` are skipped.
    """
    skipped = ("<start>", "<pad>")
    pieces = []
    for token_id in token_ids:
        word = idx2word.get(int(token_id), UNK_TOKEN)
        if word == "<end>":
            break
        if word in skipped:
            continue
        pieces.append(word)
    return " ".join(pieces)


**Greedy Search**

In [None]:
@torch.no_grad()
def greedy_caption(model, feat_vec, max_len=30):
    """Generate a caption by picking the arg-max token at every step.

    Args:
        model: Model exposing ``encoder`` and ``decoder`` (with ``embed``,
            ``lstm`` and ``fc_out``).
        feat_vec: Feature vector of a single image.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded caption string (decoding stops early at ``<end>``).
    """
    model.eval()
    decoder = model.decoder

    feat = torch.tensor(feat_vec, dtype=torch.float32).unsqueeze(0).to(device)  # [1,2048]
    hidden = model.encoder(feat).unsqueeze(0)  # [1,1,hidden]
    cell = torch.zeros_like(hidden)

    token = start_id
    generated = []

    # Feed one token per step, carrying the LSTM state forward.
    for _ in range(max_len):
        step_input = torch.tensor([[token]], dtype=torch.long).to(device)  # [1,1]
        lstm_out, (hidden, cell) = decoder.lstm(decoder.embed(step_input), (hidden, cell))
        step_logits = decoder.fc_out(lstm_out.squeeze(1))  # [1,V]
        token = torch.argmax(step_logits, dim=-1).item()

        if token == end_id:
            break
        generated.append(token)

    return decode_tokens(generated)


**Beam Search**

In [None]:
@torch.no_grad()
def beam_caption(model, feat_vec, beam_size=3, max_len=30):
    """Generate a caption with beam search over the decoder.

    Args:
        model: Model exposing ``encoder`` and ``decoder`` (with ``embed``,
            ``lstm`` and ``fc_out``).
        feat_vec: Feature vector of a single image.
        beam_size: Number of hypotheses kept after every step.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded caption of the hypothesis with the highest summed
        log-probability. Scores are not length-normalised.
    """
    model.eval()
    feat = torch.tensor(feat_vec, dtype=torch.float32).unsqueeze(0).to(device)
    h0 = model.encoder(feat)  # [1,hidden]

    # each beam item: (tokens_list, logprob, h, c, last_token)
    h = h0.unsqueeze(0)
    c = torch.zeros_like(h)

    beams = [([], 0.0, h, c, start_id)]

    for _ in range(max_len):
        new_beams = []
        for tokens, score, h_i, c_i, last in beams:
            # Finished hypotheses are carried over unchanged.
            if last == end_id:
                new_beams.append((tokens, score, h_i, c_i, last))
                continue

            cur = torch.tensor([[last]], dtype=torch.long).to(device)
            emb = model.decoder.embed(cur)
            lstm_out, (h_new, c_new) = model.decoder.lstm(emb, (h_i, c_i))
            logits = model.decoder.fc_out(lstm_out.squeeze(1))  # [1,V]
            log_probs = torch.log_softmax(logits, dim=-1).squeeze(0)  # [V]

            # torch.topk raises when k exceeds the number of entries, so never
            # ask for more candidates than the vocabulary holds.
            k = min(beam_size, log_probs.size(0))
            topk = torch.topk(log_probs, k)
            for lp, idx in zip(topk.values.tolist(), topk.indices.tolist()):
                new_tokens = tokens + [idx]
                new_score = score + lp
                new_beams.append((new_tokens, new_score, h_new, c_new, idx))

        # keep best beams
        new_beams.sort(key=lambda x: x[1], reverse=True)
        beams = new_beams[:beam_size]

        # if all ended
        if all(b[4] == end_id for b in beams):
            break

    best_tokens = beams[0][0]
    # remove last token if it is <end>
    if len(best_tokens) and best_tokens[-1] == end_id:
        best_tokens = best_tokens[:-1]
    return decode_tokens(best_tokens)


**Display 5 Random Test Images**

In [None]:
def show_random_examples(k=5):
    """Show ``k`` sampled test rows: the image, its reference caption and both predictions.

    Rows are drawn from the module-level ``test_df`` with a fixed
    ``random_state``, and captions are produced with the module-level ``model``.
    """
    for _, sample in test_df.sample(k, random_state=42).iterrows():
        img_name = sample["image"]
        reference = sample["caption"]
        feat = features_dict[img_name]

        greedy_text = greedy_caption(model, feat, max_len=30)
        beam_text = beam_caption(model, feat, beam_size=3, max_len=30)

        picture = Image.open(os.path.join(IMAGE_DIR, img_name)).convert("RGB")

        plt.figure(figsize=(5,5))
        plt.imshow(picture)
        plt.axis("off")
        plt.title(img_name)
        plt.show()

        print("GT:", reference)
        print("Greedy:", greedy_text)
        print("Beam:", beam_text)
        print("-"*80)

show_random_examples(5)


In [None]:
!pip -q install nltk

**BLEU-4 Score**

In [None]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Smoothing keeps sentence-level BLEU from collapsing to zero when a higher
# n-gram order has no match.
smooth = SmoothingFunction().method1

def bleu4_score(model, data_df, num_samples=2000, use_beam=True):
    """Mean sentence-level BLEU-4 over a fixed-seed sample of ``data_df`` rows.

    Each prediction is scored against the single caption of its sampled row.
    An empty prediction scores 0.0.
    """
    sample_size = min(num_samples, len(data_df))
    df_eval = data_df.sample(sample_size, random_state=123)

    scores = []
    for _, row in tqdm(df_eval.iterrows(), total=len(df_eval)):
        feat = features_dict[row["image"]]

        if use_beam:
            pred = beam_caption(model, feat, beam_size=3)
        else:
            pred = greedy_caption(model, feat)
        ref = row["caption"].replace("<start>","").replace("<end>","").strip()

        hypothesis = pred.split()
        if not hypothesis:
            scores.append(0.0)
            continue

        scores.append(
            sentence_bleu(
                [ref.split()],
                hypothesis,
                weights=(0.25,0.25,0.25,0.25),
                smoothing_function=smooth,
            )
        )
    return float(np.mean(scores))

bleu4 = bleu4_score(model, test_df, num_samples=2000, use_beam=True)
print("BLEU-4:", bleu4)


**Token-level Precision / Recall / F1**

In [None]:
def prf1_token_level(model, data_df, num_samples=2000, use_beam=True):
    """Mean precision, recall and F1 over unique tokens of prediction vs. reference.

    Tokens are compared as sets, so repeats and word order are ignored.
    An empty prediction contributes 0.0 to all three metrics.

    Returns:
        ``(precision, recall, f1)`` averaged over a fixed-seed row sample.
    """
    df_eval = data_df.sample(min(num_samples, len(data_df)), random_state=1234)

    per_sample = []  # one (precision, recall, f1) triple per evaluated row
    for _, row in tqdm(df_eval.iterrows(), total=len(df_eval)):
        feat = features_dict[row["image"]]

        if use_beam:
            pred = beam_caption(model, feat, beam_size=3)
        else:
            pred = greedy_caption(model, feat)
        ref = row["caption"].replace("<start>","").replace("<end>","").strip()

        pred_set = set(pred.split())
        ref_set = set(ref.split())

        if not pred_set:
            per_sample.append((0.0, 0.0, 0.0))
            continue

        tp = len(pred_set & ref_set)
        fp = len(pred_set - ref_set)
        fn = len(ref_set - pred_set)

        # The small epsilon guards the divisions against a zero denominator.
        precision = tp / (tp + fp + 1e-9)
        recall = tp / (tp + fn + 1e-9)
        f_score = 2*precision*recall / (precision+recall+1e-9)
        per_sample.append((precision, recall, f_score))

    precisions = [triple[0] for triple in per_sample]
    recalls = [triple[1] for triple in per_sample]
    f1s = [triple[2] for triple in per_sample]
    return float(np.mean(precisions)), float(np.mean(recalls)), float(np.mean(f1s))

p, r, f1 = prf1_token_level(model, test_df, num_samples=2000, use_beam=True)
print("Token-level Precision:", p)
print("Token-level Recall:", r)
print("Token-level F1:", f1)


**METEOR Score**

In [None]:
import nltk
nltk.download("wordnet", quiet=True)
nltk.download("omw-1.4", quiet=True)

from nltk.translate.meteor_score import meteor_score

def meteor_eval(model, data_df, num_samples=2000, use_beam=True):
    """Mean METEOR score over a fixed-seed sample of ``data_df`` rows.

    Prediction and reference are passed to ``meteor_score`` as token lists.
    A row scores 0.0 when either side has no tokens.
    """
    df_eval = data_df.sample(min(num_samples, len(data_df)), random_state=2026)

    scores = []
    for _, row in tqdm(df_eval.iterrows(), total=len(df_eval)):
        feat = features_dict[row["image"]]

        if use_beam:
            pred = beam_caption(model, feat, beam_size=3)
        else:
            pred = greedy_caption(model, feat)
        ref = row["caption"].replace("<start>","").replace("<end>","").strip()

        reference_tokens = ref.split()
        hypothesis_tokens = pred.split()

        if hypothesis_tokens and reference_tokens:
            scores.append(meteor_score([reference_tokens], hypothesis_tokens))
        else:
            scores.append(0.0)

    return float(np.mean(scores))

meteor = meteor_eval(model, test_df, num_samples=2000, use_beam=True)
print("METEOR:", meteor)


**ROUGE Scores (ROUGE-1 / ROUGE-2 / ROUGE-L)**

In [None]:
!pip -q install rouge-score

In [None]:

from rouge_score import rouge_scorer

def rouge_eval(model, data_df, num_samples=2000, use_beam=True):
    """Mean ROUGE-1 / ROUGE-2 / ROUGE-L precision, recall and F1.

    Evaluated on a fixed-seed sample of ``data_df`` rows. A row contributes
    zeros to every metric when the prediction or the reference is empty.

    Returns:
        ``{"ROUGE-1": {"P", "R", "F1"}, "ROUGE-2": {...}, "ROUGE-L": {...}}``
        with float values.
    """
    df_eval = data_df.sample(min(num_samples, len(data_df)), random_state=2027)

    scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)

    # Report label -> key used by the scorer.
    metric_keys = {"ROUGE-1": "rouge1", "ROUGE-2": "rouge2", "ROUGE-L": "rougeL"}
    collected = {label: {"P": [], "R": [], "F1": []} for label in metric_keys}

    for _, row in tqdm(df_eval.iterrows(), total=len(df_eval)):
        feat = features_dict[row["image"]]

        if use_beam:
            pred = beam_caption(model, feat, beam_size=3)
        else:
            pred = greedy_caption(model, feat)
        ref = row["caption"].replace("<start>","").replace("<end>","").strip()

        if len(pred.strip()) == 0 or len(ref.strip()) == 0:
            # add zeros if empty
            for stats in collected.values():
                for values in stats.values():
                    values.append(0.0)
            continue

        scores = scorer.score(ref, pred)
        for label, key in metric_keys.items():
            entry = scores[key]
            collected[label]["P"].append(entry.precision)
            collected[label]["R"].append(entry.recall)
            collected[label]["F1"].append(entry.fmeasure)

    results = {
        label: {stat: float(np.mean(values)) for stat, values in stats.items()}
        for label, stats in collected.items()
    }
    return results

rouge_results = rouge_eval(model, test_df, num_samples=2000, use_beam=True)
rouge_results


In [None]:
# One line per ROUGE variant, each value rounded to four decimals.
for k, v in rouge_results.items():
    print(f"{k}: P={v['P']:.4f}, R={v['R']:.4f}, F1={v['F1']:.4f}")

**Save Model**

In [None]:
# Store the weights together with the vocabulary mappings and max_len in one
# checkpoint file.
torch.save({
    "model_state": model.state_dict(),
    "word2idx": word2idx,
    "idx2word": idx2word,
    "max_len": max_len,
}, "img_caption_seq2seq.pth")

print("Saved: img_caption_seq2seq.pth")
