In [1]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import albumentations as alb
from albumentations.pytorch import ToTensorV2
import timm
import cv2
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from collections import Counter


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import torchvision
print(torchvision.__version__)

0.21.0+cu118


In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [None]:
import transformers 
from transformers import AutoTokenizer

class ImageCaptioner(nn.Module):
    def __init__(self, context_length, vocab_size, num_blocks, model_dim, num_heads, dropout_prob):
        super().__init__()
        self.cnn_encoder = timm.create_model('efficientnet_b0', pretrained=True)
        test_image = torch.zeros(1,3,224,224)

        with torch.no_grad():
            cnn_output = self.cnn_encoder(test_image)
        in_features = cnn_output.shape[1]    
        self.project = nn.Linear(in_features, model_dim)

        self.word_embeddings = nn.Embedding(vocab_size, model_dim)
        self.pos_embeddings = nn.Embedding(context_length, model_dim)

        block = nn.TransformerDecoderLayer(model_dim, num_heads, 2*model_dim, dropout=dropout_prob, batch_first=True, norm_first =True)
        self.blocks = nn.TransformerDecoder(block, num_blocks)

        self.vocab_projection = nn.Linear(model_dim, vocab_size)

        
    def forward(self, images, true_labels):
        tok_embedded = self.word_embeddings(true_labels)
        B,T = true_labels.shape
        positions = torch.arange(T).to(device)
        pos_embedded = self.pos_embeddings(positions)
        total_emebddings = tok_embedded + pos_embedded #input to blocks
        
        with torch.no_grad():
            encoded_image = self.project(self.cnn_encoder(images).view(B,-1))
        
        img_for_attention = torch.unsqueeze(encoded_image, 1)

        #Causal/Subsequent Mask
        attention_mask = nn.Transformer.generate_square_subsequent_mask(T).to(device)
        block_output = self.blocks(total_emebddings, img_for_attention, tgt_mask=attention_mask)

        vocabulary_vector = self.vocab_projection(block_output) #B,T,V

        return vocabulary_vector

In [13]:
caption_filename = 'Flickr8k/captions.txt'
missing = '2258277193_586949ec62'

with open(caption_filename) as captions:
    lines = captions.readlines()

get_captions = {}
all_captions = []

for i in range(1,len(lines)):
    data = lines[i].rstrip('\n').split('.jpg,')
    img_name = data[0] + '.jpg'
    if img_name == missing:
        continue

    caption_list = get_captions.get(img_name, [])
    caption_list.append(data[1])
    get_captions[img_name] = caption_list
    all_captions.append(data[1])

In [14]:
print(len(all_captions))

40455


In [15]:
import transformers 
from transformers import AutoTokenizer

In [None]:
df = pd.DataFrame(columns=['filename', 'caption'])
df['filename'] = get_captions.keys()
df['caption'] = df['filename'].map(lambda filename: get_captions[filename])

# Use a pre-trained tokenizer (you can choose different models)
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Add special tokens if they don't already exist in the tokenizer
special_tokens = ['<UNKNOWN>', '<PAD>', '<START>', '<END>']
special_tokens_dict = {'additional_special_tokens': special_tokens}
tokenizer.add_special_tokens(special_tokens_dict)

4

In [None]:
context_length = 20

class ImageCaptionDataset(Dataset):
    def __init__(self, split):
        self.df = df
        self.img_size = 224
        self.tokenizer = tokenizer  # Use the previously defined transformers tokenizer
        
        transformation_list = [alb.Resize(self.img_size, self.img_size)]
        if split == 'training':
            transformation_list.append(alb.HorizontalFlip())
            transformation_list.append(alb.ColorJitter())
        transformation_list.append(alb.Normalize())
        transformation_list.append(ToTensorV2())

        self.transformations = alb.Compose(transformation_list)
        
        # Get the token IDs for special tokens
        self.start_token_id = tokenizer.convert_tokens_to_ids('<START>')
        self.end_token_id = tokenizer.convert_tokens_to_ids('<END>')
        self.pad_token_id = tokenizer.convert_tokens_to_ids('<PAD>')

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        image_filename, captions = self.df.iloc[idx]
        image = cv2.cvtColor(cv2.imread('Flickr8k/Images/' + image_filename), cv2.COLOR_BGR2RGB)
        processed_image = self.transformations(image=image)['image']
        encoded_captions = []
        
        for i, cap in enumerate(captions):
            token_ids = self.tokenizer.encode(cap, add_special_tokens=False)
            
            token_ids = [self.start_token_id] + token_ids + [self.end_token_id]
            if len(token_ids) <= context_length:
                pads_to_add = context_length - len(token_ids)
                token_ids += [self.pad_token_id] * pads_to_add
            else:
                token_ids = token_ids[:context_length - 1] + [self.end_token_id]
            
            encoded_captions.append(torch.tensor(token_ids, dtype=torch.long))

        random_idx = torch.randint(len(encoded_captions), (1,)).item()
        return processed_image, encoded_captions[random_idx]


In [18]:
training_dataset = ImageCaptionDataset('training')
training_data = DataLoader(training_dataset, batch_size=32, shuffle=True)

In [None]:

vocab_size = len(tokenizer)
num_blocks = 6
model_dim = 512
num_heads = 16  # head_size = model_dim // num_heads
dropout = 0.5

model = ImageCaptioner(context_length, vocab_size, num_blocks, model_dim, num_heads, dropout).to(device)

for layer in model.cnn_encoder.parameters():
    layer.requires_grad = False

pad_token_idx = tokenizer.convert_tokens_to_ids('<PAD>')

loss_fn = nn.CrossEntropyLoss(ignore_index=pad_token_idx)
optim = torch.optim.AdamW(model.parameters(), lr=1e-5)

epochs = 1
num_iterations = 0

def integer_to_word(idx):
    return tokenizer.convert_ids_to_tokens(int(idx.item()))

for epoch in range(epochs):
    for images, captions in training_data:
        images, captions = images.to(device), captions.to(device)
        model_prediction = model(images, captions)
        # _,idx = torch.max(model_prediction, dim= -1)
        # first_caption = idx[0]
        # sentence = []
        # for id in first_caption:
        #     sentence.append(integer_to_word(id))
        #     if id == tokenizer.convert_tokens_to_ids('<END>'):
        #         break
        
        # print(' '.join(sentence))

        B, T= captions.shape
        model_prediction = model_prediction.view(B*T, vocab_size)
        
        loss = loss_fn(model_prediction, captions.view(B*T))
        optim.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        optim.step()
        if num_iterations % 100 == 0:
            print(f"Epoch {epoch+1}, Iteration {num_iterations}, Loss: {loss.item():.4f}")
        
        num_iterations += 1


Epoch 1, Iteration 0, Loss: 12.1261


KeyboardInterrupt: 