In [6]:
from transformers import CLIPModel, CLIPProcessor
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
import torch.nn.functional as F
import torch.nn as nn

In [3]:
clip_m = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

# both go to lin proj to match final emb vec
clip_m_v = clip_m.vision_model
clip_m_v_proj = clip_m.visual_projection
clip_m_t = clip_m.text_model
clip_m_t_proj = clip_m.text_projection

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [51]:
# load image
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
img = Image.open(requests.get(url, stream=True).raw)

# convert image to emb
inputs_v = processor(images=img, return_tensors="pt").pixel_values
outs_v1 = clip_m_v(inputs_v).pooler_output
outs_v_fin = clip_m_v_proj(outs_v1)

# convert image to emb
inputs_t = processor(text=["a photo of a cat", "a photo of a dog"], return_tensors="pt", padding=True)
outs_t1 = clip_m_t(**inputs_t).pooler_output
outs_t_fin = clip_m_t_proj(outs_t1)

# # cosine similarity of image-text
# print((outs_v_fin @ outs_t_fin.T))

# # Get normalized embeddings (already normalized by CLIP)
# image_embeds = outputs.image_embeds  # (batch_size, embedding_dim)
# text_embeds = outputs.text_embeds    # (batch_size, embedding_dim)

# Normalize embeddings
image_embeds_norm = F.normalize(outs_v_fin, p=2, dim=-1)  # Normalize each vector to length 1
text_embeds_norm = F.normalize(outs_t_fin, p=2, dim=-1)

# # Cosine similarity matrix:
# cosine_sim = torch.matmul(image_embeds_norm, text_embeds_norm.T)
cosine_sim = image_embeds_norm @ text_embeds_norm.T

print(cosine_sim)

tensor([[0.2457, 0.1930]], grad_fn=<MmBackward0>)


In [55]:
inputs = processor(text=["a photo of a cat", "a photo of a dog"], images=img, return_tensors="pt", padding=True)
outputs = clip_m(**inputs)
outputs.keys()
# ['logits_per_image', 'logits_per_text', 'text_embeds', 'image_embeds', 'text_model_output', 'vision_model_output']
logits_per_image = outputs.logits_per_image # this is the image-text similarity score or just logits, not normalized
print(f"the image-text similarity score {logits_per_image}")
print(f"cosine similarity score {outputs.image_embeds@ outputs.text_embeds.T}")
probs = logits_per_image.softmax(dim=1) # we can take the softmax to get the label probabilities
print(probs)

the image-text similarity score tensor([[24.5701, 19.3049]], grad_fn=<TBackward0>)
cosine similarity score tensor([[0.2457, 0.1930]], grad_fn=<MmBackward0>)
tensor([[0.9949, 0.0051]], grad_fn=<SoftmaxBackward0>)


In [61]:
processor.tokenizer

