In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

from tensorflow import data as tfdata
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow import losses
from tensorflow.keras import initializers as init

from tensorflow.keras.utils import plot_model

RNN
    - 全称： Recurrent Netural Netword  
    - 是一种适宜于处理序列数据的NN  
    - 广泛使用与语言模型，文本生成，机器翻译等  
    - 是为了更好的处理时序信息而设计的

    

这里我们使用RNN来进行尼采风格文本的自动生成  
本质其实预测一段英文文本的连续字母的概率分布  
通过逐个输入一段长为seq_length的序列，输出这些序列连续的下一个字符的概率分布，  
依次递归，生成接下来的字符，即可完成文本生成的任务


## 定义DataLoader

以字符为单位进行编码   
设字符种类数为num_chars，则每一个字符赋予一个0到nums_chars-1之间的唯一整数编号i

In [None]:
class DataLoader():
    def __init__(self):
        # 获取相关数据,尼采相关的文本信息
        path = tf.keras.utils.get_file('nietzsche.txt', origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')

        with open(path, 'r', encoding="utf-8") as fin:
            self.raw_text = fin.read().lower()
        # 获取所有的字符集
        self.chars = sorted(list(set(self.raw_text)))
        # 双向编码的字典
        self.char_indices = dict((c, i) for i,c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i,c in enumerate(self.chars))
        # 文本的编码信息
        self.text = [self.char_indices[c] for c in self.raw_text]
        
    def get_batch(self, seq_length, batch_size):
        """seq_length为目标字符串的长度
        """
        seq = []
        next_char = []

        for i in range(batch_size):
            index = np.random.randint(0, len(self.text)-seq_length)
            # 最后的seq_length个字符
            seq.append(self.text[index: index+seq_length])
            next_char.append(self.text[index+seq_length])
        # 返回最后seq_length的编码，一级最后一个字符
        return np.array(seq), np.array(next_char) #[batch_size,  seq_length], [num_chars]

## 模型实现

首先在_\_init__中实例化一个LSTMCell单元，以及一个线性变化用的全连接层。  
首先对序列进行One-hot编码。变化后的序列tensor形状为\[seq_length, num_chars\]  
然后初始化RNN单元状态，存入变量state中   
接下来将序列从头到为依次送入RNN单元，即在t时刻，将上一个时刻t-1的RNN单元状态state和序列的第t个元素inputs\[t, :\]送入RNN单元，得到当前时刻的输出output和RNN单元状态。  
取出RNN单元最后一次的输出，通过全连接层变化到nums_chars维，作为模型的输出

In [None]:
class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        
        # 初始化一个LSTMCell
        self.cell = tf.keras.layers.LSTMCell(units=256)
        # 初始化一个全连接层
        self.dense = tf.keras.layers.Dense(units=self.num_chars)
    
    def call(self, inputs, from_logits=False):
        # [batch_size, seq_length, num_chars]
        inputs = tf.one_hot(inputs, depth=self.num_chars)
        # 获得RNN的初始状态
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        # 
        for t in range(self.seq_length):
            # 通过当前输入和前一刻的状态，得到输出和当前时间状态
            # 其实Output输出的为当前字符之后其他所有可能字符的概率分布
            # 这里训练使用每个样本中第t个位置的上下文？
            # state 其实是隐层的权重矩阵
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)

        # from_logits 参数控制输出是否通过softmax进行归一化, False表示使用， true表示不使用，即表示是否直接使用原始输出值
        if from_logits:
            return logits
        else:
            return  tf.nn.softmax(logits)
    
    def predict(self, inputs, temperature=1.):
        """inputs: 输入
           temperature： 分布控制参数
        """
        batch_size, _ =  tf.shape(inputs)

        # 调用训练好的模型进行预测，获取下一个字符的概率分布
        logits = self(inputs, from_logits=True)
        # 使用带temperature参数的 softmax函数 获得归一化的概率分布值
        prob = tf.nn.softmax(logits/temperature).numpy()
        # 在预测的概率分布上随机取样
        result  = np.array([np.random.choice(self.num_chars, p=prob[i, :]) for i in range(batch_size.numpy())])
        return result



## 模型超参

In [None]:
num_batches = 1000
seq_length = 40
batch_size = 50
learning_rete = 0.001

## 训练过程
- 从DataLoader中随机一批训练数据
- 将数据送入模型，计算模型的预测值 
- 将预测值与真实值进行比较，计算损失函数loss
- 计算损失函数关于模型变量的导数
- 使用优化器更新参数

In [None]:
# 实例化读取数据
data_loader = DataLoader()
data_loader.indices_char.pop(53)
data_loader.indices_char.pop(54)
data_loader.indices_char.pop(55)
data_loader.indices_char.pop(56)
data_loader.char_indices.pop("ä")
data_loader.char_indices.pop("æ")
data_loader.char_indices.pop("é")
data_loader.char_indices.pop("ë")
# 实例化模型
model = RNN(len(data_loader.chars), batch_size, seq_length)
# 优化器
optimizer = optimizers.Adam(learning_rate=learning_rete)

for batch_index in range(num_batches):
    # 从训练姐中随机取数据
    X, y = data_loader.get_batch(seq_length, batch_size)
    with tf.GradientTape() as tape:
        # 自动调用call方法
        y_pred = model(X)
        # 选用交叉熵损失函数(其实这些可以自定义,交叉熵适用于分类问题, 带sparse可以使用one-hot类型)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_pred=y_pred, y_true=y)
        loss = tf.reduce_mean(loss)
        if batch_index%200 == 0:
            print("batch %d \t loss: %f" % (batch_index, loss))
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

## 预测

文本生成过程有一点需要特别注意，使用tr.argmax()来将最大概率的值作为预测的值对于文本预测有些绝对，会使得生成的文本失去丰富性。  
因此采用np.random.choice()按照分布概率随机采样   
同时加入temperature参数控制分布的形状  参数值越大则分布越平缓，丰富度越高。

为此为RNN类增加predict方法, 见RNN 类

In [None]:
X_, _ = data_loader.get_batch(seq_length, 1)

# 设置四种不同的丰富度(temperature)
for diversity in [0.2, 0.5, 1.0, 1.2]:
    X = X_
    print("丰富度: %f" % diversity)
    for t in range(400):
        # 预测下一个字符的编号
        y_pred = model.predict(X, diversity)
        print(data_loader.indices_char[y_pred[0]], end="", flush=True)
        # 将预测的字符接在输入X的末尾，并截断第一个字符，保证X的长度不变， 方便循环预测
        X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
    print("\n")
