# Building a Recurrent Neural Network (RNN) from Scratch in NumPy for Text Generation

**Author** Niklas Wicklund

**Last revised** 07-04-2024


In this project, I delve into the realm of natural language processing (NLP) by constructing a Recurrent Neural Network (RNN) entirely from scratch using NumPy. My goal is to train this RNN on a given text dataset (in this case, a sample from Harry Potter) and then use it to generate new text that mimics the style and structure of the original text

#### Why from scratch?

While there are powerful deep learning libraries available like TensorFlow and PyTorch, understanding the mechanics of an RNN at this level can be immensely beneficial. Building an RNN from scratch offers a deep understanding of its inner workings. By implementing the core functionalities myself, I gain insights into the fundamental concepts of sequence modeling, backpropagation, and gradient descent optimization. 

_This project was done as an assignment in one of my courses at KTH Royal Institute of Technology_


### Imports

In [3]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from tqdm import trange

### Utility functions

In [2]:
def process_book():
    # Read all characters in the txt file goblet_book.txt and store in a python list, we remove all newlines and tabs
    with open('goblet_book.txt', 'r') as file:
        data = file.read()

    # Create a list of all unique characters in the text
    book_chars = list(set(data))
    K = len(book_chars)
    # Create a dictionary that maps each character to a unique index
    char_to_ind = { char:ind for ind, char in enumerate(book_chars) }
    # Create a dictionary that maps each unique index back to a character
    ind_to_char = { ind:char for ind, char in enumerate(book_chars) }
    return [*data], book_chars, char_to_ind, ind_to_char, K

In [8]:
def one_hot(x, K):
    # using np.eye
    return np.eye(K)[x].T

### RNN class

