In [23]:
import logging
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import torch
import unicodedata
import string
from tqdm import tqdm
from pathlib import Path
from typing import List

import time
import re
from torch.utils.tensorboard import SummaryWriter


In [24]:
logging.basicConfig(level=logging.INFO)

# FILE = "../data/en-fra.txt"
FILE = "/home/wujinyi/sorbonnefile/AMAL/student_tp5/data/en-fra.txt"

writer = SummaryWriter("/tmp/runs/tag-"+time.asctime())

def normalize(s):
    """
    正规化字符串
    将文本中的非 ASCII 字符替换为空格，并且带有字符串正规化
    去除带有许多空格的缩进
    """
    return re.sub(' +',' ', "".join(c if c in string.ascii_letters else " "
         for c in unicodedata.normalize('NFD', s.lower().strip())
         if  c in string.ascii_letters+" "+string.punctuation)).strip()


class Vocabulary:
    """Permet de gérer un vocabulaire.

    En test, il est possible qu'un mot ne soit pas dans le
    vocabulaire : dans ce cas le token "__OOV__" est utilisé.
    Attention : il faut tenir compte de cela lors de l'apprentissage !

    Utilisation:

    - en train, utiliser v.get("blah", adding=True) pour que le mot soit ajouté
      automatiquement
    - en test, utiliser v["blah"] pour récupérer l'ID du mot (ou l'ID de OOV)
    """
    PAD = 0  # 补充的标识
    EOS = 1  # 完成标识
    SOS = 2  # 起始标识
    OOVID = 3  # 表示未知词

    def __init__(self, oov: bool):
        self.oov = oov  # 是否允许OOV
        self.id2word = ["PAD", "EOS", "SOS"]  # 初始化词表中的一些特殊标识
        self.word2id = {"PAD": Vocabulary.PAD, "EOS": Vocabulary.EOS, "SOS": Vocabulary.SOS}  # 初始化字典的转换
        if oov:
            self.word2id["__OOV__"] = Vocabulary.OOVID
            self.id2word.append("__OOV__")

    def __getitem__(self, word: str):
        # 如果词存在于词典中，则返回它的ID
        # 如果未知词，返回OOV的ID
        if self.oov:
            return self.word2id.get(word, Vocabulary.OOVID)
        return self.word2id[word]

    def get(self, word: str, adding=True):
        try:
            # 试图返回词的ID
            return self.word2id[word]
        except KeyError:
            # 如果不存在这个词，并且允许添加，则将这个词添加到词典中
            if adding:
                wordid = len(self.id2word)
                self.word2id[word] = wordid
                self.id2word.append(word)
                return wordid
            if self.oov:
                return Vocabulary.OOVID
            raise

    def __len__(self):
        # 返回词表的长度
        return len(self.id2word)

    def getword(self, idx: int):
        # 根据索引返回字
        if idx < len(self):
            return self.id2word[idx]
        return None

    def getwords(self, idx: List[int]):
        # 返回索引列表对应的词列表
        return [self.getword(i) for i in idx]


# 实现两语言互译数据集的类
class TradDataset():
    def __init__(self, data, vocOrig, vocDest, adding=True, max_len=10):
        self.sentences = []
        for s in tqdm(data.split("\n")):
            # 空行跳过
            if len(s) < 1:
                continue
            # 将数据分割为原文和目标文本，并正规化
            orig, dest = map(normalize, s.split("\t")[:2])
            # 过长的序列跳过
            if len(orig) > max_len:
                continue
            # 将字转换为ID并添加EOS标识，把数据存储到数据集中
            self.sentences.append((torch.tensor([vocOrig.get(o) for o in orig.split(" ")] + [Vocabulary.EOS]),
                                  torch.tensor([vocDest.get(o) for o in dest.split(" ")] + [Vocabulary.EOS])))

    def __len__(self):
        # 返回数据集的大小
        return len(self.sentences)

    def __getitem__(self, i):
        # 返回指定索引的双语言对
        return self.sentences[i]


# 举行补充的 collate 函数
# 用于将不同长度的序列补充为等长序列，并返回其与原文和目标的长度
def collate_fn(batch):
    orig, dest = zip(*batch)
    o_len = torch.tensor([len(o) for o in orig])
    d_len = torch.tensor([len(d) for d in dest])
    return pad_sequence(orig), o_len, pad_sequence(dest), d_len

