In [None]:
# Root folder of the Flickr8k data on Kaggle; later cells read the `images`
# directory and `captions.txt` from under this path.
data_location="/kaggle/input/flickr8kimagescaptions/flickr8k"

In [None]:
# Show the first four entries of the image folder as a quick sanity check.
!ls {data_location}/images | head -4


In [None]:
import pandas as pd

# Read and process data

In [None]:
# Load the captions file for inspection; the dataset class defined later reads
# the same file and uses its "image" and "caption" columns.
df=pd.read_csv(data_location+"/captions.txt")

In [None]:
import os
# List the dataset root; reuse `data_location` so the path is defined in one place.
print(os.listdir(data_location))

In [None]:
# Preview the first rows of the captions table.
df.head()

# Define dataset and dataloader

In [None]:
import os
from collections import Counter
import spacy
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader,Dataset
import torchvision.transforms as T

from PIL import Image

## Using spacy pretrained model for tokenizing

In [None]:
# Load the spaCy English pipeline once at module level; Vocab.tokenize uses
# its tokenizer for every caption.
spacy_eng = spacy.load("en_core_web_sm")

In [None]:
class Vocab:
    """Word-level vocabulary mapping caption tokens to integer ids and back.

    Ids 0-3 are reserved for ``<PAD>``, ``<SOS>``, ``<EOS>`` and ``<UNK>``.
    Regular words receive the following ids in the order in which they first
    reach the frequency threshold.
    """

    def __init__(self,threshold_freq):
        """Create a vocabulary that holds only the four reserved tokens.

        Args:
            threshold_freq: number of occurrences a word needs within one
                ``build_vocab`` call before it is added.
        """
        # int -> string for the pre-reserved tokens
        self.itos = {0:"<PAD>",1:"<SOS>",2:"<EOS>",3:"<UNK>"}

        # string -> int, the inverse of self.itos
        self.stoi = {token:index for index,token in self.itos.items()}

        self.freq_threshold = threshold_freq

    def __len__(self):
        """Return the number of known tokens, reserved ones included."""
        return len(self.itos)

    @staticmethod
    def tokenize(text):
        """Split ``text`` with the spaCy tokenizer and lower-case every token."""
        return [token.text.lower() for token in spacy_eng.tokenizer(text)]

    def build_vocab(self, sentence_list):
        """Add every word that occurs ``freq_threshold`` times in ``sentence_list``.

        The method may be called more than once: new words are appended after
        the ids already in use and known words keep their id, so ``stoi`` and
        ``itos`` always stay inverse to each other. Frequencies are counted
        per call.

        Args:
            sentence_list: iterable of caption strings.
        """
        frequencies = Counter()
        # Ids are contiguous from 0, so the current size is the next free id
        # (4 for a fresh vocabulary).
        next_idx = len(self.itos)

        for sentence in sentence_list:
            for word in self.tokenize(sentence):
                frequencies[word] += 1

                # The count grows by one each time, so equality fires exactly
                # once per word: the moment it reaches the threshold.
                if frequencies[word] == self.freq_threshold and word not in self.stoi:
                    self.stoi[word] = next_idx
                    self.itos[next_idx] = word
                    next_idx += 1

    def numericalize(self,text):
        """Convert ``text`` to a list of token ids, using ``<UNK>`` for unknown words."""
        unk_idx = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_idx) for token in self.tokenize(text)]

In [None]:
class flickrdataset(Dataset):
    """Image/caption pairs read from a captions CSV and an image folder.

    Each sample is ``(image, caption_ids)`` where ``caption_ids`` is a 1-D
    tensor of vocabulary ids wrapped in ``<SOS>`` ... ``<EOS>``.

    Args:
        root_dir: directory that contains the image files.
        captions_file: CSV file with an "image" and a "caption" column.
        transform: optional callable applied to the loaded RGB image.
        frequency_threshold: minimum word frequency passed to ``Vocab``.
    """

    def __init__(self,root_dir,captions_file,transform=None,frequency_threshold=5):
        self.root_dir=root_dir
        self.df=pd.read_csv(captions_file)
        self.transform=transform
        self.imgs=self.df["image"]
        self.captions=self.df["caption"]

        # The vocabulary is built from every caption in the file.
        self.vocab = Vocab(frequency_threshold)
        self.vocab.build_vocab(self.captions.tolist())

    def __len__(self):
        """Return the number of caption rows."""
        return len(self.df)

    def __getitem__(self,idx):
        """Return the ``idx``-th sample as ``(image, caption_ids)``."""
        # Samplers hand out positions, so index by position rather than label.
        caption=self.captions.iloc[idx]
        image_name=self.imgs.iloc[idx]

        # convert() reads the pixel data, so the file can be closed right away.
        with Image.open(os.path.join(self.root_dir,image_name)) as image_file:
            img=image_file.convert("RGB")
        if self.transform is not None:
            img=self.transform(img)

        cap_vec=[self.vocab.stoi["<SOS>"]]
        cap_vec+=self.vocab.numericalize(caption)
        cap_vec.append(self.vocab.stoi["<EOS>"])
        return img,torch.tensor(cap_vec)

