In [None]:
! pip install --upgrade pip

In [None]:
!pip install torchtext contractions

In [None]:
!pip install -U spacy

In [None]:
!python -m spacy download en_core_web_sm

In [None]:
!pip install jieba

In [3]:
from modelscope import AutoModel, AutoTokenizer

In [1]:
import os
import torch
import torch.nn as nn
import math
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import Iterable, List
from torch import Tensor
from torch.nn import Transformer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from tqdm import tqdm
import jieba
import unicodedata
from collections import Counter
import contractions
from timeit import default_timer as timer
import re

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 数据处理

In [2]:
# 加载对应的tokenizer
# 源语言是英语
SRC_LANGUAGE = 'en'
# 目标语言是中文
TGT_LANGUAGE = 'zh'


In [3]:
def read_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.readlines()

In [4]:
def data_iterator(src_data, tgt_data):
    """
    src_data: [en]
    tgt_data: [zh]
    """
    # 将传入的数据以数据对的形式返回
    # [(de, en), (de, en)]
    return [(src_line.strip(), tgt_line.strip()) for src_line, tgt_line in zip(src_data, tgt_data)]

In [5]:
def yield_tokens(data_iter, language):
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}
    for data_sample in data_iter:
        # 获取对应语言的分词器,并用来对对应部分的句子做分词
        yield token_transform[language](data_sample[language_index[language]])

