<a href="https://colab.research.google.com/github/dude123studios/AdvancedDeepLearning/blob/main/Image_captioning_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import collections
import random
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle

In [2]:

annotation_folder = '/annotations/'
if not os.path.exists(os.path.abspath('.') + annotation_folder):
  annotation_zip = tf.keras.utils.get_file('captions.zip',
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
  annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'
  os.remove(annotation_zip)
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath('.') + image_folder):
  image_zip = tf.keras.utils.get_file('train2014.zip',
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip) + image_folder
  os.remove(image_zip)
else:
  PATH = os.path.abspath('.') + image_folder

Downloading data from http://images.cocodataset.org/annotations/annotations_trainval2014.zip
Downloading data from http://images.cocodataset.org/zips/train2014.zip


In [10]:
with open(annotation_file, 'r') as f:
  annotations = json.load(f)

image_path_to_caption = collections.defaultdict(list)
for val in annotations['annotations']:
  caption = f"<start> {val['caption']} <end>"
  image_path = PATH+'COCO_train2014_'+'%012d.jpg' % (val['image_id'])
  image_path_to_caption[image_path].append(caption)
image_paths = list(image_path_to_caption.keys())
random.shuffle(image_paths)
train_image_paths = image_paths[:5000]

In [12]:
train_captions = []
image_names = []
for image_path in train_image_paths:
  caption_list = image_path_to_captions[image_path]
  train_captions.extend(caption_list)
  image_names.extend([image_path]*len(caption_list))


In [None]:
print(train_captions[0])
Image.open(image_names[0])

The output had to be cut out due to githubs image in jupyter notebook support

In [14]:
def load_image(image_path):
  img = tf.io.read_file(image_path)
  img = tf.image.decode_jpeg(img, channels=3)
  img = tf.image.resize(img, (299,299))
  img = tf.keras.applications.inception_v3.preprocess_input(img)
  return img, image_path

In [17]:
image_model = tf.keras.applications.inception_v3.InceptionV3(include_top=False,weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_model = tf.keras.models.Model(new_input, hidden_layer)

encode_train = sorted(set(image_names))
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(
  load_image, num_parallel_calls=tf.data.AUTOTUNE).batch(16)

for img, path in image_dataset:
  batch_features = image_model(img)
  batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))

  for bf, p in zip(batch_features, path):
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())

In [18]:
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)

In [19]:
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'
train_seqs = tokenizer.texts_to_sequences(train_captions)
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
max_length = max(len(t) for t in train_seqs)

In [20]:
img_to_cap_vector = collections.defaultdict(list)
for img, cap in zip(image_names, cap_vector):
  img_to_cap_vector[img].append(cap)

img_keys = list(img_to_cap_vector.keys())
random.shuffle(img_keys)

slice_index = int(len(img_keys)*0.8)
img_name_train_keys, img_name_val_keys = img_keys[:slice_index], img_keys[slice_index:]

img_name_train = []
cap_train = []
for imgt in img_name_train_keys:
  capt_len = len(img_to_cap_vector[imgt])
  img_name_train.extend([imgt] * capt_len)
  cap_train.extend(img_to_cap_vector[imgt])

img_name_val = []
cap_val = []
for imgv in img_name_val_keys:
  capv_len = len(img_to_cap_vector[imgv])
  img_name_val.extend([imgv] * capv_len)
  cap_val.extend(img_to_cap_vector[imgv])

In [21]:
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val)

(20007, 20007, 5003, 5003)

In [22]:
BATCH_SIZE = 64
BUFFER_SIZE = 1000
embedding_dim = 256
units = 512
vocab_size = top_k + 1
num_steps = len(img_name_train) // BATCH_SIZE
features_shape = 2048
attention_features_shape = 64

def map_func(img_name, cap):
  img_tensor = np.load(img_name.decode('utf-8')+'.npy')
  return img_tensor, cap
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

dataset = dataset.map(lambda item1, item2: tf.numpy_function(
          map_func, [item1, item2], [tf.float32, tf.int32]),
          num_parallel_calls=tf.data.AUTOTUNE)

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [24]:
class BahdanauAttention(tf.keras.layers.Layer):
  def __init__(self, num_units):
    super(BahdanauAttention, self).__init__()
    self.W1 = tf.keras.layers.Dense(num_units)
    self.W2 = tf.keras.layers.Dense(num_units)
    self.V = tf.keras.layers.Dense(1)
  
  def call(self, query, values):
    
    query = tf.expand_dims(query, axis=1)
    
    distribution = self.V(tf.nn.tanh(self.W1(query)+ self.W2(values)))
    distribution = tf.nn.softmax(distribution)

    context = tf.reduce_sum(
        tf.linalg.matmul(
            tf.linalg.matrix_transpose(distribution),
            values
        ),axis=1
    )
    return context, distribution

