# INFO2049 Image Captioning (NLP + Vision) - December 2025

- Simon Gardier s192580
- Lei Yang s201670
- Camille Trinh s192024

## Imports

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6,7"
import ast
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import matplotlib.pyplot as plt
import pandas as pd

from PIL import Image
from tqdm import tqdm
from textwrap import wrap
from torchvision import transforms, models
from transformers import AutoTokenizer
from torch.utils.data import Dataset


## Utils

In [None]:
def load_image(image_path):
    """Open the image file at ``image_path`` and return it converted to RGB."""
    return Image.open(image_path).convert('RGB')

def show_image(image, title=None):
    """Display ``image`` with the axes hidden.

    When ``title`` is truthy it is wrapped at 60 characters per line and drawn
    above the image.
    """
    plt.imshow(image)
    if title:
        wrapped_lines = wrap(title, 60)
        plt.title("\n".join(wrapped_lines))
    plt.axis('off')
    plt.show()

def show_image_with_label(df, path_image_folder):
    """Display every image listed in ``df`` with its caption as the title.

    The 'image' and 'caption' columns of ``df`` are read row by row; each image
    file name is resolved relative to ``path_image_folder``.
    """
    for image_name, caption in zip(df['image'], df['caption']):
        picture = load_image(os.path.join(path_image_folder, image_name))
        show_image(picture, title=caption)

def clean_dataset(df):
    """Expand an annotation table into one row per caption.

    Args:
        df: DataFrame with a 'raw' column holding the string form of a list of
            captions, plus 'filename' and 'sentids' columns.

    Returns:
        A new DataFrame with one row per caption, where 'raw' has been renamed
        to 'caption' and 'filename' to 'image', and 'sentids' is dropped.
        ``df`` itself is left unmodified.
    """
    # rename() returns a new frame, so parsing the caption lists below does not
    # overwrite the caller's 'raw' strings (which would make a second call on
    # the same frame fail inside literal_eval).
    new_df = df.rename(columns={'raw': 'caption', 'filename': 'image'})
    new_df['caption'] = new_df['caption'].apply(ast.literal_eval)
    new_df = new_df.explode('caption').reset_index(drop=True)
    return new_df.drop(columns=['sentids'])

def load_karpathy_dataframe(json_path, image_folder_path):
    """Parse a Karpathy-style JSON file into a DataFrame with one row per caption.

    Args:
        json_path: Path of the JSON file. Its top-level 'images' list must hold
            items with 'filename', 'split' and 'sentences' keys, each sentence
            carrying a 'raw' caption string.
        image_folder_path: Folder joined with each file name to build 'image_path'.

    Returns:
        DataFrame with the columns 'image', 'image_path', 'caption' and 'split'.
        The columns are present even when the file lists no captions.
    """
    # JSON is read as UTF-8 explicitly instead of relying on the platform default.
    with open(json_path, 'r', encoding='utf-8') as f:
        annotations = json.load(f)

    # The original (raw) caption text is kept; the cleaning is done separately.
    rows = [
        {
            'image': img_item['filename'],
            'image_path': os.path.join(image_folder_path, img_item['filename']),
            'caption': sentence['raw'],
            'split': img_item['split'],
        }
        for img_item in annotations['images']
        for sentence in img_item['sentences']
    ]

    return pd.DataFrame(rows, columns=['image', 'image_path', 'caption', 'split'])

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

## Datasets

In [None]:
# Image folders of the two Flickr datasets.
flicker8k_images_folder = 'data/flickr8k/images/'
flickr30k_images_folder = 'data/flickr30k/flickr30k-images'

# Per-dataset JSON annotation file and image folder.
datasets = dict(
    flickr8k=dict(
        json='flickr8k/dataset_flickr8k.json',
        img_folder='data/flickr8k/images/',
    ),
    flickr30k=dict(
        json='flickr30k/dataset_flickr30k.json',
        img_folder='data/flickr30k/flickr30k-images/',
    ),
)

