In [None]:
# data file location
# Folder holding the JPEG images; image paths are later built as data_dir + <image id> + '.jpg'.
data_dir = './Flickr8k_Dataset/'
# Caption file; each line is later split on a tab into an image/caption-index field and the caption text.
captions_file = './Flickr8k_text/Flickr8k.token.txt'

In [None]:
# variables
# Marker tokens placed before and after every caption text.
start_word = 'startseq'
end_word = 'endseq'
# NOTE(review): unknown_word is not referenced anywhere else in the visible notebook.
unknown_word = 'unk'
# Pickle file in which the extracted image features are stored.
extracted_features_file = 'features.pkl'
# Output size of the Dense/Embedding projections and number of units of each LSTM layer.
embedding_dim = 256
lstm_units = 256

In [None]:
# list of train and validation images
# Both start empty and are filled with image ids (file names without extension)
# by the cells that read the split files.
train_image_id_list = []
val_image_id_list = []

In [None]:
# create list of train images
# The list is rebuilt from scratch on every run, so re-executing this cell
# cannot append the same ids twice (which would inflate steps_per_epoch).
train_images = './Flickr8k_text/Flickr_8k.trainImages.txt'
with open(train_images, 'r') as f:
    image_list = f.readlines()
# Keep the part before the first '.', i.e. the file name without its extension.
# Blank lines are skipped so they do not turn into bogus ids.
train_image_id_list = [image.split('.')[0] for image in image_list if image.strip()]

In [None]:
# create list of validation images
# The list is rebuilt from scratch on every run, so re-executing this cell
# cannot append the same ids twice (which would inflate validation_steps).
val_images = './Flickr8k_text/Flickr_8k.devImages.txt'
with open(val_images, 'r') as f:
    image_list = f.readlines()
# Keep the part before the first '.', i.e. the file name without its extension.
# Blank lines are skipped so they do not turn into bogus ids.
val_image_id_list = [image.split('.')[0] for image in image_list if image.strip()]

In [None]:
# report how many image ids each split contains (train first, then validation)
for split_ids in (train_image_id_list, val_image_id_list):
    print(len(split_ids))

In [None]:
# open file for reading
# NOTE(review): the handle is opened without a context manager, so it is not
# closed in this cell; whoever consumes it should close it afterwards.
f = open(captions_file, 'r')

In [None]:
# read file
# Load every caption line into memory, then close the handle: the lines are
# all that is needed from it, and leaving it open leaks the file descriptor.
try:
    sentences = f.readlines()
finally:
    f.close()

In [None]:
# create a word tokenizer
from tensorflow.keras.preprocessing.text import Tokenizer
# Created with default settings; it is fitted on the captions in a later cell.
tokenizer = Tokenizer()

In [None]:
from tensorflow.keras.preprocessing.text import text_to_word_sequence

# create a list of image ids and captions
# image_ids keeps the ids in first-seen order; captions maps image id -> list of cleaned captions
image_ids = []
captions = {}
for sentence in sentences:
    # tolerate blank lines (e.g. a trailing empty line at the end of the file)
    if not sentence.strip():
        continue
    # split on the first tab only, so a stray tab inside the caption text cannot break the unpacking
    image_and_caption_index, caption = sentence.split('\t', 1)
    image_id = image_and_caption_index.split('.')[0]
    caption = start_word + ' ' + caption + ' ' + end_word
    # normalise the text into words and re-join them with single spaces
    caption = text_to_word_sequence(caption)
    caption = ' '.join(caption)
    # append to lists
    # membership is tested on the dict (constant time) instead of scanning the list for every line
    if image_id not in captions:
        image_ids.append(image_id)
        captions[image_id] = []
    captions[image_id].append(caption)

In [None]:
# sanity checks: number of distinct image ids, number of entries in the caption
# dict (both are extended together, so they should match), and the cleaned
# captions stored for the fourth image id
print(len(image_ids))
print(len(captions))
print(captions[image_ids[3]])

In [None]:
# flatten the per-image caption lists into a single list, following dict order
all_captions = [text for image_captions in captions.values() for text in image_captions]

