## About
Image captioning in PyTorch

In [1]:
#neccessary imports
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torch.utils.data import DataLoader,Dataset
import numpy as np
import pandas as pd
from torchsummary import summary
from torchvision.transforms import transforms
import glob
from torch.nn.utils.rnn import pad_sequence  # pad batch
import nltk
import torch.optim as optim
from PIL import Image
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# os.chdir('/content/drive/MyDrive/Datasets/')
# !unzip Flickr8k.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: Images/2844846111_8c1cbfc75d.jpg  
  inflating: Images/2844963839_ff09cdb81f.jpg  
  inflating: Images/2845246160_d0d1bbd6f0.jpg  
  inflating: Images/2845691057_d4ab89d889.jpg  
  inflating: Images/2845845721_d0bc113ff7.jpg  
  inflating: Images/2846037553_1a1de50709.jpg  
  inflating: Images/2846785268_904c5fcf9f.jpg  
  inflating: Images/2846843520_b0e6211478.jpg  
  inflating: Images/2847514745_9a35493023.jpg  
  inflating: Images/2847615962_c330bded6e.jpg  
  inflating: Images/2847859796_4d9cb0d31f.jpg  
  inflating: Images/2848266893_9693c66275.jpg  
  inflating: Images/2848571082_26454cb981.jpg  
  inflating: Images/2848895544_6d06210e9d.jpg  
  inflating: Images/2848977044_446a31d86e.jpg  
  inflating: Images/2849194983_2968c72832.jpg  
  inflating: Images/2850719435_221f15e951.jpg  
  inflating: Images/2851198725_37b6027625.jpg  
  inflating: Images/2851304910_b5721199bc.jpg  
  inflating: Images/285

In [2]:
device = torch.device("cuda" if torch.cuda.is_available else "cpu")
print(device)

cuda


In [3]:
# approach
#1. extract image features out of a VGG net
#2. Pass it through a sequential model(LSTM) during training as Hidden state or input.|

# 1. Feature Extractor block -4096 features 
class FeatureExtractorBlock(nn.Module):
    def __init__(self,embedding_size, train_backbone=False):
        super().__init__()
        self.train_backbone = train_backbone
        self.backbone =models.vgg16(pretrained=True)
        self.backbone.fc = nn.Linear(self.backbone.classifier[0].in_features, embedding_size)
        self.activation_layer = nn.ReLU()
        self.dropout_layer = nn.Dropout(0.3)
    
    def forward(self, batch_images):
        image_features = self.backbone(batch_images)
        #setting last layer to be set to trainable
        for param in self.backbone.parameters():
            param.requires_grad_(False)

        return self.dropout_layer(self.activation_layer(image_features))

# 2. Sequential Block
class SequentialBlock(nn.Module):
    def __init__(self, embedding_size, hidden_size, vocab_size, n_layers):
        super().__init__()
        self.embedding_layer = nn.Embedding(vocab_size,embedding_size)
        self.lstm_layer = nn.LSTM(embedding_size,hidden_size,n_layers)
        self.linear_layer = nn.Linear(hidden_size,vocab_size)
        self.dropout_layer = nn.Dropout(0.4)
    
    def forward(self, image_features,image_captions):
        embeddings = self.dropout_layer(self.embedding_layer(image_captions))
        embeddings = torch.cat((image_features.unsqueeze(0), embeddings), dim=0) # unsqueeze to reveal time step
        hiddens, _ = self.lstm_layer(embeddings)
        outputs = self.linear_layer(hiddens)
        return outputs

#3. Bottleneck layer
class BottleneckBlock(nn.Module):
    def __init__(self, embedding_size,hidden_size, vocab_size, n_layers):
        super().__init__()
        self.feature_extractor_block = FeatureExtractorBlock(embedding_size=embedding_size)
        self.sequential_block = SequentialBlock(embedding_size=embedding_size,hidden_size=hidden_size,vocab_size=vocab_size,n_layers=n_layers)
    
    def forward(self, images, captions):
        image_features = self.feature_extractor_block(images)
        output_vector = self.sequential_block(image_features,captions)  
    
        return output_vector


In [4]:
embedding_size=512
hidden_size=512
vocab_size=100
n_layers=2
model = BottleneckBlock(embedding_size,hidden_size,vocab_size,n_layers)
print(model)



