In [None]:
import os
import pickle
import numpy as np
from tqdm.notebook import tqdm
import pandas as pd
# from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from PIL import Image

from nltk.stem import PorterStemmer
import re
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

In [25]:
labels = pd.read_csv("captions.txt")[["Caption", "image_id"]]
labels

Unnamed: 0,Caption,image_id
0,A child in a pink dress is climbing up a set o...,1000268201_693b08cb0e.jpg
1,A girl going into a wooden building .,1000268201_693b08cb0e.jpg
2,A little girl climbing into a wooden playhouse .,1000268201_693b08cb0e.jpg
3,A little girl climbing the stairs to her playh...,1000268201_693b08cb0e.jpg
4,A little girl in a pink dress going into a woo...,1000268201_693b08cb0e.jpg
...,...,...
40450,A man in a pink shirt climbs a rock face,997722733_0cb5439472.jpg
40451,A man is rock climbing high in the air .,997722733_0cb5439472.jpg
40452,A person in a red shirt climbing up a rock fac...,997722733_0cb5439472.jpg
40453,A rock climber in a red shirt .,997722733_0cb5439472.jpg


In [26]:
labels.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 40455 entries, 0 to 40454
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   Caption   40455 non-null  object
 1   image_id  40455 non-null  object
dtypes: object(2)
memory usage: 632.2+ KB


In [None]:
# Feature Extraction
# vgg = VGG16()
# vgg = Model(inputs=vgg.inputs, outputs=vgg.layers[-2].output)
# vgg.summary()

resnet = ResNet50()
resnet = Model(inputs=ResNet50().inputs, outputs=ResNet50().layers[-2].output)
resnet.summary()

In [None]:
features = {}
BASE_DIR = "./Images/"
images = os.listdir(BASE_DIR)

for img in images:
    image_path = os.path.join(BASE_DIR, img)
    image = Image.open(image_path).resize((224, 224))

    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    image = preprocess_input(image)

    feature = resnet.predict(image, verbose=0)

    image_idx = str(img).split(".")[0]
    features[image_idx] = feature

pickle.dump(features, open("image_features.pkl", "wb"))

In [27]:
# Preprocess Twitter captions
class Preprocessor:
    def __init__(self):
        self.lemmatizer = WordNetLemmatizer()
        self.stopwords = set(stopwords.words("english"))

    def remove_username(self, tweet):
        return re.sub(r"@(\w+)", "", tweet)
    
    def remove_punctuations(self, tweet):
        puncs = [".", ",", "!", "?", ":", ";", "-", "_", "(", ")", "[", "]", "{", "}", "<", ">", "/", "\\", "|", "@", "$", "%", "^", "&", "*", "+", "=", "~", "`", "RT"]
        for punc in puncs:
            tweet = tweet.replace(punc, "")
        return tweet

    def remove_stopwords(self, tweet):
        stop_words = set(stopwords.words('english'))
    
        filtered_tweets = [t for t in tweet.split(" ") if t.lower() not in stop_words]
        filtered_tweets = " ".join(filtered_tweets).strip()
        return filtered_tweets
    
    def lemmatize(self, tweet):
        lemmatizer = WordNetLemmatizer()
        lemmatized = [lemmatizer.lemmatize(t) for t in tweet.split(" ")]
        lemmatized = " ".join(lemmatized)
        return lemmatized
    
    def stem_words(self, tweet):
        stemmer = PorterStemmer()
        stemmed = [stemmer.stem(t) for t in tweet.split(" ")]
        stemmed = " ".join(stemmed)
        return stemmed

In [28]:
preprocessor = Preprocessor()
labels["Caption"] = labels["Caption"].fillna("").astype(str)
labels["Caption"] = labels["Caption"].apply(preprocessor.remove_username)
labels["Caption"] = labels["Caption"].apply(preprocessor.remove_punctuations)
labels["Caption"] = labels["Caption"].apply(preprocessor.remove_stopwords)
labels["Caption"] = labels["Caption"].apply(preprocessor.lemmatize)

In [29]:
with open(os.path.join("./", "image_features.pkl"), "rb") as f:
    image_features = pickle.load(f)