In [4]:
class RNN:
    def __init__(self, m, K,char_to_ind,ind_to_char):
        self.m = m
        self.K = K
        
        self.char_to_ind = char_to_ind
        self.ind_to_char = ind_to_char

        # Initialize the parameters
        self.initalize_parameters(m, K)

    def initalize_parameters(self, m, K):
        """
        Initialize the parameters of the RNN model.
        Inputs:
            m: Number of hidden units
            K: Number of unique characters in the text
        """

        sig = 0.05
        # Initialize the bias parameters and store in a dictionary
        self.params = {}
        self.params['b'] = np.zeros((m, 1))
        self.params['c'] = np.zeros((K, 1))
        # Initialize the weight parameters and store in a dictionary as random numbers
        self.params['U'] = np.random.randn(m, K) * sig
        self.params['W'] = np.random.randn(m, m) * sig
        self.params['V'] = np.random.randn(K, m) * sig
    
    def get_book_sequence(self,book_data,seq_length,e):
        """
        Generate a sequence of characters from the book data.
        Inputs:
            book_data: List of characters in the book
            seq_length: Length of the sequence
            e: Index of the sequence
        Outputs:
            X: Sequence of input vectors (one hot encoded) (K x seq_length)
            Y: Sequence of output vectors (one hot encoded) (K x seq_length)
        """
        X_chars = book_data[e:seq_length + e]
        Y_chars = book_data[e+1:seq_length+e+1]
            

        # Convert the labelled sequence to one hot encoded vectors with shape (K x seq_length)
        X = np.eye(self.K,dtype=int)[[self.char_to_ind[char] for char in X_chars]].T
        Y = np.eye(self.K,dtype=int)[[self.char_to_ind[char] for char in Y_chars]].T
        return X,Y

    def softmax(self, o):
        eps = np.finfo(float).eps
        p = np.exp(o) / (np.sum(np.exp(o)) + eps)
        return p
    
    def l_cross(self, Y, p):
        """
        Compute the cross entropy loss between the true labels Y and the predicted probabilities P.
        Inputs:
            Y: True labels (K x n)
            P: Predicted probabilities (K x n)
        Outputs:
            loss: Cross entropy loss
        """
        eps = np.finfo(float).eps
        loss = -np.sum(Y*np.log(p + eps))
        return loss
    def compute_loss(self, X,Y,h0):
       # Forward pass
        P,_ = self.forward_pass(h0,X,Y)
        # Compute the loss
        loss = self.l_cross(Y,P)
        return loss

    
    def compute_gradients(self, X, Y, P):
        """
        Perform a backward pass of the RNN model.
        Inputs:
            X: Sequence of input vectors (one hot encoded) (K x n)
            Y: Sequence of output vectors (one hot encoded) (K x n)
            P: Softmax of the output vectors (K x n)
        Outputs:
            gradients: Dictionary containing the gradients of the parameters
        """
        n = X.shape[1]
        grads = {}
        grads['dU'] = np.zeros((self.m, self.K))
        grads['dW'] = np.zeros((self.m, self.m))
        grads['dV'] = np.zeros((self.K, self.m))
        grads['db'] = np.zeros((self.m, 1))
        grads['dc'] = np.zeros((self.K, 1))

        do = [None]*n
        dh = [None]*n
        da = [None]*n

        # The last layer is different and is done before the loop
        do[-1] = (-(Y[:,-1] - P[:,-1]).T).reshape(-1,1)
        dh[-1] = (self.params['V'].T @ do[-1]).reshape(-1,1)
        c = (1 - np.tanh(self.forward['a'][-1])**2).reshape(-1)
        a = np.diag(c)
        b = dh[-1]

        # a is shape (m, m), b is shape (m, 1)
        da[-1] = (a @ b).reshape(-1,1)
        
        # Compute the gradients of h and a
        for t in reversed(range(0,n)):
            do[t] = (-(Y[:,t] - P[:,t]).T).reshape(-1,1)
            assert(do[t].shape == (self.K, 1))

            grads['dV'] += do[t] @ self.forward['h'][t].T
            grads['dc'] += do[t]
            
            # Compute the gradients of h and a
            if t == n-1:
                dh[t] = self.params['V'].T @ do[t]
            else:
                dh[t] = self.params['V'].T @ do[t] + self.params['W'].T @ da[t+1]

            da[t] = np.multiply(dh[t], (1 - np.tanh(self.forward['a'][t])**2).reshape(-1,1))

            # Compute the gradients of U and W
            grads['dU'] += np.matmul(da[t], X[:,t].reshape(-1,1).T)
            grads['dW'] += np.matmul(da[t], self.forward['h'][t-1].T)

            grads['db'] += da[t]
            

        # Clip the gradients, to prevent exploding gradients
        for grad in grads:
            grads[grad] = np.clip(grads[grad], -5, 5)
        return grads

    def forward_step(self, h_prev, X):
        """
        Perform a forward step of the RNN model.
        Inputs:
            h_prev: Previous hidden state (m x 1)
            X: Input vector (one hot encoded) (K x 1)
        Outputs:
            a: Activation (m x 1)
            h: Hidden state (m x 1)
            o: Output (K x 1)
            p: Softmax of the output (K x 1)
        """
        a = self.params['W'] @ h_prev + (self.params['U']@X).reshape(self.m,1) + self.params['b']
        h = np.tanh(a)
        o = self.params['V'] @ h + self.params['c']
        p = self.softmax(o)

        # Reshape p to be a column vector (K x 1)
        return a, h, o, p.reshape(-1)
    
    
    
    def forward_pass(self, h0, X, Y):
        """
        Perform a forward pass of the RNN model.
        Inputs:
            h0: Initial hidden state (m x 1)
            X: Sequence of input vectors (one hot encoded) (K x n)
            Y: Sequence of output vectors (one hot encoded) (K x n)
        Outputs:
            P: Softmax of the output vectors (K x n)
            forward: Dictionary containing the intermediate values from the forward pass
        """
        n = X.shape[1]
        forward = {}
        forward['h'] = [None]*(n+1)
        forward['a'] = [None]*n
        forward['o'] = [None]*n
        forward['p'] = [None]*n

        h_prev= h0
        for t in range(n):
            forward['a'][t], forward['h'][t], forward['o'][t], forward['p'][t] = self.forward_step(h_prev, X[:,t])
            h_prev = forward['h'][t]
        forward['h'][-1] = h0

        P = np.array(forward['p']).T
        
        forward['loss'] = self.l_cross(Y,P)
        self.forward = forward
        return P, forward

    def train(self,book_data, h0, eta, n_epochs, seq_length,optimizer = 'adagrad'):
        """
        Train the RNN model using the book data.
        Inputs:
            book_data: List of characters in the book
            h0: Initial hidden state
            eta: Learning rate
            n_epochs: Number of epochs
            seq_length: Length of the sequence
        Outputs:
            params: Dictionary containing the trained parameters
            smooth_loss: List of the smoothed loss values
            best_model: Dictionary containing the best model parameters
            df: Dataframe containing the relevant data during training
        """
        with open('stats.txt', 'a') as f:
            #Print the current training settings to the file
            f.write("\n\n### New training session ###\n\n")
            f.write(f"K: {self.K}, Length of book: {len(book_data)}\n")
            f.write(f"eta: {eta}, n_epochs: {n_epochs}, seq_length: {seq_length}\n")
        
        ada_params = {
            'U': np.zeros_like(self.params['U']),
            'W': np.zeros_like(self.params['W']),
            'V': np.zeros_like(self.params['V']),
            'b': np.zeros_like(self.params['b']),
            'c': np.zeros_like(self.params['c'])
        }
        eps = np.finfo(float).eps

        hprev = np.zeros((self.m, 1))

        # Make use of smooth_loss to compute the loss
        smooth_loss = None
        smooth_loss_epoch = []

        #Define best dictionary to store the best parameters, from start have same parameters as self.params but copies
        best_model = {
            'loss': np.inf,
            'params':{}
        }
        for key in self.params:
            best_model['params'][key] = self.params[key].copy()

        #Define a dataframe that stores relevant data during training
        df = pd.DataFrame(columns=['epoch','update_step','e', 'smooth_loss','loss'])
        update_step = 0
        for epoch in range(n_epochs):
            hprev = np.zeros((self.m, 1))
            # For each epoch we should iterate the entire book
            smooth_loss_list = []
            # for e loop that iterates over the book
            with trange(0, len(book_data), seq_length) as t:
                for e in t:
                    # Check if e is too large
                    if e + seq_length + 1 >= len(book_data):
                        break

                    X,Y = self.get_book_sequence(book_data, seq_length,e)
                    p, forward = self.forward_pass(hprev, X, Y)

                    # Save last hidden state to hprev (remeber that h is 1 longer so the last is the initial hidden state, need to save the second last)
                    hprev = forward['h'][-2]
                    loss = forward['loss']
                    if(smooth_loss == None):
                        smooth_loss = loss
                    else:
                        smooth_loss = 0.999*smooth_loss + 0.001*loss

                    if loss < best_model['loss']:
                        for key in self.params:
                            best_model['params'][key] = self.params[key].copy()
                        best_model['loss'] = loss
                    
                    smooth_loss_list.append(smooth_loss)
                    gradients = self.compute_gradients(X, Y, p)

                    if optimizer == 'adagrad':
                        for key in self.params:
                            # Adaboost
                            ada_params[key] += gradients['d'+key]**2
                            self.params[key] -= (eta*gradients['d'+key])/(np.sqrt(ada_params[key]) + eps)
                    else:
                        # Test to use Adam instead
                        beta1 = 0.9

                        for key in self.params:
                            ada_params[key] = beta1*ada_params[key] + (1-beta1)*gradients['d'+key]**2
                            self.params[key] -= (eta*gradients['d'+key])/(np.sqrt(ada_params[key]) + eps)

                    if(update_step % 100 == 0):
                        t.set_description(f'Smooth loss <{smooth_loss}>')
                    #Synthesize a sample sequence from the current model every 10000 update steps
                    if(update_step % 10000 == 0):
                        #Save to dataframe
                        row = pd.DataFrame([[epoch, update_step, e, smooth_loss,loss]], columns=['epoch','update_step','e','smooth_loss','loss'])
                        df = pd.concat([df,row], axis=0, ignore_index=True)
                        #t.set_description(f'Smooth loss <{smooth_loss}>')
                        # Print the stats continuously to a file
                        Y = self.synthesize(hprev, X[:,0], 200)
                        with open('stats.txt', 'a') as f:
                            #Start by a seperator
                            f.write("\n\n")
                            #Write epoch
                            f.write(f"Epoch: {epoch}\n")
                            f.write(f"Update Step: {update_step}\n")
                            f.write(f"Smooth Loss: {smooth_loss}\n")
                            string = self.convert_to_character_sequence(Y)
                            print("Update step: ", update_step)
                            print(string)
                            f.write(f"Sample Sequence: {string}\n")
                    update_step +=1
            smooth_loss_epoch.append(smooth_loss_list)
            
        return self.params,best_model, smooth_loss,smooth_loss_epoch,df
    # Function to generate a sequence of characters using the RNN model
    def synthesize(self, h0, x0, n):
        """
        Synthesize a sequence of characters using the RNN model.
        Inputs:
            h0: Initial hidden state
            x0: Initial input vector  (one hot encoded) (K x 1)
            n: Number of characters to synthesize
        Outputs:
            Y: One hot encoded sequence of characters (n x K)
        """

        Y = np.zeros((self.K, n))
        h = h0
        x = x0
        for t in range(n):
            
            a,h,o,p = self.forward_step(h, x)

            # Randomly select the next character from our probability p
            
            index = np.random.choice(range(self.K), p = p.ravel())
            x = np.zeros((self.K))# np.eye(self.K)[index]
            x[index] = 1
            # One hot encode the character chosen, set row index to 1
            Y[:,t] = np.eye(self.K)[index]
        return Y
    
    def convert_to_character_sequence(self,Y):
        """
        Convert a sequence of one hot encoded characters to a sequence of characters.
        Inputs:
            Y: One hot encoded sequence of characters (n x K)
        Outputs:
            sequence: Sequence of characters
        """
        sequence = ""
        for i in range(Y.shape[1]):
            sequence += self.ind_to_char[np.argmax(Y[:,i])]
        return sequence
    
    def set_params(self, params):
        """
        Set the parameters of the model.
        Inputs:
            params: Dictionary containing the parameters
        """
        self.params = params
    def compute_grads_num_slow(self,X,Y,h=1e-5):
        # Numerical gradient calculation using finite difference
        # Slow implementation
        # Initialize gradients
        gradients = {}
        for key in self.params:
            gradients['d'+key] = np.zeros(self.params[key].shape)
        
        # Set hprev to zero
        hprev = np.zeros((self.m,1))
        for key in self.params:
            for i in tqdm(range(self.params[key].size)):
                
                old = self.params[key].flat[i] # Store the original value of the parameter
                self.params[key].flat[i] =  old - h
                l1 = self.compute_loss(X,Y,hprev)
                self.params[key].flat[i] = old + h
                l2 = self.compute_loss(X,Y,hprev)
                gradients['d'+key].flat[i] = (l2 - l1)/(2*h)
                self.params[key].flat[i] = old # Reset the value of the parameter
        return gradients
    
    def test_gradients(self, X, Y):
        """
        Test the gradients computed using the function compute_gradients
        Inputs:
            X: Input sequence of characters
            Y: Output sequence of characters
            hprev: Initial hidden state
        """
        hprev = np.zeros((self.m,1))
        
        # Forward pass
        P, forward = self.forward_pass(hprev, X, Y)
        # Compute gradients using backprop
        gradients_a = self.compute_gradients(X, Y, P)
        # Compute gradients using numerical gradient
        gradients_n = self.compute_grads_num_slow(X, Y,h=1e-4)

        df = pd.DataFrame(columns=['gradient','absolute error ok (%)','absolute worst diff','relative error ok (%)', 'relative worst diff','average relative error'])

        threshold = 1e-5
        # Compute the relative error
        for key in gradients_a:
            a = gradients_a[key]
            n = gradients_n[key]
            # Check if any NaN values seperately for numerical and analytical gradients
            if np.isnan(a).any():
                print("!! Analytical gradient has NaN values")
            if np.isnan(n).any():
                print("!! Numerical gradient has NaN values")

            # Compute the absolute and relative error
            abs_error = np.abs(a-n)
            # Compute how many datapoints are below the threshold as an accuracy measure
            abs_acc = np.mean(abs_error < threshold)*100
            abs_worst = np.max(abs_error)
            #Same as above but for relative error
            rel_error = np.abs(a-n)/np.maximum(1e-6,np.abs(a)+np.abs(n))
            rel_acc = np.mean(rel_error < threshold)*100
            rel_worst = np.max(rel_error)
            #Average relative error
            avg_rel_error = np.mean(rel_error)

            row = pd.DataFrame([{'gradient':key,'absolute error ok (%)':abs_acc,'absolute worst diff': abs_worst,'relative error ok (%)':rel_acc,'relative worst diff':rel_worst,'average relative error':avg_rel_error}])
            df = pd.concat([df,row], axis=0, ignore_index=True)

        return df

