## transformer

In [None]:
import torch
import torch.nn as nn
import numpy as np

class TransformerEncoder(nn.Module):
  """Pre-LayerNorm transformer encoder block: self-attention followed by an MLP.

  Both sub-layers are wrapped in a residual connection::

      x = x + mha(ln1(x))
      x = x + mlp(ln2(x))

  Args:
    d_model: Width of the token embeddings.
    n_heads: Number of attention heads passed to ``MultiheadAttention``.
    mlp_ratio: Hidden width of the MLP expressed as a multiple of ``d_model``.
  """

  def __init__(self, d_model, n_heads, mlp_ratio=4):
    super().__init__()

    self.d_model = d_model
    self.n_heads = n_heads

    # Sub-layer 1: LayerNorm -> multi-head self-attention.
    self.ln1 = nn.LayerNorm(d_model)
    self.mha = MultiheadAttention(d_model, n_heads)

    # Sub-layer 2: LayerNorm -> position-wise MLP (expand, GELU, contract).
    self.ln2 = nn.LayerNorm(d_model)
    self.mlp = nn.Sequential(
        nn.Linear(d_model, d_model * mlp_ratio),
        nn.GELU(),
        nn.Linear(d_model * mlp_ratio, d_model),
    )

  def forward(self, x, mask=None):
    """Run the block on ``x`` and return a tensor of the same shape.

    Args:
      x: Input sequence; the file's comments describe it as
        ``[B, max_seq_len, d_model]``.
      mask: Optional attention mask forwarded unchanged to the attention
        layer. Although this is an encoder, CLIP's text branch pads every
        sequence to ``max_seq_len`` and needs the mask to hide the padding.

    Returns:
      The transformed sequence, same shape as ``x``.
    """
    # Residual around attention. Previously the attention output replaced the
    # stream inside the MLP branch only, so it never reached the residual path.
    x = x + self.mha(self.ln1(x), mask=mask)
    # Residual around the MLP, normalising the updated stream.
    x = x + self.mlp(self.ln2(x))

    return x


class AttentionHead(nn.Module):
  """A single scaled dot-product self-attention head.

  Args:
    d_model: Width of the incoming token embeddings.
    qkv_dim: Width of the query/key/value projections (and of the output).
  """

  def __init__(self, d_model, qkv_dim):
    super().__init__()

    self.qkv_dim = qkv_dim

    self.query = nn.Linear(d_model, qkv_dim)
    self.key = nn.Linear(d_model, qkv_dim)
    self.value = nn.Linear(d_model, qkv_dim)

  def forward(self, x, mask=None):
    """Compute ``softmax(Q K^T / sqrt(qkv_dim)) V`` for ``x``.

    Args:
      x: Input of shape ``[B, seq_len, d_model]``.
      mask: Optional tensor broadcastable to the ``[B, seq_len, seq_len]``
        score matrix. Positions where ``mask == 0`` are excluded from the
        softmax. A query row whose keys are all masked yields NaN, because
        the softmax of a row of ``-inf`` is undefined.

    Returns:
      Tensor of shape ``[B, seq_len, qkv_dim]``.
    """
    Q = self.query(x)
    K = self.key(x)
    V = self.value(x)

    # [B, seq_len, seq_len] similarity scores, scaled to keep softmax stable.
    scores = (Q @ K.transpose(-2, -1)) / (self.qkv_dim ** 0.5)

    if mask is not None:
      # masked_fill is out-of-place: the result must be assigned back to the
      # scores. The original stored it in `mask`, so padding was never hidden.
      scores = scores.masked_fill(mask == 0, float("-inf"))

    weights = torch.softmax(scores, dim=-1)

    return weights @ V

