In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import nltk
import pickle
import glob
from os import path
from PIL import Image
from tqdm import tqdm
from tensorflow import keras
from tensorflow.keras import layers, Input
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.preprocessing import image
from tensorflow.keras import Model
%matplotlib inline

In [None]:
def split_data(l):
    """Select the globbed image paths that belong to one dataset split.

    Reads the module-level ``img`` (iterable of image paths) and ``images``
    (path prefix). The first ``len(images)`` characters are cut off each
    path and the remainder is looked up in ``l``.

    Args:
        l: container of image names that make up the split.

    Returns:
        list: the paths from ``img`` whose stripped name is in ``l``, in the
        order they appear in ``img``.
    """
    prefix_len = len(images)
    return [img_path for img_path in img if img_path[prefix_len:] in l]

def preprocess_input(x):
    """Rescale pixel values from the [0, 255] range to [-1, 1].

    The computation is ``(x / 255 - 0.5) * 2``. It is carried out on a copy,
    so the caller's array is left untouched. Floating-point inputs keep their
    dtype; any other dtype (e.g. ``uint8`` image data) is converted to
    ``float32`` first, because in-place true division is not possible on
    integer arrays.

    Args:
        x: array-like of pixel values.

    Returns:
        numpy.ndarray: new array with the same shape as ``x``.
    """
    arr = np.asarray(x)
    if np.issubdtype(arr.dtype, np.floating):
        arr = arr.copy()
    else:
        arr = arr.astype(np.float32)
    # Same operation order as always, so float inputs give identical results.
    arr /= 255.
    arr -= 0.5
    arr *= 2.
    return arr

def preprocess(image_path):
    """Load an image file and turn it into a normalised single-image batch.

    The file is loaded at 299x299 with ``image.load_img``, converted to an
    array, given a new leading axis and passed through ``preprocess_input``.

    Args:
        image_path: path of the image file to load.

    Returns:
        The array returned by ``preprocess_input`` for the expanded image.
    """
    pil_img = image.load_img(image_path, target_size=(299, 299))
    batch = np.expand_dims(image.img_to_array(pil_img), axis=0)
    return preprocess_input(batch)

def encode(image):
    """Compute the ``model_new`` feature vector for one image file.

    Args:
        image: path of the image; it is handed to ``preprocess``.

    Returns:
        numpy.ndarray: the prediction of ``model_new`` reshaped to one
        dimension whose length is the prediction's second axis.
    """
    features = model_new.predict(preprocess(image))
    return np.reshape(features, features.shape[1])

def data_generator(batch_size = 32):
    """Endlessly yield training batches for the captioning model.

    Reads ``../custom/flickr8k_training_dataset.txt`` (tab separated, columns
    ``image_id`` and ``captions``), shuffles the rows once, and then loops
    over them forever. Every caption of ``k`` tokens produces ``k - 1``
    samples: the first ``i + 1`` tokens (as ``word2idx`` indices) together
    with the image encoding are the input, and the one-hot vector of token
    ``i + 1`` is the target.

    Samples are accumulated across captions, and across passes over the
    data, until ``batch_size`` of them are available.

    Relies on the module-level ``encoding_train``, ``word2idx``,
    ``vocab_size`` and ``max_len``.

    Args:
        batch_size: number of samples per yielded batch.

    Yields:
        tuple: ``([images, partial_caps], next_words)`` where ``images`` is
        the stacked image encodings, ``partial_caps`` the post-padded index
        sequences of length ``max_len``, and ``next_words`` the one-hot
        targets of width ``vocab_size``.

    Raises:
        ValueError: if no caption has at least two tokens, since the loop
            could then never fill a batch.
    """
    df = pd.read_csv('../custom/flickr8k_training_dataset.txt', delimiter='\t')
    df = df.sample(frac=1)

    # Select columns by label: integer keys on a label-indexed row Series
    # (the former x[1][0] / x[1][1]) are deprecated in pandas.
    imgs = df['image_id'].tolist()
    # Tokenise each caption once instead of re-splitting it for every sample.
    tokenised = [text.split() for text in df['captions']]

    if not any(len(tokens) > 1 for tokens in tokenised):
        raise ValueError("training file contains no caption with at least two tokens")

    partial_caps = []
    next_words = []
    image_feats = []
    count = 0
    while True:
        for img_name, tokens in zip(imgs, tokenised):
            current_image = encoding_train[img_name]
            for i in range(len(tokens) - 1):
                count += 1
                partial_caps.append([word2idx[txt] for txt in tokens[:i + 1]])
                n = np.zeros(vocab_size)
                n[word2idx[tokens[i + 1]]] = 1
                next_words.append(n)
                image_feats.append(current_image)

                if count >= batch_size:
                    padded_caps = sequence.pad_sequences(partial_caps, maxlen=max_len, padding='post')
                    yield ([np.asarray(image_feats), padded_caps], np.asarray(next_words))
                    partial_caps = []
                    next_words = []
                    image_feats = []
                    count = 0
                        
