# Recurrent Networks for character processing

In this notebook, we will be dealing with a vanilla implementation of a recurrent neural network based on Andrej Karpathy's simple "gist" code.

This code does not use PyTorch; it is intended to show the most basic implementation of an RNN, written with plain numpy.

The idea of the code was to implement an RNN that is able to read a file of text and to learn the probabilities of characters following other characters from that corpus - a typical natural language processing (NLP) application with a long history.


## Reading data

In [1]:
import numpy as np

# this reads the data (corpus), builds a vocabulary and 
# dictionary to index the characters of the corpus
class DataReader:
    """Serve fixed-length character windows from a text corpus.

    The corpus is currently the hard-coded sample sentence assigned in
    ``__init__``; the ``path`` argument is accepted but not read unless the
    file-reading lines in ``__init__`` are re-enabled.

    Attributes:
        data: the corpus as a single string.
        char_to_ix: maps each distinct character to its vocabulary index.
        ix_to_char: inverse mapping of ``char_to_ix``.
        data_size: number of characters in the corpus.
        vocab_size: number of distinct characters in the corpus.
        pointer: start position of the next window.
        seq_length: number of characters per window.
        fp: open file handle, or ``None`` when no file has been opened.
    """

    def __init__(self, path, seq_length):
        """Build the vocabulary and position the reader at the corpus start.

        Args:
            path: path of the corpus file (unused while the built-in sample
                corpus is active).
            seq_length: number of characters returned per window.
        """
        # No file is opened while the built-in corpus is used. Defining the
        # attribute anyway keeps close() safe to call in both configurations.
        self.fp = None
        # uncomment below and comment the sample-corpus line,
        # if you want a longer built-in corpus
        # self.data = "In this notebook, we will be dealing with a vanilla implementation of a recurrent neural network based on Andrej Karpathy's simple gist code. This code does not use pytorch but was rather intended to give you the absolute basic implementation of a RNN. The idea of the code was to implement an RNN that is able to read a file of text and to learn the probabilities of characters following other characters from that corpus - a typical natural language processing (NLP) application with a long history."
        # uncomment the next two lines (and comment the sample-corpus line)
        # to read the corpus from `path` instead
        # self.fp = open(path, "r")
        # self.data = self.fp.read()
        self.data = "hello i want to learn"
        # Sorted so that the character <-> index mapping is identical on
        # every run; plain set iteration order changes with hash seeding.
        chars = sorted(set(self.data))
        # dictionaries that map each character to its vocabulary index
        # and vice versa
        self.char_to_ix = {ch: i for i, ch in enumerate(chars)}
        self.ix_to_char = dict(enumerate(chars))
        # length of dataset
        self.data_size = len(self.data)
        # length of vocabulary (unique characters)
        self.vocab_size = len(chars)
        # current processing position
        self.pointer = 0
        # processing window
        self.seq_length = seq_length

    def next_batch(self):
        """Return the next window and advance the read position.

        Returns:
            A tuple ``(inputs, targets)`` of vocabulary-index lists, where
            ``targets`` is the corpus slice shifted one character to the
            right of ``inputs`` (the character to predict at each step).
        """
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        # take all characters in that window
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        # ... and try to predict the NEXT character
        targets = [self.char_to_ix[ch] for ch in self.data[input_start + 1:input_end + 1]]
        # go to next window
        self.pointer += self.seq_length
        # wrap around when the next window (plus its shifted target) would
        # reach the end of the corpus
        if self.pointer + self.seq_length + 1 >= self.data_size:
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        """Return True when the next window starts at the corpus beginning."""
        return self.pointer == 0

    def close(self):
        """Close the corpus file if one was opened; otherwise do nothing."""
        if self.fp is not None:
            self.fp.close()
            self.fp = None


## Barebone RNN class
Again, just using numpy calculations.

