In [None]:
# Fetch the Flickr8k images+captions archive with gdown and unpack it into the
# current working directory.
# NOTE(review): later cells read from /content/flickr8k, so this presumably expects
# the notebook to run with /content as the working directory (e.g. Colab) — confirm.
!gdown "1--2MHS70-Y8rZfsDd5PjztPevBC6Q5vd&confirm=t"
!unzip flickr8kimagescaptions.zip

# Library

In [None]:
import os
import cv2
from tqdm import tqdm
import torch 
import random
import torchtext
import numpy as np
import pandas as pd
from PIL import Image
import torch.nn as nn
from textwrap import wrap
import torch.optim as optim
import matplotlib.pyplot as plt
import torchvision.models as models
from torchtext.data import get_tokenizer
import torchvision.transforms as transforms
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset, random_split
from nltk.translate.bleu_score import corpus_bleu
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data.sampler import SubsetRandomSampler
import spacy
# Tokenizer callable obtained from torchtext's get_tokenizer('spacy'); the Vocab
# classes below call it on cleaned, lower-cased caption strings.
spacy_eng = get_tokenizer('spacy')
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Visualize sample

In [None]:
# Read the raw caption rows as [image_name, caption] pairs (the first row is the header).
# The context manager closes the file, and maxsplit=1 keeps captions that contain
# commas in one piece instead of cutting them at the first comma.
with open("/content/flickr8k/captions.txt", encoding="utf-8") as captions_file:
    captions = [line.strip().split(",", 1) for line in captions_file]

In [None]:
# Parse the annotation file with pandas (the same parser the Dataset classes use) so
# that quoted captions containing commas are kept whole; a plain split(",") truncated
# them at the first comma and the file handle was never closed.
captions_df = pd.read_csv("/content/flickr8k/captions.txt")
captions = captions_df[["image", "caption"]].values.tolist()

# image file name -> list of all reference captions for that image (file order kept)
dict_image_caption = {}
for image_name, caption in captions:
    dict_image_caption.setdefault(image_name, []).append(caption)

# choosing 1 image
keys = random.sample(list(dict_image_caption), 1)

for key in keys:
    img_path = os.path.join("/content/flickr8k/images", key)
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising
        raise FileNotFoundError(f"Could not read image: {img_path}")
    # OpenCV loads images as BGR while matplotlib expects RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    image_captions = dict_image_caption[key]
    plt.figure(figsize=(15, 15))
    # one panel per caption, each showing the same image with a wrapped caption as title
    for i, caption in enumerate(image_captions):
        plt.subplot(1, len(image_captions), i + 1)
        plt.imshow(img)
        plt.title("\n".join(wrap(caption, 15)))
        plt.axis("off")
    plt.show()

# Customize dataloader

In [None]:
def collator(batch):
    """Collate (image, caption) samples into one batch for the DataLoader.

    Args:
        batch: list of (image_tensor, caption_tensor) pairs; images must share a
            shape, captions are 1-D index tensors of varying length.

    Returns:
        (images, captions): images stacked along a new first dimension, and captions
        padded with 0 (the '<PAD>' index) into a (max_len, batch) tensor, i.e.
        sequence-first as produced by batch_first=False.
    """
    images = [image for image, _ in batch]
    caption_seqs = [caption for _, caption in batch]
    padded_captions = pad_sequence(caption_seqs, batch_first=False, padding_value=0)
    return (torch.stack(images), padded_captions)

In [None]:
class Vocab():
    """Token <-> index vocabulary built from caption strings.

    Indices 0-3 are reserved for '<PAD>', '<SOS>', '<EOS>' and '<UNK>'.
    """

    def __init__(self):
        # i2s: index -> token, s2i: token -> index
        self.i2s = ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        self.s2i = dict(zip(self.i2s, range(len(self.i2s))))

    def build_vocabulary(self, caps):
        """Register every unseen token of every caption in `caps`, in first-seen order."""
        for cap in caps:
            for tok in self.tokenize(cap):
                if tok in self.s2i:
                    continue
                self.s2i[tok] = len(self.i2s)
                self.i2s.append(tok)

    def tokenize(self, cap):
        """Drop single/double quotes, lower-case the caption and tokenize it with spacy_eng."""
        cleaned = cap.replace("'",'').replace('"','').lower()
        return list(spacy_eng(cleaned))

    def vectorize(self, cap):
        """Convert a caption to a list of token indices; unknown tokens map to '<UNK>'."""
        unk_index = self.s2i["<UNK>"]
        return [self.s2i.get(tok, unk_index) for tok in self.tokenize(cap)]

    def __len__(self):
        """Number of tokens in the vocabulary, special tokens included."""
        return len(self.i2s)

