<a href="https://colab.research.google.com/github/BhanuPratapSingh16/Image-Captioner/blob/main/Image_Captioner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dataset filtering

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import os
import numpy as np

In [None]:
!pip install kaggle



In [None]:
from google.colab import files

# Bind the result instead of leaving it as the cell's last expression: a bare
# `files.upload()` makes the notebook display the returned {filename: bytes}
# dict, which writes the Kaggle API key into the saved cell output.
uploaded = files.upload()
print(f"Uploaded: {sorted(uploaded)}")

Saving kaggle.json to kaggle.json


{'kaggle.json': b'<redacted: Kaggle API credentials - this key was published in the notebook output and must be revoked>'}

In [None]:
# Copy the uploaded kaggle.json into ~/.kaggle and make it readable by the owner only (mode 600).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d adityajn105/flickr8k
!unzip flickr8k.zip -d flickr8k


In [None]:
import string

CAPTIONS_CSV = '/content/flickr8k/captions.txt'
PUNCT_TABLE = str.maketrans('', '', string.punctuation)


def parse_caption_line(line):
    """Turn one `<name>.jpg,<caption>` line into `(image_name, tagged_caption)`.

    The caption is lower-cased, stripped of punctuation, whitespace-normalised and
    wrapped as `<start> ... <end>`. The markers are separated by spaces so that a
    later `caption.split()` yields them as standalone tokens.

    Returns None for a line that does not contain '.jpg,' (for example a header
    row or a blank line) so the caller can skip it.
    """
    image, sep, caption = line.partition('.jpg,')
    if not sep:
        return None
    # Punctuation (including a trailing period or CSV quotes) is removed by the
    # translate table; no character is chopped off blindly.
    words = caption.lower().translate(PUNCT_TABLE).split()
    return image + '.jpg', '<start> ' + ' '.join(words) + ' <end>'


image_captions = {}
with open(CAPTIONS_CSV, 'r') as f:
    for line in f:
        parsed = parse_caption_line(line)
        if parsed is None:
            continue
        image, caption = parsed
        image_captions.setdefault(image, []).append(caption)

# Summary only: printing the whole dict floods the notebook output.
print(f"Parsed captions for {len(image_captions)} images")

In [None]:
import json

# Write the caption dict to disk and immediately read it back, so the in-memory
# object is exactly what later sessions will get from the JSON file.
captions_json = "image_captions.json"

with open(captions_json, "w") as out_file:
    json.dump(image_captions, out_file)

with open(captions_json, "r") as in_file:
    image_captions = json.load(in_file)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the following cells read and write under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!mv /content/flickr8k /content/drive/MyDrive/datasets/flickr8k

In [None]:
# Move the caption JSON next to the dataset on Drive; a later cell reloads it from
# /content/drive/MyDrive/datasets/flickr8k/image_captions.json.
!mv /content/image_captions.json /content/drive/MyDrive/datasets/flickr8k

In [None]:
import random

# Seeded shuffle over a sorted list so the train/val/test split is reproducible.
SPLIT_SEED = 42

images = sorted(image_captions.keys())
random.Random(SPLIT_SEED).shuffle(images)
print(images[:10])  # preview only; the full list has thousands of entries

In [None]:
num_images = len(images)
print(num_images)

# 80% train, 10% validation; the test split takes whatever is left after rounding down.
train_size, val_size = int(0.8 * num_images), int(0.1 * num_images)
test_size = num_images - (train_size + val_size)


8091


In [None]:
# Consecutive slices of the shuffled list: [0, train_size) / [train_size, val_end) / rest.
val_end = train_size + val_size
train_images = images[:train_size]
val_images = images[train_size:val_end]
test_images = images[val_end:]

In [None]:
import os

base_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/"

# One folder per split; exist_ok makes the cell safe to re-run.
for split_dir in (os.path.join(base_dir, name) for name in ["train", "val", "test"]):
    os.makedirs(split_dir, exist_ok=True)

In [None]:
import shutil

original_images_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/Images"


def copy_images(image_list, split_name):
    """Copy every file named in `image_list` from the Images folder into base_dir/<split_name>."""
    target_dir = os.path.join(base_dir, split_name)
    for file_name in image_list:
        shutil.copy(
            os.path.join(original_images_dir, file_name),
            os.path.join(target_dir, file_name),
        )


# Populate the three split folders.
for split_name, split_files in (("train", train_images), ("val", val_images), ("test", test_images)):
    copy_images(split_files, split_name)

In [None]:
import os
base_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/"

# Sanity check: number of files that ended up in each split folder.
for split_name in ("train", "val", "test"):
    split_files = os.listdir(os.path.join(base_dir, split_name))
    print(len(split_files))


6472
809
810


In [None]:
!mv /content/drive/MyDrive/datasets/flickr8k/flickr8k/test /content/drive/MyDrive/datasets/flickr8k/flickr8k/split/test

In [None]:
# Relocate the train and val folders under .../flickr8k/flickr8k/split/, next to test.
!mv /content/drive/MyDrive/datasets/flickr8k/flickr8k/train /content/drive/MyDrive/datasets/flickr8k/flickr8k/split/train
!mv /content/drive/MyDrive/datasets/flickr8k/flickr8k/val /content/drive/MyDrive/datasets/flickr8k/flickr8k/split/val

In [None]:
import json

# Reload the full caption dictionary from its location on Drive.
captions_json_path = "/content/drive/MyDrive/datasets/flickr8k/image_captions.json"
with open(captions_json_path, "r") as captions_file:
    image_captions = json.load(captions_file)

In [None]:
import os

# The split folders were moved under .../split by the preceding `mv` cells, and the
# vocabulary cell later reads split/train_captions.json, so base_dir must point
# at the split directory from here on (the old value no longer contains train/val/test).
base_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split"

