Reference: https://github.com/bentrevett/pytorch-seq2seq/

# 数据准备

设置随机数种子，得到一致的结果

In [None]:
import random
import numpy as np
import torch

# One seed shared by every random number generator seeded below.
seed = 1234

# Seed Python's `random`, NumPy, and PyTorch (CPU and CUDA) with the same value.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# Ask cuDNN to pick deterministic algorithms.
torch.backends.cudnn.deterministic=True

## Datasets

使用opus-100数据集的en-zh子数据集

In [None]:
import datasets
from datasets import load_dataset

# Load the "en-zh" configuration of the OPUS-100 dataset.
ds = load_dataset("Helsinki-NLP/opus-100", "en-zh")

# Keep each split under its own name for the steps below.
train_data = ds["train"]
valid_data = ds["validation"]
test_data = ds["test"]

如果提示hub连接失败，可以试试换源

Hugging Face镜像源：设置环境变量（下面第一条用于Linux/macOS的bash，第二条用于Windows PowerShell）

export HF_ENDPOINT=https://hf-mirror.com

$env:HF_ENDPOINT = "https://hf-mirror.com"

检验dataset是否下载和加载成功

In [None]:
# Show the loaded dataset object, then the first training example.
print(ds)
print(train_data[0])

## Tokenizer

接下来使用spacy进行分词，即将一个句子中的单词和短语分离出来，方便进行相关处理和学习训练。

在分词之前，我们需要下载spacy的相关分析模型。

In [None]:
!python -m spacy download zh_core_web_sm

!python -m spacy download en_core_web_sm

或者使用pip的github链接下载，本地使用pip安装也可，注意安装环境。

pip install https://github.com/explosion/spacy-models/releases/download/zh_core_web_sm-3.7.0/zh_core_web_sm-3.7.0-py3-none-any.whl

pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.0/en_core_web_sm-3.7.0-py3-none-any.whl

加载模型

In [None]:
import spacy

# Load the English and Chinese spaCy pipelines; only their tokenizers are used below.
en_nlp = spacy.load("en_core_web_sm")
zh_nlp = spacy.load("zh_core_web_sm")

测试加载结果

In [None]:
# One short sample sentence per language.
test_text1 = "This is amazing!"
test_text2 = "这好棒啊"

# Run only each pipeline's tokenizer and keep the token strings.
test_token1 = [token.text for token in en_nlp.tokenizer(test_text1)]
test_token2 = [token.text for token in zh_nlp.tokenizer(test_text2)]
print(test_token1)
print(test_token2)

接下来创建一个函数用于tokenizer，将相应的数据集数据进行分词。

In [None]:
def tokenize_en_zh(example, en_nlp, zh_nlp, max_length, lower, sos_token, eos_token):
    """Tokenize one English/Chinese translation pair.

    Args:
        example: mapping whose ``"translation"`` entry holds the ``"en"`` and
            ``"zh"`` sentences.
        en_nlp: object whose ``tokenizer`` splits English text into tokens
            exposing ``.text``.
        zh_nlp: same, for Chinese text.
        max_length: maximum number of tokens kept per sentence, counted
            before the start/end markers are added.
        lower: when true, lowercase the English tokens (the Chinese tokens
            are left untouched).
        sos_token: marker placed in front of each token list.
        eos_token: marker placed at the end of each token list.

    Returns:
        A dict with the two token lists under ``"en_tokens"`` and
        ``"zh_tokens"``.
    """
    pair = example['translation']

    def _split(nlp, text):
        # Tokenize, then truncate to the length budget.
        return [tok.text for tok in nlp.tokenizer(text)][:max_length]

    en_words = _split(en_nlp, pair["en"])
    zh_words = _split(zh_nlp, pair["zh"])
    if lower:
        en_words = [word.lower() for word in en_words]
    return {
        "en_tokens": [sos_token, *en_words, eos_token],
        "zh_tokens": [sos_token, *zh_words, eos_token],
    }

# Tokenization settings shared by all three splits.
max_length = 100  # tokens kept per sentence, before <sos>/<eos> are added
lower = True  # lowercase the English tokens
sos_token = "<sos>"
eos_token = "<eos>"

# Extra keyword arguments that `map` passes to tokenize_en_zh for every example.
fn_kwargs = {
    "en_nlp": en_nlp,
    "zh_nlp": zh_nlp,
    "max_length": max_length,
    "lower": lower,
    "sos_token": sos_token,
    "eos_token": eos_token,
}

