# Artificial Neural Networks - 3rd Assignment - Armin Abbasi Najarzadeh

# Importing Libraries

In [None]:
import os
import spacy
import torch
import numpy as np
import pandas as pd
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

from PIL import Image
from torch import nn
from pathlib import Path
from matplotlib import pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision.models import vgg16, VGG16_Weights

# Load the `en_core_web_sm` spaCy pipeline; Vocabulary.tokenize uses its tokenizer.
spacy_eng = spacy.load("en_core_web_sm")

In [None]:
!pip install torchsummary
!pip install pycocoevalcap

from torchsummary import summary
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.meteor.meteor import Meteor
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider

### Setting the Device

In [None]:
# Pick the first CUDA device when one is available, otherwise run on the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Request deterministic cuDNN kernels whenever a GPU is used.
if device.type == "cuda":
    torch.backends.cudnn.deterministic = True

print("Device:", device)

# Preparing Dataset

## Custom Vocabulary

In [4]:
class Vocabulary:
    """Word-level vocabulary mapping lower-cased tokens to integer ids and back.

    Ids 0-3 are reserved for ``<PAD>``, ``<SOS>``, ``<EOS>`` and ``<UNK>``;
    ids handed out by ``build_vocab`` start at 4.
    """

    def __init__(self, freq_threshold):
        """Store the frequency threshold and seed both lookup tables."""
        self.freq_threshold = freq_threshold
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {token: index for index, token in self.itos.items()}

    def __len__(self):
        """Return the number of known tokens, special tokens included."""
        return len(self.itos)

    def build_vocab(self, sentence_list):
        """Register every token once it has been seen ``freq_threshold`` times.

        Ids are assigned in the order in which tokens reach the threshold.
        """
        counts = {}
        next_index = 4

        for sentence in sentence_list:
            for token in self.tokenize(str(sentence)):
                seen = counts.get(token, 0) + 1
                counts[token] = seen

                # Only the exact moment the threshold is hit creates an entry.
                if seen != self.freq_threshold:
                    continue

                self.itos[next_index] = token
                self.stoi[token] = next_index
                next_index += 1

    def numericalize(self, sentence):
        """Return the ids of the sentence's tokens; unknown tokens map to ``<UNK>``."""
        unknown = self.stoi["<UNK>"]
        return [self.stoi.get(token, unknown) for token in self.tokenize(sentence)]

    @staticmethod
    def tokenize(sentence):
        """Split ``sentence`` with the spaCy tokenizer and lower-case each token."""
        text = str(sentence)
        return [piece.text.lower() for piece in spacy_eng.tokenizer(text)]

## Custom Dataset

In [5]:
class Flickr(Dataset):
    """Image-caption dataset read from a ``|``-delimited captions file.

    The captions file must provide the columns ``image_name`` and
    `` comment`` (note the leading space). Each row is one sample.

    Args:
        root_dir: Directory that contains the image files.
        caption_path: Path of the captions file.
        transform: Callable applied to the loaded RGB ``PIL`` image.
        freq_threshold: Forwarded to ``Vocabulary``; a token enters the
            vocabulary once it has been seen this many times.
    """

    def __init__(self, root_dir, caption_path, transform, freq_threshold=5):
        self.freq_threshold = freq_threshold
        self.transform = transform
        self.root_dir = root_dir

        self.df = pd.read_csv(caption_path, delimiter='|')

        self.images = self.df["image_name"]
        self.captions = self.df[" comment"]

        self.vocab = Vocabulary(freq_threshold)
        self.vocab.build_vocab(self.captions.tolist())

    def __len__(self):
        """Return the number of rows in the captions file."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(transformed image, caption id tensor)`` for row ``index``."""
        image_path = os.path.join(self.root_dir, self.images[index])

        # Close the file handle as soon as the pixel data has been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        image = self.transform(image)

        return image, torch.tensor(self._encode_caption(index))

    def get_label(self, index):
        """Return the caption of row ``index`` as tokens, ``<SOS>``/``<EOS>`` included.

        Only the caption is read; the image is not opened, so this is cheap
        enough to call once per sample.
        """
        return [self.vocab.itos[token] for token in self._encode_caption(index)]

    def _encode_caption(self, index):
        """Return ``[<SOS>, caption ids..., <EOS>]`` for row ``index`` as a list of ints."""
        encoded = [self.vocab.stoi["<SOS>"]]
        encoded += self.vocab.numericalize(self.captions[index])
        encoded.append(self.vocab.stoi["<EOS>"])

        return encoded
        

