In [53]:
from os import listdir
from pickle import dump
from keras.applications.resnet50 import ResNet50
from keras.applications.resnet50 import preprocess_input
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.models import Model

In [54]:
def extract_feature_vectors(directory):
    """Run ResNet50 on every file in ``directory`` and collect the predictions.

    Args:
        directory: path of a folder; every entry returned by ``listdir`` is
            loaded as an image.

    Returns:
        dict mapping each file name (truncated at its first '.') to the array
        returned by ``model.predict`` for that image.
    """
    base_network = ResNet50()
    # Meant to drop the softmax layer.
    # NOTE(review): on some Keras versions `layers` is a fresh list, which would
    # make this pop a no-op -- confirm which layer actually feeds the output.
    base_network.layers.pop()
    extractor = Model(inputs=base_network.inputs, outputs=base_network.layers[-1].output)

    featuresDict = {}
    # enumerate doubles as the progress counter (all images in Flickr8k)
    for position, name in enumerate(listdir(directory)):
        # 224x224 is the input size used for resnet50 here
        pixels = img_to_array(load_img(directory + '/' + name, target_size=(224, 224)))
        # prepend a batch axis of size one, then apply the resnet preprocessing
        height, width, channels = pixels.shape[0], pixels.shape[1], pixels.shape[2]
        batch = preprocess_input(pixels.reshape((1, height, width, channels)))
        # key is the file name without its extension (cut out the .jpg);
        # verbose=0 keeps Keras from printing its own progress bar
        featuresDict[name.split('.')[0]] = extractor.predict(batch, verbose=0)
        # shows progress
        print(position)
    return featuresDict

In [55]:
# directory where we can find all images in Flickr8k
imagedirectory = "C:\\Users\\brace\\Downloads\\Flickr8k_Dataset\\Flicker8k_Dataset"
# using function defined above to run resnet50 on all images
features = extract_feature_vectors(imagedirectory)
# dump the feature dictionary to a file so resnet50 does not have to be run again;
# the context manager guarantees the file is flushed and closed even if pickling fails
with open('features.pkl', 'wb') as features_file:
    dump(features, features_file)

0
1
2
3
4
5
6
7
8
9
10
11


KeyboardInterrupt: 

In [58]:
import string
def descriptionsToDict(doc):
    """Group caption lines by image name.

    Each line of ``doc`` is split on whitespace; the first token (cut at its
    first '.') is the image name and the remaining tokens, re-joined with
    single spaces, are one description. Lines with fewer than two tokens are
    ignored.

    Args:
        doc: the full text of the captions file.

    Returns:
        dict mapping image name to the list of its descriptions, in file order.
    """
    imageToDescription = {}
    for raw_line in doc.split('\n'):
        tokens = raw_line.split()
        # need at least an image name plus one description word
        if len(tokens) >= 2:
            first_token, *caption_words = tokens
            # drop everything from the first '.' on (removes ".jpg" and suffix)
            image_name = first_token.split('.')[0]
            imageToDescription.setdefault(image_name, []).append(' '.join(caption_words))
    return imageToDescription

def clean_descriptions(descriptions):
    """Normalise every description in place: lower-case and strip punctuation.

    Tokens that consist only of punctuation are dropped entirely so that the
    cleaned sentence contains no doubled or trailing spaces.

    Args:
        descriptions: dict mapping image name to a list of description
            strings; the lists are modified in place.

    Returns:
        None.
    """
    # build the translation table once instead of once per word
    punctuation_table = str.maketrans('', '', string.punctuation)
    for desc_list in descriptions.values():
        for i, sentence in enumerate(desc_list):
            cleaned = (word.lower().translate(punctuation_table) for word in sentence.split())
            # skip tokens that became empty (they were pure punctuation)
            desc_list[i] = ' '.join(word for word in cleaned if word)

def create_vocabulary(descriptions):
    """Collect the set of unique whitespace-separated words over all descriptions.

    Args:
        descriptions: dict mapping image name to a list of description strings.

    Returns:
        set of every distinct word found in any description.
    """
    vocabulary = set()
    # plain loops instead of a list comprehension used only for its side effect
    for desc_list in descriptions.values():
        for desc in desc_list:
            vocabulary.update(desc.split())
    return vocabulary

