In [10]:
# Mount Google Drive at /content/gdrive; the dataset paths used by this notebook
# all live under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [36]:
import glob
import os
import string
from time import time

import matplotlib.pyplot as plt
import numpy as np
from numpy import array
from PIL import Image

# NOTE: `tf` is only a local alias; `from tf.keras ...` raises ModuleNotFoundError,
# so every Keras import goes through the real package name `tensorflow.keras`.
import tensorflow as tf
from tensorflow.keras import Input, layers
from tensorflow.keras import optimizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import LSTM, Embedding, Dense, Activation, Flatten, Reshape, Dropout
# Same public names as before, taken from the public tf.keras API instead of the
# private `keras.layers.wrappers` / `keras.layers.merge` modules.
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.layers import add

from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score

# Dataset locations on the mounted Google Drive.
# token_path: caption file, parsed later into {image id: [captions]}.
token_path = "/content/gdrive/MyDrive/Flick8k/Flick8k_Text/Flickr8k.token.txt"
# train_images_path / test_images_path: text files listing one image file name per line.
train_images_path = '/content/gdrive/MyDrive/Flick8k/Flick8k_Text/Flickr_8k.trainImages.txt'
test_images_path = '/content/gdrive/MyDrive/Flick8k/Flick8k_Text/Flickr_8k.testImages.txt'
# images_path: directory holding the .jpg files. The trailing "/" matters: the
# notebook concatenates it with '*.jpg' and uses len(images_path) to strip the prefix.
images_path = "/content/gdrive/MyDrive/Flick8k/Flick8k_Dataset/"
# glove_path: directory that is joined with 'glove.6B.200d.txt' when loading embeddings.
glove_path = '/content/gdrive/MyDrive/Flick8k/Flick8k_Text/'

In [13]:
#create dict with image name as key and captions as values
# The first whitespace-separated field of each line identifies the image; the text
# before its first '.' is used as the key. The remaining fields form one caption.
flickr8k = dict()
with open(token_path, 'r') as token_file:  # context manager closes the handle
    for line in token_file.read().split('\n'):
        pieces = line.split()
        # Skip very short lines (as before) and whitespace-only lines, which would
        # otherwise raise IndexError on pieces[0].
        if len(line) <= 2 or not pieces:
            continue
        image_name = pieces[0].split('.')[0]
        image_caption = ' '.join(pieces[1:])
        flickr8k.setdefault(image_name, list()).append(image_caption)

In [14]:
#Convert to lowercase and remove punctuation
# The translation table is loop-invariant, so build it once instead of once per word.
punctuation_table = str.maketrans('', '', string.punctuation)
for key, caption_list in flickr8k.items():
    # Captions are rewritten in place inside flickr8k; the operation is idempotent,
    # so re-running this cell leaves already-cleaned captions unchanged.
    for i, raw_caption in enumerate(caption_list):
        cleaned_words = (word.lower().translate(punctuation_table) for word in raw_caption.split())
        # Tokens that were pure punctuation become '' -- drop them so the cleaned
        # caption has no stray double/trailing spaces.
        caption_list[i] = ' '.join(word for word in cleaned_words if word)
#save cleaned captions
# One "<image id> <caption>" entry per line, in dictionary order.
lines = [
    key + ' ' + caption
    for key, caption_list in flickr8k.items()
    for caption in caption_list
]
new_flickr8k = '\n'.join(lines)

In [18]:
#load training image ids
# Each listed file name is reduced to the text before its first '.', which is the
# same key format used for the caption dictionary.
with open(train_images_path, 'r') as train_list_file:  # closes the handle when done
    unique_training_images = {
        line.split('.')[0]
        for line in train_list_file.read().split('\n')
        if len(line) > 1  # ignore blank / one-character lines
    }

Train Dataset size: 6000


In [19]:
#process image flickr8k
def _read_name_set(list_path):
    """Return the set of lines in the text file at `list_path` (surrounding whitespace stripped from the whole file)."""
    with open(list_path, 'r') as list_file:  # closes the handle when done
        return set(list_file.read().strip().split('\n'))


def _select_by_name(paths, wanted_names):
    """Return the entries of `paths` whose file name (last path component) is in `wanted_names`, preserving order."""
    return [path for path in paths if os.path.basename(path) in wanted_names]


