In [None]:
import os
import re
import string
from collections.abc import Iterator
from pickle import dump, load

import keras
import numpy as np
import tensorflow as tf
from PIL import Image
from tqdm.notebook import tqdm
# Register the tqdm progress helpers on pandas objects. `pandas` is called on
# the class itself: `tqdm()` would first create an empty progress bar that is
# never closed.
tqdm.pandas()

In [None]:
# Directory name of the text data (caption and image-id list files), given as a
# relative path
dataset_text_dirname = "Flickr8k_text"
# Directory name of the image data; handed to extract_features, which lists the
# files inside it
# NOTE(review): "Flicker8k_Dataset" is spelled differently from the parent
# "Flickr8k_Dataset" - presumably this matches the extracted archive; confirm
dataset_image_dirname = "Flickr8k_Dataset/Flicker8k_Dataset"

In [None]:
# Utility

def load_file(filepath: str) -> str:
    """
    Read an entire text file into memory

    @param filepath path of the file to be loaded
    @return the contents of the file
    """
    # The context manager closes the handle even if read() raises
    with open(filepath, "r") as file:
        return file.read()

def get_img_ids(dataset_text_filepath: str) -> list[str]:
    """
    Read the image ids listed in a text file

    @param dataset_text_filepath path of the file containing image ids
    @return list of the image ids, in the order they appear in the file
    """
    # split() without a separator breaks on any run of whitespace, so blank
    # lines and a trailing newline never produce empty ids
    return load_file(dataset_text_filepath).split()

In [None]:
# Image ids listed in the trainImages file; used below to filter the
# descriptions and features
img_name_filename = "Flickr_8k.trainImages.txt"
img_name_path = f"{dataset_text_dirname}/{img_name_filename}"
img_ids = get_img_ids(img_name_path)

In [None]:
# Process text dataset into descriptions

def get_descriptions(dataset_text_filepath: str, img_ids: list[str] = [], cleaned: bool = False) -> dict[str, list[str]]:
    """
    Load the descriptions from the text dataset into a dictionary

    @param dataset_text_filepath path of the file containing image ids and captions,
        one tab-separated entry per line
        >>> file contents
        1000268201_693b08cb0e.jpg#0\tA child in a pink dress is climbing up a set of stairs in an entry way .\n
        1000268201_693b08cb0e.jpg#1\tA girl going into a wooden building .\n
        ...
    @param img_ids list of image ids the dictionary should have. This can add and remove items from the dictionary:
        ids that were read but not requested are dropped, requested ids that were not read map to an empty list.
        If empty (or None) return all. The default list is never mutated
    @param cleaned True if the descriptions to be read have already been processed and cleaned
        (their ids then carry no "#<number>" suffix)
    @return dictionary of descriptions (key: image id -> value: array of image captions)
    @raises ValueError if a non-blank line contains no tab
    """
    text = load_file(dataset_text_filepath)

    descriptions = {}
    for entry in text.split("\n"):
        # Skip blank lines (ie the one produced by a trailing newline)
        if not entry.strip():
            continue
        # Split on the first tab only, so a caption that itself contains a tab stays intact
        img_id, separator, caption = entry.partition("\t")
        if not separator:
            raise ValueError(f"Expected '<image id><tab><caption>' but got: {entry!r}")
        if not cleaned:
            # Strip numbers off id (ie 1000268201_693b08cb0e.jpg#0 -> 1000268201_693b08cb0e.jpg)
            # Cutting at the last "#" also copes with multi-digit numbers (#10) and with ids that have no suffix
            img_id = img_id.rsplit("#", 1)[0]
        descriptions.setdefault(img_id, []).append(caption)

    # No filter requested: return everything that was read
    if not img_ids:
        return descriptions

    # Ensure descriptions has exactly the inputted image ids
    return {img_id: descriptions.get(img_id, []) for img_id in img_ids}

def clean_descriptions(descriptions: dict[str, list[str]]) -> None:
    """
    Clean the entries in the descriptions dictionary in-place.
    Convert all letters to lowercase, removes punctuation, removes hanging "s" and "a"s,
    removes words containing numbers, and removes duplicate whitespace

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    """
    # Both helpers are loop-invariant, so build them once instead of once per caption
    punctuation_table = str.maketrans("", "", string.punctuation)
    word_with_digit = re.compile(r"\w*\d\w*")

    for captions in descriptions.values():
        for i, caption in enumerate(captions):
            # Convert to lowercase and remove punctuation
            caption = caption.lower().translate(punctuation_table)

            # Remove hanging "s" and "a"s (ie the "s" left over once the apostrophe of "dog 's" is gone)
            caption = " ".join(word for word in caption.split() if word not in ("a", "s"))

            # Remove words containing numbers
            caption = word_with_digit.sub("", caption)

            # Remove duplicate whitespace left behind by the removals above
            captions[i] = " ".join(caption.split())

