In [2]:
# Run only the first time!
# from pyunpack import Archive
# Archive('Images.zip').extractall('Assignment4_Images')

In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import tarfile
import pandas as pd
import os
import re
from torch.utils.data import Dataset, DataLoader, ConcatDataset, random_split
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
from sklearn.metrics import confusion_matrix
from sklearn.decomposition import PCA
from io import StringIO
from PIL import Image
from torchtext.vocab import GloVe
from torchtext.data import Field
from torchtext.data.metrics import bleu_score

In [7]:
# Display the installed PyTorch version.
torch.__version__

'1.5.0'

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [5]:
class DatasetClass(Dataset):
    """Dataset of (image tensor, caption) pairs with one item per caption.

    Every image contributes ``captions_per_image`` items whose keys have the
    form ``'<image file name>#<k>'``.

    Parameters
    ----------
    folder : str
        Directory that contains the image files.
    image_list : iterable of str
        Image file names (without the ``'#<k>'`` suffix).
    captions : pandas.DataFrame
        Caption table. Either indexed by the ``'<image>#<k>'`` key, or holding
        that key in an ``'Image'`` column; must have a ``'Caption'`` column.
    captions_per_image : int, optional
        Number of captions (and therefore items) per image. Default 5.
    image_size : tuple of int, optional
        Size passed to ``PIL.Image.resize``. Default (227, 227).
    """

    def __init__(self, folder, image_list, captions, captions_per_image=5, image_size=(227, 227)):

        self.folder = folder
        self.image_size = image_size

        # `.loc[key, 'Caption']` needs the key in the index. Indexing here also
        # snapshots the keys, so later edits to the caller's 'Image' column do
        # not break the lookup.
        if 'Image' in captions.columns:
            captions = captions.set_index('Image')
        self.captions = captions

        self.image_list = [
            '{}#{}'.format(image, i)
            for image in image_list
            for i in range(captions_per_image)
        ]
        self.size = len(self.image_list)

    def __getitem__(self, idx):
        """Return ``(image_tensor, caption)`` for the item at position ``idx``."""

        image_name = self.image_list[idx]
        caption = self.captions.loc[image_name, 'Caption']

        # Drop the '#<k>' suffix to recover the file name (works for any number of digits).
        file_name = image_name.rsplit('#', 1)[0]

        # The context manager closes the file handle; convert('RGB') guarantees
        # three channels even for grayscale or RGBA files.
        with Image.open(os.path.join(self.folder, file_name)) as img:
            img = img.convert('RGB').resize(self.image_size)

        trans = transforms.ToTensor()
        return trans(img), caption

    def __len__(self):
        """Number of (image, caption) items."""

        return self.size

In [6]:
def train_test_loader(directory, image_list, captions, train_fraction=0.8, num_workers=0, batch_size=32):
    """Build shuffled train and test loaders from a random split of the data.

    Parameters
    ----------
    directory : str
        Image folder handed to ``DatasetClass``.
    image_list : list of str
        Image file names handed to ``DatasetClass``.
    captions : pandas.DataFrame
        Caption table handed to ``DatasetClass``.
    train_fraction : float
        Share of the items assigned to the training split.
    num_workers, batch_size : int
        Forwarded to both ``DataLoader`` instances.

    Returns
    -------
    (trainloader, testloader, train_size, test_size)
    """
    full_dataset = DatasetClass(directory, image_list, captions)

    # The training share is truncated to an integer; the test split takes the rest.
    n_total = full_dataset.size
    n_train = int(n_total * train_fraction)
    n_test = n_total - n_train

    train_subset, test_subset = random_split(full_dataset, [n_train, n_test])

    # Both loaders use the same settings (the test loader is shuffled too).
    loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=num_workers)
    return (
        DataLoader(train_subset, **loader_options),
        DataLoader(test_subset, **loader_options),
        n_train,
        n_test,
    )

