In [2]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import os
from tqdm import tqdm
from pickle import dump, load


In [3]:
image_path = "/Users/aditya/Programming/DL/datasets/Flicker8k_Dataset"
text_path = "/Users/aditya/Programming/DL/datasets/Flickr8k_text"

# check whether the image path exists in the disk
if not os.path.exists(image_path):
    raise Exception("Image path not found")

# check whether the text path exists in the disk
if not os.path.exists(text_path):
    raise Exception("Text path not found")

In [4]:
import string

'''
Class to load the dataset and preprocess them.
'''

class LoadToken:
    def __init__(self):
        self.token_path = text_path + "/Flickr8k.token.txt"

        if not os.path.exists(self.token_path):
            raise Exception("file not found")
    
    '''
    Method to load the document into memory
    
    PARAMS: None
    RETURNS: (str) text 
    '''
    def load_doc(self):
        file = open(self.token_path, "r")
        text = file.read()
        file.close()
        return text
    
    '''
    Method to the generate the image caption map
    
    PARAMS: None
    RETURNS: (dict) dictionary with image path as keys and captions as values
    '''
    def generate_img_caption_map(self):
        # load the doc 
        file = self.load_doc()
        
        # split the file based on the param '\n'
        captions = file.split("\n")
        print("captions : ", captions)
        descriptions = {}
        
        # iterate through each line of the document
        for caption in captions[:-1]:
            # split the line into image name and caption based on the param '\t'
            img, caption = caption.split("\t")
            
            # add the caption to the description based on the image name
            if img[:-2] not in descriptions:
                descriptions[img[:-2]] = [caption]
            else:
                descriptions[img[:-2]].append(caption)
        
        # return the dictionary of image-caption set
        return descriptions
    
    '''
    Method to clean the caption stored in the dictionary
    
    PARAMS: img_caption_map (dict) - keys (image name) and values (list of captions)
    RETURNS: updated dictionary
    '''
    def clean_captions(self, img_caption_map):
        # create a mapping table to convert punctuations into empty string.
        table = str.maketrans('','',string.punctuation)
        
        # iterate through (image_name, caption) dictionary
        for img, caps in img_caption_map.items():
            # iterate through each captions
            for i, cap in enumerate(caps):
                # replace the '-' into empty stirng
                cap.replace("-", " ")
                
                # split the sentence into words
                sentence = cap.split()
                
                # convert all words to lower case
                sentence = [word.lower() for word in sentence]   
                
                # remove punctuations in words
                sentence = [word.translate(table) for word in sentence]
                
                # remove words with less than 1 character
                sentence = [word for word in sentence if (len(word)>1)]                
                
                # remove words with numbers in them
                sentence = [word for word in sentence if (word.isalpha())]
                
                # convert back to string
                cap = " ".join(sentence)
                img_caption_map[img][i] = cap
                
        return img_caption_map
    
    '''
    Method to build the vocablury
    
    PARAMS: cleaned image caption map
    RETURNS: set consisting of unique words in the captions
    '''
    def build_vocab(self, img_cap_map):
        # initialize a empty set
        vocab = set()
        
        # iterate through each key in the img_cap_map
        for key in img_cap_map.keys():
            # iterate through each caption in the img_cap_map
            for cap in img_cap_map[key]:
                # add the word to the vocablury
                vocab.update(cap.split())
        
        # return the unique set of words
        return vocab
    
    '''
    Method to save the cleaned image caption map to the disk
    
    PARAMS: 
        img_cap_map (dict) - dictionary consisting of image-caption map
        filename (string) - name of the file to be saved in the disk
    '''
    def save_img_caption_map(self, img_cap_map, filename):
        # initialize an empty list
        lines = list()
        
        # iterate through each image-cpation in the img_cap_map
        for key, caps in img_cap_map.items():
            # iterate through each caption in the captions
            for cap in caps:
                # reform the sentence with '\t' as the 
                lines.append(key + "\t" + cap)
        
        # save the dictionary into a text file
        data = "\n".join(lines)
        file = open(filename, "w")
        file.write(data)
        file.close()
        
# create an instance of the class object
l = LoadToken()

# generate a image-caption map
img_cap_map = l.generate_img_caption_map()

# preprocess the image-caption map
captions = l.clean_captions(img_cap_map)

# build a vocablury based on the image-caption map
vocab = l.build_vocab(captions)