## Custom Caption Collate

In [6]:
class CapCollat:
    """Collate ``(image, caption)`` samples into one image batch and one padded caption batch.

    Args:
        pad_seq: Value used to pad captions to a common length.
        batch_first: Forwarded to ``pad_sequence`` to choose the caption layout.
    """

    def __init__(self, pad_seq, batch_first=False):
        self.pad_seq = pad_seq
        self.batch_first = batch_first

    def __call__(self, batch):
        """Return ``(images, target_caps)`` built from a list of samples."""
        # Stacking adds the new leading batch dimension in one step.
        images = torch.stack([sample[0] for sample in batch], dim=0)

        captions = [sample[1] for sample in batch]
        target_caps = pad_sequence(
            captions,
            batch_first=self.batch_first,
            padding_value=self.pad_seq,
        )

        return images, target_caps

## Custom Scorer 

In [7]:
class Scorer():
    """Run the BLEU, METEOR, ROUGE_L and CIDEr scorers on references/candidates.

    Args:
        references: Passed as the first argument to every ``compute_score`` call.
        candidates: Passed as the second argument to every ``compute_score`` call.
    """

    def __init__(self, references, candidates):
        self.references = references
        self.candidates = candidates

        bleu_names = ["BLEU 1", "BLEU 2", "BLEU 3", "BLEU 4"]

        # Each entry pairs a scorer with the name(s) its result is stored under.
        self.word_based_scorers = [
            (Bleu(4), bleu_names),
            (Meteor(), "METEOR"),
            (Rouge(), "ROUGE_L"),
            (Cider(), "CIDEr"),
        ]

    def compute_scores(self):
        """Return a dict mapping each metric name to its score."""
        results = {}

        for metric, label in self.word_based_scorers:
            value, _ = metric.compute_score(self.references, self.candidates)

            if type(label) is not list:
                results[label] = value
                continue

            # A list label marks BLEU, whose result holds one value per n-gram order.
            for position, name in enumerate(("BLEU 1", "BLEU 2", "BLEU 3", "BLEU 4")):
                results[name] = value[position]

        return results

## Loading and Testing the Dataset 

In [8]:
# Locations of the Flickr30k images and of the caption file.
root_folder = "/kaggle/input/flickr-image-dataset/flickr30k_images/flickr30k_images/"
csv_file = "/kaggle/input/flickr-image-dataset/flickr30k_images/results.csv"

