# Simple Model for Image Captioning

## Mount Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Get Images and Dataset files

In [None]:
!mkdir ImageCaption_Dataset
!unzip /content/drive/MyDrive/Image_Caption/NLP_dataset.zip -d ImageCaption_Dataset

# Import Libraries

In [None]:
import json
import keras
import random
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tqdm import tqdm
import numpy as np
from tensorflow import keras

In [None]:
with open("/content/ImageCaption_Dataset/cleaned_dataset.json", 'r') as f:
    dataset_json = json.load(f)

dataset_len = len(dataset_json)
dataset_ids = [*range(dataset_len)]

#shuffle dataset
random.Random(6).shuffle(dataset_ids)

captions = [dataset_json[id]["caption"] for id in dataset_ids]
image_paths = ['/content/ImageCaption_Dataset/images/{0:09d}.jpg'.format(id) for id in dataset_ids]


#take 80% as train set, and 20% as test
train_length = int(dataset_len * .8)
test_length = dataset_len - train_length

# Load Images and Inception Model 

In [26]:
def load_image(image_path):
  img = tf.io.read_file(image_path)
  img = tf.io.decode_jpeg(img, channels=3)
  img = tf.keras.layers.Resizing(299, 299)(img)
  img = tf.keras.applications.inception_v3.preprocess_input(img)
  return img, image_path

def create_inceptionv3_model():
  inception_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
  return tf.keras.Model(inception_model.input, inception_model.layers[-1].output)

# Get features of Images

In [None]:
#get image features from inceptionv3 model's last layer
img_dataset = tf.data.Dataset.from_tensor_slices(image_paths)
img_dataset = img_dataset.map(
  load_image, num_parallel_calls=tf.data.AUTOTUNE).batch(32)

feature_extraction_model = create_inceptionv3_model()

path_of_feature = '/content/drive/MyDrive/Image_Caption/ExtractedFeatures/'

i = 0

for img, path in tqdm(img_dataset): 
  features = feature_extraction_model(img)
  image_features = tf.reshape(features, (features.shape[0], -1, features.shape[-1]))
  

  for bf, p in zip(image_features, path):
    path_sp = str(p).split('/')[-1]
    path_image = path_of_feature + path_sp.split('.')[0] + '.npy'
    np.save(path_image, bf.numpy())

# Define tokenizer

In [None]:
def get_longest_caption_length(captions):
  return max([len(caption.split()) for caption in captions])

output_sequence_length = get_longest_caption_length(captions)
train_captions = captions[:train_length] 

# create a tokenizer and get vocabulary from train captions using TextVectorization
tokenizer = tf.keras.layers.TextVectorization(
  standardize=None, 
  max_tokens=5000,
  output_sequence_length=output_sequence_length)

caption_dataset = tf.data.Dataset.from_tensor_slices(train_captions)
tokenizer.adapt(caption_dataset)

#get word2index and index2word for mapping the words and indices
word_to_index = tf.keras.layers.StringLookup(
    mask_token="",
    vocabulary=tokenizer.get_vocabulary())
index_to_word = tf.keras.layers.StringLookup(
    mask_token="",
    vocabulary=tokenizer.get_vocabulary(),
    invert=True)

# Model and Training Parameters

In [None]:
BATCH_SIZE = 64
embedding_dim = 512
EPOCHS = 40

# Create Dataset and Tokenize Captions

In [None]:
# get tokenized vectors
def get_vectors(caps):
  cap_dataset = tf.data.Dataset.from_tensor_slices(caps)
  return cap_dataset.map(lambda x: tokenizer(x)) #CHANGE THIS TO A [] LATER

def create_dataset_data(image_path, caption):
  cap_vec = tokenizer(caption)
  img_vec = np.load(image_path.decode('utf-8'))
  return img_vec, cap_vec

def set_dataset_shapes(img_vec, cap_vec):
  cap_vec.set_shape(cap_vec.shape)
  img_vec.set_shape(img_vec.shape)
  return img_vec, cap_vec

In [None]:
import os
image_npy_paths = ['/content/drive/MyDrive/Image_Caption/ExtractedFeatures/' + path for path in os.listdir('/content/drive/MyDrive/Image_Caption/ExtractedFeatures/')]

In [None]:
train_image_paths = image_npy_paths[:train_length]

train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_captions))
train_dataset = train_dataset.map(lambda item1, item2: tf.numpy_function(
          create_dataset_data, [item1, item2], [tf.float32, tf.int64]),
          num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.map(
          set_dataset_shapes,
          num_parallel_calls=tf.data.AUTOTUNE)

train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.AUTOTUNE)

# Decoder

