In [1]:
from transformers import BertTokenizer,Trainer,TrainingArguments
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import torch
from torch import nn
import torch.nn.functional as F
from torchvision.models.resnet import resnet152, resnet50, resnet18 # 导入 resnet-152
import torchvision
from torchvision import transforms, datasets
import pickle
import evaluate
import numpy as np
import matplotlib.pyplot as plt
import math
import random

In [3]:
class PositionalEncoding(nn.Module):
    "Implement the PE function."

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 初始化Shape为(max_len, d_model)的PE (positional encoding)
        pe = torch.zeros(max_len, d_model)
        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里就是sin和cos括号中的内容，通过e和ln进行了变换
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # 计算PE(pos, 2i)
        pe[:, 0::2] = torch.sin(position * div_term)
        # 计算PE(pos, 2i+1)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 为了方便计算，在最外面在unsqueeze出一个batch
        pe = pe.unsqueeze(0)
        # 如果一个参数不参与梯度下降，但又希望保存model的时候将其保存下来
        # 这个时候就可以用register_buffer
        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        x 为embedding后的inputs，例如(1,7, 128)，batch size为1,7个单词，单词维度为128
        """
        # 将x和positional encoding相加。
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)


In [4]:
class CopyTaskModel(nn.Module):

    def __init__(self, d_model=128):
        super(CopyTaskModel, self).__init__()

        # 定义词向量，词典数为10。我们不预测两位小数。
        self.embedding = nn.Embedding(num_embeddings=10, embedding_dim=128)
        # 定义Transformer。超参是我拍脑袋想的
        self.transformer = nn.Transformer(d_model=128, num_encoder_layers=2, num_decoder_layers=2, dim_feedforward=512, batch_first=True)

        # 定义位置编码器
        self.positional_encoding = PositionalEncoding(d_model, dropout=0)

        # 定义最后的线性层，这里并没有用Softmax，因为没必要。
        # 因为后面的CrossEntropyLoss中自带了
        self.predictor = nn.Linear(128, 10)

    def forward(self, src, tgt):
        # 生成mask
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size()[-1])
        src_key_padding_mask = CopyTaskModel.get_key_padding_mask(src)
        tgt_key_padding_mask = CopyTaskModel.get_key_padding_mask(tgt)

        # 对src和tgt进行编码
        src = self.embedding(src)
        tgt = self.embedding(tgt)
        # 给src和tgt的token增加位置信息
        src = self.positional_encoding(src)
        tgt = self.positional_encoding(tgt)

        # 将准备好的数据送给transformer
        out = self.transformer(src, tgt,
                               tgt_mask=tgt_mask,
                               src_key_padding_mask=src_key_padding_mask,
                               tgt_key_padding_mask=tgt_key_padding_mask)

        """
        这里直接返回transformer的结果。因为训练和推理时的行为不一样，
        所以在该模型外再进行线性层的预测。
        """
        return out

    @staticmethod
    def get_key_padding_mask(tokens):
        """
        用于key_padding_mask
        """
        key_padding_mask = torch.zeros(tokens.size())
        key_padding_mask[tokens == 2] = -torch.inf
        return key_padding_mask


In [5]:
model = CopyTaskModel()

In [6]:
src = torch.LongTensor([[0, 3, 4, 5, 6, 1, 2, 2]])
tgt = torch.LongTensor([[3, 4, 5, 6, 1, 2, 2]])
out = model(src, tgt)
print(out.size())
print(out)

torch.Size([1, 7, 128])
tensor([[[-0.1443, -0.1420, -1.4241,  1.3634,  0.9349, -1.4224, -1.0949,
          -0.1085, -1.0228,  1.3970,  0.7386, -1.4319, -0.7540, -0.0611,
          -0.1000, -0.5736, -1.0333, -1.2741, -1.8638,  0.0470,  0.8020,
           1.5381,  0.2843, -1.2397, -0.5574, -0.0993, -0.2226,  1.0915,
          -0.6805,  1.2655,  0.8416,  0.6378, -0.0479,  1.4101, -0.4453,
           0.2572,  1.0721,  0.3767, -0.5386,  0.4034, -2.0918, -0.8804,
          -1.1549,  0.7348,  0.0442,  0.2122, -0.9396, -0.2617,  0.2342,
           0.8461, -0.1015, -0.4978,  0.1787, -1.2960, -1.0765,  1.4962,
           0.6281, -0.8321, -0.5948, -0.0889,  1.0555,  0.2710,  0.0245,
           0.7625, -0.4059, -0.3218,  0.1276,  1.6868,  0.1329,  2.0615,
          -1.0582,  1.0689,  1.6706,  0.7455,  0.7841, -0.6914,  0.4980,
           0.2294, -0.1563, -0.5104,  1.4300, -1.3512, -1.7642,  1.2600,
          -0.3690, -0.4674, -0.5708, -0.8825,  0.2682,  0.9221, -1.1555,
          -0.2634,  2.4917,

In [7]:
criteria = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

In [8]:
def generate_random_batch(batch_size, max_length=16):
    src = []
    for i in range(batch_size):
        # 随机生成句子长度
        random_len = random.randint(1, max_length - 2)
        # 随机生成句子词汇，并在开头和结尾增加<bos>和<eos>
        random_nums = [0] + [random.randint(3, 9) for _ in range(random_len)] + [1]
        # 如果句子长度不足max_length，进行填充
        random_nums = random_nums + [2] * (max_length - random_len - 2)
        src.append(random_nums)
    src = torch.LongTensor(src)
    # tgt不要最后一个token
    tgt = src[:, :-1]
    # tgt_y不要第一个的token
    tgt_y = src[:, 1:]
    # 计算tgt_y，即要预测的有效token的数量
    n_tokens = (tgt_y != 2).sum()

    # 这里的n_tokens指的是我们要预测的tgt_y中有多少有效的token，后面计算loss要用
    return src, tgt, tgt_y, n_tokens


In [9]:
generate_random_batch(batch_size=2, max_length=6)

(tensor([[0, 4, 8, 9, 4, 1],
         [0, 9, 1, 2, 2, 2]]),
 tensor([[0, 4, 8, 9, 4],
         [0, 9, 1, 2, 2]]),
 tensor([[4, 8, 9, 4, 1],
         [9, 1, 2, 2, 2]]),
 tensor(7))

In [29]:
a = torch.tensor([1.0,2,3,4],requires_grad=True)
b = torch.tensor(2,requires_grad=False)
for i in range(10):
    c = F.cross_entropy(a, b)
    c.backward()
    a = a - a.grad * 0.01
    print(a)

tensor([0.9997, 1.9991, 3.0076, 3.9936], grad_fn=<SubBackward0>)


  a = a - a.grad * 0.01


TypeError: unsupported operand type(s) for *: 'NoneType' and 'float'

In [13]:
total_loss = 0

for step in range(2000):
    # 生成数据
    src, tgt, tgt_y, n_tokens = generate_random_batch(batch_size=2, max_length=max_length)

    # 清空梯度
    optimizer.zero_grad()
    # 进行transformer的计算
    out = model(src, tgt)
    # 将结果送给最后的线性层进行预测
    out = model.predictor(out)
    """
    计算损失。由于训练时我们的是对所有的输出都进行预测，所以需要对out进行reshape一下。
            我们的out的Shape为(batch_size, 词数, 词典大小)，view之后变为：
            (batch_size*词数, 词典大小)。
            而在这些预测结果中，我们只需要对非<pad>部分进行，所以需要进行正则化。也就是
            除以n_tokens。
    """
    loss = criteria(out.contiguous().view(-1, out.size(-1)), tgt_y.contiguous().view(-1)) / n_tokens
    # 计算梯度
    loss.backward()
    # 更新参数
    optimizer.step()

    total_loss += loss

    # 每40次打印一下loss
    if step != 0 and step % 40 == 0:
        print("Step {}, total_loss: {}".format(step, total_loss))
        total_loss = 0


Step 40, total_loss: 2.4486610889434814
Step 80, total_loss: 2.074915885925293
Step 120, total_loss: 1.9676580429077148
Step 160, total_loss: 1.878318428993225
Step 200, total_loss: 1.8532143831253052
Step 240, total_loss: 1.676632046699524
Step 280, total_loss: 1.4618895053863525
Step 320, total_loss: 1.3819327354431152
Step 360, total_loss: 1.289496660232544
Step 400, total_loss: 1.193058729171753
Step 440, total_loss: 1.043522834777832
Step 480, total_loss: 1.0972378253936768
Step 520, total_loss: 0.9340924024581909
Step 560, total_loss: 1.0643209218978882
Step 600, total_loss: 0.9257394075393677
Step 640, total_loss: 0.5943058729171753
Step 680, total_loss: 0.6407249569892883
Step 720, total_loss: 0.5495839715003967
Step 760, total_loss: 0.4921736717224121
Step 800, total_loss: 0.5572090148925781
Step 840, total_loss: 0.5156943202018738
Step 880, total_loss: 0.5976853966712952
Step 920, total_loss: 0.8885796070098877
Step 960, total_loss: 0.563235342502594
Step 1000, total_loss: 0.

In [14]:
model = model.eval()
# 随便定义一个src
src = torch.LongTensor([[0, 5, 3, 4, 6, 8, 9, 9, 8, 1, 2, 2]])
# tgt从<bos>开始，看看能不能重新输出src中的值
tgt = torch.LongTensor([[0]])

In [15]:
# 一个一个词预测，直到预测为<eos>，或者达到句子最大长度
for i in range(max_length):
    # 进行transformer计算
    out = model(src, tgt)
    # 预测结果，因为只需要看最后一个词，所以取`out[:, -1]`
    predict = model.predictor(out[:, -1])
    # 找出最大值的index
    y = torch.argmax(predict, dim=1)
    # 和之前的预测结果拼接到一起
    tgt = torch.concat([tgt, y.unsqueeze(0)], dim=1)

    # 如果为<eos>，说明预测结束，跳出循环
    if y == 1:
        break
print(tgt)

tensor([[0, 5, 3, 4, 6, 8, 9, 9, 8, 1]])
