In [2]:
import torch.nn as nn
import torch
import torch.nn.functional as F
from mpmath.libmp.libelefun import atan_newton
from torch.nn.functional import embedding

In [3]:
# Maximum number of decoding steps the decoders below will run.
MAX_LENGTH = 10

# Special-token ids. SOS_token seeds the decoder's first input; PAD_token and
# EOS_token are not referenced in the cells visible here (presumably the
# padding and end-of-sentence markers -- confirm against the data pipeline).
PAD_token, SOS_token, EOS_token = 0, 1, 2

In [4]:
# RNN encoder
class EncoderRNN(nn.Module):
    """Embed a batch of token ids and run them through a batch-first RNN.

    Args:
        input_size: Number of rows in the embedding table.
        hidden_size: Width of both the token embeddings and the RNN hidden state.
        dropout_p: Dropout probability applied to the embedded tokens.
    """

    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super().__init__()
        # Width of the hidden state, kept as a plain attribute.
        self.hidden_size = hidden_size
        # Creation order (embedding, then RNN) is kept as-is so that seeded
        # parameter initialisation is unchanged.
        self.embedding = nn.Embedding(num_embeddings=input_size,
                                      embedding_dim=hidden_size)
        self.rnn = nn.RNN(input_size=hidden_size,
                          hidden_size=hidden_size,
                          batch_first=True)
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, X):
        """Encode a batch of token ids.

        Args:
            X: Integer tensor of token ids; the demo below passes shape
                (batch, seq_len).

        Returns:
            The ``(output, hidden)`` pair produced by ``nn.RNN``: per-step
            outputs and the final hidden state.
        """
        embedded = self.dropout(self.embedding(X))
        return self.rnn(embedded)

In [5]:
# Smoke test for the encoder: encode one sequence holding the token ids 0..9.
# `encoder` and `input_vector` are reused by the decoder demo cells further down.
encoder = EncoderRNN(input_size=10, hidden_size=5)
input_vector = torch.arange(10).unsqueeze(0) # shape (1, 10)
output, hidden = encoder(input_vector) # output: (1, 10, 5), hidden: (1, 1, 5)
# Print the input shape, the per-step output shape and the final hidden-state shape.
print('输入向量的维度：',input_vector.size())
print('输出向量的维度：',output.size())
print('最终隐藏态的维度：',hidden.size())

输入向量的维度： torch.Size([1, 10])
输出向量的维度： torch.Size([1, 10, 5])
最终隐藏态的维度： torch.Size([1, 1, 5])


In [8]:
class DecoderRNN(nn.Module):
    """Plain (attention-free) RNN decoder for a seq2seq model.

    The encoder's final hidden state is used as the decoder's initial hidden
    state. Decoding starts from ``SOS_token`` and proceeds one step at a time.

    Args:
        hidden_size: Width of the token embeddings and of the RNN hidden state.
        output_size: Target vocabulary size (embedding rows and logits per step).
    """

    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """Decode up to ``MAX_LENGTH`` steps.

        Args:
            encoder_outputs: Encoder per-step outputs, (batch, src_len, hidden).
                Only its batch size and device are used here.
            encoder_hidden: Encoder final hidden state, (num_layers, batch,
                hidden); becomes the decoder's initial hidden state.
            target_tensor: Optional (batch, tgt_len) tensor of target token
                ids. When given, teacher forcing is used: step ``i + 1`` is fed
                ``target_tensor[:, i]``. When ``None``, each step is fed the
                arg-max token of the previous step (greedy decoding).

        Returns:
            A tuple ``(log_probs, hidden, None)`` where ``log_probs`` has
            shape (batch, steps, output_size), ``hidden`` is the hidden state
            after the last step, and the trailing ``None`` keeps the return
            arity equal to that of an attention decoder.
        """
        batch_size = encoder_outputs.size(0)
        # Create the start-of-sentence input on the same device as the encoder
        # tensors; a CPU-only tensor breaks as soon as the model is moved to
        # an accelerator.
        decoder_input = torch.full(
            (batch_size, 1), SOS_token,
            dtype=torch.long, device=encoder_outputs.device)
        decoder_hidden = encoder_hidden  # (num_layers, batch, hidden)

        # With teacher forcing, never index past the end of a target that is
        # shorter than MAX_LENGTH.
        num_steps = MAX_LENGTH
        if target_tensor is not None:
            num_steps = min(MAX_LENGTH, target_tensor.size(1))

        decoder_outputs = []
        for i in range(num_steps):
            decoder_output, decoder_hidden = self.forward_step(
                decoder_input, decoder_hidden)
            decoder_outputs.append(decoder_output)
            if target_tensor is not None:
                # Teacher forcing: the ground-truth token is the next input.
                decoder_input = target_tensor[:, i].unsqueeze(1)
            else:
                # Greedy decoding: feed back the most likely token. detach()
                # keeps the fed-back indices out of the autograd graph.
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        return decoder_outputs, decoder_hidden, None

    def forward_step(self, x, hidden):
        """Run a single decoding step.

        Args:
            x: (batch, 1) tensor of input token ids.
            hidden: (num_layers, batch, hidden) previous hidden state.

        Returns:
            ``(logits, hidden)`` with logits of shape (batch, 1, output_size)
            and the updated hidden state.
        """
        x = self.embedding(x)
        x = F.relu(x)
        x, hidden = self.rnn(x, hidden)  # x: (batch, 1, hidden)
        output = self.out(x)  # (batch, 1, output_size)
        return output, hidden