### Training

In [5]:
eta = 0.1
seq_length = 25
m = 100

# Initialize the hidden state to zero
h0 = np.zeros((m, 1))

book_data,book_chars, char_to_ind, ind_to_char,K = process_book()

In [None]:
rnn = RNN(m, K, char_to_ind, ind_to_char)
# Train the RNN model and get the trained parameters
params,best_model, smooth_loss,smooth_loss_epochs,stats = rnn.train(book_data, h0, eta, 10, seq_length,optimizer='adagrad')

### Synthesize text using the best model

In [53]:
rnn = RNN(m, K, char_to_ind, ind_to_char)
rnn.set_params(best_model['params'])

loss = best_model['loss']
print("Best loss: ",loss)
# Generate a sequence of characters
h0 = np.zeros((m, 1))
x0 = one_hot(char_to_ind['T'],K)
Y = rnn.synthesize(h0, x0, 1000)
s = rnn.convert_to_character_sequence(Y)
print(s)

Best loss:  12.1730772637276

"An, said. Cislipidate asofe.
"Now.
"We've in undonted," croble ot shitaset; she wand was beenly hass comesnded task she lingonited sotht you?" ."
"Whot Harry.
"Khtichr scraars giving as Ron, his wancer.  Ouldluble," shanrinary, Hond, a bullared knowly, Deantant more.  See I you, face unice," saw Bancy was glince, at the Snckoose, and their werudleds make here a cercy was looked conjurrighe; to mu. They," said Kn his the snirting yeljom.  Ibach what said, leach mack Drigh.
That sudged a prown. "Peogly, caumblac lakusce, Stael "He'd if then hirspingt keent.
"Salk sharl and undely ancelfe me Aice, oven fate as have Bar!" searighit lober to for treet.
"A sermione."
FYou'velosed eckine?" sere she wall spoolly one itch macinjt ouch ank Moody's ale to with Cris, said, Summanand when ho now an off hlet, villy?"
"Krumert-insisby Be to brignony know, a it mime squorking to be," said Mr.
"Maged witch arry these I figy," said Kritch his moms?" said How uselved cloull

### Result after 10 epochs of training

Best loss: 12.1730772637276


_"An, said. Cislipidate asofe.
"Now.
"We've in undonted," croble ot shitaset; she wand was beenly hass comesnded task she lingonited sotht you?" ."
"Whot Harry.
"Khtichr scraars giving as Ron, his wancer.  Ouldluble," shanrinary, Hond, a bullared knowly, Deantant more.  See I you, face unice," saw Bancy was glince, at the Snckoose, and their werudleds make here a cercy was looked conjurrighe; to mu. They," said Kn his the snirting yeljom.  Ibach what said, leach mack Drigh.
That sudged a prown. "Peogly, caumblac lakusce, Stael "He'd if then hirspingt keent.
"Salk sharl and undely ancelfe me Aice, oven fate as have Bar!" searighit lober to for treet.
"A sermione."
FYou'velosed eckine?" sere she wall spoolly one itch macinjt ouch ank Moody's ale to with Cris, said, Summanand when ho now an off hlet, villy?"
"Krumert-insisby Be to brignony know, a it mime squorking to be," said Mr.
"Maged witch arry these I figy," said Kritch his moms?" said How uselved cloully, sit had fraster." "Velved_