In [1]:
import numpy as np
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
import string
import os
from PIL import Image
import glob
import pickle
from time import time

In [2]:
# load descriptions
def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text
    
  
def load_descriptions(doc):
    mapping = dict()
    for line in doc.split('\n'):
        tokens = line.split()
        if len(line) < 2:
            continue
        image_id, image_desc = tokens[0], tokens[1:]
        image_id = image_id.split('.')[0]
        image_desc = ' '.join(image_desc)
        if image_id not in mapping:
            mapping[image_id] = list()
        mapping[image_id].append(image_desc)
    return mapping
  
def clean_descriptions(descriptions):
    table = str.maketrans('', '', string.punctuation)
    for key, desc_list in descriptions.items():
        for i in range(len(desc_list)):
            desc = desc_list[i]
            desc = desc.split()
            desc = [word.lower() for word in desc]
            desc = [w.translate(table) for w in desc]
            desc = [word for word in desc if len(word)>1]
            desc = [word for word in desc if word.isalpha()]
            desc_list[i] =  ' '.join(desc)
            
    return descriptions

# save descriptions to file, one per line
def save_descriptions(descriptions, filename):
    lines = list()
    for key, desc_list in descriptions.items():
        for desc in desc_list:
            lines.append(key + ' ' + desc)
    data = '\n'.join(lines)
    file = open(filename, 'w')
    file.write(data)
    file.close()


# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    doc = load_doc(filename)
    descriptions = dict()
    for line in doc.split('\n'):
        tokens = line.split()
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = list()
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions
  
def load_set(filename):
    doc = load_doc(filename)
    dataset = list()
    for line in doc.split('\n'):
        if len(line) < 1:
            continue
        identifier = line.split('.')[0]
        dataset.append(identifier)
    return set(dataset)

In [3]:
filename = "../input/flickr8k/Flickr8k_text/Flickr8k.token.txt"
doc = load_doc(filename)
descriptions = load_descriptions(doc)
descriptions = clean_descriptions(descriptions)
save_descriptions(descriptions, 'descriptions.txt')
filename = '../input/flickr8k/Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
train_descriptions = load_clean_descriptions('descriptions.txt', train)

In [4]:
# Create a list of all the training captions
all_train_captions = []
for key, val in train_descriptions.items():
    for cap in val:
        all_train_captions.append(cap)
        
        
# Consider only words which occur at least 10 times in the corpus
word_count_threshold = 10
word_counts = {}
nsents = 0
for sent in all_train_captions:
    nsents += 1
    for w in sent.split(' '):
        word_counts[w] = word_counts.get(w, 0) + 1

vocab = [w for w in word_counts if word_counts[w] >= word_count_threshold]
print('Preprocessed words {} -> {}'.format(len(word_counts), len(vocab)))


ixtoword = {}
wordtoix = {}

ix = 1
for w in vocab:
    wordtoix[w] = ix
    ixtoword[ix] = w
    ix += 1
    
vocab_size = len(ixtoword) + 1 # one for appended 0's

# Load Glove vectors
glove_dir = 'glove.6B'
embeddings_index = {}
f = open('../input/glove6b200d/glove.6B.200d.txt', encoding="utf-8")

for line in f:
    values = line.split()
    word = values[0]
    coefs = np.asarray(values[1:], dtype='float32')
    embeddings_index[word] = coefs
f.close()

embedding_dim = 200

# Get 200-dim dense vector for each of the words in out vocabulary
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, i in wordtoix.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

Preprocessed words 7578 -> 1651


In [5]:

# # Below path contains all the images
# all_images_path = '../input/flickr8k/Flickr8k_Dataset/'
# # Create a list of all image names in the directory
# all_images = glob.glob(all_images_path + '*.jpg')

# # Create a list of all the training and testing images with their full path names
# def create_list_of_images(file_path):
#     images_names = set(open(file_path, 'r').read().strip().split('\n'))
#     images = []

