Basic RNNs

In [2]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os 

In [3]:
def generate_time_series(batch_size, n_steps):
    """Generate a batch of noisy univariate time series.

    Every series is the sum of two sine waves with fixed amplitudes (0.5 and
    0.2) whose frequency and phase are drawn at random, plus uniform noise
    in [-0.05, 0.05).

    Args:
        batch_size: Number of series to generate.
        n_steps: Number of time steps in each series.

    Returns:
        A float32 NumPy array of shape ``[batch_size, n_steps, 1]``.
    """
    # A single draw yields four (batch_size, 1) columns, all uniform in
    # [0, 1): two frequency factors followed by two phase offsets.
    rand_params = np.random.rand(4, batch_size, 1)
    freq_a, freq_b, phase_a, phase_b = rand_params
    # n_steps evenly spaced instants covering [0, 1]; broadcasting against the
    # (batch_size, 1) parameters produces (batch_size, n_steps) waves.
    t = np.linspace(0, 1, n_steps)
    first_wave = 0.5 * np.sin((t - phase_a) * (freq_a * 10 + 10))
    second_wave = 0.2 * np.sin((t - phase_b) * (freq_b * 20 + 20))
    noise = 0.1 * (np.random.rand(batch_size, n_steps) - 0.5)
    signal = first_wave + second_wave + noise
    # Append a trailing feature axis, since input features are usually passed
    # as 3D arrays.
    return signal[..., np.newaxis].astype(np.float32)

单变量时间序列

In [None]:
# Each generated series has n_steps + 1 = 51 values; `series` is indexed as
# [instance, time step, feature].
# X keeps time steps 0 .. n_steps - 1 (the first 50 values) of every series.
# y is the single value at time step n_steps (index 50, the 51st value), i.e.
# the value the models have to forecast.
# Rows are split 7000 / 2000 / 1000 into train / validation / test sets.
np.random.seed(42)  # fixed seed so Restart & Run All regenerates the same data
n_steps = 50
series = generate_time_series(10000, n_steps + 1)
X_train, y_train = series[:7000, :n_steps], series[:7000, -1]
X_valid, y_valid = series[7000:9000, :n_steps], series[7000:9000, -1]
X_test, y_test = series[9000:, :n_steps], series[9000:, -1]


只用简单的线性回归模型

In [None]:
# Baseline: plain linear regression. Flatten turns each [n_steps, 1] series
# into a vector, and a single Dense unit forecasts the next value as a linear
# combination of those inputs.
model = keras.models.Sequential([
    # Tie the input length to n_steps instead of repeating the literal 50, so
    # the model stays consistent with the data if n_steps is changed.
    keras.layers.Flatten(input_shape=[n_steps, 1]),
    keras.layers.Dense(1),
])
# Compile with the MSE loss and the Adam optimizer, then train on the training
# set while monitoring the validation set.
model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

In [None]:
# Finally, evaluate the trained model on the validation set.
model.evaluate(x=X_valid, y=y_valid)

实现一个简单的RNN


In [None]:

# Minimal RNN: one recurrent layer with a single unit. The time dimension is
# declared as None in the input shape.
model = keras.models.Sequential([
    keras.layers.SimpleRNN(units=1, input_shape=(None, 1)),
])

# Adam with an explicit learning rate of 0.005.
optimizer = keras.optimizers.Adam(learning_rate=0.005)
model.compile(optimizer=optimizer, loss="mse")
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=20,
    validation_data=(X_valid, y_valid),
)

In [None]:
# Evaluate the single-unit RNN on the validation set.
model.evaluate(x=X_valid, y=y_valid)

使用深度RNNs

In [None]:
# Fix the NumPy and TensorFlow random seeds before building the model.
np.random.seed(42)
tf.random.set_seed(42)

# Deep RNN: three stacked SimpleRNN layers. The first two return the full
# sequence so the next recurrent layer receives one input per time step; the
# last one has a single unit and returns only its final output.
model = keras.models.Sequential([
    keras.layers.SimpleRNN(units=20, return_sequences=True,
                           input_shape=(None, 1)),
    keras.layers.SimpleRNN(units=20, return_sequences=True),
    keras.layers.SimpleRNN(units=1),
])

model.compile(optimizer="adam", loss="mse")
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=20,
    validation_data=(X_valid, y_valid),
)

In [None]:
# Evaluate the deep RNN on the validation set.
model.evaluate(x=X_valid, y=y_valid)

让简单RNN层只保留最后一个输出

In [None]:
# The hidden state of the final layer is not going to be used, and a different
# output activation may be wanted, so the output layer is better replaced by a
# Dense layer. The second SimpleRNN therefore returns only its last output.
model = keras.models.Sequential([
    keras.layers.SimpleRNN(units=20, return_sequences=True,
                           input_shape=(None, 1)),
    keras.layers.SimpleRNN(units=20),
    keras.layers.Dense(units=1),
])

model.compile(optimizer="adam", loss="mse")
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=20,
    validation_data=(X_valid, y_valid),
)

In [None]:
# Evaluate the RNN with a Dense output layer on the validation set.
model.evaluate(x=X_valid, y=y_valid)

---

用于一次性预测未来10步的时间序列（代码略）

---

用于每一个时刻都预测未来10步的时间序列（下为代码）

时刻0预测 1~10， 时刻1预测 2~11，时刻49预测 50~59