In [None]:
class MyDataset(Dataset):
    """Image/caption pairs read from a CSV annotation file with 'image' and 'caption' columns.

    Each item is (image_tensor, caption_tensor): the image is read with OpenCV,
    converted to a tensor, resized to 224x224 and normalized with mean/std 0.5;
    the caption is a 1-D tensor of vocabulary indices wrapped in '<SOS>' ... '<EOS>'.
    """

    def __init__(self, dir_img, dir_ann):
        """
        Args:
            dir_img: directory that contains the image files.
            dir_ann: path of the CSV annotation file.
        """
        super().__init__()
        self.dir_img = dir_img
        self.dir_ann = dir_ann
        self.df = pd.read_csv(dir_ann)

        self.caps = self.df['caption'].tolist()
        self.imgs = self.df['image'].tolist()

        # The vocabulary is built from every caption in the file.
        self.vocab = Vocab()
        self.vocab.build_vocabulary(self.caps)

        # Pre-compute the index tensor of every caption once, up front.
        sos_index = self.vocab.s2i['<SOS>']
        eos_index = self.vocab.s2i['<EOS>']
        self.vec_caps = []
        for cap in self.caps:
            indices = [sos_index] + self.vocab.vectorize(cap) + [eos_index]
            self.vec_caps.append(torch.tensor(indices))

        preprocessing_steps = [
            transforms.ToTensor(),
            transforms.Resize((224, 224)),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
        self.transform = transforms.Compose(preprocessing_steps)

    def __len__(self):
        """Number of (image, caption) rows in the annotation file."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the transformed image and the vectorized caption of row `index`."""
        img_path = os.path.join(self.dir_img, self.imgs[index])
        img = self.transform(cv2.imread(img_path))
        return (img, self.vec_caps[index])

In [None]:
# Absolute paths, matching the /content/flickr8k location every other cell reads from
# (the relative 'content/flickr8k/...' only resolved when the working directory was '/').
dataset = MyDataset('/content/flickr8k/images', '/content/flickr8k/captions.txt')
PAD_VALUE = dataset.vocab.s2i['<PAD>']

# 80/20 random split over (image, caption) rows.
n_train = int(0.8 * len(dataset))
n_valid = len(dataset) - n_train
train_dataset, valid_dataset = random_split(dataset, [n_train, n_valid])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collator)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, collate_fn=collator)

# Model

