In [1]:
import os
from pickle import dump, load
import numpy as np
from tqdm.notebook import tqdm

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add

2023-02-10 14:15:59.169327: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
BASE_DIR = os.getcwd() + '/kaggle/'

In [3]:
# extract features from each photo in the directory
def extract_features(directory):
    # load the model
    model = VGG16()
    model = Model(inputs=model.inputs, outputs=model.layers[-2].output)

    # extract feature from image
    features = {}
    for img_name in tqdm(os.listdir(directory)):
        img_path = directory + '/' + img_name
        image = load_img(img_path, target_size=(224, 224))
        # convert image pixels to numpy array
        image = img_to_array(image)
        # reshape data for model
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        # preprocess image for vgg
        image = preprocess_input(image)
        # extract features
        feature = model.predict(image, verbose = 0)
        # get image ID
        image_id = img_name.split('.')[0]
        # store feature
        features[image_id] = feature
    return features

# extract features from all images
directory = BASE_DIR + 'Flicker8k_Dataset'
features = extract_features(directory)
print('Extracted Features: %d' % len(features))
# save to file
dump(features, open(BASE_DIR + 'features.pkl', 'wb'))

2023-02-10 14:16:27.737130: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


  0%|          | 0/8091 [00:00<?, ?it/s]

Extracted Features: 8091


In [5]:
def load_doc(filename):
    # load the captions Data
    file = open(os.path.join(BASE_DIR, filename), 'r')
    captions_doc = file.read()
    file.close()
    return captions_doc

filename = 'Flickr8k_text/Flickr8k.token.txt'
text_val = load_doc(filename)

def load_captions(text_val):
    # create mapping of image to captions
    mapping = {}
    # process lines
    for line in tqdm(text_val.split('\n')):
        # split the line by comma(,)
        tokens = line.split(',')
        if len(line) < 2:
            continue
        image_id, caption = tokens[0], tokens[1:]
        # remove extensions from image ID
        image_id = image_id.split('.')[0]
        # convert caption list to string
        caption = " ".join(caption)
        # create list if needed
        if image_id not in mapping:
            mapping[image_id] = []
        #store the caption
        mapping[image_id].append(caption)
    return mapping

# parse captions
captions = load_captions(text_val)
print('total loaded captions: %d ' % len(captions))

  0%|          | 0/40461 [00:00<?, ?it/s]

total loaded captions: 8092 


In [6]:
import string

def clean_captions(captions):
    # prepare translation table for removing punctuation
    table = str.maketrans('', '', string.punctuation)
    for key, cap_list in captions.items():
        for i in range(len(cap_list)):
            # take one caption at a time
            captionval = cap_list[i]
            captionval = captionval.split()
            # preprocessing steps
            # convert to lowercase
            captionval = [word.lower() for word in captionval]
            # replace digits, special chars, etc.,
            # remove punctuation from each token
            captionval = [w.translate(table) for w in captionval]
            # remove hanging 's' and 'a'
            captionval = [word for word in captionval if len(word)>1]
            # remove tokens with numbers in them
            captionval = [word for word in captionval if word.isalpha()]
            # store as string
            cap_list[i] =  ' '.join(captionval)

# clean captions
clean_captions(captions)

In [7]:
# covert the loaded captions into a vocabulary of words
def to_vocab(captions):
    all_caption = set()
    for key in captions.keys():
        [all_caption.update(c.split()) for c in captions[key]]
    return all_caption

print('vocabulary size: %d ' % len(to_vocab(captions)))

# save captions to file, one per line
def save_captions(captions, filename):
    lines = []
    file = open(os.path.join(BASE_DIR, filename), 'w')
    for key, cap_list in tqdm(captions.items()):
        for cap in cap_list:
            lines.append(key + ' ' + cap)
            data = '\n'.join(lines)
            file.write(data)
    file.close()

save_captions(captions, 'descriptions.txt')

vocabulary size: 2104 


  0%|          | 0/8092 [00:00<?, ?it/s]

In [8]:
def load_doc(filename):
    # open the file as read only
    file = open(filename, 'r')
    # read all text
    text = file.read()
    # close the file
    file.close()
    return text

# load a pre-defined list of photo identifiers
def load_set(filename):
    doc = load_doc(filename)
    dataset = list()
    # process line by line
    for line in doc.split('\n'):
        # skip empty lines
        if len(line) < 1:
            continue
        # get the image identifier
        identifier = line.split('.')[0]
        dataset.append(identifier)
    return set(dataset)

In [None]:
# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    # load document
    doc = load_doc(filename)
    descriptions = {}
    for line in doc.split('\n'):
        # split line by white space
        tokens = line.split()
        # split id from description
        image_id, image_desc = tokens[0], tokens[1:]
        # skip images not in the set
        if image_id in dataset:
            # create list
            if image_id not in descriptions:
                descriptions[image_id] = list()
            # wrap description in tokens
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            # store
            descriptions[image_id].append(desc)
    return descriptions

# load photo features
def load_photo_features(filename, dataset):
    # load all features
    all_features = load(open(filename, 'rb'))
    # filter features
    features = {k: all_features[k] for k in dataset}
    return features

# load training dataset (6K)
filename = BASE_DIR + 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
print('Dataset: %d' % len(train))
# descriptions
train_descriptions = load_clean_descriptions(BASE_DIR + 'descriptions.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))
# photo features
train_features = load_photo_features(BASE_DIR + 'features.pkl', train)
print('Photos: train=%d' % len(train_features))

Dataset: 6000


In [None]:
# convert a dictionary of clean descriptions to a list of descriptions
def to_lines(descriptions):
    all_desc = list()
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

# fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
    lines = to_lines(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

# prepare tokenizer
tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

In [None]:
# create sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, descriptions, photos, vocab_size):
    X1, X2, y = list(), list(), list()
    # walk through each image identifier
    for key, desc_list in descriptions.items():
        # walk through each description for the image
        for desc in desc_list:
            # encode the sequence
            seq = tokenizer.texts_to_sequences([desc])[0]
            # split one sequence into multiple X,y pairs
            for i in range(1, len(seq)):
                # split into input and output pair
                in_seq, out_seq = seq[:i], seq[i]
                # pad input sequence
                in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                # encode output sequence
                out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                # store
                X1.append(photos[key][0])
                X2.append(in_seq)
                y.append(out_seq)
    return array(X1), array(X2), array(y)