Before running this notebook, change the Colab runtime type to "GPU".

# 1. Download Flickr8k dataset from Kaggle

In [None]:
# Mount Google Drive under /content/drive/; force_remount=True re-mounts it
# even if a previous mount is still active.
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [None]:
! pip install kaggle



Before running the cells below, make sure that you are in the root directory, i.e., the directory that contains "sample_data". Then, get the kaggle.json file from the Kaggle website.

Here are instructions for downloading Kaggle datasets to Colab: https://www.analyticsvidhya.com/blog/2021/06/how-to-load-kaggle-datasets-directly-into-google-colab/

Here is the link to the Flickr8k dataset on Kaggle: https://www.kaggle.com/adityajn105/flickr8k

**Alternatively**: You can download the Flickr8k dataset directly from the Kaggle link above and upload the unzipped files (the Images directory and captions.txt) to the root directory on Colab. However, this method may be slower. If you choose to do it this way, you can move directly to the next section.

In [None]:
# Drive was already mounted by the first cell; without force_remount this call
# only reports that the mount already exists.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
cd sample_data/

/content/sample_data


In [None]:
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download adityajn105/flickr8k
!unzip flickr8k.zip

# 2. Import necessary packages

In [None]:
import pandas as pd
import numpy as np
from collections import Counter 
import torchvision
from torchvision import transforms
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
from torch.autograd import Variable
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import math
import torch.nn.functional as F
import pickle
import gc
import random
import cv2
from google.colab.patches import cv2_imshow
import argparse, os, sys
from sklearn.metrics import accuracy_score
from torchvision import transforms

# Show full cell contents when displaying DataFrames so captions are not truncated.
pd.set_option('display.max_colwidth', None)

# 3. Preprocess Images

In [None]:
# Load the caption table: one row per (image file name, caption) pair.
df = pd.read_csv("captions.txt", sep=',')
print(len(df))
display(df.head(3))

# Sort by file name so that all captions of one image are contiguous, then
# take the images seen in the first 90% of rows as the training set.
df = df.sort_values(by = 'image')
split_row = int(0.9*len(df))
train_image_names = df['image'].iloc[:split_row].unique()

# Split on whole images instead of raw rows: cutting at a row index can land
# in the middle of one image's captions, which puts that image in BOTH sets
# (validation leakage). An image straddling the cut goes entirely to train.
in_train = df['image'].isin(train_image_names)
train = df[in_train]
valid = df[~in_train]

unq_train_imgs = train[['image']].drop_duplicates()
unq_valid_imgs = valid[['image']].drop_duplicates()
assert set(unq_train_imgs['image']).isdisjoint(unq_valid_imgs['image']), \
    "an image appears in both the training and validation split"
print(len(unq_train_imgs), len(unq_valid_imgs))

40455


Unnamed: 0,image,caption
0,1000268201_693b08cb0e.jpg,A child in a pink dress is climbing up a set of stairs in an entry way .
1,1000268201_693b08cb0e.jpg,A girl going into a wooden building .
2,1000268201_693b08cb0e.jpg,A little girl climbing into a wooden playhouse .


7282 810


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
class extractImageFeatureResNetDataSet():
    """Map-style dataset that yields ``(image_name, image_tensor)`` pairs.

    Each image is read from ``Images/<image_name>``, resized to 224x224,
    converted to a tensor and normalised per channel with the mean/std given
    below (the values conventionally used with torchvision's pretrained
    models).

    Args:
        data: DataFrame with an ``image`` column holding image file names;
            rows are accessed positionally through ``.iloc``.
    """

    def __init__(self, data):
        self.data = data
        self.scaler = transforms.Resize([224, 224])
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
        self.to_tensor = transforms.ToTensor()

    def __len__(self):
        """Return the number of rows (images) in ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load and preprocess the image at positional index ``idx``.

        Returns:
            Tuple ``(image_name, t_img)`` where ``t_img`` is the normalised
            image tensor.
        """
        image_name = self.data.iloc[idx]['image']
        img_loc = 'Images/'+str(image_name)

        # The context manager closes the file handle promptly instead of
        # leaving it to the garbage collector. convert('RGB') guarantees three
        # channels, so grayscale/CMYK/RGBA files do not break the 3-channel
        # Normalize; it is a no-op copy for images that are already RGB.
        with Image.open(img_loc) as img:
            rgb_img = img.convert('RGB')

        t_img = self.normalize(self.to_tensor(self.scaler(rgb_img)))

        return image_name, t_img