每当模型作预测的时候，模型都只能看到过去时刻的输入，而看不到未来，所以尽管数据集的每个实例每个时间序列对应值都是预设的，模型并不会因此而受到影响

<font color='yellow'>呵呵</font>

In [4]:
# Targets for forecasting 10 steps ahead at every time step: each series has
# n_steps + 10 values, the inputs are its first n_steps values, and
# Y[i, t, k] = series[i, t + k + 1, 0] (the k-th value after time step t).
np.random.seed(42)  # fixed seed so Restart & Run All regenerates the same data
n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
X_valid = series[7000:9000, :n_steps]
X_test = series[9000:, :n_steps]
# Take the row count and dtype from `series` rather than repeating 10000 and
# silently allocating float64 targets for float32 data.
Y = np.empty((series.shape[0], n_steps, 10), dtype=series.dtype)
for step_ahead in range(1, 10 + 1):
    # Column step_ahead - 1 holds the series shifted left by step_ahead steps.
    Y[..., step_ahead - 1] = series[..., step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]

In [5]:
# Custom metric: although ten steps are predicted at every time step, only the
# output at the last time step of each series is used for evaluation.
def last_time_step_mse(Y_true, Y_pred):
    """Return the mean squared error computed on the last time step only.

    Args:
        Y_true: Targets; index -1 along axis 1 is taken as the last time step.
        Y_pred: Predictions, sliced the same way as ``Y_true``.

    Returns:
        The result of ``keras.metrics.mean_squared_error`` on the two slices.
    """
    final_true = Y_true[:, -1]
    final_pred = Y_pred[:, -1]
    return keras.metrics.mean_squared_error(final_true, final_pred)

使用批量归一化（略）

使用层归一化

In [None]:
from tensorflow.keras.layers import LayerNormalization

In [None]:
# Custom memory cell
class LNSimpleRNNCell(keras.layers.Layer):
    """Simple RNN cell that layer-normalizes its output before the activation.

    It wraps a ``SimpleRNNCell`` built without an activation, applies
    ``LayerNormalization`` to that cell's output, and then applies the
    requested activation.

    Args:
        units: Number of units; used for both the state size and output size.
        activation: Activation identifier resolved with
            ``keras.activations.get`` (default ``"tanh"``).
        **kwargs: Forwarded to the ``keras.layers.Layer`` constructor.
    """

    def __init__(self, units, activation="tanh", **kwargs):
        super().__init__(**kwargs)
        # Sizes exposed on the cell for the wrapping keras.layers.RNN layer.
        self.state_size = units
        self.output_size = units
        # Inner cell without an activation: it only produces the linear
        # combination of the current input and the previous hidden state.
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units, activation=None)
        # The normalization to apply (layer normalization here).
        self.layer_norm = keras.layers.LayerNormalization()
        # The activation applied after normalization.
        self.activation = keras.activations.get(activation)

    def call(self, inputs, states):
        """Run one time step and return ``(output, [new_state])``.

        The output and the new hidden state are the same normalized,
        activated tensor.
        """
        # The state returned by the inner cell is discarded (the original
        # author's note says it equals the inner cell's output).
        pre_activation, _ = self.simple_rnn_cell(inputs, states)
        # Normalize first, then activate.
        normalized = self.layer_norm(pre_activation)
        activated = self.activation(normalized)
        return activated, [activated]

所有循环层和keras提供的所有单元都有一个dropout和recurrent_dropout超参数

前者定义了应用于每个时间步长输入的dropout率，后者定义了隐藏状态的dropout率

In [None]:
# To use the custom cell, create a keras.layers.RNN layer and pass it an
# instance of the cell. TimeDistributed applies the Dense(10) head at every
# time step.
model = keras.models.Sequential([
    keras.layers.RNN(
        LNSimpleRNNCell(units=20),
        return_sequences=True,
        input_shape=(None, 1),
    ),
    keras.layers.RNN(LNSimpleRNNCell(units=20), return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(units=10)),
])

model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])
history = model.fit(
    x=X_train,
    y=Y_train,
    epochs=20,
    validation_data=(X_valid, Y_valid),
)

<font color='red'>解决短期记忆问题</font>

<font color='blue'>如果你把LSTM单元视为黑匣子，那就等于没学</font>

https://blog.csdn.net/shijing_0214/article/details/52081301

https://blog.csdn.net/niuxuerui11/article/details/109036092

In [7]:
# Simply use LSTM layers in place of SimpleRNN layers; the Dense(10) head is
# still applied at every time step through TimeDistributed.
model = keras.models.Sequential([
    keras.layers.LSTM(units=20, return_sequences=True, input_shape=(None, 1)),
    keras.layers.LSTM(units=20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(units=10)),
])

model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])

history = model.fit(
    x=X_train,
    y=Y_train,
    epochs=20,
    validation_data=(X_valid, Y_valid),
)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
# Same architecture with GRU layers as the recurrent layers; the Dense(10)
# head is applied at every time step through TimeDistributed.
model = keras.models.Sequential([
    keras.layers.GRU(units=20, return_sequences=True, input_shape=(None, 1)),
    keras.layers.GRU(units=20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(units=10)),
])

model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])
history = model.fit(
    x=X_train,
    y=Y_train,
    epochs=20,
    validation_data=(X_valid, Y_valid),
)