In [6]:
# 字符规范化函数
def unicodeToAscii(text):
    return ''.join(c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')

# 英文数据预处理函数
def preprocess_en(text):
    text = unicodeToAscii(text.strip())
    text = contractions.fix(text)
    text = re.sub(r'\（[^）]*\）', '', text)
    text = re.sub(r"[^a-zA-Z0-9.!?]+", r" ", text)  # 保留数字
    return text

# 中文数据预处理函数
def preprocess_zh(text):
    # 去除(掌声)这些脏数据
    text = re.sub(r'\（[^）]*\）', '', text)
    text = re.sub(r"[^\u4e00-\u9fa5，。！？0-9a-zA-Z]", "", text)  # 保留数字和英文字母
    return text


In [7]:
with open('./data/en-zh.dic', 'r', encoding='utf-8') as f:
    dic = [line.strip().split('\t') for line in f.readlines()]
    dic_en_zh = {key: value for key, value in dic}
    dic_zh_en = {value: key for key, value in dic}
    

# 将每个词添加进jieba的分词表中
for key, value in dic_en_zh.items():
    jieba.add_word(key)
    jieba.add_word(value)



Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\Administrator\AppData\Local\Temp\jieba.cache
Loading model cost 0.849 seconds.
Prefix dict has been built successfully.


In [8]:
with open("./data/train.txt", "a+", encoding='utf-8') as f:
    for en, zh in dic:
        f.write(f"{en}\t{zh}\n")

In [9]:

def add_split_symbols(tokens, special_dict):
    return ['<|sword|>' + token + '<|eword|>' if token in special_dict else token for token in tokens]


In [10]:
# 定义词表
token_transform = {}
vocab_transform = {}

token_transform[SRC_LANGUAGE] = lambda x: add_split_symbols(x.split(' '), dic_en_zh)
token_transform[TGT_LANGUAGE] = lambda x: list(jieba.cut(x))

# 你可能还需要构建词表（vocabulary），这里省略了相关代码

In [11]:
with open("./data/train.txt", 'r', encoding='utf-8') as f:
    data = f.readlines()
    en_data = [preprocess_en(line.strip().split('\t')[0]) for line in data]
    zh_data = [preprocess_zh(line.strip().split('\t')[1]) for line in data]

In [12]:
# 测试自定义的分词和添加特殊符号功能
test_sentence_en = "Oxford philosopher and transhumanist Nick Bostrom examines the future of humankind and asks whether we might alter the fundamental nature of humanity to solve our most intrinsic problems."
token_transform[SRC_LANGUAGE](test_sentence_en)

['Oxford',
 'philosopher',
 'and',
 'transhumanist',
 '<|sword|>Nick<|eword|>',
 'Bostrom',
 'examines',
 'the',
 '<|sword|>future<|eword|>',
 'of',
 'humankind',
 'and',
 'asks',
 'whether',
 'we',
 'might',
 'alter',
 'the',
 'fundamental',
 'nature',
 'of',
 '<|sword|>humanity<|eword|>',
 'to',
 'solve',
 'our',
 'most',
 'intrinsic',
 'problems.']

In [24]:
en_data[len(en_data)-len(dic)]
en_data_ = []
for i in range(len(en_data)):
    if i < len(en_data)-len(dic):
        en_data_.append(en_data[i])
    else:
        en_data_.append('<|sword|>'+en_data[i]+'<|eword|>')

en_data = en_data_

In [25]:
with open('./data/train.en', 'w', encoding='utf-8') as f:
    for line in en_data:
        f.write(line+"\n")
        
with open('./data/train.zh', 'w', encoding='utf-8') as f:
    for line in zh_data:
        f.write(line+"\n")

In [26]:
# 加载训练和验证数据
train_src_file = './data/train.en'  
train_tgt_file = './data/train.zh'  

valid_src_file = './data/dev_en.txt'  
valid_tgt_file = './data/dev_zh.txt'  

train_src_data = read_data(train_src_file)
train_tgt_data = read_data(train_tgt_file)

valid_src_data = read_data(valid_src_file)
valid_tgt_data = read_data(valid_tgt_file)

train_data = data_iterator(train_src_data, train_tgt_data)
valid_data = data_iterator(valid_src_data, valid_tgt_data)


In [27]:
# 定义特殊字符以及它们在词汇表中的索引
# UNK_IDX：未知词的索引
# PAD_IDX：填充词的索引
# BOS_IDX：句子开始符的索引
# EOS_IDX：句子结束符的索引
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX, SWORD_IDX, EWORD_IDX = 0, 1, 2, 3, 4, 5
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>', '<|sword|>', "<|eword|>"]

# 构建 vocab_transform
# vocab_transform 是一个字典，用于存储源语言和目标语言的词汇表
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[ln] = build_vocab_from_iterator(
        yield_tokens(train_data, ln),  # 从数据迭代器中生成分词结果
        min_freq=1,  # 词汇表中的词必须至少出现1次
        specials=special_symbols,  # 特殊符号列表
        special_first=True  # 将特殊符号放在词汇表的前面
    )
print(vocab_transform)

# 将unk设置为默认字符
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[ln].set_default_index(UNK_IDX)

{'en': Vocab(), 'zh': Vocab()}


# 定义模型

In [28]:
# 定义位置编码器
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(-torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, max_len).reshape(max_len, 1)
        pos_embedding = torch.zeros((max_len, emb_size))
        # 填充
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        # 变成三维, 方便后期计算
        pos_embedding = pos_embedding.unsqueeze(-2)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        # 将token_embedding和位置编码相融合
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])


In [29]:
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        # 调用nn中的预定义层Embedding, 获取一个词嵌入对象self.embedding
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        # 让 embeddings vector 在增加 之后的 position encoding 之前相对大一些的操作，
        # 主要是为了让position encoding 相对的小，这样会让原来的 embedding vector 中的信息在和 position encoding 的信息相加时不至于丢失掉
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

In [30]:
class Seq2SeqTransformer(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, emb_size, nhead, src_vocab_size, tgt_vocab_size, dim_feedforward=512, dropout=0.1):
        super(Seq2SeqTransformer, self).__init__()
        # 创建Transformer对象
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        # 创建全连接线性层
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        # 创建源语言的embedding层
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        # 创建目标语言的embedding层
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        # 创建位置编码器层对象
        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)

    def forward(self, src, trg, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None, src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.generator(outs)

    def encode(self, src, src_mask):
        return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)

    def decode(self, tgt, memory, tgt_mask):
        return self.transformer.decoder(self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask)


# 定义辅助函数