In [25]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 打开文件，读取数据
with open(FILE) as f:
    lines = f.readlines()

# 设置随机种子
seed = 42  # 你可以选择任何整数作为随机种子
torch.manual_seed(seed)  # 设置 PyTorch 的随机种子
# 随机打乱数据
lines = [lines[x] for x in torch.randperm(len(lines))]
# 训练集与测试集的分割，训练集80%
idxTrain = int(0.8 * len(lines)*0.01)

# 创建英文和法文词汇表
vocEng = Vocabulary(True)
vocFra = Vocabulary(True)
MAX_LEN = 100  # 序列的最大长度
BATCH_SIZE = 100  # Batch Size

# 创建训练数据集和测试数据集
# 通过 TradDataset 来创建数据集，中间包含转换词汇和EOS标识

datatrain = TradDataset("".join(lines[:idxTrain]), vocEng, vocFra, max_len=MAX_LEN)
datatest = TradDataset("".join(lines[idxTrain:int(len(lines)*0.01)]), vocEng, vocFra, max_len=MAX_LEN)

# 使用 DataLoader 带有 collate_fn 来加载数据，配置 batch_size
train_loader = DataLoader(datatrain, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(datatest, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)


100%|██████████| 1366/1366 [00:00<00:00, 3800.00it/s]
100%|██████████| 342/342 [00:00<00:00, 6105.61it/s]


In [26]:
torch.manual_seed(seed)
for orig, o_len, dest, d_len in tqdm(train_loader):
    print(orig,orig.shape)  # 从dataloader里输出打乱后的数据，每个句子竖着排列，总共100列(batch size),每次都是随机的
    print(o_len, o_len.shape)  # 从dataloader里输出每个数据的长度，总共100个(batch size)
    print(dest, dest.shape)
    print(d_len, d_len.shape)
    break

  0%|          | 0/14 [00:00<?, ?it/s]

tensor([[  41,  131,   71,  ...,    4,   66,   46],
        [  11,  132,   10,  ...,   34,  307,   11],
        [ 761,   11, 1572,  ...,   35,   95,  146],
        ...,
        [   0,  143,    0,  ...,    0,    0,    0],
        [   0,    1,    0,  ...,    0,    0,    0],
        [   0,    0,    0,  ...,    0,    0,    0]]) torch.Size([19, 100])
tensor([ 7, 18, 10, 15, 10,  6, 11,  8,  4,  7, 16,  6, 11, 11,  6,  7,  7, 12,
         8,  8,  9,  5,  5,  7,  6,  8,  6,  7, 11,  5, 19, 10,  8,  5,  5,  9,
         5, 11,  4, 13,  9,  8,  5,  5,  5, 10,  9,  8,  8, 12,  6, 11,  8,  8,
         8,  8,  8, 11,  7,  6,  7, 10,  8,  8,  6, 10,  6,  8,  7,  6, 11, 12,
         6,  6,  6,  8, 12,  7,  5, 10,  6,  6,  9,  7, 11,  5,  9,  8,  8,  6,
         4,  5, 10,  7,  9, 11, 10,  7,  7,  7]) torch.Size([100])
tensor([[ 703,  145,   81,  ...,    4,   45,  155],
        [  13,  146, 1101,  ...,   37,  692,   13],
        [1476,   55, 1591,  ...,   58,  207,  176],
        ...,
        [   0,  




2. 翻译任务

在翻译任务中，我们将使用两个RNN模型：

一个编码器，用于在读取待翻译的序列后生成隐藏状态。

一个解码器，从隐藏状态开始，生成翻译后的句子。

除了EOS（序列结束）标记外，还需要一个特殊的SOS（序列开始）标记，作为输入给解码器的第一个标记（加上隐藏状态），从而开始翻译句子。

训练解码器有两种方式：

受约束模式（或称为教师强制，teacher forcing），在该模式下，将目标句子传递给解码器，每个时间步都会考虑目标句子中的一个单词：生成过程受到指导，可以精确修正每个生成的隐藏状态。

非受约束模式，在迭代生成翻译时不考虑目标句子：每个时间步引入的是前一个时间步中隐藏状态解码后具有最大概率的单词（或从该分布中随机抽取一个）。这种模式像是在推理阶段生成句子，然后在整个句子生成完成后再进行修正。

非受约束模式比受约束模式更困难：在预测某个单词时的错误会极大地扰乱后续的生成，并且反向传播在随后的序列中效果不佳。然而，这种方式可以更好地泛化，避免记住具体的例子。直观上来说，我们应该在训练开始时使用受约束模式来很好地初始化解码器，然后逐步切换到非受约束模式。这种过程被称为课程学习（Curriculum Learning）。

问题2

在 tp5-traduction.py 中实现编码器-解码器。对于编码器和解码器，使用GRU，并采用以下架构：

编码器：对原始词汇表进行嵌入，然后使用GRU处理嵌入序列。

解码器：对目标词汇表进行嵌入，然后使用GRU，再接一个线性网络来解码隐藏状态（最后加一个softmax）。

在解码器中，你需要一个名为 generate(hidden, lenseq=None) 的方法，该方法从隐藏状态 hidden（以及作为输入的SOS标记）生成一个序列，直到达到指定长度 lenseq，或者生成了EOS标记。

使用课程学习的简单策略来实现训练循环，即对于每个小批量，随机均匀地选择受约束模式或非受约束模式。在非受约束模式下生成时，可以传递目标句子的期望长度。

训练你的模型，并保留一个测试集来验证是否存在过拟合或欠拟合。

使用（在测试中）前一个实验中的生成方法来可视化生成的翻译。

附加说明

在原始论文中，两种模式的选择是基于受约束模式概率递减的方式来进行的，并且这种选择在每个时间步而不是整个小批量上进行。你可以通过改进策略并比较结果来获得额外的分数。



In [27]:
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn_utils

# 定义GRU层
embedding_dim = 5  # 嵌入维度
hidden_dim = 7     # 隐藏层维度
gru = nn.GRU(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)

# 假设批次中有3个序列，嵌入维度为5
embedded = torch.tensor([
    [[1, 1, 1, 1, 1], [2, 2, 2, 2, 2], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]],  # 序列1: 实际长度为2
    [[3, 3, 3, 3, 3], [4, 4, 4, 4, 4], [5, 5, 5, 5, 5], [0, 0, 0, 0, 0]],  # 序列2: 实际长度为3
    [[6, 6, 6, 6, 6], [7, 7, 7, 7, 7], [8, 8, 8, 8, 8], [9, 9, 9, 9, 9]]   # 序列3: 实际长度为4
], dtype=torch.float32)  # 形状为 (batch_size, seq_length, embedding_dim) -> (3, 4, 5)

# 每个序列的实际长度
lengths = torch.tensor([2, 3, 4])

# 打包序列
packed_input = rnn_utils.pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)
print(packed_input)

# 通过GRU计算输出
packed_output, hidden = gru(packed_input)

print("packed_output:", packed_output)
print("hidden shape:", hidden.shape)
print("hidden:", hidden)


PackedSequence(data=tensor([[6., 6., 6., 6., 6.],
        [3., 3., 3., 3., 3.],
        [1., 1., 1., 1., 1.],
        [7., 7., 7., 7., 7.],
        [4., 4., 4., 4., 4.],
        [2., 2., 2., 2., 2.],
        [8., 8., 8., 8., 8.],
        [5., 5., 5., 5., 5.],
        [9., 9., 9., 9., 9.]]), batch_sizes=tensor([3, 3, 2, 1]), sorted_indices=tensor([2, 1, 0]), unsorted_indices=tensor([2, 1, 0]))
packed_output: PackedSequence(data=tensor([[-0.1272, -0.0329, -0.6789,  0.0133, -0.9074, -0.6769, -0.1986],
        [-0.2959, -0.1203, -0.3401,  0.0901, -0.5296, -0.6026, -0.3728],
        [-0.2473, -0.2140, -0.0594,  0.2074, -0.0557, -0.4280, -0.4615],
        [-0.2134, -0.0505, -0.8804,  0.0211, -0.9799, -0.9055, -0.3184],
        [-0.4578, -0.1782, -0.6916,  0.1385, -0.7901, -0.8654, -0.5332],
        [-0.4265, -0.3085, -0.3518,  0.3342, -0.3207, -0.7512, -0.6517],
        [-0.2696, -0.0609, -0.9265,  0.0253, -0.9919, -0.9750, -0.3941],
        [-0.5579, -0.2120, -0.8383,  0.1642, -0.9081, -0.9

In [57]:
#  TODO: 实现编码器、解码器和训练循环

class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(Encoder, self).__init__()
        # 词嵌入层，将词汇表中的每个词映射为固定大小的向量
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=Vocabulary.PAD)
        # GRU层，用于处理嵌入的序列并生成隐藏状态
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, x, lengths):
        # x: 输入的词序列，lengths: 每个序列的长度
        # print("encoder x:",x.shape)
        embedded = self.embedding(x)  # 将输入序列映射为嵌入表示  # (batch_size, seq_len, embedding_dim)
        # print("encoder embedded:",embedded.shape)
        # print(lengths)
        # lengths = torch.clamp(lengths, max=embedded.size(1))  # 将所有长度限制在 seq_len 范围内，可以确保没有长度超出序列的范围，避免越界错误
        # print(lengths)
        # 将嵌入表示打包为压缩格式，以便GRU忽略填充部分
        packed_input = nn.utils.rnn.pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)  # (batch_size, seq_len, embedding_dim)
        # 通过GRU计算隐藏状态
        packed_output, hidden = self.gru(packed_input)  # hidden: (1, batch_size, hidden_dim)
        # print("encoder hidden:",hidden.shape)
        return hidden  # 返回最后的隐藏状态


class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(Decoder, self).__init__()
        # 词嵌入层，将目标语言中的每个词映射为固定大小的向量
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=Vocabulary.PAD)
        # GRU层，用于处理嵌入的输入和隐藏状态
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        # 全连接层，用于将GRU的输出映射到词汇表大小的向量上
        self.fc = nn.Linear(hidden_dim, vocab_size)
        # 使用log softmax来计算每个词的概率
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, x, hidden):
        # x: 输入词序列, hidden: 上一步的隐藏状态
        embedded = self.embedding(x).unsqueeze(1)  # (batch_size, 1, embedding_dim)
        # embedded = self.embedding(x)  # (batch_size, 1, embedding_dim)
        # print("decoder embedded:",embedded.shape)
        # print("decoder hidden:", hidden.shape)  #(num_layers=1, batch_size, hidden_dim)   
        output, hidden = self.gru(embedded, hidden)  # 通过GRU计算输出和更新后的隐藏状态
        output = self.fc(output)  # 全连接层映射到词汇表大小
        output = self.softmax(output)  # 计算词的概率分布
        return output, hidden  # 返回输出和更新后的隐藏状态

    def generate(self, hidden, max_len, sos_token, eos_token):
        # 根据初始隐藏状态生成输出序列
        inputs = torch.tensor([[sos_token]]).to(device)  # 起始输入为SOS标记
        outputs = []
        for _ in range(max_len):
            output, hidden = self(inputs, hidden)  # 通过GRU生成输出
            topv, topi = output.topk(1)  # 选择概率最大的词
            outputs.append(topi.item())  # 将生成的词添加到输出序列中
            if topi.item() == eos_token:  # 如果生成EOS标记，停止生成
                break
            inputs = topi.squeeze().detach()  # 下一个输入是当前时间步生成的词
        return outputs  # 返回生成的序列


def train_model(encoder, decoder, train_loader, criterion, encoder_optimizer, decoder_optimizer, num_epochs):
    for epoch in range(num_epochs):
        total_loss = 0
        for orig, o_len, dest, d_len in tqdm(train_loader):
            # 将输入数据和目标数据转移到设备上（如GPU）
            orig = orig.transpose(0,1)
            dest = dest.transpose(0,1)
            # print("orig:",orig.shape)
            # print("dest:",dest.shape)
            orig, dest = orig.to(device), dest.to(device)
            o_len, d_len = o_len.to(device), d_len.to(device)

            # 梯度清零
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()

            # 前向传播通过编码器
            # print("encoder:", orig.shape)
            # print("o_len:", o_len.shape)
            hidden = encoder(orig, o_len)
            # print("hidden:", hidden.shape)

            # 初始化解码器的输入为SOS标记，隐藏状态为编码器的最后隐藏状态
            decoder_input = torch.tensor([Vocabulary.SOS] * dest.size(0)).to(device)
            decoder_hidden = hidden

            loss = 0
            # 随机决定是否使用teacher forcing
            use_teacher_forcing = True if torch.rand(1).item() > 0.5 else False

            if use_teacher_forcing:
                # 使用teacher forcing: 每次将目标词作为下一步的输入
                # for di in range(dest.size(0)):
                for di in range(dest.size(1)):
                    # print("decoder_input:", decoder_input.shape)
                    # print("decoder_hidden:", decoder_hidden.shape)
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                    # print("decoder output:", decoder_output.shape)
                    # print("dest:", dest[:, di].shape)
                    # decoder_input = dest[di]  # 下一步的输入是实际目标
                    # loss += criterion(decoder_output.squeeze(1), dest[di])
                    decoder_input = dest[:, di]  # 下一步的输入是实际目标
                    loss += criterion(decoder_output.squeeze(1), dest[:, di])
            else:
                # 不使用teacher forcing: 使用解码器自己的预测作为下一步的输入
                # for di in range(dest.size(0)):
                for di in range(dest.size(1)):
                    # print("decoder_input:", decoder_input.shape)
                    # print("decoder_hidden:", decoder_hidden.shape)
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                    topv, topi = decoder_output.topk(1)  # 选择概率最大的词
                    decoder_input = topi.squeeze().detach()  # 使用当前预测作为下一个时间步的输入
                    # print("decoder input:", decoder_input.shape)
                    # print("decoder output:", decoder_output.shape)
                    # print("dest:", dest[:, di].shape)
                    # loss += criterion(decoder_output.squeeze(1), dest[di])
                    loss += criterion(decoder_output.squeeze(1), dest[:, di])
                    # if decoder_input.item() == Vocabulary.EOS:  # 如果生成EOS标记，停止解码
                    #     break

            # 反向传播
            loss.backward()
            encoder_optimizer.step()
            decoder_optimizer.step()

            total_loss += loss.item() / dest.size(0)

        # 打印每个epoch的损失
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss:.4f}")


