In [2]:
import torch
from torch import nn
import torch.nn.functional as F
import math



In [35]:
class TokenEmbedding(nn.Embedding):#这个类用于将离散的词汇索引（通常是整数）映射到连续的密集向量
    def __init__(self,vocab_size,d_model):#d_model是qkv空间的维度
        super(TokenEmbedding,self).__init__(vocab_size,d_model,padding_idx=1)

In [36]:
class PositionalEmbedding(nn.Module):
    def __init__(self,d_model,maxlen,device):
        super(PositionalEmbedding,self).__init__()
        self.encoding=torch.zeros(maxlen,d_model,device=device)
        self.encoding.requires_grad_(False)

        pos=torch.arange(0,maxlen,device=device)
        pos=pos.float().unsqueeze(1)#加一维,变成向量
        _2i=torch.arange(0,d_model,2,device=device)
        self.encoding[:,0::2] = torch.sin(pos/(10000**(_2i/d_model)))#broadcast机制
        self.encoding[:,1::2] = torch.cos(pos/(10000**(_2i/d_model)))

    def forward(self,x):
        seq_leng=x.shape[1]#x 的 形状 ： batch sequenceline dimension
        return self.encoding[:seq_leng,:]#选择前seq_leng行，所有列

In [37]:
class TransformerEmbedding(nn.Module):
    def __init__(self,vocab_size,d_model,maxlen,dropout,device):
        super(TransformerEmbedding,self).__init__()
        self.tok_emb=TokenEmbedding(vocab_size,d_model)
        self.pos_emb=PositionalEmbedding(d_model,maxlen,device)
        self.drop=nn.Dropout(dropout)
    
    def forward(self,x):
        tok_emb=self.tok_emb(x)
        pos_emb=self.pos_emb(x)
        return self.drop(tok_emb+pos_emb)#将token嵌入和pos嵌入相加，再通过dropout
        

![image.png](./picture/layer.png)

In [38]:
class LayerNorm(nn.Module):#图像用BatchNorm比较多 
    def __init__(self, d_model, eps=1e-10):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        var = x.var(-1, unbiased=False, keepdim=True)
        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta
        return out

![image.png](./picture/ffn.png)

In [39]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self,d_model,hidden,dropout=0.1):
        super(PositionwiseFeedForward,self).__init__()
        self.fc1=nn.Linear(d_model,hidden)
        self.fc2=nn.Linear(hidden,d_model)
        self.dropout=nn.Dropout(dropout)
    
    def forward(self,x):
        feature_map=self.fc1(x)
        feature_map=F.relu(feature_map)
        feature_map=self.fc2(feature_map)
        feature_map=self.dropout(feature_map)#在哪里dropout
        return feature_map