def save_descriptions(filepath: str, descriptions: dict[str, list[str]]) -> None:
    """
    Write the descriptions back to a file, one "<image id>\\t<caption>" line per caption.
    Image ids with no captions produce no line. The file has no trailing newline

    @param filepath the name of the file to write the descriptions to
    @param descriptions a dictionary (key: image id -> value: array of image captions)
    """
    lines = [
        img_id + "\t" + caption
        for img_id, captions in descriptions.items()
        for caption in captions
    ]

    # The context manager flushes and closes the file even if write() raises
    with open(filepath, "w") as file:
        file.write("\n".join(lines))

def dict_to_list(descriptions: dict[str, list[str]]) -> list[str]:
    """
    Flatten the descriptions dictionary into a single list of captions

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    @return list of all captions in descriptions, in dictionary order
    """
    return [caption for captions in descriptions.values() for caption in captions]

In [None]:
descriptions_filename = "descriptions.txt"

# Build the cleaned-captions cache on the first run only
if not os.path.isfile(descriptions_filename):
    text_filename = "Flickr8k.token.txt"
    text_path = dataset_text_dirname + "/" + text_filename

    all_descriptions = get_descriptions(text_path)

    clean_descriptions(all_descriptions)

    save_descriptions(descriptions_filename, all_descriptions)

# Always read back through the cache, restricted to img_ids. Previously a first
# run kept the captions of every image while a cached run kept only img_ids, so
# the two runs trained on different data
descriptions = get_descriptions(descriptions_filename, img_ids, True)

In [None]:
# Process image dataset into features

def extract_features(dataset_img_dirpath: str) -> dict[str, np.ndarray]:
    """
    Load the features from the images in the image dataset into a dictionary.
    Every entry of the directory is opened as an image, resized to 299x299 and
    run through the Xception model on its own

    @param dataset_img_dirpath path to the directory containing the images
    @return dictionary of features (key: image id, ie the file name -> value: numpy ndarray
        returned by model.predict for a batch holding that single image)
    """
    model = tf.keras.applications.xception.Xception(include_top=False, pooling="avg")

    features = {}
    for img_id in tqdm(os.listdir(dataset_img_dirpath)):
        img_path = dataset_img_dirpath + "/" + img_id
        # The context manager releases the file handle as soon as the pixels are read
        with Image.open(img_path) as img_file:
            # Force 3 channels: a grayscale, palette or RGBA file would otherwise
            # give an array with the wrong number of channels
            img = img_file.convert("RGB").resize((299, 299))
        # Add the batch axis, then scale pixel values from [0, 255] to [-1, 1]
        pixels = np.expand_dims(img, axis=0)
        pixels = pixels / 127.5
        pixels = pixels - 1.0
        # verbose=0: predict is called once per image, so Keras' own progress
        # output would flood the cell; tqdm already reports progress
        features[img_id] = model.predict(pixels, verbose=0)

    return features

def save_features(filepath: str, features: dict[str, np.ndarray]) -> None:
    """
    Write the features to a file with pickle

    @param filepath the name of the file to write the features to
    @param features dictionary of features (key: image id -> value: numpy ndarray of features)
    """
    # Close the handle explicitly so the data is flushed to disk before returning
    with open(filepath, "wb") as file:
        dump(features, file)

def load_features(filepath: str, img_ids: list[str] = []) -> dict[str, np.ndarray]:
    """
    Read the features from a pickle file

    @param filepath the name of the file to read the features from
    @param img_ids list of image ids to get the features for; ids missing from the file are
        silently left out. If empty list (or None) return all. The default list is never mutated
    @return features dictionary of features (key: image id -> value: numpy ndarray of features)
    """
    # NOTE: unpickling can execute arbitrary code - only load files written by save_features
    with open(filepath, "rb") as file:
        features = load(file)
    if img_ids:
        features = {img_id: features[img_id] for img_id in img_ids if img_id in features}
    return features

In [None]:
features_filename = "features.p"

# Extract and cache the features of every image on the first run only
if not os.path.isfile(features_filename):
    save_features(features_filename, extract_features(dataset_image_dirname))

# Always read back through the cache, restricted to img_ids, so that a first run
# and a cached run end up with the same set of features
features = load_features(features_filename, img_ids)

In [None]:
# Tokenize descriptions