In [9]:
# Smoke test for the plain decoder, reusing `encoder` and `input_vector` from
# the encoder demo cell above.
decoder = DecoderRNN(hidden_size=5, output_size=10)
target_vector = torch.tensor([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]) # shape (1, 10); stand-in target sequence
encoder_outputs, encoder_hidden = encoder(input_vector)
print('encoder_outputs shape:',encoder_outputs.shape)
print('encoder_hidden shape:',encoder_hidden.shape)
# Passing target_vector selects the teacher-forcing branch of DecoderRNN.forward.
output, hidden, _ = decoder(encoder_outputs, encoder_hidden, target_vector)

encoder_outputs shape: torch.Size([1, 10, 5])
encoder_hidden shape: torch.Size([1, 1, 5])


In [10]:
# Notes (English summary of the string below):
# The cells above implement a standalone encoder -> decoder seq2seq model.
# The encoder's final hidden state becomes the decoder's initial hidden state,
# and the decoder then runs one RNN step at a time, each step's output being
# the prediction for that position. With teacher forcing the target token is
# fed as the next step's input; otherwise the step's own prediction is fed
# back. Unrolling step by step yields the full predicted sequence.
'''
前面部分代码实现了单独的Encoder2Decoder的Seq2Seq结构
主要思想是：Encoder的输出隐藏态作为Decoder的初始隐藏态
然后进行依次RNN单元的计算，每个RNNCell的输出就是该单元的预测
如果是进行强制学习的话，就是将target输出作为下一时刻输入
否则就是将当前时刻输出，作为下一时刻RNNCell的输入
这样逐Cell进行计算，计算出最终的预测序列
'''
# `output` and `hidden` here are the decoder results from the previous cell.
print("输出向量的维度:", output.size())
print("最终隐藏状态的维度:", hidden.size())

输出向量的维度: torch.Size([1, 10, 10])
最终隐藏状态的维度: torch.Size([1, 1, 5])


