**Project Overview:**

input: Image

output: Caption

Train the model using the given dataset.

Steps:
1. Split into train, val, test.
2. Tune hyperparameters based on the validation set. Try out various hyperparameters. Prevent overfitting using common techniques (dropout, regularization)

Model Choices:
1. Use a CNN for image feature extraction and feed the extracted features to the RNN as input. Incorporate attention into the image feature extraction.

Code Structure: (Import the required modules)
1. Input the data
2. Preprocess the data
3. Define the model
4. Train the model. Try several hyperparameter settings and choose the one that performs best on the validation set.
5. Evaluate using the test data





In [None]:
# Import the required libraries

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from google.colab import drive

from tensorflow.keras.applications import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import preprocess_input

from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

import os
from PIL import Image
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from collections import Counter
import nltk
from torchvision import models

from torch.nn.utils.rnn import pack_padded_sequence

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used further down
# live under that mount point. force_remount=True asks Colab to mount again
# even when the drive is already mounted.

drive.mount('/content/drive',force_remount=True)

In [None]:
# Importing Data, Data Preprocessing, Building Vocabulary

# word_tokenize needs the Punkt sentence-tokenizer data. Recent NLTK releases
# load it from the 'punkt_tab' resource while older ones use 'punkt', so both
# are requested; fetching an extra resource is harmless.
for _resource in ('punkt', 'punkt_tab'):
    nltk.download(_resource)

class Vocabulary:
    """Two-way mapping between words and consecutive integer indices."""

    def __init__(self):
        # Indices are handed out in insertion order, starting at 0.
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        """Register ``word`` under the next free index; known words are ignored."""
        if word in self.word2idx:
            return
        new_index = self.idx
        self.word2idx[word] = new_index
        self.idx2word[new_index] = word
        self.idx = new_index + 1

    def __call__(self, word):
        """Return the index of ``word``, or the index of '<unk>' if it is unknown."""
        try:
            return self.word2idx[word]
        except KeyError:
            return self.word2idx['<unk>']

    def __len__(self):
        """Return the number of distinct words stored."""
        return len(self.word2idx)

def build_vocab(captions_file, threshold=1):
    """Build a Vocabulary from the captions in a comma-separated captions file.

    The file is read without a header row as two columns (image, caption).
    Every caption is lower-cased and tokenized with NLTK. The special tokens
    '<pad>', '<start>', '<end>' and '<unk>' are added first (indices 0-3, in
    that order), followed by each word seen at least ``threshold`` times.

    Args:
        captions_file: path of the captions file.
        threshold: minimum number of occurrences for a word to be kept.

    Returns:
        The populated Vocabulary.
    """
    frame = pd.read_csv(captions_file, delimiter=',', header=None, names=['image', 'caption'])

    frequencies = Counter()
    for text in frame['caption']:
        frequencies.update(nltk.tokenize.word_tokenize(text.lower()))

    vocab = Vocabulary()
    for special_token in ('<pad>', '<start>', '<end>', '<unk>'):
        vocab.add_word(special_token)

    # Words are added in the order they were first encountered.
    for word, count in frequencies.items():
        if count >= threshold:
            vocab.add_word(word)

    return vocab

# NOTE(review): the path treats 'image_captions.zip' as a directory; that only
# works if Drive holds a folder with this name (or the archive was extracted
# under it) — confirm.
captions_file = '/content/drive/My Drive/image_captions.zip/captions.txt'
vocab = build_vocab(captions_file)

# NOTE(review): this Keras VGG16 is not referenced anywhere else in the visible
# file (ImageCaptionDataset builds its own torchvision VGG16). It looks like a
# leftover that only costs download time and memory — confirm before removing.
vgg16 = VGG16(weights='imagenet', include_top=False)

class ImageCaptionDataset(Dataset):
    """Dataset yielding (image feature vector, caption index tensor) pairs.

    Each row of the captions file names an image and one caption. The image is
    loaded, optionally transformed, and passed through a pretrained torchvision
    VGG16 with its last child module removed; the caption is tokenized and
    mapped to vocabulary indices wrapped in '<start>' / '<end>'.
    """

    def __init__(self, images_dir, captions_file, vocab, transform=None):
        self.images_dir = images_dir
        self.captions = pd.read_csv(captions_file, delimiter=',', header=None, names=['image', 'caption'])
        self.vocab = vocab
        self.transform = transform
        # Keep every child of VGG16 except the last one and run it in eval mode.
        backbone_children = list(models.vgg16(pretrained=True).children())
        self.feature_extractor = nn.Sequential(*backbone_children[:-1])
        self.feature_extractor.eval()

    def __len__(self):
        """Return the number of (image, caption) rows."""
        return len(self.captions)

    def __getitem__(self, idx):
        """Return (flattened features, caption indices as a long tensor) for row ``idx``."""
        file_name = self.captions.iloc[idx, 0]
        picture = Image.open(os.path.join(self.images_dir, file_name)).convert('RGB')
        caption_text = self.captions.iloc[idx, 1]

        if self.transform:
            picture = self.transform(picture)

        # A batch dimension is added for the extractor; no gradients are tracked.
        with torch.no_grad():
            extracted = self.feature_extractor(picture.unsqueeze(0))
        flat_features = extracted.view(-1)

        words = nltk.tokenize.word_tokenize(caption_text.lower())
        word_ids = [self.vocab(token) for token in ('<start>', *words, '<end>')]

        return flat_features, torch.Tensor(word_ids).long()

