<a href="https://colab.research.google.com/github/2SpaceMasterRace/2SpaceMasterRace/blob/main/Transformer_MappingNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Packages

In [None]:
!git version

git version 2.25.1


In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117
!pip install transformers
!pip install numpy
!pip install git+https://github.com/openai/CLIP.git

Looking in indexes: https://download.pytorch.org/whl/cu117, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-4zuy5uu0
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-4zuy5uu0
  Resolved https://github.com/openai/CLIP.git to commit a9b1bf5920416aaeaec965c25dd9e8f98c864f16
  Preparing metadata (setup.py) ... done


Import Libraries

In [None]:
import clip
import os
from torch import nn
import numpy as np
import torch
import torch.nn.functional as nnf
import sys
from typing import Tuple, List, Union, Optional
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
from google.colab import files
from enum import Enum

CLIP Model

In [None]:
# Short aliases used in the type annotations below.
T = torch.Tensor
D = torch.device

# Device returned by get_device when CUDA is not available.
CPU = D('cpu')


def get_device(device_id: int) -> D:
    """Return the torch device to use for ``device_id``.

    When CUDA is unavailable the module-level ``CPU`` device is returned.
    Otherwise a CUDA device is returned whose index is ``device_id`` capped at
    the index of the last visible GPU.
    """
    if torch.cuda.is_available():
        last_index = torch.cuda.device_count() - 1
        # Cap the requested index so it never points past the last GPU.
        index = device_id if device_id < last_index else last_index
        return torch.device(f'cuda:{index}')
    return CPU


# Alias: CUDA(device_id) is the same call as get_device(device_id).
CUDA = get_device

# The "pretrained_models" folder is placed in the PARENT of the current working directory.
current_directory = os.getcwd()
save_path = os.path.join(os.path.dirname(current_directory), "pretrained_models")
os.makedirs(save_path, exist_ok=True)  # create the folder if missing; no error when it already exists
# NOTE(review): the file name is spelled 'model_wieghts.pt' (sic). It is left unchanged here because
# renaming it would change the path that is read/written on disk.
model_path = os.path.join(save_path, 'model_wieghts.pt')



class MLP(nn.Module):
    """Stack of ``nn.Linear`` layers with an activation between consecutive layers.

    ``sizes`` lists the layer widths: ``sizes[k]`` -> ``sizes[k + 1]`` for each
    adjacent pair. ``act`` is instantiated after every linear layer except the
    last one. ``bias`` is forwarded to every ``nn.Linear``.
    """

    def __init__(self, sizes: Tuple[int, ...], bias=True, act=nn.Tanh):
        super().__init__()
        final_index = len(sizes) - 2  # position of the last Linear layer
        modules = []
        for index, (fan_in, fan_out) in enumerate(zip(sizes[:-1], sizes[1:])):
            modules.append(nn.Linear(fan_in, fan_out, bias=bias))
            if index != final_index:
                # No activation after the output layer.
                modules.append(act())
        self.model = nn.Sequential(*modules)

    def forward(self, x: T) -> T:
        """Run ``x`` through the layer stack and return the result."""
        return self.model(x)