In [30]:
labels["Caption"] = labels["Caption"].apply(lambda x: "<start> " + x + " <end>")

In [31]:
MAX_LENGTH =  max(len(caption.split()) for caption in labels["Caption"])

In [32]:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(labels["Caption"].values)
vocab_size = len(tokenizer.word_index) + 1

print("Vocab size: ", vocab_size)

Vocab size:  7651


In [33]:
labels

Unnamed: 0,Caption,image_id
0,<start> child pink dress climbing set stair en...,1000268201_693b08cb0e.jpg
1,<start> girl going wooden building <end>,1000268201_693b08cb0e.jpg
2,<start> little girl climbing wooden playhouse ...,1000268201_693b08cb0e.jpg
3,<start> little girl climbing stair playhouse <...,1000268201_693b08cb0e.jpg
4,<start> little girl pink dress going wooden ca...,1000268201_693b08cb0e.jpg
...,...,...
40450,<start> man pink shirt climb rock face <end>,997722733_0cb5439472.jpg
40451,<start> man rock climbing high air <end>,997722733_0cb5439472.jpg
40452,<start> person red shirt climbing rock face co...,997722733_0cb5439472.jpg
40453,<start> rock climber red shirt <end>,997722733_0cb5439472.jpg


In [None]:
import tensorflow as tf
def generate_data(keys, labels, max_length, tokenizer, vocab_size, batch_size=32):
    """
    Organize the data into image features, input sequence features and the label (output sequence).

    features - image features
    labels - caption dataframe
    max)_length - maximum length of the caption
    tokenizer - tokenizer object
    vocab_size - total # of classes the model can predict
    """
    img_features, i_sequence, o_sequence = list(), list(), list()
    counter = 0
    while 1:
        for key in keys:
            counter +=1
            image_feature = image_features[key][0] #(4096,)

            labels_per_image = labels[labels["image_id"] == f"{key}.jpg"]["Caption"].values
            #labels_per_image = labels[labels["image_id"] == f"{key}.jpg"]["Caption"].values[0]
            
            # print(labels_per_image, key)
            for label in labels_per_image:

                label_split = tokenizer.texts_to_sequences([label])[0]
                for i in range(1,len(label_split)):
                    prev, next = label_split[:i], label_split[i]

                    in_seq = pad_sequences([prev], maxlen=max_length, padding='post')[0].astype('int32')
                    # out_seq = to_categorical([next], 	num_classes=vocab_size)[0].astype('float16')
                    
                    img_features.append(image_feature)
                    i_sequence.append(in_seq)
                    o_sequence.append(next)

            if counter == batch_size:
                yield (np.array(img_features, dtype='float32'), np.array(i_sequence, dtype='int32')), np.array(o_sequence, dtype='int32')
                img_features, i_sequence, o_sequence = list(), list(), list()
                counter = 0
                    
# data = generate_data(image_features, labels, max_length=MAX_LENGTH, tokenizer=tokenizer, vocab_size=vocab_size)

In [None]:
def pretrained_embedding(glove_file):
    features = {}
    with open(glove_file, "r", encoding="utf-8") as f:
        for line in f:
            word = line.split()[0]
            vec = np.array(line.split()[1:], dtype="float32")

            features[word] = vec

        return features
word_to_vec = pretrained_embedding("glove.6B.50d.txt")

In [35]:
def pretrained_embedding_layer(vocab_size):
    embedding_matrix = np.zeros((vocab_size, 50))

    for word, i in word_to_vec.items():
        if word in tokenizer.word_index:
            idx = tokenizer.word_index[word]
            embedding_matrix[idx] = word_to_vec[word]
        
    embedding_layer = Embedding(vocab_size, 50, trainable=False)
    embedding_layer.build((None,))
    embedding_layer.set_weights([embedding_matrix])
    return embedding_layer

pretrained_embedding = pretrained_embedding_layer(len(tokenizer.word_index)+1)

In [None]:
# Model