In [31]:
# 生成一个上三角矩阵掩码，用于目标序列
def generate_square_subsequent_mask(sz):
    # 生成一个sz x sz的上三角矩阵，值全为1
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    # 将上三角矩阵中的0位置的值替换为负无穷大，将1位置的值替换为0,因为在transform库中的掩码是对0为非遮掩部分
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

In [32]:
def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]
    # 生成目标序列的掩码
    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    # 源序列的掩码，全为0
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)
    # 源序列和目标序列的填充掩码，标记出填充位置
    # 这里转置的原因是:
    # src和tgt的shape是(seq_len, batch_siez), 通过转置后,我们的src_padding_mask为(batch_size, seq_len)
    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [33]:
# 示例源序列和目标序列
src = torch.tensor([[5, 2, 3], [4, 5, 1], [1, 1, 1]], dtype=torch.long, device=DEVICE)
tgt = torch.tensor([[3, 2, 3], [4, 1, 1], [1, 1, 1]], dtype=torch.long, device=DEVICE)

# 创建掩码
src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt)

print("src_mask:\n", src_mask)
print("tgt_mask:\n", tgt_mask)
print("src_padding_mask:\n", src_padding_mask)
print("tgt_padding_mask:\n", tgt_padding_mask)

src_mask:
 tensor([[False, False, False],
        [False, False, False],
        [False, False, False]], device='cuda:0')
tgt_mask:
 tensor([[0., -inf, -inf],
        [0., 0., -inf],
        [0., 0., 0.]], device='cuda:0')
src_padding_mask:
 tensor([[False, False,  True],
        [False, False,  True],
        [False,  True,  True]], device='cuda:0')
tgt_padding_mask:
 tensor([[False, False,  True],
        [False,  True,  True],
        [False,  True,  True]], device='cuda:0')


In [35]:
# 将多个转换函数串联起来
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

In [36]:
# 将token列表转换为tensor，并添加开始和结束标记
def tensor_transform(token_ids: List[int]):
    return torch.cat((torch.tensor([BOS_IDX]),
                      torch.tensor(token_ids),
                      torch.tensor([EOS_IDX])))

In [37]:
text_transform = {}
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    text_transform[ln] = sequential_transforms(token_transform[ln],  # 分词
                                               vocab_transform[ln],  # 数值化
                                               tensor_transform)  # 添加BOS/EOS并转换为tensor

In [38]:
# 数据批处理函数，用于DataLoader
def collate_fn(batch):
    """
    [('Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.',
  'Two young, White males are outside near many bushes.'),.....]
    """
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        # 对源语言和目标语言的句子进行转换处理
        src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
        tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))
    # 对源语言和目标语言的批次进行填充
    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
    return src_batch, tgt_batch

In [39]:
BATCH_SIZE = 16
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, collate_fn=collate_fn, shuffle=True)
valid_dataloader = DataLoader(valid_data, batch_size=BATCH_SIZE, collate_fn=collate_fn)

![](./img/imp_2.png)

In [40]:
# 设置种子用于生成随机数，以使得结果是确定的
torch.manual_seed(0)

# 设置调用时候使用的参数
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

# 实例化Transformer对象
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
# 为了保证每层的输入和输出的方差相同, 防止梯度消失问题
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)
# 如果有GPU则将模型移动到GPU上
transformer = transformer.to(DEVICE)
# 定义损失函数
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
# 定义优化器  betas: 用于计算梯度及其平方的运行平均值的系数  eps:添加到分母以提高数值稳定性
"""
betas 是 Adam 优化器中两个超参数的元组，用于计算一阶和二阶矩估计的指数衰减率。
第一个值 0.9 是用于计算梯度的一阶矩（即动量）的衰减率。较高的值表示动量更大，历史梯度的影响更长久。
第二个值 0.98 是用于计算梯度的二阶矩（即平方梯度）的衰减率。较高的值表示对最近梯度变化的敏感度更低。
"""
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)



# 定义训练函数