In [None]:
# Dataset/loader over the unique training images. batch_size=1 and
# shuffle=False: images are visited one at a time in DataFrame order.
train_ImageDataset_ResNet = extractImageFeatureResNetDataSet(unq_train_imgs)
train_ImageDataloader_ResNet = DataLoader(train_ImageDataset_ResNet, batch_size = 1, shuffle=False)

In [None]:
# Fetch one preprocessed training image (element 1 of the (name, tensor) pair).
img = train_ImageDataset_ResNet[100][1]

In [None]:
# Fetch another preprocessed training image (element 1 of the (name, tensor) pair).
img = train_ImageDataset_ResNet[2000][1]

In [None]:
# Dataset/loader over the unique validation images, one image per batch and
# in DataFrame order (shuffle=False).
valid_ImageDataset_ResNet = extractImageFeatureResNetDataSet(unq_valid_imgs)
valid_ImageDataloader_ResNet = DataLoader(valid_ImageDataset_ResNet, batch_size = 1, shuffle=False)

In [None]:
# Load torchvision's ResNet-18 with pretrained weights and move it to `device`.
resnet18 = torchvision.models.resnet18(pretrained=True).to(device)
# Put the network in evaluation mode; it is only used as a feature extractor.
resnet18.eval()
# Display the names of the network's top-level submodules.
list(resnet18._modules)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


  0%|          | 0.00/44.7M [00:00<?, ?B/s]

['conv1',
 'bn1',
 'relu',
 'maxpool',
 'layer1',
 'layer2',
 'layer3',
 'layer4',
 'avgpool',
 'fc']

In [None]:
# Handle on the 'layer4' submodule; get_vector hooks it to read its output.
resNet18Layer4 = resnet18.layer4.to(device)

In [None]:
def get_vector(t_img):
    """Return the ``layer4`` activation of ``resnet18`` for a batch of images.

    A forward hook is attached to ``resNet18Layer4`` for the duration of one
    forward pass of ``resnet18``; the hooked output is copied to the CPU and
    returned.

    Args:
        t_img: image batch tensor accepted by ``resnet18``.

    Returns:
        A detached CPU tensor holding a copy of the hooked layer's output
        (``[1, 512, 7, 7]`` for a single 224x224 image, as in this notebook).
    """
    captured = []

    def copy_data(m, i, o):
        # Store an independent CPU copy; sizing it from the output itself
        # removes the hard-coded 1x512x7x7 buffer, so any batch size works.
        captured.append(o.detach().to('cpu', copy=True))

    h = resNet18Layer4.register_forward_hook(copy_data)
    try:
        # Feature extraction needs no gradients; skipping the autograd graph
        # saves memory and time.
        with torch.no_grad():
            resnet18(t_img)
    finally:
        # Always detach the hook, even if the forward pass raises, so it
        # cannot pile up on the layer across calls.
        h.remove()

    return captured[0]

Convert the preprocessed images into feature vectors using the pretrained ResNets.

In [None]:
# Run every unique training image through the network and keep the returned
# feature tensor, keyed by image file name.
extract_imgFtr_ResNet_train = {}
for image_name, t_img in tqdm(train_ImageDataloader_ResNet):
    t_img = t_img.to(device)
    # get_vector returns the activation captured from resNet18Layer4
    embdg = get_vector(t_img)
    
    # The loader uses batch_size=1, so image_name holds a single file name.
    extract_imgFtr_ResNet_train[image_name[0]] = embdg

  0%|          | 0/7282 [00:00<?, ?it/s]

In [None]:
# Persist the training features. The context manager closes the file even if
# pickle.dump raises part-way through.
with open("./EncodedImageTrainResNet.pkl", "wb") as a_file:
    pickle.dump(extract_imgFtr_ResNet_train, a_file)

