<a href="https://colab.research.google.com/github/nayvirmis/open-source-arcade/blob/main/ImageCaptioningSystem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pack_padded_sequence


In [None]:
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super(EncoderCNN, self).__init__()
        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        """Extract feature vectors from input images."""
        with torch.no_grad():
            features = self.resnet(images)
        features = features.reshape(features.size(0), -1)
        features = self.bn(self.linear(features))
        return features


In [None]:
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, max_seq_length=20):
        """Set the hyper-parameters and build the layers."""
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.max_seg_length = max_seq_length

    def forward(self, features, captions, lengths):
        """Decode image feature vectors and generates captions."""
        embeddings = self.embed(captions)
        embeddings = torch.cat((features.unsqueeze(1), embeddings), 1)
        packed = pack_padded_sequence(embeddings, lengths, batch_first=True)
        hiddens, _ = self.lstm(packed)
        outputs = self.linear(hiddens[0])
        return outputs

    def sample(self, features, states=None):
        """Generate captions for given image features using greedy search."""
        sampled_ids = []
        inputs = features.unsqueeze(1)
        for i in range(self.max_seg_length):
            hiddens, states = self.lstm(inputs, states)          # hiddens: (batch_size, 1, hidden_size)
            outputs = self.linear(hiddens.squeeze(1))            # outputs:  (batch_size, vocab_size)
            _, predicted = outputs.max(1)                        # predicted: (batch_size)
            sampled_ids.append(predicted)
            inputs = self.embed(predicted)                       # inputs: (batch_size, embed_size)
            inputs = inputs.unsqueeze(1)                         # inputs: (batch_size, 1, embed_size)
        sampled_ids = torch.stack(sampled_ids, 1)                # sampled_ids: (batch_size, max_seq_length)
        return sampled_ids


In [None]:
# To access the dataset, use the link https://www.kaggle.com/datasets/adityajn105/flickr8k

from google.colab import files

# Upload the kaggle.json file
uploaded = files.upload()

# Move the uploaded kaggle.json file to the required directory
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Specify the Kaggle dataset name for Flickr8k
kaggle_dataset_name = 'adityajn105/flickr8k'

# Download the dataset using Kaggle API with --force option
!kaggle datasets download -d {kaggle_dataset_name} --force

# Unzip the dataset with -o option to automatically replace conflicting files
!unzip -o -qq '*.zip' -d '/content/Images/Images'

import os
import random

# List the contents of the 'Images' folder to verify the structure
image_folder_contents = os.listdir('/content/Images/Images')
print(f'Contents of the Images folder: {image_folder_contents}')

# Identify the correct folder name containing the images and captions
correct_folder_name = 'Images'  # Adjust the folder name as needed
image_folder = '/content/Images/Images'
# Update the paths accordingly
captions_file = f'/content/Images/Images/captions.txt'
image_folder = f'/content/Images/Images/{correct_folder_name}'

image_folder = '/content/Images/Images'  # Adjusted path to include the extra 'Images'

# Read captions from the text file and create image-caption pairs
image_caption_pairs = {}
with open(captions_file, 'r') as file:
    for line in file:
        parts = line.strip().split(',')
        image_filename = parts[0].strip()
        caption = parts[1].strip()

        # Add captions to the same image filename
        if image_filename not in image_caption_pairs:
            image_caption_pairs[image_filename] = []
        image_caption_pairs[image_filename].append(caption)

# List all image filenames
image_filenames = list(image_caption_pairs.keys())

# Split the data into training and testing sets
random.shuffle(image_filenames)
split_point = int(0.8 * len(image_filenames))

train_image_filenames = image_filenames[:split_point]
test_image_filenames = image_filenames[split_point:]

# Create training and testing dictionaries
train_image_caption_pairs = {filename: image_caption_pairs[filename] for filename in train_image_filenames}
test_image_caption_pairs = {filename: image_caption_pairs[filename] for filename in test_image_filenames}

# Print the number of images in each set
print(f'Number of images in the training set: {len(train_image_filenames)}')
print(f'Number of images in the testing set: {len(test_image_filenames)}')


Saving kaggle.json to kaggle.json
Downloading flickr8k.zip to /content
 99% 1.03G/1.04G [00:11<00:00, 131MB/s]
