<a href="https://colab.research.google.com/github/VSSARATHI/Video-Captioning-Using-Attention-based-Bi-directional-LSTM/blob/master/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

%tensorflow_version 2.x
import tensorflow as tf

In [None]:
# Load the three pre-computed arrays from Google Drive (the Drive must already be mounted).
# video_vector: per-video features; the training split printed further down has shape (8000, 30, 4096).
video_vector = np.load("/content/drive/My Drive/Colab Notebooks/CV_Project/x_train1.npy")
# train_captions: caption texts, later passed to tokenizer.fit_on_texts.
train_captions = np.load("/content/drive/My Drive/Colab Notebooks/CV_Project/y_train1.npy")
# name: indexed with the same row index as video_vector when printing "Video Name".
name = np.load("/content/drive/My Drive/Colab Notebooks/CV_Project/videos1.npy")

In [None]:
def calc_max_length(tensor):
    """Return the length of the longest sequence in `tensor`.

    Args:
        tensor: an iterable of sized sequences (e.g. tokenized captions).

    Returns:
        The maximum `len()` over the sequences, or 0 when `tensor` is empty
        (a bare `max()` would raise ValueError on an empty iterable).
    """
    return max((len(t) for t in tensor), default=0)

# Vocabulary cap handed to the tokenizer as num_words.
top_k = 10000
# Out-of-vocabulary words map to "<unk>". The filter set contains no '<' or '>',
# so angle-bracket tokens such as the '<start>' / '<end>' markers looked up later
# in this notebook are not stripped from the captions.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')


In [None]:
# Build the vocabulary from the caption texts.
tokenizer.fit_on_texts(train_captions)

# Register index 0 as the padding token so padded positions decode to '<pad>'.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Encode the captions once. The original cell ran this pass twice and threw
# the first result away; registering '<pad>' does not change the encoding.
train_seqs = tokenizer.texts_to_sequences(train_captions)

# Right-pad every caption with zeros up to the longest caption.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

# Longest caption length in tokens.
max_length = calc_max_length(train_seqs)


In [None]:
# Hold out 20% of the (features, caption) pairs for validation; the fixed
# random_state keeps the split identical between runs.
video_train, video_val, cap_train, cap_val = train_test_split(
    video_vector,
    cap_vector,
    test_size=0.2,
    random_state=0,
)

# Sizes of the four splits, in the order train/train/val/val.
tuple(map(len, (video_train, cap_train, video_val, cap_val)))


(8000, 8000, 2000, 2000)

In [None]:
# Shape of the training feature array.
print(video_train.shape)
# Length of one padded caption row (pad_sequences gives every row the same length).
len(cap_train[0])

(8000, 30, 4096)


39

In [None]:
# Number of examples per training batch.
BATCH_SIZE = 64
# Buffer size passed to dataset.shuffle() when the input pipeline is built.
BUFFER_SIZE = 1000
# Number of whole batches in the training split (integer division drops any remainder).
steps_per_epoch = len(video_train)//BATCH_SIZE

In [None]:
# Pair each video's features with its padded caption.
dataset = tf.data.Dataset.from_tensor_slices((video_train, cap_train))

# drop_remainder=True: train_step builds its '<start>' column, and the training
# loop builds the encoder's initial state, for exactly BATCH_SIZE rows, so the
# pipeline must never emit a short final batch.
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# Let tf.data overlap input preparation with training.
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
class Encoder(tf.keras.Model):
  """Bidirectional LSTM encoder that returns its output at every time step.

  Args:
    enc_units: number of units in each LSTM direction.
    batch_sz: stored on the instance; not read by any method of this class.
    max_frames: stored on the instance; not read by any method of this class.
  """

  def __init__(self, enc_units, batch_sz, max_frames):
    super().__init__()
    self.batch_sz = batch_sz
    self.enc_units = enc_units
    self.max_frames = max_frames
    # return_sequences=True keeps the per-step outputs that attention consumes later.
    recurrent = tf.keras.layers.LSTM(self.enc_units, return_sequences = True)
    self.LSTM = tf.keras.layers.Bidirectional(recurrent)

  def call(self, x, hidden):
    """Run the bidirectional LSTM over `x`, seeded with the state list `hidden`."""
    return self.LSTM(x, initial_state = hidden)

  def initialize_hidden_state(self, batch_sz):
    """Return four zero float32 tensors of shape (batch_sz, enc_units).

    The list is ordered [h_0, c_0, h_1, c_1], the form passed as
    `initial_state` to the bidirectional layer in `call`.
    """
    state_shape = [batch_sz, self.enc_units]
    return [
        tf.convert_to_tensor(np.zeros(state_shape).astype(np.float32))
        for _ in range(4)
    ]

