**Task: given an image and a caption, estimate the probability that the caption describes the image.** The model outputs this probability.

In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab VM so the project files are reachable under /content/drive.
drive.mount('/content/drive')
# Project data folder on Drive; the cells below read the images, the caption JSON,
# the GloVe vectors and the pickled 'dump' artifacts relative to this path.
data_root = '/content/drive/My Drive/IITB Courses/CS 772/Project/data'

Mounted at /content/drive


In [None]:
# import os
# import zipfile

# !wget http://images.cocodataset.org/zips/train2017.zip -P '/content/drive/My Drive/IITB Courses/CS 772/Project/data'
# with zipfile.ZipFile(os.path.join(data_root,'train2017.zip'), 'r') as zip_ref:
#     zip_ref.extractall(data_root)


--2022-04-07 21:07:47--  http://images.cocodataset.org/zips/train2017.zip
Resolving images.cocodataset.org (images.cocodataset.org)... 52.217.141.201
Connecting to images.cocodataset.org (images.cocodataset.org)|52.217.141.201|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 19336861798 (18G) [application/zip]
Saving to: ‘/content/drive/My Drive/IITB Courses/CS 772/Project/data/train2017.zip’


2022-04-07 21:13:59 (49.6 MB/s) - ‘/content/drive/My Drive/IITB Courses/CS 772/Project/data/train2017.zip’ saved [19336861798/19336861798]



In [None]:
import numpy as np
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import string
import os
import json
import math
import random
from tqdm import tqdm
from PIL import Image
import glob
import pickle
from time import time
import tensorflow as tf
from keras.applications.inception_v3 import InceptionV3, preprocess_input
# from keras.preprocessing import image

NO_OF_IMAGES = 15000 # number of images to load (NOTE: this is not the number of training examples, because an image can have several captions)
MAX_LEN = 20    # fixed caption length in tokens (captions are padded/truncated to this length)
EMBED_SIZE = 200    # word-vector dimension; also selects which glove.6B.<dim>d.txt file is read
TRAIN_SIZE = .8     # fraction of the loaded images that goes to the training split
TEST_SIZE = .2      # intended test fraction; NOTE(review): not read by the visible code, the split is derived from TRAIN_SIZE only

# process images

In [None]:
# images_list = os.listdir(os.path.join(data_root, 'train2017'))    # list all the images in folder.
# pickle.dump(images_list, open(os.path.join(data_root, 'dump', 'images_list'), 'wb'))  # store images file names so that we don't need to retrive again.
# Load the cached list of image file names (created once by the commented-out lines above).
# NOTE: pickle.load can execute arbitrary code; only load dump files you created yourself.
with open(os.path.join(data_root, 'dump', 'images_list'), 'rb') as f:
    images_list = pickle.load(f)

images_list = images_list[:NO_OF_IMAGES]    # keep only the first NO_OF_IMAGES file names
print(f'total images:{len(images_list)}')
image_dict = {image: [] for image in images_list}       # file name -> list of its captions (filled in later)

# Parse the COCO caption annotations; the context manager closes the file as soon as it is read.
with open(os.path.join(data_root, 'captions_train2017.json'), 'r') as f:
    data = json.load(f)
print(data.keys())

total images:15000
dict_keys(['info', 'licenses', 'images', 'annotations'])


In [None]:
images = data['images']
annotations = data['annotations']

# Every image record carries a unique id and a unique file name; build both lookups,
# restricted to the images that were actually selected.
image2id, id2image = {}, {}

images_set = set(images_list)   # set membership keeps the scan over all image records linear
for image in images:
    if image['file_name'] in images_set:
        image2id[image['file_name']] = image['id']
        id2image[image['id']] = image['file_name']

# Start from empty caption lists so that re-running this cell does not append
# every caption a second time to lists filled by a previous run.
image_dict = {file_name: [] for file_name in images_list}

# Captions whose image was not selected; they are used later as negative examples.
captions_404 = []

