In [1]:
import os
import pickle
import re

import keras
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.layers import Input, Dense, add, LSTM, Embedding, Dropout, Conv2D, MaxPooling2D, BatchNormalization, Flatten
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical, plot_model
from tqdm import tqdm

In [3]:
# Root folder of the VizWiz dataset. Every other location is derived from it,
# so pointing the notebook at another copy only requires editing this line.
base_dir = r'D:\University Files\Assignments\7th Semester\Machine Learning\Project\VizWiz Dataset'

# Image folders, one per split.
train_img_dir = os.path.join(base_dir, 'train')
val_img_dir = os.path.join(base_dir, 'val')
test_img_dir = os.path.join(base_dir, 'test')

# Annotation JSON files, one per split.
annot_dir = os.path.join(base_dir, 'annotations')
train_annot_dir = os.path.join(annot_dir, 'train.json')
val_annot_dir = os.path.join(annot_dir, 'val.json')
test_annot_dir = os.path.join(annot_dir, 'test.json')

In [4]:
import json


def _load_annotations(path):
    """Read one annotation JSON file and return the decoded object."""
    # Decode explicitly as UTF-8: without an encoding argument open() falls
    # back to the platform default, which on Windows is usually a legacy code
    # page and can fail on non-ASCII caption text.
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)


# One decoded annotation object per dataset split.
train_dict = _load_annotations(train_annot_dir)
val_dict = _load_annotations(val_annot_dir)
test_dict = _load_annotations(test_annot_dir)

In [5]:
# Lookup table for the training split: image file name -> image id, taken
# from the 'images' records of the training annotations.
train_img_mapping = {
    record['file_name']: record['id'] for record in train_dict['images']
}

In [6]:
# Lookup table for the validation split: image file name -> image id, taken
# from the 'images' records of the validation annotations.
val_img_mapping = {
    record['file_name']: record['id'] for record in val_dict['images']
}

In [7]:
# Two independent, initially empty containers for the train / validation samples.
train_set, validation_set = [], []

 # FIGURE OUT A WAY TO USE CUSTOM CONVNET

In [8]:
# Square input resolution; images are resized to this size when loaded.
HEIGHT = WIDTH = 224

# (height, width, channels) tuple used as the input shape of the conv net.
shape = (HEIGHT, WIDTH, 3)

Initial Convolutional Neural Network

In [12]:
def define_model(neurons, dense_layers, bn, dropouts):
  """Assemble and compile a Sequential conv net with one sigmoid output unit.

  Args:
    neurons: iterable of filter counts; each entry adds a 3x3 Conv2D followed
      by a 2x2 MaxPooling2D.
    dense_layers: iterable of unit counts for the Dense layers after Flatten.
    bn: when truthy, a BatchNormalization layer is added after the first
      pooling layer (and only there).
    dropouts: indexable of dropout rates; dropouts[i] follows dense_layers[i].

  Returns:
    The model, compiled with binary cross-entropy loss, the Adam optimizer
    and an accuracy metric.
  """
  model = Sequential()

  for position, filters in enumerate(neurons):
    is_first = position == 0
    # Only the first layer declares the input shape (module-level `shape`).
    first_layer_kwargs = {'input_shape': shape} if is_first else {}
    model.add(Conv2D(filters, (3, 3), activation='relu', **first_layer_kwargs))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    if is_first and bn:
      model.add(BatchNormalization())

  model.add(Flatten())

  for position, units in enumerate(dense_layers):
    model.add(Dense(units, activation='relu'))
    model.add(Dropout(dropouts[position]))

  model.add(Dense(1, activation='sigmoid'))
  model.compile(loss="binary_crossentropy",
                optimizer='adam', metrics=["accuracy"])
  return model

In [None]:
# NOTE(review): `model`, `X_train`, `y_train`, `X_val` and `y_val` are not
# assigned by any earlier cell visible in this notebook - confirm where they
# come from before running this cell on a fresh kernel.

# Stop training once validation accuracy has not improved for 5 epochs.
es = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy', mode='max', patience=5, verbose=1)

# Write 'best_aug.h5' only when validation accuracy reaches a new best.
cp = tf.keras.callbacks.ModelCheckpoint(
    'best_aug.h5', monitor='val_accuracy', mode='max',
    save_best_only=True, verbose=1)

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=30, batch_size=132,
    callbacks=[es, cp], verbose=2)

In [18]:
# VGG16 feature extractor: take the stock VGG16 network and expose the output
# of its second-to-last layer instead of the final layer's output.
vgg16_full = VGG16()
model = Model(inputs=vgg16_full.inputs, outputs=vgg16_full.layers[-2].output)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

