In [25]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import os  # when loading file paths
import pandas as pd  # for lookup in annotation file
import spacy  # for tokenizer
import torch
from torch.nn.utils.rnn import pad_sequence  # pad batch
from torch.utils.data import DataLoader, Dataset
from PIL import Image  # Load img
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from torchtext.data.metrics import bleu_score
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score

import torch
import torch.nn as nn
import statistics
import torchvision.models as models
from torchvision.models import inception_v3, Inception_V3_Weights
import random


spacy_eng = spacy.load("en_core_web_sm")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [9]:
textDirPath = "Flickr8K/Flickr8k_text/" 
pathDirImage = "Flickr8K/Flicker8k_Images/"

tokenLemmaFile = pd.read_csv("Flickr8K/Flickr8k_text/Flickr8k.lemma.token.txt",sep = "\t", on_bad_lines='skip' , header = None )
token = pd.read_csv("Flickr8K/Flickr8k_text/Flickr8k.token.txt",sep = "\t",on_bad_lines='skip', header = None)
validationData = pd.read_csv("Flickr8K/Flickr8k_text/Flickr_8k.valImages.txt", on_bad_lines = 'skip', header = None)
trainingData = pd.read_csv("Flickr8K/Flickr8k_text/Flickr_8k.trainImages.txt",on_bad_lines='skip', header = None)
testingData = pd.read_csv("Flickr8K/Flickr8k_text/Flickr_8k.testImages.txt",on_bad_lines='skip', header = None)

# renaming columns with Image and caption
tokenLemmaFile = tokenLemmaFile.rename(columns = {0 : "Image" , 1 : "caption"})

# splitting using '#' token 
tokenLemmaFile["Image"] =[i.split('#')[0] for i in tokenLemmaFile["Image"].values ]


# drop the index column 

# validation file 
captionVal = tokenLemmaFile.where(tokenLemmaFile["Image"].isin(validationData[0].values) == True).dropna().reset_index().drop('index' , axis = 1)

# training file 
captionTrain = tokenLemmaFile.where(tokenLemmaFile["Image"].isin(trainingData[0].values) == True).dropna().reset_index().drop('index' , axis = 1)

# testing file
captionTest = tokenLemmaFile.where(tokenLemmaFile["Image"].isin(testingData[0].values) == True).dropna().reset_index().drop('index' , axis = 1)

In [10]:
print(captionTrain)

                           Image  \
0      1305564994_00513f9a5b.jpg   
1      1305564994_00513f9a5b.jpg   
2      1305564994_00513f9a5b.jpg   
3      1305564994_00513f9a5b.jpg   
4      1305564994_00513f9a5b.jpg   
...                          ...   
29995   985067019_705fe4a4cc.jpg   
29996   985067019_705fe4a4cc.jpg   
29997   985067019_705fe4a4cc.jpg   
29998   985067019_705fe4a4cc.jpg   
29999   985067019_705fe4a4cc.jpg   

                                                 caption  
0      A man in street racer armor be examine the tir...  
1             Two racer drive a white bike down a road .  
2      Two motorist be ride along on their vehicle th...  
3      Two person be in a small race car drive by a g...  
4           Two person in race uniform in a street car .  
...                                                  ...  
29995                A boy go down an inflatable slide .  
29996       A boy in red slide down an inflatable ride .  
29997               A boy be slide d

In [11]:
class Vocabulary:
    def __init__(self, freq_threshold):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.freq_threshold = freq_threshold

    def __len__(self):
        return len(self.itos)

    @staticmethod
    def tokenizer_eng(text):
        return [tok.text.lower() for tok in spacy_eng.tokenizer(text)]

    def build_vocabulary(self, sentence_list):
        frequencies = {}
        idx = 4

        for sentence in sentence_list:
            for word in self.tokenizer_eng(sentence):
                if word not in frequencies:
                    frequencies[word] = 1

                else:
                    frequencies[word] += 1

                if frequencies[word] == self.freq_threshold:
                    self.stoi[word] = idx
                    self.itos[idx] = word
                    idx += 1

    def numericalize(self, text):
        tokenized_text = self.tokenizer_eng(text)

        return [
            self.stoi[token] if token in self.stoi else self.stoi["<UNK>"]
            for token in tokenized_text
        ]