images_dir = '/content/drive/My Drive/image_captions.zip/Images'

# Preprocessing applied to every image before feature extraction:
# resize to 224x224, convert to a tensor, then normalize each channel
# with the mean / std below for input to VGG16.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Arguments follow the constructor order: images_dir, captions_file, vocab, transform.
dataset = ImageCaptionDataset(images_dir, captions_file, vocab, transform)

def collate_fn(data):
    """Merge (features, caption) samples into one batch with zero-padded captions.

    Args:
        data: sequence of (feature tensor, 1-D caption tensor) pairs.

    Returns:
        Tuple of the stacked features and a long tensor of shape
        (batch, longest caption) in which positions past the end of a
        caption hold 0.
    """
    feature_list, caption_list = zip(*data)

    batch_features = torch.stack(feature_list, 0)

    longest = max(len(caption) for caption in caption_list)
    padded_captions = torch.zeros(len(caption_list), longest).long()
    for row_index, caption in enumerate(caption_list):
        caption_length = len(caption)
        padded_captions[row_index, :caption_length] = caption[:caption_length]

    return batch_features, padded_captions

# Loader over the full dataset in shuffled batches of 32, padded by collate_fn;
# the loop further down pulls a single batch from it as a sanity check.
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

def tensor_to_caption(tensor, vocab):
    """Turn the first row of a batch of caption indices into a sentence.

    Args:
        tensor: tensor of token indices; only ``tensor[0]`` is decoded.
        vocab: object exposing an ``idx2word`` index-to-word mapping.

    Returns:
        The words joined by single spaces. Decoding stops at the first
        '<end>'; '<start>' and '<pad>' tokens are left out.
    """
    skipped = ('<start>', '<pad>')
    collected = []
    for token_id in tensor[0]:
        token = vocab.idx2word[token_id.item()]
        if token == '<end>':
            break
        if token in skipped:
            continue
        collected.append(token)
    return ' '.join(collected)

# Sanity check: fetch a single batch (if there is one) and show the feature
# batch size and the padded caption indices.
_first_batch = next(iter(dataloader), None)
if _first_batch is not None:
    images, captions = _first_batch
    print(images.size())
    print(captions)


In [None]:
# Splitting into train, val, test datasets

from torch.utils.data import random_split

# Train on the bulk of the data and hold out equal validation and test parts.
# Sizes are derived from fractions so they always add up to the dataset size;
# a fixed training size combined with a large validation fraction would leave
# a tiny training set and can make test_size negative on small datasets.
train_fraction = 0.8
val_fraction = 0.1

dataset_size = len(dataset)
train_size = int(train_fraction * dataset_size)
val_size = int(val_fraction * dataset_size)
test_size = dataset_size - train_size - val_size  # remainder goes to the test set

# Seeded generator so every run puts the same rows in the same split; without
# it the test rows change between runs and results are not comparable.
# NOTE(review): if the captions file holds several captions per image, a split
# by row places the same image in more than one split — consider splitting by
# image instead.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

# Only the training loader is shuffled; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

print(len(train_dataset))
print(len(val_dataset))
print(len(test_dataset))

In [None]:
# Define the model

# Model: Many - Many LSTM RNN

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters

# embed_size must equal the length of the flattened feature vector returned by
# ImageCaptionDataset, because RNN.forward concatenates those features with the
# word embeddings along the sequence axis.
# NOTE(review): 25088 is presumably 512 * 7 * 7 for VGG16 on 224x224 input — confirm.
embed_size = 25088
hidden_size = 256  # size of the LSTM hidden state
vocab_size = len(vocab)  # one output class per vocabulary entry
num_layers = 1  # number of stacked LSTM layers
learning_rate = 1e-3  # passed to the Adam optimizer
num_epochs = 2  # passes over train_loader