train_images = os.listdir(os.path.join(base_dir, "train"))
val_images = os.listdir(os.path.join(base_dir, "val"))
test_images = os.listdir(os.path.join(base_dir, "test"))

In [None]:
def _captions_for(image_names):
    """Subset of image_captions restricted to the given file names (unknown names are skipped)."""
    selected = {}
    for name in image_names:
        if name in image_captions:
            selected[name] = image_captions[name]
    return selected


train_captions = _captions_for(train_images)
val_captions = _captions_for(val_images)
test_captions = _captions_for(test_images)


In [None]:
# Number of captioned images per split (train, val, test).
for split_captions in (train_captions, val_captions, test_captions):
    print(len(split_captions))

6472
809
810


In [None]:
# Persist one <split>_captions.json per split inside base_dir.
for split_name, split_captions in (
    ("train", train_captions),
    ("val", val_captions),
    ("test", test_captions),
):
    with open(base_dir + "/" + split_name + "_captions.json", "w") as out_file:
        json.dump(split_captions, out_file)

# Dataset preprocessing

In [None]:
# Root of the split data; the preprocessing cells below read and write their files here.
base_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split"

### Building vocab

In [None]:
import json
from collections import Counter

def build_vocab(captions_file, threshold):
    """Build word<->index mappings from a JSON file of {image: [caption, ...]}.

    Ids 0-3 are reserved for "<pad>", "<start>", "<end>" and "<unk>". Every other
    whitespace-separated token that occurs at least `threshold` times gets the
    next free id, in order of first appearance.

    Returns:
        (word2idx, idx2word) as two dicts; the vocabulary size is also printed.
    """
    with open(captions_file, "r") as fh:
        captions_by_image = json.load(fh)

    token_counts = Counter(
        token
        for caption_list in captions_by_image.values()
        for caption in caption_list
        for token in caption.split()
    )

    word2idx, idx2word = {}, {}
    for position, special in enumerate(["<pad>", "<start>", "<end>", "<unk>"]):
        word2idx[special] = position
        idx2word[position] = special

    # The sentence markers already own reserved ids, so keep them out of the counted words.
    token_counts.pop("<start>", None)
    token_counts.pop("<end>", None)

    for token, occurrences in token_counts.items():
        if occurrences < threshold:
            continue
        next_id = len(word2idx)
        word2idx[token] = next_id
        idx2word[next_id] = token

    print(f"Vocabulary size: {len(word2idx)}")
    return word2idx, idx2word

In [None]:
import json
import os

train_captions_file = os.path.join(base_dir, "train_captions.json")
word2idx, idx2word = build_vocab(train_captions_file, threshold=2)

# Persist the vocabulary: the Testing section reloads idx2word.json from this
# location, and nothing else in the notebook writes it. JSON stores the integer
# keys of idx2word as strings.
VOCAB_DIR = "/content/drive/MyDrive/datasets/flickr8k/flickr8k"
for vocab_name, mapping in (("word2idx", word2idx), ("idx2word", idx2word)):
    with open(os.path.join(VOCAB_DIR, vocab_name + ".json"), "w") as vocab_file:
        json.dump(mapping, vocab_file)

Vocabulary size: 5032


In [None]:
# Preview the first entries only; the full mapping has thousands of items.
print(dict(list(word2idx.items())[:20]))

In [None]:
# Preview the first entries only; the full mapping has thousands of items.
print(dict(list(idx2word.items())[:20]))

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip -d glove

--2025-09-30 05:28:20--  http://nlp.stanford.edu/data/glove.6B.zip
Resolving nlp.stanford.edu (nlp.stanford.edu)... 171.64.67.140
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://nlp.stanford.edu/data/glove.6B.zip [following]
--2025-09-30 05:28:20--  https://nlp.stanford.edu/data/glove.6B.zip
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://downloads.cs.stanford.edu/nlp/data/glove.6B.zip [following]
--2025-09-30 05:28:20--  https://downloads.cs.stanford.edu/nlp/data/glove.6B.zip
Resolving downloads.cs.stanford.edu (downloads.cs.stanford.edu)... 171.64.64.22
Connecting to downloads.cs.stanford.edu (downloads.cs.stanford.edu)|171.64.64.22|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 862182613 (822M) [application/zip]
Saving to: ‘glove.6B.zip’


202

In [None]:
import numpy as np

def load_glove(path):
    """Read a whitespace-separated embedding file into {word: float32 vector}.

    Each line is expected to hold the word followed by its vector components.
    """
    embeddings = {}
    with open(path, encoding="utf-8") as glove_file:
        for row in glove_file:
            fields = row.split()
            embeddings[fields[0]] = np.array(fields[1:], dtype=np.float32)
    return embeddings

# 100-dimensional GloVe vectors stored on Drive.
glove_path = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/glove/glove.6B.100d.txt"
glove_embeddings = load_glove(glove_path)

### Building embedding matrix

In [None]:
embedding_dim = 100
vocab_size = len(word2idx)

# Seeded generator so rows for words missing from GloVe are identical on every run.
EMBEDDING_SEED = 0
rng = np.random.default_rng(EMBEDDING_SEED)

embedding_matrix = np.zeros((vocab_size, embedding_dim))

# Row idx holds the GloVe vector of the word when available, otherwise a random
# N(0, 0.6) vector.
for word, idx in word2idx.items():
    vector = glove_embeddings.get(word)
    if vector is None:
        vector = rng.normal(scale=0.6, size=(embedding_dim,))
    embedding_matrix[idx] = vector

In [None]:

# Display the assembled (vocab_size, embedding_dim) matrix.
embedding_matrix