## Pipeline to process images


In [None]:
# Transform pipeline applied to every image: resize to 256, take a random
# 224 crop, then convert to a tensor.
# The Normalize step is left disabled.
# NOTE(review): the encoder below loads pretrained ResNet-50 weights; confirm
# whether its expected input normalization should be enabled here.
transforms=T.Compose([T.Resize(256),
    T.RandomCrop(224),
    T.ToTensor()
    #T.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))
                     ]
                    )

In [None]:
import matplotlib.pyplot as plt
def show_image(inp, title=None):
    """Display a channels-first image tensor with matplotlib.

    Args:
        inp: 3-D image tensor laid out as (channels, height, width). It is
            detached and moved to the CPU first, so tensors that require grad
            or live on another device can be shown as well.
        title: optional text drawn above the image.
    """
    # matplotlib expects channels last: (height, width, channels)
    inp = inp.detach().cpu().numpy().transpose((1, 2, 0))
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    # Short pause, kept from the original so the figure gets a chance to draw.
    plt.pause(0.001)

In [None]:
# Build the dataset over the image folder and captions file; this also builds
# the vocabulary (default frequency threshold of 5).
dataset =  flickrdataset(
    root_dir = data_location+"/images",
    captions_file = data_location+"/captions.txt",
    transform=transforms
)

In [None]:
# Sanity check: show the first sample, its token ids, and the ids mapped back
# to words through the vocabulary.
img,cap=dataset[0]
show_image(img,"Image")
print("Token:",cap)
print("Sentence:")
print([dataset.vocab.itos[token] for token in cap.tolist()])

In [None]:
from torch.utils.data.sampler import SubsetRandomSampler
import numpy as np

# Batch size and worker count shared by the training and validation loaders.
BATCH_SIZE = 256
NUM_WORKER = 2

# Id of the <PAD> token, used to pad captions to a common length.
pad_idx = dataset.vocab.stoi["<PAD>"]
def collate_function(batch):
    """Merge a list of dataset samples into one padded batch.

    Args:
        batch: list of ``(image_tensor, caption_tensor)`` samples. Images must
            share one shape; captions are 1-D and may differ in length.

    Returns:
        ``(imgs, targets)``: the images stacked along a new leading batch
        dimension, and the captions right-padded with ``pad_idx`` to the
        longest caption in the batch (batch first).
    """
    imgs = torch.stack([sample[0] for sample in batch], dim=0)
    targets = pad_sequence(
        [sample[1] for sample in batch],
        batch_first=True,
        padding_value=pad_idx,
    )
    return imgs,targets

# Split configuration: 20% of the rows go to validation; the shuffle is seeded
# so the split is reproducible.
validation_split = .2
shuffle_dataset = True
random_seed= 42

# Creating data indices for training and validation splits:
# NOTE(review): the split is made per caption row. If captions.txt holds
# several rows for the same image, that image can end up in both subsets;
# confirm whether the split should be made per image instead.
dataset_size = len(dataset)
indices = list(range(dataset_size))
split = int(np.floor(validation_split * dataset_size))
if shuffle_dataset :
    np.random.seed(random_seed)
    np.random.shuffle(indices)
# The first `split` shuffled indices form the validation set, the rest train.
train_indices, val_indices = indices[split:], indices[:split]

# Creating PT data samplers and loaders:
# Both loaders wrap the same dataset object (and therefore the same transform
# pipeline) and differ only in the index subset they sample from.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE,
                                           sampler=train_sampler,num_workers=NUM_WORKER,collate_fn=collate_function)
validation_loader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE,
                                                sampler=valid_sampler,num_workers=NUM_WORKER,collate_fn=collate_function)


In [None]:
# Pull one training batch and show its first four images with their captions.
dataiter=next(iter(train_loader))
images, captions_ = dataiter