for caption in annotations:
    file_name = id2image.get(caption['image_id'])
    if file_name is None:
        captions_404.append(caption['caption'])
    else:
        image_dict[file_name].append(caption['caption'])


In [None]:
# load images from dump to save time.
# image_loaded = {}
# pickle.dump(image_loaded, open(os.path.join(data_root, 'dump', 'images_loaded_299'), mode='wb'))  # use it for first time.
# Cache of already-loaded images keyed by file name, restored from a previous run.
# NOTE: pickle.load can execute arbitrary code; only load dump files you created yourself.
with open(os.path.join(data_root, 'dump', 'images_loaded_299'), mode='rb') as cache_file:
    image_loaded = pickle.load(cache_file)

def load_image(file_name):
    """Return the 299x299 image for `file_name`, reading it from disk only on a cache miss.

    Args:
        file_name: name of an image file inside the 'train2017' folder under data_root.

    Returns:
        The image object produced by Keras `load_img`; it is also stored in the
        module-level `image_loaded` dict so the dict can be dumped and reused.
    """
    img = image_loaded.get(file_name)   # None means the image has not been loaded yet
    if img is None:
        img = tf.keras.preprocessing.image.load_img(
            os.path.join(data_root, 'train2017', file_name),
            target_size=(299, 299)
        )
        image_loaded[file_name] = img   # remember it for later calls and for the next dump
    return img

In [None]:
# InceptionV3 with ImageNet weights is used only to extract features from images
# (any CNN would do, but this one is already trained on a large dataset).
inceptionV3_model = InceptionV3(weights='imagenet')
# Re-wire the network so that its output is the second-to-last layer (layers[-2]) instead of the final layer.
my_inceptionV3_model = tf.keras.models.Model(inputs=inceptionV3_model.input, outputs=inceptionV3_model.layers[-2].output)

def preprocess_image(file_name):
    """Turn one cached/loaded image into a single-item batch ready for InceptionV3.

    Args:
        file_name: image file name understood by `load_image`.

    Returns:
        The output of `preprocess_input` applied to the image array with a
        leading batch axis of size 1.
    """
    pixels = tf.keras.preprocessing.image.img_to_array(load_image(file_name))   # PIL image -> numeric array
    batch = np.expand_dims(pixels, axis=0)   # add the leading batch axis the network expects
    return preprocess_input(batch)

def extract_features(file_name):
    """Run one image through the truncated InceptionV3 and return a flat feature vector.

    Args:
        file_name: image file name understood by `preprocess_image`.

    Returns:
        1-D array whose length is the second dimension of the network output
        (the prediction has shape (1, n_features)).
    """
    batch = preprocess_image(file_name)
    features = my_inceptionV3_model.predict(batch)
    # drop the batch axis: (1, n_features) -> (n_features,)
    return features.reshape(features.shape[1], )



image_feature = {}  # file name -> feature vector produced by extract_features
for file_name in tqdm(images_list):
    image_feature[file_name] = extract_features(file_name)

# Persist the image cache (previously cached images plus the ones loaded in this run).
# The context manager guarantees the file is flushed and closed once the dump is written.
with open(os.path.join(data_root, 'dump', 'images_loaded_299'), 'wb') as cache_file:
    pickle.dump(image_loaded, cache_file)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels.h5


100%|██████████| 15000/15000 [17:57<00:00, 13.93it/s]


# Process captions

In [None]:
def remove_punctuations(list):
    """Lower-case captions, replace punctuation with spaces and collapse whitespace.

    Args:
        list: iterable of caption strings. The parameter name shadows the builtin
            inside this function only; it is kept so existing callers keep working.

    Returns:
        A new list holding one cleaned caption per input caption, in the same order.
    """
    filters='''!'"“”#$%&()⟨⟩*+,-–—./:;<=>?@[\\]^_`’{|}~\t\n'''
    # One translation table maps every filtered character to a space, so each caption
    # is cleaned in a single pass and whitespace is normalised once, instead of being
    # re-split after every individual character replacement.
    to_space = str.maketrans(filters, ' ' * len(filters))
    return [' '.join(caption.lower().translate(to_space).split()) for caption in list]

