# Importing the modules

In [1]:
# linear algebra 
import numpy as np 
# data processing, CSV file I / O (e.g. pd.read_csv) 
import pandas as pd 
import os 
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.preprocessing.sequence import pad_sequences 
from tensorflow.keras.preprocessing.text import Tokenizer
from PIL import Image
from tensorflow.keras.models import Model 
from tensorflow.keras.layers import Flatten, Dense, LSTM, Dropout, Embedding, Activation 
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import concatenate, BatchNormalization, Input
from tensorflow.keras.layers import add,TextVectorization,Bidirectional 
from tensorflow.keras.utils import to_categorical, plot_model 
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input 
import matplotlib.pyplot as plt # for plotting data 
import cv2 
import nltk
from nltk.corpus import stopwords


### Reading the text file

In [2]:
#Reading the image captions
def read_file(path):
    with open(path,'r') as file:
        return file.read().split('\n')

data = read_file('../data/raw/captions.txt')

### Creating a dictionary of Images and it's captions

In [3]:
data = data[1:]

In [4]:
# Converting image_captions data into dict where keys = images and value = captions
def get_data_dictionary(data):
    descriptions = dict()
    for line in data:
        token = line.split('\t')
        new_token = token[0].split('.')
        if len(new_token)<2:
            continue
        image_name  = new_token[0] 
        caption = new_token[1].split(',')[1]
        if image_name in descriptions.keys():
            descriptions[image_name].append(caption)
        else:
            descriptions[image_name] = [caption]
    return descriptions

descriptions = get_data_dictionary(data)

### Clean the descriptions

In [5]:
import re
def remove_punc(text) :
    return re.sub(r'[^\w\s]','',text)

def to_lower_case(text) :
    return text.lower()

stopwords_list = stopwords.words('english')
def remove_stopwords(text) :
    text_words = [word for word in text.split() if ((word not in stopwords_list) and (len(word) > 2))]
    text = ' '.join(text_words)
    return text

def remove_numbers(text) :
    return re.sub(r'[0-9]','',text)

def remove_multiple_spaces(text) :
    return re.sub(' +',' ',text).strip()

# gathering all the text cleaning functions in one fuction
def clean_text(text) :
    text = remove_punc(text)
    text = to_lower_case(text)
    text = remove_stopwords(text)
    text = remove_numbers(text)
    text = remove_multiple_spaces(text)
    return text

# using the above clean_text function to clean captions
def clean_captions(descriptions) :
    for image in descriptions.keys() :
        for index , caption in enumerate(descriptions[image]) :
            descriptions[image][index] = clean_text(caption)
    return descriptions

descriptions = clean_captions(descriptions)

### Saving the cleaned data

In [6]:
# saving data dictionary into external file
def write_file(path,data) :
    lines = []
    for image in data.keys() :
        for caption in data[image] :
            lines.append(image +'\t'+ caption)
    lines = '\n'.join(lines)
    with open(path,'w') as file :
        file.write(lines)

write_file('../data/processed/cleaned_data.txt', descriptions)

### Importing VGG16 model

In [7]:
# importing VGG16 model without the output layer
features_extractor = VGG16()
features_extractor = Model(inputs = features_extractor.inputs , outputs = features_extractor.layers[-2].output)
# getting image name list from Images folder location
images_path = '../data/raw/Images'
images_names = os.listdir(images_path)

### Using the pre-trained VGG16 model

In [8]:
import pickle

In [9]:
# Using the extracted VGG16 model to extract images features and build dict where key: images_names and values: images_features
def preprocess_image(model, images_path, images_list) :
    features = {}
    for img in images_list :
        path = os.path.join(images_path,img)
        image = Image.open(path)
        image = image.resize((224,224))
        image = np.expand_dims(image,axis = 0)
        image = image /127.5
        image = image -1
        feature = model.predict(image,verbose = 0)
        features[img.split('.')[0]] = feature
    return features

features = preprocess_image(features_extractor, images_path, images_names)

# saving images_features dict into .bin file
pickle.dump(features, open('../data/processed/image_features.pickle','wb'))

### Adding tokens in captions and splitting them in train & test

In [10]:
from sklearn.model_selection import train_test_split

# adding startseq and endseq to each caption
def load_tokens(path,images) :
    lines = read_file(path)
    tokens = {}
    for line in lines :
        img , caption = line.split('\t')
        if img not in tokens.keys() :
            tokens[img] = []
        tokens[img].append("startseq "+ caption +" endseq")
    return tokens