for k in range(4):
    img,captions=images[k],captions_[k]
    # The title drops <EOS> and <PAD> ids; <SOS> is kept.
    show_image(img," ".join([dataset.vocab.itos[token] for token in captions.tolist() if token!=dataset.vocab.stoi["<EOS>"]and token!=dataset.vocab.stoi["<PAD>"]]))
    plt.show()

# Define model classes

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from torchvision.models.resnet import ResNet50_Weights

## Normal attention

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class EncoderCNN(nn.Module):
    """Frozen ResNet-50 backbone that turns images into a sequence of feature vectors."""

    def __init__(self):
        super().__init__()
        backbone = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        # Freeze every backbone weight; only the decoder is meant to learn.
        for weight in backbone.parameters():
            weight.requires_grad_(False)
        # Keep everything except the last two child modules of the network,
        # so the output is still a spatial feature map.
        kept_layers = list(backbone.children())[:-2]
        self.resnet = nn.Sequential(*kept_layers)

    def forward(self, images):
        """Return features shaped (batch, locations, channels).

        NOTE(review): the parameters are frozen, but nothing here pins the
        backbone to eval mode; confirm that is intended while training.
        """
        feature_map = self.resnet(images)
        # Move channels last, then flatten the two spatial axes into one.
        channels_last = feature_map.permute(0, 2, 3, 1)
        batch = channels_last.size(0)
        channels = channels_last.size(-1)
        return channels_last.view(batch, -1, channels)  # (batch_size, 49, 2048)


class Attention(nn.Module):
    """Dot-product attention between a projected decoder state and image features."""

    def __init__(self, encoder_dim, decoder_dim):
        super().__init__()
        self.encoder_dim = encoder_dim
        self.decoder_dim = decoder_dim
        # Projects the decoder state into the encoder feature space.
        self.attention = nn.Linear(decoder_dim, encoder_dim)
        # Normalises the scores over the location axis.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, hidden_state):
        """Score every feature location against the decoder state.

        Args:
            features: (batch, locations, encoder_dim) encoder output.
            hidden_state: (batch, decoder_dim) decoder hidden state.

        Returns:
            ``(alpha, context)``: attention weights (batch, locations) and
            the weighted feature sum (batch, encoder_dim).
        """
        query = self.attention(hidden_state).unsqueeze(2)  # (batch, encoder_dim, 1)
        scores = torch.bmm(features, query).squeeze(2)  # (batch, locations)
        alpha = self.softmax(scores)
        weights = alpha.unsqueeze(2)  # (batch, locations, 1)
        context = torch.bmm(features.transpose(1, 2), weights).squeeze(2)
        return alpha, context


