In [76]:
import nltk
from collections import defaultdict
class Vocabulary:
    def __init__(self, threshold):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0
        self.threshold = threshold

        # Special Tokesn
        self.add_word('<pad>')
        self.add_word('<start>')
        self.add_word('<end>')
        self.add_word('<unk>')
    def add_word(self, word):
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx+=1

    def __call__(self, word):
        if word not in self.word2idx:
            return self.word2idx['<unk>']
        return self.word2idx[word]
    def __len__(self):
        """Return the size of the vocabulary."""
        return len(self.word2idx)

def build_vocab(ann_file, threshold=5):
    from pycocotools.coco import COCO
    coco = COCO(ann_file)
    counter = defaultdict(int)
    for ann_id in coco.anns.keys():
        caption = coco.anns[ann_id]['caption']
        tokens = nltk.tokenize.word_tokenize(caption.lower())
        for token in tokens:
            counter[token]+=1

    # create vocab
    vocab = Vocabulary(threshold)
    for word, count in counter.items():
        if count>threshold:
            vocab.add_word(word)

    return vocab
    

    

In [None]:
vocab = build_vocab('/home/mirsee/projects/image_captioning/notebooks/annotations/captions_val2017.json', threshold=2)

In [None]:
vocab.word2idx

In [4]:
import nltk
from collections import defaultdict
caption = "Hello how are you"
counter = defaultdict(int)
tokens = nltk.tokenize.word_tokenize(caption.lower())
for token in tokens:
    counter[token]+=1

In [None]:
tokens

In [None]:
counter

In [None]:
for word, count in counter.items():
    print(word, count)

## Create a Custom Dataset for COCO
- Now that we have the vocabulary, we need to create a PyTorch custom Dataset class to load the images and captions. This class will:

- Load the image from the COCO dataset.
- Tokenize the caption using the vocabulary.
- Apply transformations (like resizing and normalization) to the images.
- Return the processed image and the tokenized caption.

In [45]:
import os
from torch.utils.data import Dataset
from PIL import Image
import torch
import nltk
from pycocotools.coco import COCO
import random
import requests
from io import BytesIO

class CocoDataset(Dataset):
    def __init__(self, root, ann_file, vocab, transform=None, subset_fraction=1.0):
        """
        Args:
            root (string): Directory with all the images.
            ann_file (string): Path to the annotation file.
            vocab (Vocabulary): Vocabulary object for tokenizing captions.
            transform (callable, optional): Transform to be applied to the images.
            subset_fraction (float, optional): Fraction of dataset to use, e.g., 0.25 for 1/4th of the data.
        """
        self.root = root
        self.coco = COCO(ann_file)
        self.ids = list(self.coco.anns.keys())  # List of annotation IDs

        # Use only a subset of the dataset if subset_fraction is less than 1.0
        if subset_fraction < 1.0:
            subset_size = int(len(self.ids) * subset_fraction)
            self.ids = random.sample(self.ids, subset_size)

        self.vocab = vocab
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        ann_id = self.ids[index]
        caption = self.coco.anns[ann_id]['caption']
        img_id = self.coco.anns[ann_id]['image_id']
        img_info = self.coco.loadImgs(img_id)[0]
        path = img_info['file_name']
        img_url = img_info['coco_url']  # Get the image URL from COCO annotations

        # Fetch the image from the URL
        response = requests.get(img_url)
        image = Image.open(BytesIO(response.content)).convert('RGB')


        # Load the image

        # Apply transformations if provided
        if self.transform:
            image = self.transform(image)

        # Tokenize the caption and convert to indices
        tokens = nltk.tokenize.word_tokenize(str(caption).lower())
        caption_idx = [self.vocab('<start>')] + [self.vocab(token) for token in tokens] + [self.vocab('<end>')]
        caption_tensor = torch.LongTensor(caption_idx)

        return image, caption_tensor


In [None]:
coco = COCO('/home/mirsee/projects/image_captioning/notebooks/annotations/captions_val2017.json')

In [47]:
ann_id = list(coco.anns.keys())[0]

In [None]:
caption = coco.anns[ann_id]['caption']
caption

In [None]:
img_id = coco.anns[ann_id]['image_id']
img_id

In [None]:
img_info = coco.loadImgs(img_id)
img_info

