In [2]:
import os
from pathlib import Path

import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
import pickle
from tqdm import tqdm
import torch
from torch.utils.data import Dataset, DataLoader,random_split
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models

In [3]:
# Locations of the MSVD artefacts, relative to the notebook's working directory.
# Paths below are built by plain string concatenation, so the directory
# constants must keep their trailing slash.
LABELS_PATH = '../data/MSVD_label_final.csv'  # caption table, read with pd.read_csv
DATA_PATH = '../data/MSVD/training_data/feat/'  # per-video '<video_id>.npy' feature files
MODEL_PATH = '../model/'  # embedding matrix, vocabulary pickles and model weights

## Load utility variables

In [4]:
# Caption table and the pre-computed embedding matrix.
label_final_df = pd.read_csv(LABELS_PATH)
embedding = np.load(MODEL_PATH+'MSVD_embedding.npy')

# Vocabulary lookups (word -> index and index -> word).
# NOTE: pickle can execute arbitrary code while loading; only load files
# that were produced by a trusted source.
# The `with` blocks make sure the file handles are closed right away
# instead of leaking until garbage collection.
with open(MODEL_PATH+'MSVD_word2idx.pkl', "rb") as word2idx_file:
    word2idx = pickle.load(word2idx_file)
with open(MODEL_PATH+'MSVD_idx2word.pkl', "rb") as idx2word_file:
    idx2word = pickle.load(idx2word_file)

## Load Model

In [6]:
def get_features(vid, DATA_PATH):
    '''
    Load the saved feature array of one video as a float tensor.

    vid: video identifier; the file read is DATA_PATH + '<vid>.npy'
    DATA_PATH: directory prefix (string, concatenated as-is)

    Returns the array converted to float32 with a new leading dimension
    of size 1 (a batch containing this single video).
    '''
    npy_path = DATA_PATH+ f'{vid}.npy'
    frame_features = np.load(npy_path)
    as_float = torch.tensor(frame_features).float()
    # prepend the batch dimension
    return as_float.unsqueeze(dim=0)

In [7]:
class Encoder(nn.Module):
    '''
    Video encoder: a single-layer GRU run over the sequence of per-frame
    features of a video (described in this notebook as resnet50 features).

    note: batch_first=True only affects the input/output tensors; the
    hidden state keeps the layer dimension first.
    '''
    def __init__(self, input_dim, hidden_dim):
        '''
        input_dim: size of each frame feature vector
        hidden_dim: size of the GRU hidden state
        '''
        super().__init__()
        self.hidden_dim = hidden_dim
        self.rnn = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=1,
            batch_first=True,
        )

    def forward(self, x):
        '''
        x: PackedSequence

        Returns only the final hidden state of the GRU; the per-step
        outputs are discarded.
        '''
        _, final_hidden = self.rnn(x)
        return final_hidden
    
class Decoder(nn.Module):
    '''
    Word-by-word decoder: turns the encoder's hidden state into a sentence,
    producing the scores for one word per call.

    note: batch_first=True only affects the input/output tensors; the
    hidden state keeps the layer dimension first.
    '''
    def __init__(self, weights, emb_dim, hidden_dim, out_dim):
        '''
        weights: accepted for interface compatibility but not used; the
                 embedding table below is created fresh rather than
                 initialised from these values
        emb_dim: word embedding size
        hidden_dim: GRU hidden size (same as the encoder context size)
        out_dim: vocabulary size
        '''
        super().__init__()
        self.hidden_dim = hidden_dim
        self.emb_dim = emb_dim
        self.out_dim = out_dim

        # layers
        self.emb = nn.Embedding(out_dim, emb_dim, padding_idx=0)
        # the GRU sees the word embedding concatenated with the context
        self.rnn = nn.GRU(emb_dim + hidden_dim, hidden_dim, num_layers=1, batch_first=True)
        # the output head sees embedding + context + new hidden state
        self.fc_out1 = nn.Linear(emb_dim + 2 * hidden_dim, out_dim)
        self.fc_out2 = nn.Linear(out_dim, out_dim)

    def forward(self, word_input, encoded_context, hidden):
        '''
        word_input: (batch_size)
        encoded_context: (1, batch_size, hidden_dim)
        hidden: (1, batch_size, hidden_dim)

        Returns (prediction, hidden): vocabulary scores for the next word
        and the updated hidden state.
        '''
        # 1 word at a time
        embedded = self.emb(word_input)  # (batch, emb_dim)
        context = encoded_context.squeeze(0)

        # single-step GRU update on [embedding ; context]
        step_input = torch.cat((embedded, context), dim=1).unsqueeze(1).float()
        _, hidden = self.rnn(step_input, hidden)

        # score the vocabulary from [embedding ; context ; new hidden]
        head_input = torch.cat((embedded, context, hidden.squeeze(0)), dim=1).float()
        prediction = self.fc_out2(F.relu(self.fc_out1(head_input)))
        return prediction, hidden

    