def create_tokenizer(descriptions: dict[str, list[str]]) -> tf.keras.preprocessing.text.Tokenizer:
    """
    Build a tokenizer and fit it on every caption in descriptions

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    @return tokenizer tool that stores every word in the vocabulary at a unique index
    """
    fitted_tokenizer = tf.keras.preprocessing.text.Tokenizer()
    fitted_tokenizer.fit_on_texts(dict_to_list(descriptions))
    return fitted_tokenizer

def save_tokenizer(filepath: str, tokenizer: tf.keras.preprocessing.text.Tokenizer) -> None:
    """
    Write the tokenizer to a file with pickle

    @param filepath the name of the file to write the tokenizer to
    @param tokenizer tool that stores every word in the vocabulary at a unique index
    """
    # Close the handle explicitly so the data is flushed to disk before returning
    with open(filepath, "wb") as file:
        dump(tokenizer, file)

def load_tokenizer(filepath: str) -> tf.keras.preprocessing.text.Tokenizer:
    """
    Read the tokenizer from a pickle file

    @param filepath the name of the file to read the tokenizer from
    @return tool that stores every word in the vocabulary at a unique index
    """
    # NOTE: unpickling can execute arbitrary code - only load files written by save_tokenizer
    with open(filepath, "rb") as file:
        return load(file)

In [None]:
tokenizer_filename = "tokenizer.p"

# Fit and cache the tokenizer on the first run; later runs reuse the cached one
if not os.path.isfile(tokenizer_filename):
    tokenizer = create_tokenizer(descriptions)
    save_tokenizer(tokenizer_filename, tokenizer)
else:
    tokenizer = load_tokenizer(tokenizer_filename)

In [None]:
# Find vocabulary features

def get_vocab(descriptions: dict[str, list[str]]) -> set[str]:
    """
    Collect the distinct whitespace-separated words of every caption

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    @return set of all words used in captions
    """
    return {
        word
        for captions in descriptions.values()
        for caption in captions
        for word in caption.split()
    }

def get_max_length(descriptions: dict[str, list[str]]) -> int:
    """
    Find the word count of the longest caption

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    @return the length (in whitespace-separated words) of the longest caption,
        or 0 when there are no captions at all
    """
    caption_lengths = (
        len(caption.split())
        for captions in descriptions.values()
        for caption in captions
    )
    # default=0 keeps an empty dictionary from raising "max() arg is an empty sequence"
    return max(caption_lengths, default=0)

def get_vocab_size(tokenizer: tf.keras.preprocessing.text.Tokenizer) -> int:
    """
    Size of the vocabulary as used for the embedding and output layers

    @param tokenizer tool that stores every word in the vocabulary at a unique index
    @return the number of words in tokenizer.word_index, plus one
    """
    # NOTE(review): the extra slot is presumably for index 0, which the padded
    # sequences use and which is not assigned to any word - confirm against the Keras docs
    return 1 + len(tokenizer.word_index)

In [None]:
# Create data generators to feed data into model