def _tokenizer(train_captions):
    """Create a Keras Tokenizer and fit its vocabulary on the training captions.

    Args:
        train_captions: list of caption strings from the training split.

    Returns:
        The fitted tokenizer (lower-casing enabled, punctuation filtered out).
    """
    fitted = tf.keras.preprocessing.text.Tokenizer(
        lower=True,
        filters='''!'"“”#$%&()⟨⟩*+,-–—./:;<=>?@[\\]^_`’{|}~\t\n''',
    )
    fitted.fit_on_texts(train_captions)
    return fitted

def encoder(sents, tokenizer):
    """Convert sentences to fixed-length integer sequences.

    Args:
        sents: list of sentence strings.
        tokenizer: fitted tokenizer providing `texts_to_sequences`.

    Returns:
        The array returned by `pad_sequences`: one row per sentence, MAX_LEN
        columns, with padding added at the end of short sentences.
    """
    sequences = tokenizer.texts_to_sequences(sents)
    # NOTE(review): `truncating` is left at the Keras default, which (per the Keras docs)
    # drops tokens from the front of sequences longer than MAX_LEN — confirm this is intended.
    return tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=MAX_LEN, padding='post')

# Clean the captions of every image; only values are replaced (no keys added or removed),
# so updating the dict while iterating over it is safe.
for key, captions in image_dict.items():
    image_dict[key] = remove_punctuations(captions)

# The captions without a selected image get the same cleaning.
captions_404 = remove_punctuations(captions_404)

# Split data

In [None]:
data = list(image_dict.items()) # list of (image file name, list of captions) pairs
# Shuffle with a dedicated, seeded generator so the train/test split (and with it the
# tokenizer vocabulary and the reported metrics) is identical on every Restart & Run All.
# A private Random instance leaves the global `random` state untouched.
random.Random(42).shuffle(data)
length = len(data)

split_at = math.ceil(TRAIN_SIZE*length)     # first test index; computed once and used for both slices
train_data = data[:split_at]
test_data = data[split_at:]
print(f'train_size:{len(train_data)}, test_size:{len(test_data)}')

def getDataDict(data):
    """Convert an iterable of (image file name, captions) pairs into a dict.

    Args:
        data: iterable of 2-item pairs.

    Returns:
        dict mapping each first item to its second item; for a repeated key the
        last pair wins.
    """
    return {image: captions for image, captions in data}

# The splits are lists of (file name, captions) tuples; turn them into {file name: captions} dicts.
train_data = getDataDict(train_data)
test_data = getDataDict(test_data)

# Only training captions may be used to fit the tokenizer (test captions must stay unseen).
train_captions = [caption for captions in train_data.values() for caption in captions]

tokenizer = _tokenizer(train_captions)
# Save the fitted tokenizer for inference; the context manager closes the file once the dump is written.
with open(os.path.join(data_root, 'dump', 'tokenizer'), 'wb') as tokenizer_file:
    pickle.dump(tokenizer, tokenizer_file)

train_x, train_y = [], []
test_x, test_y = [], []


#TODO: instead of taking 5 captions of same image better to keep 5 different image with one caption.
def prepare_dataset(data_dict, captions_per_image=1):
    """Build a balanced list of (image feature, caption) examples with 1/0 labels.

    For every image the first `captions_per_image` of its own captions become
    positive examples (label 1), and exactly the same number of captions drawn
    at random from `captions_404` become negative examples (label 0).

    Args:
        data_dict: mapping of image file name -> list of caption strings.
        captions_per_image: how many of an image's captions to use as positives.
            Defaults to 1, the value that used to be hard-coded.

    Returns:
        (data_x, data_y): data_x is a list of [image feature vector, encoded caption]
        pairs and data_y is the matching list of labels.

    Raises:
        KeyError: if an image has no entry in `image_feature`.
        ValueError: raised by random.sample when `captions_404` holds fewer
            captions than the number of negatives requested.
    """
    data_x, data_y = [], []
    for key, captions in data_dict.items():
        # Encode only the captions that are actually used.
        positives = captions[:captions_per_image]
        if not positives:
            # An image without captions would otherwise contribute a negative example
            # with no matching positive, unbalancing the classes.
            continue

        feature = image_feature[key]
        for caption in encoder(positives, tokenizer):    # +ve examples
            data_x.append([feature, caption])
            data_y.append(1)

        # The negative count follows the positive count, keeping the classes balanced.
        negatives = random.sample(captions_404, len(positives))
        for caption in encoder(negatives, tokenizer):    # -ve examples
            data_x.append([feature, caption])
            data_y.append(0)
    return data_x, data_y