# save the img captions map
l.save_img_caption_map(img_cap_map, "img_cap_map.txt")

print("The length of the vocablury : ", len(vocab))
print(img_cap_map)
            


IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [5]:
'''
Method to tokenize the image caption map, where each word as a unique 
value associated to them

PARAMS: 
    captions (list of captions) 
RETURNS:
    tokens (dictionary) - return the tokenizer
'''
class Tokenizer:
    def __init__(self):
        self.word_index = 0
        self.tokens = {}
        
    def get_size(self):
        return self.word_index
        
    def get_word(self, word):
        print(word)
        return self.tokens[word]
    
    def create_tokenizer(self, captions):
        tokens = {}

        # iterate through each sentence in the caption list
        for sentence in captions:
            # iterate through each word in the sentence
            for word in sentence.split(" "):
                # if word not in tokens, assign a new token
                if word not in tokens:
                    tokens[word] = self.word_index
                    self.word_index+=1
                    
        self.tokens = tokens
        return tokens
    
    def text_to_sequence(self, string):
        out = []
        for word in string.split(" "):
            if word not in self.tokens:
                self.tokens[word] = self.word_index + 1
                self.word_index += 1
            seq_out = self.tokens[word]
            out.append(seq_out)
        return out
    

# tokenizer = get_tokenizer("basic_english")
captions_lst = []
for img, caption in captions.items():
    captions_lst.extend(caption)

T = Tokenizer()
tokens = T.create_tokenizer(captions_lst)
# print(tokens)

print("total number of tokens : ", T.word_index)
    

total number of tokens :  8764


In [5]:
# Try traning the model using transfer learning
from PIL import Image
import torchvision
from torchvision.models.feature_extraction import create_feature_extractor
from torchvision import transforms

def extract_features(directory):
    model_maxVitT = torchvision.models.maxvit_t(pretrained = True)
    
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    
    transform = transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)])
    
    for param in model_maxVitT.parameters():
        param.requires_grad = False
        
    model_maxVitT.eval()
    features = {}
    
    for img in tqdm(os.listdir(directory)[:]):
        filename = directory + "/" + img
        image = Image.open(filename)
        image = transform(image)
        image = image.reshape(1, image.shape[0], image.shape[1], image.shape[2])
        out = model_maxVitT(image)
        features[img] = out
        
    return features

features = extract_features(image_path)
print("Total number of features : ", len(features))
dump(features, open("features.p", "wb"))


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]
  0%|          | 19/8091 [00:06<43:16,  3.11it/s]


KeyboardInterrupt: 

In [None]:
from torch.utils.data import Dataset, IterableDataset
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from itertools import cycle

features = load(open("features.p", "rb"))
print(features[list(features.keys())[0]].shape)

def get_max_len(img_cap_map):
    l = -1
    for key, values in img_cap_map.items():
        for value in values:
            if len(value.split()) > l:
                l = len(value.split())
    return l

class ImageDataset(IterableDataset):
    def __init__(self, features, tokenizer, cap_map, description):
        self.features = features
        self.feat_keys = list(features.keys())
        
        self.tokenizer = tokenizer
        self.cap_map = cap_map
        self.max_len = get_max_len(cap_map)
        
        self.description = description
        self.desc_list = self.dict_to_list(self.description)
        self.batch_size = 10
        
    def __len__(self):
        return len(self.features)
    
    def dict_to_list(self, dictionary):
        l = []
        for key in dictionary.keys():
            [l.append(d) for d in dictionary[key]]
        
        return l
        
    def pad_sequences(self, seq):
        out = []
        out = [0] * (self.max_len - len(seq))
        out.extend(seq)
        return out
    
    def to_categorical(self, word):
        return np.eye(self.tokenizer.get_size(), dtype = "uint8")[word]
    
    def _create_seq(self, feature):        
        inp1, inp2, out = [], [], []       
        
        for desc in self.desc_list:
            seq = self.tokenizer.text_to_sequence(desc)
            for i in range(len(seq)):
                in_seq, out_seq = seq[:i], seq[i]
                in_seq = self.pad_sequences(in_seq)
                out_seq = self.to_categorical(out_seq)
                
                yield (feature, in_seq, out_seq)
                