# Caption tables; the Flickr30k CSV is reshaped by clean_dataset right after loading.
flickr8k_captions = pd.read_csv('data/flickr8k/captions.txt')
flickr30k_annotations = pd.read_csv('data/flickr30k/flickr_annotations_30k.csv')
flickr30k_captions = clean_dataset(flickr30k_annotations)

## Visualization of some images

In [None]:
# Show five randomly sampled captioned images from each dataset.
print("===== images8k =====")
show_image_with_label(flickr8k_captions.sample(5), flicker8k_images_folder)
print("===== images30k=====")
# The Flickr30k folder is stored in `flickr30k_images_folder`; the previously
# used name `images30k_folder` is never defined and raised a NameError.
show_image_with_label(flickr30k_captions.sample(5), flickr30k_images_folder)

## Text Preprocessing

In [None]:
# Pretrained "bert-base-uncased" tokenizer; it encodes the captions and provides
# the vocabulary size and the pad/CLS/SEP token ids used further down.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def text_preprocessing(df):
    """Normalize the 'caption' column of ``df`` in place and return ``df``.

    Captions are lower-cased and stripped, line breaks are turned into spaces,
    and every character that is neither a word character nor whitespace is
    removed.
    """
    def _flatten_line_breaks(text):
        return text.replace('\n', ' ').replace('\r', ' ')

    normalized = df['caption'].str.lower().str.strip()
    normalized = normalized.apply(_flatten_line_breaks)
    df['caption'] = normalized.str.replace(r'[^\w\s]', '', regex=True)

    return df

