In [1]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

In [2]:
def softmax(x):
    '''
    Function to return softmax
    '''
    return np.exp(x) / np.sum(np.exp(x))

In [3]:
assert (np.array(softmax([1,2,3])) != np.array([0.09003057, 0.24472847, 0.66524096])).all(), "Softmax result not correct"

In [4]:
def loss(self, prob, target):
    '''
    Calculate cross-entrpy loss
    '''
    return -np.sum(np.log(prob))

In [25]:
class RNN:
    def __init__(self, max_n, hidden_layers):
        '''
        W is weight
        Whh weight at previous hidden state
        Whx weight at current input state
        Why weight at the output state

        Formula for the current state: h_t = f(h_{t-1}, x_t) where 

        '''
        self.max_n = max_n
        self.hidden_layers = hidden_layers

        self.Wxh: np.ndarray = 0.01 * np.random.randn(hidden_layers, max_n + 1)
        self.Whh: np.ndarray = 0.01 * np.random.randn(hidden_layers, hidden_layers)
        self.Why: np.ndarraynp.ndarray = 0.01 * np.random.randn(max_n, hidden_layers)

In [6]:
def rnn_forward(self, x_input, y, size):
    
    hidden_states = []
    hidden_states.append(np.zeros((self.hidden_layers, 1)))    
    
    probs = []
    prediction = np.zeros(size)
    loss = 0
    
    for pos in range(size):
        x_input[self.max_n] = pos
        hidden = np.tanh(np.dot(self.Wxh, x_input) + np.dot(self .Whh, hidden_states[-1]))
        hidden_states.append(hidden)

        output = np.dot(self.Why, hidden)
        prob = softmax(output)
        probs.append(prob)

        loss += -np.log(prob[y[pos],0]) 

        prediction[pos] = np.argmax(prob)
            
    return prediction, hidden_states, probs, loss

In [8]:
    def rnn_backward(self, x_input, y, size, hidden_states, probs):

        dWxh = np.zeros_like(self.Wxh)
        dWhh = np.zeros_like(self.Whh)
        dWhy = np.zeros_like(self.Why)
        dhnext = np.zeros_like(hidden_states[0])
        
        for pos in reversed(range(size)):
            X[self.max_n] = pos
            dy = np.copy(probs[pos])
            dy[y[pos]] -= 1 

            dWhy += np.dot(dy, hidden_states[pos].T)
            
            dh = np.dot(self.Why.T, dy) + dhnext 
            dhraw = (1 - hidden_states[pos] * hidden_states[pos]) * dh 
            
            dWxh += np.dot(dhraw, X.T)
            dWhh += np.dot(dhraw, hidden_states[pos-1].T)
            
            dhnext = np.dot(self.Whh.T, dhraw)

            
        return dWxh, dWhh, dWhy

In [41]:
# Neural networks take input as vectors so we have to convert integers to vectors using one-hot encoding
# This function will encode a given integer sequence into RNN compatible format (one-hot representation)

def encode(x_input, max_n):
    one_hot = np.zeros((max_n + 1, 1))
    one_hot[x_input, :] = 1
    return one_hot

In [11]:
def levendistance(a: np.ndarray, b: np.ndarray) -> int:
    n = len(a) 
    m = len(b)
    if n > m:
        a, b = b, a
        n, m = m, n

    current_row = range(n + 1)
    for i in range(1, m + 1):
        previous_row, current_row = current_row, [i] + [0] * n
        for j in range(1, n + 1):
            add = previous_row[j] + 1
            delete = current_row[j - 1] + 1
            change = previous_row[j - 1] 
            if a[j - 1] != b[i - 1]:
                change += 1
            current_row[j] = min(add, delete, change)

    return current_row[n]

In [13]:
    def train(self, max_examples, learning_rate, max_seq_len, lr_lambda = 0.4, lr_reduce_rate = 500, info_rate = 100):
        
        distances = 0
        dist_list = []
        
        for i in range(max_examples):
            sequence_size = max_seq_len
            
            X_input = np.random.randint(self.max_n, size=sequence_size)
            y = np.sort(X_input)

            one_hot = self.encode(max_n)
        
            prediction, hidden_states, probs, loss = self.rnn_forward(one_hot, y, sequence_size)
                
            distances += levendistance(prediction, y)
            
            dWxh, dWhh, dWhy = self.backward(one_hot, y, sequence_size, 
                                        hidden_states, probs)
            
            self.Wxh -= learning_rate * dWxh
            self.Whh -= learning_rate * dWhh
            self.Why -= learning_rate * dWhy
            
            if (i + 1) % lr_reduce_rate == 0:
                learning_rate *= lr_lambda

            if (i + 1) % info_rate == 0:
                average_distance = float(distances) / info_rate
                dist_list.append(average_distance)
                print('Levenshtein distance for last {} sequences = {}'.format(info_rate, average_distance))
                
        return dist_list

In [17]:
max_number_generated = 10
seq_len = 10
X_input = np.random.randint(max_numbers, size=seq_len)

In [18]:
X_input

array([3, 6, 2, 1, 5, 9, 4, 9, 9, 2])

In [49]:
def encode(X, seq_len, vocab_size):
    x = np.zeros((len(X), seq_len, vocab_size), dtype=np.float32)
    for ind, batch in enumerate(X):
            x[ind, batch[0], batch[1]] = 1
    return x

In [50]:
encode(X_input, seq_len, max_number_generated)

IndexError: invalid index to scalar variable.

The hidden dimension is basically the number of nodes in each layer 

In [26]:
hidden_dim = 50
modelRnn = RNN(max_number_generated, hidden_dim)

In [21]:
print('{} = X'.format(X_input))
print('{} = Target'.format(np.sort(X_input)))
print('{} = Before training prediction'.format(modelRNN.predict(X_input)))

[3 6 2 1 5 9 4 9 9 2] = X
[1 2 2 3 4 5 6 9 9 9] = Target


NameError: name 'model' is not defined