In [None]:
class ImageCaptioning(nn.Module):
    """ResNet-50 encoder + LSTM decoder with an embedding layer learned from scratch.

    The pretrained ResNet-50 parameters are frozen; its final `fc` layer is replaced
    by a new (trainable) linear layer that projects image features to `embed_size`.
    The projected feature vector is fed to the LSTM as the first sequence element.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        """
        Args:
            vocab_size: number of output classes / embedding rows.
            embed_size: size of the token embeddings and of the image feature vector.
            hidden_size: LSTM hidden size.
            num_layers: number of stacked LSTM layers.
        """
        super(ImageCaptioning, self).__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # backbone for feature extraction
        self.featuresCNN = models.resnet50(pretrained=True)
        # freeze the backbone; the replacement fc below is created afterwards and stays trainable
        for param in self.featuresCNN.parameters():
            param.requires_grad = False
        self.featuresCNN.fc = nn.Linear(self.featuresCNN.fc.in_features, embed_size)
        self.fc = nn.Linear(hidden_size, vocab_size)
        # activation function
        self.relu = nn.ReLU()
        # RNN
        self.embed = nn.Embedding(self.vocab_size, self.embed_size, padding_idx=PAD_VALUE)
        self.lstm = nn.LSTM(self.embed_size, self.hidden_size, self.num_layers)
        # Not used by forward()/generate(); kept because removing it would change the
        # state_dict keys of this module.
        self.linear = nn.Linear(self.hidden_size, self.vocab_size)

    def forward(self, images, captions):
        """Return vocabulary logits for every step of the sequence.

        Args:
            images: batch of image tensors accepted by the ResNet backbone.
            captions: token indices laid out sequence-first (seq_len, batch).

        Returns:
            Logits of shape (seq_len + 1, batch, vocab_size); the extra leading step
            is the image feature vector prepended to the caption embeddings.
        """
        features = self.featuresCNN(images)
        features = self.relu(features)
        embeddings = self.embed(captions)
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.fc(hiddens)
        return outputs

    def generate(self, img, i2s, max = 40):
        """Greedily decode a caption for a single image.

        Args:
            img: one image tensor without batch dimension.
            i2s: index -> token list used to decode predictions.
            max: maximum number of tokens to generate.

        Returns:
            List of generated tokens. It ends with '<EOS>' when the model produced it
            within `max` steps; otherwise the `max` tokens generated so far are
            returned (previously the method returned None in that case).
        """
        # Decode in eval mode so BatchNorm uses its running statistics (and does not
        # update them); the previous mode is restored afterwards.
        was_training = self.training
        self.eval()
        gen_toks = []
        try:
            with torch.no_grad():
                x = self.relu(self.featuresCNN(torch.unsqueeze(img, 0))).unsqueeze(0)
                s = None
                for _ in range(max):
                    h, s = self.lstm(x, s)
                    p = self.fc(h.squeeze(0)).argmax(1)
                    gen_toks.append(p.item())
                    if i2s[p.item()] == '<EOS>':
                        break
                    # feed the predicted token back in as the next input
                    x = self.embed(p).unsqueeze(0)
        finally:
            self.train(was_training)
        return [i2s[i] for i in gen_toks]

    def bleu_score(self, dataset):
        """Print corpus BLEU-1 and BLEU-2 of generated captions over every image in `dataset`.

        References are grouped from `dataset.imgs` / `dataset.caps`, so the method no
        longer relies on a notebook-level caption dictionary being defined.
        """
        # image file name -> all reference captions of that image
        references = {}
        for img_name, cap in zip(dataset.imgs, dataset.caps):
            references.setdefault(img_name, []).append(cap)

        special_tokens = ('<PAD>', '<SOS>', '<EOS>', '<UNK>')
        model_device = next(self.parameters()).device

        ls_r, ls_h = [], []
        pbar = tqdm(references.items(), total = len(references))
        for img, caps in pbar:
            pbar.set_description(f'Generating Captions for {img}')
            image = dataset.transform(cv2.imread(os.path.join(dataset.dir_img, img))).to(model_device)
            generated_caption = [token for token in self.generate(image, dataset.vocab.i2s) if token not in special_tokens]

            ls_r.append([dataset.vocab.tokenize(c) for c in caps])
            ls_h.append(generated_caption)

        print("BLEU 1 =", corpus_bleu(ls_r, ls_h, weights = (1,0,0,0)))
        print("BLEU 2 =", corpus_bleu(ls_r, ls_h, weights = (0.5,0.5,0,0)))


# Train

In [None]:
# Hyper-parameters of the baseline model (learned embeddings + frozen ResNet).
vocab_size = len(dataset.vocab.i2s)
num_epochs = 10
embed_size = 200
hidden_size = 200

model = ImageCaptioning(vocab_size, embed_size, hidden_size, 3).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
# Padded target positions carry no information, so they are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)

# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime as well.
checkpoint = torch.load('model-emb-freeze.pt', map_location=device)
model.load_state_dict(checkpoint)

# for epoch in range(num_epochs):
#     model.train()
    
#     cumm_train_loss = 0
#     pbar = tqdm(enumerate(train_loader), total = len(train_loader))
    
#     for i, (imgs, caps) in pbar:
#         optimizer.zero_grad()
#         imgs, caps = imgs.to(device), caps.to(device)
        
#         y_hat = model(imgs, caps[:-1])
#         loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
        
#         loss.backward()
#         optimizer.step()
        
#         cumm_train_loss += loss.item()
#         pbar.set_description(f'Train Epoch {epoch} | loss: {cumm_train_loss / (i+1)}')
        
#     model.eval()
    
#     cumm_loss_valid = 0
#     pbar = tqdm(enumerate(valid_loader), total = len(valid_loader))
    
#     with torch.no_grad():
#         for i, (imgs, caps) in pbar:
#             imgs, caps = imgs.to(device), caps.to(device)
            
#             y_hat = model(imgs, caps[:-1])
#             loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
            
#             cumm_loss_valid += loss.item()
#             pbar.set_description(f'Validation | loss: {cumm_loss_valid / (i+1)}')

# torch.save(model.state_dict(), 'model-emb-freeze.pt')

In [None]:
# Print corpus-level BLEU-1 / BLEU-2 of the baseline model on `dataset`.
model.bleu_score(dataset)

# Change Model (GloVe embedding + Frozen ResNet)

In [None]:
import locale
# Download and unpack the GloVe 6B word vectors, then list the working directory.
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip
!ls -lat
# Make locale.getpreferredencoding() always report UTF-8.
# NOTE(review): presumably a workaround for the runtime reporting a non-UTF-8 locale
# after the shell commands above — confirm it is still needed.
locale.getpreferredencoding = lambda: 'UTF-8'

In [None]:
# Parse the 200-d GloVe file into parallel lists: gvocabs[i] is the word whose vector
# is gembeddings[i]. The file is streamed line by line instead of being read and split
# in memory as a whole, which kept two full copies of the text alive at once.
gvocabs, gembeddings = [], []
with open('glove.6B.200d.txt', 'r', encoding="utf-8") as file:
    for row in tqdm(file):
        row = row.rstrip('\n')
        if not row:
            continue  # ignore blank lines (e.g. a trailing newline at end of file)
        vals = row.split(' ')
        gvocabs.append(vals[0])
        gembeddings.append([float(val) for val in vals[1:]])

In [None]:
# Vocabulary array: the four special tokens followed by every GloVe word.
_special_tokens = ['<PAD>','<SOS>','<EOS>','<UNK>']
avocabs = np.array(_special_tokens + gvocabs)

aembeddings = np.array(gembeddings)
_emb_dim = aembeddings.shape[1]

# One embedding row per special token, in the same order as in avocabs.
emb_pad = np.zeros((1, _emb_dim))        # pad should be zero
emb_sos = np.random.rand(1, _emb_dim)    # random start; we learn the relation, nothing fancy needed
emb_eos = np.random.rand(1, _emb_dim)    # random start; we learn the relation, nothing fancy needed
emb_unk = aembeddings.mean(axis=0, keepdims=True)  # mean of all vectors, as suggested by the GloVe author

# Stack the special rows on top of the GloVe matrix so row i matches avocabs[i].
aembeddings = np.concatenate([emb_pad, emb_sos, emb_eos, emb_unk, aembeddings], axis=0)

In [None]:
class VocabGlove:
    """Token <-> index vocabulary that can be re-indexed to match a GloVe embedding matrix.

    Indices 0-3 are initially reserved for '<PAD>', '<SOS>', '<EOS>' and '<UNK>'.
    """

    def __init__(self):
        # i2s: index -> token, s2i: token -> index
        self.i2s = ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        self.s2i = {item:index for index, item in enumerate(self.i2s)}
        # u: caption tokens that also exist in the GloVe vocabulary (plus the special tokens)
        self.u = set(self.i2s)

    def build_vocabulary(self, caps):
        """Register every unseen token of every caption in `caps`, in first-seen order."""
        for cap in tqdm(caps, total=len(caps)):
            for tok in self.tokenize(cap):
                if tok not in self.s2i:
                    self.i2s.append(tok)
                    self.s2i[tok] = len(self.i2s) - 1

    def tokenize(self, cap):
        """Drop single/double quotes, lower-case the caption and tokenize it with spacy_eng."""
        return [tok for tok in spacy_eng(cap.replace("'",'').replace('"','').lower())]

    def vectorize(self, cap):
        """Convert a caption to a list of token indices; unknown tokens map to '<UNK>'."""
        unk_index = self.s2i["<UNK>"]
        return [self.s2i.get(tok, unk_index) for tok in self.tokenize(cap)]

    def populate_u(self, caps):
        """Add to `self.u` every caption token that is present in the global `avocabs`."""
        # Build a set once: `word in avocabs` on a numpy array is a full linear scan
        # of the array for every single token.
        glove_words = set(avocabs.tolist()) if hasattr(avocabs, "tolist") else set(avocabs)
        for cap in tqdm(caps, total=len(caps)):
            for word in self.tokenize(cap):
                if word in glove_words:
                    self.u.add(word)

    def reset_vocabs(self, rvocabs):
        """Replace the vocabulary so that index i corresponds to `rvocabs[i]`.

        Args:
            rvocabs: array (or other sequence) of tokens in embedding-row order.
        """
        # .tolist() turns numpy string scalars into plain Python str objects
        words = rvocabs.tolist() if hasattr(rvocabs, "tolist") else list(rvocabs)
        self.i2s, self.s2i = [], {}
        for word in words:
            self.i2s.append(word)
            self.s2i[word] = len(self.i2s) - 1

    def __len__(self):
        """Number of tokens in the vocabulary, special tokens included."""
        return len(self.i2s)

In [None]:
class MyDataset_Glove(Dataset):
    """Image/caption dataset whose vocabulary is later re-indexed to GloVe rows.

    Construction only builds the vocabulary. After the vocabulary has been reset
    (see VocabGlove.reset_vocabs), call `build_vectorized_captions()` before
    indexing the dataset.
    """

    def __init__(self, dir_img, dir_ann):
        """
        Args:
            dir_img: directory that contains the image files.
            dir_ann: path of the CSV annotation file with 'image' and 'caption' columns.
        """
        super(MyDataset_Glove, self).__init__()
        self.dir_img, self.dir_ann = dir_img, dir_ann
        self.df = pd.read_csv(dir_ann)

        self.caps = self.df['caption'].tolist()
        self.imgs = self.df['image'].tolist()

        self.vocab = VocabGlove()
        self.vocab.populate_u(self.caps)
        self.vocab.build_vocabulary(self.caps)

        # Filled by build_vectorized_captions(); None marks "not built yet".
        self.vec_caps = None

        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize((224, 224)),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ]
        )

    def __len__(self):
        """Number of (image, caption) rows in the annotation file."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the transformed image and the vectorized caption of row `index`.

        Raises:
            RuntimeError: if build_vectorized_captions() has not been called yet.
        """
        vec_caps = getattr(self, 'vec_caps', None)
        if vec_caps is None:
            raise RuntimeError(
                "Captions are not vectorized yet; call build_vectorized_captions() first."
            )
        cap = vec_caps[index]
        img = self.transform(cv2.imread(os.path.join(self.dir_img, self.imgs[index])))
        return (img, cap)

    def build_vectorized_captions(self,):
        """Vectorize every caption with the current vocabulary, wrapped in '<SOS>' ... '<EOS>'."""
        sos_index = self.vocab.s2i['<SOS>']
        eos_index = self.vocab.s2i['<EOS>']
        self.vec_caps = [torch.tensor([sos_index] + self.vocab.vectorize(cap) + [eos_index]) for cap in self.caps]

In [None]:
dataset_glove = MyDataset_Glove('/content/flickr8k/images', '/content/flickr8k/captions.txt')

# Row indices of GloVe entries whose word never occurs in the captions.
_caption_vocab = dataset_glove.vocab.s2i
todel = [row for row, word in enumerate(avocabs) if word not in _caption_vocab]

# Drop those rows from the vocabulary array and from the embedding matrix alike,
# so the two stay aligned row by row.
rvocaba = np.delete(avocabs, todel, axis=0)
rembeddings = np.delete(aembeddings, todel, axis=0)

# Re-index the vocabulary to the reduced GloVe rows, then vectorize the captions with it.
dataset_glove.vocab.reset_vocabs(rvocaba)
dataset_glove.build_vectorized_captions()

# 80/20 random split over (image, caption) rows.
_n_total = len(dataset_glove)
_n_train = int(0.8 * _n_total)
train_dataset, valid_dataset = random_split(dataset_glove, [_n_train, _n_total - _n_train])

train_loader = DataLoader(train_dataset, collate_fn=collator, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, collate_fn=collator, batch_size=64, shuffle=False)

In [None]:
class ImageCaptioningGlove(nn.Module):
    """ResNet-50 encoder + LSTM decoder that uses pretrained GloVe embeddings.

    The pretrained ResNet-50 parameters are frozen; its final `fc` layer is replaced
    by a new (trainable) linear layer that projects image features to `embed_size`.
    Token embeddings are loaded from the notebook-level `rembeddings` matrix.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        """
        Args:
            vocab_size: number of output classes of the final linear layer.
            embed_size: LSTM input size; must equal the width of `rembeddings`.
            hidden_size: LSTM hidden size.
            num_layers: number of stacked LSTM layers.
        """
        super(ImageCaptioningGlove, self).__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # backbone for feature extraction
        self.featuresCNN = models.resnet50(pretrained=True)
        # freeze the backbone; the replacement fc below is created afterwards and stays trainable
        for param in self.featuresCNN.parameters():
            param.requires_grad = False
        self.featuresCNN.fc = nn.Linear(self.featuresCNN.fc.in_features, embed_size)
        self.fc = nn.Linear(hidden_size, vocab_size)
        # activation function
        self.relu = nn.ReLU()
        # RNN
        # we use glove embeddings instead of training them from scratch
        self.embed = torch.nn.Embedding.from_pretrained(torch.from_numpy(rembeddings).float())
        self.lstm = nn.LSTM(self.embed_size, self.hidden_size, self.num_layers)
        # Not used by forward()/generate(); kept because removing it would change the
        # state_dict keys of this module.
        self.linear = nn.Linear(self.hidden_size, self.vocab_size)

    def forward(self, images, captions):
        """Return vocabulary logits for every step of the sequence.

        Args:
            images: batch of image tensors accepted by the ResNet backbone.
            captions: token indices laid out sequence-first (seq_len, batch).

        Returns:
            Logits of shape (seq_len + 1, batch, vocab_size); the extra leading step
            is the image feature vector prepended to the caption embeddings.
        """
        features = self.featuresCNN(images)
        features = self.relu(features)
        embeddings = self.embed(captions)
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.fc(hiddens)
        return outputs

    def generate(self, img, i2s, max = 40):
        """Greedily decode a caption for a single image.

        Args:
            img: one image tensor without batch dimension.
            i2s: index -> token list used to decode predictions.
            max: maximum number of tokens to generate.

        Returns:
            List of generated tokens. It ends with '<EOS>' when the model produced it
            within `max` steps; otherwise the `max` tokens generated so far are
            returned (previously the method returned None in that case).
        """
        # Decode in eval mode so BatchNorm uses its running statistics (and does not
        # update them); the previous mode is restored afterwards.
        was_training = self.training
        self.eval()
        gen_toks = []
        try:
            with torch.no_grad():
                x = self.relu(self.featuresCNN(torch.unsqueeze(img, 0))).unsqueeze(0)
                s = None
                for _ in range(max):
                    h, s = self.lstm(x, s)
                    p = self.fc(h.squeeze(0)).argmax(1)
                    gen_toks.append(p.item())
                    if i2s[p.item()] == '<EOS>':
                        break
                    # feed the predicted token back in as the next input
                    x = self.embed(p).unsqueeze(0)
        finally:
            self.train(was_training)
        return [i2s[i] for i in gen_toks]

    def bleu_score(self, dataset):
        """Print corpus BLEU-1 and BLEU-2 of generated captions over every image in `dataset`.

        References are grouped from `dataset.imgs` / `dataset.caps`, so the method no
        longer relies on a notebook-level caption dictionary being defined.
        """
        # image file name -> all reference captions of that image
        references = {}
        for img_name, cap in zip(dataset.imgs, dataset.caps):
            references.setdefault(img_name, []).append(cap)

        special_tokens = ('<PAD>', '<SOS>', '<EOS>', '<UNK>')
        model_device = next(self.parameters()).device

        ls_r, ls_h = [], []
        pbar = tqdm(references.items(), total = len(references))
        for img, caps in pbar:
            pbar.set_description(f'Generating Captions for {img}')
            image = dataset.transform(cv2.imread(os.path.join(dataset.dir_img, img))).to(model_device)
            generated_caption = [token for token in self.generate(image, dataset.vocab.i2s) if token not in special_tokens]

            ls_r.append([dataset.vocab.tokenize(c) for c in caps])
            ls_h.append(generated_caption)

        print("BLEU 1 =", corpus_bleu(ls_r, ls_h, weights = (1,0,0,0)))
        print("BLEU 2 =", corpus_bleu(ls_r, ls_h, weights = (0.5,0.5,0,0)))