def preprocess_captions(df, tokenizer, max_length=30):
    """Tokenize the 'caption' column and store the ids in 'tokenized_caption'.

    Each caption is encoded with special tokens, truncated and padded to
    ``max_length``; the resulting 1-D tensor of input ids is stored per row.
    ``df`` is modified in place and returned.
    """
    def _encode(caption):
        encoded = tokenizer.encode_plus(
            caption,
            add_special_tokens=True,
            max_length=max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        # Drop the leading batch dimension added by return_tensors='pt'.
        return encoded['input_ids'].squeeze(0)

    df['tokenized_caption'] = [
        _encode(caption)
        for caption in tqdm(df['caption'], desc="Tokenizing captions")
    ]
    return df
# Clean, then tokenize, the captions of each dataset.
flickr8k_captions = preprocess_captions(text_preprocessing(flickr8k_captions), tokenizer)
flickr30k_captions = preprocess_captions(text_preprocessing(flickr30k_captions), tokenizer)

# Print sample tokenized captions
caption_preview = flickr8k_captions[['caption', 'tokenized_caption']].head()
print(caption_preview)

## Splitting the dataset into train, test and validation sets

In [None]:
def splitting_dataset(df, size_split=0.3, forcing=False):
    """Split ``df`` into train and test frames without splitting an image's captions.

    Args:
        df: DataFrame with an 'image' column. When it already has a 'split'
            column and ``forcing`` is False, that predefined split is used.
        size_split: Fraction of the distinct images assigned to the test part
            when the split is computed here.
        forcing: When True, recompute the split even if 'split' already exists.

    Returns:
        ``(train_df, test_df)``, both with a fresh 0..n-1 index.

    Note:
        When the split is computed, the 'split' column of ``df`` is written
        (created or overwritten) in place. Images are assigned in order of
        first appearance; nothing is shuffled.
    """
    if forcing or 'split' not in df.columns:
        # Number the images by first appearance so that every caption of an
        # image lands on the same side of the split.
        first_seen = {}
        image_codes = [
            first_seen.setdefault(image_name, len(first_seen))
            for image_name in df['image']
        ]
        train_size = int(len(first_seen) * (1 - size_split))
        # Assigning a full-length list is positional, so this also works when
        # the index of df is not 0..n-1 (e.g. after sampling or filtering).
        df['split'] = [
            'train' if code < train_size else 'test' for code in image_codes
        ]

    train_df = df[df['split'] == 'train'].reset_index(drop=True)
    test_df = df[df['split'] == 'test'].reset_index(drop=True)
    return train_df, test_df
# First hold out 30% of the images, then split that held-out part evenly
# (by image count) into the test and validation sets.
train_data8k, holdout_data8k = splitting_dataset(
    flickr8k_captions, size_split=0.3, forcing=True)
test_data8k, valid_data8k = splitting_dataset(
    holdout_data8k, size_split=0.5, forcing=True)

print(f"Flickr8k - Train set size: {len(train_data8k)}, Test set size: {len(test_data8k)}, Valid set size: {len(valid_data8k)}")


## Dataset

In [None]:
class FlickrDataset(Dataset):
    """Dataset yielding ``(image, input_ids, target_ids)`` for each caption row.

    ``df`` must provide the 'image' (file name) and 'tokenized_caption' columns;
    image files are looked up inside ``root`` and passed through ``transform``
    when one is given.
    """

    def __init__(self, df, root, transform=None):
        super().__init__()
        self.df, self.root, self.transform = df, root, transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = load_image(os.path.join(self.root, row['image']))
        if self.transform:
            image = self.transform(image)

        tokens = row['tokenized_caption']
        # Shift by one position: the input drops the last token and the target
        # drops the first, so position i is trained to predict token i + 1.
        return image, tokens[:-1], tokens[1:]

    
# Preprocessing shared by training and caption generation: resize to 224x224,
# convert to a tensor and normalize with the ImageNet mean/std. The encoders are
# frozen ImageNet-pretrained backbones, so their input should follow the
# normalization those weights were trained with.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# One dataset per split, all reading from the Flickr8k image folder with the
# same image transform.
train_Flickr8k_dataset, test_Flickr8k_dataset, valid_Flickr8k_dataset = (
    FlickrDataset(split_df, flicker8k_images_folder, transform=image_transform)
    for split_df in (train_data8k, test_data8k, valid_data8k)
)

print(f"Number of training samples: {len(train_Flickr8k_dataset)}")
print(f"Number of testing samples: {len(test_Flickr8k_dataset)}")
print(f"Number of validation samples: {len(valid_Flickr8k_dataset)}")

# Only the training loader shuffles its samples.
loader_options = dict(batch_size=32, num_workers=0)
train_loader = data.DataLoader(train_Flickr8k_dataset, shuffle=True, **loader_options)
test_loader = data.DataLoader(test_Flickr8k_dataset, shuffle=False, **loader_options)
valid_loader = data.DataLoader(valid_Flickr8k_dataset, shuffle=False, **loader_options)

## Models

In [None]:
class EncoderResnet50(nn.Module):
    """Image encoder: frozen ImageNet-pretrained ResNet-50 plus a trainable head.

    The backbone is every ResNet-50 child module except the final classification
    layer. Its flattened output is projected to ``encoded_dim`` by a linear
    layer followed by batch normalization and ReLU.

    Args:
        encoded_dim: Size of the feature vector returned for each image.
    """

    def __init__(self, encoded_dim=256):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

        # Freeze the pretrained network; only `fc` and `bn` below are trainable.
        for param in self.resnet.parameters():
            param.requires_grad = False

        # All children except the last one (the classification layer).
        self.backbone = nn.Sequential(*list(self.resnet.children())[:-1])

        self.fc = nn.Linear(self.resnet.fc.in_features, encoded_dim)
        self.bn = nn.BatchNorm1d(encoded_dim)
        self.relu = nn.ReLU()

    def train(self, mode=True):
        """Switch the trainable head to ``mode`` but keep the backbone in eval mode.

        BatchNorm layers update their running statistics on every forward pass
        while in training mode, even under ``torch.no_grad()``. Without this
        override, calling ``encoder.train()`` would let the statistics of the
        supposedly frozen backbone drift during training.
        """
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, images):
        """Return one ``encoded_dim``-sized feature vector per image in the batch."""
        with torch.no_grad():
            features = self.backbone(images)
        features = features.view(features.size(0), -1)
        features = self.relu(self.bn(self.fc(features)))
        return features