In [None]:
# total number of captions across all images
len(all_captions)

In [None]:
# fit tokenizer on all captions
# this populates tokenizer.word_index, which later cells use for the vocabulary
# size and for the integer -> word mapping
tokenizer.fit_on_texts(all_captions)

In [None]:
# define vocabulary size
# one more than the number of known words
# NOTE(review): presumably the extra slot is index 0, which the tokenizer does
# not assign to a word and padding uses — confirm against the Keras docs
vocab_size = len(tokenizer.word_index) + 1
vocab_size

In [None]:
# length, in words, of the longest caption (the start/end markers are part of each caption)
max_len = max(map(len, map(text_to_word_sequence, all_captions)))
max_len

In [None]:
# VGG16 model pretrained on ImageNet to be used as our CNN encoder
from tensorflow.keras.applications.vgg16 import VGG16

In [None]:
# full VGG16 network, including its top (classifier) layers, with ImageNet weights
base_model = VGG16(include_top=True, weights='imagenet')
base_model.summary()

In [None]:
# create a new model using VGG16 but without the softmax prediction layer
from tensorflow.keras.models import Model

In [None]:
# Re-use the VGG16 inputs but stop at the second-to-last layer, so the model
# returns that layer's activations instead of the final predictions.
# NOTE(review): later cells reshape these features to 4096 values per sample.
feature_extractor_model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)
feature_extractor_model.summary()

In [None]:
# extract features
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.vgg16 import preprocess_input

# image id -> extracted feature array; an empty list marks an image whose file is missing
features = {}

for image_id in image_ids:
    image_path = data_dir + image_id + '.jpg'
    try:
        # load at the 224x224 resolution used here and turn it into a one-image batch
        pixels = img_to_array(load_img(image_path, target_size=(224, 224)))
        batch = preprocess_input(pixels.reshape((1, 224, 224, 3)))
        feature = feature_extractor_model.predict(batch)
    except FileNotFoundError:
        # best effort: keep the id, but with an empty feature that consumers can skip
        feature = []
    features[image_id] = feature

In [None]:
# store features in a pickle file
from pickle import dump
# the context manager guarantees the file is flushed and closed before it is read back
with open(extracted_features_file, 'wb') as features_out:
    dump(features, features_out)

In [None]:
# open extracted features file
from pickle import load
# NOTE: unpickling can execute arbitrary code; only load a features file that
# was produced by this notebook.
# The context manager closes the file as soon as the features are loaded.
with open(extracted_features_file, 'rb') as features_in:
    f_extracted = load(features_in)

In [None]:
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

def load_data(image_id, caption_list):
    """Build next-word training samples for one image.

    Every caption is integer-encoded with the global ``tokenizer`` and expanded
    into one sample per word after the first: the input is the prefix
    ``caption[:i]`` and the target is the word id ``caption[i]``.

    Args:
        image_id: key into the global ``f_extracted`` feature dictionary.
        caption_list: list of caption strings belonging to that image.

    Returns:
        Tuple ``(X_encoder, X_decoder, y_decoder)`` of NumPy arrays with one
        entry per sample: the image feature (repeated for every sample), the
        un-padded word-id prefix, and the id of the next word. Padding the
        prefixes is left to the caller.

    Raises:
        KeyError: if ``image_id`` is not present in ``f_extracted``.
    """
    X_encoder = []
    X_decoder = []
    y_decoder = []

    # load features from file
    image = f_extracted[image_id]

    # integer encode the caption
    integer_encoded_caption_list = tokenizer.texts_to_sequences(caption_list)

    for caption in integer_encoded_caption_list:
        for i in range(1, len(caption)):
            X_encoder.append(image)
            X_decoder.append(caption[:i])
            y_decoder.append(caption[i])

    # The prefixes have different lengths. Recent NumPy releases refuse to build
    # such a ragged array implicitly (ValueError), so request an object array.
    return np.array(X_encoder), np.array(X_decoder, dtype=object), np.array(y_decoder)