def evaluate_model(encoder, decoder, test_loader, criterion, max_len):
    encoder.eval()  # 设置为评估模式
    decoder.eval()
    total_loss = 0
    with torch.no_grad():  # 禁用梯度计算
        for orig, o_len, dest, d_len in tqdm(test_loader):
            orig = orig.transpose(0,1)
            dest = dest.transpose(0,1)
            orig, dest = orig.to(device), dest.to(device)
            o_len, d_len = o_len.to(device), d_len.to(device)

            # 前向传播通过编码器
            # print(orig)
            # print(o_len)
            # o_len = torch.clamp(o_len, max=orig.size(1))
            hidden = encoder(orig, o_len)
            # decoder_input = torch.tensor([[Vocabulary.SOS]] * dest.size(1)).to(device)
            decoder_input = torch.tensor([Vocabulary.SOS] * dest.size(0)).to(device)
            decoder_hidden = hidden
            loss = 0

            # 逐步生成输出序列
            # for di in range(dest.size(0)):
            for di in range(dest.size(1)):
                decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                topv, topi = decoder_output.topk(1)  # 选择概率最大的词
                decoder_input = topi.squeeze().detach()  # 使用自己的预测作为下一个时间步的输入
                loss += criterion(decoder_output.squeeze(1), dest[:, di])
                # if decoder_input.item() == Vocabulary.EOS:  # 如果生成EOS标记，停止解码
                #     break

            total_loss += loss.item() / dest.size(0)

    # 打印测试集上的损失
    print(f"Test Loss: {total_loss:.4f}")

