In [None]:
%%capture
!pip install --upgrade wandb

In [None]:
## Importing Packages
import os
import torch
import random
import warnings
import wandb
import numpy as np
import transformers
import pandas as pd 
from PIL import Image
import torch.nn as nn
warnings.filterwarnings("ignore")
import torchvision.transforms as T
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from typing import Callable, Optional

In [None]:
# Authenticate with Weights & Biases without embedding the API key in the notebook.
# wandb.login() reads the key from the WANDB_API_KEY environment variable (or from
# a previous `wandb login`) and otherwise prompts for it.
# NOTE(security): the key that used to be hard-coded on this line has been exposed
# and should be revoked in the W&B account settings.
wandb.login();
wandb.init(project="quotes_for_posts", entity="quantum-potion")

In [None]:
## For Reproducibility
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed applied to Python's `random`, NumPy and PyTorch
            (CPU and every CUDA device).
    """
    random.seed(seed)
    # Only affects hashing in interpreters started after this point (e.g.
    # subprocesses); the hash seed of the running kernel is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    # NumPy keeps its own global generator, which must be seeded separately.
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds all visible GPUs; a no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels between runs.
    torch.backends.cudnn.benchmark = False


seed_everything(42)

## Tokenizer: pretrained BERT tokenizer, lower-casing its input
tokenizer = transformers.BertTokenizer.from_pretrained(
    "bert-base-uncased",
    do_lower_case=True,
)

## Device Configuration: pinned to the CPU
## (swap in 'cuda' if torch.cuda.is_available() to run on a GPU)
device = torch.device("cpu")

In [None]:
# Dataset locations, relative to the notebook's working directory.
data_dir = '../raw_data'
image_dir = data_dir + '/flickr30k_images'
csv_file = image_dir + '/results.csv'

In [None]:
# Echo the resolved caption-CSV path as the cell output.
csv_file

In [None]:
# Load the pipe-delimited caption table. Its caption columns are addressed with a
# leading space in their names (' comment_number', ' comment').
df = pd.read_csv(csv_file, delimiter='|')

# Overwrite the caption number and caption text of row 19999 by hand.
# A single .loc call writes into `df` itself; the chained form
# df[col][row] = value may assign to a temporary copy and leave `df` unchanged.
df.loc[19999, ' comment_number'] = ' 4'
df.loc[19999, ' comment'] = ' A dog runs across the grass .'

# Turn the bare file names into paths under image_dir.
df['image_name'] = image_dir + '/' + df['image_name']
df.head(5)

In [None]:
# Reshape the long caption table (one row per caption) into a wide one: one row per
# image path plus five caption columns, comment_0 ... comment_4.
# The caption-number labels are read from the first five rows of the long table.
number_col = df[' comment_number']
first_caption_rows = number_col == number_col[0]

# One image path per image, taken from the rows that carry the first caption number.
image_name = {'image_name': df.loc[first_caption_rows, 'image_name'].values}

# comment_<k> collects the captions whose number matches the one found in row k.
comments = {
    f'comment_{slot}': df.loc[number_col == number_col[slot], ' comment'].values
    for slot in range(5)
}

image_name_df = pd.DataFrame.from_dict(image_name)
comments_df = pd.DataFrame.from_dict(comments)

# Place the image paths and the caption columns side by side.
df = pd.concat([image_name_df, comments_df], axis=1)
df.tail()

In [None]:
## Hold out 20% of the rows as the test set; every part gets a fresh 0..n-1 index
train_val, test = (
    part.reset_index(drop=True)
    for part in train_test_split(df, test_size=0.2, random_state=42)
)

## Split the remainder 75% / 25% into train and validation
train, val = (
    part.reset_index(drop=True)
    for part in train_test_split(train_val, test_size=0.25, random_state=42)
)

## Let's see how many entries we have
for split in (train, val, test):
    print(split.shape)

In [None]:
class FlickrDataset(Dataset):
    """Image/caption dataset backed by a wide caption DataFrame.

    Each row of `df` holds an image path in its `image_name` column; the caption
    used for a sample is read from the column at position 1 (the first caption).
    Tokenisation and device placement use the notebook-level `tokenizer` and
    `device` objects.
    """

    def __init__(self, df,
                 transforms: Optional[Callable] = None,
                 max_length: int = 100):
        """
        Args:
            df: DataFrame with an `image_name` column followed by caption columns.
            transforms: Callable applied to the RGB PIL image. Any non-callable
                value (None, True, ...) selects the default pipeline below, so
                existing `transforms=True` call sites keep working.
            max_length: Number of token ids every caption is padded or truncated to.
        """
        self.df = df
        self.max_length = max_length
        if callable(transforms):
            self.transforms = transforms
        else:
            self.transforms = T.Compose([
                T.ToTensor(),
                # A single mean/std value is broadcast over all three channels.
                T.Normalize(mean = [0.5], std = [0.5]),
                T.Resize((256,256)),
            ])

    def __len__(self) -> int:
        """Return the number of rows (images) in the DataFrame."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Return {"image": transformed image, "captions": 1-D tensor of token ids}."""
        image_path = self.df.image_name.values[idx]
        # Image.open is lazy: convert() reads the pixels, after which the context
        # manager closes the underlying file instead of leaving it to the GC.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        image = self.transforms(image)

        # First caption ONLY: the column right after image_name. Reading row `idx`
        # directly replaces a scan of the whole frame for the matching image name
        # (same result whenever image names are unique).
        caption = self.df.iloc[idx, 1]
        encoded_inputs = tokenizer(caption,
                                   return_token_type_ids = False,
                                   return_attention_mask = False,
                                   max_length = self.max_length,
                                   padding = "max_length",
                                   # Without truncation an over-long caption yields a
                                   # longer tensor and the batch can no longer be stacked.
                                   truncation = True,
                                   return_tensors = "pt")

        sample = {"image": image.to(device),
                  "captions": encoded_inputs["input_ids"].flatten().to(device)}

        return sample

