# Recurrent Neural Network in NumPy

In [224]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import io, os, sys, types
from IPython import get_ipython
from nbformat import read
from IPython.core.interactiveshell import InteractiveShell

from . import Activation

import sklearn as sk

import copy

import abc

## Model

In [228]:
class Rnn:
    
    def __init__(self, input_dim, hidden_dim, output_dim, activation_h=Sigmoid(), activation_o=Sigmoid()):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.batch_size = None
        self.lrate = None
        
        self.activation_h = activation_h
        self.activation_o = activation_o
        
        self.layers = {
            'input': [],
            'hidden': [],
            'output': []
        }
        
        self.synapse_ih = {
            'weights': np.random.random((input_dim, self.hidden_dim ))*2-1,
            'biases': np.random.random((1, self.hidden_dim))*2-1
        } 
        self.synapse_hh = {
            'weights': np.random.random((self.hidden_dim , self.hidden_dim ))*2-1
        }
        self.synapse_ho = {
            'weights': np.random.random((self.hidden_dim , self.output_dim))*2-1,
            'biases': np.random.random((1, self.output_dim))*2-1
        }
        
    def train(self, train_generator, validation_generator, epochs=1000, batch_size=1, lrate=0.1):
        self.batch_size = batch_size
        self.lrate = lrate
        print('TRAINING PHASE:\n\n')
        
        for epoch in range(epochs):
            train_error = 0
            validation_error = 0
            
            train_len = 1
            validation_len = 1
            
            for index, batch in enumerate(train_generator()):
                prediction = self._forward_prop(x=batch['input'], y=batch['target'])
                
                error = np.array([batch['target'][index] - prediction[index] for index in range(len(batch['target']))])
                self._back_prop()
                
                train_error += np.abs(error).sum()/len(batch['target'])
                
                train_len = index
                
            for index, batch in  enumerate(validation_generator()):
                prediction = self._forward_prop(batch['input'], y=batch['target'])
                
                error = np.array([batch['target'][index] - prediction[index] for index in range(len(batch['target']))])
                
                validation_error += np.abs(error).sum()/len(batch['target'])
                
                validation_len = index
                
            if epoch % 100 == 0:
                print('Epoch: {0}\t Train Error: {1:.2f}\t Validation Error: {2:.2f}'.format(epoch, train_error/train_len, validation_error/validation_len))
                
    def test(self, test_generator, evaluate):
        print('\n\nTRAINING PHASE:\n\n')
        
        for batch in test_generator():
            evaluate(batch['input'], batch['target'], self._forward_prop(x=batch['input'], y=batch['target']))
    
    def _forward_prop(self, x, y):
        """x is the sequencial input and y is the array of the corresponding targets"""
        self.layers['input'] = []
        self.layers['hidden'] = [np.zeros((self.batch_size, self.hidden_dim))]
        self.layers['output'] = []
        
        self.delta_ho = []
        
        for index in range(x.shape[0]):
            self.layers['input'].append(np.atleast_2d(x[index]))
            
            self.layers['hidden'].append(np.dot(self.layers['input'][-1], self.synapse_ih['weights']) + 
                                                      np.dot(self.activation_h.function(self.layers['hidden'][-1]), self.synapse_hh['weights']) +
                                                      self.synapse_ih['biases'])
            self.layers['output'].append(np.dot(self.activation_h.function(self.layers['hidden'][-1]), self.synapse_ho['weights']) +
                                                       self.synapse_ho['biases'])
            
            self.delta_ho.append((np.atleast_2d(y[index]).T - self.activation_h.function(self.layers['output'][-1]))*self.activation_o.derivative(self.layers['output'][-1]))
        
        return [self.activation_o.function(self.layers['output'][index]) for index in range(x.shape[0])]
    
    def _back_prop(self):
        synapse_update_ih = {
            'weights': np.zeros_like(self.synapse_ih['weights']),
            'biases': np.zeros_like(self.synapse_ih['biases'])
        }
        synapse_update_hh = {
            'weights': np.zeros_like(self.synapse_hh['weights'])
        }
        synapse_update_ho = {
            'weights': np.zeros_like(self.synapse_ho['weights']),
            'biases': np.zeros_like(self.synapse_ho['biases'])
        }
        
        delta_hh = None
        delta_next_hh = np.zeros((self.batch_size, self.hidden_dim))
        
        for index in range(len(self.delta_ho)):
            synapse_update_ho['weights'] +=  np.dot(self.activation_o.function(self.layers['output'][-index-1]).T, np.atleast_2d(self.delta_ho[-index-1]))
            synapse_update_ho['biases'] +=  np.atleast_2d(self.delta_ho[-index].sum(axis=0))
            
            delta_hh = np.multiply((np.dot(delta_next_hh, self.synapse_hh['weights'].T) + 
                         np.dot(self.delta_ho[-index-1], self.synapse_ho['weights'].T)), self.activation_h.derivative(self.layers['hidden'][-index-1]))
            
            synapse_update_hh['weights'] += np.dot(self.activation_h.function(self.layers['hidden'][-index-2]).T, np.atleast_2d(delta_hh))
            
            synapse_update_ih['weights'] += np.dot(self.layers['input'][-index-1].T, np.atleast_2d(delta_hh))
            synapse_update_ih['biases'] += np.atleast_2d(delta_hh.sum(axis=0))
            delta_next_hh = delta_hh
        
        self.synapse_ih['weights'] +=  self.lrate * (synapse_update_ih['weights']/self.batch_size)
        self.synapse_hh['weights'] +=  self.lrate * (synapse_update_hh['weights']/self.batch_size)
        self.synapse_ho['weights'] +=  self.lrate * (synapse_update_ho['weights']/self.batch_size)
        
        self.synapse_ih['biases'] +=  self.lrate * (synapse_update_ih['biases']/self.batch_size)
        self.synapse_ho['biases'] +=  self.lrate * (synapse_update_ho['biases']/self.batch_size)
    