In [None]:
# Hyper-parameters of the GloVe model with a frozen ResNet.
# NOTE(review): vocab_size is taken from `dataset`, not `dataset_glove`, so the output
# layer is sized by the baseline vocabulary rather than by the reduced GloVe one.
# Left unchanged because the saved checkpoint's layer shapes depend on it — confirm
# which vocabulary the checkpoint was trained with before changing this.
vocab_size = len(dataset.vocab.i2s)
num_epochs = 10
embed_size = 200
hidden_size = 200

model = ImageCaptioningGlove(vocab_size, embed_size, hidden_size, 3).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
# Padded target positions carry no information, so they are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=dataset_glove.vocab.s2i['<PAD>'])

# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime as well.
checkpoint = torch.load('model-glove-freeze.pt', map_location=device)
model.load_state_dict(checkpoint)

# for epoch in range(num_epochs):
#     model.train()
    
#     cumm_train_loss = 0
#     pbar = tqdm(enumerate(train_loader), total = len(train_loader))
    
#     for i, (imgs, caps) in pbar:
#         optimizer.zero_grad()
#         imgs, caps = imgs.to(device), caps.to(device)
        
#         y_hat = model(imgs, caps[:-1])
#         loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
        
#         loss.backward()
#         optimizer.step()
        
