# Notebook cell: query the process's current working directory.
# NOTE(review): the result is not assigned; presumably this relied on the
# notebook echoing the value of the cell's last expression — confirm.
import os
os.getcwd() 

In [9]:
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import sys
import time
import cv2
from keras.preprocessing import sequence
import matplotlib.pyplot as plt
from tensorflow.contrib import rnn
#from app.fire import  fire
#from elapsedtimer import ElapsedTimer
from pathlib import Path
# Report which TensorFlow version is loaded.
print('tensorflow version:',tf.__version__)
from IPython.core.debugger import Pdb
# Module-level IPython debugger instance; it is not referenced in the visible
# code — presumably kept for ad-hoc `ipdb.set_trace()` calls while developing.
ipdb = Pdb()

tensorflow version: 1.13.1



<br>
This takes the CNN features of a video's frames and passes them through back-to-back LSTMs (a sequence-to-sequence<br>
model) to generate the caption for the video.<br>
 path_prj - Project directory.<br>
 feat_dir - Subdirectory containing the CNN features .. absolute path /path_prj/feat_dir/<br>
 cnn_feat_dim - Dimension of the feature vector from CNN for each image frame <br>
 video_steps - Number of image frames taken from each video.<br>
 out_steps  - Sequence length for the text caption. The output text sequence is limited to 20 words by default.<br>
 learning_rate - Training hyperparameter (learning rate of the optimizer)<br>
 epochs    - Number of training epochs<br>
 model_path - Absolute Path to save the model <br>
 mode - train/inference <br>


