Emily, AJ, &
Assignment 2
CSC 561 - Professor Alvarez
4/18/2025

In [1]:
pip install scikit-learn


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [8]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import os
from collections import Counter, defaultdict
import json
import nltk
from nltk.tokenize import word_tokenize
import pickle
import sklearn
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import numpy as np

In [3]:
# Make sure the NLTK "punkt" tokenizer data is downloaded and on NLTK's search path
nltk_data_path = '/home/emily_light_uri_edu/nltk_data'
nltk.download('punkt', download_dir=nltk_data_path)
nltk.data.path.append(nltk_data_path)

# Training captions: a JSON object mapping image id -> list of caption strings
caption_file = "/work/pi_csc561_uri_edu/datasets/flickr/training-captions.json"
caption_dict = defaultdict(list)

with open(caption_file, 'r') as f:
    data = json.load(f)

# Group the captions under their image id; an image with no captions gets no entry
for img_id, captions in data.items():
    if captions:
        caption_dict[img_id].extend(captions)

# Preview the first five images as a sanity check
for img_id, captions in list(data.items())[:5]:
    print(f"Image ID: {img_id}, Captions: {captions}")

Image ID: 1000268201_693b08cb0e.jpg, Captions: ['A child in a pink dress is climbing up a set of stairs in an entry way .\n', 'A girl going into a wooden building .\n', 'A little girl climbing into a wooden playhouse .\n', 'A little girl climbing the stairs to her playhouse .\n', 'A little girl in a pink dress going into a wooden cabin .\n']
Image ID: 1002674143_1b742ab4b8.jpg, Captions: ['A little girl covered in paint sits in front of a painted rainbow with her hands in a bowl .\n', 'A little girl is sitting in front of a large painted rainbow .\n', 'A small girl in the grass plays with fingerpaints in front of a white canvas with a rainbow on it .\n', 'There is a girl with pigtails sitting in front of a rainbow painting .\n', 'Young girl with pigtails painting outside in the grass .\n']
Image ID: 1003163366_44323f5815.jpg, Captions: ['A man lays on a bench while his dog sits by him .\n', 'A man lays on the bench to which a white dog is also tied .\n', 'a man sleeping on a bench outs

[nltk_data] Downloading package punkt to
[nltk_data]     /home/emily_light_uri_edu/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
# Lower-case and tokenize every caption into one flat list of tokens
tokens = [
    token
    for captions in caption_dict.values()
    for caption in captions
    for token in word_tokenize(caption.lower(), language='english')
]

# Occurrence count of every token across all captions
vocab = Counter(tokens)

# Keep only the words that occur at least `min_freq` times
min_freq = 5
filtered_vocab = {word for word, count in vocab.items() if count >= min_freq}

# The four special tokens take indices 0-3; the kept words follow in sorted order
word2idx = dict(zip(('<pad>', '<start>', '<end>', '<unk>'), range(4)))
for word in sorted(filtered_vocab):
    word2idx[word] = len(word2idx)

# Reverse lookup, used to turn indices back into words
idx2word = {index: word for word, index in word2idx.items()}

# Persist the word -> index mapping (pickle is imported in the notebook's import cell)
with open('vocab.pkl', 'wb') as f:
    pickle.dump(word2idx, f)

print(f"Vocabulary size: {len(word2idx)}")

Vocabulary size: 2722


In [5]:
import random

# Image features, looked up by image id further down
train_features = torch.load("train_features.pt")

# Shuffle the captioned image ids with a fixed seed so the split is repeatable
all_ids = list(caption_dict)
random.seed(42)
random.shuffle(all_ids)

# The first 90% of the shuffled ids are for training, the rest for validation
split_idx = int(0.9 * len(all_ids))
train_ids, val_ids = all_ids[:split_idx], all_ids[split_idx:]

# Captioned images that have no entry in the loaded features
missing_keys = [img_id for img_id in caption_dict.keys() if img_id not in train_features]


In [6]:
class CaptionDataset(torch.utils.data.Dataset):
    """Pairs each image's feature vector with one fixed-length, index-encoded caption.

    Args:
        features: Mapping from image id to that image's feature vector
            (a tensor or anything ``torch.tensor`` accepts).
        captions: Mapping from image id to a list of caption strings. Only the
            first caption of each image is used; the others are ignored.
        word2idx: Mapping from token to integer id. Must contain the
            '<start>', '<end>', '<pad>' and '<unk>' entries.
        max_len: Length every encoded caption is padded or truncated to.
    """

    def __init__(self, features, captions, word2idx, max_len=30):
        self.features = features
        self.captions = captions
        self.word2idx = word2idx
        self.max_len = max_len
        self.keys = list(captions.keys())  # Image ids, in a fixed order for indexing

    def __len__(self):
        """Number of images (one sample per image)."""
        return len(self.keys)

    def __getitem__(self, idx):
        """Return ``(img_feat, caption_tensor)`` for the idx-th image.

        ``img_feat`` is a float32 tensor; ``caption_tensor`` is a long tensor of
        exactly ``max_len`` token ids.
        """
        img_id = self.keys[idx]

        feat = self.features[img_id]
        if isinstance(feat, torch.Tensor):
            # torch.tensor(existing_tensor) emits a UserWarning; clone().detach()
            # is the copy PyTorch recommends instead.
            img_feat = feat.clone().detach().to(torch.float32)
        else:
            img_feat = torch.tensor(feat, dtype=torch.float32)

        # Always the first caption of the image
        caption_text = self.captions[img_id][0].lower()
        tokens = word_tokenize(caption_text)

        # <start> + token ids (unknown words map to <unk>) + <end>
        unk_idx = self.word2idx['<unk>']
        end_idx = self.word2idx['<end>']
        caption_idx = [self.word2idx['<start>']]
        caption_idx.extend(self.word2idx.get(token, unk_idx) for token in tokens)
        caption_idx.append(end_idx)

        if len(caption_idx) < self.max_len:
            # Right-pad short captions
            caption_idx += [self.word2idx['<pad>']] * (self.max_len - len(caption_idx))
        elif len(caption_idx) > self.max_len:
            caption_idx = caption_idx[:self.max_len]
            # Plain slicing would cut off <end>; keep it as the final token so a
            # truncated caption still terminates.
            if self.max_len >= 2:
                caption_idx[-1] = end_idx

        caption_tensor = torch.tensor(caption_idx, dtype=torch.long)

        return img_feat, caption_tensor


In [9]:
# Load GloVe embeddings
def load_glove_embeddings(glove_file, word2idx, embed_dim=300):
    """Build an embedding matrix for the vocabulary from a GloVe text file.

    Args:
        glove_file: Path to a UTF-8 text file with one
            ``<word> <v1> ... <vN>`` entry per line.
        word2idx: Mapping from vocabulary word to its row in the returned matrix.
        embed_dim: Number of components in each vector.

    Returns:
        A float32 tensor of shape ``(len(word2idx), embed_dim)``. Rows of words
        that do not appear in the file are left as all zeros.

    Raises:
        ValueError: If a vocabulary word's vector in the file does not have
            ``embed_dim`` components.
    """
    # float32 matches the dtype of the embedding layer these weights are copied into
    embedding_matrix = np.zeros((len(word2idx), embed_dim), dtype=np.float32)
    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                continue  # blank line: nothing to parse
            word = values[0]
            row = word2idx.get(word)
            if row is None:
                continue
            if len(values) - 1 != embed_dim:
                raise ValueError(
                    f"Vector for {word!r} has {len(values) - 1} components, "
                    f"expected {embed_dim}"
                )
            embedding_matrix[row] = np.asarray(values[1:], dtype=np.float32)
    return torch.from_numpy(embedding_matrix)

# GloVe vectors file, resolved relative to the working directory
glove_file = 'glove.6B.300d.txt'

# Build the 300-dimensional embedding matrix for the vocabulary in `word2idx`
pretrained_embedding_matrix = load_glove_embeddings(
    glove_file=glove_file,
    word2idx=word2idx,
    embed_dim=300,
)


In [10]:
# Keep only the image ids that actually have a feature vector. Checking the
# feature dictionary directly is a constant-time lookup per id and does not
# depend on a separately computed list of missing ids.
valid_train_ids = [k for k in train_ids if k in train_features]
valid_val_ids = [k for k in val_ids if k in train_features]

# Restrict captions and features to the ids of each split
train_caps = {k: caption_dict[k] for k in valid_train_ids}
val_caps = {k: caption_dict[k] for k in valid_val_ids}
train_feats = {k: train_features[k] for k in valid_train_ids}
val_feats = {k: train_features[k] for k in valid_val_ids}

# Create dataset objects
train_dataset = CaptionDataset(train_feats, train_caps, word2idx)
val_dataset = CaptionDataset(val_feats, val_caps, word2idx)

# Batch the datasets; only the training batches are shuffled
# (DataLoader is already imported in the notebook's import cell)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")


Number of training samples: 6381
Number of validation samples: 710


In [11]:
import torch.optim as optim
import torch.nn.functional as F

# Run on the GPU when CUDA is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Vocabulary index of the padding token
pad_idx = word2idx['<pad>']

def train_one_epoch(model, dataloader, criterion, optimizer, vocab_size):
    """Run one optimization pass over ``dataloader`` and return the mean batch loss.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Called as ``model(images, captions)``; its output is indexed as
            (batch, seq_len, vocab_size).
        dataloader: Iterable of ``(images, captions)`` tensor batches that
            supports ``len()``.
        criterion: Loss called as ``criterion(logits, targets)`` on the
            flattened logits and targets.
        optimizer: Optimizer stepped once per batch.
        vocab_size: Size of the last dimension of the model output.

    Returns:
        The sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.train()
    total_loss = 0

    for images, captions in dataloader:
        images = images.to(device)
        captions = captions.to(device)

        outputs = model(images, captions)  # shape: (batch, seq_len, vocab_size)

        # The output at position t is scored against the caption token at t+1:
        # drop the last output step and the first target token, then flatten.
        outputs = outputs[:, :-1, :].reshape(-1, vocab_size)  # [B*(T-1), V]
        targets = captions[:, 1:].reshape(-1)  # [B*(T-1)]

        loss = criterion(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion, vocab_size):
    """Return the mean per-batch loss of ``model`` over ``dataloader``.

    The model is put in eval mode and no gradients are tracked. Batches are
    moved to the module-level ``device`` before the forward pass.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            feats, caps = batch
            feats, caps = feats.to(device), caps.to(device)

            logits = model(feats, caps)

            # Score the output at position t against the caption token at t+1
            flat_logits = logits[:, :-1, :].reshape(-1, vocab_size)
            flat_targets = caps[:, 1:].reshape(-1)

            batch_loss = criterion(flat_logits, flat_targets)
            running_loss += batch_loss.item()

    return running_loss / len(dataloader)


In [17]:
class ImageCaptioningModel(nn.Module):
    """LSTM caption decoder whose initial hidden state is a projection of the image features.

    Args:
        feature_dim: Size of the incoming image feature vector.
        embed_dim: Size of the word embeddings.
        hidden_dim: Size of the LSTM hidden state.
        vocab_size: Number of tokens in the vocabulary.
        pretrained_embedding_matrix: Optional tensor of shape
            (vocab_size, embed_dim) copied into the embedding layer.
        freeze_embeddings: Only used when ``pretrained_embedding_matrix`` is
            given. When True (the default) the copied embeddings are excluded
            from gradient updates; pass False to fine-tune them.

    Raises:
        ValueError: If ``pretrained_embedding_matrix`` does not have shape
            (vocab_size, embed_dim).
    """

    def __init__(self, feature_dim, embed_dim, hidden_dim, vocab_size,
                 pretrained_embedding_matrix=None, freeze_embeddings=True):
        super(ImageCaptioningModel, self).__init__()

        self.embed = nn.Embedding(vocab_size, embed_dim)
        if pretrained_embedding_matrix is not None:
            expected_shape = (vocab_size, embed_dim)
            actual_shape = tuple(pretrained_embedding_matrix.shape)
            if actual_shape != expected_shape:
                # copy_ would silently broadcast some wrong shapes, so check first
                raise ValueError(
                    f"pretrained_embedding_matrix has shape {actual_shape}, "
                    f"expected {expected_shape}"
                )
            # In-place copy under no_grad instead of writing through `.data`
            with torch.no_grad():
                self.embed.weight.copy_(pretrained_embedding_matrix)
            self.embed.weight.requires_grad_(not freeze_embeddings)

        self.feat2hidden = nn.Linear(feature_dim, hidden_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, image_feats, captions):
        """Return vocabulary logits for every position of ``captions``.

        Args:
            image_feats: Image features, (batch, feature_dim).
            captions: Token ids, (batch, seq_len).

        Returns:
            Logits of shape (batch, seq_len, vocab_size).
        """
        # The projected image features become the initial hidden state
        h0 = self.feat2hidden(image_feats).unsqueeze(0)  # (1, batch, hidden_dim)
        c0 = torch.zeros_like(h0)  # Cell state starts at zero, same shape as h0

        embeddings = self.embed(captions)  # (batch, seq_len, embed_dim)

        outputs, _ = self.lstm(embeddings, (h0, c0))  # (batch, seq_len, hidden_dim)

        outputs = self.fc(outputs)  # (batch, seq_len, vocab_size)
        return outputs



In [18]:
# Draw one batch and show its first caption, as indices and decoded back to words
images, captions = next(iter(train_loader))
print("Caption example:", captions[0])
print("Decoded:", [idx2word[token_id] for token_id in captions[0].tolist() if token_id in idx2word])

# Draw a single batch again and report the tensor shapes
batch_idx, (images, captions) = next(enumerate(train_loader))
print(f"Batch {batch_idx+1}:")
print(f"Images batch shape: {images.shape}")  # shape of the image feature tensor
print(f"Captions batch shape: {captions.shape}")  # shape of the caption tensor
print("First caption in this batch:", captions[0])


Caption example: tensor([  1,  18,  19, 306,   2,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0])
Decoded: ['<start>', '``', 'a', 'boy', '<end>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>']
Batch 1:
Images batch shape: torch.Size([32, 2048])
Captions batch shape: torch.Size([32, 30])
First caption in this batch: tensor([   1,   18,   19,  269, 1201, 1559, 2423, 2628,    2,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0])


  img_feat = torch.tensor(self.features[img_id], dtype=torch.float32)


In [None]:
import torch
import torch.optim as optim
import torch.nn as nn

# Run on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model dimensions
feature_dim, embed_dim, hidden_dim = 2048, 300, 512
vocab_size = len(word2idx)
pad_idx = word2idx["<pad>"]

# Model, loss (padding positions are ignored) and optimizer
model = ImageCaptioningModel(
    feature_dim,
    embed_dim,
    hidden_dim,
    vocab_size,
    pretrained_embedding_matrix,
).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Train for a fixed number of epochs, checkpointing whenever validation improves
num_epochs = 10
best_val_loss = float('inf')

for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, train_loader, criterion, optimizer, vocab_size)
    val_loss = evaluate(model, val_loader, criterion, vocab_size)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    # Nothing to save unless this epoch beat the best validation loss so far
    if not (val_loss < best_val_loss):
        continue

    best_val_loss = val_loss
    torch.save(model.state_dict(), "best_model.pt")
    print("✅ Saved new best model.")


  img_feat = torch.tensor(self.features[img_id], dtype=torch.float32)