In [None]:
# Same feature extraction as for the training images, over the validation loader.
extract_imgFtr_ResNet_valid = {}
for image_name, t_img in tqdm(valid_ImageDataloader_ResNet):
    t_img = t_img.to(device)
    embdg = get_vector(t_img)
    # batch_size=1, so image_name holds a single file name.
    extract_imgFtr_ResNet_valid[image_name[0]] = embdg

  0%|          | 0/810 [00:00<?, ?it/s]

In [None]:
# Sanity check: print the first validation entry's file name and feature shape.
for key, value in extract_imgFtr_ResNet_valid.items():
    print(key)
    print(value.shape)
    break

436015762_8d0bae90c3.jpg
torch.Size([1, 512, 7, 7])


In [None]:
# Persist the validation features. The context manager closes the file even if
# pickle.dump raises part-way through.
with open("./EncodedImageValidResNet.pkl", "wb") as a_file:
    pickle.dump(extract_imgFtr_ResNet_valid, a_file)

# 4. CNN & LSTM Training

In [None]:
# Short aliases for the extracted feature dictionaries (file name -> tensor).
features_train, features_test = extract_imgFtr_ResNet_train, extract_imgFtr_ResNet_valid
n_train_images, n_test_images = len(features_train), len(features_test)
# Read the per-image feature shape from an arbitrary training entry instead of
# a hard-coded file name, which raises KeyError as soon as that image is not
# part of the training split. [0] drops the leading batch dimension of 1.
feature_dim = next(iter(features_train.values()))[0].shape
print(n_train_images, n_test_images, feature_dim)

7282 810 torch.Size([512, 7, 7])


In [None]:
# Training hyper-parameters.
HIDDEN_DIM = 200     # hidden size passed to the decoder
BATCH_SIZE = 10      # captions per batch built by make_batch
LEARN_RATE = 0.001   # learning rate given to the optimizer
NUM_EPOCHS = 10      # number of passes over the training captions

In [None]:
def extract_captions(fn):
    """Read the caption file and tokenise each caption per split.

    Every returned entry has the form
    ``[image_name, '<s>', word_1, ..., word_n, '</s>']``.

    The file is parsed as CSV (``image,caption``). Splitting the raw line on
    whitespace first, as was done before, glued the first caption word to the
    file name and so put ``'<s>'`` *after* that word; it also crashed on blank
    lines and kept the quotes of quoted captions.

    Args:
        fn: path of the caption file.

    Returns:
        Tuple ``(image_captions_train, image_captions_test)``. An entry goes
        into a list when its image is a key of ``features_train`` /
        ``features_test`` respectively; other rows (including the header)
        are dropped.
    """
    import csv  # function-scope import: only needed here

    image_captions_train, image_captions_test = [], []
    with open(fn, newline='') as f:
        for row in csv.reader(f):
            if len(row) < 2:
                # blank or malformed line
                continue
            image = row[0]
            # An unquoted caption containing commas is split by the CSV
            # reader; joining restores the original text.
            caption = ','.join(row[1:])
            arr = [image, '<s>'] + caption.split() + ['</s>']
            if image in features_train:
                image_captions_train.append(arr)
            if image in features_test:
                image_captions_test.append(arr)
    return image_captions_train, image_captions_test

# Tokenise all captions and report how many fall into each split.
image_captions_train, image_captions_test = extract_captions('captions.txt')
n_train, n_test = len(image_captions_train), len(image_captions_test)
print(n_train, n_test)
# Preview the first ten tokenised training captions.
for i in range(10):
    print(image_captions_train[i])

