In [2]:
import string
import numpy as np
from PIL import Image
import os
from pickle import dump, load
import numpy as np
import random

from keras.applications.xception import Xception, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from keras.layers.merge import add
from keras.models import Model, load_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout

# small library for seeing the progress of loops.
from tqdm import tqdm_notebook as tqdm
tqdm().pandas()

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

0it [00:00, ?it/s]

In [3]:
DATASET_RAW = "../data/raw"
DATASET_INTERIM = "../data/interim"
CHECKPOINTS = "../checkpoints"

## Load & clean captions

In [4]:
def clean_caption(caption, table):
    caption.replace("-", " ")    # Replace "-" with " "
    words = caption.split()      # Split the words
    
    words = [word.lower() for word in words]    # Convert to lowercase
    words = [word.translate(table) for word in words]  # Remove punctuations
    words = [word for word in words if(len(word)>1)]   # Remove 's and a
    words = [word for word in words if(word.isalpha())] # Remove tokens with numbers
    
    return ' '.join(words)


def load_captions(filename):
    # Load the text file into memory
    file = open(filename, 'r')
    text = file.read()
    file.close()
    table = str.maketrans('','',string.punctuation)
    
    img_captions = dict()
    
    lines = text.split('\n')
    for line in lines[1:]:
        if len(line) == 0:
            continue
        img, caption = line.split(',', 1)
        # Clean the caption text
        caption = clean_caption(caption, table)
        if img in img_captions:
            img_captions[img].append(caption)
        else:
            img_captions[img] = [caption]
        
    return img_captions


def text_vocabulary(img_captions):
    vocab = set()
    for img in img_captions:
        [vocab.update(d.split()) for d in img_captions[img]]

    return vocab

img_captions = load_captions(os.path.join(DATASET_RAW, 'captions.txt'))
print(f"Number of captions: \t{len(img_captions)}")

vocabulary = text_vocabulary(img_captions)
print(f"Length of vocab: \t{len(vocabulary)}")


Number of captions: 	8091
Length of vocab: 	8763


In [5]:
def save_img_captions(img_captions, filename):
    lines = list()
    for key, desc_list in img_captions.items():
        for desc in desc_list:
            lines.append(key + '\t' + desc )
    data = "\n".join(lines)
    file = open(filename,"w")
    file.write(data)
    file.close()
    

save_img_captions(img_captions, os.path.join(DATASET_INTERIM, 'captions.txt'))

## Extracting the features from images 

In [37]:
def extract_img_features(directory):
    model = Xception(include_top=False, pooling='avg')
    features = {}
    for img in tqdm(os.listdir(directory)):
        filename = directory + "/" + img
        image = Image.open(filename)
        image = image.resize((299,299))
        image = np.expand_dims(image, axis=0)
        image = image/127.5
        image = image - 1.0

        feature = model.predict(image)
        features[img] = feature
    return features

#2048 feature vector
img_features = extract_img_features(os.path.join(DATASET_RAW, 'Images'))
dump(img_features, open(os.path.join(DATASET_INTERIM, 'img_features.pkl'), 'wb'))

2023-11-04 20:07:09.421439: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-11-04 20:07:09.475971: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-11-04 20:07:09.477446: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-11-04 20:07:09.479969: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropri

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for img in tqdm(os.listdir(directory)):


  0%|          | 0/8091 [00:00<?, ?it/s]

2023-11-04 20:07:18.987298: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
2023-11-04 20:07:23.180721: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8201
2023-11-04 20:07:27.142004: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory


In [8]:
img_features = load(open(os.path.join(DATASET_INTERIM, 'img_features.pkl'),"rb"))

print(f"# of images:\t{len(img_features)}")
print(img_features['1000268201_693b08cb0e.jpg'].shape)

# of images:	8091
(1, 2048)


In [7]:
len(img_captions)

8091

## Split the training/dev/test set

In [6]:
img_files = list(img_captions.keys())

assert len(img_files) > 7000, "The list must have more than 7000 elements."

# Shuffle the list in place
random.shuffle(img_files)

# Split into training, dev, and test datasets
train_imgs = img_files[:6000]
dev_imgs = img_files[6000:7000]
test_imgs = img_files[7000:]  # The remaining part of the list

dataset = {
    "train": train_imgs,
    "dev": dev_imgs,
    "test": test_imgs
}