class DecoderLSTM(nn.Module):
    """LSTM caption decoder that sees the image features at every time step.

    The image feature vector is concatenated to each token embedding, the
    result is run through a batch-first LSTM, and every hidden state is mapped
    to vocabulary logits.
    """

    def __init__(self, encoder_dim, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size + encoder_dim,
            hidden_size,
            num_layers,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions):
        """Return logits of shape (batch, seq_len, vocab_size).

        ``features`` is (batch, encoder_dim) and ``captions`` holds the token
        ids as (batch, seq_len).
        """
        token_embeddings = self.embedding(captions)
        seq_len = token_embeddings.size(1)
        # Copy the image features along the time axis: one copy per token.
        tiled_features = features.unsqueeze(1).expand(-1, seq_len, -1)
        lstm_inputs = torch.cat((tiled_features, token_embeddings), dim=2)
        hidden_states, _ = self.lstm(lstm_inputs)
        return self.linear(hidden_states)


# The decoder consumes the encoder output, so both use the same feature size.
lstm_feature_dim = 256
Encoder = EncoderResnet50(encoded_dim=lstm_feature_dim).to(device)
Decoder = DecoderLSTM(
    encoder_dim=lstm_feature_dim,
    embed_size=256,
    hidden_size=512,
    vocab_size=tokenizer.vocab_size,
).to(device)

In [None]:
# Padding positions in the targets do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
# Optimize the decoder together with the encoder parameters that still require
# gradients (its projection head). Passing only the decoder parameters would
# leave that head at its random initialization for the whole training.
params = list(Decoder.parameters()) + [
    param for param in Encoder.parameters() if param.requires_grad
]
optimizer = optim.Adam(params, lr=1e-4)