In [None]:
# Units per LSTM direction in the encoder.
units = 1024
# BATCH_SIZE is defined once in the batching-configuration cell above; it is
# no longer re-assigned here.
MAX_FRAMES = 30
# Per-frame feature width; matches the last axis of the shape printed for video_train.
FEATURE_DIM = 4096
encoder = Encoder(units, BATCH_SIZE, MAX_FRAMES)

# Smoke test on random input: build the encoder's weights and check the output shape.
example_input_batch = tf.random.uniform(shape = [BATCH_SIZE, MAX_FRAMES, FEATURE_DIM])
sample_hidden = encoder.initialize_hidden_state(BATCH_SIZE)
sample_output = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))

Encoder output shape: (batch size, sequence length, units) (64, 30, 2048)


In [None]:
class BahdanauAttention(tf.keras.layers.Layer):
  """Additive attention: score = V(tanh(W1(hidden) + W2(sample_output)))."""

  def __init__(self, units):
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units)
    self.W2 = tf.keras.layers.Dense(units)
    self.V = tf.keras.layers.Dense(1)

  def call(self, hidden, sample_output):
    """Attend over `sample_output` using `hidden` as the query.

    Args:
      hidden: query tensor; an axis is inserted at position 1 so it
        broadcasts against `sample_output`.
      sample_output: the sequence to attend over; axis 1 is the one the
        softmax and the weighted sum run along.

    Returns:
      (context_vector, attention_weights). In the sample run recorded in this
      notebook these have shapes (64, 2048) and (64, 30, 1).
    """
    query = tf.expand_dims(hidden, 1)
    energy = tf.nn.tanh(self.W1(query) + self.W2(sample_output))
    attention_weights = tf.nn.softmax(self.V(energy), axis=1)
    # Weighted sum of the attended sequence along axis 1.
    context_vector = tf.reduce_sum(attention_weights * sample_output, axis=1)
    return context_vector, attention_weights

In [None]:
# Smoke test: a throwaway attention layer with 10 units, queried with the
# encoder output at the last time step of the sample batch.
attention_layer = BahdanauAttention(10)
attention_result, attention_weights = attention_layer(sample_output[:,-1],sample_output)

# Print both shapes so they can be compared with the labels in the messages.
print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

Attention result shape: (batch size, units) (64, 2048)
Attention weights shape: (batch_size, sequence_length, 1) (64, 30, 1)


In [None]:
class Decoder(tf.keras.Model):
  """Caption decoder: attention over encoder outputs, embedding, LSTM, vocabulary projection.

  Args:
    vocab_size: size of the embedding table and of the output logits.
    embedding_dim: width of the token embeddings.
    dec_units: units of the decoder LSTM and of the attention layer.
    batch_sz: stored on the instance; not read by any method of this class.
  """
  def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):

    super(Decoder, self).__init__()

    self.batch_sz = batch_sz

    self.dec_units = dec_units

    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

    self.LSTM = tf.keras.layers.LSTM(self.dec_units,
                                   return_sequences=True,
                                   return_state=True)

    self.fc = tf.keras.layers.Dense(vocab_size)

    self.attention = BahdanauAttention(self.dec_units)

  def call(self, x, hidden, enc_output):
    """Decode one step.

    Args:
      x: token ids for the current step.
      hidden: query for the attention layer.
      enc_output: encoder outputs to attend over.

    Returns:
      (logits, state_h, attention_weights), where `state_h` is the LSTM's
      hidden state after this step.
    """
    context_vector, attention_weights = self.attention(hidden, enc_output)

    x = self.embedding(x)

    # Prepend the attention context to the token embedding along the feature axis.
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

    # NOTE(review): the LSTM is called without initial_state, so `hidden` only
    # feeds the attention query and the cell state is not carried between
    # steps. Confirm this is intended before changing it; weights in existing
    # checkpoints were trained this way.
    output, state_h, _ = self.LSTM(x)

    # Collapse the time axis so the projection sees one row per step.
    output = tf.reshape(output, (-1, output.shape[2]))

    x = self.fc(output)

    return x, state_h, attention_weights

