In [1]:
!pip install -q torchdata==0.3.0 torchtext==0.12 spacy==3.2 altair GPUtil

In [2]:
!python -m spacy download de_core_news_sm
!python -m spacy download en_core_web_sm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting de-core-news-sm==3.2.0
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-3.2.0/de_core_news_sm-3.2.0-py3-none-any.whl (19.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.1/19.1 MB[0m [31m18.4 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('de_core_news_sm')
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting en-core-web-sm==3.2.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.2.0/en_core_web_sm-3.2.0-py3-none-any.whl (13.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.9/13.9 MB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy

In [3]:
import os
from os.path import exists
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax, pad
import math
import copy
import time
from torch.optim.lr_scheduler import LambdaLR
import pandas as pd
import altair as alt
from torchtext.data.functional import to_map_style_dataset
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets
import spacy
import GPUtil
import warnings
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

In [4]:
class EncoderDecoder(nn.Module):
    """
    一个标准的解码器，编码器模型
    """

    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator

    def forward(self, src, tgt, src_mask, tgt_mask):
        "接收处理屏蔽的src和目标序列"
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)

    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

class Generator(nn.Module):
    """
    定义一个标准的线性+softmax生成步骤。
    说人话，这个是用来接受最后的decode的结果，并且返回词典中每个词的概率
    """
    def __init__(self, d_model, vocab):
        super().__init__()
        self.proj = nn.Linear(d_model, vocab)

    def forward(self, x):
        "为什么用log_softmax，主要是梯度和计算速度的考虑，可以百度一下，资料很多"
        return log_softmax(self.proj(x), dim=-1)

In [5]:
class LayerNorm(nn.Module):
    def __init__(self, features, eps = 1e-5):
        super().__init__()
        # self.norm = nn.LayerNorm()
        # Add into parameters of modules
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))
        self.eps = eps
    def forward(self, x):
        mean = x.mean(-1, keepdim = True) 
        # std = x.std(-1, keepdim = True)
        return self.gamma * (x - mean) / torch.sqrt(torch.var(x, unbiased = False) + eps) + self.beta


In [6]:
class AddNorm(nn.Module):
    def __init__(self, size, dropout):
        super().__init__()
        # self.norm = nn.LayerNorm()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

In [7]:
def attention(query, key, value, mask = None, dropout = None):
    # Choose the last dim of query
    d_k = query.size(-1)
    # Convert the last 2 dims to get K_t
    scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(d_k)
    if mask is not None:
    # Mask the pos with -1e9 where the status is True
        scores = scores.masked_fill_(mask == 0, -1e9)
    p_attn = scores.softmax(dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn

In [8]:
class Encoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        # layer
        self.norm = LayerNorm(layer.size)
    def foward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


In [9]:
class EncoderLayer(nn.Module):
    def __init__(self, size, self_attention, feed_forward, dropout):
        super().__init__()
        self.size = size
        self.self_attention = self_attention
        self.feed_forward = feed_forward
        self.layer1 = AddNorm(size, dropout)
        self.layer2 = AddNorm(size, dropout)
    def forward(self, x):
        x = self.layer1(x, lambda x: self.self_attention(x, x, x, mask))
        return self.layer2(x, self.feed_forward)


In [10]:
class Decoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        # layer
        self.norm = LayerNorm(layer.size)
    def forward(self, x, memory, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)


In [11]:
class DecoderLayer(nn.Module):
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super().__init__()
        self.size = size
        self.self_attention = self_attn
        self.cross_attention = src_attn
        self.feed_forward = feed_forward
        self.layer1 = AddNorm(size, dropout)
        self.layer2 = AddNorm(size, dropout)
        self.layer3 = AddNorm(size, dropout)
    def forward(self, x, memory, src_mask, tgt_mask):
        x = self.layer1(x, lambda x: self.self_attention(x, x, x, mask))
        x = self.layer2(x, lambda x: self.cross_attention(x, m, m, src_mask))
        return self.layer3(x, self.feed_forward)



In [12]:
def subsequent_mask(size):
    #"屏蔽后面的位置"
    attn_shape = (1, size, size)
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
        torch.uint8
    )
    return subsequent_mask == 0

In [13]:
def example_mask():
    # 第一眼看这个嵌套循环给看懵了。其实就是用两个for循环生成了一个二维坐标，每一个都是一个df对象
    # 看下面这个就好理解了
    # 其实:=[(x,y) for y in range(20) for x in range(20)]

    LS_data = pd.concat(
        [
            pd.DataFrame(
                {
                    "Subsequent Mask": subsequent_mask(20)[0][x, y].flatten(),
                    "Window": y,
                    "Masking": x,
                }
            )
            for y in range(20)
            for x in range(20)
        ]
    )
    return (
        alt.Chart(LS_data)
        .mark_rect()
        .properties(height=250, width=250)
        .encode(
            alt.X("Window:O"),
            alt.Y("Masking:O"),
            alt.Color("Subsequent Mask:Q", scale=alt.Scale(scheme="viridis")),
        )
        .interactive()
    )

In [14]:
class MultiHeadAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        self.d_k = d_model // h
        self.h = h
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.attn = attention
        self.dropout = nn.Dropout(p=dropout)
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # Use the same mask for every head
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        # Adjust the output size to simulate heads
        # 这里坐了一个等价变换，通过改变输出的形状模拟了多个输出head
        query = self.w_q(query).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
        key = self.w_k(key).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
        value = self.w_v(value).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
        x, p_atten = self.attn(query, key, value, mask = mask, dropout=self.dropout)
        # Concatenate heads into one head
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
        return self.w_o(x)


In [15]:
class PositionalWisedFFN(nn.Module):
  # 前馈神经网络
    def __init__(self, d_model, hidden_size, dropout=0.1):
        super().__init__()
        self.w1 = nn.Linear(d_model, hidden_size)
        self.w2 = nn.Linear(hidden_size, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        return self.w2(self.dropout(self.relu(self.w1(x))))


In [16]:
class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        super().__init__()
        self.emd = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.emd(x) * math.sqrt(self.d_model)

In [17]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 在对数空间中计算位置编码。
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        # 为不属于模型参数的状态增加缓冲
        self.register_buffer("pe", pe)

    def forward(self, x):
        # requires_grad_(False)：禁用梯度下降
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

In [18]:
def example_positional():
    pe = PositionalEncoding(20, 0)
    y = pe.forward(torch.zeros(1, 100, 20))

    data = pd.concat(
        [
            pd.DataFrame(
                {
                    "embedding": y[0, :, dim],
                    "dimension": dim,
                    "position": list(range(100)),
                }
            )
            for dim in [4, 5, 6, 7]
        ]
    )

    return (
        alt.Chart(data)
        .mark_line()
        .properties(width=800)
        .encode(x="position", y="embedding", color="dimension:N")
        .interactive()
    )




In [19]:
RUN_EXAMPLES = True
def show_example(fn, args=[]):
    if __name__ == "__main__" and RUN_EXAMPLES:
        return fn(*args)


def execute_example(fn, args=[]):
    if __name__ == "__main__" and RUN_EXAMPLES:
        fn(*args)

In [20]:
show_example(example_positional)

In [21]:
def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    #"帮助: 从超参数中构建一个模型"
    c = copy.deepcopy
    attn = MultiHeadAttention(h, d_model)
    ff = PositionalWisedFFN(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        Generator(d_model, tgt_vocab),
    )

    # 这里很重要
    # 用Glorot/fan_avg初始化参数。
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    return model

In [22]:
class Batch:
    """训练期间用于保存一批带掩码的数据的对象"""

    def __init__(self, src, tgt=None, pad=2):  # 2 = <blank>
        self.src = src
        # 关于下面这一段：
        # (torch.tensor([[ 0, 2, 4, 5, 1, 0, 2 ]]) != 2).unsqueeze(-2)
        # print：tensor([[[ True, False,  True,  True,  True,  True, False]]])
        # 实际是把2元素打上掩码。
        self.src_mask = (src != pad).unsqueeze(-2)
        if tgt is not None:
            # 下面两个分别去掉句子的开始符和结束符，这两个符合不参与运算
            self.tgt = tgt[:, :-1]
            self.tgt_y = tgt[:, 1:]
            # 把输入指定位置的下三角掩码
            self.tgt_mask = self.make_std_mask(self.tgt, pad)
            self.ntokens = (self.tgt_y != pad).data.sum()

    @staticmethod
    def make_std_mask(tgt, pad):
        "创建一个掩码来隐藏并填充未来的word"
        # 和src一样，需要把<blank>符合盖住
        tgt_mask = (tgt != pad).unsqueeze(-2)
        # 取&操作
        tgt_mask = tgt_mask & subsequent_mask(tgt.size(-1)).type_as(
            tgt_mask.data
        )
        return tgt_mask

In [23]:
class TrainState:
    """用来跟踪当前训练的情况，包括步数，梯度步数，样本使用数和已经处理的tokens数量"""

    step: int = 0  # Steps in the current epoch
    accum_step: int = 0  # Number of gradient accumulation steps
    samples: int = 0  # total # of examples used
    tokens: int = 0  # total # of tokens processed

In [24]:
def run_epoch(
    data_iter,
    model,
    loss_compute,
    optimizer,
    scheduler,
    mode="train",
    accum_iter=1,
    train_state=TrainState(),
):
    """一个训练epoch
    data_iter: 可迭代对象，一次返回一个Batch对象或者加上索引
    model:训练的模型，这里就是Transformer
    loss_compute: SimpleLossCompute对象，用于计算损失
    optimizer: 优化器。这里是Adam优化器。验证时，optimizer是DummyOptimizer。DummyOptimizer不会真的更新模型参数，主要用于不同优化器效果的对比。
    scheduler：执行控制器。scheduler是一种用于调整优化器学习率的工具。 它可以帮助我们在训练过程中根据指定的策略调整学习率，
      以提高模型的性能这里是LambdaLR对象，用于调整Adam的学习率，实现WarmUp；验证时，scheduler是DummyScheduler。
    accum_iter: 每迭代n个batch更新一次模型的参数。这里默认n=1，就是每次batch都更新参数。
    train_state: TrainState对象，用于保存前训练的情况
    """
    start = time.time()
    total_tokens = 0
    total_loss = 0
    tokens = 0
    n_accum = 0
    for i, batch in enumerate(data_iter):
        # 注意这里的out是decoder输出的结果，这会还没有经过最后一层linear+softmax
        out = model.forward(
            batch.src, batch.tgt, batch.src_mask, batch.tgt_mask
        )
        # 这里才传入out和训练目标tgt_y计算了loss和loss_node。loss_node返回的是正则化的损失；
        # loss用来计算损失，loss_node用来梯度下降更新参数
        loss, loss_node = loss_compute(out, batch.tgt_y, batch.ntokens)
        # loss_node = loss_node / accum_iter
        # 只有在train或者train+log的模式才开启参数更新
        if mode == "train" or mode == "train+log":
            # 先通过backward计算出来梯度
            loss_node.backward()
            train_state.step += 1
            train_state.samples += batch.src.shape[0]
            train_state.tokens += batch.ntokens
            if i % accum_iter == 0:
                # 调用依次梯度下降
                optimizer.step()
                optimizer.zero_grad(set_to_none=True)
                n_accum += 1
                train_state.accum_step += 1
            # 我们在备注里提到过,scheduler的作用就是用来优化学习，控制学习率等超参数。这里调用step就是更新学习率相关的参数
            scheduler.step()

        total_loss += loss
        total_tokens += batch.ntokens
        tokens += batch.ntokens
        
        # 下面是每40个epoch打印下相关日志
        if i % 40 == 1 and (mode == "train" or mode == "train+log"):
            # 学习率
            lr = optimizer.param_groups[0]["lr"]
            # 40个epoch花费的时间
            elapsed = time.time() - start
            print(
                (
                    "Epoch Step: %6d | Accumulation Step: %3d | Loss: %6.2f "
                    + "| Tokens / Sec: %7.1f | Learning Rate: %6.1e"
                )
                % (i, n_accum, loss / batch.ntokens, tokens / elapsed, lr)
            )
            start = time.time()
            tokens = 0
        del loss
        del loss_node
    return total_loss / total_tokens, train_state

In [25]:
def rate(step, model_size, factor, warmup):
    """
    我们必须将LambdaLR函数的最小步数默认为1。以避免零点导致的负数学习率。
    """
    if step == 0:
        step = 1
    return factor * (
        model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5))
    )