#                 inp1.append(feature)
#                 inp2.append(in_seq)
#                 out.append(out_seq)
                
        # return inp1, inp2, out
        
    def get_stream(self, data_list):
        return chain.from_iterable(map(self._create_seq, cycle(data_list)))
    
    def get_streams(self):
        return zip(*[self.get_stream(self.features) for _ in range(self.batch_size)])
    
    def __getitem__(self, index):
        feature = self.features[self.feat_keys[index]][0]
        for i in self._create_seq(feature):
            print(i)
        inp_img, inp_seq, out_word = self._create_seq(feature)
        return (inp_img, inp_seq, out_word)
        
    def __iter__(self):
        # feature = self.features[self.feat_keys[index]][0]
        # inp_img, inp_seq, out_word = self._create_seq(feature)
        return self.get_streams()

        
max_len_desc = get_max_len(img_cap_map)
print("max length of description : ", max_len_desc)
dataset = ImageDataset(features, T, img_cap_map, captions) 
print("feature len : ", len(features))
train_set, test_set = random_split(dataset, [int(len(features)*0.8), len(features)-int(len(features)*0.8)])
batch_size = 1024

print("train set length : ", len(train_set))
print("test set length : ", len(test_set))

train_loader = DataLoader(dataset = train_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset = test_set, batch_size = batch_size, shuffle = True)

for i in train_loader:
    print(i)

