In [None]:
import numpy as np
from collections import OrderedDict
import torch
from torch import nn
from typing import Tuple, Union

1.CLIP的训练方式为对比学习，论文中图2的bag of words prediction和transformer language model分别代表什么？

2.图像编码器和文本编码器的forward过程。

3.如何对CLIP进行finetune。

In [None]:
# CLIP takes two inputs, an image and a text. Each one is preprocessed
# separately to obtain what is fed into the network.
import torch
import clip
from PIL import Image

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# clip.load returns the model together with the image transform that is
# applied to the PIL image further down in this cell.
model, preprocess = clip.load("ViT-B/32", device=device)

# Image preprocessing
'''
图像变换函数。训练过程中唯一的数据增强方法是对缩放后的图像进行随机正方形裁剪；下面列出的 preprocess 变换使用的是确定性的 CenterCrop，而不是随机裁剪。

Compose(
step 1: Resize(size=224, interpolation=bicubic, max_size=None, antialias=None)
step 2: CenterCrop(size=(224, 224))
step 3: <function _convert_image_to_rgb at 0x7f4e27b1f0d0>
step 4: ToTensor()
        Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))   
)

step 5: Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))
<shape: torch.Size([3, 224, 224])>
标准化（归一化）：(pixel_value - mean) / std

step 6:unsqueeze(0)
<shape: torch.Size([1, 3, 224, 224])>
维度扩充：CHW 扩充一维变为NCHW。
''' 
# Open CLIP.png, run it through the `preprocess` transform, add a leading
# batch dimension with unsqueeze(0), and move the result to `device`.
image = preprocess(Image.open("CLIP.png")).unsqueeze(0).to(device)


In [None]:
# 文本预处理
'''
1.使用tokenizer将text进行分词处理，根据vocabulary获得对应id；
可以通过clip.tokenize()调用不区分大小写的分词器，
使用到了BPE(Byte-Pair Encoding)。
默认情况下，输出被填充为77个tokens(用0进行填充，表示的字符为"!")。

2. 根据词表id索引到对应的token_embedding。
'''

# Tokenize the three prompts with clip.tokenize and move the result to `device`.
text = clip.tokenize(["a diagram", "a dog", "a cat"]).to(device)

# 图像编码

In [None]:
class LayerNorm(nn.LayerNorm):
    """nn.LayerNorm variant that normalises in float32 and hands back the input dtype.

    This lets the layer be used with half-precision activations: the input is
    cast to float32 for the normalisation and the result is cast back.
    """

    def forward(self, x: torch.Tensor):
        # Remember the caller's dtype so the output can be restored to it.
        input_dtype = x.dtype
        normalized = super().forward(x.type(torch.float32))
        return normalized.type(input_dtype)

class QuickGELU(nn.Module):
    """Sigmoid-gated activation: ``x * sigmoid(1.702 * x)``."""

    def forward(self, x: torch.Tensor):
        # The gate lies in (0, 1) and scales the input element-wise.
        gate = torch.sigmoid(1.702 * x)
        return x * gate


class ResidualAttentionBlock(nn.Module):
    """One transformer layer: self-attention and an MLP, each behind a LayerNorm and a residual add.

    Args:
        d_model: feature width of the tokens.
        n_head: number of attention heads.
        attn_mask: optional mask handed to nn.MultiheadAttention on every call.
    """

    def __init__(self, d_model: int, n_head: int, attn_mask: torch.Tensor = None):
        super().__init__()

        hidden_dim = d_model * 4  # the MLP expands to 4x the model width

        # Submodules are created in a fixed order (attn, ln_1, mlp, ln_2).
        self.attn = nn.MultiheadAttention(d_model, n_head)
        self.ln_1 = LayerNorm(d_model)
        self.mlp = nn.Sequential(OrderedDict(
            c_fc=nn.Linear(d_model, hidden_dim),
            gelu=QuickGELU(),
            c_proj=nn.Linear(hidden_dim, d_model),
        ))
        self.ln_2 = LayerNorm(d_model)
        self.attn_mask = attn_mask

    def attention(self, x: torch.Tensor):
        """Run self-attention on ``x`` (query = key = value) and return only the output tensor."""
        mask = self.attn_mask
        if mask is not None:
            # Match the mask to the activations' dtype and device.
            mask = mask.to(dtype=x.dtype, device=x.device)
        # The converted mask is stored back on the module, as in every call.
        self.attn_mask = mask
        attn_out = self.attn(x, x, x, need_weights=False, attn_mask=mask)
        return attn_out[0]

    def forward(self, x: torch.Tensor):
        """Apply the attention sub-layer and then the MLP sub-layer, each with a residual connection."""
        after_attn = x + self.attention(self.ln_1(x))
        return after_attn + self.mlp(self.ln_2(after_attn))
    
