In [None]:
import os
import numpy as np
from tqdm.notebook import tqdm

import warnings
warnings.filterwarnings('ignore')

from tensorflow.keras.applications.resnet50 import  preprocess_input
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add

In [None]:
## Download the Flickr8k image and caption archives from GitHub and unzip them.
## -nc skips a download when the archive is already present, and -o makes
## unzip overwrite existing files without asking, so re-running this cell
## never stops on an interactive "replace ...? [y]es, [n]o, [A]ll" prompt.
!wget -q -nc https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
!wget -q -nc https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip
!unzip -qq -o Flickr8k_Dataset.zip
!unzip -qq -o Flickr8k_text.zip

replace Flicker8k_Dataset/1000268201_693b08cb0e.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
replace CrowdFlowerAnnotations.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: A


In [None]:
# Directory whose files are iterated by the feature-extraction loop.
images_path = './Flicker8k_Dataset'
# Path of the token file; it is not referenced again in the cells visible here.
captions_file = './Flickr8k.token.txt'
# Captions file that is actually read below: its first line is skipped and
# every remaining line is split on commas (image name first, caption after).
captions_path = '/content/sample_data/captions.txt'

In [None]:
# Instantiate ResNet50 with its default constructor arguments; the next cell
# rewires this model so that its output is taken from the second-to-last layer.
model = ResNet50()

In [None]:
## Turn the network into a feature extractor: keep the same inputs, but take
## the output of the second-to-last layer, i.e. only the final layer is dropped.
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line after the table.
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                              

In [None]:
# Run every file in the image directory through the truncated network and
# store the prediction under the file name up to its first dot.
features = {}
for img_name in tqdm(os.listdir(images_path)):

    # read the file, resized to 224x224, and turn it into a numpy array
    pixels = img_to_array(load_img(f"{images_path}/{img_name}", target_size=(224, 224)))

    # prepend a batch axis of size 1 so the array is shaped (1, H, W, C)
    batch = pixels.reshape((1,) + pixels.shape)

    # apply the ResNet50 input preprocessing imported at the top of the notebook
    batch = preprocess_input(batch)

    # the key is the file name without its extension
    features[img_name.split('.')[0]] = model.predict(batch, verbose=0)

  0%|          | 0/8091 [00:00<?, ?it/s]

In [None]:
## Persist the extracted features in pickle format.
import pickle
# Open the file in a context manager so the handle is flushed and closed as
# soon as the dump finishes, instead of leaking an open file object.
with open('./features.pkl', 'wb') as f:
    pickle.dump(features, f)

In [None]:
### Reload the features from ./features.pkl (the file written by the cell above).
# NOTE(review): pickle.load can execute arbitrary code while deserialising;
# only load a features.pkl that this notebook produced itself.
with open('./features.pkl', 'rb') as f:
    features = pickle.load(f)

In [None]:
## Load the raw captions text.
with open(captions_path, 'r') as f:
    # Discard the first line (presumably a column header - confirm against the file).
    next(f)
    # Keep the rest of the file as one string; it is split into lines later.
    captions_doc = f.read()

In [None]:
### Group the captions by image ID: mapping[image_id] -> list of caption strings.
mapping = {}

for raw_line in tqdm(captions_doc.split('\n')):

    # ignore empty / one-character lines
    if len(raw_line) < 2:
        continue

    # first comma-separated field is the image file name, the rest is the caption
    file_name, *caption_parts = raw_line.split(',')

    # image ID = file name up to its first dot
    image_id = file_name.split('.')[0]

    # commas inside the caption are rejoined with single spaces
    caption = " ".join(caption_parts)

    # create the list on first sight of this image, then store the caption
    mapping.setdefault(image_id, []).append(caption)

In [None]:
## Number of distinct image IDs that have at least one caption.
len(mapping)

In [None]:
## Clean the captions in `mapping` by removing digits and special characters.
def clean(mapping):
    """Normalise every caption in ``mapping`` in place.

    Each caption is lower-cased, stripped of every character that is not a
    letter a-z or whitespace, stripped of one-character words, and wrapped in
    the ``startseq`` / ``endseq`` markers.

    Args:
        mapping: dict mapping an image ID to a list of caption strings. The
            lists are modified in place.

    Returns:
        None.

    Note:
        The function is not idempotent: calling it twice on the same mapping
        wraps the captions in a second pair of startseq/endseq markers.
    """
    import re  # local import so this cell does not depend on the import cell

    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # convert to lowercase
            caption = caption.lower()

            # delete digits, punctuation and other special characters.
            # str.replace() treats its argument as literal text, so a regex
            # pattern passed to it never matches; re.sub() is required.
            # Whitespace is kept so that words stay separated.
            caption = re.sub(r'[^a-z\s]', '', caption)

            # split() already collapses runs of whitespace; keep only words
            # longer than one character
            words = [word for word in caption.split() if len(word) > 1]

            # add start and end tags to the caption
            captions[i] = 'startseq ' + " ".join(words) + ' endseq'

