In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time
from IPython import display

In [None]:
class DataReader:
    """Word-level reader that serves consecutive training windows from a text file.

    The file is split on whitespace; every distinct token gets an integer id.
    Despite the ``char_*`` attribute names, the units are whitespace-separated
    words, not single characters.

    Attributes:
        fp: the (already closed) file object, kept so close() stays callable.
        seq_length: number of tokens per input window.
        data: list of tokens in file order.
        vocab_size: number of distinct tokens.
        char_to_int / int_to_char: token <-> id lookup tables.
        data_length: total number of tokens.
        pointer: index of the first token of the next window.
    """

    def __init__(self, path, seq_length):
        """Read ``path`` fully into memory and build the vocabulary tables.

        Args:
            path: path of the text file to read.
            seq_length: number of tokens per window returned by next_batch().
        """
        #read everything up front and release the OS handle right away
        with open(path, 'r') as fp:
            raw_text = fp.read()
        #keep the closed handle so existing callers of close() keep working
        self.fp = fp
        self.seq_length = seq_length
        #split the text into whitespace-separated tokens
        self.data = raw_text.split()
        #sorted() makes the token ids identical on every run; iterating a bare
        #set of strings yields a different order after each interpreter restart
        chars = sorted(set(self.data))
        self.vocab_size = len(chars)
        #dictionaries to map tokens and integers to each other
        self.char_to_int = {ch: i for i, ch in enumerate(chars)}
        self.int_to_char = {i: ch for i, ch in enumerate(chars)}
        self.data_length = len(self.data)
        self.pointer = 0

    def close(self):
        """Close the underlying file object (a no-op if it is already closed)."""
        self.fp.close()

    def next_batch(self):
        """Return the next (inputs, targets) window and advance the pointer.

        Returns:
            inputs: ids of tokens ``pointer .. pointer + seq_length - 1``.
            targets: ids of the same window shifted one token to the right,
                i.e. for each input token the target is the token after it.

        The pointer is reset to 0 as soon as the *following* window would run
        past the end of the data.
        """
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_int[ch] for ch in self.data[input_start:input_end]]
        #for each input word, the target is the next word
        targets = [self.char_to_int[ch] for ch in self.data[input_start + 1:input_end + 1]]
        self.pointer += self.seq_length
        #a full window needs tokens pointer .. pointer + seq_length (inclusive),
        #so it still fits when pointer + seq_length + 1 == data_length; only
        #wrap around when it is strictly larger
        if self.pointer + self.seq_length + 1 > self.data_length:
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        """Return True when the next window starts at the beginning of the data."""
        return self.pointer == 0
    

*   The initialization of the weights has an impact on how well the model trains, so the initialization scheme should be chosen to match the activation function. In our case we will be using tanh as the activation function, so **Xavier initialization** is the best choice here.

