# B站：神奇的布欧
# 微信：l1243278923

In [1]:
import torch
import torch.nn as nn
import torch.utils.data as Data
import random
import pandas as pd
from torchtext import data
from torchtext.vocab import build_vocab_from_iterator

# 0. 判断GPU是否可用

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 1. 数据预处理

# a. 构建数据转换方式

In [3]:
# Lookup table: ASCII digit character -> Chinese numeral character.
numStrDict = dict(zip("0123456789", "零一二三四五六七八九"))


def numToStr(number):
    """Split a number into its digit characters and their Chinese numerals.

    Args:
        number: Value whose ``str()`` form consists only of the digits 0-9.

    Returns:
        A ``(digits, numerals)`` tuple of two equally long lists: the digit
        characters of ``number`` and the matching ``numStrDict`` entries.

    Raises:
        KeyError: If ``str(number)`` contains a character that is not a digit.
    """
    digits = list(str(number))
    numerals = list(map(numStrDict.__getitem__, digits))
    return digits, numerals

# b. 构建数据集

In [4]:
# Build 51,000 random (digits, Chinese numerals) pairs. Every sequence is
# stored as a space-separated string so it can be tokenized on whitespace.
# A dedicated, seeded generator makes the dataset identical on every
# Restart & Run All without touching the global `random` state.
datasetRng = random.Random(42)
numList = list()
strList = list()
for _ in range(51000):
    # randint is inclusive on both ends: 8- to 12-digit integers.
    tempDigits, tempStr = numToStr(datasetRng.randint(10000000, 999999999999))
    numList.append(" ".join(tempDigits))
    strList.append(" ".join(tempStr))
df = pd.DataFrame({
    'number': numList,
    'string': strList
})

In [5]:
df

Unnamed: 0,number,string
0,5 4 6 5 9 5 5 4 2 2 1 0,五 四 六 五 九 五 五 四 二 二 一 零
1,8 8 8 1 0 4 5 0 3 1 4 8,八 八 八 一 零 四 五 零 三 一 四 八
2,5 1 1 0 5 4 6 0 4 7 3 7,五 一 一 零 五 四 六 零 四 七 三 七
3,2 5 5 3 0 3 8 2 3 7 6 6,二 五 五 三 零 三 八 二 三 七 六 六
4,2 7 0 5 9 1 2 3 9 1 9 8,二 七 零 五 九 一 二 三 九 一 九 八
...,...,...
50995,3 8 4 2 4 4 0 7 0 4 4 0,三 八 四 二 四 四 零 七 零 四 四 零
50996,4 3 8 3 5 3 1 8 3 8 6 7,四 三 八 三 五 三 一 八 三 八 六 七
50997,6 5 3 7 3 3 9 2 3 8 7 7,六 五 三 七 三 三 九 二 三 八 七 七
50998,6 6 5 2 8 2 6 2 9 9 2 2,六 六 五 二 八 二 六 二 九 九 二 二


# c. 构建词表

In [6]:
def yield_tokens(data_iter):
    """Lazily tokenize every text in ``data_iter``.

    Each item is passed through the "basic_english" tokenizer obtained from
    ``data.get_tokenizer`` and the resulting token list is yielded, one list
    per input item, in input order.
    """
    tokenize = data.get_tokenizer("basic_english")
    yield from map(tokenize, data_iter)

In [7]:
# Source-side vocabulary (digit characters). Tokens that are not in the
# vocabulary fall back to the <PAD> index.
numVocab = build_vocab_from_iterator(
    yield_tokens(df["number"]),
    min_freq=1,
    specials=['<PAD>', '<SOS>', '<EOS>'],
)
numVocab.set_default_index(numVocab["<PAD>"])

# Target-side vocabulary (Chinese numerals), built the same way.
strVocab = build_vocab_from_iterator(
    yield_tokens(df["string"]),
    min_freq=1,
    specials=['<PAD>', '<SOS>', '<EOS>'],
)
strVocab.set_default_index(strVocab["<PAD>"])