In [26]:
def example_learning_schedule():
    opts = [
        [512, 1, 4000],  # example 1
        [512, 1, 8000],  # example 2
        [256, 1, 4000],  # example 3
    ]

    dummy_model = torch.nn.Linear(1, 1)
    learning_rates = []

    # 在我们的配置列表里有三个不同的案例.
    for idx, example in enumerate(opts):
        # 创建一个Adam优化器
        optimizer = torch.optim.Adam(
            dummy_model.parameters(), lr=1, betas=(0.9, 0.98), eps=1e-9
        )
        # 这里创建一个神经网络学习调度器，优化器就是刚才创建的Adam优化器，lr(学习率)的调整函数就是上面的rate函数，根据当前的step来调整。
        lr_scheduler = LambdaLR(
            optimizer=optimizer, lr_lambda=lambda step: rate(step, *example)
        )
        tmp = []
        # 采取20K假训练步骤，保存每一步的学习率
        for step in range(20000):
            # 把当前的学习率追加到等会输出的list
            tmp.append(optimizer.param_groups[0]["lr"])
            # optimizer.step()是更新网络的参数
            optimizer.step()
            # scheduler.step()是更新学习率等控制网络学习的参数
            lr_scheduler.step()
        learning_rates.append(tmp)

    learning_rates = torch.tensor(learning_rates)

    # 关掉最大限制，使得altair能够处理超过5000行的数据
    alt.data_transformers.disable_max_rows()

    opts_data = pd.concat(
        [
            pd.DataFrame(
                {
                    "Learning Rate": learning_rates[warmup_idx, :],
                    "model_size:warmup": ["512:4000", "512:8000", "256:4000"][
                        warmup_idx
                    ],
                    "step": range(20000),
                }
            )
            for warmup_idx in [0, 1, 2]
        ]
    )

    return (
        alt.Chart(opts_data)
        .mark_line()
        .properties(width=600)
        .encode(x="step", y="Learning Rate", color="model_size:warmup:N")
        .interactive()
    )