In [None]:
# One more than top_k, the vocabulary cap given to the tokenizer.
vocab_size = top_k + 1
embedding_dim = 256
# Decoder width; equals the encoder output width recorded above (2048).
units = 2048
decoder = Decoder(vocab_size, embedding_dim, units, BATCH_SIZE)

# Smoke test on random input: build the decoder's weights and check the output shapes.
sample_decoder_output, hidden , _ = decoder(tf.random.uniform((BATCH_SIZE, 1)),sample_output[:,-1], sample_output)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))
# The second value is the LSTM hidden state, so its last axis is `units`, not the vocabulary size.
print ('Decoder state shape: (batch_size, units) {}'.format(hidden.shape))

Decoder output shape: (batch_size, vocab size) (64, 10001)
Decoder state shape: (batch_size, vocab size) (64, 2048)


In [None]:
# Adam with its default arguments.
optimizer = tf.keras.optimizers.Adam()
# Cross-entropy on raw logits; reduction='none' keeps one loss value per
# element so loss_function can mask padding before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
  """Mean cross-entropy with padded targets (id 0) zeroed out.

  Args:
    real: target token ids.
    pred: logits for the same positions.

  Returns:
    The mean over all positions of the per-position loss, where positions
    whose target is 0 contribute 0 (they still count in the denominator).
  """
  is_padding = tf.math.equal(real, 0)
  per_token = loss_object(real, pred)
  keep = tf.cast(tf.math.logical_not(is_padding), dtype=per_token.dtype)
  return tf.reduce_mean(per_token * keep)

In [None]:
# Directory on Google Drive where the checkpoints are written.
checkpoint_path = "/content/drive/My Drive/Colab Notebooks/CV_Project/Saved_Models/checkpoints"
# Track the encoder, the decoder and the optimizer in one checkpoint.
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
# Keep at most the 20 most recent checkpoints.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=20)



In [None]:
# Resume from the newest checkpoint when one exists; otherwise start at epoch 0.
latest_ckpt = ckpt_manager.latest_checkpoint
if not latest_ckpt:
  start_epoch = 0
else:
  # The number after the last '-' in the checkpoint path is taken as the
  # epoch count (the training loop saves once per epoch).
  start_epoch = int(latest_ckpt.rpartition('-')[2])
  ckpt.restore(latest_ckpt)

In [None]:
# Epoch index that training resumes from (0 when no checkpoint was found).
start_epoch

20

In [None]:
@tf.function
def train_step(inp, targ, enc_hidden):
  """Run one teacher-forced optimisation step on a batch and return its loss.

  Args:
    inp: batch of video features fed to the encoder.
    targ: batch of padded caption token ids; column 0 is never used as a target.
    enc_hidden: initial state list passed to the encoder.

  Returns:
    The summed per-step loss divided by targ.shape[1].
  """
  loss = 0

  with tf.GradientTape() as tape:
    enc_output= encoder(inp, enc_hidden)
    # The encoder output at the last time step is the first attention query.
    hidden = enc_output[:,-1]
    # NOTE(review): the '<start>' column is built for exactly BATCH_SIZE rows,
    # so a batch of any other size would not line up with it.
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

    # Predict caption position t from the token at position t-1.
    for t in range(1, targ.shape[1]):
      
      predictions, hidden, _ = decoder(dec_input, hidden, enc_output)

      loss += loss_function(targ[:, t], predictions)

      # Teacher forcing: the next input is the ground-truth token, not the prediction.
      dec_input = tf.expand_dims(targ[:, t], 1)

  # Reported loss: the sum is divided by the caption length targ.shape[1],
  # although the loop above runs targ.shape[1] - 1 steps.
  batch_loss = (loss / int(targ.shape[1]))

  variables = encoder.trainable_variables + decoder.trainable_variables

  # Gradients are taken of the summed loss, not of batch_loss.
  gradients = tape.gradient(loss, variables)

  optimizer.apply_gradients(zip(gradients, variables))

  return batch_loss


