In [None]:
# to navigate through directories
import os 
import pickle
import string 
import tensorflow
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import add
from keras.models import Model,load_model
# ModelCheckpoint is a class that is used to save weights of a model during training process
from keras.callbacks import ModelCheckpoint 
# Tokenizer class is used to preprocess text data and convert text corpus into integer tokens
# Which can then be passed to a NN for training
from keras.preprocessing.text import Tokenizer
# to_categorical converts class vectors into class matrices
from keras.utils import to_categorical,plot_model
# pad_sequences pads sequences of integer tokens to the same length;
# often used with text data where the length of sequences may vary.
# This function is used after the "Tokenizer" class
from keras_preprocessing.sequence import pad_sequences
from keras.applications.vgg16 import VGG16,preprocess_input
from keras.layers import Input,Dense,LSTM,Embedding,Dropout
# load_img loads an image from a file and returns it as a PIL image,
# resized to target_size when that argument is given
# img_to_array converts a PIL image into a NumPy array of shape
# (height, width, channels) by default
from keras_preprocessing.image import img_to_array,load_img
from nltk.translate.bleu_score import sentence_bleu,corpus_bleu
%matplotlib inline

In [None]:
def extract_features(directory):
  '''
  Extract a VGG16 feature vector for every image file in `directory`.

  The network is cut at its second-to-last layer, so each image is described
  by the activations that feed the classifier instead of class probabilities.

  Args:
    directory: path of a folder; every entry in it is loaded as an image.

  Returns:
    dict mapping image id (file name up to its first '.') to the array
    returned by `model.predict` for that image (leading batch axis of 1 kept).
  '''
  base_model = VGG16()
  # `model.layers.pop()` does not rewire the graph, so `layers[-1].output` was
  # still the final classification layer. Select the penultimate layer
  # explicitly; define_model() declares a 4096-wide image input.
  model = Model(inputs=base_model.inputs,outputs=base_model.layers[-2].output)
  # summary() prints on its own and returns None, so it is not wrapped in print()
  model.summary()
  features = {}
  for i,name in enumerate(os.listdir(directory)):
    print(i)
    img = load_img(os.path.join(directory,name),target_size=(224,224))
    img = img_to_array(img)
    # add the batch axis expected by model.predict
    img = img.reshape((1,img.shape[0],img.shape[1],img.shape[2]))
    img = preprocess_input(img)
    feature = model.predict(img,verbose=0)
    img_id = name.split('.')[0]
    features[img_id] = feature
  return features
directory ='drive/My Drive/image_captioning/Flicker8k/Flicker8k_Dataset'
features = extract_features(directory)
# Persist the features: later cells read this exact pickle through
# load_image_features, but nothing in the notebook wrote it.
with open('drive/My Drive/image_captioning/Flicker_dataset_image_features.pkl','wb') as features_file:
  pickle.dump(features,features_file)

In [None]:
def load_description(filename):
  '''
  Read a captions file and group the captions by image id.

  Each line is split on whitespace. The first token, cut at its first '.',
  is the image id; the remaining tokens joined by single spaces form the
  caption. Lines with fewer than two tokens (blank, whitespace-only, or an
  id without a caption) are skipped.

  Args:
    filename: path of the captions text file.

  Returns:
    dict mapping image id -> list of caption strings, in file order.
  '''
  mappings = {}
  # the with-block closes the file even if parsing raises
  with open(filename,'r') as file:
    for line in file:
      tokens = line.split()
      # count tokens, not characters: a whitespace-only line has no tokens[0]
      if len(tokens)<2:
        continue
      image_id = tokens[0].split('.')[0]
      mappings.setdefault(image_id,[]).append(' '.join(tokens[1:]))
  return mappings