class DecoderRNN(nn.Module):
    """LSTM caption decoder that attends over the image features at every step.

    Args:
        embed_size: dimension of the word embeddings.
        vocab_size: number of tokens; sizes the embedding table and the
            output layer.
        attention_dim: stored on the instance; ``Attention`` does not use it.
        encoder_dim: channel dimension of the encoder features.
        decoder_dim: size of the LSTM hidden and cell states.
        drop_prob: dropout probability applied to the hidden state before the
            output layer.
    """

    def __init__(self, embed_size, vocab_size, attention_dim, encoder_dim, decoder_dim, drop_prob=0.3):
        super(DecoderRNN, self).__init__()

        self.vocab_size = vocab_size
        self.attention_dim = attention_dim
        self.decoder_dim = decoder_dim

        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.attention = Attention(encoder_dim, decoder_dim)

        self.init_h = nn.Linear(encoder_dim, decoder_dim)
        self.init_c = nn.Linear(encoder_dim, decoder_dim)
        # Each step consumes the word embedding concatenated with the context.
        self.lstm_cell = nn.LSTMCell(embed_size + encoder_dim, decoder_dim, bias=True)
        self.fcn = nn.Linear(decoder_dim, vocab_size)
        self.drop = nn.Dropout(drop_prob)

    def forward(self, features, captions):
        """Run teacher-forced decoding over ``captions``.

        Args:
            features: (batch, locations, encoder_dim) encoder output.
            captions: (batch, seq_len) token ids; the token at step ``s`` is
                the input used to predict step ``s + 1``.

        Returns:
            ``(preds, alphas)``: vocabulary scores (batch, seq_len - 1,
            vocab_size) and attention weights (batch, seq_len - 1, locations).
        """
        embeds = self.embedding(captions)
        h, c = self.init_hidden_state(features)

        # The last token is never fed as input, so there is one step fewer.
        seq_length = captions.size(1) - 1
        batch_size = captions.size(0)

        # Allocate the result buffers directly on the device of the features.
        preds = torch.zeros(batch_size, seq_length, self.vocab_size, device=features.device)
        alphas = torch.zeros(batch_size, seq_length, features.size(1), device=features.device)

        for s in range(seq_length):
            alpha, context = self.attention(features, h)
            lstm_input = torch.cat((embeds[:, s], context), dim=1)
            h, c = self.lstm_cell(lstm_input, (h, c))
            output = self.fcn(self.drop(h))
            preds[:, s] = output
            alphas[:, s] = alpha

        return preds, alphas

    @torch.no_grad()
    def generate_caption(self, features, max_len=20, vocab=None):
        """Greedily decode a caption for a single image.

        Put the module in eval mode first; otherwise dropout stays active
        while decoding.

        Args:
            features: (1, locations, encoder_dim) encoder output of one image.
            max_len: maximum number of tokens to generate.
            vocab: object with ``stoi`` and ``itos`` mappings; required.

        Returns:
            ``(words, alphas)``: generated tokens (a final ``<EOS>`` is
            included when it was produced) and one attention array per step.

        Raises:
            ValueError: if ``vocab`` is missing or ``features`` does not hold
                exactly one image.
        """
        if vocab is None:
            raise ValueError("generate_caption requires a vocab with stoi/itos mappings")
        if features.size(0) != 1:
            raise ValueError("generate_caption expects the features of exactly one image")

        h, c = self.init_hidden_state(features)
        alphas = []
        captions = []

        # Start decoding from the <SOS> token.
        word = torch.tensor([vocab.stoi['<SOS>']], device=features.device)
        embeds = self.embedding(word)  # (1, embed_size)

        for _ in range(max_len):
            alpha, context = self.attention(features, h)
            alphas.append(alpha.cpu().detach().numpy())
            lstm_input = torch.cat((embeds, context), dim=1)
            h, c = self.lstm_cell(lstm_input, (h, c))
            output = self.fcn(self.drop(h))
            predicted_word_idx = output.argmax(dim=1)  # (1,)
            token = predicted_word_idx.item()
            captions.append(token)

            if vocab.itos[token] == "<EOS>":
                break

            # The predicted word is the input of the next step.
            embeds = self.embedding(predicted_word_idx)

        return [vocab.itos[idx] for idx in captions], alphas

    def init_hidden_state(self, encoder_out):
        """Derive the initial LSTM state from the mean over all feature locations."""
        mean_encoder_out = encoder_out.mean(dim=1)
        h = self.init_h(mean_encoder_out)
        c = self.init_c(mean_encoder_out)
        return h, c


## Bahdanau attention

In [None]:
class EncoderCNN(nn.Module):
    """ResNet-50 feature extractor with frozen weights.

    NOTE(review): this cell re-declares ``EncoderCNN``; an equivalent class is
    already defined in the "Normal attention" cell, and this later definition
    is the one in effect after it runs. Consider keeping a single copy.
    """

    def __init__(self):
        super().__init__()
        pretrained=models.resnet50(weights=ResNet50_Weights.DEFAULT)
        # No gradient is needed for any pretrained weight.
        for param in pretrained.parameters():
            param.requires_grad_(False)
        # Drop the last two child modules and chain the remaining ones.
        self.resnet=nn.Sequential(*list(pretrained.children())[:-2])

    def forward(self,images):
        """Map images to features shaped (batch, locations, channels)."""
        grid=self.resnet(images).permute(0,2,3,1)  # channels moved last
        flat=grid.view(grid.size(0),-1,grid.size(-1))  # spatial axes merged
        return flat #(batch_size,49,2048)