In [19]:
def get_features(model, split_set, set_dict):
    """Run every mapped image of one split through `model`.

    Args:
        model: object whose `predict(batch, verbose=0)` produces the features.
        split_set: name of the sub-folder of `base_dir` that holds the images.
        set_dict: mapping of image file name -> image id; files in the folder
            that are not keys of this mapping are skipped.

    Returns:
        dict mapping image id -> the value returned by `model.predict` for
        that single-image batch.
    """
    directory = os.path.join(base_dir, split_set)
    image_features = {}

    for img_name in tqdm(os.listdir(directory)):
        if img_name not in set_dict:
            continue

        # Load at the network's input resolution and convert to a pixel array.
        pixels = img_to_array(
            load_img(os.path.join(directory, img_name), target_size=(HEIGHT, WIDTH)))
        # Prepend a batch axis of size 1, then apply the VGG16 preprocessing.
        batch = preprocess_input(pixels.reshape((1, *pixels.shape[:3])))

        image_features[set_dict[img_name]] = model.predict(batch, verbose=0)

    return image_features

In [20]:
# Extract features for every mapped training image. This is slow: the recorded
# run was about a third of the way through after 1h21m.
train_img_features = get_features(
    model=model, split_set='train', set_dict=train_img_mapping)

 34%|███▍      | 8211/23954 [1:21:10<1:57:15,  2.24it/s]

In [None]:
# Extract features for every mapped validation image.
val_img_features = get_features(
    model=model, split_set='val', set_dict=val_img_mapping)

In [None]:
# Folder that holds the cached (pickled) image features; both cache paths are
# derived from it so the location only has to be changed in one place.
loaded_data_dir = r'D:\University Files\Assignments\7th Semester\Machine Learning\Project\loaded_data'
train_img_features_path = os.path.join(loaded_data_dir, 'train_img_features.pkl')
val_img_features_path = os.path.join(loaded_data_dir, 'val_img_features.pkl')

In [None]:
# Persist the extracted features so the slow extraction need not be repeated.
# pickle.dump expects an open binary file object as its second argument (the
# third positional argument is the protocol), so each target is opened here.
for features, path in (
    (train_img_features, train_img_features_path),
    (val_img_features, val_img_features_path),
):
    parent = os.path.dirname(path)
    if parent:
        # Create the cache folder on first use.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(features, f)

In [None]:
# Keep the in-memory feature dicts reachable under short aliases.
t, v = train_img_features, val_img_features

In [None]:
def _read_pickle(path):
    """Unpickle and return the object stored at `path`."""
    # SECURITY: unpickling can execute arbitrary code - only load cache files
    # that this notebook produced itself.
    with open(path, 'rb') as f:
        return pickle.load(f)


# Restore the cached features from disk.
train_img_features = _read_pickle(train_img_features_path)
val_img_features = _read_pickle(val_img_features_path)

In [15]:
def get_captions(img_name, dict_set):
    """Collect the accepted captions of one image.

    Args:
        img_name: value to match against the 'file_name' of the image records.
        dict_set: annotation dict with an 'images' list (records carrying
            'file_name' and 'id') and an 'annotations' list (records carrying
            'image_id', 'caption' and 'is_rejected').

    Returns:
        list of the 'caption' values of the matching annotations whose
        'is_rejected' equals False; an empty list if no image record matches.
    """
    img_captions = []
    for image in dict_set['images']:
        if image.get('file_name') != img_name:
            continue

        img_id = image.get('id')
        encountered = False
        for annotation in dict_set['annotations']:
            if annotation.get('image_id') == img_id:
                encountered = True
                # Explicit comparison on purpose: an annotation without the
                # flag (None) is not treated as accepted.
                if annotation.get('is_rejected') == False:  # noqa: E712
                    img_captions.append(annotation.get('caption'))
            elif encountered:
                # Stop at the end of the first run of annotations for this
                # image; this relies on an image's annotations being stored
                # next to each other.
                break

    # No per-caption print here: the function is called once per image, and
    # echoing every caption floods the notebook output.
    return img_captions

In [None]:
# Map each training image id to the list of its accepted captions.
train_img_to_captions = {}

for img_name, img_id in train_img_mapping.items():
    if img_id in train_img_to_captions:
        # Captions for this id were already collected.
        continue
    train_img_to_captions[img_id] = get_captions(img_name, train_dict)

train_img_to_captions

In [None]:
# Same mapping for the validation split: image id -> list of accepted
# captions. The visualisation cell at the end of the notebook reads this dict.
val_img_to_captions = {}