In [None]:
from tqdm import notebook
import time
EPOCHS = 20

# Training resumes at start_epoch, so the loop body does not run at all when a
# restored checkpoint already covers all EPOCHS.
for epoch in range(start_epoch, EPOCHS):
  start = time.time()

  enc_hidden = encoder.initialize_hidden_state(BATCH_SIZE)
  total_loss = 0

  # enumerate() has no length, so pass total= explicitly to give tqdm a
  # progress bar with a known end.
  for (batch, (inp, targ)) in notebook.tqdm(enumerate(dataset.take(steps_per_epoch)),
                                            total=steps_per_epoch):
    batch_loss = train_step(inp, targ, enc_hidden)
    total_loss += batch_loss

    if batch % 30 == 0:
      print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                                   batch,
                                                   batch_loss.numpy()))
  # One checkpoint per epoch; the resume cell reads the epoch count back from
  # the checkpoint number.
  ckpt_manager.save()
  print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                      total_loss / steps_per_epoch))
  print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

In [None]:
# Fix the TensorFlow seed because evaluate() samples tokens at random.
tf.random.set_seed(10)
def evaluate(video,max_length):
    """Generate a caption for one video by sampling from the decoder.

    Args:
        video: features for a single video with a leading batch axis of 1.
        max_length: maximum number of tokens to generate.

    Returns:
        The generated words joined by spaces. Generation stops at the first
        '<end>' token, which is not included, or after `max_length` tokens.
    """
    video = tf.convert_to_tensor(video)
    enc_hidden = encoder.initialize_hidden_state(1)
    features = encoder(video,enc_hidden)
    hidden = features[:,-1]
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 1)
    result = []

    for _ in range(max_length):
        logits, hidden, attention_weights = decoder(dec_input, hidden, features)
        # Sample the next token id from the logits (stochastic, not argmax);
        # the sampled id is also the decoder input for the next step.
        dec_input = tf.random.categorical(logits, 1).numpy()
        # The decoder emits vocab_size logits, which may exceed the number of
        # words the tokenizer knows; fall back to '<unk>' instead of raising
        # KeyError on such an id.
        word = tokenizer.index_word.get(dec_input[0].item(), '<unk>')

        if word == '<end>':
            break
        result.append(word)

    return ' '.join(result)

In [None]:
# Print ten randomly chosen videos with their reference and sampled captions.
for sample_idx in range(10):
  # Draw from the whole loaded array instead of assuming it holds 10000 rows.
  rid = np.random.randint(0, len(video_vector))
  # Add the leading batch axis that evaluate() expects.
  video = video_vector[rid][np.newaxis, ...]
  # Drop padding (id 0), then the first and last tokens of the caption.
  real_caption = ' '.join([tokenizer.index_word[tok] for tok in cap_vector[rid] if tok != 0][1:-1])
  # max_length was computed from the tokenized captions; it replaces the literal 39.
  result = evaluate(video, max_length)

  print ('Real Caption:', real_caption)
  print ('Prediction Caption:', result)
  print('Video Name:',name[rid])
  print()



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Real Caption: a person is slicing a stack of tortillas
Prediction Caption: a person is prepair jusee
Video Name: j2Dhf-xFUxU

Real Caption: men in uniforms are marching down the road
Prediction Caption: a group is marching on trampolines
Video Name: s7znbqra118

Real Caption: the gang of people are perfoming
Prediction Caption: the band is performing onstage
Video Name: bJJndejsUWc