class RNN(nn.Module):
    """LSTM caption decoder conditioned on a flat image feature vector.

    The image features are fed to the LSTM as the first time step, followed by
    the embedded caption tokens, so ``embed_size`` must equal the length of the
    feature vector.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        # batch_first=True because forward() builds (batch, seq, embed) inputs.
        # Without it the LSTM treats the batch axis as time and carries hidden
        # state from one sample into the next.
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, features, captions):
        """Return per-step log-probabilities over the vocabulary.

        Args:
            features: float tensor of shape (batch, embed_size).
            captions: long tensor of shape (batch, seq_len) of token indices.

        Returns:
            Tensor of shape (batch, seq_len + 1, vocab_size). Step 0 is
            produced from the image features and step t from caption
            token t - 1.
        """
        embeddings = self.dropout(self.embed(captions))
        # Prepend the image features as the first element of every sequence.
        embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)

        hiddens, _ = self.lstm(embeddings)
        return self.softmax(self.linear(hiddens))

    def caption_image(self, features, vocabulary, max_length=50):
        """Greedily decode a caption for a single image.

        Args:
            features: float tensor of shape (1, embed_size) for one image.
            vocabulary: object exposing an ``idx2word`` index-to-word mapping.
            max_length: maximum number of tokens to generate.

        Returns:
            List of generated words in order. Generation stops once '<end>'
            has been produced (it is included in the list) or after
            ``max_length`` tokens.
        """
        words = []

        with torch.no_grad():
            # (1, embed_size) -> (1, 1, embed_size): a batch of one, one step.
            x = features.unsqueeze(1)
            states = None

            for _ in range(max_length):
                hiddens, states = self.lstm(x, states)
                scores = self.softmax(self.linear(hiddens))  # (1, 1, vocab_size)
                predicted = scores.argmax(2)  # (1, 1)

                word = vocabulary.idx2word[predicted.item()]
                words.append(word)
                if word == "<end>":
                    break

                # Feed the chosen token back in as the next input step.
                x = self.embed(predicted)

        return words

In [None]:
# Instantiate Model

# Build the decoder from the hyper-parameters above and move it to the chosen device.
model = RNN(embed_size, hidden_size, vocab_size, num_layers).to(device)

# Loss and optimizer

# Targets equal to the '<pad>' index are ignored by the loss, so padded
# positions do not contribute to it.
criterion = nn.CrossEntropyLoss(ignore_index = vocab("<pad>"))
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train the model

model.train()

# collate_fn pads captions with 0, which is the index of '<pad>'.
pad_index = vocab("<pad>")

for epoch in range(num_epochs):
    for i, (imgs, captions) in enumerate(train_loader):

        print(i)

        imgs = imgs.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()

        # Real (unpadded) length of every caption. pack_padded_sequence needs
        # the lengths on the CPU even when the data itself is on the GPU.
        lengths = (captions != pad_index).sum(dim=1).cpu()

        outputs = model(imgs, captions)

        # Packing predictions and targets with the same lengths keeps them
        # aligned step by step and drops both the padded positions and the
        # extra final step of the model output.
        targets = pack_padded_sequence(captions, lengths, batch_first=True, enforce_sorted=False).data
        outputs = pack_padded_sequence(outputs, lengths, batch_first=True, enforce_sorted=False).data

        # The model already returns log-probabilities; the log-softmax inside
        # CrossEntropyLoss leaves them unchanged.
        loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

        print("Batch Loss:")
        print(loss.item())

In [None]:
# Evaluating the BLEU score

import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Ensure the NLTK 'punkt' tokenizer data is downloaded.
# NOTE(review): the same download is already issued before the vocabulary is
# built, and nothing in this cell calls the tokenizer — presumably kept so the
# cell can run in a fresh session; confirm whether it is still needed.

nltk.download('punkt')
def calculate_bleu_scores(model, data_loader, vocab, verbose=True):
    """Return the average smoothed sentence-level BLEU score over ``data_loader``.

    For every sample the reference caption is rebuilt from its indices and
    compared with the caption produced by ``model.caption_image``. The special
    tokens '<pad>', '<start>' and '<end>' are removed from both sides so that
    they do not distort the n-gram overlap.

    Args:
        model: captioning model exposing ``caption_image(features, vocab)``
            that returns a list of words.
        data_loader: iterable of (image features, padded caption indices)
            batches.
        vocab: vocabulary with ``word2idx`` and ``idx2word`` mappings.
        verbose: when True, print the running sample count, the reference,
            the generated caption and the score of every sample.

    Returns:
        Mean BLEU score as a float, or 0.0 when ``data_loader`` yields no
        samples.
    """
    model.eval()
    smoothing_function = SmoothingFunction().method1
    special_tokens = ('<pad>', '<start>', '<end>')
    special_indices = {vocab.word2idx[token] for token in special_tokens}
    bleu_scores = []

    with torch.no_grad():
        # Iterate the loader that was passed in, not a module-level one.
        for imgs, captions in data_loader:
            # Features are moved to the module-level ``device`` the model lives on.
            imgs = imgs.to(device)
            for feature_row, caption_row in zip(imgs, captions):
                reference = [
                    vocab.idx2word[idx]
                    for idx in caption_row.tolist()
                    if idx not in special_indices
                ]
                # sentence_bleu expects a list of references per hypothesis.
                reference_caption = [reference]

                generated_caption = [
                    word
                    for word in model.caption_image(feature_row.unsqueeze(0), vocab)
                    if word not in special_tokens
                ]

                bleu_score = sentence_bleu(reference_caption, generated_caption, smoothing_function=smoothing_function)

                if verbose:
                    print(len(bleu_scores))
                    print(reference_caption)
                    print(generated_caption)
                    print(bleu_score)
                bleu_scores.append(bleu_score)

    if not bleu_scores:
        return 0.0
    return sum(bleu_scores) / len(bleu_scores)

# Score the trained model on the test split and report the average.
bleu_score = calculate_bleu_scores(model, test_loader, vocab)
print(f'Average BLEU score: {bleu_score}')