In [1]:
# 1) Install & import the libraries
!pip install --quiet PyDrive kaggle

In [2]:
import shutil
import os
# 2) Point the Kaggle client at the directory that holds kaggle.json
os.environ["KAGGLE_CONFIG_DIR"] = "/kaggle/input/kaggle"

# 3) Copy the OAuth client file into the working directory, where the PyDrive
#    setup cell later loads it by its relative name.
client_secrets_src = "/kaggle/input/clientsecretjson/client_secrets.json"
if not os.path.isfile(client_secrets_src):
    # Fail with an actionable message instead of a bare errno from shutil.
    raise FileNotFoundError(
        f"OAuth client file not found at '{client_secrets_src}'. "
        "Attach the dataset that contains client_secrets.json to this notebook "
        "(or update the path above) and re-run this cell."
    )
shutil.copy(client_secrets_src, "client_secrets.json")


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/clientsecretjson/client_secrets.json'

In [None]:
# # 1. Copy your checkpoint file there
# shutil.copy(
#     "/kaggle/input/records/checkpoint.pt",
#     os.path.expanduser("/kaggle/working/")
# )

# # 2. Copy your history file there
# shutil.copy(
#     "/kaggle/input/records/history.pkl",
#     os.path.expanduser("/kaggle/working/")
# )

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from kaggle.api.kaggle_api_extended import KaggleApi

In [None]:
# 4) PyDrive auth setup: create the authentication helper object.
gauth = GoogleAuth()
# Load the OAuth client configuration from client_secrets.json in the current
# working directory (an earlier cell copies the file there).
gauth.LoadClientConfigFile("client_secrets.json")

In [None]:
# Try to load previously saved credentials from mycreds.txt.
gauth.LoadCredentialsFile("mycreds.txt")

if not gauth.credentials:
    # First run: this will print a URL—open it in your local browser,
    # copy the verification code, and paste it back here.
    gauth.CommandLineAuth()
elif gauth.access_token_expired:
    # Saved credentials exist but the access token is stale. Refresh it when a
    # refresh token is available; otherwise repeat the command-line flow
    # (a browser-based flow cannot be completed from a hosted notebook).
    if gauth.credentials.refresh_token is not None:
        gauth.Refresh()
    else:
        gauth.CommandLineAuth()
else:
    # Saved credentials are still valid: just authorize the session with them.
    gauth.Authorize()

# Persist whatever credentials are now current (new or refreshed) for next time.
gauth.SaveCredentialsFile("mycreds.txt")

drive = GoogleDrive(gauth)
print("✅ Google Drive authentication successful!")

In [None]:
# 1. Create the per-user Kaggle config directory (~/.config/kaggle).
#    exist_ok=True makes this a no-op when the directory is already there.
os.makedirs(os.path.expanduser("~/.config/kaggle"), exist_ok=True)

In [None]:

# 2. Copy kaggle.json from the attached input dataset into ~/.config/kaggle.
# NOTE(review): an earlier cell also sets KAGGLE_CONFIG_DIR to
# /kaggle/input/kaggle — confirm which of the two locations the Kaggle client
# actually reads, and drop the one that is not needed.
shutil.copy(
    "/kaggle/input/kaggle/kaggle.json",
    os.path.expanduser("~/.config/kaggle/kaggle.json")
)

In [None]:
# 3. (Optional) Restrict the copied kaggle.json to owner read/write (0o600),
#    so the API token is not readable by other users.
os.chmod(os.path.expanduser("~/.config/kaggle/kaggle.json"), 0o600)

In [None]:

# Authenticate the Kaggle API client; `api` is the handle for later Kaggle calls.
# (KaggleApi is re-imported here although an earlier cell already imports it.)
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()
print("✅ Kaggle API authentication successful!")

In [None]:
import glob
import os
import requests
import json

# ── Configuration ───────────────────────────────────────────────────────────
# ID of the Google Drive folder that stores the dataset archives.
DRIVE_FOLDER_ID       = "170M9LmNiuVUYMQ76x_GPAatI_2TA7bh-"
# Image archive: file name expected in Drive and the GitHub URL it is fetched from.
EXPECTED_ZIP_NAME_IMG = "Flickr8k_Dataset.zip"
GITHUB_IMG_URL        = (
    "https://github.com/jbrownlee/Datasets/releases/download/"
    "Flickr8k/Flickr8k_Dataset.zip"
)
# Captions archive: file name expected in Drive and the GitHub URL it is fetched from.
EXPECTED_ZIP_NAME_CAP = "Flickr8k_text.zip"
GITHUB_CAP_URL        = (
    "https://github.com/jbrownlee/Datasets/releases/download/"
    "Flickr8k/Flickr8k_text.zip"
)

In [None]:
def ensure_and_upload(zip_name, download_url):
    """
    Make sure `zip_name` exists in the Drive folder `DRIVE_FOLDER_ID`.

    Uses the module-level `drive` (authenticated PyDrive client) and
    `DRIVE_FOLDER_ID`. If a non-trashed file titled `zip_name` is already in
    the folder, nothing is done. Otherwise the archive is downloaded from
    `download_url` (unless a local copy already exists) and uploaded.

    Args:
        zip_name (str): Drive title and local file name of the archive.
        download_url (str): HTTP(S) URL to fetch the archive from.

    Raises:
        requests.HTTPError: if the download responds with an error status.
    """
    # 1) check Drive
    files = drive.ListFile({'q': (
        f"title='{zip_name}' and trashed=false "
        f"and '{DRIVE_FOLDER_ID}' in parents"
    )}).GetList()
    if files:
        print(f"✅ '{zip_name}' already in Drive (ID: {files[0]['id']}); skipping.")
        return

    # 2) download locally if missing
    if not os.path.exists(zip_name):
        print(f"⬇️ Downloading {zip_name} from GitHub...")
        # Stream into a temporary ".part" file and rename only on success, so an
        # interrupted download is never mistaken for a complete archive (and
        # uploaded) on the next run.
        partial_path = zip_name + ".part"
        try:
            # timeout=(connect, read): avoid hanging forever on a stalled connection.
            with requests.get(download_url, stream=True, timeout=(10, 300)) as r:
                r.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, zip_name)
        finally:
            # Left over only when the download failed part-way.
            if os.path.exists(partial_path):
                os.remove(partial_path)
        print(f"✅ Downloaded {zip_name}!")

    # 3) upload to Drive using PyDrive
    print(f"⬆️ Uploading {zip_name} to Google Drive…")
    gfile = drive.CreateFile({
        'title': zip_name,
        'parents': [{'id': DRIVE_FOLDER_ID}]
    })
    gfile.SetContentFile(zip_name)
    gfile.Upload()  # PyDrive performs the upload of the attached file content
    print(f"✅ Uploaded '{zip_name}' (File ID: {gfile['id']}).")


In [None]:
# Make sure both archives (images first, then captions) are present in the
# Drive folder, downloading and uploading them only when they are missing.
ensure_and_upload(EXPECTED_ZIP_NAME_IMG, GITHUB_IMG_URL)
ensure_and_upload(EXPECTED_ZIP_NAME_CAP, GITHUB_CAP_URL)


In [None]:
import os
import zipfile

def fetch_and_extract_from_drive(drive, folder_id, zip_name, extract_dir):
    """
    Download a ZIP from a Drive folder and extract it locally.

    Steps:
      1) Look up `zip_name` (non-trashed) in the Drive folder `folder_id`.
      2) Download it to the local path `zip_name` unless that file already exists.
      3) Extract it into `extract_dir` unless that directory already exists.
      4) Print up to 10 entries of `extract_dir`.

    Download and extraction both go through a temporary name that is renamed
    only on success, so an interrupted run cannot leave a truncated archive or
    a half-populated directory that a later run would treat as complete.

    Args:
        drive: authenticated PyDrive `GoogleDrive` object.
        folder_id (str): ID of the Drive folder to search.
        zip_name (str): title of the archive in Drive; also the local file name.
        extract_dir (str): local directory to extract into.

    Returns:
        None

    Raises:
        FileNotFoundError: if the ZIP isn't found in the Drive folder.
    """
    import shutil  # local import: this cell itself only imports os and zipfile

    # 1) Locate in Drive
    q = (
        f"'{folder_id}' in parents "
        f"and title = '{zip_name}' "
        "and trashed=false"
    )
    files = drive.ListFile({'q': q}).GetList()
    if not files:
        raise FileNotFoundError(f"No '{zip_name}' in Drive folder {folder_id}")
    file_id = files[0]['id']
    print(f"✅ Found '{zip_name}' in Drive (ID: {file_id})")

    # 2) Download ZIP if missing
    if not os.path.exists(zip_name):
        print(f"⬇️ Downloading '{zip_name}' locally...")
        gfile = drive.CreateFile({'id': file_id})
        partial_zip = zip_name + ".part"
        try:
            gfile.GetContentFile(partial_zip)
            os.replace(partial_zip, zip_name)
        finally:
            # Present only when the download failed part-way.
            if os.path.exists(partial_zip):
                os.remove(partial_zip)
        print("✅ Download complete!")
    else:
        print(f"ℹ️ '{zip_name}' already exists locally; skipping download.")

    # 3) Extract if needed
    if not os.path.isdir(extract_dir):
        print(f"🗜️ Unzipping '{zip_name}' into '{extract_dir}/' …")
        partial_dir = os.path.normpath(extract_dir) + ".partial"
        shutil.rmtree(partial_dir, ignore_errors=True)  # stale leftovers from a failed run
        os.makedirs(partial_dir)
        try:
            with zipfile.ZipFile(zip_name, 'r') as z:
                z.extractall(partial_dir)
            os.replace(partial_dir, extract_dir)
        finally:
            # No-op after a successful rename; cleans up after a failed extraction.
            shutil.rmtree(partial_dir, ignore_errors=True)
        print("✅ Unzip complete!")
    else:
        print(f"ℹ️ Directory '{extract_dir}/' already exists; skipping unzip.")

    # 4) Print a sample listing
    sample = os.listdir(extract_dir)
    print(f"Contents of {extract_dir}/ (first 10): {sample[:10]}\n")


