## Resize Images (skip)

This code transforms the data set into 255x255 images normalising the dimensions of all images

In [68]:
from PIL import Image
import torch, os, pandas as pd
from torchvision import transforms
from tqdm.notebook import tqdm

preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
])

In [None]:
def get_data():
    

In [69]:
df = pd.read_csv('data/small_data/small_data.csv').drop(columns='Unnamed: 0')

In [70]:
caption_folder = 'data/small_data/'
image_folder = 'data/small_data/images/'

for i in tqdm(zip(df['id'], df['title']),total=len(df)):
    a = Image.open('{}{}'.format(image_folder,i[0]))
    b = i[1]
    
    resized = preprocess(a)
    
    resized.save('data/small_data_resized/{}'.format(i[0]))
    
    

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))




## Generate Data (skip)

In [7]:
to_tensor = transforms.ToTensor()
data_dir = 'data/small_data/images/'


images = []
captions = []
image_paths = []
for i in tqdm(zip(df['id'], df['title']), total=len(df)):
    images.append(to_tensor(Image.open('{}{}'.format(data_dir, i[0]))))
    captions.append(i[1])
    image_paths.append('{}{}'.format(data_dir,i[0]))

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))




## Generate Dataset

In [71]:
from torch.utils.data import DataLoader, Dataset
from tqdm.notebook import tqdm
from PIL import Image
from torchvision import transforms
import pandas as pd, torch

pre_process = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

In [72]:
data_dir = 'data/small_data_resized/'
df = pd.read_csv('data/small_data/small_data.csv').drop(columns='Unnamed: 0')

In [73]:
images = []
captions = []
image_paths = []
for i in tqdm(zip(df['id'], df['title']), total=len(df)):
    images.append(to_tensor(Image.open('{}{}'.format(data_dir, i[0]))))
    captions.append(i[1])
    image_paths.append('{}{}'.format(data_dir,i[0]))
    

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))




In [74]:
class CustomDataset(Dataset):
    def __init__(self, image_paths, captions_path, transforms,train=True):  

        self.image_paths  = image_paths
        self.transforms   = transforms
        self.captions     = captions_path
        self.lengths = torch.LongTensor([len(i) for i in self.captions])
    
    def __getitem__(self, index):

        image   = Image.open(self.image_paths[index])
        caption = self.captions[index]
        t_image = self.transforms(image)
        return t_image, caption, self.lengths

    def __len__(self):  # return count of sample we have

        return len(self.image_paths)

## Generate Vocab

In [75]:
def tokenize(title_string):
    return title_string.split(' ')

In [76]:
tokenized_titles = []

for i in captions:
    tokenized_titles.append(tokenize(i))

In [77]:
def gen_vocab(tokenized_data):
    
    vocab = ['<pad>', '<start>', '<end>', '<unk>']
    
    for title in tokenized_data:
        
        for token in title:
            
            if token not in vocab:
                vocab.append(token)
            else:
                continue
    
    
    idx2wrd = dict(enumerate(set(vocab)))
    wrd2idx = {wrd : num for num, wrd in idx2wrd.items()}
    
    return idx2wrd, wrd2idx 
            
            
        

In [78]:
idx2wrd, vocab = gen_vocab(tokenized_titles) 

## Encode titles 

In [79]:
from torch.nn.utils.rnn import pad_sequence,pad_packed_sequence


In [80]:
def encode(titles, vocab):
    
    encoded_titles = []
    title_lengths = []
    
    for title in titles:
        title_lengths.append(len(title))
        encodings = [vocab['<start>']]
        
        for token in title:
            
            if token in vocab:
                encodings.append(vocab[token])
            else:
                encoding.append(vocab['<unk>'])
        
        encodings.append(vocab['<end>'])
        
        encoded_titles.append(torch.LongTensor(encodings))
    
    return pad_sequence(encoded_titles, batch_first=True, padding_value=vocab['<pad>']), title_lengths

In [81]:
encoded_titles, title_lengths = encode(tokenized_titles, vocab)

In [82]:
dataset = CustomDataset(image_paths=image_paths, captions_path=encoded_titles, transforms=to_tensor)

In [83]:
train_dataloader = DataLoader(dataset, batch_size = 5)

## Encoder Model

In [84]:
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class EncoderCNN(nn.Module):
    def __init__(self):
        super(EncoderCNN, self).__init__()
        self.conv1 = nn.Sequential(         # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=3,              # input height
                out_channels=16,            # n_filters
                kernel_size=5,              # filter size
                stride=1,                   # filter movement/step
                padding=2,                  # if want same width and length of this image after Conv2d, padding=(kernel_size-1)/2 if stride=1
            ),                              # output shape (16, 28, 28)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(kernel_size=2),    # choose max value in 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(         # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),     # output shape (32, 14, 14)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(2),                # output shape (32, 7, 7)
        )
        
        self.conv3 = nn.Sequential(         # input shape (16, 14, 14)
            nn.Conv2d(32, 64, 5, 1, 2),     # output shape (32, 14, 14)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(2),                # output shape (32, 7, 7)
        )
        
        resnet = torchvision.models.resnet101(pretrained=True)
        modules = list(resnet.children())[:-2]
        self.resnet = nn.Sequential(*modules)
        self.out = nn.Linear(32 * 7 * 7, 10)   # fully connected layer, output 10 classes

    def forward(self, x):
        #x = self.conv1(x)
        
        #x = self.conv2(x)
        #x = self.conv3(x)
        #x = x.view(x.size(0), -1)
        
        out = self.resnet(x)
        
        #out = self.adaptive_pool(out)
        # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        #output = self.out(x)
        out = out.permute(0, 2, 3, 1)
        #return output, x    # return x for visualization
        
        return out