# listing all available images
def list_images(path) :
    all_images = []
    lines = read_file(path)
    for line in lines:
        img, caption = line.split('\t')
        if img not in all_images :
            all_images.append(img)
    return all_images
    
all_images_list = list_images('../data/processed/cleaned_data.txt')

# splitting images to training and testing
training_images, testing_images = train_test_split(all_images_list, test_size =0.1, shuffle = True)
cross_validation_images , testing_images = train_test_split(testing_images, test_size = 0.5, shuffle = True)

# saving the training, validation & testing images_lists
pickle.dump(training_images, open('../data/processed/training_images.txt','wb'))
pickle.dump(cross_validation_images, open('../data/processed/cross_validation_images.txt','wb'))
pickle.dump(testing_images, open('../data/processed/testing_images.txt','wb'))

#loading training images_captions dict
training_tokens = load_tokens('../data/processed/cleaned_data.txt',training_images)

# loading extracted images features
features = pickle.load(open('../data/processed/image_features.pickle','rb'))

### Vectorizing the captions

In [11]:
# extracting all captions into one list
def fetch_captions(tokens) :
    captions = []
    for caps in tokens.values() :
        [captions.append(cap) for cap in caps]
    return captions

captions = fetch_captions(training_tokens)

# searching for captions max_length
sentences_length = []
for caption in captions :
    sentences_length.append(len(caption.split()))

max_length = max(sentences_length)

text_dataset = tf.data.Dataset.from_tensor_slices(captions)

# preparing TextVectorization layer to be used to tokenize captions
vectorize_layer = TextVectorization(output_mode = 'int')
vectorize_layer.adapt(text_dataset)

# building vocab using TextVectorization layer
vocabulary = list(vectorize_layer.get_vocabulary())
vocab_size = vectorize_layer.vocabulary_size()

# tokenizing captions and saving it back to dict where keys:images and values:sequences

for img , captions in training_tokens.items() :
    training_tokens[img] = []
    for caption in captions :
        sequence = vectorize_layer(tf.constant([caption])).numpy().tolist()[0]
        training_tokens[img].append(sequence)

### Building Data Generator

In [12]:
def data_generator(tokens_keys, tokens,features, vocab_size,max_length,batch_size) :
    input_1, input_2, output = [] , [] , []
    n = 0
    while 1:
        for img in tokens_keys :
            sequences = tokens[img]
            n += 1
            if img in features.keys() :
                feature = features[img][0]
                for sequence in sequences :
                    for index in range(1, len(sequence)) :
                        input_b = sequence[:index]
                        input_b = pad_sequences([input_b],maxlen = max_length, padding = 'post')[0]
                        output_w = sequence[index]
                        output_w = to_categorical([output_w],num_classes = vocab_size)[0]
                        input_1.append(feature)
                        input_2.append(input_b)
                        output.append(output_w)

            if n == batch_size :
                try :
                    input_1, input_2, output = np.array(input_1), np.array(input_2), np.array(output)
                    yield [input_1,input_2], output
                    input_1, input_2, output = [], [], []
                    n = 0
                except :
                    print("Skipped")
                    input_1, input_2, output = [], [], []
                    n = 0

### Building Model

In [13]:
# no of features ectracted from last layer of VGG16_extractor
no_of_features = 4096

def build_model(no_of_features, max_length, output_size, learning_rate) :

 #  images features model path
    input_img = Input(shape = (no_of_features,))
    cnn_layer1 = Dropout(0.4)(input_img)
    cnn_layer2 = Dense(256, activation = 'relu')(cnn_layer1)

 #  sequences path 
    input_seq = Input(shape = (max_length,))
    lstm_layer1 = Embedding(output_size,300,input_length = max_length, mask_zero = True)(input_seq)
    lstm_layer2 = Dropout(0.4)(lstm_layer1)
    lstm_layer3 = LSTM(256,activation = 'tanh')(lstm_layer2)

 #  merging the two model
    merging_layer = add([cnn_layer2,lstm_layer3])
    final_dense = Dense(256, activation = 'relu')(merging_layer)
    output = Dense(output_size , activation = 'softmax')(final_dense)

 # initiating a model 
    model = Model(inputs = [input_img, input_seq], outputs = output)

    optimizer = Adam(learning_rate = learning_rate)
    model.compile(loss = 'categorical_crossentropy',optimizer = optimizer)

    return model