def predict_captions(image):
    """Generate a caption for ``image`` by greedy (arg-max) decoding.

    Starting from ``<start>``, the most probable next word according to
    ``final_model`` is appended repeatedly until ``<end>`` is produced or the
    sequence grows beyond ``max_len`` tokens.

    Relies on the module-level ``test_img``, ``train_img``, ``encoding_test``,
    ``encoding_train``, ``images``, ``word2idx``, ``idx2word``, ``max_len``
    and ``final_model``.

    Args:
        image: path of the image. Cached encodings are used when the path is
            in ``test_img`` or ``train_img``; otherwise ``encode`` is called.

    Returns:
        str: the generated words joined by spaces, without ``<start>`` and
        without the terminating ``<end>``.
    """
    # The encoding is the same for every decoding step, so resolve it once
    # instead of once per generated word.
    if image in test_img:
        e = encoding_test[image[len(images):]]
    elif image in train_img:
        e = encoding_train[image[len(images):]]
    else:
        e = encode(image)
    image_batch = np.array([e])

    start_word = ["<start>"]
    while True:
        par_caps = [word2idx[i] for i in start_word]
        par_caps = sequence.pad_sequences([par_caps], maxlen=max_len, padding='post')
        preds = final_model.predict([image_batch, np.array(par_caps)])
        word_pred = idx2word[np.argmax(preds[0])]
        start_word.append(word_pred)
        if word_pred == "<end>" or len(start_word) > max_len:
            break

    # Strip the end marker only if it is really there: when decoding stops
    # because of the length limit the last token is a regular word.
    if start_word[-1] == "<end>":
        start_word = start_word[:-1]
    return ' '.join(start_word[1:])

def beam_search_predictions(image, beam_index = 3):
    """Generate a caption for ``image`` with beam search.

    At most ``beam_index`` partial captions are kept per step. Each one is
    extended with its ``beam_index`` most probable next words, and the
    ``beam_index`` best-scoring candidates survive. A candidate's score is
    the sum of the log-probabilities of its words, i.e. the log of the
    sequence probability under ``final_model``. A caption that has produced
    ``<end>`` is carried over unchanged instead of being extended further.

    Relies on the module-level ``test_img``, ``train_img``, ``encoding_test``,
    ``encoding_train``, ``images``, ``word2idx``, ``idx2word``, ``max_len``
    and ``final_model``.

    Args:
        image: path of the image. Cached encodings are used when the path is
            in ``test_img`` or ``train_img``; otherwise ``encode`` is called.
        beam_index: beam width.

    Returns:
        str: words of the best-scoring caption joined by spaces, without
        ``<start>`` and cut before the first ``<end>``.
    """
    # The encoding is identical for every beam and every step: resolve it
    # once rather than inside the nested loops.
    if image in test_img:
        e = encoding_test[image[len(images):]]
    elif image in train_img:
        e = encoding_train[image[len(images):]]
    else:
        e = encode(image)
    image_batch = np.array([e])

    end_idx = word2idx.get("<end>")
    beams = [[[word2idx["<start>"]], 0.0]]

    # A sequence starts with one token and gains one per step, so at most
    # max_len - 1 steps are needed to reach max_len tokens.
    for _ in range(max_len - 1):
        candidates = []
        for seq, score in beams:
            if seq[-1] == end_idx:
                # Finished caption: keep it in the running with its score.
                candidates.append([seq, score])
                continue
            par_caps = sequence.pad_sequences([seq], maxlen=max_len, padding='post')
            preds = final_model.predict([image_batch, np.array(par_caps)])
            # Clip before the log so a zero probability does not give -inf.
            log_probs = np.log(np.maximum(np.asarray(preds[0], dtype=np.float64), 1e-12))
            for w in np.argsort(log_probs)[-beam_index:]:
                candidates.append([seq + [int(w)], score + float(log_probs[w])])
        # Ascending sort; the best candidates are at the end.
        beams = sorted(candidates, key=lambda cand: cand[1])[-beam_index:]
        if all(seq[-1] == end_idx for seq, _ in beams):
            break

    best_seq = beams[-1][0]
    final_caption = []
    # Skip the leading <start> token and stop at the first <end>.
    for idx in best_seq[1:]:
        word = idx2word[idx]
        if word == '<end>':
            break
        final_caption.append(word)
    return ' '.join(final_caption)