In [40]:
class GroupQueryAttention(nn.Module):
    def __init__(self,d_model,n_heads,n_groups):
        super(GroupQueryAttention,self).__init__()
        self.d_model=d_model
        self.n_heads=n_heads
        self.n_groups=n_groups

        assert d_model%n_heads == 0
        self.n_heads_groups=self.n_heads//self.n_groups#整除操作符
        self.head_dim=d_model//self.n_heads

        self.w_q=nn.Linear(d_model,d_model)
        self.w_l=nn.Linear(d_model,self.n_groups*self.head_dim)#这里的维数
        self.w_v=nn.Linear(d_model,self.n_groups*self.head_dim)

        self.w_combine=nn.Linear(d_model,d_model)#将多头输出合并为单头输出
        self.softmax=nn.Softmax(dim=-1)
    """
    1. 使用`[:,:,None,:,:]`在第二个维度（即头数维度）上添加一个新的维度，大小为1。  
    2. 使用`expand`将新维度扩展到`n_groups`和`n_heads_groups`。  
    3. 使用`contiguous`确保张量是连续的（这有助于后续的`view`操作）。  
    4. 使用`view`改变张量的形状，使其变为`[batch, n_groups * n_heads_groups, time, head_dim]`。
    """
    def expand(self,data):
        batch,time=data.shape[0],data.shape[2]
        data=data[:,:,None,:,:].expand(batch,self.n_groups,self.n_heads_groups,time,self.head_dim).contiguous()
        data=data.view(batch,self.n_groups*self.n_heads_groups,time,self.head_dim)

    def forward(self,q,k,v,mask=None):
        q=self.w_q(q)
        k=self.w_k(k)
        v=self.w_v(v)

        batch=q.shape[0]    
        q=q.view(batch,-1,self.n_groups*self.n_heads_groups,self.head_dim).permute(0,2,1,3)#permute 方法用于重新排列张量的维度。
        k=k.view(batch,-1,self.n_groups,self.head_dim).permute(0,2,1,3)
        v=v.view(batch,-1,self.n_groups,self.head_dim).permute(0,2,1,3)

        k=self.expand(k)
        v=self.expand(v)
        score=q@k.transpose(2,3)/math.sqrt(self.head_dim)#@是矩阵乘法matmul
        if mask is not None:
            score=score.masked_fill(mask==0,-1e9)
        score=self.softmax(score)@v
        #.contiguous 确保张量是连续的（contiguous）在内存中。在 PyTorch 中，当张量经过某些操作（如 transpose、permute 等）后，其内存可能不再是连续的，这可能导致后续的某些操作（如 view）失败。
        score=score.permute(0,2,1,3).contiguous().view(batch,-1,self.d_model)
        output=self.w_combine(score)
        return output
    


In [41]:
class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,n_head):
        super(MultiHeadAttention,self).__init__()
        self.n_head = n_head
        self.d_model = d_model
        self.w_q = nn.Linear(d_model, d_model)#query
        self.w_k = nn.Linear(d_model, d_model)#key
        self.w_v = nn.Linear(d_model, d_model)#value
        self.w_combine = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None):
        batch, time, dimension = q.shape
        n_d = self.d_model // self.n_head#整除
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        q = q.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)#通道维度交换
        k = k.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        v = v.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)

        score = q @ k.transpose(2, 3) / math.sqrt(n_d)
        if mask is not None:
            # mask = torch.tril(torch.ones(time, time, dtype=bool))
            score = score.masked_fill(mask == 0, -1e9)#在softmax时把0的地方设置为负无穷
        score = self.softmax(score) @ v

        score = score.permute(0, 2, 1, 3).contiguous().view(batch, time, dimension)#通道维度交换回来    

        output = self.w_combine(score)
        return output

In [42]:
class EncoderLayer(nn.Module):
    def __init__(self,d_model,ffn_hidden,n_head,drop_prob)->None:
        super(EncoderLayer,self).__init__()
        self.attention=MultiHeadAttention(d_model,n_head)
        self.norm1=LayerNorm(d_model)
        self.drop1=nn.Dropout(drop_prob)

        self.ffn=PositionwiseFeedForward(d_model,ffn_hidden,drop_prob)
        self.norm2=LayerNorm(d_model)
        self.drop2=nn.Dropout(drop_prob)

    def forward(self,x,mask=None):
        _x=x
        x=self.attention(x,x,x,mask)
        x=self.drop1(x)
        x=self.norm1(x+_x)

        _x=x
        x=self.ffn(x)
        x=self.drop2(x)
        x=self.norm2(x+_x)
        return x
    