#     for image in all_images: 
#         if image[len(all_images_path):] in images_names:
#             images.append(image)
  
#     return images


# train_images_path = '../input/flickr8k/Flickr8k_text/Flickr_8k.trainImages.txt'
# test_images_path = '../input/flickr8k/Flickr8k_text/Flickr_8k.testImages.txt'

# train_images = create_list_of_images(train_images_path)
# test_images = create_list_of_images(test_images_path)

# #preprocessing the images
# def preprocess(image_path):
#     img = image.load_img(image_path, target_size=(299, 299))
#     x = image.img_to_array(img)
#     x = np.expand_dims(x, axis=0)
#     x = preprocess_input(x)
#     return x

# # Load the inception v3 model
# model = InceptionV3(weights='imagenet')

# # Create a new model, by removing the last layer (output layer) from the inception v3
# model_new = Model(model.input, model.layers[-2].output)

# # Encoding a given image into a vector of size (2048, )
# def encode(image):
#     image = preprocess(image) 
#     fea_vec = model_new.predict(image) 
#     fea_vec = np.reshape(fea_vec, fea_vec.shape[1])
#     return fea_vec
  

# encoding_train = {}
# for img in train_images:
#     encoding_train[img[len(all_images_path):]] = encode(img)
    
    
# encoding_test = {}
# for img in test_images:
#     encoding_test[img[len(all_images_path):]] = encode(img)
    
# #Save the bottleneck features to disk
# with open("encoded_train_images.pkl", "wb") as encoded_pickle:
#     pickle.dump(encoding_train, encoded_pickle)
    
# with open("encoded_test_images.pkl", "wb") as encoded_pickle:
#     pickle.dump(encoding_test, encoded_pickle)
    
    
train_features = open("../input/image-caption-dataset/encoded_train_images.pkl", "rb")
train_features = pickle.load(train_features)

In [6]:
from tqdm.notebook import tqdm

In [7]:
list_to_csv = []
with open('../input/image-caption-dataset/descriptions.txt', 'r') as descriptions:
    lines = descriptions.readlines()
    for index in tqdm(range(0, len(lines))):
        image_and_words = lines[index].replace('\n', '').split(' ')
        
        image = image_and_words[0]
        words = image_and_words[1:]
        
        new_list_word = [word for word in words if word in wordtoix]
        if len(new_list_word) == 0:
            continue
        
        list_to_csv.append([image, 'startseq', new_list_word[0]])
        
        phrase_to_each_row = ['sartseq']
        for index_word, word in enumerate(new_list_word):
            if index_word == (len(new_list_word) - 1):
                phrase_to_each_row.append(word)
                list_to_csv.append([image, ' '.join(phrase_to_each_row), 'endseq'])
            else:
                phrase_to_each_row.append(word)
                list_to_csv.append([image, ' '.join(phrase_to_each_row), new_list_word[index_word + 1]])

  0%|          | 0/40460 [00:00<?, ?it/s]

In [8]:
ímagens = []
with open('../input/flickr8k/Flickr8k_text/Flickr_8k.trainImages.txt', 'r') as train_images:
    images = train_images.readlines()
    imagens = [image.replace('.jpg\n', '') for image in images]

In [9]:
data_frame_image_words = pd.DataFrame(list_to_csv, columns=['image', 'text_input', 'word_output'])
data_frame_image_words.to_csv('image_descriptions.csv', index=False)
data_frame_image_words.head()

Unnamed: 0,image,text_input,word_output
0,1000268201_693b08cb0e,startseq,child
1,1000268201_693b08cb0e,sartseq child,in
2,1000268201_693b08cb0e,sartseq child in,pink
3,1000268201_693b08cb0e,sartseq child in pink,dress
4,1000268201_693b08cb0e,sartseq child in pink dress,is


In [10]:
train_data_frame = data_frame_image_words.loc[data_frame_image_words['image'].isin(imagens)].reset_index(drop=True)

In [11]:
import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.optim import Adam, SGD, AdamW
from torch.nn.utils.rnn import pad_sequence
import random, os