In [3]:
# bare-bones implementation of RNN
class RNN:
    """Bare-bones character-level recurrent network built on numpy only.

    Model (column vectors throughout):
        h[t] = tanh(U x[t] + W h[t-1] + b)
        y[t] = softmax(V h[t] + c)

    Parameters are updated with Adagrad; ``mU``, ``mW``, ``mV``, ``mb`` and
    ``mc`` hold the running sums of squared gradients.
    """

    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        """Create the weight matrices and Adagrad memory.

        Args:
            hidden_size: size of the hidden layer.
            vocab_size: size of the vocabulary (input and output width).
            seq_length: window size used for the initial loss estimate.
            learning_rate: step size of the Adagrad update.
        """
        # hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        # initialization of model weight matrices, uniform in
        # +/- 1/sqrt(fan_in)
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1))  # bias for hidden layer
        self.c = np.zeros((vocab_size, 1))  # bias for output

        # the original implementation from Andrej uses
        # ADAGRAD, which tracks the weight changes and
        # makes use of these additional matrices
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)

    def softmax(self, x):
        """Return softmax(x); the max is subtracted first to avoid overflow."""
        p = np.exp(x - np.max(x))
        return p / np.sum(p)

    def _one_hot(self, ix):
        """Return a (vocab_size, 1) column with a single 1 at row ``ix``."""
        x = np.zeros((self.vocab_size, 1))
        x[ix] = 1
        return x

    def _step(self, x, h):
        """Advance one time step; return (new hidden state, output probabilities)."""
        h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
        return h, self.softmax(np.dot(self.V, h) + self.c)

    def forward(self, inputs, hprev):
        """Run the network over a window of character indices.

        Args:
            inputs: list of vocabulary indices.
            hprev: (hidden_size, 1) hidden state preceding the window.

        Returns:
            ``(xs, hs, ycap)``: dicts keyed by time step holding the one-hot
            inputs, hidden states (``hs[-1]`` is a copy of ``hprev``) and
            next-character probabilities.
        """
        xs, hs, ycap = {}, {}, {}
        hs[-1] = np.copy(hprev)
        for t, ix in enumerate(inputs):
            # one hot encoding, 1-of-k
            xs[t] = self._one_hot(ix)
            # hidden state of the RNN
            hs[t] = np.tanh(np.dot(self.U, xs[t]) + np.dot(self.W, hs[t-1]) + self.b)
            # non-normalized log probabilities for the next characters
            logits = np.dot(self.V, hs[t]) + self.c
            # probabilities for the next character
            ycap[t] = self.softmax(logits)
        return xs, hs, ycap

    def backward(self, xs, hs, ps, targets):
        """Backpropagate through time over the steps covered by ``targets``.

        Args:
            xs, hs, ps: the dicts returned by ``forward``.
            targets: list of target vocabulary indices, one per time step.

        Returns:
            ``(dU, dW, dV, db, dc)``, each clipped element-wise to [-5, 5].
        """
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        dhnext = np.zeros_like(self.b)
        # go backwards over the steps that actually have a target
        for t in reversed(range(len(targets))):
            # gradient of softmax + cross-entropy w.r.t. the logits:
            # probabilities minus the one-hot target
            dy = np.copy(ps[t])
            dy[targets[t]] -= 1
            # calculate dV, dc
            dV += np.dot(dy, hs[t].T)
            # the output bias receives the logit gradient directly
            dc += dy
            # dh includes gradient from two sides,
            # the next cell and the current output
            dh = np.dot(self.V.T, dy) + dhnext
            # backprop through tanh non-linearity
            dhrec = (1 - hs[t] * hs[t]) * dh
            # bias change
            db += dhrec
            # calculate dU and dW
            dU += np.dot(dhrec, xs[t].T)
            dW += np.dot(dhrec, hs[t-1].T)
            # pass the gradient on to the previous time step
            dhnext = np.dot(self.W.T, dhrec)
        # clip to avoid *exploding* gradients
        # note that this does NOT avoid vanishing
        # gradients
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam)
        return dU, dW, dV, db, dc

    def loss(self, ps, targets):
        """Return the summed cross-entropy of ``ps`` against ``targets``."""
        return sum(-np.log(ps[t][target, 0]) for t, target in enumerate(targets))

    def update_model(self, dU, dW, dV, db, dc):
        """Apply one Adagrad step to all parameters, in place."""
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                      [dU, dW, dV, db, dc],
                                      [self.mU, self.mW, self.mV, self.mb, self.mc]):
            # accumulate squared gradients
            mem += dparam*dparam
            # update parameters dampened by memory
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8)  # adagrad update

    def sample(self, h, seed_ix, n):
        """Sample ``n`` character indices from the model.

        Args:
            h: (hidden_size, 1) hidden state to start from.
            seed_ix: vocabulary index of the first input character.
            n: number of characters to sample.

        Returns:
            List of ``n`` sampled vocabulary indices (the seed is not included).
        """
        x = self._one_hot(seed_ix)
        ixes = []
        for _ in range(n):
            # go through the model and get the probabilities
            h, p = self._step(x, h)
            # choose among the characters
            ix = np.random.choice(self.vocab_size, p=p.ravel())
            # feed the chosen character back in as the next input
            x = self._one_hot(ix)
            ixes.append(ix)
        return ixes

    def train(self, data_reader, threshold=0.01, max_iters=None, sample_every=500):
        """Train until the smoothed loss falls to ``threshold``.

        Args:
            data_reader: object providing ``vocab_size``, ``ix_to_char``,
                ``just_started()`` and ``next_batch()``.
            threshold: training stops once the smoothed loss is no longer
                above this value.
            max_iters: optional cap on the number of iterations; ``None``
                means no cap.
            sample_every: print a 200-character sample and the smoothed loss
                every this many iterations; a falsy value disables printing.

        Returns:
            The smoothed loss at the time training stopped.
        """
        iter_num = 0
        # set loss to random predictor
        smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
        # defined up front so training also works when the reader is not
        # positioned at the start of the corpus
        hprev = np.zeros((self.hidden_size, 1))
        while smooth_loss > threshold and (max_iters is None or iter_num < max_iters):
            if data_reader.just_started():
                # new pass over the corpus: reset the hidden memory
                hprev = np.zeros((self.hidden_size, 1))
            inputs, targets = data_reader.next_batch()
            # forward, backward, loss, update
            xs, hs, ps = self.forward(inputs, hprev)
            dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
            loss = self.loss(ps, targets)
            self.update_model(dU, dW, dV, db, dc)
            smooth_loss = smooth_loss*0.999 + loss*0.001
            # carry the last hidden state of this window into the next one
            hprev = hs[len(inputs)-1]
            if sample_every and not iter_num % sample_every:
                # sample 200 characters from the model
                # given the current first batch character as seed
                sample_ix = self.sample(hprev, inputs[0], 200)
                print( ''.join(data_reader.ix_to_char[ix] for ix in sample_ix))
                print( "\n\niter :%d, loss:%f"%(iter_num, smooth_loss))
            iter_num += 1
        return smooth_loss

    def predict(self, data_reader, start, n):
        """Continue the prompt ``start`` with ``n`` sampled characters.

        Args:
            data_reader: object providing ``char_to_ix`` and ``ix_to_char``.
            start: prompt string; every character must be in the vocabulary.
            n: number of characters to generate after the prompt.

        Returns:
            The prompt followed by the ``n`` generated characters.
        """
        ixes = [data_reader.char_to_ix[ch] for ch in start]
        h = np.zeros((self.hidden_size, 1))
        # feed the prompt one character at a time so the hidden state
        # reflects the order of the characters; the last prompt character
        # becomes the first input of the generation loop
        for ix in ixes[:-1]:
            h, _ = self._step(self._one_hot(ix), h)
        x = self._one_hot(ixes[-1]) if ixes else np.zeros((self.vocab_size, 1))
        # predict next n chars
        for _ in range(n):
            h, p = self._step(x, h)
            ix = np.random.choice(self.vocab_size, p=p.ravel())
            x = self._one_hot(ix)
            ixes.append(ix)
        return ''.join(data_reader.ix_to_char[i] for i in ixes)

