In [1]:
import torch
import torch.nn as nn
import numpy as np
import os
import math
import torch.nn.functional as F

In [2]:
# Global hyper-parameters read by the cells below.
# NOTE(review): some recorded outputs further down show a leading dimension of 64,
# so they appear to come from a run with a different batch_size - re-run to confirm.
vocab_size=51200  # number of rows in the demo embedding table / upper bound for random ids
seq_len=144  # max_length handed to the tokenizer and length of the random id sequences
hidden_size=512  # model width used by the embedding, attention and feed-forward modules
batch_size=16  # leading dimension of the randomly generated demo tensors

tokenizer分词器（调用bert）

In [3]:
from transformers import BertTokenizer
# Raw string literal: the previous mix of single and doubled backslashes relied on
# invalid escape sequences ("\c", "\p") being passed through unchanged, which newer
# Python versions warn about. The resulting path value is exactly the same.
bert_dir = r"D:\code\vlm-longtail\pretrained\bert-base-uncased"
# Load the BERT vocabulary from the local directory; over-long inputs are cut on the right.
tokenizer = BertTokenizer.from_pretrained(bert_dir, truncation_side='right')
# Register "[DEC]" as the beginning-of-sequence token.
tokenizer.add_special_tokens({"bos_token": "[DEC]"})
tokenizer