In [None]:
def train_data_generator():
    """Endlessly yield one training batch per usable image in ``train_image_id_list``.

    An image is skipped when it has no captions or when its stored feature is
    empty. For every other image the samples from ``load_data`` are reshaped
    and padded into model inputs and one-hot targets.

    Yields:
        ``([X_encoder, X_decoder], y_decoder)`` — a tuple, which is the
        ``(inputs, targets)`` form Keras expects from a generator.

    Raises:
        ValueError: if a full pass over the list produces no batch at all
            (otherwise the generator would loop forever without yielding).
    """
    while True:
        produced_batch = False
        for image_id in train_image_id_list:
            caption_list = captions.get(image_id)
            if not caption_list:
                continue
            if len(f_extracted[image_id]) == 0:
                continue
            X_encoder, X_decoder, y_decoder = load_data(image_id, caption_list)
            # flatten each stored feature to a 4096-value vector
            X_encoder = X_encoder.reshape((X_encoder.shape[0], 4096))
            X_decoder = pad_sequences(X_decoder, maxlen=max_len)
            y_decoder = to_categorical(y_decoder, num_classes=vocab_size)

            produced_batch = True
            yield ([X_encoder, X_decoder], y_decoder)
        if not produced_batch:
            raise ValueError('no training image has both captions and extracted features')

In [None]:
def val_data_generator():
    """Endlessly yield one validation batch per usable image in ``val_image_id_list``.

    An image is skipped when it has no captions or when its stored feature is
    empty. For every other image the samples from ``load_data`` are reshaped
    and padded into model inputs and one-hot targets.

    Yields:
        ``([X_encoder, X_decoder], y_decoder)`` — a tuple, which is the
        ``(inputs, targets)`` form Keras expects from a generator.

    Raises:
        ValueError: if a full pass over the list produces no batch at all
            (otherwise the generator would loop forever without yielding).
    """
    while True:
        produced_batch = False
        for image_id in val_image_id_list:
            caption_list = captions.get(image_id)
            if not caption_list:
                continue
            if len(f_extracted[image_id]) == 0:
                continue
            X_encoder, X_decoder, y_decoder = load_data(image_id, caption_list)
            # flatten each stored feature to a 4096-value vector
            X_encoder = X_encoder.reshape((X_encoder.shape[0], 4096))
            X_decoder = pad_sequences(X_decoder, maxlen=max_len)
            y_decoder = to_categorical(y_decoder, num_classes=vocab_size)

            produced_batch = True
            yield ([X_encoder, X_decoder], y_decoder)
        if not produced_batch:
            raise ValueError('no validation image has both captions and extracted features')

In [None]:
# create sequential models
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Embedding, LSTM, RepeatVector, Activation, Input, add, TimeDistributed, Dropout

In [None]:
# create image encoder model(CNN)
# Takes a 4096-value image feature, projects it to embedding_dim and repeats
# it max_len times so it can be added to the per-word sequence branch below.
input_image_encoder = Input(shape=(4096,))
fc = Dense(embedding_dim, activation='relu')(input_image_encoder)
output_image_encoder = RepeatVector(max_len)(fc)

# create decoder model(RNN)
# Takes a sequence of max_len word ids, embeds it, runs an LSTM that returns
# one output per position, and projects every position to embedding_dim.
input_sequence_decoder = Input(shape=(max_len,))
embedding_sequence_model = Embedding(vocab_size, embedding_dim)(input_sequence_decoder)
lstm_sequence_model = LSTM(lstm_units, return_sequences=True)(embedding_sequence_model)
td_sequence_model = TimeDistributed(Dense(embedding_dim, activation='relu'))(lstm_sequence_model)

# merge inputs
# element-wise sum of the image branch and the text branch
merged = add([output_image_encoder, td_sequence_model])

# language model (decoder)
# A second LSTM reduces the merged sequence to a single vector, followed by a
# dense layer, dropout and a softmax over the vocabulary (the next word).
lstm_encoder_decoder = LSTM(lstm_units, return_sequences=False)(merged)
fc_encoder_decoder = Dense(lstm_units, activation='relu')(lstm_encoder_decoder)
dropout_encoder_decoder = Dropout(0.5)(fc_encoder_decoder)
output_encoder_decoder = Dense(vocab_size, activation='softmax')(dropout_encoder_decoder)

