<a href="https://colab.research.google.com/github/Delaunay-I/image_cap_generator/blob/main/caption_generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%cd /content/drive/MyDrive/colab\ files

In [3]:
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import sys, time, os, warnings 
import numpy as np
import pandas as pd 
from collections import Counter 

print("python {}".format(sys.version))
print("keras version {}".format(keras.__version__)); del keras
print("tensorflow version {}".format(tf.__version__))

def set_seed(sd=123):
    from numpy.random import seed
    from tensorflow import set_random_seed
    import random as rn
    ## numpy random seed
    seed(sd)
    ## core python's random number 
    rn.seed(sd)
    ## tensor flow's random number
    set_random_seed(sd)

python 3.9.16 (main, Dec  7 2022, 01:11:51) 
[GCC 9.4.0]
keras version 2.12.0
tensorflow version 2.12.0


In [12]:
!pip install opendatasets
import opendatasets as od
import pandas

od.download("https://www.kaggle.com/datasets/adityajn105/flickr8k")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Skipping, found downloaded files in "./flickr8k" (use force=True to force download)


In [13]:
## The location of the Flickr8K_ photos
dir_Flickr_jpg = "./flickr8k/Images"
## The location of the caption file
dir_Flickr_text = "./flickr8k/captions.txt"

jpgs = os.listdir(dir_Flickr_jpg)
print("The number of jpg flies in Flicker8k: {}".format(len(jpgs)))

The number of jpg flies in Flicker8k: 8091


## Preliminary Analysis

In [14]:
df_txt = pd.read_csv(dir_Flickr_text, skiprows=1, names=["filename", "caption"])
df_txt['caption'] = df_txt['caption'].str.lower()

df_txt['index'] = df_txt.groupby("filename").cumcount()

uni_filenames = np.unique(df_txt.filename.values)
print("The number of unique file names : {}".format(len(uni_filenames)))
print("The distribution of the number of captions for each image:")
Counter(Counter(df_txt.filename.values).values())

The number of unique file names : 8091
The distribution of the number of captions for each image:


Counter({5: 8091})

# Data prepration
prepare text and image separately

In [15]:
from copy import copy
def add_start_end_seq_token(captions):
    caps = []
    for txt in captions:
        txt = 'startseq ' + txt + ' endseq'
        caps.append(txt)
    return(caps)

df_txt["caption"] = add_start_end_seq_token(df_txt["caption"])
df_txt.head(5)

Unnamed: 0,filename,caption,index
0,1000268201_693b08cb0e.jpg,startseq a child in a pink dress is climbing u...,0
1,1000268201_693b08cb0e.jpg,startseq a girl going into a wooden building ....,1
2,1000268201_693b08cb0e.jpg,startseq a little girl climbing into a wooden ...,2
3,1000268201_693b08cb0e.jpg,startseq a little girl climbing the stairs to ...,3
4,1000268201_693b08cb0e.jpg,startseq a little girl in a pink dress going i...,4


# split the dataset int train and test splits

In [17]:
from sklearn.model_selection import train_test_split

# split your df_txt into train and test sets
train_df, test_df = train_test_split(df_txt, test_size=0.2, shuffle=False)
# dropping the imagefile that with shared captions in the two splits
# this would also avoid data leakage
test_df = test_df.iloc[1:]
train_df =  train_df[:-4]

# Image prepration
## create features for image using InceptionV3 model

In [18]:
from keras.applications.inception_v3 import InceptionV3, preprocess_input
from keras.models import Model

base_model = InceptionV3(weights='imagenet')
image_model = Model(inputs = base_model.input, outputs=base_model.layers[-2].output)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels.h5


In [19]:
from tensorflow.keras.utils import load_img, img_to_array
from keras.applications.inception_v3 import preprocess_input
import glob
import pickle

train_path = dir_Flickr_jpg
path_all_images = glob.glob(train_path + '/*jpg')