In [None]:
## Look up the captions of one example image by its ID (before cleaning).
mapping['1000268201_693b08cb0e']

In [None]:
## Clean the mapping.
# clean() rewrites the caption lists inside `mapping` in place and returns nothing.
clean(mapping)

In [None]:
# The same example image's captions after clean() has been applied.
mapping['1000268201_693b08cb0e']

In [None]:
### Flatten the per-image caption lists into one list of captions, following
### the iteration order of `mapping`.
all_captions = [caption for captions in mapping.values() for caption in captions]

In [None]:

# Total number of captions across all images.
len(all_captions)

In [None]:
# Peek at the first five captions.
all_captions[:5]

In [None]:
## Build the word vocabulary from all captions.
# fit_on_texts fills tokenizer.word_index, which is read on the next line.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# +1: presumably because word indices start at 1 and index 0 is left for
# padding (the Embedding layer below is built with mask_zero=True) - confirm
# against the Keras Tokenizer documentation.
vocab_size = len(tokenizer.word_index) + 1
vocab_size


In [None]:
# Length of the longest caption, counted in tokenizer tokens rather than in
# whitespace-separated words. The tokenizer can produce a different number of
# tokens than str.split() (it also treats punctuation as a separator), and
# max_length is the pad_sequences length used for training and prediction, so
# it has to be measured in the same units as the sequences that get padded.
max_length = max(len(seq) for seq in tokenizer.texts_to_sequences(all_captions))
max_length

In [None]:
# Positional 90/10 split of the image IDs (no shuffling): the first 90% of
# the keys, in `mapping` order, are used for training and the rest for testing.
image_ids = list(mapping)
split = int(0.90 * len(image_ids))
train, test = image_ids[:split], image_ids[split:]

In [None]:

# Sizes of the training and test splits, one per line.
print(len(train), len(test), sep='\n')

In [None]:

