# IMPLEMENTING RNN FROM SCRATCH IN PYTHON

Implementation¶
We will implement a full Recurrent Neural Network from scratch using Python. We will try to build a text generation model using an RNN. We train our model to predict the probability of a character given the preceding characters. It’s a generative model. Given an existing sequence of characters we sample a next character from the predicted probabilities, and repeat the process until we have a full sentence. This implementation is from Andrej Karparthy great post building a character level RNN. Here we will discuss the implementation details step by step.

General steps to follow:

Initialize weight matrices U, V, W from random distribution and bias b, c with zeros
Forward propagation to compute predictions
Compute the loss
Back-propagation to compute gradients
Update weights based on gradients
Repeat steps 2–5


### Step 1: Initialize:

To start with the implementation of the basic RNN cell, we first define the dimensions of the various parameters U,V,W,b,c.

Dimensions: Let’s assume we pick a vocabulary size vocab_size= 8000 and a hidden layer size hidden_size=100. Then we have:

In [1]:
import numpy as np
import pandas as pd

In [3]:
class RNN:
    def __init__(self,hidden_size,vocab_size,seq_length,learning_rate):
        # hyper params
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        
        # model params
        
        self.U= np.random.uniform(-np.sqrt(1./vocab_size),np.sqrt(1./vocab_size),(hidden_size,vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size),np.sqrt(1./hidden_size),(vocab_size,hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size),np.sqrt(1./hidden_size),(hidden_size,hidden_size))
        self.b = np.zeros((hidden_size,1)) # bias for hidden layer.
        self.c = np.zeros((vocab_size,1)) # bias for output.

Proper initialization of weights seems to have an impact on training results there has been lot of research in this area. It turns out that the best initialization depends on the activation function (tanh in our case) and one recommended approach is to initialize the weights randomly in the interval from[ -1/sqrt(n), 1/sqrt(n)]where n is the number of incoming connections from the previous layer.

## Step 2: Forward pass

Straightforward as per our equations for each timestamp t, we calculate hidden state hs[t] and output os[t] applying softmax to get the probability for the next character.

In [4]:
def forward(self,inputs,hprev):
    xs,hs,os,ycap = {},{},{},{}
    hs[-1] = np.copy(hprev)
    for t in range(len(inputs)):
        xs[t] = zero_init(self.vocab_size,1)
        xs[t][inputs[t]] = 1 # one hot encoding
        hs[t] = np.tanh(np.dot(self.U,xs[t])+np.dot(self.W,hs[t-1])+ self.b) # hidden state
        os[t] = np.dot(self.V,hs[t]) + self.c
        ycap[t] = self.softmax(os[t])
    return xs,hs,ycap

In [6]:
def softmax(x):
    exps = np.exp(x)
    return exps/ np.sum(exps)

In [9]:
softmax([1,2,3])

array([0.09003057, 0.24472847, 0.66524096])

In [10]:
softmax([1000,2000,3000])

  exps = np.exp(x)
  return exps/ np.sum(exps)


array([nan, nan, nan])

In [None]:
# We adjust the softmax function to handle larger numbers like this:

def softmax(self,x):
    p = np.exp(x - np.max(x))
    return p / np.sum(p)

In [None]:
"""import numpy as np

def softmax(x):
    p = np.exp(x - np.max(x))
    return p / np.sum(p)

# Example usage
print(softmax([1, 2, 3]))"""

[0.09003057 0.24472847 0.66524096]


In [None]:
"""import numpy as np

class MyClass:
    def softmax(self, x):
        p = np.exp(x - np.max(x))
        return p / np.sum(p)

# Create an instance of the class
my_instance = MyClass()

# Example usage
print(my_instance.softmax([1, 2, 3]))"""

[0.09003057 0.24472847 0.66524096]


In [13]:
def loss(self,ps,targets):
    '''loss for a sequence'''
    # cross entropy loss
    return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))

### Step 4: Backward pass¶