# ── Usage ────────────────────────────────────────────────────────────────────
DRIVE_FOLDER_ID = "170M9LmNiuVUYMQ76x_GPAatI_2TA7bh-"

# (archive title in Drive, local extraction directory) — images first, then captions
for archive_name, target_dir in (
    ("Flickr8k_Dataset.zip", "flickr8k_images"),
    ("Flickr8k_text.zip", "flickr8k_captions"),
):
    fetch_and_extract_from_drive(
        drive,
        DRIVE_FOLDER_ID,
        zip_name=archive_name,
        extract_dir=target_dir,
    )


In [None]:
import os

def head_tail_dir(path, n=10):
    """
    Print the first `n` and last `n` entries of `path`.

    For a directory the entries are its file names in sorted order. For a
    regular file the entries are its text lines, so a data file can be
    previewed with the same call instead of being reported as missing.

    Args:
        path (str): directory (or text file) to inspect.
        n (int): number of entries to show from each end; values <= 0 show none.

    Returns:
        None. Everything is printed; a missing path or an empty directory
        prints a warning instead of raising.
    """
    def _show(items, noun):
        # The tail is shown only when the listing is longer than n; otherwise
        # it would merely repeat the head.
        head = items[:n] if n > 0 else []
        tail = items[-n:] if 0 < n < len(items) else []
        print(f"First {len(head)} {noun}:")
        for entry in head:
            print("  ", entry)
        if tail and tail != head:
            print(f"\nLast {n} {noun}:")
            for entry in tail:
                print("  ", entry)

    print(f"\n📁 Exploring: {path}")

    if os.path.isfile(path):
        print("ℹ️ Not a directory; showing lines of the file instead.")
        with open(path, encoding="utf-8", errors="replace") as fh:
            lines = [line.rstrip("\n") for line in fh]
        if not lines:
            print("⚠️ File is empty.")
            return
        _show(lines, "lines")
        return

    if not os.path.isdir(path):
        print(f"⚠️ Path not found: {path}")
        return

    files = sorted(os.listdir(path))
    if not files:
        print("⚠️ Directory is empty.")
        return

    _show(files, "files")

# ── Preview the extracted data ───────────────────────────────────────────────
# Two directories plus the captions token file.
# NOTE(review): the last entry is a regular file, not a directory — confirm
# head_tail_dir reports it the way you expect.
paths = [
    "/kaggle/working/flickr8k_images/Flicker8k_Dataset",
    "/kaggle/working/flickr8k_captions",
    "/kaggle/working/flickr8k_captions/Flickr8k.token.txt"
]

for p in paths:
    head_tail_dir(p, n=10)


In [None]:
import os
import numpy as np
from PIL import Image

# 1) Path to your images
image_dir = '/kaggle/working/flickr8k_images/Flicker8k_Dataset'

# 2) Running per-channel sum, sum of squares, and total pixel count
channel_sum = np.zeros(3, dtype=np.float64)
channel_sq_sum = np.zeros(3, dtype=np.float64)
total_pixels = 0

# 3) Accumulate over every image file.
#    Sorted so the accumulation order (and thus float rounding) is reproducible.
for fname in sorted(os.listdir(image_dir)):
    if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
        continue
    path = os.path.join(image_dir, fname)
    # The context manager closes the underlying file handle right after decoding.
    with Image.open(path) as pil_img:
        img = np.array(pil_img.convert('RGB'), dtype=np.float64)

    # Flatten H×W×3 → (H*W)×3 and accumulate
    pixels = img.reshape(-1, 3)
    channel_sum    += pixels.sum(axis=0)
    channel_sq_sum += (pixels**2).sum(axis=0)
    total_pixels   += pixels.shape[0]

if total_pixels == 0:
    # Avoid a silent division by zero producing NaN statistics further down.
    raise RuntimeError(f"No .jpg/.jpeg/.png images found in {image_dir}")

# 4) Mean and stddev per channel on the 0–255 scale: Var[x] = E[x²] − E[x]².
raw_means = channel_sum / total_pixels
variances = (channel_sq_sum / total_pixels) - (raw_means**2)
# Rounding can push a near-zero variance slightly negative; clamp before sqrt.
variances = np.maximum(variances, 0.0)
raw_stds = np.sqrt(variances)

# 5) Scale to 0–1 (the range of tensors produced by transforms.ToTensor)
means = [m / 255.0 for m in raw_means]
stds  = [s / 255.0 for s in raw_stds]

# 6) Report
print("Channel Means (R, G, B):", means)
print("Channel Stds  (R, G, B):", stds)