In [None]:
class Decoder(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim):
    super(Decoder, self).__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.lstm2 = tf.keras.layers.LSTM(512,
                                   return_sequences=True,
                                   return_state=True)
    self.out_fc = tf.keras.layers.Dense(vocab_size)

  
  def call(self, x, hidden, features):
    x = self.embedding(x)  

    x, new_hidden, _= self.lstm2(x)

    x = tf.reshape(x, (-1, x.shape[2]))

    x = self.out_fc(x)

    return x, new_hidden

# Image Encoder

In [None]:
class ImageEncoder(keras.Model):
  def __init__(self, embedding_dim):
    super(ImageEncoder, self).__init__()
    self.out_fc = tf.keras.layers.Dense(embedding_dim,activation='relu')

  def call(self, x):
    x = self.out_fc(x)
    return x

In [None]:
Image_encoder = ImageEncoder(embedding_dim)
decoder = Decoder(tokenizer.vocabulary_size(), embedding_dim)

# Image Captioning Model

In [None]:
class ImageCaptioningModel(keras.Model):
  def __init__(
        self, encoder, decoder, units
    ):
    super().__init__()
    self.units = units
    self.encoder = encoder
    self.decoder = decoder
    self.loss_tracker = keras.metrics.Mean(name="loss")
    self.acc_tracker = keras.metrics.Mean(name="accuracy")

  def calculate_loss(self, y_true, y_pred, mask):
    loss = self.loss(y_true, y_pred)
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    mask_reduced = tf.reduce_sum(mask)
    if mask_reduced == 0:
      return 0
    return tf.reduce_sum(loss) / mask_reduced

  def calculate_accuracy(self, y_true, y_pred, mask):
    accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=-1))
    accuracy = tf.math.logical_and(mask, accuracy)
    accuracy = tf.cast(accuracy, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    mask_reduced = tf.reduce_sum(mask)
    if mask_reduced == 0:
      return -1
    return tf.reduce_sum(accuracy) / mask_reduced

  @property
  def metrics(self):
    # We need to list our metrics here so the reset_states() can be
    # called automatically.
    return [self.loss_tracker, self.acc_tracker]

  def train_step(self, batch_data):
    batch_img, batch_seq = batch_data
    batch_loss = 0
    batch_acc = []

    batch_seq_size = batch_seq.get_shape().as_list()[0] # tf.shape(batch_seq)[0]
    hidden = tf.zeros((batch_seq_size, self.units))
    decoder_input = tf.expand_dims([word_to_index('<start>')] * batch_seq_size, 1)
    with tf.GradientTape() as tape:
      encoder_out = self.encoder(batch_img, training=True)
      for i in range(1, batch_seq.shape[1]):
        predictions, hidden = self.decoder(decoder_input, hidden, encoder_out)
        mask = tf.math.not_equal(batch_seq[:,i], 0)
        loss = self.calculate_loss(batch_seq[:,i], predictions, mask)
        # print(loss)
        batch_loss += loss
        acc = self.calculate_accuracy(batch_seq[:,i], predictions, mask)
        if acc != -1:
          batch_acc.append(acc) 

        # using teacher forcing
        decoder_input = tf.expand_dims(batch_seq[:, i], 1)
    
    total_loss = (batch_loss / int(batch_seq.shape[1]))
    total_acc = tf.reduce_mean(batch_acc)
    trainable_variables = self.encoder.trainable_variables + self.decoder.trainable_variables
    gradients = tape.gradient(batch_loss, trainable_variables)
    optimizer.apply_gradients(
      (grad, var) 
      for (grad, var) in zip(gradients, trainable_variables) 
      if grad is not None
    )
   
    self.loss_tracker.update_state(total_loss)
    self.acc_tracker.update_state(total_acc)

    
    return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

  def test_step(self, batch_data):
    batch_img, batch_seq = batch_data
    batch_loss = 0
    batch_acc = []

    encoder_out = self.encoder(batch_img)
    batch_seq_size = batch_seq.get_shape().as_list()[0] # tf.shape(batch_seq)[0]
    hidden = tf.zeros((batch_seq_size, self.units))
    decoder_input = tf.expand_dims([word_to_index('<start>')] * batch_seq_size, 1)

    for i in range(output_sequence_length):
      predictions, hidden= self.decoder(decoder_input, hidden, encoder_out)
      predicted_id = tf.random.categorical(predictions, 1)[:,0].numpy()
      mask = tf.math.not_equal(batch_seq[:,i], 0)
      loss = self.calculate_loss(batch_seq[:,i], predictions, mask)
      # print(loss)
      batch_loss += loss
      acc = self.calculate_accuracy(batch_seq[:,i], predictions, mask)
      if acc != -1:
        batch_acc.append(acc) 

      # using teacher forcing
      decoder_input = tf.expand_dims(predicted_id, 1)

    total_loss = (batch_loss / int(batch_seq.shape[1]))
    total_acc = tf.reduce_mean(batch_acc)

    
    self.loss_tracker.update_state(total_loss)
    self.acc_tracker.update_state(total_acc)

    return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}


