In [47]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

In [48]:
def softmax(s):
    """
    Implementation of the softmax activation function

    Args:
        s: an 1xd vector of a classifier's outputs

    Returns:
        An 1xd vector with the results of softmax given the input
        vector s.
    """
    exponents = np.exp(s - np.max(s, axis = 0)) # Max subtraction for numerical stability
    output_exp_sum = np.sum(exponents, axis = 0)
    p = exponents / output_exp_sum
    return p

In [116]:
class RNN:
    """
    Implementation of a simple RNN.
    
    Attributes:
        k: dimensionality of input
        m: Hidden state dimensionality
        eta: learning rate initial value
        seq_length: Length of input sequences used during training
    """
    
    
    def __init__(self, k, m, seq_length = 25,  eta = 0.01, sig = 0.01):
        '''
        Args:
            k: dimensionality of input
            m: Hidden state dimensionality
            seq_length: Length of input sequences used during training
            eta: learning rate initial value
            sig: standard deviation of normal distribution used to init-
                ialize the weights
        '''
        # Initialize hyperparameters
        self.m = m
        self.eta = eta
        self.seq_length = seq_length
        
        # Initialize bias vectors
        self.b = np.zeros((m, 1))
        self.c = np.zeros((k, 1))
        # Initialize weight matrices
        
        self.U = np.random.randn(m, k) * sig
        self.W = np.random.randn(m, m) * sig
        self.V = np.random.randn(k, m) * sig
        
    def synthesize_seq(self, h0, x0, n):
        """
        Synthesizes a sequence of characters
        
        Args:
         h0: Hidden state at time 0.
         x0: First dummy input to RNN.
         n: Length of sequence to generate.
         
        """
        synthesized_seq = []
        h_t = h0
        x_t = x0
        
        for i in range(n):
            a_t = self.W.dot(h_t) + self.U.dot(x_t) + self.b
            h_t = np.tanh(a_t)

            o_t = self.V.dot(h_t) + self.c
            p_t = softmax(o_t)
            
            #sample character based on softmax output and store it
            sampled_char = np.random.choice(list(range(self.V.shape[0])), p = p_t.flatten())
            synthesized_seq.append(sampled_char)
        
        return synthesized_seq
    
    def cross_entropy_loss(self, h0, X, Y):
        """
        Calculates the cross entropy loss
        """
        log_X = np.multiply(Y , self.forwardPass(h0, X)[0]).sum(axis=0)
        log_X[log_X == 0] = np.finfo(float).eps
        return -np.log(log_X)

    def computeLoss(self, h0, X, Y):
        """
        Computes the loss of the network given a batch of data.
        
        Args:
            h0: Initial hidden state
            X_batch: NxD matrix with N data sample inputs
            Y_batch: NxD matrix with N data sample outputs
        
        Returns:
            A scalar float value corresponding to the loss.
        """        
        return np.sum(self.cross_entropy_loss(h0, X, Y))

    
    def forwardPass(self, h0, X):
        """
            Performs the forward pass for each timestep and returns
            the probability of each word in each timestep
            
            Args:
                h0: Initial hidden state
                X: Input matrix
                
            Returns:
                A matrix with the probability of each word in each timestep.
        """
        T = X.shape[1]
        P = np.zeros((X.shape[0], T))
        O = np.zeros((X.shape[0], T))
        H = np.zeros((self.m, T))
        h_t = h0
        for i in range(T):
            a_t = self.W.dot(h_t) + self.U.dot(X[:,i].reshape(-1, 1)) + self.b
            h_t = np.tanh(a_t)
            H[:,i] = h_t.flatten()
            O[:,i] = self.V.dot(h_t).flatten() + self.c.flatten()
            P[:,i] = softmax(O[:,i].reshape(1, -1)).reshape(1, -1)
        return P, O, H
    
    def backwardPass(self, X, Y, P, O, H):
        # Initialize gradients to zero matrices
        grad_U = np.zeros(self.U.shape)
        grad_W = np.zeros(self.W.shape)
        grad_V = np.zeros(self.V.shape)
        grad_b = np.zeros(self.b.shape)
        grad_c = np.zeros(self.c.shape)
        grad_h_next = np.zeros((self.m, 1))
        
        # Get total number of timesteps
        T = Y.shape[1]

        # For each timestep
        for t in range(T):
            g = P[:,t] - Y[:,t]
            # Update gradients
            grad_c[:, 0] += g
            grad_V += np.outer(g, H[:,t])
            
            grad_h = (1 - H[:, t] ** 2) * (self.V.T.dot(g) + grad_h_next[:, 0])
            grad_U += np.outer(grad_h, X[:,t])
            grad_W += np.outer(grad_h, H[:,t])
            grad_b[:,0] += grad_h

            # Next hidden state gradient
            grad_h_next = np.dot(self.W.T, grad_h).reshape(1, -1)
            
       
        return grad_W, grad_U, grad_V, grad_b, grad_c 

    def compute_grad_num_slow(self, X_batch, Y_batch, h0,  h = 1e-5):
        '''Centered difference gradient'''
        # Initialize all gradients to zero
        grad_W = np.zeros(self.W.shape)
        grad_U = np.zeros(self.U.shape) 
        grad_V = np.zeros(self.V.shape) 
        grad_b = np.zeros(self.b.shape)
        grad_c = np.zeros(self.c.shape)

       #['dW', 'dU', 'dV', 'db', 'dc']
        # Gradient w.r.t W
        for j in tqdm(range(self.W.shape[0])):
            for k in range(self.W.shape[1]):
                self.W[j, k] -= h
                c1 = self.computeLoss(h0, X_batch, Y_batch)
                self.W[j, k] += 2 * h
                c2 = self.computeLoss(h0, X_batch, Y_batch)
                self.W[j, k] -= h
                grad_W[j, k] = (c2-c1) / (2 * h)
       
        
         # Gradient w.r.t U
        for j in tqdm(range(self.U.shape[0])):
            for k in range(self.U.shape[1]):
                self.U[j, k] -= h
                c1 = self.computeLoss(h0, X_batch, Y_batch)
                self.U[j, k] += 2 * h
                c2 = self.computeLoss(h0, X_batch, Y_batch)
                self.U[j, k] -= h
                grad_U[j, k] = (c2-c1) / (2 * h)
       
         # Gradient w.r.t V
        for j in tqdm(range(self.V.shape[0])):
            for k in range(self.V.shape[1]):
                self.V[j, k] -= h
                c1 = self.computeLoss(h0, X_batch, Y_batch)
                self.V[j, k] += 2 * h
                c2 = self.computeLoss(h0, X_batch, Y_batch)
                self.V[j, k] -= h
                grad_V[j, k] = (c2-c1) / (2 * h)
       
        # Gradient w.r.t b
        for j in tqdm(range(self.b.shape[0])):
            self.b[j] -= h
            c1 = self.computeLoss(h0, X_batch, Y_batch)
            self.b[j] += 2 * h
            c2 = self.computeLoss(h0, X_batch, Y_batch)
            self.b[j] -= h
            grad_b[j] = (c2-c1) / (2 * h)
       
        # Gradient w.r.t c
        for j in tqdm(range(self.c.shape[0])):
            self.c[j] -= h
            c1 = self.computeLoss(h0, X_batch, Y_batch)
            self.c[j] += 2 * h
            c2 = self.computeLoss(h0, X_batch, Y_batch)
            self.c[j] -= h
            grad_c[j] = (c2-c1) / (2 * h)
       
    
        return grad_W, grad_U, grad_V, grad_b, grad_c 

