# <center>字符级Seq2Seq翻译模型实现</center>

一个基于PyTorch的Seq2Seq代码展示了如何使用Python和PyTorch构建和训练一个序列到序列的机器翻译模型，进行英语到德语的文本翻译。。

1. 设置环境与模型定义

*  数据预处理：加载并清洗英德平行语料
*  字符级分词：构建字符词汇表
*  模型构建：实现编码器-解码器架构
*  训练循环：使用Teacher Forcing策略
*  评估与预测：实现翻译功能


In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


In [None]:
# ==============================================
# 字符级Seq2Seq翻译模型 - 字母表定义与Tokenizer初始化
# 功能：定义英德字母表并创建对应的字符级Tokenizer
# 说明：
# 1. 英语字母表仅包含26个基本字母
# 2. 德语字母表额外包含ä, ö, ü, ß等特殊字符
# 3. 包含制表符\t和换行符\n作为特殊控制字符
# ==============================================


# 定义字母表
english_alphabet = 'abcdefghijklmnopqrstuvwxyz'
german_alphabet = 'abcdefghijklmnopqrstuvwxyzäöüß\t\n'

# 创建英文的Tokenizer
english_tokenizer = Tokenizer(char_level=True, lower=True)
english_tokenizer.fit_on_texts([english_alphabet])
english_word_index = english_tokenizer.word_index

# 创建德文的Tokenizer
german_tokenizer = Tokenizer(char_level=True, lower=True)
german_tokenizer.fit_on_texts([german_alphabet])
german_word_index = german_tokenizer.word_index

# 验证字典大小（可以调整断言范围）
print(f"English tokenizer size: {len(english_word_index)}")
print(f"German tokenizer size: {len(german_word_index)}")


English tokenizer size: 26
German tokenizer size: 32


In [None]:
# ==============================================
# 数据加载与预处理模块
# 功能：从deu-eng文件夹加载英德平行语料库
# 处理流程：
# 1. 遍历指定目录下的所有.txt文件
# 2. 按制表符分割每行内容（格式：英文\t德文\t来源）
# 3. 对文本进行标准化处理（转为小写）
# 4. 为德文句子添加起始符\t和终止符\n
# 注意事项：
# - 文件编码必须为UTF-8以支持特殊字符
# - 每行应有3个字段，否则会被过滤
# - 最终输出英文和德文句子列表
# ==============================================


data_path = 'deu-eng'
english_sentences = []
german_sentences = []

for file_name in os.listdir(data_path):
    if file_name.endswith('.txt'):
        with open(os.path.join(data_path, file_name), 'r', encoding='utf-8') as file:
            for line in file.readlines():
                parts = line.strip().split('\t')
                if len(parts) == 3:
                    english_sentences.append(parts[0].lower())
                    german_sentences.append('\t' + parts[1].lower() + '\n')

print("Total sentence pairs:", len(english_sentences))


Total sentence pairs: 278168


In [None]:
# ==============================================
# 数据预处理与张量转换模块
# 功能：将文本序列转换为模型可处理的张量格式
# 处理流程：
# 1. 使用Tokenizer将文本转换为数字序列
# 2. 将数字序列转换为one-hot编码
# 3. 计算编码器和解码器的最大序列长度
# 4. 初始化三维零矩阵作为输入/输出容器
# 5. 填充数据到固定长度矩阵中
# 6. 将numpy数组转换为PyTorch张量
# 注意事项：
# - 英文和德文使用独立的Tokenizer
# - one-hot编码维度为词汇表大小+1（保留0作为padding）
# - 解码器目标数据比输入数据偏移一个时间步
# - 最终输出三个张量：
#   - encoder_input_data: (样本数, 最大英文长度, 英文词汇量)
#   - decoder_input_data: (样本数, 最大德文长度, 德文词汇量) 
#   - decoder_target_data: 同上，作为训练目标
# ==============================================



english_sequences = english_tokenizer.texts_to_sequences(english_sentences)
german_sequences = german_tokenizer.texts_to_sequences(german_sentences)

english_onehot = [to_categorical(seq, num_classes=len(english_word_index) + 1) for seq in english_sequences]
german_onehot = [to_categorical(seq, num_classes=len(german_word_index) + 1) for seq in german_sequences]

max_encoder_seq_length = max(len(seq) for seq in english_sequences)
max_decoder_seq_length = max(len(seq) for seq in german_sequences)