class Seq2Seq(nn.Module):
    '''
    Ties an encoder and a decoder together for training-style decoding
    with optional teacher forcing.
    '''
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, x, y, teacher_forcing_ratio=0.8):
        '''
        x: PackedSequence
        y: (batch_size, sentence_len(padded))
        teacher_forcing_ratio: probability, drawn once per time step, of
            feeding the ground-truth token instead of the predicted one

        Returns a (batch_size, sentence_len, vocab_size) tensor of scores.
        Position 0 is never written and stays all zeros.
        '''
        batch_size, sentence_len = y.size(0), y.size(1)
        vocab_size = self.decoder.out_dim

        # container for the per-step predictions
        outputs = torch.zeros(batch_size, sentence_len, vocab_size).to(self.device)

        # the encoder's last hidden state is both the fixed context and
        # the decoder's initial hidden state: (1, batch_size, hidden_dim)
        encoded_context = self.encoder(x)
        hidden = encoded_context

        # decoding starts from the first column of y (the '<START>' token)
        word_input = y[:, 0]  # (batch_size)
        for step in range(1, sentence_len):
            step_scores, hidden = self.decoder(word_input, encoded_context, hidden)
            outputs[:, step, :] = step_scores

            # one random draw per step decides between ground truth and
            # the model's own best guess as the next input
            if np.random.rand() < teacher_forcing_ratio:
                word_input = y[:, step]
            else:
                word_input = step_scores.argmax(1)  # (batch_size)

        return outputs
    

In [8]:
# Model hyper-parameters. These must match the values the checkpoint was
# trained with, otherwise loading the state dict will fail on shape mismatch.
N_VOCAB = len(word2idx)
EMB_DIM = 300
# NOTE(review): 4096 is not the usual ResNet-50 feature width — confirm which
# extractor produced the .npy features.
INPUT_DIM = 4096 # resnet50 fc dim
HIDDEN_DIM = 512

# Build encoder/decoder and move the assembled model to GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(INPUT_DIM, HIDDEN_DIM)
# `embedding` is passed through, but Decoder does not currently use it.
decoder = Decoder(embedding, EMB_DIM, HIDDEN_DIM, N_VOCAB)
model = Seq2Seq(encoder, decoder, device)
model.to(device)

Seq2Seq(
  (encoder): Encoder(
    (rnn): GRU(4096, 512, batch_first=True)
  )
  (decoder): Decoder(
    (emb): Embedding(1504, 300, padding_idx=0)
    (rnn): GRU(812, 512, batch_first=True)
    (fc_out1): Linear(in_features=1324, out_features=1504, bias=True)
    (fc_out2): Linear(in_features=1504, out_features=1504, bias=True)
  )
)

In [9]:
# Restore the trained weights. map_location remaps the stored tensors to CPU
# while reading the file; load_state_dict then copies the values into the
# model's existing parameters.
model.load_state_dict(torch.load(MODEL_PATH+'MSVD_seq2seq_v1.pt', map_location=torch.device('cpu')) )
# Switch to inference mode.
model.eval()