In [25]:
class CNN_Encoder(tf.keras.Model):
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

In [46]:
class Decoder(tf.keras.Model):
  def __init__(self, embedding_dim, decoder_dim, vocab_size,**kwargs):
    super(Decoder, self).__init__(**kwargs)
    self.units = decoder_dim
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.rnn = tf.keras.layers.GRU(decoder_dim, return_sequences=True, return_state=True)
    self.W1 = tf.keras.layers.Dense(self.units)
    self.W2 = tf.keras.layers.Dense(vocab_size)
    self.attention = BahdanauAttention(self.units)
  
  def call(self, x, values, state):
    context_vector, attention_weights = self.attention(state, values)
    x = self.embedding(x)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

    output, state = self.rnn(x,state)
    x = self.W1(output)
    x = tf.reshape(x, (-1, x.shape[2]))
    x = self.W2(x)
    
    return x, state, attention_weights
    
  def reset_state(self, batch_size):
    return tf.zeros((batch_size, self.units))


In [47]:
encoder = CNN_Encoder(embedding_dim)
decoder = Decoder(embedding_dim, units, vocab_size)

In [48]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
def loss_fn(real, pred):
  mask = tf.math.logical_not(tf.math.equal(real, 0))
  loss_ = loss_object(real, pred)
  mask = tf.cast(mask, dtype=loss_.dtype)
  loss_ *= mask
  return tf.reduce_mean(loss_)

In [65]:
@tf.function
def train_step(img_tensor, target):
  loss = 0
  state = decoder.reset_state(batch_size=target.shape[0])
  dec_input = tf.expand_dims([tokenizer.word_index['<start>']]*target.shape[0],1)
  with tf.GradientTape() as tape:
    features = encoder(img_tensor)
    for i in range(1, target.shape[1]):
      preds, state, _ = decoder(dec_input, features, state)
      loss += loss_fn(target[:,i], preds)
      dec_input = tf.expand_dims(target[:,i], 1)
  total_loss = (loss / int(target.shape[1]))
  trainable_variables = encoder.trainable_variables + decoder.trainable_variables
  gradients = tape.gradient(loss, trainable_variables)
  optimizer.apply_gradients(zip(gradients, trainable_variables))
  return loss, total_loss

In [73]:
EPOCHS = 50
for epoch in range(start_epoch, EPOCHS):
  start_time = time.time()
  total_loss = 0
  for (batch, (img_tensor, target)) in enumerate(dataset):
    batch_loss, t_loss = train_step(img_tensor, target)
    total_loss += t_loss
    if batch % 100 == 0:
      print('Epoch: {}/{}, Batch: {}, loss: {:.4f}'.format(epoch+1,EPOCHS, batch, t_loss / int(target.shape[1])))
  if epoch % 10 == 0:
    encoder.save_weights('encoder_ckpt{}.h5'.format(epoch // 10))
    decoder.save_weights('decoder_ckpt{}.h5'.format(epoch // 10))
  print('Epoch {}, Loss {:.6f}'.format(epoch+1, total_loss/48))
  print ('Time taken for epoch is {} seconds\n'.format(time.time() - start_time))

Epoch: 1/50, Batch: 0, loss: 0.0267
Epoch: 1/50, Batch: 100, loss: 0.0244
Epoch: 1/50, Batch: 200, loss: 0.0250
Epoch: 1/50, Batch: 300, loss: 0.0242
Epoch 1, Loss 7.877653
Time taken for epoch is 51.739614963531494 seconds

Epoch: 2/50, Batch: 0, loss: 0.0262
Epoch: 2/50, Batch: 100, loss: 0.0254
Epoch: 2/50, Batch: 200, loss: 0.0267
Epoch: 2/50, Batch: 300, loss: 0.0226
Epoch 2, Loss 7.772827
Time taken for epoch is 51.69720125198364 seconds

Epoch: 3/50, Batch: 0, loss: 0.0259
Epoch: 3/50, Batch: 100, loss: 0.0246
Epoch: 3/50, Batch: 200, loss: 0.0236
Epoch: 3/50, Batch: 300, loss: 0.0237
Epoch 3, Loss 7.477122
Time taken for epoch is 51.70173978805542 seconds

Epoch: 4/50, Batch: 0, loss: 0.0233
Epoch: 4/50, Batch: 100, loss: 0.0248
Epoch: 4/50, Batch: 200, loss: 0.0234
Epoch: 4/50, Batch: 300, loss: 0.0238
Epoch 4, Loss 7.212861
Time taken for epoch is 51.62812161445618 seconds

Epoch: 5/50, Batch: 0, loss: 0.0235
Epoch: 5/50, Batch: 100, loss: 0.0221
Epoch: 5/50, Batch: 200, loss