In [None]:
import os
import pickle
import re
import numpy as np
# progress bar that shows how much of the data has been processed so far
from tqdm import tqdm as tqdm

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add


In [None]:
# Root of the input dataset and the directory where generated artifacts are written
BASE_DIR = '/kaggle/input/flicker-8k-image-dataset-captionstxt'
WORKING_DIR = '/kaggle/working'

## Extract image features

In [None]:
# load VGG16 model
model = VGG16()

# restructure the model: drop the final classification layer and expose the
# output of the penultimate layer, which is used as the image feature vector
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)

# summarize -- summary() prints the table itself and returns None, so wrapping
# it in print() would only add a stray "None" to the output
model.summary()

In [None]:
# extract features from every image in the dataset

features = {}
directory = os.path.join(BASE_DIR, 'Images')

for img_name in tqdm(os.listdir(directory)):
    # read the file, resize it to 224x224 and turn it into a numpy array
    pixels = img_to_array(load_img(directory + '/' + img_name, target_size=(224, 224)))
    # add a leading batch axis of size 1, then apply the VGG preprocessing
    batch = preprocess_input(pixels.reshape((1,) + pixels.shape))
    # key the extracted feature by the file name up to its first dot
    features[img_name.split('.')[0]] = model.predict(batch, verbose=0)

In [None]:
# store features in pickle
# a context manager guarantees the file is flushed and closed before it is read back
with open(os.path.join(WORKING_DIR, 'features.pkl'), 'wb') as f:
    pickle.dump(features, f)

In [None]:
# read the cached feature dictionary back from disk
features_path = os.path.join(WORKING_DIR, 'features.pkl')
with open(features_path, 'rb') as features_file:
    features = pickle.load(features_file)

In [None]:
features['3226254560_2f8ac147ea'][0]

## Load the Caption Data

In [None]:
captions_path = os.path.join(BASE_DIR, 'captions.txt')
with open(captions_path, 'r') as captions_file:
    # discard the first line, then keep the rest of the file as a single string
    next(captions_file)
    captions_doc = captions_file.read()

In [None]:
# captions_doc

In [None]:
# create mapping of image id -> list of captions
mapping = {}

# process lines
for line in tqdm(captions_doc.split('\n')):
    # drop surrounding whitespace so blank lines become empty strings
    line = line.strip()
    # split the line by comma: the first field is the image file name,
    # everything after it belongs to the caption
    tokens = line.split(',')

    # skip blank or malformed lines that carry no caption field at all
    # (checking the field count, not the line length, avoids storing an
    # empty caption for a line without a comma)
    if len(tokens) < 2:
        continue
    image_id, caption = tokens[0], tokens[1:]

    # remove extension from image_id
    image_id = image_id.split('.')[0]
    # the caption itself may contain commas, so re-join its pieces into one string
    caption = ' '.join(caption)
    # an image can have several captions: collect them all in one list per image_id
    mapping.setdefault(image_id, []).append(caption)
    

In [None]:
len(mapping)

In [None]:
mapping[image_id][0]

## Preprocess Text Data

In [None]:
# preprocessing the captions
def clean_captions(mapping):
    """Normalise every caption stored in ``mapping``, in place.

    Each caption is lower-cased, stripped of every character that is neither
    a letter nor whitespace, reduced to the words longer than one character,
    and wrapped in ``<start>`` / ``<end>`` tags.

    Args:
        mapping: dict whose values are lists of caption strings. The lists
            are modified in place.

    Returns:
        None.
    """
    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # convert to lower case
            caption = caption.lower()
            # str.replace() treats its first argument literally, so a regex
            # character class has to go through re.sub(). Whitespace is kept
            # at this stage so that neighbouring words are not glued together.
            caption = re.sub(r'[^a-z\s]', '', caption)
            # split() already collapses runs of whitespace; single-character
            # words are dropped
            words = [word for word in caption.split() if len(word) > 1]
            # add start and end tags to the caption
            captions[i] = '<start> ' + ' '.join(words) + ' <end>'

In [None]:
# before preproces of text
mapping['1000268201_693b08cb0e']

In [None]:
# preprocess the text
clean_captions(mapping)

In [None]:
# After preprocessing
mapping['1000268201_693b08cb0e']

In [None]:
# flatten the per-image caption lists into one list, keeping the mapping's order
all_captions = [caption for captions in mapping.values() for caption in captions]

In [None]:
len(all_captions)

In [None]:
all_captions[:10]