In [7]:
# Caption table: tab-separated, no header row; columns are named 'Image' and 'Caption'.
# The frame keeps its default integer index.
with open('captions.txt') as captions_file:
    captions = pd.read_csv(captions_file, sep='\t', header=None, names=['Image', 'Caption'])

# One image file name per line, with trailing whitespace/newlines removed.
with open('image_names.txt') as names_file:
    names = [line.rstrip() for line in names_file]

In [8]:
# Build the train/test loaders with the function's defaults (80/20 split, batch size 32, no worker processes).
trainloader, testloader, train_size, test_size = train_test_loader(directory='Assignment4_Data/Images/', image_list=names, captions=captions)

#### GloVe Representation

Available GloVe Representations: 

1. glove.42B.300d 
2. glove.840B.300d 
3. glove.twitter.27B.25d 
4. glove.twitter.27B.50d 
5. glove.twitter.27B.100d 
6. glove.twitter.27B.200d 
7. glove.6B.50d 
8. glove.6B.100d 
9. glove.6B.200d 
10. glove.6B.300d

In [9]:
# 100-dimensional GloVe vectors from the '6B' set (option 8 in the list above).
embedding_glove = GloVe(name='6B', dim=100)

In [32]:
# Caption text pipeline: lower-case, tokenize with the 'basic_english' tokenizer,
# and register '<SOS>' / '<EOS>' as the init / end-of-sentence tokens.
text_field = Field(tokenize='basic_english', lower=True, eos_token='<EOS>', init_token='<SOS>')
# Apply the field's preprocessing to every caption (one result per row of `captions`).
preprocessed_text = captions['Caption'].apply(lambda x: text_field.preprocess(x))

In [33]:
# Build the vocabulary from the preprocessed captions and attach the GloVe vectors to it.
text_field.build_vocab(preprocessed_text, vectors=embedding_glove)
# Embedding layer initialised from the vocabulary's vectors.
# Note: this is an nn.Embedding module, not a raw tensor.
embedding_trained = nn.Embedding.from_pretrained(text_field.vocab.vectors)

In [36]:
# The vocabulary's `itos` list as a NumPy array, so tokens can be looked up by
# integer position and compared element-wise against a string.
vocab_tokens = np.array(text_field.vocab.itos)

In [None]:
# Remove the trailing '#<n>' caption number so 'Image' holds the bare file name.
# Anchoring on the suffix keeps the cell idempotent: a blind `x[:-2]` would chop
# two more characters off every name each time the cell is re-run.
captions['Image'] = captions['Image'].str.replace(r'#\d+$', '', regex=True)

In [31]:
class NetVLAD(nn.Module):
    """Convolutional encoder followed by a NetVLAD aggregation layer.

    The CNN turns an image batch into a 256-channel feature map. Each spatial
    location is softly assigned to one of ``k`` clusters, and the residuals to
    the learned cluster centres are summed over all locations.

    Parameters
    ----------
    k : int
        Number of NetVLAD clusters.

    Input:  float tensor of shape (batch, 3, H, W); the notebook uses 227x227 images.
    Output: float tensor of shape (batch, k * 256).
    """

    def __init__(self, k):
        super(NetVLAD, self).__init__()

        # CNN
        self.c1 = nn.Conv2d(3, 96, 11, stride=4)
        self.mp1 = nn.MaxPool2d(3, stride=2)
        self.c2 = nn.Conv2d(96, 256, 5)
        self.mp2 = nn.MaxPool2d(3, stride=2)
        self.c3 = nn.Conv2d(256, 384, 3)
        self.c4 = nn.Conv2d(384, 384, 3)
        self.c5 = nn.Conv2d(384, 256, 3, stride=3)

        # NetVLAD: 1x1 convolution scores each location against the k clusters;
        # Softmax2d normalises those scores across the cluster channel.
        self.K = k
        self.nv_conv = nn.Conv2d(256, k, 1)
        self.nv_soft_ass = nn.Softmax2d()

        # NetVLAD cluster centres. torch.rand gives defined starting values;
        # torch.Tensor(...) would return uninitialised memory (possibly NaN/inf).
        self.c = nn.Parameter(torch.rand(self.K, 256))

        # Flatten (batch, K, 256) -> (batch, K * 256)
        self.flat = nn.Flatten(1, -1)

    def forward(self, x):
        """Encode an image batch into flattened VLAD descriptors."""

        # CNN
        x = self.mp1(F.relu(self.c1(x)))
        x = self.mp2(F.relu(self.c2(x)))
        x = F.relu(self.c3(x))
        x = F.relu(self.c4(x))
        x = F.relu(self.c5(x))                                   # (B, 256, H, W)

        # NetVLAD step 1: soft assignment of every location to the K clusters.
        a = self.nv_soft_ass(self.nv_conv(x))                     # (B, K, H, W)

        # NetVLAD step 2: assignment-weighted residuals, summed over all locations.
        residuals = x.unsqueeze(1) - self.c.view(1, self.K, -1, 1, 1)   # (B, K, 256, H, W)
        Z = (a.unsqueeze(2) * residuals).sum(dim=(3, 4))          # (B, K, 256)

        return self.flat(Z)