In [27]:
class LabelSmoothing(nn.Module):
    #"实现标签平滑."

    def __init__(self, size, padding_idx, smoothing=0.0):
        super(LabelSmoothing, self).__init__()
        # 定义一个KL散度loss网络，损失的计算方式是sum，求和
        self.criterion = nn.KLDivLoss(reduction="sum")
        self.padding_idx = padding_idx
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.size = size
        self.true_dist = None

    def forward(self, x, target):
        assert x.size(1) == self.size
        # 克隆一份作为真实分布
        true_dist = x.data.clone()
        # 用smoothing/(self.size - 2) 填充
        true_dist.fill_(self.smoothing / (self.size - 2))
        # 用confidence填充指定位置的数据，scatter_用法参考[8]
        true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        true_dist[:, self.padding_idx] = 0
        mask = torch.nonzero(target.data == self.padding_idx)
        if mask.dim() > 0:
            true_dist.index_fill_(0, mask.squeeze(), 0.0)
        self.true_dist = true_dist
        # detach，把变量从计算图分离，可参考 https://zhuanlan.zhihu.com/p/389738863
        return self.criterion(x, true_dist.clone().detach())

In [28]:
class SimpleLossCompute:
    "一个简单的损失计算和训练函数"

    def __init__(self, generator, criterion):
        """
        generator: Generator对象，用于根据Decoder的输出预测token
        criterion: LabelSmoothing对象，用于对Label进行平滑处理和损失计算
        """
        self.generator = generator
        self.criterion = criterion

    def __call__(self, x, y, norm):
        # 这里顺便用最简单的例子展示了smoothing的用法
        # 先把decoder的x输入generator，得到预测的x
        # 然后把x和预测的y传入，criterion会对y做平滑处理，需要注意的是:
        # 这里传入的y展开成了一个一阶张量，即向量，因为在criterion内部会对它打包，会为每个单词生成一个概率向量
        x = self.generator(x)
        sloss = (
            self.criterion(
                x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1)
            )
            / norm
        )
        
        # 这里又在搞事情，相当于第一个没有norm,第二个sloss是norm版本的，除以的是一个常量,batch.ntokens
        return sloss.data * norm, sloss

