In [1]:
# 初始化
%load_ext autoreload
%autoreload 2
import sys
import os
# Raw string: the backslashes in this Windows path must stay literal. In a
# normal string '\G' and '\Q' are invalid escape sequences, which Python
# reports with a warning.
PROJECT_ROOT = r'E:\GitHub\QA-abstract-and-reasoning'
# Work from the project root.
os.chdir(PROJECT_ROOT)
# Put the project root on the import path (the next cell imports
# `utils.config_gpu`); the guard keeps re-runs of this cell from appending
# the same entry again.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [2]:
import tensorflow as tf
from utils.config_gpu import config_gpu
import numpy as np

In [3]:
# Run the project's GPU configuration helper (imported from utils.config_gpu).
# NOTE(review): the helper's body is not visible in this notebook; the recorded
# output below shows the physical/logical GPU counts after this call.
config_gpu()

1 Physical GPUs, 1 Logical GPUs


# 参数准备
## 模型参数

In [110]:
# Size of the vocabulary and of each word vector.
vocab_size = 32768
embedding_dim = 300
# Placeholder all-zero embedding table. Its shape is derived from the two
# sizes above so the three values cannot drift apart.
embedding_matrix = np.zeros((vocab_size, embedding_dim))
# Encoder and decoder use the same number of recurrent units.
dec_units = enc_units = 256
batch_size = 64

## 模型输入

In [6]:
# Dummy input for the recurrent layers: enc_input (batch_size, enc_len).
# The values are token ids fed to an Embedding layer, so the tensor is created
# as int32 explicitly instead of with tf.zeros' default float32 dtype.
enc_len = 200
enc_input = tf.zeros((batch_size, enc_len), dtype=tf.int32)
# Frozen embedding layer initialised from embedding_matrix:
# enc_input2 (batch_size, enc_len, embedding_dim)
embedding = tf.keras.layers.Embedding(vocab_size,
                                      embedding_dim,
                                      weights=[embedding_matrix],
                                      trainable=False)
enc_input2 = embedding(enc_input)

# GRU测试
## 构造单向/双向GRU

In [8]:
# Unidirectional GRU that returns both the per-step outputs and the final state.
gru = tf.keras.layers.GRU(
    units=enc_units,
    return_sequences=True,
    return_state=True,
    recurrent_initializer='glorot_uniform',
)
# Bidirectional wrapper around the same layer definition; forward and backward
# outputs are concatenated.
bigru = tf.keras.layers.Bidirectional(layer=gru, merge_mode="concat")

## 单向GRU测试

In [39]:
# Get the initial state of the GRU for this input.
# The recorded output below is a one-element list holding an all-zero float32
# tensor of shape (64, 256), i.e. (batch_size, enc_units).
initial_state = gru.get_initial_state(enc_input2)
initial_state

[<tf.Tensor: id=1612, shape=(64, 256), dtype=float32, numpy=
 array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)>]

In [40]:
# Run the input through the GRU layer
# output: outputs of all GRU steps (batch_size, enc_len, enc_units)
# state: output of the last GRU step (batch_size, enc_units)
output, state = gru(enc_input2, initial_state=initial_state)

In [64]:
# Same call without an explicit initial_state; the layer falls back to its own default.
output, state = gru(enc_input2)

## 双向GRU测试

In [12]:
# initial_state is passed as a list because the bidirectional wrapper has to
# initialise both the forward and the backward GRU,
# so the list has length 2: [state1, state2] (initial_state*2 repeats the single GRU state)
# output: (batch_size, enc_len, enc_units*2), since merge_mode="concat"
# forward_state, backward_state: one final state per direction
# NOTE(review): the original comment gave (batch_size, enc_units*2) for each state;
# with one enc_units-wide GRU per direction they should each be (batch_size, enc_units) — confirm.
output, forward_state, backward_state = bigru(enc_input2, initial_state=initial_state*2)

# LSTM测试

In [13]:
# Unidirectional LSTM that returns both the per-step outputs and the final states.
lstm = tf.keras.layers.LSTM(
    units=enc_units,
    return_sequences=True,
    return_state=True,
    recurrent_initializer='glorot_uniform',
)
# Bidirectional wrapper around the same layer definition; forward and backward
# outputs are concatenated.
bilstm = tf.keras.layers.Bidirectional(layer=lstm, merge_mode="concat")