In [None]:
# tokenize the text
# The Tokenizer's default `filters` include '<' and '>', which would turn the
# '<start>' / '<end>' tags into the plain words 'start' / 'end' and make a later
# comparison against '<end>' impossible. Use the default filter set minus those
# two characters so the tags survive as distinct tokens.
tokenizer = Tokenizer(filters='!"#$%&()*+,-./:;=?@[\\]^_`{|}~\t\n')
tokenizer.fit_on_texts(all_captions)

# +1 because word indices start at 1; index 0 is left for padding
vocab_size = len(tokenizer.word_index) + 1
vocab_size

In [None]:
# length, in whitespace-separated words, of the longest caption
caption_lengths = [len(caption.split()) for caption in all_captions]
max_len = max(caption_lengths)
max_len


In [None]:
# index at which the image ids are cut: the first 90% are used for training
image_ids = list(mapping)
split = int(0.90 * len(image_ids))
split

In [None]:
# ids before the cut point go to training, the remainder to testing
train, test = image_ids[:split], image_ids[split:]

In [None]:
# create data generator to get data in batches (to avoid a session crash)
def data_generator(data_keys, mapping, features, tokenizer, max_len, vocab_size, batch_size):
    """Endlessly yield training batches of (image feature, partial caption) -> next word.

    Every caption is encoded with ``tokenizer``; each proper prefix of the
    encoded caption becomes one sample whose target is the word that follows
    the prefix. Note that ``batch_size`` counts *images*, not samples: a batch
    is emitted once the samples of ``batch_size`` images have been collected.

    Args:
        data_keys: iterable of image ids to draw from; it is re-iterated forever.
        mapping: dict mapping image id -> list of caption strings.
        features: dict mapping image id -> feature array; element ``[0]`` of
            that array is used as the image input.
        tokenizer: fitted tokenizer providing ``texts_to_sequences``.
        max_len: length every input sequence is padded to.
        vocab_size: number of classes for the one-hot encoded target.
        batch_size: number of images whose samples make up one batch.

    Yields:
        ``((X1, X2), y)`` where ``X1`` holds the image features, ``X2`` the
        padded input sequences and ``y`` the one-hot encoded next words.
    """
    X1, X2, y = [], [], []
    n = 0  # number of images collected for the current batch

    while True:
        for key in data_keys:
            n += 1

            # process each caption of this image
            for caption in mapping[key]:
                # encode the caption as a sequence of word indices
                seq = tokenizer.texts_to_sequences([caption])[0]
                # split the sequence into (prefix, next word) pairs
                for i in range(1, len(seq)):
                    in_seq, out_seq = seq[:i], seq[i]
                    # pad the input prefix to a fixed length
                    in_seq = pad_sequences([in_seq], maxlen=max_len)[0]
                    # one-hot encode the target word
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]

                    X1.append(features[key][0])
                    X2.append(in_seq)
                    y.append(out_seq)

            if n == batch_size:
                # Hand the collected samples to the consumer. Without this
                # yield the function is not a generator and never returns.
                # The two inputs are grouped in a tuple, not a list, because
                # newer Keras versions reject list-structured generator output.
                yield (np.array(X1), np.array(X2)), np.array(y)
                X1, X2, y = [], [], []
                n = 0

In [None]:
# show only a few entries: displaying the whole dictionary floods the notebook output
dict(list(mapping.items())[:3])

In [None]:
# data_generator(train[:5], mapping, features, tokenizer, max_len, vocab_size, batch_size)

## Model Creation

In [None]:
# Encoder Model:

# Image feature layers: 4096-d image feature vector -> dropout -> 256-d dense
inputs1 = Input(shape=(4096,))
fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)

# Sequence feature layers: padded word indices -> embedding -> dropout -> LSTM
inputs2 = Input(shape=(max_len,))
# mask_zero=True makes the following layers ignore the 0 padding index
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# Decoder Model:

# merge image and text features by element-wise addition (both are 256-d)
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
# probability distribution over the vocabulary for the next word
outputs = Dense(vocab_size, activation='softmax')(decoder2)


# NOTE: this rebinds `model`, which previously held the VGG16 feature extractor
model = Model(inputs=[inputs1, inputs2], outputs=outputs)
# the Keras identifier is 'categorical_crossentropy' (no underscore inside
# "crossentropy"); the misspelled name is rejected by compile()
model.compile(loss='categorical_crossentropy', optimizer='adam')

# plot the model
plot_model(model, show_shapes=True)