class MultiheadAttention(nn.Module):
  """Multi-head self-attention built from independent ``AttentionHead`` modules.

  Args:
    d_model: Embedding dimension of the input and of the output.
    n_heads: Number of heads; each head works in ``d_model // n_heads`` dims.
  """

  def __init__(self, d_model, n_heads):
    super().__init__()
    # Per-head width of the query/key/value projections.
    self.qkv_dim = d_model // n_heads

    # Output projection applied to the concatenated head outputs.
    self.W_o = nn.Linear(d_model, d_model)

    heads = []
    for _ in range(n_heads):
      heads.append(AttentionHead(d_model, self.qkv_dim))
    self.multi_head = nn.ModuleList(heads)

  def forward(self, x, mask=None):
    """Apply every head to ``x``, concatenate on the last axis, then project.

    Args:
      x: Input described by the file's comments as ``[B, max_seq, d_model]``.
      mask: Optional attention mask handed unchanged to every head.

    Returns:
      The result of ``W_o`` applied to the concatenated head outputs.
    """
    per_head = []
    for attention_head in self.multi_head:
      per_head.append(attention_head(x, mask=mask))

    # Joining the heads along the feature axis restores the model width.
    merged = torch.cat(per_head, dim=-1)

    return self.W_o(merged)

class PositionalEmbedding(nn.Module):
  """Fixed sinusoidal positional encoding added to a token sequence.

  Even feature columns hold ``sin(pos * w_k)`` and odd columns hold
  ``cos(pos * w_k)`` with ``w_k = 10000 ** (-2k / d_model)``. The table is
  stored as the non-trainable buffer ``pe`` of shape
  ``[1, max_seq_len, d_model]``.

  Args:
    d_model: Embedding width. Odd widths are supported.
    max_seq_len: Number of positions to precompute.
  """

  def __init__(self, d_model, max_seq_len):
    super().__init__()
    self.d_model = d_model
    self.max_seq_len = max_seq_len

    position = torch.arange(0, max_seq_len).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)
    )
    # [max_seq_len, ceil(d_model / 2)] phase of every (position, frequency) pair.
    angles = position * div_term

    pe = torch.zeros(max_seq_len, d_model)
    pe[:, 0::2] = torch.sin(angles)
    # There are only floor(d_model / 2) odd columns, so drop the surplus
    # frequency when d_model is odd (this used to raise a shape mismatch).
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    self.register_buffer("pe", pe.unsqueeze(0))

  def forward(self, x):
    """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

    Args:
      x: Tensor of shape ``[B, seq_len, d_model]`` with
        ``seq_len <= max_seq_len``.

    Returns:
      Tensor with the same shape as ``x``.
    """
    seq_len = x.size(1)
    # [B, seq_len, d_model] + [1, seq_len, d_model] broadcasts over the batch.
    return x + self.pe[:, :seq_len]


## vit

In [None]:
import torch
from torch import nn
from .transformer import TransformerEncoder, PositionalEmbedding 