class Transformer(nn.Module):
    """A stack of ``layers`` ResidualAttentionBlock modules applied one after another.

    Args:
        width: feature width passed to every block.
        layers: number of blocks in the stack.
        heads: number of attention heads per block.
        attn_mask: optional attention mask; the same object is given to every block.
    """

    def __init__(self, width: int, layers: int, heads: int, attn_mask: torch.Tensor = None):
        super().__init__()
        self.width = width
        self.layers = layers
        blocks = (ResidualAttentionBlock(width, heads, attn_mask) for _ in range(layers))
        self.resblocks = nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor):
        """Feed ``x`` through all residual blocks in order."""
        out = self.resblocks(x)
        return out


In [None]:
# ViT image encoder: this is the module that CLIP.encode_image() runs.
class VisionTransformer(nn.Module):
    """Vision Transformer that maps an image batch to one embedding per image.

    Shape examples in the comments below are for a [1, 3, 224, 224] input with
    patch_size=32, width=768 and output_dim=512.

    Args:
        input_resolution: side length of the input images.
        patch_size: side length of each patch (kernel size and stride of conv1).
        width: token feature width (the transformer's d_model).
        layers: number of transformer layers.
        heads: number of attention heads.
        output_dim: size of the returned embedding.
    """

    def __init__(self, input_resolution: int, patch_size: int, width: int, layers: int, heads: int, output_dim: int):
        super().__init__()
        self.input_resolution = input_resolution
        self.output_dim = output_dim

        # Non-overlapping patch embedding: kernel size equals stride.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=width, kernel_size=patch_size, stride=patch_size, bias=False)

        # Learnable class token and positional embedding, scaled by width ** -0.5.
        scale = width ** -0.5
        num_patches = (input_resolution // patch_size) ** 2
        self.class_embedding = nn.Parameter(scale * torch.randn(width))
        self.positional_embedding = nn.Parameter(scale * torch.randn(num_patches + 1, width))
        self.ln_pre = LayerNorm(width)

        self.transformer = Transformer(width, layers, heads)

        self.ln_post = LayerNorm(width)
        # Learnable projection from the token width to the output embedding size.
        self.proj = nn.Parameter(scale * torch.randn(width, output_dim))

    def forward(self, x: torch.Tensor):
        """Encode images ``x`` of shape [N, 3, H, W] and return the class-token embedding."""
        patches = self.conv1(x)  # [N, width, grid, grid], e.g. [1, 768, 7, 7]
        batch_size, channels = patches.shape[0], patches.shape[1]

        # Merge the two spatial axes, then move the feature axis last.
        tokens = patches.reshape(batch_size, channels, -1)  # [N, width, grid**2], e.g. [1, 768, 49]
        tokens = tokens.permute(0, 2, 1)  # [N, grid**2, width], e.g. [1, 49, 768]

        # Broadcast the class embedding to one row per image and prepend it.
        cls_rows = torch.zeros(batch_size, 1, tokens.shape[-1], dtype=tokens.dtype, device=tokens.device)
        cls_tokens = self.class_embedding.to(tokens.dtype) + cls_rows
        tokens = torch.cat([cls_tokens, tokens], dim=1)  # [N, grid**2 + 1, width], e.g. [1, 50, 768]

        # The positional embedding is a learned parameter, added to every token.
        tokens = tokens + self.positional_embedding.to(tokens.dtype)
        tokens = self.ln_pre(tokens)

        # The transformer works sequence-first: NLD -> LND, and back afterwards.
        tokens = tokens.permute(1, 0, 2)  # e.g. [50, 1, 768]
        tokens = self.transformer(tokens)
        tokens = tokens.permute(1, 0, 2)  # e.g. [1, 50, 768]

        # Only the class token (position 0) is kept as the image representation.
        pooled = self.ln_post(tokens[:, 0, :])  # e.g. [1, 768]

        if self.proj is None:
            return pooled
        return pooled @ self.proj  # e.g. [1, 512]
   

# 文本编码

In [None]:
class CLIP(nn.Module):
    """CLIP encoders: a VisionTransformer for images and a causal Transformer for text.

    Both encoders end in a projection to an ``embed_dim``-sized vector, so image
    and text features live in the same space.

    Args:
        embed_dim: size of the image and text output embeddings.
        image_resolution: input image side length, forwarded to the vision encoder.
        vision_layers: number of layers of the vision encoder. It is forwarded
            unchanged to VisionTransformer, so pass an int.
        vision_width: token width of the vision encoder; the number of heads is
            ``vision_width // 64``.
        vision_patch_size: patch side length of the vision encoder.
        context_length: number of token positions per text (size of the
            positional embedding and of the attention mask).
        vocab_size: number of rows of the token embedding table.
        transformer_width: token width of the text encoder.
        transformer_heads: number of attention heads of the text encoder.
        transformer_layers: number of layers of the text encoder.
    """

    def __init__(self,
                 embed_dim: int,
                 # vision
                 image_resolution: int,
                 vision_layers: Union[Tuple[int, int, int, int], int],
                 vision_width: int,
                 vision_patch_size: int,
                 # text
                 context_length: int,
                 vocab_size: int,
                 transformer_width: int,
                 transformer_heads: int,
                 transformer_layers: int):
        # Subclassing nn.Module registers the parameters and submodules below,
        # so .to(), .parameters() and state_dict() work on the model.
        super().__init__()

        self.context_length = context_length
        vision_heads = vision_width // 64

        # Image encoder: the patch sequence gets an extra learnable [class]
        # embedding, and the output at that position is used as the image
        # embedding (similar to the original ViT paper). CLIP can also use a
        # heavily modified ResNet as image encoder; that variant is not built here.
        self.visual = VisionTransformer(
            input_resolution=image_resolution,
            patch_size=vision_patch_size,
            width=vision_width,
            layers=vision_layers,
            heads=vision_heads,
            output_dim=embed_dim
        )

        # Text encoder: a transformer restricted by a causal attention mask.
        self.transformer = Transformer(
            width=transformer_width,
            layers=transformer_layers,
            heads=transformer_heads,
            attn_mask=self.build_attention_mask()
        )
        # Maps token ids to dense embedding vectors.
        self.token_embedding = nn.Embedding(vocab_size, transformer_width)
        self.positional_embedding = nn.Parameter(torch.empty(self.context_length, transformer_width))
        self.ln_final = LayerNorm(transformer_width)
        # Learnable projection of the text feature into the joint embedding space.
        self.text_projection = nn.Parameter(torch.empty(transformer_width, embed_dim))

        # torch.empty returns uninitialised memory (possibly NaN/inf), so the
        # two parameters created with it are given a defined initial value.
        nn.init.normal_(self.positional_embedding, std=0.01)
        nn.init.normal_(self.text_projection, std=transformer_width ** -0.5)

    def build_attention_mask(self):
        """Return the additive causal mask of shape [context_length, context_length].

        Entries above the main diagonal are -inf and all others are 0, so each
        token can only attend to itself and to the tokens on its left.
        """
        mask = torch.empty(self.context_length, self.context_length)
        mask.fill_(float("-inf"))
        mask.triu_(1)  # keep -inf strictly above the diagonal, zero elsewhere
        return mask

    @property
    def dtype(self):
        """dtype of the model weights, read from the vision encoder's patch convolution."""
        return self.visual.conv1.weight.dtype

    def encode_image(self, image):
        """Cast ``image`` to the model dtype and return the vision encoder's output."""
        return self.visual(image.type(self.dtype))

    def encode_text(self, text):
        """Encode token ids ``text`` of shape [batch, n_ctx] into [batch, embed_dim].

        The feature of each sequence is taken at the position of its largest
        token id, which is the end-of-text token (the highest id in each
        sequence).
        """
        # [batch, n_ctx] -> [batch, n_ctx, transformer_width]
        x = self.token_embedding(text).type(self.dtype)

        # Add the learnable positional embedding.
        x = x + self.positional_embedding.type(self.dtype)

        x = x.permute(1, 0, 2)  # NLD -> LND
        x = self.transformer(x)
        x = x.permute(1, 0, 2)  # LND -> NLD
        x = self.ln_final(x).type(self.dtype)

        # x.shape = [batch, n_ctx, transformer_width]
        # Pick the end-of-text position of every sequence, then project it.
        eot_positions = text.argmax(dim=-1)
        x = x[torch.arange(x.shape[0]), eot_positions] @ self.text_projection

        return x


In [None]:
# attention_mask
def build_attention_mask(self):
    """Build the additive causal attention mask for ``self.context_length`` positions.

    PyTorch attention masks are additive: entries strictly above the main
    diagonal are -inf (blocked) and all remaining entries are 0 (allowed).
    """
    size = self.context_length
    blocked = torch.full((size, size), float("-inf"))
    # triu_(1) zeroes the diagonal and everything below it, in place.
    return blocked.triu_(1)

'''
当self.context_length = 5时，返回得到的mask如下，
tensor([[0., -inf, -inf, -inf, -inf],
        [0.,   0., -inf, -inf, -inf],
        [0.,   0.,   0., -inf, -inf],
        [0.,   0.,   0.,   0., -inf],
        [0.,   0.,   0.,   0.,   0.]])
这是一个下三角矩阵，阻止对某些位置的attention，使得每个token只关注Transformer的自注意力层中的左侧标记。

以text = "Hello World!"为例，下方矩阵代表attention map，'v'代表非0值。
                  开始    hello  world    !     结束
         tensor([[49406,  3306,  1002,   256, 49407]]) 
tensor([[49406,     v       0      0      0      0
          3306,     v       v      0      0      0
          1002,     v       v      v      0      0
           256,     v       v      v      v      0
         49407]])   v       v      v      v      v

attention的mask机制屏蔽了token对其右侧邻居token的感受野，
"!"可以学习到"Hello"和"World"的特征，
而"Hello"只能学习到开始符和自身的特征。
这是从左到右的单向embedding学习，并且以结束符的embedding作为整个text的输出embedding。

# text_projection
self.text_projection = nn.Parameter(torch.empty(transformer_width, embed_dim))
'''