In [1]:
import math
import einops
import torch
import random

import numpy as np
import torch

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

  import pynvml  # type: ignore[import]


Using device: cuda


In [2]:
# Vocabulary used for data generation.
# zidian_x maps each source token (special symbols, digits, lowercase letters)
# to its integer id, in the order the tokens are listed.
zidian_x = {
    token: index
    for index, token in enumerate(
        '<SOS>,<EOS>,<PAD>,0,1,2,3,4,5,6,7,8,9,q,w,e,r,t,y,u,i,o,p,a,s,d,f,g,h,j,k,l,z,x,c,v,b,n,m'.split(',')
    )
}

# Reverse lookup for the source side: list position == token id.
zidian_xr = list(zidian_x)

# Target vocabulary: the same ids, keyed by the upper-cased token.
zidian_y = {token.upper(): index for token, index in zidian_x.items()}

# Reverse lookup for the target side: list position == token id.
zidian_yr = list(zidian_y)
# Data generation rule: X is a random sequence of lowercase tokens in forward
# order; Y is the reversed sequence with letters upper-cased and every digit d
# replaced by 9 - d.
def get_data():
    """Sample one random (x, y) pair and return it as encoded tensors.

    Returns:
        x: LongTensor of shape [50] -- <SOS> + sampled tokens + <EOS>,
           padded with <PAD> and encoded with ``zidian_x``.
        y: LongTensor of shape [51] -- <SOS> + transformed tokens + <EOS>,
           padded with <PAD> and encoded with ``zidian_y``.
    """
    # Candidate tokens: ten digits followed by 26 lowercase letters.
    words = list('0123456789qwertyuiopasdfghjklzxcvbnm')

    # Unnormalised sampling weights: 1..10 for the digits, 1..26 for the letters.
    weights = np.array(list(range(1, 11)) + list(range(1, 27)))
    probs = weights / weights.sum()

    # Draw between 30 and 48 tokens (inclusive) with replacement.
    length = random.randint(30, 48)
    source = np.random.choice(words, size=length, replace=True, p=probs).tolist()

    def transform(token):
        # Digits map to their complement with respect to 9; letters are upper-cased.
        if token.isdigit():
            return str(9 - int(token))
        return token.upper()

    mapped = [transform(token) for token in source]
    # The last transformed token is duplicated, then the whole sequence is
    # reversed, so the duplicated token leads the target.
    target = [mapped[-1]] + mapped[::-1]

    def encode(tokens, vocab, width):
        # Add the start/end markers, pad to a fixed width, and map to ids.
        framed = ['<SOS>'] + tokens + ['<EOS>']
        padded = (framed + ['<PAD>'] * width)[:width]
        return torch.tensor([vocab[token] for token in padded], dtype=torch.long)

    x = encode(source, zidian_x, 50)
    y = encode(target, zidian_y, 51)

    return x, y

# Dataset definition
class Dataset(torch.utils.data.Dataset):
    """Synthetic dataset whose samples are generated on demand by get_data().

    The reported length is a fixed nominal value; the index passed to
    ``__getitem__`` is ignored and a fresh random sample is returned each time.
    """

    def __init__(self):
        super().__init__()

    def __len__(self):
        # Nominal number of samples that make up one epoch.
        return 1_000_000

    def __getitem__(self, i):
        # ``i`` is unused: every access draws a new random (x, y) pair.
        return get_data()

# Data loader (the DataLoader itself is constructed in a later cell)

print('数据加载器定义完成')

数据加载器定义完成


In [3]:
# mask
import torch

from data import zidian_x, zidian_y

