# Deep Learning Mini-Challenge 2: Image Captioning

**Task description:** 

**Description of the dataset:** 

In [None]:
import torch
import torch.nn.functional as F
from torch import nn
import os

from torch.utils.data import Dataset
import numpy as np
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.model_selection import train_test_split
from torchtext.data.utils import get_tokenizer
from torch.utils.data import DataLoader
import torchtext

import wandb

# print(torch.__version__)
# print(torchtext.__version__)

In [None]:
# run on the GPU if CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# relative paths to the image directory and to the directory that holds captions.txt
image_path = "../../data/Flickr8k/Images/"
label_path = "../../data/Flickr8k/"

In [None]:
def read_labels(label_path, skip_header=True):
    '''
    Reads the image filenames and caption texts from the captions.txt file
    in the specified directory.

    Every usable line has the form "<name>.jpg,<caption>". Blank lines and
    lines without a ".jpg," separator are skipped.

    Args:
        label_path (str): directory that contains captions.txt
        skip_header (bool): if True, the first line of the file is ignored

    Returns:
        pandas DataFrame with the columns "filename" and "text",
        one row per caption line
    '''
    filenames, texts = [], []
    with open(os.path.join(label_path, "captions.txt"), encoding="utf-8") as f:
        if skip_header:
            next(f, None)  # the default avoids StopIteration on an empty file
        for line in f:
            # split at the first ".jpg," only, so captions that contain the
            # same pattern are kept complete
            stem, separator, caption = line.rstrip("\n").partition(".jpg,")
            if not separator:
                continue  # blank or malformed line
            filenames.append(stem + ".jpg")
            texts.append(caption)
    return pd.DataFrame({"filename": filenames, "text": texts})

# load all captions into one DataFrame and preview the first rows
df_caption = read_labels(label_path)
df_caption.head()

## Exploratory data analysis

In [None]:
def show_sample_imeages(df, n=3, m=2):
    '''
    Visualises a grid of n x m images with the corresponding captions.

    The first n*m distinct filenames of df are shown; if df contains fewer
    images, the remaining axes stay empty.

    Args:
        df (pandas DataFrame): needs the columns "filename" and "text"
        n (int): number of rows of the grid
        m (int): number of columns of the grid
    '''
    # squeeze=False keeps axes two-dimensional even if n == 1 or m == 1
    fig, axes = plt.subplots(n, m, figsize=(22,14), squeeze=False)
    # use the DataFrame that was passed in, not the global df_caption
    unique_files = df["filename"].unique()

    # axes.flat walks the grid row by row; zip stops at the shorter sequence
    for ax, filename in zip(axes.flat, unique_files):
        caption = "\n".join(df.loc[df["filename"] == filename, "text"])
        img = mpimg.imread(image_path + filename)
        ax.imshow(img)
        ax.set_title(caption)
    plt.subplots_adjust(hspace = 0.8)
    plt.show()


show_sample_imeages(df_caption)

In [None]:
# empirical distribution of the number of whitespace-separated words per caption
plt.figure(figsize=(10, 5))
sns.ecdfplot(df_caption["text"].map(lambda caption: len(caption.split())))
plt.title("ecdf of nr of words per caption")
plt.xlabel("nr of words")
plt.ylabel("proportion")
plt.show()

# selected quantiles of the same word counts
df_caption["text"].map(lambda caption: len(caption.split())).quantile(
    [0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 1]
)

In [None]:
def plot_image_sizes(df):
    '''
    Visualizes the height and width of the images in nr of pixels.

    Args:
        df (iterable of str): filenames of the images, relative to image_path
    '''
    # read image sizes
    widths, heights = [], []
    for filename in df:
        img = mpimg.imread(image_path + filename)
        # image arrays are indexed (row, column[, channel]), so the height
        # comes first; slicing also copes with grayscale images
        height, width = np.shape(img)[:2]
        widths.append(width)
        heights.append(height)

    #create plot
    plt.figure(figsize=(8,8))
    plt.scatter(widths, heights)
    plt.title("Image sizes")
    plt.xlabel("width of images in pixels")
    plt.ylabel("heights of images in pixels")
    plt.show()

unique_filenames = df_caption["filename"].drop_duplicates()
# plot_image_sizes(unique_filenames)

## Preprocessing 

### Preprocessing Images