In [29]:
import heapq

class Beam:
    def __init__(self, beam_width):
        self.heap = list()  # 存储各个beam search的结果
        self.beam_width = beam_width  # beam的数量

    def add(self, probility, complete, seq, decoder_input, decoder_hidden):
        """
        添加数据，同时判断总的数据个数，多则删除
        :param probility: 概率乘积
        :param complete: 最后一个是否为EOS
        :param seq: list，所有token的列表
        :param decoder_input: 下一次进行解码的输入，通过前一次获得
        :param decoder_hidden: 下一次进行解码的hidden，通过前一次获得
        :return:
        """
        heapq.heappush(self.heap, [probility, complete,
                                   seq, decoder_input, decoder_hidden])
        # 判断数据的个数，如果大，则弹出。保证数据总个数小于等于beam_width
        if len(self.heap) > self.beam_width:
            heapq.heappop(self.heap)

    def __iter__(self):  # 让该beam能够被迭代
        return iter(self.heap)


def beam_decode(model, src, src_mask, max_len, start_symbol, BEAM_SIZE):
    model.eval()
    beam_seq = Beam(BEAM_SIZE)
    # 构造第一次需要的输入数据，保存在堆中
    memory = model.encode(src, src_mask)
    ys = torch.zeros(1, 1).fill_(start_symbol).type_as(src.data)
    out = model.decode(
        memory, src_mask, ys, subsequent_mask(ys.size(1)).type_as(src.data)
    )
    beam_seq.add(1, False, [ys], ys, out)
    while True:
        cur_beam = Beam(BEAM_SIZE)
        # 取出堆中的数据，进行forward_step的操作，获得当前时间步的output，hidden
        for _probility, _complete, _seq, _decoder_input, _decoder_hidden in beam_seq:
            # 判断前一次的_complete是否为True，如果是，则不需要forward
            if _complete == True:
                cur_beam.add(_probility, _complete, _seq,
                             _decoder_input, _decoder_hidden)
            else:
                decoder_hidden = model.decode(
                    memory, src_mask, _decoder_input, subsequent_mask(ys.size(1)).type_as(src.data))
                decoder_output_t = model.generator(decoder_hidden[:, -1])
                value, index = torch.topk(decoder_output_t, BEAM_SIZE)
                # 从output中选择topk（k=beam width）个输出，作为下一次的input
                for m, n in zip(value, index):
                    decoder_input = torch.LongTensor([[n[0]]])
                    decoder_input = torch.cat(
                        [_decoder_input, decoder_input], dim=1)
                    seq = _seq + [n[0]]
                    probility = _probility * m[0]
                    # probility = _probility + m
                    if n[0].item() == 1:  # index of </s>
                        complete = True
                    else:
                        complete = False
                    cur_beam.add(probility, complete, seq,
                                 decoder_input, decoder_hidden)
        # 获取新的堆中的优先级最高（概率最大）的数据，判断数据是否是EOS结尾或者是否达到最大长度，如果是，停止迭代
        best_prob, best_complete, best_seq, _, _ = max(cur_beam)
        if best_complete == True or len(best_seq) - 1 == max_len:  # 减去</s>
            seq = [i.item() for i in best_seq]
            return seq
            # return best_seq
        else:
            # 重新遍历新的堆中的数据
            beam_seq = cur_beam