In [None]:
# Directory that holds the Flickr8k .jpg files. The trailing slash matters:
# the glob pattern is built as `images + '*.jpg'`, and image names are
# recovered with `path[len(images):]`, so without it no file is matched and
# the recovered names would start with '/'.
images = "../input/flickr8k/Images/"
# Folder with the caption file and the train/dev/test split lists.
dataset_folder = "../input/flickr8k-text/"
token = dataset_folder + 'flickr8k.token.txt'
train_images_file = dataset_folder + 'flickr_8k.trainImages.txt'
val_images_file = dataset_folder + 'flickr_8k.devImages.txt'
test_images_file = dataset_folder + 'flickr_8k.testImages.txt'

In [None]:
# Read the whole token file, then split it into one entry per line after
# trimming surrounding whitespace.
with open(token, mode="r") as file:
    raw_token_text = file.read()
captions = raw_token_text.strip().split("\n")

In [None]:
# Group the caption texts by image key. Each line is tab separated: the first
# field, minus its last two characters, is the key and the second field is
# the caption text.
d = {}
for line in captions:
    fields = line.split('\t')
    image_key = fields[0][:-2]
    caption_text = fields[1]
    d.setdefault(image_key, []).append(caption_text)

In [None]:
# Collect the paths of all .jpg files. The pattern is `images` immediately
# followed by '*.jpg' (plain string concatenation, no separator is inserted
# here). split_data() later reads this list through the global name `img`.
img = glob.glob(images+'*.jpg')

In [None]:
# Image names of the training split, one per line in the split file. The
# `with` block closes the file handle instead of leaving it to the garbage
# collector.
with open(train_images_file, 'r') as split_file:
    train_images = set(split_file.read().strip().split('\n'))
train_img = split_data(train_images)
print(f"The train set contains {len(train_img)} out of 8000 images")

In [None]:
# Image names of the dev split, one per line in the split file. The `with`
# block closes the file handle instead of leaving it to the garbage
# collector.
with open(val_images_file, 'r') as split_file:
    val_images = set(split_file.read().strip().split('\n'))
val_img = split_data(val_images)
print(f"The dev set contains {len(val_img)} out of 8000 images")

In [None]:
# Image names of the test split, one per line in the split file. The `with`
# block closes the file handle instead of leaving it to the garbage
# collector.
with open(test_images_file, 'r') as split_file:
    test_images = set(split_file.read().strip().split('\n'))
test_img = split_data(test_images)
print(f"The test set contains {len(test_img)} out of 8000 images")

In [None]:
# InceptionV3 with ImageNet weights; used here to extract image features.
model = InceptionV3(weights='imagenet')
new_input = model.input
# Tap the output of the second-to-last layer instead of the final layer.
hidden_layer = model.layers[-2].output
# model_new maps an image batch to that layer's activations; encode() calls
# its predict().
model_new = Model(new_input, hidden_layer)

In [None]:
# Encode every training image once, keyed by the path with the `images`
# prefix removed. The loop variable is deliberately not called `img`: that
# global holds the list of image paths that split_data() iterates over, and
# rebinding it to a single path string breaks any later split_data() call.
encoding_train = {}
for img_path in tqdm(train_img):
    encoding_train[img_path[len(images):]] = encode(img_path)

In [None]:
# Encode every test image once, keyed by the path with the `images` prefix
# removed. The loop variable is deliberately not called `img`: that global
# holds the list of image paths that split_data() iterates over, and
# rebinding it to a single path string breaks any later split_data() call.
encoding_test = {}
for img_path in tqdm(test_img):
    encoding_test[img_path[len(images):]] = encode(img_path)

In [None]:
# For each split, map the full image path to its caption list from `d`.
# Paths whose name (the path minus the `images` prefix) has no entry in `d`
# are left out.
train_d = {p: d[p[len(images):]] for p in train_img if p[len(images):] in d}

val_d = {p: d[p[len(images):]] for p in val_img if p[len(images):] in d}

test_d = {p: d[p[len(images):]] for p in test_img if p[len(images):] in d}

In [None]:
# Wrap every training caption in <start> ... <end> markers, then split each
# wrapped caption on whitespace.
caps = [
    '<start> ' + caption + ' <end>'
    for image_caps in train_d.values()
    for caption in image_caps
]
words = [cap.split() for cap in caps]

In [None]:
# Vocabulary: every distinct token in the training captions. The tokens are
# sorted so that the word <-> index mapping derived from this list is the
# same on every run. Iteration order of a set of strings can change between
# interpreter sessions, which would make saved weights unusable after a
# kernel restart.
unique = sorted({token_str for sentence in words for token_str in sentence})
vocab_size = len(unique)