train_img = []  # list of all images in training set
test_img = []
for im in path_all_images:
    file_name = os.path.basename(os.path.normpath(im))
    # include images that only exist in the target directory
    # can split the dataset this way
    if(file_name in train_df.filename.to_list()):
        train_img.append(im)
    elif (file_name in test_df.filename.to_list()):
        test_img.append(im)

def preprocess(image_path):
    # inception v3 excepts img in 299 * 299 * 3
    image = load_img(image_path, target_size=(299, 299))
    # convert the image pixels to a numpy array
    x = img_to_array(image)
    # Add one more dimension
    x = np.expand_dims(x, axis = 0)
    x = preprocess_input(x)
    return x

def encode(image_path):
    image = preprocess(image_path)
    vec = image_model.predict(image, verbose=0)
    vec_flattened = vec.flatten()
    return vec_flattened


train_img_feats = {}
test_img_feats = {}

if not (os.path.exists('train_encoder.pkl') and os.path.exists('test_encoder.pkl')):
    for image in train_img:
        file_name = os.path.basename(os.path.normpath(image))
        train_img_feats[file_name] = encode(image)
    for image in test_img:
        file_name = os.path.basename(os.path.normpath(image))
        test_img_feats[file_name] = encode(image)
    # Save the image features
    with open('train_encoder.pkl', 'wb') as f:
        pickle.dump(train_img_feats, f)
    with open('test_encoder.pkl', 'wb') as f:
        pickle.dump(test_img_feats, f)
else:
    # Load previously encoded image data
    with open('train_encoder.pkl', 'rb') as f:
        train_img_feats = pickle.load(f)
    with open('test_encoder.pkl', 'rb') as f:
        test_img_feats = pickle.load(f)

In [53]:
train_img_feats