In [30]:
def load_tokenizers():

    try:
        spacy_de = spacy.load("de_core_news_sm")
    except IOError:
        os.system("python -m spacy download de_core_news_sm")
        spacy_de = spacy.load("de_core_news_sm")

    try:
        spacy_en = spacy.load("en_core_web_sm")
    except IOError:
        os.system("python -m spacy download en_core_web_sm")
        spacy_en = spacy.load("en_core_web_sm")

    return spacy_de, spacy_en

In [31]:
def tokenize(text, tokenizer):
    """对text进行分词
    :param text: 要分词的文本，例如'I love you'
    :param tokenizer: 分词模型，例如：spacy_en
    :return: 分词结果，例如 ["I", "love", "you"]
    """
    return [tok.text for tok in tokenizer.tokenizer(text)]


def yield_tokens(data_iter, tokenizer, index):
    """yield一个Token List
    :param data_iter: 包含句子对的可迭代对象。例如：[("I love you", "我爱你"), ...]
    :param tokenizer: 分词模型。例如spacy_en
    :param index: 要对句子对儿的哪个语言进行分词，例如0表示对上例的英文进行分词
    :return: yield本轮的分词结果，例如['I', 'love', 'you']
    """
    for from_to_tuple in data_iter:
        yield tokenizer(from_to_tuple[index])

In [32]:
def build_vocabulary(spacy_de, spacy_en):
    """
    构建德语词典和英语词典
    :return: 返回德语词典和英语词典，均为：Vocab对象
             Vocab对象官方地址为：https://pytorch.org/text/stable/vocab.html#vocab
    """
    def tokenize_de(text):
        return tokenize(text, spacy_de)

    def tokenize_en(text):
        return tokenize(text, spacy_en)
    
    # [9]build_vocab_from_iterator()函数
    print("Building German Vocabulary ...")
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))
    vocab_src = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_de, index=0),
        min_freq=2,
        specials=["<s>", "</s>", "<blank>", "<unk>"],
    )

    print("Building English Vocabulary ...")
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))
    vocab_tgt = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_en, index=1),
        min_freq=2,
        specials=["<s>", "</s>", "<blank>", "<unk>"],
    )

    vocab_src.set_default_index(vocab_src["<unk>"])
    vocab_tgt.set_default_index(vocab_tgt["<unk>"])

    return vocab_src, vocab_tgt


def load_vocab(spacy_de, spacy_en):
    if not exists("vocab.pt"):
        vocab_src, vocab_tgt = build_vocabulary(spacy_de, spacy_en)
        torch.save((vocab_src, vocab_tgt), "vocab.pt")
    else:
        vocab_src, vocab_tgt = torch.load("vocab.pt")
    print("Finished.\nVocabulary sizes:")
    print(len(vocab_src))
    print(len(vocab_tgt))
    return vocab_src, vocab_tgt