In [43]:
class DecoderLayer(nn.Module):
    def __init__(self,d_model,ffn_hidden,n_head,drop_prob)->None:
        super(DecoderLayer,self).__init__()
        self.attention1=MultiHeadAttention(d_model,n_head)
        self.norm1=LayerNorm(d_model)
        self.drop1=nn.Dropout(drop_prob)

        self.attention2=MultiHeadAttention(d_model,n_head)
        self.norm2=LayerNorm(d_model)
        self.drop2=nn.Dropout(drop_prob)

        self.ffn=PositionwiseFeedForward(d_model,ffn_hidden,drop_prob)
        self.norm3=LayerNorm(d_model)
        self.drop3=nn.Dropout(drop_prob)

    def forward(self,dec,enc,t_mask,s_mask):
        _x=dec
        x=self.attention1(dec,dec,dec,t_mask)#下三角掩码
        x=self.drop1(x)
        x=self.norm1(x+_x)

        if enc is not None:
            _x=x
            x=self.attention2(x,enc,enc,s_mask)
            x=self.drop2(x)
            x=self.norm2(x+_x)
        _x=x
        x=self.ffn(x)
        x=self.drop3(x)
        x=self.norm3(x+_x)
        return x


In [44]:
class Encoder(nn.Module):
    def __init__(self,enc_voc_size,max_len,d_model,ffn_hidden,n_head,n_layer,drop_prob,device):
        super(Encoder,self).__init__()
        self.embedding=TransformerEmbedding(enc_voc_size,d_model,max_len,drop_prob,device)
        #这行代码实际上使用了列表推导式（list comprehension）而不是显式的for循环，但它背后的概念是相同的：重复执行某个操作（在这个例子中是创建EncoderLayer对象）n_layer次。
        self.layers=nn.ModuleList([EncoderLayer(d_model,ffn_hidden,n_head,drop_prob)
                                   for _ in range(n_layer)
                                   ])
    def forward(self,x,s_mask):
        x=self.embedding(x)
        for layer in self.layers:
            x=layer(x,s_mask)
        return x
    

In [45]:
class Decoder(nn.Module):
    def __init__(self,dec_voc_size,max_len,d_model,ffn_hidden,n_head,n_layer,drop_prob,device):
        super(Decoder,self).__init__()
        self.embedding=TransformerEmbedding(dec_voc_size,d_model,max_len,drop_prob,device)
        self.layers=nn.ModuleList([DecoderLayer(d_model,ffn_hidden,n_head,drop_prob)
                                   for _ in range(n_layer)
                                   ])
        self.fc=nn.Linear(d_model,dec_voc_size)

    def forward(self,dec,enc,t_mask,s_mask):
        dec=self.embedding(dec)
        for layer in self.layers:
            dec=layer(dec,enc,t_mask,s_mask)
        dec=self.fc(dec)

        return dec
    

In [46]:
class Transformer(nn.Module):
    def __init__(
        self,
        src_pad_idx,
        trg_pad_idx,
        enc_voc_size,
        dec_voc_size,
        max_len,
        d_model,
        n_heads,
        ffn_hidden,
        n_layers,
        drop_prob,
        device,
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(
            enc_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )
        self.decoder = Decoder(
            dec_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device
    def make_pad_mask(self,q,k,pad_idx_q,pad_idx_k):
        len_q,len_k=q.size(1),k.size(1)#.size和.shape有什么区别
#         # (Batch, Time, len_q, len_k)
#         * `q.ne(pad_idx_q)`: 检查`q`中的每个元素是否不等于`pad_idx_q`，返回一个布尔张量，其中True表示非填充元素，False表示填充元素。  
# * `unsqueeze(1)`和`unsqueeze(3)`: 在第二维和第四维上增加一个维度，使张量的形状从`(Batch, len_q)`变为`(Batch, 1, len_q, 1)`。  
# * `repeat(1, 1, 1, len_k)`: 在第四维上重复张量`len_k`次，使其形状变为`(Batch, 1, len_q, len_k)`。
        q = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)
        q = q.repeat(1, 1, 1, len_k)

        k = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)
        k = k.repeat(1, 1, len_q, 1)

        mask = q & k
        return mask

    def make_causal_mask(self, q, k):
        len_q, len_k = q.size(1), k.size(1)
        mask = (
        # """
        # * `torch.ones(len_q, len_k)`: 创建一个形状为`(len_q, len_k)`的全1矩阵。  
        # * `torch.tril(...)`: 使用`torch.tril`函数，我们保留矩阵的下三角部分（包括对角线），并将其余部分设置为0。在因果掩码中，下三角部分（包括对角线）为True，表示这些位置上的元素在计算自注意力时可以“看到”或“注意”到；而上三角部分为False，表示在计算当前位置的注意力时，不能“看到”未来的位置。  
        # * `.type(torch.BoolTensor)`: 将结果矩阵的数据类型转换为布尔类型（True/False）。  
        # * `.to(self.device)`: 将掩码移动到与类实例关联的设备上（可能是CPU或某个GPU）。
        # """
            torch.tril(torch.ones(len_q, len_k)).type(torch.BoolTensor).to(self.device)
        )
        return mask

    def forward(self, src, trg):
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        trg_mask = self.make_pad_mask(
            trg, trg, self.trg_pad_idx, self.trg_pad_idx
        ) * self.make_causal_mask(trg, trg)#按元素相乘
        src_trg_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)

        enc = self.encoder(src, src_mask)
        ouput = self.decoder(trg, enc, trg_mask, src_trg_mask)
        return ouput
    