# Each *_x is a list of [image feature vector, encoded caption] pairs; each *_y is the matching list of 1/0 labels.
train_x, train_y = prepare_dataset(train_data)
test_x, test_y = prepare_dataset(test_data)

# print(f'train_x:{len(train_x)}, val_x:{len(val_x)}, test_x:{len(test_x)}')

train_size:12000, test_size:3000


# Model

In [None]:
""" Downloading and extracting word vectors
"""
# !wget http://nlp.stanford.edu/data/glove.6B.zip -P '/content/drive/My Drive/IITB Courses/CS 772/Project/data/glove'
# import zipfile
# with zipfile.ZipFile('/content/drive/My Drive/IITB Courses/CS 772/Project/data/glove/'+'glove.6B.zip', 'r') as zip_ref:
#     zip_ref.extractall('/content/drive/My Drive/IITB Courses/CS 772/Project/data/glove')

In [None]:
# Word indices start at 1, so one extra row is needed to cover index 0.
vocab_size = 1 + len(tokenizer.word_index)
print('size of vocab', vocab_size)

# Read the GloVe file matching EMBED_SIZE: each line is a word followed by its vector components.
glove_path = os.path.join(data_root, 'glove', f'glove.6B.{EMBED_SIZE}d.txt')
embeddings = {}
with open(glove_path, mode='r', encoding='utf8') as f:
    for line in f:
        line = line.split()
        embeddings[line[0]] = np.asarray(line[1:], dtype='float32')

# Row i holds the GloVe vector of the word with index i; words missing from GloVe keep an all-zero row.
embed_init = np.zeros((vocab_size, EMBED_SIZE))

for word, idx in tokenizer.word_index.items():
    if word in embeddings:
        embed_init[idx] = embeddings[word]

# Frozen embedding layer initialised with the GloVe matrix; index 0 is treated as padding (mask_zero).
embedding_layer = tf.keras.layers.Embedding(
    vocab_size,
    EMBED_SIZE,
    mask_zero=True,
    trainable=False,
    embeddings_initializer=tf.keras.initializers.Constant(embed_init),
)
print(embed_init.shape)

# --- image branch: 2048-d feature vector -> dropout -> 256-unit dense layer ---
img_input = tf.keras.Input(shape=(2048,))   # input of image feature vector
img1 = tf.keras.layers.Dropout(rate=.5)(img_input)
img2 = tf.keras.layers.Dense(units=256, activation='relu')(img1)

# --- caption branch: MAX_LEN word indices -> embeddings -> dropout -> LSTM ---
cap_input = tf.keras.Input(shape=(MAX_LEN,))    # input of caption
cap1 = embedding_layer(cap_input)   # one EMBED_SIZE vector per token position: (batch, MAX_LEN, EMBED_SIZE)
cap2 = tf.keras.layers.Dropout(rate=.5)(cap1)
cap3 = tf.keras.layers.LSTM(units=256)(cap2)    # only the LSTM output after the whole sequence is kept (256 values per caption)

# --- merge both branches and classify ---
combine = tf.keras.layers.concatenate([img2, cap3])
combine1 = tf.keras.layers.Dense(units=256, activation='relu')(combine)