In [41]:
def train_epoch(model, optimizer, dataloader):
    model.train()
    losses = 0
    for src, tgt in tqdm(dataloader, desc="Training", leave=False):
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)
        # 这一步将目标序列的最后一个时间步去掉，得到 tgt_input。这是因为在训练过程中，我们使用目标序列的前 T个时间步。
        tgt_input = tgt[:-1, :]
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
        logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)
        optimizer.zero_grad()
        tgt_out = tgt[1:].to(torch.long)
        logits = logits.to(torch.float32)
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()
        optimizer.step()
        losses += loss.item()
    return losses / len(dataloader)

# 评估函数

In [42]:
def evaluate(model, dataloader):
    model.eval()
    losses = 0
    for src, tgt in tqdm(dataloader, desc="Evaluating", leave=False):
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)
        tgt_input = tgt[:-1, :]
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
        logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)
        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        losses += loss.item()
    return losses / len(dataloader)

In [43]:
# 创建数据加载器
NUM_EPOCHS = 3

for epoch in range(1, NUM_EPOCHS + 1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer, train_dataloader)
    end_time = timer()
    val_loss = evaluate(transformer, valid_dataloader)
    print(f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, Epoch time = {(end_time - start_time):.3f}s")


  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)
                                                             

KeyboardInterrupt: 

In [None]:


# 模型保存和加载
path = './model/transformer_translation_5.pth'
torch.save(transformer.state_dict(), path)

# 加载模型
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE, NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
transformer.load_state_dict(torch.load(path))



<All keys matched successfully>

In [44]:
# 贪婪解码函数，用于从模型中生成翻译结果
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    # 将输入数据和掩码移动到设备上
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)
    
    # 编码器对源序列进行编码
    memory = model.encode(src, src_mask)
    
    # 初始化目标序列，以开始符号开始
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    
    # 逐步生成目标序列
    for i in range(max_len - 1):
        memory = memory.to(DEVICE)
        
        # 生成目标序列掩码
        tgt_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)
        # 解码器对目标序列进行解码
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        
        # 生成下一个词的概率分布
        prob = model.generator(out[:, -1])
        
        # 选择概率最高的词作为下一个词
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()
        
        # 将下一个词添加到目标序列中
        ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        
        # 如果生成结束符，则停止生成
        if next_word == EOS_IDX:
            break
    
    # 返回生成的目标序列
    return ys


In [45]:

# 翻译函数，将源语言句子翻译成目标语言句子
def translate(model: torch.nn.Module, src_sentence: str):
    model.eval()  # 设置模型为评估模式
    
    # 将源语言句子进行分词、数值化和tensor转换
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    src = src.to(DEVICE)
    # 获取源序列的长度
    num_tokens = src.shape[0]
    
    # 创建源序列掩码，全为0
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    
    # 使用贪婪解码生成目标语言句子
    tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
    
    # 将生成的目标语言句子tensor转换为字符串，并去掉开始和结束符
    return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")


In [46]:
with open("./data/test_en.txt", 'r', encoding='utf-8') as f:
    test_data = f.readlines()

In [47]:
test_data[:5]

['The canneries are gone. The pollution has abated.\n',
 'Handspring Puppet Co.: The genius puppetry behind War Horse\n',
 'But now we would like you to put Joey through some paces.\n',
 "And this is exactly what we've been seeing with teenagers and kids doing it in school, under the table, and texting under the table to their friends.\n",
 'When it was announced that they were going to do every child in Uruguay, the first 100,000, boom, went to OLPC.\n']

In [None]:
with open("sumbit.txt", 'w', encoding='utf-8') as f:
    for line in test_data:
        transformer.to(DEVICE)
        res = translate(transformer, line)
        f.write(''.join(res.split(' '))+'\n')
        

In [48]:
src = 'Eine Gruppe von Männern lädt Baumwolle auf einen Lastwagen'
transformer.to(DEVICE)
translate(transformer, src)
#标准答案: A group of men are loading cotton onto a truck

' 设计 的 是 ， 由 一个 被 称为 的 人 ， 被 称为 的 ！'