In [2]:
from datasets import load_dataset, concatenate_datasets
from torchvision import transforms
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import transformers
import torch


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
from PIL import ImageOps
dataset = load_dataset("nlphuji/flickr30k")
dataset = dataset['test'].select(range(100))

def make_square(image):
    # Make image square
    size = max(image.size)
    new_image = ImageOps.pad(image, (size, size), color='white')
    new_image = new_image.resize((256, 256))
    # fig, ax = plt.subplots(1, 2, figsize=(10, 5))
    # ax[0].imshow(image, cmap='gray')
    # ax[1].imshow(new_image, cmap='gray')
    # plt.show()
    return new_image

def transform_images(examples):
    image = examples['image']
    processed_image = np.array(make_square(image))
    return {
        'image': examples['image'],  # Keep original image
        'image_processed': processed_image,  # Add processed image
        # 'caption': examples['caption']  # Keep other fields
    }

transformed_images = dataset.map(transform_images)


In [5]:
CLIP = transformers.CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = transformers.CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

tokenizer = processor.tokenizer

# Access vocabulary
vocab = tokenizer.get_vocab()  # Returns dict of {token: index}
vocab_size = tokenizer.vocab_size  # Get total vocabulary size

text_model = CLIP.text_model

class Dataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        dataset = self.dataset[idx]
        image = dataset['image_processed'] #height, width, channels
        captions = dataset['caption']
        
        patches_array = split_image_to_patches(image, image.shape[0], image.shape[1], 32, 3)
        patches_tensor = [torch.tensor(patch.flatten(), dtype=torch.float32) for patch in patches_array] 
        image_tensor = torch.stack(patches_tensor)


        
        random_caption_idx = torch.randint(0, len(captions), (1,)).item()
        selected_caption = captions[random_caption_idx]
        tokenized_caption = tokenizer(selected_caption, return_tensors="pt", padding=True)

        
        
        return {
            'image': image_tensor,
            'caption': tokenized_caption['input_ids'],
        }
    
reverse_vocab = {idx: token for token, idx in vocab.items()}  # {index: token}

datasetclass = Dataset(transformed_images)


In [14]:
from torch import nn
import copy
import math



class Transformer(nn.Module):
    def __init__(self, d_model, text_encoder, image_encoder, decoder, tgt_vocab_size):
        super(Transformer, self).__init__()

        self.text_encoder = text_encoder
        self.image_encoder = image_encoder
        self.decoder = decoder

        self.fc = nn.Linear(tgt_vocab_size, tgt_vocab_size)

    def forward(self, image, caption):
        print(caption.shape, "caption.shape")
        # text_encoder_output = self.text_encoder.forward(caption)
        # print(text_encoder_output)
        # print(text_encoder_output.shape, "text_encoder_output.shape")
        image_encoder_output = self.image_encoder.forward(image)
        # print(image_encoder_output.last_hidden_state, "image_encoder_output.shape")
        # print(text_encoder_output.shape, image_encoder_output.shape)
        dec_output = self.decoder.forward(caption, image_encoder_output.last_hidden_state)

        output = self.fc(dec_output)
        return output


def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class DecoderLayer(nn.Module):
    def __init__(self, input_dim, tgt_vocab_size, intermediate_attn_dim, n_loops, feed_forward, self_attn_layer, cross_attn_layer):
        super(DecoderLayer, self).__init__()
        self.self_attn_layer = self_attn_layer
        self.cross_attn_layer = cross_attn_layer
        self.FF_layer = feed_forward
        self.tgt_vocab_size = tgt_vocab_size
        self.input_dim = input_dim
        self.n_loops = n_loops

        self.projectbacktovocab = torch.nn.Linear(intermediate_attn_dim, tgt_vocab_size)

        self.norm1 = torch.nn.LayerNorm(input_dim)
        self.norm2 = torch.nn.LayerNorm(input_dim)
        self.norm3 = torch.nn.LayerNorm(input_dim)

    def forward(self, x, encoder_output, mask):
        embedding = x
        attn, prob = self.self_attn_layer.forward(embedding, embedding, embedding, mask)
        print(attn.shape, "attn.shape")
        print(embedding.shape, "embedding.shape")
        x = self.norm1(attn + embedding)
        attn, prob = self.cross_attn_layer.forward(query_input=x, key_input=encoder_output, value_input=encoder_output)
        print(attn.shape, "cross attn.shape")
        # attn = self.projectbacktovocab(attn)
        x = self.norm2(x + attn)

        ff_output = self.FF_layer(x)
        x = self.norm3(x + ff_output)
        # x = self.projectbacktovocab(x)
        return x