In [None]:
# Store the matrix on Drive; the decoder section reloads it from this same path.
np.save("/content/drive/MyDrive/datasets/flickr8k/flickr8k/embedding_matrix.npy", embedding_matrix)

In [None]:
def caption_to_seq(caption, word2idx, max_length):
    """Encode a caption as a list of exactly `max_length` token ids.

    Words missing from `word2idx` map to the "<unk>" id; short captions are
    right-padded with the "<pad>" id and long ones are cut at `max_length`.
    """
    ids = [word2idx.get(token, word2idx["<unk>"]) for token in caption.split()]

    shortfall = max_length - len(ids)
    if shortfall > 0:
        ids.extend([word2idx["<pad>"]] * shortfall)

    return ids[:max_length] if len(ids) > max_length else ids

In [None]:
max_length = 20


def _encode_split(captions_by_image):
    """Map every caption of every image to its fixed-length id sequence."""
    return {
        image_name: [caption_to_seq(text, word2idx, max_length) for text in caption_list]
        for image_name, caption_list in captions_by_image.items()
    }


train_sequences = _encode_split(train_captions)
test_sequences = _encode_split(test_captions)
val_sequences = _encode_split(val_captions)

In [None]:
# Number of images with encoded captions, printed as train / test / val.
for split_sequences in (train_sequences, test_sequences, val_sequences):
    print(len(split_sequences))

6472
810
809


In [None]:
# Show the directory the sequence files are written to in the next cell.
base_dir

'/content/drive/MyDrive/datasets/flickr8k/flickr8k/split'

In [None]:
# Persist one <split>_sequences.json per split inside base_dir.
for split_name, split_sequences in (
    ("train", train_sequences),
    ("val", val_sequences),
    ("test", test_sequences),
):
    with open(base_dir + "/" + split_name + "_sequences.json", "w") as out_file:
        json.dump(split_sequences, out_file)

In [None]:
# Spot-check the encoded captions of one training image.
print(train_sequences["3694093650_547259731e.jpg"])