# Every .jpg found directly inside the image directory.
img = glob.glob(images_path + '*.jpg')
train_images = _read_name_set(train_images_path)
train_img = _select_by_name(img, train_images)
test_images = _read_name_set(test_images_path)
test_img = _select_by_name(img, test_images)

/content/gdrive/MyDrive/Flick8k/Flick8k_Dataset/


In [20]:
#load flickr8k into dict and append start and end token
# Parses the "<image id> <caption>" lines of new_flickr8k and keeps only images
# whose id is in unique_training_images.
train_flickr8k = dict()
for line in new_flickr8k.split('\n'):
    pieces = line.split()
    if not pieces:
        # Blank line (e.g. new_flickr8k is empty): pieces[0] would raise IndexError.
        continue
    image_name, image_caption = pieces[0], pieces[1:]
    if image_name not in unique_training_images:
        continue
    # Wrap every caption in explicit sequence markers for the decoder.
    caption = '<start> ' + ' '.join(image_caption) + ' <end>'
    train_flickr8k.setdefault(image_name, list()).append(caption)

Descriptions: train = 6000


In [21]:
#create big caption list
# Flatten the per-image caption lists into one list, in dictionary order.
big_caption_list = [
    caption
    for captions_of_image in train_flickr8k.values()
    for caption in captions_of_image
]

30000

In [22]:
#reduce vocabulary
# NOTE: despite its name, max_count is the MINIMUM number of occurrences a word
# needs in the training captions to be kept in the vocabulary.
max_count = 15
counts = {}
for caption in big_caption_list:
    # split() rather than split(' '): consecutive spaces must not create an
    # empty-string "word" that could end up in the vocabulary.
    for word in caption.split():
        counts[word] = counts.get(word, 0) + 1
vocab = [word for word, occurrences in counts.items() if occurrences >= max_count]

Vocabulary = 1659


In [23]:
# Word ids start at 1; id 0 is never assigned to a word.
index_to_word = dict(enumerate(vocab, start=1))
word_to_index = {word: position for position, word in index_to_word.items()}

#set max_length_of_caption
max_length_of_caption = 40
# One more than the number of mapped ids, because ids start at 1.
vocab_size = len(index_to_word) + 1

In [27]:
#using glove.6b.200.txt
# Each non-blank line is "<word> <v1> <v2> ..."; the vector is stored as float32.
embeddings_index = {}
with open(os.path.join(glove_path, 'glove.6B.200d.txt'), encoding="utf-8") as glove_file:
    for line in glove_file:
        values = line.split()  # split once per line instead of twice
        if not values:
            continue  # blank line: values[0] would raise IndexError
        embeddings_index[values[0]] = np.asarray(values[1:], dtype='float32')

embedding_dim = 200
# Row i holds the vector for the word with id i; rows stay all-zero for id 0 and
# for vocabulary words that have no entry in the GloVe file.
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, i in word_to_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

In [20]:
#using InceptionV3 for transfer learning
# ImageNet-pretrained network used purely as a fixed feature extractor.
model = InceptionV3(weights='imagenet')
#removing classification layers
# model_new maps the same input to the output of the second-to-last layer.
penultimate_output = model.layers[-2].output
model_new = Model(inputs=model.input, outputs=penultimate_output)