In [8]:
# Vocabulary sizes (special tokens included); used later to size the
# embedding layers and the output projection.
numVocab_size = len(numVocab)
strVocab_size = len(strVocab)
print(numVocab_size, strVocab_size, sep="\n")

13
13


In [9]:
print(numVocab.get_stoi())
print(strVocab.get_stoi())

{'<EOS>': 2, '<PAD>': 0, '1': 7, '<SOS>': 1, '8': 3, '3': 4, '5': 5, '6': 6, '7': 8, '2': 9, '4': 10, '9': 11, '0': 12}
{'六': 6, '<EOS>': 2, '<PAD>': 0, '八': 3, '<SOS>': 1, '三': 4, '五': 5, '一': 7, '七': 8, '二': 9, '四': 10, '九': 11, '零': 12}


In [10]:
def word2index(vocab, word):
    """Return the index that ``vocab`` maps ``word`` to (``vocab[word]``)."""
    index = vocab[word]
    return index


def index2word(vocab, index):
    """Return the token stored at ``index``, via ``vocab.lookup_token``."""
    token = vocab.lookup_token(index)
    return token


def wordList2indexList(vocab, wordList):
    """Return the indices of every token in ``wordList``, via ``vocab.lookup_indices``."""
    indices = vocab.lookup_indices(wordList)
    return indices


def indexList2wordList(vocab, indexList):
    """Return the tokens for every index in ``indexList``, via ``vocab.lookup_tokens``."""
    tokens = vocab.lookup_tokens(indexList)
    return tokens

# d. 获取最大长度

In [11]:
# Longest sequence, counted in space-separated tokens, in each column.
numMaxLength = max(map(len, (text.split(" ") for text in df["number"])))
strMaxLength = max(map(len, (text.split(" ") for text in df["string"])))
# Two extra slots on top of the longest sequence (room for <SOS> and <EOS>).
maxLength = 2 + max(numMaxLength, strMaxLength)

In [12]:
maxLength

14

# e. 截长补短

In [13]:
def sentenceDeal(vocab, wordList, maxLength):
    """Wrap tokens in <SOS>/<EOS>, fit them to ``maxLength`` and index them.

    The token list is truncated so that, together with the two boundary
    tokens, it never exceeds ``maxLength``; shorter sequences are
    right-padded with <PAD>. The caller's ``wordList`` is left untouched.

    Args:
        vocab: Object exposing ``lookup_indices(list_of_tokens)``.
        wordList: List of tokens, without boundary tokens.
        maxLength: Target length of the result, boundary tokens included.

    Returns:
        List of indices of length ``maxLength`` (length 2 when
        ``maxLength`` is smaller than 2, because <SOS> and <EOS> are
        always kept).
    """
    # Truncate first so <EOS> always survives, and build a new list instead
    # of inserting into the caller's list.
    bodyBudget = max(maxLength - 2, 0)
    tokens = ["<SOS>", *wordList[:bodyBudget], "<EOS>"]
    # A negative repeat count yields an empty list, so no guard is needed.
    tokens.extend(["<PAD>"] * (maxLength - len(tokens)))
    # Same lookup that wordList2indexList performs.
    return vocab.lookup_indices(tokens)

In [14]:
# Tokenize every row, wrap it in <SOS>/<EOS>, pad it to `maxLength` and
# convert it to vocabulary indices, once per column.
tokenizer = data.get_tokenizer("basic_english")
allNumberList = [
    sentenceDeal(numVocab, tokenizer(text), maxLength) for text in df["number"]
]
allStringList = [
    sentenceDeal(strVocab, tokenizer(text), maxLength) for text in df["string"]
]

In [15]:
# Attach the encoded sequences to the frame as two new columns.
# Note: this mutates `df` in place rather than creating a new frame.
df["numberCode"] = allNumberList
df["stringCode"] = allStringList