![](https://img-blog.csdn.net/20180712120310214?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FteV9tbQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

![](http://colah.github.io/posts/2015-08-Understanding-LSTMs/img/LSTM3-focus-o.png)

## 单向
与 GRU 不同的是，LSTM 会多返回一个状态：除了隐藏状态 h_t 之外，还会返回细胞状态 c_t。

In [52]:
# Dummy token-id input (batch_size, enc_len). Token ids are integers, so the
# tensor is created as int32 rather than tf.zeros' default float32.
enc_input = tf.zeros((batch_size, 200), dtype=tf.int32)
enc_input2 = embedding(enc_input)
# initial_state is [state1, state2], each of shape (batch_size, enc_units)
initial_state = lstm.get_initial_state(enc_input2)
# enc_output (batch_size, enc_len, enc_units)
# enc_hidden (h_t) and c_t (batch_size, enc_units)
enc_output, enc_hidden, c_t = lstm(enc_input2, initial_state=initial_state)

# 试着搭建LSTM的seq2seq模型

In [62]:
# Example LSTM-based RNN network
class RNN(tf.keras.Model):
    """Unrolls a single 256-unit LSTMCell over a fixed number of time steps.

    Integer inputs are one-hot encoded to depth `num_chars`; the cell output
    of the last step is projected to `num_chars` values by a Dense layer.
    """

    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars, self.seq_length, self.batch_size = num_chars, seq_length, batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        """Return the last-step logits, or their softmax when `from_logits` is False."""
        # [batch_size, seq_length, num_chars]
        one_hot_inputs = tf.one_hot(inputs, depth=self.num_chars)
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        # Step the cell manually; only the output of the final step is kept.
        for step in range(self.seq_length):
            step_input = one_hot_inputs[:, step, :]
            output, state = self.cell(step_input, state)
        logits = self.dense(output)
        return logits if from_logits else tf.nn.softmax(logits)

In [145]:
class LSTM_Encoder(tf.keras.Model):
    """Embedding + LSTM encoder with an optional bidirectional mode.

    Args:
        vocab_size: number of rows of the embedding table.
        embedding_dim: width of each embedding vector.
        embedding_matrix: initial (frozen) embedding weights.
        enc_units: width of the returned hidden / cell state.
        batch_size: stored on the instance; not used by `call`.
        use_bi_lstm: when True, run a bidirectional LSTM whose two directions
            each use `enc_units // 2` units. Defaults to False, which matches
            the previous hard-coded behaviour.
    """

    def __init__(self, vocab_size, embedding_dim, embedding_matrix, enc_units, batch_size,
                 use_bi_lstm=False):
        super(LSTM_Encoder, self).__init__()
        self.batch_size = batch_size
        self.enc_units = enc_units
        self.use_bi_lstm = use_bi_lstm
        # Bidirectional: halve the per-direction width so the concatenated
        # forward/backward states come back to roughly enc_units.
        if self.use_bi_lstm:
            self.enc_units = self.enc_units // 2

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, weights=[embedding_matrix],
                                                   trainable=False)
        self.lstm = tf.keras.layers.LSTM(self.enc_units,
                                         return_sequences=True,
                                         return_state=True,
                                         recurrent_initializer='glorot_uniform')

        self.bi_lstm = tf.keras.layers.Bidirectional(self.lstm)

    def call(self, enc_input):
        """Encode a batch of token ids.

        Returns:
            (output, enc_hidden, c_t): the per-step outputs, the final hidden
            state h and the final cell state c. In bidirectional mode the
            forward and backward states are concatenated on the last axis.
        """
        # (batch_size, enc_len, embedding_dim)
        enc_input_embedded = self.embedding(enc_input)
        # LSTM initial state is the pair [h, c]
        initial_state = self.lstm.get_initial_state(enc_input_embedded)

        if self.use_bi_lstm:
            # Bidirectional LSTM: with return_state=True the wrapper returns
            # five values (output, forward h, forward c, backward h, backward c),
            # and it expects the forward states followed by the backward states
            # as initial_state, hence [h, c] * 2.
            output, forward_h, forward_c, backward_h, backward_c = self.bi_lstm(
                enc_input_embedded, initial_state=initial_state * 2)
            enc_hidden = tf.concat([forward_h, backward_h], axis=-1)
            c_t = tf.concat([forward_c, backward_c], axis=-1)
        else:
            # Unidirectional LSTM
            output, enc_hidden, c_t = self.lstm(enc_input_embedded, initial_state=initial_state)

        return output, enc_hidden, c_t

In [146]:
class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(enc_output) + W2(dec_hidden)))."""

    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, dec_hidden, enc_output):
        """Return (context_vector, attention_weights) for one decoder step.

        dec_hidden: (batch_size, dec_units)
        enc_output: (batch_size, enc_len, enc_units)
        """
        # Insert a time axis so the decoder state broadcasts over enc_len:
        # (batch_size, 1, dec_units)
        query = tf.expand_dims(dec_hidden, 1)

        # Project both sides to the attention width; their sum is
        # (batch_size, enc_len, attn_units).
        projected_keys = self.W1(enc_output)
        projected_query = self.W2(query)

        # V maps the last axis to a single score: (batch_size, enc_len, 1)
        score = self.V(tf.nn.tanh(projected_keys + projected_query))

        # Normalise over the encoder time axis: (batch_size, enc_len, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the encoder outputs over time: (batch_size, enc_units).
        # The decoder consumes this as part of its input.
        context_vector = tf.reduce_sum(attention_weights * enc_output, axis=1)
        return context_vector, attention_weights

In [147]:
class LSTM_Decoder(tf.keras.Model):
    """Single-step LSTM decoder with Bahdanau attention over the encoder outputs.

    Args:
        vocab_size: number of rows of the embedding table and size of the output layer.
        embedding_dim: width of each embedding vector.
        embedding_matrix: initial (frozen) embedding weights.
        dec_units: number of LSTM cell units; also the attention width.
        batch_size: stored on the instance as `batch_sz`; not used by `call`.
    """

    def __init__(self, vocab_size, embedding_dim, embedding_matrix, dec_units, batch_size):
        super(LSTM_Decoder, self).__init__()
        self.batch_sz = batch_size
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, weights=[embedding_matrix],
                                                   trainable=False)

        self.lstm = tf.keras.layers.LSTMCell(units=self.dec_units,
                                             recurrent_initializer='glorot_uniform')
        self.attention = BahdanauAttention(self.dec_units)
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call(self, dec_input, prev_dec_state, enc_output):
        """Run one decoding step.

        Args:
            dec_input: one token id per batch element, shape (batch_size,).
            prev_dec_state: the LSTM state as a list [h_t, c_t], each
                (batch_size, units).
            enc_output: encoder outputs used to compute attention.

        Returns:
            (pred, dec_state, attention_weights): vocabulary scores of shape
            (batch_size, vocab_size), the new [h, c] state, and the attention
            weights of shape (batch_size, enc_len, 1).
        """
        # dec_input (batch_size, embedding_dim)
        dec_input = self.embedding(dec_input)

        # Attention is computed from h_t, the first element of the state.
        # context_vector (batch_size, units)
        # attention_weights (batch_size, enc_len, 1)
        context_vector, attention_weights = self.attention(prev_dec_state[0], enc_output)

        # dec_input (batch_size, units + embedding_dim)
        dec_input = tf.concat([context_vector, dec_input], axis=-1)

        # Advance the cell from the state passed in by the caller. The previous
        # code read an undefined-in-scope name `prev_state` here, which only
        # worked when a global of that name happened to exist.
        # dec_output (batch_size, units)
        # dec_state has the same [h, c] structure as prev_dec_state
        # PS: dec_output.shape == dec_state[0].shape
        dec_output, dec_state = self.lstm(dec_input, prev_dec_state)
        # From the Keras source: tf.keras.layers.LSTMCell returns `h, [h, c]`

        # pred (batch_size, vocab_size)
        pred = self.fc(dec_output)
        return pred, dec_state, attention_weights

In [148]:
# Create an LSTM encoder
lstm_enc = LSTM_Encoder(vocab_size, embedding_dim, embedding_matrix, enc_units, batch_size)

# Get the encoder outputs
enc_output, enc_hidden, c_t = lstm_enc(enc_input)
# Build the decoder's initial input: one token id per batch element
# NOTE(review): 32766 is presumably the id of the start-of-sequence token — confirm against the vocabulary.
dec_input = tf.constant([32766] * batch_size)
# Initialise the LSTM decoder
lstm_dec = LSTM_Decoder(vocab_size, embedding_dim, embedding_matrix, dec_units, batch_size)

In [152]:
# Seed the decoder state with the encoder's final [h, c] pair
prev_state = [enc_hidden, c_t]
# Compute the decoder output for one step
pred, dec_state, attention_weights = lstm_dec(dec_input, prev_state, enc_output)