## Import Libraries


In [None]:
import torch
import torchvision as tv
import torch.nn as nn
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import torchvision.models as models
from torchsummary import summary
from torch.utils.data import Dataset, DataLoader
import spacy
import torch.optim as optim
import warnings
import re

# Silence library warnings so the notebook output stays readable.
warnings.filterwarnings("ignore")

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

## Load Data


In [None]:
# Root folder of the data; it must contain `captions.txt` and an `Images/` folder.
BASE_DIRECTORY = "dataset"
# BASE_DIRECTORY = "/kaggle/input/flickr8k"

# One row per (image file name, caption) pair; the `image` and `caption`
# columns are the ones used by the rest of the notebook.
df = pd.read_csv(BASE_DIRECTORY + "/captions.txt")
# Vectorised string concatenation instead of a per-row `apply`.
df["path"] = BASE_DIRECTORY + "/Images/" + df["image"]
df = df.rename(columns={"image": "id"})
# regex=False: treat ".jpg" as a literal suffix. Without it the result depends
# on the pandas version's default, and as a regex "." would match any character.
df["id"] = df["id"].str.replace(".jpg", "", regex=False)
df.head()

In [None]:
# Rows are captions, so the number of distinct ids is the number of images.
unique_id_count = df["id"].nunique()
print("Dataframe shape:", df.shape)
print("Number of samples", unique_id_count)

### Build Dictionary


In [None]:
# Map every image id to its captions and file path:
#   data[image_id] = {"captions": [caption, ...], "path": <path to the image>}
data = {}

# Group by id instead of slicing the frame in fixed steps of five: this does
# not assume exactly five contiguous captions per image or a default
# RangeIndex, and it avoids shadowing the builtin `id`.
# sort=False keeps the ids in their order of first appearance in the file.
for image_id, rows in df.groupby("id", sort=False):
    data[image_id] = {
        "captions": rows["caption"].tolist(),
        "path": rows["path"].iloc[0],
    }

keys = list(data.keys())

# Show the first entry as a sanity check.
key = keys[0]
value = data[key]

print(f"Key: {key}")
value

### Split the dictionary into train, test, and validation sets