def caption_generator_model(vocab_size):
    image_input = Input(shape=(4096,))
    second_last_layer = Dropout(0.5)(image_input) # trainable 2nd last layer
    last_layer = Dense(256, activation='relu')(second_last_layer) # trainable last layer

    text_input = Input(shape=(MAX_LENGTH,))

    # embedding = Embedding(input_dim=vocab_size, output_dim=256)(text_input)
    embedding = pretrained_embedding(text_input) 

    t = Dropout(0.5)(embedding)
    t = LSTM(256)(t) 

    concat = add([last_layer, t])

    res = Dense(256, activation='relu')(concat)
    res = Dense(vocab_size, activation='softmax')(res)

    model = Model(inputs=[image_input, text_input], outputs=res)

    return model

model = caption_generator_model(vocab_size=7651)

In [41]:
model.summary()

In [None]:
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

In [19]:
steps = len(image_features) // 32
# creating a directory named models to save our models
import tensorflow as tf

output_signature = (
    (
        tf.TensorSpec(shape=(None, 4096), dtype='float32'),  # image features
        tf.TensorSpec(shape=(None, MAX_LENGTH), dtype='int32')  # input sequence
    ),
    tf.TensorSpec(shape=(None,), dtype='int32')  # output sequence
)

generator = lambda: generate_data(list(image_features.keys()), labels, max_length=MAX_LENGTH, tokenizer=tokenizer, vocab_size=vocab_size)
dataset = tf.data.Dataset.from_generator(generator, output_signature=output_signature)
model.fit(dataset, steps_per_epoch= steps, epochs=10)

Epoch 1/10
[1m252/252[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m164s[0m 631ms/step - accuracy: 0.1493 - loss: 5.8384
Epoch 2/10
[1m252/252[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m161s[0m 638ms/step - accuracy: 0.2234 - loss: 4.5395
Epoch 3/10
[1m252/252[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m130s[0m 517ms/step - accuracy: 0.2421 - loss: 4.2186
Epoch 4/10
[1m252/252[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m118s[0m 469ms/step - accuracy: 0.2507 - loss: 4.0027
Epoch 5/10
[1m252/252[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m121s[0m 480ms/step - accuracy: 0.2592 - loss: 3.8273
Epoch 6/10
[1m 12/252[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1:54[0m 478ms/step - accuracy: 0.2629 - loss: 3.7206

KeyboardInterrupt: 

In [None]:
model.save("model_weights.h5")

In [None]:
from nltk.translate.bleu_score import sentence_bleu

TEST_PATH = "Test/"
def write_caption(image_name, reference):
    caption = "<start>"
    
    image_path = os.path.join(TEST_PATH, image_name)
    image = Image.open(image_path).resize((224, 224))
    
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)   
    image = preprocess_input(image)

    feature = vgg.predict(image, verbose=0)

    while True:
        tokenized = tokenizer.texts_to_sequences([caption])[0]
        in_seq = pad_sequences([tokenized], maxlen=MAX_LENGTH, padding='post')[0].astype('int32')
        in_seq = np.expand_dims(in_seq, axis=0)
        pred = model.predict([feature, in_seq], verbose=0)
  
        next_word = np.argmax(pred[0])

        caption += " " + tokenizer.index_word[next_word]
        if tokenizer.index_word[next_word] == "end":
            break

        print(caption)

    # Calculate BLEU score
    reference = [ref.split() for ref in reference]
    caption = caption.split()
    score = sentence_bleu(reference, caption, weights=(0.25, 0.25, 0.25, 0.25))
    print("BLEU score: ", score)

references = [
    ['man with a white shirt and a hat'],
    ['man holding a hat'],
    ['man with a brown bowler hat']
]

write_caption("download.jpg", references)

<start> man
<start> man white
<start> man white shirt
<start> man white shirt shirt
<start> man white shirt shirt shirt
<start> man white shirt shirt shirt shirt
<start> man white shirt shirt shirt shirt shirt
<start> man white shirt shirt shirt shirt shirt shirt


In [None]:
references = [
    ['man with a white shirt and a hat'],
    ['man holding a hat'],
    ['man with a brown bowler hat']
]

write_caption("download.jpg", references)

# TODO
- validate the above runs without issue
- create another notebook to do it for Twitter data