class FlickrDataset(Dataset):
    def __init__(self, root_dir, captions_file, transform=None, freq_threshold=5):
        self.root_dir = root_dir
        #self.df = pd.read_csv(captions_file)
        self.df = captions_file
        self.transform = transform

        # Get img, caption columns
        self.imgs = self.df["Image"]
        self.captions = self.df["caption"]

        # Initialize vocabulary and build vocab
        self.vocab = Vocabulary(freq_threshold)
        self.vocab.build_vocabulary(self.captions.tolist())

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        caption = self.captions[index]
        img_id = self.imgs[index]
        filename = self.root_dir + "/" + img_id
        img = Image.open(filename)
        #img = Image.open(os.path.join(self.root_dir, img_id)).convert("RGB")

        if self.transform is not None:
            img = self.transform(img)

        numericalized_caption = [self.vocab.stoi["<SOS>"]]
        numericalized_caption += self.vocab.numericalize(caption)
        numericalized_caption.append(self.vocab.stoi["<EOS>"])

        return img, torch.tensor(numericalized_caption)


class MyCollate:
    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        imgs = [item[0].unsqueeze(0) for item in batch]
        imgs = torch.cat(imgs, dim=0)
        targets = [item[1] for item in batch]
        targets = pad_sequence(targets, batch_first=False, padding_value=self.pad_idx)

        return imgs, targets


def get_loader(
    root_folder,
    annotation_file,
    transform,
    batch_size=32,
    num_workers=2,
    shuffle=True,
    pin_memory=True,
):
    dataset = FlickrDataset(root_folder, annotation_file, transform=transform)

    pad_idx = dataset.vocab.stoi["<PAD>"]
    #pad_idx = 0

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=shuffle,
        pin_memory=pin_memory,
        collate_fn=MyCollate(pad_idx=pad_idx),
    )

    return loader, dataset



transform = transforms.Compose(
        [transforms.Resize((224, 224)), transforms.ToTensor(),]
)

#loader, dataset = get_loader(
#    "Flickr8k/Images", "flickr8k/captions.txt", transform=transform
#)

# for idx, (imgs, captions) in enumerate(loader):
#     print(imgs.shape)
#     print(captions.shape)

In [12]:
def print_examples(model, device, dataset):
    transform = transforms.Compose(
        [
            transforms.Resize((299, 299)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )

    model.eval()
    test_img1 = transform(Image.open("test_examples/dog.jpg").convert("RGB")).unsqueeze(
        0
    )
    print("Example 1 CORRECT: Dog on a beach by the ocean")
    print(
        "Example 1 OUTPUT: "
        + " ".join(model.caption_image(test_img1.to(device), dataset.vocab))
    )
    test_img2 = transform(
        Image.open("test_examples/child.jpg").convert("RGB")
    ).unsqueeze(0)
    print("Example 2 CORRECT: Child holding red frisbee outdoors")
    print(
        "Example 2 OUTPUT: "
        + " ".join(model.caption_image(test_img2.to(device), dataset.vocab))
    )
    test_img3 = transform(Image.open("test_examples/bus.png").convert("RGB")).unsqueeze(
        0
    )
    print("Example 3 CORRECT: Bus driving by parked cars")
    print(
        "Example 3 OUTPUT: "
        + " ".join(model.caption_image(test_img3.to(device), dataset.vocab))
    )
    test_img4 = transform(
        Image.open("test_examples/boat.png").convert("RGB")
    ).unsqueeze(0)
    print("Example 4 CORRECT: A small boat in the ocean")
    print(
        "Example 4 OUTPUT: "
        + " ".join(model.caption_image(test_img4.to(device), dataset.vocab))
    )
    test_img5 = transform(
        Image.open("test_examples/horse.png").convert("RGB")
    ).unsqueeze(0)
    print("Example 5 CORRECT: A cowboy riding a horse in the desert")
    print(
        "Example 5 OUTPUT: "
        + " ".join(model.caption_image(test_img5.to(device), dataset.vocab))
    )
    model.train()


def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model, optimizer):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])
    step = checkpoint["step"]
    return step