def clean_description(descriptions):
  '''
  Normalise every caption in `descriptions` in place.

  Each caption is split on whitespace; every word is lower-cased and stripped
  of punctuation, then words of length 1 or containing non-alphabetic
  characters are dropped. The surviving words are re-joined with single
  spaces and written back to the same list position. Returns None.
  '''
  strip_punctuation = str.maketrans('','',string.punctuation)
  for captions in descriptions.values():
    for position,caption in enumerate(captions):
      normalised = (word.lower().translate(strip_punctuation) for word in caption.split())
      kept = [word for word in normalised if len(word)>1 and word.isalpha()]
      captions[position] = ' '.join(kept)
    
def create_corpus(descriptions):
  '''
  Collect the set of distinct words used across all captions.

  Every caption of every key is split on whitespace and its words are merged
  into one set, so duplicates collapse automatically.

  Returns:
    set of unique words.
  '''
  vocabulary = set()
  for captions in descriptions.values():
    for caption in captions:
      vocabulary.update(caption.split())
  return vocabulary


def save_descriptions(desc,filename):
  '''
  Write the captions to `filename`, one "<image_id> <caption>" per line.

  Lines are separated by '\\n' with no trailing newline after the last one.

  Args:
    desc: dict mapping image id -> list of caption strings.
    filename: path of the text file to create or overwrite.
  '''
  lines = [k+' '+description for k,v in desc.items() for description in v]
  # the with-block guarantees the handle is flushed and closed even if write() raises
  with open(filename,'w') as file:
    file.write('\n'.join(lines))
# load all descriptions, grouped by image id
filename = 'drive/My Drive/image_captioning/Flicker8k/Flickr8k.token.txt'
descriptions = load_description(filename)
print('Descriptions loaded: ',len(descriptions))
# clean the loaded descriptions (modifies `descriptions` in place)
clean_description(descriptions)
# check the vocabulary length after cleaning
vocabulary = create_corpus(descriptions)
print('Vocabulary length: ',len(vocabulary))
# write the cleaned captions to the file that load_clean_descriptions reads later
save_descriptions(descriptions,'drive/My Drive/image_captioning/descriptions.txt')
print('SAVED !!!')

In [None]:
def load_set_of_image_ids(filename):
  '''
  Read a file listing one image file name per line and return the image ids.

  The id is the part of the line before its first '.'. Surrounding whitespace
  is removed and blank lines are skipped.

  Args:
    filename: path of the list file.

  Returns:
    set of image id strings.
  '''
  image_ids = set()
  with open(filename,'r') as file:
    for line in file:
      line = line.strip()
      # lines read from a file keep their '\n', so test emptiness after stripping
      if not line:
        continue
      image_ids.add(line.split('.')[0])
  return image_ids


def load_clean_descriptions(all_desc,train_desc_names):
  '''
  Load the captions of a chosen subset of images and wrap them in markers.

  Each line of the file is split on whitespace. The first token, cut at its
  first '.', is the image id and the remaining tokens are the caption. Only
  ids contained in `train_desc_names` are kept. Every kept caption is stored
  as 'startseq <caption> endseq'. Blank lines are ignored.

  Args:
    all_desc: path of the captions file ("<image_id> <caption>" per line).
    train_desc_names: collection of image ids to keep.

  Returns:
    dict mapping image id -> list of wrapped caption strings.
  '''
  descriptions = {}
  # the with-block closes the file; the original left the handle open
  with open(all_desc,'r') as file:
    for line in file:
      tokens = line.split()
      # a blank line has no tokens[0]
      if not tokens:
        continue
      image_id,image_desc = tokens[0].split('.')[0],tokens[1:]
      if image_id in train_desc_names:
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
        descriptions.setdefault(image_id,[]).append(desc)
  return descriptions

def load_image_features(filename,dataset):
  '''
  Load pickled image features and keep only the requested image ids.

  Args:
    filename: path of a pickle file holding a dict image id -> feature.
    dataset: iterable of image ids to select.

  Returns:
    dict with one entry per id in `dataset`.

  Raises:
    KeyError: if an id in `dataset` is missing from the pickled dict.
  '''
  # NOTE: unpickling can execute arbitrary code; only open files you created yourself
  with open(filename,'rb') as file:
    all_features = pickle.load(file)
  return {k:all_features[k] for k in dataset}