# tie it together [image, seq] [word]
# inputs are given in the order [image feature, word-id sequence]
model = Model(inputs=[input_image_encoder, input_sequence_decoder], outputs=output_encoder_decoder)

model.summary()

In [None]:
# compile model
# softmax output with one-hot targets -> categorical cross-entropy, optimised with Adam
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# callback for saving model
from tensorflow.keras.callbacks import ModelCheckpoint

# Save a checkpoint whenever the monitored training loss reaches a new minimum;
# the epoch number and the loss value are embedded in the file name.
filepath = "NIC-{epoch:02d}-{loss:.4f}.h5"
checkpoint = ModelCheckpoint(
    filepath,
    monitor='loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)
callbacks_list = [checkpoint]

In [None]:
# instantiate the endless generators that feed training and validation batches
generator_train = train_data_generator()
generator_val = val_data_generator()

In [None]:
# Train for 10 epochs; one step consumes one generator batch, and the step
# counts are set to the number of ids in each split.
# NOTE(review): fit_generator is deprecated in newer TensorFlow releases in
# favour of model.fit — confirm against the installed version.
model.fit_generator(
    generator_train,
    steps_per_epoch=len(train_image_id_list),
    epochs=10,
    verbose=1,
    callbacks=callbacks_list,
    validation_data=generator_val,
    validation_steps=len(val_image_id_list),
)

In [None]:
# collect the ids (file name up to the first '.') of the test split
test_images = './Flickr8k_text/Flickr_8k.testImages.txt'
test_image_id_list = []
with open(test_images, 'r') as f:
    image_list = f.readlines()
    for image in image_list:
        # partition('.')[0] is everything before the first dot, or the whole line if there is none
        test_image_id_list.append(image.partition('.')[0])

In [None]:
# number of image ids in the test split
len(test_image_id_list)

In [None]:
# look up the stored features of one test image
# (the choice is fixed, not random: always the third id of the test list)
test_image_id = test_image_id_list[2]

image_feature = f_extracted[test_image_id]

In [None]:
# shape of the stored feature
# NOTE(review): raises AttributeError if the image file was missing during
# extraction, because an empty list is stored in that case
image_feature.shape

In [None]:
# invert the tokenizer vocabulary: integer index -> word
int_to_word = dict((index, word) for word, index in tokenizer.word_index.items())

In [None]:
# initialize seed word
# generation starts from the start marker; the generation loop appends each
# predicted word to this string
seed_word = start_word
seed_word

In [None]:
# get caption list for testing purposes
# NOTE(review): caption_list is not read by the generation loop in the visible
# notebook; presumably kept for manual comparison with the generated caption.
caption_list = captions[test_image_id]

# image input for the model: the stored feature of the chosen test image
X_encoder = image_feature

In [None]:
# generate sequence
# Greedy decoding: repeatedly feed the caption generated so far to the model
# and append the most probable next word, for at most max_len words.

# restart from the start marker so that re-running this cell does not keep
# extending a caption left over from a previous run
seed_word = start_word
for i in range(max_len):
    # integer encode seed word
    sequence = tokenizer.texts_to_sequences([seed_word])
    # pad sequence
    sequence = pad_sequences(sequence, maxlen=max_len)
    # create input tensor
    X_test = [X_encoder, sequence]
    # predict next word in sequence
    y_pred = model.predict(X_test, verbose=0)
    # calculate maximum index from one hot encoded tensor
    y_pred = int(np.argmax(y_pred))
    # convert index back to word
    # .get() returns None for an index that has no word; plain indexing raised
    # KeyError instead, so the check below could never trigger
    word = int_to_word.get(y_pred)
    # break if word cannot be mapped back
    if word is None:
        break
    # append current predicted word to the sequence which will be the input in the next timestep
    seed_word += ' ' + word
    # break if the end marker is predicted
    if word == end_word:
        break

# print generated sequence
print(seed_word)