# RNN & LSTM

In [None]:
import numpy as np

class RNN:
    """Single-hidden-layer recurrent network trained with truncated BPTT.

    For every time step ``t`` the model computes::

        s[t] = tanh(x[t] @ U + W @ s[t - 1])
        o[t] = s[t] @ V

    with ``U`` of shape (input_size, hidden_size), ``W`` of shape
    (hidden_size, hidden_size) and ``V`` of shape (hidden_size, output_size).
    The gradients returned by :meth:`bptt` are those of the squared error
    ``0.5 * sum_t ||o[t] - y||**2``, i.e. the same target ``y`` is applied at
    every time step.
    """

    def __init__(self, input_size, hidden_size, output_size, bptt_truncate=10):
        """Create the network with small random weights.

        Args:
            input_size: Length of the input vector at each time step.
            hidden_size: Number of hidden (recurrent) units.
            output_size: Length of the output vector at each time step.
            bptt_truncate: Maximum number of time steps an error is
                propagated back through. Values below 1 leave ``U`` and
                ``W`` without any gradient.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.bptt_truncate = bptt_truncate

        # only 1 hidden layer
        # Weights must not start at zero: with U = W = V = 0 every hidden
        # state, every output and therefore every gradient is exactly zero,
        # so training would never change anything.
        self.U = self._init_weights(input_size, hidden_size)
        self.V = self._init_weights(hidden_size, output_size)
        self.W = self._init_weights(hidden_size, hidden_size)

    @staticmethod
    def _init_weights(fan_in, fan_out):
        """Return a (fan_in, fan_out) matrix drawn from U(-1/sqrt(fan_in), 1/sqrt(fan_in))."""
        bound = 1.0 / np.sqrt(fan_in)
        return np.random.uniform(-bound, bound, (fan_in, fan_out))

    def forward(self, input_x):
        """Run the network over one sequence.

        Args:
            input_x: Sequence of input vectors, indexable as ``input_x[t]``
                with each item of length ``input_size``.

        Returns:
            Tuple ``(o, s)``. ``o`` has shape (time_steps, output_size) and
            holds the output at each step. ``s`` has shape
            (time_steps + 1, hidden_size): rows ``0 .. time_steps - 1`` are
            the hidden states and the extra LAST row is the all-zero initial
            state, so ``s[t - 1]`` is valid for ``t == 0`` as well.
        """
        time_steps = len(input_x)

        # Row -1 is never written and therefore stays the zero initial state.
        s = np.zeros((time_steps + 1, self.hidden_size))

        # output at each timestep
        o = np.zeros((time_steps, self.output_size))
        # go through all time steps
        for t in range(time_steps):
            s[t] = np.tanh(np.dot(input_x[t], self.U) + np.dot(self.W, s[t - 1]))
            o[t] = np.dot(s[t], self.V)

        return o, s

    def bptt(self, x, y):
        """Compute weight gradients for one sample by truncated BPTT.

        Args:
            x: Input sequence, as accepted by :meth:`forward`.
            y: Target vector of length ``output_size``; it is broadcast
                against the output of every time step.

        Returns:
            Tuple ``(delta_U, delta_V, delta_W)`` with the same shapes as
            ``U``, ``V`` and ``W``.
        """
        time_steps = len(x)
        # forward pass
        # o(time_steps, output_size), s(1 + time_steps, hidden_size)
        o, s = self.forward(x)

        delta_U = np.zeros_like(self.U)
        delta_V = np.zeros_like(self.V)
        delta_W = np.zeros_like(self.W)

        # delta_o(time_steps, output_size), y broadcast over the time axis
        delta_o = o - y

        for t in range(time_steps - 1, -1, -1):
            delta_V += np.outer(s[t], delta_o[t])

            # s[t] == tanh(z[t]), so tanh'(z[t]) == 1 - s[t] ** 2; reusing the
            # stored state avoids recomputing the pre-activation.
            delta_s = (1 - s[t] ** 2) * np.dot(delta_o[t], self.V.T)

            # Propagate back at most `bptt_truncate` steps, never past step 0.
            first = max(0, t - self.bptt_truncate + 1)
            for k in range(t, first - 1, -1):
                delta_U += np.outer(x[k], delta_s)
                # For k == 0, s[k - 1] is s[-1], the zero initial state.
                delta_W += np.outer(delta_s, s[k - 1])

                # Only step further back when another iteration follows;
                # this also keeps k - 1 a genuine (non-negative) time index.
                if k > first:
                    delta_s = (1 - s[k - 1] ** 2) * np.dot(delta_s, self.W)

        return delta_U, delta_V, delta_W

    def update(self, batch, lr):
        """Apply one gradient-descent step using the summed gradients of a batch.

        Args:
            batch: Iterable of ``(x, y)`` samples.
            lr: Learning rate; it scales the SUM (not the mean) of the
                per-sample gradients.
        """
        du = np.zeros_like(self.U)
        dv = np.zeros_like(self.V)
        dw = np.zeros_like(self.W)

        for x, y in batch:
            delta_u, delta_v, delta_w = self.bptt(x, y)
            du += delta_u
            dv += delta_v
            dw += delta_w

        self.U -= lr * du
        self.V -= lr * dv
        self.W -= lr * dw

    def train(self, train_data, epochs, batch_size, lr):
        """Train with mini-batch gradient descent and print a loss per epoch.

        Args:
            train_data: Mutable sequence of ``(x, y)`` samples. It is
                shuffled IN PLACE at the start of every epoch.
            epochs: Number of passes over ``train_data``.
            batch_size: Number of samples per weight update.
            lr: Learning rate passed to :meth:`update`.

        Raises:
            ValueError: If ``train_data`` is empty.
        """
        n_samples = len(train_data)
        if n_samples == 0:
            raise ValueError('train_data must not be empty')

        for i in range(epochs):
            np.random.shuffle(train_data)
            batches = [train_data[t : t + batch_size] for t in range(0, n_samples, batch_size)]

            for batch in batches:
                self.update(batch, lr)

            # Report the mean squared error of the final-step output. The loss
            # must be a scalar: summing the signed error vector would let
            # errors cancel and cannot be formatted with %f.
            loss = 0.0
            for x, y in train_data:
                o, _ = self.forward(x)
                loss += 0.5 * np.sum((o[-1] - y) ** 2)
            loss /= n_samples
            print('Epoch %d done, loss: %f' % (i + 1, loss))

    def tanh_prime(self, x):
        """Return the derivative of tanh evaluated at pre-activation ``x``."""
        return 1 - np.tanh(x) ** 2

In [None]:
import tensorflow as tf
mnist = tf.keras.datasets.mnist

def onehot(y, num_classes=10):
    """Return the one-hot encoding of a 1-D array of integer class labels.

    Args:
        y: 1-D array of integer labels, each in ``[0, num_classes)``.
        num_classes: Width of the encoding; defaults to 10.

    Returns:
        Float array of shape ``(len(y), num_classes)`` with a single 1 per
        row at the column given by the corresponding label.
    """
    labels = np.asarray(y)
    count = labels.shape[0]
    arr = np.zeros([count, num_classes])
    # Skip the assignment for empty input: an empty array may carry a float
    # dtype, which is not usable as an index.
    if count:
        arr[np.arange(count), labels] = 1
    return arr

# Load MNIST, scale pixel values to [0, 1] and one-hot encode the labels.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
y_train = onehot(y_train)
y_test = onehot(y_test)

# Each sample is an (image, one-hot label) pair; the image is fed to the RNN
# one row per time step (input_size below is the row length).
train_data = list(zip(x_train, y_train))
test_data = list(zip(x_test, y_test))

input_size = 28
hidden_size = 100
output_size = 10

rnn = RNN(input_size, hidden_size, output_size)

lr = 0.001
epochs = 100
batch_size = 50

rnn.train(train_data, epochs, batch_size, lr)

correct = 0
for x, y in test_data:
    # forward() returns (outputs, states); the prediction is the arg-max of
    # the output at the last time step, not of the whole returned tuple.
    o, _ = rnn.forward(x)
    if np.argmax(o[-1]) == np.argmax(y):
        correct += 1

print('test accuracy: %f' % (float(correct) / len(test_data)))