In [8]:
import numpy as np
from functools import reduce
%run activators.ipynb
%run cnn.ipynb

[[[ 6.  7.  5.]
  [ 3. -1. -1.]
  [ 2. -1.  4.]]

 [[ 2. -5. -8.]
  [ 1. -4. -4.]
  [ 0. -5. -5.]]]
filter weights:
array([[[-1.008,  0.99 , -0.009],
        [-0.005,  0.994, -0.006],
        [-0.006,  0.995,  0.996]],

       [[-1.004, -1.001, -0.004],
        [-0.01 , -0.009, -0.012],
        [-0.002, -1.002, -0.002]],

       [[-0.002, -0.002, -1.003],
        [-0.005,  0.992, -0.005],
        [ 0.993, -1.008, -1.007]]])
bias:
0.991
filter weights:
array([[[ 9.980e-01,  9.980e-01, -1.001e+00],
        [-1.004e+00, -1.007e+00,  9.970e-01],
        [-4.000e-03, -1.004e+00,  9.980e-01]],

       [[ 0.000e+00,  9.990e-01,  0.000e+00],
        [-1.009e+00, -5.000e-03, -1.004e+00],
        [-1.004e+00,  1.000e+00,  0.000e+00]],

       [[-1.004e+00, -6.000e-03, -5.000e-03],
        [-1.002e+00, -5.000e-03,  9.980e-01],
        [-1.002e+00, -1.000e-03,  0.000e+00]]])
bias:
-0.007
weights(0,0,0): expected - actural 5.000000 - 5.000000
weights(0,0,1): expected - actural 6.000000 - 6.000000
w

In [11]:
class RecurrentLayer(object):
    """A single recurrent layer trained with back-propagation through time.

    Every call to ``forward`` computes ``s_t = f(U . x_t + W . s_{t-1})`` and
    appends ``s_t`` to ``state_list``.

    Attributes:
        input_width: length of each input column vector.
        state_width: number of hidden units (length of each state vector).
        activator: object whose ``forward`` method is applied to every element
            of the pre-activation in ``forward``.
        learning_rate: step size used by ``update``.
        times: number of ``forward`` calls since construction / last reset.
        state_list: list of ``(state_width, 1)`` arrays; index 0 is the all-zero
            initial state, index t is the state after the t-th ``forward``.
        U: ``(state_width, input_width)`` input weights.
        W: ``(state_width, state_width)`` recurrent weights.
        delta_list: per-step error terms, filled by ``calc_delta``.
        gradient_list: per-step gradients of W, filled by ``calc_gradient``.
        gradient: sum of ``gradient_list``; consumed by ``update``.

    NOTE(review): ``element_wise_op`` is not defined in this cell; it comes
    from one of the notebooks loaded with ``%run``. The code discards its
    return value, so it is presumed to apply the given function to every
    element of the array in place -- confirm against its definition.
    """

    def __init__(self, input_width, state_width, activator, learning_rate):
        self.input_width = input_width
        self.state_width = state_width
        self.activator = activator
        self.learning_rate = learning_rate
        # times = 0 and state_list = [s0]; shared with reset_state so the two
        # can never drift apart.
        self.reset_state()
        self.U = np.random.uniform(-1e-4, 1e-4, (state_width, input_width))
        self.W = np.random.uniform(-1e-4, 1e-4, (state_width, state_width))

    def forward(self, input_array):
        """Advance one time step with the column vector ``input_array``.

        The new state is appended to ``state_list``; nothing is returned.
        """
        self.times += 1
        state = (np.dot(self.U, input_array) +
                 np.dot(self.W, self.state_list[-1]))
        # Turns the pre-activation into the activated state (in place).
        element_wise_op(state, self.activator.forward)
        self.state_list.append(state)

    def backward(self, sensitivity_array, activator):
        """Run BPTT: compute every step's error term, then the gradient of W.

        Args:
            sensitivity_array: error term of the last time step, shaped like a
                state vector.
            activator: object whose ``backward`` method maps an activated
                output to the activation's derivative.
        """
        self.calc_delta(sensitivity_array, activator)
        self.calc_gradient()

    def update(self):
        """Apply one gradient-descent step to W.

        Only W is updated; this class computes no gradient for U.
        """
        self.W -= self.learning_rate * self.gradient

    def calc_delta(self, sensitivity_array, activator):
        """Fill ``delta_list`` so that ``delta_list[t]`` is step t's error term.

        ``delta_list[self.times]`` is ``sensitivity_array`` itself (stored by
        reference); earlier steps are derived from it going backwards.
        ``delta_list[0]`` is left at zero because the loop stops at k = 1.
        """
        self.delta_list = [np.zeros((self.state_width, 1))
                           for _ in range(self.times)]
        self.delta_list.append(sensitivity_array)
        for k in range(self.times - 1, 0, -1):
            self.calc_delta_k(k, activator)

    def calc_delta_k(self, k, activator):
        """Derive step k's error term from step k+1's.

        Implements ``delta_k^T = delta_{k+1}^T . W . diag(f'(net_k))``.
        """
        # The derivative is evaluated on a *copy* of step k's state:
        #  * a copy, because element_wise_op works in place and state_list is
        #    read again later by calc_gradient_t;
        #  * step k (not k+1), because net_{k+1} depends on net_k through
        #    s_k = f(net_k), so the chain rule needs f'(net_k).
        derivative = self.state_list[k].copy()
        element_wise_op(derivative, activator.backward)
        self.delta_list[k] = np.dot(
            np.dot(self.delta_list[k + 1].T, self.W),
            np.diag(derivative[:, 0])).T

    def calc_gradient(self):
        """Compute the gradient of W as the sum of the per-step gradients."""
        self.gradient_list = [np.zeros((self.state_width, self.state_width))
                              for _ in range(self.times + 1)]
        for t in range(self.times, 0, -1):
            self.calc_gradient_t(t)
        # Entry 0 is never written and stays zero, so summing the whole list
        # equals summing steps 1..times.
        self.gradient = np.sum(self.gradient_list, axis=0)

    def calc_gradient_t(self, t):
        """Store step t's gradient: ``delta_t . s_{t-1}^T``."""
        self.gradient_list[t] = np.dot(self.delta_list[t],
                                       self.state_list[t - 1].T)

    def reset_state(self):
        """Forget every past step: back to time 0 with an all-zero state."""
        self.times = 0
        self.state_list = [np.zeros((self.state_width, 1))]