In [None]:
batch_size = 32

# FlickrDataset builds its own default preprocessing pipeline, so no `transforms`
# argument is passed (the former `transforms = True` was not a usable transform).
train_dataset = FlickrDataset(train)
# Reshuffle the training samples every epoch so batches differ between epochs.
train_dataloader = DataLoader(train_dataset, batch_size = batch_size,
                              shuffle = True, drop_last = True)

# Evaluation loaders keep a fixed order; drop_last keeps every batch at exactly
# batch_size samples.
val_dataset = FlickrDataset(val)
val_dataloader = DataLoader(val_dataset, batch_size = batch_size, drop_last = True)

test_dataset = FlickrDataset(test)
test_dataloader = DataLoader(test_dataset, batch_size = batch_size, drop_last = True)

In [None]:
class CNN(nn.Module):
    """Image encoder: frozen pretrained ResNet-50 backbone plus a trainable linear projection."""

    def __init__(self, embed_size):
        """
        Args:
            embed_size: Size of the feature vector produced for each image.
        """
        super(CNN, self).__init__()
        model = models.resnet50(pretrained=True)
        for param in model.parameters():
            param.requires_grad_(False)

        # Keep every child of the ResNet except the last one (its `fc` layer).
        modules = list(model.children())[:-1]
        self.model = nn.Sequential(*modules)
        self.embed = nn.Linear(model.fc.in_features, embed_size)

        # Modules start in training mode. A frozen backbone must run in eval mode,
        # otherwise its BatchNorm layers normalise with per-batch statistics and
        # keep updating their running averages.
        self.model.eval()

    def train(self, mode=True):
        """Switch the module's mode while keeping the frozen backbone in eval mode."""
        super(CNN, self).train(mode)
        self.model.eval()
        return self

    def forward(self, image):
        """Map a batch of images to a (batch, embed_size) feature tensor."""
        # The backbone has no trainable parameters, so no graph is needed for it.
        with torch.no_grad():
            features = self.model(image)
        features = features.view(features.size(0), -1)
        features = self.embed(features)

        return features