# Tokenize each split; tokenize_en_zh returns the "en_tokens" and "zh_tokens" entries.
train_data = train_data.map(tokenize_en_zh, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(tokenize_en_zh, fn_kwargs=fn_kwargs)
test_data = test_data.map(tokenize_en_zh, fn_kwargs=fn_kwargs)

测试一下分词结果。

In [None]:
print(train_data)
print(train_data[0])

## Vocabularies

接下来开始构建词表，将每个单词用一个对应的索引编号来表示。

In [None]:
import torchtext.vocab

min_freq = 2  # tokens seen fewer times than this in the training split get no index
# Special tokens
unk_token = "<unk>"
pad_token = "<pad>"
sos_token = "<sos>"
eos_token = "<eos>"

# Must be an ordered sequence, not a set: the vocabulary assigns the special
# tokens their indices in the order given here. A set of strings iterates in
# hash order, which can differ between interpreter runs, so the indices of
# <unk>/<pad>/<sos>/<eos> would not be reproducible and a saved checkpoint
# could not be reused with a vocabulary rebuilt in another session.
special_tokens = [
    unk_token,
    pad_token,
    sos_token,
    eos_token,
]

# Vocabularies are built from the training split only.
en_vocab = torchtext.vocab.build_vocab_from_iterator(
    train_data["en_tokens"],
    min_freq=min_freq,
    specials=special_tokens,
    # max_tokens=10000, # caps the vocabulary size; usually not needed
)

zh_vocab = torchtext.vocab.build_vocab_from_iterator(
    train_data["zh_tokens"],
    min_freq=min_freq,
    specials=special_tokens,
    # max_tokens=20000, # caps the vocabulary size; usually not needed
)

# Tokens missing from a vocabulary map to the <unk> index instead of raising.
en_vocab.set_default_index(en_vocab[unk_token])
zh_vocab.set_default_index(zh_vocab[unk_token])

查看词表建立结果

In [None]:
# First ten entries of each index-to-token list, then the vocabulary sizes.
print(en_vocab.get_itos()[:10])
print(zh_vocab.get_itos()[:10])
print(len(en_vocab))
print(len(zh_vocab))

接下来创建一个对数据集进行numericalize编码的函数。

In [None]:
def numericalize_en_zh(example, en_vocab, zh_vocab):
    """Convert the token lists of one example into vocabulary indices.

    Args:
        example: mapping with ``"en_tokens"`` and ``"zh_tokens"`` token lists.
        en_vocab: vocabulary used for the English tokens; must provide
            ``lookup_indices``.
        zh_vocab: vocabulary used for the Chinese tokens; must provide
            ``lookup_indices``.

    Returns:
        A dict with the index lists under ``"en_ids"`` and ``"zh_ids"``.
    """
    # (output key, vocabulary, input key), English first.
    plan = (
        ("en_ids", en_vocab, "en_tokens"),
        ("zh_ids", zh_vocab, "zh_tokens"),
    )
    return {
        ids_key: vocab.lookup_indices(example[tokens_key])
        for ids_key, vocab, tokens_key in plan
    }

# Pass both vocabularies to numericalize_en_zh and apply it to every split.
fn_kwargs = {"en_vocab": en_vocab, "zh_vocab": zh_vocab}
train_data = train_data.map(numericalize_en_zh, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(numericalize_en_zh, fn_kwargs=fn_kwargs)
test_data = test_data.map(numericalize_en_zh, fn_kwargs=fn_kwargs)

查看numericalize结果

In [None]:
# Display the first training example, which now also carries "en_ids" and "zh_ids".
train_data[0]

将ids使用with_format转换为pytorch的tensor类型

In [None]:
# Request torch formatting for the two id columns only.
data_type = "torch"
format_columns = ["en_ids","zh_ids"]

# output_all_columns=True asks for the remaining columns to be returned as well,
# so the raw text and token lists stay reachable from each example.
train_data = train_data.with_format(
    type=data_type, columns=format_columns, output_all_columns=True
)

valid_data = valid_data.with_format(
    type=data_type, columns=format_columns, output_all_columns=True
)

test_data = test_data.with_format(
    type=data_type, columns=format_columns, output_all_columns=True
)

检查结果

In [None]:
# Check the type of the formatted id column, then show the whole first example.
print(type(train_data[0]["en_ids"]))
print(train_data[0])

## DataLoader

最后一步将数据装入pytorch的DataLoader中

collate_fn 接收一个batch将其中的en_ids和zh_ids进行padding

In [None]:
from torch import nn

def get_collate_fn(pad_index):
    """Build a collate function that pads a batch of id sequences.

    Args:
        pad_index: value used to fill the shorter sequences of a batch.

    Returns:
        A function that takes a list of examples (each with ``"en_ids"`` and
        ``"zh_ids"`` tensors) and returns a dict with the same two keys, each
        holding the sequences of the batch padded to a common length with
        ``pad_index``. ``pad_sequence`` is called with its default layout, so
        sequences are stacked along the second dimension.
    """

    def collate_fn(batch):
        padded = {}
        for column in ("en_ids", "zh_ids"):
            sequences = [example[column] for example in batch]
            padded[column] = nn.utils.rnn.pad_sequence(
                sequences, padding_value=pad_index
            )
        return padded

    return collate_fn

接下来创建dataloader

In [None]:
import torch.utils.data

def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    """Wrap a dataset in a DataLoader that pads each batch.

    Args:
        dataset: dataset whose examples carry ``"en_ids"`` and ``"zh_ids"``.
        batch_size: number of examples per batch.
        pad_index: padding value handed to the collate function.
        shuffle: whether the loader shuffles the examples.

    Returns:
        The configured ``torch.utils.data.DataLoader``.
    """
    return torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=get_collate_fn(pad_index),
    )

In [None]:
batch_size = 128
pad_index = en_vocab[pad_token]

# The same pad_index pads both the English and the Chinese ids in the collate
# function, so it is only valid if the two vocabularies place <pad> at the
# same index. Fail loudly here instead of silently padding with a real token.
if zh_vocab[pad_token] != pad_index:
    raise ValueError(
        f"{pad_token} has index {pad_index} in en_vocab but "
        f"{zh_vocab[pad_token]} in zh_vocab; both vocabularies must agree"
    )

# Only the training loader shuffles; validation and test keep dataset order.
train_data_loader = get_data_loader(train_data, batch_size, pad_index, shuffle=True)
valid_data_loader = get_data_loader(valid_data, batch_size, pad_index)
test_data_loader = get_data_loader(test_data, batch_size, pad_index)

# 创建模型

## Encoder

In [None]:
from torch import nn

class Encoder(nn.Module):
    """LSTM encoder: embeds source token ids and returns the final LSTM state.

    Args:
        input_dim: size of the source vocabulary (number of embeddings).
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, used on the embeddings and passed to
            the LSTM.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Exposed so Seq2Seq can check that encoder and decoder match.
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Encode a batch of source sequences.

        Args:
            x: token ids, [length, batch].

        Returns:
            ``(hidden, cell)``, each [n layers * n directions, batch,
            hidden dim]; n directions is 1 for this unidirectional LSTM.
        """
        # token_vectors = [length, batch, embedding dim]
        token_vectors = self.embedding(x)
        # The per-step outputs are not needed, only the final state.
        _, final_state = self.rnn(self.dropout(token_vectors))
        hidden, cell = final_state
        return hidden, cell

## Decoder

In [None]:
from torch import nn

class Decoder(nn.Module):
    """LSTM decoder that predicts one target token per call.

    Args:
        output_dim: size of the target vocabulary.
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, used on the embeddings and passed to
            the LSTM.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Exposed so Seq2Seq can size its output buffer and check compatibility.
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden, cell):
        """Run a single decoding step.

        Args:
            x: current input token ids, [batch].
            hidden: previous hidden state, [n layers * n directions, batch,
                hidden dim].
            cell: previous cell state, same shape as ``hidden``.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is
            [batch, output dim] and the states keep their input shapes.
        """
        # Add a sequence dimension of length 1: [batch] -> [1, batch].
        step_input = x.unsqueeze(0)
        # embedded = [1, batch, embedding dim]
        embedded = self.dropout(self.embedding(step_input))
        # output = [1, batch, hidden dim] (one step, one direction)
        output, next_state = self.rnn(embedded, (hidden, cell))
        hidden, cell = next_state
        # Drop the length-1 sequence dimension before projecting to the vocabulary.
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell

## Seq2Seq

In [None]:
from torch import nn
import random

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing.

    Args:
        encoder: module returning ``(hidden, cell)`` for a source batch.
        decoder: module mapping ``(input, hidden, cell)`` to
            ``(prediction, hidden, cell)``; must expose ``output_dim``.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        # The encoder's final state is fed straight into the decoder, so the
        # two must agree on state size and depth.
        same_width = encoder.hidden_dim == decoder.hidden_dim
        assert same_width, "Hidden dimensions of encoder and decoder must be equal!"
        same_depth = encoder.n_layers == decoder.n_layers
        assert same_depth, "Encoder and decoder must have equal number of layers!"

    def forward(self, src, trg, teacher_forcing_ratio):
        """Produce decoder predictions for every target position except the first.

        Args:
            src: source ids, [src length, batch].
            trg: target ids, [trg length, batch]; row 0 holds the <sos> tokens.
            teacher_forcing_ratio: probability of feeding the ground-truth
                token as the next decoder input (0.75 means ground truth is
                used about 75% of the time).

        Returns:
            Tensor [trg length, batch, decoder.output_dim]; position 0 is
            never written and stays zero.
        """
        trg_length, batch_size = trg.shape[0], trg.shape[1]
        # Buffer for the decoder outputs of every step.
        outputs = torch.zeros(
            trg_length, batch_size, self.decoder.output_dim, device=self.device
        )
        # The encoder's final state initialises the decoder.
        hidden, cell = self.encoder(src)
        # The first decoder input is the <sos> row of the target.
        decoder_input = trg[0, :]
        for t in range(1, trg_length):
            step_output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = step_output
            # One random draw per step decides between ground truth and the
            # model's own highest-scoring token.
            if random.random() < teacher_forcing_ratio:
                decoder_input = trg[t]
            else:
                decoder_input = step_output.argmax(1)
        return outputs

# 训练

## 初始化模型

训练之前需要先初始化模型和加载数据集（已完成）。

In [None]:
# Vocabulary sizes: English is the source (encoder) side, Chinese the target (decoder) side.
input_dim = len(en_vocab)
output_dim = len(zh_vocab)
# Model hyperparameters; Seq2Seq requires the encoder and decoder to share
# hidden_dim and n_layers.
encoder_embedding_dim = 256
decoder_embedding_dim = 256
hidden_dim = 512
n_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(
    input_dim,
    encoder_embedding_dim,
    hidden_dim,
    n_layers,
    encoder_dropout
)

decoder = Decoder(
    output_dim,
    decoder_embedding_dim,
    hidden_dim,
    n_layers,
    decoder_dropout
)

# Seq2Seq keeps `device` for allocating its output buffer; .to(device) moves the parameters.
model = Seq2Seq(encoder, decoder, device).to(device)

均匀分布初始化权重

使用apply的时候，这个函数将会在每个模块和子模块中调用，对每个模块使用nn.init.uniform_进行均匀采样

In [None]:
def init_weights(m):
    """Fill every parameter of ``m`` (including nested ones) from U(-0.08, 0.08).

    Args:
        m: module whose parameters are re-initialised in place.
    """
    low, high = -0.08, 0.08
    for param in m.parameters():
        nn.init.uniform_(param.data, low, high)

# apply() calls init_weights on the model and on each of its submodules. Since
# init_weights already walks nested parameters, values are drawn more than
# once, but every parameter still ends up sampled from the same range.
model.apply(init_weights)

查看模型参数个数

In [None]:
def count_parameters(model):
    """Return the total number of elements in the trainable parameters of ``model``."""
    total = 0
    for param in model.parameters():
        # Frozen parameters (requires_grad=False) are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable parameter count with thousands separators.
print(f"The model has {count_parameters(model):,} trainable parameters")

## 优化器

In [None]:
import torch.optim

# Adam over all model parameters, with the optimizer's default settings.
optimizer = torch.optim.Adam(model.parameters())

## 损失函数

In [None]:
# Cross-entropy that skips target positions equal to pad_index.
# NOTE(review): pad_index is read from en_vocab while the targets are Chinese
# ids, so this relies on both vocabularies sharing the <pad> index - confirm.
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

## 训练

训练用函数

In [None]:
def train_fn(model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: callable as ``model(src, trg, teacher_forcing_ratio)``.
        data_loader: iterable of batches with ``"en_ids"`` and ``"zh_ids"``
            tensors; must support ``len``.
        optimizer: optimizer stepping the model parameters.
        criterion: loss taking flattened predictions and targets.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
        teacher_forcing_ratio: forwarded unchanged to the model.
        device: device the batch tensors are moved to.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in data_loader:
        # src/trg = [length, batch]
        src, trg = batch["en_ids"].to(device), batch["zh_ids"].to(device)
        optimizer.zero_grad()
        # predictions = [length, batch, vocab size]
        predictions = model(src, trg, teacher_forcing_ratio)
        vocab_size = predictions.shape[-1]
        # Position 0 is the <sos> slot, which the model never predicts, so it
        # is dropped from both sides before flattening to
        # [(length-1)*batch, vocab size] and [(length-1)*batch].
        flat_predictions = predictions[1:].view(-1, vocab_size)
        flat_targets = trg[1:].view(-1)
        loss = criterion(flat_predictions, flat_targets)
        loss.backward()
        # Cap the gradient norm before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(data_loader)

evaluate用函数

In [None]:
def evaluate_fn(model, data_loader, criterion, device):
    """Compute the mean per-batch loss of ``model`` over ``data_loader``.

    The model is put in eval mode, gradients are disabled, and teacher
    forcing is switched off (ratio 0).

    Args:
        model: callable as ``model(src, trg, teacher_forcing_ratio)``.
        data_loader: iterable of batches with ``"en_ids"`` and ``"zh_ids"``
            tensors; must support ``len``.
        criterion: loss taking flattened predictions and targets.
        device: device the batch tensors are moved to.

    Returns:
        The mean of the per-batch losses.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            src, trg = batch["en_ids"].to(device), batch["zh_ids"].to(device)
            # Ratio 0: the decoder always consumes its own predictions.
            predictions = model(src, trg, 0)
            vocab_size = predictions.shape[-1]
            # Drop the <sos> position from both sides, then flatten.
            flat_predictions = predictions[1:].view(-1, vocab_size)
            flat_targets = trg[1:].view(-1)
            running_loss += criterion(flat_predictions, flat_targets).item()
    return running_loss/len(data_loader)

开始训练

In [None]:
import os

import tqdm

n_epochs = 10
clip = 1.0  # maximum gradient norm
teacher_forcing_ratio = 0.5

best_valid_loss = float("inf")

# torch.save does not create missing directories. Create the checkpoint
# folder up front so the first save cannot fail after a full training epoch.
os.makedirs("./model", exist_ok=True)

for epoch in tqdm.tqdm(range(n_epochs)):
    train_loss = train_fn(
        model,
        train_data_loader,
        optimizer,
        criterion,
        clip,
        teacher_forcing_ratio,
        device
    )

    valid_loss = evaluate_fn(
        model,
        valid_data_loader,
        criterion,
        device
    )
    # Keep only the weights with the lowest validation loss seen so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "./model/s2s-enzh-model.pt")
    # Perplexity is reported as exp(loss).
    print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss):7.3f}")