In [12]:
class RNN(nn.Module):
    """Single-step RNN decoder that predicts the next caption token.

    Parameters
    ----------
    embed_size : int
        Dimensionality of the word embeddings (input size of the RNN).
    hidden_size : int
        Size of the RNN hidden state.
    output_size : int
        Number of output classes (vocabulary size).
    embedding_pre_trained : torch.Tensor or nn.Embedding
        Pre-trained embedding matrix of shape (vocab, embed_size), or an
        ``nn.Embedding`` whose weights are reused.
    """

    def __init__(self, embed_size, hidden_size, output_size, embedding_pre_trained):

        super(RNN, self).__init__()
        self.hidden_size = hidden_size

        # `from_pretrained` needs a 2-D tensor; accept an existing embedding
        # layer as well by unwrapping its weight matrix.
        if isinstance(embedding_pre_trained, nn.Embedding):
            embedding_pre_trained = embedding_pre_trained.weight.detach()

        # Pre-trained word embeddings (frozen by `from_pretrained`'s default)
        self.embedding = nn.Embedding.from_pretrained(embedding_pre_trained)

        # Input to the RNN is the word embedding of a token
        self.RNN = nn.RNN(input_size=embed_size, hidden_size=hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input_vec, hidden_vec):
        '''
        Run one decoding step.

        Parameters:
        ------------
        input_vec  - LongTensor of token indices, shape (seq_len, batch).
                     Example: torch.LongTensor([[0]]) for a single token
        hidden_vec - initial hidden state, shape (1, batch, hidden_size):
                     the encoded image or the hidden state of the previous step

        Returns:
        ------------
        output_vec - log-probabilities over the vocabulary for the first time
                     step, shape (batch, output_size)
        hidden_vec - updated hidden state, shape (1, batch, hidden_size)
        '''
        embedded_input_vec = self.embedding(input_vec)
        output_vec, hidden_vec = self.RNN(embedded_input_vec, hidden_vec)
        # Only the first time step is scored; callers feed one token at a time.
        output_vec = self.softmax(self.out(output_vec[0]))
        return output_vec, hidden_vec
    
    