In [18]:
CFG = {
    'LR': 1e-3,
    'EPOCHS': 5,
    'BATCH_SIZE': 512,
    'DEVICE': 'cuda' if torch.cuda.is_available() else 'cpu',
}

In [13]:
class CustomDataLoader(torch.utils.data.Dataset):
    def __init__(self, data_frame= None, word_to_index = None, images_dict = None):
        self.data_frame = data_frame
        self.images_dict = images_dict
        self.word_to_index = word_to_index
        self.pad_tensor = torch.ones(34)
        self.num_classes = len(word_to_index)
    
    def __len__(self):
        return len(self.data_frame)
    
    def to_categorical(self, y):
    
        return np.eye(self.num_classes + 1)[y]
    
    def __getitem__(self, index):
        image_phrase_word = self.data_frame.iloc[index].values
        
        image_name = image_phrase_word[0]
        phrase = image_phrase_word[1]
        word_to_predict = image_phrase_word[2]
        
        phrase_to_index = [self.word_to_index[word] for word in phrase.split(' ') if word in self.word_to_index]
        phrase_to_index = torch.FloatTensor(phrase_to_index)
        
        word_to_predict_index = self.word_to_index[word_to_predict]
        
        image = self.images_dict[image_name + '.jpg']
        
        image = torch.tensor(image, dtype=torch.float32)
        phrase_indexs = torch.tensor(pad_sequence([phrase_to_index, self.pad_tensor], batch_first=True)[0], dtype=torch.long)
        #target = torch.tensor(word_to_predict_index, dtype=torch.long)
        target = torch.tensor(torch.from_numpy(self.to_categorical(word_to_predict_index)), dtype=torch.float32)

        return image, phrase_indexs, target

In [19]:
train = CustomDataLoader(data_frame = train_data_frame, word_to_index = wordtoix, images_dict = train_features)

train_loader = torch.utils.data.DataLoader(train,
                                        shuffle=True,
                                        pin_memory=True,
                                        batch_size=CFG['BATCH_SIZE'],
                                        num_workers=0)

In [15]:
class CustomModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.dp_05 = nn.Dropout(p = 0.5)
        self.dp_05_2 = nn.Dropout(p = 0.5)
        self.linear_image_input = nn.Linear(2048, 256)
        self.linear_output = nn.Linear(512, num_classes + 1)
        self.embedding = nn.Embedding.from_pretrained(torch.from_numpy(embedding_matrix).type(torch.float32))
        self.rnn = nn.LSTM(200, 256, batch_first=True)
        self.relu = nn.ReLU()
        
    def forward(self, image_features, phrase):
        image_features = self.dp_05(image_features)
        image_features = self.linear_image_input(image_features)
        image_features = self.relu(image_features)
        
        phrase_embedidng = self.embedding(phrase)
        phrase_embedidng = self.dp_05_2(phrase_embedidng)
        phrase_rnn = self.rnn(phrase_embedidng)[0]
        image_features_and_phrase = torch.cat((image_features, phrase_rnn[:, -1, :]), 1)
        output = self.linear_output(image_features_and_phrase)
        
        return output

In [16]:
model = CustomModel(len(wordtoix)).to(CFG['DEVICE'])
_loss = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=CFG['LR'])

In [None]:
loss_mean = []

see_loss = 40

for epoch in range(CFG['EPOCHS']):
    for i, (image, phrase, target) in enumerate(tqdm(train_loader, total=len(train_loader))):
        
        image = image.to(CFG['DEVICE'])
        target = target.to(CFG['DEVICE'])
        phrase = phrase.to(CFG['DEVICE'])
        
        output = model(image, phrase)
        
        optimizer.zero_grad()
        loss = _loss(output, target)
        loss.backward()
        optimizer.step()
        
        loss_mean.append(loss.item())
        
        if (i % see_loss) == 0:
            print(np.mean(loss_mean))
            loss_mean = []
        
        del image, phrase, target, output
    