In [51]:
img_info = img_info[0]

In [None]:
path = img_info['file_name']
path

In [None]:
import urllib
import urllib.request
img_url = img_info['coco_url']
file_name = img_info['file_name']
img_path = os.path.join('/home/mirsee/projects/image_captioning', file_name)
urllib.request.urlretrieve(img_url, file_name)

In [None]:
print(img_path)

In [None]:
Image.open(file_name)

In [56]:
tokens = nltk.tokenize.word_tokenize(caption.lower())

In [None]:
tokens

In [58]:
caption_idx = [vocab('<start>')] +[vocab(token) in tokens]+[vocab('<end>')]

In [None]:
caption_idx

In [None]:
torch.Tensor(caption_idx).long()

## Image Transformation

In [43]:
import torchvision.transforms as transforms

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

## Create a DataLoader
    Next, you need to create a DataLoader to load the dataset in mini-batches. 
    The DataLoader will shuffle the data and handle batching. However, captions are variable-length sequences, so you need a custom collate_fn to pad the captions in each batch.

In [61]:
import torch.nn.utils.rnn as rnn

def collate_fn(data, pad_idx):
    """Creates mini-batch tensors from the list of tuples (image, caption)."""
    # Sort data by caption length (descending)
    data.sort(key=lambda x: len(x[1]), reverse=True)
    images, captions = zip(*data)

    # Stack images into a single tensor
    images = torch.stack(images, 0)

    # Pad the captions to the maximum length in the batch
    lengths = [len(cap) for cap in captions]
    padded_captions = rnn.pad_sequence(captions, batch_first=True, padding_value=pad_idx)

    return images, padded_captions, lengths



In [None]:
# Path to COCO image folder and annotations
image_root = '/home/mirsee/projects/image_captioning/data'
ann_file = '/home/mirsee/projects/image_captioning/notebooks/annotations/captions_train2017.json'

# Build vocabulary (done earlier)
vocab = build_vocab(ann_file, threshold=5)

# Create the dataset with only 1/4 of the COCO dataset
subset_fraction = 0.25  # Use only 25% of the dataset
dataset = CocoDataset(root=image_root, ann_file=ann_file, vocab=vocab, transform=transform, subset_fraction=subset_fraction)

pad_idx = vocab('<pad>')

# Create the DataLoader, passing the pad_idx to the collate_fn
data_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, 
                         num_workers=4, collate_fn=lambda x: collate_fn(x, pad_idx))


In [None]:
data_loader

In [None]:
# Iterate over the data loader
for i, (images, captions, lengths) in enumerate(data_loader):
    print("Batch", i)
    print("Images shape:", images.shape)  # (batch_size, 3, 224, 224)
    print("Captions shape:", captions.shape)  # (batch_size, max_caption_length)
    print("Lengths:", lengths)

    if i == 0:  # Only show the first batch
        break


Great! It looks like your DataLoader is now working correctly. Here’s a quick summary of what you have:

Output Explanation:
Batch 0: This is the first mini-batch of data.

Images shape: torch.Size([32, 3, 224, 224])

You have a batch size of 32 images.
Each image is in 3 channels (RGB).
Each image has been resized to 224x224 pixels (due to your transformation).
Captions shape: torch.Size([32, 23])

There are 32 captions in this batch (matching the batch size).
The maximum caption length in this batch is 23 tokens.
All captions have been padded to the maximum length of 23 tokens in the batch.
Lengths: [23, 19, 19, 17, 17, ...]

This is the list of actual lengths of the captions before padding.
The longest caption in the batch has 23 tokens, while the shortest has 10 tokens.

In [1]:
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transform
import torch
import nltk
from collections import defaultdict
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import os
from torch.utils.data import Dataset
from PIL import Image
import torch
import nltk
from pycocotools.coco import COCO
import random
import requests
from io import BytesIO

class Vocabulary:
    def __init__(self, threshold):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0
        self.threshold = threshold

        # Special Tokesn
        self.add_word('<pad>')
        self.add_word('<start>')
        self.add_word('<end>')
        self.add_word('<unk>')
    def add_word(self, word):
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx+=1

    def __call__(self, word):
        if word not in self.word2idx:
            return self.word2idx['<unk>']
        return self.word2idx[word]
    def __len__(self):
        """Return the size of the vocabulary."""
        return len(self.word2idx)