# 评估模型

首先测试模型的loss

In [None]:
# Restore the best checkpoint. map_location remaps the stored tensors onto the
# device in use, so a checkpoint saved on a GPU also loads on a CPU-only machine.
model.load_state_dict(torch.load("./model/s2s-enzh-model.pt", map_location=device))
test_loss = evaluate_fn(model, test_data_loader, criterion, device)
print(f"\tTest Loss: {test_loss:7.3f} | Test PPL: {np.exp(test_loss):7.3f}")

接下来评估模型的BLEU

首先是翻译用函数

In [None]:
def translate_sentence(sentence, model, en_nlp, zh_nlp, en_vocab, zh_vocab,
                       lower, sos_token, eos_token, device=None, max_output_length=25,):
    """Greedily translate one English sentence into Chinese tokens.

    Args:
        sentence: English text, or an iterable of already-split tokens.
        model: object exposing ``encoder`` and ``decoder`` as used below.
        en_nlp: object whose ``tokenizer`` splits English text into tokens
            exposing ``.text``; only used when ``sentence`` is a string.
        zh_nlp: unused; kept so existing call sites keep working.
        en_vocab: source vocabulary (``lookup_indices``).
        zh_vocab: target vocabulary (``lookup_indices``, ``lookup_tokens``,
            item access).
        lower: when true, lowercase the source tokens, matching the
            lowercasing applied when the training data was tokenized.
        sos_token: start-of-sentence marker.
        eos_token: end-of-sentence marker.
        device: device for the input tensors. When omitted, the device of
            the model's first parameter is used (CPU if it has none).
        max_output_length: maximum number of decoding steps.

    Returns:
        The generated tokens, starting with ``sos_token`` and ending with
        ``eos_token`` unless decoding was cut off at ``max_output_length``.
    """
    if device is None:
        # Callers in this file omit `device`; follow the model instead of failing.
        first_param = next(iter(model.parameters()), None)
        device = first_param.device if first_param is not None else torch.device("cpu")
    model.eval()
    with torch.no_grad():
        if isinstance(sentence, str):
            tokens = [token.text for token in en_nlp.tokenizer(sentence)]
        else:
            tokens = list(sentence)
        if lower:
            # Without this, capitalised words miss the lowercased vocabulary
            # and fall back to <unk>.
            tokens = [token.lower() for token in tokens]
        tokens = [sos_token] + tokens + [eos_token]
        ids = en_vocab.lookup_indices(tokens)
        # Shape [length, 1]: a batch holding a single sentence.
        tensor = torch.LongTensor(ids).unsqueeze(-1).to(device)
        hidden, cell = model.encoder(tensor)
        eos_index = zh_vocab[eos_token]
        inputs = zh_vocab.lookup_indices([sos_token])
        for _ in range(max_output_length):
            # Feed back only the most recent token; the state carries the rest.
            inputs_tensor = torch.LongTensor([inputs[-1]]).to(device)
            output, hidden, cell = model.decoder(inputs_tensor, hidden, cell)
            predicted_token = output.argmax(-1).item()
            inputs.append(predicted_token)
            if predicted_token == eos_index:
                break
        tokens = zh_vocab.lookup_tokens(inputs)
    return tokens

