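For the final model, see the last cell.  
This assignment used DeepSeek-V3 to help understand and optimize the code.  
The complete assignment folder is also uploaded at https://github.com/Yuyu-uu/course-AI-Programming/tree/main/Homework/hw6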

# First, a careful walkthrough of the original code

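First, build the raw Chinese and English corpus text.  
Next, tokenize it and build a vocabulary that assigns every word a unique index, so that each sentence becomes a sequence of integer indices. At this stage the data is a discrete vector, and this is what the model takes as input.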
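
As a minimal sketch of this step, the snippet below builds a vocabulary and converts sentences to index sequences. It assumes whitespace splitting in place of the spaCy tokenizers, and the three-sentence `corpus` is only an illustrative subset; the special tokens and their indices follow the notebook's `build_vocab`.

```python
from collections import Counter

SPECIALS = ["<pad>", "<sos>", "<eos>", "<unk>"]  # reserved indices 0..3


def build_vocab(tokenized_sentences):
    """Give every distinct token an index, after the four special tokens."""
    counter = Counter()
    for tokens in tokenized_sentences:
        counter.update(tokens)
    vocab = {tok: i for i, tok in enumerate(SPECIALS)}
    for word in counter:  # Counter preserves first-seen order
        vocab[word] = len(vocab)
    return vocab


def sentence_to_indices(tokens, vocab):
    """Wrap the token indices in <sos> ... <eos>; unknown words map to <unk>."""
    unk = vocab["<unk>"]
    return [vocab["<sos>"]] + [vocab.get(t, unk) for t in tokens] + [vocab["<eos>"]]


# Whitespace splitting stands in for the spaCy tokenizers (an assumption).
corpus = ["Hello", "today weather very good", "I love learning"]
vocab = build_vocab([s.split() for s in corpus])

seen = sentence_to_indices("today weather very good".split(), vocab)
unseen = sentence_to_indices("I love cat".split(), vocab)  # "cat" is not in this toy vocab
print(vocab)
print(seen)
print(unseen)
```

The second sentence shows why `<unk>` exists: a word missing from the vocabulary still gets a valid index instead of raising a `KeyError`.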

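Once the index sequence enters the model, it first passes through the embedding layer, `self.embedding = nn.Embedding(input_dim, d_model)`, which converts it into continuous vectors of dimension d_model. After training, semantically similar words end up close to each other in this continuous vector space.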

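After the embedding layer, the Transformer applies multi-head self-attention and feed-forward networks to extract and transform features from the continuous vectors. Multi-head self-attention lets the model take the other positions of the sequence into account when processing each position, which is how it captures long-range dependencies. The feed-forward network then applies a further non-linear transformation to the attention output to extract higher-level features.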

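After the encoder and decoder, the model output is still a sequence of continuous vectors. A fully connected layer, `self.fc_out = nn.Linear(d_model, output_dim)`, maps each output vector to the size of the vocabulary, giving one score (logit) per word that softmax turns into a probability distribution. Picking the index with the highest probability converts the continuous vectors back into a discrete index sequence, which is then mapped back to text.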
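
A small sketch of that last step, in plain Python: softmax turns the scores produced by the output layer into probabilities, and argmax picks the predicted word. The `itos` list and the `logits` values are made up for illustration and are not taken from the trained model.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of scores."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical index-to-token table and output-layer scores for one position.
itos = ["<pad>", "<sos>", "<eos>", "<unk>", "你好", "今天"]
logits = [0.1, -1.0, 0.3, -0.5, 2.5, 1.0]

probs = softmax(logits)
best = max(range(len(probs)), key=probs.__getitem__)  # argmax over the vocabulary
print(itos[best], round(probs[best], 3))
```

Because softmax is monotonic, taking the argmax of the raw logits gives the same index, which is why the notebook's `translate_sentence` can call `output.argmax(2)` without applying softmax first.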

(In addition, because the Transformer itself has no way of capturing the order of positions in a sequence, an extra positional encoding is needed: `def _generate_positional_encoding(self, seq_len)`.)
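
The sinusoidal scheme used by `_generate_positional_encoding` can be sketched without PyTorch as below. This is an illustrative re-implementation on plain lists, not the notebook's code; the function name `positional_encoding` is chosen for this sketch.

```python
import math


def positional_encoding(seq_len, d_model):
    """Return a seq_len x d_model table: sin on even columns, cos on odd columns."""
    pe = [[0.0] * d_model for _ in range(seq_len)]
    for pos in range(seq_len):
        for i in range(0, d_model, 2):
            # Same frequency as exp(i * (-ln(10000) / d_model)) in the notebook.
            angle = pos / (10000 ** (i / d_model))
            pe[pos][i] = math.sin(angle)
            if i + 1 < d_model:
                pe[pos][i + 1] = math.cos(angle)
    return pe


pe = positional_encoding(seq_len=4, d_model=8)
for row in pe:
    print([round(v, 3) for v in row])
```

Each position gets a distinct pattern of values, so adding this table to the embeddings gives the attention layers a way to tell positions apart.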
 

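The detailed operations of multi-head self-attention and the feed-forward network are in the course slides. Intuitively, the query vector `q_i` is the current word looking for related information, the key vector `k_j` is the index another word offers for its information, and the value vector `v_j` is the information that word actually carries. The dot product of a query and a key, `score_{i,j}`, measures how strongly the current word relates to the other word. Using these scores as weights in a weighted sum of the value vectors gives the representation of the current word after taking the other words into account.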
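
To make the query/key/value intuition concrete, here is a single-head sketch in plain Python. It uses the standard scaled dot-product form (scores divided by the square root of the key dimension, then softmax); the toy vectors are invented for illustration, and the learned projection matrices and multiple heads are left out.

```python
import math


def softmax(scores):
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attention(queries, keys, values):
    """Scaled dot-product attention on plain lists (one head, no masking)."""
    d_k = len(keys[0])
    outputs, all_weights = [], []
    for q in queries:
        # score_{i,j} = q_i . k_j / sqrt(d_k)
        scores = [sum(a * b for a, b in zip(q, k)) / math.sqrt(d_k) for k in keys]
        weights = softmax(scores)
        # Weighted sum of the value vectors.
        out = [sum(w * v[c] for w, v in zip(weights, values)) for c in range(len(values[0]))]
        outputs.append(out)
        all_weights.append(weights)
    return outputs, all_weights


keys = [[1.0, 0.0], [0.0, 1.0]]
values = [[10.0, 0.0], [0.0, 10.0]]
queries = [[5.0, 0.0]]  # points in the same direction as the first key

outputs, weights = attention(queries, keys, values)
print(weights[0])
print(outputs[0])
```

The query aligns with the first key, so most of the attention weight, and therefore most of the output, comes from the first value vector.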


# Current model parameters

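Transformer layer: `nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=False)`  
D_MODEL: hidden dimension of the model, set to 32  
NHEAD: number of heads in multi-head attention, set to 2  
NUM_ENCODER_LAYERS: number of encoder layers, set to 2  
NUM_DECODER_LAYERS: number of decoder layers, set to 2  
DIM_FEEDFORWARD: hidden dimension of the feed-forward network, set to 32  
DROPOUT: dropout probability, set to 0.05  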

By default, nn.Transformer uses ReLU as the activation function of its feed-forward networks.

Loss function: cross-entropy loss, `nn.CrossEntropyLoss(ignore_index = 0)`, which skips positions whose target is `<pad>` (index 0)  
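
The effect of `ignore_index` can be illustrated with a plain-Python sketch of mean cross-entropy over a flattened batch. This mirrors the default mean reduction over non-ignored targets; the logits and targets are toy values, and the function is an illustration rather than PyTorch's implementation.

```python
import math


def cross_entropy(logits_rows, targets, ignore_index=0):
    """Mean negative log-likelihood over targets that are not ignore_index."""
    total, count = 0.0, 0
    for logits, target in zip(logits_rows, targets):
        if target == ignore_index:
            continue  # padded position: contributes nothing to the loss
        m = max(logits)
        log_sum_exp = m + math.log(sum(math.exp(x - m) for x in logits))
        total += log_sum_exp - logits[target]
        count += 1
    return total / count  # assumes at least one non-ignored target


logits_rows = [[0.0, 2.0, 0.0], [0.0, 0.0, 3.0], [5.0, 0.0, 0.0]]
targets = [1, 2, 0]  # the last position is padding

with_padding = cross_entropy(logits_rows, targets)
without_padding = cross_entropy(logits_rows[:2], targets[:2])
print(with_padding, without_padding)
```

Both calls return the same value, which shows that padded positions change neither the numerator nor the denominator of the average.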

Optimizer: Adam, `torch.optim.Adam(model.parameters(), lr = 0.001)`. It combines ideas from AdaGrad and RMSProp and adapts the learning rate of each parameter individually.  
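
For intuition, the Adam update rule for a single scalar parameter can be sketched as follows. The hyperparameter defaults match the commonly used values (`beta1=0.9`, `beta2=0.999`, `eps=1e-8`); the quadratic objective, the larger learning rate of 0.1 and the name `adam_minimize` are assumptions made to keep the example short.

```python
import math


def adam_minimize(grad_fn, x0, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8, steps=1000):
    """Run Adam on one scalar parameter and return its final value."""
    x, m, v = x0, 0.0, 0.0
    for t in range(1, steps + 1):
        g = grad_fn(x)
        m = beta1 * m + (1 - beta1) * g          # running mean of gradients
        v = beta2 * v + (1 - beta2) * g * g      # running mean of squared gradients
        m_hat = m / (1 - beta1 ** t)             # bias correction for the zero init
        v_hat = v / (1 - beta2 ** t)
        x -= lr * m_hat / (math.sqrt(v_hat) + eps)  # step scaled per parameter
    return x


# Minimise f(x) = (x - 3)^2, whose gradient is 2 * (x - 3).
x_final = adam_minimize(lambda x: 2 * (x - 3), x0=0.0, lr=0.1, steps=500)
print(x_final)
```

Dividing by the square root of `v_hat` is what makes the step size adaptive: parameters with consistently large gradients take proportionally smaller steps.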

Training parameters:  
MAX_EPOCH: number of training epochs, set to 200  
batch_size: batch size, set to 8 in the DataLoader  
lr: learning rate, set to 0.001 in the Adam optimizer

In [1]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

In [2]:
# Set the random seed for reproducibility
torch.manual_seed(42)

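# Chinese and English sentences (parallel corpus)
# NOTE: the comma after "爱 养猫" / "love cat" is missing, so Python concatenates each of them
# with the next string literal ("爱 养猫今天", "love cattoday"), as the printed lists show.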
chinese_sentences = [ "你好", "今天 天气 很 好",
                     "今天 天气 很 好",
                     "我 爱 学习","我 喜欢 狗",
                     "天气 很 好","我 爱 养猫","我 喜欢 学习",
                     "你好", "今天 天气 很 好","爱 养猫"
                     "今天", "天气", "很", "好",
                     "我", "爱", "学习","我","喜欢","狗","猫",
                     ]
english_sentences = [ "Hello", "today weather very good",
                     "today weather very good",
                     "I love learning","I like dog",
                     "weather very good","I love cat","I like study",
                     "Hello", "today weather very good","love cat"
                     "today", "weather", "very", "good",
                     "I", "love", "learning","I","like","dog","cat",
                     ]

print(chinese_sentences)
print(english_sentences)


['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cattoday', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']


In [5]:
# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Build the vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    vocab = {word: idx + 4 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

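# Convert the sentences to index sequences: (source, target) = (English, Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
    sentence_to_indices(tokenize_ch(chinese), chinese_vocab)   # Chinese sentence, Chinese vocabulary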
    )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '养猫': 16, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, 'cattoday': 16, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7, 8, 2], [1, 6, 7, 8, 2]), ([1, 9, 10, 14, 2], [1, 9, 10, 14, 15, 2]), ([1, 9, 12, 15, 2], [1, 9, 12, 11, 2]), ([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 10, 16, 2], [1, 10, 16, 5, 2]), ([1, 6, 2], [1, 6, 2]), ([1, 7, 2], [1, 7, 2]), ([1, 8, 2], [1, 8, 2]), ([1, 9, 2], [1, 9, 2]), ([1, 10, 2], [1, 10, 2]), ([1, 11, 2], [1, 11, 2]), ([1, 9, 2], [1, 9, 2]), ([1, 12, 2], [1, 12, 2]), ([1, 13, 2], [1,

In [6]:
# Collate function: pad the sequences in a batch to a common length
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)

In [7]:
# Transformer model (input is now the English source, output the Chinese target; code structure unchanged)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super().__init__()
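        # NOTE: this single embedding table (sized by input_dim) is applied to both source and
        # target tokens in forward(), so every target index must be smaller than input_dim.
        self.embedding = nn.Embedding(input_dim, d_model)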
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Expand the positional encodings along the batch dimension to match the input
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        
        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction

In [9]:
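# Input/output dimensions (English source vocabulary size -> input, Chinese target vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary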
D_MODEL = 32
NHEAD = 2
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 32
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT)
# print(model)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)


# Training loop (teacher forcing: the decoder sees trg[:-1] and predicts trg[1:])
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0)-1).bool()
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        loss = criterion(output.view(-1, OUTPUT_DIM), trg[1:,].view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    if epoch % 100 == 99:     
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")




Epoch 100, Loss: 0.046130478382110596
Epoch 200, Loss: 0.011155758053064346
Training Finished


In [10]:
# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model have saved to {model_save_path}")

Model have saved to ./model/mymodel_en2zh.pth


In [12]:
# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize with the English tokenizer, since the source language is now English
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)  # shape [src_len, 1]
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>
    
    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[-1].item()
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break
    
    # Convert the indices back to target-language (Chinese) tokens
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat","love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')

Input: Hello -> Translated: 你好
Input: today weather very good -> Translated: 今天 天气 很 好
Input: I love dog -> Translated: 我 爱 狗
Input: I like cat -> Translated: 我 喜欢 狗
Input: love cat -> Translated: 爱 猫


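The code above changes the input to English and the output to Chinese while leaving the rest of the structure unchanged, which gives English-to-Chinese translation. The translations are not all correct, though: examples that appear in the corpus are translated correctly, but "I like cat" comes out wrong ("我 喜欢 狗"), because "like" and "cat" never occur together in the training data.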

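Without changing the corpus, we first try adjusting the network structure, then try different activation and loss functions, then look for a suitable number of training epochs and a better learning-rate schedule, and finally scan a grid of training parameters to find the combination best suited to this task.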
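
A parameter grid scan of this kind can be sketched with `itertools.product`, which avoids deeply nested loops. The `toy_loss` function below is a placeholder with made-up values so that the example runs on its own; in the notebook its role would be played by building a model with the given parameters, training it, and returning the measured loss.

```python
from itertools import product


def grid_search(train_and_eval, grid):
    """Evaluate every combination in `grid` and return the one with the lowest loss."""
    names = list(grid)
    results = []
    for combo in product(*(grid[name] for name in names)):
        params = dict(zip(names, combo))
        loss = train_and_eval(**params)
        results.append((loss, params))  # keep the loss of *this* configuration
    best_loss, best_params = min(results, key=lambda r: r[0])
    return best_loss, best_params, results


def toy_loss(d_model, nhead):
    # Placeholder objective (NOT a real measurement): stands in for a training run.
    return 1.0 / d_model + 0.01 * nhead


grid = {"d_model": [16, 32, 64], "nhead": [2, 4]}
best_loss, best_params, results = grid_search(toy_loss, grid)
for loss, params in results:
    print(params, round(loss, 4))
print("best:", best_params, round(best_loss, 4))
```

Recording one `(loss, params)` pair per configuration makes it easy to inspect the whole grid afterwards rather than only the running best.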

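(That said, the main problem is probably that the corpus is too small: in the corpus "like" is followed only by "dog" or "study", never by "cat", so the trained model ties "like" closely to "dog", and the corresponding query-key dot product is presumably large. Also, if the goal is generalization, that is, translating word combinations that never appeared in training, tuning the code to minimize this loss will not necessarily achieve it. There is no precise criterion for this kind of generalization either; one option would be to translate several unseen combinations and see which parameter setting gets the most right, but with a corpus this small that comparison would not be very rigorous.)  
In what follows, the lowest loss is still used as the criterion, that is, the training goal is to translate the corpus examples as accurately as possible.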
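
The alternative criterion mentioned above, counting how many unseen word combinations are translated correctly, could be sketched as below. The reference translations in `unseen_pairs` are assumptions made for this example, and `baseline_translate` is a stand-in that simply replays the outputs printed by the baseline model; in the notebook one would pass a wrapper around `translate_sentence` instead.

```python
def exact_match_accuracy(translate, test_pairs):
    """Fraction of sentences whose translation matches the reference token by token."""
    correct = 0
    for source, reference in test_pairs:
        if translate(source).split() == reference.split():
            correct += 1
    return correct / len(test_pairs)


# Word combinations that never occur together in the training corpus.
# The reference translations are assumptions made for this sketch.
unseen_pairs = [
    ("I love dog", "我 爱 狗"),
    ("I like cat", "我 喜欢 猫"),
]

# Stand-in for translate_sentence: replays the baseline model's printed outputs.
baseline_outputs = {"I love dog": "我 爱 狗", "I like cat": "我 喜欢 狗"}


def baseline_translate(sentence):
    return baseline_outputs[sentence]


accuracy = exact_match_accuracy(baseline_translate, unseen_pairs)
print(f"exact-match accuracy on unseen combinations: {accuracy:.2f}")
```

With only a handful of test sentences the score moves in large steps, which is the lack of rigor noted above.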

We start by trying different network structures:

In [2]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences (parallel corpus)
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')


# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]


def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]


# Build the vocabulary
def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    vocab = {word: idx + 4 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab


# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])


def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]


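# Convert the sentences to index sequences: (source, target) = (English, Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence, Chinese vocabulary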
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]

# Collate function: pad the sequences in a batch to a common length
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad


class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


# Transformer model (input is now the English source, output the Chinese target; code structure unchanged)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Expand the positional encodings along the batch dimension to match the input
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


# Parameter grid to search
D_MODEL_LIST = [16, 32, 64]
NHEAD_LIST = [2, 4]
NUM_ENCODER_LAYERS_LIST = [2, 3]
NUM_DECODER_LAYERS_LIST = [2, 3]
DIM_FEEDFORWARD_LIST = [16, 32, 64]
DROPOUT = 0.05
MAX_EPOCH = 200

best_loss = float('inf')
best_params = {}

# Grid search
for D_MODEL in D_MODEL_LIST:
    for NHEAD in NHEAD_LIST:
        for NUM_ENCODER_LAYERS in NUM_ENCODER_LAYERS_LIST:
            for NUM_DECODER_LAYERS in NUM_DECODER_LAYERS_LIST:
                for DIM_FEEDFORWARD in DIM_FEEDFORWARD_LIST:
                    INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
                    OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary

                    model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
                                        DIM_FEEDFORWARD, DROPOUT)
                    # Define the loss function and optimizer
                    criterion = nn.CrossEntropyLoss(ignore_index=0)
                    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

                    total_loss = 0
                    for epoch in range(MAX_EPOCH):
                        for i, (src, trg) in enumerate(dataloader):
                            trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
                            padding_mask = (trg[1:,] == 0).transpose(0, 1)
                            output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
                            loss = criterion(output.view(-1, OUTPUT_DIM), trg[1:,].view(-1))
                            optimizer.zero_grad()
                            loss.backward()
                            optimizer.step()
                            total_loss += loss.item()

                    # Training loss averaged over every batch of every epoch (not the final-epoch loss)
                    avg_loss = total_loss / (MAX_EPOCH * len(dataloader))
                    if avg_loss < best_loss:
                        best_loss = avg_loss
                        best_params = {
                            'D_MODEL': D_MODEL,
                            'NHEAD': NHEAD,
                            'NUM_ENCODER_LAYERS': NUM_ENCODER_LAYERS,
                            'NUM_DECODER_LAYERS': NUM_DECODER_LAYERS,
                            'DIM_FEEDFORWARD': DIM_FEEDFORWARD
                        }
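                    # NOTE: this prints the best-so-far parameters and loss, not those of the
                    # current configuration, which is why consecutive output lines can repeat.
                    print(f"Parameters: {best_params}, Loss: {best_loss}")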

print(f"Best parameters: {best_params}, Best loss: {best_loss}")
    

Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 2, 'DIM_FEEDFORWARD': 16}, Loss: 0.9271707016353806
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 2, 'DIM_FEEDFORWARD': 32}, Loss: 0.7484967168420553
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 2, 'DIM_FEEDFORWARD': 64}, Loss: 0.6622638514513771
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 2, 'DIM_FEEDFORWARD': 64}, Loss: 0.6622638514513771
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 2, 'DIM_FEEDFORWARD': 64}, Loss: 0.6622638514513771
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 3, 'DIM_FEEDFORWARD': 64}, Loss: 0.6528403010840217
Parameters: {'D_MODEL': 16, 'NHEAD': 2, 'NUM_ENCODER_LAYERS': 2, 'NUM_DECODER_LAYERS': 3, 'DIM_FEEDFORWARD': 64}, Loss: 0.6528403010840217
Parameters: {'D_MODEL': 16,

Using the best parameter combination found above:

In [3]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

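# Chinese and English sentences (parallel corpus)
# NOTE: the comma after "爱 养猫" / "love cat" is missing, so Python concatenates each of them
# with the next string literal ("爱 养猫今天", "love cattoday"), as the printed lists show.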
chinese_sentences = [ "你好", "今天 天气 很 好",
                     "今天 天气 很 好",
                     "我 爱 学习","我 喜欢 狗",
                     "天气 很 好","我 爱 养猫","我 喜欢 学习",
                     "你好", "今天 天气 很 好","爱 养猫"
                     "今天", "天气", "很", "好",
                     "我", "爱", "学习","我","喜欢","狗","猫",
                     ]
english_sentences = [ "Hello", "today weather very good",
                     "today weather very good",
                     "I love learning","I like dog",
                     "weather very good","I love cat","I like study",
                     "Hello", "today weather very good","love cat"
                     "today", "weather", "very", "good",
                     "I", "love", "learning","I","like","dog","cat",
                     ]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Build the vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    vocab = {word: idx + 4 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

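# Convert the sentences to index sequences: (source, target) = (English, Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
    sentence_to_indices(tokenize_ch(chinese), chinese_vocab)   # Chinese sentence, Chinese vocabulary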
    )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad the sequences in a batch to a common length
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


# Transformer model (input is now the English source, output the Chinese target; code structure unchanged)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Expand the positional encodings along the batch dimension to match the input
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        
        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


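# Input/output dimensions (English source vocabulary size -> input, Chinese target vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary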
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT)
# print(model)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)


# Training loop (teacher forcing: the decoder sees trg[:-1] and predicts trg[1:])
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0)-1).bool()
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        loss = criterion(output.view(-1, OUTPUT_DIM), trg[1:,].view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    if epoch % 100 == 99:     
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")



# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model have saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize with the English tokenizer, since the source language is now English
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)  # shape [src_len, 1]
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>
    
    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[-1].item()
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break
    
    # Convert the indices back to target-language (Chinese) tokens
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat","love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cattoday', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '养猫': 16, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, 'cattoday': 16, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 

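The loss drops noticeably and "I like cat" is now translated correctly. Increasing the model dimension and the number of attention heads appears to let the model place each word more accurately in the vector space, which leads to more accurate translations.  
Next, we try a different activation function (GELU):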
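
For reference, the two activation functions can be compared directly. The sketch below uses the exact (erf-based) form of GELU, which is the default behaviour of `nn.GELU()`, written in plain Python; the sample inputs are arbitrary.

```python
import math


def relu(x):
    """ReLU: zero for negative inputs, identity otherwise."""
    return max(0.0, x)


def gelu(x):
    """Exact GELU: x * Phi(x), with Phi the standard normal CDF."""
    return 0.5 * x * (1.0 + math.erf(x / math.sqrt(2.0)))


for x in [-2.0, -1.0, 0.0, 1.0, 2.0]:
    print(f"x={x:+.1f}  relu={relu(x):.4f}  gelu={gelu(x):.4f}")
```

Unlike ReLU, GELU is smooth around zero and lets small negative inputs through with a small negative output, so gradients do not vanish abruptly for slightly negative pre-activations.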

In [4]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences (parallel corpus)
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Build the vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    vocab = {word: idx + 4 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

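# Convert the sentences to index sequences: (source, target) = (English, Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence, Chinese vocabulary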
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad the sequences in a batch to a common length
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


# Transformer model (input is now the English source, output the Chinese target; GELU output head added)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc1 = nn.Linear(d_model, dim_feedforward)
        self.activation = nn.GELU()
        self.fc2 = nn.Linear(dim_feedforward, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Broadcast the positional encodings across the batch dimension
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        output = self.fc1(output)
        output = self.activation(output)
        prediction = self.fc2(output)
        return prediction


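# Set the input/output dimensions (source English vocabulary size -> input, target Chinese vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary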
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                    DROPOUT)
# print(model)
# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)


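# Training loop (teacher forcing: the decoder is fed trg[:-1] and trained to predict trg[1:])
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        # Causal mask: position t may only attend to target positions <= t
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
        # True where the shifted target is <pad>; shape [batch_size, trg_len - 1]
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        # Flatten to [positions, vocab] vs. [positions]; <pad> targets are skipped via ignore_index=0
        loss = criterion(output.view(-1, OUTPUT_DIM), trg[1:,].view(-1))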
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 100 == 99:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")

# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model have saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize with the English tokenizer, since the source language is now English
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)
    # src_len=torch.tensor(len(indices)).unsqueeze(0)
    # print("src_tensor:",src_tensor)
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>

    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[-1].item()
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break

    # Convert the indices back to target-language (Chinese) tokens via an inverted vocabulary
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                           DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat", "love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')
    

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫', '今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cat', 'today', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7



Epoch 100, Loss: 0.04443631321191788
Epoch 200, Loss: 0.0012153564020991325
Training Finished
Model have saved to ./model/mymodel_en2zh.pth
Input: Hello -> Translated: 你好
Input: today weather very good -> Translated: 今天 天气 很 好
Input: I love dog -> Translated: 我 爱 狗
Input: I like cat -> Translated: 我 喜欢 学习
Input: love cat -> Translated: 爱 养 猫
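The cell above places a GELU-activated hidden layer (`fc1`, `nn.GELU()`, `fc2`) in front of the output projection. As a reminder of how the two activations differ, here is a small dependency-free sketch. It uses the exact GELU definition, x * Phi(x) with Phi the standard normal CDF; the helper names `relu` and `gelu` are illustrative and not part of the notebook.

```python
import math

def relu(x: float) -> float:
    """ReLU: pass positive inputs through, clamp negative inputs to zero."""
    return max(0.0, x)

def gelu(x: float) -> float:
    """Exact GELU: x * Phi(x), where Phi is the standard normal CDF."""
    return 0.5 * x * (1.0 + math.erf(x / math.sqrt(2.0)))

# Compare the two activations on a few sample inputs
for x in (-2.0, -0.5, 0.0, 0.5, 2.0):
    print(f"x={x:+.1f}  relu={relu(x):.4f}  gelu={gelu(x):.4f}")
```

Unlike ReLU, GELU is smooth and gives small negative outputs for negative inputs. For inputs of large magnitude the two behave almost the same.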


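ReLU still works better as the activation function: with the GELU output head above, for example, `I like cat` comes out as `我 喜欢 学习`. The next cell tries a different loss function, `nn.KLDivLoss` (Kullback-Leibler divergence loss), which measures the difference between two probability distributions and may also suit natural language processing tasks.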
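Before switching losses, it helps to see what KL divergence computes when the target distribution is one-hot, as it is in the next cell. The sketch below is plain Python rather than PyTorch, and the helper names (`log_softmax`, `cross_entropy`, `kl_div_one_hot`) and the sample scores are made up for illustration. It shows that for a one-hot target the KL divergence equals the cross-entropy at that position.

```python
import math

def log_softmax(logits):
    """Numerically stable log-softmax over a list of raw scores."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(v - m) for v in logits))
    return [v - log_z for v in logits]

def cross_entropy(logits, target):
    """Cross-entropy for one position: -log p(target)."""
    return -log_softmax(logits)[target]

def kl_div_one_hot(logits, target):
    """KL(q || p) with q one-hot on `target` and p = softmax(logits).

    Terms with q_i = 0 contribute nothing (0 * log 0 = 0 by convention),
    so only the target term q_t * (log q_t - log p_t) = -log p_t remains.
    """
    log_p = log_softmax(logits)
    q = [1.0 if i == target else 0.0 for i in range(len(logits))]
    return sum(qi * (math.log(qi) - lp) for qi, lp in zip(q, log_p) if qi > 0.0)

logits = [2.0, -1.0, 0.5, 0.1]  # made-up scores for a 4-word vocabulary
target = 2
print(cross_entropy(logits, target), kl_div_one_hot(logits, target))
```

Any difference between the two training runs therefore comes from the reduction and the padding handling, not from the per-position loss. `reduction='batchmean'` divides the summed loss by the first dimension of the input, and `<pad>` targets are counted unless they are masked out explicitly.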

In [5]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

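# Build a vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    # Filter by frequency before numbering so the indices stay contiguous when min_freq > 1
    kept_words = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = {word: idx + 4 for idx, word in enumerate(kept_words)}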
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

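# Convert each sentence pair to index sequences (source: English, target: Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence, Chinese vocabulary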
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad every sequence in a batch to the same length with <pad> (index 0)
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


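# Transformer model (only the meaning of the input/output dimensions changes; the code structure stays the same)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
        # NOTE: this single embedding table is shared by source and target tokens; that only works
        # while every target index is smaller than input_dim (both vocabularies have 16 entries here)
        self.embedding = nn.Embedding(input_dim, d_model)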
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Broadcast the positional encodings across the batch dimension
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


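# Set the input/output dimensions (source English vocabulary size -> input, target Chinese vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary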
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                    DROPOUT)
# print(model)
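# Define the loss function and the optimizer
# KLDivLoss expects log-probabilities as input and (with log_target=False) probabilities as target
criterion = nn.KLDivLoss(reduction='batchmean', log_target=False)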
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)


# Training loop (teacher forcing: the decoder is fed trg[:-1] and trained to predict trg[1:])
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        # Convert the model output to log-probabilities
        output_log_probs = nn.functional.log_softmax(output.view(-1, OUTPUT_DIM), dim=1)
        # One-hot encode the targets; unlike CrossEntropyLoss(ignore_index=0), <pad> positions are not skipped here
        trg_one_hot = torch.nn.functional.one_hot(trg[1:,].view(-1), num_classes=OUTPUT_DIM).float()
        loss = criterion(output_log_probs, trg_one_hot)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 100 == 99:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")

# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model have saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize with the English tokenizer, since the source language is now English
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)
    # src_len=torch.tensor(len(indices)).unsqueeze(0)
    # print("src_tensor:",src_tensor)
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>

    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[-1].item()
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break

    # Convert the indices back to target-language (Chinese) tokens via an inverted vocabulary
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                           DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat", "love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')
    

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫', '今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cat', 'today', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7

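The results are about the same as with `CrossEntropyLoss`, though convergence can be faster. The similarity is expected: with one-hot targets the KL divergence reduces to the cross-entropy, so the two setups differ only in how the loss is averaged and in that `<pad>` positions are no longer ignored. The next cell tries `optim.SGD(model.parameters(), lr=0.005, momentum=0.9)` instead, which updates the parameters along the gradient of the loss; the momentum term speeds up convergence and may help avoid getting stuck in local optima.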
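To make the momentum term concrete, here is a minimal sketch of a single SGD-with-momentum update in plain Python. It follows the formulation PyTorch uses with its default `dampening=0` and `nesterov=False`: the velocity accumulates gradients, then the parameter moves against the velocity. The function name `sgd_momentum_step` and the toy objective f(x) = x**2 are illustrative assumptions, not part of the notebook.

```python
def sgd_momentum_step(param, grad, velocity, lr=0.005, momentum=0.9):
    """One SGD-with-momentum update; the velocity accumulates past gradients."""
    velocity = momentum * velocity + grad
    param = param - lr * velocity
    return param, velocity

# Toy objective f(x) = x**2 with gradient 2x; the minimum is at x = 0.
x, v = 5.0, 0.0
for step in range(2000):
    x, v = sgd_momentum_step(x, 2.0 * x, v)
print(f"x after 2000 steps: {x:.6f}")
```

With `momentum=0.9` a steady gradient is amplified by up to 1 / (1 - 0.9) = 10 times, which is why a small learning rate such as 0.005 can still make progress.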

In [6]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

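# Build a vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    # Filter by frequency before numbering so the indices stay contiguous when min_freq > 1
    kept_words = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = {word: idx + 4 for idx, word in enumerate(kept_words)}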
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

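# Convert each sentence pair to index sequences (source: English, target: Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence, Chinese vocabulary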
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad every sequence in a batch to the same length with <pad> (index 0)
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


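# Transformer model (only the meaning of the input/output dimensions changes; the code structure stays the same)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
        # NOTE: this single embedding table is shared by source and target tokens; that only works
        # while every target index is smaller than input_dim (both vocabularies have 16 entries here)
        self.embedding = nn.Embedding(input_dim, d_model)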
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Broadcast the positional encodings across the batch dimension
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


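# Set the input/output dimensions (source English vocabulary size -> input, target Chinese vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary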
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                    DROPOUT)
# print(model)
# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)

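# Training loop (teacher forcing: the decoder is fed trg[:-1] and trained to predict trg[1:])
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        # Causal mask: position t may only attend to target positions <= t
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
        # True where the shifted target is <pad>; shape [batch_size, trg_len - 1]
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        # Flatten to [positions, vocab] vs. [positions]; <pad> targets are skipped via ignore_index=0
        loss = criterion(output.view(-1, OUTPUT_DIM), trg[1:,].view(-1))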
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 100 == 99:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")

# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model have saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize with the English tokenizer, since the source language is now English
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)
    # src_len=torch.tensor(len(indices)).unsqueeze(0)
    # print("src_tensor:",src_tensor)
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>

    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[-1].item()
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break

    # Convert the indices back to target-language (Chinese) tokens via an inverted vocabulary
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                           DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat", "love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')
    

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫', '今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cat', 'today', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7

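In practice the results are not very good, so I go back to the original learning-rate strategy (Adam). Finally, the next cell sweeps parameters such as `dropouts`, `batch_sizes`, and `max_epochs`.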
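The sweep in the next cell is written as four nested loops. As a sketch of the same idea in a flatter form, the grid can be enumerated with `itertools.product`. Here `train_and_score` is a hypothetical stand-in with a made-up formula, so the example runs without PyTorch; in the notebook it would train one model and return the loss used for selection.

```python
from itertools import product

# Same grid values as the sweep in the next cell.
param_grid = {
    "lr": [0.001, 0.01],
    "dropout": [0.05, 0.1],
    "batch_size": [8, 16],
    "max_epoch": [200, 300],
}

def train_and_score(lr, dropout, batch_size, max_epoch):
    """Hypothetical stand-in: train one model and return the score to minimise.

    The formula below is made up purely so that the sketch is runnable.
    """
    return abs(lr - 0.001) + dropout + 1.0 / batch_size + 1.0 / max_epoch

best_score, best_params = float("inf"), None
for values in product(*param_grid.values()):
    params = dict(zip(param_grid.keys(), values))
    score = train_and_score(**params)
    if score < best_score:
        best_score, best_params = score, params

print(best_params, best_score)
```

Two choices are worth keeping in mind when reading the sweep. Re-seeding before each configuration makes the runs comparable, and scoring on the final loss (or, better, a held-out set) avoids favouring configurations that merely start fast.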

In [7]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

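# Build a vocabulary

def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    # Filter by frequency before numbering so the indices stay contiguous when min_freq > 1
    kept_words = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = {word: idx + 4 for idx, word in enumerate(kept_words)}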
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

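# Convert each sentence pair to index sequences (source: English, target: Chinese)
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence, English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence, Chinese vocabulary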
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad every sequence in a batch to the same length with <pad> (index 0)
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


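# Transformer model (only the meaning of the input/output dimensions changes; the code structure stays the same)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
        # NOTE: this single embedding table is shared by source and target tokens; that only works
        # while every target index is smaller than input_dim (both vocabularies have 16 entries here)
        self.embedding = nn.Embedding(input_dim, d_model)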
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Broadcast the positional encodings across the batch dimension
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


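# Set the input/output dimensions (source English vocabulary size -> input, target Chinese vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # size of the source (English) vocabulary
OUTPUT_DIM = len(chinese_vocab)  # size of the target (Chinese) vocabulary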
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64

# Define the hyperparameter grid to search
learning_rates = [0.001, 0.01]
dropouts = [0.05, 0.1]
batch_sizes = [8, 16]
max_epochs = [200, 300]

best_loss = float('inf')
best_lr = None
best_dropout = None
best_batch_size = None
best_max_epoch = None

for lr in learning_rates:
    for dropout in dropouts:
        for batch_size in batch_sizes:
            for max_epoch in max_epochs:
                dataset = TranslationDataset(data)
                dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

                model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
                                    DIM_FEEDFORWARD, dropout)

                # Define the loss function and the optimizer
                criterion = nn.KLDivLoss(reduction='batchmean', log_target=False)
                optimizer = torch.optim.Adam(model.parameters(), lr=lr)

                total_loss = 0
                # Training loop
                for epoch in range(max_epoch):
                    for i, (src, trg) in enumerate(dataloader):
                        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
                        padding_mask = (trg[1:,] == 0).transpose(0, 1)
                        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
                        # Convert the model output to log-probabilities
                        output_log_probs = nn.functional.log_softmax(output.view(-1, OUTPUT_DIM), dim=1)
                        # One-hot encode the targets
                        trg_one_hot = torch.nn.functional.one_hot(trg[1:,].view(-1), num_classes=OUTPUT_DIM).float()
                        loss = criterion(output_log_probs, trg_one_hot)
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()

                    if epoch % 100 == 99:
                        print(f'Epoch {epoch + 1}, Loss: {loss.item()}, LR: {lr}, Dropout: {dropout}, Batch Size: {batch_size}, Max Epoch: {max_epoch}')

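                # Selection metric: training loss averaged over every step of the run (there is no held-out validation set)
                avg_loss = total_loss / (max_epoch * len(dataloader))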
                if avg_loss < best_loss:
                    best_loss = avg_loss
                    best_lr = lr
                    best_dropout = dropout
                    best_batch_size = batch_size
                    best_max_epoch = max_epoch

print(f"Best learning rate: {best_lr}, Best dropout: {best_dropout}, Best batch size: {best_batch_size}, Best max epoch: {best_max_epoch}, Best loss: {best_loss}")

# Retrain the model with the best parameters
dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=best_batch_size, shuffle=True, collate_fn=collate_fn)

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
                    DIM_FEEDFORWARD, best_dropout)
criterion = nn.KLDivLoss(reduction='batchmean', log_target=False)
optimizer = torch.optim.Adam(model.parameters(), lr=best_lr)

for epoch in range(best_max_epoch):
    for i, (src, trg) in enumerate(dataloader):
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        # Convert the model output to log-probabilities
        output_log_probs = nn.functional.log_softmax(output.view(-1, OUTPUT_DIM), dim=1)
        # One-hot encode the targets
        trg_one_hot = torch.nn.functional.one_hot(trg[1:,].view(-1), num_classes=OUTPUT_DIM).float()
        loss = criterion(output_log_probs, trg_one_hot)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 100 == 99:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")

# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize the English source sentence
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)  # [src_len, 1]
    # src_len=torch.tensor(len(indices)).unsqueeze(0)
    # print("src_tensor:",src_tensor)
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>

    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        # Apply the same causal mask as in training so decoding matches the training setup
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg_tensor.size(0)).bool()
        with torch.no_grad():
            output = model(src_tensor, trg_tensor, trg_mask=trg_mask)
        pred_token = output.argmax(2)[-1].item()  # most likely next token
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break

    # Map the indices back to target-language (Chinese) tokens
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                           best_dropout)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat", "love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')
    

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫', '今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cat', 'today', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7



Epoch 100, Loss: 0.00631779246032238, LR: 0.001, Dropout: 0.05, Batch Size: 8, Max Epoch: 200
Epoch 200, Loss: 0.0035212922375649214, LR: 0.001, Dropout: 0.05, Batch Size: 8, Max Epoch: 200
Epoch 100, Loss: 0.009551836177706718, LR: 0.001, Dropout: 0.05, Batch Size: 8, Max Epoch: 300
Epoch 200, Loss: 0.0023489058949053288, LR: 0.001, Dropout: 0.05, Batch Size: 8, Max Epoch: 300
Epoch 300, Loss: 0.008600478991866112, LR: 0.001, Dropout: 0.05, Batch Size: 8, Max Epoch: 300
Epoch 100, Loss: 0.03161943331360817, LR: 0.001, Dropout: 0.05, Batch Size: 16, Max Epoch: 200
Epoch 200, Loss: 0.004653779324144125, LR: 0.001, Dropout: 0.05, Batch Size: 16, Max Epoch: 200
Epoch 100, Loss: 0.01760675199329853, LR: 0.001, Dropout: 0.05, Batch Size: 16, Max Epoch: 300
Epoch 200, Loss: 0.0051731388084590435, LR: 0.001, Dropout: 0.05, Batch Size: 16, Max Epoch: 300
Epoch 300, Loss: 0.004482160322368145, LR: 0.001, Dropout: 0.05, Batch Size: 16, Max Epoch: 300
Epoch 100, Loss: 0.014309111051261425, LR: 0.

The best parameters are: Best learning rate: 0.001, Best dropout: 0.05, Best batch size: 8, Best max epoch: 300
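
The search that produced these values can be summarized as a plain grid search. Below is a minimal sketch, not the notebook's actual loop: `grid_search` and `placeholder_score` are hypothetical names, the search space lists only the values visible in the log above (the real search may have covered more), and the scoring formula is arbitrary and exists only to make the example runnable.

```python
from itertools import product


def grid_search(space, score_fn):
    """Return (best_config, best_score) for the lowest score over the full grid."""
    names = list(space)
    best_cfg, best_score = None, float("inf")
    for values in product(*(space[name] for name in names)):
        cfg = dict(zip(names, values))
        score = score_fn(cfg)
        if score < best_score:
            best_cfg, best_score = cfg, score
    return best_cfg, best_score


# Assumption: only the values that appear in the training log are listed here.
space = {
    "lr": [0.001],
    "dropout": [0.05],
    "batch_size": [8, 16],
    "max_epoch": [200, 300],
}


def placeholder_score(cfg):
    # Stand-in for "train a model with cfg and return a loss".
    # The formula is arbitrary; replace it with a real training run.
    return cfg["batch_size"] / 100 + 1.0 / cfg["max_epoch"]


best_cfg, best_score = grid_search(space, placeholder_score)
print(best_cfg, best_score)
```

One design note: the loop above appears to score each configuration by the loss averaged over every training step, which includes the high-loss early epochs. Scoring on the final epoch, or on held-out sentences, may give a fairer comparison between runs with different `max_epoch`.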

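The complete final model follows. The specific changes are:

D_MODEL: hidden dimension of the model, set to 64    
NHEAD: number of attention heads, set to 4    
NUM_ENCODER_LAYERS: number of encoder layers, set to 2 (unchanged)    
NUM_DECODER_LAYERS: number of decoder layers, set to 2 (unchanged)    
DIM_FEEDFORWARD: hidden dimension of the feed-forward network, set to 64    
DROPOUT: dropout probability, set to 0.05 (unchanged)    
MAX_EPOCH: number of training epochs; the search selected 300, but the code below sets 200    
batch_size: batch size, set to 8 in the DataLoader (unchanged)    
lr: learning rate, set to 0.001 in the Adam optimizer (unchanged)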
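
As a quick sanity check on these settings, the sketch below builds a bare `nn.Transformer` with the listed values, confirms that each attention head gets 64 / 4 = 16 dimensions, and counts the parameters of the core (embedding and output layers excluded). The dummy inputs and the variable names `core`, `head_dim`, and `n_params` are illustrative assumptions, not part of the notebook.

```python
import torch
import torch.nn as nn

D_MODEL, NHEAD, DIM_FEEDFORWARD, DROPOUT = 64, 4, 64, 0.05
NUM_ENCODER_LAYERS = NUM_DECODER_LAYERS = 2

# Multi-head attention splits d_model evenly across heads, so it must divide cleanly.
assert D_MODEL % NHEAD == 0
head_dim = D_MODEL // NHEAD  # dimensions per attention head

core = nn.Transformer(D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
                      DIM_FEEDFORWARD, DROPOUT, batch_first=False)
n_params = sum(p.numel() for p in core.parameters())

# Dummy, already-embedded inputs shaped [seq_len, batch_size, d_model].
src = torch.randn(6, 8, D_MODEL)
tgt = torch.randn(5, 8, D_MODEL)
out = core(src, tgt)  # same shape as tgt

print(head_dim, n_params, tuple(out.shape))
```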

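Loss function: cross-entropy loss and KL-divergence loss perform about the same; the KL-divergence loss can converge faster, so 200 epochs are enough.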
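
With one-hot targets, the KL-divergence loss reduces to the cross-entropy, because a one-hot distribution has zero entropy. The sketch below checks this numerically on made-up logits and labels (the tensors are illustrative, not notebook data). If the two losses do behave differently in training, one plausible cause is padding: the original setup used `nn.CrossEntropyLoss(ignore_index=0)`, which skips padded positions, while the `nn.KLDivLoss` version also trains on them and averages over all positions.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
num_classes = 16                                 # vocabulary size in this notebook
logits = torch.randn(10, num_classes)            # made-up scores for 10 token positions
targets = torch.randint(1, num_classes, (10,))   # made-up labels, none equal to the pad index 0

log_probs = F.log_softmax(logits, dim=1)
one_hot = F.one_hot(targets, num_classes=num_classes).float()

# KL(one_hot || softmax(logits)), summed over classes and averaged over positions
kl = nn.KLDivLoss(reduction="batchmean", log_target=False)(log_probs, one_hot)
# Plain cross-entropy averaged over positions (no ignore_index)
ce = nn.CrossEntropyLoss()(logits, targets)

print(kl.item(), ce.item())
```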

In [9]:
import torch
import spacy
import random
import os
import math

import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter


# Set the random seed for reproducibility
torch.manual_seed(42)

# Chinese and English sentences
chinese_sentences = [
    "你好", "今天 天气 很 好",
    "今天 天气 很 好",
    "我 爱 学习", "我 喜欢 狗",
    "天气 很 好", "我 爱 养猫", "我 喜欢 学习",
    "你好", "今天 天气 很 好", "爱 养猫",
    "今天", "天气", "很", "好",
    "我", "爱", "学习", "我", "喜欢", "狗", "猫",
]
english_sentences = [
    "Hello", "today weather very good",
    "today weather very good",
    "I love learning", "I like dog",
    "weather very good", "I love cat", "I like study",
    "Hello", "today weather very good", "love cat",
    "today", "weather", "very", "good",
    "I", "love", "learning", "I", "like", "dog", "cat",
]

print(chinese_sentences)
print(english_sentences)


# Load the spaCy tokenizers
spacy_ch = spacy.load('zh_core_web_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenization functions
def tokenize_ch(text):
    return [tok.text for tok in spacy_ch.tokenizer(text)]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Build a vocabulary
def build_vocab(data, min_freq=1):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    # Filter by frequency first so the indices stay contiguous; 0-3 are reserved for special tokens
    kept = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = {word: idx + 4 for idx, word in enumerate(kept)}
    vocab['<pad>'] = 0
    vocab['<sos>'] = 1
    vocab['<eos>'] = 2
    vocab['<unk>'] = 3
    return vocab

# Build the Chinese and English vocabularies
chinese_vocab = build_vocab([tokenize_ch(s) for s in chinese_sentences])
english_vocab = build_vocab([tokenize_en(s) for s in english_sentences])

def sentence_to_indices(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(word, vocab['<unk>']) for word in sentence] + [vocab['<eos>']]

# Convert the sentences to index sequences
data = [
    (sentence_to_indices(tokenize_en(english), english_vocab),  # English sentence with the English vocabulary
     sentence_to_indices(tokenize_ch(chinese), chinese_vocab)  # Chinese sentence with the Chinese vocabulary
     )
    for chinese, english in zip(chinese_sentences, english_sentences)
]
print(chinese_vocab)
print(english_vocab)
print(data)


# Collate function: pad every sequence in a batch to a common length
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_pad = pad_sequence([torch.tensor(s) for s in src_batch], padding_value=0, batch_first=False)
    trg_pad = pad_sequence([torch.tensor(t) for t in trg_batch], padding_value=0, batch_first=False)
    return src_pad, trg_pad

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

dataset = TranslationDataset(data)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


# Transformer model (the meaning of the input/output dimensions changes; the code structure does not)
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                 dropout):
        super().__init__()
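        # Note: forward() uses this single table for both source and target tokens,
        # so target indices must stay below input_dim (both vocabularies have 16 entries here).
        self.embedding = nn.Embedding(input_dim, d_model)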
        self.d_model = d_model
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward,
                                          dropout, batch_first=False)
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)

    def _generate_positional_encoding(self, seq_len):
        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2, dtype=torch.float) * (-math.log(10000.0) / self.d_model))
        pe = torch.zeros(seq_len, self.d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return pe.unsqueeze(1)  # [seq_len, 1, d_model]

    def forward(self, src, trg, trg_mask=None, padding_mask=None):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        # Generate the positional encodings on the fly
        src_pos = self._generate_positional_encoding(src_seq_length).to(src.device)
        trg_pos = self._generate_positional_encoding(trg_seq_length).to(trg.device)
        # Expand the positional encodings to match the input shape
        src_pos = src_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]
        trg_pos = trg_pos.expand(-1, N, -1)  # [seq_len, 1, d_model] -> [seq_len, batch_size, d_model]

        src = self.dropout(self.embedding(src) + src_pos)
        trg = self.dropout(self.embedding(trg) + trg_pos)
        if trg_mask is None:
            output = self.transformer(src, trg)
        else:
            output = self.transformer(src, trg, tgt_mask=trg_mask, tgt_key_padding_mask=padding_mask)
        prediction = self.fc_out(output)
        return prediction


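# Set the input/output dimensions (English source vocabulary size -> input, Chinese target vocabulary size -> output)
INPUT_DIM = len(english_vocab)  # source-language (English) vocabulary size
OUTPUT_DIM = len(chinese_vocab)  # target-language (Chinese) vocabulary size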
D_MODEL = 64
NHEAD = 4
NUM_ENCODER_LAYERS = 2
NUM_DECODER_LAYERS = 2
DIM_FEEDFORWARD = 64
DROPOUT = 0.05
MAX_EPOCH = 200

model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                    DROPOUT)
# print(model)
# Define the loss function and optimizer
criterion = nn.KLDivLoss(reduction='batchmean', log_target=False)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)


# Training loop
for epoch in range(MAX_EPOCH):
    for i, (src, trg) in enumerate(dataloader):
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg.size(0) - 1).bool()
        padding_mask = (trg[1:,] == 0).transpose(0, 1)
        output = model(src, trg[:-1,], trg_mask=trg_mask, padding_mask=padding_mask)
        # Convert the model output to log-probabilities
        output_log_probs = nn.functional.log_softmax(output.view(-1, OUTPUT_DIM), dim=1)
        # Convert the targets to one-hot vectors
        trg_one_hot = torch.nn.functional.one_hot(trg[1:,].view(-1), num_classes=OUTPUT_DIM).float()
        loss = criterion(output_log_probs, trg_one_hot)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 100 == 99:
        print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

print("Training Finished")

# Save the model
model_save_path = "./model/mymodel_en2zh.pth"
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


# Translation function (greedy decoding)
def translate_sentence(sentence, src_vocab, trg_vocab, model, max_len=50):
    model.eval()
    # Tokenize the English source sentence
    tokens = tokenize_en(sentence)
    indices = sentence_to_indices(tokens, src_vocab)
    src_tensor = torch.tensor(indices).unsqueeze(1)  # [src_len, 1]
    # src_len=torch.tensor(len(indices)).unsqueeze(0)
    # print("src_tensor:",src_tensor)
    trg_indices = [trg_vocab['<sos>']]  # the target sequence starts with <sos>

    for i in range(max_len):
        trg_tensor = torch.tensor(trg_indices).unsqueeze(1)
        # Apply the same causal mask as in training so decoding matches the training setup
        trg_mask = nn.Transformer.generate_square_subsequent_mask(trg_tensor.size(0)).bool()
        with torch.no_grad():
            output = model(src_tensor, trg_tensor, trg_mask=trg_mask)
        pred_token = output.argmax(2)[-1].item()  # most likely next token
        trg_indices.append(pred_token)
        if pred_token == trg_vocab['<eos>']:
            break

    # Map the indices back to target-language (Chinese) tokens
    idx_to_token = {idx: token for token, idx in trg_vocab.items()}
    trg_tokens = [idx_to_token[i] for i in trg_indices]
    final_tokens = [token for token in trg_tokens if token not in ['<sos>', '<eos>']]
    return ' '.join(final_tokens)

# Load the model (same architecture)
loaded_model = Transformer(INPUT_DIM, OUTPUT_DIM, D_MODEL, NHEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, DIM_FEEDFORWARD,
                           DROPOUT)
loaded_model.load_state_dict(torch.load(model_save_path, weights_only=True))

# Test the translation (English input sentences)
english_test_sentences = ["Hello", "today weather very good", "I love dog", "I like cat", "love cat"]
for sentence in english_test_sentences:
    translation = translate_sentence(sentence, english_vocab, chinese_vocab, loaded_model)
    print(f'Input: {sentence} -> Translated: {translation}')
    

['你好', '今天 天气 很 好', '今天 天气 很 好', '我 爱 学习', '我 喜欢 狗', '天气 很 好', '我 爱 养猫', '我 喜欢 学习', '你好', '今天 天气 很 好', '爱 养猫', '今天', '天气', '很', '好', '我', '爱', '学习', '我', '喜欢', '狗', '猫']
['Hello', 'today weather very good', 'today weather very good', 'I love learning', 'I like dog', 'weather very good', 'I love cat', 'I like study', 'Hello', 'today weather very good', 'love cat', 'today', 'weather', 'very', 'good', 'I', 'love', 'learning', 'I', 'like', 'dog', 'cat']
{'你好': 4, '今天': 5, '天气': 6, '很': 7, '好': 8, '我': 9, '爱': 10, '学习': 11, '喜欢': 12, '狗': 13, '养': 14, '猫': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
{'Hello': 4, 'today': 5, 'weather': 6, 'very': 7, 'good': 8, 'I': 9, 'love': 10, 'learning': 11, 'like': 12, 'dog': 13, 'cat': 14, 'study': 15, '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
[([1, 4, 2], [1, 4, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 5, 6, 7, 8, 2], [1, 5, 6, 7, 8, 2]), ([1, 9, 10, 11, 2], [1, 9, 10, 11, 2]), ([1, 9, 12, 13, 2], [1, 9, 12, 13, 2]), ([1, 6, 7