In [None]:
import numpy as np
import cv2 
import os
from glob import glob
import matplotlib.pyplot as plt

from keras.applications import ResNet50
from keras.models import Model
import copy
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)
import sys
import pandas

In [None]:
# Dataset locations and the fixed caption length used throughout the notebook.
images_path = 'Flickr_Data/Images/'
# glob() returns matches in arbitrary, filesystem-dependent order; sorting
# keeps positional lookups such as images[img_num] reproducible across runs.
images = sorted(glob(images_path+'*.jpg'))
# NOTE(review): the file name starts with "FLickr8k" (capital L); this only
# resolves on case-insensitive filesystems if the file on disk is spelled
# differently - confirm against the actual dataset layout.
captions_path = "Flickr_Data/Flickr_TextData/FLickr8k.token.txt"
# Encoded caption prefixes are padded/truncated to this many tokens.
max_len = 40

In [None]:
# Sanity check: number of .jpg paths matched under images_path.
len(images)

In [None]:
# Instantiate ResNet50 with its classification head included.
# NOTE(review): presumably relies on Keras' default pretrained weights - confirm.
resnet_model = ResNet50(include_top=True)

In [None]:
# Restructure the model into a feature extractor: drop the classification
# head and expose the pooled activations (2048 values per image) instead.
# The layer is selected by name rather than with layers[-2] so this cell is
# idempotent: re-running it on the already-truncated model picks the same
# layer instead of cutting one layer deeper on every execution.
resnet_model = Model(inputs=resnet_model.input, outputs=resnet_model.get_layer('avg_pool').output)

In [None]:
# Peek at one matched path to check the format glob returns.
images[3]

In [None]:
# preprocess the images
# generate a dictionary of image filename -> feature vector

def generate_feature_vectors(start_index, num_of_images, images, model):
    """Map image filenames to the feature vectors produced by ``model``.

    Args:
        start_index: Index into ``images`` of the first image to process
            (inclusive).
        num_of_images: Maximum number of images to process. Fewer are
            processed when ``images`` runs out; values <= 0 process nothing.
        images: Sequence of image file paths.
        model: Object whose ``predict(batch, verbose=0)`` returns an array
            with 2048 values for a single ``(1, 224, 224, 3)`` batch.

    Returns:
        dict mapping each image's bare filename (directory part removed) to
        a 1-D array of shape ``(2048,)``.

    Raises:
        OSError: If OpenCV cannot read one of the image files.
    """
    img_feature_vectors = {}

    # Slice twice so a negative start_index keeps its list-slicing meaning.
    selected = images[start_index:][:max(num_of_images, 0)]

    for count, item in enumerate(selected, start=1):
        img = cv2.imread(item)
        if img is None:
            # imread signals failure by returning None rather than raising.
            raise OSError('could not read image file: ' + repr(item))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (224, 224))
        img = img.reshape(1, 224, 224, 3)

        # NOTE(review): pixels are passed to the network as raw RGB values;
        # no model-specific input preprocessing is applied. Left unchanged so
        # the features stay compatible with already-trained caption models -
        # confirm this is intended.
        feature_vector = model.predict(img, verbose=0).reshape(2048,)

        # Paths may be separated by '\\' (Windows) or '/' (elsewhere).
        # Normalise first so the key is the bare filename on every platform;
        # splitting on '\\' only would keep the whole path on POSIX.
        img_filename = os.path.basename(item.replace('\\', '/'))
        img_feature_vectors[img_filename] = feature_vector

        # Lightweight progress report every 50 images.
        if count % 50 == 0:
            print(count)

    return img_feature_vectors

In [None]:
# Normalises a caption: lowercase text wrapped in start/end markers.
def process_string(s):
    """Return ``s`` lowercased, prefixed with 'sos' and suffixed with 'eos'."""
    lowered = s.lower()
    return ' '.join(('sos', lowered, 'eos'))

In [None]:
# preprocess the captions data

# generate a dictionary of image filename -> list of captions