## Examples

#### Binary Addition

In [304]:
int2binary_8bit = {}

binary = np.unpackbits(np.array([range(2**8)],dtype=np.uint8).T,axis=1)
for i in range(2**8):
    int2binary_8bit[i] = binary[i]

int2binary_16bit = {}

def binary16(x):
    num = np.zeros(16)
    for digit, value in enumerate('{0:016b}'.format(x)):
        num[digit] = value
    return np.array(num, dtype='int')

for i in range(2**16):
    int2binary_16bit[i] = binary16(i)

def binary2int(x):
    return np.array([((2**logit)*bit) for logit, bit in enumerate(x[-1::-1])]).sum()

def binary_generator_8bit():
    for _ in range(5):
        int_num1 = np.random.randint(2**8/2)
        int_num2 = np.random.randint(2**8/2)
        a = int2binary_8bit[int_num1][::-1]
        b = int2binary_8bit[int_num2][::-1]
        c = int2binary_8bit[int_num1 + int_num2][::-1]
        yield {'input': np.c_[a,b], 'target': c}

def binary_generator_16bit():
    for _ in range(5):
        int_num1 = np.random.randint(2**16/2)
        int_num2 = np.random.randint(2**16/2)
        a = int2binary_16bit[int_num1][::-1]
        b = int2binary_16bit[int_num2][::-1]
        c = int2binary_16bit[int_num1 + int_num2][::-1]
        yield {'input': np.c_[a,b], 'target': c}
        
def evaluate(input_data, target, prediction):
    p = np.zeros(len(prediction))
    for index, value in enumerate(reversed(prediction)):
        p[index] = value
    
    a = input_data[::-1,0]
    b = input_data[::-1,1]
    t = target[::-1]
    print(('Addition problem:\n {0} ({1})\n+{2} ({3})\n'
           '-------------------\n'
           ' {4} ({5}) (Target)\n {6} ({7}) (Prediction)\n\n').format(a, binary2int(a),
                                                                      b, binary2int(b),
                                                                      t, binary2int(t), 
                                                                      np.array(np.round(p), dtype='int'), 
                                                                      binary2int(np.array(np.round(p), dtype='int'))))


In [305]:
rnn_object = Rnn(input_dim=2, hidden_dim=16, output_dim=1)

rnn_object.train(train_generator=binary_generator_8bit,
                 validation_generator=binary_generator_8bit,
                 epochs=2000,
                 batch_size=1,
                 lrate=0.1)

rnn_object.test(test_generator=binary_generator_8bit,
                evaluate=evaluate)

rnn_object.test(test_generator=binary_generator_16bit,
                evaluate=evaluate)

TRAINING PHASE:


Epoch: 0	 Train Error: 0.66	 Validation Error: 0.62
Epoch: 100	 Train Error: 0.59	 Validation Error: 0.62
Epoch: 200	 Train Error: 0.63	 Validation Error: 0.60
Epoch: 300	 Train Error: 0.60	 Validation Error: 0.62
Epoch: 400	 Train Error: 0.61	 Validation Error: 0.61
Epoch: 500	 Train Error: 0.61	 Validation Error: 0.61
Epoch: 600	 Train Error: 0.56	 Validation Error: 0.66
Epoch: 700	 Train Error: 0.58	 Validation Error: 0.60
Epoch: 800	 Train Error: 0.62	 Validation Error: 0.58
Epoch: 900	 Train Error: 0.58	 Validation Error: 0.59
Epoch: 1000	 Train Error: 0.59	 Validation Error: 0.60
Epoch: 1100	 Train Error: 0.56	 Validation Error: 0.56
Epoch: 1200	 Train Error: 0.42	 Validation Error: 0.45
Epoch: 1300	 Train Error: 0.24	 Validation Error: 0.24
Epoch: 1400	 Train Error: 0.14	 Validation Error: 0.12
Epoch: 1500	 Train Error: 0.08	 Validation Error: 0.08
Epoch: 1600	 Train Error: 0.08	 Validation Error: 0.07
Epoch: 1700	 Train Error: 0.05	 Validation Error: 0.06
Epoc