In [30]:
#train function
def data_generator(flickr8k, images, word_to_index, max_length_of_caption, no_images, num_classes=None):
    """Endlessly yield training batches of (image features, caption prefix) -> next word.

    For every caption of every image, each prefix ``seq[:i]`` (i >= 1) of the encoded
    caption becomes one sample whose target is the word ``seq[i]``.

    Args:
        flickr8k: dict mapping image id -> list of caption strings.
        images: dict mapping ``image id + '.jpg'`` -> feature vector for that image.
        word_to_index: dict mapping word -> integer id; unknown words are skipped.
        max_length_of_caption: length every prefix is padded (or truncated) to.
        no_images: number of images whose samples are grouped into one batch.
        num_classes: width of the one-hot target. Defaults to
            ``len(word_to_index) + 1`` (ids start at 1), which removes the previous
            hidden dependency on the notebook-level ``vocab_size``.

    Yields:
        ``([X1, X2], y)`` where X1 stacks the image feature vectors, X2 holds the
        padded prefixes and y the one-hot encoded next words.

    Raises:
        KeyError: if an image id has no entry in ``images``.
    """
    if num_classes is None:
        num_classes = len(word_to_index) + 1

    feature_rows, prefixes, targets = list(), list(), list()
    images_in_batch = 0
    # loop for ever over images; the counter is not reset between passes, so a
    # batch may span the end of one pass and the start of the next.
    while True:
        for key, caption_list in flickr8k.items():
            images_in_batch += 1
            # retrieve the image feature
            features = images[key + '.jpg']
            for caption in caption_list:
                # encode the sequence
                seq = [word_to_index[word] for word in caption.split(' ') if word in word_to_index]
                for i in range(1, len(seq)):
                    feature_rows.append(features)
                    prefixes.append(seq[:i])
                    targets.append(seq[i])

            if images_in_batch == no_images:
                # Pad and one-hot encode the whole batch in one call each; the
                # result is identical to encoding sample by sample.
                X1 = array(feature_rows)
                X2 = pad_sequences(prefixes, maxlen=max_length_of_caption)
                y = to_categorical(targets, num_classes=num_classes)
                yield ([X1, X2], y)
                feature_rows, prefixes, targets = list(), list(), list()
                images_in_batch = 0

In [None]:
#reshape to 299x299 for InceptionV3 and preprocess
def preprocess(image_path):
    """Load the image at `image_path` as a one-image batch ready for InceptionV3.

    The image is resized to 299x299 on load, converted to an array, given a
    leading batch axis and passed through InceptionV3's `preprocess_input`.
    """
    loaded = image.load_img(image_path, target_size=(299, 299))
    batch = np.expand_dims(image.img_to_array(loaded), axis=0)
    return preprocess_input(batch)


def encode(image):
    """Return the flat feature vector `model_new` produces for one image file.

    Args:
        image: path of the image file (forwarded to `preprocess`).

    Returns:
        1-D array: the single-row prediction with its batch axis removed.
    """
    batch = preprocess(image)
    # verbose=0: this runs once per image, so a progress bar per call would flood
    # the cell output (same setting as in beam_search_predictions).
    fea_vec = model_new.predict(batch, verbose=0)
    return np.reshape(fea_vec, fea_vec.shape[1])

#run encoding on all training images
# Keys are bare file names (e.g. "<id>.jpg"). Comprehensions are used so the loop
# variable no longer overwrites the module-level `img` list of image paths.
encoding_train = {os.path.basename(image_file): encode(image_file) for image_file in train_img}
#run encoding on all testing images
encoding_test = {os.path.basename(image_file): encode(image_file) for image_file in test_img}

#set features
train_features = encoding_train

In [None]:
#training image
# Image branch: 2048-d feature vector -> dropout -> 256-d dense.
inputs1 = Input(shape=(2048,))
layer1 = Dense(256, activation='relu')(Dropout(0.5)(inputs1))
#training caption
# Caption branch: padded word ids -> embedding (id 0 masked) -> dropout -> LSTM.
inputs2 = Input(shape=(max_length_of_caption,))
# Keep a handle on the embedding layer so its weights can be set directly,
# instead of relying on its position in model.layers.
embedding_layer = Embedding(vocab_size, embedding_dim, mask_zero=True)
layer2 = LSTM(256)(Dropout(0.5)(embedding_layer(inputs2)))
#merge (element-wise add) and decode
decoder = Dense(256, activation='relu')(add([layer1, layer2]))

model = Model(inputs=[inputs1, inputs2], outputs=Dense(vocab_size, activation='softmax')(decoder))
# Load the pre-built embedding matrix and freeze it before compiling.
embedding_layer.set_weights([embedding_matrix])
embedding_layer.trainable = False
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [None]:
epochs = 20
# batch_size is passed to data_generator as `no_images`: it counts IMAGES per
# batch, and every image contributes one sample per caption prefix.
batch_size = 5
# One epoch = one pass over the training images.
steps = len(train_flickr8k)//batch_size
# Train for `epochs` epochs with `batch_size` images per batch and
# len(train_flickr8k)//batch_size steps per epoch (not a fixed 2000).
generator = data_generator(train_flickr8k, train_features, word_to_index, max_length_of_caption, batch_size)
model.fit(generator, epochs=epochs, steps_per_epoch=steps, verbose=1)
#save model
model.save('/content/gdrive/MyDrive/Flick8k/lstm_cnn_model.h5')