class VisionEncoder(nn.Module):
  """ViT-style image encoder producing one L2-normalised embedding per image.

  The image is cut into non-overlapping patches by a strided convolution, a
  learnable class token is prepended, positional encodings are added, the
  sequence runs through ``n_layers`` encoder blocks, and the class-token
  output is projected to ``emb_dim`` and normalised.

  Args:
    d_model: Width of the patch/token embeddings.
    img_size: ``(height, width)`` of the input images.
    patch_size: ``(height, width)`` of one patch; must divide ``img_size``.
    n_channels: Number of input image channels.
    n_heads: Attention heads per encoder block; must divide ``d_model``.
    n_layers: Number of stacked encoder blocks.
    emb_dim: Width of the returned embedding.
  """

  def __init__(self, d_model, img_size, patch_size,
               n_channels, n_heads, n_layers, emb_dim):
    super().__init__()
    assert (
        img_size[0] % patch_size[0] == 0 and img_size[1] % patch_size[1] == 0
    ), "image dimensions should be divisible by patch dim"
    assert d_model % n_heads == 0, "d_model should be divisible by n_heads"

    self.num_patches = (img_size[0] * img_size[1]) // (patch_size[0] * patch_size[1])

    # Sequence length seen by the transformer: all patches plus the class token.
    self.max_seq_length = self.num_patches + 1

    # Kernel == stride gives one output position per non-overlapping patch.
    # The stride must be the full (h, w) pair: using only patch_size[0] made
    # non-square patches overlap and produced more tokens than num_patches.
    self.linear_proj = nn.Conv2d(
        in_channels=n_channels,
        out_channels=d_model,
        kernel_size=patch_size,
        stride=patch_size,
    )

    self.cls_token = nn.Parameter(torch.randn(1, 1, d_model), requires_grad=True)

    self.positional_embedding = PositionalEmbedding(d_model, self.max_seq_length)

    self.transformer_encoder = nn.ModuleList(
        [TransformerEncoder(d_model, n_heads) for _ in range(n_layers)]
    )

    # Learned linear map from the model width to the shared embedding width.
    self.projection = nn.Parameter(torch.randn(d_model, emb_dim))

  def forward(self, x, mask=None):
    """Encode a batch of images.

    Args:
      x: Image batch of shape ``[B, n_channels, H, W]``.
      mask: Optional attention mask forwarded to every encoder block.

    Returns:
      Tensor of shape ``[B, emb_dim]`` with unit L2 norm along the last axis.
    """
    # [B, C, H, W] -> [B, d_model, H / patch_h, W / patch_w]
    x = self.linear_proj(x)
    # Flatten the patch grid and move features last: [B, num_patches, d_model]
    x = x.flatten(2).transpose(-2, -1)

    # Prepend one copy of the class token per image.
    x = torch.cat(
        (self.cls_token.expand(x.shape[0], -1, -1), x), dim=1
    )

    x = self.positional_embedding(x)

    for encoder_layer in self.transformer_encoder:
      x = encoder_layer(x, mask)

    # The class token (position 0) summarises the whole image.
    x = x[:, 0, :]

    if self.projection is not None:
      x = x @ self.projection

    # L2-normalise the embedding.
    x = x / torch.norm(x, dim=-1, keepdim=True)

    return x





## tokenizer


In [None]:
import torch
import torch.nn as nn
from .transformer import TransformerEncoder, PositionalEmbedding

def tokenizer(text, encode=True, mask=None, max_seq_length=32):
  """Byte-level tokenizer that encodes a string to ids or decodes ids back.

  Encoding wraps the UTF-8 bytes of ``text`` in BOS (2) and EOS (3) markers
  and pads with 0 up to ``max_seq_length``. All length handling is done on
  bytes, so the result has exactly ``max_seq_length`` ids even for non-ASCII
  text. When the input is too long it is truncated and the final id is set
  to EOS, so the sequence always ends with the marker the decoder expects.

  Decoding drops the first and last valid ids (BOS/EOS) and interprets the
  rest as UTF-8; bytes that do not form a complete character (for example
  after truncation) are skipped.

  Args:
    text: The string to encode, or a 1-D sequence/tensor of ids to decode.
    encode: ``True`` to encode, ``False`` to decode.
    mask: Only used when decoding: 1-D tensor whose non-zero count is the
      number of valid ids. When ``None`` the non-zero ids in ``text`` are
      counted instead.
    max_seq_length: Fixed length of the encoded output.

  Returns:
    ``(ids, mask)`` as two int32 tensors of length ``max_seq_length`` when
    encoding; ``(string, None)`` when decoding.
  """
  bos, eos, pad = 2, 3, 0

  if encode:
    # CLIP adds a begin-of-sequence and an end-of-sequence marker around the text.
    tokens = [bos, *text.encode("utf-8"), eos]

    # Too long: cut to size and keep EOS as the last id.
    if len(tokens) > max_seq_length:
      tokens = tokens[:max_seq_length]
      if tokens:
        tokens[-1] = eos

    n_valid = len(tokens)
    n_pad = max_seq_length - n_valid

    # Too short: pad with zeros so every output has the same length.
    out = torch.tensor(tokens + [pad] * n_pad, dtype=torch.int32)

    # 1 for real tokens, 0 for padding.
    mask = torch.tensor([1] * n_valid + [0] * n_pad, dtype=torch.int32)
  else:
    ids = [int(t) for t in text]
    if mask is not None:
      n_valid = len(mask.nonzero())
    else:
      n_valid = sum(1 for t in ids if t != pad)

    # Strip BOS and EOS, keeping only the payload.
    payload = ids[1: n_valid - 1]
    if all(0 <= t < 256 for t in payload):
      out = bytes(payload).decode("utf-8", errors="ignore")
    else:
      # Ids outside the byte range cannot be UTF-8; map them one-to-one to
      # code points as the previous implementation did.
      out = "".join(chr(t) for t in payload)
    mask = None

  return out, mask