In [10]:
class VideoCaptioning:
    
    
    def __init__(self,path_prj,caption_file,feat_dir,
                 cnn_feat_dim=4096,h_dim=512,
                 lstm_steps=80,video_steps=80,
                 out_steps=20, frame_step=80,
                 batch_size=8,learning_rate=1e-4,
                 epochs=100,model_path=None,
                 mode='train'):
        self.dim_image = cnn_feat_dim
        self.dim_hidden = h_dim
        self.batch_size = batch_size
        self.lstm_steps = lstm_steps
        self.video_lstm_step=video_steps
        self.caption_lstm_step=out_steps
        self.path_prj = Path(path_prj)
        self.mode = mode
        if mode == 'train':
            self.train_text_path = self.path_prj / caption_file
            self.train_feat_path = self.path_prj / feat_dir
        else:
            self.test_text_path = self.path_prj / caption_file
            self.test_feat_path = self.path_prj / feat_dir
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.frame_step = frame_step
        self.model_path = model_path
    def build_model(self):
        """Build the training graph of the sequence-to-sequence captioning model.

        Each frame's CNN feature vector is projected linearly to ``dim_hidden``
        and fed through two stacked LSTMs for ``video_lstm_step`` steps
        (encoding). The same two LSTMs are then run for ``caption_lstm_step``
        more steps, receiving the embedding of caption word ``i`` and being
        trained to predict caption word ``i + 1`` (decoding).

        ``self.n_words`` (the vocabulary size) must be set on the instance
        before this method is called; ``__init__`` does not assign it.

        Returns:
            tuple: ``(loss, video, video_mask, caption, caption_mask, probs,
            train_op)`` where ``loss`` is the masked cross-entropy summed over
            the caption steps and divided by the batch size, the next four
            entries are the input placeholders, ``probs`` is the list of
            per-step logits over the vocabulary, and ``train_op`` is the Adam
            update op. ``video_mask`` is created and returned but not used
            inside the graph.
        """
        # Defining the weights associated with the Network
        with tf.device('/cpu:0'):
            self.word_emb = tf.Variable(tf.random_uniform([self.n_words, self.dim_hidden], -0.1, 0.1), name='word_emb')
        self.lstm1 = tf.nn.rnn_cell.BasicLSTMCell(self.dim_hidden, state_is_tuple=False)
        self.lstm2 = tf.nn.rnn_cell.BasicLSTMCell(self.dim_hidden, state_is_tuple=False)
        self.encode_W = tf.Variable(tf.random_uniform([self.dim_image, self.dim_hidden], -0.1, 0.1), name='encode_W')
        self.encode_b = tf.Variable(tf.zeros([self.dim_hidden]), name='encode_b')

        self.word_emb_W = tf.Variable(tf.random_uniform([self.dim_hidden, self.n_words], -0.1, 0.1), name='word_emb_W')
        self.word_emb_b = tf.Variable(tf.zeros([self.n_words]), name='word_emb_b')

        # Placeholders
        video = tf.placeholder(tf.float32, [self.batch_size, self.video_lstm_step, self.dim_image])
        video_mask = tf.placeholder(tf.float32, [self.batch_size, self.video_lstm_step])
        # One extra caption position: step i consumes word i and predicts word i+1.
        caption = tf.placeholder(tf.int32, [self.batch_size, self.caption_lstm_step + 1])
        caption_mask = tf.placeholder(tf.float32, [self.batch_size, self.caption_lstm_step + 1])

        # Project every frame feature to the hidden size in one matmul.
        video_flat = tf.reshape(video, [-1, self.dim_image])
        image_emb = tf.nn.xw_plus_b(video_flat, self.encode_W, self.encode_b)
        # Restore the time axis using video_lstm_step, i.e. the same length the
        # placeholder and the encoding loop use, so that the graph still builds
        # when lstm_steps and video_steps are configured differently.
        image_emb = tf.reshape(image_emb, [self.batch_size, self.video_lstm_step, self.dim_hidden])

        state1 = tf.zeros([self.batch_size, self.lstm1.state_size])
        state2 = tf.zeros([self.batch_size, self.lstm2.state_size])
        padding = tf.zeros([self.batch_size, self.dim_hidden])
        probs = []
        loss = 0.0

        #  Encoding Stage
        for i in range(0, self.video_lstm_step):
            if i > 0:
                # The LSTM weights are created at step 0 and shared afterwards.
                tf.get_variable_scope().reuse_variables()
            with tf.variable_scope("LSTM1"):
                output1, state1 = self.lstm1(image_emb[:, i, :], state1)
            with tf.variable_scope("LSTM2"):
                # No word input while encoding: zeros take the word slot.
                output2, state2 = self.lstm2(tf.concat([padding, output1], 1), state2)

        #  Decoding Stage  to generate Captions
        for i in range(0, self.caption_lstm_step):
            with tf.device("/cpu:0"):
                current_embed = tf.nn.embedding_lookup(self.word_emb, caption[:, i])
            tf.get_variable_scope().reuse_variables()
            with tf.variable_scope("LSTM1"):
                # No frame input while decoding: zeros take the frame slot.
                output1, state1 = self.lstm1(padding, state1)
            with tf.variable_scope("LSTM2"):
                output2, state2 = self.lstm2(tf.concat([current_embed, output1], 1), state2)
            logit_words = tf.nn.xw_plus_b(output2, self.word_emb_W, self.word_emb_b)

            # Computing the loss: cross-entropy against the next word's id.
            # The sparse variant takes the integer ids directly, so no dense
            # one-hot matrix has to be assembled by hand.
            cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=caption[:, i + 1], logits=logit_words)
            cross_entropy = cross_entropy * caption_mask[:, i]
            probs.append(logit_words)
            current_loss = tf.reduce_sum(cross_entropy) / self.batch_size
            loss = loss + current_loss

        # The enclosing scope was switched to reuse above; AUTO_REUSE lets the
        # optimizer create whatever variables it needs inside that scope.
        with tf.variable_scope(tf.get_variable_scope(), reuse=tf.AUTO_REUSE):
            train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(loss)
        return loss, video, video_mask, caption, caption_mask, probs, train_op
    
    def build_generator(self):
        """Build the inference graph that captions a single video greedily.

        The graph mirrors the training graph (same variable names and LSTM
        scopes) but works on a batch of one video. After encoding the
        ``video_lstm_step`` frames, the decoder runs ``caption_lstm_step``
        steps; at every step the highest-scoring word is taken and its
        embedding becomes the next step's input.

        ``self.n_words`` (the vocabulary size) must be set on the instance
        before this method is called; ``__init__`` does not assign it.

        Returns:
            tuple: ``(video, video_mask, generated_words, probs, embeds)`` —
            the two input placeholders, the list of per-step argmax word-id
            tensors, the list of per-step logits, and the list of embeddings
            of the chosen words. ``video_mask`` is created and returned but
            not used inside the graph.
        """
        # Keep the embedding table on the CPU, as the training graph does.
        with tf.device('/cpu:0'):
            self.word_emb = tf.Variable(tf.random_uniform([self.n_words, self.dim_hidden], -0.1, 0.1), name='word_emb')
        self.lstm1 = tf.nn.rnn_cell.BasicLSTMCell(self.dim_hidden, state_is_tuple=False)
        self.lstm2 = tf.nn.rnn_cell.BasicLSTMCell(self.dim_hidden, state_is_tuple=False)
        self.encode_W = tf.Variable( tf.random_uniform([self.dim_image,self.dim_hidden], -0.1, 0.1), name='encode_W')
        self.encode_b = tf.Variable( tf.zeros([self.dim_hidden]), name='encode_b')
        self.word_emb_W = tf.Variable(tf.random_uniform([self.dim_hidden,self.n_words], -0.1,0.1), name='word_emb_W')
        self.word_emb_b = tf.Variable(tf.zeros([self.n_words]), name='word_emb_b')

        # Inputs for exactly one video.
        video = tf.placeholder(tf.float32, [1, self.video_lstm_step, self.dim_image])
        video_mask = tf.placeholder(tf.float32, [1, self.video_lstm_step])

        # Project every frame feature to the hidden size, then restore the time axis.
        video_flat = tf.reshape(video, [-1, self.dim_image])
        image_emb = tf.nn.xw_plus_b(video_flat, self.encode_W, self.encode_b)
        image_emb = tf.reshape(image_emb, [1, self.video_lstm_step, self.dim_hidden])

        state1 = tf.zeros([1, self.lstm1.state_size])
        state2 = tf.zeros([1, self.lstm2.state_size])
        padding = tf.zeros([1, self.dim_hidden])
        generated_words = []
        probs = []
        embeds = []

        # Encoding stage: feed the frames, zeros in the word slot of LSTM2.
        for i in range(0, self.video_lstm_step):
            if i > 0:
                # The LSTM weights are created at step 0 and shared afterwards.
                tf.get_variable_scope().reuse_variables()
            with tf.variable_scope("LSTM1"):
                output1, state1 = self.lstm1(image_emb[:, i, :], state1)
            with tf.variable_scope("LSTM2"):
                output2, state2 = self.lstm2(tf.concat([padding, output1],1), state2)

        # Decoding stage: greedy word-by-word generation.
        for i in range(0, self.caption_lstm_step):
            tf.get_variable_scope().reuse_variables()
            if i == 0:
                # The first decoder input is the embedding of word id 1.
                # NOTE(review): presumably the begin-of-sentence token —
                # confirm against the vocabulary construction.
                with tf.device('/cpu:0'):
                    current_embed = tf.nn.embedding_lookup(self.word_emb, tf.ones([1], dtype=tf.int64))
            with tf.variable_scope("LSTM1"):
                output1, state1 = self.lstm1(padding, state1)
            with tf.variable_scope("LSTM2"):
                output2, state2 = self.lstm2(tf.concat([current_embed, output1],1), state2)
            logit_words = tf.nn.xw_plus_b( output2, self.word_emb_W, self.word_emb_b)
            # Batch size is 1, so take the single row's argmax as a scalar id.
            max_prob_index = tf.argmax(logit_words, 1)[0]
            generated_words.append(max_prob_index)
            probs.append(logit_words)
            with tf.device("/cpu:0"):
                current_embed = tf.nn.embedding_lookup(self.word_emb, max_prob_index)
                # Re-add the batch axis dropped by the scalar lookup.
                current_embed = tf.expand_dims(current_embed, 0)
            embeds.append(current_embed)
        return video, video_mask, generated_words, probs, embeds     