# load the ids of the training images
train = 'drive/My Drive/image_captioning/Flicker8k/Flickr_8k.trainImages.txt'
train_image_ids = load_set_of_image_ids(train)
print('Training images found: ',len(train_image_ids))

# load the cleaned captions of the training images (wrapped in startseq/endseq)
train_descriptions = load_clean_descriptions('drive/My Drive/image_captioning/descriptions.txt',train_image_ids)
print('training descriptions loaded: ',len(train_descriptions))

# load the pickled image features of the training images
train_features = load_image_features('drive/My Drive/image_captioning/Flicker_dataset_image_features.pkl',train_image_ids)
print('training features loaded: ',len(train_features))

# NOTE(review): as the last expression of the cell this displays the whole
# dictionary; consider showing only a few entries
train_descriptions

In [None]:
def to_list(descriptions):
  '''
  Flatten a dict of caption lists into a single list of captions,
  keeping dictionary order and the order inside each list.
  '''
  return [caption for captions in descriptions.values() for caption in captions]


def tokenization(descriptions):
  '''
  Fit a Keras Tokenizer on every caption in `descriptions`.

  The dict is flattened with to_list() and handed to fit_on_texts, which
  builds the word -> integer mapping. Returns the fitted Tokenizer.
  '''
  caption_tokenizer = Tokenizer()
  caption_tokenizer.fit_on_texts(to_list(descriptions))
  return caption_tokenizer

# create the tokenizer from the training captions
tokenizer = tokenization(train_descriptions)

# word_index is the dictionary mapping word --> integer. Integers are
# assigned so that the most frequently occurring words get the lowest values,
# starting at 1. Index 0 is reserved and never assigned to a word (it is the
# value pad_sequences pads with), hence the +1 so that every index up to
# len(word_index) is a valid class.
vocab_size = len(tokenizer.word_index)+1
print('Vocab size: ',vocab_size)

def max_length(descriptions):
  '''
  Return the word count of the longest caption in `descriptions`
  (captions are split on whitespace).
  '''
  word_counts = (len(caption.split()) for caption in to_list(descriptions))
  return max(word_counts)


def create_sequences(tokenizer,desc_list,max_len,photo,num_classes=None):
  '''
  Expand the captions of one image into next-word training samples.

  A caption encoded as w1..wn yields n-1 samples: the prefix w1..wi (padded
  to `max_len`) as input and w(i+1), one-hot encoded, as target. The same
  `photo` feature is paired with every sample.

  Args:
    tokenizer: fitted tokenizer providing texts_to_sequences and word_index.
    desc_list: list of caption strings for one image.
    max_len: length the input prefixes are padded to.
    photo: feature vector of the image.
    num_classes: width of the one-hot target. Defaults to
      len(tokenizer.word_index)+1, so the function no longer depends on a
      notebook-level `vocab_size` that may belong to a different tokenizer.

  Returns:
    tuple of three arrays: photo features, padded prefixes, one-hot targets.
  '''
  if num_classes is None:
    num_classes = len(tokenizer.word_index)+1
  # X1: photo feature, X2: padded caption prefix, y: one-hot next word
  X1,X2,y = [],[],[]
  for desc in desc_list:
    # encode the caption as a list of integers
    seq = tokenizer.texts_to_sequences([desc])[0]
    for i in range(1,len(seq)):
      # the target is the word that follows the prefix
      in_seq,out_seq = seq[:i],seq[i]
      in_seq = pad_sequences([in_seq],maxlen=max_len)[0]
      out_seq = to_categorical([out_seq],num_classes=num_classes)[0]
      X1.append(photo)
      X2.append(in_seq)
      y.append(out_seq)
  return np.array(X1),np.array(X2),np.array(y)

# word count of the longest training caption (startseq/endseq included);
# used as the padded input length of the model
max_len = max_length(train_descriptions)
print(max_len)