In [47]:
def initialize_weights(m):
    if hasattr(m, "weight") and m.weight.dim()>1:
        nn.init.kaiming_uniform(m.weight.data)

        

In [50]:
if __name__ == "__main__":
    enc_voc_size = 5893
    dec_voc_size = 7853
    src_pad_idx = 1
    trg_pad_idx = 1
    trg_sos_idx = 2
    batch_size = 128
    max_len = 1024
    d_model = 512
    n_layers = 3
    n_heads = 2
    ffn_hidden = 1024
    drop_prob = 0.1
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model = Transformer(
        src_pad_idx=src_pad_idx,
        trg_pad_idx=trg_pad_idx,
        d_model=d_model,
        enc_voc_size=enc_voc_size,
        dec_voc_size=dec_voc_size,
        max_len=max_len,
        ffn_hidden=ffn_hidden,
        n_heads=n_heads,
        n_layers=n_layers,
        drop_prob=drop_prob,
        device=device,
    ).to(device)

    model.apply(initialize_weights)
    src = torch.load("tensor_src.pt")
    src = torch.cat((src, torch.ones(src.shape[0], 2, dtype=torch.int)), dim=-1)
    trg = torch.load("tensor_trg.pt")

    result = model(src, trg)
    print(result, result.shape)
    print("------done-------")

  nn.init.kaiming_uniform(m.weight.data)


tensor([[[-0.6030,  0.0775, -0.2066,  ..., -0.1603,  0.6623,  0.0877],
         [-0.9200, -1.3989,  0.9649,  ...,  1.6006,  0.7463, -0.1547],
         [-0.4330, -0.9066,  0.5620,  ...,  2.5040,  1.5987,  0.9786],
         ...,
         [-0.2800, -1.7155,  0.1142,  ...,  1.9078,  1.4097, -0.0481],
         [-0.1703, -0.2110,  1.2231,  ...,  1.7769,  2.7302, -0.5516],
         [-0.5018, -0.6823,  0.8812,  ...,  3.5247,  2.0960, -0.7109]],

        [[ 1.3253, -1.5052,  0.8819,  ...,  0.5924,  2.2748,  0.5442],
         [ 1.8348, -2.2479,  0.3603,  ...,  2.0228,  1.9620,  0.5682],
         [ 1.0710, -1.0933,  0.7381,  ...,  1.8463,  1.5230, -0.2546],
         ...,
         [ 1.2449,  0.0946,  0.0158,  ...,  1.9686,  2.2020,  0.4011],
         [-0.6390, -0.2855,  2.8089,  ...,  1.4181,  3.3454, -0.8175],
         [-0.1937, -0.7905,  0.7057,  ...,  2.4416,  1.2948, -1.4453]],

        [[-0.4480, -0.1526,  0.7066,  ...,  1.6856,  2.2949,  0.3683],
         [ 0.5991, -0.0410,  0.6618,  ...,  1