captioning_model = build_model(no_of_features,max_length, vocab_size, learning_rate= 0.001)

In [14]:
captioning_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 22)]         0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 4096)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 22, 300)      2530500     ['input_3[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 4096)         0           ['input_2[0][0]']                
                                                                                            

### Training the model

In [15]:
import random
# Define a custom callback class to track the traning loss history
class LossHistory(tf.keras.callbacks.Callback) :    
# Define a function to initialize the loss history list at the begining of training
    def on_train_begin(self, logs={}) :
        self.losses = []

# Define a function to append the training loss at the end of each epoch
    def on_epoch_end(self, epoch, logs={}) :
        self.losses.append(logs.get('loss'))

history = LossHistory()
model_loss = []
steps = len(training_tokens) /64

for i in range(50) :
# shuffling training data before each epoch
    tokens_keys = list(training_tokens.keys())
    random.shuffle(tokens_keys)
    data = data_generator(tokens_keys, training_tokens, features, vocab_size,max_length,64)

    captioning_model.fit(data, epochs = 1, steps_per_epoch = steps, verbose = 0, callbacks =[history])

# extracting epoch model loss and saving it into txt file
    loss = history.losses
    model_loss.append([loss[0]])

# saving the model 
captioning_model.save('../models/model_1.h5')
    

### Model Evaluation

In [16]:
def get_features_from_image(image_path,model) :
    img = Image.open(image_path)
    img = img.resize((224,224))
    img = np.expand_dims(img, axis= 0)
    img = img/127.5
    img = img -1
    features = model.predict(img,verbose = 0)
    return features

def get_word(index,vocab) :
    word = vocab[index]
    return word

def get_caption(path, features_extractor, vectorize_layer, captioning_model) :
    my_features = get_features_from_image(path,features_extractor)
    caption = 'startseq'
    for i in range(max_length) :
        sequenced_caption = vectorize_layer(tf.constant([caption])).numpy().tolist()
        padded_sequenced_caption = pad_sequences(sequenced_caption, maxlen = max_length, padding = 'post')[0]
        padded_sequenced_caption = np.resize(padded_sequenced_caption,(1,max_length))
        output = captioning_model.predict([my_features , padded_sequenced_caption],verbose = 0)
        index = np.argmax(output)
        if index == 2:
            caption = caption + ' endseq'
            return caption
        else:
            current_word = get_word(index,vocabulary)
            caption = caption + ' ' + current_word
    return caption
    

In [None]:
from nltk.translate.bleu_score import corpus_bleu

def evaluation_func(captioning_model, vectorize_layer, features_extractor, images_folder, images_set_path) :
    images_names = read_file(images_set_path)
    images_tokens = load_tokens('../data/processed/cleaned_data.txt',images_names)
    actual, predicted = list(), list()
    for image in images_tokens.keys() :
        image_path = os.path.join(images_folder,image+'.jpg')
        generated_caption = get_caption(image_path,features_extractor,vectorize_layer,captioning_model)
        actual_captions = images_tokens[image]
        actual.append([caption.split() for caption in actual_captions])
        predicted.append(generated_caption.split())

    BLEU_1 = corpus_bleu(actual,predicted,weights = (1.0,0,0,0))
    return BLEU

training_bleu_score = evaluation_func(captioning_model, vectorize_layer,features_extractor, images_path,
                                     '../data/processed/training_images.txt')

validation_bleu_score = evaluation_func(captioning_model, vectorize_layer,features_extractor, images_path,
                                        '../data/processed/cross_validation_images.txt')

testing_bleu_score = evaluation_func(captioning_model, vectorize_layer,features_extractor, images_path,
                                     '../data/processed/testing_images.txt')

print(f'Training BLEU score    : {training_bleu_score}')
print(f'Validation BLEU score  : {validation_bleu_score}')
print(f'Testing BLEU score     : {testing_bleu_score}')

### Visualizing the captions

In [191]:
# getting top k probabilities and indexes
def get_word_preds(sentence, testing_image, beam_size) :
    sequenced_caption = vectorize.layer(tf.constant([sentence])).numpy().tolist()
    padded_sequenced_caption = pad_sequences(sequenced_caption, maxlen=max_length, padding='post')[0]
    padded_sequenced_caption = np.resize(padded_sequenced_caption, (1,max_length))
    preds = captioning_model.