In [17]:
def backward(self, xs, hs, ps, targets):
            # backward pass: compute gradients going backwards
            dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
            db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
            dhnext = np.zeros_like(hs[0])
            for t in reversed(range(self.seq_length)):
                dy = np.copy(ps[t])
                #through softmax
                dy[targets[t]] -= 1 # backprop into y
                #calculate dV, dc
                dV += np.dot(dy, hs[t].T)
                dc += dc
                #dh includes gradient from two sides, next cell and current output
                dh = np.dot(self.V.T, dy) + dhnext # backprop into h
                # backprop through tanh non-linearity 
                dhrec = (1 - hs[t] * hs[t]) * dh  #dhrec is the term used in many equations
                db += dhrec
                #calculate dU and dW
                dU += np.dot(dhrec, xs[t].T)
                dW += np.dot(dhrec, hs[t-1].T)
                #pass the gradient from next cell to the next iteration.
                dhnext = np.dot(self.W.T, dhrec)
            # clip to mitigate exploding gradients
            for dparam in [dU, dW, dV, db, dc]:
                np.clip(dparam, -5, 5, out=dparam) 
            return dU, dW, dV, db, dc
    

# Step 5: Update weights

In [18]:
def update_model(self, dU, dW, dV, db, dc):
        # parameter update with adagrad
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam*dparam
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8) # adagrad update

### Step 6: Repeat steps 2–5

In order for our model to learn from the data and generate text, we need to train it for sometime and check loss after each iteration. If the loss is reducing over a period of time that means our model is learning what is expected of it.


### Generate Text

We train for some time and if all goes well, we should have our model ready to predict some text. Let us see how it works for us.

We will implement a predict method to predict few words like below:

In [3]:
def predict(self, data_reader, start, n):

        #initialize input vector
        x = zero_init(self.vocab_size, 1)
        chars = [ch for ch in start]
        ixes = []
        for i in range(len(chars)):
            ix = data_reader.char_to_ix[chars[i]]
            x[ix] = 1
            ixes.append(ix)

        h = np.zeros((self.hidden_size,1))
        # predict next n chars
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        txt = ''.join(data_reader.ix_to_char[i] for i in ixes)
        return txt


## Putting it all together:

In [4]:
def zero_init(rows, cols):
    return np.zeros((rows, cols))

In [5]:
class RNN:
    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        # hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        # model parameters
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1)) # bias for hidden layer
        self.c = np.zeros((vocab_size, 1)) # bias for output
        
        # memory vars for adagrad, 
        #ignore if you implement another approach
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)
    
    def softmax(self, x):
        p = np.exp(x- np.max(x))
        return p / np.sum(p)
        
    def forward(self, inputs, hprev):
            xs, hs, os, ycap = {}, {}, {}, {}
            hs[-1] = np.copy(hprev)
            for t in range(len(inputs)):
                xs[t] = zero_init(self.vocab_size,1)
                xs[t][inputs[t]] = 1 # one hot encoding , 1-of-k
                hs[t] = np.tanh(np.dot(self.U,xs[t]) + np.dot(self.W,hs[t-1]) + self.b) # hidden state
                os[t] = np.dot(self.V,hs[t]) + self.c # unnormalised log probs for next char
                ycap[t] = self.softmax(os[t]) # probs for next char
            return xs, hs, ycap
        
    def backward(self, xs, hs, ps, targets):
            # backward pass: compute gradients going backwards
            dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
            db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
            dhnext = np.zeros_like(hs[0])
            for t in reversed(range(self.seq_length)):
                dy = np.copy(ps[t])
                #through softmax
                dy[targets[t]] -= 1 # backprop into y
                #calculate dV, dc
                dV += np.dot(dy, hs[t].T)
                dc += dc
                #dh includes gradient from two sides, next cell and current output
                dh = np.dot(self.V.T, dy) + dhnext # backprop into h
                # backprop through tanh non-linearity 
                dhrec = (1 - hs[t] * hs[t]) * dh  #dhrec is the term used in many equations
                db += dhrec
                #calculate dU and dW
                dU += np.dot(dhrec, xs[t].T)
                dW += np.dot(dhrec, hs[t-1].T)
                #pass the gradient from next cell to the next iteration.
                dhnext = np.dot(self.W.T, dhrec)
            # clip to mitigate exploding gradients
            for dparam in [dU, dW, dV, db, dc]:
                np.clip(dparam, -5, 5, out=dparam) 
            return dU, dW, dV, db, dc
    
    def loss(self, ps, targets):
            """loss for a sequence"""
            # calculate cross-entrpy loss
            return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))
        
    
    def update_model(self, dU, dW, dV, db, dc):
        # parameter update with adagrad
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam*dparam
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8) # adagrad update
                
                
    def sample(self, h, seed_ix, n):
            """
            sample a sequence of integers from the model
            h is memory state, seed_ix is seed letter from the first time step
            """
            x = zero_init(self.vocab_size, 1)
            x[seed_ix] = 1
            ixes = []
            for t in range(n):
                h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
                y = np.dot(self.V, h) + self.c
                p = np.exp(y)/np.sum(np.exp(y))
                ix = np.random.choice(range(self.vocab_size), p = p.ravel())
                x = zero_init(self.vocab_size,1)
                x[ix] = 1
                ixes.append(ix)
            return ixes

    def train(self, data_reader):
            iter_num = 0
            threshold = 0.01
            smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
            while (smooth_loss > threshold):
                if data_reader.just_started():
                    hprev = np.zeros((self.hidden_size,1))
                inputs, targets = data_reader.next_batch()
                xs, hs, ps = self.forward(inputs, hprev)
                dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
                loss = self.loss(ps, targets)
                self.update_model(dU, dW, dV, db, dc)
                smooth_loss = smooth_loss*0.999 + loss*0.001
                hprev = hs[self.seq_length-1]
                if not iter_num%500:
                    sample_ix = self.sample(hprev, inputs[0], 200)
                    print( ''.join(data_reader.ix_to_char[ix] for ix in sample_ix))
                    print( "\n\niter :%d, loss:%f"%(iter_num, smooth_loss))
                iter_num += 1

    def predict(self, data_reader, start, n):

        #initialize input vector
        x = zero_init(self.vocab_size, 1)
        chars = [ch for ch in start]
        ixes = []
        for i in range(len(chars)):
            ix = data_reader.char_to_ix[chars[i]]
            x[ix] = 1
            ixes.append(ix)

        h = np.zeros((self.hidden_size,1))
        # predict next n chars
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        txt = ''.join(data_reader.ix_to_char[i] for i in ixes)
        return txt