In [16]:
df

Unnamed: 0,number,string,numberCode,stringCode
0,5 4 6 5 9 5 5 4 2 2 1 0,五 四 六 五 九 五 五 四 二 二 一 零,"[1, 5, 10, 6, 5, 11, 5, 5, 10, 9, 9, 7, 12, 2]","[1, 5, 10, 6, 5, 11, 5, 5, 10, 9, 9, 7, 12, 2]"
1,8 8 8 1 0 4 5 0 3 1 4 8,八 八 八 一 零 四 五 零 三 一 四 八,"[1, 3, 3, 3, 7, 12, 10, 5, 12, 4, 7, 10, 3, 2]","[1, 3, 3, 3, 7, 12, 10, 5, 12, 4, 7, 10, 3, 2]"
2,5 1 1 0 5 4 6 0 4 7 3 7,五 一 一 零 五 四 六 零 四 七 三 七,"[1, 5, 7, 7, 12, 5, 10, 6, 12, 10, 8, 4, 8, 2]","[1, 5, 7, 7, 12, 5, 10, 6, 12, 10, 8, 4, 8, 2]"
3,2 5 5 3 0 3 8 2 3 7 6 6,二 五 五 三 零 三 八 二 三 七 六 六,"[1, 9, 5, 5, 4, 12, 4, 3, 9, 4, 8, 6, 6, 2]","[1, 9, 5, 5, 4, 12, 4, 3, 9, 4, 8, 6, 6, 2]"
4,2 7 0 5 9 1 2 3 9 1 9 8,二 七 零 五 九 一 二 三 九 一 九 八,"[1, 9, 8, 12, 5, 11, 7, 9, 4, 11, 7, 11, 3, 2]","[1, 9, 8, 12, 5, 11, 7, 9, 4, 11, 7, 11, 3, 2]"
...,...,...,...,...
50995,3 8 4 2 4 4 0 7 0 4 4 0,三 八 四 二 四 四 零 七 零 四 四 零,"[1, 4, 3, 10, 9, 10, 10, 12, 8, 12, 10, 10, 12...","[1, 4, 3, 10, 9, 10, 10, 12, 8, 12, 10, 10, 12..."
50996,4 3 8 3 5 3 1 8 3 8 6 7,四 三 八 三 五 三 一 八 三 八 六 七,"[1, 10, 4, 3, 4, 5, 4, 7, 3, 4, 3, 6, 8, 2]","[1, 10, 4, 3, 4, 5, 4, 7, 3, 4, 3, 6, 8, 2]"
50997,6 5 3 7 3 3 9 2 3 8 7 7,六 五 三 七 三 三 九 二 三 八 七 七,"[1, 6, 5, 4, 8, 4, 4, 11, 9, 4, 3, 8, 8, 2]","[1, 6, 5, 4, 8, 4, 4, 11, 9, 4, 3, 8, 8, 2]"
50998,6 6 5 2 8 2 6 2 9 9 2 2,六 六 五 二 八 二 六 二 九 九 二 二,"[1, 6, 6, 5, 9, 3, 9, 6, 9, 11, 11, 9, 9, 2]","[1, 6, 6, 5, 9, 3, 9, 6, 9, 11, 11, 9, 9, 2]"


In [17]:
# Training set
class trainDataset(Data.Dataset):
    """Training split: the first ``split`` rows of the encoded frame.

    The encoded columns are converted to Python lists once, at construction
    time, so ``__getitem__`` is a constant-time lookup instead of slicing
    the frame and converting a whole column on every access. Later changes
    to the frame are therefore not reflected by an existing instance.

    Args:
        frame: DataFrame with ``numberCode`` and ``stringCode`` columns.
            Defaults to the notebook-level ``df``.
        split: Number of leading rows that form the training set.
    """
    def __init__(self, frame=None, split=50000):
        source = df if frame is None else frame
        rows = source[:split]
        self._numberCodes = rows.numberCode.tolist()
        self._stringCodes = rows.stringCode.tolist()

    def __len__(self):
        return len(self._numberCodes)

    def __getitem__(self, i):
        # Returns an (input, target) pair of LongTensors for sample i.
        x = torch.LongTensor(self._numberCodes[i])
        y = torch.LongTensor(self._stringCodes[i])
        return x, y