encoder_input_data = np.zeros((len(english_sentences), max_encoder_seq_length, len(english_word_index) + 1), dtype='float32')
decoder_input_data = np.zeros((len(german_sentences), max_decoder_seq_length, len(german_word_index) + 1), dtype='float32')
decoder_target_data = np.zeros_like(decoder_input_data)

for i, (enc, dec) in enumerate(zip(english_onehot, german_onehot)):
    encoder_input_data[i, :len(enc)] = enc
    decoder_input_data[i, :len(dec)] = dec
    decoder_target_data[i, :len(dec)-1] = dec[1:]

# 转为 Tensor
encoder_input_data = torch.tensor(encoder_input_data, dtype=torch.float32)
decoder_input_data = torch.tensor(decoder_input_data, dtype=torch.float32)
decoder_target_data = torch.tensor(decoder_target_data, dtype=torch.float32)


In [None]:
# ==============================================
# 自定义数据集类 TranslationDataset
# 功能：封装Seq2Seq模型训练所需的数据加载逻辑
# 继承自PyTorch的Dataset类，实现三个核心方法：
# 1. __init__: 初始化编码器输入、解码器输入和目标输出
# 2. __len__: 返回数据集样本总数
# 3. __getitem__: 按索引返回单个样本
# 使用说明：
# - 配合DataLoader实现批量加载和随机打乱
# - 输入数据应为预处理后的张量形式
# - 输出三个张量分别对应：
#   - 编码器输入 (英文序列)
#   - 解码器输入 (德文序列)
#   - 解码器目标 (偏移一个时间步的德文序列)
# ==============================================



class TranslationDataset(Dataset):
    def __init__(self, encoder_input, decoder_input, decoder_target):
        self.encoder_input = encoder_input
        self.decoder_input = decoder_input
        self.decoder_target = decoder_target

    def __len__(self):
        return self.encoder_input.shape[0]

    def __getitem__(self, idx):
        return self.encoder_input[idx], self.decoder_input[idx], self.decoder_target[idx]

dataset = TranslationDataset(encoder_input_data, decoder_input_data, decoder_target_data)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# 验证数据形状
batch = next(iter(dataloader))
print("Encoder shape:", batch[0].shape)
print("Decoder input shape:", batch[1].shape)
print("Decoder target shape:", batch[2].shape)


Encoder shape: torch.Size([64, 427, 27])
Decoder input shape: torch.Size([64, 401, 33])
Decoder target shape: torch.Size([64, 401, 33])


In [None]:
# ==============================================
# Seq2Seq 模型类定义
# 功能：实现基于LSTM的编码器-解码器架构
# 结构组成：
# 1. 编码器LSTM：将输入序列编码为隐藏状态
# 2. 解码器LSTM：基于编码器隐藏状态生成目标序列
# 3. 全连接层：将解码器输出映射到目标词汇空间
# 参数说明：
# - input_dim: 输入维度（英文词汇量+1）
# - output_dim: 输出维度（德文词汇量+1） 
# - hidden_dim: 隐藏层维度（默认256）
# 前向传播流程：
# 1. 编码器处理输入序列，输出最终隐藏状态
# 2. 解码器使用编码器隐藏状态初始化
# 3. 全连接层处理解码器输出
# 注意事项：
# - 使用batch_first=True保持数据维度一致
# - 自动检测并使用GPU加速
# ==============================================

class Seq2SeqModel(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim):
        super(Seq2SeqModel, self).__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(output_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, encoder_input, decoder_input):
        _, (hidden, cell) = self.encoder(encoder_input)
        decoder_output, _ = self.decoder(decoder_input, (hidden, cell))
        return self.fc(decoder_output)

input_dim = len(english_word_index) + 1
output_dim = len(german_word_index) + 1
hidden_dim = 256

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2SeqModel(input_dim, output_dim, hidden_dim).to(device)
print(model)


Seq2SeqModel(
  (encoder): LSTM(27, 256, batch_first=True)
  (decoder): LSTM(33, 256, batch_first=True)
  (fc): Linear(in_features=256, out_features=33, bias=True)
)