{'3103264875_2a8d534abc.jpg': array([0.29125422, 0.56051743, 0.09697243, ..., 0.06267833, 0.5895493 ,
        0.52111846], dtype=float32),
 '3319020762_d429d56a69.jpg': array([0.08743957, 0.03076484, 0.47039473, ..., 0.24871874, 0.627587  ,
        0.15285188], dtype=float32),
 '247637795_fdf26a03cf.jpg': array([0.18437263, 0.54828835, 0.39811057, ..., 0.75611246, 0.71042645,
        0.3554488 ], dtype=float32),
 '2766726291_b83eb5d315.jpg': array([0.401232  , 0.2789092 , 0.24069485, ..., 1.32501   , 0.14128834,
        0.00393988], dtype=float32),
 '3461583471_2b8b6b4d73.jpg': array([0.2895525 , 0.15182093, 0.15093619, ..., 0.50745636, 1.243553  ,
        0.5307038 ], dtype=float32),
 '2456907314_49bc4591c4.jpg': array([0.48334837, 0.3883487 , 0.09491307, ..., 0.67991817, 0.5676531 ,
        0.49680197], dtype=float32),
 '3249014584_21dd9ddd9d.jpg': array([0.2869537 , 0.86530936, 0.6054361 , ..., 1.3873844 , 1.1618288 ,
        0.490624  ], dtype=float32),
 '3295452057_0c987f895f.jpg'

# Tokenize the captions

In [20]:
from tensorflow.keras.layers import TextVectorization

vocab_size = 8000
# output_sequence_length = 20 # adjust this according to your max_length

# create the TextVectorization layer
vectorizer = TextVectorization(max_tokens=vocab_size, output_mode="int")

# adapt the layer to the captions_list
vectorizer.adapt(train_df.caption.to_list())

train_seqs = vectorizer(train_df.caption.to_list())
test_seqs = vectorizer(test_df.caption.to_list())

vectorizer_word_index = {}
vectorizer_index_word = {}
for index, word in enumerate(vectorizer.get_vocabulary()):
    if index == 0:
        continue
    vectorizer_word_index[word] = index
    vectorizer_index_word[index] = word

# get the max length of sequences from the vectorizer
max_length = train_seqs.shape[1]

In [21]:
from tensorflow.keras.utils import pad_sequences
from tensorflow.keras.utils import to_categorical

def data_generator(df, vectorizer, max_length, vocab_size, image_data, batch_size):
    num_batches = len(df) // batch_size
    while True:
        for i in range(num_batches):
            batch_df = df.iloc[i*batch_size:(i+1)*batch_size]
            X1, X2, y = [], [], []
            for index, row in batch_df.iterrows():
                try:
                    # try to get the image features from the image_data dictionary
                    pic = image_data[row['filename']]
                except KeyError:
                    # if the file name is not found, print a warning message and skip this row
                    print(f"Warning: file name {row['filename']} not found in image_data dictionary. Skipping this row.")
                    continue
                seq = vectorizer(row['caption']).numpy() # convert caption to vectorized tensor and then to numpy array
                # seq = np.argmax(seq) # optional: get index of max value in vector
                for j in range(1, len(seq)):
                    in_seq, out_seq = seq[:j], seq[j]
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    X1.append(pic)
                    X2.append(in_seq)
                    y.append(out_seq)
            yield [np.array(X1), np.array(X2)], np.array(y)

batch_size = 64
# create data generator for the train set
train_generator = data_generator(train_df, vectorizer, max_length, vocab_size, train_img_feats, batch_size)

# create data generator for the test set
test_generator = data_generator(test_df, vectorizer, max_length, vocab_size, test_img_feats, batch_size)

In [51]:
def data_generator(df, vectorizer, max_length, vocab_size, image_data, batch_size):
    num_batches = len(df) // batch_size
    while True:
        for i in range(num_batches):
            batch_df = df.iloc[i*batch_size:(i+1)*batch_size]
            X1, X2, y = [], [], []
            for index, row in batch_df.iterrows():
                try:
                    # try to get the image features from the image_data dictionary
                    pic = image_data[row['filename']]
                except KeyError:
                    # if the file name is not found, print a warning message and skip this row
                    print(f"Warning: file name {row['filename']} not found in image_data dictionary. Skipping this row.")
                    continue
                seq = vectorizer(row['caption']).numpy() # convert caption to vectorized tensor and then to numpy array
                # seq = np.argmax(seq) # optional: get index of max value in vector
                for j in range(1, len(seq)):
                    in_seq, out_seq = seq[:j], seq[j]
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    X1.append(pic)
                    X2.append(in_seq)
                    y.append(out_seq)
            yield [np.array(X1), np.array(X2)], np.array(y)
        
        # handle leftover samples that don't fit in a full batch
        remaining = len(df) % batch_size
        if remaining > 0:
            batch_df = df.iloc[-remaining:]
            X1, X2, y = [], [], []
            for index, row in batch_df.iterrows():
                try:
                    # try to get the image features from the image_data dictionary
                    pic = image_data[row['filename']]
                except KeyError:
                    # if the file name is not found, print a warning message and skip this row
                    print(f"Warning: file name {row['filename']} not found in image_data dictionary. Skipping this row.")
                    continue
                seq = vectorizer(row['caption']).numpy() # convert caption to vectorized tensor and then to numpy array
                # seq = np.argmax(seq) # optional: get index of max value in vector
                for j in range(1, len(seq)):
                    in_seq, out_seq = seq[:j], seq[j]
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    X1.append(pic)
                    X2.append(in_seq)
                    y.append(out_seq)
            yield [np.array(X1), np.array(X2)], np.array(y)

batch_size = 64
# create data generator for the train set
train_generator = data_generator(train_df, vectorizer, max_length, vocab_size, train_img_feats, batch_size)

# create data generator for the test set
test_generator = data_generator(test_df, vectorizer, max_length, vocab_size, test_img_feats, batch_size)

# Downloading GloVe to using its vector embeddings

In [22]:
import urllib.request
import zipfile

# Set the URL for the GloVe embeddings
url = 'http://nlp.stanford.edu/data/glove.6B.zip'

# Set the path where the embeddings will be stored
embeddings_dir = 'embeddings/glove'

# Create the directory if it doesn't exist
if not os.path.exists(embeddings_dir):
    os.makedirs(embeddings_dir)

# Set the file name for the embeddings archive
embeddings_zip = os.path.join(embeddings_dir, 'glove.6B.zip')

# Download the embeddings archive if it doesn't exist
if not os.path.exists(embeddings_zip):
    print(f'Downloading GloVe embeddings from {url}...')
    urllib.request.urlretrieve(url, embeddings_zip)
    print('Done!')

# Extract the embeddings if they haven't been extracted yet
if not os.path.exists(os.path.join(embeddings_dir, 'glove.6B.100d.txt')):
    print('Extracting GloVe embeddings...')
    with zipfile.ZipFile(embeddings_zip, 'r') as zip_ref:
        zip_ref.extractall(embeddings_dir)
    print('Done!')


In [23]:
# load GloVe embeddings
embeddings_index = {}
glove_path = "./embeddings/glove/glove.6B.200d.txt"

with open(glove_path, 'r', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

# prepare embedding matrix
embedding_dim = 200
embedding_matrix = np.zeros((vocab_size, embedding_dim))
for i, word in enumerate(vectorizer.get_vocabulary()):
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

# Define the model

In [24]:
from tensorflow.keras import layers, Model

# define the model
ip1 = layers.Input(shape = (2048, ))
fe1 = layers.Dropout(0.2)(ip1)
fe2 = layers.Dense(256, activation = 'relu')(fe1)
ip2 = layers.Input(shape = (max_length, ))
se1 = layers.Embedding(vocab_size, embedding_dim, mask_zero = True)(ip2)
se2 = layers.Dropout(0.2)(se1)
se3 = layers.LSTM(256)(se2)
decoder1 = layers.add([fe2, se3])
decoder2 = layers.Dense(256, activation = 'relu')(decoder1)
outputs = layers.Dense(vocab_size, activation = 'softmax')(decoder2)
model = Model(inputs = [ip1, ip2], outputs = outputs)

# Train the model

In [36]:
print(f"test_df: {test_df.shape}")
print(f"train_df: {train_df.shape}")

test_df: (8090, 3)
train_df: (32360, 3)


In [38]:
len(train_df) // batch_size

505

In [37]:
len(test_df)  // batch_size

126

In [52]:
model.layers[2].set_weights([embedding_matrix])
model.layers[2].trainable = False
model.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

num_batches = len(train_df) // batch_size
num_val_batches = len(test_df) // batch_size

model.fit(train_generator,
          validation_data=test_generator,
          validation_steps=num_val_batches,
          epochs = 50,
          steps_per_epoch=num_batches,
          verbose = 1)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50

KeyboardInterrupt: ignored

## Saving the model

In [None]:
# define some custom metadata for the model
metadata = {
  'name': 'image_caption_generator',
  'description': 'A model that generates captions for images using InceptionV3 and have all the stop words, and with no lemmatization',
  'parameters': {
    'vocab_size': 8000,
    'embedding_dim': 200,
    'lstm_units': 256,
    'beam_size': 5
  },
  'performance': {
    'loss': 1.6700,
    'accuracy': 'Nan',
    'bleu_score': 'NaN'
  }
}

# save the model with the metadata
model.save('capGen_model_v2.h5')

In [None]:
model = tf.keras.models.load_model('img_cap_model_v2.h5')

In [None]:
# Load and preprocess the new image
img = encode('./test_img.jpg')
img = img.reshape((1, 2048))

# Generate the caption
caption = [tokenizer.word_index["startseq"]]

for i in range(max_length):
    seq = pad_sequences([caption], maxlen=max_length)
    pred = model.predict([img, seq])
    pred_word_index = np.argmax(pred[0][i])
    if pred_word_index in tokenizer.index_word:
        pred_word = tokenizer.index_word[pred_word_index]
        caption.append(pred_word_index)
        if pred_word == 'endseq':
            break

# Convert the caption back to text
caption = tokenizer.sequences_to_texts([caption])[0]

print(caption)

In [None]:
model_old = tf.keras.models.load_model("./caption_generator_inceptionV3.h5")

In [None]:
# Load and preprocess the new image
img = encode("test_img.jpg")
img = img.reshape((1, 2048))

caption = [vectorizer(["startseq"]).numpy()[0][0]]

for i in range(max_length):
    padded_caption = pad_sequences([caption], maxlen=22, padding='post')
    prediction = model.predict([img, padded_caption], verbose=0)
    word_index = np.argmax(prediction)
    caption.append(word_index)
    if word_index == vectorizer_word_index["endseq"]:
        break


caption_words = [vectorizer_index_word[i] for i in caption]
# join the words to form a sentence
caption_sentence = ' '.join(caption_words[1:-1])
caption_sentence

In [None]:
def beam_search_caption(image_path, beam_width):
    # encode the image
    image_vec = encode(image_path)
    image_vec = image_vec.reshape(1, -1)
    # initialize the caption with the start token
    caption = [vectorizer(["startseq"]).numpy()[0][0]]
    # initialize beam search
    beam = [(caption, 0)]
    
    # loop until the end token or the maximum length is reached
    for i in range(max_length):
        # generate new candidates
        candidates = []
        for j in range(len(beam)):
            seq, score = beam[j]
            # check if the sequence ends with endseq
            if seq[-1] == vectorizer_word_index["endseq"]:
                candidates.append((seq, score))
                continue
            # predict the next word using the model
            padded_caption = pad_sequences([seq], maxlen=max_length, padding='post')
            prediction = model.predict([image_vec, padded_caption], verbose=0)[0]
            # get the top k words with the highest probability
            top_k = prediction.argsort()[-beam_width:][::-1]
            # add new candidates to the list
            for w in top_k:
                new_seq = seq + [w]
                new_score = score + np.log(prediction[w])
                candidates.append((new_seq, new_score))
        # select top k candidates
        beam = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_width]
        
    # select the best candidate
    seq, score = beam[0]
    # convert the caption indices to words
    caption_words = [vectorizer_index_word[i] for i in seq]
    # join the words to form a sentence
    caption_sentence = ' '.join(caption_words[1:-1])
    return caption_sentence

In [None]:
generated_caption = beam_search_caption("test_img.jpg", 10)
print(generated_caption)

# Model 2, more complicated
* You can use a bidirectional LSTM instead of a single LSTM for the caption encoder. This way, you can capture the context from both directions of the caption sequence, and generate more coherent captions.

* You can use an attention mechanism to allow the decoder to focus on different parts of the image and the caption encoder outputs at each time step. This way, you can generate more relevant and informative captions that align with the image content.

* You can use a scheduled sampling technique to train the decoder with a mix of ground truth and predicted words. This way, you can reduce the exposure bias and improve the generalization ability of the decoder.

* You can use a beam search instead of a greedy search for generating captions. This way, you can explore more possible captions and choose the one with the highest probability.

In [None]:
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.layers import Attention

# define the model
ip1 = layers.Input(shape = (2048,))
fe1 = layers.Dropout(0.2)(ip1)
fe2 = layers.Dense(512, activation='relu')(fe1)  # add this layer to match the dimension of se3[:, -1]
ip2 = layers.Input(shape = (max_length,))
se1 = layers.Embedding(vocab_size, embedding_dim, mask_zero = True)(ip2)
se2 = layers.Dropout(0.2)(se1)
se3 = Bidirectional(layers.LSTM(256, return_sequences=True))(se2)
decoder1 = layers.add([fe2, se3[:, -1]])
decoder2 = layers.Dense(256, activation='relu')(decoder1)
attn_layer = Attention()
context_vector, attention_weights = attn_layer([decoder2, se3])
decoder3 = layers.Dense(256, activation='relu')(context_vector)
outputs = layers.Dense(vocab_size, activation='softmax')(decoder3)
model_v2 = Model(inputs=[ip1, ip2], outputs=outputs)

model_v2.layers[3].set_weights([embedding_matrix])
model_v2.layers[3].trainable = False
model_v2.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

num_batches = len(train_df) // batch_size
model_v2.fit(train_generator, epochs = 50, steps_per_epoch=num_batches, verbose = 1)

## Beam search

In [None]:
def beam_search(image_path, beam_size):
  # encode the image
  image_vec = encode(image_path)
  # add another dimension to match the model input
  image_vec = np.expand_dims(image_vec, axis=0)
  # initialize the candidates with the start token
  candidates = [[tokenizer.word_index['<start>']]]
  # initialize the probabilities with 1
  probabilities = [1]
  # loop until the maximum length is reached
  for i in range(max_length):
    # initialize a list to store the next candidates
    next_candidates = []
    # initialize a list to store the next probabilities
    next_probabilities = []
    # loop over the current candidates
    for j in range(len(candidates)):
      # get the current candidate
      candidate = candidates[j]
      # pad the candidate sequence
      padded_candidate = pad_sequences([candidate], maxlen=max_length, padding='post')
      # predict the next word using the model
      prediction = model.predict([image_vec, padded_candidate], verbose=0)
      # get the top beam_size words and their probabilities
      top_words = np.argsort(prediction[0])[-beam_size:]
      top_probs = prediction[0][top_words]
      # loop over the top words and their probabilities
      for k in range(beam_size):
        # get the word and its probability
        word = top_words[k]
        prob = top_probs[k]
        # append the word to the candidate and multiply the probability
        next_candidate = candidate + [word]
        next_prob = probabilities[j] * prob
        # append the next candidate and probability to the lists
        next_candidates.append(next_candidate)
        next_probabilities.append(next_prob)
    # sort the next candidates and probabilities by descending order of probability
    sorted_indices = np.argsort(next_probabilities)[::-1]
    sorted_candidates = [next_candidates[i] for i in sorted_indices]
    sorted_probabilities = [next_probabilities[i] for i in sorted_indices]
    # select the top beam_size candidates and probabilities for the next iteration
    candidates = sorted_candidates[:beam_size]
    probabilities = sorted_probabilities[:beam_size]
    # check if any candidate has reached the end token
    end_index = tokenizer.word_index['<end>']
    if any(candidate[-1] == end_index for candidate in candidates):
      break
  # return the candidate with the highest probability
  best_candidate = candidates[0]
  # convert the candidate indices to words
  caption_words = [tokenizer.index_word[i] for i in best_candidate]
  # join the words to form a sentence
  caption_sentence = ' '.join(caption_words[1:-1])
  return caption_sentence

In [None]:
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.utils import pad_sequences
from tensorflow.keras.utils import to_categorical
import tensorflow as tf

def data_generator(df, image_data, batch_size):
    # create a dataset from the data frame
    ds = tf.data.Dataset.from_tensor_slices((df['filename'], df['caption']))
    # map the filename to the image data
    ds = ds.map(lambda x, y: (image_data[x], y))
    # apply the TextVectorization layer as a transformation
    ds = ds.map(lambda x, y: (x, vectorizer([y])))
    # unbatch the dataset to get individual elements
    ds = ds.unbatch()
    # create input and output sequences
    ds = ds.map(lambda x, y: (x, y[:-1], y[1:]))
    # pad and one-hot encode the sequences if needed
    # ds = ds.map(lambda x, y, z: (x, pad_sequences([y], maxlen=max_length)[0], to_categorical([z], num_classes=vocab_size)[0]))
    # batch the dataset
    ds = ds.batch(batch_size)
    return ds

batch_size = 64
train_generator = data_generator(df_txt, image_data, batch_size)