100% 1.04G/1.04G [00:11<00:00, 94.6MB/s]
Contents of the Images folder: ['186890601_8a6b0f1769.jpg', '2119660490_ce0d4d1f73.jpg', '2121357310_f8235311da.jpg', '888517718_3d5b4b7b43.jpg', '3690189273_927d42ff43.jpg', '1670592963_39731a3dac.jpg', '2554081584_233bdf289a.jpg', '3517040752_debec03376.jpg', '3380407617_07b53cbcce.jpg', '3280644151_3d89cb1e0e.jpg', '516214924_c2a4364cb3.jpg', '271177682_48da79ab33.jpg', '2512876666_9da03f9589.jpg', '670609997_5c7fdb3f0b.jpg', '2718495608_d8533e3ac5.jpg', '522063319_33827f1627.jpg', '3349258288_5300c40430.jpg', '2985679744_75a7102aab.jpg', '2928152792_b16c73434a.jpg', '2561849813_ff9caa52ac.jpg', '1334892555_1beff092c3.jpg', '2537119659_fa01dd5de5.jpg', '3596459539_a47aa80612.jpg', '3397633339_d1ae6d9a0e.jpg', '2256138896_3e24b0b28d.jpg', '3676561090_9828a9f6d0.jpg', '3637013_c675de7705.jpg', '2384728877_48c85d58af.jpg', '3041170372_c

In [None]:
import nltk
from collections import Counter

# Download the punkt tokenizer for word splitting
nltk.download('punkt')

class Vocabulary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0  # Keeps track of the next index

    def add_word(self, word):
        if not word in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def __call__(self, word):
        if not word in self.word2idx:
            return self.word2idx['<unk>']  # '<unk>' is a special token for unknown words
        return self.word2idx[word]

    def __len__(self):
        return len(self.word2idx)

    def get_word(self, idx):
        if idx not in self.idx2word:
            return '<unk>'  # Return '<unk>' if the index is not in the vocabulary
        return self.idx2word[idx]


# Define a function to build the vocabulary and preprocess captions
def build_vocab(image_caption_pairs, threshold):
    counter = Counter()
    for _, captions in image_caption_pairs.items():
        for caption in captions:
            tokens = nltk.tokenize.word_tokenize(caption.lower())
            counter.update(tokens)

    words = [word for word, cnt in counter.items() if cnt >= threshold]

    # Create a vocab instance
    vocab = Vocabulary()
    vocab.add_word('<pad>')
    vocab.add_word('<start>')
    vocab.add_word('<end>')
    vocab.add_word('<unk>')

    # Add the words to the vocabulary
    for word in words:
        vocab.add_word(word)
    return vocab

# Build vocabulary
vocab = build_vocab(train_image_caption_pairs, threshold=5)
vocab_size = len(vocab)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from pathlib import Path
import os
from PIL import Image
import torch
from torchvision import transforms

transform = transforms.Compose([
    # Other transformations...
    transforms.ToTensor(),  # Converts PIL Image to Tensor
    # Other transformations...
])

train_dataset = Flickr8kCustomDataset(train_image_caption_pairs, image_folder, vocab, transform=transform)
# ...


image_folder = '/content/Images/Images'  # Adjusted path to include the extra 'Images'

class Flickr8kCustomDataset(Dataset):
    def __init__(self, image_caption_pairs, image_folder, vocab, transform=None):
        """
        Args:
            image_caption_pairs: Dictionary of image paths and corresponding captions
            image_folder: Folder path where images are stored
            vocab: Vocabulary instance
            transform: Transforms to be applied on images
        """
        self.image_caption_pairs = image_caption_pairs
        self.image_folder = image_folder
        self.vocab = vocab
        self.transform = transform


    def __len__(self):
        return len(self.image_caption_pairs)

    def __getitem__(self, idx):
        """
        Returns one data pair (image and caption).
        """
        image_name = list(self.image_caption_pairs.keys())[idx]
        image_path = os.path.join(self.image_folder, image_name)

        caption = random.choice(self.image_caption_pairs[image_name])

        image = Image.open(os.path.join(self.image_folder, image_name)).convert('RGB')
        if not Path(image_path).is_file():
            print(f"File not found: {image_path}")
            # Instead of returning None, return a zero tensor or some placeholder
            image = torch.zeros(3, 224, 224)  # Example: return a zero tensor with the same dimensions as your images
            target = torch.zeros(1)  # Placeholder target, you might want to handle this differently
        else:
            image = Image.open(image_path).convert('RGB')
            if self.transform:
                image = self.transform(image)


        # Convert caption to word ids.
        tokens = nltk.tokenize.word_tokenize(str(caption).lower())
        caption = []
        caption.append(self.vocab('<start>'))
        caption.extend([self.vocab(token) for token in tokens])
        caption.append(self.vocab('<end>'))
        target = torch.Tensor(caption)

        # Convert caption to tensor and pad if necessary
        caption_tensor = torch.Tensor(caption).long()
        return image, caption_tensor


# Define transformations for the image
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(224),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

# Create the Dataset and DataLoader
train_dataset = Flickr8kCustomDataset(train_image_caption_pairs, image_folder, vocab, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)


In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Creates mini-batch tensors from the list of tuples (image, caption).

    We should build a custom collate_fn rather than using default collate_fn,
    because merging caption (including padding) is not supported in default.
    Args:
        batch: list of tuple (image, caption).
            - image: torch tensor of shape (3, 256, 256).
            - caption: torch tensor of shape (?); variable length.

    Returns:
        images: torch tensor of shape (batch_size, 3, 256, 256).
        targets: torch tensor of shape (batch_size, padded_length).
        lengths: list; valid length for each padded caption.
    """

    batch = [data for data in batch if data is not None and data[0] is not None]
    if not batch:
        return None, None, None
    # Filter out all the None values or empty dicts
    batch = [data for data in batch if data[0] != {}]

    # Proceed only if the batch is not empty
    if len(batch) == 0:
        return torch.Tensor(), torch.Tensor(), torch.Tensor()  # return empty tensors

    # Sort a data list by caption length (descending order).
    batch.sort(key=lambda x: len(x[1]), reverse=True)
    images, captions = zip(*batch)

    # Merge images (from tuple of 3D tensor to 4D tensor).
    images = torch.stack(images, 0) if all(isinstance(image, torch.Tensor) for image in images) else None


    # Merge captions (from tuple of 1D tensor to 2D tensor).
    lengths = [len(cap) for cap in captions]
    targets = pad_sequence(captions, batch_first=True)
    return images, targets, lengths

# Create the DataLoader with custom collate_fn
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, collate_fn=collate_fn)


In [None]:
if 'image' in image_caption_pairs:
    del image_caption_pairs['image']


In [None]:
import os

# Path to your image directory
image_dir = '/content/Images/Images/'

# Get list of file names from the directory
actual_file_names = os.listdir(image_dir)

# Get list of file names from your dataset
dataset_file_names = list(image_caption_pairs.keys())

missing_files = [file for file in dataset_file_names if file not in actual_file_names]

# Assume image_caption_pairs is your dataset dictionary {image_name: caption}
not_found_files = []
for image_name in image_caption_pairs.keys():
    image_path = os.path.join(image_dir, image_name)
    if not os.path.isfile(image_path):
        not_found_files.append(image_name)

if not_found_files:
    print(f"Files not found: {not_found_files}")
else:
    print("All files found.")


All files found.


In [62]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence
import time

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model parameters
embed_size = 256  # Dimension of image and word embeddings
hidden_size = 512  # Dimension of the RNN/LSTM hidden state
num_layers = 2  # Number of layers in the RNN/LSTM


# Initialize the models
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
params = list(decoder.parameters()) + list(encoder.linear.parameters()) + list(encoder.bn.parameters())
optimizer = torch.optim.Adam(params, lr=0.001)


    # Training loop
num_epochs = 5
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, batch in enumerate(train_loader):  # Note the addition of 'lengths' here
        images, captions, lengths = batch
        if any(d is None for d in batch):  # Changed 'data' to 'batch'
            print("Skipping a batch due to None values")
            continue  # Skip this iteration due to issue in the data

        # Set mini-batch dataset
        images = images.to(device)
        captions = captions.to(device)

        # Prepare packed sequence to handle variable-length captions
        targets = pack_padded_sequence(captions, lengths, batch_first=True, enforce_sorted=False)[0]

        # Forward, backward and optimize
        features = encoder(images)
        outputs = decoder(features, captions, lengths)
        loss = criterion(outputs, targets)

        decoder.zero_grad()
        encoder.zero_grad()
        loss.backward()
        optimizer.step()

        # Print log info
        if i % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i}/{total_step}], Loss: {loss.item():.4f}')


Epoch [1/5], Step [0/203], Loss: 7.8585
Epoch [1/5], Step [100/203], Loss: 4.0263
Epoch [1/5], Step [200/203], Loss: 3.5143
Epoch [2/5], Step [0/203], Loss: 3.8293
Epoch [2/5], Step [100/203], Loss: 3.6423
Epoch [2/5], Step [200/203], Loss: 3.7506
Epoch [3/5], Step [0/203], Loss: 3.5165
Epoch [3/5], Step [100/203], Loss: 3.4975
Epoch [3/5], Step [200/203], Loss: 4.1603
Epoch [4/5], Step [0/203], Loss: 3.4851
Epoch [4/5], Step [100/203], Loss: 3.1382
Epoch [4/5], Step [200/203], Loss: 3.0995
Epoch [5/5], Step [0/203], Loss: 3.1473
Epoch [5/5], Step [100/203], Loss: 3.1271
Epoch [5/5], Step [200/203], Loss: 3.4978


In [64]:
from nltk.translate.bleu_score import sentence_bleu
from pathlib import Path



    # Existing code to open the image and process it
    # ...


def evaluate_model(encoder, decoder, image, vocab, max_length=20):
    # Prepare the image
    encoder.eval()
    decoder.eval()
    with torch.no_grad():
        # Encode the image
        features = encoder(image).unsqueeze(1)

        # Prepare the generated caption
        generated_caption = []

        # Initialize the LSTM state
        states = None

        # Generate the caption word by word
        for _ in range(max_length):
            hiddens, states = decoder.lstm(features, states)
            outputs = decoder.linear(hiddens.squeeze(1))
            _, predicted = outputs.max(1)
            predicted_word = vocab.get_word(predicted.item())
            generated_caption.append(predicted_word)

            # Break if the sentence is complete
            if predicted_word == '<end>':
                break

            # Prepare the next input
            features = decoder.embed(predicted).unsqueeze(1)

    # Convert the list of generated words to a sentence
    generated_caption = ' '.join(generated_caption)

    return generated_caption

# Set the models to evaluation mode
encoder.eval()
decoder.eval()

# Store the BLEU scores
bleu_scores = []

for image_filename in test_image_filenames:
    image_path = os.path.join(image_folder, image_filename)
    if not Path(image_path).is_file():
        print(f"File not found: {image_path}")
        continue  # Skip this file
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0).to(device)

    generated_caption = evaluate_model(encoder, decoder, image, vocab)

    # Retrieve the actual captions
    actual_captions = test_image_caption_pairs[image_filename]

    # Preprocess the generated caption and the actual captions
    generated_words = generated_caption.split()
    actual_words = [caption.split() for caption in actual_captions]

    # Calculate BLEU score for this image
    score = sentence_bleu(actual_words, generated_words)
    bleu_scores.append(score)

    print(f'Generated caption: {generated_caption}')
    print(f'Actual captions: {actual_captions}')
    print(f'BLEU score: {score}\n')

# Calculate the average BLEU score
average_bleu_score = sum(bleu_scores) / len(bleu_scores)
print(f'Average BLEU score on the test dataset: {average_bleu_score}')


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
BLEU score: 6.150463482648932e-155

Generated caption: <start> a dog is running through a field of water . <end>
Actual captions: ['A brown and black dog is standing on its hind legs trying to catch something .', 'A dog is jumping up on hind legs with its mouth open .', 'A dog plays on the grass .', 'A German shepherd playing with water in the grass .', 'German Shepherd standing up snapping at droplets of water']
BLEU score: 7.200083385062637e-155

Generated caption: <start> a man in a red shirt is standing on a <unk> . <end>
Actual captions: ['a closeup blurred image of a man wearing a green shirt and tan pants .', 'A distorted picture of two men walking in the street .', 'A man in brown pants has his picture enlarged and stretched .', 'A man wearing carpenters pants and a green hooded sweatshirt walks down the street .', 'A man with a dark blue jacket and brown pants stands beside another man in front of a building .']