def generate_captions_dict(captions_path, img_feature_vectors):
    """Group processed captions by image filename.

    Each line of the captions file is expected to be
    ``<image id><TAB><caption>``. The last two characters of the image id
    are dropped to obtain the filename (presumably a ``#<n>`` caption index -
    confirm against the data file).

    Args:
        captions_path: Path of the tab-separated captions text file.
        img_feature_vectors: Container of known image filenames; captions of
            images not contained in it are ignored.

    Returns:
        dict mapping filename to the list of its captions, each passed
        through ``process_string``.
    """
    captions_dict = {}

    # The context manager closes the file handle as soon as it has been read.
    with open(captions_path, 'r') as captions_file:
        lines = captions_file.read().split('\n')

    for line in lines:
        filename_caption = line.split('\t')
        # Lines without a tab (e.g. the trailing empty line) carry no caption.
        # Skip exactly those instead of silencing every possible error.
        if len(filename_caption) < 2:
            continue

        filename = filename_caption[0][:-2]
        if filename not in img_feature_vectors:
            continue

        caption = process_string(filename_caption[1])
        captions_dict.setdefault(filename, []).append(caption)

    return captions_dict

In [None]:
# generates a list of all captions in a text file

def get_list_of_captions(captions_path):
    """Return every caption in the file, processed with ``process_string``.

    Args:
        captions_path: Path of a text file whose lines look like
            ``<image id><TAB><caption>``.

    Returns:
        list of processed caption strings in file order. Lines without a tab
        are skipped.
    """
    # The context manager closes the file handle as soon as it has been read.
    with open(captions_path, 'r') as captions_file:
        lines = captions_file.read().split('\n')

    all_captions = []
    for line in lines:
        fields = line.split('\t')
        # Only tab-less lines (e.g. the trailing empty line) are skipped;
        # any other failure is allowed to surface.
        if len(fields) < 2:
            continue
        all_captions.append(process_string(fields[1]))

    return all_captions

In [None]:
# generate a dictionary called vocab which will contain all words in the list of captions mapped to a unique integer

# def create_full_vocab(captions_path):
#     vocab = {}
#     all_captions = get_list_of_captions(captions_path)
#     count = 1
#     for caption in all_captions:
#         caption_as_list = caption.split()
#         for word in caption_as_list:
#             if word not in vocab:
#                 vocab[word] = count
#                 count += 1
#     return vocab

In [None]:
def create_vocab(captions_dict):
    """Build a word -> integer id mapping from a filename -> captions dict.

    Ids start at 1 and are handed out in order of first appearance while
    walking the captions in dictionary order.
    """
    vocab = {}
    for captions in captions_dict.values():
        for caption in captions:
            for word in caption.split():
                if word in vocab:
                    continue
                # The next free id is always one more than the current size.
                vocab[word] = len(vocab) + 1
    return vocab

In [None]:
# generate a dictionary called vocab which maps each common word in the
# captions file to a unique integer; rare words are left out

def create_common_vocab(captions_path, min_count=10):
    """Build a word -> integer id mapping restricted to frequent words.

    Args:
        captions_path: Path of the captions file, read through
            ``get_list_of_captions``.
        min_count: Minimum number of occurrences across all captions for a
            word to be kept. Defaults to 10.

    Returns:
        dict mapping each kept word to an id. Ids start at 1 and follow the
        order in which the words first appear in the captions.
    """
    vocab_freq = {}
    for caption in get_list_of_captions(captions_path):
        for word in caption.split():
            vocab_freq[word] = vocab_freq.get(word, 0) + 1

    # dict iteration preserves insertion order, so ids follow first appearance.
    frequent_words = [w for w, freq in vocab_freq.items() if freq >= min_count]
    return {word: idx for idx, word in enumerate(frequent_words, start=1)}

In [None]:
# Converts a whitespace-separated string into the list of its words' integer ids.
def encode_string(s, vocab):
    """Return the vocab ids of the words in ``s``, in order.

    Words that are not keys of ``vocab`` are dropped.
    """
    return [vocab[token] for token in s.split() if token in vocab]

In [None]:
# Produces an encoded counterpart of captions_dict: every caption string is
# replaced by the list of integer ids of its words.