## Decoder Model

In [99]:
class Decoder(nn.Module):
    
    def __init__(self, embed_dim, decoder_dim, vocab_size, 
                 device, encoder_dim=2048, dropout=0.5):
        
        super(Decoder, self).__init__()
    
        self.encoder_dim = encoder_dim #feature size of images
        self.embed_dim = embed_dim #embedding size 512
        self.decoder_dim = decoder_dim #size of decoder rnn 512
        self.vocab_size = vocab_size
        self.dropout = dropout
        self.device = device
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(p=self.dropout)
        self.decode_step = nn.LSTMCell(embed_dim + encoder_dim, decoder_dim, bias=True)
        self.init_h = nn.Linear(encoder_dim, decoder_dim)  # linear layer to find initial hidden state of LSTMCell
        self.init_c = nn.Linear(encoder_dim, decoder_dim)  # linear layer to find initial cell state of LSTMCell
        self.f_beta = nn.Linear(decoder_dim, encoder_dim)  # linear layer to create a sigmoid-activated gate
        self.sigmoid = nn.Sigmoid()
        self.fc = nn.Linear(decoder_dim, vocab_size)  # linear layer to find scores over vocabulary
        self.init_weights()  # initialize some layers with the uniform distribution
        
    def init_weights(self):
        """
        Initializes some parameters with values from the uniform distribution, for easier convergence.
        """
        self.embedding.weight.data.uniform_(-0.1, 0.1)
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-0.1, 0.1)
        
    def init_hidden_state(self, encoder_out):
        """
        Creates the initial hidden and cell states for the decoder's LSTM based on the encoded images.
        :param encoder_out: encoded images, a tensor of dimension (batch_size, num_pixels, encoder_dim)
        :return: hidden state, cell state
        """
        mean_encoder_out = encoder_out.mean(dim=1)
        h = self.init_h(mean_encoder_out)  # (batch_size, decoder_dim)
        c = self.init_c(mean_encoder_out)
        return h, c
    
    def forward(self, encoder_out, encoded_captions, caption_lengths):
        
        batch_size = encoder_out.size(0)
        encoder_dim = encoder_out.size(-1)
        vocab_size = self.vocab_size
        
        encoder_out = encoder_out.view(batch_size, -1, encoder_dim)
        
        num_pixels = encoder_out.size(1)
        
        caption_lengths, sort_ind = caption_lengths.squeeze(1).sort(dim=0, descending=True)
    
        encoder_out = encoder_out[sort_ind]
        
        encoded_captions = encoded_captions[sort_ind]
        
        embeddings = self.embedding(encoded_captions)
        
        h, c = self.init_hidden_state(encoder_out)
        decode_lengths = (caption_lengths - 1).tolist()
            
        predictions = torch.zeros(batch_size, max(decode_lengths[0]), vocab_size).to(device)
        alphas = torch.zeros(batch_size, max(decode_lengths[0]), num_pixels).to(device)
    
        return embeddings, encoder_out, h , c
    
        for t in range(max(decode_lengths[0])):
            batch_size_t = sum([l > t for l in decode_lengths[0]])
            
            #attention_weighted_encoding, alpha = self.attention(encoder_out[:batch_size_t],
                                                                #h[:batch_size_t])
            
            gate = self.sigmoid(self.f_beta(h[:batch_size_t]))
            
            return embeddings[:batch_size_t, t, :], (h[:batch_size_t],
                                    c[:batch_size_t])
        
            h, c = self.decode_step(embeddings[:batch_size_t, t, :], (h[:batch_size_t],
                                    c[:batch_size_t]))
            
            # (batch_size_t, decoder_dim)
            preds = self.fc(self.dropout(h))  # (batch_size_t, vocab_size)
            predictions[:batch_size_t, t, :] = preds
            alphas[:batch_size_t, t, :] = alpha
            
        
        return predictions, encoded_captions, decode_lengths, alphas, sort_ind
            

## Training Loop

In [100]:
device = 'cpu'
encoder_model = EncoderCNN()
#decoder_model = DecoderRnn(vocab_size=1092, max_len=17,
                           #sos_id=vocab['<start>'], eos_id=vocab['<end>'],
                           #hidden_size=2048)
decoder_model = Decoder(embed_dim=512, decoder_dim=512, vocab_size=len(vocab),device=device)


In [102]:
optimizer = torch.optim.Adam(encoder_model.parameters(), lr=0.001)

encoder_model.train()
#decoder_model.train()

for i in train_dataloader:
    image = i[0]
    caption = i[1]
    lengths = i[2]
    features = encoder_model(image)
    
    embeddings, encoder_out, h, c = decoder_model(features, caption, lengths)
    
    
    break

In [103]:
h.shape

torch.Size([5, 49, 512])

In [104]:
c.shape

torch.Size([5, 49, 512])

In [110]:
gru(embeddings, encoder_out)

RuntimeError: input must have 3 dimensions, got 4