In [None]:
# ==============================================
# 模型训练模块
# 功能：执行Seq2Seq模型的训练过程
# 训练流程：
# 1. 定义损失函数(交叉熵损失)和优化器(Adam)
# 2. 设置训练轮次(epoch)和初始学习率(0.001)
# 3. 每个epoch中：
#    - 将模型设为训练模式
#    - 遍历数据加载器获取批次数据
#    - 将数据移动到指定设备(CPU/GPU)
#    - 清零梯度
#    - 前向传播计算输出
#    - 计算损失(输出与目标比较)
#    - 反向传播计算梯度
#    - 优化器更新参数
#    - 累计epoch损失
# 4. 打印每个epoch的平均损失
# 5. 训练完成后保存模型参数
# 注意事项：
# - 使用tqdm显示训练进度条
# - 损失计算时需reshape输出和目标张量
# - 模型参数保存为seq2seq.pth文件
# ==============================================

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for encoder_input, decoder_input, decoder_target in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        encoder_input, decoder_input, decoder_target = encoder_input.to(device), decoder_input.to(device), decoder_target.to(device)
        optimizer.zero_grad()
        output = model(encoder_input, decoder_input)
        loss = criterion(output.view(-1, output_dim), decoder_target.argmax(dim=2).view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(dataloader):.4f}")
torch.save(model.state_dict(), "seq2seq.pth")


Epoch 1/10: 100%|██████████| 4347/4347 [03:22<00:00, 21.42it/s]


Epoch 1, Loss: 0.1769


Epoch 2/10: 100%|██████████| 4347/4347 [03:23<00:00, 21.41it/s]


Epoch 2, Loss: 0.1282


Epoch 3/10: 100%|██████████| 4347/4347 [03:20<00:00, 21.72it/s]


Epoch 3, Loss: 0.1099


Epoch 4/10: 100%|██████████| 4347/4347 [03:19<00:00, 21.84it/s]


Epoch 4, Loss: 0.1014


Epoch 5/10: 100%|██████████| 4347/4347 [03:21<00:00, 21.61it/s]


Epoch 5, Loss: 0.0939


Epoch 6/10: 100%|██████████| 4347/4347 [03:19<00:00, 21.83it/s]


Epoch 6, Loss: 0.1015


Epoch 7/10: 100%|██████████| 4347/4347 [03:21<00:00, 21.56it/s]


Epoch 7, Loss: 0.1164


Epoch 8/10: 100%|██████████| 4347/4347 [03:21<00:00, 21.59it/s]


Epoch 8, Loss: 0.1012


Epoch 9/10: 100%|██████████| 4347/4347 [03:21<00:00, 21.55it/s]


Epoch 9, Loss: 0.1432


Epoch 10/10: 100%|██████████| 4347/4347 [03:06<00:00, 23.32it/s]

Epoch 10, Loss: 0.1229





In [None]:
# ==============================================
# 句子翻译函数 translate_sentence
# 功能：使用训练好的Seq2Seq模型进行单句翻译
# 参数：
# - model: 训练好的Seq2Seq模型
# - sentence: 待翻译的英文句子
# - max_length: 最大解码长度(默认100)
# 处理流程：
# 1. 将模型设为评估模式
# 2. 对输入句子进行预处理和one-hot编码
# 3. 初始化解码器输入(起始符\t)
# 4. 编码器处理输入获得隐藏状态
# 5. 循环解码直到遇到终止符\n或达到最大长度
# 6. 每次解码步骤：
#    - 使用当前隐藏状态预测下一个字符
#    - 将预测字符作为下一时间步输入
#    - 遇到终止符则停止解码
# 返回：拼接后的德文翻译结果
# 注意事项：
# - 使用torch.no_grad()禁用梯度计算
# - 自动处理设备转移(CPU/GPU)
# ==============================================

def translate_sentence(model, sentence, max_length=100):
    model.eval()
    seq = english_tokenizer.texts_to_sequences([sentence.lower()])
    onehot = to_categorical(seq[0], num_classes=len(english_word_index) + 1)
    input_tensor = torch.tensor(onehot, dtype=torch.float32).unsqueeze(0).to(device)

    decoder_input = torch.zeros((1, 1, output_dim), dtype=torch.float32).to(device)
    decoder_input[0, 0, german_word_index['\t']] = 1

    with torch.no_grad():
        _, (hidden, cell) = model.encoder(input_tensor)

    decoded = []
    for _ in range(max_length):
        with torch.no_grad():
            output, (hidden, cell) = model.decoder(decoder_input, (hidden, cell))
            out = model.fc(output)
            pred = torch.argmax(out, dim=2).item()
            if pred == german_word_index['\n']:
                break
            decoded.append(german_tokenizer.index_word.get(pred, ''))
            decoder_input = torch.zeros((1, 1, output_dim), dtype=torch.float32).to(device)
            decoder_input[0, 0, pred] = 1

    return ''.join(decoded)

# 示例翻译
print("EN: how are you")
print("DE:", translate_sentence(model, "how are you"))


EN: how are you
DE: sichdiesenneurbestern
