# LSTM 实现 

[https://zhuanlan.zhihu.com/p/81549798](https://zhuanlan.zhihu.com/p/81549798)

[https://zhuanlan.zhihu.com/p/32085405](https://zhuanlan.zhihu.com/p/32085405)


[https://zhuanlan.zhihu.com/p/54868269](https://zhuanlan.zhihu.com/p/54868269)




In [182]:
import tensorflow as tf
from tensorflow.python.keras import initializers
from tensorflow.python.keras import backend as K
from tensorflow.python.ops import array_ops
from tensorflow.python.keras import activations
from tensorflow.python.util.tf_export import keras_export
import numpy as np
from tensorflow import keras
import os
import time
import pickle
# Report the TensorFlow version and the GPUs visible to this process.
print('tf version: ', tf.__version__)
# tf.test.is_gpu_available() is deprecated; derive the flag from the
# physical-device list instead, which is also printed below.
gpu_devices = tf.config.list_physical_devices('GPU')
print('GPU : ', bool(gpu_devices))
print('GPU list', gpu_devices)

tf version:  2.1.0
GPU :  False
GPU list []


In [207]:
class LSTM_CELL(tf.keras.layers.Layer):
    """Single-step LSTM cell whose gate parameters are packed side by side.

    The input kernel ``w``, the recurrent kernel ``u`` and ``bias`` each hold
    the parameters of the input gate (i), forget gate (f), cell candidate (c)
    and output gate (o), in that order, concatenated along the last axis.

    Args:
        units: Size of the hidden state ``h`` and of the cell state ``c``.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units=256, **kwargs):
        super(LSTM_CELL, self).__init__(**kwargs)
        # Dimension of the LSTM hidden/cell state.
        self.units = units

    def build(self, input_shape):
        """Create the packed kernels and bias once the input size is known.

        Args:
            input_shape: Shape of the per-step input; only its last entry
                (the feature size) is used.
        """
        input_dim = input_shape[-1]

        # w: input-to-hidden weights for the four gates, (input_dim, 4 * units).
        self.w = self.add_weight(shape=(input_dim, self.units * 4), name='kernel',
                                 initializer=initializers.get('glorot_uniform'))
        print("LSTM w.shape: {}".format(self.w.shape))

        # u: hidden-to-hidden weights for the four gates, (units, 4 * units).
        self.u = self.add_weight(shape=(self.units, self.units * 4),
                                 name='recurrent_kernel',
                                 initializer=initializers.get('orthogonal'))
        print("LSTM u.shape: {}".format(self.u.shape))

        # The shape is spelled as a 1-tuple; a bare int only works where the
        # framework happens to coerce it into a shape.
        self.bias = self.add_weight(shape=(self.units * 4,), name='bias',
                                    initializer=initializers.get('zeros'))
        print("LSTM b.shape: {}".format(self.bias.shape))

        # NOTE: despite the attribute name, the gate activation is the
        # piecewise-linear ``hard_sigmoid``, not the logistic sigmoid.
        self.sigmoid = activations.get('hard_sigmoid')
        self.tanh = activations.get('tanh')
        super(LSTM_CELL, self).build(input_shape)

    def call(self, inputs, states):
        """Advance the cell by one time step.

        Args:
            inputs: Input for the current step. The ``Rnn`` layer in this
                file passes one time-step slice of a 3-D batch, i.e. a
                tensor of shape (batch, input_dim).
            states: Sequence whose first two items are the previous hidden
                state h(t-1) and the previous cell state c(t-1).

        Returns:
            A pair ``(h, (h, c))``: the new hidden state as the step output,
            followed by the new (hidden, cell) state pair.
        """
        last_h = states[0]   # h(t-1)
        last_c = states[1]   # c(t-1)

        # Pre-activations of all four gates at once: x.w + h(t-1).u + b.
        # One projection per kernel followed by a split is mathematically the
        # same as slicing the kernels first and projecting gate by gate, but
        # needs two matrix products instead of eight.
        z = K.dot(inputs, self.w) + K.dot(last_h, self.u)
        z = K.bias_add(z, self.bias)
        z_i, z_f, z_c, z_o = tf.split(z, num_or_size_splits=4, axis=-1)

        i = self.sigmoid(z_i)   # input gate
        f = self.sigmoid(z_f)   # forget gate
        o = self.sigmoid(z_o)   # output gate

        # New cell state: keep part of the old state, add the gated candidate.
        c = f * last_c + i * self.tanh(z_c)
        # New hidden state.
        h = o * self.tanh(c)

        return h, (h, c)

class Rnn(tf.keras.layers.Layer):
    """Runs an ``LSTM_CELL`` over the time axis and returns the last hidden state.

    Args:
        units: Number of units of the wrapped ``LSTM_CELL``.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.

    Attributes:
        cell: The ``LSTM_CELL`` applied at every time step.
        init_state: Optional ``(h, c)`` pair used as the initial state. When
            left as ``None`` (the default) a zero state matching the batch
            size of the current input is created on every call.
    """

    def __init__(self, units=128, **kwargs):
        super(Rnn, self).__init__(**kwargs)
        self.cell = LSTM_CELL(units)
        self.init_state = None

    def build(self, input_shape):
        """Log the input shape the layer is first built with."""
        # The initial state is deliberately NOT created here: a state cached
        # at build time is tied to the batch size (and, under tf.function, to
        # the graph) of the very first call and breaks on any later batch of
        # a different size or with an unknown batch dimension.
        print('Rnn shape: ', input_shape)
        super(Rnn, self).build(input_shape)

    def call(self, inputs):
        """Forward pass: feed every time step to the cell in order.

        Args:
            inputs: Tensor indexed as (sample, time step, feature). The
                number of time steps must be statically known because the
                loop below is unrolled in Python.

        Returns:
            The hidden state produced after the last time step.
        """
        # Number of time steps (static dimension).
        ts = inputs.shape.as_list()[1]

        if self.init_state is None:
            # Zero state sized from the runtime batch dimension, so that any
            # batch size works.
            n_batch = tf.shape(inputs)[0]
            h = tf.zeros(shape=tf.stack([n_batch, self.cell.units]),
                         dtype=inputs.dtype)
            c = h
        else:
            h, c = self.init_state

        for i in range(ts):
            # The step output equals the new hidden state, so only the state
            # pair is kept.
            _, (h, c) = self.cell(inputs[:, i], (h, c))
        return h


In [208]:
# Smoke test: run the custom Rnn layer (5 units) on a random batch indexed
# as (sample, time step, feature) and print the input and output shapes.
rnn = Rnn(5)
a = tf.random.normal(shape=(2, 3, 4))
print(a.shape)
h = rnn(a)
print(h.shape)
# Shapes noted from an earlier run with 256 units on a [4, 28, 28] input:
# print(a[0])
# [4, 28, 28]
# LSTM w.shape: (28, 1024)
# LSTM u.shape: (256, 1024)
# LSTM b.shape: (1024,)
# w_i.shape (28, 256)
# b_i.shape (256,)

(2, 3, 4)
Rnn shape:  (2, 3, 4)
LSTM w.shape: (4, 20)
LSTM u.shape: (5, 20)
LSTM b.shape: (20,)
(2, 5)


In [209]:
class MyModel(tf.keras.Model):
    """Classifier: custom LSTM layer -> Dense(128, relu) -> Dense(10, softmax)."""

    def __init__(self):
        super().__init__()

        # Hand-written LSTM layer. The stock Keras equivalent would be
        # tf.keras.layers.RNN(tf.keras.layers.LSTMCell(256)).
        self.rnn = Rnn(256)
        # Classification head: a 128-unit hidden layer and a 10-way softmax.
        self.d1 = tf.keras.layers.Dense(128, activation="relu")
        self.d2 = tf.keras.layers.Dense(10, activation="softmax")

    def call(self, x):
        """Forward pass: map the input batch ``x`` to class probabilities."""
        last_hidden = self.rnn(x)
        # (batch_size, 128)
        dense_out = self.d1(last_hidden)
        # (batch_size, 10): softmax output used as the final classification.
        return self.d2(dense_out)

In [210]:
@tf.function
def train_step(model, loss, opti, images, labels, train_loss, train_acc):
    """Run one optimisation step on a batch and update the running metrics.

    Args:
        model: Callable model producing predictions for ``images``.
        loss: Loss callable invoked as ``loss(labels, predictions)``.
        opti: Optimizer whose ``apply_gradients`` updates the model.
        images: Input batch.
        labels: Targets for the batch.
        train_loss: Metric updated with the batch loss.
        train_acc: Metric updated with ``(labels, predictions)``.
    """
    with tf.GradientTape() as tape:
        # Predictions have one row per sample and one column per class.
        pred = model(images)
        loss_val = loss(labels, pred)

    # Back-propagate and apply the update to every trainable variable.
    variables = model.trainable_variables
    gradients = tape.gradient(loss_val, variables)
    opti.apply_gradients(zip(gradients, variables))

    # Record the batch loss and accuracy.
    train_loss.update_state(loss_val)
    train_acc.update_state(labels, pred)

In [211]:
# Data: keep the first `num_used` Fashion-MNIST training samples and divide
# the pixel values by 255.0.
num_used = 5000
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), _ = fashion_mnist.load_data()
train_images = train_images[:num_used] / 255.0
train_labels = train_labels[:num_used]
# Shuffled dataset served in batches of 4.
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_ds = train_ds.shuffle(10000).batch(4)

# Optimizer.
opti = tf.keras.optimizers.Adam()
# Loss function.
loss = tf.keras.losses.SparseCategoricalCrossentropy()
# Running mean of the loss values.
train_loss = tf.keras.metrics.Mean()
# Running accuracy.
train_acc = tf.keras.metrics.SparseCategoricalAccuracy()

# Model and number of training epochs.
model = MyModel()
epochs = 30



In [212]:
# Display the shape of a single training image.
train_images[0].shape

(28, 28)

In [None]:
# Per-epoch wall-clock time and training accuracy.
list_time_cost = []
list_acc = []
for epoch in range(epochs):
    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_acc):
        metric.reset_states()

    # One pass over the training set; each item is an (images, labels) batch.
    start = time.time()
    for images, labels in train_ds:
        train_step(model, loss, opti, images, labels, train_loss, train_acc)
    ends = time.time()
    cost = ends - start

    epoch_loss = train_loss.result()
    epoch_acc = train_acc.result()
    list_time_cost.append(cost)
    list_acc.append(epoch_acc.numpy())
    print("Time: {:.2f} s, Epoch: {:2d}, loss: {:.5f}, acc: {:.5f}".format(cost, epoch, epoch_loss, epoch_acc))
# Optional: persist the collected curves (left disabled).
# with open("./output/my_lstm_acc.pkl", "wb") as fw:
#     pickle.dump(list_acc, fw)
# with open("./output/my_lstm_time_cost.pkl", "wb") as fw:
#     pickle.dump(list_time_cost, fw)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Rnn shape:  (4, 28, 28)
LSTM w.shape: (28, 1024)
LSTM u.shape: (256, 1024)
LSTM b.shape: (1024,)
Time: 27.05 s, Epoch:  0, loss: 1.07026, acc: 0.59680
Time: 21.57 s, Epoch:  1, loss: 0.72783, acc: 0.72920
Time: 21.61 s, Epoch:  2, loss: 0.58067, acc: 0.78720
Time: 21.49 s, Epoch:  3, loss: 0.52058, acc: 0.80900
Time: 21.50 s, Epoch:  4, loss: 0.47311, acc: 0.82900
Time: 21.60 s, Epoch:  5, loss: 0.43479, acc: 0.83560
Time: 21.55 s, Epoch:  6, loss: 0.41062, acc: 0.84780
Time: 21.50 s, Epoch:  7, loss: 0.38830, acc: 0.85960
Time: 21.66 s, Epoch:  8, loss: 0.36042, acc: 0.86460