In [None]:
class BahadnauAttention(nn.Module):
    """Additive attention: ``score_i = v(tanh(U(h) + W(f_i)))``.

    Args:
        encoder_dim: channel dimension of the image features (default 2048).
        decoder_dim: size of the decoder hidden state (default 512).
        attention_dim: size of the shared space in which the hidden state and
            the features are combined (default 512).

    The defaults reproduce the layer sizes that were previously hard-coded,
    so ``BahadnauAttention()`` builds the same module as before.
    """

    def __init__(self,encoder_dim=2048,decoder_dim=512,attention_dim=512):
        super(BahadnauAttention,self).__init__()
        self.U=nn.Linear(decoder_dim,attention_dim)
        self.W=nn.Linear(encoder_dim,attention_dim)
        self.v=nn.Linear(attention_dim,1)
        self.tanh=nn.Tanh()
        self.softmax=nn.Softmax(1)

    def forward(self,features,hidden_states):
        """Compute attention weights and the context vector.

        Args:
            features: (batch, locations, encoder_dim) encoder output.
            hidden_states: (batch, decoder_dim) decoder hidden state.

        Returns:
            ``(alpha, context)``: weights (batch, locations) that sum to one
            per sample, and the weighted feature sum (batch, encoder_dim).
        """
        U_hidden=self.U(hidden_states) #(batch_size,attention_dim)
        W_features=self.W(features) #(batch_size,locations,attention_dim)
        # Broadcast the projected hidden state over all locations.
        attention=self.tanh(U_hidden.unsqueeze(1)+W_features)
        e=self.v(attention).squeeze(2) #(batch_size,locations)
        alpha=self.softmax(e)
        context=(features*alpha.unsqueeze(2)).sum(1) #(batch_size,encoder_dim)
        return alpha,context

In [None]:
class DecoderRNNBahadnau(nn.Module):
    """LSTM caption decoder that uses ``BahadnauAttention`` at every step.

    Args:
        embed_size: dimension of the word embeddings.
        vocab_size: number of tokens; sizes the embedding table and the
            output layer.
        attention_dim: stored on the instance; the attention module is built
            with its own default sizes.
        encoder_dim: channel dimension of the encoder features.
        decoder_dim: size of the LSTM hidden and cell states.
        drop_prob: dropout probability applied to the hidden state before the
            output layer.
    """

    def __init__(self,embed_size, vocab_size, attention_dim,encoder_dim,decoder_dim,drop_prob=0.3):
        super().__init__()

        #save the model param
        self.vocab_size = vocab_size
        self.attention_dim = attention_dim
        self.decoder_dim = decoder_dim

        self.embedding = nn.Embedding(vocab_size,embed_size)
        self.attention = BahadnauAttention()

        self.init_h = nn.Linear(encoder_dim, decoder_dim)
        self.init_c = nn.Linear(encoder_dim, decoder_dim)
        # Each step consumes the word embedding concatenated with the context.
        self.lstm_cell = nn.LSTMCell(embed_size+encoder_dim,decoder_dim,bias=True)
        # Not used by forward/generate_caption; kept so the module's
        # parameter set (and therefore its state_dict) stays unchanged.
        self.f_beta = nn.Linear(decoder_dim, encoder_dim)

        self.fcn = nn.Linear(decoder_dim,vocab_size)
        self.drop = nn.Dropout(drop_prob)

    def forward(self, features, captions):
        """Run teacher-forced decoding over ``captions``.

        Args:
            features: (batch, locations, encoder_dim) encoder output.
            captions: (batch, seq_len) token ids; the token at step ``s`` is
                the input used to predict step ``s + 1``.

        Returns:
            ``(preds, alphas)``: vocabulary scores (batch, seq_len - 1,
            vocab_size) and attention weights (batch, seq_len - 1, locations).
        """
        #vectorize the caption
        embeds = self.embedding(captions)

        # Initialize LSTM state
        h, c = self.init_hidden_state(features)  # (batch_size, decoder_dim)

        #get the seq length to iterate; the last token is never an input
        seq_length = captions.size(1)-1
        batch_size = captions.size(0)
        num_features = features.size(1)

        # Allocate on the device of the features instead of relying on a
        # notebook-level `device` variable.
        preds = torch.zeros(batch_size, seq_length, self.vocab_size, device=features.device)
        alphas = torch.zeros(batch_size, seq_length, num_features, device=features.device)

        for s in range(seq_length):
            alpha,context = self.attention(features, h)
            lstm_input = torch.cat((embeds[:, s], context), dim=1)
            h, c = self.lstm_cell(lstm_input, (h, c))

            output = self.fcn(self.drop(h))

            preds[:,s] = output
            alphas[:,s] = alpha

        return preds, alphas

    @torch.no_grad()
    def generate_caption(self,features,max_len=20,vocab=None):
        """Greedily decode a caption for a single image.

        Put the module in eval mode first; otherwise dropout stays active
        while decoding.

        Args:
            features: (1, locations, encoder_dim) encoder output of one image.
            max_len: maximum number of tokens to generate.
            vocab: object with ``stoi`` and ``itos`` mappings; required.

        Returns:
            ``(words, alphas)``: generated tokens (a final ``<EOS>`` is
            included when it was produced) and one attention array per step.

        Raises:
            ValueError: if ``vocab`` is missing or ``features`` does not hold
                exactly one image.
        """
        if vocab is None:
            raise ValueError("generate_caption requires a vocab with stoi/itos mappings")
        if features.size(0) != 1:
            raise ValueError("generate_caption expects the features of exactly one image")

        h, c = self.init_hidden_state(features)  # (batch_size, decoder_dim)

        alphas = []
        captions = []

        #starting input: the <SOS> token, placed on the device of the features
        word = torch.tensor([vocab.stoi['<SOS>']], device=features.device)
        embeds = self.embedding(word)  # (1, embed_size)

        for _ in range(max_len):
            alpha,context = self.attention(features, h)

            #store the alpha score
            alphas.append(alpha.cpu().detach().numpy())

            lstm_input = torch.cat((embeds, context), dim=1)
            h, c = self.lstm_cell(lstm_input, (h, c))
            output = self.fcn(self.drop(h))

            #select the word with the highest score
            predicted_word_idx = output.argmax(dim=1)  # (1,)
            token = predicted_word_idx.item()

            #save the generated word
            captions.append(token)

            #end if <EOS> detected
            if vocab.itos[token] == "<EOS>":
                break

            #send generated word as the next input
            embeds = self.embedding(predicted_word_idx)

        #convert the vocab idx to words and return the sentence
        return [vocab.itos[idx] for idx in captions],alphas

    def init_hidden_state(self, encoder_out):
        """Derive the initial LSTM state from the mean over all feature locations."""
        mean_encoder_out = encoder_out.mean(dim=1)
        h = self.init_h(mean_encoder_out)  # (batch_size, decoder_dim)
        c = self.init_c(mean_encoder_out)
        return h, c