CLIPTokenizerFast(name_or_path='openai/clip-vit-base-patch32', vocab_size=49408, model_max_length=77, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'bos_token': '<|startoftext|>', 'eos_token': '<|endoftext|>', 'unk_token': '<|endoftext|>', 'pad_token': '<|endoftext|>'}, clean_up_tokenization_spaces=False),  added_tokens_decoder={
	49406: AddedToken("<|startoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	49407: AddedToken("<|endoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
}

In [65]:
BOS_token_id = processor.tokenizer.convert_tokens_to_ids("<|startoftext|>")

In [63]:
outs_v_fin.shape

torch.Size([1, 512])

In [None]:
import datasets
import transformers
import pandas as pd
import numpy as np
import torch
from torch.utils.data.dataset import Dataset
from pathlib import Path

#Tokenizer from scratch on vocabulary of corpus
from tokenizers import ByteLevelBPETokenizer

# Decoder
from transformers import RobertaConfig
from transformers import RobertaForMaskedLM # RobertaLM for learning
from transformers import RobertaTokenizerFast # After training tokenizer we will wrap it so it can be used by Roberta model

#Encoder-Decoder Model
from transformers import VisionEncoderDecoderModel

#Training
# When using previous version of the library you need the following two lines
from transformers import Seq2SeqTrainer
from transformers import Seq2SeqTrainingArguments
# Latest version imports
from transformers import Trainer, TrainingArguments

import torch

# If there's a GPU available...
if torch.cuda.is_available():    

    # Tell PyTorch to use the GPU.    
    device = torch.device("cuda")

    print('There are %d GPU(s) available.' % torch.cuda.device_count())

    print('We will use the GPU:', torch.cuda.get_device_name(0))

# If not...
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

In [None]:
import random 
def train_test_split(dictionary):
    images = dictionary.keys()
    images_test = random.sample(images,int(0.3*len(images)))
    images_train = [img for img in images if img not in images_test]

    train_dict = {
      img: dictionary[img] for img in images_train
    }

    test_dict = {
      img: dictionary[img] for img in images_test
    }
    return(train_dict,test_dict)

train,test = train_test_split(images_caption_dict)

In [None]:
import pandas as pd

def get_df(dictionary):
    df = pd.DataFrame([])

    captions = []
    images = []
    for image in list(images_caption_dict.keys()):
        caption= images_caption_dict[image]
        if use_all == True:
            captions.append(tokenizer.sep_token.join([' '.join(capt.replace('<s> ','').replace('  <e>','').strip().split(' ') for capt in caption])
        else:
            for capt in caption:
                captions.append(' '.join(capt.replace('<s> ','').replace('  <e>','').strip().split(' ')[:30]))
                images.append(image)

    df['images'] = images
    df['captions'] = captions
    return(df)

train_df = get_df(train)
test_df = get_df(test)

In [None]:
from transformers import ViTFeatureExtractor
from transformers import RobertaTokenizerFast
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")
tokenizer = RobertaTokenizerFast.from_pretrained('Byte_tokenizer')

In [None]:
projected_image_embedding = projection_layer(image_embedding)  # shape: (batch_size, prefix_length * lm_embed_dim)
projected_image_embedding = projected_image_embedding.view(batch_size, prefix_length, lm_embed_dim)


In [None]:
class ImagePrefixCaptioner(nn.Module):
    def __init__(self, image_encoder, lm_model, prefix_length, embed_dim):
        super().__init__()
        self.image_encoder = image_encoder
        self.lm_model = lm_model
        self.prefix_length = prefix_length
        self.embed_dim = embed_dim
        
        self.projection = nn.Linear(image_encoder.output_dim, prefix_length * embed_dim)
    
    def forward(self, images, input_ids, attention_mask):
        batch_size = images.size(0)
        image_embeds = self.image_encoder(images)  # (batch_size, image_embed_dim)
        prefix_embeds = self.projection(image_embeds).view(batch_size, self.prefix_length, self.embed_dim)
        
        token_embeds = self.lm_model.transformer.wte(input_ids)
        
        inputs_embeds = torch.cat([prefix_embeds, token_embeds], dim=1)
        
        prefix_attention_mask = torch.ones(batch_size, self.prefix_length).to(input_ids.device)
        extended_attention_mask = torch.cat([prefix_attention_mask, attention_mask], dim=1)
        
        outputs = self.lm_model(inputs_embeds=inputs_embeds, attention_mask=extended_attention_mask, labels=input_ids)
        return outputs


In [70]:
outs_v_fin.shape

torch.Size([1, 512])

In [84]:
inputs_t

{'input_ids': tensor([[49406,   320,  1125,   539,   320,  2368, 49407]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1]])}

In [16]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer

dec_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
dec_model = GPT2LMHeadModel.from_pretrained("gpt2")

print(
    dec_tokenizer.pad_token, dec_tokenizer.eos_token, dec_tokenizer.bos_token, 
    dec_tokenizer.pad_token_id, dec_tokenizer.eos_token_id, dec_tokenizer.bos_token_id,
    )

dec_tokenizer.pad_token = dec_tokenizer.eos_token
dec_tokenizer.pad_token_id = dec_tokenizer.eos_token_id

# Confirm embedding size
print(dec_model.config.n_embd)  # 768 for GPT2-base


None <|endoftext|> <|endoftext|> None 50256 50256
768


In [None]:
# Get vision embedding from image encoder

max_length = 512
model_d = 512
batch_size = 1
prefix_length = 16
lm_embed_v_dim = int(model_d//prefix_length)
target_m_dim = 768

text = ["a photo of a cat"]

lin_m_v = nn.Linear(model_d, model_d)
lin_m_t = nn.Linear(model_d, lm_embed_v_dim)
lin_m_vt4dec = nn.Linear(lm_embed_v_dim, target_m_dim)

# load image
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
img = Image.open(requests.get(url, stream=True).raw)

# convert image to emb
inputs_v = processor(images=img, return_tensors="pt").pixel_values
outs_v1 = clip_m_v(inputs_v).pooler_output
outs_v_fin = clip_m_v_proj(outs_v1)

# projection layer
projected_image_embed = lin_m_v(outs_v_fin)  # shape: (batch_size, prefix_length * lm_embed_dim)
projected_image_embed = projected_image_embed.view(batch_size, prefix_length, lm_embed_v_dim)
print(projected_image_embed.shape) # (batch_size, prefix_length, lm_embed_dim)

# convert text to emb
inputs_t = processor(text=text, return_tensors="pt", padding=True)
outs_t1 = clip_m_t(**inputs_t).last_hidden_state
outs_t_fin = clip_m_t_proj(outs_t1) # shape: (batch_size, seq_len, lm_embed_dim)
projected_text_embed = lin_m_t(outs_t_fin)  # shape: (batch_size, prefix_length * lm_embed_dim)
print(projected_text_embed.shape)
# outs_t_fin.shape # torch.Size([1, 7, 512])

# Concatenate image prefix embeddings with token embeddings
inputs_embeds = torch.cat([projected_image_embed, projected_text_embed], dim=1)
# inputs_embeds.shape # [batch_size, img_seq+text_seq, model_d_sub]
inputs_embeds4dec_m = lin_m_vt4dec(inputs_embeds)
print(f"final emb size from encoder {inputs_embeds4dec_m.shape}") # torch.Size([1, 23, 768]) or [batch, img+txt, dec_dim]

# Modify attention mask to accommodate prefix tokens
prefix_attention_mask = torch.ones(batch_size, prefix_length) #.to(device)
attention_mask = torch.cat([prefix_attention_mask, inputs_t.attention_mask], dim=1)

# set label
# Prepare labels
text_with_eos = [dec_tokenizer.eos_token + t + dec_tokenizer.eos_token for t in text]  # '<|endoftext|>'
labels = dec_tokenizer(text_with_eos, return_tensors="pt", padding=True).input_ids
# labels = dec_tokenizer(text, return_tensors="pt", padding=True).input_ids

# Pad labels to match full input length (prefix + text)
# Set prefix part to -100 so loss is ignored there
padding_labels = torch.full((batch_size, prefix_length), -100)  # ignore loss here
labels = torch.cat([padding_labels, labels], dim=1)  # (batch_size, prefix_len + text_len)

out_dec = dec_model(
    inputs_embeds=inputs_embeds4dec_m, 
    attention_mask=attention_mask, 
    labels=labels,  # this triggers loss computation
    )
# out_dec.keys() # ['loss', 'logits', 'past_key_values']
logits = out_dec.logits
logits.shape # torch.Size([1, 23, 50257])
# loss = outputs.loss
# loss.backward()
# # optimizer step, etc.

# Shifted for causal LM (already done internally if you pass labels to the model directly)
loss = F.cross_entropy(
    logits.view(-1, logits.size(-1)),  # [batch_size * seq_len, vocab_size]
    labels.view(-1),                   # [batch_size * seq_len]
    ignore_index=-100                 # to ignore prefix tokens or padding
)


torch.Size([1, 16, 32])
torch.Size([1, 7, 32])
final emb size from encoder torch.Size([1, 23, 768])


RuntimeError: Expected target size [1, 50257], got [1, 22]

In [34]:
logits.view(-1, logits.size(-1)).shape

torch.Size([23, 50257])

In [38]:
logits.shape

torch.Size([1, 23, 50257])

In [43]:
loss = out_dec.loss
loss

tensor(7.5349, grad_fn=<NllLossBackward0>)

In [42]:
# Flatten
logits_flat = logits.view(-1, logits.size(-1))       # [batch * seq_len, vocab_size]
labels_flat = labels.view(-1)                        # [batch * seq_len]

loss = F.cross_entropy(logits_flat, labels_flat, ignore_index=-100)
loss

tensor(7.8355, grad_fn=<NllLossBackward0>)

In [44]:
# Step 1: Shift logits to exclude the last prediction
logits = out_dec.logits[:, :-1, :]   # Predict tokens at t+1

# Step 2: Shift labels to exclude the first token
labels = labels[:, 1:]               # Targets for prediction

# Step 3: Flatten for cross-entropy
logits = logits.reshape(-1, logits.size(-1))  # (batch * seq_len-1, vocab_size)
labels = labels.reshape(-1)                   # (batch * seq_len-1)

# Step 4: Apply loss (ignore -100 label positions, e.g., for image prefix or padding)
loss = F.cross_entropy(logits, labels, ignore_index=-100)
loss

tensor(7.5349, grad_fn=<NllLossBackward0>)