In [12]:
# Attention mechanism
class Attention(nn.Module):
    """Additive attention: ``score = Va(tanh(Wa(query) + Ua(keys)))``.

    Args:
        hidden_size: Feature width of both the query and the keys.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)  # projects the query
        self.Ua = nn.Linear(hidden_size, hidden_size)  # projects the keys
        self.Va = nn.Linear(hidden_size, 1)            # collapses to one score per key

    def forward(self, query, keys):
        """Compute attention weights over ``keys`` and the resulting context.

        Args:
            query: 3-D tensor broadcastable against ``keys`` after projection;
                the attention decoder passes (batch, 1, hidden).
            keys: 3-D tensor, (batch, src_len, hidden) in the attention decoder.

        Returns:
            ``(context, weights)``: the weighted sum of ``keys`` and the
            softmax-normalised weights, laid out as (batch, 1, src_len).
        """
        projected_query = self.Wa(query)
        projected_keys = self.Ua(keys)
        energy = self.Va(torch.tanh(projected_query + projected_keys))
        # Drop the trailing singleton score axis, then add a "query" axis in
        # front of the key axis so the weights can be batch-multiplied.
        energy = energy.squeeze(2)
        energy = energy.unsqueeze(1)
        weights = F.softmax(energy, dim=-1)
        # torch.bmm only accepts 3-D operands.
        context = torch.bmm(weights, keys)
        return context, weights

In [22]:
class AttentionDecoderRNN(nn.Module):
    """RNN decoder that attends over the encoder outputs at every step.

    At each step the previous hidden state is used as the attention query,
    the resulting context vector is concatenated with the embedded input
    token, and the pair is fed to the RNN.

    Args:
        hidden_size: Width of the embeddings, the hidden state and the context.
        output_size: Target vocabulary size (embedding rows and logits per step).
        dropout_p: Dropout probability applied to the embedded input token.
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1):
        super().__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = Attention(hidden_size)
        # The RNN input is [embedded token ; context], hence 2 * hidden_size.
        self.rnn = nn.RNN(2 * hidden_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """Decode up to ``MAX_LENGTH`` steps with attention.

        Args:
            encoder_outputs: Encoder per-step outputs, (batch, src_len, hidden);
                used as the attention keys/values.
            encoder_hidden: Encoder final hidden state, (num_layers, batch,
                hidden); becomes the decoder's initial hidden state.
            target_tensor: Optional (batch, tgt_len) tensor of target token
                ids. When given, teacher forcing is used; when ``None`` the
                arg-max token of each step is fed to the next one.

        Returns:
            ``(log_probs, hidden, attentions)`` with shapes
            (batch, steps, output_size), (num_layers, batch, hidden) and
            (batch, steps, src_len) respectively.
        """
        batch_size = encoder_outputs.size(0)
        # Create the start-of-sentence input on the same device as the encoder
        # tensors; a CPU-only tensor breaks as soon as the model is moved to
        # an accelerator.
        decoder_input = torch.full(
            (batch_size, 1), SOS_token,
            dtype=torch.long, device=encoder_outputs.device)
        decoder_hidden = encoder_hidden  # (num_layers, batch, hidden)

        # With teacher forcing, never index past the end of a target that is
        # shorter than MAX_LENGTH.
        num_steps = MAX_LENGTH
        if target_tensor is not None:
            num_steps = min(MAX_LENGTH, target_tensor.size(1))

        decoder_outputs = []
        attentions = []
        for i in range(num_steps):
            decoder_output, decoder_hidden, attn_weights = self.forward_step(
                decoder_input, decoder_hidden, encoder_outputs)
            decoder_outputs.append(decoder_output)
            attentions.append(attn_weights)
            if target_tensor is not None:
                # Teacher forcing: the ground-truth token is the next input.
                decoder_input = target_tensor[:, i].unsqueeze(1)
            else:
                # Greedy decoding: feed back the most likely token. detach()
                # keeps the fed-back indices out of the autograd graph.
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        attentions = torch.cat(attentions, dim=1)
        return decoder_outputs, decoder_hidden, attentions

    def forward_step(self, input, hidden, encoder_outputs):
        """Run a single attention-augmented decoding step.

        Args:
            input: (batch, 1) tensor of input token ids.
            hidden: (num_layers, batch, hidden) previous hidden state.
            encoder_outputs: (batch, src_len, hidden) attention keys/values.

        Returns:
            ``(logits, hidden, attn_weights)`` with shapes
            (batch, 1, output_size), (num_layers, batch, hidden) and
            (batch, 1, src_len).
        """
        embedded = self.dropout(self.embedding(input))
        # (num_layers, batch, hidden) -> (batch, num_layers, hidden) so the
        # hidden state can act as a batch-first attention query.
        query = hidden.permute(1, 0, 2)
        context, attn_weights = self.attention(query, encoder_outputs)
        input_rnn = torch.cat((embedded, context), dim=2)  # (batch, 1, 2 * hidden)
        output, hidden = self.rnn(input_rnn, hidden)
        output = self.out(output)
        return output, hidden, attn_weights

In [23]:
# Smoke test for the attention decoder. Note that this rebinds `decoder`,
# which previously held the plain DecoderRNN instance.
decoder = AttentionDecoderRNN(hidden_size=5, output_size=10)
target_vector = torch.tensor([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]) # shape (1, 10)
encoder_outputs, encoder_hidden = encoder(input_vector)
print('encoder_outpus.shape:',encoder_outputs.shape,'encoder_hidden.shape:',encoder_hidden.shape)
# Passing target_vector selects the teacher-forcing branch of forward().
output, hidden, attentions = decoder(
    encoder_outputs, encoder_hidden, target_vector)
# Print the output shape, the final hidden-state shape and the attention-weight shape.
print("输出向量的维度:", output.size())
print('hidden size:',hidden.size())
print("注意力权重的维度:", attentions.size())

encoder_outpus.shape: torch.Size([1, 10, 5]) encoder_hidden.shape: torch.Size([1, 1, 5])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
attn_weights.shape: torch.Size([1, 1, 10])
输出向量的维度: torch.Size([1, 10, 10])
hidden size: torch.Size([1, 1, 5])
注意力权重的维度: torch.Size([1, 10, 10])