Seq2Seq(
  (encoder): Encoder(
    (rnn): GRU(4096, 512, batch_first=True)
  )
  (decoder): Decoder(
    (emb): Embedding(1504, 300, padding_idx=0)
    (rnn): GRU(812, 512, batch_first=True)
    (fc_out1): Linear(in_features=1324, out_features=1504, bias=True)
    (fc_out2): Linear(in_features=1504, out_features=1504, bias=True)
  )
)

In [20]:
def gen_seq(model, x, start_token, seq_length):
    '''
    Greedily decode a caption from video features.

    model: module exposing `encoder` and `decoder` (e.g. Seq2Seq)
    x: encoder input, e.g. the tensor returned by get_features
    start_token: word that seeds the decoder; must be a key of word2idx
    seq_length: maximum number of words to generate

    Returns the generated words joined by single spaces. Generation stops
    early when '<END>' is predicted; '<END>' itself is not included.

    Uses the module-level `word2idx` / `idx2word` lookups. Puts the model
    in eval mode as a side effect.
    '''
    model.eval()

    # The model may have been moved to the GPU, so the inputs must follow
    # its parameters; otherwise the decoder fails with a device mismatch.
    # A model without parameters leaves everything on the default device.
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device
    if device is not None:
        x = x.to(device)

    words = []
    with torch.no_grad():
        word_input = torch.tensor([word2idx[start_token]], device=device)  # (1,)
        context = model.encoder(x)
        hidden = context
        for _ in range(seq_length):
            output, hidden = model.decoder(word_input, context, hidden)
            # greedy choice: the best-scoring word becomes the next input
            word_input = output.argmax(1)
            actual_word = idx2word[word_input.item()]
            if actual_word == '<END>':
                break
            words.append(actual_word)
    return ' '.join(words)

    
def gen_greedy_caption(vid_id, model, DATA_PATH, max_length=10):
    '''
    Produce a greedy caption for one video.

    vid_id: video identifier handed to get_features
    model: model handed to gen_seq
    DATA_PATH: feature directory prefix handed to get_features
    max_length: maximum number of words to generate

    Returns whatever gen_seq returns when started from '<START>'.
    '''
    video_features = get_features(vid_id, DATA_PATH)
    return gen_seq(model, video_features, '<START>', max_length)

In [87]:
# Look at one labelled example (row position 9754) to get a reference
# caption and its video id for the demo below.
label_final_df.iloc[9754]

caption     a little girl putting ingredients in a food pr...
video_id                                PQbkdRbir0M_45_53.avi
sent_len                                                    9
Name: 9754, dtype: object

In [89]:
# Generate a caption for a single clip with the greedy decoder.
# The id includes the '.avi' suffix; the feature file read is '<vid_id>.npy'.
vid_id = 'PQbkdRbir0M_45_53.avi'
gen_greedy_caption(vid_id, model, DATA_PATH)

'a girl is <UNK>'

# Make GIF file

In [90]:
# Source videos and the output folder for the generated GIF.
# Both keep a trailing slash because paths are built by string concatenation.
VIDEO_PATH = '../data/MSVD/training_data/video/'
SAMPLE_PATH = '../sample/'

In [91]:
from moviepy.editor import VideoFileClip

# Make sure the output folder exists before writing into it.
os.makedirs(SAMPLE_PATH, exist_ok=True)

# Convert the captioned clip to a GIF at 10 frames per second.
# Using the clip as a context manager closes the underlying video reader
# once the GIF is written, instead of leaving it open for the rest of
# the session.
with VideoFileClip(VIDEO_PATH+vid_id) as clip:
    clip.write_gif(SAMPLE_PATH+vid_id+'.gif', fps=10)

t:   0%|          | 0/81 [00:00<?, ?it/s, now=None]

MoviePy - Building file ../sample/PQbkdRbir0M_45_53.avi.gif with imageio.


                                                            