In [None]:
def data_generator(descriptions,photos,tokenizer,max_len):
  '''
  Endlessly yield one training batch per image.

  For every image id the captions are expanded with create_sequences and the
  result is yielded as ([image_features, caption_prefixes], next_words).
  After the last image the loop restarts from the first one.

  Args:
    descriptions: dict image id -> list of captions.
    photos: dict image id -> feature array; row 0 is used as the image vector.
    tokenizer: fitted tokenizer passed on to create_sequences.
    max_len: padded prefix length passed on to create_sequences.
  '''
  while True:
    for k,desc_list in descriptions.items():
      photo = photos[k][0]
      in_img,in_seq,out_seq = create_sequences(tokenizer,desc_list,max_len,photo)
      # must be a tuple: Keras treats any non-tuple yielded by a generator as
      # inputs only, which would leave the model without targets
      yield ([in_img,in_seq],out_seq)

def define_model(vocab_size, max_length):
    '''
    Build and compile the two-input captioning network.

    One branch maps a 4096-wide image feature to 256 units; the other embeds a
    caption prefix of `max_length` token ids and runs it through an LSTM. The
    two 256-wide outputs are added and decoded into a softmax over the
    vocabulary.

    Args:
        vocab_size: number of output classes and embedding input dimension.
        max_length: length of the padded caption prefix.

    Returns:
        the compiled Keras model (categorical cross-entropy, Adam).
    '''
    # image feature branch
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # caption prefix branch
    inputs2 = Input(shape=(max_length,))
    # Embedding(input_dim, output_dim): input_dim is the vocabulary size and
    # output_dim the size of the vectors the words are embedded into.
    # mask_zero=True marks token id 0 as padding so the LSTM skips those steps.
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # decoder: merge both branches and predict the next word
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together: [image, seq] -> word
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # summary() prints on its own and returns None; wrapping it in print()
    # added a stray "None" line to the output
    model.summary()
    return model

In [None]:
model = define_model(vocab_size,max_len)
epochs = 20
# one generator step handles all captions of one image
steps = len(train_descriptions)
for i in range(epochs):
  # a fresh generator per epoch so every epoch starts at the first image
  generator = data_generator(train_descriptions,train_features,tokenizer,max_len)
  # Model.fit accepts generators directly; fit_generator is deprecated and
  # no longer exists in recent Keras releases
  model.fit(generator,epochs=1,steps_per_epoch=steps,verbose=1)
  # keep one checkpoint per epoch
  model.save('drive/My Drive/image_captioning/model_'+str(i)+'.h5')

In [None]:
def int2word(tokenizer,integer):
  '''
  Return the first word in tokenizer.word_index whose index equals
  `integer`, or None when no word has that index.
  '''
  matches = (word for word,index in tokenizer.word_index.items() if index==integer)
  return next(matches,None)

def predict_desc(model,tokenizer,photo,max_len):
  '''
  Generate a caption for `photo` one word at a time.

  Starting from 'startseq', the caption so far is encoded, padded to
  `max_len` and fed to the model together with the photo; the word with the
  highest score is appended. Generation stops after `max_len` words, when
  'endseq' is produced, or when the predicted index maps to no word.

  Returns:
    the generated text, beginning with 'startseq'.
  '''
  caption = 'startseq'
  for _ in range(max_len):
    encoded = tokenizer.texts_to_sequences([caption])[0]
    padded = pad_sequences([encoded],maxlen=max_len)
    scores = model.predict([photo,padded],verbose=0)
    next_word = int2word(tokenizer,np.argmax(scores))
    if next_word is None:
      break
    caption = caption+' '+next_word
    if next_word=='endseq':
      break
  return caption