class ClipCaptionModel(nn.Module):
    """GPT-2 language model fed with a projected prefix followed by token embeddings.

    ``prefix_length`` is the number of embedding positions the prefix is
    projected to; ``prefix_size`` is the size of the incoming prefix vector.
    """

    def __init__(self, prefix_length: int, prefix_size: int = 512):
        super().__init__()
        self.prefix_length = prefix_length
        self.gpt = GPT2LMHeadModel.from_pretrained('gpt2')
        self.gpt_embedding_size = self.gpt.transformer.wte.weight.shape[1]
        # Total number of values needed to fill ``prefix_length`` embedding slots.
        projected_size = self.gpt_embedding_size * prefix_length
        if prefix_length <= 10:
            # Short prefixes: MLP with one hidden layer of half the output width.
            self.clip_project = MLP((prefix_size, projected_size // 2, projected_size))
        else:
            # Longer prefixes: a single linear projection.
            self.clip_project = nn.Linear(prefix_size, projected_size)

    # NOTE: the original carried a disabled ``functools.lru_cache`` decorator here, marked FIXME.
    def get_dummy_token(self, batch_size: int, device: D) -> T:
        """Return an int64 tensor of zeros shaped (batch_size, prefix_length) on ``device``."""
        return torch.zeros(batch_size, self.prefix_length, dtype=torch.int64, device=device)

    def forward(self, tokens: T, prefix: T, mask: Optional[T] = None, labels: Optional[T] = None):
        """Run GPT-2 on the projected prefix concatenated with the token embeddings.

        ``labels`` only acts as a switch: when it is not None, the labels passed
        to GPT-2 are rebuilt as zeros for the prefix positions followed by
        ``tokens``. Returns whatever ``self.gpt`` returns.
        """
        token_embeddings = self.gpt.transformer.wte(tokens)
        prefix_embeddings = self.clip_project(prefix).view(-1, self.prefix_length, self.gpt_embedding_size)
        inputs = torch.cat((prefix_embeddings, token_embeddings), dim=1)
        if labels is not None:
            padding = self.get_dummy_token(tokens.shape[0], tokens.device)
            labels = torch.cat((padding, tokens), dim=1)
        return self.gpt(inputs_embeds=inputs, labels=labels, attention_mask=mask)


class ClipCaptionPrefix(ClipCaptionModel):
    """ClipCaptionModel variant that exposes only the projection's parameters
    and keeps the GPT-2 sub-module in eval mode."""

    def parameters(self, recurse: bool = True):
        """Return the parameters of ``clip_project`` only.

        ``recurse`` is accepted to match the base signature but is not used.
        """
        projection = self.clip_project
        return projection.parameters()

    def train(self, mode: bool = True):
        """Apply ``mode`` through the base class, then force ``self.gpt`` into eval mode."""
        super().train(mode)
        self.gpt.eval()
        return self

Mapping Network

In [None]:
class MlpTransformer(nn.Module):
    """Two-layer feed-forward block: Linear -> activation -> dropout -> Linear -> dropout.

    ``in_dim`` and ``h_dim`` are the input and hidden widths. ``out_d`` is the
    output width and falls back to ``in_dim`` when None. ``act`` is a callable
    applied after the first linear layer; ``dropout`` is the drop probability
    of the single ``nn.Dropout`` used at both dropout positions.
    """

    def __init__(self, in_dim, h_dim, out_d: Optional[int] = None, act=nnf.relu, dropout=0.):
        super().__init__()
        if out_d is None:
            out_d = in_dim
        self.fc1 = nn.Linear(in_dim, h_dim)
        self.act = act
        self.fc2 = nn.Linear(h_dim, out_d)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the block to ``x`` and return the result."""
        hidden = self.dropout(self.act(self.fc1(x)))
        return self.dropout(self.fc2(hidden))

class MLP(nn.Module):  # plain MLP, kept for comparison with the transformer mapper
    """Sequential stack of linear layers separated by an activation.

    ``sizes`` gives the width at every stage, so ``len(sizes) - 1`` linear
    layers are built. ``act`` is instantiated between linear layers (never
    after the last one) and ``bias`` is passed to each ``nn.Linear``.
    """

    def __init__(self, sizes: Tuple[int, ...], bias=True, act=nn.Tanh):
        super().__init__()
        n_linear = len(sizes) - 1
        stack = []
        for idx in range(n_linear):
            stack.append(nn.Linear(sizes[idx], sizes[idx + 1], bias=bias))
            another_layer_follows = idx + 1 < n_linear
            if another_layer_follows:
                stack.append(act())
        self.model = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the output of the layer stack for input ``x``."""
        return self.model(x)


'''
Forward pass of the multi-head attention layer:
1. Project the input x to queries, and the reference y to keys and values, using linear layers.
2. Split the joint key/value tensor into keys and values along its third dimension (index 2, which has size 2).
3. Compute the scaled dot-product attention scores between queries and keys.
4. If a mask is given, fill the masked positions with -inf.
5. Apply softmax over the key dimension.
6. Compute the weighted sum of the values, reshape it, and apply the output linear projection.
'''


class MultiHeadAttention(nn.Module):
    """Multi-head attention with queries taken from ``x`` and keys/values from ``y``.

    Arguments:
        dim_self  : feature size of the query input ``x`` (and of the output).
                    ``forward`` reshapes to ``num_heads`` heads of size
                    ``dim_self // num_heads``, so it must be divisible by ``num_heads``.
        dim_ref   : feature size of the reference input ``y`` used for keys and values.
        num_heads : number of attention heads.
        bias      : whether the query and key/value linear layers use a bias term.
        dropout   : probability of the ``nn.Dropout`` stored on the module
                    (it is created here but not applied in ``forward``).
    """

    def __init__(self, dim_self, dim_ref, num_heads, bias=True, dropout=0.):
        # nn.Module must be initialised before sub-modules can be assigned as attributes.
        super().__init__()
        self.num_heads = num_heads
        head_dim = dim_self // num_heads
        # Scores are multiplied by 1 / sqrt(head_dim).
        self.scale = head_dim ** -0.5
        self.to_queries = nn.Linear(dim_self, dim_self, bias=bias)
        # Keys and values are produced together, hence the doubled output width.
        self.to_keys_values = nn.Linear(dim_ref, dim_self * 2, bias=bias)
        # Output projection applied to the merged heads.
        self.project = nn.Linear(dim_self, dim_self)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, y=None, mask=None):
        """Attend from ``x`` to ``y`` (or to ``x`` itself when ``y`` is None).

        ``x`` is unpacked as (b, n, c) and ``y`` as (b, m, _). When ``mask`` is
        given, positions where it is True are filled with -inf before the
        softmax; a 2-D mask gets a singleton axis inserted at position 1, and
        every mask gets a trailing axis so it broadcasts over the heads.

        Returns ``(out, attention)`` where ``out`` has shape (b, n, c) and
        ``attention`` has shape (b, n, m, num_heads), normalised over the
        key axis (dim 2).
        """
        y = y if y is not None else x
        b, n, c = x.shape
        _, m, _ = y.shape
        # b n h dh
        queries = self.to_queries(x).reshape(b, n, self.num_heads, c // self.num_heads)
        # b m 2 h dh
        keys_values = self.to_keys_values(y).reshape(b, m, 2, self.num_heads, c // self.num_heads)
        keys, values = keys_values[:, :, 0], keys_values[:, :, 1]
        # Per-head dot product between every query and every key, then scaled.
        attention = torch.einsum('bnhd,bmhd->bnmh', queries, keys) * self.scale
        if mask is not None:
            if mask.dim() == 2:
                mask = mask.unsqueeze(1)
            attention = attention.masked_fill(mask.unsqueeze(3), float("-inf"))
        attention = attention.softmax(dim=2)
        # Weighted sum of the values per head, with heads merged back into c features.
        out = torch.einsum('bnmh,bmhd->bnhd', attention, values).reshape(b, n, c)
        out = self.project(out)
        return out, attention


class TransformerLayer(nn.Module):
    """One transformer block: normalised attention plus a normalised MLP,
    each added back onto its input as a residual.

    ``mlp_ratio`` scales ``dim_self`` to get the hidden width of the MLP;
    ``norm_layer`` is called with ``dim_self`` to build both normalisation layers.
    """

    def __init__(self, dim_self, dim_ref, num_heads, mlp_ratio=4., bias=False, dropout=0., act=nnf.relu,
                 norm_layer: nn.Module = nn.LayerNorm):
        super().__init__()
        hidden_dim = int(dim_self * mlp_ratio)
        self.norm1 = norm_layer(dim_self)
        self.attn = MultiHeadAttention(dim_self, dim_ref, num_heads, bias=bias, dropout=dropout)
        self.norm2 = norm_layer(dim_self)
        self.mlp = MlpTransformer(dim_self, hidden_dim, act=act, dropout=dropout)

    def forward_with_attention(self, x, y=None, mask=None):
        """Return ``(output, attention)``: the block output and the attention tensor from ``self.attn``."""
        # Only x is normalised before attention; y is passed through unchanged.
        attended, attention = self.attn(self.norm1(x), y, mask)
        after_attention = x + attended
        after_mlp = after_attention + self.mlp(self.norm2(after_attention))
        return after_mlp, attention

    def forward(self, x, y=None, mask=None):
        """Return only the block output (same computation as ``forward_with_attention``)."""
        return self.forward_with_attention(x, y, mask)[0]


class Transformer(nn.Module):
    """Stack of ``TransformerLayer`` blocks.

    With ``enc_dec=False`` there are ``num_layers`` layers, each built with
    reference dimension ``dim_ref``. With ``enc_dec=True`` the number of layers
    is doubled and they alternate: even-indexed layers attend to ``y``
    (reference dimension ``dim_ref``), odd-indexed layers attend to ``x`` itself
    (reference dimension ``dim_self``). ``dim_ref`` falls back to ``dim_self``
    when None.
    """

    def __init__(self, dim_self: int, num_heads: int, num_layers: int, dim_ref: Optional[int] = None,
                 mlp_ratio: float = 2., act=nnf.relu, norm_layer: nn.Module = nn.LayerNorm, enc_dec: bool = False):
        super().__init__()
        if dim_ref is None:
            dim_ref = dim_self
        self.enc_dec = enc_dec
        total_layers = num_layers * 2 if enc_dec else num_layers
        blocks = []
        for index in range(total_layers):
            # Only the odd layers of an encoder-decoder stack are self-attention layers.
            is_self_layer = enc_dec and index % 2 == 1
            reference_dim = dim_self if is_self_layer else dim_ref
            blocks.append(
                TransformerLayer(dim_self, reference_dim, num_heads, mlp_ratio, act=act, norm_layer=norm_layer)
            )
        self.layers = nn.ModuleList(blocks)

    def forward_with_attention(self, x, y=None, mask=None):
        """Run every layer with the same ``y`` and ``mask``, regardless of ``enc_dec``.

        Returns ``(x, attentions)`` where ``attentions`` holds one attention
        tensor per layer, in layer order.
        """
        attentions = []
        for layer in self.layers:
            x, layer_attention = layer.forward_with_attention(x, y, mask)
            attentions.append(layer_attention)
        return x, attentions

    def forward(self, x, y=None, mask=None):
        """Run the stack and return the final ``x``.

        In encoder-decoder mode, even layers are called as ``layer(x, y)``
        (no mask) and odd layers as ``layer(x, x, mask)``; otherwise every
        layer is called as ``layer(x, y, mask)``.
        """
        for index, layer in enumerate(self.layers):
            if not self.enc_dec:  # self or cross
                x = layer(x, y, mask)
            elif index % 2 == 0:  # cross
                x = layer(x, y)
            else:  # self
                x = layer(x, x, mask)
        return x



class TransformerMapper(nn.Module):
    """Map one CLIP vector per sample to a sequence of prefix embeddings.

    Arguments:
        dim_clip      : size of the incoming CLIP vector.
        dim_embedding : embedding size used inside the transformer and in the output.
        prefix_length : number of rows of the learned constant ``prefix_const``,
                        which is also the sequence length of the output.
        clip_length   : number of embedding positions the CLIP vector is projected to.
        num_layers    : number of transformer layers (the transformer uses 8 heads).
    """

    def __init__(self, dim_clip: int, dim_embedding: int, prefix_length: int, clip_length: int, num_layers: int = 8):
        super(TransformerMapper, self).__init__()
        self.clip_length = clip_length
        self.transformer = Transformer(dim_embedding, 8, num_layers)  # 8 attention heads
        # Projects the CLIP vector to clip_length * dim_embedding values, reshaped into a sequence in forward.
        self.linear = nn.Linear(dim_clip, clip_length * dim_embedding)
        # Learned constant, initialised from a standard normal distribution. It is concatenated
        # AFTER the projected CLIP sequence, and the transformer output at its positions is returned.
        self.prefix_const = nn.Parameter(torch.randn(prefix_length, dim_embedding), requires_grad=True)

    def forward(self, x):
        """Return the transformer output at the ``prefix_const`` positions.

        ``x`` is projected and viewed as (batch, clip_length, -1). The learned
        constant is expanded over the batch and concatenated after it along
        dim 1. The combined sequence goes through the transformer, and the
        first ``clip_length`` positions of the result are dropped.
        """
        x = self.linear(x).view(x.shape[0], self.clip_length, -1)
        # expand() repeats the constant over the batch dimension without copying it.
        prefix = self.prefix_const.unsqueeze(0).expand(x.shape[0], *self.prefix_const.shape)
        prefix = torch.cat((x, prefix), dim=1)
        out = self.transformer(prefix)[:, self.clip_length:]
        return out