# if is_interactive_notebook():
    # 均成为脚本之后的全局变量
spacy_de, spacy_en = show_example(load_tokenizers)
vocab_src, vocab_tgt = show_example(load_vocab, args=[spacy_de, spacy_en])

Finished.
Vocabulary sizes:
8315
6384


In [33]:
class DummyOptimizer(torch.optim.Optimizer):
    def __init__(self):
        self.param_groups = [{"lr": 0}]
        None

    def step(self):
        None

    def zero_grad(self, set_to_none=False):
        None


class DummyScheduler:
    def step(self):
        None

In [34]:
def collate_batch(
    batch,
    src_pipeline,
    tgt_pipeline,
    src_vocab,
    tgt_vocab,
    device,
    max_padding=128,
    pad_id=2,
):
    """返回真正的训练批次张量，并且文本被['i', 'love', 'you']处理成[3,4,5]数字张量。
    :param batch: 一个batch的语句对。例如：
                  [('Ein Kleinkind ...', 'A toddler in ...'), # [(德语), (英语)
                   ....                                       # ...
                   ...]                                       # ... ]
    :param src_pipeline: 德语分词器，也就是tokenize_de方法，后面会定义其实就是对spacy_de的封装
    :param tgt_pipeline: 英语分词器，也就是tokenize_en方法
    :param src_vocab: 德语词典，Vocab对象
    :param tgt_vocab: 英语词典，Vocab对象
    :param device: cpu或cuda
    :param max_padding: 句子的长度。pad长度不足的句子和裁剪长度过长的句子，目的是让不同长度的句子可以组成一个tensor
    :param pad_id: '<blank>'在词典中对应的index
    :return: src和tgt。处理后并batch后的句子。例如：
             src为：[[0, 4354, 314, ..., 1, 2, 2, ..., 2],  [0, 4905, 8567, ..., 1, 2, 2, ..., 2]]
             其中0是<bos>, 1是<eos>, 2是<blank>；src的Shape为(batch_size, max_padding)；tgt同理。
    """
    bs_id = torch.tensor([0], device=device)  # <s> token id
    eos_id = torch.tensor([1], device=device)  # </s> token id
    src_list, tgt_list = [], []
    for (_src, _tgt) in batch:
        processed_src = torch.cat(
            [
                bs_id,
                torch.tensor(
                    src_vocab(src_pipeline(_src)),
                    dtype=torch.int64,
                    device=device,
                ),
                eos_id,
            ],
            0,
        )
        processed_tgt = torch.cat(
            [
                bs_id,
                torch.tensor(
                    tgt_vocab(tgt_pipeline(_tgt)),
                    dtype=torch.int64,
                    device=device,
                ),
                eos_id,
            ],
            0,
        )
        src_list.append(
            # 警告 - 覆盖padding - len的负值的值
            pad(
                processed_src,
                (
                    0,
                    max_padding - len(processed_src),
                ),
                value=pad_id,
            )
        )
        tgt_list.append(
            pad(
                processed_tgt,
                (0, max_padding - len(processed_tgt)),
                value=pad_id,
            )
        )

    src = torch.stack(src_list)
    tgt = torch.stack(tgt_list)
    return (src, tgt)

In [35]:
def create_dataloaders(
    device,
    vocab_src,
    vocab_tgt,
    spacy_de,
    spacy_en,
    batch_size=12000,
    max_padding=128,
    is_distributed=False,
):
    """创建一个dataloaders，实际上返回了两个，一个训练集，一个验证集的
    :param device: cpu或cuda
    :param vocab_src: 源词典，本例中为德语词典
    :param vocab_tgt: 目标词典，本例中为英语词典
    :param spacy_de: 德语分词器
    :param spacy_en: 英语分词器
    :param batch_size: 每个批次的样本量
    :param max_padding: 句子的最大长度。也是需要填充的长度。
    
    :return: 训练集dataloaders，验证集dataloaders
    """
    # def create_dataloaders(batch_size=12000):
    def tokenize_de(text):
        return tokenize(text, spacy_de)

    def tokenize_en(text):
        return tokenize(text, spacy_en)

    def collate_fn(batch):
        return collate_batch(
            batch,
            tokenize_de,
            tokenize_en,
            vocab_src,
            vocab_tgt,
            device,
            max_padding=max_padding,
            pad_id=vocab_src.get_stoi()["<blank>"],
        )

    train_iter, valid_iter, test_iter = datasets.Multi30k(
        language_pair=("de", "en")
    )

    train_iter_map = to_map_style_dataset(
        train_iter
    )  # DistributedSampler needs a dataset len()
    # 这里对sampler开启了分布式采样
    train_sampler = (
        DistributedSampler(train_iter_map) if is_distributed else None
    )
    valid_iter_map = to_map_style_dataset(valid_iter)
    valid_sampler = (
        DistributedSampler(valid_iter_map) if is_distributed else None
    )
    # [11]torch.utils.data.Dataloader()
    train_dataloader = DataLoader(
        train_iter_map,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler,
        collate_fn=collate_fn,
    )
    valid_dataloader = DataLoader(
        valid_iter_map,
        batch_size=batch_size,
        shuffle=(valid_sampler is None),
        sampler=valid_sampler,
        collate_fn=collate_fn,
    )
    return train_dataloader, valid_dataloader