class Decoder(nn.Module):
     def __init__(self, tgt_vocab_size, pad_token, embedding_layer, layer, n_loops):
        super(Decoder, self).__init__()
        self.embedding_layer = embedding_layer #convert token IDs to embeddings
        self.pad_token = pad_token
        self.norm1 = torch.nn.LayerNorm(tgt_vocab_size)
        self.layers = clones(layer, n_loops)

        self.projectbacktovocab = torch.nn.Linear(512, tgt_vocab_size)


     def forward(self, x, encoder_output):
        # mask = self.generate_padding_mask(x)
        mask = True
        print(x.shape, "x.shape pre embedding")
        x = self.embedding_layer.forward(x)
        print(x.shape, "x.shape")
        for layer in self.layers:
            x = layer(x, encoder_output, mask)
        x = self.projectbacktovocab(x)
        x = self.norm1(x)
        return x
     
   

class Attention_Layer(nn.Module):
    def __init__(self, d_model, num_heads):
        super(Attention_Layer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads

        self.W_q = torch.nn.Linear(d_model, d_model)
        self.W_k = torch.nn.Linear(d_model, d_model)
        self.W_v = torch.nn.Linear(d_model, d_model)
    
    def forward(self, query_input, key_input, value_input, mask=None):
        dim_k = self.d_model // self.num_heads
        query = self.W_q(query_input)
        key = self.W_k(key_input)
        value = self.W_v(value_input)
        

        
        query_key = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(dim_k)

        if mask is not None:
            nopeak_mask = (1 - torch.triu(torch.ones(query_key.size(-2), query_key.size(-1)), diagonal=1)).bool()
            query_key = query_key.masked_fill(~nopeak_mask, float('-inf'))
            # print(mask.shape, 'mask.shape', query_key.shape)
            # query_key = query_key.masked_fill(~mask, float('-inf'))

        prob = query_key.softmax(dim=-1)
        weighted_attention = torch.matmul(prob, value)
        return weighted_attention, prob
    
class Cross_Attention_Layer(nn.Module):
    def __init__(self, encoder_output_dim, decoder_dim, d_model, num_heads):
        super(Cross_Attention_Layer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads

        self.W_q = torch.nn.Linear(decoder_dim, d_model)

        self.W_k = torch.nn.Linear(encoder_output_dim, d_model)
        self.W_v = torch.nn.Linear(encoder_output_dim, d_model)
    
    def forward(self, query_input, key_input, value_input, mask=None):
        dim_k = self.d_model // self.num_heads
        query = self.W_q(query_input)
        key = self.W_k(key_input)
        value = self.W_v(value_input)
        
        query_key = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(dim_k)
        prob = query_key.softmax(dim=-1)
        weighted_attention = torch.matmul(prob, value)
        return weighted_attention, prob


    

In [15]:
d_model = 512
text_dimension_embedding = 512
image_encoder_output_dim = 768

self_attn_layer = Attention_Layer(d_model=d_model, num_heads=1)
cross_attn_layer = Cross_Attention_Layer(encoder_output_dim=image_encoder_output_dim, decoder_dim=text_dimension_embedding, d_model=d_model, num_heads=1)

feed_forward = nn.Sequential(nn.Linear(d_model, 2048), nn.ReLU(), nn.Linear(2048, d_model))

text_model = CLIP.text_model
text_embedder = text_model.embeddings

decoder_layer = DecoderLayer(input_dim=text_dimension_embedding, tgt_vocab_size=vocab_size, intermediate_attn_dim=d_model, n_loops=6, feed_forward=feed_forward, self_attn_layer=self_attn_layer, cross_attn_layer=cross_attn_layer)



decoder = Decoder(vocab_size, pad_token="pad", embedding_layer=text_embedder, layer=decoder_layer, n_loops=6)


transformer = Transformer(d_model=d_model, text_encoder=text_embedder, image_encoder=CLIP.vision_model, decoder=decoder, tgt_vocab_size=vocab_size)


In [2]:
transformed_images[0]

NameError: name 'transformed_images' is not defined

In [6]:
image_processed = processor(images=np.array(transformed_images[0]['image_processed']), return_tensors="pt", padding=True)

# token_ids = token_ids['input_ids']
print(image_processed['pixel_values'].shape)
visionmodel = CLIP.vision_model
visionmodel.eval()
vision_output = visionmodel(image_processed['pixel_values'])
print(vision_output.last_hidden_state.shape, "vision_output.shape")


text = "hello i am dog"
tokenized_text = tokenizer(text, return_tensors="pt", padding="max_length", max_length=77, truncation=True)
textmodel = CLIP.text_model
# textmodel.eval()

text_encoder_output = textmodel(tokenized_text['input_ids'])
print(text_encoder_output.last_hidden_state.shape)
last_hidden_state_text = text_encoder_output.last_hidden_state

torch.Size([1, 3, 224, 224])
torch.Size([1, 50, 768]) vision_output.shape
torch.Size([1, 77, 512])


In [15]:
print(tokenized_text['input_ids'])
captions = tokenized_text['input_ids']
print((captions != tokenizer.pad_token_id).all())
print((captions != tokenizer.pad_token_id).any() != (captions != tokenizer.pad_token_id).all())
if (captions != tokenizer.pad_token_id).any() != (captions != tokenizer.pad_token_id).all():
    attention_mask = (captions != tokenizer.pad_token_id)

print(attention_mask.shape)

tensor([[49406,  3306,   328,   687,  1929, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407,
         49407, 49407, 49407, 49407, 49407, 49407, 49407]])
tensor(False)
tensor(True)
torch.Size([1, 77])


In [19]:
def extend_padding_mask(padding_mask, batch_size, device):
    """Extend padding mask to include image token."""
    if padding_mask is None:
        return None
    
    # Create mask for image token (always valid)
    image_mask = torch.ones(padding_mask.shape[0], 1, dtype=torch.bool, device=device)
    
    # Concatenate with caption padding mask
    extended_mask = torch.cat([image_mask, padding_mask], dim=1)
    
    return extended_mask 
extended = extend_padding_mask(attention_mask, 5, torch.device("cpu"))
print(extended)

tensor([[ True,  True,  True,  True,  True,  True, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False]])


In [21]:
text = "hello i am dog"
tokenized_text = tokenizer(text, return_tensors="pt", padding=True)
print(tokenized_text['input_ids'])

tensor([[49406,  3306,   328,   687,  1929, 49407]])


In [27]:
tokenizer.pad_token_id

49407

In [19]:
tokenized_text = tokenizer(text, return_tensors="pt", padding=True)
textmodel = CLIP.text_model
# textmodel.eval()

text_encoder_output = textmodel(tokenized_text['input_ids'])
processed_full = processor(images=[np.array(transformed_images[0]['image_processed'])], text="hello i am dog", return_tensors="pt", padding=True)

print(CLIP(**processed_full).keys())

odict_keys(['logits_per_image', 'logits_per_text', 'text_embeds', 'image_embeds', 'text_model_output', 'vision_model_output'])


In [20]:
print(CLIP(**processed_full)['text_model_output'])

BaseModelOutputWithPooling(last_hidden_state=tensor([[[ 0.3393,  0.1165,  0.1020,  ...,  0.2468,  0.5906,  0.1013],
         [ 0.3177, -0.4308, -0.8810,  ...,  0.5321, -0.7913, -0.4851],
         [ 0.8905, -0.1765,  0.1217,  ...,  0.8202, -0.1785,  0.4732],
         [-0.3952,  1.1094,  0.1687,  ...,  0.8740, -0.4523, -0.5020],
         [-0.2759,  0.5866,  1.2806,  ..., -0.0064,  0.5442, -1.3609],
         [ 0.7603,  0.5137,  0.9196,  ..., -0.8102, -0.2604, -0.0036]]],
       grad_fn=<NativeLayerNormBackward0>), pooler_output=tensor([[ 7.6033e-01,  5.1368e-01,  9.1960e-01,  1.2506e+01, -8.6933e+00,
          1.0663e+00,  2.1425e+00,  1.4863e+00, -6.2332e-01,  5.3471e-01,
          1.5366e+00, -1.3907e+00,  2.8199e-01,  1.3469e-01,  6.0156e-01,
         -1.6914e+00, -1.7101e+00, -5.1569e-01, -1.7452e-01,  2.9796e-01,
          8.0737e-01, -3.0274e-01, -5.0148e-01,  2.9735e-01,  1.2268e-01,
         -5.9684e-01, -1.2002e+00, -5.7734e-01, -6.6179e-01,  5.5981e-01,
          4.0388e-01,  7.

In [17]:
def turn_to_one_hot(label_array, vocab_size):

    # Your existing one-hot encoding code
    label_tensor = torch.tensor(label_array)
    one_hot = torch.nn.functional.one_hot(label_tensor, num_classes=vocab_size)

    # # Create start token
    # start_token = torch.zeros(13)
    # start_token[10] = 1

    # Add start token to sequence
    
    return one_hot
print(turn_to_one_hot(tokenized_text['input_ids'], vocab_size).size())

torch.Size([1, 7, 49408])


  label_tensor = torch.tensor(label_array)


In [None]:
transformer.eval()
print(image_processed['pixel_values'].shape, last_hidden_state_text.shape)
# textonehot = turn_to_one_hot(tokenized_text['input_ids'], vocab_size)
# token_ids['pixel_values']
print(type(tokenized_text['input_ids']), "tokenized_text['input_ids'].shape")
transformer.forward(image_processed['pixel_values'], tokenized_text['input_ids'])

torch.Size([1, 3, 224, 224]) torch.Size([1, 7, 512])
<class 'torch.Tensor'> tokenized_text['input_ids'].shape
torch.Size([1, 7]) caption.shape
torch.Size([1, 7]) x.shape pre embedding
torch.Size([1, 7, 512]) x.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape
torch.Size([1, 7, 512]) attn.shape
torch.Size([1, 7, 512]) embedding.shape
torch.Size([1, 7, 512]) cross attn.shape


tensor([[[ 0.2604,  0.0381, -0.8481,  ..., -0.0376, -0.4862, -0.0947],
         [ 0.2296,  0.1469, -0.6619,  ...,  0.0222, -0.5660, -0.0420],
         [ 0.2029,  0.3369, -0.5353,  ...,  0.0363, -0.5809, -0.0202],
         ...,
         [ 0.1553,  0.4184, -0.5808,  ...,  0.1228, -0.7123,  0.0349],
         [ 0.0856,  0.4178, -0.6168,  ...,  0.1669, -0.6255,  0.0569],
         [ 0.1304,  0.5361, -0.3280,  ...,  0.1989, -0.5903,  0.0642]]],
       grad_fn=<ViewBackward0>)

: 

In [7]:



text = "hello i am dog"
tokenized_text = tokenizer(text, return_tensors="pt", padding="max_length", max_length=10, truncation=True)
print(tokenized_text['input_ids'])

tensor([[49406,  3306,   328,   687,  1929, 49407, 49407, 49407, 49407, 49407]])


In [12]:
def generate_padding_mask(caption, pad_token):
        """
        Generate combined padding and causal mask for decoder self-attention.
        Args:
            caption: Input caption tensor of shape (batch_size, seq_len, vocab_size)
        Returns:
            Attention mask of shape (batch_size, seq_len, seq_len) where:
            - pad tokens are masked with 0
            - future tokens are masked with 0 (causal masking)
            - valid tokens are marked with 1
        """
        # batch_size, seq_length, _ = caption.shape
        
        # Get padding mask by checking if the last index (pad token) is 1
        # print(caption.shape, "caption")
        padding_mask = (caption.squeeze(1) != pad_token).bool()  # [batch_size, seq_len]
        padding_mask = torch.cat([torch.ones(padding_mask.shape[0], 1, device=padding_mask.device, dtype=torch.bool), padding_mask], dim=1)
        # Each item in the batch gets its own mask because:
        # 1. padding_mask is [batch_size, seq_len]
        # 2. When we do the unsqueeze operations, we maintain the batch dimension:
        print(padding_mask.shape, "padding_mask.shape")
        padding_mask_self = padding_mask.unsqueeze(1) * padding_mask.unsqueeze(2)
        print(padding_mask_self.shape, "padding_mask_self.shape")
        # Create final mask by combining padding and causal masks
        final_mask = padding_mask_self
        # cross_attn_mask = padding ._ma .sk
        # print(padding_mask[0], "padding mask")
        # print(cross_attn_mask[1], "final_mask")
        # Create final mask by combining padding and causal masks
        
        return final_mask, padding_mask

print(generate_padding_mask(tokenized_text['input_ids'], tokenizer.pad_token_id))

torch.Size([1, 11]) padding_mask.shape
torch.Size([1, 11, 11]) padding_mask_self.shape
(tensor([[[ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [ True,  True,  True,  True,  True,  True, False, False, False, False,
          False],
         [False, False, False, False, False, False, False, False, False, False,
          False],
         [False, False, False, False, False, False, False, False, False, False,
          False],
         [False, False, False, False, False, False, False, False, False, False,
          False],
         [False, False, False,

In [13]:
print(tokenizer.pad_token_id)

49407


In [14]:
tokenizer.special_tokens_map

{'bos_token': '<|startoftext|>',
 'eos_token': '<|endoftext|>',
 'unk_token': '<|endoftext|>',
 'pad_token': '<|endoftext|>'}

In [15]:
tokenizer.eos_token_id

49407

In [25]:
tokenized_caption = tokenizer(text, return_tensors="pt", padding="max_length", max_length=25, truncation=True)
input_ids = tokenized_text['input_ids']

# Find where the EOS token (49407) is
eos_positions = (input_ids == tokenizer.eos_token_id).nonzero()
if len(eos_positions) > 0:
    first_eos_pos = eos_positions[0][1]
    
    # Replace all padding tokens after the first EOS with 49408 (<<<PAD>>>)
    # Keep one EOS token (49407) at the first EOS position
    input_ids[0, first_eos_pos+1:] = 49408

tokenized_text['input_ids'] = input_ids
print(tokenized_text['input_ids'])


tensor([[49406,  3306,   328,   687,  1929, 49407, 49408, 49408, 49408, 49408]])


In [21]:
vocab = tokenizer.get_vocab()  # Returns dict of {token: index
vocab_size = tokenizer.vocab_size
reverse_vocab = {idx: token for token, idx in vocab.items()}

vocab["<<<PAD>>>"]

49408

In [27]:
reverse_vocab[49408]

'<<<PAD>>>'

In [28]:
tensor([[[ -8.1673,  -5.8411, -10.3520,  ..., -10.7300,   0.4442,  -0.2268]]]

NameError: name 'tensor' is not defined

In [30]:
vocab["the"]

599

In [31]:
import torch
tensor1 = torch.tensor([[ True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False]])

In [50]:
# Create 1,8,45,45 weight tensor
weight_tensor = torch.ones((1, 8, 45, 45))

# Expand tensor1 to 45,45
expanded_tensor1 = tensor1.expand(45, 45)
# print(expanded_tensor1, "expanded_tensor1.shape")

# Set last 2 rows to False
false_index = (tensor1.squeeze(0) == False).nonzero()[0].squeeze(0).item()
print(false_index, "false_index")
expanded_mask = torch.clone(tensor1)
print(expanded_mask, "expanded_mask.shape")
expanded_mask[false_index:, :] = False
print(expanded_mask, "expanded_mask.shape")

10 false_index
tensor([[ True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False]]) expanded_mask.shape
tensor([[ True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False, False, False, False, False,
         False, False, False, False, False]]) expanded_mask.shape


In [45]:
(tensor1.squeeze(0) == False).nonzero()[0].squeeze(0).item()

10