测试翻译函数

In [None]:
# Use the first test pair as a quick check of the translation function.
sentence = test_data[0]['translation']['en']
expected_translation = test_data[0]['translation']['zh']

print(sentence)
print(expected_translation)

# `device` is passed explicitly: translate_sentence places its input tensors
# on it, and it has to match the device the model lives on.
translation = translate_sentence(sentence, model, en_nlp, zh_nlp, en_vocab, zh_vocab,
                                 lower, sos_token, eos_token, device)
print(translation)

接下来将test_data进行翻译

In [None]:
# Translate every English sentence of the test split; `device` is passed so
# the input tensors land on the same device as the model.
translations = [
    translate_sentence(
        example['translation']["en"], model, en_nlp, zh_nlp, en_vocab, zh_vocab,
        lower, sos_token, eos_token, device,
    )
    for example in tqdm.tqdm(test_data)
]

# Drop the leading <sos>. Drop the last token only when it really is <eos>:
# a translation cut off at the length limit ends with a real token that must
# be kept. Chinese tokens are joined without separators.
predictions = [
    "".join(translation[1:-1] if translation[-1] == eos_token else translation[1:])
    for translation in translations
]
references = [example['translation']["zh"] for example in test_data]

查看预测和参考内容

In [None]:
print(predictions[0])
print(references[0])