In [58]:
# import os
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
device = "cpu"
torch.manual_seed(seed)

# 初始化模型、损失函数和优化器
embedding_dim = 256
hidden_dim = 512
encoder = Encoder(len(vocEng), embedding_dim, hidden_dim).to(device)
decoder = Decoder(len(vocFra), embedding_dim, hidden_dim).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=Vocabulary.PAD)  # 忽略填充部分的损失
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)

# 训练模型
num_epochs = 20
train_model(encoder, decoder, train_loader, criterion, encoder_optimizer, decoder_optimizer, num_epochs)


100%|██████████| 14/14 [00:18<00:00,  1.34s/it]


Epoch 1/20, Loss: 17.9255


100%|██████████| 14/14 [00:16<00:00,  1.15s/it]


Epoch 2/20, Loss: 14.0942


100%|██████████| 14/14 [00:21<00:00,  1.55s/it]


Epoch 3/20, Loss: 13.5466


100%|██████████| 14/14 [00:18<00:00,  1.29s/it]


Epoch 4/20, Loss: 12.8739


100%|██████████| 14/14 [00:17<00:00,  1.22s/it]


Epoch 5/20, Loss: 11.9295


100%|██████████| 14/14 [00:18<00:00,  1.33s/it]


Epoch 6/20, Loss: 11.8982