class TextEncoder(nn.Module):
  """Transformer text encoder producing one L2-normalised embedding per text.

  Token ids are embedded, positional encodings are added, the sequence runs
  through ``n_layers`` encoder blocks, and the feature vector at the last
  valid position of each sequence is projected to ``emb_dim`` and normalised.

  Args:
    vocab_size: Number of distinct token ids.
    d_model: Width of the token embeddings.
    max_seq_length: Maximum sequence length for the positional table.
    n_layers: Number of stacked encoder blocks.
    n_heads: Attention heads per encoder block.
    emb_dim: Width of the returned embedding.
  """

  def __init__(self, vocab_size, d_model, max_seq_length, n_layers,
               n_heads, emb_dim):
    super().__init__()
    self.max_seq_length = max_seq_length
    self.embed = nn.Embedding(vocab_size, d_model)

    self.positional_embedding = PositionalEmbedding(d_model, max_seq_length)

    self.transformer_encoder = nn.ModuleList(
        [TransformerEncoder(d_model, n_heads) for _ in range(n_layers)]
    )

    # Learned linear map from the model width to the shared embedding width.
    self.projection = nn.Parameter(torch.randn(d_model, emb_dim))

  def forward(self, text, mask=None):
    """Encode a batch of token-id sequences.

    Args:
      text: Integer tensor of token ids, shape ``[B, seq_len]``.
      mask: Optional attention mask. The pooling step reads ``mask[:, 0]`` and
        sums it over dim 1, so it is expected to be ``[B, seq_len, seq_len]``
        with 1 for valid tokens and 0 for padding. When ``None`` every
        position is treated as valid.

    Returns:
      Tensor of shape ``[B, emb_dim]`` with unit L2 norm along the last axis.
    """
    x = self.embed(text)
    x = self.positional_embedding(x)

    for encoder_layer in self.transformer_encoder:
      x = encoder_layer(x, mask=mask)  # [B, seq_len, d_model]

    # The encoder output holds one feature vector per token (for example
    # [BOS, tok, tok, EOS, pad, pad, ...]). Pick, for each sequence, the
    # vector at its last valid position (the EOS marker) using advanced
    # indexing with a (batch index, position index) pair.
    batch_idx = torch.arange(text.shape[0], device=x.device)
    if mask is None:
      # No mask means no padding: the last position is the last valid one.
      # The original indexed `mask[:, 0]` unconditionally and crashed here.
      last_idx = torch.full_like(batch_idx, text.shape[1] - 1)
    else:
      valid_counts = torch.sum(mask[:, 0], dim=1)
      last_idx = torch.sub(valid_counts, 1).long().to(x.device)
    x = x[batch_idx, last_idx]

    # Project into the shared multimodal embedding space.
    if self.projection is not None:
      x = x @ self.projection

    # L2-normalise the embedding.
    x = x / torch.norm(x, dim=-1, keepdim=True)

    return x