# Preprocessing: resize, center-crop to 224x224, convert to tensor, normalize per channel.
transform = T.Compose(
    [
        T.Resize((256, 256), interpolation=T.InterpolationMode.BILINEAR),
        T.CenterCrop((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Loader and vocabulary settings.
batch_size = 64
num_workers = 2
freq_threshold = 5
batch_first = True
pin_memory = True

dataset = Flickr(root_folder, csv_file, transform, freq_threshold)
pad_idx = dataset.vocab.stoi["<PAD>"]

# Sequential 90/10 split: the first rows train, the remaining rows validate.
data_size = len(dataset)
train_size = int(0.9 * data_size)
val_size = data_size - train_size

# First and last dataset index that belong to the validation split.
val_set_start = train_size
val_set_end = data_size - 1

train_set = torch.utils.data.Subset(dataset, range(0, train_size))
val_set = torch.utils.data.Subset(dataset, range(train_size, data_size))

train_loader = DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=pin_memory,
    collate_fn=CapCollat(pad_seq=pad_idx, batch_first=batch_first),
)

# Validation keeps the dataset order (no shuffling).
val_loader = DataLoader(
    val_set,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=pin_memory,
    collate_fn=CapCollat(pad_seq=pad_idx, batch_first=batch_first),
)

### Setting Vocabulary

In [9]:
# Vocabulary built by the dataset, and its size (passed to Decoder as the output dimension).
vocab = dataset.vocab
vocab_size = len(vocab)

### Testing Scores

In [None]:
# Sanity-check the Scorer: score truncated captions against the full captions
# of 100 randomly drawn samples.
dummy_references = {}
dummy_candidates = {}

for idx in range(100):
    # randint's upper bound is exclusive, so `data_size` makes every sample reachable.
    label = dataset.get_label(np.random.randint(0, data_size))

    # Keep at least the first token; the max() guards labels that hold only
    # <SOS> and <EOS>, for which randint(1, 1) would raise.
    cut = np.random.randint(1, max(len(label) - 1, 2))

    dummy_references[idx] = [" ".join(label)]
    dummy_candidates[idx] = [" ".join(label[:cut])]

test_metrics = Scorer(dummy_references, dummy_candidates).compute_scores()

In [None]:
# Geometric mean of BLEU 1-4, computed in log space.
cumulative_bleu_score = np.exp(
    sum(np.log(test_metrics[name]) for name in ("BLEU 1", "BLEU 2", "BLEU 3", "BLEU 4")) / 4
)

# One metric per output line.
print(
    f"Cumulative BLEU:\t{cumulative_bleu_score:.6f}",
    f"CIDEr:\t\t\t{test_metrics['CIDEr']:.6f}",
    f"ROUGE_L:\t\t{test_metrics['ROUGE_L']:.6f}",
    f"METEOR:\t\t\t{test_metrics['METEOR']:.6f}",
    sep="\n",
)

### Testing Dataset

In [None]:
# Show ten random samples with their caption as the plot title.
for _ in range(10):
    # randint's upper bound is exclusive, so `data_size` makes every sample reachable.
    idx = np.random.randint(0, data_size)

    # Load the sample once and decode its caption ids directly instead of
    # fetching the same item a second time through get_label().
    image, caption = dataset[idx]

    words = [dataset.vocab.itos[token] for token in caption.tolist()]

    # Drop the leading <SOS> and the trailing <EOS>.
    label = " ".join(words[1:-1])

    # imshow expects the channel dimension last.
    # NOTE(review): the tensor is still normalized by `transform`, so colors
    # are presumably clipped by imshow - confirm whether to undo the
    # normalization for display.
    image = image.permute(1, 2, 0)

    plt.title(label)
    plt.imshow(image)

    plt.show()

# Custom Models

## Encoder

In [28]:
class Encoder(nn.Module):
    """Frozen backbone followed by a trainable linear layer.

    Args:
        model: Backbone module; its output must have 1000 features because it
            feeds ``nn.Linear(1000, embed_size)``.
        embed_size: Width of the returned image encoding.
    """

    def __init__(self, model, embed_size: int):
        super().__init__()

        self.model = model

        # The backbone is used as a fixed feature extractor.
        for weight in self.model.parameters():
            weight.requires_grad_(False)

        self.encoder = nn.Sequential(self.model, nn.Linear(1000, embed_size))

    def forward(self, image):
        """Return the ``embed_size``-wide encoding of ``image``."""
        return self.encoder(image)

## Decoder

In [29]:
class Decoder(nn.Module):
    """Recurrent caption decoder with an optional attention step.

    The image features are used as the initial hidden state of the recurrent
    layer, so their width must equal ``hidden_size``.

    Args:
        model: Recurrent layer class (``nn.RNN``, ``nn.LSTM`` or ``nn.GRU``); it
            is built as ``model(embed_size, hidden_size, num_layers, batch_first=True)``.
        embed_size: Width of the token embeddings.
        vocab_size: Number of tokens; also the width of the returned scores.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        attention: When true, an attention vector computed over all time steps
            is concatenated to every step before classification.
    """

    class Attention(nn.Module):
        """Dot-product attention that uses the last time step as the query."""

        def __init__(self, hidden_size: int):
            super().__init__()
            self.hidden_size = hidden_size

            self.fc1 = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
            self.fc2 = nn.Linear(self.hidden_size * 2, self.hidden_size, bias=False)

            self.softmax = nn.Softmax(dim=1)

        def forward(self, hidden_states):
            """Return ``(attention_vector, attention_weights)``.

            Args:
                hidden_states: Tensor of shape (batch, steps, hidden_size).

            Returns:
                attention_vector of shape (batch, hidden_size) and
                attention_weights of shape (batch, steps).
            """
            score_first_part = self.fc1(hidden_states)

            # The last time step acts as the query.
            h_t = hidden_states[:, -1, :]

            score = torch.bmm(score_first_part, h_t.unsqueeze(2)).squeeze(2)

            attention_weights = self.softmax(score)

            # Weighted sum of all time steps.
            context_vector = torch.bmm(hidden_states.permute(0, 2, 1), attention_weights.unsqueeze(2)).squeeze(2)

            pre_activation = torch.cat((context_vector, h_t), dim=1)

            attention_vector = torch.tanh(self.fc2(pre_activation))

            return attention_vector, attention_weights

    def __init__(self, model, embed_size: int, vocab_size: int, hidden_size: int, num_layers: int, attention: bool = False):
        super().__init__()

        # Holds ``False`` or, when attention is enabled, the Attention module.
        self.attention = attention

        self.model = model(embed_size, hidden_size, num_layers, batch_first=True)

        self.embedding = nn.Embedding(vocab_size, embed_size)

        self.linear = nn.Linear(hidden_size, 512)

        if self.attention:
            self.attention = self.Attention(hidden_size)
            # The classifier sees [recurrent output, attention vector], both
            # hidden_size wide.
            self.linear = nn.Linear(2 * hidden_size, 512)

        self.classifier = nn.Sequential(
            self.linear,
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, 2048),
            nn.ReLU(),
            nn.Linear(2048, vocab_size)
        )

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, features, captions):
        """Return vocabulary scores of shape (batch, steps, vocab_size).

        Args:
            features: Image encodings of shape (batch, hidden_size).
            captions: Token ids of shape (batch, steps).
        """
        captions_embed = self.embedding(captions)

        # One copy of the image features per recurrent layer.
        initial_hidden_state = features.unsqueeze(0)
        layer_count = getattr(self.model, "num_layers", 1)
        if layer_count > 1:
            initial_hidden_state = initial_hidden_state.repeat(layer_count, 1, 1)

        # Only LSTM carries a separate cell state; RNN and GRU take one tensor.
        if isinstance(self.model, nn.LSTM):
            initial_state = (initial_hidden_state, initial_hidden_state)
        else:
            initial_state = initial_hidden_state

        output, _ = self.model(captions_embed, initial_state)

        if self.attention:
            # Attention works on the per-step outputs (batch, steps, hidden),
            # not on the final recurrent state.
            # NOTE(review): the query is the last time step, so earlier steps
            # see information from later tokens during teacher forcing -
            # confirm this is intended.
            attention_vector, _ = self.attention(output)
            repeated = attention_vector.unsqueeze(1).expand(-1, output.size(1), -1)
            output = torch.cat([output, repeated], dim=2)

        return self.classifier(output)

## Encoder-Decoder

In [37]:
class EncoderDecoder(nn.Module):
    """Image captioning model: an image encoder followed by a caption decoder."""

    def __init__(self, encoder: "Encoder", decoder: "Decoder"):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder

    def forward(self, images, captions):
        """Return the decoder's vocabulary scores for ``captions`` given ``images``."""
        return self.decoder(self.encoder(images), captions)

    def caption(self, image, vocabulary, max_length=25):
        """Greedily generate a caption for a single image.

        Args:
            image: Image batch holding exactly one image; it is moved to the
                model's device before encoding.
            vocabulary: Object exposing ``stoi`` and ``itos`` mappings.
            max_length: Maximum number of generated tokens.

        Returns:
            The generated tokens as strings. Generation stops after ``<EOS>``,
            which is included in the result.
        """
        words = []

        decoder = self.decoder
        recurrent = decoder.model
        run_device = next(self.parameters()).device
        eos_index = vocabulary.stoi["<EOS>"]

        with torch.no_grad():
            # The image encoding becomes the initial hidden state of every layer.
            hidden = self.encoder(image.to(run_device)).unsqueeze(0)
            layer_count = getattr(recurrent, "num_layers", 1)
            if layer_count > 1:
                hidden = hidden.repeat(layer_count, 1, 1)

            # Only LSTM carries a separate cell state.
            state = (hidden, hidden) if isinstance(recurrent, nn.LSTM) else hidden

            token = vocabulary.stoi["<SOS>"]
            history = []

            for _ in range(max_length):
                # Shape (batch=1, steps=1), matching the batch-first recurrent layer.
                step_input = torch.tensor([[token]], device=run_device)
                embedded = decoder.embedding(step_input)

                output, state = recurrent(embedded, state)

                if decoder.attention:
                    # Attend over every output produced so far; the current
                    # step is the query.
                    history.append(output)
                    attention_vector, _ = decoder.attention(torch.cat(history, dim=1))
                    output = torch.cat([output, attention_vector.unsqueeze(1)], dim=2)

                # Project to vocabulary scores before picking the best token;
                # argmax of the scores equals argmax of their softmax.
                scores = decoder.classifier(output)
                token = int(torch.argmax(scores, dim=-1).item())

                words.append(vocabulary.itos[token])

                if token == eos_index:
                    break

        return words


# Custom Trainer

In [38]:
class Trainer:
    """Train an encoder-decoder captioning model and track losses and caption metrics.

    Args:
        model: Model called as ``model(images, captions)`` that also provides
            ``caption(image, vocabulary)``.
        criterion: Loss called as ``criterion(scores, targets)``.
        optimizer: Optimizer over the model's parameters.
        train_loader: Iterable of ``(images, captions)`` training batches.
        val_loader: Iterable of ``(images, captions)`` validation batches.
        num_epochs: Number of epochs to run.
        save_path: Optional directory for the final weights and the plots.
        vocab: Optional vocabulary with ``stoi``/``itos``. When omitted it is
            looked up on ``val_loader.dataset`` (following nested ``.dataset``
            attributes).

    Captions are assumed to be batch-first, i.e. shaped (batch, steps).
    """

    def __init__(self, model, criterion, optimizer, train_loader, val_loader, num_epochs, save_path = None, vocab = None):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer

        self.train_loader = train_loader
        self.val_loader = val_loader

        self.num_epochs = num_epochs

        self.save_path = Path(save_path) if save_path else save_path

        self.vocab = vocab

        self.train_losses = []
        self.val_losses = []

        self.bleu1_scores = []
        self.bleu2_scores = []
        self.bleu3_scores = []
        self.bleu4_scores = []

        self.cumulative_bleu_scores = []

        self.cider_scores = []

        self.meteor_scores = []

        self.rougel_scores = []

    def train_model(self):
        """Run all epochs: train, validate, score the generated captions, log.

        The weights are saved afterwards when ``save_path`` was given.
        """
        run_device = next(self.model.parameters()).device
        vocabulary = self._resolve_vocabulary()

        for epoch in range(self.num_epochs):
            self.train_losses.append(self._train_one_epoch(run_device))

            val_loss, references, candidates = self._validate(run_device, vocabulary)
            self.val_losses.append(val_loss)

            self._account_scores(references, candidates)

            self._log(epoch)

        if self.save_path:
            self.save_weights(self.save_path / Path(f"model_{id(self.model)}.pth"))

    def _resolve_vocabulary(self):
        """Return the explicit vocabulary or the one attached to the validation dataset."""
        if self.vocab is not None:
            return self.vocab

        # A Subset exposes the wrapped dataset as `.dataset`, so walk down the chain.
        source = getattr(self.val_loader, "dataset", None)
        while source is not None:
            found = getattr(source, "vocab", None)
            if found is not None:
                return found
            source = getattr(source, "dataset", None)

        raise AttributeError("No vocabulary found: pass `vocab=` or use a dataset with a `vocab` attribute.")

    def _batch_loss(self, images, captions):
        """Return the next-token loss for one batch.

        The model reads every token except the last and has to predict the
        token that follows each of them; comparing the output with its own
        input would only teach the model to copy.
        """
        inputs, targets = captions[:, :-1], captions[:, 1:]

        scores = self.model(images, inputs)

        return self.criterion(scores.reshape(-1, scores.size(-1)), targets.reshape(-1))

    def _train_one_epoch(self, run_device):
        """Run one pass over ``self.train_loader`` and return the mean batch loss."""
        self.model.train()

        running_loss = 0.0

        for images, captions in self.train_loader:
            images, captions = images.to(run_device), captions.to(run_device)

            self.optimizer.zero_grad()

            loss = self._batch_loss(images, captions)
            loss.backward()

            self.optimizer.step()

            running_loss += loss.item()

        return running_loss / max(len(self.train_loader), 1)

    def _validate(self, run_device, vocabulary):
        """Return ``(mean loss, references, candidates)`` for ``self.val_loader``.

        References are decoded from the batch captions with padding removed, so
        no image has to be loaded a second time. Both dicts share the same
        running sample index as key.
        """
        self.model.eval()

        references = {}
        candidates = {}

        pad_index = vocabulary.stoi["<PAD>"]
        running_loss = 0.0
        sample_index = 0

        with torch.no_grad():
            for images, captions in self.val_loader:
                images, captions = images.to(run_device), captions.to(run_device)

                running_loss += self._batch_loss(images, captions).item()

                for image, caption in zip(images, captions):
                    predicted = self.model.caption(image.unsqueeze(0), vocabulary)
                    candidates[sample_index] = [" ".join(predicted)]

                    # NOTE(review): references keep <SOS>/<EOS> as before,
                    # while candidates never contain <SOS> - confirm whether
                    # special tokens should be stripped before scoring.
                    reference = [vocabulary.itos[token] for token in caption.tolist() if token != pad_index]
                    references[sample_index] = [" ".join(reference)]

                    sample_index += 1

        return running_loss / max(len(self.val_loader), 1), references, candidates

    def get_train_losses(self):
        """Return the list of per-epoch training losses."""
        return self.train_losses

    def get_val_losses(self):
        """Return the list of per-epoch validation losses."""
        return self.val_losses

    def get_scores(self):
        """Return a dict mapping each metric name to its per-epoch values."""
        scores = {
                "BLEU 1": self.bleu1_scores,
                "BLEU 2": self.bleu2_scores,
                "BLEU 3": self.bleu3_scores,
                "BLEU 4": self.bleu4_scores,
                "Cumulative BLEU": self.cumulative_bleu_scores,
                "METEOR": self.meteor_scores,
                "ROUGE_L": self.rougel_scores,
                "CIDEr": self.cider_scores,
                }

        return scores

    def save_weights(self, save_path):
        """Write the model's state dict to ``save_path``."""
        torch.save(self.model.state_dict(), save_path)

    def plot(self):
        """Plot losses, BLEU 1-4 and the remaining scores; save the figures when ``save_path`` is set."""
        plt.figure(0)
        plt.plot(self.train_losses, label = "Training loss")
        plt.plot(self.val_losses, label = "Validation loss")
        plt.ylabel("Cross Entropy Loss")
        plt.legend()

        if self.save_path:
            plt.savefig(self.save_path / Path("losses.png"))

        plt.figure(1)
        plt.plot(self.bleu1_scores, label = "BLEU 1")
        plt.plot(self.bleu2_scores, label = "BLEU 2")
        plt.plot(self.bleu3_scores, label = "BLEU 3")
        plt.plot(self.bleu4_scores, label = "BLEU 4")
        plt.ylabel("BLEU Scores")
        plt.legend()

        if self.save_path:
            plt.savefig(self.save_path / Path("bleu_scores.png"))

        plt.figure(2)
        plt.plot(self.cumulative_bleu_scores, label = "Cumulative BLEU")
        plt.plot(self.cider_scores, label = "CIDEr")
        plt.plot(self.meteor_scores, label = "METEOR")
        plt.plot(self.rougel_scores, label = "ROUGE_L")
        plt.ylabel("Scores")
        plt.legend()

        if self.save_path:
            plt.savefig(self.save_path / Path("scores.png"))

        plt.show()

    def _account_scores(self, references, candidates):
        """Score the candidates against the references and append every metric."""
        metrics = Scorer(references, candidates).compute_scores()

        self.bleu1_scores.append(metrics["BLEU 1"])
        self.bleu2_scores.append(metrics["BLEU 2"])
        self.bleu3_scores.append(metrics["BLEU 3"])
        self.bleu4_scores.append(metrics["BLEU 4"])

        # Geometric mean of BLEU 1-4 in log space; a BLEU of 0 yields 0
        # without emitting a divide-by-zero warning.
        with np.errstate(divide="ignore"):
            cumulative_bleu_score = np.exp((np.log(metrics["BLEU 1"])
                                            + np.log(metrics["BLEU 2"])
                                            + np.log(metrics["BLEU 3"])
                                            + np.log(metrics["BLEU 4"])) / 4)

        self.cumulative_bleu_scores.append(cumulative_bleu_score)

        self.cider_scores.append(metrics["CIDEr"])

        self.rougel_scores.append(metrics["ROUGE_L"])

        self.meteor_scores.append(metrics["METEOR"])

    def _log(self, epoch):
        """Print a table with the latest losses and scores for ``epoch``."""
        log = ""
        log += f" ---------------------------------------------------------------------------------------------------\n"
        log += f"|     Epoch [{epoch + 1}/{self.num_epochs}]     |         Training Loss: {self.train_losses[-1]: .6f}         |      Validation Loss: {self.val_losses[-1]: .6f}  |\n"
        log += f" ---------------------------------------------------------------------------------------------------\n"
        log += f"|        Cumulative BLEU Score: {self.cumulative_bleu_scores[-1]: .6f}       |               CIDEr Score: {self.cider_scores[-1]: .6f}              |\n"
        log += f" ---------------------------------------------------------------------------------------------------\n"
        log += f"|        METEOR Score: {self.meteor_scores[-1]: .6f}                |               ROUGE_L Score: {self.rougel_scores[-1]: .6f}            |\n"
        log += f" ---------------------------------------------------------------------------------------------------\n"

        print(log)
        

# Training and Evaluating Models

## EncoderDecoder Model = VGG16 + RNN

### Setting Hyperparameters

In [24]:
# Hyperparameters for the VGG16 + RNN run.
# The decoder uses the encoder output (embed_size wide) as its initial hidden
# state, so embed_size and hidden_size have to match.
num_epochs = 5
embed_size = 256
hidden_size = 256
num_layers = 1
learning_rate = 0.001

### Configuring The Model

In [None]:
# Encoder: VGG16 with its default pretrained weights plus a linear layer to embed_size.
# Decoder: nn.RNN without attention.
encoder = Encoder(vgg16(weights=VGG16_Weights.DEFAULT), embed_size).to(device)
decoder = Decoder(nn.RNN, embed_size, vocab_size, hidden_size, num_layers, attention=False).to(device)

model = EncoderDecoder(encoder, decoder).to(device)

In [None]:
# Print torchsummary's layer overview of the encoder for a 3x224x224 input.
print("Encoder Model Summary:")
summary(model.encoder, (3, 224, 224))

### Training The Model

In [27]:
# Cross-entropy over the vocabulary scores; targets equal to <PAD> are ignored.
criterion = nn.CrossEntropyLoss(ignore_index = vocab.stoi["<PAD>"]).to(device)

# Adam over all model parameters (the backbone's are frozen in Encoder.__init__).
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

# Directory in which Trainer stores the final weights and the plots.
save_path = "/kaggle/working"

trainer = Trainer(model, criterion, optimizer, train_loader, val_loader, num_epochs, save_path)

In [None]:
# Train for num_epochs epochs; losses and caption scores are printed after every epoch.
trainer.train_model()

### Plotting Training/Validation Losses and Scores

In [None]:
# Plot the loss curves, BLEU 1-4 and the remaining scores (saved under save_path).
trainer.plot()

### *Loading Pre-Trained Parameters

In [None]:
# Optional: path of a previously saved state dict; leave empty to keep the current weights.
load_path = ""

if load_path:
    # map_location lets weights saved on a GPU load on whatever device is in use.
    model.load_state_dict(torch.load(load_path, map_location=device))
else:
    print("load_path is empty - keeping the current weights.")

model.eval()

### Inference

In [None]:
# Caption 20 random validation images and show each with its predicted caption.
for _ in range(20):
    # val_set_end is the last validation index and randint's upper bound is
    # exclusive, hence the + 1.
    idx = np.random.randint(val_set_start, val_set_end + 1)

    image, caption = dataset[idx]

    image = image.to(device)

    prediction = ' '.join(model.caption(image.unsqueeze(0), vocab))

    # imshow expects the channel dimension last.
    image = image.permute(1,2,0)

    plt.title(prediction)
    plt.imshow(image.cpu())

    plt.show()

## EncoderDecoder Model = VGG16 + LSTM

### Setting Hyperparameters

In [39]:
# Hyperparameters for the VGG16 + LSTM run.
# The decoder uses the encoder output (embed_size wide) as its initial hidden
# state, so embed_size and hidden_size have to match.
num_epochs = 5
embed_size = 256
hidden_size = 256
num_layers = 1
learning_rate = 0.001

### Configuring The Model

In [40]:
# Encoder feeds VGG16's 1000-way classifier output into its linear layer, so the
# full ImageNet checkpoint is needed: IMAGENET1K_FEATURES only provides valid
# weights for the convolutional `features` part. DEFAULT also matches the
# VGG16 + RNN run.
encoder = Encoder(vgg16(weights=VGG16_Weights.DEFAULT), embed_size).to(device)
decoder = Decoder(nn.LSTM, embed_size, vocab_size, hidden_size, num_layers, attention=False).to(device)

model = EncoderDecoder(encoder, decoder).to(device)

In [None]:
# Print torchsummary's layer overview of the encoder for a 3x224x224 input.
print("Encoder Model Summary:")
summary(model.encoder, (3, 224, 224))

### Training The Model

In [42]:
# Cross-entropy over the vocabulary scores; targets equal to <PAD> are ignored.
criterion = nn.CrossEntropyLoss(ignore_index = vocab.stoi["<PAD>"]).to(device)

# Adam over all model parameters (the backbone's are frozen in Encoder.__init__).
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

# Directory in which Trainer stores the final weights and the plots.
save_path = "/kaggle/working"

trainer = Trainer(model, criterion, optimizer, train_loader, val_loader, num_epochs, save_path)

In [None]:
# Train for num_epochs epochs; losses and caption scores are printed after every epoch.
trainer.train_model()

### Plotting Training/Validation Losses and Scores

In [None]:
# Plot the loss curves, BLEU 1-4 and the remaining scores (saved under save_path).
trainer.plot()

### *Loading Pre-Trained Parameters

In [None]:
# Optional: path of a previously saved state dict; leave empty to keep the current weights.
load_path = ""

if load_path:
    # map_location lets weights saved on a GPU load on whatever device is in use.
    model.load_state_dict(torch.load(load_path, map_location=device))
else:
    print("load_path is empty - keeping the current weights.")

model.eval()

### Inference

In [None]:
# Caption 10 random validation images and show each with its predicted caption.
for _ in range(10):
    # val_set_end is the last validation index and randint's upper bound is
    # exclusive, hence the + 1.
    idx = np.random.randint(val_set_start, val_set_end + 1)

    image, caption = dataset[idx]

    # The model lives on `device`, so the image has to be moved there as well.
    image = image.to(device)

    prediction = ' '.join(model.caption(image.unsqueeze(0), vocab))

    # imshow expects the channel dimension last and a tensor on the CPU.
    image = image.permute(1,2,0)

    plt.title(prediction)
    plt.imshow(image.cpu())

    plt.show()

## EncoderDecoder Model = VGG16 + LSTM with Attention

### Setting Hyperparameters

In [None]:
# Hyperparameters for the VGG16 + LSTM with attention run.
# The decoder uses the encoder output (embed_size wide) as its initial hidden
# state, so embed_size and hidden_size have to match.
num_epochs = 5
embed_size = 256
hidden_size = 256
num_layers = 1
learning_rate = 0.001

### Configuring The Model

In [None]:
# Encoder feeds VGG16's 1000-way classifier output into its linear layer, so the
# full ImageNet checkpoint is needed: IMAGENET1K_FEATURES only provides valid
# weights for the convolutional `features` part. DEFAULT also matches the
# VGG16 + RNN run.
encoder = Encoder(vgg16(weights=VGG16_Weights.DEFAULT), embed_size).to(device)
decoder = Decoder(nn.LSTM, embed_size, vocab_size, hidden_size, num_layers, attention=True).to(device)

model = EncoderDecoder(encoder, decoder).to(device)

In [None]:
# Print torchsummary's layer overview of the encoder for a 3x224x224 input.
print("Encoder Model Summary:")
summary(model.encoder, (3, 224, 224))

### Training The Model

In [None]:
# Cross-entropy over the vocabulary scores; targets equal to <PAD> are ignored.
criterion = nn.CrossEntropyLoss(ignore_index = vocab.stoi["<PAD>"]).to(device)

# Adam over all model parameters (the backbone's are frozen in Encoder.__init__).
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

# Directory in which Trainer stores the final weights and the plots.
save_path = "/kaggle/working"

trainer = Trainer(model, criterion, optimizer, train_loader, val_loader, num_epochs, save_path)

In [None]:
# Train for num_epochs epochs; losses and caption scores are printed after every epoch.
trainer.train_model()

### Plotting Training/Validation Losses and Scores

In [None]:
# Plot the loss curves, BLEU 1-4 and the remaining scores (saved under save_path).
trainer.plot()

### *Loading Pre-Trained Parameters

In [None]:
# Optional: path of a previously saved state dict; leave empty to keep the current weights.
load_path = ""

if load_path:
    # map_location lets weights saved on a GPU load on whatever device is in use.
    model.load_state_dict(torch.load(load_path, map_location=device))
else:
    print("load_path is empty - keeping the current weights.")

model.eval()

### Inference

In [None]:
# Caption 10 random validation images and show each with its predicted caption.
for _ in range(10):
    # val_set_end is the last validation index and randint's upper bound is
    # exclusive, hence the + 1.
    idx = np.random.randint(val_set_start, val_set_end + 1)

    image, caption = dataset[idx]

    # The model lives on `device`, so the image has to be moved there as well.
    image = image.to(device)

    prediction = ' '.join(model.caption(image.unsqueeze(0), vocab))

    # imshow expects the channel dimension last and a tensor on the CPU.
    image = image.permute(1,2,0)

    plt.title(prediction)
    plt.imshow(image.cpu())

    plt.show()