for img_name, img_id in val_img_mapping.items():
    if img_id not in val_img_to_captions:
        val_img_to_captions[img_id] = get_captions(img_name, val_dict)

In [None]:
def preprocess_text(mapping):
    """Normalise every caption in `mapping` in place.

    Each caption is lower-cased, stripped of every character that is not a
    letter or whitespace, has its single-character words dropped, and is
    wrapped in the 'startseq' / 'endseq' markers.

    Args:
        mapping: dict whose values are lists of caption strings. The lists
            are modified in place.

    Returns:
        None.
    """
    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # Already wrapped by an earlier run of this function: leave it
            # alone so re-running the cell does not stack the markers.
            if caption.startswith('startseq ') and caption.endswith(' endseq'):
                continue
            caption = caption.lower()
            # str.replace treats its pattern literally, so a real regex
            # substitution is needed to drop digits and punctuation.
            caption = re.sub(r'[^a-z\s]', '', caption)
            # split() also collapses any run of whitespace.
            words = [word for word in caption.split() if len(word) > 1]
            captions[i] = ' '.join(['startseq', *words, 'endseq'])

In [None]:
# Before preprocessing captions: show the raw caption list stored under
# image id 0 (the dict is keyed by image id, so this raises KeyError if no
# image has id 0).
train_img_to_captions[0]

In [None]:
# After preprocessing: preprocess_text rewrites the caption lists in place,
# so the same lookup now shows the cleaned captions for image id 0.
preprocess_text(train_img_to_captions)
train_img_to_captions[0]

In [None]:
# Flatten every training caption into a single list (the corpus used to build
# the vocabulary). The captions live in `train_img_to_captions`; no variable
# called `mapping` exists at notebook level.
all_captions = [
    caption
    for captions in train_img_to_captions.values()
    for caption in captions
]

len(all_captions)

In [None]:
# Tokenize the text: fit a word-level tokenizer on every collected caption.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# One more than the number of distinct words - presumably to leave room for
# index 0, which the padded input sequences use (the Embedding layer further
# down is built with mask_zero=True); confirm against the Tokenizer docs.
vocab_size = len(tokenizer.word_index) + 1
vocab_size

In [None]:
# Length of the longest caption, counted in whitespace-separated tokens.
max_length = max(map(lambda text: len(text.split()), all_captions))
max_length

In [None]:
# Train / test split over image ids: the first 90% (in dict order) are used
# for training, the remainder is held out.
img_ids = list(train_img_to_captions)
split = int(len(img_ids) * 0.90)
train, test = img_ids[:split], img_ids[split:]