100%|██████████| 14/14 [00:20<00:00,  1.47s/it]


Epoch 7/20, Loss: 10.7950


100%|██████████| 14/14 [00:18<00:00,  1.32s/it]


Epoch 8/20, Loss: 10.3523


100%|██████████| 14/14 [00:17<00:00,  1.28s/it]


Epoch 9/20, Loss: 9.7280


100%|██████████| 14/14 [00:09<00:00,  1.53it/s]


Epoch 10/20, Loss: 8.9119


100%|██████████| 14/14 [00:10<00:00,  1.38it/s]


Epoch 11/20, Loss: 7.7960


100%|██████████| 14/14 [00:10<00:00,  1.39it/s]


Epoch 12/20, Loss: 8.7738


100%|██████████| 14/14 [00:10<00:00,  1.31it/s]


Epoch 13/20, Loss: 7.8870


100%|██████████| 14/14 [00:09<00:00,  1.41it/s]


Epoch 14/20, Loss: 7.6308


100%|██████████| 14/14 [00:09<00:00,  1.48it/s]


Epoch 15/20, Loss: 7.8322


100%|██████████| 14/14 [00:09<00:00,  1.45it/s]


Epoch 16/20, Loss: 6.9870


100%|██████████| 14/14 [00:08<00:00,  1.59it/s]