In [None]:
def read_ids_from_file(filename):
    """Return the ids listed in `filename`, one id per line.

    Surrounding whitespace is stripped from every line. Blank lines are
    skipped so that a stray empty line does not produce an empty-string id
    (which would fail later when used as a dictionary key).

    Args:
        filename: Path of a UTF-8 text file containing one id per line.

    Returns:
        A list of non-empty id strings in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(filename, "r", encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f)
        return [image_id for image_id in stripped_lines if image_id]


# BASE_DIRECTORY = "/kaggle/input/id-files"

# Each split is described by a text file of image ids inside BASE_DIRECTORY.
train_keys = read_ids_from_file(f"{BASE_DIRECTORY}/train_id.txt")
val_keys = read_ids_from_file(f"{BASE_DIRECTORY}/val_id.txt")
test_keys = read_ids_from_file(f"{BASE_DIRECTORY}/test_id.txt")

# Select the matching entries of `data` for every split
# (an id that is missing from `data` raises KeyError here).
train_data = {image_id: data[image_id] for image_id in train_keys}
val_data = {image_id: data[image_id] for image_id in val_keys}
test_data = {image_id: data[image_id] for image_id in test_keys}

# Re-derive the key lists from the dictionaries so that keys and data are
# guaranteed to line up (duplicate ids in a file collapse to one entry).
train_keys = list(train_data)
val_keys = list(val_data)
test_keys = list(test_data)

for label, split in (
    ("Training set size:", train_data),
    ("Validation set size:", val_data),
    ("Testing set size:", test_data),
):
    print(label, len(split))

## Read & Show Image Data


In [None]:
def read_image(path):
    """Load the image stored at `path` as an RGB PIL image.

    `Image.open` is lazy and keeps the file open until the pixel data is
    read. Converting inside the `with` block forces the data to be loaded and
    then closes the file. The conversion to RGB also guarantees three
    channels, which the 3-value mean/std normalisation used by the transforms
    in this notebook requires.

    Args:
        path: Location of the image file.

    Returns:
        A fully loaded `PIL.Image.Image` in RGB mode.
    """
    with Image.open(path) as image:
        return image.convert("RGB")


def show_image(image, title=None):
    """Display `image` with matplotlib, without axes.

    Args:
        image: Anything accepted by `plt.imshow` (for example a PIL image).
        title: Optional text shown above the image; omitted when None.
    """
    plt.imshow(image)
    if title is not None:
        plt.title(title)
    plt.axis("off")
    plt.show()


def read_from_tensor(tensor):
    """Display an image tensor with matplotlib.

    The tensor is permuted with (1, 2, 0), i.e. it is expected to be laid out
    channels-first, and its values are clipped to [0, 1] before plotting.
    NOTE(review): a tensor that has been mean/std-normalised is only clipped
    here, not de-normalised, so its colours will look shifted.

    Args:
        tensor: A 3-D image tensor; may live on any device and may be part of
            an autograd graph.
    """
    # detach() + cpu() make the NumPy conversion valid for CUDA tensors and
    # for tensors that require gradients.
    img_numpy = tensor.detach().cpu().permute(1, 2, 0).numpy()
    img_numpy = np.clip(img_numpy, 0, 1)
    plt.imshow(img_numpy)
    plt.axis("off")
    plt.show()

### Example of an image with captions


In [None]:
# Position (within the training keys) of the sample used for the demos below.
index = 0

# Show the image followed by its first five captions.
_example = train_data[train_keys[index]]
show_image(read_image(_example["path"]))
for i in range(5):
    print(_example["captions"][i])

## Image Preprocessing


### Transformations


In [None]:
# Per-channel statistics handed to Normalize in both pipelines
# (these are the values commonly quoted for ImageNet-pretrained models).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: light random augmentation, then tensor conversion and
# normalisation. The output is a 224x224 crop.
train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomResizedCrop(
            size=224, scale=(0.9, 1.0), ratio=(0.95, 1.05), antialias=True
        ),
        transforms.RandomRotation(degrees=10),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

# Validation/test pipeline: deterministic resize to 224x224, then the same
# tensor conversion and normalisation as for training.
val_test_transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

### Data to Tensor Conversion


In [None]:
# Load every image of each split, transform it, and stack the results into a
# single tensor per split. The order follows the corresponding key list.
# NOTE(review): the random training augmentations run once here, so every
# training image keeps the same augmented version for all epochs.
train_tensors = torch.stack(
    [
        train_transform(read_image(train_data[image_id]["path"]))
        for image_id in train_keys
    ]
)

val_tensors = torch.stack(
    [
        val_test_transform(read_image(val_data[image_id]["path"]))
        for image_id in val_keys
    ]
)

test_tensors = torch.stack(
    [
        val_test_transform(read_image(test_data[image_id]["path"]))
        for image_id in test_keys
    ]
)


for label, stacked in (
    ("Training tensor shape:", train_tensors),
    ("Validation tensor shape:", val_tensors),
    ("Testing tensor shape:", test_tensors),
):
    print(label, stacked.shape)

### Map the captions to the image from tensors


In [None]:
# The tensor at position `index` belongs to the key at the same position, so
# the picture and the printed captions should describe the same scene.
read_from_tensor(train_tensors[index])
_example_captions = train_data[train_keys[index]]["captions"]
for i in range(5):
    print(_example_captions[i])

## Dataset & DataLoader


### Dataset


In [None]:
class ImageCaptionDataset(Dataset):
    """Pairs pre-computed image tensors with the captions of the same image.

    Args:
        data: Mapping from image id to a dict that has a "captions" entry.
        keys: Sequence of image ids; position `i` identifies sample `i`.
        tensors: Indexable collection of image tensors aligned with `keys`.
    """

    def __init__(self, data, keys, tensors):
        self.data, self.keys, self.tensors = data, keys, tensors

    def __len__(self):
        """Return the number of samples, i.e. the number of keys."""
        return len(self.keys)

    def __getitem__(self, index):
        """Return `(image_tensor, captions)` for the sample at `index`."""
        image_id = self.keys[index]
        image_tensor = self.tensors[index]
        return image_tensor, self.data[image_id]["captions"]

In [None]:
# One dataset per split; the keys and tensors of a split are index-aligned.
train_dataset = ImageCaptionDataset(
    data=train_data, keys=train_keys, tensors=train_tensors
)
val_dataset = ImageCaptionDataset(data=val_data, keys=val_keys, tensors=val_tensors)
test_dataset = ImageCaptionDataset(
    data=test_data, keys=test_keys, tensors=test_tensors
)

# Inspect one sample: raw tensor values, the picture, and its first five captions.
X, y = train_dataset[index]
print(X)
read_from_tensor(X)
for i in range(5):
    print(y[i])

### DataLoader


In [None]:
# Settings shared by all three loaders.
_LOADER_OPTIONS = {"batch_size": 32, "num_workers": 4}

# Only the training loader shuffles; validation and test keep dataset order.
train_dataloader = DataLoader(train_dataset, shuffle=True, **_LOADER_OPTIONS)
val_dataloader = DataLoader(val_dataset, shuffle=False, **_LOADER_OPTIONS)
test_dataloader = DataLoader(test_dataset, shuffle=False, **_LOADER_OPTIONS)

In [None]:
# Notebook cell output: show the dataset object wrapped by the training loader.
train_dataloader.dataset

## CNN Encoder (ResNet-50)


In [None]:
class EncoderCNN(nn.Module):
    """Image encoder: a frozen, pretrained ResNet-50 plus a trainable projection.

    The convolutional part of ResNet-50 is kept (its last two children,
    avgpool and fc, are dropped). Its output is average-pooled to one vector
    per image, projected to `embed_size` and layer-normalised.

    Args:
        embed_size: Size of the feature vector produced for each image.
    """

    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit weights
        # enum selects the same ImageNet-1k V1 checkpoint it used to load.
        resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        for param in resnet.parameters():
            param.requires_grad_(False)

        modules = list(resnet.children())[
            :-2
        ]  # Remove the last two layers (fc and avgpool)
        self.resnet = nn.Sequential(*modules)
        # The backbone is frozen, so keep it in eval mode: otherwise its
        # BatchNorm layers would keep updating their running statistics.
        self.resnet.eval()
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(
            resnet.fc.in_features, embed_size
        )  # Resnet fc.in_features is 2048
        self.ln = nn.LayerNorm(embed_size)  # Use LayerNorm instead of BatchNorm1d
        self.embed_size = embed_size

    def train(self, mode=True):
        """Switch train/eval mode while always keeping the frozen backbone in eval."""
        super(EncoderCNN, self).train(mode)
        self.resnet.eval()
        return self

    def forward(self, images):
        """Encode a batch of images.

        Args:
            images: Batch of image tensors accepted by ResNet-50
                (batch dimension first).

        Returns:
            Tensor of shape (batch_size, 1, 1, embed_size).
        """
        features = self.resnet(images)
        features = self.avgpool(features)
        features = features.view(features.size(0), -1)  # Flatten to (batch_size, 2048)
        features = self.fc(features)  # Reduce to (batch_size, embed_size)
        features = self.ln(features)
        features = features.view(
            features.size(0), 1, 1, self.embed_size
        )  # Reshape to (batch_size, 1, 1, embed_size)
        return features


# Build a stand-alone encoder with 512-dimensional output on the chosen device.
encoder = EncoderCNN(embed_size=512)
encoder = encoder.to(device)

# Smoke test: push one training image (with a batch axis added) through it.
test_image = train_tensors[index][None].to(device)
print(test_image.shape)

feature = encoder(test_image)
print(feature.shape)

## RNN Preprocessing


In [None]:
# spaCy English pipeline; in this notebook only its tokenizer (`nlp.tokenizer`)
# is used, by `Vocab.tokenize`.
nlp = spacy.load("en_core_web_sm")


class Vocab:
    """Word <-> index vocabulary built from the captions of `train_dataset`.

    Indices 0-3 are reserved for the special tokens PAD, SOS, EOS and UNK.

    Attributes are documented inline in `__init__`.
    """

    def __init__(self):
        # token -> index and index -> token lookups, seeded with special tokens
        self.vocab_map = {"PAD": 0, "SOS": 1, "EOS": 2, "UNK": 3}
        self.index_map = {0: "PAD", 1: "SOS", 2: "EOS", 3: "UNK"}
        # length (in tokens, including SOS/EOS) of the longest training caption
        self.max_length = 0
        # raw caption strings collected by build_sequences()
        self.sequences = []

    def build_sequences(self):
        """Collect all training captions and record the longest token length.

        Reads the module-level `train_dataset`. The length is measured on the
        tokenised caption so that it agrees with what `vectorize` pads to;
        counting whitespace-separated words would ignore the SOS/EOS markers
        and any extra splits made by the tokenizer.
        """
        for _, captions in train_dataset:
            for sequence in captions:
                self.sequences.append(sequence)
                self.max_length = max(self.max_length, len(self.tokenize(sequence)))
        print(len(self.sequences))

    def build_vocab(self):
        """Assign an index to every new token found in the training captions."""
        self.build_sequences()
        for sequence in self.sequences:
            for word in self.tokenize(sequence):
                if word not in self.vocab_map:
                    # The next free index always equals the current size.
                    index = len(self.vocab_map)
                    self.index_map[index] = word
                    self.vocab_map[word] = index

    def clean(self, sequence):
        """Return a lower-cased, letters-only caption wrapped in SOS/EOS.

        Whitespace between words is preserved (runs are collapsed to a single
        space); everything that is neither a letter nor whitespace is removed.
        """
        sequence = sequence.lower()
        # Keep whitespace: stripping it too would glue all words together.
        sequence = re.sub(r"[^a-z\s]", "", sequence)
        sequence = " ".join(sequence.split())
        # add start and end tags to the caption
        sequence = "SOS " + sequence + " EOS"
        return sequence

    def tokenize(self, sequence):
        """Split a caption into lower-cased tokens framed by SOS and EOS.

        "." tokens are dropped. The closing EOS marker is required for the
        decoder to ever learn when a caption ends.
        """
        tokenized_sequence = ["SOS"]
        for token in nlp.tokenizer(sequence):
            if token.text != ".":
                tokenized_sequence.append(token.text.lower())
        tokenized_sequence.append("EOS")
        return tokenized_sequence

    def add_padd(self, sequence):
        """Append " PAD" to a space-separated string until it has max_length words."""
        missing = self.max_length - len(sequence.split(" "))
        return sequence + " PAD" * missing

    def __len__(self):
        """Return the number of known tokens, special tokens included."""
        return len(self.vocab_map)

    def vectorize(self, sequence):
        """Convert a caption string into a list of token indices.

        The caption is tokenised, right-padded with PAD up to `max_length`
        (longer captions are returned unpadded and untruncated), and unknown
        tokens are mapped to UNK.
        """
        tokenized_sequence = self.tokenize(sequence)
        # A non-positive repeat count yields an empty list, i.e. no padding.
        tokenized_sequence.extend(
            ["PAD"] * (self.max_length - len(tokenized_sequence))
        )
        unknown = self.vocab_map["UNK"]
        return [self.vocab_map.get(token, unknown) for token in tokenized_sequence]

In [None]:
# Build the vocabulary from the training captions.
vocab = Vocab()
vocab.build_vocab()

# Keep the token -> index mapping and its size at hand for the model set-up.
vocab_map = vocab.vocab_map
vocab_size = len(vocab)
print("Vocab size:", vocab_size)

## RNN


### Attention Mechanism


In [None]:
class Attention(nn.Module):
    """Additive attention over encoder features, conditioned on a decoder state.

    Args:
        encoder_dim: Size of the last dimension of the encoder features.
        decoder_dim: Size of the decoder hidden state.
        attention_dim: Size of the shared space both inputs are projected into.
    """

    def __init__(self, encoder_dim, decoder_dim, attention_dim):
        super().__init__()
        self.encoder_att = nn.Linear(encoder_dim, attention_dim)
        self.decoder_att = nn.Linear(decoder_dim, attention_dim)
        self.full_att = nn.Linear(attention_dim, 1)
        self.softmax = nn.Softmax(dim=1)
        self.tanh = nn.Tanh()

    def forward(self, encoder_out, decoder_hidden):
        """Compute the attended encoding and the attention weights.

        Args:
            encoder_out: 3-D features, indexed as
                (batch, location, encoder_dim).
            decoder_hidden: 2-D decoder state, (batch, decoder_dim).

        Returns:
            A tuple `(context, alpha)`: the weighted sum of `encoder_out` over
            dimension 1, and the softmax weights over that dimension.
        """
        projected_encoder = self.encoder_att(encoder_out)
        # Add a location axis so the decoder projection broadcasts over locations.
        projected_decoder = self.decoder_att(decoder_hidden).unsqueeze(1)
        scores = self.full_att(self.tanh(projected_encoder + projected_decoder))
        alpha = self.softmax(scores.squeeze(2))
        context = torch.sum(encoder_out * alpha.unsqueeze(2), dim=1)
        return context, alpha

### RNN Decoder


In [None]:
class RNNModule(nn.Module):
    """LSTM caption decoder with attention over the encoder features.

    Args:
        embed_size: Size of the word embeddings.
        hidden_size: Size of the LSTM hidden and cell states.
        input_size: Size of the last dimension of the encoder features.
        vocab_size: Number of tokens in the vocabulary (output classes).
        attention_dim: Internal size of the attention module.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(
        self,
        embed_size,
        hidden_size,
        input_size,
        vocab_size,
        attention_dim,
        num_layers=1,
    ):
        super(RNNModule, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.vocab_size = vocab_size
        self.attention = Attention(
            encoder_dim=input_size, decoder_dim=hidden_size, attention_dim=attention_dim
        )
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Linear maps from the mean encoder feature to the initial LSTM states.
        self.init_h = nn.Linear(input_size, hidden_size)
        self.init_c = nn.Linear(input_size, hidden_size)
        # Each step consumes the word embedding concatenated with the context.
        self.lstm = nn.LSTM(
            embed_size + input_size, hidden_size, num_layers, batch_first=True
        )
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.init_weights()

    def init_weights(self):
        """Xavier-initialise the output and embedding weights; zero the output bias."""
        torch.nn.init.xavier_uniform_(self.fc.weight)
        torch.nn.init.xavier_uniform_(self.embedding.weight)
        self.fc.bias.data.fill_(0)

    def forward(self, features, captions):
        """Predict a token distribution for every position of `captions`.

        Args:
            features: Encoder features, indexed as
                (batch, location, input_size).
            captions: Integer token ids of shape (batch, seq_len); the token
                at step t is the decoder input for step t.

        Returns:
            Logits of shape (batch, seq_len, vocab_size).
        """
        embeddings = self.embedding(captions)
        hidden, cell = self.init_hidden(features)
        outputs = torch.zeros(
            captions.size(0),
            captions.size(1),
            self.vocab_size,
            device=features.device,
        )
        for t in range(captions.size(1)):
            # Attend using the hidden state of the top LSTM layer.
            attention_weighted_encoding, _ = self.attention(features, hidden[-1])
            lstm_input = torch.cat(
                (embeddings[:, t], attention_weighted_encoding), dim=1
            ).unsqueeze(1)
            lstm_output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
            outputs[:, t] = self.fc(lstm_output.squeeze(1))

        return outputs

    def init_hidden(self, features):
        """Derive initial (h, c) states from the mean of `features` over dim 1.

        The same initial state is repeated for every LSTM layer.
        """
        mean_features = features.mean(dim=1)
        h = self.init_h(mean_features).unsqueeze(0).repeat(self.num_layers, 1, 1)
        c = self.init_c(mean_features).unsqueeze(0).repeat(self.num_layers, 1, 1)
        return h, c

    def generate_caption(self, features, vocab, max_caption_length=20):
        """Greedily decode a caption for a single image.

        Only a batch of one image is supported, because each predicted token
        is read with `.item()`.

        Args:
            features: Encoder features of one image, with a leading batch
                dimension of 1.
            vocab: Object whose `vocab_map` contains "SOS" and "EOS".
            max_caption_length: Maximum number of tokens to generate.

        Returns:
            List of predicted token ids. The EOS id is included when it was
            produced before the length limit.
        """
        caption = []
        hidden, cell = self.init_hidden(features)
        eos_id = vocab.vocab_map["EOS"]
        # Start token with shape (1, 1), created directly on the right device.
        sos = torch.tensor([[vocab.vocab_map["SOS"]]], device=features.device)
        embed = self.embedding(sos)
        for _ in range(max_caption_length):
            attention_weighted_encoding, _ = self.attention(features, hidden[-1])
            lstm_input = torch.cat(
                (embed[:, 0], attention_weighted_encoding), dim=1
            ).unsqueeze(1)
            lstm_output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
            outputs = self.fc(lstm_output.squeeze(1)).argmax(1)
            token_id = outputs.item()
            caption.append(token_id)
            if token_id == eos_id:
                break
            # Feed the prediction back in as the next input token.
            embed = self.embedding(outputs.unsqueeze(0))
        return caption

In [None]:
class EncoderDecoder(nn.Module):
    """Full captioning model: `EncoderCNN` followed by `RNNModule`.

    Args:
        embed_size: Encoder output size and word-embedding size.
        hidden_size: LSTM hidden size of the decoder.
        input_size: Feature size the decoder expects from the encoder; since
            the encoder emits `embed_size` features the two must be equal.
        vocab_size: Number of tokens in the vocabulary.
        attention_dim: Internal size of the decoder's attention module.
        num_layers: Number of LSTM layers in the decoder.
    """

    def __init__(
        self,
        embed_size,
        hidden_size,
        input_size,
        vocab_size,
        attention_dim,
        num_layers=1,
    ):
        super(EncoderDecoder, self).__init__()
        self.encoder = EncoderCNN(embed_size)
        self.decoder = RNNModule(
            embed_size, hidden_size, input_size, vocab_size, attention_dim, num_layers
        )

    def forward(self, images, captions):
        """Return per-position vocabulary logits for `captions` given `images`.

        Args:
            images: Batch of image tensors for the encoder.
            captions: Integer token ids of shape (batch, seq_len).

        Returns:
            Logits of shape (batch, seq_len, vocab_size).
        """
        features = self.encoder(images)  # (batch, 1, 1, embed_size)
        # Drop one of the singleton axes to get (batch, 1, embed_size).
        # squeeze(1) works for any batch size, whereas squeeze(0) only removed
        # an axis when the batch held exactly one image.
        features = features.squeeze(1)
        outputs = self.decoder(features, captions)
        return outputs

In [None]:
# Size of the encoder output vector and of the decoder's word embeddings.
embed_size = 512
# Size of the decoder LSTM hidden/cell state.
hidden_size = 512
# Feature size the decoder expects from the encoder; the encoder emits
# `embed_size` features, so this has to match `embed_size`.
input_size = 512
# Internal projection size of the attention module.
attention_dim = 256
# Number of stacked LSTM layers in the decoder.
num_layers = 1
# Learning rate passed to the Adam optimizer.
learning_rate = 0.001
# Number of passes over the training data.
epochs = 10
# Log the loss and a sample caption every `print_every` training iterations.
print_every = 10

In [None]:
# Assemble the captioning model from the hyperparameters and move it to `device`.
model = EncoderDecoder(
    embed_size=embed_size,
    hidden_size=hidden_size,
    input_size=input_size,
    vocab_size=vocab_size,
    attention_dim=attention_dim,
    num_layers=num_layers,
)
model = model.to(device)

# Padding positions do not contribute to the loss.
_pad_index = vocab_map["PAD"]
criterion = nn.CrossEntropyLoss(ignore_index=_pad_index)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
for epoch in range(epochs):
    # NOTE(review): iterating `train_dataloader.dataset` yields one un-batched,
    # un-shuffled (image, captions) sample at a time, so the effective batch
    # size is 1 and the loader's batch_size/shuffle settings are not used.
    for i, (images, captions) in enumerate(train_dataloader.dataset):
        # Only the first reference caption is trained on, so only that one is
        # vectorised. Result shape: (1, seq_len).
        caption_ids_tensor = torch.tensor(
            vocab.vectorize(captions[0]), device=device
        ).unsqueeze(0)
        images = images.to(device)
        if images.dim() == 3:
            images = images.unsqueeze(0)  # add the batch dimension

        # Teacher forcing with a one-step shift: the decoder sees tokens
        # 0..T-2 and has to predict tokens 1..T-1. Feeding and scoring the
        # same unshifted sequence would only teach it to copy its input.
        decoder_input = caption_ids_tensor[:, :-1]
        targets = caption_ids_tensor[:, 1:]

        outputs = model(images, decoder_input)
        loss = criterion(outputs.reshape(-1, vocab_size), targets.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % print_every == 0:
            print("Epoch: {} loss: {:.5f}".format(epoch, loss.item()))
            # Greedy-decode a caption for the current training image as a
            # quick qualitative check.
            model.eval()
            with torch.no_grad():
                features = model.encoder(images)
                caption_ids = model.decoder.generate_caption(
                    features.squeeze(0), vocab=vocab, max_caption_length=20
                )
                caption = " ".join([vocab.index_map[idx] for idx in caption_ids])
                print(caption)
                # show_image(img[0], title=caption)

            model.train()

In [None]:
# Smoke test of the decoder alone, using the stand-alone `encoder` and the
# single `test_image` created earlier.
rnn = RNNModule(
    embed_size, hidden_size, input_size, vocab_size, attention_dim, num_layers
).to(device)
features = encoder(test_image)
# Pass the bare sentence: `vectorize` prepends the SOS marker itself and
# lower-cases every token, so literal "SOS"/"EOS" words in the text would be
# looked up as "sos"/"eos" instead of the special tokens.
captions = torch.tensor(vocab.vectorize("a dog is running")).unsqueeze(0).to(device)
# (1, 1, 1, embed) -> (1, 1, embed); valid here because the batch holds one image.
features = features.squeeze(0)
print(features.shape)
print(captions.shape)

outputs = rnn(features, captions)
print(outputs.shape)
print(outputs)