def train_loop(train_loader, valid_loader, encoder, decoder, optimizer, criterion, device, num_epochs=10, batch_size=32):
    """Train the encoder/decoder pair and evaluate it after every epoch.

    Args:
        train_loader: Iterable of ``(images, input_ids, target_ids)`` training batches.
        valid_loader: Iterable of validation batches with the same layout.
        encoder: Module mapping a batch of images to feature vectors.
        decoder: Module mapping ``(features, input_ids)`` to per-token logits.
        optimizer: Optimizer stepped once per training batch.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` target ids.
        device: Device the batches are moved to.
        num_epochs: Number of passes over ``train_loader``.
        batch_size: Unused; kept so existing callers keep working.

    Returns:
        ``(array_loss_train, array_loss_valid)``: the average per-batch training
        and validation loss of each epoch.
    """
    def batch_loss(images, input_ids, target_ids):
        # Forward one batch and flatten logits/targets for the criterion.
        images = images.to(device)
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        outputs = decoder(encoder(images), input_ids)
        # The vocabulary size is read from the logits rather than from a global.
        return criterion(outputs.reshape(-1, outputs.size(-1)), target_ids.reshape(-1))

    array_loss_train = []
    array_loss_valid = []

    for epoch in range(num_epochs):
        encoder.train()
        decoder.train()
        train_total = 0.0

        for images, input_ids, target_ids in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
            loss = batch_loss(images, input_ids, target_ids)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_total += loss.item()

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        avg_train_loss = train_total / max(len(train_loader), 1)
        array_loss_train.append(avg_train_loss)
        print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {avg_train_loss:.4f}")

        # Validation uses its own accumulator: reusing the training total would
        # add the whole training loss to the reported validation loss.
        encoder.eval()
        decoder.eval()
        valid_total = 0.0
        with torch.no_grad():
            for images, input_ids, target_ids in tqdm(valid_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
                valid_total += batch_loss(images, input_ids, target_ids).item()

        avg_valid_loss = valid_total / max(len(valid_loader), 1)
        print(f"Epoch {epoch+1}/{num_epochs} - Validation Loss: {avg_valid_loss:.4f}")
        array_loss_valid.append(avg_valid_loss)

    return array_loss_train, array_loss_valid

In [None]:
# Train for 20 epochs, then plot both loss curves on the same axes.
array_loss_train, array_loss_valid = train_loop(
    train_loader, valid_loader, Encoder, Decoder, optimizer, criterion, device,
    num_epochs=20, batch_size=32,
)

for loss_curve, curve_label in (
    (array_loss_train, 'Training Loss'),
    (array_loss_valid, 'Validation Loss'),
):
    plt.plot(loss_curve, label=curve_label)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Start/end token ids of the module-level tokenizer.
cls_id = tokenizer.cls_token_id
sep_id = tokenizer.sep_token_id

def generate_caption(image_path, encoder, decoder, tokenizer, max_length=30, device=device):
    """Greedily decode a caption for the image stored at ``image_path``.

    Args:
        image_path: Path of the image file to caption.
        encoder: Module mapping the preprocessed image to feature vectors.
        decoder: Module mapping ``(features, input_ids)`` to per-token logits.
        tokenizer: Tokenizer providing ``cls_token_id``, ``sep_token_id`` and ``decode``.
        max_length: Maximum number of decoding steps.
        device: Device on which the models are run.

    Returns:
        The decoded caption string, without special tokens.
    """
    encoder.eval()
    decoder.eval()

    # Take the start/end ids from the tokenizer that was passed in, so the
    # function stays correct when it is called with a different tokenizer.
    start_id = tokenizer.cls_token_id
    end_id = tokenizer.sep_token_id

    image = image_transform(load_image(image_path)).unsqueeze(0).to(device)

    generated_ids = []
    with torch.no_grad():
        img_features = encoder(image)
        input_ids = torch.tensor([[start_id]], dtype=torch.long).to(device)

        for _ in range(max_length):
            # Re-run the decoder on the whole prefix and keep the last position only.
            logits = decoder(img_features, input_ids)[:, -1, :]
            predicted_id = logits.argmax(dim=1).unsqueeze(1)
            next_id = predicted_id.item()

            if next_id == end_id:
                break
            generated_ids.append(next_id)
            input_ids = torch.cat([input_ids, predicted_id], dim=1)

    return tokenizer.decode(generated_ids, skip_special_tokens=True)

# Test the caption generation on up to five randomly chosen test images.

candidate_images = test_data8k['image'].drop_duplicates()
# sample() raises when asked for more items than available, so cap the count.
unique_images = candidate_images.sample(min(5, len(candidate_images)))

for image_name in unique_images:
    refs = test_data8k[test_data8k['image'] == image_name]['caption'].tolist()
    image_path = os.path.join(flicker8k_images_folder, image_name)
    generated_caption = generate_caption(image_path, Encoder, Decoder, tokenizer, max_length=30, device=device)
    print(f"Image: {image_name}")
    print(f"Generated Caption: {generated_caption}")
    print("Reference Captions:")
    for i, ref in enumerate(refs, 1):
        print(f"  Ref {i}: {ref}")
    image = load_image(image_path)
    # Number the references from 1 here too, matching the printed list above.
    refs_str = "\n".join([f"Ref {i}: {c}" for i, c in enumerate(refs, 1)])
    show_image(image, title=f"Generated: {generated_caption}\n{refs_str}")
    print("\n" + "-"*50 + "\n")

## Transformer-based model (ViT encoder + Transformer decoder)

In [None]:
class EncoderVisionTransformer(nn.Module):
    """Image encoder: frozen ImageNet-pretrained ViT-B/16 plus a trainable head.

    The classification head of the ViT is replaced by an identity, so the model
    returns its pre-classification feature vector. That vector is projected to
    ``encoded_dim`` by a linear layer followed by batch normalization and ReLU.

    Args:
        encoded_dim: Size of the feature vector returned for each image.
    """

    def __init__(self, encoded_dim=256):
        super().__init__()
        self.vit = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)

        # Freeze the pretrained network; only `fc` and `bn` below are trainable.
        for param in self.vit.parameters():
            param.requires_grad = False

        # Read the feature size before dropping the classification head.
        feature_dim = self.vit.heads.head.in_features
        # Chaining the child modules in nn.Sequential skips the patch
        # flattening and class-token steps done inside the ViT forward pass,
        # so the full model is kept and only its head is neutralized.
        self.vit.heads = nn.Identity()
        self.backbone = self.vit

        self.fc = nn.Linear(feature_dim, encoded_dim)
        self.bn = nn.BatchNorm1d(encoded_dim)
        self.relu = nn.ReLU()

    def train(self, mode=True):
        """Switch the trainable head to ``mode`` but keep the frozen backbone in eval mode."""
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, images):
        """Return one ``encoded_dim``-sized feature vector per image in the batch."""
        with torch.no_grad():
            features = self.backbone(images)
        features = features.view(features.size(0), -1)
        features = self.relu(self.bn(self.fc(features)))
        return features
    