In [None]:
# train the model
epochs = 15
batch_size = 64
# number of batches drawn from the generator per epoch; back-propagation runs
# after each of them
steps = len(train) // batch_size

for i in range(epochs):
    # data_generator is an endless generator, so it must be handed to fit()
    # as-is rather than unpacked into arrays; a fresh one is created per epoch
    generator = data_generator(train, mapping, features, tokenizer, max_len, vocab_size, batch_size)
    # fit for one epoch
    model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)

In [None]:
# Save the model (path built with os.path.join, like the other paths in this notebook)
model.save(os.path.join(WORKING_DIR, 'best_model.h5'))

## Generate Captions for the image


In [None]:
def idx_to_word(integer, tokenizer):
    """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

    Args:
        integer: word index to look up.
        tokenizer: object exposing a ``word_index`` dict of word -> index.

    Returns:
        The matching word, or ``None`` when no word has that index.
    """
    for word, idx in tokenizer.word_index.items():
        if idx == integer:
            return word
    return None

In [None]:
## generate caption for an image
def predict_caption(model, image, tokenizer, max_len):
    """Greedily generate a caption for one image.

    Starting from the ``<start>`` tag, the model is asked for the next word
    repeatedly; the most probable word is appended each time until the end
    tag is produced, an unknown index is predicted, or ``max_len`` words have
    been generated.

    Args:
        model: trained captioning model taking ``[image, sequence]`` as input.
        image: image feature array passed to the model as its first input.
        tokenizer: fitted tokenizer used to encode the text generated so far.
        max_len: length the input sequence is padded to, and the maximum
            number of generation steps.

    Returns:
        The generated caption as a string, beginning with ``<start>``.
    """
    # add start tag for the generation process
    in_text = '<start>'
    # Resolve the end tag to the index the tokenizer actually assigned to it.
    # Comparing indices works whether or not the tokenizer's filters stripped
    # the angle brackets from the tag.
    end_ids = tokenizer.texts_to_sequences(['<end>'])[0]
    end_id = end_ids[0] if end_ids else None

    # iterate over the max length of a sequence
    for _ in range(max_len):
        # encode the text generated so far (texts_to_sequences expects a list of texts)
        seq = tokenizer.texts_to_sequences([in_text])[0]
        # pad the sequence to the length the model was built with
        seq = pad_sequences([seq], maxlen=max_len)
        # predict the distribution of the next word
        probs = model.predict([image, seq], verbose=0)
        # get the index with the highest probability
        next_id = int(np.argmax(probs))
        # convert index to word
        word = idx_to_word(next_id, tokenizer)
        # stop if word not found
        if word is None:
            break
        # append word as input for generating the next word
        in_text += ' ' + word
        # stop once the end tag has been generated
        if next_id == end_id:
            break
    return in_text

In [None]:
 from nltk.translate.bleu_score import corpus_bleu
# Validate with test data
actual, predicted = [], []
for key in tqdm(test):
    # get the reference captions of this image (index by the loop variable,
    # not by the literal string 'key')
    captions = mapping[key]
    # predict the caption for this image
    y_pred = predict_caption(model, features[key], tokenizer, max_len)
    # corpus_bleu expects tokenised text: a list of reference token lists per
    # image and one hypothesis token list per image
    actual_caption = [caption.split() for caption in captions]
    y_pred = y_pred.split()

    # append to the lists
    actual.append(actual_caption)
    predicted.append(y_pred)

# calculate BLEU score
print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=[1.0, 0, 0, 0]))
print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=[0.5, 0.5, 0, 0]))

## Visualize the Result

In [None]:
from PIL import Image
import matplotlib.pyplot as plt

def generate_caption(image_name):
    """Print the reference and predicted captions of an image and display it.

    Args:
        image_name: file name of the image inside ``BASE_DIR/Images``
            (for example ``'1001773457_577c3a7d70.jpg'``).

    Returns:
        None. The captions are printed and the image is drawn with matplotlib.
    """
    # the image id is the file name up to its first dot
    image_id = image_name.split('.')[0]
    img_path = os.path.join(BASE_DIR, 'Images', image_name)
    image = Image.open(img_path)
    captions = mapping[image_id]
    print('---------Actual ----------------')
    for caption in captions:
        # print one caption per line, not the whole list on every iteration
        print(caption)

    # predict the caption
    y_pred = predict_caption(model, features[image_id], tokenizer, max_len)
    print('---------Predicted ----------------')
    print(y_pred)
    plt.imshow(image)

In [None]:
generate_caption('1001773457_577c3a7d70.jpg')