#         cumm_train_loss += loss.item()
#         pbar.set_description(f'Train Epoch {epoch} | loss: {cumm_train_loss / (i+1)}')
        
#     model.eval()
    
#     cumm_loss_valid = 0
#     pbar = tqdm(enumerate(valid_loader), total = len(valid_loader))
    
#     with torch.no_grad():
#         for i, (imgs, caps) in pbar:
#             imgs, caps = imgs.to(device), caps.to(device)
            
#             y_hat = model(imgs, caps[:-1])
#             loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
            
#             cumm_loss_valid += loss.item()
#             pbar.set_description(f'Validation | loss: {cumm_loss_valid / (i+1)}')

# torch.save(model.state_dict(), 'model-glove-freeze.pt')

In [None]:
# Print corpus-level BLEU-1 / BLEU-2 of the frozen-ResNet GloVe model on `dataset_glove`.
model.bleu_score(dataset_glove)

# Train Again (GloVe embedding + Unfrozen ResNet)

In [None]:
class ImageCaptioningGloveUnfreeze(nn.Module):
    """ResNet-50 encoder + LSTM decoder with GloVe embeddings and a trainable backbone.

    All ResNet-50 parameters are left trainable; its final `fc` layer is replaced by
    a new linear layer that projects image features to `embed_size`. Token embeddings
    are loaded from the notebook-level `rembeddings` matrix.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        """
        Args:
            vocab_size: number of output classes of the final linear layer.
            embed_size: LSTM input size; must equal the width of `rembeddings`.
            hidden_size: LSTM hidden size.
            num_layers: number of stacked LSTM layers.
        """
        super(ImageCaptioningGloveUnfreeze, self).__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # backbone for feature extraction
        self.featuresCNN = models.resnet50(pretrained=True)
        # explicitly mark every backbone parameter as trainable (fine-tuning)
        for param in self.featuresCNN.parameters():
            param.requires_grad = True
        self.featuresCNN.fc = nn.Linear(self.featuresCNN.fc.in_features, embed_size)
        self.fc = nn.Linear(hidden_size, vocab_size)
        # activation function
        self.relu = nn.ReLU()
        # RNN
        self.embed = torch.nn.Embedding.from_pretrained(torch.from_numpy(rembeddings).float())
        self.lstm = nn.LSTM(self.embed_size, self.hidden_size, self.num_layers)
        # Not used by forward()/generate(); kept because removing it would change the
        # state_dict keys of this module.
        self.linear = nn.Linear(self.hidden_size, self.vocab_size)

    def forward(self, images, captions):
        """Return vocabulary logits for every step of the sequence.

        Args:
            images: batch of image tensors accepted by the ResNet backbone.
            captions: token indices laid out sequence-first (seq_len, batch).

        Returns:
            Logits of shape (seq_len + 1, batch, vocab_size); the extra leading step
            is the image feature vector prepended to the caption embeddings.
        """
        features = self.featuresCNN(images)
        features = self.relu(features)
        embeddings = self.embed(captions)
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.fc(hiddens)
        return outputs

    def generate(self, img, i2s, max = 40):
        """Greedily decode a caption for a single image.

        Args:
            img: one image tensor without batch dimension.
            i2s: index -> token list used to decode predictions.
            max: maximum number of tokens to generate.

        Returns:
            List of generated tokens. It ends with '<EOS>' when the model produced it
            within `max` steps; otherwise the `max` tokens generated so far are
            returned (previously the method returned None in that case).
        """
        # Decode in eval mode so BatchNorm uses its running statistics (and does not
        # update them); the previous mode is restored afterwards.
        was_training = self.training
        self.eval()
        gen_toks = []
        try:
            with torch.no_grad():
                x = self.relu(self.featuresCNN(torch.unsqueeze(img, 0))).unsqueeze(0)
                s = None
                for _ in range(max):
                    h, s = self.lstm(x, s)
                    p = self.fc(h.squeeze(0)).argmax(1)
                    gen_toks.append(p.item())
                    if i2s[p.item()] == '<EOS>':
                        break
                    # feed the predicted token back in as the next input
                    x = self.embed(p).unsqueeze(0)
        finally:
            self.train(was_training)
        return [i2s[i] for i in gen_toks]

    def bleu_score(self, dataset):
        """Print corpus BLEU-1 and BLEU-2 of generated captions over every image in `dataset`.

        References are grouped from `dataset.imgs` / `dataset.caps`, so the method no
        longer relies on a notebook-level caption dictionary being defined.
        """
        # image file name -> all reference captions of that image
        references = {}
        for img_name, cap in zip(dataset.imgs, dataset.caps):
            references.setdefault(img_name, []).append(cap)

        special_tokens = ('<PAD>', '<SOS>', '<EOS>', '<UNK>')
        model_device = next(self.parameters()).device

        ls_r, ls_h = [], []
        pbar = tqdm(references.items(), total = len(references))
        for img, caps in pbar:
            pbar.set_description(f'Generating Captions for {img}')
            image = dataset.transform(cv2.imread(os.path.join(dataset.dir_img, img))).to(model_device)
            generated_caption = [token for token in self.generate(image, dataset.vocab.i2s) if token not in special_tokens]

            ls_r.append([dataset.vocab.tokenize(c) for c in caps])
            ls_h.append(generated_caption)

        print("BLEU 1 =", corpus_bleu(ls_r, ls_h, weights = (1,0,0,0)))
        print("BLEU 2 =", corpus_bleu(ls_r, ls_h, weights = (0.5,0.5,0,0)))

In [None]:
# Hyper-parameters of the GloVe model with a trainable (unfrozen) ResNet.
# NOTE(review): vocab_size is taken from `dataset`, not `dataset_glove`, so the output
# layer is sized by the baseline vocabulary rather than by the reduced GloVe one.
# Left unchanged because the saved checkpoint's layer shapes depend on it — confirm
# which vocabulary the checkpoint was trained with before changing this.
vocab_size = len(dataset.vocab.i2s)
num_epochs = 10
embed_size = 200
hidden_size = 200

model = ImageCaptioningGloveUnfreeze(vocab_size, embed_size, hidden_size, 3).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
# Padded target positions carry no information, so they are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=dataset_glove.vocab.s2i['<PAD>'])

# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime as well.
checkpoint = torch.load('model-glove-unfreeze.pt', map_location=device)
model.load_state_dict(checkpoint)

# for epoch in range(num_epochs):
#     model.train()
    
#     cumm_train_loss = 0
#     pbar = tqdm(enumerate(train_loader), total = len(train_loader))
    
#     for i, (imgs, caps) in pbar:
#         optimizer.zero_grad()
#         imgs, caps = imgs.to(device), caps.to(device)
        
#         y_hat = model(imgs, caps[:-1])
#         loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
        
#         loss.backward()
#         optimizer.step()
        
#         cumm_train_loss += loss.item()
#         pbar.set_description(f'Train Epoch {epoch} | loss: {cumm_train_loss / (i+1)}')
        
#     model.eval()
    
#     cumm_loss_valid = 0
#     pbar = tqdm(enumerate(valid_loader), total = len(valid_loader))
    
#     with torch.no_grad():
#         for i, (imgs, caps) in pbar:
#             imgs, caps = imgs.to(device), caps.to(device)
            
#             y_hat = model(imgs, caps[:-1])
#             loss = criterion(y_hat.reshape(-1, y_hat.shape[2]), caps.reshape(-1))
            
#             cumm_loss_valid += loss.item()
#             pbar.set_description(f'Validation | loss: {cumm_loss_valid / (i+1)}')

# torch.save(model.state_dict(), 'model-glove-unfreeze.pt')

In [None]:
# Print corpus-level BLEU-1 / BLEU-2 of the unfrozen-ResNet GloVe model on `dataset_glove`.
model.bleu_score(dataset_glove)