In [33]:
def beam_search_predictions(image, beam_index = 4):
    """Generate a caption for one image with beam search over the notebook-level `model`.

    Reads the module-level `model`, `word_to_index`, `index_to_word` and
    `max_length_of_caption`.

    Args:
        image: image feature batch passed as the first model input.
        beam_index: beam width, i.e. how many hypotheses are kept per step and
            how many top words are tried per hypothesis.

    Returns:
        The words of the best hypothesis joined by spaces, without the leading
        '<start>' and cut before the first '<end>'.

    Notes:
        Hypotheses are ranked by the sum of log-probabilities (no length
        normalisation). A hypothesis that has produced '<end>' is kept as is and
        not expanded any further.

    Raises:
        KeyError: if '<start>' is missing from `word_to_index`.
    """
    start_id = word_to_index["<start>"]
    end_id = word_to_index.get("<end>")  # None (never matches) if '<end>' is not in the vocabulary
    tiny = 1e-12  # keeps log() finite for zero probabilities

    # Each hypothesis is [token ids, cumulative log-probability].
    beams = [[[start_id], 0.0]]
    # At most max_length_of_caption tokens in total, the first being <start>.
    for _ in range(max_length_of_caption - 1):
        candidates = []
        for tokens, score in beams:
            if tokens[-1] == end_id:
                # Finished caption: carry it over unchanged.
                candidates.append([tokens, score])
                continue
            par_caps = pad_sequences([tokens], maxlen=max_length_of_caption, padding='post')
            preds = model.predict([image, par_caps], verbose=0)[0]
            for word in np.argsort(preds)[-beam_index:]:
                word = int(word)
                if word == 0:
                    # id 0 is padding and has no entry in index_to_word
                    continue
                candidates.append([tokens + [word], score + float(np.log(preds[word] + tiny))])

        if not candidates:
            break  # nothing could be expanded; keep the current beams
        # Best hypotheses first.
        beams = sorted(candidates, key=lambda entry: entry[1], reverse=True)[:beam_index]
        if all(tokens[-1] == end_id for tokens, _ in beams):
            break  # every surviving hypothesis is complete

    best_tokens = beams[0][0]
    final_caption = []
    for token in best_tokens[1:]:  # skip <start>
        word = index_to_word[token]
        if word == '<end>':
            break
        final_caption.append(word)

    return ' '.join(final_caption)

In [None]:
no_images_to_test = 2
# n-gram weights for the cumulative BLEU-1 .. BLEU-4 scores.
bleu_weights = {
    "BLEU-1": (1.0, 0, 0, 0),
    "BLEU-2": (0.5, 0.5, 0, 0),
    "BLEU-3": (1/3, 1/3, 1/3, 0),
    "BLEU-4": (0.25, 0.25, 0.25, 0.25),
}
# Slicing (rather than indexing) tolerates fewer than no_images_to_test test images.
for image_name in list(encoding_test.keys())[:no_images_to_test]:
    # Named image_features so the imported keras `image` module is not shadowed
    # (preprocess() relies on it).
    image_features = encoding_test[image_name].reshape((1,2048))
    plt.imshow(plt.imread(images_path+image_name))
    plt.show()  # render each image on its own figure before printing its scores
    real_captions = flickr8k[str(image_name).split('.jpg')[0]]
    # Run the (expensive) beam search once and reuse the result.
    predicted_caption = beam_search_predictions(image_features, beam_index = 4)
    print("Real Captions:", real_captions)
    print("Beam Search, K = 4:", predicted_caption)

    # NLTK's metrics work on token lists: a list of tokenised references and one
    # tokenised hypothesis. Passing raw strings would score individual characters.
    references = [caption.split() for caption in real_captions]
    hypothesis = predicted_caption.split()

    for label, weights in bleu_weights.items():
        score = sentence_bleu(references, hypothesis, weights=weights)
        print(f"{label} score: {score*100}")

    # Score the PREDICTED caption (not a reference) against all references.
    # NOTE(review): pre-tokenised input is what recent NLTK releases require for
    # meteor_score; confirm against the installed NLTK version.
    print(f"METEOR score: {meteor_score(references, hypothesis)*100}")