In [None]:
def generate_caption(model, image_feat, word2idx, idx2word, max_len=20):
    """Greedily decode a caption for a single image.

    Args:
        model: Captioning model exposing ``feat2hidden``, ``embed``, ``lstm``
            and ``fc`` (the attributes defined by ImageCaptioningModel).
        image_feat: Feature vector of one image, without a batch dimension.
        word2idx: Mapping from token to id; must contain '<start>' and '<end>'.
        idx2word: Mapping from id to token.
        max_len: Maximum number of words to generate.

    Returns:
        The generated words joined by single spaces ('<start>' and '<end>' are
        not included). Empty if '<end>' is predicted first.
    """
    model.eval()
    end_idx = word2idx["<end>"]
    result = []

    with torch.no_grad():  # inference only: no autograd graph needed
        image_feat = image_feat.to(device).unsqueeze(0)  # Add batch dimension

        # Same initial state as ImageCaptioningModel.forward: projected image
        # features as the hidden state, zeros as the cell state. The decoder is
        # an LSTM, so the state is an (h, c) tuple.
        h0 = model.feat2hidden(image_feat).unsqueeze(0)  # (1, batch, hidden_dim)
        state = (h0, torch.zeros_like(h0))

        token = torch.tensor([[word2idx["<start>"]]], dtype=torch.long, device=device)

        for _ in range(max_len):
            emb = model.embed(token)  # (batch, 1, embed_dim)
            output, state = model.lstm(emb, state)
            logits = model.fc(output.squeeze(1))  # (batch, vocab_size)
            next_token = logits.argmax(-1).item()

            if next_token == end_idx:
                break

            result.append(idx2word.get(next_token, "<unk>"))
            # Feed the predicted word back in as the next input
            token = torch.tensor([[next_token]], dtype=torch.long, device=device)

    return " ".join(result)


In [None]:
# NOTE(review): `test_features` is not defined in the cells shown here; it is
# presumably a mapping of image id -> feature vector loaded elsewhere — confirm
# it is loaded before this cell runs.

# Caption with the checkpoint that achieved the best validation loss rather
# than with whatever weights the final training epoch left in `model`.
model.load_state_dict(torch.load("best_model.pt", map_location=device))

with open("lm-predictions.txt", "w") as f:
    for image_id, feature in tqdm(test_features.items(), desc="Generating captions"):
        # as_tensor reuses an existing tensor; torch.tensor() on a tensor warns
        feature_tensor = torch.as_tensor(feature, dtype=torch.float32).to(device)
        caption = generate_caption(model, feature_tensor, word2idx, idx2word)
        # One "<image id>: <caption>" line per image
        f.write(f"{image_id}: {caption}\n")