BottleneckBlock(
  (feature_extractor_block): FeatureExtractorBlock(
    (backbone): VGG(
      (features): Sequential(
        (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): ReLU(inplace=True)
        (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): ReLU(inplace=True)
        (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (6): ReLU(inplace=True)
        (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (8): ReLU(inplace=True)
        (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (11): ReLU(inplace=True)
        (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (13): ReLU(inplace=True)
        (14): Conv2d(256, 25

In [5]:
#visualising the dataset
dataset_path = "/content/drive/MyDrive/Datasets/"
annotation_file = "captions.txt"
image_dir = "Images/"

#converting contents of annotation_file to df
with open(dataset_path+annotation_file) as f:
    lines = [line.rstrip() for line in f]

column_names = lines[0].split(',')
caption_image_dict = {}
for i,line in enumerate(lines):
    image_name = line.split(',')[0]
    caption = line.split(',')[-1]
    if i ==0:
        pass # since it's image and caption itself
    else:
        caption_image_dict[image_name] = caption

df = pd.DataFrame.from_dict(caption_image_dict,orient='index').reset_index()
df.columns = column_names






In [6]:
df.head()

Unnamed: 0,image,caption
0,1000268201_693b08cb0e.jpg,A little girl in a pink dress going into a woo...
1,1001773457_577c3a7d70.jpg,Two dogs on pavement moving toward each other .
2,1002674143_1b742ab4b8.jpg,Young girl with pigtails painting outside in t...
3,1003163366_44323f5815.jpg,man laying on bench holding leash of dog sitti...
4,1007129816_e794419615.jpg,The man with pierced ears is wearing glasses a...


In [7]:
#creating a vocabulary 
word_tokenize = nltk.tokenize.word_tokenize('This is me')
for char in word_tokenize:
    print(char.lower())


this
is
me


In [8]:
#creating a vocabulary 
class WordVocab:
    def __init__(self,freq_threshold):
        self.index_to_string = {0:"<PAD>",1:"<START>",2:"<END>",3:"<OOV>"}
        self.string_to_index = {}
        for k,v in self.index_to_string.items():
            self.string_to_index[v]=k
        self.freq_threshold = freq_threshold

    def __len__(self):
        return len(self.string_to_index)

    @staticmethod
    def tokenised_sents(sentence):
        return [char.lower() for char in nltk.tokenize.word_tokenize(sentence)]

    def build_vocab(self,sentence_list):
        freq_dict = {}
        idx=4
        for sentence in sentence_list:
            for word in self.tokenised_sents(sentence):
                if word not in freq_dict:
                    freq_dict[word]=1
                else:
                    freq_dict[word]+=1
                
                if freq_dict[word]== self.freq_threshold:
                    self.string_to_index[word]=idx
                    self.index_to_string[idx]=word
                    idx+=1

    
    def convert_to_number(self,text):
        tokenized_text = self.tokenised_sents(text)

        return [
            self.string_to_index[token] if token in self.string_to_index else self.string_to_index["<OOV>"]
            for token in tokenized_text
        ]

In [9]:
#creating dataset with captions converted to numerical value
class FlickrDataset(nn.Module):
    def __init__(self,dataframe,image_dir, freq_threshold=5, transform = None):
        self.dataframe = dataframe
        self.image_names = dataframe['image'].values.tolist()
        self.caption = dataframe['caption'].values.tolist()
        self.image_dir = image_dir
        self.transform = transform
        self.vocab = WordVocab(freq_threshold)
        self.vocab.build_vocab(self.caption)

    def __len__(self):
        return len(self.dataframe)
    
    def __getitem__(self,index):
        image_name = glob.glob(self.image_dir+self.image_names[index])[0]
        image = Image.open(image_name)
        caption = self.caption[index]
        if self.transform is not None:
            image = self.transform(image)

        converted_caption = [self.vocab.string_to_index["<START>"]]
        converted_caption+=self.vocab.convert_to_number(caption)
        converted_caption.append(self.vocab.string_to_index["<END>"])

        item = {'image':image, 'caption':torch.tensor(converted_caption)}

        return item
            

In [10]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Resize((224,224)),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

In [11]:
dataset = FlickrDataset(df, dataset_path+image_dir,transform=transform)

In [12]:
item = dataset.__getitem__(5)
print(item['image'].shape, item['caption'])

torch.Size([3, 224, 224]) tensor([  1,   9,  50,  41, 223,   4,   5,  35, 866,   4,   5, 208,   6,   2])


In [13]:
#collate_function
class Collater(object):
    def __init__(self, pad_index):
        self.pad_index = pad_index

    def __call__(self, batch):
        images = [item['image'].unsqueeze(0) for item in batch]
        images = torch.cat(images,dim=0)
        captions = [item['caption'] for item in batch]
        captions = pad_sequence(captions, batch_first=False, padding_value=self.pad_index)

        item = {'images':images, 'captions':captions}
        return item


In [14]:
#creating data loader
batch=128
pad_idx = dataset.vocab.string_to_index["<PAD>"]
dataloader = DataLoader(dataset=dataset, batch_size=batch,num_workers=4, shuffle=True, pin_memory=True,collate_fn=Collater(pad_idx))



In [15]:
for i,item in enumerate(dataloader):
    print(item['images'].shape, item['captions'].shape)
    break

torch.Size([128, 3, 224, 224]) torch.Size([25, 128])


In [16]:
#training the model
#device = torch.device("cpu")


In [17]:
#Defining hyperparams
embedding_size=1000
hidden_size=256
vocab_size=len(dataset.vocab)
n_layers=1
lr=0.001
num_epochs=10

model = BottleneckBlock(embedding_size,hidden_size,vocab_size,n_layers)




In [18]:
model = model.to(device)

In [19]:
#defining criterion and los function
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab.string_to_index["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=lr)

In [20]:
#training
for epoch in range(num_epochs):
    for i, batch in enumerate(dataloader):
        images = batch['images'].to(device)
        captions = batch['captions'].to(device)
        output = model(images, captions[:-1]) #leaving the last to predict
        loss = criterion(output.reshape(-1,output.shape[2]), captions.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print("Epoch - {}, Loss - {}".format(epoch, loss.item()))

Epoch - 0, Loss - 3.7365944385528564
Epoch - 1, Loss - 3.1778571605682373
Epoch - 2, Loss - 2.9402990341186523
Epoch - 3, Loss - 2.6418631076812744
Epoch - 4, Loss - 2.590700626373291
Epoch - 5, Loss - 2.550065279006958
Epoch - 6, Loss - 2.311509132385254
Epoch - 7, Loss - 2.5047953128814697
Epoch - 8, Loss - 2.60382080078125
Epoch - 9, Loss - 2.106279134750366


In [21]:
torch.save(model.state_dict(),'Image_caption_10.pth')