In [None]:
import numpy as np


# To read the training data and make a vocabulary and dictiornary to index the chars
class DataReader:
    def __init__(self, path, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        #self.data = "some really long text to test this. maybe not perfect but should get you going."
        self.fp = open(path, "r")
        self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

In [14]:
seq_length = 25
#read text from the "input.txt" file
data_reader = DataReader("/Users/jasper/Desktop/RNN_from_scratch/input.txt", seq_length)
rnn = RNN(hidden_size=100, vocab_size=data_reader.vocab_size,seq_length=seq_length,learning_rate=1e-1)
rnn.train(data_reader)

geit ilmfsefsbeei:edsnessbhss.umohv:aitmsnsw.Ive:aatpeis aspogaay.fhhgsss,Iivmsfslassssfylhdg.,Hseasb:stessisiae:msHpssmlwt,I,aaHbeysoa asbop:yunisiissvhlsse psgaagiseosmepssisvusbpitIwssisba,.w.ssHus


iter :0, loss:80.472036
 day, the sun is s beaulip l day, the sun iu shinin ss a beautiful day, the sun is shining and I love tl do whattpas I beautiful day, the sun isishiningnand and do whattin as shiningnan is shining and


iter :500, loss:58.984736
ing I love to do whathin is shining and I love to do whatt aya i  do whatisul day, the sun is shining ao bha t is shining and I love to do whatt whattin: s andnI lovedto do whato whatt e ss  if shinin


iter :1000, loss:35.917290
iful day, the sun is shining and I love to do whatiful day, the sun is shining and I love to do whatting s d le utifulivn is shining and I love to do whato bpis a beautiful day, the sun is shining and


iter :1500, loss:21.843763
o whatiin  s dove to do whatting s dove to do whattin is shining and I love to do wh

In [19]:
import numpy as np


# To read the training data and make a vocabulary and dictiornary to index the chars
class DataReader2:
    def __init__(self, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        self.data = "some really long text to test this. maybe not perfect but should get you going."
        #self.fp = open(path, "r")
        #self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

In [20]:
seq_length = 25
#read text from the "input.txt" file
data_reader = DataReader2(seq_length)
rnn = RNN(hidden_size=100, vocab_size=data_reader.vocab_size,seq_length=seq_length,learning_rate=1e-1)
rnn.train(data_reader)

imtulllaorxthaolallylcddorlrdxxmlyrllllrrlrdrl lxrlrlllldprrltlfrllxol rltfnronaldrrlfduoxtah trylodg nllullt lddlflrryelrblrlgoyrdldhgarrllotailslrc rllpxtsflyyfltylllllcmlxhdarodr.he.nxlllxlllllxrll


iter :0, loss:77.276622
thstest this. maybe nou long text to test this. maybe not perfect but should get you goipe test this. maybe not perfect but should get you peald get you goirxt should get you goilully lot t but should


iter :500, loss:53.682609
ct but should get you goieet but should get bou notext tut this. maybe not perfect but should get you goiegt but should get you goitethis. maybe not perfect but should get you goise iest this.fmaybt t


iter :1000, loss:32.731891
est this. maybt te seut shoully lo get you goib thyut should get you goieft but should get you goilhnbe not perfect but should get you goi trse thbut should get you goild get youlg text to test this. 


iter :1500, loss:19.935701
ect but should get you goibet but should get you goiset buthshould get you goie t bu

In [22]:
import numpy as np

# Define a function to initialize a zero vector
def zero_init(size, dim):
    return np.zeros((size, dim))

# Define the RNN class (as provided in your code)
class RNN:
    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        # hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        # model parameters
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1)) # bias for hidden layer
        self.c = np.zeros((vocab_size, 1)) # bias for output
        
        # memory vars for adagrad, 
        #ignore if you implement another approach
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)
    
    def softmax(self, x):
        p = np.exp(x- np.max(x))
        return p / np.sum(p)
        
    def forward(self, inputs, hprev):
        xs, hs, os, ycap = {}, {}, {}, {}
        hs[-1] = np.copy(hprev)
        for t in range(len(inputs)):
            xs[t] = zero_init(self.vocab_size,1)
            xs[t][inputs[t]] = 1 # one hot encoding , 1-of-k
            hs[t] = np.tanh(np.dot(self.U,xs[t]) + np.dot(self.W,hs[t-1]) + self.b) # hidden state
            os[t] = np.dot(self.V,hs[t]) + self.c # unnormalised log probs for next char
            ycap[t] = self.softmax(os[t]) # probs for next char
        return xs, hs, ycap
        
    def backward(self, xs, hs, ps, targets):
        # backward pass: compute gradients going backwards
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        dhnext = np.zeros_like(hs[0])
        for t in reversed(range(self.seq_length)):
            dy = np.copy(ps[t])
            #through softmax
            dy[targets[t]] -= 1 # backprop into y
            #calculate dV, dc
            dV += np.dot(dy, hs[t].T)
            dc += dc
            #dh includes gradient from two sides, next cell and current output
            dh = np.dot(self.V.T, dy) + dhnext # backprop into h
            # backprop through tanh non-linearity 
            dhrec = (1 - hs[t] * hs[t]) * dh  #dhrec is the term used in many equations
            db += dhrec
            #calculate dU and dW
            dU += np.dot(dhrec, xs[t].T)
            dW += np.dot(dhrec, hs[t-1].T)
            #pass the gradient from next cell to the next iteration.
            dhnext = np.dot(self.W.T, dhrec)
        # clip to mitigate exploding gradients
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam) 
        return dU, dW, dV, db, dc
    
    def loss(self, ps, targets):
        """loss for a sequence"""
        # calculate cross-entrpy loss
        return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))
        
    
    def update_model(self, dU, dW, dV, db, dc):
        # parameter update with adagrad
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam*dparam
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8) # adagrad update
                
                
    def sample(self, h, seed_ix, n):
        """
        sample a sequence of integers from the model
        h is memory state, seed_ix is seed letter from the first time step
        """
        x = zero_init(self.vocab_size, 1)
        x[seed_ix] = 1
        ixes = []
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        return ixes

    def train(self, data_reader):
        iter_num = 0
        threshold = 0.01
        smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
        while (smooth_loss > threshold):
            if data_reader.just_started():
                hprev = np.zeros((self.hidden_size,1))
            inputs, targets = data_reader.next_batch()
            xs, hs, ps = self.forward(inputs, hprev)
            dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
            loss = self.loss(ps, targets)
            self.update_model(dU, dW, dV, db, dc)
            smooth_loss = smooth_loss*0.999 + loss*0.001
            hprev = hs[self.seq_length-1]
            if not iter_num%500:
                sample_ix = self.sample(hprev, inputs[0], 200)
                print( ''.join(data_reader.ix_to_char[ix] for ix in sample_ix))
                print( "\n\niter :%d, loss:%f"%(iter_num, smooth_loss))
            iter_num += 1

    def predict(self, data_reader, start, n):
        #initialize input vector
        x = zero_init(self.vocab_size, 1)
        chars = [ch for ch in start]
        ixes = []
        for i in range(len(chars)):
            ix = data_reader.char_to_ix[chars[i]]
            x[ix] = 1
            ixes.append(ix)

        h = np.zeros((self.hidden_size,1))
        # predict next n chars
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        txt = ''.join(data_reader.ix_to_char[i] for i in ixes)
        return txt

