In [27]:
import torch
import torch.nn as nn
import os
from pathlib import Path
from torchtext.vocab import Vocab
import pandas as pd
from transformers import AutoTokenizer
from tokenizers import Tokenizer
import math
from tqdm import tqdm
import json
from torchtext.data.metrics import bleu_score

In [28]:
class PositionalEncoding(nn.Module):
    "Implement the PE function."

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 初始化Shape为(max_len, d_model)的PE (positional encoding)
        pe = torch.zeros(max_len, d_model).to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里就是sin和cos括号中的内容，通过e和ln进行了变换
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # 计算PE(pos, 2i)
        pe[:, 0::2] = torch.sin(position * div_term)
        # 计算PE(pos, 2i+1)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 为了方便计算，在最外面在unsqueeze出一个batch
        pe = pe.unsqueeze(0)
        # 如果一个参数不参与梯度下降，但又希望保存model的时候将其保存下来
        # 这个时候就可以用register_buffer
        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        x 为embedding后的inputs，例如(1,7, 128)，batch size为1,7个单词，单词维度为128
        """
        # 将x和positional encoding相加。
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)


In [29]:
class TranslationModel(nn.Module):

    def __init__(self, d_model, src_vocab, tgt_vocab, dropout=0.1):
        super(TranslationModel, self).__init__()

        # 定义原句子的embedding
        self.src_embedding = nn.Embedding(len(src_vocab), d_model, padding_idx=2)
        # 定义目标句子的embedding
        self.tgt_embedding = nn.Embedding(len(tgt_vocab), d_model, padding_idx=2)
        # 定义posintional encoding
        self.positional_encoding = PositionalEncoding(d_model, dropout, max_len=72) # 写死了最大长度为72
        # 定义Transformer
        self.transformer = nn.Transformer(d_model, dropout=dropout, batch_first=True)

        # 定义最后的预测层，这里并没有定义Softmax，而是把他放在了模型外。
        self.predictor = nn.Linear(d_model, len(tgt_vocab))

    def forward(self, src, tgt):
        """
        进行前向传递，输出为Decoder的输出。注意，这里并没有使用self.predictor进行预测，
        因为训练和推理行为不太一样，所以放在了模型外面。
        :param src: 原batch后的句子，例如[[0, 12, 34, .., 1, 2, 2, ...], ...]
        :param tgt: 目标batch后的句子，例如[[0, 74, 56, .., 1, 2, 2, ...], ...]
        :return: Transformer的输出，或者说是TransformerDecoder的输出。
        """

        """
        生成tgt_mask，即阶梯型的mask，例如：
        [[0., -inf, -inf, -inf, -inf],
        [0., 0., -inf, -inf, -inf],
        [0., 0., 0., -inf, -inf],
        [0., 0., 0., 0., -inf],
        [0., 0., 0., 0., 0.]]
        tgt.size()[-1]为目标句子的长度。
        """
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size()[-1]).to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
        # 掩盖住原句子中<pad>的部分，例如[[False,False,False,..., True,True,...], ...]
        src_key_padding_mask = TranslationModel.get_key_padding_mask(src)
        # 掩盖住目标句子中<pad>的部分
        tgt_key_padding_mask = TranslationModel.get_key_padding_mask(tgt)

        # 对src和tgt进行编码
        src = self.src_embedding(src)
        tgt = self.tgt_embedding(tgt)
        # 给src和tgt的token增加位置信息
        src = self.positional_encoding(src)
        tgt = self.positional_encoding(tgt)

        # 将准备好的数据送给transformer
        out = self.transformer(src, tgt,
                               tgt_mask=tgt_mask,
                               src_key_padding_mask=src_key_padding_mask,
                               tgt_key_padding_mask=tgt_key_padding_mask)

        """
        这里直接返回transformer的结果。因为训练和推理时的行为不一样，
        所以在该模型外再进行线性层的预测。
        """
        return out

    @staticmethod
    def get_key_padding_mask(tokens):
        """
        用于key_padding_mask
        """
        return tokens == 2


In [30]:
class Translation:

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model: nn.Module  = torch.load('model/model.pt', map_location=device)
    enVocab: Vocab = torch.load('dataset/vocab_en.pt', map_location=device)
    zhVocab: Vocab = torch.load('dataset/vocab_zh.pt', map_location=device)
    tokenizer: Tokenizer = Tokenizer.from_file('bert-base-chinese/tokenizer.json')
    maxLength = 72

    def cut(self, src: str) -> list[torch.Tensor]:
        result = []
        for i in range(0, len(src), self.maxLength - 2):
            temp = src[i: min(i + self.maxLength - 2, len(src))]
            temp = torch.tensor([0] + self.enVocab(self.enTokenizer(temp)) + [1]).unsqueeze(0).to(self.device)
            result.append(temp)
        return result
    
    def translation(self, src: str) -> str:
        # 将与原句子分词后，通过词典转为index，然后增加<bos>和<eos>
        src = self.cut(src)
        result = ''
        for s in src:
            tgt = torch.tensor([[0]]).to(self.device)
            # 一个一个词预测，直到预测为<eos>，或者达到句子最大长度
            for i in range(self.maxLength):
                # 进行transformer计算
                out = self.model(s, tgt)
                # 预测结果，因为只需要看最后一个词，所以取`out[:, -1]`
                predict = self.model.predictor(out[:, -1])
                # 找出最大值的index
                y = torch.argmax(predict, dim=1)
                # 和之前的预测结果拼接到一起
                tgt = torch.concat([tgt, y.unsqueeze(0)], dim=1)
                # 如果为<eos>，说明预测结束，跳出循环
                if y == 1:
                    break
            # 将预测tokens拼起来
            tgt = ''.join(self.zhVocab.lookup_tokens(tgt.squeeze().tolist())).replace("<s>", "").replace("</s>", "")
            result += tgt
        return result
    
    def enTokenizer(self, text: str) -> list[str]:
        return self.tokenizer.encode(text, add_special_tokens=False).tokens
    
    def __call__(self, text: str) -> str:
        return self.translation(text)

In [31]:
def get_line_count(file: str) -> int:
    count = 0
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            count += 1
    return count

def read_file(file: str, cache: str) -> list[list[str]]:
    if cache is not None and os.path.exists(cache):
        return torch.load(cache)
    
    chTokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
    result = []
    TranslationModel = Translation()
    with open(file, 'r', encoding='utf-8') as f:
        for line in tqdm(f, desc="translating:", total=get_line_count(file)):
            line = json.loads(line)
            english: str = line['english']
            chinese: str = line['chinese']
            if chinese.isascii(): # if chinese is ascii, then it is english, which means we need to swap english and chinese
                english, chinese = chinese, english
            translated = TranslationModel(english)
            # result.append([english, [chTokenizer.tokenize(chinese)], [[chTokenizer.tokenize(translated)]]])
            result.append([english, chinese, translated])

    if cache is not None and not os.path.exists(cache):
        torch.save(result, cache)
    
    return result

In [32]:
# data = read_file('data/functional_test.json', 'data/functional_test.pt')
data = read_file('data/functional_test.json', None)
# data = read_file('data/translation2019zh_valid.json', 'data/translation2019zh_valid.pt')

translating:: 100%|██████████| 20/20 [00:17<00:00,  1.12it/s]


In [33]:
for i in range(len(data)):
    english, chinese, translated = data[i]
    
    print(f"English: {english}")
    print(f"Chinese: {chinese}")
    print(f"Translated: {translated}")
    print("-" * 114)
    

English: Slowly and not without struggle, America began to listen.
Chinese: 美国缓慢地开始倾听，但并非没有艰难曲折。
Translated: ①不要与结构结合，began来列出来，以免受关注。
------------------------------------------------------------------------------------------------------------------
English: I didn't own a Thesaurus until four years ago and I use a small Webster's dictionary that I'd bought at K-Mart for 89 cents.
Chinese: 直到四年前我才有了一本词典。我使用的是用89美分在K市场里买来的一本韦氏小词典。我从来不使用单词处理程序。
Translated: 迪尔瓦尼不过是一个300℃，多年前，而且200℃的时候使用一个小小的1500℃。"R'Neddiuking那个'D'D'D'bougt,在30-1000℃,89cest的89cent。"
------------------------------------------------------------------------------------------------------------------
English: portlet, you must write three short deployment descriptors: web.xml, portlet.xml, and geronimo-web.xml. (Some of these may have been generated by your IDE.)
Chinese: portlet 之后，您必须编写三个简短的部署描述符：web.xml、portlet.xml 和 geronimo-web.xml（这其中的一些文件可能已经由 IDE 生成）。
Translated: 你的portlet，你的Must写了三个stor的deployutordeployipoderipripoter