In [None]:
class RNN(nn.Module):
    """Caption decoder: token embedding -> single-layer LSTM -> vocabulary logits."""

    def __init__(self, input_size, hidden_size, embedding_dim,vocab_size):
        """
        Args:
            input_size: Input feature size of the LSTM. The LSTM is fed the token
                embeddings, so this must equal `embedding_dim`.
            hidden_size: Hidden size of the LSTM. The image features become the
                initial cell state, so their last dimension must equal this.
            embedding_dim: Size of each token embedding.
            vocab_size: Number of token ids (embedding rows and output logits).
        """
        super(RNN, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.embedding_dim = embedding_dim
        self.vocab_size = vocab_size

        self.embedding = nn.Embedding(num_embeddings = vocab_size,embedding_dim = embedding_dim)

        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            batch_first=True)

        self.fc = nn.Linear(hidden_size, vocab_size)

    def init_hidden(self, features):
        """Build the initial LSTM state (h_0, c_0) for a batch of image features.

        h_0 is all zeros and c_0 is `features` with a leading layer dimension.
        Both follow the batch size, dtype and device of `features`, so nothing
        is tied to a fixed batch size or a global device.
        """
        # The features are passed through as-is (no deprecated Variable wrapper,
        # which re-wraps the tensor outside the autograd graph), so gradients can
        # flow back into whatever produced them.
        cell_state = features.unsqueeze(0)
        hidden_state = features.new_zeros(1, features.size(0), self.hidden_size)
        return (hidden_state, cell_state)

    def forward(self, features, captions):
        """Return logits of shape (batch * seq_len, vocab_size) for `captions`."""
        state = self.init_hidden(features)

        embed = self.embedding(captions)

        lstm_out, state = self.lstm(embed, state)

        outputs = self.fc(lstm_out)
        outputs = outputs.reshape(-1, self.vocab_size)

        return outputs

In [None]:
# Smoke test: push one batch through freshly built models and print the shapes.
training_data = next(iter(train_dataloader))

image, captions = training_data["image"], training_data["captions"]

encoder = CNN(embed_size = 512).to(device)
# The embedding table must cover every id the tokenizer can emit. The previous
# hard-coded 28881 is smaller than the bert-base-uncased vocabulary (30522
# entries), so a caption containing a high token id raised an index error.
decoder = RNN(input_size = 512, hidden_size = 512, embedding_dim=512,
              vocab_size = tokenizer.vocab_size).to(device)

# Only shapes are inspected here, so skip building an autograd graph.
with torch.no_grad():
    features = encoder(image)
    embed = decoder(features, captions)

print("Image Transformation: ", image.shape, " --> ", features.shape)
print("Captions Transformation: ", captions.shape, " --> ", embed.shape)

In [None]:
%%capture

# Size the embedding table and output layer from the tokenizer instead of a magic
# number: large enough for every token id, without unused rows.
vocab_size = tokenizer.vocab_size
# One epoch is one full pass over the training loader.
steps_per_epoch = len(train_dataloader)

encoder = CNN(embed_size = 512).to(device)
decoder = RNN(input_size = 512, hidden_size = 512, embedding_dim=512, vocab_size = vocab_size).to(device)

# Captions are padded to a fixed length; ignoring the padding id keeps those
# positions from contributing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index = tokenizer.pad_token_id).to(device)
# Only the decoder and the encoder's projection layer are optimised.
params = list(decoder.parameters()) + list(encoder.embed.parameters())

optimizer = torch.optim.Adam(params, lr=0.001)

In [None]:
num_epochs = 10

for epoch in range(num_epochs):

    for idx, train_data in enumerate(train_dataloader):

        # Stop after exactly steps_per_epoch batches (indices 0 .. steps_per_epoch - 1).
        if idx >= steps_per_epoch:
            break

        # The batch already holds tensors; .to() moves them without the extra copy
        # (and warning) that torch.tensor(tensor) produces.
        image = train_data['image'].to(device)
        captions = train_data['captions'].to(device)

        # Teacher forcing: the decoder reads tokens 0..T-2 and is scored on tokens
        # 1..T-1. Scoring it on the very token it was just given would only teach
        # it to copy its input.
        decoder_inputs = captions[:, :-1]
        targets = captions[:, 1:]

        # zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        features = encoder(image)
        outputs = decoder(features, decoder_inputs)

        # Compute the Loss (reshape, not view: the shifted slices are not contiguous)
        loss = criterion(outputs.reshape(-1, outputs.size(-1)),
                         targets.reshape(-1))

        # Backward pass.
        loss.backward()

        # Update the parameters in the optimizer.
        optimizer.step()

        # Get training statistics.
        loss_value = loss.item()
        stats = 'Epoch [%d], Loss: %.4f' % (epoch, loss_value)
        wandb.log({"Loss": loss_value})
        print('\r' + stats, end="")