torch.Size([1, 1000])
max length of description :  32
feature len :  8091
train set length :  6472
test set length :  1619
(tensor([ 1.7584e+00,  2.3391e-01,  7.6147e-01,  7.1104e-01,  5.3948e-01,
         9.2740e-01,  7.5931e-01,  9.8646e-01,  1.4057e+00,  8.9571e-01,
         8.6744e-01,  8.3444e-02,  8.9342e-01,  9.6140e-01,  1.6569e+00,
         8.2297e-01,  4.5584e-01,  1.5576e+00,  1.5395e+00,  1.5129e+00,
         6.5549e-01,  8.1597e-01,  4.1611e-01,  1.6993e+00,  1.0080e+00,
         9.7956e-01,  1.1930e+00,  2.1053e+00,  9.0237e-01,  8.7561e-01,
         1.2640e+00,  1.5600e+00,  1.6934e+00,  1.1078e+00,  1.3484e+00,
         1.2823e+00,  9.6105e-01,  1.3204e+00,  6.1552e-01,  1.1118e+00,
         7.9369e-01,  7.5185e-01,  5.5474e-01,  1.7245e+00,  1.5809e+00,
         5.3167e-01,  5.0530e-01,  9.1247e-01,  5.6576e-01,  1.3411e+00,
         1.5338e+00,  2.3797e+00,  2.3206e+00,  1.0631e+00,  1.0108e+00,
         1.0754e+00,  1.1164e+00,  9.2635e-01,  1.2023e+00,  1.6232e+00,


IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



(tensor([ 1.7584e+00,  2.3391e-01,  7.6147e-01,  7.1104e-01,  5.3948e-01,
         9.2740e-01,  7.5931e-01,  9.8646e-01,  1.4057e+00,  8.9571e-01,
         8.6744e-01,  8.3444e-02,  8.9342e-01,  9.6140e-01,  1.6569e+00,
         8.2297e-01,  4.5584e-01,  1.5576e+00,  1.5395e+00,  1.5129e+00,
         6.5549e-01,  8.1597e-01,  4.1611e-01,  1.6993e+00,  1.0080e+00,
         9.7956e-01,  1.1930e+00,  2.1053e+00,  9.0237e-01,  8.7561e-01,
         1.2640e+00,  1.5600e+00,  1.6934e+00,  1.1078e+00,  1.3484e+00,
         1.2823e+00,  9.6105e-01,  1.3204e+00,  6.1552e-01,  1.1118e+00,
         7.9369e-01,  7.5185e-01,  5.5474e-01,  1.7245e+00,  1.5809e+00,
         5.3167e-01,  5.0530e-01,  9.1247e-01,  5.6576e-01,  1.3411e+00,
         1.5338e+00,  2.3797e+00,  2.3206e+00,  1.0631e+00,  1.0108e+00,
         1.0754e+00,  1.1164e+00,  9.2635e-01,  1.2023e+00,  1.6232e+00,
         1.6735e+00,  1.3392e+00,  1.0929e+00,  2.0217e+00,  1.0812e+00,
         4.9683e-01,  1.0653e+00,  1.4947e+00,  1.

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



(tensor([ 1.7584e+00,  2.3391e-01,  7.6147e-01,  7.1104e-01,  5.3948e-01,
         9.2740e-01,  7.5931e-01,  9.8646e-01,  1.4057e+00,  8.9571e-01,
         8.6744e-01,  8.3444e-02,  8.9342e-01,  9.6140e-01,  1.6569e+00,
         8.2297e-01,  4.5584e-01,  1.5576e+00,  1.5395e+00,  1.5129e+00,
         6.5549e-01,  8.1597e-01,  4.1611e-01,  1.6993e+00,  1.0080e+00,
         9.7956e-01,  1.1930e+00,  2.1053e+00,  9.0237e-01,  8.7561e-01,
         1.2640e+00,  1.5600e+00,  1.6934e+00,  1.1078e+00,  1.3484e+00,
         1.2823e+00,  9.6105e-01,  1.3204e+00,  6.1552e-01,  1.1118e+00,
         7.9369e-01,  7.5185e-01,  5.5474e-01,  1.7245e+00,  1.5809e+00,
         5.3167e-01,  5.0530e-01,  9.1247e-01,  5.6576e-01,  1.3411e+00,
         1.5338e+00,  2.3797e+00,  2.3206e+00,  1.0631e+00,  1.0108e+00,
         1.0754e+00,  1.1164e+00,  9.2635e-01,  1.2023e+00,  1.6232e+00,
         1.6735e+00,  1.3392e+00,  1.0929e+00,  2.0217e+00,  1.0812e+00,
         4.9683e-01,  1.0653e+00,  1.4947e+00,  1.

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [None]:
from torchsummary import summary
import torch
print("not died")

class Model(torch.nn.Module):
    def __init__(self, input_size, vocab_size):
        super(Model, self).__init__()
        self.vocab_size = vocab_size
        
        # layers
        self.linear1 = torch.nn.Linear(input_size, 256)
        # self.linear2 = torch.nn.Linear(max_length, vocab_size) 
        self.embedding1 = torch.nn.Embedding(vocab_size, 256, padding_idx=0)
        self.lstm1 = torch.nn.LSTM(256, 256, 2)
        
        # stable layers
        self.dropout = torch.nn.Dropout(p=0.2)
        self.relu = torch.nn.ReLU()
        self.softmax = torch.nn.Softmax()
        
    def forward(self, x1, x2):
        # dense branch 
        self.input_layer1 = self.linear1(x1)
        drop1 = self.dropout(self.input_layer1)
        act = self.relu(drop1)
        
        # lstm branch
        self.input_layer2 = self.embedding1(x2)
        drop2 = self.dropout(embed1)
        lstm1 = self.lstm1(drop2)
        
        # merge 2 branches
        dec1 = act + lstm1
        dec2 = self.linear1(dec1)
        self.output_layer = self.linear(dec2)
        self.output_layer = self.softmax(self.output_layer)
        
        return self.output_layer
    
device = torch.device("mps")
model = Model(1000, 32)

model.to(device)

learning_rate = 1e-3
num_epochs = 10

print("not died")

optimzer = torch.optim.Adam(model.parameters(), lr = learning_rate)
criterion = torch.nn.CrossEntropyLoss()

print("not died")

for epoch in tqdm(range(num_epochs)):
    losses = []
    print("not died")
    for batch_idx, (inp1, inp2, targets) in enumerate(train_loader):
        print("not died")
        inp1 = inp1.to(device = device)
        print("not died")
        inp2 = inp2.to(device = device)
        print("not died")
        targets = targets(device = device)
        
        print("not died")
        # forward
        scores = model(inp1, inp2)
        loss = criterion(scores, target)
        print("not died")
        
        losses.append(loss.item())
        
        # backward
        optmizer.zero_grad()
        print("not died")
        loss.backward()
        
        
        # gradient descent 
        optimizer.step()
        
    print("cost of epoch {epoch} is {sum(losses)/len(losses)}")
        
        
        
        
        

not died
not died
not died


  0%|          | 0/10 [00:00<?, ?it/s]

not died


In [36]:
def runLengthDecoding(encodedString):
    decodedString = ""
    count = ""
    for iter, i in enumerate(encodedString):
        if iter%2 == 0:
            count = i
        else:
            decodedString += i * int(count)
            count = ""
    return decodedString

print("a")
print(runLengthDecoding("11"))

a
1


In [None]:
https://data-flair.training/blogs/python-based-project-image-caption-generator-cnn/