# Test set
class testDataset(Data.Dataset):
    """Test split: every row of the encoded frame from ``split`` onward.

    The encoded columns are converted to Python lists once, at construction
    time, so ``__getitem__`` is a constant-time lookup instead of slicing
    the frame and converting a whole column on every access. Later changes
    to the frame are therefore not reflected by an existing instance.

    Args:
        frame: DataFrame with ``numberCode`` and ``stringCode`` columns.
            Defaults to the notebook-level ``df``.
        split: Positional row offset at which the test set starts.
    """
    def __init__(self, frame=None, split=50000):
        source = df if frame is None else frame
        rows = source[split:]
        self._numberCodes = rows.numberCode.tolist()
        self._stringCodes = rows.stringCode.tolist()

    def __len__(self):
        return len(self._numberCodes)

    def __getitem__(self, i):
        # Returns an (input, target) pair of LongTensors for sample i.
        x = torch.LongTensor(self._numberCodes[i])
        y = torch.LongTensor(self._stringCodes[i])
        return x, y

In [18]:
# Instantiate both splits and report how many samples each one holds.
traindataset, testdataset = trainDataset(), testDataset()
print(len(traindataset), len(testdataset), sep="\n")

50000
1000


# f. 数据集加载器

In [19]:
# Training batches: reshuffled every epoch; the final incomplete batch is
# dropped so every batch has exactly 64 samples.
trainLoader = Data.DataLoader(dataset=traindataset,
                              batch_size=64,
                              shuffle=True,
                              drop_last=True)
# Evaluation batches: fixed order and no dropped samples, so every test
# example is visited exactly once per pass.
testLoader = Data.DataLoader(dataset=testdataset,
                             batch_size=64,
                             shuffle=False,
                             drop_last=False)
len(trainLoader), next(iter(trainLoader))