# Loss, Optimizer and Checkpoint

In [None]:
# Define the loss function
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction="none"
)
optimizer = keras.optimizers.Adam(learning_rate=0.001)


checkpoint_path = "/content/drive/MyDrive/checking"
ckpt = tf.train.Checkpoint(encoder=Image_encoder,
                           decoder=decoder,
                           optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

epoch_start = 0
if ckpt_manager.latest_checkpoint:
  epoch_start = int(ckpt_manager.latest_checkpoint.split('-')[-1])
  # restoring the latest checkpoint in checkpoint_path
  ckpt.restore(ckpt_manager.latest_checkpoint)

# Compile the model
model = ImageCaptioningModel(Image_encoder, decoder,512)
model.compile(optimizer=optimizer, loss=cross_entropy)

# Train

In [None]:
for epoch in range(epoch_start, EPOCHS):
    total_loss = 0
    total_acc = 0
    iter = 1

    for (batch, (img_tensor, target)) in enumerate(train_dataset):
        loss_acc = model.train_step((img_tensor, target))
        total_loss += loss_acc["loss"]
        total_acc += loss_acc["acc"]
        iter+=1
        
        if batch % 50 == 0:
            print(f'Epoch {epoch+1} Batch {batch} Loss {loss_acc["loss"]:.4f} Acc {loss_acc["acc"]:.4f}')

    ckpt_manager.save()

    total_loss = total_loss / iter
    total_acc = total_acc / iter
    print(f'Epoch {epoch+1} Loss {total_loss} Accuracy {total_acc}')

Epoch 16 Batch 0 Loss 2.4569 Acc 0.3135
Epoch 16 Batch 50 Loss 2.1334 Acc 0.2666
Epoch 16 Batch 100 Loss 2.1499 Acc 0.2655
Epoch 16 Batch 150 Loss 2.1770 Acc 0.2637
Epoch 16 Loss 2.169631004333496 Accuracy 0.26470842957496643
Epoch 17 Batch 0 Loss 2.1759 Acc 0.2643
Epoch 17 Batch 50 Loss 2.1635 Acc 0.2649
Epoch 17 Batch 100 Loss 2.1635 Acc 0.2644
Epoch 17 Batch 150 Loss 2.1737 Acc 0.2635
Epoch 17 Loss 2.157156229019165 Accuracy 0.2627442181110382
Epoch 18 Batch 0 Loss 2.1730 Acc 0.2638
Epoch 18 Batch 50 Loss 2.1659 Acc 0.2638
Epoch 18 Batch 100 Loss 2.1654 Acc 0.2636
Epoch 18 Batch 150 Loss 2.1716 Acc 0.2634
Epoch 18 Loss 2.1562631130218506 Accuracy 0.262020081281662
Epoch 19 Batch 0 Loss 2.1711 Acc 0.2637
Epoch 19 Batch 50 Loss 2.1662 Acc 0.2638
Epoch 19 Batch 100 Loss 2.1659 Acc 0.2633
Epoch 19 Batch 150 Loss 2.1707 Acc 0.2631
Epoch 19 Loss 2.155486822128296 Accuracy 0.2618359625339508
Epoch 20 Batch 0 Loss 2.1704 Acc 0.2633
Epoch 20 Batch 50 Loss 2.1670 Acc 0.2633
Epoch 20 Batch 100

# Test

In [28]:
test_image_paths = image_npy_paths[train_length:]
examples = len(test_image_paths)
test_captions = captions[train_length:examples + train_length]

test_dataset = tf.data.Dataset.from_tensor_slices((test_image_paths, test_captions))
test_dataset = test_dataset.map(lambda item1, item2: tf.numpy_function(
          create_dataset_data, [item1, item2], [tf.float32, tf.int64]),
          num_parallel_calls=tf.data.AUTOTUNE)

test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.AUTOTUNE)

In [29]:
total_loss = 0
total_acc = 0
iter = 1
for (batch, (img_tensor, target)) in enumerate(test_dataset):
  loss_acc = model.test_step((img_tensor, target))
  total_loss += loss_acc["loss"]
  total_acc += loss_acc["acc"]
  iter+=1
  if batch % 50 == 0:
    # average_batch_loss = batch_loss.numpy()/int(target.shape[1])
    print(f'Batch {batch} Loss {loss_acc["loss"]:.4f} Acc {loss_acc["acc"]:.4f}')
# storing the epoch end loss value to plot later
# loss_plot.append(total_loss / num_steps)

# if epoch % 5 == 0:
  # ckpt_manager.save()
total_loss = total_loss / iter
total_acc = total_acc / iter
print(f'Loss {total_loss} Accuracy {total_acc}')

Batch 0 Loss 5.7848 Acc 0.0469
Loss 4.842346668243408 Accuracy 0.040326740592718124