Preprocessing the images includes the following transformations: 
- `ToPILImage` Transforms the input image into a PIL image, which makes the editing capabilities of the **P**ython **I**maging **L**ibrary available.
- `CenterCrop` Crops the images from the center, resulting in a fixed image resolution. Images with fewer pixels than specified receive a padding of zeros to fill the gap.
- `ToTensor` Transforms the image into a tensor.
- `Normalize` Normalizes each color channel with a fixed mean and standard deviation.

In [None]:
from torchvision.transforms import Compose, CenterCrop, ToTensor, ToPILImage, Normalize

# Preprocessing pipeline that is applied to every image before it is passed to the encoder.
image_transform = Compose([
    ToPILImage(),
    CenterCrop((224, 224)), # resnet18 input shape
    ToTensor(),
    # the pretrained ImageNet weights expect inputs that are normalized with
    # the ImageNet channel means and standard deviations
    Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ])

### Preprocessing Captions

In this section, the captions for the images are preprocessed. The captions are originally provided as strings. In a first step, they are tokenized with the `basic_english` tokenizer included in the torchtext library, which performs several operations such as lowercasing and replacing certain symbols using a pattern dict. We also limit the maximum number of words per caption to 20, since over 95 percent of all captions are within this range. Each caption is delimited by the `<bos>` and `<eos>` tokens, which mark its beginning and end, and captions with fewer than 20 words are filled up with the `<pad>` token, so that all captions have a fixed length of 22 tokens.

In [None]:
# define special tokens
start_token = "<bos>"
stop_token = "<eos>"
unknown_token = "<unk>"
padding_token = "<pad>"

# define caption boundaries (maximum number of words, excluding <bos>/<eos>)
max_length  = 20

# specify tokenizer
tokenizer = get_tokenizer('basic_english')

def preprocess_caption(text):
    '''
    Tokenizes a caption and brings it to a fixed length of max_length + 2 tokens.

    The result has the layout <bos> w_1 ... w_k <eos> <pad> ... <pad>,
    with at most max_length words.

    Args:
        text (str): raw caption

    Returns:
        list of str with exactly max_length + 2 tokens
    '''
    # tokenize words with torchtext and cut the list to max_length
    tokens = tokenizer(text)[:max_length]
    # add start and end token
    tokens = [start_token] + tokens + [stop_token]
    # pad AFTER the end token, so that <eos> directly follows the last word
    # and the model learns where a sentence ends
    tokens = tokens + [padding_token] * (max_length + 2 - len(tokens))
    return tokens

df_caption["text_tokens"] = df_caption["text"].apply(preprocess_caption)

### Define Embedding 

https://nlp.stanford.edu/projects/glove/

In [None]:
from collections import Counter, OrderedDict
from torchtext.vocab import Vocab, GloVe

# define embedding method (name of the pretrained vectors handed to Vocab)
vectors = "glove.6B.100d"

# define minimal required occurrence of words (passed to Vocab as min_freq)
min_word_count = 3

# count how often every token occurs over all preprocessed captions
vocab_count = Counter()
for capiton in df_caption["text_tokens"]:
    vocab_count.update(capiton)
# NOTE(review): the frequency-sorted tuples and the OrderedDict are built here
# but not used below; Vocab receives the Counter directly
sorted_by_freq_tuples = sorted(vocab_count.items(), key=lambda x: x[1], reverse=True)
ordered_dict = OrderedDict(sorted_by_freq_tuples)

# define vocabulary from the token counts, together with the special tokens
vocab = Vocab(
    vocab_count,
    vectors=vectors,  
    min_freq=min_word_count, 
    specials=((start_token, stop_token, unknown_token, padding_token)))

# comparison between vocabs: size of the GloVe vectors vs. our reduced vocabulary
glove = GloVe(name='6B', dim=100)
print("GloVe vocab:", glove.vectors.size())

print("Reduced vocab:", vocab.vectors.size())

**Description:**

In [None]:
def embed_tokens(text):
    '''
    Encodes a list of tokens from string to integer using our vocabulary
    '''
    string_to_index = vocab.stoi
    return [string_to_index[token] for token in text]

def inverse_embed_tokens(text):
    '''
    Decodes a list of tokens from integer back to string using our vocabulary
    '''
    index_to_string = vocab.itos
    return [index_to_string[index] for index in text]

# round trip on a small example
text_list = ["<bos>", "Simon", "is", "in", "this", "picture", ":)", "<eos>"]

embedded_text = embed_tokens(text_list)
print("Encoding:", embedded_text)
reconstructed_text = inverse_embed_tokens(embedded_text)
print("Inverse Encoding:", reconstructed_text)