def encode_captions_dict(captions_dict, vocab):
    """Return a new filename -> encoded captions dict.

    The input dictionary and its caption lists are left untouched; each
    caption is converted with ``encode_string``.
    """
    return {
        filename: [encode_string(caption, vocab) for caption in captions]
        for filename, captions in captions_dict.items()
    }

In [None]:
# get length of the longest caption in the data set
# max_len = 0
# for caption in all_captions:
#     if len(caption.split())>max_len:
#         max_len = len(caption.split())
# max_len # = 40 

In [None]:
from npy_append_array import NpyAppendArray

In [None]:
# def generate_training_data(image_feature_vectors, captions_dict_encoded, max_len, vocab_len):
#     X = []
#     y_in = []
#     y_out = []
    
#     for filename, captions in captions_dict_encoded.items():
#         for caption in captions:
#             i = 0
#             for word in caption:
#                 y_in_item = caption[:i]
#                 y_in_item = (y_in_item + max_len * [0])[:max_len]
#                 y_in.append(y_in_item)
                
#                 y_out_item = [0]*(vocab_len+1)
#                 y_out_item[word] = 1
#                 y_out.append(y_out_item)
                
#                 X_item = image_feature_vectors[filename]
#                 X.append(X_item)
#                 i+=1
#     return X, y_in, y_out

In [None]:
# def generate_training_data(image_feature_vectors, captions_dict_encoded, max_len, vocab_len):
#     X = 'X.npy'
#     y_in = 'y_in.npy'
#     y_out = 'y_out.npy'
#     count = 0
    
#     for filename, captions in captions_dict_encoded.items():
#         print(count)
#         for caption in captions:
#             i = 0
#             for word in caption:
#                 y_in_item = caption[:i]
#                 y_in_item = (y_in_item + max_len * [0])[:max_len]
#                 with NpyAppendArray(y_in) as npaa:
#                     npaa.append(np.array(y_in_item))
                
#                 y_out_item = [0]*(vocab_len+1)
#                 y_out_item[word] = 1
#                 with NpyAppendArray(y_out) as npaa:
#                     npaa.append(np.array(y_out_item))
                
#                 X_item = image_feature_vectors[filename]
#                 with NpyAppendArray(X) as npaa:
#                     npaa.append(np.array(X_item))
#                 i+=1
#         count += 1

In [None]:
# def data_generator(image_feature_vectors, captions_dict_encoded, batch_size):
#     X = []
#     y_in = []
#     y_out = []
#     n = 0
#     while 1:
#         for filename, captions in captions_dict_encoded.items():
#             n+=1
#             for caption in captions:
#                 i = 0
#                 for word in caption:
#                     y_in_item = caption[:i]
#                     y_in_item = (y_in_item + max_len * [0])[:max_len]
#                     y_in.append(y_in_item)

#                     y_out_item = [0]*(3988+1)
#                     y_out_item[word] = 1
#                     y_out.append(y_out_item)

#                     X_item = image_feature_vectors[filename]
#                     X.append(X_item)
#                     i+=1
#             if n==batch_size:
#                 yield [[np.array(X), np.array(y_in)], np.array(y_out)]
#                 X = []
#                 y_in = []
#                 y_out = []
#                 n=0

In [None]:
def _append_training_batch(X_file, y_in_file, y_out_file, X, y_in, y_out):
    """Append one accumulated batch of samples to the three .npy files."""
    # Nothing to write (e.g. every caption in the batch was empty).
    if not X:
        return
    with NpyAppendArray(X_file) as npaa:
        npaa.append(np.array(X))
    with NpyAppendArray(y_in_file) as npaa:
        npaa.append(np.array(y_in))
    with NpyAppendArray(y_out_file) as npaa:
        npaa.append(np.array(y_out))