In [None]:
class EncoderDecoderBahadnau(nn.Module):
    """Image captioning model: ``EncoderCNN`` followed by ``DecoderRNNBahadnau``.

    Args:
        embed_size: dimension of the word embeddings.
        vocab_size: number of tokens handled by the decoder.
        attention_dim: forwarded to the decoder.
        encoder_dim: channel dimension of the encoder features.
        decoder_dim: size of the decoder's LSTM state.
        drop_prob: dropout probability forwarded to the decoder.
    """

    def __init__(self,embed_size, vocab_size, attention_dim,encoder_dim,decoder_dim,drop_prob=0.3):
        super().__init__()
        self.encoder = EncoderCNN()
        # Forward the constructor arguments as given instead of reading the
        # vocabulary size from the notebook-level dataset and dropping
        # drop_prob.
        self.decoder = DecoderRNNBahadnau(
            embed_size=embed_size,
            vocab_size=vocab_size,
            attention_dim=attention_dim,
            encoder_dim=encoder_dim,
            decoder_dim=decoder_dim,
            drop_prob=drop_prob
        )

    def forward(self, images, captions):
        """Encode ``images`` and decode against ``captions``; returns the decoder output."""
        features = self.encoder(images)
        outputs = self.decoder(features, captions)
        return outputs

In [None]:
class EncoderDecoderAttn(nn.Module):
    """Image captioning model: ``EncoderCNN`` followed by ``DecoderRNN``.

    Args:
        embed_size: dimension of the word embeddings.
        vocab_size: number of tokens handled by the decoder.
        attention_dim: forwarded to the decoder.
        encoder_dim: channel dimension of the encoder features.
        decoder_dim: size of the decoder's LSTM state.
        drop_prob: dropout probability forwarded to the decoder.
    """

    def __init__(self,embed_size, vocab_size, attention_dim,encoder_dim,decoder_dim,drop_prob=0.3):
        super().__init__()
        self.encoder = EncoderCNN()
        # Forward the constructor arguments as given instead of reading the
        # vocabulary size from the notebook-level dataset and dropping
        # drop_prob.
        self.decoder = DecoderRNN(
            embed_size=embed_size,
            vocab_size=vocab_size,
            attention_dim=attention_dim,
            encoder_dim=encoder_dim,
            decoder_dim=decoder_dim,
            drop_prob=drop_prob
        )

    def forward(self, images, captions):
        """Encode ``images`` and decode against ``captions``; returns the decoder output."""
        features = self.encoder(images)
        outputs = self.decoder(features, captions)
        return outputs