[[1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 8, 13, 6, 4, 14, 2, 0, 0, 0, 0], [1, 4, 5, 6, 15, 16, 17, 18, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [1, 19, 15, 11, 8, 20, 21, 22, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [1, 19, 15, 8, 23, 24, 25, 26, 11, 8, 4, 27, 28, 2, 0, 0, 0, 0, 0, 0], [1, 19, 29, 7, 11, 30, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]


# CNN Encoder

In [None]:
import os

# Preview a handful of file names; the folder holds thousands of images.
train_files = os.listdir(os.path.join(base_dir, "train"))
print(len(train_files), train_files[:5])

In [None]:
import os
import json
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torch

In [None]:
class ImageDataset(Dataset):
    """Dataset over every file of one directory, yielding `(image, path)` pairs."""

    def __init__(self, img_dir, transform=None):
        self.dir = img_dir
        self.transform = transform
        self.img_list = os.listdir(self.dir)
        # Full path of every listed file, in listing order.
        self.data = [os.path.join(img_dir, file_name) for file_name in self.img_list]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        path = self.data[index]
        rgb = Image.open(path).convert("RGB")
        # The transform is optional; without one the RGB PIL image is returned as is.
        image = self.transform(rgb) if self.transform else rgb
        return image, path

In [None]:
# Per-channel normalisation constants applied after conversion to a tensor.
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]

resize_step = transforms.Resize((224, 224))
normalize_step = transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD)

# Resize -> tensor -> normalise.
transform = transforms.Compose([resize_step, transforms.ToTensor(), normalize_step])

In [None]:
def _image_split(split_name, shuffle):
    """Build the ImageDataset of one split folder and a batch-32 DataLoader over it."""
    dataset = ImageDataset(img_dir=os.path.join(base_dir, split_name), transform=transform)
    return dataset, DataLoader(dataset, batch_size=32, shuffle=shuffle)


# Only the training loader shuffles.
train_dataset, train_loader = _image_split("train", shuffle=True)
val_dataset, val_loader = _image_split("val", shuffle=False)
test_dataset, test_loader = _image_split("test", shuffle=False)

In [None]:
# Confirm the loader object was created (prints its repr only).
print(train_loader)

<torch.utils.data.dataloader.DataLoader object at 0x7c4ef8d8b140>


In [None]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import os
import torch.nn as nn

In [None]:
# `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the weight set
# that flag used to select, so the extracted features stay the same.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = model.to(device)

# The network is only used as a fixed feature extractor.
for param in model.parameters():
    param.requires_grad = False

# Drop the last child module (the classification layer) and keep everything before it.
model = nn.Sequential(*list(model.children())[:-1])
model.eval()

In [None]:
# Show which device was selected for the encoder.
device

device(type='cuda')

In [None]:
from tqdm import tqdm

def extract_features(dataloader, model, device):
    """Run `model` over every batch and collect one feature vector per image.

    Args:
        dataloader: iterable of `(images, image_paths)` batches.
        model: feature extractor; it is switched to eval mode and run without gradients.
        device: torch device the image batches are moved to.

    Returns:
        dict mapping each image's file name (the basename of its path) to a 1-D
        NumPy feature vector. File names are used as keys because the caption
        sequences and the later lookups in this notebook are keyed by file name.
    """
    model.eval()
    features = {}
    with torch.no_grad():
        for imgs, img_paths in tqdm(dataloader):
            outputs = model(imgs.to(device))
            # Flatten everything except the batch axis. A plain squeeze() would also
            # drop the batch axis when a batch holds a single image, so outputs[i]
            # would then index into the feature vector instead of the batch.
            outputs = outputs.flatten(start_dim=1).cpu().numpy()

            for row, path in zip(outputs, img_paths):
                features[os.path.basename(path)] = row
    return features


In [None]:
# Encode every training image with the frozen CNN.
train_features = extract_features(train_loader, model, device)

In [None]:
import pickle

# Cache the training features on Drive so the CNN pass does not have to be repeated.
train_features_path = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split/train_features.pkl"
with open(train_features_path, "wb") as features_file:
    pickle.dump(train_features, features_file)

In [None]:
# Encode the validation images and cache the result on Drive.
val_features_path = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split/val_features.pkl"
val_features = extract_features(val_loader, model, device)
with open(val_features_path, "wb") as features_file:
    pickle.dump(val_features, features_file)

In [None]:
# Encode the test images and cache the result on Drive.
test_features_path = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split/test_features.pkl"
test_features = extract_features(test_loader, model, device)
with open(test_features_path, "wb") as features_file:
    pickle.dump(test_features, features_file)

# LSTM Decoder

In [None]:
# Re-declared so the decoder section can start from a fresh kernel without running the cells above.
base_dir = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/split"

In [None]:
import os
import json

def _read_json(file_name):
    """Load one JSON file from base_dir."""
    with open(os.path.join(base_dir, file_name), "r") as json_file:
        return json.load(json_file)


# Encoded captions per split, as written by the preprocessing section.
train_sequences = _read_json("train_sequences.json")
val_sequences = _read_json("val_sequences.json")
test_sequences = _read_json("test_sequences.json")

In [None]:
import pickle

def _read_pickle(file_name):
    """Unpickle one file from base_dir (only use on files this notebook produced)."""
    with open(os.path.join(base_dir, file_name), "rb") as pickle_file:
        return pickle.load(pickle_file)


# CNN features per split, as written by the encoder section.
train_features = _read_pickle("train_features.pkl")
val_features = _read_pickle("val_features.pkl")
test_features = _read_pickle("test_features.pkl")

In [None]:
import os
import json
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

In [None]:
class CaptionDataset(Dataset):
    """Pairs every encoded caption with the feature vector of its image.

    `sequences` maps an image key to a list of encoded captions and `features`
    maps the same key to that image's features; each caption becomes one sample.
    """

    def __init__(self, sequences, features):
        self.sequences = sequences
        self.features = features
        # One (image_key, caption) entry per caption, grouped by image.
        self.data = [
            (image_key, caption)
            for image_key, caption_list in self.sequences.items()
            for caption in caption_list
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        image_key, caption = self.data[index]
        return self.features[image_key], caption

In [None]:
# One (features, caption) dataset per split.
train_dataset = CaptionDataset(sequences=train_sequences, features=train_features)
val_dataset = CaptionDataset(sequences=val_sequences, features=val_features)
test_dataset = CaptionDataset(sequences=test_sequences, features=test_features)

# Training batches are shuffled and larger (64); evaluation keeps a fixed order with batches of 32.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)


In [None]:
# Both numbers should agree: one feature vector per captioned training image.
for train_mapping in (train_sequences, train_features):
    print(len(train_mapping))

6472
6472


In [None]:
# Spot-check: feature vector stored for one training image (looked up by file name).
train_features["3039675864_0b7961844d.jpg"]

array([0.25472474, 1.0270143 , 0.23229678, ..., 0.5405816 , 0.9122872 ,
       0.54372734], dtype=float32)

In [None]:
# Spot-check: encoded captions stored for the same training image.
train_sequences["3039675864_0b7961844d.jpg"]

[[1, 4, 122, 44, 4, 1076, 209, 19, 268, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1,
  4,
  105,
  257,
  4,
  3,
  33,
  242,
  1154,
  50,
  4,
  1188,
  1005,
  120,
  2420,
  2,
  0,
  0,
  0,
  0],
 [1, 4, 261, 40, 2420, 4, 1094, 24, 4, 165, 50, 4, 1188, 2, 0, 0, 0, 0, 0, 0],
 [1, 19, 1123, 11, 8, 19, 261, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1,
  241,
  165,
  133,
  68,
  4,
  972,
  40,
  4,
  880,
  6,
  2420,
  50,
  19,
  261,
  58,
  59,
  19,
  2,
  0]]

In [None]:
import cupy as cp
import numpy as np

In [None]:
class LSTMDecoder:
    """Single-layer LSTM caption decoder with hand-written forward and backward passes.

    The image feature vector is projected (through tanh) into the initial hidden
    and cell states. At every time step the embedding of the current token is
    concatenated with the previous hidden state, passed through the four LSTM
    gates, and the new hidden state is projected to a softmax over the vocabulary.

    All parameters are CuPy arrays (`cp`); optimisation uses the Adam state kept
    in `self.m`, `self.v` and `self.t`.
    """

    # Every trainable array, listed once so Adam state, gradients, save and load agree.
    PARAM_NAMES = ('embedding_matrix', 'W_h', 'b_h', 'W_c', 'b_c',
                   'W_f', 'b_f', 'W_i', 'b_i', 'W_cand', 'b_cand',
                   'W_o', 'b_o', 'W_out', 'b_out')

    # Lower bound applied to a probability before taking its log.
    LOG_EPS = 1e-12

    def __init__(self, embedding_dim, hidden_dim, img_dim, vocab_size, embedding_matrix, lr, pad_idx=0):
        """
        Args:
            embedding_dim: width of one word embedding.
            hidden_dim: size of the LSTM hidden and cell states.
            img_dim: length of one image feature vector.
            vocab_size: number of output classes of the softmax.
            embedding_matrix: array indexed by token id, one embedding per row.
            lr: learning rate used by both update rules.
            pad_idx: token id of the padding symbol; positions whose target is
                this id are excluded from the loss and the gradient.
        """
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.input_dim = embedding_dim + hidden_dim
        self.embedding_matrix = cp.array(embedding_matrix)
        self.img_dim = img_dim
        self.lr = lr
        self.vocab_size = vocab_size
        self.pad_idx = pad_idx

        # Adam optimizer parameters
        self.beta_1 = 0.9
        self.beta_2 = 0.999
        self.epsilon = 1e-8
        self.m = {}
        self.v = {}
        self.t = 0

        # CNN features to initial hidden state
        self.W_h = cp.random.randn(self.img_dim, self.hidden_dim) * 0.01
        self.b_h = cp.zeros((self.hidden_dim,))

        # CNN features to initial cell state
        self.W_c = cp.random.randn(self.img_dim, self.hidden_dim) * 0.01
        self.b_c = cp.zeros((self.hidden_dim,))

        # Forget gate (bias starts at one, all other gate biases at zero)
        self.W_f = cp.random.randn(self.input_dim, self.hidden_dim) * 0.01
        self.b_f = cp.ones((self.hidden_dim,))

        # Input gate
        self.W_i = cp.random.randn(self.input_dim, self.hidden_dim) * 0.01
        self.b_i = cp.zeros((self.hidden_dim,))

        # Candidate cell
        self.W_cand = cp.random.randn(self.input_dim, self.hidden_dim) * 0.01
        self.b_cand = cp.zeros((self.hidden_dim,))

        # Output gate
        self.W_o = cp.random.randn(self.input_dim, self.hidden_dim) * 0.01
        self.b_o = cp.zeros((self.hidden_dim,))

        # Output projection
        self.W_out = cp.random.randn(self.hidden_dim, vocab_size) * 0.01
        self.b_out = cp.zeros((vocab_size,))

        self.initialize_adam()

    def initialize_adam(self):
        """Create zeroed first/second moment buffers for every parameter."""
        for name in self.PARAM_NAMES:
            weight = getattr(self, name)
            self.m[name] = cp.zeros_like(weight)
            self.v[name] = cp.zeros_like(weight)

    def sigmoid(self, x):
        """Logistic function that never exponentiates a positive number.

        exp(-|x|) is at most 1, so neither branch can overflow.
        """
        z = cp.exp(-cp.abs(x))
        return cp.where(x >= 0, 1 / (1 + z), z / (1 + z))

    def tanh(self, x):
        return cp.tanh(x)

    def softmax(self, x):
        """Softmax over the last axis, shifted by the row maximum for stability."""
        exp_x = cp.exp(x - cp.max(x, axis=-1, keepdims=True))
        return exp_x / cp.sum(exp_x, axis=-1, keepdims=True)

    def _target_mask(self, cap_seq_batch):
        """Return `(mask, n_valid)` for the prediction targets of a batch.

        `mask[b, t]` is 1 where the target token `cap_seq_batch[b, t + 1]` is not
        padding and 0 otherwise; `n_valid` is the number of ones (at least 1 so
        it can be used as a divisor).
        """
        mask = (cap_seq_batch[:, 1:] != self.pad_idx).astype(self.W_out.dtype)
        return mask, cp.maximum(cp.sum(mask), 1.0)

    def forward_step(self, x_batch, h_prev_batch, c_prev_batch):
        """One LSTM step for a batch.

        Args:
            x_batch: (batch, embedding_dim) input embeddings.
            h_prev_batch: (batch, hidden_dim) previous hidden state.
            c_prev_batch: (batch, hidden_dim) previous cell state.

        Returns:
            (c_next, h_next, cache); `cache` holds every intermediate that
            `backward_step` needs.
        """
        # Concatenate input and previous hidden state: (batch, input_dim)
        concat = cp.concatenate([x_batch, h_prev_batch], axis=1)

        # Gates, each (batch, hidden_dim)
        f = self.sigmoid(concat @ self.W_f + self.b_f)
        i = self.sigmoid(concat @ self.W_i + self.b_i)
        cand_c = self.tanh(concat @ self.W_cand + self.b_cand)
        o = self.sigmoid(concat @ self.W_o + self.b_o)

        c_next = f * c_prev_batch + i * cand_c
        h_next = o * self.tanh(c_next)

        cache = {
            'x': x_batch, 'h_prev': h_prev_batch, 'c_prev': c_prev_batch,
            'concat': concat, 'f': f, 'i': i, 'cand_c': cand_c,
            'o': o, 'c_next': c_next, 'h_next': h_next
        }

        return c_next, h_next, cache

    def forward(self, img_features_batch, cap_seq_batch, train=True):
        """Teacher-forced pass over a batch of captions.

        Token t of every caption is the input at step t and token t + 1 is the
        target, so a caption of length L produces L - 1 steps.

        Args:
            img_features_batch: (batch, img_dim) image features.
            cap_seq_batch: (batch, L) integer token ids.
            train: when True also compute the loss and keep the per-step caches.

        Returns:
            `(outputs, loss, caches)` when `train` is True, otherwise `outputs`.
            `outputs` is (batch, L - 1, vocab_size) softmax probabilities. `loss`
            is the mean cross-entropy over the targets that are not padding.
        """
        h = self.tanh(img_features_batch @ self.W_h + self.b_h)
        c = self.tanh(img_features_batch @ self.W_c + self.b_c)

        batch_size, seq_len = cap_seq_batch.shape
        seq_len -= 1  # the last token has no successor to predict

        outputs = []
        caches = [] if train else None
        loss = 0.0
        if train:
            mask, n_valid = self._target_mask(cap_seq_batch)
            batch_indices = cp.arange(batch_size)

        for t in range(seq_len):
            x = self.embedding_matrix[cap_seq_batch[:, t]]

            c, h, cache = self.forward_step(x, h, c)
            probs = self.softmax(h @ self.W_out + self.b_out)
            outputs.append(probs)

            if train:
                caches.append(cache)
                targets = cap_seq_batch[:, t + 1].astype(cp.int32)
                # Clamp before the log so a probability that underflowed to 0
                # cannot produce -inf.
                log_p = cp.log(cp.maximum(probs[batch_indices, targets], self.LOG_EPS))
                # Padded targets contribute nothing to the loss.
                loss = loss - cp.sum(mask[:, t] * log_p)

        outputs = cp.stack(outputs, axis=1)  # (batch_size, seq_len, vocab_size)

        if train:
            return outputs, loss / n_valid, caches
        return outputs

    def backward_step(self, dh_next, dc_next, cache):
        """Backpropagate through one LSTM step.

        Args:
            dh_next: gradient w.r.t. this step's hidden state.
            dc_next: gradient w.r.t. this step's cell state coming from the next step.
            cache: the dict produced by `forward_step` for this step.

        Returns:
            `(dx, dh_prev, dc_prev, grads)` where `grads` holds the gate weight
            and bias gradients of this step.
        """
        concat = cache['concat']
        c_prev = cache['c_prev']
        f, i, o = cache['f'], cache['i'], cache['o']
        cand_c = cache['cand_c']
        tanh_c = self.tanh(cache['c_next'])

        # h = o * tanh(c): split the hidden-state gradient between o and c
        do = dh_next * tanh_c
        dc = dc_next + dh_next * o * (1 - tanh_c ** 2)

        # c = f * c_prev + i * cand_c
        dcand_c = dc * i
        di = dc * cand_c
        df = dc * c_prev
        dc_prev = dc * f

        # Through the gate non-linearities (sigmoid' = s(1-s), tanh' = 1 - t^2)
        do_input = do * o * (1 - o)
        di_input = di * i * (1 - i)
        df_input = df * f * (1 - f)
        dcand_c_input = dcand_c * (1 - cand_c ** 2)

        grads = {
            'W_o': concat.T @ do_input, 'b_o': cp.sum(do_input, axis=0),
            'W_i': concat.T @ di_input, 'b_i': cp.sum(di_input, axis=0),
            'W_f': concat.T @ df_input, 'b_f': cp.sum(df_input, axis=0),
            'W_cand': concat.T @ dcand_c_input, 'b_cand': cp.sum(dcand_c_input, axis=0)
        }

        # Gradient w.r.t. the concatenated [x, h_prev] input
        dconcat = (do_input @ self.W_o.T +
                   di_input @ self.W_i.T +
                   df_input @ self.W_f.T +
                   dcand_c_input @ self.W_cand.T)

        # Split it back into the embedding part and the hidden-state part
        dx = dconcat[:, :self.embedding_dim]
        dh_prev = dconcat[:, self.embedding_dim:]

        return dx, dh_prev, dc_prev, grads

    def backward(self, img_features_batch, cap_seq_batch, outputs, caches):
        """Backpropagation through time for the loss returned by `forward`.

        Uses the same padding mask and normalisation as `forward`, so the result
        is the gradient of the masked mean cross-entropy. Every gradient is
        clipped element-wise to [-5, 5].

        Returns:
            dict with one gradient array per name in `PARAM_NAMES`.
        """
        batch_size, seq_len = cap_seq_batch.shape
        seq_len -= 1

        grads = {name: cp.zeros_like(getattr(self, name)) for name in self.PARAM_NAMES}

        mask, n_valid = self._target_mask(cap_seq_batch)
        batch_indices = cp.arange(batch_size)

        dh_next = cp.zeros((batch_size, self.hidden_dim))
        dc_next = cp.zeros((batch_size, self.hidden_dim))

        for t in reversed(range(seq_len)):
            # Softmax + cross-entropy: d(loss)/d(logits) = probs - one_hot(target)
            dlogits = outputs[:, t, :].copy()
            dlogits[batch_indices, cap_seq_batch[:, t + 1]] -= 1
            # Rows whose target is padding are zeroed; the rest are scaled like the loss.
            dlogits *= mask[:, t:t + 1] / n_valid

            h = caches[t]['h_next']
            grads['W_out'] += h.T @ dlogits
            grads['b_out'] += cp.sum(dlogits, axis=0)
            dh = dlogits @ self.W_out.T + dh_next

            # NOTE(review): dx (the gradient w.r.t. the input embeddings) is discarded,
            # so grads['embedding_matrix'] stays zero and the embeddings keep their
            # initial values. Confirm whether frozen embeddings are intended.
            _dx, dh_next, dc_next, step_grads = self.backward_step(dh, dc_next, caches[t])

            for key, value in step_grads.items():
                grads[key] += value

        # Through the tanh projections that produced the initial states
        h0 = self.tanh(img_features_batch @ self.W_h + self.b_h)
        dh0 = dh_next * (1 - h0 ** 2)
        grads['W_h'] += img_features_batch.T @ dh0
        grads['b_h'] += cp.sum(dh0, axis=0)

        c0 = self.tanh(img_features_batch @ self.W_c + self.b_c)
        dc0 = dc_next * (1 - c0 ** 2)
        grads['W_c'] += img_features_batch.T @ dc0
        grads['b_c'] += cp.sum(dc0, axis=0)

        # Element-wise gradient clipping
        for key in grads:
            grads[key] = cp.clip(grads[key], -5, 5)

        return grads

    def update_weights(self, grads):
        """Plain SGD step, applied in place to every parameter present in `grads`."""
        for name, grad in grads.items():
            weight = getattr(self, name)
            weight -= self.lr * grad

    def update_weights_adam(self, grads):
        """Adam step with bias-corrected moments, applied in place to every parameter in `grads`."""
        self.t += 1

        for name, grad in grads.items():
            self.m[name] = self.beta_1 * self.m[name] + (1 - self.beta_1) * grad
            self.v[name] = self.beta_2 * self.v[name] + (1 - self.beta_2) * grad ** 2

            m_hat = self.m[name] / (1 - self.beta_1 ** self.t)
            v_hat = self.v[name] / (1 - self.beta_2 ** self.t)

            weight = getattr(self, name)
            weight -= self.lr * m_hat / (cp.sqrt(v_hat) + self.epsilon)

    def train_step(self, img_features_batch, cap_seq_batch):
        """Forward pass, backward pass and one Adam update; returns the batch loss."""
        outputs, loss, caches = self.forward(img_features_batch, cap_seq_batch)
        grads = self.backward(img_features_batch, cap_seq_batch, outputs, caches)
        self.update_weights_adam(grads)
        return loss

    @staticmethod
    def _lookup_word(idx2word, idx):
        """Resolve a token id in `idx2word`, whether its keys are str (JSON-loaded) or int."""
        key = str(idx)
        if key in idx2word:
            return idx2word[key]
        return idx2word[idx]

    def predict(self, img_features, idx2word, max_len=20, start_idx=1, end_idx=2):
        """Greedy decoding for a single image.

        Args:
            img_features: 1-D feature vector of length img_dim.
            idx2word: mapping from token id to word; keys may be ints or their
                string form.
            max_len: maximum number of decoding steps.
            start_idx: token id fed at the first step.
            end_idx: token id that stops decoding (it is not added to the caption).

        Returns:
            The generated words joined by single spaces.
        """
        h = self.tanh(img_features @ self.W_h + self.b_h)
        c = self.tanh(img_features @ self.W_c + self.b_c)

        caption = []
        current_word_idx = start_idx

        for _ in range(max_len):
            x = self.embedding_matrix[current_word_idx]

            # forward_step works on batches, so run it with a batch of one
            c, h, _cache = self.forward_step(x[cp.newaxis, :], h[cp.newaxis, :], c[cp.newaxis, :])
            h, c = h[0], c[0]

            probs = self.softmax(h @ self.W_out + self.b_out)

            # Greedy choice, converted to a plain int for comparison and lookup
            current_word_idx = int(cp.argmax(probs))
            if current_word_idx == end_idx:
                break

            caption.append(self._lookup_word(idx2word, current_word_idx))

        return " ".join(caption)

    def save_model(self, path):
        """Write every parameter plus a small config dict to an .npz archive."""
        weights = {name: cp.asnumpy(getattr(self, name)) for name in self.PARAM_NAMES}
        weights['config'] = {
            'vocab_size': self.vocab_size,
            'embedding_dim': self.embedding_dim,
            'hidden_dim': self.hidden_dim,
            'img_dim': self.img_dim
        }
        np.savez(path, **weights)
        print(f"Model saved to {path}")

    def load_model(self, filepath):
        """Replace every parameter with the arrays stored in an .npz archive.

        The optimizer state (`m`, `v`, `t`) is left untouched.
        """
        # Only the plain numeric arrays are read, so pickle support is not needed;
        # the pickled 'config' entry written by save_model is never accessed.
        with np.load(filepath, allow_pickle=False) as data:
            for name in self.PARAM_NAMES:
                setattr(self, name, cp.array(data[name]))
        print(f"Model loaded from {filepath}")


In [None]:
# Reload the embedding matrix saved by the preprocessing section and display it.
embedding_matrix = np.load("/content/drive/MyDrive/datasets/flickr8k/flickr8k/embedding_matrix.npy")
embedding_matrix

In [None]:
# Embedding width = number of columns of the loaded matrix.
embedding_dim = embedding_matrix.shape[1]

In [None]:
# Vocabulary size = number of rows of the loaded matrix.
vocab_size = embedding_matrix.shape[0]

In [None]:
# Display both dimensions before building the decoder.
embedding_dim, vocab_size

(100, 5032)

In [None]:
# Decoder hyper-parameters: LSTM state size, length of one image feature vector, learning rate.
HIDDEN_DIM = 512
IMG_FEATURE_DIM = 2048
LEARNING_RATE = 0.001

decoder = LSTMDecoder(
    embedding_dim,
    HIDDEN_DIM,
    IMG_FEATURE_DIM,
    vocab_size,
    embedding_matrix,
    lr=LEARNING_RATE,
)

In [None]:
import torch

In [None]:
import os
import time

import cupy as cp
import numpy as np
import torch


NUM_EPOCHS = 10
PAD_TOKEN_IDX = 0
PRINT_EVERY = 20
SAVE_DIR = '/content/drive/MyDrive/datasets/flickr8k/flickr8k/saved_models'


def to_cupy_batch(features_batch, sequences_batch):
    """Convert one DataLoader batch into `(features, sequences)` CuPy arrays.

    `sequences_batch` is stacked and transposed, which turns a sequence of
    per-position tensors into the (batch, seq_len) layout the decoder unpacks.
    """
    features = cp.array(features_batch.numpy(), dtype=cp.float32)
    sequences = torch.stack(sequences_batch).T
    return features, cp.array(sequences.numpy(), dtype=cp.int32)


# np.savez does not create missing directories.
os.makedirs(SAVE_DIR, exist_ok=True)

print("\n" + "=" * 70)
print("TRAINING")
print("=" * 70)

best_val_loss = float('inf')
start_time = time.time()

# Suffix of the next checkpoint file; it advances every time a new best model is saved.
count = 5
for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch + 1}/{NUM_EPOCHS}")
    # Halve the learning rate every third epoch.
    # NOTE(review): the condition is also true for epoch 0, so training starts at
    # half of the configured rate - confirm this is intended.
    if epoch % 3 == 0:
        decoder.lr /= 2
    print("-" * 70)

    epoch_loss = 0.0
    num_batches = 0

    # Training
    for batch_idx, (features_batch, sequences_batch) in enumerate(train_loader):
        features_batch, sequences_batch = to_cupy_batch(features_batch, sequences_batch)

        loss = decoder.train_step(features_batch, sequences_batch)
        # Accumulate as a Python float rather than a device scalar.
        epoch_loss += float(cp.asnumpy(loss))
        num_batches += 1

        if (batch_idx + 1) % PRINT_EVERY == 0:
            avg_loss = epoch_loss / num_batches
            elapsed = time.time() - start_time
            print(f"  Batch {batch_idx+1}/{len(train_loader)} - Loss: {avg_loss:.4f} - Time: {elapsed/60:.1f}min")

    avg_train_loss = epoch_loss / num_batches

    # Validation
    print("\n  Evaluating on validation set...")
    val_loss = 0.0
    val_batches = 0

    for features_batch, sequences_batch in val_loader:
        features_batch, sequences_batch = to_cupy_batch(features_batch, sequences_batch)

        # train=True is what makes forward() return the loss; no weight update happens here.
        _outputs, loss, _caches = decoder.forward(features_batch, sequences_batch, train=True)
        val_loss += float(cp.asnumpy(loss))
        val_batches += 1

    avg_val_loss = val_loss / val_batches

    print(f"\n  Epoch {epoch + 1} Summary:")
    print(f"    Train Loss: {avg_train_loss:.4f}")
    print(f"    Val Loss:   {avg_val_loss:.4f}")

    # Keep a checkpoint whenever the validation loss improves.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        decoder.save_model(f'{SAVE_DIR}/best_model{count}.npz')
        print(f"  ✓ Best model saved! (Val Loss: {avg_val_loss:.4f})")
        count += 1

print("\n" + "=" * 70)
print("TRAINING COMPLETE!")
print("=" * 70)
print(f"Best validation loss: {best_val_loss:.4f}")
print(f"Total time: {(time.time() - start_time)/60:.1f} minutes")

In [None]:
# Number of (image, caption) training samples behind the loader.
len(train_loader.dataset)

In [None]:
# Display the current data directory.
base_dir

'/content/drive/MyDrive/datasets/flickr8k/flickr8k/split'

In [None]:
# Loading saved model for further training
decoder = LSTMDecoder(embedding_dim, 512, 2048, vocab_size, embedding_matrix, lr=0.001)
decoder.load_model("/content/drive/MyDrive/datasets/flickr8k/flickr8k/saved_models/best_model5.npz")



Model loaded from /content/drive/MyDrive/datasets/flickr8k/flickr8k/saved_models/best_model5.npz


# Testing

In [None]:
# Index-to-word table for decoding; after a JSON round-trip its keys are strings.
idx2word_path = "/content/drive/MyDrive/datasets/flickr8k/flickr8k/idx2word.json"
with open(idx2word_path) as vocab_file:
    idx2word = json.load(vocab_file)

In [None]:
# Decoder used for inference, initialised from a saved checkpoint.
# NOTE(review): the training loop numbers checkpoints from 5 upwards, so best_model7.npz
# exists only if at least three improvements were saved - confirm the file name.
test_decoder = LSTMDecoder(embedding_dim, 512, 2048, vocab_size, embedding_matrix, lr=0.001)
test_decoder.load_model("/content/drive/MyDrive/datasets/flickr8k/flickr8k/saved_models/best_model7.npz")

Model loaded from /content/drive/MyDrive/datasets/flickr8k/flickr8k/saved_models/best_model7.npz


In [None]:
generated_captions = []
START_TOKEN_IDX = 1
END_TOKEN_IDX = 2
NUM_SAMPLES_SHOWN = 10

for i, (img, features) in enumerate(test_features.items()):
    # Convert to CuPy
    features = cp.array(features, dtype=cp.float32)

    # Greedy caption, with the start/end token ids passed explicitly instead of
    # relying on predict()'s defaults.
    caption = test_decoder.predict(
        features,
        idx2word,
        start_idx=START_TOKEN_IDX,
        end_idx=END_TOKEN_IDX
    )

    generated_captions.append(caption)

    # Print progress
    if (i + 1) % 100 == 0:
        print(f"Generated {i+1}/{len(test_features)} captions...")

print(f"✓ Generated all {len(generated_captions)} captions")

print("\n" + "="*70)
print("SAMPLE GENERATED CAPTIONS")
print("="*70)

# Bounded by the number of captions so a small test set cannot raise an IndexError.
for i in range(min(NUM_SAMPLES_SHOWN, len(generated_captions))):
    print(f"\nImage {i+1}:")
    print(f"  Generated: {generated_captions[i]}")
    print("-" * 70)



Generated 100/810 captions...
Generated 200/810 captions...
Generated 300/810 captions...
Generated 400/810 captions...
Generated 500/810 captions...
Generated 600/810 captions...
Generated 700/810 captions...
Generated 800/810 captions...
✓ Generated all 810 captions

SAMPLE GENERATED CAPTIONS

Image 1:
  Generated: a woman in a black shirt is standing on a bench in front of a building
----------------------------------------------------------------------

Image 2:
  Generated: a black dog is running through the grass
----------------------------------------------------------------------

Image 3:
  Generated: a boy in a red shirt is jumping down a wooden wall
----------------------------------------------------------------------

Image 4:
  Generated: a dog is running through a grassy area
----------------------------------------------------------------------

Image 5:
  Generated: a person in a boat is standing on a rocky beach
-------------------------------------------------------

In [None]:
def generate_caption(image):
    """Return the greedy caption of one test image, looked up in test_features by its key."""
    image_vector = cp.array(test_features[image], dtype=cp.float32)
    return test_decoder.predict(image_vector, idx2word)

In [None]:
# Example: caption a single test image identified by its file name.
generate_caption("1019077836_6fc9b15408.jpg")

'a brown dog is running through a field'