print(f"train: {len(train_imgs)} | dev: {len(dev_imgs)} | test: {len(test_imgs)}")

train: 6000 | dev: 1000 | test: 1091


In [9]:
def get_captions(img_captions, mode="train", dataset=dataset):
    """
    Generate an img-captions dictionary with images from a specifc dataset
    
    Reture:
        A dictionary (img filename: <start> caption <end>
    """
    captions = dict()
    for img_name in img_captions:
        if img_name in dataset[mode]:
            captions[img_name] = [
                f"<start> {caption} <end>"
                for caption in img_captions[img_name]
            ]
    
    return captions
            

def get_img_features(img_features, mode="train", dataset=dataset):
    features = dict()
    for img_name in img_features:
        if img_name in dataset[mode]:
            features[img_name] = img_features[img_name]
    
    return features
    

In [10]:
all_captions = img_captions
all_img_features = img_features

train_captions = get_captions(img_captions, "train", dataset)
train_img_features = get_img_features(img_features, "train", dataset)

dev_captions = get_captions(img_captions, "dev", dataset)
dev_img_features = get_img_features(img_features, "dev", dataset)

test_captions = get_captions(img_captions, "test", dataset)
test_img_features = get_img_features(img_features, "test", dataset)

In [11]:
print(len(all_captions), len(train_captions), len(dev_captions), len(test_captions))
print(len(all_img_features), len(dev_img_features), len(test_img_features))

8091 6000 1000 1091
8091 1000 1091


all_captionstrain_captions## Tokenize the training captions

In [12]:
def dict_to_list(captions):
    all_caption = []
    for key in captions:
        [all_caption.append(caption) for caption in captions[key]]
    return all_caption


def create_tokenizer(captions):
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(dict_to_list(captions))
    return tokenizer

In [13]:
tokenizer = create_tokenizer(train_captions)
# Save the tokenize to hard drive
dump(tokenizer, open(os.path.join(DATASET_INTERIM, 'tokenizer.pkl'), 'wb'))

vocab_size = len(tokenizer.word_index) + 1
print(f"Size of vocab = {vocab_size}")

Size of vocab = 7687


In [14]:
def max_length(captions):
    desc_list = dict_to_list(captions)
    return max(len(d.split()) for d in desc_list)
    
max_length = max_length(img_captions)
max_length

32

## Create data generator

In [41]:
# create input-output sequence pairs from the image description.

# data generator, used by model.fit_generator()
def data_generator(descriptions, features, tokenizer, max_length=32):
    while True:
        for key, description_list in descriptions.items():
            feature = features[key][0]
            input_image, input_sequence, output_word = create_sequences(tokenizer, max_length, description_list, feature)
            yield [input_image, input_sequence], output_word

            
def create_sequences(tokenizer, max_length, desc_list, feature):
    X1, X2, y = list(), list(), list()
    # walk through each description for the image
    for desc in desc_list:
        # encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # split one sequence into multiple X,y pairs
        for i in range(1, len(seq)):
            # split into input and output pair
            in_seq, out_seq = seq[:i], seq[i]
            # pad input sequence
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            # encode output sequence
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            # store
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

# You can check the shape of the input and output for your model
[a, b], c = next(data_generator(train_captions, train_img_features, tokenizer, max_length))
a.shape, b.shape, c.shape

((47, 2048), (47, 32), (47, 7687))

In [42]:
def define_model(vocab_size, max_length):

    # features from the CNN model squeezed from 2048 to 256 nodes
    inputs1 = Input(shape=(2048,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Merging both models
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # summarize model
    # print(model.summary())

    return model

In [None]:
# train our model
print('Dataset: ', len(train_imgs))
print('Descriptions: train =', len(train_captions))
print('Photos: train =', len(train_img_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

model = define_model(vocab_size, max_length)
epochs = 10
steps = len(train_captions)
# making a directory models to save our models
for i in range(epochs):
    generator = data_generator(train_captions, train_img_features, tokenizer, max_length)
    model.fit(x=generator, epochs=1, steps_per_epoch=steps, verbose=1)
    model.save(os.path.join(CHECKPOINTS, f"/model_{str(i)}.h5"))

Dataset:  6000
Descriptions: train = 6000
Photos: train = 6000
Vocabulary Size: 7687
Description Length:  32