In [None]:
class Attention(nn.Module):

    def __init__(self, encoder_dim, decoder_dim, attention_dim):
        super(Attention, self).__init__()
        self.encoder_att = nn.Linear(encoder_dim, attention_dim)  # linear layer to transform encoded image
        self.decoder_att = nn.Linear(decoder_dim, attention_dim)  # linear layer to transform decoder's output
        self.full_att = nn.Linear(attention_dim, 1)  # linear layer to calculate values to be softmax-ed
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)  # softmax layer to calculate weights

    def forward(self, encoder_out, decoder_hidden):
        att1 = self.encoder_att(encoder_out)  # (batch_size, num_pixels, attention_dim)
        att2 = self.decoder_att(decoder_hidden)  # (batch_size, attention_dim)
        att = self.full_att(self.relu(att1 + att2.unsqueeze(1))).squeeze(2)  # (batch_size, num_pixels)
        alpha = self.softmax(att)  # (batch_size, num_pixels)
        attention_weighted_encoding = (encoder_out * alpha.unsqueeze(2)).sum(dim=1)  # (batch_size, encoder_dim)

        return attention_weighted_encoding, alpha


class DecoderWithAttention(nn.Module):

    def __init__(self, attention_dim, embed_dim, decoder_dim, vocab_size, encoder_dim=2048, dropout=0.5,teacher_forcing_ratio = 1):
        super(DecoderWithAttention, self).__init__()

        self.encoder_dim = encoder_dim
        self.attention_dim = attention_dim
        self.embed_dim = embed_dim
        self.decoder_dim = decoder_dim
        self.vocab_size = vocab_size
        self.dropout = dropout

        self.attention = Attention(encoder_dim, decoder_dim, attention_dim)  # attention network

        self.embedding = nn.Embedding(vocab_size, embed_dim)  # embedding layer
        for p in self.embedding.parameters():
            p.requires_grad = True
        
        self.dropout = nn.Dropout(p=self.dropout)
        self.decode_step = nn.LSTMCell(embed_dim + encoder_dim, decoder_dim, bias=True)  # decoding LSTMCell
        self.init_h = nn.Linear(encoder_dim, decoder_dim)  # linear layer to find initial hidden state of LSTMCell
        self.init_c = nn.Linear(encoder_dim, decoder_dim)  # linear layer to find initial cell state of LSTMCell
        self.f_beta = nn.Linear(decoder_dim, encoder_dim)  # linear layer to create a sigmoid-activated gate
        self.sigmoid = nn.Sigmoid()
        self.fc = nn.Linear(decoder_dim, vocab_size)  # linear layer to find scores over vocabulary
        self.init_weights()  # initialize some layers with the uniform distribution
        self.teacher_forcing_ratio = teacher_forcing_ratio

    def init_weights(self):
        self.embedding.weight.data.uniform_(-0.1, 0.1)
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-0.1, 0.1)
            

    def forward(self, encoder_out, encoded_captions, caption_lengths):
        batch_size = encoder_out.size(0)
        encoder_dim = encoder_out.size(-1)
        vocab_size = self.vocab_size

        # Flatten image
        encoder_out = encoder_out.view(batch_size, -1, encoder_dim)  # (batch_size, num_pixels, encoder_dim)
        num_pixels = encoder_out.size(1)

        # Sort input data by decreasing lengths; why? apparent below
        caption_lengths, sort_ind = caption_lengths.squeeze(1).sort(dim=0, descending=True)
        encoder_out = encoder_out[sort_ind]
        encoded_captions = encoded_captions[sort_ind]

        # Embedding
        embeddings = self.embedding(encoded_captions)  # (batch_size, max_caption_length, embed_dim)

        # Initialize LSTM state
        mean_encoder_out = encoder_out.mean(dim=1)
        h = self.init_h(mean_encoder_out)  # (batch_size, decoder_dim)
        c = self.init_c(mean_encoder_out)

        # We won't decode at the <end> position, since we've finished generating as soon as we generate <end>
        # So, decoding lengths are actual lengths - 1
        decode_lengths = (caption_lengths - 1).tolist()

        # Create tensors to hold word predicion scores and alphas
        predictions = torch.zeros(batch_size, max(decode_lengths), vocab_size).to(device)
        alphas = torch.zeros(batch_size, max(decode_lengths), num_pixels).to(device)

        # At each time-step, decode by
        # attention-weighing the encoder's output based on the decoder's previous hidden state output
        # then generate a new word in the decoder with the previous word and the attention weighted encoding
        preds = embeddings[:,0,:]
        for t in range(max(decode_lengths)):
            batch_size_t = sum([l > t for l in decode_lengths])
            attention_weighted_encoding, alpha = self.attention(encoder_out[:batch_size_t],
                                                                h[:batch_size_t])
            gate = self.sigmoid(self.f_beta(h[:batch_size_t]))  # gating scalar, (batch_size_t, encoder_dim)
            attention_weighted_encoding = gate * attention_weighted_encoding
            teacher_force = random.random() < self.teacher_forcing_ratio
            top1 = preds.argmax(1)
            embeds_temp = self.embedding(top1)
            next_input = embeddings[:batch_size_t, t, :] if teacher_force else embeds_temp[:batch_size_t,:]
            h, c = self.decode_step(
                torch.cat([next_input, attention_weighted_encoding], dim=1),
                (h[:batch_size_t], c[:batch_size_t]))  # (batch_size_t, decoder_dim)
            preds = self.fc(self.dropout(h))  # (batch_size_t, vocab_size)
            predictions[:batch_size_t, t, :] = preds
            alphas[:batch_size_t, t, :] = alpha
            
        return predictions, encoded_captions, decode_lengths, alphas, sort_ind

