In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:

import numpy as np
from PIL import Image
import os
import string
from pickle import dump
from pickle import load
import tensorflow as tf
from tqdm import tqdm
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, concatenate, add
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout

In [3]:
# Set the path to the image directory and caption file
image_dir = '/content/drive/MyDrive/flickr8/Images'
caption_file = '/content/drive/MyDrive/flickr8/captions.txt'

In [4]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D
def extract_features(directory):
    base_model = ResNet50(weights='imagenet')
    feature_extractor = Model(inputs=base_model.input, outputs=base_model.layers[-2].output)
    features = {}
    for img in tqdm(os.listdir(image_dir)):
        filename = os.path.join(image_dir, img)
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)
        feature = feature_extractor.predict(image, verbose=0)
        image_id = img.split('.')[0]
        features[image_id] = feature
    return features

In [5]:
# Extract features from images and save to pickle file
dataset_images = '/content/drive/MyDrive/Flickr8k_Dataset/Images'
features = extract_features(dataset_images)
dump(features, open("features.p", "wb"))

# Load features from pickle file
features = load(open("features.p", "rb"))
print('Extracted Features: %d' % len(features))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels.h5


100%|██████████| 8091/8091 [14:05<00:00,  9.57it/s]


Extracted Features: 8091


In [6]:
feature_shape = features['1000268201_693b08cb0e'].shape

In [7]:
captions_file = open(caption_file, "r")
FileContent = captions_file.read()
captions_dict = {}
for line in (FileContent.split('\n')):
    tokens = line.split(',')
    image_id, caption = tokens[0], tokens[1:]
    if len(line) < 2:
      continue
    image_id = image_id.split('.')[0]
    caption = " ".join(caption)
    if image_id not in captions_dict:
        captions_dict[image_id] = []
    captions_dict[image_id].append('<start> ' + caption.strip() + ' <end>')
del captions_dict["image"]
len(captions_dict)

8091

In [8]:
def text_cleaning(captions_dict):
    for key in captions_dict:
        captions = captions_dict[key]
        for i in range(len(captions)):
            caption = captions[i]
            caption = caption.lower()
            cleaned_caption = ""
            for char in caption:
                if char.isalpha() or char == " ":
                    cleaned_caption += char
            captions[i] = cleaned_caption

In [9]:
text_cleaning(captions_dict)
captions = list(captions_dict.values())
all_captions = [element for sublist in captions for element in sublist]
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
vocab_size = tokenizer.num_words + 1 if tokenizer.num_words else len(tokenizer.word_index) + 1
max_length = max(map(lambda caption: len(caption.split()), all_captions))

In [10]:
image_ids = [*captions_dict]
split = int(len(image_ids) * 0.75)
train, test = image_ids[:split], image_ids[split:]

In [11]:
def data_generator(data_keys, captions_dict, features, tokenizer, max_length, vocab_size, batch_size):
    while True:
        X1, X2, y = [], [], []
        n = 0
        for key in data_keys:
            n += 1
            captions = captions_dict[key]
            for caption in captions:
                seq = tokenizer.texts_to_sequences([caption])[0]
                for i in range(1, len(seq)):
                  in_seq = pad_sequences([seq[:i]], maxlen=max_length)[0]
                  out_seq = to_categorical([seq[i]], num_classes=vocab_size)[0]
                  X1.append(features[key][0])
                  X2.append(in_seq)
                  y.append(out_seq)
            if n == batch_size:
              yield [np.array(X1), np.array(X2)], np.array(y)
              X1, X2, y = [], [], []
              n = 0

In [12]:
enocder_input = feature_shape[1]
embedd_dim = 150
number_units = 256

inputs1 = Input(shape=(enocder_input,))
drop_out = Dropout(0.2)(inputs1)
layer2 = Dense(number_units, activation='relu')(drop_out)

inputs2 = Input(shape=(max_length,))
embed_layer = Embedding(vocab_size, embedd_dim, mask_zero=True)(inputs2)
drop_out2 = Dropout(0.2)(embed_layer)
layer_2 = LSTM(number_units)(drop_out2)
decoder1 = add([layer2, layer_2])
decoder2 = Dense(number_units, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [13]:
epochs = 20
batch_size = 32
steps = len(train) // batch_size

for i in range(epochs):
    # create data generator
    generator = data_generator(train, captions_dict, features, tokenizer, max_length, vocab_size, batch_size)
    # fit for one epoch
    model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)



In [14]:
def predict_caption(model, image, tokenizer, max_length):
    in_text = 'startseq'
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], max_length)
        y_est = np.argmax(model.predict([image, sequence], verbose=0))
        word = tokenizer.index_word.get(y_est)
        if not word:
            break
        in_text += " " + word
        if word == 'endseq':
            break

    return in_text

In [15]:
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
smoothie = SmoothingFunction().method1
actual, predicted = [], []
for i, sample in enumerate(test[:100]):
    captions = captions_dict[sample]
    y_pred = predict_caption(model, features[sample], tokenizer, max_length)
    actual_captions = [caption.split() for caption in captions]
    y_pred = y_pred.split()
    actual.append(actual_captions)
    predicted.append(y_pred)
bleu_score = corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0), smoothing_function=smoothie)
print("BLEU-1: %f" % bleu_score)

BLEU-1: 0.131842