Epoch 17/20, Loss: 7.5312


100%|██████████| 14/14 [00:09<00:00,  1.52it/s]


Epoch 18/20, Loss: 6.4772


100%|██████████| 14/14 [00:12<00:00,  1.10it/s]


Epoch 19/20, Loss: 6.3126


100%|██████████| 14/14 [00:09<00:00,  1.56it/s]

Epoch 20/20, Loss: 4.7861





In [59]:
# 在测试集上评估模型
evaluate_model(encoder, decoder, test_loader, criterion, MAX_LEN)

100%|██████████| 4/4 [00:00<00:00,  5.21it/s]

Test Loss: 6.2689





In [60]:
import torch
import torch.nn.functional as F

def translate_sentence(encoder, decoder, sentence, vocEng, vocFra, max_len=20):
    # 1. 处理输入句子，将其转换为索引
    encoder.eval()
    decoder.eval()

    # 将输入句子分词，并转换为词汇表中的索引
    indices = [vocEng.get(word, adding=False) for word in sentence.split(' ')]
    input_tensor = torch.tensor(indices, dtype=torch.long).unsqueeze(0).to(device)  # 形状 (1, seq_len)

    # 2. 使用编码器得到隐藏状态
    with torch.no_grad():
        lengths = [input_tensor.shape[1]]  # 输入序列的长度
        embedded = encoder.embedding(input_tensor)  # 获取嵌入表示
        packed_input = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
        encoder_output, encoder_hidden = encoder.gru(packed_input)

    # 3. 使用解码器逐步生成翻译
    decoder_input = torch.tensor([[Vocabulary.SOS]]).to(device)  # 起始符，形状 (1, 1)
    decoder_hidden = encoder_hidden  # 使用编码器的最后隐藏状态作为解码器的初始隐藏状态

    translated_sentence = []

    for _ in range(max_len):
        with torch.no_grad():
            embedded = decoder.embedding(decoder_input)  # 获取解码器输入的嵌入表示
            output, decoder_hidden = decoder.gru(embedded, decoder_hidden)  # 通过GRU获取输出
            output = decoder.fc(output)  # 获取词汇表大小的输出
            output = F.log_softmax(output, dim=-1)  # 获取词的概率分布

        # 获取概率最高的词的索引
        top1 = output.argmax(dim=-1).item()

        # 如果生成结束符 EOS，则停止翻译
        if top1 == Vocabulary.EOS:
            break

        # 添加生成的单词到翻译列表中
        translated_sentence.append(vocFra.getword(top1))

        # 准备下一个时间步的输入
        decoder_input = torch.tensor([[top1]]).to(device)

    return ' '.join(translated_sentence)

# 示例输入句子
input_sentence = "how are you"
translated = translate_sentence(encoder, decoder, input_sentence, vocEng, vocFra)
print(f"输入句子: {input_sentence}")
print(f"生成翻译: {translated}")

输入句子: how are you
生成翻译: comment etes vous