# Padding mask
def mask_pad(data, pad_id=None):
    """Build the key-padding attention mask for a batch of token ids.

    Args:
        data: Integer tensor of token ids with shape [b, L].
        pad_id: Id of the padding token. When omitted, ``zidian_x['<PAD>']``
            is used.

    Returns:
        Bool tensor of shape [b, 1, L, L]. Entry [n, 0, i, j] is True when
        token j of sample n is padding, i.e. the whole column j is masked.
    """
    if pad_id is None:
        pad_id = zidian_x['<PAD>']

    # Take the sequence length from the input instead of hard-coding 50.
    seq_len = data.shape[-1]

    # [b, L] -> [b, 1, 1, L]
    mask = (data == pad_id).reshape(-1, 1, 1, seq_len)

    # Attention is an L x L matrix of every token against every token.
    # A padded *column* is True, so no token attends to a pad position.
    # A padded *row* is left unmasked: the pad token itself may still attend
    # to the other tokens.

    # Repeat the row for every query position:
    # [b, 1, 1, L] -> [b, 1, L, L]
    return mask.expand(-1, 1, seq_len, seq_len)

# Decoder mask: strictly-upper-triangular (future positions) combined with padding
def mask_tril(data, pad_id=None):
    """Build the decoder self-attention mask for a batch of target token ids.

    A position is masked (True) when it lies in the future of the query
    position or when the key token is padding. For L = 5 and no padding the
    mask for one sample is::

        [[0, 1, 1, 1, 1],
         [0, 0, 1, 1, 1],
         [0, 0, 0, 1, 1],
         [0, 0, 0, 0, 1],
         [0, 0, 0, 0, 0]]

    so every token sees itself and the tokens before it, never the ones after.

    Args:
        data: Integer tensor of (not yet embedded) token ids, shape [b, L].
        pad_id: Id of the padding token. When omitted, ``zidian_y['<PAD>']``
            is used.

    Returns:
        Bool tensor of shape [b, 1, L, L], created on ``data.device``.
    """
    if pad_id is None:
        pad_id = zidian_y['<PAD>']

    # Take the sequence length from the input instead of hard-coding 50.
    seq_len = data.shape[-1]

    # Strictly-upper triangle (diagonal excluded): future[i, j] is True iff j > i.
    # Built on the input's own device so the mask never mixes devices with data.
    # [1, L, L]
    positions = torch.arange(seq_len, device=data.device)
    future = (positions.unsqueeze(0) > positions.unsqueeze(1)).unsqueeze(0)

    # Padded key positions are invisible to every query.
    # [b, L] -> [b, 1, L]
    is_pad = (data == pad_id).unsqueeze(1)

    # Union of both conditions: [b, 1, L] | [1, L, L] -> [b, L, L],
    # then add the broadcastable head dimension -> [b, 1, L, L].
    return (is_pad | future).unsqueeze(dim=1)


数据加载器定义完成


In [4]:
# myPositionEmbedding: token embedding plus fixed sinusoidal positional encoding
class myPositionEmbedding(torch.nn.Module):
    """Embed token ids and add a non-trainable sinusoidal position table.

    Args:
        voc_size: Number of entries in the embedding table.
        max_len: Number of positions the positional table covers.
        emb_dim: Embedding width.
    """

    def __init__(self,voc_size,max_len,emb_dim):
        super().__init__()
        # Token embedding: [b, L] -> [b, L, emb_dim], initialised from N(0, 0.1).
        self.emb = torch.nn.Embedding(voc_size, emb_dim)
        torch.nn.init.normal_(self.emb.weight, mean=0.0, std=0.1)

        # Vectorised construction of the table (no nested Python loops).
        # pos: [max_len, 1]; i: even channel indices 0, 2, 4, ...
        pos = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        i = torch.arange(0, emb_dim, 2, dtype=torch.float32)
        # Standard formulation: angle = pos / 10000 ** (2k / d). The base was
        # previously 1e-3, which made the frequencies grow up to ~1000 rad per
        # position so that most channels aliased on integer positions.
        inv_freq = 1.0 / (10000.0 ** (i / emb_dim))
        # Broadcast [max_len, 1] * [ceil(emb_dim / 2)] -> [max_len, ceil(emb_dim / 2)]
        argument = pos * inv_freq

        pe = torch.zeros(max_len, emb_dim, dtype=torch.float32)
        pe[:, 0::2] = torch.sin(argument)
        # With an odd emb_dim there is one cosine channel fewer than sine channels.
        pe[:, 1::2] = torch.cos(argument[:, : emb_dim // 2])

        # A buffer moves with the module (.to / state_dict) but is never optimised.
        self.register_buffer("pe", pe)

    def forward(self,x):
        """Return embeddings of ``x`` ([..., L] ids) with positions added, [..., L, emb_dim]."""
        x = self.emb(x)
        seq_len = x.shape[-2]
        if seq_len > self.pe.shape[0]:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.shape[0]}"
            )
        # Slice the table so sequences shorter than max_len are accepted as well.
        return x + self.pe[:seq_len]
    