In [36]:
def train_worker(
    gpu,
    ngpus_per_node,
    vocab_src,
    vocab_tgt,
    spacy_de,
    spacy_en,
    config,
    is_distributed=False,
):
    print(f"Train worker process using GPU: {gpu} for training", flush=True)
    torch.cuda.set_device(gpu)

    pad_idx = vocab_tgt["<blank>"]
    d_model = 512
    model = make_model(len(vocab_src), len(vocab_tgt), N=6)
    model.cuda(gpu)
    module = model
    is_main_process = True
    if is_distributed:
        # 具体参考Q&A[10]
        dist.init_process_group(
            "nccl", init_method="env://", rank=gpu, world_size=ngpus_per_node
        )
        # DDP->torch.nn.parallel.DistributedDataParallel:
        # 用于在分布式训练环境中在多个GPU或机器上并行训练一个模型。
        # 它的工作原理是在每个GPU或机器上复制模型，并在训练期间使用通信后端来同步模型的梯度和缓冲区。
        model = DDP(model, device_ids=[gpu])
        module = model.module
        is_main_process = gpu == 0
    
    # 创建一个标签平滑处理模型
    criterion = LabelSmoothing(
        size=len(vocab_tgt), padding_idx=pad_idx, smoothing=0.1
    )
    # 使用指定的GPU
    criterion.cuda(gpu)
    # 创建Dataloaders
    train_dataloader, valid_dataloader = create_dataloaders(
        gpu,
        vocab_src,
        vocab_tgt,
        spacy_de,
        spacy_en,
        batch_size=config["batch_size"] // ngpus_per_node,
        max_padding=config["max_padding"],
        is_distributed=is_distributed,
    )

    optimizer = torch.optim.Adam(
        model.parameters(), lr=config["base_lr"], betas=(0.9, 0.98), eps=1e-9
    )
    lr_scheduler = LambdaLR(
        optimizer=optimizer,
        lr_lambda=lambda step: rate(
            step, d_model, factor=1, warmup=config["warmup"]
        ),
    )
    train_state = TrainState()

    for epoch in range(config["num_epochs"]):
        if is_distributed:
            train_dataloader.sampler.set_epoch(epoch)
            valid_dataloader.sampler.set_epoch(epoch)

        model.train()
        print(f"[GPU{gpu}] Epoch {epoch} Training ====", flush=True)
        _, train_state = run_epoch(
            (Batch(b[0], b[1], pad_idx) for b in train_dataloader),
            model,
            SimpleLossCompute(module.generator, criterion),
            optimizer,
            lr_scheduler,
            mode="train+log",
            accum_iter=config["accum_iter"],
            train_state=train_state,
        )

        GPUtil.showUtilization()
        if is_main_process:
            file_path = "%s%.2d.pt" % (config["file_prefix"], epoch)
            torch.save(module.state_dict(), file_path)
        torch.cuda.empty_cache()

        print(f"[GPU{gpu}] Epoch {epoch} Validation ====", flush=True)
        model.eval()
        sloss = run_epoch(
            (Batch(b[0], b[1], pad_idx) for b in valid_dataloader),
            model,
            SimpleLossCompute(module.generator, criterion),
            DummyOptimizer(),
            DummyScheduler(),
            mode="eval",
        )
        print(sloss)
        torch.cuda.empty_cache()

    if is_main_process:
        file_path = "%sfinal.pt" % config["file_prefix"]
        torch.save(module.state_dict(), file_path)