In [None]:
class LSTM(nn.Module):
    """Single-step LSTM decoder that predicts the next caption token.

    Parameters
    ----------
    embed_size : int
        Dimensionality of the word embeddings (input size of the LSTM).
    hidden_size : int
        Size of the LSTM hidden and cell states.
    output_size : int
        Number of output classes (vocabulary size).
    embedding_pre_trained : torch.Tensor or nn.Embedding
        Pre-trained embedding matrix of shape (vocab, embed_size), or an
        ``nn.Embedding`` whose weights are reused.
    """

    def __init__(self, embed_size, hidden_size, output_size, embedding_pre_trained):

        super(LSTM, self).__init__()
        self.hidden_size = hidden_size

        # `from_pretrained` needs a 2-D tensor; accept an existing embedding
        # layer as well by unwrapping its weight matrix.
        if isinstance(embedding_pre_trained, nn.Embedding):
            embedding_pre_trained = embedding_pre_trained.weight.detach()

        # Pre-trained word embeddings (frozen by `from_pretrained`'s default)
        self.embedding = nn.Embedding.from_pretrained(embedding_pre_trained)

        # Input to the LSTM is the word embedding of a token.
        # The attribute keeps the name `RNN` so existing references still work.
        self.RNN = nn.LSTM(input_size=embed_size, hidden_size=hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input_vec, hidden_vec):
        '''
        Run one decoding step.

        Parameters:
        ------------
        input_vec  - LongTensor of token indices, shape (seq_len, batch).
                     Example: torch.LongTensor([[0]]) for a single token
        hidden_vec - either the (h, c) tuple returned by a previous step, or a
                     single tensor of shape (1, batch, hidden_size) such as the
                     encoded image; a single tensor is used as h with a zero
                     cell state

        Returns:
        ------------
        output_vec - log-probabilities over the vocabulary for the first time
                     step, shape (batch, output_size)
        hidden_vec - updated (h, c) tuple
        '''
        # nn.LSTM only accepts an (h_0, c_0) tuple as its initial state.
        if torch.is_tensor(hidden_vec):
            hidden_vec = (hidden_vec, torch.zeros_like(hidden_vec))

        embedded_input_vec = self.embedding(input_vec)
        output_vec, hidden_vec = self.RNN(embedded_input_vec, hidden_vec)
        # Only the first time step is scored; callers feed one token at a time.
        output_vec = self.softmax(self.out(output_vec[0]))
        return output_vec, hidden_vec
    
    

In [9]:
def train_one_image(train_image, image_caption, encoder_obj, decoder_obj, decoder_hidden_size, encoder_optim, decoder_optim, loss_func, sos_token=None, eos_token=None):
    '''
    Run one optimisation step on a single (image, caption) pair.

    The image is encoded once, the encoding seeds the decoder's hidden state,
    and the decoder is then unrolled greedily (its own prediction is fed back
    as the next input) for at most as many steps as the caption has tokens.

    Parameters:
    -----------
    train_image         - image tensor for ONE image, e.g. shape (1, 3, H, W)
    image_caption       - target token indices of the caption: a LongTensor of
                          shape (L,) or (L, 1), or a sequence of ints
    encoder_obj         - encoder module; its output for one image must contain
                          exactly `decoder_hidden_size` values
    decoder_obj         - decoder module called as decoder(input, hidden) and
                          returning (scores of shape (1, vocab), hidden)
    decoder_hidden_size - size of the decoder's hidden state
    encoder_optim       - optimiser of the encoder parameters
    decoder_optim       - optimiser of the decoder parameters
    loss_func           - loss called as loss_func(scores, target)
    sos_token           - vocabulary index of the start-of-sentence token;
                          looked up in the notebook's `text_field` when None
    eos_token           - vocabulary index of the end-of-sentence token;
                          looked up in the notebook's `text_field` when None

    Returns:
    -----------
    Summed loss of the decoded steps divided by the caption length
    (0.0 for an empty caption, in which case no update is performed).
    '''
    # Resolve the special tokens from the vocabulary instead of assuming fixed
    # positions: in a torchtext vocabulary indices 0 and 1 are '<unk>' and '<pad>'.
    if sos_token is None:
        sos_token = text_field.vocab.stoi[text_field.init_token]
    if eos_token is None:
        eos_token = text_field.vocab.stoi[text_field.eos_token]

    # Work on the device the image already lives on.
    device = train_image.device

    # Flatten the caption to shape (L,) so targets[i:i + 1] is always a valid
    # 1-element target for the loss, whatever shape the caller used.
    targets = torch.as_tensor(image_caption, dtype=torch.long, device=device).view(-1)
    caption_length = targets.size(0)
    if caption_length == 0:
        return 0.0

    # Setting gradients from previous backpropagation to zero
    encoder_optim.zero_grad()
    decoder_optim.zero_grad()

    # The encoded image becomes the decoder's initial hidden state.
    encoder_output = encoder_obj(train_image).view(1, 1, decoder_hidden_size)
    decoder_input = torch.tensor([[sos_token]], device=device)
    decoder_hidden = encoder_output

    loss = 0
    for i in range(caption_length):
        decoder_output, decoder_hidden = decoder_obj(decoder_input, decoder_hidden)
        _, max_ind = decoder_output.topk(1)  # Choosing the word with maximum probability
        decoder_input = max_ind.detach()
        loss = loss + loss_func(decoder_output, targets[i:i + 1])
        # Stop unrolling once the decoder itself emits end-of-sentence.
        if decoder_input.item() == eos_token:
            break

    loss.backward()
    encoder_optim.step()
    decoder_optim.step()

    return loss.item() / caption_length
    

