In [None]:

!pip install kaggle transformers sentence-transformers torch torchvision numpy matplotlib pillow


In [None]:
!kaggle datasets download sakshighadigaonkar/flickr-8k


In [None]:
# libraries
import numpy as np
from numpy import array
import matplotlib.pyplot as plt
%matplotlib inline

import string
import os
from PIL import Image
from time import time

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn.functional as F

In [None]:
# Locations of the caption file, the train/test split lists and the image folder.
token_path = "./flickr-8k/Flickr8k_text/Flickr8k.token.txt"
train_images_path = './flickr-8k/Flickr8k_text/Flickr_8k.trainImages.txt'
test_images_path = './flickr-8k/Flickr8k_text/Flickr_8k.testImages.txt'
images_path = './flickr-8k/Flickr8k_Dataset/Flicker8k_Dataset'

# Read the whole caption file; the context manager closes the handle afterwards.
with open(token_path, 'r') as token_file:
    doc = token_file.read()

In [None]:
# Group the raw captions by image id.
# The id is the first whitespace-separated token of a line, cut at its first '.';
# the remaining tokens, re-joined with single spaces, form the caption.
descriptions = {}
for raw_line in doc.split('\n'):
    if len(raw_line) <= 2:
        # Skip lines of two characters or fewer (e.g. a trailing blank line).
        continue
    fields = raw_line.split()
    image_id = fields[0].split('.')[0]
    descriptions.setdefault(image_id, []).append(' '.join(fields[1:]))

In [None]:
# Collect every distinct whitespace-separated token found in any caption.
vocabulary = set()
for captions in descriptions.values():
    for caption in captions:
        vocabulary.update(caption.split())
print('Original Vocabulary Size: %d words' % len(vocabulary))

In [None]:
# Flatten the caption dictionary into "<image_id> <caption>" lines (one per caption)
# and join them into a single newline-separated string.
lines = [
    image_id + ' ' + caption
    for image_id, captions in descriptions.items()
    for caption in captions
]
new_descriptions = '\n'.join(lines)

In [None]:
# Collect the identifiers of the training images: each non-trivial line of the
# split file, cut at its first '.'.
with open(train_images_path, 'r') as split_file:  # closed automatically
    doc = split_file.read()

dataset = [line.split('.')[0] for line in doc.split('\n') if len(line) > 1]

train = set(dataset)

In [None]:
# Resolve the train/test split lists to actual image paths on disk.
import glob


def _read_image_names(list_path):
    """Return the set of file names listed, one per line, in ``list_path``."""
    with open(list_path, 'r') as list_file:
        return set(list_file.read().strip().split('\n'))


def _select_images(paths, wanted_names):
    """Return the entries of ``paths`` whose base file name is in ``wanted_names``."""
    # os.path.basename understands the separator of the OS that produced the glob results.
    return [path for path in paths if os.path.basename(path) in wanted_names]


# glob returns matches in arbitrary order; sorting keeps the selection (and any
# later slicing of these lists) reproducible between runs.
img = sorted(glob.glob(images_path + '/*.jpg'))

train_images = _read_image_names(train_images_path)
train_img = _select_images(img, train_images)

test_images = _read_image_names(test_images_path)
test_img = _select_images(img, test_images)

In [None]:
# Keep only the captions of training images and wrap each one in the
# 'startseq' / 'endseq' markers used later as sequence boundaries.
train_descriptions = dict()
for line in new_descriptions.split('\n'):
    tokens = line.split()
    if not tokens:
        # A blank line has no image id; indexing tokens[0] would raise IndexError.
        continue
    image_id, image_desc = tokens[0], tokens[1:]
    if image_id not in train:
        continue
    desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
    train_descriptions.setdefault(image_id, []).append(desc)

In [None]:
# Flat list of every training caption, in dictionary order.
all_train_captions = [
    caption
    for captions in train_descriptions.values()
    for caption in captions
]

In [None]:
# Keep only the words that occur at least `word_count_threshold` times in the
# training captions.
word_count_threshold = 10
word_counts = {}
nsents = len(all_train_captions)
for sent in all_train_captions:
    # split() (not split(' ')) so an empty caption cannot contribute an empty-string "word".
    for w in sent.split():
        word_counts[w] = word_counts.get(w, 0) + 1

vocab = [w for w, count in word_counts.items() if count >= word_count_threshold]

# Report the threshold actually used instead of a hard-coded "10".
print('Vocabulary size for words that have %d or more occurrences = %d'
      % (word_count_threshold, len(vocab)))

In [None]:
# Two-way mapping between vocabulary words and integer indices.
# Indices start at 1, so index 0 is never assigned to a word.
ixtoword = dict(enumerate(vocab, start=1))
wordtoix = {word: index for index, word in ixtoword.items()}
ix = len(ixtoword) + 1  # next free index, as left behind by a manual counter

# One more than the number of mapped words (accounts for the unused index 0).
vocab_size = len(ixtoword) + 1

In [None]:
# Length, in whitespace-separated words, of the longest training caption
# (the 'startseq' / 'endseq' markers are part of each caption and are counted).
all_desc = [
    caption
    for captions in train_descriptions.values()
    for caption in captions
]
lines = all_desc
max_length = max(len(caption.split()) for caption in lines)