In [4]:
# number of characters per training window
seq_length = 15
# build the reader and its vocabulary
# NOTE: DataReader currently uses its built-in sample sentence and does not
# read the file named here (the file-reading lines in its __init__ are
# commented out)
data_reader = DataReader("t8.shakespeare.txt", seq_length)
# network with 100 hidden units, sized to the reader's vocabulary
rnn = RNN(hidden_size=100, vocab_size=data_reader.vocab_size,
          seq_length=seq_length,learning_rate=1e-1)
print("vocabulary size of",rnn.vocab_size)

vocabulary size of 11


In [5]:
# train until the smoothed loss drops below the 0.01 threshold used by
# RNN.train; a 200-character sample and the loss are printed every 500
# iterations
rnn.train(data_reader)

lwollro trehohll o   nwtnloo i  wnnlao tw iirwant o  innwnnaoeo oo wanealteoho wannaooantr rloe tail i wantni iioaewnitloo r nwaitni  r i awnnlor  htnntntoi o hwnt oa wawrlle eaa iinanl   o iinweall  


iter :0, loss:35.968673
want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to 


iter :500, loss:22.443530
want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to 


iter :1000, loss:13.622876
want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to want to wanr to want to want to want to want to 


iter :1500, loss:8.268312
want to want to want to want to want to want to want to want to want to want to want 

KeyboardInterrupt: 