def generate_training_data(image_feature_vectors, captions_dict_encoded, max_len, vocab_len, batch_size):
    """Expand encoded captions into training samples and write them to disk.

    For every position ``i`` of every caption one sample is produced:
      * X     - the feature vector of the caption's image,
      * y_in  - the caption prefix ``caption[:i]``, zero-padded/truncated
                to ``max_len`` entries,
      * y_out - a one-hot list of length ``vocab_len + 1`` marking the word
                at position ``i``.

    Samples are buffered in memory and appended to 'X.npy', 'y_in.npy' and
    'y_out.npy' after every ``batch_size`` images. Whatever is left in the
    buffer after the last image is written too, so no samples are lost when
    the number of images is not a multiple of ``batch_size``.

    NOTE(review): the writer presumably appends to files that already exist,
    so calling this twice would duplicate samples - confirm and remove old
    files before re-running.

    Args:
        image_feature_vectors: dict filename -> image feature vector.
        captions_dict_encoded: dict filename -> list of captions, each a
            list of integer word ids.
        max_len: Length of every y_in row.
        vocab_len: Largest word id that can occur.
        batch_size: Number of images buffered between two writes.

    Returns:
        None. The results are the three files on disk.
    """
    X_file = 'X.npy'
    y_in_file = 'y_in.npy'
    y_out_file = 'y_out.npy'
    X, y_in, y_out = [], [], []
    n = 0
    for filename, captions in captions_dict_encoded.items():
        n += 1
        for caption in captions:
            for i, word in enumerate(caption):
                # Prefix seen so far, padded with zeros up to max_len.
                y_in_item = (caption[:i] + max_len * [0])[:max_len]
                y_in.append(y_in_item)

                # One-hot target for the word at position i.
                y_out_item = [0]*(vocab_len+1)
                y_out_item[word] = 1
                y_out.append(y_out_item)

                X.append(image_feature_vectors[filename])
        if n == batch_size:
            print(n)
            _append_training_batch(X_file, y_in_file, y_out_file, X, y_in, y_out)
            X, y_in, y_out = [], [], []
            n = 0

    # Flush the final, partially filled batch.
    if X:
        print(n)
        _append_training_batch(X_file, y_in_file, y_out_file, X, y_in, y_out)

In [None]:
# Extract features for every image found on disk. len(images) replaces the
# hard-coded dataset size so the cell adapts to the actual number of files.
img_feature_vectors = generate_feature_vectors(0, len(images), images, resnet_model)

In [None]:
# filename -> list of processed captions, limited to images that have a feature vector
captions_dict = generate_captions_dict(captions_path, img_feature_vectors)
# word -> integer id, built from all captions in the file and keeping frequent words only
vocab = create_common_vocab(captions_path)
# same structure as captions_dict, with each caption replaced by its list of word ids
# (words missing from vocab are dropped by encode_string)
captions_dict_encoded = encode_captions_dict(captions_dict, vocab)

In [None]:
# Number of distinct words kept in the vocabulary.
len(vocab)

In [None]:
# np.save('img_feature_vectors.npy', img_feature_vectors)
# np.save('vocab.npy', vocab)
# np.save('captions_dict_encoded.npy', captions_dict_encoded)

In [None]:
# Reload previously saved preprocessing results instead of recomputing them.
# .item() unwraps the single Python object stored in each file.
# SECURITY: allow_pickle=True unpickles arbitrary objects, which can execute
# code - only load .npy files that you created yourself.
# allow_pickle takes a boolean; the former string 'TRUE' only worked because
# any non-empty string is truthy.
img_feature_vectors = np.load('img_feature_vectors.npy', allow_pickle=True).item()
captions_dict_encoded = np.load('captions_dict_encoded.npy', allow_pickle=True).item()
vocab = np.load('vocab.npy', allow_pickle=True).item()

In [None]:
import time

# Time the export of the full training set to disk.
# NOTE(review): the export presumably appends to existing X.npy / y_in.npy /
# y_out.npy files; delete them before re-running this cell - confirm.
start_time = time.time()

vocab_len = len(vocab)

# Pass the notebook-wide max_len constant instead of repeating the literal 40;
# the last argument is the number of images buffered between disk writes.
generate_training_data(img_feature_vectors, captions_dict_encoded, max_len, vocab_len, 1000)

# Elapsed wall-clock time in seconds.
print(time.time()-start_time)