print('Description Length: %d' % max_length)

# Text Encoding

In [None]:
# Load the sentence-embedding model.
from sentence_transformers import SentenceTransformer
# AutoTokenizer is provided by transformers; sentence_transformers does not export it.
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("nvidia/NV-Embed-v2")
embedding_model = SentenceTransformer('nvidia/NV-Embed-v2', trust_remote_code=True)
embedding_model.max_seq_length = 32768
embedding_model.tokenizer.padding_side="right"
def add_eos(input_examples):
    """Return a new list in which every string has the embedding tokenizer's EOS token appended."""
    return [
        example + embedding_model.tokenizer.eos_token
        for example in input_examples
    ]



# NOTE(review): this dict is never read or filled in the visible notebook —
# presumably a leftover from an earlier version; confirm before removing.
embeddings_index = {} 


# Image Encoding


In [None]:
# Pretrained ViT-B/32 used as an image feature extractor.
# `weights=` replaces the deprecated `pretrained=True` and selects the same ImageNet checkpoint.
vision_model = models.vit_b_32(weights=models.ViT_B_32_Weights.IMAGENET1K_V1)

# Wrapping the child modules in nn.Sequential skips the patch reshaping and
# class-token handling done in VisionTransformer.forward, so calling it fails.
# Replacing the classification head with an identity keeps the real forward
# pass and makes the model return the class-token embedding instead of logits.
vision_model.heads = nn.Identity()

# eval() puts the network in inference mode for feature extraction.
vit = vision_model.eval()

In [None]:
def preprocess_image(image_path):
    """Load an image file and turn it into a normalized batch of one.

    Args:
        image_path: Path of the image file to open.

    Returns:
        A float tensor of shape (1, 3, 224, 224): the image resized so its
        shorter side is 256, center-cropped to 224x224, converted to a tensor
        and normalized per channel with the mean/std given below.
    """
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # The context manager releases the file handle. convert('RGB') guarantees
    # three channels, so grayscale / palette / RGBA files do not break the
    # three-channel Normalize above.
    with Image.open(image_path) as image:
        return transform(image.convert('RGB')).unsqueeze(0)




In [None]:
# Work on at most the first 100 images of each split
# (presumably to keep feature extraction fast — confirm before a full run).
train_img, test_img = train_img[:100], test_img[:100]

In [None]:
def encode_image(image_path):
    """Pass one image file through ``preprocess_image`` and ``vit`` with gradients disabled."""
    batch = preprocess_image(image_path)
    with torch.no_grad():
        return vit(batch)


# Feature caches keyed by image path.
encoding_train_img = {path: encode_image(path) for path in train_img}
training_features = encoding_train_img  # alias of the same dictionary

encoding_test_img = {path: encode_image(path) for path in test_img}

In [None]:
# Create the DataLoader.
# DataLoader() without a dataset raises TypeError, so build one first: every
# cached training feature is paired with each caption of the same image
# (captions are keyed by the file name cut at its first '.').
train_pairs = [
    (feature, caption)
    for path, feature in training_features.items()
    for caption in train_descriptions.get(os.path.basename(path).split('.')[0], [])
]

# batch_size=3 mirrors the value used by the training cell of this notebook.
train_DL = torch.utils.data.DataLoader(train_pairs, batch_size=3, shuffle=True)

# Image Captioning

In [None]:
class Image_Caption(nn.Module):
    """Predict a probability distribution over the vocabulary from an image and a text.

    The image features go through dropout + linear + ReLU; the texts are embedded
    with the module-level ``embedding_model`` (no gradients), projected, and run
    through an LSTM; both 256-wide results are summed and decoded.

    Args:
        drop_p: Dropout probability for the image and text branches.
        image_dim: Number of values per image feature vector.
        vocab_size: Number of output classes.
        embedding_dim: Width of the vectors returned by ``embedding_model.encode``.
        hidden_dim: Width of the projection fed to the LSTM.

    NOTE(review): ``embedding_model`` is stored as an attribute, so if it is an
    ``nn.Module`` its parameters appear in ``self.parameters()`` — keep that in
    mind when building an optimizer.
    """

    def __init__(self, drop_p=0.5, image_dim=256, vocab_size=32000, embedding_dim=4096, hidden_dim=1024):
        super(Image_Caption, self).__init__()
        self.image_encoder = nn.Sequential(
            nn.Dropout(drop_p),
            nn.Linear(image_dim, 256),
            nn.ReLU(),
        )

        self.embedding = embedding_model
        self.text_dropout = nn.Dropout(drop_p)
        self.linear = nn.Linear(embedding_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, 256, batch_first=True)

        self.decoder = nn.Sequential(
            nn.Linear(256, vocab_size),
            nn.ReLU(),  # was nn.ReLu, which does not exist in torch.nn
            nn.Linear(vocab_size, vocab_size),
            nn.Softmax(dim=1)
        )

    def forward(self, image_features, text):
        """Return class probabilities of shape (batch, vocab_size).

        Args:
            image_features: Tensor whose first axis is the batch; the remaining
                axes are flattened and must total ``image_dim`` values.
            text: List of strings, one per batch element.
        """
        # Accept (batch, image_dim) as well as (batch, 1, image_dim) features.
        image_features = image_features.reshape(image_features.size(0), -1)
        image_features = self.image_encoder(image_features)

        with torch.no_grad():
            # convert_to_tensor=True: the layers below need a tensor, and encode()
            # returns NumPy by default. No `prompt` argument: False is not a valid prompt.
            text_features = self.embedding.encode(
                add_eos(text),
                batch_size=1,
                normalize_embeddings=True,
                convert_to_tensor=True,
            )
        # Match the device/dtype of the projection layer. clone() also yields an
        # ordinary tensor in case encode() produced an inference-mode tensor.
        reference = self.linear.weight
        text_features = text_features.to(device=reference.device, dtype=reference.dtype).clone()

        text_features = self.text_dropout(text_features)
        text_features = self.linear(text_features)
        # One embedding per text -> add a sequence axis of length 1, otherwise the
        # LSTM treats the 2-D input as a single unbatched sequence.
        lstm_out, _ = self.lstm(text_features.unsqueeze(1))
        text_features = lstm_out[:, -1, :]
        fused_features = image_features + text_features

        output = self.decoder(fused_features)
        return output
    
