# Importing the required stuff

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
from torchvision import models
from torchvision import transforms
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from nltk.translate.bleu_score import corpus_bleu
import os
import string
import numpy as np 
from PIL import Image 
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm
import pickle 
import cv2
import random
import re
import torchtext

# Loading the dataset

In [None]:
# Mount Google Drive so the dataset paths under /content/drive resolve.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Image transformations

In [None]:
# Per-channel mean / standard deviation handed to Normalize.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Preprocessing applied to every image before feature extraction, in order:
# resize, center-crop to 224, convert to a tensor, normalize per channel.
preprocessing_steps = [
    transforms.Resize(size=256),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
]
img_transform = transforms.Compose(preprocessing_steps)

# Creating list of images

In [None]:
# Directory whose entries are listed and opened as images further down.
images_path = "/content/drive/MyDrive/Flicker8k_Images"
# Caption file; it is later read whole and each line is split on a tab into
# an image identifier and a caption.
text_path = "/content/drive/MyDrive/Flickr8k_text/Flickr8k.token.txt"

In [None]:
# Run every file in the image directory through the preprocessing pipeline,
# keeping the resulting tensors and their file names in two parallel lists.
image_list = []
names = []
for file_name in os.listdir(images_path):
    file_path = os.path.join(images_path, file_name)
    # The context manager closes the file handle as soon as the pixels are
    # converted; convert("RGB") guarantees the three channels that the
    # 3-value Normalize step requires, even for grayscale/palette/RGBA files.
    with Image.open(file_path) as img:
        tensor = img_transform(img.convert("RGB"))
    image_list.append(tensor.unsqueeze(0))  # add a leading batch dimension
    names.append(file_name)

# Feature extraction using CNN and saving to a file

In [None]:
# Pretrained AlexNet in inference mode; its output for each image is stored
# as that image's feature vector.
alexnet = models.alexnet(pretrained=True)
alexnet.eval()

# Map file name -> list of feature arrays.  `names` and `image_list` are
# parallel lists, so they are walked together to pair each tensor with its
# own file name (a separately maintained index is easy to forget to advance,
# which files every feature under the first name).
features_dict = {}
with torch.no_grad():
    for name, img in zip(names, image_list):
        feature = alexnet(img).numpy()
        features_dict.setdefault(name, []).append(feature)
print("All Training Images Appended!")