In [None]:
# Parse the token file into {image filename: [caption, ...]}.
# Each non-empty line is expected to look like "<image>#<n>\t<caption>".
captions = {}
with open('/kaggle/working/flickr8k_captions/Flickr8k.token.txt', encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines (e.g. a trailing newline at EOF)
        # Split on the first tab only, so a tab inside the caption text cannot
        # break the two-way unpacking. A line without any tab still raises
        # ValueError, which surfaces a malformed file instead of hiding it.
        image, caption = line.split('\t', 1)
        image = image.split('#')[0]  # drop the "#<n>" caption-index suffix
        captions.setdefault(image, []).append(caption)


In [None]:
# import random
# TOY_SIZE = 500
# toy_keys     = random.sample(list(captions.keys()), TOY_SIZE)
# toy_captions = { k: captions[k] for k in toy_keys }
# print(f"Using toy dataset: {len(toy_captions)} images, "
#       f"{sum(len(v) for v in toy_captions.values())} captions")

# # --- 3) Overwrite captions for the rest of the notebook ---
# captions = toy_captions

In [None]:
import unicodedata

# `captions` is expected to map image filename -> list of raw caption strings,
# e.g. {'1000268201_693b08cb0e.jpg': ['A child in a pink dress is climbing up a set of stairs in an entry way .', ...], …}

# Per caption: Unicode-normalize to NFC (composed form), then lowercase.
normalized_captions = {
    img_name: [unicodedata.normalize('NFC', cap).lower() for cap in caps]
    for img_name, caps in captions.items()
}

# Overwrite the original dict so later cells see the normalized text
captions = normalized_captions

# Every caption is now Unicode‑normalized and lowercase:
# captions['1000268201_693b08cb0e.jpg'][0]
# → "a child in a pink dress is climbing up a set of stairs in an entry way ."


In [None]:
import string

def strip_punctuation(captions):
    """
    Return a copy of `captions` with every character listed in
    string.punctuation deleted and runs of whitespace collapsed to one space.

    Args:
        captions (dict): { image_name: [caption1, caption2, …], … }

    Returns:
        dict: a new dict with the same keys and cleaned caption lists;
              the input dict and its lists are left untouched.
    """
    # Translation table that deletes every punctuation character
    drop_punct = str.maketrans('', '', string.punctuation)

    def _clean(text):
        # split()/join collapses whitespace and trims both ends
        return ' '.join(text.translate(drop_punct).split())

    return {
        img_name: [_clean(cap) for cap in caps]
        for img_name, caps in captions.items()
    }

# Apply to the caption dict; `captions` is rebound to the cleaned copy.
captions = strip_punctuation(captions)
# e.g. "a child in a pink dress is climbing up a set of stairs in an entry way"


In [None]:
# Install dependencies
!pip install -q torchtext==0.6.0

In [None]:
# 1) Preliminary tokenization: whitespace-split each caption and wrap it
#    in <start> … <end> markers.
pre_tokenized = {}
for img_name, caps in captions.items():
    token_lists = []
    for cap in caps:
        token_lists.append(['<start>', *cap.split(), '<end>'])
    pre_tokenized[img_name] = token_lists

In [None]:
import torch
max_len = 20
PAD_TOKEN = '<pad>'

padded_tokens = {}            # image_name -> list of token lists, each exactly max_len long
tgt_padding_masks = {}        # image_name -> list of bool lists (True at pad positions)

for img_name, seqs in pre_tokenized.items():
    fixed_seqs = []
    pad_masks = []
    for seq in seqs:
        clipped = seq[:max_len]            # copy; unchanged when already short enough
        if len(seq) > max_len:
            clipped[-1] = '<end>'          # a truncated caption must still end with <end>
        padded_seq = clipped + [PAD_TOKEN] * (max_len - len(clipped))

        fixed_seqs.append(padded_seq)
        pad_masks.append([token == PAD_TOKEN for token in padded_seq])
    padded_tokens[img_name] = fixed_seqs
    tgt_padding_masks[img_name] = pad_masks

In [None]:
import string
from collections import Counter

# ── 2. Preprocess & collect all words ────────────────────────────
# Each caption is lowercased, stripped of punctuation and split on whitespace.
punct_table = str.maketrans('', '', string.punctuation)
all_words = [
    word
    for cap_list in captions.values()
    for cap in cap_list
    for word in cap.lower().translate(punct_table).split()
]

# ── 3. Unique words & frequencies ────────────────────────────────
unique_words = set(all_words)
print(f"Total unique words: {len(unique_words)}")
freq = Counter(all_words)

In [None]:
from collections import defaultdict
from torchtext.vocab import Vocab

# hyperparams
V = 10000      # passed to Vocab as max_size
theta = 5      # passed to Vocab as min_freq
SPECIALS = [PAD_TOKEN, '<unk>', '<start>', '<end>']

vocab = Vocab(counter=freq,
              max_size=V,
              min_freq=theta,
              specials=SPECIALS)

# ----- Stand-in for vocab.set_default_index(vocab['<unk>']) -----
unk_idx = vocab.stoi['<unk>']
# Wrap the existing stoi dict in a defaultdict that returns unk_idx for any
# token missing from the vocabulary.
vocab.stoi = defaultdict(lambda: unk_idx, vocab.stoi)
# ---------------------------------------------------------------

# Numericalize the padded token lists; the padding masks are carried over as-is.
numeric_captions = {}
numeric_masks = {}
for img_name in padded_tokens:
    pairs = list(zip(padded_tokens[img_name], tgt_padding_masks[img_name]))
    numeric_captions[img_name] = [
        [vocab.stoi[token] for token in seq] for seq, _ in pairs
    ]
    numeric_masks[img_name] = [mask for _, mask in pairs]

# Example check: first caption and its padding mask
ex = next(iter(numeric_captions.values()))[0]
print(f"Example numeric seq (len={len(ex)}): {ex}")

ex = next(iter(numeric_masks.values()))[0]
# Labelled as a mask: the previous label repeated "numeric seq" by mistake.
print(f"Example padding mask (len={len(ex)}): {ex}")

In [None]:
import os

# Snapshot the keys first so the dictionaries can be modified safely afterwards
keys_to_check = list(numeric_captions.keys())

# Keys whose image file does not exist under image_dir
missing_keys = [
    key for key in keys_to_check
    if not os.path.isfile(os.path.join(image_dir, key))
]

# Drop those keys from both the caption and the mask dictionaries
for key in missing_keys:
    print(f"❌ Removing missing image: {key}")
    del numeric_captions[key]
    del numeric_masks[key]

print(f"\n✅ Finished. Remaining keys in numeric_captions: {len(numeric_captions)}")


In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torchvision import transforms

# List all image keys. The split is done per image, so all captions of one
# image land on the same side.
image_keys = list(numeric_captions.keys())

# Split 80% train / 20% test with a fixed seed (adjust test_size or random_state as needed)
train_keys, test_keys = train_test_split(
    image_keys,
    test_size=0.2,
    random_state=42,
    shuffle=True
)

# Build the two caption dicts (image filename -> list of index sequences)
train_captions = {k: numeric_captions[k] for k in train_keys}
test_captions  = {k: numeric_captions[k] for k in test_keys}


In [None]:
# Image transforms
# -------------------------------------------------------------------
# Training: resize to 260×260, then random resized crop / horizontal flip /
# colour jitter as augmentation, convert to a tensor, and normalize with the
# `means` / `stds` computed earlier in the notebook.
transform_train = transforms.Compose([
    transforms.Resize((260, 260)),
    transforms.RandomResizedCrop(260, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.ToTensor(),
    transforms.Normalize(mean=means, std=stds)
])

# Test: same resize and normalization, with no random augmentation.
transform_test = transforms.Compose([
     transforms.Resize((260, 260)),
     transforms.ToTensor(),
     transforms.Normalize(mean=means, std=stds)
])

In [None]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset

class CaptionDataset(Dataset):
    """
    PyTorch Dataset for image-caption pairs, including padding masks.

    Every (image, caption) pair is one sample, so an image with several
    captions appears several times.

    Args:
        numeric_captions (dict):
            Mapping from image filename to list of index sequences.
        numeric_masks (dict):
            Mapping from image filename to list of boolean mask lists (True if PAD).
            May contain extra filenames; only keys of `numeric_captions` are read.
        image_dir (str):
            Directory where images are stored.
        transform (callable, optional):
            Applied to each RGB PIL image. Defaults to None, in which case the
            PIL image itself is returned.
    """
    def __init__(self, numeric_captions, numeric_masks, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform

        # Flatten into a list of (img_name, seq, mask) triples
        self.samples = [
            (img_name, seq, mask)
            for img_name, seq_list in numeric_captions.items()
            for seq, mask in zip(seq_list, numeric_masks[img_name])
        ]

    def __len__(self):
        """Number of (image, caption) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Returns:
            (image, caption, padding_mask): the transformed image, a LongTensor
            of token indices and a BoolTensor that is True at PAD positions.
        """
        img_name, seq, mask = self.samples[idx]

        # Load the image; the context manager closes the file handle as soon as
        # the RGB copy has been made.
        img_path = os.path.join(self.image_dir, img_name)
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        # Compare against None explicitly so a transform object that happens to
        # be falsy (e.g. an empty container) is still applied.
        if self.transform is not None:
            image = self.transform(image)

        # Convert to tensors
        caption = torch.tensor(seq, dtype=torch.long)
        padding_mask = torch.tensor(mask, dtype=torch.bool)

        return image, caption, padding_mask


In [None]:
# 3) Instantiate two datasets
# ----------------------------
# image_dir is set again here, to the same folder used for the channel statistics.
image_dir = "/kaggle/working/flickr8k_images/Flicker8k_Dataset"  # ← update to your actual image folder

# Both datasets receive the full `numeric_masks` dict; each dataset only looks
# up the filenames present in its own caption dict.
train_dataset = CaptionDataset(
    train_captions,
    numeric_masks,
    image_dir=image_dir,
    transform=transform_train
)

test_dataset = CaptionDataset(
    test_captions,
    numeric_masks,
    image_dir=image_dir,
    transform=transform_test
)



In [None]:
# 4) Create two DataLoaders
# --------------------------
# Batches of 16 samples, loaded by 4 worker processes each.
train_loader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,       # shuffle training set
    num_workers=4
)

test_loader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False,      # no need to shuffle test set
    num_workers=4
)

# Sample counts are (image, caption) pairs, not distinct images.
print(f"  • Train set: {len(train_dataset)} samples")
print(f"  • Test set:  {len(test_dataset)} samples")
print("DataLoaders ready!")

In [None]:
# Pull one training batch as a sanity check.
# The caption tensor gets its own name so the caption dictionary `captions`
# built earlier is not overwritten by a tensor (which would break re-running
# the preprocessing cells).
images, caption_batch, mask = next(iter(train_loader))

# Only the last bare expression of a cell is displayed, so print each shape
# explicitly instead of dumping a full image tensor.
print("images:       ", tuple(images.shape), images.dtype)
print("caption_batch:", tuple(caption_batch.shape), caption_batch.dtype)
print("mask:         ", tuple(mask.shape), mask.dtype)


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
# import all the Weights enums
from torchvision.models import (
    EfficientNet_B0_Weights, EfficientNet_B1_Weights,
    EfficientNet_B2_Weights, EfficientNet_B3_Weights,
    EfficientNet_B4_Weights, EfficientNet_B5_Weights,
    EfficientNet_B6_Weights, EfficientNet_B7_Weights,
)

# Lookup table: torchvision constructor name ("efficientnet_b0" … "efficientnet_b7")
# → the matching Weights enum class, in B0…B7 order.
WEIGHTS = {
    f"efficientnet_b{idx}": weights_enum
    for idx, weights_enum in enumerate((
        EfficientNet_B0_Weights,
        EfficientNet_B1_Weights,
        EfficientNet_B2_Weights,
        EfficientNet_B3_Weights,
        EfficientNet_B4_Weights,
        EfficientNet_B5_Weights,
        EfficientNet_B6_Weights,
        EfficientNet_B7_Weights,
    ))
}

class EfficientNetEncoder(nn.Module):
    """
    Image encoder: EfficientNet convolutional features pooled to a fixed
    grid and projected to `d_model`.

    Args:
        variant: key of WEIGHTS and name of the torchvision constructor,
            e.g. "efficientnet_b2".
        pretrained: load the IMAGENET1K_V1 weights when True, else random init.
        d_model: size of each output feature vector.
        grid_size: side length of the pooled spatial grid.
    """
    def __init__(self, variant: str, pretrained: bool, d_model: int, grid_size: int = 7):
        super().__init__()
        weights_cls = WEIGHTS[variant]
        build_backbone = getattr(models, variant)
        net = build_backbone(weights=weights_cls.IMAGENET1K_V1 if pretrained else None)

        # Keep every convolutional block; the classification head is dropped.
        self.backbone = nn.Sequential(*net.features)
        # Channel count of the feature map = input width of the classifier's linear layer.
        feat_dim = net.classifier[1].in_features
        self.adapt_pool = nn.AdaptiveAvgPool2d((grid_size, grid_size))
        self.proj = nn.Linear(feat_dim, d_model)
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, images: torch.Tensor) -> torch.Tensor:
        """Map an image batch to (B, grid*grid, d_model) features."""
        grid = self.adapt_pool(self.backbone(images))   # (B, C, grid, grid)
        tokens = grid.flatten(2).transpose(1, 2)        # (B, grid*grid, C)
        projected = self.norm(self.proj(tokens))        # (B, grid*grid, d_model)
        return self.dropout(projected)