In [None]:
# Model hyper-parameters shared by both captioning models.
embed_size=300
vocab_size = len(dataset.vocab)

# Smaller alternative configuration, currently disabled:
# attention_dim=64
# encoder_dim=512
# decoder_dim=128

attention_dim=256
encoder_dim=2048  # matches the channel size noted on the encoder output
decoder_dim=512

# Learning rate passed to both Adam optimizers.
learning_rate = 3e-4

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Captioning model with the dot-product attention decoder, plus its loss and
# optimizer.
modelAttn = EncoderDecoderAttn(
    embed_size=embed_size,
    vocab_size = vocab_size,
    attention_dim=attention_dim,
    encoder_dim=encoder_dim,
    decoder_dim=decoder_dim
).to(device)

# Padding positions do not contribute to the loss.
criterionAttn = nn.CrossEntropyLoss(ignore_index=dataset.vocab.stoi["<PAD>"])
optimizerAttn = optim.Adam(modelAttn.parameters(), lr=learning_rate)

In [None]:
# Captioning model with the additive attention decoder, plus its loss and
# optimizer.
modelBahadnau = EncoderDecoderBahadnau(
    embed_size=embed_size,
    vocab_size = vocab_size,
    attention_dim=attention_dim,
    encoder_dim=encoder_dim,
    decoder_dim=decoder_dim
).to(device)

# Padding positions do not contribute to the loss.
criterionBahadnau = nn.CrossEntropyLoss(ignore_index=dataset.vocab.stoi["<PAD>"])
optimizerBahadnau = optim.Adam(modelBahadnau.parameters(), lr=learning_rate)

In [None]:
def save_model(model, name,num_epochs):
    """Write a checkpoint to ``{name}.pth``.

    The checkpoint is a dict with the model's ``state_dict`` plus the
    hyper-parameters read from the notebook-level variables (``embed_size``,
    ``attention_dim``, ``encoder_dim``, ``decoder_dim``) and the current
    vocabulary size.

    Args:
        model: module whose weights are stored.
        name: file name without the ``.pth`` extension.
        num_epochs: epoch count recorded in the checkpoint.
    """
    checkpoint = dict(
        num_epochs=num_epochs,
        embed_size=embed_size,
        vocab_size=len(dataset.vocab),
        attention_dim=attention_dim,
        encoder_dim=encoder_dim,
        decoder_dim=decoder_dim,
        state_dict=model.state_dict(),
    )
    target_path = f'{name}.pth'
    torch.save(checkpoint, target_path)

In [None]:

def train(model,train_loader,validation_loader,optimizer,criterion,num_epochs, print_every, name = 'normal'):
    """Train a captioning model and save a checkpoint at the end.

    Args:
        model: module exposing ``encoder`` and ``decoder`` sub-modules.
        train_loader: yields ``(images, captions)`` training batches.
        validation_loader: source of the image used for the caption preview.
        optimizer: optimizer stepping the model parameters.
        criterion: loss taking (N, vocab) scores and (N,) target ids.
        num_epochs: number of passes over ``train_loader``.
        print_every: show the loss and a sample caption every this many batches.
        name: checkpoint file name (without extension) passed to ``save_model``.

    Returns:
        List with the summed batch loss of every epoch.
    """
    losses = []

    for epoch in range(1, num_epochs + 1):
        losses_per_epoch = 0.0

        for idx, (image, captions) in enumerate(train_loader):
            image, captions = image.to(device), captions.to(device)
            optimizer.zero_grad()

            # Forward pass
            features = model.encoder(image)
            outputs, _ = model.decoder(features, captions)

            # The decoder predicts token s+1 from token s, so the targets are
            # the captions shifted by one. The class dimension is taken from
            # the scores themselves instead of a notebook-level variable.
            targets = captions[:, 1:]
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
            losses_per_epoch += loss.item()

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            if (idx + 1) % print_every == 0:
                print("Epoch: {} loss: {:.5f}".format(epoch, loss.item()))

                # Preview: caption the first image of a validation batch.
                model.eval()
                try:
                    with torch.no_grad():
                        img, _ = next(iter(validation_loader))
                        features = model.encoder(img[0:1].to(device))
                        caps, _ = model.decoder.generate_caption(features, vocab=dataset.vocab)
                        caption = ' '.join(caps)
                        show_image(img[0], title=caption)
                finally:
                    # Always return to training mode, even if the preview fails.
                    model.train()

        losses.append(losses_per_epoch)

    # Pass num_epochs rather than the loop variable, which does not exist
    # when num_epochs is 0.
    save_model(model,name,num_epochs)
    return losses