# Persist the features so the extraction does not have to be repeated.
with open('features.pickle', 'wb') as handle:
    pickle.dump(features_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Loading the features file

In [None]:
# Restore the image-feature dictionary from disk.
# NOTE(review): this reads 'features_2.pickle' while the extraction step
# writes 'features.pickle' - confirm which file is intended.
# pickle can execute arbitrary code on load; only open files you created.
with open('features_2.pickle', 'rb') as feature_file:
    features_dict = pickle.load(feature_file)
print(type(features_dict))

# Create a name-caption dictionary and preprocessing captions

In [None]:
# Read the whole caption file, then break it into one entry per line.
with open(text_path, "r") as caption_file:
    raw_caption_text = caption_file.read()
captions = raw_caption_text.split("\n")

In [None]:
# Group the captions by image: image name -> list of caption strings.
captions_dict = {}
i = 0  # counts the well-formed (tab-separated) lines processed so far
for line in captions:
    fields = line.split("\t")
    if len(fields) >= 2:
        # The first field carries a two-character suffix that is cut off
        # (presumably a "#<n>" caption index - verify against the data file).
        image_name = fields[0][:-2]
        captions_dict.setdefault(image_name, []).append(fields[1])
        if i % 2000 == 0:
            print(str(i) + "Captions appended!")
        i += 1
print("All captions appended!")
# NOTE(review): this writes features_dict (not captions_dict) back to
# 'features.pickle' - looks like a copy/paste leftover; confirm it is wanted.
with open('features.pickle', 'wb') as handle:
    pickle.dump(features_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Extracting train and test names

In [None]:
train_path = "/content/drive/MyDrive/Flickr8k_text/Flickr_8k.trainImages.txt"
test_path = "/content/drive/MyDrive/Flickr8k_text/Flickr_8k.testImages.txt"


def _read_name_list(path):
    """Return the contents of *path* split on newline characters."""
    with open(path, "r") as split_file:
        return split_file.read().split("\n")


# Entries of the train / test split files, one list element per line.
train_names = _read_name_list(train_path)
test_names = _read_name_list(test_path)

# Train and test features

In [None]:
# Keep only the split entries for which both features and captions exist.
train_keys = [n for n in train_names if n in features_dict and n in captions_dict]
test_keys = [n for n in test_names if n in features_dict and n in captions_dict]

# The values are the same list objects held by features_dict / captions_dict
# (no copies are made).
train_features = {n: features_dict[n] for n in train_keys}
train_captions = {n: captions_dict[n] for n in train_keys}
test_features = {n: features_dict[n] for n in test_keys}
test_captions = {n: captions_dict[n] for n in test_keys}

# Creating histogram and vocabulary 

In [None]:
def preprocess_line(strn):
    """Normalize one caption and wrap it in ``begin`` / ``end`` markers.

    The caption is split on whitespace, each token is lower-cased, and any
    token that is not purely alphabetic is discarded.  ASCII punctuation is
    then stripped from the re-joined text, and the result is returned with
    "begin " in front and " end" behind.
    """
    lowered = (token.lower() for token in strn.split())
    alphabetic = (token for token in lowered if token.isalpha())
    punctuation_table = str.maketrans("", "", string.punctuation)
    cleaned = " ".join(alphabetic).translate(punctuation_table)
    return " ".join(("begin", cleaned, "end"))


def get_all_captions_tokenize(captions_dict):
    """Fit a tokenizer on every caption stored in *captions_dict*.

    Args:
        captions_dict: mapping whose values are lists of caption strings.

    Returns:
        ``(tokenizer, vocab_size)`` where ``vocab_size`` is the number of
        entries in the tokenizer's ``word_index`` plus one.
    """
    all_captions = [cap for cap_group in captions_dict.values() for cap in cap_group]
    # filters='' switches off the tokenizer's own character filtering.
    fitted = Tokenizer(filters='')
    fitted.fit_on_texts(all_captions)
    return fitted, len(fitted.word_index) + 1
  


def pad_tokens(tokens):
    """Return *tokens* run through ``pad_sequences`` with ``padding='post'``."""
    padded = pad_sequences(tokens, padding='post')
    return padded

In [None]:
# Normalize every caption.  Slice assignment rewrites each list in place, so
# other dictionaries holding the same list objects see the cleaned captions.
for cap_lst in captions_dict.values():
    cap_lst[:] = [preprocess_line(cap) for cap in cap_lst]

# Fit the tokenizer on the cleaned captions and record the vocabulary size.
tokenizer, vocab_size = get_all_captions_tokenize(captions_dict)

# Data loader

In [None]:
def data_generator(img_features, captions_dict, batch_size, device='cuda'):
    """Endlessly yield next-word-prediction batches built from captions.

    Each caption is turned into token ids with the module-level ``tokenizer``.
    Every proper prefix of that id sequence becomes one sample whose target
    is the id that directly follows the prefix.  Samples accumulate until
    ``batch_size`` are available; a partially filled batch is carried over
    into the next pass over the data rather than being dropped.

    Args:
        img_features: mapping from image name to its feature array(s).
        captions_dict: mapping from image name to a list of caption strings.
        batch_size: number of samples per yielded batch (must be >= 1).
        device: torch device the yielded tensors are moved to.  Defaults to
            ``'cuda'``, the value that used to be hard-coded.

    Yields:
        ``(image_features, caption_prefixes, targets)`` as a FloatTensor and
        two LongTensors; prefixes are left-padded to the longest prefix in
        the batch.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1, or if a full pass
            over ``captions_dict`` produces no sample at all (either case
            would otherwise spin forever without yielding).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    input_img, input_caption, target_cap = [], [], []
    while True:
        produced_sample = False
        for name, caption_list in captions_dict.items():
            img_fs = img_features[name]
            for cap in caption_list:
                caption_seq = tokenizer.texts_to_sequences([cap])[0]
                for i in range(1, len(caption_seq)):
                    produced_sample = True
                    input_img.append(img_fs)
                    input_caption.append(caption_seq[:i])
                    target_cap.append(caption_seq[i])
                    if len(target_cap) == batch_size:
                        padded = pad_sequences(input_caption, padding='pre')
                        yield (
                            # The two squeezes drop singleton axes that come
                            # from how the features are stored.
                            torch.FloatTensor(np.array(input_img)).squeeze(1).squeeze(1).to(device),
                            torch.LongTensor(padded).to(device),
                            torch.LongTensor(target_cap).to(device),
                        )
                        input_img, input_caption, target_cap = [], [], []
        if not produced_sample:
            raise ValueError(
                "captions_dict produced no training samples; "
                "it is empty or every caption has fewer than two tokens"
            )

# Creating Neural Network class

In [None]:
class Network(torch.nn.Module):
    """Next-word predictor that merges an image vector with a caption prefix.

    The image vector goes through a linear layer, the caption prefix through
    an embedding table and an LSTM; both 512-wide results are concatenated
    and mapped to log-probabilities over the vocabulary.

    Args:
        glove_weights: 2-D float tensor (vocabulary size x embedding width)
            used as the initial, trainable embedding table.  The vocabulary
            size and embedding width of the model are taken from its shape.
    """

    def __init__(self, glove_weights):
        super(Network, self).__init__()
        num_tokens, embed_dim = glove_weights.shape
        self.fc_img = torch.nn.Linear(1000, 512)
        self.embedding = torch.nn.Embedding(num_tokens, embed_dim)
        self.lstm = torch.nn.LSTM(embed_dim, 512, batch_first=True)
        self.fc_wrapper = torch.nn.Linear(1024, 1024)
        self.fc_output = torch.nn.Linear(1024, num_tokens)
        # Replace the randomly initialised table with the supplied weights.
        self.embedding.weight = torch.nn.Parameter(glove_weights)

    def forward(self, input_img, input_caption):
        """Return log-probabilities of the next word for each batch row.

        Args:
            input_img: float tensor whose last dimension is 1000.
            input_caption: integer tensor of token ids, batch first.
        """
        x1 = F.relu(self.fc_img(input_img))
        x2 = self.embedding(input_caption)
        x2, _ = self.lstm(x2)
        x2 = x2[:, -1, :]  # LSTM output at the last time step
        x3 = torch.cat((x1, x2), dim=-1)
        x3 = F.relu(self.fc_wrapper(x3))
        x3 = self.fc_output(x3)
        return F.log_softmax(x3, dim=-1)

# Processing GloVe embeddings

In [None]:
# Read the GloVe vectors file and split it into lines.  The encoding is
# stated explicitly so that non-ASCII tokens decode the same way regardless
# of the platform's default encoding.
with open("/content/drive/MyDrive/glove.6B.200d.txt", "r", encoding="utf-8") as f:
    glove = f.read().split("\n")

In [None]:
# Parse every line ("<word> <v1> <v2> ...") into word -> vector.
glove_dict = {}
for strn in glove:
    elements = strn.split()
    if not elements:
        continue  # blank line, e.g. the empty string after the final newline
    try:
        vector = np.array([float(value) for value in elements[1:]])
    except ValueError:
        continue  # a field that is not a number: skip the malformed line
    glove_dict[elements[0]] = vector

# Start from random rows and overwrite the row of every vocabulary word that
# has a GloVe vector; words without one keep their random initialisation.
glove_weights = np.random.uniform(0, 1, (vocab_size, 200))
found = 0

for word, index in tokenizer.word_index.items():
    vector = glove_dict.get(word)
    if vector is not None:
        glove_weights[index] = vector
        found += 1
print("Number of words found in GloVe: {} / {}".format(found, vocab_size))

In [None]:
def learning_step(input_img, input_caption, target_cap):
    """Run one optimisation step on a batch and return its loss.

    Uses the module-level ``model`` and ``optimizer``.

    Args:
        input_img: image feature batch passed to the model.
        input_caption: caption-prefix batch passed to the model.
        target_cap: target token ids, compared with the model's
            log-probabilities through the negative log-likelihood loss.

    Returns:
        The batch loss as a tensor detached from the autograd graph, so
        callers can accumulate or log it without keeping the graph alive.
    """
    optimizer.zero_grad()
    preds = model(input_img, input_caption)
    loss = F.nll_loss(preds, target_cap)
    loss.backward()
    optimizer.step()
    return loss.detach()

In [None]:
epochs = 40
batch_size = 32
# Number of optimisation steps per epoch (one per training image).
steps_per_epoch = len(train_captions)

# Move the model to the GPU before building the optimizer.
model = Network(glove_weights=torch.FloatTensor(glove_weights).to('cuda')).to("cuda")
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# The generator cycles through the data forever, so it is created once and
# each epoch continues where the previous one stopped.  Re-creating it per
# epoch would restart at the first image every time and, whenever
# steps_per_epoch * batch_size is smaller than the number of samples, never
# reach the later part of the training set.
d_gen = data_generator(train_features, train_captions, batch_size)

for epoch in range(epochs):
    print("Epoch {}".format(epoch+1))
    total_loss = 0.0

    for batch in range(steps_per_epoch):
        # Batches are already placed on the GPU by the generator.
        input_img, input_caption, target_cap = next(d_gen)

        # Convert to a plain float so no tensors are kept across steps.
        loss = float(learning_step(input_img, input_caption, target_cap))
        total_loss += loss
        if batch % 1000 == 0:
            print("Epoch {} - Batch {} - Loss {:.4f}".format(
                epoch+1, batch, loss
            ))
    epoch_loss = total_loss/steps_per_epoch

    print("\nEpoch {} - Average loss {:.4f}".format(
        epoch+1, epoch_loss
    ))
    # Checkpoint after the 1st, 11th, 21st, ... epoch.
    if epoch % 10 == 0:
        torch.save(model.state_dict(), "model_{}".format(epoch+1))
    

In [None]:
def translate(features):
    """Greedily generate a caption for one image feature array.

    Starting from the ``begin`` marker, the module-level ``model`` is asked
    for the most likely next word given the words generated so far; decoding
    stops at the ``end`` marker, at an id the tokenizer does not know, or
    after ``max_length - 1`` steps (``max_length`` is a module-level value).

    Args:
        features: image features convertible with ``torch.FloatTensor``.

    Returns:
        The generated words joined by single spaces, without the markers.
    """
    features = torch.FloatTensor(features)
    words = ["begin"]
    # Inference only: skip building autograd graphs.
    with torch.no_grad():
        for _ in range(1, max_length):
            input_seq = tokenizer.texts_to_sequences([" ".join(words)])
            input_seq = pad_sequences(input_seq, maxlen=max_length, padding='pre')
            input_seq = torch.LongTensor(input_seq)
            preds = model(features, input_seq)
            # int() yields a plain Python key for the index_word lookup.
            pred_idx = int(preds.argmax(dim=-1)[0])
            word = tokenizer.index_word.get(pred_idx)
            if word is None or word == 'end':
                break
            words.append(word)
    return " ".join(words[1:])

def _reference_tokens(caption):
    """Split a reference caption into words, dropping begin/end markers."""
    tokens = caption.split()
    if len(tokens) >= 2 and tokens[0] == "begin" and tokens[-1] == "end":
        tokens = tokens[1:-1]
    return tokens


def evaluate_model(feature_dict, caption_dict):
    """Print corpus BLEU-1 to BLEU-4 of generated captions against references.

    For every image in *feature_dict* a caption is generated with
    ``translate`` and compared with all reference captions stored under the
    same name in *caption_dict*.

    Generated captions never contain the ``begin`` / ``end`` markers, so the
    markers are removed from the references as well; otherwise every
    reference would be two tokens longer than it really is and distort the
    brevity penalty.

    Args:
        feature_dict: mapping from image name to the features for ``translate``.
        caption_dict: mapping from image name to a list of reference captions.
    """
    refer = []
    guess = []

    for name in tqdm(feature_dict.keys()):
        prediction = translate(feature_dict[name])
        guess.append(prediction.split())
        refer.append([_reference_tokens(cap) for cap in caption_dict[name]])

    third = 1.0 / 3.0  # exact thirds so the BLEU-3 weights sum to 1
    bleu_1 = corpus_bleu(refer, guess, weights=(1.0, 0, 0, 0))
    bleu_2 = corpus_bleu(refer, guess, weights=(0.5, 0.5, 0, 0))
    bleu_3 = corpus_bleu(refer, guess, weights=(third, third, third, 0))
    bleu_4 = corpus_bleu(refer, guess, weights=(0.25, 0.25, 0.25, 0.25))

    print("BLEU-1: {:.4f}".format(bleu_1))
    print("BLEU-2: {:.4f}".format(bleu_2))
    print("BLEU-3: {:.4f}".format(bleu_3))
    print("BLEU-4: {:.4f}".format(bleu_4))

In [None]:
# Write the current model parameters to a checkpoint file named with the
# number 35.
checkpoint_name = "model_{}".format(35)
torch.save(model.state_dict(), checkpoint_name)

In [None]:
# Length captions are padded to (and the decoding step limit) in translate.
max_length = 20
# A fresh model is built on the CPU and translate feeds it CPU tensors, so
# the checkpoint is mapped to the CPU as well; this also lets the cell run
# on a machine without a GPU.
# NOTE(review): the path "/content/model" differs from the "model_<n>" names
# used when saving - confirm the checkpoint location.
model = Network(glove_weights=torch.FloatTensor(glove_weights))
model.load_state_dict(torch.load("/content/model", map_location=torch.device('cpu')))
# Put the model in inference mode for the evaluation that follows.
model.eval()

In [None]:
# Replace each stored value by its first element.
# NOTE: not idempotent - running this cell twice indexes one level deeper.
for image_name in train_features:
    train_features[image_name] = train_features[image_name][0]

In [None]:
# Replace each stored value by its first element.
# NOTE: not idempotent - running this cell twice indexes one level deeper.
for image_name in test_features:
    test_features[image_name] = test_features[image_name][0]

In [None]:
# Report BLEU scores, first on the training split and then on the test split.
evaluation_splits = (
    (train_features, train_captions),
    (test_features, test_captions),
)
for split_features, split_captions in evaluation_splits:
    evaluate_model(split_features, split_captions)



In [None]:
# Display the third test image and print the caption generated for it.
name = list(test_features)[2]
image_file = '/'.join((images_path, name))
image = Image.open(image_file)

plt.imshow(image)
plt.axis('off')
plt.show()

generated_caption = translate(test_features[name])
print("[CAPTION]: {}".format(generated_caption))