BertTokenizer(name_or_path='D:\code\vlm-longtail\pretrained\bert-base-uncased', vocab_size=30522, model_max_length=512, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'bos_token': '[DEC]', 'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True)

In [4]:
# Tokenize one sample sentence, padding/truncating it to exactly seq_len ids,
# and keep only the input_ids field (requested as a PyTorch tensor via "pt").
tokens =tokenizer(
            "hello good moring",
            padding="max_length",
            truncation=True,
            max_length=seq_len,
            return_tensors="pt",
        ).input_ids
tokens

tensor([[  101,  7592,  2204, 22993,  3070,   102,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,  

# 位置编码和embedding层

In [77]:

class PositionalEncoding(nn.Module):
    """
    Sinusoidal positional encoding as described in "Attention Is All You Need".

    A table of shape (1, max_len, hidden_size) is pre-computed once and stored as a
    buffer named ``pe``; even feature columns hold sines and odd columns hold cosines.

    Args:
        hidden_size (int): feature width of the tensors that will be encoded.
        max_len (int): largest sequence length the pre-computed table supports.
    """

    def __init__(self,hidden_size,max_len=5000):
        super(PositionalEncoding, self).__init__()

        # Empty table that receives the encodings, shape (max_len, hidden_size).
        pe = torch.zeros(max_len,hidden_size)
        # Column vector of positions 0..max_len-1, shape (max_len, 1).
        position = torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)

        # One frequency per sine column, shape (ceil(hidden_size / 2),).
        div_term = torch.exp(torch.arange(0,hidden_size,2).float()*(-math.log(10000)/hidden_size))

        pe[:,0::2] = torch.sin(position*div_term)
        # For an odd hidden_size there is one cosine column fewer than sine columns,
        # so only the first hidden_size // 2 frequencies are used here.
        pe[:,1::2] = torch.cos(position*div_term[:hidden_size//2])
        pe = pe.unsqueeze(0)  # add a leading batch axis -> (1, max_len, hidden_size)

        # A buffer is saved/loaded and moved with the module but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Add the positional encoding of positions 0..seq_len-1 to ``x``.

        Args:
            x (torch.Tensor): tensor of shape (batch_size, seq_len, hidden_size).

        Returns:
            torch.Tensor: ``x`` plus the positional encodings, same shape as ``x``.

        Raises:
            ValueError: if seq_len exceeds the ``max_len`` the table was built for.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        # Slice the first seq_len rows. Indexing with a bare integer would select a
        # single position and broadcast that one vector over every token.
        return x + self.pe[:, :seq_len, :]
    


In [None]:
# Smoke test: embed random token ids, then add positional encodings.
embedding = nn.Embedding(vocab_size, hidden_size)
# Random ids drawn from [0, vocab_size), arranged as (batch_size, seq_len).
input_ids = torch.randint(0,vocab_size,(batch_size, seq_len))
# input_ids
embedding_output = embedding(input_ids)
pe = PositionalEncoding(max_len=5000, hidden_size=hidden_size)
# NOTE: `output` is a notebook global that later demo cells reassign.
output = pe(embedding_output)


In [None]:
class Embedding_layer(nn.Module):
    """
    Token embedding followed by positional encoding and dropout.

    Args:
        vocab_size (int): number of rows in the embedding table.
        hidden_size (int): width of each embedding vector.
        dropout (float): dropout probability applied to the encoded embeddings.
        max_len (int): maximum sequence length handed to the positional encoding.
        layernorm_eps (float): epsilon of the LayerNorm module that is constructed here.
    """

    def __init__(self, vocab_size, hidden_size, dropout=0.1, max_len=5000, layernorm_eps=1e-12):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)
        self.pe = PositionalEncoding(hidden_size=hidden_size, max_len=max_len)
        self.dropout = nn.Dropout(p=dropout)
        # The LayerNorm is created (so its parameters exist) but forward() does not apply it.
        self.layernorm = nn.LayerNorm(hidden_size, eps=layernorm_eps)

    def forward(self, input_ids):
        """Look up embeddings for ``input_ids``, add positions, and apply dropout."""
        token_vectors = self.embedding(input_ids)
        positioned = self.pe(token_vectors)
        return self.dropout(positioned)

# 注意力机制

In [53]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """
    Compute scaled dot-product attention.

    Args:
        q (torch.Tensor): queries, shape (..., seq_len_q, d_k).
        k (torch.Tensor): keys, shape (..., seq_len_k, d_k).
        v (torch.Tensor): values, shape (..., seq_len_k, d_v).
        mask (torch.Tensor, optional): broadcastable to (..., seq_len_q, seq_len_k);
            positions where the mask equals 0 are excluded from attention.
            Defaults to None (no masking).

    Returns:
        tuple[torch.Tensor, torch.Tensor]: the attended values of shape
        (..., seq_len_q, d_v) and the attention weights of shape
        (..., seq_len_q, seq_len_k).
    """
    d_k = q.size(-1)
    score = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # Use the most negative finite value of the score dtype instead of a fixed
        # -1e9: that constant is not representable in float16, so the fill fails
        # under half precision. A row that is fully masked still ends up with
        # uniform weights, exactly as before.
        score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)

    p_attn = F.softmax(score, dim=-1)
    output = torch.matmul(p_attn, v)

    return output, p_attn


class MultiHeadAttention(nn.Module):
    """
    Multi-head attention built on ``scaled_dot_product_attention``.

    Args:
        hidden_size (int): model width; must be divisible by ``num_heads``.
        num_heads (int): number of attention heads.
        dropout (float): dropout probability applied to the projected output.
    """

    def __init__(self,hidden_size,num_heads=8,dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size //num_heads

        assert self.head_dim * num_heads == hidden_size, \
            "hidden_size must be divisible by num_heads"

        self.w_q = nn.Linear(hidden_size,hidden_size)
        self.w_k = nn.Linear(hidden_size,hidden_size)
        self.w_v = nn.Linear(hidden_size,hidden_size)
        self.w_o = nn.Linear(hidden_size,hidden_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self,q,k,v,mask=None):
        """
        Args:
            q, k, v (torch.Tensor): inputs of shape (batch, seq_len, hidden_size);
                ``k`` and ``v`` share their sequence length.
            mask (torch.Tensor, optional): forwarded unchanged to
                ``scaled_dot_product_attention``.

        Returns:
            torch.Tensor: tensor of shape (batch, seq_len_q, hidden_size).
        """
        batch_size = q.size(0)
        # Project, then split the feature axis into heads:
        # (batch, seq, hidden) -> (batch, num_heads, seq, head_dim)
        q = self.w_q(q).view(batch_size,-1,self.num_heads,self.head_dim).transpose(1,2)
        k = self.w_k(k).view(batch_size,-1,self.num_heads,self.head_dim).transpose(1,2)
        v = self.w_v(v).view(batch_size,-1,self.num_heads,self.head_dim).transpose(1,2)

        output, _ = scaled_dot_product_attention(q,k,v,mask)

        # Undo the head split before flattening. Viewing the
        # (batch, num_heads, seq, head_dim) tensor directly would interleave
        # different tokens and heads into the same output row.
        output = output.transpose(1,2).contiguous().view(batch_size,-1,self.num_heads*self.head_dim)
        output = self.w_o(output)
        output = self.dropout(output)

        return output
        
        
    

In [52]:

# Instantiate the multi-head attention module.
mha = MultiHeadAttention(hidden_size=hidden_size, num_heads=8)

# Build dummy inputs (in self-attention q, k and v would normally be the same tensor).
# Shape: (batch_size, seq_len, hidden_size)
q = torch.randn(batch_size, seq_len, hidden_size)
k = torch.randn(batch_size, seq_len, hidden_size)
v = torch.randn(batch_size, seq_len, hidden_size)

print(f"输入张量的形状: {q.shape}")

# Forward pass without any mask.
output = mha(q, k, v, mask=None)

print(f"输出张量的形状: {output.shape}")

# Sanity check: the output must have exactly the same shape as the query input.
assert output.shape == torch.Size([batch_size, seq_len, hidden_size])

print("\n多头注意力模块构建成功！")

输入张量的形状: torch.Size([64, 144, 512])
输出张量的形状: torch.Size([64, 144, 512])

多头注意力模块构建成功！


In [21]:
class PositionwiseFeedForward(nn.Module):
    """
    Two-layer feed-forward network applied to the last dimension of its input.

    Args:
        hidden_size (int): input and output width.
        ffn_dim (int): width of the inner layer.
        dropout (float): dropout probability applied after the ReLU.
    """

    def __init__(self, hidden_size, ffn_dim, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(in_features=hidden_size, out_features=ffn_dim)
        self.w_2 = nn.Linear(in_features=ffn_dim, out_features=hidden_size)
        self.dropout = nn.Dropout(p=dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand to ``ffn_dim``, apply ReLU and dropout, then project back."""
        inner = self.dropout(self.relu(self.w_1(x)))
        return self.w_2(inner)

In [22]:
class SublayerConnect(nn.Module):
    """
    Residual connection followed by layer normalisation.

    Args:
        hidden_size (int): width normalised by the LayerNorm.
        dropout (float): probability for the dropout module created here.
    """

    def __init__(self, hidden_size, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(normalized_shape=hidden_size)
        # NOTE(review): this dropout module is constructed but forward() never applies it.
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """Return ``LayerNorm(sublayer(x) + x)``; ``sublayer`` is any callable taking ``x``."""
        transformed = sublayer(x)
        return self.norm(transformed + x)

In [28]:
class EncoderLayer(nn.Module):
    """
    One encoder block: self-attention followed by a position-wise feed-forward
    network, each wrapped in a ``SublayerConnect``.

    Args:
        hidden_size (int): model width; the feed-forward inner width is 4x this value.

    Attributes:
        size (int): the model width, exposed so that a stacking module such as
            ``Encoder`` can size its final LayerNorm from ``layer.size``.
    """

    def __init__(self,hidden_size):
        super(EncoderLayer, self).__init__()
        self.size = hidden_size
        self.self_attn = MultiHeadAttention(hidden_size=hidden_size)
        self.ffn = PositionwiseFeedForward(hidden_size=hidden_size,ffn_dim=hidden_size*4)
        self.sublayer = nn.ModuleList(SublayerConnect(hidden_size=hidden_size) for _ in range(2))

    def forward(self, x, mask=None):
        """
        Args:
            x (torch.Tensor): input passed to self-attention as query, key and value.
            mask (torch.Tensor, optional): attention mask handed to the self-attention
                module. Defaults to None (no masking).

        Returns:
            torch.Tensor: the output of the second (feed-forward) sub-layer.
        """
        # The lambda adapts the 4-argument attention call to the 1-argument
        # callable that SublayerConnect expects.
        x = self.sublayer[0](x,lambda x:self.self_attn(x,x,x,mask))
        x = self.sublayer[1](x , self.ffn)
        return x
    

In [33]:
class DecoderLayer(nn.Module):
    """
    One decoder block: masked self-attention, attention over ``memory``, and a
    position-wise feed-forward network, each wrapped in a ``SublayerConnect``.

    Args:
        hidden_size (int): model width; the feed-forward inner width is 4x this value.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.mask_attn = MultiHeadAttention(hidden_size=hidden_size)
        self.ffn = PositionwiseFeedForward(hidden_size=hidden_size, ffn_dim=4 * hidden_size)
        self.cross_attn = MultiHeadAttention(hidden_size=hidden_size)
        self.sublayer = nn.ModuleList([SublayerConnect(hidden_size=hidden_size) for _ in range(3)])

    def forward(self, x, memory, src_mask, tgt_mask):
        """
        Args:
            x (torch.Tensor): decoder-side input.
            memory (torch.Tensor): tensor used as key and value of the second attention.
            src_mask (torch.Tensor): mask handed to the attention over ``memory``.
            tgt_mask (torch.Tensor): mask handed to the self-attention over ``x``.

        Returns:
            torch.Tensor: the output of the third (feed-forward) sub-layer.
        """
        def masked_self_attention(h):
            return self.mask_attn(h, h, h, tgt_mask)

        def memory_attention(h):
            return self.cross_attn(h, memory, memory, src_mask)

        self_block, cross_block, ffn_block = self.sublayer
        h = self_block(x, masked_self_attention)
        h = cross_block(h, memory_attention)
        return ffn_block(h, self.ffn)

In [44]:
def make_src_mask(src, pad_idx):
    """
    Build the padding mask for a source sequence.

    Args:
        src (torch.Tensor): source token ids, shape (batch_size, src_len).
        pad_idx (int): id of the padding token.

    Returns:
        torch.Tensor: boolean mask of shape (batch_size, 1, 1, src_len) that is
        True wherever ``src`` differs from ``pad_idx``.
    """
    # True for real tokens, False for padding; shape (batch_size, src_len).
    is_token = torch.ne(src, pad_idx)
    # Insert two singleton axes so the mask broadcasts over heads and query positions.
    return torch.unsqueeze(torch.unsqueeze(is_token, 1), 2)

# --- build demo inputs and the source mask ---
# Random token ids of length 10 and 20; they are only used to build the masks.
encoder_output  = torch.randint(0,vocab_size,(batch_size, 10))
decoder_input = torch.randint(0,vocab_size,(batch_size, 20))
# NOTE(review): the tokenizer output earlier in this notebook pads with id 0;
# confirm that 1 is the intended padding id for this demo.
pad_token_id = 1
src_mask = make_src_mask(encoder_output, pad_token_id)

def make_tgt_mask(tgt, pad_idx):
    """
    Build the combined padding + look-ahead mask for a target sequence.

    Args:
        tgt (torch.Tensor): target token ids, shape (batch_size, tgt_len).
        pad_idx (int): id of the padding token.

    Returns:
        torch.Tensor: boolean mask of shape (batch_size, 1, tgt_len, tgt_len);
        entry [b, 0, i, j] is True when j <= i and tgt[b, j] is not padding.
    """
    _, tgt_len = tgt.shape

    # Padding part, shape (batch_size, 1, 1, tgt_len): True for non-padding keys.
    not_pad = torch.ne(tgt, pad_idx)[:, None, None, :]

    # Look-ahead part, shape (tgt_len, tgt_len): lower triangle including the diagonal.
    causal = torch.ones(tgt_len, tgt_len, device=tgt.device).tril().bool()

    # Broadcasting the AND yields (batch_size, 1, tgt_len, tgt_len).
    return not_pad & causal

print("\n--- 源序列掩码 (src_mask) ---")
print("最终形状:", src_mask.shape) # (batch_size, 1, 1, 10) for the demo inputs built above
# To inspect individual samples, print src_mask[0] and src_mask[1].

# Combined padding + look-ahead mask for the target-side demo ids.
tgt_mask = make_tgt_mask(decoder_input, pad_token_id)

print("\n--- 目标序列掩码 (tgt_mask) ---")
print("最终形状:", tgt_mask.shape) # (batch_size, 1, 20, 20) for the demo inputs built above

print("\n第一个样本的掩码 (无padding):")
# To view the first sample's mask, print tgt_mask[0].squeeze() (squeeze only drops the singleton axis for readability).


--- 源序列掩码 (src_mask) ---
最终形状: torch.Size([64, 1, 1, 10])

--- 目标序列掩码 (tgt_mask) ---
最终形状: torch.Size([64, 1, 20, 20])

第一个样本的掩码 (无padding):


In [51]:
# Smoke test: run one decoder layer on random activations, reusing the masks built earlier.
decoder = DecoderLayer(hidden_size=hidden_size)
x = torch.randn((batch_size, 20, hidden_size))  # decoder-side input; length 20 matches tgt_mask
memory = torch.randn((batch_size, 10, hidden_size))  # stand-in for encoder output; length 10 matches src_mask
output = decoder(x,memory, src_mask=src_mask, tgt_mask=tgt_mask)

torch.Size([64, 1, 20, 20])
torch.Size([64, 1, 1, 10])


In [None]:
import copy
def clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    stack = nn.ModuleList()
    for _ in range(N):
        # deepcopy gives every entry its own parameters rather than shared ones.
        stack.append(copy.deepcopy(module))
    return stack

class Encoder(nn.Module):
    """Stack of N copies of an encoder layer followed by a final LayerNorm."""

    def __init__(self, layer, N):
        """
        Args:
            layer (nn.Module): prototype encoder layer; it is deep-copied N times.
            N (int): number of stacked layers.

        Raises:
            AttributeError: if the model width cannot be determined from ``layer``.
        """
        super(Encoder, self).__init__()
        # N independent copies of the prototype layer.
        self.layers = clones(layer, N)
        # Final layer normalisation applied after the last layer.
        self.norm = nn.LayerNorm(self._infer_hidden_size(layer))

    @staticmethod
    def _infer_hidden_size(layer):
        """Find the model width of ``layer`` without requiring a ``size`` attribute."""
        # Reading `layer.size` alone raises AttributeError for layers that only
        # store the width on their attention sub-module, so several places are tried.
        candidates = (
            (layer, "size"),
            (layer, "hidden_size"),
            (getattr(layer, "self_attn", None), "hidden_size"),
        )
        for owner, attr in candidates:
            width = getattr(owner, attr, None)
            if isinstance(width, int):
                return width
        raise AttributeError(
            "cannot infer hidden size: layer has no integer 'size' / 'hidden_size' "
            "and no 'self_attn.hidden_size'"
        )

    def forward(self, x, mask):
        """
        Pass the input and the mask through every layer in order.

        Args:
            x (torch.Tensor): input of shape (batch_size, seq_len, d_model).
            mask (torch.Tensor): mask forwarded unchanged to each layer.

        Returns:
            torch.Tensor: normalised output of shape (batch_size, seq_len, d_model).
        """
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)