In [17]:
def train(lr, trainloader_images, encoder_obj, decoder_obj, preprocessed_text=None, captions=None, field=None, tol=1e-5, max_epochs=None):
    '''
    Train the encoder/decoder pair with SGD until the epoch loss converges.

    Each batch from the loader is a pair (images, captions): an image tensor of
    shape (B, 3, H, W) and a sequence of B raw caption strings. Every caption is
    tokenised and mapped to vocabulary indices with `field`, terminated with
    the end-of-sentence index, and trained on with `train_one_image`.

    Parameters:
    -----------
    lr                 - learning rate of both SGD optimisers
    trainloader_images - iterable of (image batch, caption batch) pairs
    encoder_obj        - encoder module
    decoder_obj        - decoder module; must expose `hidden_size`
    preprocessed_text  - unused; kept so existing calls keep working
    captions           - unused; kept so existing calls keep working
                         (the loader already pairs every image with its caption)
    field              - torchtext Field with a built vocabulary; defaults to
                         the notebook's global `text_field`
    tol                - relative change of the epoch loss below which training stops
    max_epochs         - optional upper bound on the number of epochs
                         (None keeps training until convergence)

    Returns:
    -----------
    (losses, epoch) - list of per-epoch summed losses and the number of epochs run
    '''
    if field is None:
        field = text_field
    stoi = field.vocab.stoi
    eos_index = stoi[field.eos_token]

    # Take the device and hidden size from the models rather than from globals.
    model_device = next(encoder_obj.parameters()).device
    hidden_size = decoder_obj.hidden_size

    loss_func = nn.CrossEntropyLoss()

    encoder_optim = optim.SGD(encoder_obj.parameters(), lr=lr)
    decoder_optim = optim.SGD(decoder_obj.parameters(), lr=lr)

    old_loss = np.inf
    epoch = 0
    losses = []
    while max_epochs is None or epoch < max_epochs:
        epoch += 1
        new_loss = 0.0

        for images, batch_captions in trainloader_images:
            for image, caption in zip(images, batch_captions):
                # Caption string -> column of token indices ending in <EOS>.
                token_ids = [stoi[token] for token in field.preprocess(caption)]
                token_ids.append(eos_index)
                target = torch.tensor(token_ids, dtype=torch.long, device=model_device).unsqueeze(1)

                new_loss += train_one_image(image.unsqueeze(0).to(model_device), target, encoder_obj, decoder_obj, hidden_size, encoder_optim, decoder_optim, loss_func)

        # Reporting and the convergence test run once per epoch, not per batch.
        print('Epoch {0}: Loss = {1}'.format(epoch, new_loss))
        losses.append(new_loss)

        # Written without a division so a zero loss cannot raise ZeroDivisionError.
        if abs(new_loss - old_loss) <= tol * abs(new_loss):
            print('Converged')
            return losses, epoch

        old_loss = new_loss

    return losses, epoch

