#CS550 Project - Image Captioning

In [1]:
#Importing Necessary Libraries

from os import listdir
from pickle import dump
import tensorflow as tf
from zipfile import ZipFile
from keras.applications.vgg16 import VGG16
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.models import Model

In [3]:
# check gpu connectivity

tf.test.gpu_device_name()

'/device:GPU:0'

In [4]:
# Function to extract features from each photo in the directory

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model
from os import listdir

def extract_features(image_directory):
    base_model = VGG16(weights='imagenet')
    feature_extractor = Model(inputs=base_model.input, outputs=base_model.get_layer('block5_pool').output)

    extracted_features = dict()

    for image_name in listdir(image_directory):
        image_path = image_directory + '/' + image_name
        image = load_img(image_path, target_size=(224, 224))
        image_array = img_to_array(image)
        image_array = image_array.reshape((1, image_array.shape[0], image_array.shape[1], image_array.shape[2]))
        preprocessed_image = preprocess_input(image_array)

        features = feature_extractor.predict(preprocessed_image, verbose=0)
        image_id = image_name.split('.')[0]
        extracted_features[image_id] = features

    return extracted_features

In [5]:
file_name = "/content/drive/MyDrive/ML_Project/Images.zip"
with ZipFile(file_name,'r') as zip:
  zip.extractall("/content/Flicker8k")
  print('Dataset Loaded')

Dataset Loaded


In [6]:
#Extracting features from the actual dataset

directory = '/content/Flicker8k'
features = extract_features(directory)
print('Extracted Features: %d' % len(features))

dump(features, open('features.pkl', 'wb'))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
Extracted Features: 8091


In [7]:
#Loading the descriptions given the document

import string

def read_document(filename):
    with open(filename, 'r') as file:
        document = file.read()
    return document

def parse_descriptions(document):
    descriptions_mapping = dict()
    for line in document.split('\n'):
        tokens = line.split()
        if len(tokens) < 2:
            continue

        image_id, image_desc = tokens[0], tokens[1:]
        image_id = image_id.split('.')[0]
        image_desc = ' '.join(image_desc)

        if image_id not in descriptions_mapping:
            descriptions_mapping[image_id] = list()
        descriptions_mapping[image_id].append(image_desc)

    return descriptions_mapping

In [8]:
#cleaning the descriptions

import string

def clean_descriptions(descriptions):
    punctuation_table = str.maketrans('', '', string.punctuation)
    for image_id, desc_list in descriptions.items():
        for i in range(len(desc_list)):
            description = desc_list[i]
            words = description.split()
            words = [word.lower() for word in words]
            words = [word.translate(punctuation_table) for word in words]
            words = [word for word in words if len(word) > 1]
            words = [word for word in words if word.isalpha()]
            desc_list[i] = ' '.join(words)

def create_vocabulary(descriptions):
    vocabulary = set()
    for key in descriptions.keys():
        [vocabulary.update(description.split()) for description in descriptions[key]]
    return vocabulary

def save_descriptions(descriptions, filename):
	lines = list()
	for key, desc_list in descriptions.items():
		for desc in desc_list:
			lines.append(key + ' ' + desc)
	data = '\n'.join(lines)
	file = open(filename, 'w')
	file.write(data)
	file.close()

In [9]:
#using the aove defined functions

filename = '/content/drive/MyDrive/ML_Project/captions.txt'
doc = read_document(filename)
descriptions = parse_descriptions(doc)
print('Loaded: %d ' % len(descriptions))

clean_descriptions(descriptions)
vocabulary = create_vocabulary(descriptions)
print('Vocabulary Size: %d' % len(vocabulary))

save_descriptions(descriptions, 'descriptions.txt')

Loaded: 8091 
Vocabulary Size: 8680


In [34]:
from pickle import load
from keras.preprocessing.text import Tokenizer

#tokenizing the descriptions

def load_image_ids(filename):
    document = read_document(filename)
    image_ids = set()
    for line in document.split('\n'):
        if len(line) < 1:
            continue
        identifier = line.split('.')[0]
        image_ids.add(identifier)
    return image_ids

def load_descriptions(filename, image_ids):
    document = read_document(filename)
    descriptions = dict()
    for line in document.split('\n'):
        tokens = line.split()
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in image_ids:
            if image_id not in descriptions:
                descriptions[image_id] = list()
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions

def load_image_features(filename, image_ids):
    all_features = load(open(filename, 'rb'))
    features = {k: all_features[k] for k in image_ids}
    return features

def max_length(descriptions):
    lines = to_lines(descriptions)
    return max(len(d.split()) for d in lines)

def to_lines(descriptions):
    all_desc = list()
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

def word_for_id(integer, tokenizer):
	for word, index in tokenizer.word_index.items():
		if index == integer:
			return word
	return None