36410 4050
['1000268201_693b08cb0e.jpg', 'A', '<s>', 'child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way', '.', '</s>']
['1000268201_693b08cb0e.jpg', 'A', '<s>', 'girl', 'going', 'into', 'a', 'wooden', 'building', '.', '</s>']
['1000268201_693b08cb0e.jpg', 'A', '<s>', 'little', 'girl', 'climbing', 'into', 'a', 'wooden', 'playhouse', '.', '</s>']
['1000268201_693b08cb0e.jpg', 'A', '<s>', 'little', 'girl', 'climbing', 'the', 'stairs', 'to', 'her', 'playhouse', '.', '</s>']
['1000268201_693b08cb0e.jpg', 'A', '<s>', 'little', 'girl', 'in', 'a', 'pink', 'dress', 'going', 'into', 'a', 'wooden', 'cabin', '.', '</s>']
['1001773457_577c3a7d70.jpg', 'A', '<s>', 'black', 'dog', 'and', 'a', 'spotted', 'dog', 'are', 'fighting', '</s>']
['1001773457_577c3a7d70.jpg', 'A', '<s>', 'black', 'dog', 'and', 'a', 'tri-colored', 'dog', 'playing', 'with', 'each', 'other', 'on', 'the', 'road', '.', '</s>']
['1001773457_577c3a7d70.jpg', 'A', '<s>', 

In [None]:
def extract_dict(fn):
    """Build the vocabulary over all tokenised train and test captions.

    Index 0 is reserved for the padding token ``'PAD'``; the remaining words
    are numbered in order of first appearance.

    Args:
        fn: unused; the vocabulary is built from the module-level
            ``image_captions_train`` and ``image_captions_test`` lists. Kept
            so existing calls keep working.

    Returns:
        Tuple ``(index_to_word, word_to_index, vocabulary_size)``.
    """
    index_to_word, word_to_index = ['PAD'], {'PAD':0}
    for image_caption in image_captions_train + image_captions_test:
        # element 0 is the image file name, not a word
        for word in image_caption[1:]:
            # Test membership on the dict (O(1)) rather than the list, which
            # made the whole build quadratic in the vocabulary size.
            if word not in word_to_index:
                word_to_index[word] = len(index_to_word)
                index_to_word.append(word)
    return index_to_word, word_to_index, len(index_to_word)

# Build the vocabulary. Note that `embedding_dim` is the vocabulary size
# (the third value returned by extract_dict).
index_to_word, word_to_index, embedding_dim = extract_dict('captions.txt')
print(len(index_to_word), index_to_word)
print(len(word_to_index), word_to_index)



In [None]:
def caption_to_tensor(caption, max_len_batch):
    """Encode a tokenised caption as a fixed-length tensor of word indices.

    The caption is right-padded with ``'PAD'`` up to ``max_len_batch`` tokens;
    tokens beyond ``max_len_batch`` are ignored.

    Args:
        caption: list of word strings; every word must be a key of
            ``word_to_index``.
        max_len_batch: length of the returned tensor.

    Returns:
        1-D float tensor of length ``max_len_batch`` holding word indices.

    Raises:
        KeyError: if a word is missing from ``word_to_index``.
    """
    # Pad a copy: the previous `caption += [...]` extended the caller's list
    # in place, so the caller's caption silently grew 'PAD' tokens.
    padded = list(caption) + ['PAD'] * (max_len_batch - len(caption))
    caption_arr = np.zeros(max_len_batch)
    for i in range(max_len_batch):
        caption_arr[i] = word_to_index[padded[i]]
    return torch.Tensor(caption_arr)

# Worked example: pad a five-word caption to length 10 and encode it.
caption_example = ['A', 'child', 'climbing', 'up', 'stairs']
print(caption_example + ['PAD' for i in range(10-len(caption_example))])
print(caption_to_tensor(caption_example, 10))

In [None]:
def scores_to_caption(scores):
    """Turn per-step vocabulary scores into predicted word sequences.

    NOTE(review): the original loop body was truncated (``while j`` with no
    condition, then ``return ret`` with ``ret`` undefined), so the cell did
    not even parse. The stop condition below (end at ``'</s>'`` or ``'PAD'``)
    is a reconstruction of the apparent intent; confirm it.

    Args:
        scores: tensor of shape ``(n_sequences, sequence_len, vocab_size)``;
            the arg-max over the last axis selects the predicted word index.

    Returns:
        List with one list of word strings per sequence.
    """
    predicted_indices = torch.argmax(scores, -1)
    predicted_captions = []
    # Iterate over the rows actually present instead of the global n_test, so
    # the function works for any number of sequences.
    for row in predicted_indices.tolist():
        words = []
        for index in row:
            word = index_to_word[index]
            if word == '</s>' or word == 'PAD':
                break
            words.append(word)
        predicted_captions.append(words)
    return predicted_captions

In [None]:
def make_batch(batch_number):
    """Assemble training batch number ``batch_number``.

    Captions ``batch_number*BATCH_SIZE`` up to (but excluding)
    ``(batch_number+1)*BATCH_SIZE`` (clipped to ``n_train``) are padded to the
    longest caption in the batch. The returned tensors always have
    ``BATCH_SIZE`` rows; rows of a short final batch stay all-zero (index 0
    is ``'PAD'``).

    Returns:
        Tuple ``(X, Y_flat, Y, max_len_batch)``:
        X is the float feature tensor
        ``(BATCH_SIZE, feature_dim[0], feature_dim[1], feature_dim[2])``;
        Y is the ``torch.long`` word-index tensor
        ``(BATCH_SIZE, max_len_batch)``;
        Y_flat is Y flattened to 1-D.
    """
    batch_index_first = batch_number * BATCH_SIZE
    batch_index_final = min(n_train, (batch_number+1)*BATCH_SIZE)
    batch_rows = image_captions_train[batch_index_first:batch_index_final]
    # longest caption in the batch (element 0 of each row is the file name)
    max_len_batch = max((len(row[1:]) for row in batch_rows), default=0)
    X_train_batch = torch.zeros((BATCH_SIZE, feature_dim[0],feature_dim[1], feature_dim[2]))
    # Word indices are class labels: nn.CrossEntropyLoss and integer indexing
    # both need an integer dtype, so the targets are created as long.
    Y_train_batch = torch.zeros((BATCH_SIZE, max_len_batch), dtype=torch.long)
    for j, row in enumerate(batch_rows):
        image, caption = row[0], row[1:]
        X_train_batch[j] = features_train[image]
        Y_train_batch[j] = caption_to_tensor(caption, max_len_batch).long()
    return X_train_batch, torch.flatten(Y_train_batch), Y_train_batch, max_len_batch

# Build the first batch and check the shapes of its parts.
X_train_batch, Y_train_batch, Y_train_batch_unflattened, max_len_batch = make_batch(0)
print(X_train_batch.shape, Y_train_batch.shape, max_len_batch)

torch.Size([10, 512, 7, 7]) torch.Size([190]) 19


In [None]:
def make_test():
    """Stack the image features of all test captions into a single tensor.

    Row ``i`` holds the features of the image belonging to test caption
    ``i``, so an image is repeated once for each of its captions.

    Returns:
        Tensor of shape
        ``(n_test, feature_dim[0], feature_dim[1], feature_dim[2])``.
    """
    channels, height, width = feature_dim[0], feature_dim[1], feature_dim[2]
    stacked = torch.zeros((n_test, channels, height, width))
    row = 0
    while row < n_test:
        # element 0 of a caption entry is the image file name
        stacked[row] = features_test[image_captions_test[row][0]]
        row += 1
    return stacked

# Feature tensor with one row per test caption.
X_test = make_test()
print(X_test.shape)

torch.Size([4050, 512, 7, 7])


In [None]:
class EncoderCNN(nn.Module):
    """Convolutional encoder mapping image feature maps to one vector each.

    Two unpadded 3x3 convolutions (each followed by batch norm and ReLU)
    reduce the feature map; the result is flattened and projected to
    ``embedding_dim`` values.

    Args:
        feature_dim: ``(channels, height, width)`` of one input feature map.
        embedding_dim: size of the output vector per image.

    Raises:
        ValueError: if the feature map is too small for two unpadded 3x3
            convolutions (height or width below 5).
    """

    def __init__(self, feature_dim, embedding_dim):
        # The class name passed to super() previously was `Encoder`, which
        # does not exist; the zero-argument form cannot go stale.
        super().__init__()
        channels, height, width = feature_dim[0], feature_dim[1], feature_dim[2]
        # Each 3x3 convolution with padding 0 and stride 1 trims 2 from the
        # height and width, so the linear layer sees (h-4) x (w-4) positions,
        # not h x w as previously assumed.
        out_height, out_width = height - 4, width - 4
        if out_height <= 0 or out_width <= 0:
            raise ValueError("feature map too small for two unpadded 3x3 convolutions")
        # BatchNorm2d arguments are left at their defaults, which are the
        # values that were spelled out before.
        self.encoder = nn.Sequential(nn.Conv2d(in_channels=channels, out_channels=256, kernel_size=(3,3), stride=1, padding=0),
                                     nn.BatchNorm2d(num_features=256),
                                     nn.ReLU(),
                                     nn.Conv2d(in_channels=256, out_channels=128, kernel_size=(3,3), stride=1, padding=0),
                                     nn.BatchNorm2d(num_features=128),
                                     nn.ReLU(),
                                     # Linear needs (batch, features) input
                                     nn.Flatten(),
                                     nn.Linear(in_features=128*out_height*out_width, out_features=embedding_dim))

    def forward(self, features):
        """Encode ``features`` (batch, channels, height, width) to (batch, embedding_dim)."""
        return self.encoder(features)

In [None]:
class DecoderLSTM(nn.Module):
    """LSTM decoder producing one score per vocabulary entry at every step.

    Args:
        embedding_dim: size of each input step and of each output score
            vector.
        hidden_dim: hidden size of the LSTM.
    """

    def __init__(self, embedding_dim, hidden_dim):
        # The class name passed to super() previously was `RNNTagger`, which
        # does not exist in this notebook.
        super().__init__()
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.hidden2tag = nn.Linear(hidden_dim, embedding_dim)
        # Kept for callers that want probabilities: decoder.softmax(scores).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, in_sequence):
        """Return unnormalised scores of shape (batch, seq_len, embedding_dim).

        The scores are raw logits. nn.CrossEntropyLoss, which this notebook
        trains with, applies log-softmax itself; feeding it already
        soft-maxed values flattens the gradients. The arg-max of the logits
        equals the arg-max of the probabilities.
        """
        lstm_out, _ = self.lstm(in_sequence)
        tag_scores = self.hidden2tag(lstm_out)
        return tag_scores

In [None]:
def teacher_forcer(encoded_features, Y_train_batch_unflattened, max_len_batch):
    """Build the teacher-forced decoder input for a batch.

    Step 0 of every sequence is the encoded image vector; step ``t > 0`` is
    the one-hot encoding of the ground-truth word at position ``t - 1``.

    The result is a torch tensor built with differentiable ops. The earlier
    NumPy buffer could not hold a gradient-tracking tensor and would have cut
    the gradient to the encoder. The batch and vocabulary sizes come from
    ``encoded_features`` instead of module-level globals.

    Args:
        encoded_features: tensor ``(batch, vocab_size)`` from the encoder.
        Y_train_batch_unflattened: word indices ``(batch, >= max_len_batch - 1)``;
            any numeric dtype, cast to long.
        max_len_batch: number of time steps to produce.

    Returns:
        Tensor ``(batch, max_len_batch, vocab_size)`` with the dtype and
        device of ``encoded_features``.
    """
    batch_size, vocab_size = encoded_features.shape[0], encoded_features.shape[-1]
    if max_len_batch <= 0:
        return encoded_features.new_zeros((batch_size, 0, vocab_size))
    # Words 0 .. max_len-2 are the inputs for steps 1 .. max_len-1.
    previous_words = torch.as_tensor(Y_train_batch_unflattened)[:batch_size, :max_len_batch - 1]
    previous_words = previous_words.to(device=encoded_features.device, dtype=torch.long)
    one_hot_words = F.one_hot(previous_words, num_classes=vocab_size).to(encoded_features.dtype)
    return torch.cat([encoded_features.unsqueeze(1), one_hot_words], dim=1)

In [None]:
def experiment():
    """Train the CNN encoder / LSTM decoder and evaluate after every epoch.

    NOTE(review): `experiment` is defined several times in this notebook;
    a later definition replaces this one when its cell is run.
    NOTE(review): `Y_test_batch_unflattened`, `max_len_test` and `evaluate`
    are not defined in the visible part of the notebook. Define them before
    calling this function.
    """
    encoder = EncoderCNN(feature_dim, embedding_dim)
    # DecoderLSTM takes (embedding_dim, hidden_dim); passing feature_dim as an
    # extra first argument raised a TypeError.
    decoder = DecoderLSTM(embedding_dim, HIDDEN_DIM)
    loss_func = nn.CrossEntropyLoss()
    # torch.optim.Adam has no `momentum` argument (its `betas` play that
    # role, with a default first beta of 0.9), so passing it raised a TypeError.
    optimizer = torch.optim.Adam([{'params':encoder.parameters()},
                                  {'params':decoder.parameters()}], lr=LEARN_RATE)
    # a final partial batch is dropped
    num_batches = n_train//BATCH_SIZE
    for epoch in range(NUM_EPOCHS):
        encoder.train()
        decoder.train()
        epoch_loss = 0.0
        for batch_number in range(num_batches):
            optimizer.zero_grad()
            X_train_batch, Y_train_batch, Y_train_batch_unflattened, max_len_batch = make_batch(batch_number)
            encoded_features = encoder(X_train_batch)
            in_sequence = teacher_forcer(encoded_features, Y_train_batch_unflattened, max_len_batch)
            scores = decoder(in_sequence)
            # class-index targets must be long for CrossEntropyLoss
            loss = loss_func(scores.reshape((BATCH_SIZE*max_len_batch, embedding_dim)), Y_train_batch.long())
            loss.backward()
            optimizer.step()
            # .item() keeps a plain float instead of the autograd graph
            epoch_loss += loss.item()
        print('epoch %d/%d loss %.4f'%(epoch+1, NUM_EPOCHS, epoch_loss))
        # evaluate encoder decoder on test data (no gradients needed)
        encoder.eval()
        decoder.eval()
        with torch.no_grad():
            encoded_features_test = encoder(X_test)
            in_sequence_test = teacher_forcer(encoded_features_test, Y_test_batch_unflattened, max_len_test)
            scores_test = decoder(in_sequence_test)
        # `pred` was used without ever being assigned; the test scores are
        # what gets converted to captions.
        pred = scores_to_caption(scores_test)
        bleu_score = evaluate(pred, image_captions_test)
        print('epoch %d/%d bleu'%(epoch+1, NUM_EPOCHS), bleu_score)

In [None]:
def experiment():
    """Train the CNN encoder / LSTM decoder (second draft of `experiment`).

    NOTE(review): `experiment` is defined several times in this notebook;
    running this cell replaces the earlier definition.
    NOTE(review): `pred`, `Y_test_tensor` and `evaluate` are not defined in
    the visible part of the notebook, so the evaluation line fails until
    they exist.
    """
    encoder = EncoderCNN(feature_dim, embedding_dim)
    # DecoderLSTM takes (embedding_dim, hidden_dim); passing feature_dim as an
    # extra first argument raised a TypeError.
    decoder = DecoderLSTM(embedding_dim, HIDDEN_DIM)
    loss_func = nn.CrossEntropyLoss()
    # torch.optim.Adam has no `momentum` argument (its `betas` play that
    # role, with a default first beta of 0.9), so passing it raised a TypeError.
    optimizer = torch.optim.Adam([{'params':encoder.parameters()},
                                  {'params':decoder.parameters()}], lr=LEARN_RATE)
    # a final partial batch is dropped
    num_batches = n_train//BATCH_SIZE
    for epoch in range(NUM_EPOCHS):
        epoch_loss = 0.0
        for batch_number in range(num_batches):
            optimizer.zero_grad()
            X_train_batch, Y_train_batch, Y_train_batch_unflattened, max_len_batch = make_batch(batch_number)
            encoded_features = encoder(X_train_batch)
            in_sequence = teacher_forcer(encoded_features, Y_train_batch_unflattened, max_len_batch)
            scores = decoder(in_sequence)
            # class-index targets must be long for CrossEntropyLoss
            loss = loss_func(scores.reshape((BATCH_SIZE*max_len_batch, embedding_dim)), Y_train_batch.long())
            loss.backward()
            optimizer.step()
            # .item() keeps a plain float instead of the autograd graph
            epoch_loss += loss.item()
        print('epoch %d/%d loss %.4f'%(epoch+1, NUM_EPOCHS, epoch_loss))

        bleu_score = evaluate(pred, Y_test_tensor)

In [None]:
def experiment():
    """Unfinished draft of `experiment`: builds the models and decoder inputs only.

    NOTE(review): the batch loop stops after building `in_sequence`; no loss
    is computed and no optimizer step is taken, so nothing is trained.
    NOTE(review): `experiment` is defined several times in this notebook;
    running this cell replaces the earlier definitions.
    """
    encoder = EncoderCNN(feature_dim, embedding_dim)
    # DecoderLSTM takes (embedding_dim, hidden_dim); passing feature_dim as an
    # extra first argument raised a TypeError.
    decoder = DecoderLSTM(embedding_dim, HIDDEN_DIM)
    loss_func = nn.CrossEntropyLoss()
    # torch.optim.Adam has no `momentum` argument (its `betas` play that
    # role, with a default first beta of 0.9), so passing it raised a TypeError.
    optimizer = torch.optim.Adam([{'params':encoder.parameters()},
                                  {'params':decoder.parameters()}], lr=LEARN_RATE)
    num_batches = n_train//BATCH_SIZE
    for epoch in range(NUM_EPOCHS):
        epoch_loss = 0
        for batch_number in range(num_batches):
            optimizer.zero_grad()
            X_train_batch, Y_train_batch, Y_train_batch_unflattened, max_len_batch = make_batch(batch_number)
            encoded_features = encoder(X_train_batch)
            in_sequence = teacher_forcer(encoded_features, Y_train_batch_unflattened, max_len_batch)




In [None]:
def experiment():
    """Train an `RNNTagger` and plot its test accuracy per epoch.

    NOTE(review): this definition replaces the captioning `experiment`
    functions defined above once its cell is run. `RNNTagger`, `EMBED_DIM`,
    `tags`, `model_accuracy`, `X_test_tensor` and `Y_test_tensor` are not
    defined in the visible part of the notebook; it looks like a leftover
    from a tagging model. Confirm whether it should be kept.
    """
    # initialise model
    model = RNNTagger(EMBED_DIM, HIDDEN_DIM, len(tags))
    loss_func = nn.CrossEntropyLoss()
    optimiser = torch.optim.Adam(params=model.parameters(),
                                 lr=LEARN_RATE)
    # pre train stuff
    epochs, accuracies = [], []
    # evaluation needs no gradients
    with torch.no_grad():
        pre_train_accuracy = model_accuracy(model(X_test_tensor), Y_test_tensor)
    epochs.append(0)
    accuracies.append(pre_train_accuracy)
    print('%s 0/%d %.4f%%'%('-'*NUM_EPOCHS,
                            NUM_EPOCHS,
                            pre_train_accuracy*100))
    # train
    num_batch = int(n_train/BATCH_SIZE)
    for epoch in range(0, NUM_EPOCHS):
        epoch_loss = 0.0
        for batch in range(0, num_batch):
            optimiser.zero_grad()
            # make_batch in this notebook returns four values (features,
            # flattened targets, unflattened targets, max length); unpacking
            # three raised a ValueError.
            X_train_batch, Y_train_batch, _unflattened, max_len_batch = make_batch(batch)
            scores = model(X_train_batch)
            loss = loss_func(scores.reshape((BATCH_SIZE*max_len_batch, len(tags))), Y_train_batch)
            loss.backward()
            # .item() adds a plain float; adding the tensor itself keeps every
            # batch's autograd graph alive for the whole epoch.
            epoch_loss += loss.item()
            optimiser.step()
        epochs.append(epoch+1)
        with torch.no_grad():
            accuracy = model_accuracy(model(X_test_tensor), Y_test_tensor)
        accuracies.append(accuracy)
        print('%s%s %d/%d %.4f%%'%('#'*(epoch+1),
                                   '-'*(NUM_EPOCHS-epoch-1),
                                   epoch+1,
                                   NUM_EPOCHS,
                                   accuracy*100))
    # plot
    plt.plot(epochs, [100*i for i in accuracies], "-.", color='lightskyblue')
    plt.show()
    return