def evaluate_model(model,descriptions,photos,tokenizer,max_len):
  '''
  Caption every image in `descriptions` and print corpus BLEU-1..4.

  For each image id the caption generated by predict_desc is compared against
  all reference captions of that image; both sides are split on whitespace.
  The four scores are printed, nothing is returned.
  '''
  actual = []
  predicted = []
  for key,desc in descriptions.items():
    generated = predict_desc(model,tokenizer,photos[key],max_len)
    actual.append([reference.split() for reference in desc])
    predicted.append(generated.split())
  # message template paired with the n-gram weights passed to corpus_bleu
  reports = (
    ('BLEU-1: %f',(1.0,0,0,0)),
    ('BLEU-2: %f',(0.5,0.5,0,0)),
    ('BLEU-3: %f',(0.33,0.33,0.33,0)),
    ('BLEU-4: %f',(0.25,0.25,0.25,0.25)),
  )
  for template,weights in reports:
    print(template %corpus_bleu(actual,predicted,weights=weights))

In [None]:
####################  load training data (6k)  ##########################
# the tokenizer and max_len are rebuilt from the training captions so that
# the evaluation uses the same encoding as training
train = 'drive/My Drive/image_captioning/Flicker8k/Flickr_8k.trainImages.txt'
train_image_ids = load_set_of_image_ids(train)
print('Training images found: ',len(train_image_ids))

# load training descriptions
train_descriptions = load_clean_descriptions('drive/My Drive/image_captioning/descriptions.txt',train_image_ids)
print('training descriptions loaded: ',len(train_descriptions))

tokenizer = tokenization(train_descriptions)
max_len = max_length(train_descriptions)

####################  load test data  ##########################
test = 'drive/My Drive/image_captioning/Flicker8k/Flickr_8k.testImages.txt'
test_image_ids = load_set_of_image_ids(test)
print('Test images found: ',len(test_image_ids))

# load test descriptions
test_descriptions = load_clean_descriptions('drive/My Drive/image_captioning/descriptions.txt',test_image_ids)
print('test descriptions loaded: ',len(test_descriptions))

# load test image features
test_features = load_image_features('drive/My Drive/image_captioning/Flicker_dataset_image_features.pkl',test_image_ids)
# the message previously said "training" although these are the test features
print('test features loaded: ',len(test_features))
#################################################################
# evaluate the checkpoint saved after epoch index 18
filename = 'drive/My Drive/image_captioning/model_18.h5'
model = load_model(filename)
evaluate_model(model,test_descriptions,test_features,tokenizer,max_len)

In [None]:
# image to caption; read it with matplotlib and display it in the cell output
img_to_test = 'drive/My Drive/image_captioning/983801190.jpg'
img = plt.imread(img_to_test)
plt.imshow(img)

def extract_features(filename):
    '''
    Extract the VGG16 feature vector of a single image file.

    NOTE: this definition replaces the directory-based extract_features
    defined earlier in the notebook; after this cell runs, the name refers to
    the single-file version.

    Args:
        filename: path of the image to load.

    Returns:
        the array returned by `model.predict` for the image (leading batch
        axis of 1 kept).
    '''
    # load the model
    base_model = VGG16()
    # `model.layers.pop()` does not rewire the graph, so `layers[-1].output`
    # was still the final classification layer. Take the penultimate layer
    # explicitly; define_model() declares a 4096-wide image input.
    model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)
    # load the photo at the input size VGG16 expects
    image = load_img(filename, target_size=(224, 224))
    # convert the image pixels to a numpy array
    image = img_to_array(image)
    # add the batch axis expected by model.predict
    image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
    # prepare the image for the VGG model
    image = preprocess_input(image)
    # get features
    feature = model.predict(image, verbose=0)
    return feature

# Reuse the sequence length computed from the training captions (`max_len`).
# Assigning an int to the name `max_length` would overwrite the max_length()
# helper and break any later re-run of the cells that call it.
# load the model
model = load_model('drive/My Drive/image_captioning/model_18.h5')
# load and prepare the photograph
photo = extract_features(img_to_test)
# generate description
description = predict_desc(model, tokenizer, photo, max_len)

# drop the start/end markers; the end marker is absent when generation
# stopped at the length limit, so only remove it when it is really there
words = description.split()
if words and words[0] == 'startseq':
    words = words[1:]
if words and words[-1] == 'endseq':
    words = words[:-1]
description = ' '.join(words)
print()
print(description)