# To read the training data and make a vocabulary and dictionary to index the chars
class DataReader:
    def __init__(self, path, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        #self.data = "some really long text to test this. maybe not perfect but should get you going."
        self.fp = open(path, "r")
        self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

# Example usage
# Create a DataReader instance
data_reader = DataReader('/Users/jasper/Desktop/RNN_from_scratch/input.txt', seq_length=25)

# Create an RNN instance
hidden_size = 100
seq_length = 25
learning_rate = 0.1
rnn = RNN(hidden_size, data_reader.vocab_size, seq_length, learning_rate)

# Predict the next characters starting with a given word
start_word = "hello"
num_chars_to_predict = 50
predicted_text = rnn.predict(data_reader, start_word, num_chars_to_predict)

print(predicted_text)

hellogeHiuo: dH:muHgI:gIua gdya:hm,mgsuuy abuhf:noimifd


In [28]:
class RNN:
    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        # hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        # model parameters
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1)) # bias for hidden layer
        self.c = np.zeros((vocab_size, 1)) # bias for output
        
        # memory vars for adagrad, 
        #ignore if you implement another approach
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)
    
    
    def softmax(self, x):
        p = np.exp(x- np.max(x))
        return p / np.sum(p)
        
    def forward(self, inputs, hprev):
        xs, hs, os, ycap = {}, {}, {}, {}
        hs[-1] = np.copy(hprev)
        for t in range(len(inputs)):
            xs[t] = zero_init(self.vocab_size,1)
            xs[t][inputs[t]] = 1 # one hot encoding , 1-of-k
            hs[t] = np.tanh(np.dot(self.U,xs[t]) + np.dot(self.W,hs[t-1]) + self.b) # hidden state
            os[t] = np.dot(self.V,hs[t]) + self.c # unnormalised log probs for next char
            ycap[t] = self.softmax(os[t]) # probs for next char
        return xs, hs, ycap
        
    def backward(self, xs, hs, ps, targets):
        # backward pass: compute gradients going backwards
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        dhnext = np.zeros_like(hs[0])
        for t in reversed(range(self.seq_length)):
            dy = np.copy(ps[t])
            #through softmax
            dy[targets[t]] -= 1 # backprop into y
            #calculate dV, dc
            dV += np.dot(dy, hs[t].T)
            dc += dc
            #dh includes gradient from two sides, next cell and current output
            dh = np.dot(self.V.T, dy) + dhnext # backprop into h
            # backprop through tanh non-linearity 
            dhrec = (1 - hs[t] * hs[t]) * dh  #dhrec is the term used in many equations
            db += dhrec
            #calculate dU and dW
            dU += np.dot(dhrec, xs[t].T)
            dW += np.dot(dhrec, hs[t-1].T)
            #pass the gradient from next cell to the next iteration.
            dhnext = np.dot(self.W.T, dhrec)
        # clip to mitigate exploding gradients
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam) 
        return dU, dW, dV, db, dc
    
    def loss(self, ps, targets):
        """loss for a sequence"""
        # calculate cross-entrpy loss
        return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))
        
    
    def update_model(self, dU, dW, dV, db, dc):
        # parameter update with adagrad
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam*dparam
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8) # adagrad update
                
                
    def sample(self, h, seed_ix, n):
        """
        sample a sequence of integers from the model
        h is memory state, seed_ix is seed letter from the first time step
        """
        x = zero_init(self.vocab_size, 1)
        x[seed_ix] = 1
        ixes = []
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        return ixes


    def train(self, data_reader, num_epochs=10):
        iter_num = 0
        threshold = 0.01
        smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
        for epoch in range(num_epochs):
            while (smooth_loss > threshold):
                if data_reader.just_started():
                    hprev = np.zeros((self.hidden_size,1))
                inputs, targets = data_reader.next_batch()
                xs, hs, ps = self.forward(inputs, hprev)
                dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
                loss = self.loss(ps, targets)
                self.update_model(dU, dW, dV, db, dc)
                smooth_loss = smooth_loss*0.999 + loss*0.001
                hprev = hs[self.seq_length-1]
                if not iter_num % 500:
                    sample_ix = self.sample(hprev, inputs[0], 200)
                    print(''.join(data_reader.ix_to_char[ix] for ix in sample_ix))
                    print("\n\niter :%d, loss:%f" % (iter_num, smooth_loss))
                iter_num += 1
            print(f"Epoch {epoch + 1}/{num_epochs} completed with loss: {smooth_loss}")

    def predict(self, data_reader, start, n):
        #initialize input vector
        x = zero_init(self.vocab_size, 1)
        chars = [ch for ch in start]
        ixes = []
        for i in range(len(chars)):
            ix = data_reader.char_to_ix[chars[i]]
            x[ix] = 1
            ixes.append(ix)

        h = np.zeros((self.hidden_size,1))
        # predict next n chars
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        txt = ''.join(data_reader.ix_to_char[i] for i in ixes)
        return txt