In [37]:
def train_distributed_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config):
    """分布式GPU训练的主入口
    """
    # 译者把源代码下面这一段屏蔽了，train_worker的定义就在上面一个Block，不需要导入了。
    # from the_annotated_transformer import train_worker

    # mp: 就是import torch.multiprocessing as mp中的多进程启动
    # train_worker：每个训练的入口
    # nprocs：需要启动的进程数
    # args: 每个进程都会收到的参数
    ngpus = torch.cuda.device_count()
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12356"
    print(f"Number of GPUs detected: {ngpus}")
    print("Spawning training processes ...")
    # spawn从头构建一个子进程，父进程的数据等拷贝到子进程空间内，拥有自己的Python解释器，
    # 所以需要重新加载一遍父进程的包，因此启动较慢，由于数据都是自己的，安全性较高
    # 可参考：https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn
    # spawn会将train_worker以(i, args)形式调用，i是master为每个子进程自动分配，所以不用传。
    mp.spawn(
        train_worker,
        nprocs=ngpus,
        args=(ngpus, vocab_src, vocab_tgt, spacy_de, spacy_en, config, True),
    )


def train_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config):
    # distributed是用来控制是否分布式训练，如果开启分布式训练，就走第一个if
    if config["distributed"]:
        train_distributed_model(
            vocab_src, vocab_tgt, spacy_de, spacy_en, config
        )
    # 不启用分布式就直接调用train_worker
    else:
        train_worker(
            0, 1, vocab_src, vocab_tgt, spacy_de, spacy_en, config, False
        )


def load_trained_model():
    config = {
        "batch_size": 32,
        "distributed": False, # 这个参数控制是否启动分布式训练
        "num_epochs": 8,
        "accum_iter": 10,
        "base_lr": 1.0,
        "max_padding": 72,
        "warmup": 3000,
        "file_prefix": "multi30k_model_",
    }
    # model保存的位置
    model_path = "multi30k_model_final.pt"
    
    # 如果模型不存在，则调用train_model先训练。这里传入的前四个参数都是"准备数据"章节的全局变量。
    if not exists(model_path):
        train_model(vocab_src, vocab_tgt, spacy_de, spacy_en, config)

    # 使用词典创建模型
    model = make_model(len(vocab_src), len(vocab_tgt), N=6)
    # 加载模型参数
    model.load_state_dict(torch.load("multi30k_model_final.pt"))
    return model


model = load_trained_model()

Train worker process using GPU: 0 for training




[GPU0] Epoch 0 Training ====


NotImplementedError: ignored

In [None]:
def check_outputs(
    valid_dataloader,
    model,
    vocab_src,
    vocab_tgt,
    n_examples=15,
    pad_idx=2,
    eos_string="</s>",
):
    results = [()] * n_examples
    for idx in range(n_examples):
        print("\nExample %d ========\n" % idx)
        b = next(iter(valid_dataloader))
        rb = Batch(b[0], b[1], pad_idx)
        greedy_decode(model, rb.src, rb.src_mask, 64, 0)[0]

        src_tokens = [
            vocab_src.get_itos()[x] for x in rb.src[0] if x != pad_idx
        ]
        tgt_tokens = [
            vocab_tgt.get_itos()[x] for x in rb.tgt[0] if x != pad_idx
        ]

        print(
            "Source Text (Input)        : "
            + " ".join(src_tokens).replace("\n", "")
        )
        print(
            "Target Text (Ground Truth) : "
            + " ".join(tgt_tokens).replace("\n", "")
        )
        # model_out = greedy_decode(model, rb.src, rb.src_mask, 72, 0)[0]
        model_out = beam_decode(model, rb.src, rb.src_mask, 72, 0, BEAM_SIZE=3)
        model_txt = (
            " ".join(
                [vocab_tgt.get_itos()[x] for x in model_out if x != pad_idx]
            ).split(eos_string, 1)[0]
            + eos_string
        )
        print("Model Output               : " + model_txt.replace("\n", ""))
        results[idx] = (rb, src_tokens, tgt_tokens, model_out, model_txt)
    return results


def run_model_example(n_examples=5):
    global vocab_src, vocab_tgt, spacy_de, spacy_en

    print("Preparing Data ...")
    _, valid_dataloader = create_dataloaders(
        torch.device("cpu"),
        vocab_src,
        vocab_tgt,
        spacy_de,
        spacy_en,
        batch_size=1,
        is_distributed=False,
    )

    print("Loading Trained Model ...")

    model = make_model(len(vocab_src), len(vocab_tgt), N=6)
    model.load_state_dict(
        torch.load("multi30k_model_final.pt", map_location=torch.device("cpu"))
    )

    print("Checking Model Outputs:")
    example_data = check_outputs(
        valid_dataloader, model, vocab_src, vocab_tgt, n_examples=n_examples
    )
    return model, example_data

In [None]:
execute_example(run_model_example)