def data_set():
    """Return the toy data: two 3x1 input columns and a 2x1 array ``d``.

    Returns:
        (x, d) where ``x`` is a list of two ``(3, 1)`` integer arrays and
        ``d`` is a ``(2, 1)`` integer array.
    """
    def as_column(values):
        # One value per row -> column vector.
        return np.array([[value] for value in values])

    inputs = [as_column(sample) for sample in ((1, 2, 3), (2, 3, 4))]
    targets = as_column((1, 2))
    return inputs, targets

# Gradient check
def gradient_check():
    """Print the numerical vs. BPTT gradient for every entry of W.

    The error is the sum of the final state's elements. For each W[i, j] a
    central difference of that error is printed next to ``rl.gradient[i, j]``.
    """
    def total_error(output):
        # Error function: sum over every output node.
        return output.sum()

    rl = RecurrentLayer(3, 2, IdentityActivator(), 1e-3)
    x, _ = data_set()

    def error_after_rerun():
        # Replay the whole sequence from a clean state with the current W.
        rl.reset_state()
        for sample in x:
            rl.forward(sample)
        return total_error(rl.state_list[-1])

    # Forward pass on the freshly built layer.
    for sample in x:
        rl.forward(sample)

    # Sensitivity map: all ones, shaped like the final state.
    sensitivity_array = np.ones(rl.state_list[-1].shape, dtype=np.float64)
    # Analytic gradient via BPTT.
    rl.backward(sensitivity_array, IdentityActivator())

    # Numerical gradient, one weight at a time.
    epsilon = 10e-4
    for i, j in np.ndindex(*rl.W.shape):
        rl.W[i, j] += epsilon
        err1 = error_after_rerun()
        rl.W[i, j] -= 2 * epsilon
        err2 = error_after_rerun()
        expect_grad = (err1 - err2) / (2 * epsilon)
        # Put the weight back before moving on to the next one.
        rl.W[i, j] += epsilon
        print('weights(%d,%d): expected - actural %f - %f' % (i, j, expect_grad, rl.gradient[i,j]))

def test():
    """Smoke test: two forward steps and one backward pass with ReLU.

    The ``d`` array from ``data_set`` is passed to ``backward`` as the
    sensitivity array. Returns the layer so it can be inspected.
    """
    layer = RecurrentLayer(3, 2, ReluActivator(), 1e-3)
    samples, sensitivity = data_set()
    for sample in samples:
        layer.forward(sample)
    layer.backward(sensitivity, ReluActivator())
    return layer

In [12]:
# Entry point: run the smoke test first, then print the gradient-check table.
if __name__ == '__main__':
    test()
    gradient_check()

weights(0,0): expected - actural -0.000175 - -0.000175
weights(0,1): expected - actural 0.000218 - 0.000218
weights(1,0): expected - actural -0.000175 - -0.000175
weights(1,1): expected - actural 0.000218 - 0.000218