### Batch generator: yields the training samples of `batch_size` images at a time.
def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Endlessly yield ``([X1, X2], y)`` training batches.

    For every caption of every image, each proper prefix of the token sequence
    becomes one sample whose target is the token that follows the prefix.

    Args:
        data_keys: iterable of image IDs to cycle through.
        mapping: dict of image ID -> list of caption strings.
        features: dict of image ID -> array; element ``[0]`` is used per sample.
        tokenizer: object providing ``texts_to_sequences``.
        max_length: length every input prefix is padded to.
        vocab_size: number of classes for the one-hot target.
        batch_size: number of images (not samples) gathered per yielded batch.

    Yields:
        ``[X1, X2], y`` as numpy arrays: image features, padded prefixes and
        one-hot next-token targets.
    """
    image_feats, prefixes, targets = [], [], []
    images_seen = 0

    while True:
        for key in data_keys:
            images_seen += 1

            for caption in mapping[key]:
                # integer-encode the caption
                token_ids = tokenizer.texts_to_sequences([caption])[0]

                # one sample per cut point: tokens before it -> token at it
                for cut in range(1, len(token_ids)):
                    padded_prefix = pad_sequences([token_ids[:cut]], maxlen=max_length)[0]
                    one_hot_next = to_categorical([token_ids[cut]], num_classes=vocab_size)[0]

                    image_feats.append(features[key][0])
                    prefixes.append(padded_prefix)
                    targets.append(one_hot_next)

            # emit once batch_size images have been accumulated, then start over
            if images_seen == batch_size:
                yield [np.array(image_feats), np.array(prefixes)], np.array(targets)
                image_feats, prefixes, targets = [], [], []
                images_seen = 0


In [None]:
# Caption model: an image-feature branch and a partial-caption branch are
# merged and used to predict the next word. Note that this rebinds `model`,
# which until now referred to the ResNet50 feature extractor.

# Image-feature branch. The input width is hard-coded to 2048 and has to match
# the length of each stored feature vector (features[key][0]).
inputs1 = Input(shape=(2048,))

fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)

# Caption branch: padded token IDs -> embedding -> LSTM.
inputs2 = Input(shape=(max_length,))

se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# Decoder: add the two 256-unit branch outputs, then a dense layer.
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)

# Output: softmax over the vocabulary (one unit per word index).
outputs = Dense(vocab_size, activation='softmax')(decoder2)

model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line after the table.
model.summary()

In [None]:
## Draw the model graph with tensor shapes and layer activations shown and
## layer names hidden.
plot_model(model, show_shapes=True, show_layer_names=False, show_layer_activations=True, dpi=64)

In [None]:
## Train the caption model.
from tensorflow import keras

epochs = 2
batch_size = 128
# One step consumes batch_size images, so this is the number of full batches
# available in the training split.
steps = len(train)//batch_size

es = keras.callbacks.EarlyStopping(monitor='loss', restore_best_weights=True, patience=4)

# data_generator loops over the training keys forever, so a single generator
# can feed every epoch.
generator = data_generator(train, mapping, features, tokenizer, max_length, vocab_size, batch_size)

# Use one fit() call that spans all epochs. Calling fit(epochs=1) inside a
# Python loop restarts the EarlyStopping bookkeeping on every call, so its
# patience counter could never run out and the callback had no effect.
model.fit(generator, epochs=epochs, steps_per_epoch=steps, verbose=1, callbacks=[es])

In [None]:
# Save the trained caption model to ./initial_model.h5.
model.save('./initial_model.h5')

In [None]:

def idx_to_word(integer, tokenizer):
    """Return the word whose tokenizer index equals ``integer``, or None.

    Args:
        integer: word index to look up (a Python int or a numpy integer).
        tokenizer: object exposing ``word_index`` (word -> index) and,
            optionally, ``index_word`` (index -> word).

    Returns:
        The matching word, or None when no word has that index.
    """
    # Fast path: use the tokenizer's inverse map when it has one, replacing a
    # scan over the whole vocabulary (done once per generated word) with a
    # single dictionary lookup.
    index_word = getattr(tokenizer, 'index_word', None)
    if index_word:
        return index_word.get(integer)

    # Fallback for tokenizer-like objects that only expose word_index.
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None


In [None]:
# Greedy, word-by-word caption generation for one image.
def predict_caption(model, image, tokenizer, max_length):
    """Generate a caption for ``image`` by repeatedly picking the top-scoring word.

    Args:
        model: model whose ``predict`` takes ``[image, padded_sequence]``.
        image: image feature input passed to the model as-is.
        tokenizer: tokenizer used to encode the text generated so far.
        max_length: padding length and maximum number of generated words.

    Returns:
        The generated text, starting with 'startseq' and ending with 'endseq'
        when the model emitted it within ``max_length`` steps.
    """
    caption = 'startseq'

    for _ in range(max_length):
        # encode and pad everything generated so far
        token_ids = tokenizer.texts_to_sequences([caption])[0]
        padded = pad_sequences([token_ids], max_length)

        # index of the highest-scoring next word, mapped back to its text
        scores = model.predict([image, padded], verbose=0)
        next_word = idx_to_word(np.argmax(scores), tokenizer)

        # an index with no word ends generation without appending anything
        if next_word is None:
            break

        caption += " " + next_word

        # the end tag is appended first, then generation stops
        if next_word == 'endseq':
            break

    return caption

In [None]:
from nltk.translate.bleu_score import corpus_bleu

# Score the generated captions of the test images against their reference captions.
actual, predicted = [], []

for key in tqdm(test):

    # reference captions of this image, each split into words
    references = [reference.split() for reference in mapping[key]]

    # caption generated from the stored image features, split into words
    hypothesis = predict_caption(model, features[key], tokenizer, max_length).split()

    actual.append(references)
    predicted.append(hypothesis)

# corpus-level BLEU with all of the weight on the first n-gram order
print("BLEU: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))

In [None]:
from PIL import Image
import matplotlib.pyplot as plt

def generate_caption(image_name):
    """Print the stored and the predicted captions of an image, then show it.

    Args:
        image_name: file name of the image inside ``images_path``; the part
            before its first dot is used as the key into ``mapping`` and
            ``features``.

    Relies on the notebook-level ``images_path``, ``mapping``, ``features``,
    ``model``, ``tokenizer`` and ``max_length``.
    """
    image_id = image_name.split('.')[0]

    # open the image file up front; it is drawn at the very end
    image = Image.open(os.path.join(images_path, image_name))

    references = mapping[image_id]
    print('=============================== Actual ===============================')
    for reference in references:
        print(reference)

    # generate a caption from the stored features of this image
    y_pred = predict_caption(model, features[image_id], tokenizer, max_length)
    print('      ')
    print('============================== Predicted ==============================')
    print(y_pred)

    plt.imshow(image)

In [None]:
# Example: print the actual and predicted captions for one image and display it.
generate_caption("1028205764_7e8df9a2ea.jpg")

In [None]:

# Example: a second image.
generate_caption("101669240_b2d3e7f17b.jpg")

In [None]:
# Example: a third image.
generate_caption("1019604187_d087bf9a5f.jpg")