class DecoderTransformer(nn.Module):
    """Transformer caption decoder conditioned on one image feature vector.

    Each token embedding (plus a learned positional encoding) is concatenated
    with the image features, giving a model width of
    ``embed_size + encoder_dim``. The image features, projected to that width,
    also serve as the memory attended to by the decoder layers.

    Args:
        encoder_dim: Size of the image feature vector.
        embed_size: Size of the token embeddings.
        num_heads: Number of attention heads; must divide ``embed_size + encoder_dim``.
        hidden_size: Width of the feed-forward sublayers.
        vocab_size: Number of output classes (token ids).
        num_layers: Number of decoder layers.
    """

    def __init__(self, encoder_dim, embed_size, num_heads, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        model_dim = embed_size + encoder_dim
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 500, embed_size))
        transformer_layer = nn.TransformerDecoderLayer(d_model=model_dim, nhead=num_heads, dim_feedforward=hidden_size)
        self.transformer_decoder = nn.TransformerDecoder(transformer_layer, num_layers=num_layers)
        # The memory must have the same width as the decoder (d_model); the raw
        # image features only have encoder_dim channels.
        self.memory_proj = nn.Linear(encoder_dim, model_dim)
        self.linear = nn.Linear(model_dim, vocab_size)

    def forward(self, features, captions):
        """Return logits of shape (batch, seq_len, vocab_size).

        ``features`` is (batch, encoder_dim) and ``captions`` holds the token
        ids as (batch, seq_len). Position i only attends to positions <= i.
        """
        seq_len = captions.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"caption length {seq_len} exceeds the {max_len} supported positions"
            )

        embeddings = self.embedding(captions)
        embeddings = embeddings + self.positional_encoding[:, :seq_len, :]

        img_features = features.unsqueeze(1).expand(-1, seq_len, -1)
        inputs = torch.cat((img_features, embeddings), dim=2)

        # The decoder layers are sequence-first: (seq_len, batch, d_model).
        inputs = inputs.permute(1, 0, 2)
        # A single memory token per image, projected to the decoder width.
        memory = self.memory_proj(features).unsqueeze(0)

        # Causal mask: -inf above the diagonal blocks attention to future
        # tokens, which would otherwise leak the targets during training.
        causal_mask = torch.triu(
            torch.full(
                (seq_len, seq_len),
                float('-inf'),
                dtype=inputs.dtype,
                device=inputs.device,
            ),
            diagonal=1,
        )

        hiddens = self.transformer_decoder(inputs, memory, tgt_mask=causal_mask)
        hiddens = hiddens.permute(1, 0, 2)

        outputs = self.linear(hiddens)
        return outputs

# The decoder consumes the encoder output, so both use the same feature size.
vit_feature_dim = 256
Encoder = EncoderVisionTransformer(encoded_dim=vit_feature_dim).to(device)
Decoder = DecoderTransformer(
    encoder_dim=vit_feature_dim,
    embed_size=256,
    num_heads=8,
    hidden_size=512,
    vocab_size=tokenizer.vocab_size,
    num_layers=2,
).to(device)