def build_vocab(ann_file, threshold=5):
    from pycocotools.coco import COCO
    coco = COCO(ann_file)
    counter = defaultdict(int)
    for ann_id in coco.anns.keys():
        caption = coco.anns[ann_id]['caption']
        tokens = nltk.tokenize.word_tokenize(caption.lower())
        for token in tokens:
            counter[token]+=1

    # create vocab
    vocab = Vocabulary(threshold)
    for word, count in counter.items():
        if count>threshold:
            vocab.add_word(word)

    return vocab
    

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
 

class CocoDataset(Dataset):
    def __init__(self, root, ann_file, vocab, transform=None, subset_fraction=1.0):
        """
        Args:
            root (string): Directory with all the images.
            ann_file (string): Path to the annotation file.
            vocab (Vocabulary): Vocabulary object for tokenizing captions.
            transform (callable, optional): Transform to be applied to the images.
            subset_fraction (float, optional): Fraction of dataset to use, e.g., 0.25 for 1/4th of the data.
        """
        self.root = root
        self.coco = COCO(ann_file)
        self.ids = list(self.coco.anns.keys())  # List of annotation IDs

        # Use only a subset of the dataset if subset_fraction is less than 1.0
        if subset_fraction < 1.0:
            subset_size = int(len(self.ids) * subset_fraction)
            self.ids = random.sample(self.ids, subset_size)

        self.vocab = vocab
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        ann_id = self.ids[index]
        caption = self.coco.anns[ann_id]['caption']
        img_id = self.coco.anns[ann_id]['image_id']
        img_info = self.coco.loadImgs(img_id)[0]
        path = img_info['file_name']
        img_url = img_info['coco_url']  # Get the image URL from COCO annotations

        # Fetch the image from the URL
        response = requests.get(img_url)
        image = Image.open(BytesIO(response.content)).convert('RGB')


        # Load the image

        # Apply transformations if provided
        if self.transform:
            image = self.transform(image)

        # Tokenize the caption and convert to indices
        tokens = nltk.tokenize.word_tokenize(str(caption).lower())
        caption_idx = [self.vocab('<start>')] + [self.vocab(token) for token in tokens] + [self.vocab('<end>')]
        caption_tensor = torch.LongTensor(caption_idx)

        return image, caption_tensor
   
# Path to COCO image folder and annotations
image_root = '/home/mirsee/projects/image_captioning/data'
ann_file = '/home/mirsee/projects/image_captioning/notebooks/annotations/captions_train2017.json'

# Build vocabulary (done earlier)
vocab = build_vocab(ann_file, threshold=5)

# Create the dataset with only 1/4 of the COCO dataset
subset_fraction = 0.25  # Use only 25% of the dataset
dataset = CocoDataset(root=image_root, ann_file=ann_file, vocab=vocab, transform=transform, subset_fraction=subset_fraction)

pad_idx = vocab('<pad>')
import torch.nn.utils.rnn as rnn

def collate_fn(data, pad_idx):
    """Creates mini-batch tensors from the list of tuples (image, caption)."""
    # Sort data by caption length (descending)
    data.sort(key=lambda x: len(x[1]), reverse=True)
    images, captions = zip(*data)

    # Stack images into a single tensor
    images = torch.stack(images, 0)

    # Pad the captions to the maximum length in the batch
    lengths = [len(cap) for cap in captions]
    padded_captions = rnn.pad_sequence(captions, batch_first=True, padding_value=pad_idx)

    return images, padded_captions, lengths


# Create the DataLoader, passing the pad_idx to the collate_fn
data_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, 
                         num_workers=4, collate_fn=lambda x: collate_fn(x, pad_idx))


class EncoderCNN(nn.Module):
    """
    takes in the size of the embeded_vector to fed to rnn.
    this is not used for training but just get the feature vector of size embed size
    """
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = models.resnet50(pretrained=True) #Load the model with all the pretrained weights
        for param in resnet.parameters():
            param.requires_grad_(False)
            #By setting requires_grad=False, you are telling PyTorch not to compute gradients for this tensor during backpropagation.
    
    # get all the layers except last as we are not intereseted in classification
        modules = list(resnet.children())[:-1] # last layer Linear(in_features=2048, out_features=1000, bias=True)
        self.resnet = nn.Sequential(*modules)#unpackign the layers
        self.embed = nn.Linear(resnet.fc.in_features, embed_size)
    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.size(0), -1) # flatten the layer 
        features = self.embed(features)
        return features
    