In [117]:
def onehot_encode(chars, char_dictionary):
    """
    Encodes a string of characters to a matrix with one hot encoding.
    
    Args:
        chars: The input string
        char_dictionary: A dictionary that maps each possible character
            of the vocabulary being used to a unique index.
        
    Returns: 
        A NxM matrix where N is the number of distinct characters in the
        vocabulary and M is the number of characters in the string.
    """
    N = len(char_dictionary.keys())
    M = len(chars)
    encoded_string = np.zeros((N, M))
    for i, char in enumerate(chars):
        unique_index = char_dictionary[char]
        encoded_string[unique_index, i] = 1
    return encoded_string        

In [129]:
def getRelativeErrors(grad1, grad2):
    """
    Computes the relative errors of grad_1 and grad_2 gradients
    """
    abs_diff = np.absolute(grad1 - grad2) 
    abs_sum = np.absolute(grad1) + np.absolute(grad2)
    max_elems = np.where(abs_sum > np.finfo(float).eps, abs_sum, np.finfo(float).eps)
    relativeErrors = abs_diff / max_elems
    return relativeErrors

###  Read in the data

In [118]:
with open('goblet_book.txt', 'r') as fileobj:
    data = fileobj.read()

In [119]:
# Get dictionary of unique characters in the book
characters = set(data)
char_dictionary = dict([ (elem, i) for i, elem in enumerate(characters) ])
inv_char_dictionary = {v: k for k, v in char_dictionary.items()}
voc_size = len(char_dictionary)

### Extract input and output data using one-hot encoding

In [120]:
seq_length = 25
X_chars = data[:seq_length]
Y_chars = data[1:seq_length + 1]
X = onehot_encode(X_chars, char_dictionary)
Y = onehot_encode(Y_chars, char_dictionary) 

In [121]:
# Initialize dimensionality of hidden state
m = 5
# Initialize the initial hidden state to a zero vector
h0 = np.zeros((m, 1))

In [122]:
rnn_model = RNN(k = voc_size, m = m)

In [123]:
rnn_model.synthesize_seq(h0, X[:,0].reshape(-1, 1), 10)

[44, 40, 63, 38, 14, 12, 22, 38, 75, 2]

In [124]:
P, O, H = rnn_model.forwardPass(h0, X)

In [125]:
loss = rnn_model.computeLoss(h0, X, Y)

In [130]:
grad_W, grad_U, grad_V, grad_b, grad_c  = rnn_model.backwardPass(X, Y, P, O, H)

In [131]:
approx_grad_W, approx_grad_U, approx_grad_V, approx_grad_b, approx_grad_c = rnn_model.compute_grad_num_slow(X, Y, h0)

100%|███████████████████████████████████████████| 5/5 [00:00<00:00, 147.06it/s]
100%|████████████████████████████████████████████| 5/5 [00:00<00:00,  9.76it/s]
100%|█████████████████████████████████████████| 83/83 [00:00<00:00, 157.50it/s]
100%|███████████████████████████████████████████| 5/5 [00:00<00:00, 625.18it/s]
100%|█████████████████████████████████████████| 83/83 [00:00<00:00, 764.97it/s]


In [132]:
errorsW = getRelativeErrors(grad_W, approx_grad_W)
errorsU = getRelativeErrors(grad_U, approx_grad_U)
errorsV = getRelativeErrors(grad_V, approx_grad_V)
errorsb = getRelativeErrors(grad_b, approx_grad_b)
errorsc = getRelativeErrors(grad_c, approx_grad_c)
print(np.max(errorsW))
print(np.max(errorsU))
print(np.max(errorsV))
print(np.max(errorsb))
print(np.max(errorsc))

1.0
1.0
1.0
1.0
1.0