# A single sigmoid unit outputs one probability (softmax would be used for a distribution over several classes).
output = tf.keras.layers.Dense(units=1, activation='sigmoid')(combine1)
# The model takes two inputs, given as a list [image features, captions], and has one output;
# model.fit() / model.predict() must receive their inputs in the same order.
model = tf.keras.models.Model(inputs=[img_input, cap_input], outputs=output)

model.summary()
# Write a diagram of the model architecture to Drive.
tf.keras.utils.plot_model(model, to_file='/content/drive/My Drive/IITB Courses/CS 772/Project/model_inception.png')

model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(), # binary cross-entropy pairs with the single sigmoid output
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy']
    )

size of vocab 10414
(10414, 200)
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 20)]         0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 2048)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 20, 200)      2082800     ['input_3[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 2048)         0           ['input_2[0][0]']                
                                                           

In [None]:
# train_x holds [image feature, caption] pairs, but the model expects two separate inputs,
# so split the pairs into one list per input.
train_x_img = [img for img, _ in train_x]
train_x_cap = [cap for _, cap in train_x]

history = model.fit(
    # plain Python lists raised an error here, hence the conversion to tensors
    x=[tf.constant(train_x_img), tf.constant(train_x_cap)],
    y=tf.constant(train_y),
    batch_size=32,
    epochs=40,
    validation_split=0.2,
)
# Save the trained model so the testing section can load it again.
model.save(os.path.join(data_root, 'dump', 'model'))

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40




INFO:tensorflow:Assets written to: /content/drive/My Drive/IITB Courses/CS 772/Project/data/dump/model/assets


INFO:tensorflow:Assets written to: /content/drive/My Drive/IITB Courses/CS 772/Project/data/dump/model/assets


In [None]:
import numpy as np
from sklearn.metrics import precision_recall_fscore_support
import sklearn.metrics

# Split the [image feature, caption] pairs into the two inputs the model expects.
test_x_img = [img for img, _ in test_x]
test_x_cap = [cap for _, cap in test_x]

predictions = model.predict([tf.constant(test_x_img), tf.constant(test_x_cap)])
predictions = tf.squeeze(predictions)   # (number of examples, 1) -> (number of examples,)

loss = tf.keras.losses.BinaryCrossentropy()
print(f'test loss is:{loss(test_y, predictions).numpy()}')

# Threshold the probabilities: below .5 means "caption does not match" (0), otherwise 1.
pred_y = [0 if val < .5 else 1 for val in predictions]
prec, rec, fscore, support = precision_recall_fscore_support(test_y, pred_y, average='weighted')

print(f'prec:{prec}, recall:{rec}, fsocre:{fscore}')

# Accuracy: share of examples whose thresholded prediction equals the label.
correct = sum(1 for x, y in zip(test_y, pred_y) if x == y)

print(f'accuracy:{correct/len(test_y)}')

test loss is:0.4255835711956024
prec:0.9049387949622608, recall:0.9041666666666667, fsocre:0.9041209616617523
accuracy:0.9041666666666667


In [None]:
# Per-class metrics have to be computed on the full test set: the precision of a class
# depends on the predictions made for examples of the *other* class. Splitting the
# examples by their true label first (as was done before) leaves each subset with a
# single true class, which makes the precision of that class trivially 1.0 and leaves
# the absent class with undefined recall.
# average=None returns one value per label, ordered as in `labels`.
prec, rec, fscore, support = precision_recall_fscore_support(
    test_y, pred_y, average=None, labels=[0, 1]
)

# First line: label 0 (caption does not match the image); second line: label 1 (caption matches).
for label in (0, 1):
    print(f'prec:{prec[label]}, recall:{rec[label]}, fsocre:{fscore[label]}')

# Testing

In [None]:
from google.colab import drive
# Mount Drive again; presumably so this section can run on its own in a fresh session
# (the cell also repeats the imports it needs).
drive.mount('/content/drive')
# Project data folder; the saved tokenizer and model are read from its 'dump' subfolder below.
data_root = '/content/drive/My Drive/IITB Courses/CS 772/Project/data'

import pickle
import os
import tensorflow as tf
from keras.applications.inception_v3 import InceptionV3, preprocess_input
import numpy as np
MAX_LEN = 20    # caption length in tokens; same value as in the training configuration

# Restore the tokenizer and model saved by the training part.
# NOTE: pickle.load can execute arbitrary code; only load dump files you created yourself.
with open(os.path.join(data_root, 'dump', 'tokenizer'), 'rb') as tokenizer_file:
    tokenizer = pickle.load(tokenizer_file)
model = tf.keras.models.load_model(os.path.join(data_root, 'dump', 'model'))

def encoder(sents, tokenizer):
    """Convert sentences to fixed-length integer sequences.

    Args:
        sents: list of sentence strings.
        tokenizer: fitted tokenizer providing `texts_to_sequences`.

    Returns:
        The array returned by `pad_sequences`: one row per sentence, MAX_LEN
        columns, with padding added at the end of short sentences.
    """
    sequences = tokenizer.texts_to_sequences(sents)
    return tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=MAX_LEN, padding='post')