In [13]:


class EncoderCNN(nn.Module):
    def __init__(self, embed_size, train_CNN=False):
        super(EncoderCNN, self).__init__()
        self.train_CNN = train_CNN
#         self.inception = models.inception_v3(pretrained=True, aux_logits=True)
        self.inception = inception_v3(Inception_V3_Weights.DEFAULT)
        self.inception.fc = nn.Linear(self.inception.fc.in_features, embed_size)
        self.relu = nn.ReLU()
        self.times = []
        self.dropout = nn.Dropout(0.5)

    def forward(self, images):
        self.inception.aux_logits = False
        features = self.inception(images)
        self.inception.aux_logits = True
        return self.dropout(self.relu(features))


class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, features, captions):
        embeddings = self.dropout(self.embed(captions))
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.linear(hiddens)
        return outputs


class CNNtoRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers,attention_size, use_attn=0):
        super(CNNtoRNN, self).__init__()
        self.encoderCNN = EncoderCNN(embed_size)
        if not use_attn:
            self.decoderRNN = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers)
        else:
            self.decoderRNN = DecoderWithAttention(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        features = self.encoderCNN(images)
        outputs = self.decoderRNN(features, captions)
        return outputs

    def caption_image(self, image, vocabulary, max_length=50):
        result_caption = []

        with torch.no_grad():
            x = self.encoderCNN(image).unsqueeze(0)
            states = None

            for _ in range(max_length):
                hiddens, states = self.decoderRNN.lstm(x, states)
                output = self.decoderRNN.linear(hiddens.squeeze(0))
                predicted = output.argmax(1)
                lmao=predicted[0].item()
                result_caption.append(lmao)
                x = self.decoderRNN.embed(predicted).unsqueeze(0)

                if vocabulary.itos[lmao] == "<EOS>":
                    break

        return [vocabulary.itos[idx] for idx in result_caption]


In [14]:
import torch
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter

def train():
    transform = transforms.Compose(
        [
            transforms.Resize((356, 356)),
            transforms.RandomCrop((299, 299)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )


    train_loader, dataset = get_loader(root_folder=pathDirImage,annotation_file=captionTrain,transform=transform,num_workers=2)

    torch.backends.cudnn.benchmark = True
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    load_model = False
    save_model = True
    train_CNN = False

    # Hyperparameters
    embed_size = 256
    hidden_size = 256
    vocab_size = len(dataset.vocab)
    num_layers = 1
    use_attentn = 0
    attention_dim = 256
    learning_rate = 3e-4
    num_epochs = 10

    # for tensorboard
    writer = SummaryWriter("runs/flickr")
    step = 0

    # initialize model, loss etc
    model = CNNtoRNN(embed_size, hidden_size, vocab_size, num_layers,attention_dim,0).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab.stoi["<PAD>"])
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Only finetune the CNN
    for name, param in model.encoderCNN.inception.named_parameters():
        if "fc.weight" in name or "fc.bias" in name:
            param.requires_grad = True
        else:
            param.requires_grad = train_CNN

    if load_model:
        step = load_checkpoint(torch.load("my_checkpoint.pth.tar"), model, optimizer)

    model.train()

    for epoch in range(num_epochs):
        # Uncomment the line below to see a couple of test cases
        # print_examples(model, device, dataset)

        if save_model:
            checkpoint = {
                "state_dict": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "step": step,
            }
            save_checkpoint(checkpoint)

        for idx, (imgs, captions) in tqdm(enumerate(train_loader), total=len(train_loader), leave=False):
            imgs = imgs.to(device)
            captions = captions.to(device)

            outputs = model(imgs, captions[:-1])
            loss = criterion(outputs.reshape(-1, outputs.shape[2]), captions.reshape(-1))

            writer.add_scalar("Training loss", loss.item(), global_step=step)
            step += 1

            optimizer.zero_grad()
            loss.backward(loss)
            optimizer.step()

    return model

max_split_size_mb=512
model = train()



=> Saving checkpoint


                                                 

In [15]:
'''transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
val_loader, dataset = get_loader(
        pathDirImage,
        captionVal,
        transform=transform,
        num_workers=2,
    )

lossesVal = [] 
for indices, (imgs, captions) in tqdm(
            enumerate(val_loader), total=len(val_loader), leave=False ):
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            imgs = imgs.to(device)
            captions = captions.to(device)

            Output = model(imgs, captions[:-1])
            model.caption_image(imgs) 
            technique = nn.CrossEntropyLoss(ignore_index=Dataset.vocab.stoi["<PAD>"])
            loss = technique(
                Output.reshape(-1, Output.shape[2]), captions.reshape(-1)
            )
            lossesVal.append(loss)'''
            #if indices % 500 : 
            #  print(loss)

#losses = []
#for i in lossesVal :
#  losses.append(i.cpu().detach().numpy())

'transform = transforms.Compose(\n    [transforms.Resize((224, 224)), transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]\n)\nval_loader, dataset = get_loader(\n        pathDirImage,\n        captionVal,\n        transform=transform,\n        num_workers=2,\n    )\n\nlossesVal = [] \nfor indices, (imgs, captions) in tqdm(\n            enumerate(val_loader), total=len(val_loader), leave=False ):\n            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")\n            imgs = imgs.to(device)\n            captions = captions.to(device)\n\n            Output = model(imgs, captions[:-1])\n            model.caption_image(imgs) \n            technique = nn.CrossEntropyLoss(ignore_index=Dataset.vocab.stoi["<PAD>"])\n            loss = technique(\n                Output.reshape(-1, Output.shape[2]), captions.reshape(-1)\n            )\n            lossesVal.append(loss)'

In [16]:
#plt.plot(pd.Series(losses).rolling(100).mean().dropna())

In [None]:
transform = transforms.Compose(
        [
            transforms.Resize((356, 356)),
            transforms.RandomCrop((299, 299)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )


test_loader, dataset = get_loader(
        pathDirImage,
        captionTest,
        transform=transform,
        num_workers=2,
    )

rl_caps1 =[]
rl_caps =[]
for indices, (imgs, captions) in enumerate(test_loader):
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            imgs = imgs.to(device)
            captions = captions.to(device)
            Output = model(imgs, captions[:-1])


            for cap in captions.tolist():
                cap = [dataset.vocab.itos[idx] for idx in cap]
                rl_caps.append(cap)



            caption = model.caption_image(imgs.to(device), dataset.vocab)
            rl_caps1.append(caption)
            #bleu_score(caption.reshape(-1, Output.shape[2]), captions.reshape(-1))

bleu4 = 0
cnt=0
for i in range(len(rl_caps1)):
    cnt+=1
    bleu4+=sentence_bleu(rl_caps[i],rl_caps1[i])
bleu4/=cnt

meteor=0
cnt=0
for i in range(len(rl_caps1)):
    cnt+=1
    meteor+=meteor_score(rl_caps[i],rl_caps1[i])
meteor/=cnt

In [40]:
print(bleu4)

0.5642091925687237


In [41]:
print(meteor)

0.5031928922467322