In [None]:
# np.save('X.npy', X)
# np.save('y_in.npy', y_in)
# np.save('y_out.npy', y_out)

In [None]:
# Release the large training arrays if they are present in the kernel.
# A plain `del` raises NameError on a fresh kernel, because no uncommented
# cell above binds X, y_in or y_out at notebook level.
for _name in ('X', 'y_in', 'y_out'):
    globals().pop(_name, None)

In [None]:
# next stage - model creation and testing

In [None]:
# version 1: two-branch model (image features + partial caption) that
# predicts the next word of the caption.

embedding_size = 128
# NOTE(review): presumably the size of the vocabulary this model was trained
# with, plus one for the padding id 0 - confirm; it is not derived from `vocab`.
vocab_len = 3988+1
max_len = 40

# image branch: project the 2048-value feature vector to 128 values and
# repeat it max_len times so it lines up with the caption time steps
x = keras.Sequential([
    keras.layers.Input(shape=(2048,)),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.RepeatVector(max_len)
])

# caption branch: embed the word ids, run an LSTM over the sequence and map
# every time step to embedding_size values
y = keras.Sequential([
    keras.layers.Embedding(vocab_len, embedding_size, input_length=max_len),
    keras.layers.LSTM(256, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(embedding_size))
])

# concatenate the outputs of x and y, then decode with two stacked LSTMs and
# a softmax over the vocabulary
z = keras.layers.Concatenate()([x.output, y.output])
z = keras.layers.LSTM(128, return_sequences=True)(z)
z = keras.layers.LSTM(512, return_sequences=False)(z)
z = keras.layers.Dense(vocab_len, activation='softmax')(z)

# inputs: [image features, partial caption]; output: next-word distribution
model = Model(inputs=[x.input, y.input], outputs=z)

model.compile(loss='categorical_crossentropy', optimizer='RMSprop', metrics=['accuracy'])
model.summary()

In [None]:
# version 2: merge model - image features and the partial caption are encoded
# separately, added together and decoded by a feed-forward head.
# The layers are referenced through keras.layers because the bare names
# (Input, Dropout, Dense, Embedding, LSTM, add) are never imported in this
# notebook and raise NameError on a fresh kernel.

# NOTE(review): presumably the size of the vocabulary this model is trained
# with, plus one for the padding id 0 - confirm; it is not derived from `vocab`.
vocab_size = 1965+1
max_len = 40
embedding_size = 200

# image feature extractor model
image_input = keras.layers.Input(shape=(2048,))
ii1 = keras.layers.Dropout(0.5)(image_input)
ii2 = keras.layers.Dense(256, activation='relu')(ii1)

# partial caption sequence model
caption_input = keras.layers.Input(shape=(max_len,))
# mask_zero=True makes the following layers ignore the zero padding
ci1 = keras.layers.Embedding(vocab_size, embedding_size, mask_zero=True)(caption_input)
ci2 = keras.layers.Dropout(0.5)(ci1)
ci3 = keras.layers.LSTM(256)(ci2)

# decoder (feed forward) model; both branches end in 256 units so they can
# be summed element-wise
output = keras.layers.add([ii2, ci3])
output = keras.layers.Dense(256, activation='relu')(output)
output = keras.layers.Dense(vocab_size, activation='softmax')(output)

# merge the two input models
model = Model(inputs=[image_input, caption_input ], outputs=output)

In [None]:
# Caption length used by generate_caption; repeats the value set in the configuration cell.
max_len = 40

In [None]:
# Load the saved version-2 caption model from disk; this rebinds `model`.
model = keras.models.load_model('models/image_caption_gen_v2.h5')

In [None]:
# Reverse lookup (integer id -> word) used to turn predicted ids back into text.
vocab_inv = {index: token for token, index in vocab.items()}