# InceptionV3 with ImageNet weights, used only as an image feature extractor.
inceptionV3_model = InceptionV3(weights='imagenet')
# Output the second-to-last layer (layers[-2]) instead of the network's final layer.
my_inceptionV3_model = tf.keras.models.Model(inputs=inceptionV3_model.input, outputs=inceptionV3_model.layers[-2].output)

def preprocess_image(file_name):
    """Read an image file and turn it into a single-item batch ready for InceptionV3.

    Args:
        file_name: path of the image file to read.

    Returns:
        The output of `preprocess_input` applied to the 299x299 image array
        with a leading batch axis of size 1.
    """
    picture = tf.keras.preprocessing.image.load_img(file_name, target_size=(299, 299))
    pixels = tf.keras.preprocessing.image.img_to_array(picture)
    batch = np.expand_dims(pixels, axis=0)   # add the leading batch axis the network expects
    return preprocess_input(batch)


def extract_features(file_name):
    """Run one image file through the truncated InceptionV3 and return a flat feature vector.

    Args:
        file_name: path of the image file, as accepted by `preprocess_image`.

    Returns:
        1-D array whose length is the second dimension of the network output
        (the prediction has shape (1, n_features)).
    """
    batch = preprocess_image(file_name)
    features = my_inceptionV3_model.predict(batch)
    # drop the batch axis: (1, n_features) -> (n_features,)
    return features.reshape(features.shape[1], )

# upload image, take its name.
# input captions
# convert image to feature vector,

# use model.predict with expand_dim since single input is supplied.

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def upload_file():
    """Ask for files through the Colab upload dialog and write each one to disk.

    Every uploaded file is written under its own name (a relative path, so it
    lands in the current working directory).

    Returns:
        list of the uploaded file names.
    """
    from google.colab import files  # only available inside Colab, hence the local import
    uploaded = files.upload()
    for name, content in uploaded.items():
        # the context manager closes (and flushes) each file right after it is written
        with open(name, 'wb') as out_file:
            out_file.write(content)
    return list(uploaded.keys())
print(upload_file()) # printing the file name.

Saving COCO_test2014_000000000128.jpg to COCO_test2014_000000000128.jpg
['COCO_test2014_000000000128.jpg']


In [None]:
caption = 'There is an elephant'
feature_vec = extract_features('COCO_test2014_000000000128.jpg')
cap = encoder([caption], tokenizer)[0]     # encoder works on a list of sentences; take the single encoded row

# model.predict expects batches, so each input is wrapped in a list to add a batch axis of size 1.
# The outer list passed to predict holds the model's two inputs: [image features, captions].
image_batch = tf.constant([feature_vec])
caption_batch = tf.constant([cap])
similarity = model.predict([image_batch, caption_batch])[0]
print(f'similarity is:{similarity}')

similarity is:[0.0026902]