Real Caption: a boy putting a flat rock into his mouth
Prediction Caption: a young man removes the something
Video Name: r4qv_BNlQNk

Real Caption: water put to boil on a cooking range
Prediction Caption: the woman makes a kettle
Video Name: NjCqtzZ3OtU

Real Caption: two men are flying a remote controlled plane

In [None]:
# Accumulators filled by the validation loop below: reference captions and
# generated captions, both as plain strings. Re-running this cell resets them.
references = []
hypothesis = [] 

In [None]:
# Caption every validation video. Starting at len(references) lets an
# interrupted run continue where it stopped; the upper bound follows the
# validation split instead of a hard-coded 2000.
for j in notebook.tqdm(range(len(references), len(video_val))):
  # Add the leading batch axis that evaluate() expects.
  video = video_val[j][np.newaxis, ...]
  # max_length was computed from the tokenized captions; it replaces the literal 39.
  h = evaluate(video, max_length)
  # Drop padding (id 0), then the first and last tokens of the caption.
  references.append(' '.join([tokenizer.index_word[tok] for tok in cap_val[j] if tok != 0][1:-1]))
  hypothesis.append(h)

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))




In [None]:
from nltk.translate.bleu_score import corpus_bleu
# corpus_bleu expects tokenized input: one LIST of reference token lists per
# sample, and one token list per hypothesis. Passing the raw strings makes it
# iterate over characters and score character n-grams instead of words.
list_of_references = [[ref.split()] for ref in references]
hypotheses = [hyp.split() for hyp in hypothesis]
print('BLEU-1: %f' % corpus_bleu(list_of_references, hypotheses, weights=(1.0, 0, 0, 0)))
print('BLEU-2: %f' % corpus_bleu(list_of_references, hypotheses, weights=(0.5, 0.5, 0, 0)))
# Uniform weights over the first three n-gram orders (1/3 each, not 0.3).
print('BLEU-3: %f' % corpus_bleu(list_of_references, hypotheses, weights=(1/3, 1/3, 1/3, 0)))
print('BLEU-4: %f' % corpus_bleu(list_of_references, hypotheses, weights=(0.25, 0.25, 0.25, 0.25)))

Corpus/Sentence contains 0 counts of 2-gram overlaps.
BLEU scores might be undesirable; use SmoothingFunction().


BLEU-1: 0.333395
BLEU-2: 0.577404
BLEU-3: 0.719263
BLEU-4: 0.759871


In [None]:
from nltk.translate.bleu_score import corpus_bleu
# NOTE(review): this cell repeats the previous BLEU cell; caption sampling is
# random, so a fresh set of hypotheses gives different numbers.
# corpus_bleu expects tokenized input: one LIST of reference token lists per
# sample, and one token list per hypothesis. Passing the raw strings makes it
# iterate over characters and score character n-grams instead of words.
list_of_references = [[ref.split()] for ref in references]
hypotheses = [hyp.split() for hyp in hypothesis]
print('BLEU-1: %f' % corpus_bleu(list_of_references, hypotheses, weights=(1.0, 0, 0, 0)))
print('BLEU-2: %f' % corpus_bleu(list_of_references, hypotheses, weights=(0.5, 0.5, 0, 0)))
# Uniform weights over the first three n-gram orders (1/3 each, not 0.3).
print('BLEU-3: %f' % corpus_bleu(list_of_references, hypotheses, weights=(1/3, 1/3, 1/3, 0)))
print('BLEU-4: %f' % corpus_bleu(list_of_references, hypotheses, weights=(0.25, 0.25, 0.25, 0.25)))

Corpus/Sentence contains 0 counts of 2-gram overlaps.
BLEU scores might be undesirable; use SmoothingFunction().


BLEU-1: 0.355469
BLEU-2: 0.596212
BLEU-3: 0.733230
BLEU-4: 0.772148