# To read the training data and make a vocabulary and dictionary to index the chars
class DataReader:
    def __init__(self, path, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        #self.data = "some really long text to test this. maybe not perfect but should get you going."
        self.fp = open(path, "r")
        self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

# Example usage
# Create a DataReader instance
data_reader = DataReader('/Users/jasper/Desktop/RNN_from_scratch/input.txt', seq_length=25)

# Create an RNN instance
hidden_size = 300
seq_length = 25
learning_rate = 0.05
rnn = RNN(hidden_size, data_reader.vocab_size, seq_length, learning_rate)

# Train the RNN for a specified number of epochs
num_epochs = 25
rnn.train(data_reader, num_epochs=num_epochs)

# Predict the next characters starting with a given word
start_word = "hello"
num_chars_to_predict = 50
predicted_text = rnn.predict(data_reader, start_word, num_chars_to_predict)

print(predicted_text)

eutnaytalb,tpinbd,y s aeHum,anfdmuivets isnis:is is:is:isuewHisdis:id:i,:bgnioudsnisfiv is is is is  steHuvs:Hwtes islis is isgIme  ismwIIis is is is isais is ispildoslisnisaisuibbis is:is asg,, is is


iter :0, loss:80.472025
bh,degnI laanhaau i:es tebsantevnd s , I bofehsahe o   aig ahhss gss s aty dggss i anhiiagoha.iIawdsig Ip lsul o  sfhde ulowahss ifouytI a apg s diiug in ts h a g a yoduloI g  iisiul  uyoteht ,lti m :


iter :500, loss:74.927583
anan itis is: iteis a beautiodloveyt ahe nda fhI tha whatwhdfthaabha whattha whav ti: d wnldve to do whatitioalo whatwheabdo whasado what lo whatwhadahofwhe snn sg a i shit ns a d I be thatHypoehatwho


iter :1000, loss:53.647586
s is s is:nio is and I love t  s a beauit:  ishin: it is a beautifuu day, the iuning and I love to do whatwhatihatwhatil,I love to do whd is a beautiful day, the sun is shining and I love to do whatwh


iter :1500, loss:33.713689
syo thatbha whavitt the sun is shining snd I love to do whataho thatitg whatido what