<a href="https://colab.research.google.com/github/AshishRShetty/csce5218/blob/main/Baseline_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# GROUP 22 BASELINE MODEL
 ASHISH RATHNAKAR SHETTY (11808466)
 SIDDHIVINAYAK RAGHAVRAJU ()





This baseline model generates captions for Flickr30k images using a CNN encoder (based on ResNet18) and a decoder built with either a plain LSTM or an LSTM with attention.
Images are resized and normalized, and captions are tokenized with the BERT tokenizer.
The model is trained with scheduled sampling and mixed precision for efficiency, and uses beam search during inference to generate more accurate captions.
Performance is evaluated using BLEU-1 to BLEU-4 scores on a validation set.

 This cell sets up everything you need to get started. It will:
- Install required packages (Kaggle, NLTK, Transformers, and Timm)
- Prompt you to upload your Kaggle API credentials file (kaggle.json)
- Download the Flickr30k dataset

When prompted to upload your kaggle.json file, click the "Choose Files" button and select your credentials file. If you don't have one, you'll need to create a Kaggle account and generate API credentials from your account settings.


In [19]:
# Install Kaggle and Libraries
!pip install kaggle nltk transformers timm

# Upload kaggle.json manually
from google.colab import files
files.upload()

# Set Kaggle credentials
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download Flickr30k Dataset
!kaggle datasets download -d hsankesara/flickr-image-dataset

# Unzip dataset
!unzip -q flickr-image-dataset.zip -d /content/flickr30k



Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/hsankesara/flickr-image-dataset
License(s): CC0-1.0


You're importing all the libraries needed for data handling, model building, and evaluation.










In [20]:
import os
import json
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from nltk.translate.bleu_score import corpus_bleu
from transformers import BertTokenizer


You're creating a custom dataset class that loads images and their captions from Flickr30k and prepares them for model training.

In [21]:
class Flickr30kDataset(Dataset):
    def __init__(self, root_dir, captions_file, tokenizer, transform=None, max_length=50):
        self.root_dir = root_dir
        self.transform = transform
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Load captions with correct column name
        self.captions_df = pd.read_csv(captions_file, delimiter='|')
        self.image_to_captions = {}

        for idx, row in self.captions_df.iterrows():
            image_name = row['image_name'].strip()
            caption = str(row[' comment']).strip()   # header is ' comment' (leading space) after splitting on '|'
            if image_name not in self.image_to_captions:
                self.image_to_captions[image_name] = []
            self.image_to_captions[image_name].append(caption)

        self.image_names = list(self.image_to_captions.keys())

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_name = self.image_names[idx]
        img_path = os.path.join(self.root_dir, img_name)

        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        captions = self.image_to_captions[img_name]
        caption = random.choice(captions)

        tokens = self.tokenizer.encode(
            caption,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        ).squeeze(0)

        return image, tokens
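
The grouping step inside `__init__` can be tried in isolation. The following is a minimal sketch using only the standard library; the three-row sample and the `group_captions` helper are hypothetical and only assume the `image_name| comment_number| comment` layout that the column names above suggest.

```python
import csv
import io
from collections import defaultdict

# Hypothetical excerpt in the pipe-delimited layout (not real dataset rows).
SAMPLE = (
    "image_name| comment_number| comment\n"
    "a.jpg| 0| A dog runs .\n"
    "a.jpg| 1| A brown dog .\n"
    "b.jpg| 0| Two kids play .\n"
)

def group_captions(text):
    """Map each image name to the list of its captions."""
    reader = csv.reader(io.StringIO(text), delimiter="|")
    header = [col.strip() for col in next(reader)]  # drop the leading spaces
    name_col = header.index("image_name")
    caption_col = header.index("comment")
    grouped = defaultdict(list)
    for row in reader:
        if len(row) <= caption_col:  # skip malformed rows
            continue
        grouped[row[name_col].strip()].append(row[caption_col].strip())
    return dict(grouped)

captions = group_captions(SAMPLE)
print(captions)
```

Stripping the header names once avoids having to remember the leading space in `' comment'` later on.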


You're defining how each image should be resized, converted to a tensor, and normalized before feeding it into the model.

In [22]:
# Data transformations for images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
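
To see what the normalization does numerically, here is a small sketch of the per-channel arithmetic for a single pixel. `normalize_pixel` is a hypothetical helper for illustration; it assumes 8-bit input that `ToTensor` first scales to the range [0, 1].

```python
# ImageNet channel statistics, as used in the transform above.
MEAN = (0.485, 0.456, 0.406)
STD = (0.229, 0.224, 0.225)

def normalize_pixel(rgb):
    """Scale an 8-bit RGB pixel to [0, 1], then standardize each channel."""
    scaled = [value / 255.0 for value in rgb]  # the scaling ToTensor applies
    return [(x - m) / s for x, m, s in zip(scaled, MEAN, STD)]

white = normalize_pixel((255, 255, 255))
black = normalize_pixel((0, 0, 0))
print(white, black)
```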


You're loading the tokenizer, setting up the dataset and dataloaders, splitting into train and validation sets, and preparing your device (CPU or GPU) for training.

In [23]:
# Load tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Dataset paths
root_dir = '/content/flickr30k/flickr30k_images/flickr30k_images'  # the archive nests flickr30k_images twice
captions_file = '/content/flickr30k/flickr30k_images/results.csv'

# Create dataset
dataset = Flickr30kDataset(
    root_dir=root_dir,
    captions_file=captions_file,
    tokenizer=tokenizer,
    transform=transform
)

# Train/Validation split
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2)

# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"✅ Data Loaded Successfully on {device}")


✅ Data Loaded Successfully on cpu
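
The 80/20 split above is not seeded, so the validation set changes between runs. The sketch below shows the same size arithmetic with a fixed seed, using only the standard library; `split_indices` and its `seed` parameter are hypothetical names. With PyTorch, passing a seeded `generator` to `random_split` should give a comparable effect.

```python
import random

def split_indices(n_items, train_fraction=0.8, seed=0):
    """Shuffle indices reproducibly and cut them into train/validation parts."""
    indices = list(range(n_items))
    random.Random(seed).shuffle(indices)  # private RNG, global state untouched
    train_size = int(train_fraction * n_items)
    return indices[:train_size], indices[train_size:]

train_idx, val_idx = split_indices(10)
print(len(train_idx), len(val_idx))
```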


You're building an encoder using ResNet18 and a decoder using an LSTM, then initializing them and moving them to your device (CPU or GPU).

In [24]:
# Encoder CNN
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)  # replaces the deprecated pretrained=True
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        with torch.no_grad():
            features = self.resnet(images)
        features = features.view(features.size(0), -1)
        features = self.bn(self.linear(features))
        return features

# Decoder RNN
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions):
        embeddings = self.embed(captions)
        inputs = torch.cat((features.unsqueeze(1), embeddings), 1)
        hiddens, _ = self.lstm(inputs)
        outputs = self.linear(hiddens)
        return outputs

# Initialize Encoder and Decoder
embed_size = 256
hidden_size = 512
vocab_size = tokenizer.vocab_size

encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 174MB/s]


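You're setting up the loss function and optimizer. The loss ignores padding tokens, and the optimizer updates the decoder plus the encoder's linear projection and batch-norm layers; the ResNet18 backbone runs under torch.no_grad() and stays frozen.
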
In [25]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
params = list(decoder.parameters()) + list(encoder.linear.parameters()) + list(encoder.bn.parameters())
optimizer = optim.Adam(params, lr=1e-3)
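
The effect of `ignore_index` can be illustrated without PyTorch. This is a minimal sketch of a mean cross-entropy that skips padded positions; `masked_cross_entropy` is a hypothetical helper, and returning 0.0 for an all-padding batch is a choice made for this sketch only.

```python
import math

def masked_cross_entropy(logits, targets, pad_id):
    """Average negative log-likelihood over positions whose target is not pad_id."""
    total, count = 0.0, 0
    for row, target in zip(logits, targets):
        if target == pad_id:
            continue  # padded positions contribute nothing to the loss
        peak = max(row)
        log_norm = peak + math.log(sum(math.exp(v - peak) for v in row))
        total += log_norm - row[target]
        count += 1
    return total / count if count else 0.0

PAD = 0  # pad token id of bert-base-uncased
logits = [[0.0, 2.0, 0.0], [0.0, 0.0, 0.0], [5.0, 0.0, 0.0]]
targets = [1, 2, PAD]
loss = masked_cross_entropy(logits, targets, PAD)
print(round(loss, 4))
```

Only the first two positions count here; the third is padding, so its confident but irrelevant logits do not affect the average.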


You're creating a folder called checkpoints to save your model files during training.

In [None]:
# Create the checkpoints folder if not exists
!mkdir -p /content/checkpoints


You're defining a function to generate a caption for a given image using the trained encoder and decoder models.

In [8]:
def generate_caption(encoder, decoder, image, tokenizer, max_length=50):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        feature = encoder(image.unsqueeze(0).to(device))

        output_caption = [tokenizer.cls_token_id]  # Start token

        for _ in range(max_length):
            current_input = torch.tensor(output_caption).unsqueeze(0).to(device)  # [1, seq_len]
            outputs = decoder(feature, current_input)

            outputs = outputs.squeeze(0)
            predicted_id = outputs.argmax(1)[-1].item()

            output_caption.append(predicted_id)

            if predicted_id == tokenizer.sep_token_id:  # End token
                break

    caption = tokenizer.decode(output_caption, skip_special_tokens=True)
    return caption
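
The loop in `generate_caption` is greedy decoding: at each step it keeps only the highest-scoring token. Below is a dependency-free sketch of the same control flow. `toy_scores` is a hypothetical stand-in for the decoder, and 101/102 are the `[CLS]`/`[SEP]` ids of `bert-base-uncased`.

```python
CLS, SEP = 101, 102  # [CLS] and [SEP] ids in bert-base-uncased

def greedy_decode(next_token_scores, start_id, end_id, max_length=50):
    """Append the best-scoring token until end_id appears or max_length is hit."""
    sequence = [start_id]
    for _ in range(max_length):
        scores = next_token_scores(sequence)
        best = max(scores, key=scores.get)
        sequence.append(best)
        if best == end_id:
            break
    return sequence

def toy_scores(prefix):
    """Hypothetical scorer: a fixed script indexed by prefix length."""
    script = {1: {7: 0.9, 8: 0.1}, 2: {9: 0.6, 7: 0.4}, 3: {SEP: 0.8, 9: 0.2}}
    return script.get(len(prefix), {SEP: 1.0})

decoded = greedy_decode(toy_scores, CLS, SEP)
print(decoded)
```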


You're creating an attention mechanism that helps the model focus on important parts of the image while generating each word.










In [None]:
class Attention(nn.Module):
    def __init__(self, feature_dim, decoder_dim, attention_dim):
        super(Attention, self).__init__()
        self.feature_attn = nn.Linear(feature_dim, attention_dim)
        self.hidden_attn = nn.Linear(decoder_dim, attention_dim)
        self.full_attn = nn.Linear(attention_dim, 1)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, hidden_state):
        att1 = self.feature_attn(features)          # (batch_size, num_pixels, attention_dim)
        att2 = self.hidden_attn(hidden_state)        # (batch_size, attention_dim)
        att2 = att2.unsqueeze(1)                     # (batch_size, 1, attention_dim)
        att = self.relu(att1 + att2)
        att = self.full_attn(att).squeeze(2)          # (batch_size, num_pixels)
        alpha = self.softmax(att)                    # (batch_size, num_pixels)
        context = (features * alpha.unsqueeze(2)).sum(dim=1)  # (batch_size, feature_dim)
        return context, alpha
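
The last two steps of `forward` (softmax over regions, then a weighted sum) can be checked by hand on a tiny example. This sketch uses plain lists instead of tensors; `soft_attention` is a hypothetical helper that takes the raw scores as given rather than computing them with learned layers.

```python
import math

def soft_attention(features, scores):
    """Turn one raw score per region into weights and a weighted-sum context."""
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]  # numerically stable softmax
    total = sum(exps)
    alpha = [e / total for e in exps]
    dim = len(features[0])
    context = [sum(a * f[d] for a, f in zip(alpha, features)) for d in range(dim)]
    return context, alpha

features = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # three regions, two channels
context, alpha = soft_attention(features, [0.0, 0.0, 0.0])
print(context, alpha)
```

With equal scores the context is just the mean of the region features; a much larger score on one region moves the context toward that region's feature vector.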



You're building a decoder with attention that generates captions by dynamically focusing on different parts of the image at each word step.

In [None]:
class DecoderWithAttention(nn.Module):
    def __init__(self, attention_dim, embed_dim, decoder_dim, vocab_size, feature_dim=2048, dropout=0.5):
        super(DecoderWithAttention, self).__init__()
        self.attention = Attention(feature_dim, decoder_dim, attention_dim)
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.decode_step = nn.LSTMCell(embed_dim + feature_dim, decoder_dim, bias=True)
        self.init_h = nn.Linear(feature_dim, decoder_dim)
        self.init_c = nn.Linear(feature_dim, decoder_dim)
        self.f_beta = nn.Linear(decoder_dim, feature_dim)
        self.sigmoid = nn.Sigmoid()
        self.fc = nn.Linear(decoder_dim, vocab_size)
        self.init_weights()

    def init_weights(self):
        self.embedding.weight.data.uniform_(-0.1, 0.1)
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-0.1, 0.1)

    def init_hidden_state(self, features):
        mean_features = features.mean(dim=1)
        h = self.init_h(mean_features)
        c = self.init_c(mean_features)
        return h, c

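    def forward(self, features, captions, caption_lengths):
        # Sort the batch by decreasing caption length so that slicing with
        # [:batch_size_t] below keeps exactly the captions still being decoded.
        caption_lengths, sort_ind = caption_lengths.sort(dim=0, descending=True)
        features = features[sort_ind]
        captions = captions[sort_ind]

        embeddings = self.embedding(captions)
        h, c = self.init_hidden_state(features)

        decode_lengths = caption_lengths - 1  # no prediction is made from the final token
        max_decode_length = decode_lengths.max().item()

        batch_size = features.size(0)
        vocab_size = self.fc.out_features

        predictions = torch.zeros(batch_size, max_decode_length, vocab_size, device=features.device)
        alphas = torch.zeros(batch_size, max_decode_length, features.size(1), device=features.device)

        for t in range(max_decode_length):
            # Number of captions that still have a token to predict at step t
            batch_size_t = int((decode_lengths > t).sum())

            context, alpha = self.attention(features[:batch_size_t], h[:batch_size_t])

            gate = self.sigmoid(self.f_beta(h[:batch_size_t]))  # per-channel gate, (batch_size_t, feature_dim)
            context = gate * context

            lstm_input = torch.cat([embeddings[:batch_size_t, t, :], context], dim=1)
            h, c = self.decode_step(lstm_input, (h[:batch_size_t], c[:batch_size_t]))

            preds = self.fc(self.dropout(h))
            predictions[:batch_size_t, t, :] = preds
            alphas[:batch_size_t, t, :] = alpha

        # Restore the caller's original batch order before returning
        unsort_ind = sort_ind.argsort()
        return predictions[unsort_ind], decode_lengths[unsort_ind], alphas[unsort_ind]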


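You're initializing the encoder and the new attention-based decoder and moving them to your device (CPU or GPU). The decoder expects encoder features of shape (batch_size, num_pixels, feature_dim), with feature_dim=2048 by default, whereas the EncoderCNN defined earlier requires an embed_size argument and returns a single vector per image, so this cell relies on an attention-ready encoder definition that does not appear in the cells above.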

In [None]:
# Initialize models
encoder = EncoderCNN().to(device)
decoder = DecoderWithAttention(
    attention_dim=512,
    embed_dim=256,
    decoder_dim=512,
    vocab_size=vocab_size
).to(device)
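
The decoder's default `feature_dim=2048` matches the channel count of ResNet-50-style backbones, while the last convolutional stage of ResNet18 has 512 channels. As a rough sketch of the shapes an attention decoder would see, the helper below computes the spatial grid for a backbone with a total stride of 32; `feature_grid` is a hypothetical name, and the formula assumes an input size that is a multiple of 32.

```python
def feature_grid(image_size=224, total_stride=32):
    """Return (side, num_pixels) of the final feature map for a square input."""
    side = image_size // total_stride
    return side, side * side

# Channel counts of the last convolutional stage.
CHANNELS = {"resnet18": 512, "resnet50": 2048}

side, num_pixels = feature_grid()
print(side, num_pixels, CHANNELS["resnet18"])
```

For 224x224 inputs this gives a 7x7 grid, so features for a ResNet18 backbone would have shape `(batch_size, 49, 512)` and the decoder would need `feature_dim=512`.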


You're setting up the loss function, optimizer, and gradient scaler (for mixed precision training) to prepare for model training.

In [None]:
from torch.cuda.amp import GradScaler  # imported here because this cell runs before the training cell

criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.Adam(list(decoder.parameters()) + list(encoder.parameters()), lr=1e-4)
scaler = GradScaler()


You're saving the trained encoder, decoder, and optimizer states into a checkpoint file for future loading or fine-tuning.

In [None]:
# Save final model
torch.save({
    'encoder_state_dict': encoder.state_dict(),
    'decoder_state_dict': decoder.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}, '/content/best_checkpoint.pth')

print("✅ Model saved successfully!")


✅ Model saved successfully!



You're defining a function to generate captions using beam search, which keeps the beam_size highest-scoring partial captions at each step instead of committing to the single most likely next word.

In [None]:
import torch
import torch.nn.functional as F

def generate_caption_beam_search(encoder, decoder, image, tokenizer, beam_size=5, max_len=50, device='cuda'):
    k = beam_size
    vocab_size = tokenizer.vocab_size

    # Encode (assumes the encoder returns spatial features, as the attention decoder expects)
    encoder_out = encoder(image.unsqueeze(0).to(device))  # (1, num_pixels, feature_dim)
    encoder_out = encoder_out.expand(k, -1, -1)  # (k, num_pixels, feature_dim)

    # Initialize beams
    seqs = torch.full((k, 1), tokenizer.cls_token_id, dtype=torch.long).to(device)  # CLS token
    top_k_scores = torch.zeros(k, 1).to(device)

    complete_seqs = []
    complete_seqs_scores = []

    # Initialize hidden states
    h, c = decoder.init_hidden_state(encoder_out)

    for step in range(max_len):
        embeddings = decoder.embedding(seqs[:, -1])  # (k, embed_dim)
        context, alpha = decoder.attention(encoder_out, h)  # (k, feature_dim)

        gate = decoder.sigmoid(decoder.f_beta(h))  # per-channel gate, (k, feature_dim)
        context = gate * context

        lstm_input = torch.cat([embeddings, context], dim=1)
        h, c = decoder.decode_step(lstm_input, (h, c))

        preds = decoder.fc(h)  # (k, vocab_size)
        preds = F.log_softmax(preds, dim=1)

        # Add log probabilities
        scores = top_k_scores.expand_as(preds) + preds  # (k, vocab_size)

        # Flatten for beam search
        if step == 0:
            top_k_scores, top_k_words = scores[0].topk(k, 0)
        else:
            top_k_scores, top_k_words = scores.view(-1).topk(k, 0)

        prev_word_inds = top_k_words // vocab_size  # which beam
        next_word_inds = top_k_words % vocab_size  # actual word

        seqs = torch.cat([seqs[prev_word_inds], next_word_inds.unsqueeze(1)], dim=1)

        # Check if completed
        incomplete_inds = [ind for ind, next_word in enumerate(next_word_inds) if next_word != tokenizer.sep_token_id]
        complete_inds = list(set(range(len(next_word_inds))) - set(incomplete_inds))

        if len(complete_inds) > 0:
            complete_seqs.extend(seqs[complete_inds].tolist())
            complete_seqs_scores.extend(top_k_scores[complete_inds])

        k -= len(complete_inds)  # reduce beam size accordingly

        if k == 0:
            break

        # Proceed with incomplete sequences
        seqs = seqs[incomplete_inds]
        h = h[prev_word_inds[incomplete_inds]]
        c = c[prev_word_inds[incomplete_inds]]
        encoder_out = encoder_out[prev_word_inds[incomplete_inds]]
        top_k_scores = top_k_scores[incomplete_inds].unsqueeze(1)

    if len(complete_seqs) == 0:
        complete_seqs = seqs.tolist()
        complete_seqs_scores = top_k_scores.squeeze(1)

    i = torch.argmax(torch.tensor(complete_seqs_scores)).item()
    best_seq = complete_seqs[i]

    caption = tokenizer.decode(best_seq, skip_special_tokens=True)
    return caption
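
The bookkeeping in the function above is easier to follow on a toy problem. The sketch below is a pure-Python beam search over a hand-written probability table. `TABLE`, `toy_log_probs`, and the token names are hypothetical; like the function above, it shrinks the beam as hypotheses finish and applies no length normalization.

```python
import math

def beam_search(next_log_probs, start_id, end_id, beam_size=3, max_len=10):
    """Return (tokens, summed log-probability) of the best hypothesis found."""
    beams = [([start_id], 0.0)]
    finished = []
    k = beam_size
    for _ in range(max_len):
        candidates = []
        for tokens, score in beams:
            for token, log_p in next_log_probs(tokens).items():
                candidates.append((tokens + [token], score + log_p))
        candidates.sort(key=lambda item: item[1], reverse=True)
        beams = []
        for tokens, score in candidates[:k]:
            if tokens[-1] == end_id:
                finished.append((tokens, score))
                k -= 1  # one fewer slot once a hypothesis is complete
            else:
                beams.append((tokens, score))
        if k == 0 or not beams:
            break
    return max(finished or beams, key=lambda item: item[1])

S, A, B, X, Y, Z, E = range(7)
TABLE = {
    S: {A: 0.6, B: 0.4},
    A: {X: 0.5, Y: 0.5},
    B: {Z: 0.9, E: 0.1},
    X: {E: 1.0}, Y: {E: 1.0}, Z: {E: 1.0},
}

def toy_log_probs(tokens):
    return {tok: math.log(p) for tok, p in TABLE[tokens[-1]].items()}

best_tokens, best_score = beam_search(toy_log_probs, S, E, beam_size=2)
greedy_tokens, greedy_score = beam_search(toy_log_probs, S, E, beam_size=1)
print(best_tokens, greedy_tokens)
```

With a beam of 2 the search keeps the initially weaker branch through `B` and ends with a sequence of probability 0.36, while a beam of 1 (greedy) commits to `A` and ends at 0.30.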


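You're training the encoder-decoder model for 30 epochs with mixed precision. With probability 0.15 per position, the loop replaces the target token at step t with the model's own prediction from step t-1; note that this edits the loss targets rather than the decoder inputs, which differs from the standard form of scheduled sampling.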

In [None]:
from torch.cuda.amp import GradScaler, autocast
import random
import torch

scaler = GradScaler()

scheduled_sampling_prob = 0.15  # 15%

for epoch in range(30):
    encoder.train()
    decoder.train()
    running_loss = 0.0

    for images, captions in train_loader:
        images = images.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()

        with autocast(enabled=True):
            encoder_out = encoder(images)
            caption_lengths = (captions != tokenizer.pad_token_id).sum(dim=1)

            outputs, decode_lengths, alphas = decoder(encoder_out, captions, caption_lengths)

            targets = captions[:, 1:]  # Actual next tokens
            batch_size, max_len = targets.size()

            preds = outputs.clone()
            sampled_preds = outputs.argmax(2)

            # ✅ Make a clone of targets for Scheduled Sampling
            sampled_targets = targets.clone()

            for b in range(batch_size):
                decode_len = decode_lengths[b]
                for t in range(decode_len):
                    if t > 0 and random.random() < scheduled_sampling_prob:
                        sampled_targets[b, t] = sampled_preds[b, t-1]

            outputs = outputs[:, :max(decode_lengths), :].contiguous()
            outputs = outputs.view(-1, tokenizer.vocab_size)

            sampled_targets = sampled_targets[:, :max(decode_lengths)].contiguous()
            sampled_targets = sampled_targets.reshape(-1)

            loss = criterion(outputs, sampled_targets)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    print(f"✅ Epoch [{epoch+1}/30], Loss: {avg_loss:.4f}")




✅ Epoch [1/30], Loss: 4.3215
✅ Epoch [2/30], Loss: 4.2931
✅ Epoch [3/30], Loss: 4.2760
✅ Epoch [4/30], Loss: 4.2467
✅ Epoch [5/30], Loss: 4.2595
✅ Epoch [6/30], Loss: 4.2305
✅ Epoch [7/30], Loss: 4.2335
✅ Epoch [8/30], Loss: 4.2275
✅ Epoch [9/30], Loss: 4.2143
✅ Epoch [10/30], Loss: 4.2096
✅ Epoch [11/30], Loss: 4.1901
✅ Epoch [12/30], Loss: 4.1701
✅ Epoch [13/30], Loss: 4.1722
✅ Epoch [14/30], Loss: 4.1790
✅ Epoch [15/30], Loss: 4.1698
✅ Epoch [16/30], Loss: 4.1557
✅ Epoch [17/30], Loss: 4.1539
✅ Epoch [18/30], Loss: 4.1259
✅ Epoch [19/30], Loss: 4.1408
✅ Epoch [20/30], Loss: 4.1238
✅ Epoch [21/30], Loss: 4.1273
✅ Epoch [22/30], Loss: 4.1246
✅ Epoch [23/30], Loss: 4.1066
✅ Epoch [24/30], Loss: 4.0975
✅ Epoch [25/30], Loss: 4.0920
✅ Epoch [26/30], Loss: 4.0847
✅ Epoch [27/30], Loss: 4.0714
✅ Epoch [28/30], Loss: 4.0740
✅ Epoch [29/30], Loss: 4.0633
✅ Epoch [30/30], Loss: 4.0420
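
In the usual formulation of scheduled sampling, the random substitution is applied to the token fed into the decoder at each step, while the loss targets stay fixed; the training loop above instead edits the target tensor. The sketch below shows the input-side selection only. `choose_decoder_inputs` is a hypothetical helper, and it takes precomputed predictions for simplicity, whereas a real loop would generate them step by step.

```python
import random

def choose_decoder_inputs(gold_inputs, predictions, sampling_prob, rng):
    """Pick the decoder input for each step.

    gold_inputs[t] is the ground-truth token fed at step t under teacher
    forcing; predictions[t] is the model's output at step t.
    """
    inputs = [gold_inputs[0]]  # the start token is never replaced
    for t in range(1, len(gold_inputs)):
        if rng.random() < sampling_prob:
            inputs.append(predictions[t - 1])  # feed back the model's own guess
        else:
            inputs.append(gold_inputs[t])      # teacher forcing
    return inputs

gold = [101, 11, 12, 13]
preds = [21, 22, 23, 24]
rng = random.Random(0)
always_gold = choose_decoder_inputs(gold, preds, 0.0, rng)
always_model = choose_decoder_inputs(gold, preds, 1.0, rng)
print(always_gold, always_model)
```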


You're evaluating the model's caption quality on the validation set using BLEU scores (BLEU-1 to BLEU-4) and printing the results.

In [None]:
bleu1, bleu2, bleu3, bleu4 = evaluate_bleu(encoder, decoder, val_loader, tokenizer)

print(f"✅ BLEU-1: {bleu1:.4f}")
print(f"✅ BLEU-2: {bleu2:.4f}")
print(f"✅ BLEU-3: {bleu3:.4f}")
print(f"✅ BLEU-4: {bleu4:.4f}")



✅ BLEU-1: 0.1895
✅ BLEU-2: 0.0974
✅ BLEU-3: 0.0547
✅ BLEU-4: 0.0312
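
`evaluate_bleu` is not defined in the cells shown here; with NLTK it would typically call `corpus_bleu(list_of_references, hypotheses, weights=...)` once per n-gram order. To make the metric itself concrete, here is a dependency-free sketch of corpus-level BLEU without smoothing. `corpus_bleu_n` and `ngrams` are hypothetical helpers, not the notebook's implementation.

```python
import math
from collections import Counter

def ngrams(tokens, n):
    return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))

def corpus_bleu_n(references, hypotheses, max_n):
    """references: per image, a list of reference token lists; hypotheses: token lists."""
    matched, total = [0] * max_n, [0] * max_n
    hyp_len = ref_len = 0
    for refs, hyp in zip(references, hypotheses):
        hyp_len += len(hyp)
        # Closest reference length (ties go to the shorter reference)
        ref_len += min((abs(len(r) - len(hyp)), len(r)) for r in refs)[1]
        for n in range(1, max_n + 1):
            hyp_counts = ngrams(hyp, n)
            max_ref = Counter()
            for r in refs:
                for gram, count in ngrams(r, n).items():
                    max_ref[gram] = max(max_ref[gram], count)
            # Clipped counts: a hypothesis n-gram only matches as often as it
            # appears in some single reference.
            matched[n - 1] += sum(min(c, max_ref[g]) for g, c in hyp_counts.items())
            total[n - 1] += sum(hyp_counts.values())
    if hyp_len == 0 or min(matched) == 0:
        return 0.0
    log_precision = sum(math.log(m / t) for m, t in zip(matched, total)) / max_n
    brevity = 1.0 if hyp_len > ref_len else math.exp(1 - ref_len / hyp_len)
    return brevity * math.exp(log_precision)

refs = [[["a", "dog", "runs", "fast"]]]
hyp = [["a", "cat", "runs", "fast"]]
bleu1 = corpus_bleu_n(refs, hyp, 1)
bleu2 = corpus_bleu_n(refs, hyp, 2)
print(bleu1, bleu2)
```

Because higher-order n-grams match less often, BLEU-2 through BLEU-4 fall off quickly relative to BLEU-1, which is the same pattern seen in the scores above.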