In [None]:
class RNN:
    """Single-layer tanh recurrent network over one-hot inputs.

    The model is trained with backpropagation through time and Adagrad, and
    can sample new sequences with predict().

    Parameters (all numpy arrays):
        U: input -> hidden weights, shape (hidden_size, vocab_size)
        W: hidden -> hidden weights, shape (hidden_size, hidden_size)
        V: hidden -> output weights, shape (vocab_size, hidden_size)
        b: hidden bias, shape (hidden_size, 1)
        c: output bias, shape (vocab_size, 1)
    """

    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        """Store the hyper parameters and initialize weights and Adagrad memory.

        Args:
            hidden_size: number of hidden units.
            vocab_size: number of distinct input/output symbols.
            seq_length: nominal number of time steps per training window.
            learning_rate: Adagrad step size.
        """
        #hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        #weights are drawn uniformly from +-sqrt(1/fan_in), the Xavier-style
        #bound used throughout this notebook
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1)) #bias for hidden layer
        self.c = np.zeros((vocab_size, 1)) #bias for output layer

        #memory variables for adagrad (running sum of squared gradients)
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)

    def _one_hot(self, index):
        """Return a (vocab_size, 1) column vector with a single 1 at ``index``."""
        vec = np.zeros((self.vocab_size, 1))
        vec[index] = 1
        return vec

    def _hidden_step(self, x, hprev):
        """Compute the next hidden state from input column ``x`` and ``hprev``."""
        return np.tanh(np.dot(self.U, x) + np.dot(self.W, hprev) + self.b)

    #define the softmax function
    def softmax(self, x):
        """Column-wise softmax; the maximum is subtracted first for numerical stability."""
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum(axis=0)

    #defining the forward pass
    def forward(self, inputs, hprev):
        """Run the network over ``inputs`` starting from hidden state ``hprev``.

        Args:
            inputs: sequence of integer symbol ids.
            hprev: hidden state preceding the first step, shape (hidden_size, 1).

        Returns:
            (xs, hs, ycap): dicts keyed by time step holding the one-hot
            inputs, hidden states (``hs[-1]`` is a copy of ``hprev``) and the
            softmax output distributions.
        """
        xs, hs, ycap = {}, {}, {}
        hs[-1] = np.copy(hprev)
        for t in range(len(inputs)):
            xs[t] = self._one_hot(inputs[t]) #one hot encoding
            hs[t] = self._hidden_step(xs[t], hs[t-1])
            ycap[t] = self.softmax(np.dot(self.V, hs[t]) + self.c)
        return xs, hs, ycap

    #defining the backward pass
    def backward(self, xs, hs, ps, targets):
        """Backpropagate through time and return clipped gradients.

        Args:
            xs, hs, ps: the dicts returned by forward().
            targets: integer target id for each time step.

        Returns:
            (dU, dW, dV, db, dc), each clipped element-wise to [-5, 5].
        """
        #initialize the gradients
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        #hs[-1] always exists (even for an empty sequence) and has the hidden shape
        dhnext = np.zeros_like(hs[-1])
        #use the real sequence length: forward() sizes its outputs from the
        #batch, which may be shorter than self.seq_length
        steps = min(len(ps), len(targets))
        #backpropagate through time in reverse
        for t in reversed(range(steps)):
            dy = np.copy(ps[t])
            #gradient through the softmax layer
            dy[targets[t]] -= 1
            #calculating the gradients wrt V and c
            dV += np.dot(dy, hs[t].T)
            dc += dy
            #backpropagate the gradient to the hidden layer
            #dh includes gradient from two sides, from the output layer and from the next time step
            dh = np.dot(self.V.T, dy) + dhnext
            #backpropagate through the tanh
            dhraw = (1 - hs[t] * hs[t]) * dh
            db += dhraw
            #calculate the gradients wrt U and W
            dU += np.dot(dhraw, xs[t].T)
            dW += np.dot(dhraw, hs[t-1].T)
            #gradient flowing back to the previous time step
            dhnext = np.dot(self.W.T, dhraw)
        #clip the gradients to mitigate the exploding gradients problem
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam)
        return dU, dW, dV, db, dc

    #define the cross entropy loss for one predicted sequence
    def loss(self, ps, targets):
        """Return the summed negative log likelihood (cross entropy) of ``targets`` under ``ps``."""
        steps = min(len(ps), len(targets))
        return sum(-np.log(ps[t][targets[t], 0]) for t in range(steps))

    #define the update function
    def update(self, dU, dW, dV, db, dc):
        """Apply one in-place Adagrad step to every parameter."""
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c], [dU, dW, dV, db, dc], [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam * dparam
            param += -self.learning_rate * dparam / np.sqrt(mem + 1e-8)

    def _plot_losses(self, training_loss):
        """Redraw the live training-loss curve in the notebook output area."""
        plt.clf()
        plt.plot(training_loss, label="Train Loss")
        plt.xlabel("Iterations")
        plt.ylabel("Loss")
        plt.title("Training Loss Curve")
        plt.legend()
        display.display(plt.gcf())
        display.clear_output(wait=True)
        #short pause, presumably to give the front end time to render the frame
        time.sleep(0.01)

    def train(self, data_reader, n_iters=10000, plot_every=1):
        """Train on windows from ``data_reader``.

        Args:
            data_reader: object providing just_started() and next_batch().
            n_iters: number of parameter updates to perform.
            plot_every: redraw the loss curve every this many iterations (and
                on the last one); 0 or None disables plotting entirely.
        """
        training_loss = []
        #always start from a defined hidden state: the reader may not be at
        #position 0 when train() is called again on a partially consumed reader
        hprev = np.zeros((self.hidden_size, 1))
        for iter_num in range(1, n_iters + 1):
            #reset the hidden state whenever the reader wrapped around
            if data_reader.just_started():
                hprev = np.zeros((self.hidden_size, 1))
            #creating the input and target sequences for training
            inputs, targets = data_reader.next_batch()
            #forward pass
            xs, hs, ps = self.forward(inputs, hprev)
            #backward pass
            dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
            #calculate the loss
            loss = self.loss(ps, targets)
            training_loss.append(loss)
            #update the model parameters
            self.update(dU, dW, dV, db, dc)
            #carry the last hidden state actually computed into the next window
            hprev = hs[len(inputs) - 1]
            if plot_every and (iter_num % plot_every == 0 or iter_num == n_iters):
                self._plot_losses(training_loss)
            print('iter %d, loss: %f' % (iter_num, loss))
            time.sleep(0.001)

    #define function for predicting the next sequence
    def predict(self, data_reader, start, n):
        """Sample ``n`` tokens that continue the seed text ``start``.

        Args:
            data_reader: object providing char_to_int / int_to_char tables.
            start: seed text; split on whitespace into tokens.
            n: number of tokens to sample.

        Returns:
            The seed tokens followed by the sampled tokens, joined by spaces.

        Raises:
            KeyError: if a seed token is not in ``data_reader.char_to_int``.
        """
        seed_ixes = [data_reader.char_to_int[word] for word in start.split()]
        ixes = list(seed_ixes)
        #initialize the hidden state
        h = np.zeros((self.hidden_size, 1))
        #feed the seed tokens one at a time so the hidden state sees them in
        #order; the last seed token becomes the first generation input
        for ix in seed_ixes[:-1]:
            h = self._hidden_step(self._one_hot(ix), h)
        if seed_ixes:
            x = self._one_hot(seed_ixes[-1])
        else:
            #empty seed: start from an all-zero input vector
            x = np.zeros((self.vocab_size, 1))
        #predict the next n tokens
        for _ in range(n):
            h = self._hidden_step(x, h)
            p = self.softmax(np.dot(self.V, h) + self.c)
            #sample the index of the next token from the output distribution
            ix = np.random.choice(range(self.vocab_size), p=p.ravel())
            #the sampled token is the next input
            x = self._one_hot(ix)
            ixes.append(ix)
        #generate the text with spaces between words
        return ' '.join(data_reader.int_to_char[ix] for ix in ixes)


In [None]:
#build the word-level reader; the second argument is the number of words per training window
data = DataReader('RNN_input.txt', 15)
#the constructor has already read the whole file into memory, so the handle can be released now
data.close()
print(data.vocab_size)
#take seq_length from the reader so the model and the batches cannot drift apart
rnn = RNN(hidden_size=100, vocab_size=data.vocab_size, seq_length=data.seq_length, learning_rate=0.1)
rnn.train(data)

In [None]:
#sample 15 more words from the trained model, seeded with the word 'rainy'
rnn.predict(data, start='rainy', n=15)