In [None]:
import math

# --------------------------------------------------------------
# 1) Causal Mask: Prevent decoder from seeing future tokens
# --------------------------------------------------------------
def generate_square_subsequent_mask(sz: int) -> torch.Tensor:
    """
    Returns an (sz x sz) boolean causal mask: True strictly above the main
    diagonal, False on and below it.

    Note that this is a bool mask, not a float mask filled with -inf.
    The decoder in this notebook passes it as `tgt_mask`.
    """
    # Ones everywhere, keep only the strict upper triangle, then cast to bool.
    mask = torch.triu(torch.ones(sz, sz), diagonal=1).bool()
    return mask


In [None]:
# --------------------------------------------------------------
# 2) Sinusoidal Positional Encoding
# --------------------------------------------------------------
class PositionalEncoding(nn.Module):
    """
    Fixed sinusoidal positional encoding added to batch-first embeddings.

    Args:
        d_model: embedding size (even or odd).
        max_len: largest sequence length that can be encoded.
    """
    def __init__(self, d_model: int, max_len: int = 20):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the frequencies; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(pos * div[: d_model // 2])
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        # Buffer: follows .to(device) and is saved in state_dict, but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: [batch_size, seq_len, d_model] with seq_len <= max_len.

        Returns:
            Tensor of the same shape with the encodings of positions
            0..seq_len-1 added.
        """
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]


In [None]:
from torch.nn import TransformerDecoderLayer, TransformerDecoder

class TransformerDecoder(nn.Module):
    """
    Caption decoder: token embedding + sinusoidal positions, a stack of
    nn.TransformerDecoderLayer blocks, and a linear projection to vocabulary logits.

    Tensors are sequence-first ([tgt_len, batch, ...]) at this module's interface.
    """
    def __init__(
        self,
        vocab_size: int,
        d_model: int = 256,
        num_layers: int = 3,
        nhead: int = 4,
        dim_feedforward: int = 1024,
        dropout: float = 0.3,
        max_len: int = 20
    ):
        super().__init__()
        self.d_model = d_model
        self.max_len = max_len

        # Input side: embedding, positional encoding, dropout
        self.embedding   = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        self.dropout     = nn.Dropout(dropout)

        # Decoder stack with a final LayerNorm
        layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        self.decoder = nn.TransformerDecoder(
            layer,
            num_layers=num_layers,
            norm=nn.LayerNorm(d_model)
        )

        # Causal mask for the longest sequence, built once and stored as a
        # buffer so it moves with the module across devices.
        self.register_buffer('fixed_tgt_mask', generate_square_subsequent_mask(max_len))

        # Output side: d_model → vocabulary logits
        self.output_proj = nn.Linear(d_model, vocab_size)

    def forward(
        self,
        tgt: torch.Tensor,
        memory: torch.Tensor,
        tgt_key_padding_mask: torch.Tensor
    ) -> torch.Tensor:
        """
        Args:
          tgt: Tensor of shape [tgt_len, batch_size] -- token indices
          memory: Tensor of shape [src_len, batch_size, d_model] -- encoder features
          tgt_key_padding_mask: BoolTensor [batch_size, tgt_len], True at PAD positions

        Returns:
          logits: FloatTensor [tgt_len, batch_size, vocab_size]
        """
        tgt_len, _batch = tgt.shape

        # Scale embeddings by sqrt(d_model). The positional encoder is
        # batch-first, so transpose in and back out around it.
        scaled = self.embedding(tgt) * math.sqrt(self.d_model)    # [tgt_len, batch, d_model]
        with_pos = self.pos_encoder(scaled.transpose(0, 1))       # [batch, tgt_len, d_model]
        emb = self.dropout(with_pos).transpose(0, 1)              # [tgt_len, batch, d_model]

        # Top-left tgt_len × tgt_len corner of the precomputed causal mask
        # (the whole mask when tgt_len == max_len).
        causal_mask = self.fixed_tgt_mask[:tgt_len, :tgt_len]

        hidden = self.decoder(
            tgt=emb,
            memory=memory,
            tgt_mask=causal_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
        )

        return self.output_proj(hidden)


In [None]:
class ImageCaptioningModel(nn.Module):
    """
    Combines a CNN encoder with a Transformer decoder for image-to-caption generation.

    - `forward` is the teacher-forcing pass (ground-truth captions as decoder input).
    - `generate` runs greedy (beam_size <= 1) or beam-search inference and
      returns both token IDs and per-step logits.

    Args:
        encoder: maps images to features of shape (B, d_model) or (B, src_len, d_model).
        decoder: called as decoder(tgt=[T, B], memory=[src_len, B, d_model],
            tgt_key_padding_mask=[B, T]) and returning [T, B, V] logits; must
            expose `output_proj.out_features` (the vocabulary size).
        pad_idx, sos_idx, eos_idx: indices of the <pad>, <start> and <end> tokens.
        max_len: length of every generated sequence, including <start>.
    """
    def __init__(
        self,
        encoder: nn.Module,
        decoder: nn.Module,
        pad_idx: int,
        sos_idx: int,
        eos_idx: int,
        max_len: int = 20
    ):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.pad_idx = pad_idx
        self.sos_idx = sos_idx
        self.eos_idx = eos_idx
        self.max_len = max_len

    def _encode(self, images: torch.Tensor) -> torch.Tensor:
        """Run the encoder and return sequence-first memory [src_len, B, d_model]."""
        features = self.encoder(images)
        if features.dim() == 2:
            # single vector per image → (1, B, d_model)
            return features.unsqueeze(0)
        # grid of vectors per image → (src_len, B, d_model)
        return features.permute(1, 0, 2)

    def forward(
        self,
        images: torch.Tensor,
        captions: torch.Tensor,
        captions_mask: torch.Tensor
    ) -> torch.Tensor:
        """
        Teacher-forcing forward pass.

        Args:
            images: image batch for the encoder.
            captions: [B, T] token indices.
            captions_mask: [B, T] bool, True at PAD positions.

        Returns:
            logits of shape [T, B, V].
        """
        memory = self._encode(images)
        return self.decoder(
            tgt=captions.transpose(0, 1),
            memory=memory,
            tgt_key_padding_mask=captions_mask
        )

    def _greedy_decode(self, memory: torch.Tensor, batch_size: int, vocab_size: int, device):
        """Greedy decoding; a sequence that emitted <end> is padded from then on."""
        generated = torch.full(
            (batch_size, self.max_len), self.pad_idx, dtype=torch.long, device=device
        )
        generated[:, 0] = self.sos_idx
        all_logits = torch.zeros(
            batch_size, self.max_len, vocab_size, device=device, dtype=torch.float
        )
        finished = torch.zeros(batch_size, dtype=torch.bool, device=device)

        for t in range(1, self.max_len):
            prefix = generated[:, :t]                                  # [B, t]
            out = self.decoder(
                tgt=prefix.transpose(0, 1),
                memory=memory,
                tgt_key_padding_mask=(prefix == self.pad_idx),
            )                                                          # [t, B, V]
            step_logits = out[-1]                                      # [B, V]
            all_logits[:, t, :] = step_logits
            # Sequences that already produced <end> only receive padding.
            next_tokens = step_logits.argmax(dim=-1).masked_fill(finished, self.pad_idx)
            generated[:, t] = next_tokens
            finished = finished | (next_tokens == self.eos_idx)
            # Stop once every sequence has ended, even if they ended at different steps.
            if finished.all():
                break
        return generated, all_logits

    def generate(
        self,
        images: torch.Tensor,
        device: torch.device,
        beam_size: int = 5,
        length_norm: bool = True
    ) -> "tuple[torch.Tensor, torch.Tensor]":
        """
        Batched inference: greedy when beam_size <= 1, beam search otherwise.

        Args:
            images: image batch for the encoder.
            device: device on which the output buffers are allocated.
            beam_size: number of hypotheses kept per image.
            length_norm: if True, beams are ranked by score / length; if False,
                by raw cumulative log-probability.

        Returns:
            generated: [B, max_len] token IDs (best sequence per image), with
                <pad> after the first <end>.
            all_logits: [B, max_len, V]; row t holds the logits that predict the
                token at position t (row 0 is all zeros). For beam search they
                are recomputed by one decoder pass over the chosen sequence.
        """
        import torch.nn.functional as F

        B = images.size(0)
        memory = self._encode(images)                      # [src_len, B, d_model]
        V = int(self.decoder.output_proj.out_features)
        max_len = self.max_len

        if beam_size <= 1:
            return self._greedy_decode(memory, B, V, device)

        # ---------- Vectorized batched beam search ----------
        NEG_INF = -1e9
        sequences = torch.full(
            (B, beam_size, max_len), self.pad_idx, dtype=torch.long, device=device
        )
        sequences[:, :, 0] = self.sos_idx
        # Only beam 0 is live at the start; otherwise the first step would pick
        # the same token beam_size times.
        beam_scores = torch.full((B, beam_size), NEG_INF, device=device, dtype=torch.float)
        beam_scores[:, 0] = 0.0
        finished = torch.zeros(B, beam_size, dtype=torch.bool, device=device)

        # Prefixes are flattened batch-major (image 0's beams, then image 1's, …),
        # so each image's memory must be repeated in consecutive columns.
        # repeat_interleave does that; a tiled repeat would hand beams the
        # memory of a different image.
        mem_rep = memory.repeat_interleave(beam_size, dim=1)   # [src_len, B*beam, d_model]
        batch_idx = torch.arange(B, device=device).unsqueeze(1)  # [B, 1], broadcasts over beams

        # Log-probabilities for a finished beam: it may only be extended by
        # <pad>, at zero cost, so its score and content stay frozen.
        frozen_logp = torch.full((V,), NEG_INF, device=device, dtype=torch.float)
        frozen_logp[self.pad_idx] = 0.0

        for t in range(1, max_len):
            cur_prefix = sequences[:, :, :t].reshape(B * beam_size, t)   # [B*beam, t]
            out = self.decoder(
                tgt=cur_prefix.transpose(0, 1),
                memory=mem_rep,
                tgt_key_padding_mask=(cur_prefix == self.pad_idx),
            )                                                            # [t, B*beam, V]
            logp = F.log_softmax(out[-1].reshape(B, beam_size, V), dim=-1)
            logp = torch.where(finished.unsqueeze(-1), frozen_logp.to(logp.dtype), logp)

            # Score every (beam, token) expansion and keep the best beam_size per image.
            flat = (beam_scores.unsqueeze(2) + logp).reshape(B, beam_size * V)
            topk_scores, topk_idx = torch.topk(flat, k=beam_size, dim=1)
            prev_beam_idx = topk_idx // V                                # [B, beam]
            token_idx = topk_idx % V                                     # [B, beam]

            # Advanced indexing returns a copy, so writing position t is safe.
            sequences = sequences[batch_idx, prev_beam_idx]              # [B, beam, max_len]
            sequences[:, :, t] = token_idx
            beam_scores = topk_scores
            finished = finished[batch_idx, prev_beam_idx] | (token_idx == self.eos_idx)
            if finished.all():
                break

        # ---------- Pick the best beam per image ----------
        positions = torch.arange(max_len, device=device).view(1, 1, max_len)
        # Position of the first <end> per beam; max_len when there is none.
        eos_pos = torch.where(
            sequences == self.eos_idx, positions, torch.full_like(positions, max_len)
        ).min(dim=2).values                                              # [B, beam]
        finished_mask = eos_pos < max_len

        if length_norm:
            lengths = torch.where(
                finished_mask, eos_pos + 1, torch.full_like(eos_pos, max_len)
            )
            final_scores = beam_scores / lengths.to(beam_scores.dtype)
        else:
            final_scores = beam_scores

        # Prefer beams that reached <end>; if an image has none, consider all its beams.
        has_finished = finished_mask.any(dim=1, keepdim=True)            # [B, 1]
        keep_mask = finished_mask | ~has_finished
        selectable_scores = final_scores.masked_fill(~keep_mask, float("-inf"))
        best_beam_idx = selectable_scores.argmax(dim=1)                  # [B]

        rows = torch.arange(B, device=device)
        generated = sequences[rows, best_beam_idx].clone()               # [B, max_len]
        # Guarantee that nothing but <pad> follows the first <end>.
        best_eos = eos_pos[rows, best_beam_idx].unsqueeze(1)             # [B, 1]
        generated = generated.masked_fill(positions.view(1, max_len) > best_eos, self.pad_idx)

        # Recompute logits for the chosen sequences in one pass. The decoder
        # output at position t-1 predicts the token at position t, so shift by
        # one to match the layout of the greedy path.
        all_logits = torch.zeros(B, max_len, V, device=device, dtype=torch.float)
        out_full = self.decoder(
            tgt=generated.transpose(0, 1),
            memory=memory,
            tgt_key_padding_mask=(generated == self.pad_idx),
        )                                                                # [max_len, B, V]
        all_logits[:, 1:, :] = out_full[:-1].transpose(0, 1)

        return generated, all_logits


In [None]:
# Build the full captioning model.
# The encoder projects image features to d_model=256; the decoder is created
# with the same d_model so encoder output and decoder memory sizes match.
variant = 'efficientnet_b2'
encoder = EfficientNetEncoder(
        variant=variant,
        pretrained=True,
        d_model=256
    )
decoder = TransformerDecoder(vocab_size=len(vocab), d_model=256)
# Special-token indices come from the vocabulary; max_len is the padded
# caption length defined in the padding cell.
model = ImageCaptioningModel(
    encoder, decoder,
    pad_idx=vocab.stoi['<pad>'],
    sos_idx=vocab.stoi['<start>'],
    eos_idx=vocab.stoi['<end>'],
    max_len=max_len
)

# # Training:
# logits = model(images, captions, masks)  # teacher forcing
#
# # Inference:
# gen = model.generate(images, device)

In [None]:
# ====== HYPERPARAMETERS (edit these) ======
decoder_lr = 0.0005       # LR for decoder while encoder is frozen
encoder_lr = 0.00001       # LR for encoder after unfreezing (much smaller)
weight_decay = 0.001
unfreeze_epoch = 10      # epoch at which to unfreeze encoder and add encoder params to optimizer
num_epochs = 30         # total epochs
# ===========================================
# NOTE(review): encoder_lr, unfreeze_epoch and num_epochs are only defined in
# this cell; the code that consumes them is not part of it.

import torch
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau


# Cross-entropy with label smoothing; <pad> targets are excluded from the loss.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1, ignore_index=vocab.stoi['<pad>'])

# 1) Freeze encoder parameters
for p in model.encoder.parameters():
    p.requires_grad = False

# 2) Prepare decoder param list (trainable)
decoder_params = [p for p in model.decoder.parameters() if p.requires_grad]
if len(decoder_params) == 0:
    raise RuntimeError("Decoder has no trainable params. Check attribute name 'model.decoder'.")

# 3) Create optimizer with decoder params only (we will add encoder group later)
optimizer = AdamW([{'params': decoder_params, 'lr': decoder_lr, 'weight_decay': weight_decay}],
                  betas=(0.9, 0.999), eps=1e-8)

# 4) Create the ReduceLROnPlateau scheduler: halves the LR (factor=0.5) after
#    the monitored value has not improved for 3 epochs (patience=3), never
#    going below min_lr. It acts on every param group of the optimizer.
# NOTE(review): the `verbose` argument is deprecated in recent PyTorch
# releases — confirm against the installed version.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True, min_lr=1e-8)

# 5) Print optimizer param groups and LRs for sanity
for i, pg in enumerate(optimizer.param_groups):
    print(f"param_group[{i}] lr = {pg['lr']:g}  (num_params={len(pg['params'])})")


In [None]:
!pip install -q nltk

In [None]:
import os, pickle
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

CHECKPOINT_FILE = "checkpoint.pt"
HISTORY_FILE    = "history.pkl"
EPOCHS_PER_RUN  = 40   # how many epochs you do per Kaggle session
TOTAL_EPOCHS    = 200  # eventual goal

# Default start values (used as-is when no checkpoint exists)
start_epoch = 1
history = {
    "train_losses":   [],
    "val_losses":     [],
    "train_bleus":    [],
    "val_bleus":      [],
    "train_meteors":  [],
    "val_meteors":    []
}

# 1a) If there's a checkpoint, restore model, optimizer, epoch and history
if os.path.exists(CHECKPOINT_FILE):
    ckpt = torch.load(CHECKPOINT_FILE, map_location=device)
    saved_groups = ckpt["opt_state"]["param_groups"]
    print("checkpoint param_groups:", len(saved_groups))
    print("current optimizer.param_groups:", len(optimizer.param_groups))

    # get encoder params from your model (adjust attribute if different)
    encoder_params = list(model.encoder.parameters())
    decoder_params = list(model.decoder.parameters())

    # If the checkpoint holds more groups than the optimizer currently has,
    # the encoder group was already added when it was saved. Re-create that
    # group (same position as when saving) so the optimizer state can load.
    if len(saved_groups) > len(optimizer.param_groups):
        # try to match the saved group's LR and weight_decay
        saved_group = saved_groups[1]  # second group
        saved_lr = saved_group.get("lr", encoder_lr)  # fallback to your encoder_lr
        saved_wd = saved_group.get("weight_decay", weight_decay)

        # The encoder was unfrozen when this checkpoint was written. Re-enable
        # its gradients too; otherwise the restored group is never updated,
        # because the training loop only unfreezes at the unfreeze epoch.
        for p in encoder_params:
            p.requires_grad = True

        optimizer.add_param_group({
            "params": encoder_params,
            "lr": saved_lr,
            "weight_decay": saved_wd
        })
        print("Added encoder param_group to optimizer.")

    # ReduceLROnPlateau sized its per-group `min_lrs` list when it was created.
    # Pad it to the current number of groups so a later LR reduction does not
    # fail on the extra group.
    n_missing = len(optimizer.param_groups) - len(scheduler.min_lrs)
    if n_missing > 0:
        scheduler.min_lrs.extend([scheduler.min_lrs[-1]] * n_missing)

    # now safe to load optimizer state
    model.load_state_dict(ckpt["model_state"])
    optimizer.load_state_dict(ckpt["opt_state"])
    start_epoch = ckpt["epoch"] + 1

    # load the metrics history so far
    # SECURITY: pickle.load can execute arbitrary code; only load history
    # files that this notebook wrote itself.
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "rb") as f:
            history = pickle.load(f)
    else:
        print(f"Warning: {HISTORY_FILE} not found; metric history restarts empty.")
    print(f"Resuming from epoch {start_epoch}")
else:
    print("No checkpoint found — starting from scratch.")


In [None]:
import torch
import torch.nn as nn
from torch.optim import AdamW
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score


# Hyperparams & bookkeeping
pad_idx   = vocab.stoi['<pad>']
sos_idx   = vocab.stoi['<start>']
eos_idx   = vocab.stoi['<end>']
# Built once: these ids are stripped from every reference and hypothesis.
special_ids = {pad_idx, sos_idx, eos_idx}

smooth = SmoothingFunction().method4  # typically method4 for corpus_bleu

train_losses, val_losses = [], []
train_bleus,  val_bleus  = [], []
train_meteors, val_meteors = [], []

end_epoch = min(start_epoch + EPOCHS_PER_RUN - 1, TOTAL_EPOCHS)


def _ids_to_tokens(token_ids):
    """Convert a 1-D tensor of token ids to words, dropping pad/start/end ids."""
    return [vocab.itos[i] for i in token_ids.cpu().tolist() if i not in special_ids]


def _accumulate_caption_metrics(gen_ids, captions, refs_out, hyps_out):
    """Record BLEU inputs for one batch and score METEOR per sample.

    Appends one single-reference list per sample to ``refs_out`` and the
    matching hypothesis token list to ``hyps_out``.

    Returns:
        (meteor_sum, n_samples) for the batch.
    """
    meteor_sum, n_samples = 0.0, 0
    for hyp_ids, ref_ids in zip(gen_ids, captions):
        ref_tokens = _ids_to_tokens(ref_ids)
        hyp_tokens = _ids_to_tokens(hyp_ids)
        refs_out.append([ref_tokens])
        hyps_out.append(hyp_tokens)
        meteor_sum += meteor_score([ref_tokens], hyp_tokens)
        n_samples += 1
    return meteor_sum, n_samples


def _sync_scheduler_min_lrs():
    """Pad scheduler.min_lrs so it has one entry per optimizer param group.

    ReduceLROnPlateau sizes this list when it is constructed; a param group
    added afterwards would otherwise make the next LR reduction fail.
    """
    n_missing = len(optimizer.param_groups) - len(scheduler.min_lrs)
    if n_missing > 0:
        scheduler.min_lrs.extend([scheduler.min_lrs[-1]] * n_missing)


for epoch in range(start_epoch, end_epoch + 1):
    # ——— TRAIN LOOP ———
    model.train()
    running_loss = 0.0
    n_batches    = 0

    # For corpus BLEU
    all_train_refs = []  # list of list of refs per example
    all_train_hyps = []  # list of hyp token lists

    running_meteor = 0.0
    n_train_samples = 0

    for images, captions, masks in train_loader:
        images, captions, masks = images.to(device), captions.to(device), masks.to(device)

        # 1) Forward (teacher forcing) for loss: input drops the last token,
        #    target drops the first, so position t predicts token t+1.
        captions_input = captions[:, :-1]
        masks_input    = masks[:, :-1]
        logits = model(images, captions_input, masks_input)  # [T, B, V]
        T, B, V = logits.shape

        # 2) Compute cross-entropy loss
        pred   = logits.transpose(0,1).reshape(B*T, V)
        target = captions[:, 1:].reshape(B*T)
        loss = criterion(pred, target)

        # 3) Backprop & step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 4) Accumulate train loss
        running_loss += loss.item()
        n_batches    += 1

        # 5) Collect for corpus BLEU & METEOR.
        #    Generate in eval mode so the metrics are not computed in
        #    training mode, then switch back to training mode.
        with torch.no_grad():
            model.eval()
            gen_ids, _ = model.generate(images, device)  # [B, max_len]
            model.train()
            batch_meteor, batch_count = _accumulate_caption_metrics(
                gen_ids, captions, all_train_refs, all_train_hyps
            )
            running_meteor += batch_meteor
            n_train_samples += batch_count

    # Compute epoch‐level metrics
    train_losses.append(running_loss / n_batches)
    train_meteors.append(running_meteor / n_train_samples)
    # corpus BLEU-4
    train_bleu = corpus_bleu(
        list_of_references=all_train_refs,
        hypotheses=all_train_hyps,
        weights=(0.25,0.25,0.25,0.25),
        smoothing_function=smooth
    )
    train_bleus.append(train_bleu * 100)

    # ——— VALIDATION LOOP ———
    model.eval()
    running_val_loss = 0.0
    n_val_batches    = 0

    all_val_refs = []
    all_val_hyps = []
    running_val_meteor = 0.0
    n_val_samples = 0

    with torch.no_grad():
        for images, captions, masks in test_loader:
            images, captions, masks = images.to(device), captions.to(device), masks.to(device)

            # forward for loss
            captions_input = captions[:, :-1]
            masks_input    = masks[:, :-1]
            logits = model(images, captions_input, masks_input)
            T, B, V = logits.shape

            pred   = logits.transpose(0,1).reshape(B*T, V)
            target = captions[:, 1:].reshape(B*T)
            batch_loss = criterion(pred, target)
            running_val_loss += batch_loss.item()
            n_val_batches += 1

            # collect for corpus BLEU & METEOR
            gen_ids, _ = model.generate(images, device)
            batch_meteor, batch_count = _accumulate_caption_metrics(
                gen_ids, captions, all_val_refs, all_val_hyps
            )
            running_val_meteor += batch_meteor
            n_val_samples += batch_count

    # Average validation loss & METEOR
    val_loss = running_val_loss / n_val_batches
    val_meteor = running_val_meteor / n_val_samples
    val_losses.append(val_loss)
    val_meteors.append(val_meteor)

    # corpus BLEU-4 on validation
    val_bleu = corpus_bleu(
        list_of_references=all_val_refs,
        hypotheses=all_val_hyps,
        weights=(0.25,0.25,0.25,0.25),
        smoothing_function=smooth
    )
    val_bleus.append(val_bleu * 100)

    # Step scheduler on validation loss (after making sure it knows about
    # every param group, e.g. an encoder group restored from a checkpoint)
    _sync_scheduler_min_lrs()
    scheduler.step(val_loss)

    # === UNFREEZE ENCODER once the chosen epoch is reached ===
    # `>=` plus the "still frozen" check also covers a run resumed after
    # unfreeze_epoch; once every encoder param requires grad this is skipped.
    encoder_still_frozen = any(not p.requires_grad for p in model.encoder.parameters())
    if epoch >= unfreeze_epoch and encoder_still_frozen:
        print(f"*** Unfreezing encoder at epoch {epoch} and adding encoder params to optimizer (lr={encoder_lr}) ***")
        # 1) Enable gradients for encoder
        for p in model.encoder.parameters():
            p.requires_grad = True

        # 2) Collect ids of params already in optimizer to avoid duplication
        existing_param_ids = set()
        for pg in optimizer.param_groups:
            for p in pg['params']:
                existing_param_ids.add(id(p))

        # 3) Build list of encoder params not already in optimizer
        encoder_params_to_add = [p for p in model.encoder.parameters() if id(p) not in existing_param_ids and p.requires_grad]

        if len(encoder_params_to_add) == 0:
            print("Warning: no encoder params to add (they might already be present).")
        else:
            optimizer.add_param_group({'params': encoder_params_to_add, 'lr': encoder_lr, 'weight_decay': weight_decay})
            # keep the scheduler's per-group minimum LRs aligned with the new group
            _sync_scheduler_min_lrs()
            print("Added encoder param group. Current optimizer param groups and LRs:")
            for i, pg in enumerate(optimizer.param_groups):
                print(f"  param_group[{i}] lr = {pg['lr']:g}  (num_params={len(pg['params'])})")

    # ——— LOG & SAVE ———
    print(f"Epoch {epoch}/{end_epoch}")
    print(f"  Train Loss: {train_losses[-1]:.4f} | Val Loss: {val_losses[-1]:.4f}")
    print(f"  Train BLEU: {train_bleus[-1]:.2f}% | Val BLEU: {val_bleus[-1]:.2f}%")
    print(f"  Train MET:  {train_meteors[-1]:.4f} | Val MET:  {val_meteors[-1]:.4f}")

    # Checkpoint after every epoch so an interrupted session can resume
    torch.save({
        "epoch":       epoch,
        "model_state": model.state_dict(),
        "opt_state":   optimizer.state_dict()
    }, CHECKPOINT_FILE)

    history["train_losses"].append(train_losses[-1])
    history["val_losses"].append(val_losses[-1])
    history["train_bleus"].append(train_bleus[-1])
    history["val_bleus"].append(val_bleus[-1])
    history["train_meteors"].append(train_meteors[-1])
    history["val_meteors"].append(val_meteors[-1])
    with open(HISTORY_FILE, "wb") as f:
        pickle.dump(history, f)
    print("✅ Checkpoint & history updated\n")


In [None]:
from IPython.display import FileLink

# Render a download link for each artifact written by the training cell.
for artifact_name in ("history.pkl", "checkpoint.pt"):
    display(FileLink(artifact_name))


In [None]:
import matplotlib.pyplot as plt
import os


h = history  # metric history dict built/loaded by the earlier cells
x = list(range(1, len(h["train_losses"]) + 1))

# make output directory if needed
os.makedirs("plots", exist_ok=True)


def _plot_metric_curve(hist, epochs, train_key, val_key, metric_label, ylabel, title, out_path):
    """Plot train vs. validation curves for one metric, save the figure, then show it.

    Args:
        hist: history dict holding one list per metric key.
        epochs: x-axis values, one per recorded epoch.
        train_key, val_key: keys of the train / validation series in ``hist``.
        metric_label: short name used in the legend ("Loss", "BLEU", ...).
        ylabel, title: axis label and figure title.
        out_path: file the figure is written to.
    """
    fig, ax = plt.subplots()
    ax.plot(epochs, hist[train_key], label=f"Train {metric_label}")
    ax.plot(epochs, hist[val_key], label=f"Val {metric_label}")
    ax.set_xlabel("Epoch")
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.legend()
    fig.tight_layout()
    # Save before show(): some backends release the figure once it is shown.
    fig.savefig(out_path)
    plt.show()


# 1) Loss Curve: train & validation loss
_plot_metric_curve(h, x, "train_losses", "val_losses", "Loss",
                   "CrossEntropy Loss", "Training vs. Validation Loss",
                   "plots/loss_curve.png")

# 2) BLEU Curve: train & validation BLEU
_plot_metric_curve(h, x, "train_bleus", "val_bleus", "BLEU",
                   "BLEU Score", "Training vs. Validation BLEU",
                   "plots/bleu_curve.png")

# 3) METEOR Curve: train & validation METEOR
_plot_metric_curve(h, x, "train_meteors", "val_meteors", "METEOR",
                   "METEOR Score", "Training vs. Validation METEOR",
                   "plots/meteor_curve.png")

print("✅ Saved plots to the `plots/` folder:")
print("   - plots/loss_curve.png")
print("   - plots/bleu_curve.png")
print("   - plots/meteor_curve.png")


In [None]:
import torch
from PIL import Image
from torchvision import transforms

# 1) Path of a pickled model export.
#    NOTE(review): nothing in this cell loads MODEL_PATH — inference below uses
#    the in-memory `model`. Confirm whether a load step is missing.
MODEL_PATH = "/kaggle/working/image_captioning_model.pkl"

# 2) Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = model.to(device)
model.eval()

# 4) Prepare your vocabulary lookup (inverse of stoi)
#    Assumes you still have `vocab` in scope
inv_vocab = {idx: tok for tok, idx in vocab.stoi.items()}

# 5) Preprocessing for inference: resize to 260x260, convert to tensor and
#    normalise with the notebook's `means` / `stds` (defined in an earlier cell).
transform = transforms.Compose([
    transforms.Resize((260, 260)),         # or whatever your encoder expects
    transforms.ToTensor(),
    transforms.Normalize(
        mean=means,
        std= stds
    )
])


# Define the image path
img_path = "/kaggle/input/samples/dog-house.jpg"

# Open the image once; the RGB conversion below reuses it
image = Image.open(img_path)

# Display the image
plt.imshow(image)
plt.axis('off')  # Hide axis
plt.show()


# 6) Preprocess the image into a batch of one
img = image.convert("RGB")
inp = transform(img).unsqueeze(0).to(device)  # shape [1,3,H,W]


# 7) Generate a caption with the model's generate() method
with torch.no_grad():
    output_ids, _ = model.generate(inp, device)  # → [1, max_len]
output_ids = output_ids[0].cpu().tolist()    # convert to Python list

# 8) Decode token IDs to words, stopping at <end>.
#    The vocabulary's start token is "<start>" (see vocab.stoi['<start>']),
#    so that is the marker to skip; filtering "<sos>" let it into the caption.
words = []
for idx in output_ids:
    token = vocab.itos[idx]
    if token == "<end>":
        break
    if token not in {"<start>", "<pad>"}:
        words.append(token)

caption = " ".join(words)
print("🖼️ Caption:", caption)


In [None]:


# Define the image path
img_path = "/kaggle/input/samples/Road and car.jpg"

# Open the image once; the RGB conversion below reuses it
image = Image.open(img_path)

# Display the image
plt.imshow(image)
plt.axis('off')  # Hide axis
plt.show()

# 6) Preprocess the image into a batch of one
img = image.convert("RGB")
inp = transform(img).unsqueeze(0).to(device)  # shape [1,3,H,W]

# 7) Generate a caption with the model's generate() method
with torch.no_grad():
    output_ids, _ = model.generate(inp, device)  # → [1, max_len]
output_ids = output_ids[0].cpu().tolist()    # convert to Python list

# 8) Decode token IDs to words, stopping at <end>.
#    The vocabulary's start token is "<start>" (see vocab.stoi['<start>']),
#    so that is the marker to skip; filtering "<sos>" let it into the caption.
words = []
for idx in output_ids:
    token = vocab.itos[idx]
    if token == "<end>":
        break
    if token not in {"<start>", "<pad>"}:
        words.append(token)

caption = " ".join(words)
print("🖼️ Caption:", caption)


In [None]:


# Define the image path (one variable for both display and preprocessing)
img_path = "/kaggle/input/samples/sample3.jpg"

# Open the image once; the RGB conversion below reuses it
image = Image.open(img_path)

# Display the image
plt.imshow(image)
plt.axis('off')  # Hide axis
plt.show()

# 6) Preprocess the image into a batch of one
img = image.convert("RGB")
inp = transform(img).unsqueeze(0).to(device)  # shape [1,3,H,W]

# 7) Generate a caption with the model's generate() method
with torch.no_grad():
    output_ids, _ = model.generate(inp, device)  # → [1, max_len]
output_ids = output_ids[0].cpu().tolist()    # convert to Python list

# 8) Decode token IDs to words, stopping at <end>.
#    The vocabulary's start token is "<start>" (see vocab.stoi['<start>']),
#    so that is the marker to skip; filtering "<sos>" let it into the caption.
words = []
for idx in output_ids:
    token = vocab.itos[idx]
    if token == "<end>":
        break
    if token not in {"<start>", "<pad>"}:
        words.append(token)

caption = " ".join(words)
print("🖼️ Caption:", caption)


In [None]:
from PIL import Image
import matplotlib.pyplot as plt

# Define the image path (one variable for both display and preprocessing)
img_path = "/kaggle/input/samples/pet and owners.jpg"

# Open the image once; the RGB conversion below reuses it
image = Image.open(img_path)

# Display the image
plt.imshow(image)
plt.axis('off')  # Hide axis
plt.show()


# 6) Preprocess the image into a batch of one
img = image.convert("RGB")
inp = transform(img).unsqueeze(0).to(device)  # shape [1,3,H,W]

# 7) Generate a caption with the model's generate() method
with torch.no_grad():
    output_ids, _ = model.generate(inp, device)  # → [1, max_len]
output_ids = output_ids[0].cpu().tolist()    # convert to Python list

# 8) Decode token IDs to words, stopping at <end>.
#    The vocabulary's start token is "<start>" (see vocab.stoi['<start>']),
#    so that is the marker to skip; filtering "<sos>" let it into the caption.
words = []
for idx in output_ids:
    token = vocab.itos[idx]
    if token == "<end>":
        break
    if token not in {"<start>", "<pad>"}:
        words.append(token)

caption = " ".join(words)
print("🖼️ Caption:", caption)


In [None]:
# ---------- Cell: train_loader lengths (teacher forcing, use same prep as training loop) ----------
import torch
import numpy as np

# remember whether model was in training mode and set eval for safe inference
was_training = model.training
model.eval()

batch_means_train = []   # mean predicted length of each batch
all_lengths_train = []   # predicted length of every sample

with torch.no_grad():
    for images, captions, masks in train_loader:
        images, captions, masks = images.to(device), captions.to(device), masks.to(device)

        # Teacher-forced forward pass, same input preparation as the training loop
        captions_input = captions[:, :-1]    # remove last token for input
        masks_input    = masks[:, :-1]
        logits = model(images, captions_input, masks_input)   # [T, B, V]

        # most likely token at every step, rearranged to one row per sample
        preds_bt = logits.argmax(dim=-1).transpose(0, 1)      # [B, T]

        # compute lengths per sample excluding special tokens
        # NOTE(review): every time step is counted, including steps whose
        # target is padding; predictions there may inflate the lengths.
        # Confirm this is the intended definition of "length".
        mask_non_special = (preds_bt != pad_idx) & (preds_bt != sos_idx) & (preds_bt != eos_idx)
        lengths_per_sample = mask_non_special.sum(dim=1).cpu().tolist()  # list of length B

        # batch statistics
        batch_mean = float(np.mean(lengths_per_sample)) if len(lengths_per_sample) > 0 else 0.0
        batch_means_train.append(batch_mean)
        all_lengths_train.extend(lengths_per_sample)

# dataset-level stats (population std, ddof=0)
if len(all_lengths_train) > 0:
    max_batch_avg_train = max(batch_means_train)
    min_batch_avg_train = min(batch_means_train)
    dataset_mean_train = float(np.mean(all_lengths_train))
    dataset_std_train  = float(np.std(all_lengths_train, ddof=0))
else:
    max_batch_avg_train = min_batch_avg_train = dataset_mean_train = dataset_std_train = 0.0

# restore original mode
if was_training:
    model.train()

print("TRAIN (teacher forcing) — caption length stats")
print(f"  • batches processed: {len(batch_means_train)}")
print(f"  • max(batch average lengths) : {max_batch_avg_train:.4f}")
print(f"  • min(batch average lengths) : {min_batch_avg_train:.4f}")
print(f"  • dataset mean length        : {dataset_mean_train:.4f}")
print(f"  • dataset std (pop)          : {dataset_std_train:.4f}")


In [None]:
# ---------- Cell: test_loader lengths (autoregressive generate) ----------
import torch
import numpy as np

# Remember the current mode so it can be restored after inference
was_training = model.training
model.eval()

batch_means_test, all_lengths_test = [], []

with torch.no_grad():
    for batch in test_loader:
        images, captions, masks = batch
        images = images.to(device)

        # Autoregressive generation; the first returned value holds the ids
        gen_ids, _ = model.generate(images, device)

        # A token counts toward the length when it is none of pad/start/end
        gen_ids_cpu = gen_ids.cpu()
        is_special = (gen_ids_cpu == pad_idx) | (gen_ids_cpu == sos_idx) | (gen_ids_cpu == eos_idx)
        lengths_per_sample = (~is_special).sum(dim=1).tolist()

        if len(lengths_per_sample) > 0:
            batch_mean = float(np.mean(lengths_per_sample))
        else:
            batch_mean = 0.0
        batch_means_test.append(batch_mean)
        all_lengths_test.extend(lengths_per_sample)

# Dataset-level statistics (population standard deviation)
if len(all_lengths_test) == 0:
    max_batch_avg_test = min_batch_avg_test = dataset_mean_test = dataset_std_test = 0.0
else:
    max_batch_avg_test = max(batch_means_test)
    min_batch_avg_test = min(batch_means_test)
    dataset_mean_test = float(np.mean(all_lengths_test))
    dataset_std_test  = float(np.std(all_lengths_test, ddof=0))

# Put the model back into training mode if that is how it was found
if was_training:
    model.train()

print("TEST (autoregressive) — caption length stats")
print(f"  • batches processed: {len(batch_means_test)}")
print(f"  • max(batch average lengths) : {max_batch_avg_test:.4f}")
print(f"  • min(batch average lengths) : {min_batch_avg_test:.4f}")
print(f"  • dataset mean length        : {dataset_mean_test:.4f}")
print(f"  • dataset std (pop)          : {dataset_std_test:.4f}")


In [None]:
# ---------- Cell: predict & show RANDOM 10 samples (presentable version) ----------
import torch
import matplotlib.pyplot as plt
import numpy as np
import random
import textwrap

# Display settings for this cell
num_examples = 10     # how many test samples to draw
random_seed = None    # when not None, used to seed `random` before sampling
wrap_width = 60       # narrower wrap for better fitting under image

# Special-token ids; each is None when the token is absent from vocab.stoi
pad_idx, sos_idx, eos_idx = (
    vocab.stoi.get(token, None) for token in ('<pad>', '<start>', '<end>')
)

def unnormalize_and_to_uint8(img_tensor):
    """Undo channel normalisation and return an HxWxC uint8 image array.

    The normalisation statistics are looked up in order of preference:
    the notebook globals ``means``/``stds``, then ``mean``/``std``, then the
    standard ImageNet values. A candidate is skipped when its names are not
    defined or its shape does not broadcast against the image; if none
    applies, the values are only clipped and scaled.

    Args:
        img_tensor: image tensor; a 3-D tensor whose first dimension is at
            most 4 is treated as channels-first and moved to channels-last.

    Returns:
        numpy uint8 array with values in [0, 255].
    """
    t = img_tensor.detach().cpu()
    if t.ndim == 3 and t.shape[0] <= 4:
        t = t.permute(1, 2, 0)
    arr = t.numpy().astype(np.float32)

    # Lambdas delay the global lookups so an undefined name only fails
    # inside the try block of its own candidate.
    stat_sources = (
        lambda: (means, stds),
        lambda: (mean, std),
        lambda: ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    )
    for get_stats in stat_sources:
        try:
            mean_vals, std_vals = get_stats()
            mean_arr = np.array(mean_vals).reshape(1, 1, -1)
            std_arr = np.array(std_vals).reshape(1, 1, -1)
            arr = arr * std_arr + mean_arr
            break
        except Exception:
            # Best-effort fallback (undefined name, shape mismatch, ...).
            # Unlike a bare `except:`, this does not swallow
            # KeyboardInterrupt or SystemExit.
            continue

    arr = np.clip(arr, 0.0, 1.0)
    return (arr * 255).astype(np.uint8)

def ids_to_sentence(ids):
    """Turn a sequence of token ids into a space-joined sentence.

    Pad and start ids are skipped, decoding stops at the first end id, and an
    id outside ``vocab.itos`` is rendered as its decimal string. The special
    ids come from the globals ``pad_idx``, ``sos_idx`` and ``eos_idx``; one
    that is None is not treated specially.

    Args:
        ids: a torch.Tensor or an iterable of integer token ids.

    Returns:
        The decoded sentence with surrounding whitespace stripped.
    """
    token_ids = ids.tolist() if isinstance(ids, torch.Tensor) else ids
    skipped_ids = [special for special in (pad_idx, sos_idx) if special is not None]

    pieces = []
    for token_id in token_ids:
        if token_id in skipped_ids:
            continue
        if eos_idx is not None and token_id == eos_idx:
            break
        if token_id < len(vocab.itos):
            pieces.append(vocab.itos[token_id])
        else:
            pieces.append(str(token_id))
    return " ".join(pieces).strip()

if random_seed is not None:
    random.seed(random_seed)

# Size of the test set; fall back to counting samples batch by batch when the
# dataset cannot report its length.
try:
    dataset_len = len(test_loader.dataset)
except (TypeError, AttributeError):
    dataset_len = 0
    for b in test_loader:
        dataset_len += b[0].shape[0]

# Never ask for more samples than exist (random.sample would raise).
n_show = min(num_examples, dataset_len)
if n_show == 0:
    raise ValueError("No test samples available to display.")
chosen_indices = random.sample(range(dataset_len), n_show)

# Pull the chosen items straight from the dataset: item[0] is the image and
# item[1], when present, the ground-truth caption ids.
dataset = test_loader.dataset
images_list = []
captions_list = []
for idx in chosen_indices:
    item = dataset[idx]
    images_list.append(item[0])
    captions_list.append(item[1] if len(item) > 1 else None)

images_batch = torch.stack(images_list, dim=0).to(device)
# Captions are decoded one by one below, so they are not stacked into a batch
# (stacking would fail if they had different lengths).
has_captions = captions_list[0] is not None

was_training = model.training
model.eval()
with torch.no_grad():
    gen_ids = model.generate(images_batch, device)
    # generate() may return (ids, logits); keep only the ids
    if isinstance(gen_ids, (tuple, list)):
        gen_ids = gen_ids[0]
    gen_ids = gen_ids.cpu()
if was_training:
    model.train()

examples = []
for i in range(n_show):
    img_np = unnormalize_and_to_uint8(images_list[i])
    pred_sentence = ids_to_sentence(gen_ids[i])
    true_sentence = ids_to_sentence(captions_list[i].cpu()) if has_captions else ""
    examples.append((img_np, pred_sentence, true_sentence, chosen_indices[i]))

# ---- Display in 2-column grid ----
cols = 2
rows = (n_show + cols - 1) // cols
# squeeze=False keeps `axes` a 2-D array even for a single row
fig, axes = plt.subplots(rows, cols, figsize=(12, rows * 6), squeeze=False)
axes = axes.flatten()

for i, ax in enumerate(axes):
    ax.axis('off')
    if i < len(examples):
        img_np, pred_sentence, true_sentence, ds_idx = examples[i]
        ax.imshow(img_np)
        ax.set_title(f"Sample {i+1} (idx={ds_idx})", fontsize=14, pad=10)
        wrapped_pred = textwrap.fill(f"Predicted: {pred_sentence}", width=wrap_width)
        wrapped_true = textwrap.fill(f"Actual: {true_sentence}", width=wrap_width)
        # captions are drawn below the image, outside the axes area
        ax.text(0.5, -0.15, wrapped_pred + "\n" + wrapped_true,
                transform=ax.transAxes, ha='center', fontsize=11, va='top', clip_on=False)

plt.subplots_adjust(hspace=0.6, wspace=0.3)
plt.show()