class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()
        
        # Embedding layer: converts word indices into dense vectors of size embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        
        # LSTM: input to hidden, hidden_size must match the size of features from CNN
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        
        # Fully connected layer to map LSTM output to vocab_size
        self.fc = nn.Linear(hidden_size, vocab_size)
        
        # Initialize the hidden state (if needed)
        self.hidden_size = hidden_size
        
        # Optional dropout to prevent overfitting
        self.dropout = nn.Dropout(0.5)
    
    def forward(self, features, captions):
        """
        Forward pass of the decoder.
        Arguments:
        - features: Tensor of shape (batch_size, feature_size=512)
        - captions: Tensor of shape (batch_size, max_caption_length), word indices
        
        Returns:
        - outputs: Tensor of shape (batch_size, max_caption_length, vocab_size), word predictions
        """
        
        # Embedding the captions, excluding the <end> token"
        embeddings = self.embedding(captions[:, :-1])
        
        # Concatenate the features with the embedded captions
        # Features are passed as input to the first time step
        features = features.unsqueeze(1)  # shape (batch_size, 1, feature_size)
        lstm_input = torch.cat((features, embeddings), 1)  # shape (batch_size, 1 + caption_length, embed_size)
        
        # Pass the concatenated inputs through the LSTM
        lstm_out, _ = self.lstm(lstm_input)
        
        # Pass the LSTM output through the fully connected layer to get word predictions
        outputs = self.fc(lstm_out)
        
        return outputs


loading annotations into memory...
Done (t=0.62s)
creating index...
index created!
loading annotations into memory...
Done (t=0.67s)
creating index...
index created!


In [3]:
import torch
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models

# Hyperparameters
embed_size = 256
hidden_size = 512
vocab_size = len(vocab)  # Vocabulary size (make sure vocab.__len__() is implemented)
num_epochs = 10
learning_rate = 0.001
log_interval = 10  # Log every 10 batches
batch_size = 32  # Batch size for training
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Initialize models
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers=1).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=learning_rate)

# Load data
# Assume you have already created a DataLoader called data_loader
# data_loader = DataLoader(...)

# Training loop
for epoch in range(num_epochs):
    encoder.train()  # Set encoder to training mode
    decoder.train()  # Set decoder to training mode
    
    total_loss = 0  # Keep track of loss for the entire epoch
    
    for i, (images, captions, lengths) in enumerate(data_loader):
        
        # Move images and captions to the device (GPU or CPU)
        images = images.to(device)
        captions = captions.to(device)
        
        # Forward pass: Extract image features using the encoder
        features = encoder(images)  # (batch_size, embed_size)

        # Forward pass: Generate captions using the decoder
        outputs = decoder(features, captions)  # (batch_size, max_caption_length, vocab_size)
        
        # Adjust the outputs to exclude the last time step, to match the target captions[:, 1:]
        outputs = outputs[:, :-1, :]  # Exclude the last predicted word

# Flatten the outputs and the target captions
        loss = criterion(outputs.contiguous().view(-1, vocab_size), captions[:, 1:].contiguous().view(-1))

        
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        
        # Update the model's parameters
        optimizer.step()

        total_loss += loss.item()
        
        # Logging the loss every log_interval
        if i % log_interval == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(data_loader)}], Loss: {loss.item():.4f}")
    
    # Average loss for the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {total_loss / len(data_loader):.4f}")


Epoch [1/10], Step [1/4624], Loss: 9.1783
Epoch [1/10], Step [11/4624], Loss: 5.0669
Epoch [1/10], Step [21/4624], Loss: 3.5331
Epoch [1/10], Step [31/4624], Loss: 3.1609
Epoch [1/10], Step [41/4624], Loss: 2.2805
Epoch [1/10], Step [51/4624], Loss: 3.5441
Epoch [1/10], Step [61/4624], Loss: 2.8929


KeyboardInterrupt: 