# Size the output layer from the caption vocabulary (+1 because word indices
# start at 1, leaving index 0 unused). This has to be known BEFORE the model is built.
vocab_size = len(ixtoword) + 1

# NOTE(review): image_dim=768 assumes the cached image features are ViT-B/32
# class-token embeddings (768 values per image) — confirm against encode_image.
model = Image_Caption(image_dim=768, vocab_size=vocab_size)



In [None]:
# Captions of exactly those training images whose features were cached.
# The feature cache is named `training_features` (there is no `train_features`);
# `os` is already imported in the notebook's import cell.
train_descriptions_100 = {}
for image_path in training_features:
    image_id = os.path.basename(image_path).split(".")[0]
    train_descriptions_100[image_id] = train_descriptions[image_id]

In [None]:
epochs = 15
batch_size = 3
steps_per_epoch = 100


def data_generator(descriptions, photos, wordtoix, max_length, batch_size):
    """Yield ``(image_features, partial_captions, targets)`` batches indefinitely.

    Every caption is expanded into next-word samples: the first ``k`` words are
    the input text and word ``k`` is the target. Words missing from ``wordtoix``
    are dropped and captions are cut to ``max_length`` words.

    Args:
        descriptions: Dict mapping image id -> list of caption strings.
        photos: Iterable of image paths; features are read from the notebook's
            ``training_features`` cache under the same path.
        wordtoix: Dict mapping word -> integer class index.
        max_length: Maximum number of words kept per caption.
        batch_size: Number of samples per yielded batch.

    Yields:
        Tuple of a (batch, feature_dim) float tensor, a list of ``batch``
        strings and a (batch,) long tensor of target indices. The generator
        stops if a full pass over ``photos`` produces no sample at all.
    """
    feats, partials, targets = [], [], []
    while True:
        produced = False
        for photo in photos:
            image_id = os.path.basename(photo).split('.')[0]
            for caption in descriptions.get(image_id, []):
                words = [w for w in caption.split() if w in wordtoix][:max_length]
                for k in range(1, len(words)):
                    # reshape(-1) accepts both (1, D) and (D,) cached features.
                    feats.append(training_features[photo].reshape(-1))
                    partials.append(' '.join(words[:k]))
                    targets.append(wordtoix[words[k]])
                    produced = True
                    if len(targets) == batch_size:
                        yield torch.stack(feats), partials, torch.tensor(targets)
                        feats, partials, targets = [], [], []
        if not produced:
            return  # nothing to train on; avoid spinning forever


generator = data_generator(train_descriptions, train_img, wordtoix, max_length, batch_size)

# nn.Module has no Keras-style fit(); train with an explicit loop instead.
# Only the layers defined in Image_Caption are optimised; the sentence-embedding
# model is called under torch.no_grad() in forward and is left untouched.
trainable_parts = (model.image_encoder, model.linear, model.lstm, model.decoder)
optimizer = optim.Adam([p for part in trainable_parts for p in part.parameters()])
device = model.linear.weight.device

model.train()
for epoch in range(epochs):
    running_loss, steps_done = 0.0, 0
    # zip() with the range first stops after steps_per_epoch batches without
    # consuming an extra one, and ends quietly if the generator is exhausted.
    for _, (image_features, partial_captions, targets) in zip(range(steps_per_epoch), generator):
        optimizer.zero_grad()
        probabilities = model(image_features.to(device), partial_captions)
        # The decoder ends in Softmax, so take the log before the NLL loss.
        loss = F.nll_loss(torch.log(probabilities.clamp_min(1e-9)), targets.to(probabilities.device))
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        steps_done += 1
    if steps_done == 0:
        raise RuntimeError('data_generator produced no training samples')
    print('Epoch %d/%d - loss: %.4f' % (epoch + 1, epochs, running_loss / steps_done))