(781,
 [tensor([[ 1,  5, 10,  5,  9,  7,  7,  7,  3, 12, 10,  7,  2,  0],
          [ 1,  6,  5,  3, 10,  3,  3, 11,  9,  7,  7,  3,  6,  2],
          [ 1, 10,  6, 12,  7,  3,  9,  8,  6, 12,  5,  3,  4,  2],
          [ 1,  6,  8,  6, 12,  8,  4, 12,  4,  6,  9,  9,  2,  0],
          [ 1,  7, 12, 10,  7,  5,  9,  5, 12,  9, 10, 12, 10,  2],
          [ 1,  6,  4,  5,  5,  6,  3,  8, 10, 12,  4, 11, 11,  2],
          [ 1,  6,  5,  5,  8, 12,  9,  5, 10,  5, 11,  8, 10,  2],
          [ 1,  8,  4,  6,  4, 12,  6,  6,  7,  8, 10,  8,  4,  2],
          [ 1,  3,  5,  5,  5,  8,  9,  7,  6,  3, 10,  5,  7,  2],
          [ 1,  8,  7, 10,  8,  9,  6,  9,  4, 12,  6, 10,  9,  2],
          [ 1,  3, 12,  9,  5, 11, 11,  3,  5,  4, 10,  7, 12,  2],
          [ 1,  9, 12,  9,  5,  9,  3,  6,  9, 10,  8,  3,  9,  2],
          [ 1,  7,  8,  9, 12, 11,  7, 12,  3,  4,  5,  5,  8,  2],
          [ 1, 11, 12,  8, 11, 10, 12, 12,  8,  5,  5, 11,  7,  2],
          [ 1,  5, 10,  4, 10, 12,  4, 12,

# 2. 模型构建

# a. Encoder

In [20]:
class Encoder(nn.Module):
    """Embedding layer followed by a batch-first GRU.

    ``forward`` returns only the GRU's final hidden state; the per-step
    outputs are discarded.
    """

    def __init__(self, encoder_vocab_size, encoder_embedding_dim, encoder_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(encoder_vocab_size, encoder_embedding_dim)
        self.gru = nn.GRU(encoder_embedding_dim, encoder_hidden_size, batch_first=True)

    def forward(self, x):
        """Embed the index tensor ``x`` and return the GRU's final hidden state."""
        embedded = self.embedding(x)
        final_hidden = self.gru(embedded)[1]
        return final_hidden

# b. Decoder

In [21]:
class Decoder(nn.Module):
    """Embedding layer followed by a batch-first GRU that starts from a given hidden state.

    ``forward`` returns both the per-step GRU outputs and the final hidden
    state, so the caller can decode either a whole sequence at once or one
    token at a time.
    """

    def __init__(self, decoder_vocab_size, decoder_embedding_dim, decoder_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(decoder_vocab_size, decoder_embedding_dim)
        self.gru = nn.GRU(decoder_embedding_dim, decoder_hidden_size, batch_first=True)

    def forward(self, x, hidden):
        """Embed ``x`` and run the GRU from ``hidden``; return ``(outputs, final_hidden)``."""
        embedded = self.embedding(x)
        return self.gru(embedded, hidden)

# c. Seq2Seq

In [22]:
class Seq2Seq(nn.Module):
    """Encoder-decoder model with a linear projection onto the target vocabulary.

    The encoder's final hidden state is used as the decoder's initial hidden
    state, so the two hidden sizes have to be compatible.
    """

    def __init__(self, encoder_vocab_size, encoder_embedding_dim, encoder_hidden_size,
                 decoder_vocab_size, decoder_embedding_dim, decoder_hidden_size):
        super().__init__()
        self.encoder = Encoder(encoder_vocab_size, encoder_embedding_dim, encoder_hidden_size)
        self.decoder = Decoder(decoder_vocab_size, decoder_embedding_dim, decoder_hidden_size)
        self.fc = nn.Linear(decoder_hidden_size, decoder_vocab_size)

    def forward(self, encoder_x, decoder_x):
        """Return vocabulary logits for every decoder step.

        The decoder is fed ``decoder_x`` without its last position (teacher
        forcing), so the output has one step fewer than ``decoder_x``.
        """
        context = self.encoder(encoder_x)
        teacher_input = decoder_x[:, :-1]
        decoder_states = self.decoder(teacher_input, context)[0]
        return self.fc(decoder_states)

In [23]:
# Model hyper-parameters. The decoder is initialised with the encoder's
# hidden state, so both hidden sizes are set to the same value.
encoder_embedding_dim, encoder_hidden_size = 32, 512
decoder_embedding_dim, decoder_hidden_size = 32, 512
model = Seq2Seq(
    encoder_vocab_size=numVocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    encoder_hidden_size=encoder_hidden_size,
    decoder_vocab_size=strVocab_size,
    decoder_embedding_dim=decoder_embedding_dim,
    decoder_hidden_size=decoder_hidden_size,
).to(device)

In [24]:
model

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(13, 32)
    (gru): GRU(32, 512, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(13, 32)
    (gru): GRU(32, 512, batch_first=True)
  )
  (fc): Linear(in_features=512, out_features=13, bias=True)
)

# 3. 训练

In [25]:
def train():
    """Train the notebook-level ``model`` for one epoch and save it to disk.

    Uses Adam (lr=1e-3) with cross-entropy between the model's logits and
    the target sequence shifted left by one position. <PAD> positions are
    not masked, so they count towards both the loss and the reported
    accuracy. Progress is printed every 200 steps, and the whole module
    object is pickled to ``./seq2seq数字翻译.model`` at the end.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fun = nn.CrossEntropyLoss()
    model.train()
    for epoch in range(1):
        for step, (x, y) in enumerate(trainLoader):
            # Move the batch to the training device once; loss and accuracy
            # are computed there instead of copying the logits to the CPU.
            x, y = x.to(device), y.to(device)
            out = model(x, y)
            # The model is fed y[:, :-1]; its targets are the next tokens.
            target = y[:, 1:]
            loss = loss_fun(out.reshape(-1, out.shape[-1]), target.reshape(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if step % 200 == 0:
                # Token-level accuracy over every position of the batch.
                acc = (out.argmax(dim=2) == target).sum().item() / target.numel()
                print(f"epoch：{epoch + 1}，step：{step + 1}， 损失：{loss.item()}，准确率：{acc}")
    torch.save(model, "./seq2seq数字翻译.model")

In [26]:
train()

epoch：1，step：1， 损失：2.5715537071228027，准确率：0.0673076923076923
epoch：1，step：201， 损失：0.9391135573387146，准确率：0.6514423076923077
epoch：1，step：401， 损失：0.31643038988113403，准确率：0.8978365384615384
epoch：1，step：601， 损失：0.11560644209384918，准确率：0.9711538461538461


# 4. 测试

In [40]:
@torch.no_grad()
def test(dataDf, index, strVocab):
    """Greedily decode one row of ``dataDf`` with the saved model.

    Args:
        dataDf: DataFrame with a ``numberCode`` column of encoded inputs.
        index: Positional row index (used with ``iloc``) of the sample.
        strVocab: Target vocabulary used to map indices back to tokens.

    Returns:
        The predicted tokens joined without a separator. The string ends
        with ``<EOS>`` when the model emitted it before the length cap of
        ``maxLength - 2`` tokens was reached.
    """
    # map_location lets a checkpoint saved from a CUDA run load on a
    # CPU-only machine; moving the model after loading would fail there.
    # NOTE(review): torch.load unpickles a whole module object, so only load
    # trusted files. On PyTorch >= 2.6 the default weights_only=True rejects
    # such files; pass weights_only=False there.
    model = torch.load("./seq2seq数字翻译.model", map_location="cpu")
    model.eval()
    result = list()
    # Shape (1, seq_len): a batch containing the single encoded input.
    x = torch.tensor(dataDf.iloc[index]["numberCode"], dtype=torch.long).unsqueeze(0)
    # Shape (1, 1): decoding starts from <SOS>.
    decoder_input = torch.tensor([[word2index(strVocab, "<SOS>")]], dtype=torch.long)
    decoder_hidden = model.encoder(x)
    while True:
        decoder_output, decoder_hidden = model.decoder(decoder_input, decoder_hidden)
        out = model.fc(decoder_output)
        # Greedy choice, converted to a plain int before the vocab lookup.
        next_index = int(torch.argmax(out[0][0]))
        word = index2word(strVocab, next_index)
        result.append(word)
        if word == '<EOS>' or len(result) >= maxLength - 2:
            break
        # Feed the prediction back in as the next decoder input.
        decoder_input = torch.tensor([[next_index]], dtype=torch.long)
    return "".join(result)

In [45]:
# Decode one held-out sample and show it next to its ground truth.
testIndex = 50096
rawPrediction = test(df, testIndex, strVocab)
# Strip the boundary markers before display.
preOut = rawPrediction.replace("<SOS>", "").replace("<EOS>", "")
print(f"Number：{df.number[testIndex]}\nString：{df.string[testIndex]}\n预测：{preOut}")

Number：2 3 7 7 8 9 6 6 1 3 3
String：二 三 七 七 八 九 六 六 一 三 三
预测：二三七七八九六六一三三