In [None]:
import math
def bleu_score(references, hypothesis):
  """Sentence-level BLEU of one hypothesis against one reference.

  Args:
    references: the reference sentence as a whitespace-separated string.
    hypothesis: the candidate sentence as a whitespace-separated string.

  Returns:
    (p1, p2, p3, p4, bleu): the clipped 1- to 4-gram precisions and the
    brevity-penalised score. An empty hypothesis scores all zeros.
    When the hypothesis has fewer than n tokens, the precisions for that n
    and above stay 0 but are left out of the product, so a short hypothesis
    can still get a non-zero score.
  """
  from collections import Counter

  r = references.split()
  h = hypothesis.split()
  precision = [0]*4
  if not h:
    # Nothing to score, and len(h) == 0 would divide by zero below.
    return precision[0],  precision[1], precision[2], precision[3], 0.0

  brevity_penalty = min(1,math.exp(1-len(r)/len(h)))
  n_gram_overlap = 1
  for n in range(1,5):
    if len(h)<n:
      break
    ref_counts = Counter(tuple(r[i:i+n]) for i in range(len(r)-n+1))
    hyp_counts = Counter(tuple(h[i:i+n]) for i in range(len(h)-n+1))
    # Clipped count: each hypothesis n-gram is credited at most as often as
    # it occurs in the reference. De-duplicating the reference n-grams first
    # would cap every n-gram at a single match.
    count = sum(min(c, ref_counts[gram]) for gram, c in hyp_counts.items())
    precision[n-1] = count/(len(h)-n+1)
    n_gram_overlap *= precision[n-1]
  bleu = brevity_penalty * n_gram_overlap**0.25
  return precision[0],  precision[1], precision[2], precision[3], bleu


print(bleu_score('The NASA Opportunity rover is battling a massive dust storm on Mars .','A NASA rover is fighting a massive storm on Mars .'))

(0.8181818181818182, 0.5, 0.2222222222222222, 0.125, 0.27221791225495623)


In [None]:
# Spot-check the sentence-level score on one validation pair (index 33).
bleu_score(references[33],hypothesis[33])

(0.4166666666666667, 0.2727272727272727, 0.1, 0.0, 0.0)

In [None]:
def bleu(r, h):
  """Average the sentence-level scores from bleu_score over a corpus.

  Args:
    r: list of reference sentences (strings).
    h: list of hypothesis sentences (strings), aligned with `r`.

  Returns:
    (p1, p2, p3, p4, b): the mean 1- to 4-gram precisions over all pairs, and
    the mean BLEU over the pairs whose BLEU is non-zero. All zeros for an
    empty corpus; b is 0.0 when every pair scores zero.
  """
  if len(r) == 0:
    # No pairs to average; avoid dividing by zero.
    return 0.0, 0.0, 0.0, 0.0, 0.0

  p = [0]*4
  b = 0
  count = len(r)
  for i in range(len(r)):
    p1, p2, p3, p4, bt = bleu_score(r[i],h[i])
    p[0] += p1
    p[1] += p2
    p[2] += p3
    p[3] += p4
    # NOTE(review): zero-BLEU sentences are left out of the BLEU average, so
    # the reported value is higher than a plain mean over all sentences would
    # be. Kept as is; confirm it is intended.
    if bt == 0:
      count -= 1
    else :
      b += bt
  mean_bleu = b/count if count else 0.0
  return p[0]/len(r), p[1]/len(r), p[2]/len(r), p[3]/len(r), mean_bleu

p1, p2, p3, p4 ,b = bleu(references, hypothesis)

In [None]:
# Report the corpus-averaged n-gram precisions, then the averaged BLEU score.
metric_rows = (
    ('Precision 1(1 gram) :', p1),
    ('Precision 2(2 gram) :', p2),
    ('Precision 3(3 gram) :', p3),
    ('Precision 4(4 gram) :', p4),
    ('BLEU Score :', b),
)
for label, value in metric_rows:
  print(label, value)

Precision 1(1 gram) : 0.3470023554876495
Precision 2(2 gram) : 0.1364990981240981
Precision 3(3 gram) : 0.07230952380952382
Precision 4(4 gram) : 0.04366666666666666
BLEU Score : 0.6072045428935147