def create_description_tokenizer(descriptions):
    lines = to_lines(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

In [37]:
filename = '/content/drive/MyDrive/ML_Project/Flickr_8k.trainImages.txt'
train = load_image_ids(filename)
print('Dataset: %d' % len(train))

train_descriptions = load_descriptions('descriptions.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))

max_len = max_length(train_descriptions)
print('Description Length: %d' % max_len)

train_features = load_image_features('features.pkl', train)
print('Photos: train=%d' % len(train_features))

tokenizer = create_description_tokenizer(train_descriptions)
dump(tokenizer, open('tokeniazer.pkl', 'wb'))
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

print("Traning set loaded")

filename = '/content/drive/MyDrive/ML_Project/Flickr_8k.testImages.txt'
test = load_image_ids(filename)
print('Dataset: %d' % len(test))
# descriptions
test_descriptions = load_descriptions('descriptions.txt', test)
print('Descriptions: test=%d' % len(test_descriptions))
# photo features
test_features = load_image_features('features.pkl', test)
print('Photos: test=%d' % len(test_features))
print("Test set loaded")

Dataset: 6000
Descriptions: train=6000
Description Length: 33
Photos: train=6000
Vocabulary Size: 7507
Traning set loaded
Dataset: 1000
Descriptions: test=1000
Photos: test=1000
Test set loaded


In [13]:
from numpy import array
import tensorflow
from pickle import load
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.layers import add
from keras.callbacks import ModelCheckpoint
from numpy import argmax
from pickle import load
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from nltk.translate.bleu_score import corpus_bleu
from keras.layers import concatenate
from keras.preprocessing.sequence import pad_sequences
import numpy as np

In [32]:
#defining the LSTM Model

def create_sequences(tokenizer, max_length, desc_list, photo):
    X1, X2, y = list(), list(), list()
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]

            X1.append(photo)
            X2.append(in_seq)
            y.append(out_seq)

    return array(X1), array(X2), array(y)


def define_model(vocab_size, max_length):

	inputs1 = Input(shape=(1000,))
	fe1 = Dropout(0.5)(inputs1)
	fe2 = Dense(256, activation='relu')(fe1)

	inputs2 = Input(shape=(max_length,))
	se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
	se2 = Dropout(0.5)(se1)
	se3 = LSTM(256)(se2)

	decoder1 = add([fe2, se3])
	decoder2 = Dense(256, activation='relu')(decoder1)
	outputs = Dense(vocab_size, activation='softmax')(decoder2)

	model = Model(inputs=[inputs1, inputs2], outputs=outputs)
	model.compile(loss='categorical_crossentropy', optimizer='adam')

	#print(model.summary())
	return model

In [18]:
def data_generator(descriptions, photos, tokenizer, max_length):
	while 1:
		for key, desc_list in descriptions.items():
			photo = photos[key][0]
			in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo)
			yield [[in_img, in_seq], out_word]

In [None]:
#Training the model

model = define_model(vocab_size, max_len)
epochs = 20
steps = len(train_descriptions)
print("length is :" ,steps)
for i in range(epochs):
	generator = data_generator(train_descriptions, train_features, tokenizer, max_len)
	model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
	model.save('model_' + str(i) + '.h5')

In [40]:
#adding starts and end keywords

def generate_desc(model, tokenizer, photo, max_length):
	in_text = 'startseq'

	for i in range(max_length):

		sequence = tokenizer.texts_to_sequences([in_text])[0]
		sequence = pad_sequences([sequence], maxlen=max_length)
		yhat = model.predict([photo,sequence], verbose=0)
		yhat = argmax(yhat)
		word = word_for_id(yhat, tokenizer)
		if word is None:
			break
		in_text += ' ' + word
		if word == 'endseq':
			break
	return in_text

In [41]:
# Evaluate the skill of the model
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
	actual, predicted = list(), list()
	for key, desc_list in descriptions.items():
		yhat = generate_desc(model, tokenizer, photos[key], max_length)
		references = [d.split() for d in desc_list]
		actual.append(references)
		predicted.append(yhat.split())

	print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
	print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
	print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
	print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [None]:
filename = 'model_0.h5'
model = load_model(filename)
evaluate_model(model, test_descriptions, test_features, tokenizer, max_len)

BLEU-1: 0.566379
BLEU-2: 0.295063
BLEU-3: 0.194548
BLEU-4: 0.081796


In [None]:
#testing the model on a image

tokenizer = load(open('tokeniazer.pkl', 'rb'))

max_length = 33
model = load_model('model_0.h5')

def extract_feature(filename):
	model1 = VGG16()
	model1.layers.pop()
	model1 = Model(inputs=model1.inputs, outputs=model1.layers[-1].output)
	image = load_img(filename, target_size=(224, 224))
	image = img_to_array(image)
	image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
	image = preprocess_input(image)
	feature = model1.predict(image, verbose=0)
	return feature

photo = extract_feature('image.jpg')
description = generate_desc(model, tokenizer, photo, max_length)

query = description
stopwords = ['startseq','endseq']
querywords = query.split()
resultwords  = [word for word in querywords if word.lower() not in stopwords]
result = ' '.join(resultwords)

print(result)