### Train model with normal attention

In [None]:
# Training schedule, reused by the second training run below.
num_epochs = 100
print_every = 100

# Train the dot-product attention model; the checkpoint is written to
# "Normal Attention.pth".
train(modelAttn,
      train_loader,
      validation_loader,
      optimizerAttn,
      criterionAttn,
      num_epochs,
      print_every,
      name = 'Normal Attention')


### Train model with Bahdanau attention

In [None]:
# Train the additive attention model with the same schedule; the checkpoint
# is written to "Bahadnau Attention.pth".
train(modelBahadnau,
      train_loader,
      validation_loader,
      optimizerBahadnau,
      criterionBahadnau,
      num_epochs,
      print_every,
      name = 'Bahadnau Attention')

### Test

In [None]:
#testing
import nltk
from nltk.translate.bleu_score import sentence_bleu

# Qualitative check: caption one validation image and score it with BLEU-1.
modelAttn.eval()
img,caption=next(iter(validation_loader))
# NOTE(review): index 50 requires the batch to hold more than 50 samples.
img_=img[50]
print(img.size())
# Ground-truth tokens without padding, computed once for the title and BLEU.
caption_tokens=[dataset.vocab.itos[k] for k in caption[50].tolist() if dataset.vocab.itos[k]!="<PAD>"]
show_image(img_," ".join(caption_tokens))
# Inference only: no autograd graph is needed.
with torch.no_grad():
    features = modelAttn.encoder(img_.unsqueeze(0).to(device))
    caps,alphas = modelAttn.decoder.generate_caption(features,vocab=dataset.vocab)
caption_ = ' '.join(caps)
# Score real words only: the reference starts with <SOS> while the generated
# caption never does, and both may end with <EOS>.
special_tokens={"<PAD>","<SOS>","<EOS>"}
reference=[[token for token in caption_tokens if token not in special_tokens]]
candidate=[token for token in caps if token not in special_tokens]
print(reference,candidate)
score=sentence_bleu(reference,candidate,weights=(1,0,0,0))
print("Generated caption : ",caption_)
print("Bleu 1 gram score : ",score)

In [None]:
#checking corpus bleu score
from nltk.translate.bleu_score import corpus_bleu

def score_model(model,validation_loader):
    """Print corpus BLEU-1 to BLEU-4 of ``model`` over ``validation_loader``.

    Every image is captioned on its own with greedy decoding and compared
    with the single ground-truth caption of its row. The special tokens
    ``<PAD>``, ``<SOS>`` and ``<EOS>`` are removed from both sides before
    scoring. The model is switched to eval mode for the duration of the call
    and its previous mode is restored afterwards.

    Args:
        model: module exposing ``encoder`` and ``decoder.generate_caption``.
        validation_loader: yields ``(images, captions)`` batches.
    """
    special_tokens = {"<PAD>", "<SOS>", "<EOS>"}
    itos = dataset.vocab.itos
    references=[]
    candidates=[]

    was_training = model.training
    # Eval mode disables dropout and stops normalisation layers from using
    # batch statistics while scoring.
    model.eval()
    try:
        with torch.no_grad():
            for image,caption in validation_loader:
                for k in range(image.size(0)):
                    features=model.encoder(image[k].unsqueeze(0).to(device))
                    caps,_=model.decoder.generate_caption(features,vocab=dataset.vocab)
                    reference=[itos[i] for i in caption[k].tolist() if itos[i] not in special_tokens]
                    # corpus_bleu expects a list of references per candidate,
                    # each reference being a token list.
                    references.append([reference])
                    candidates.append([word for word in caps if word not in special_tokens])
    finally:
        model.train(was_training)

    print("bleu 1 score : ",corpus_bleu(references,candidates,weights=(1,0,0,0)))
    print("bleu 2 score : ",corpus_bleu(references,candidates,weights=(0.5,0.5,0,0)))
    print("bleu 3 score : ",corpus_bleu(references,candidates,weights=(1/3,1/3,1/3,0)))
    print("bleu 4 score : ",corpus_bleu(references,candidates,weights=(0.25,0.25,0.25,0.25)))


In [None]:
# Corpus BLEU of the dot-product attention model on the validation split.
score_model(modelAttn, validation_loader)

In [None]:
# Corpus BLEU of the additive attention model on the validation split.
score_model(modelBahadnau, validation_loader)