In [5]:
# Multi-head attention (pre-norm, with dropout and a residual connection)
class MultiHead(torch.nn.Module):
    """Multi-head scaled dot-product attention used for self- and cross-attention.

    Args:
        head_num: Number of attention heads; must divide ``in_ch``.
        in_ch: Model width (size of the last dimension of the inputs).
    """

    def __init__(self,head_num,in_ch):
        super().__init__()
        if in_ch % head_num != 0:
            raise ValueError(
                f"in_ch ({in_ch}) must be divisible by head_num ({head_num})"
            )
        self.wq=torch.nn.Linear(in_ch,in_ch)
        self.wk=torch.nn.Linear(in_ch,in_ch)
        self.wv=torch.nn.Linear(in_ch,in_ch)
        self.num_head=head_num
        self.in_ch=in_ch
        self.norm = torch.nn.LayerNorm(normalized_shape=in_ch, elementwise_affine=True)
        self.dropout = torch.nn.Dropout(p=0.1)
        self.out_fc=torch.nn.Linear(in_ch,in_ch)

    def _split_heads(self, t):
        # [..., l, in_ch] -> [..., head, l, in_ch // head]
        return t.reshape(*t.shape[:-1], self.num_head, -1).transpose(-3, -2)

    def forward(self,x,mask,cros_flag,VX=0):
        """Apply attention and return ``dropout(out_fc(attention)) + x``.

        Args:
            x: Query-side input, shape [b, l, d].
            mask: Bool tensor broadcastable to the score tensor
                [b, head, l_query, l_key]; True entries are excluded from
                attention.
            cros_flag: 0 for self-attention (keys/values come from ``x``);
                any other value for cross-attention (keys/values from ``VX``).
            VX: Key/value source for cross-attention, shape [b, l_key, d].
                It is normalised with the same LayerNorm as ``x``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # LayerNorm is out-of-place, so the residual can alias the input
        # directly; no clone is needed.
        res = x
        x = self.norm(x)

        if cros_flag == 0:
            kv = x
        else:
            if not torch.is_tensor(VX):
                raise ValueError("cross-attention (cros_flag != 0) requires a tensor VX")
            kv = self.norm(VX)

        Q = self._split_heads(self.wq(x))
        K = self._split_heads(self.wk(kv))
        V = self._split_heads(self.wv(kv))

        # Scaled dot-product scores: [..., head, l_query, l_key]
        score = (Q @ K.transpose(-1, -2)) / ((self.in_ch / self.num_head) ** 0.5)
        # Masked positions get -inf so that softmax assigns them zero weight.
        score = score.masked_fill(mask, -float('inf'))
        z = torch.softmax(score, dim=-1) @ V

        # Merge the heads back: [..., head, l, e] -> [..., l, head * e]
        z = z.transpose(-3, -2).flatten(-2)

        return self.dropout(self.out_fc(z)) + res


In [6]:
# Position-wise feed-forward sub-layer
class FullyConnected(torch.nn.Module):
    """Pre-norm feed-forward block with a residual connection.

    The block computes ``x + fc(norm(x))`` where ``fc`` is
    Linear(in_ch, 2 * in_ch) -> ReLU -> Linear(2 * in_ch, in_ch) -> Dropout(0.1).

    Args:
        in_ch: Size of the last dimension of the input.
    """

    def __init__(self,in_ch):
        super().__init__()
        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=in_ch, out_features=in_ch*2),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=in_ch*2, out_features=in_ch),
            torch.nn.Dropout(p=0.1),
        )

        # Normalise over the actual model width; this was hard-coded to 32 and
        # therefore failed for any other in_ch.
        self.norm = torch.nn.LayerNorm(normalized_shape=in_ch,
                                       elementwise_affine=True)

    def forward(self,x):
        """Return ``fc(norm(x)) + x``; the output has the same shape as ``x``."""
        # LayerNorm does not modify its input, so no clone is needed for the residual.
        return self.fc(self.norm(x)) + x

In [7]:
# Encoder building block
class EncoderLayer(torch.nn.Module):
    """One encoder block: self-attention followed by the feed-forward sub-layer."""

    def __init__(self, num_head, in_ch):
        super().__init__()
        self.mul = MultiHead(num_head, in_ch=in_ch)
        self.fc = FullyConnected(in_ch)

    def forward(self, x, mask):
        # x: [b, length, emb]; both sub-layers keep that shape.
        attended = self.mul(x, mask, cros_flag=0)
        return self.fc(attended)

class Encoder(torch.nn.Module):
    """Stack of three EncoderLayer blocks applied one after another."""

    def __init__(self, num_head, in_ch):
        super().__init__()
        self.layer_1 = EncoderLayer(num_head, in_ch)
        self.layer_2 = EncoderLayer(num_head, in_ch)
        self.layer_3 = EncoderLayer(num_head, in_ch)

    def forward(self, x, mask):
        # The same mask is handed to every layer.
        for layer in (self.layer_1, self.layer_2, self.layer_3):
            x = layer(x, mask)
        return x


In [8]:
# Decoder building block
class DecoderLayer(torch.nn.Module):
    """One decoder block: self-attention over y, attention from y to x, feed-forward."""

    def __init__(self, num_head, in_ch):
        super().__init__()

        self.mul1 = MultiHead(num_head, in_ch)
        self.mul2 = MultiHead(num_head, in_ch)

        self.fc = FullyConnected(in_ch)

    def forward(self, x, y, mask_pad_x, mask_tril_y):
        # Self-attention over the target sequence; the shape is unchanged.
        self_attended = self.mul1(y, mask_tril_y, cros_flag=0)
        # Attention with y as the query side and x supplying keys/values:
        # [b, 50, 32], [b, 50, 32] -> [b, 50, 32]
        cross_attended = self.mul2(self_attended, mask_pad_x, cros_flag=1, VX=x)
        return self.fc(cross_attended)

class Decoder(torch.nn.Module):
    """Stack of three DecoderLayer blocks; each one receives the same x and masks."""

    def __init__(self, num_head, in_ch):
        super().__init__()

        self.layer_1 = DecoderLayer(num_head, in_ch)
        self.layer_2 = DecoderLayer(num_head, in_ch)
        self.layer_3 = DecoderLayer(num_head, in_ch)

    def forward(self, x, y, mask_pad_x, mask_tril_y):
        # Only y is updated from layer to layer; x is passed through untouched.
        for layer in (self.layer_1, self.layer_2, self.layer_3):
            y = layer(x, y, mask_pad_x, mask_tril_y)
        return y


In [9]:
class Transformer(torch.nn.Module):
    """Encoder-decoder model mapping source ids x and target ids y to logits.

    Args:
        voc_size: Vocabulary size; used for both embedding tables and for the
            width of the output projection.
        max_len: Number of positions covered by the positional table.
        num_head: Number of attention heads in every attention block.
        in_ch: Model (embedding) width.
    """

    def __init__(self,voc_size,max_len,num_head,in_ch):
        super().__init__()
        self.embed_x = myPositionEmbedding(voc_size,max_len,in_ch)
        self.embed_y = myPositionEmbedding(voc_size,max_len,in_ch)
        self.encoder = Encoder(num_head,in_ch)
        self.decoder = Decoder(num_head,in_ch)
        # One logit per vocabulary entry. This was hard-coded to 39, which
        # silently ignored the voc_size argument.
        self.fc_out = torch.nn.Linear(in_ch, voc_size)

    def forward(self,x,y):
        """Return logits of shape [b, L, voc_size] for id tensors x and y of shape [b, L]."""
        # The masks are built from the raw ids, before embedding.
        mask_x = mask_pad(x)
        mask_y = mask_tril(y)

        # [b, L] -> [b, L, in_ch]
        x = self.embed_x(x)
        y = self.embed_y(y)

        # Encode the source, then decode the target against the encoded source.
        x = self.encoder(x, mask_x)
        y = self.decoder(x, y, mask_x, mask_y)

        # [b, L, in_ch] -> [b, L, voc_size]
        return self.fc_out(y)
        

In [10]:
# Batches of 128 freshly generated samples; the last incomplete batch is dropped.
loader = torch.utils.data.DataLoader(
    Dataset(),
    batch_size=128,
    shuffle=True,
    drop_last=True,
)

# Positional arguments spelled out: vocabulary 39, length 50, 4 heads, width 32.
model = Transformer(voc_size=39, max_len=50, num_head=4, in_ch=32).to(device)
loss_func = torch.nn.CrossEntropyLoss().to(device)

# Adam with a step schedule: the learning rate is multiplied by 0.5 after
# every 3 calls to sched.step().
optim = torch.optim.Adam(model.parameters(), lr=2e-3)
sched = torch.optim.lr_scheduler.StepLR(optim, step_size=3, gamma=0.5)

In [11]:
# Training loop: 10 epochs, the scheduler is stepped once per epoch.
for epoch in range(10):
    for i,(x,y) in enumerate(loader):
        # x -> [b, 50], y -> [b, 51]
        x=x.to(device)
        y=y.to(device)

        # Teacher forcing: every target token is fed in to predict the next
        # one, so the last token is not needed as an input.
        pred = model(x, y[:, :-1])
        # pred: [b, 50, n_classes] -> [b*50, n_classes]; the class count is
        # read from the model output instead of being hard-coded to 39.
        pred = pred.reshape(-1, pred.shape[-1])
        # y: [b, 51] -> [b*50] (targets are the inputs shifted by one)
        y = y[:, 1:].reshape(-1)

        # Leave padded positions out of both the loss and the accuracy.
        select = y != zidian_y['<PAD>']
        pred = pred[select]
        y = y[select]

        loss = loss_func(pred, y)
        optim.zero_grad()
        loss.backward()
        optim.step()

        if i%200==0:
            # loss.item() forces a host sync, so read it only when logging
            # (it used to be read on every step and then never used).
            current_batch_loss = loss.item()
            # [select, n_classes] -> [select]
            pred = pred.argmax(1)
            correct = (pred == y).sum().item()
            accuracy = correct / len(pred)
            lr = optim.param_groups[0]['lr']
            print(epoch, i, lr, current_batch_loss, accuracy)

    sched.step()

0 0 0.002 3.80072283744812 0.027413127413127413
0 200 0.002 3.208475351333618 0.11238489005825973
0 400 0.002 2.970705270767212 0.1779530588687957
0 600 0.002 1.875807762145996 0.4497254307896232
0 800 0.002 1.0439825057983398 0.6754167465031615
0 1000 0.002 0.7463297843933105 0.7639775585219578
0 1200 0.002 0.580879807472229 0.8214015151515152
0 1400 0.002 0.42304712533950806 0.8677717810331534
0 1600 0.002 0.33712196350097656 0.9029695479477965
0 1800 0.002 0.28465762734413147 0.9187358916478555
0 2000 0.002 0.1992313116788864 0.9419149334832303
0 2200 0.002 0.21679513156414032 0.9339215318190351
0 2400 0.002 0.14478090405464172 0.9580527227246488
0 2600 0.002 0.11044387519359589 0.9698588325066768
0 2800 0.002 0.1027359664440155 0.9681176021382207
0 3000 0.002 0.08668502420186996 0.9746954076850984
0 3200 0.002 0.0877983421087265 0.9735632183908046
0 3400 0.002 0.08615375310182571 0.9775708040296521
0 3600 0.002 0.08491507917642593 0.9769200930954228
0 3800 0.002 0.05847073346376419