In [None]:
# encode every tokenized caption as a list of vocabulary indices and preview the result
df_caption["text_encoded"] = df_caption["text_tokens"].apply(embed_tokens)
df_caption[["text", "text_tokens", "text_encoded"]].head()

**Description:**

### Train-test split

In [None]:
# Split on the level of image files, so that all captions of one image end up
# in the same set. A fixed random_state makes the split reproducible between runs.
train_files, test_files = train_test_split(unique_filenames, test_size=0.2, random_state=42)
df_train = df_caption.loc[df_caption["filename"].isin(list(train_files))]
df_test = df_caption.loc[df_caption["filename"].isin(list(test_files))]

# sanity checks: proportions of both sets and overlap between them (should be 0)
train_img_labels = set(df_train["filename"])
test_img_labels = set(df_test["filename"])
print("Proportion of train set:", len(train_img_labels) / (len(train_img_labels) + len(test_img_labels)))
print("Proportion of test set:", len(test_img_labels) / (len(train_img_labels) + len(test_img_labels)))
print("Overlapping labels of train and test set:", len(train_img_labels & test_img_labels))

### Create train and test set

In [None]:
class Flickr8kDataset(Dataset):
    """Image-caption pairs; one item per row (i.e. per caption) of the DataFrame."""

    def __init__(self, df, image_path, transform=None):
        """
        Args:
            df (pandas DataFrame): contains the filenames ("filename") and the
                encoded captions ("text_encoded") of the pictures
            image_path (string): directory that contains the image files
            transform (callable, optional): Optional transform to apply on the images
        """
        self.df = df
        # keep the directory on the instance; reading the global variable
        # would silently ignore the argument
        self.image_path = image_path
        self.transform = transform

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        """
        Returns:
            tuple (image, caption, length): the (optionally transformed) image,
            the encoded caption as a long tensor and the number of caption tokens
        """
        df_row = self.df.iloc[idx, :]
        image = mpimg.imread(os.path.join(self.image_path, df_row['filename']))
        if self.transform:
            image = self.transform(image)

        encoded = df_row['text_encoded']
        # explicit long dtype: embedding lookups and the loss expect int64
        # indices, independent of the platform's default integer size
        caption = torch.tensor(encoded, dtype=torch.long)
        length = torch.tensor(len(encoded))
        return image, caption, length
        


In [None]:
# wrap both splits in datasets that use the same image preprocessing
train_set = Flickr8kDataset(df_train, image_path, transform=image_transform)
test_set = Flickr8kDataset(df_test, image_path, transform=image_transform)

### Define the dataloader

In [None]:
# Set seed 
torch.manual_seed(42)
batch_size = 16

train_dataloader = DataLoader(
    dataset=train_set, 
    batch_size=batch_size, 
    shuffle=True)

test_dataloader = DataLoader(
    dataset=test_set, 
    batch_size=batch_size, 
    shuffle=False)

In [None]:
# fetch a single batch to inspect what the dataloader returns
example_batch = iter(train_dataloader)
# use the built-in next(): the iterator's .next() method is not part of the
# iterator protocol and does not exist in current PyTorch releases
samples, labels, length = next(example_batch)
np.shape(length)

### Define models

In [None]:
import torchvision.models as models

class EncoderCNN(nn.Module):
    '''
    Image encoder: a pretrained ResNet-18 whose final fully connected layer is
    replaced by a linear layer with embed_size outputs.
    '''

    def __init__(self, embed_size, train_cnn=False, dropout=0):
        '''
        Args:
            embed_size (int): size of the feature vector returned per image
            train_cnn (bool) if true trains the complete network, otherwise
                only the new final layer
            dropout (float): dropout ratio after the last layer
        '''
        super(EncoderCNN, self).__init__()
        self.train_cnn = train_cnn
        self.cnn_model = models.resnet18(pretrained=True)
        self.cnn_model.fc = nn.Linear(self.cnn_model.fc.in_features, embed_size) # resize output shape
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # freeze/unfreeze once, BEFORE the first forward pass; doing it inside
        # forward() after the network was evaluated lets the first batch build
        # a graph through the whole CNN
        self._set_trainable_layers()

    def _set_trainable_layers(self):
        '''Marks the final layer as trainable and all other layers according to train_cnn.'''
        for name, param in self.cnn_model.named_parameters():
            is_final_layer = name.startswith("fc.")
            param.requires_grad = is_final_layer or self.train_cnn

    def forward(self, images):
        '''
        Args:
            images (tensor): batch of images as expected by resnet18

        Returns:
            tensor with embed_size features per image (after ReLU and dropout)
        '''
        features = self.cnn_model(images)
        return self.dropout(self.relu(features))