In [69]:
def evaluate(encoder_obj, decoder_obj, image, image_caption, decoder_hidden_size, vocab_tokens):
    '''
    Greedily decode a caption for one image and score it with BLEU.

    Parameters:
    -----------
    encoder_obj         - encoder module; its output for one image must contain
                          exactly `decoder_hidden_size` values
    decoder_obj         - decoder module called as decoder(input, hidden)
    image               - image tensor for ONE image, e.g. shape (1, 3, H, W)
    image_caption       - reference caption: a tensor of token indices, or a
                          sequence of token strings and/or token indices
    decoder_hidden_size - size of the decoder's hidden state
    vocab_tokens        - array of vocabulary tokens, position i holding the
                          token with index i; must contain '<SOS>' and '<EOS>'

    Returns:
    -----------
    BLEU score of the generated caption against the reference caption.

    Raises:
    -----------
    IndexError if '<SOS>' or '<EOS>' is missing from `vocab_tokens`.
    '''
    with torch.no_grad():

        vocab_tokens = np.asarray(vocab_tokens)

        # Start of sentence and End of Sentence token, as plain Python ints
        sos_token = int(np.flatnonzero(vocab_tokens == '<SOS>')[0])
        eos_token = int(np.flatnonzero(vocab_tokens == '<EOS>')[0])

        # Reference caption as a list of token strings (indices are mapped
        # through the vocabulary so BLEU compares like with like).
        if torch.is_tensor(image_caption):
            raw_reference = image_caption.view(-1).tolist()
        else:
            raw_reference = list(image_caption)
        reference = [
            tok if isinstance(tok, str) else str(vocab_tokens[int(tok)])
            for tok in raw_reference
        ]

        # Decode for at most as many steps as the reference has tokens
        caption_length = len(reference)

        device = image.device
        encoder_output = encoder_obj(image).view(1, 1, decoder_hidden_size)
        decoder_input = torch.tensor([[sos_token]], device=device)
        decoder_hidden = encoder_output

        output_caption = []
        for i in range(caption_length):
            decoder_output, decoder_hidden = decoder_obj(decoder_input, decoder_hidden)
            _, max_ind = decoder_output.topk(1)  # Choosing the word with maximum probability
            decoder_input = max_ind
            if decoder_input.item() == eos_token:
                output_caption.append('<EOS>')
                break
            else:
                output_caption.append(str(vocab_tokens[decoder_input.item()]))

        # bleu_score takes (candidate corpus, references corpus): a list of
        # candidate token lists and, per candidate, a list of reference token lists.
        return bleu_score([output_caption], [[reference]])

### Question 1

In [6]:
k = 1
# The encoder emits k * 256 values per image and that vector is used directly as
# the decoder's initial hidden state, so the two sizes have to match.
decoder_hidden_size = k * 256

encoder_obj = NetVLAD(k).to(device)

embed_size = embedding_trained.weight.shape[1]
decoder_output_size = embedding_trained.weight.shape[0]
# Pass the raw weight matrix: nn.Embedding.from_pretrained expects a tensor.
decoder_obj = RNN(embed_size, decoder_hidden_size, decoder_output_size, embedding_trained.weight.data).to(device)

# Positional arguments cannot follow a keyword argument, so lr is passed positionally.
rnn_losses, rnn_epochs = train(0.01, trainloader, encoder_obj, decoder_obj, preprocessed_text, captions)

### Question 2

In [40]:
k = 1
# The encoder emits k * 256 values per image and that vector is used directly as
# the decoder's initial hidden state, so the two sizes have to match.
decoder_hidden_size = k * 256

encoder_obj = NetVLAD(k).to(device)

embed_size = embedding_trained.weight.shape[1]
decoder_output_size = embedding_trained.weight.shape[0]
# Pass the raw weight matrix: nn.Embedding.from_pretrained expects a tensor.
decoder_obj = LSTM(embed_size, decoder_hidden_size, decoder_output_size, embedding_trained.weight.data).to(device)

# Positional arguments cannot follow a keyword argument, so lr is passed positionally.
lstm_losses, lstm_epochs = train(0.01, trainloader, encoder_obj, decoder_obj, preprocessed_text, captions)