将结果tokenize

In [None]:
def get_tokenizer_fn(nlp, lower):
    """Build a function that splits a string into token strings.

    Args:
        nlp: object whose ``tokenizer`` splits text into tokens exposing
            ``.text``.
        lower: when true, the returned function lowercases every token.

    Returns:
        A function mapping a string to its list of token strings.
    """

    def tokenizer_fn(s):
        texts = (token.text for token in nlp.tokenizer(s))
        if lower:
            return [text.lower() for text in texts]
        return list(texts)

    return tokenizer_fn

# Tokenizer used for scoring: the Chinese spaCy tokenizer, lowercasing when `lower` is set.
tokenizer_fn = get_tokenizer_fn(zh_nlp, lower)

测试函数

In [None]:
# Tokenize the first prediction and its reference to check the tokenizer output.
print(tokenizer_fn(predictions[0]))
print(tokenizer_fn(references[0]))

计算bleu

In [None]:
import evaluate

# Load the BLEU metric and score the predictions against the references,
# passing tokenizer_fn as the metric's tokenizer.
bleu = evaluate.load("bleu")
results = bleu.compute(
    predictions=predictions, references=references, tokenizer=tokenizer_fn
)

查看结果BLEU

In [None]:
# Show what bleu.compute returned.
print(results)