def create_sequences(captions: list[str],
                     feature: np.ndarray,
                     tokenizer: tf.keras.preprocessing.text.Tokenizer,
                     max_description_length: int,
                     vocab_size: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Generate input/output sequences for a given image and caption set.
    A caption that encodes to n word indices gives n - 1 training pairs: for every
    i in 1..n-1 the first i indices are the input and index i is the expected output

    @param captions list of captions of an image
    @param feature feature array of an image
    @param tokenizer tool that stores every word in the vocabulary at a unique index
    @param max_description_length the length of the longest caption
    @param vocab_size the number of unique words given all captions
    @return x1 the image feature, repeated once per training pair
            x2 input text sequences (word-index prefixes padded to max_description_length)
            y one-hot encoding over vocab_size classes of the word following each prefix
            All three are empty arrays when the captions yield no training pair
    """
    in_seqs = []
    out_words = []

    # Encode every caption in one call, then divide each sequence into x, y pairs
    for seq in tokenizer.texts_to_sequences(captions):
        for i in range(1, len(seq)):
            in_seqs.append(seq[:i])
            out_words.append(seq[i])

    # Nothing to pad or encode (no captions, or only captions of fewer than two words)
    if not in_seqs:
        return (np.array([]), np.array([]), np.array([]))

    # Pad and one-hot encode the whole batch at once instead of once per pair
    x1 = np.array([feature] * len(in_seqs))
    x2 = tf.keras.preprocessing.sequence.pad_sequences(in_seqs, maxlen=max_description_length)
    y = tf.keras.utils.to_categorical(out_words, num_classes=vocab_size)

    return (x1, x2, y)

def data_generator(descriptions: dict[str, list[str]],
                   features: dict[str, np.ndarray],
                   tokenizer: tf.keras.preprocessing.text.Tokenizer,
                   max_description_length: int,
                   vocab_size: int) -> Iterator[tuple[list[np.ndarray], np.ndarray]]:
    """
    Endlessly yield input/output sequences, one batch per image, cycling through
    descriptions again once every image has been visited.
    Images that produce no training pair are skipped

    @param descriptions a dictionary (key: image id -> value: array of image captions)
    @param features dictionary of features (key: image id -> value: numpy ndarray of features)
    @param tokenizer tool that stores every word in the vocabulary at a unique index
    @param max_description_length the length of the longest caption
    @param vocab_size the number of unique words given all captions
    @return generator of ([input_image, input_sequence], output_seq) tuples
            input_image feature vector of the image, repeated once per training pair
            input_sequence input text sequences for image
            output_seq expected next word (one-hot) for each input text sequence
    @raises KeyError if an image with captions has no entry in features
    @raises ValueError if a full pass over descriptions yields nothing
    """
    while True:
        yielded_any = False
        for img_id, captions in descriptions.items():
            # Ids without captions have nothing to train on
            if not captions:
                continue
            # Drop the leading axis of the stored feature (stored per image as a batch of one)
            feature = features[img_id][0]
            input_img, input_seq, output_seq = create_sequences(captions, feature, tokenizer, max_description_length, vocab_size)
            # Captions too short to form an (input, next word) pair give an empty batch
            if len(output_seq) == 0:
                continue
            yielded_any = True
            # Yield a tuple: Keras documents generator output as (inputs, targets),
            # and a list may be taken as the inputs alone
            yield ([input_img, input_seq], output_seq)
        # Without this guard the outer loop would spin forever without yielding
        if not yielded_any:
            raise ValueError("descriptions did not produce any training sequences")

In [None]:
max_description_length = get_max_length(descriptions)
vocab_size = get_vocab_size(tokenizer)

# Pull a single batch from a throwaway generator to check the pipeline end to end
[input_img, input_seq], output_seq = next(
    data_generator(descriptions, features, tokenizer, max_description_length, vocab_size)
)

In [None]:
# Define the CNN-RNN model

def define_model(max_description_length: int, vocab_size: int) -> tf.keras.models.Model:
    """
    Define and compile the CNN-RNN model, and print its summary.
    The model takes two inputs (an image feature vector of size 2048 and a word-index
    sequence of length max_description_length) and outputs a softmax over vocab_size words

    @param max_description_length the length of the longest caption
    @param vocab_size the number of unique words given all captions
    @return the CNN-RNN model, compiled with categorical cross-entropy loss and the adam optimizer
    """
    # Features from CNN model compress from 2048 -> 256 nodes
    inputs1 = tf.keras.layers.Input(shape=(2048,))
    fe1 = tf.keras.layers.Dropout(0.5)(inputs1)
    fe2 = tf.keras.layers.Dense(256, activation="relu")(fe1)

    # LSTM sequence model; mask_zero=True makes the layers ignore index 0
    inputs2 = tf.keras.layers.Input(shape=(max_description_length,))
    se1 = tf.keras.layers.Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = tf.keras.layers.Dropout(0.5)(se1)
    se3 = tf.keras.layers.LSTM(256)(se2)

    # Merge both models (element-wise sum of the two 256-wide branches)
    decoder1 = tf.keras.layers.add([fe2, se3])
    decoder2 = tf.keras.layers.Dense(256, activation="relu")(decoder1)
    outputs = tf.keras.layers.Dense(vocab_size, activation="softmax")(decoder2)

    model = tf.keras.models.Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss="categorical_crossentropy", optimizer="adam")

    # summary() prints by itself and returns None, so wrapping it in print()
    # added a stray "None" line
    model.summary()

    return model

In [None]:
# Train the model

print("Dataset:", len(img_ids))
print("Descriptions:", len(descriptions))
print("Features:", len(features))
print("Vocabulary Size:", vocab_size)
print("Description Length:", max_description_length)

model = define_model(max_description_length, vocab_size)
epochs = 10
# The generator yields one batch per image, so one step per image covers the dataset once
steps = len(descriptions)

model_dirname = "models"
# exist_ok=True replaces the exists() + mkdir() pair, which can race with another
# process creating the directory between the check and the call
os.makedirs(model_dirname, exist_ok=True)

for i in range(epochs):
    model_name = model_dirname + "/model_" + str(i) + ".h5"

    # A fresh generator per epoch, so every epoch starts from the first image
    generator = data_generator(descriptions, features, tokenizer, max_description_length, vocab_size)
    model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    # Save a checkpoint after every epoch
    model.save(model_name)