In [None]:
def generate_caption(img_feature_vector):
    """Greedily decode a caption for one image.

    Uses the notebook-level ``model``, ``vocab`` and ``max_len``. The
    sequence starts with the id of 'sos'; at every step the id with the
    highest predicted score is written to the next position, until 'eos' is
    produced or ``max_len`` positions are filled.

    Args:
        img_feature_vector: Image features in the form expected as the first
            input of ``model`` (presumably shape ``(1, 2048)`` - confirm).

    Returns:
        1-D float array of length ``max_len`` holding the generated word ids;
        unused trailing positions are 0.
    """
    encoded_caption = np.zeros((1, max_len))
    # Look the start token up instead of assuming it has id 1.
    encoded_caption[0, 0] = vocab['sos']
    end_token = vocab['eos']

    for i in range(max_len - 1):
        # verbose=0 silences the per-call progress output of predict.
        scores = model.predict([img_feature_vector, encoded_caption], verbose=0)
        next_word = np.argmax(scores)
        encoded_caption[0, i + 1] = next_word
        if next_word == end_token:
            break
    return encoded_caption[0]

In [None]:
def generate_img_feature_vector(filename):
    """Compute the feature vector of a single image file.

    The image is read with OpenCV, converted from BGR to RGB, resized to
    224x224 and passed through the notebook-level ``resnet_model``.

    Args:
        filename: Path of the image file.

    Returns:
        Array of shape ``(1, 2048)``.

    Raises:
        OSError: If OpenCV cannot read the file.
    """
    img = cv2.imread(filename)
    if img is None:
        # imread signals failure by returning None rather than raising.
        raise OSError('could not read image file: ' + repr(filename))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))
    img = img.reshape(1, 224, 224, 3)

    feature_vector = resnet_model.predict(img, verbose=0).reshape(1, 2048)

    return feature_vector

In [None]:
def decode_caption(encoded_caption):
    """Translate a sequence of word ids into a space-separated string.

    Ids are looked up in the notebook-level ``vocab_inv``; decoding stops at
    the first id equal to 0.
    """
    words = []
    for token in encoded_caption:
        if token == 0:
            break
        words.append(vocab_inv[token])
    return " ".join(words)

In [None]:
# Index into `images` of the picture used for the caption demo below.
img_num = 8011

In [None]:
# Load the saved version-1 caption model from disk; this rebinds `model`.
model = keras.models.load_model('models/image_caption_gen_v1.h5')

In [None]:
# Load the saved version-2 caption model from disk; this rebinds `model`,
# replacing whichever model was loaded before.
model = keras.models.load_model('models/image_caption_gen_v2.h5')

In [None]:
# End-to-end demo: extract features for the chosen image, generate the word
# ids of a caption with the currently loaded model, and turn them into text.
decode_caption(generate_caption(generate_img_feature_vector(images[img_num])))

In [None]:
# Display the image selected by img_num. The colour channels are converted
# from BGR to RGB before the image is handed to matplotlib.
plt.figure()
img = cv2.imread(images[img_num])
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.imshow(img)

In [None]:
# Load the pretrained word vectors into a dict: word -> float32 vector.
# Each line of the file holds a word followed by its vector components.
word_embeddings = {}
# The context manager closes the file even if parsing a line raises.
with open("glove/glove.6B.200d.txt", encoding="utf-8") as f:
    for line in f:
        word_vector = line.split()
        word = word_vector[0]
        vector = np.asarray(word_vector[1:], dtype='float32')
        word_embeddings[word] = vector

In [None]:
# Build the embedding matrix: row i holds the pretrained vector of the word
# whose vocab id is i.
embedding_size = 200
vocab_size = 1965+1

# Start from zeros, so words without a pretrained vector keep an all-zero row.
embedding_matrix = np.zeros((vocab_size, embedding_size))

for word, i in vocab.items():
    embedding_vector = word_embeddings.get(word)
    if embedding_vector is None:
        continue
    embedding_matrix[i] = embedding_vector

In [None]:
# Persist the embedding matrix to embedding_matrix.npy
# (presumably to initialise an Embedding layer elsewhere - confirm).
np.save('embedding_matrix.npy', embedding_matrix)

In [None]:
# X = np.load('X.npy', allow_pickle='TRUE')
# y_in = np.load('y_in.npy', allow_pickle='TRUE')
# y_out = np.load('y_out.npy', allow_pickle='TRUE')