class DecoderRNN(nn.Module):
    '''
    Caption decoder: embeds the caption tokens, prepends the image features as
    the first sequence element and predicts a vocabulary distribution per position.
    '''

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, dropout=0):
        '''
        Args:
            embed_size (int): size of the token embeddings; has to equal the
                size of the image features
            hidden_size (int): hidden size of the LSTM
            vocab_size (int): number of tokens in the vocabulary
            num_layers (int): number of stacked LSTM layers
            dropout (float): dropout ratio applied to the token embeddings
        '''
        super(DecoderRNN, self).__init__()
        # lookup table with one trainable embed_size vector per vocabulary index
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # batch_first=True: forward() builds tensors of shape
        # (batch, sequence, embedding); without it the LSTM would treat the
        # batch dimension as time and mix the samples of a batch
        self.lstm = nn.LSTM(input_size=embed_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True
                            )
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, features, captions):
        '''
        Args:
            features (tensor): image features of shape (batch, embed_size)
            captions (tensor): token indices of shape (batch, seq_len)

        Returns:
            tensor of shape (batch, seq_len + 1, vocab_size) with the
            unnormalized scores for every position
        '''
        # embedding of captions: (batch, seq_len, embed_size)
        embeddings = self.dropout(self.embedding(captions))
        # the image features act as the first element of every sequence
        embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)

        hiddens, _ = self.lstm(embeddings) # (batch, seq_len + 1, hidden_size)
        outputs = self.linear(hiddens)
        return outputs



class CNNtoRNN(nn.Module):
    '''
    Complete captioning model: EncoderCNN for the image followed by DecoderRNN
    for the caption.
    '''

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, dropout=0):
        '''
        Args:
            embed_size (int): size of the image features and token embeddings
            hidden_size (int): hidden size of the decoder LSTM
            vocab_size (int): number of tokens in the vocabulary
            num_layers (int): number of LSTM layers
            dropout (float): dropout ratio used in encoder and decoder
        '''
        super(CNNtoRNN, self).__init__()
        # forward the dropout ratio; otherwise the argument has no effect
        self.encoderCNN = EncoderCNN(embed_size=embed_size, dropout=dropout)
        self.decoderRNN = DecoderRNN(embed_size=embed_size, hidden_size=hidden_size, vocab_size=vocab_size, num_layers=num_layers, dropout=dropout)

    def forward(self, images, capitons):
        '''Returns the decoder scores for a batch of images and their captions.'''
        features = self.encoderCNN(images)
        outputs = self.decoderRNN(features, capitons)
        return outputs

    def caption_images(self, image, vocabulary, max_length = 30):
        '''
        Creates a caption for a single image with greedy decoding.

        Args:
            image (tensor): one image, either (channels, height, width) or
                with a leading batch dimension of size 1
            vocabulary: object with an itos sequence that maps index to token
            max_length (int): maximum number of generated tokens

        Returns:
            list of str: the generated tokens, ending with "<eos>" if the model
            produced it within max_length steps
        '''
        caption_result = []
        # inference has to run in eval mode (dropout off, batch norm uses its
        # running statistics); the previous mode is restored afterwards
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                if image.dim() == 3:
                    image = image.unsqueeze(0)
                # shape (1, 1, embed_size): one sequence element for one image
                x = self.encoderCNN(image).unsqueeze(0)
                states = None

                for _ in range(max_length):
                    hidden, states = self.decoderRNN.lstm(x, states)
                    output = self.decoderRNN.linear(hidden.squeeze(0))
                    predicted = output.argmax(1)

                    caption_result.append(predicted.item())
                    if vocabulary.itos[predicted.item()] == "<eos>":
                        break
                    # feed the predicted token back in as the next input
                    x = self.decoderRNN.embedding(predicted).unsqueeze(0)
        finally:
            self.train(was_training)
        return [vocabulary.itos[idx] for idx in caption_result]


In [None]:
# class EncoderCNN(nn.Module):
#     def __init__(self, embed_size):
#         """Load the pretrained ResNet-152 and replace top fc layer."""
#         super(EncoderCNN, self).__init__()
#         resnet = models.resnet152(pretrained=True)
#         modules = list(resnet.children())[:-1]      # delete the last fc layer.
#         self.resnet = nn.Sequential(*modules)
#         self.linear = nn.Linear(resnet.fc.in_features, embed_size)
#         self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)
        