def saveToFile(descriptions, filename):
    """Write one "<image name> <description>" line per description to ``filename``.

    Lines are joined with '\\n' and no trailing newline is written.

    Args:
        descriptions: dict mapping image name to a list of description strings.
        filename: path of the text file to (over)write.
    """
    lines = [key + ' ' + desc
             for key, desc_list in descriptions.items()
             for desc in desc_list]
    # the context manager closes the file even if the write raises
    with open(filename, 'w') as file:
        file.write('\n'.join(lines))

# file with all descriptions for all images
descriptionFile = "C:\\Users\\brace\\Downloads\\Flickr8k_text\\Flickr8k.token.txt"
# load descriptions from descriptionFile; `with` closes the file even on error
with open(descriptionFile, 'r') as file:
    descriptions = file.read()
# create a dictionary with the descriptions (image name -> list of descriptions)
descriptions = descriptionsToDict(descriptions)
print('Loaded: %d ' % len(descriptions))
# clean descriptions (in place)
clean_descriptions(descriptions)
# summarize vocabulary
vocabulary = create_vocabulary(descriptions)
print('Vocabulary Size: %d' % len(vocabulary))
# save to file
saveToFile(descriptions, 'descriptions.txt')

Loaded: 8092 
Vocabulary Size: 8828


In [60]:
from pickle import load

# load a pre-defined list of photos
def load_dataset(filename):
    """Read a list of image file names and return their identifiers.

    Args:
        filename: path of a text file with one image file name per line.

    Returns:
        set of identifiers, each being a line cut at its first '.'
        (i.e. the name without ".jpg"). Blank and whitespace-only lines are
        ignored; duplicates collapse because a set is returned.
    """
    # the context manager closes the file even if reading fails
    with open(filename, 'r') as file:
        text = file.read()
    dataset = set()
    for line in text.split('\n'):
        # strip so stray spaces or carriage returns never end up in an identifier
        line = line.strip()
        if not line:
            continue
        dataset.add(line.split('.')[0])
    return dataset

def filter_processed_descriptions(filename, dataset):
    """Load cleaned descriptions for the images in ``dataset`` and wrap them in markers.

    Each non-blank line of the file is split on whitespace: the first token is
    the image name, the rest is the description. Descriptions of images in
    ``dataset`` are stored as 'startseq <description> endseq'.

    Args:
        filename: path of the text file with "<image name> <description>" lines.
        dataset: container of image names to keep.

    Returns:
        dict mapping image name to the list of its wrapped descriptions.
    """
    # the context manager closes the file even if reading fails
    with open(filename, 'r') as file:
        doc = file.read()
    descriptions = dict()
    for line in doc.split('\n'):
        words = line.split()
        # blank lines (e.g. a trailing newline) have no image name: skip them
        # instead of failing on words[0]
        if not words:
            continue
        image_name, image_desc = words[0], words[1:]
        if image_name in dataset:
            # add start and end words to the description before storing it
            descriptions.setdefault(image_name, list()).append(
                'startseq ' + ' '.join(image_desc) + ' endseq')
    return descriptions

def filter_image_features(filename, dataset):
    """Load pickled image features and keep only those whose key is in ``dataset``.

    Args:
        filename: path of a pickle file holding a dict of features by image name.
        dataset: iterable of image names to keep.

    Returns:
        dict with one entry per name in ``dataset``.

    Raises:
        KeyError: if a name in ``dataset`` has no entry in the pickle file.
    """
    # NOTE: pickle can execute arbitrary code while loading; only use files
    # produced by this notebook. `with` makes sure the handle is closed.
    with open(filename, 'rb') as feature_file:
        all_features = load(feature_file)
    missing = [k for k in dataset if k not in all_features]
    if missing:
        # same exception type as a plain lookup, but says what is wrong
        raise KeyError('%d image(s) have no stored features, e.g. %r'
                       % (len(missing), missing[0]))
    return {k: all_features[k] for k in dataset}

# text file listing the names of the images that belong to the training dataset
filename = "C:\\Users\\brace\\Downloads\\Flickr8k_text\\Flickr_8k.trainImages.txt"
# set of training image names (without the .jpg extension)
train = load_dataset(filename)
# load descriptions and image features for the training dataset; both read files
# written by earlier cells ('descriptions.txt' and 'features.pkl')
train_descriptions = filter_processed_descriptions('descriptions.txt', train)
train_features = filter_image_features('features.pkl', train)

In [61]:
from numpy import array
import tensorflow
from pickle import load
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Masking
from keras.layers import RepeatVector
from keras.layers import Embedding
from keras.layers import concatenate
from keras.layers import Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint

In [71]:
# create sequences of image features, input sequences and output words for an image
def create_sequences(desc_list, imageFeature, tokenizer, max_length, vocab_size=None):
    """Expand the descriptions of one image into (image, prefix, next word) samples.

    Every description is encoded with ``tokenizer.texts_to_sequences``; for each
    position ``i >= 1`` one sample is produced whose text input is the padded
    prefix ``seq[:i]`` and whose target is the one-hot encoding of ``seq[i]``.

    Args:
        desc_list: list of description strings for a single image.
        imageFeature: feature vector of that image, repeated for every sample.
        tokenizer: fitted tokenizer providing ``texts_to_sequences`` and
            ``word_index``.
        max_length: length every prefix is padded to.
        vocab_size: number of classes for the one-hot targets. Defaults to
            ``len(tokenizer.word_index) + 1`` so the function no longer relies
            on a notebook-level global being defined.

    Returns:
        Three numpy arrays: image features, padded prefixes, one-hot targets.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer.word_index) + 1
    inputImage, inputText, output = list(), list(), list()
    for desc in desc_list:
        # encode the description as a sequence of integer word ids
        seq = tokenizer.texts_to_sequences([desc])[0]
        # split one sequence into multiple X,y pairs
        for i in range(1, len(seq)):
            # pad the prefix so that all inputs have length max_length
            currInput = pad_sequences([seq[:i]], maxlen=max_length)[0]
            # one-hot row of size vocab_size with a 1 at index seq[i]
            currOutput = to_categorical([seq[i]], num_classes=vocab_size)[0]
            inputImage.append(imageFeature)
            inputText.append(currInput)
            output.append(currOutput)
    return array(inputImage), array(inputText), array(output)

In [98]:
# define the captioning model
def create_model(vocab_size, max_length, feature_size=1000, embed_size=256,
                 lstm_units=256, dropout_rate=0.5):
    """Build and compile the captioning network.

    The image feature vector is projected to ``embed_size`` and placed as the
    first time step in front of the embedded words; an LSTM reads that sequence
    and a softmax layer predicts the next word.

    Args:
        vocab_size: number of word ids (size of the softmax output).
        max_length: number of word positions in the text input.
        feature_size: length of the image feature vector (default 1000).
        embed_size: size of the word embeddings and of the projected image
            feature (default 256).
        lstm_units: number of LSTM units (default 256).
        dropout_rate: rate used by both dropout layers (default 0.5).

    Returns:
        The compiled Keras model taking ``[image features, word ids]``.
    """
    # connection to the output of the CNN
    inputFromCNN = Input(shape=(feature_size,))
    # dropout for regularization (the training dataset is small)
    dropoutCNN = Dropout(dropout_rate)(inputFromCNN)
    # fully connected layer so the CNN output has the same size as the word embeddings
    CNNToEmbeddings = Dense(embed_size, activation='relu')(dropoutCNN)
    # add a time dimension -> (None, 1, embed_size); Masking is applied so this
    # branch can be concatenated with the masked embedding branch
    final_cnn = Masking()(RepeatVector(1)(CNNToEmbeddings))
    # input with the tokenized words
    inputWords = Input(shape=(max_length,))
    # embed each word id into a vector of size embed_size (id 0 is masked)
    wordsEmbedding = Embedding(vocab_size, embed_size, mask_zero=True)(inputWords)
    # concatenate along the time axis: the first LSTM step receives the image,
    # the following steps receive the word embeddings
    concateateInputs = concatenate([final_cnn, wordsEmbedding], axis=1)
    # an additional dropout layer for further regularization
    droupoutLSTM = Dropout(dropout_rate)(concateateInputs)
    LSTMLayer = LSTM(lstm_units)(droupoutLSTM)
    # probability distribution over the vocabulary
    outputs = Dense(vocab_size, activation='softmax')(LSTMLayer)
    model = Model(inputs=[inputFromCNN, inputWords], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # summary() prints by itself and returns None, so it must not be wrapped in print()
    model.summary()
    return model

In [99]:
def data_generator(descriptions, imageFeatures, tokenizer, max_length, repeat=False):
    """Yield one training batch per image: ``((image features, prefixes), targets)``.

    Args:
        descriptions: dict mapping image name to its list of descriptions.
        imageFeatures: dict mapping image name to its stored feature array;
            element ``[0]`` of that array is used as the feature vector
            (presumably the array has a leading batch axis of size 1 -- confirm).
        tokenizer: fitted tokenizer passed on to ``create_sequences``.
        max_length: padding length passed on to ``create_sequences``.
        repeat: when False (default) the generator stops after a single pass
            over ``descriptions``; when True it starts over indefinitely so it
            can feed any number of epochs.

    Yields:
        Tuples ``((inputImage, inputText), output)`` of numpy arrays.
    """
    while True:
        for key, descriptionList in descriptions.items():
            # retrieve the feature vector for the current image
            imageFeature = imageFeatures[key][0]
            # all (prefix, next word) samples of this image form one batch
            inputImage, inputText, output = create_sequences(
                descriptionList, imageFeature, tokenizer, max_length)
            yield ((inputImage, inputText), output)
        # stop after one pass unless asked to loop; never spin on empty input
        if not repeat or not descriptions:
            break

In [100]:
# flatten the dictionary of training descriptions into one list to be fed into the tokenizer
listFromDict = [d for desc_list in train_descriptions.values() for d in desc_list]
# tokenize all words (the machine learning model is going to need numbers as input)
tokenizer = Tokenizer()
tokenizer.fit_on_texts(listFromDict)
# +1 because word ids start at 1 and id 0 is left for padding
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)
# maximum length of a description in the training dataset (size needed when creating the model)
max_length = max(len(d.split()) for d in listFromDict)
print('Description Length: %d' % max_length)

Vocabulary Size: 7633
Description Length: 38


In [103]:
# use function defined above to create the model
model = create_model(vocab_size, max_length)
# The generator makes a single pass over the training images, so that one pass
# is split into `epochs` equally sized chunks which Keras reports as epochs.
# (The previous `epochs = 20` assignment was never used; fit() ran with 16.)
epochs = 16
# steps_per_epoch must be a whole number of batches (one batch per image)
steps_per_epoch = max(1, len(train_descriptions) // epochs)
generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
# batch_size is not passed: the generator already yields complete batches
model.fit(generator, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=1)
model.save('finalModel.h5')

Model: "functional_65"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_40 (InputLayer)           [(None, 1000)]       0                                            
__________________________________________________________________________________________________
dropout_12 (Dropout)            (None, 1000)         0           input_40[0][0]                   
__________________________________________________________________________________________________
dense_12 (Dense)                (None, 256)          256256      dropout_12[0][0]                 
__________________________________________________________________________________________________
repeat_vector_6 (RepeatVector)  (None, 1, 256)       0           dense_12[0][0]                   
______________________________________________________________________________________

KeyboardInterrupt: 

In [107]:
from numpy import argsort
# maps a token id back to its word (despite the name, the lookup goes id -> word)
def word_to_id(integer, tokenizer):
    """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

    Args:
        integer: token id to look up.
        tokenizer: object exposing a ``word_index`` dict (word -> id).

    Returns:
        The first matching word, or None when no word has that id.
    """
    matches = (word for word, index in tokenizer.word_index.items() if index == integer)
    return next(matches, None)
def beam_search_predictions(model, tokenizer, image, max_len, beam_index):
    """Generate a caption for ``image`` with beam search.

    Starting from the 'startseq' token, every kept sequence is extended with
    its ``beam_index`` most probable next words; the ``beam_index`` best-scoring
    sequences survive each round until the sequences reach ``max_len`` tokens.

    Args:
        model: trained captioning model; ``predict([image, padded_ids])`` must
            return a distribution over word ids.
        tokenizer: fitted tokenizer (``texts_to_sequences`` and ``word_index``).
        image: feature array of the image, passed to the model unchanged.
        max_len: maximum sequence length (also the padding length).
        beam_index: beam width.

    Returns:
        The best caption as a string, without the start marker and cut at the
        first 'endseq'. Ids that have no word are skipped.
    """
    # token ids of the start marker
    start = tokenizer.texts_to_sequences(['startseq'])[0]
    # each candidate is [list of token ids, accumulated score]
    start_word = [[start, 0.0]]
    # never run more prediction rounds than the longest sequence allows
    while len(start_word[0][0]) < max_len:
        temp = []
        for s in start_word:
            # NOTE(review): padding='post' here while create_sequences relies on
            # pad_sequences' default side -- confirm the difference is intended.
            paddedSeq = pad_sequences([s[0]], maxlen=max_len, padding='post')
            # predict the next word from the image and the current sequence
            predictions = model.predict([image, paddedSeq], verbose=0)
            # ids of the beam_index most probable next words
            bestPredictions = argsort(predictions[0])[-beam_index:]
            for word in bestPredictions:
                # copy so sibling candidates do not share one list
                currentSeq, prob = s[0][:], s[1]
                currentSeq.append(word)
                # NOTE(review): scores are sums of probabilities and candidates
                # keep growing after 'endseq'; summed log-probabilities with
                # early stopping is the usual formulation -- left unchanged so
                # results stay comparable.
                prob += predictions[0][word]
                temp.append([currentSeq, prob])
        # keep the beam_index candidates with the highest score (ascending sort)
        start_word = sorted(temp, reverse=False, key=lambda l: l[1])[-beam_index:]

    # best candidate is last after the ascending sort
    bestPrediction = start_word[-1][0]

    final_caption = []
    # skip the first token (the start marker) and stop at the end marker
    for token in bestPrediction[1:]:
        word = word_to_id(token, tokenizer)
        if word == 'endseq':
            break
        # an id without a word (e.g. the padding id 0) would make join() fail
        if word is not None:
            final_caption.append(word)

    return ' '.join(final_caption)


In [109]:
from keras.models import load_model
# computes the feature vector of a single image file
def generateFeatureVector(filename):
    """Load one image and return the ResNet50 prediction for it.

    Args:
        filename: path of the image file.

    Returns:
        The array returned by ``model.predict`` for the preprocessed image.
    """
    network = ResNet50()
    # meant to remove the last layer of the ResNet50 model
    # NOTE(review): may be a no-op on some Keras versions -- confirm
    network.layers.pop()
    extractor = Model(inputs=network.inputs, outputs=network.layers[-1].output)

    # load at 224x224 and convert the pixels to an array
    pixels = img_to_array(load_img(filename, target_size=(224, 224)))
    # prepend a batch axis of size one, then apply the resnet preprocessing
    height, width, channels = pixels.shape[0], pixels.shape[1], pixels.shape[2]
    batch = preprocess_input(pixels.reshape((1, height, width, channels)))
    # inference without a progress bar
    return extractor.predict(batch, verbose=0)


# load a trained captioning model from disk
# NOTE(review): the training cell in this notebook saves 'finalModel.h5'; no
# visible cell writes 'model_19.h5' -- confirm which file is intended.
model = load_model('model_19.h5')
photoPath = "C:\\Users\\brace\\Downloads\\Flickr8k_Dataset\\Flicker8k_Dataset\\3222055946_45f7293bb2.jpg"
# get the feature vector from the image
photo = generateFeatureVector(photoPath)
# generate a description with beam search (beam width 5) and print it
description = beam_search_predictions(model, tokenizer, photo, max_length, 5)
print(description)

a group of young boys playing soccer on a sunny day


In [114]:
from nltk.translate.bleu_score import corpus_bleu
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    """Caption every image with beam search (width 5) and print corpus BLEU scores.

    Prints a running counter per image, then four scores: BLEU-1, BLEU-2,
    BLEU-3 and BLEU-4 (NLTK default weights).

    Args:
        model: trained captioning model.
        descriptions: dict mapping image name to reference descriptions that
            still carry the 'startseq' / 'endseq' markers.
        photos: dict mapping image name to the image feature array.
        tokenizer: fitted tokenizer used for decoding.
        max_length: maximum caption length passed to the beam search.
    """
    # references per image and the matching predicted token lists
    actual, predicted = list(), list()
    for counter, (key, desc_list) in enumerate(descriptions.items(), start=1):
        prediction = beam_search_predictions(model, tokenizer, photos[key], max_length, 5)
        # reference descriptions without the startseq and endseq tokens
        actual.append([d.split()[1:-1] for d in desc_list])
        predicted.append(prediction.split())
        # progress
        print(counter)
    # calculate BLEU score for 1, 2, 3 and 4 GRAM
    print(corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print(corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    # exact thirds so the weights sum to 1 (0.33 * 3 only reaches 0.99)
    print(corpus_bleu(actual, predicted, weights=(1 / 3, 1 / 3, 1 / 3, 0)))
    print(corpus_bleu(actual, predicted))

In [None]:
# load test dataset: the image names, then their cleaned descriptions
testDataset = "C:\\Users\\brace\\Downloads\\Flickr8k_text\\Flickr_8k.testImages.txt"
test = load_dataset(testDataset)
test_descriptions = filter_processed_descriptions('descriptions.txt', test)
# load test dataset (pre-processed image features)
test_features = filter_image_features('features.pkl', test)

# load the model and evaluate using the test dataset
# NOTE: `filename` is reused here for the model file; earlier it held the
# path of the training image list.
filename = 'model_19.h5'
model = load_model(filename)
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