In [None]:
# Two lookup tables over the vocabulary list: position -> word and its
# inverse, word -> position.
idx2word = dict(enumerate(unique))
word2idx = {word: index for index, word in idx2word.items()}

In [None]:
# Longest wrapped caption, measured in whitespace-separated tokens
# (0 when there are no captions).
max_len = max((len(cap.split()) for cap in caps), default=0)

In [None]:
# Write the training captions as a tab separated file with a header row: one
# line per (image name, wrapped caption) pair. The `with` block guarantees
# the file is flushed and closed even if a write fails part-way.
with open('../custom/flickr8k_training_dataset.txt', 'w') as f:
    f.write("image_id\tcaptions\n")

    for key, val in train_d.items():
        for i in val:
            f.write(key[len(images):] + "\t" + "<start> " + i +" <end>" + "\n")

In [None]:
# Load the training file back and pull both columns out as plain lists.
df = pd.read_csv('../custom/flickr8k_training_dataset.txt', delimiter='\t')
c = list(df['captions'])
imgs = list(df['image_id'])

In [None]:
# A caption of k tokens contributes k - 1 (prefix -> next word) samples.
samples_per_epoch = sum(len(ca.split()) - 1 for ca in caps)
print(samples_per_epoch)

In [None]:
# Width of the word embeddings and of the projected image features.
embedding_size = 300
# The remaining sizes are derived from the prepared data rather than typed in
# as literals. A literal that is smaller than the real vocabulary makes the
# one-hot targets in data_generator() index out of range, and any mismatch
# silently changes the model's shapes.
max_len = max(len(cap.split()) for cap in caps)
vocab_size = len(word2idx)
samples_per_epoch = sum(len(cap.split()) - 1 for cap in caps)

In [None]:
# Image branch: takes one feature vector of length 2048 per image.
image_input = Input(shape = (2048,))
# Project the features to embedding_size with a ReLU dense layer.
x = layers.Dense(embedding_size, activation='relu')(image_input)
# Repeat the projected vector max_len times so that it forms a sequence.
image_output = layers.RepeatVector(max_len)(x)
image_model = Model(inputs=image_input,outputs = image_output)
image_model.summary()

In [None]:
# Caption branch: takes a padded sequence of max_len word indices.
caption_input = Input(shape = (max_len,))
# No `input_length` argument: the sequence length is already fixed by
# caption_input, and current Keras releases flag the argument as deprecated.
y = layers.Embedding(vocab_size,embedding_size)(caption_input)
y = layers.LSTM(256,return_sequences=True)(y)
# Map every time step back to embedding_size features.
caption_output = layers.TimeDistributed(layers.Dense(embedding_size))(y)
caption_model = Model(inputs = caption_input, outputs = caption_output)
caption_model.summary()

In [None]:
# Join the two branch outputs along axis 1.
conca = layers.Concatenate(axis=1)([image_model.output,caption_model.output])
# No `input_shape` on the inner LSTM: in the functional API the shape comes
# from `conca`. The previous literal (max_len, 300) described neither the
# concatenated length along axis 1 nor a feature size tied to embedding_size.
z = layers.Bidirectional(layers.LSTM(256, return_sequences=False))(conca)
# One score per vocabulary entry, turned into probabilities by the softmax.
z = layers.Dense(vocab_size)(z)
final_output = layers.Activation('softmax')(z)
final_model = Model(inputs = [image_model.input,caption_model.input], outputs = final_output)
final_model.summary()

In [None]:
# Adam optimizer created with its default arguments.
optimizer = tf.keras.optimizers.Adam()
# Categorical cross-entropy as the loss; accuracy is reported as a metric
# during training.
final_model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=['accuracy'])

In [None]:
# Train from the endless generator. One epoch is as many full batches as
# fit into samples_per_epoch.
batch_size = 2048
train_batches = data_generator(batch_size=batch_size)
final_model.fit(
    train_batches,
    steps_per_epoch=samples_per_epoch // batch_size,
    verbose=1,
    epochs=30,
)

In [None]:
# Path of a single sample image used to try out the caption functions.
try_image = '../input/flickr8k/Images/101654506_8eb26cfb60.jpg'
# Last expression of the cell: the opened image is the cell's output.
Image.open(try_image)

In [None]:
# Caption the sample image with greedy decoding, then with beam search at
# three beam widths.
print ('Normal Max search:', predict_captions(try_image))
for k in (3, 5, 7):
    print (f'Beam Search, k={k}:', beam_search_predictions(try_image, beam_index=k))