In [None]:
# create data generator to fetch imgs and captions (get data in batches)
def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Yield training batches forever, cycling over `data_keys`.

    Every caption is expanded into one sample per next-word prediction: the
    first i tokens (left-padded to `max_length`) are the input and token i,
    one-hot encoded over `vocab_size`, is the target.

    Args:
        data_keys: iterable of keys shared by `mapping` and `features`.
        mapping: dict of key -> list of caption strings.
        features: dict of key -> indexable whose element 0 is the image
            feature vector.
        tokenizer: fitted tokenizer providing `texts_to_sequences`.
        max_length: length the input sequences are padded to.
        vocab_size: number of classes for the one-hot targets.
        batch_size: number of images (keys, not samples) per yielded batch.

    Yields:
        ([image_features, padded_sequences], one_hot_targets) as numpy arrays.
        The number of rows varies with how many tokens the batch's captions
        contain.
    """
    x1, x2, y = [], [], []
    n = 0
    while True:
        for key in data_keys:
            n += 1
            for caption in mapping[key]:
                # Encode the caption as a list of word indices.
                seq = tokenizer.texts_to_sequences([caption])[0]
                # Split the sequence into (prefix, next word) pairs.
                for i in range(1, len(seq)):
                    in_seq = pad_sequences([seq[:i]], maxlen=max_length)[0]
                    out_seq = to_categorical([seq[i]], num_classes=vocab_size)[0]

                    x1.append(features[key][0])
                    x2.append(in_seq)
                    y.append(out_seq)

            # Checked once per key (inside the loop) so that a batch is
            # emitted every `batch_size` images rather than never.
            if n == batch_size:
                yield [np.array(x1), np.array(x2)], np.array(y)
                x1, x2, y = [], [], []
                n = 0

In [None]:
# Image branch: 4096-dimensional feature vector -> dropout -> 256-unit dense.
conv_inputs = Input(shape=(4096,))
fe1 = Dropout(0.4)(conv_inputs)
fe2 = Dense(256, activation='relu')(fe1)

# Text branch: padded word-index sequence -> embedding -> dropout -> LSTM.
seq_inputs = Input(shape=(max_length,))
se1 = Embedding(vocab_size, 256, mask_zero=True)(seq_inputs) # try pretrained embedding
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# Decoder model: sum both 256-d branches and predict the next word.
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

# The inputs must be the two Input tensors defined above, in the same order
# the data generator yields them (image features first, then sequences).
# NOTE: this rebinds `model`, which earlier held the VGG16 feature extractor.
model = Model(inputs=[conv_inputs, seq_inputs], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

plot_model(model, show_shapes=True)

In [None]:
epochs, batch_size = 30, 64
# Number of generator batches that make up one pass over the training ids.
steps = len(train) // batch_size

for i in range(epochs):
    # A fresh generator is created for every epoch; each fit call below runs
    # exactly one epoch of `steps` batches.
    gen = data_generator(
        data_keys=train,
        mapping=train_img_to_captions,
        features=train_img_features,
        tokenizer=tokenizer,
        max_length=max_length,
        vocab_size=vocab_size,
        batch_size=batch_size,
    )
    model.fit(gen, epochs=1, steps_per_epoch=steps, verbose=1)

In [None]:
# Persist the current `model` to 'vgg_model.h5'; the path is relative, so the
# file lands in the kernel's working directory.
model.save('vgg_model.h5')

In [None]:
# generate captions
def idx_to_word(integer, tokenizer):
    """Return the word whose index in `tokenizer.word_index` equals `integer`.

    Returns None when no word carries that index.
    """
    matches = (
        word for word, idx in tokenizer.word_index.items() if idx == integer
    )
    return next(matches, None)

In [None]:
def predict_caption(model, image, tokenizer, max_length):
    """Greedily decode a caption for one image.

    Starting from 'startseq', the most probable next word is appended
    repeatedly until 'endseq' is produced, the predicted index maps to no
    word, or `max_length` words have been generated.

    Args:
        model: model whose `predict([image, sequence])` returns next-word
            scores over the vocabulary.
        image: image features for a single image, including the batch axis
            (presumably shape (1, 4096) as stored by get_features - confirm).
        tokenizer: fitted tokenizer used for encoding and reverse lookup.
        max_length: padded sequence length and maximum number of steps.

    Returns:
        The generated text, including the 'startseq' marker and, when it was
        reached, the 'endseq' marker.
    """
    in_text = 'startseq'
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        # Keep the leading batch axis: predict needs a (1, max_length) array
        # to pair with the single-image feature batch.
        sequence = pad_sequences([sequence], maxlen=max_length)
        scores = model.predict([image, sequence], verbose=0)
        word = idx_to_word(np.argmax(scores), tokenizer)
        if word is None:
            break
        in_text += ' ' + word
        if word == 'endseq':
            break
    return in_text

In [None]:
# NOTE(review): `corpus_bleu` is not imported anywhere in the visible notebook;
# it presumably comes from nltk.translate.bleu_score and has to be imported in
# the imports cell before this cell can run.
actual, predicted = list(), list()

for key in tqdm(test):
    captions = train_img_to_captions[key]
    # Pass this image's own features, not the whole feature dictionary.
    y_pred = predict_caption(model, train_img_features[key], tokenizer, max_length)

    # References and hypothesis are compared as token lists.
    actual_captions = [caption.split() for caption in captions]
    y_pred = y_pred.split()
    actual.append(actual_captions)
    predicted.append(y_pred)

print(f'BLEU-1: {corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0))}')
print(f'BLEU-2: {corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0))}')

In [None]:
# Visualize the results: print the reference captions and the predicted
# caption for one validation image, then display the image.
from PIL import Image
import matplotlib.pyplot as plt
# Placeholder: set this to a validation image file name before running; with
# the empty string the lookup on the next line fails unless '' is a key.
img_name = ''
img_id = val_img_mapping[img_name]
img_path = os.path.join(base_dir, 'val', img_name)
img = Image.open(img_path)
# `val_img_to_captions` (image id -> captions) must have been built before
# this cell runs.
captions = val_img_to_captions[img_id]
print('-------------------------Actual-----------------------------')
for caption in captions:
    print(caption)

# Features are looked up per image id; `model` here is whatever the name is
# bound to when the cell runs - presumably the trained captioning model.
y_pred = predict_caption(model, val_img_features[img_id], tokenizer, max_length)
print('-------------------------Predicted-----------------------------')
print(y_pred)

plt.imshow(img)