#     def forward(self, images):
#         """Extract feature vectors from input images."""
#         with torch.no_grad():
#             features = self.resnet(images)
#         features = features.reshape(features.size(0), -1)
#         features = self.bn(self.linear(features))
#         return features


# class DecoderRNN(nn.Module):
#     def __init__(self, embed_size, hidden_size, vocab_size, num_layers, max_seq_length=20):
#         """Set the hyper-parameters and build the layers."""
#         super(DecoderRNN, self).__init__()
#         self.embed = nn.Embedding(vocab_size, embed_size)
#         self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
#         self.linear = nn.Linear(hidden_size, vocab_size)
#         self.max_seg_length = max_seq_length
        
#     def forward(self, features, captions, lengths):
#         """Decode image feature vectors and generates captions."""
#         embeddings = self.embed(captions)
#         print("captions shape", np.shape(captions))
#         embeddings = torch.cat((features.unsqueeze(1), embeddings), 1)
#         packed = pack_padded_sequence(embeddings, lengths, batch_first=True) 
#         hiddens, _ = self.lstm(packed)
#         outputs = self.linear(hiddens[0])
#         return outputs
    
#     def sample(self, features, states=None):
#         """Generate captions for given image features using greedy search."""
#         sampled_ids = []
#         inputs = features.unsqueeze(1)
#         for i in range(self.max_seg_length):
#             hiddens, states = self.lstm(inputs, states)          # hiddens: (batch_size, 1, hidden_size)
#             outputs = self.linear(hiddens.squeeze(1))            # outputs:  (batch_size, vocab_size)
#             _, predicted = outputs.max(1)                        # predicted: (batch_size)
#             sampled_ids.append(predicted)
#             inputs = self.embed(predicted)                       # inputs: (batch_size, embed_size)
#             inputs = inputs.unsqueeze(1)                         # inputs: (batch_size, 1, embed_size)
#         sampled_ids = torch.stack(sampled_ids, 1)                # sampled_ids: (batch_size, max_seq_length)
#         return sampled_ids


def wandb_log(log_dict, e):
    '''
    Logs the given values to Weights & Biases.

    Args:
        log_dict (dict): passed to wandb.log as the data to log
        e (int): passed to wandb.log as the step
    '''
    wandb.log(log_dict, step=e)

In [None]:
import torch.optim as optim
from tqdm import tqdm
from torch.nn.utils.rnn import pack_padded_sequence

wandb.init(project="del_mc2_image_cap", entity="Simon")

# hyperparameters
embedding_size=256
hidden_size = 256
vocab_size = vocab.vectors.size()[0]
num_layers = 1
learning_rate = 0.0003
num_epochs=3

model = CNNtoRNN(embedding_size, hidden_size, vocab_size, num_layers).to(device)
# padding positions carry no information, so they are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi[padding_token])
optimizer = optim.Adam(model.parameters(), lr = learning_rate)


model.train()


# step counts the number of images seen so far and is used as the x-axis in wandb
step = 0
for epoch in range(num_epochs):
    print("epoch:", epoch)
    # wrapping the dataloader itself lets tqdm show the total number of batches
    for imgs, captions, lengths in tqdm(train_dataloader):
        step += len(imgs)
        imgs = imgs.to(device)
        captions = captions.to(device).long()

        # The decoder prepends the image features to the input sequence, so
        # dropping the last caption token yields exactly one output per
        # caption position.
        outputs = model(imgs, captions[:,:-1])

        # flatten batch and sequence dimension for the cross entropy loss
        loss = criterion(outputs.reshape(-1, outputs.shape[2]), captions.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # log the Python float, not the tensor that is attached to the graph
        wandb.log({"train_loss": loss.item()}, step=step)



In [None]:
# print the most likely token at every position for the first sample of the last batch
for position_scores in outputs[0]:
    print(vocab.itos[position_scores.argmax().item()])

In [None]:
# shape of the scores at the first position of the first sample
np.shape(outputs[0][0])

In [None]:
# number of positions in the output of the first sample
len(outputs[0])

In [None]:
# caption_images needs the image as its first argument: take the first test
# image, add a batch dimension and move it to the device of the model
model.eval()
model.caption_images(test_set[0][0].unsqueeze(0).to(device), vocab)