In [11]:
import numpy as np

class DataReader:
    """Character-level text reader that serves fixed-length index sequences.

    The whole file is read into memory once. Every distinct character gets an
    integer index; indices follow the sorted order of the characters so the
    mapping is identical on every run.

    Attributes:
        data: full text read from ``path``.
        char_to_ix / ix_to_char: character <-> index lookup tables.
        data_size: number of characters in ``data``.
        vocab_size: number of distinct characters.
        pointer: offset in ``data`` where the next batch starts.
        seq_length: number of characters per batch.
        fp: the (already closed) file object the text was read from.
    """

    def __init__(self, path, seq_length):
        """Read ``path`` and build the vocabulary.

        Args:
            path: path of the text file to read (opened in text mode "r").
            seq_length: number of characters returned per call to next_batch().
        """
        # To experiment without a file, assign a literal string to self.data
        # instead of reading it from disk.
        with open(path, "r") as fp:
            self.data = fp.read()
        # The handle is kept (closed) so close() stays valid for existing callers.
        self.fp = fp
        # Unique characters; sorted because plain set() iteration order of
        # strings varies between interpreter runs.
        chars = sorted(set(self.data))
        # Dictionary mapping for each char, in both directions.
        self.char_to_ix = {ch: i for (i, ch) in enumerate(chars)}
        self.ix_to_char = {i: ch for (i, ch) in enumerate(chars)}
        # Total number of characters.
        self.data_size = len(self.data)
        # Number of unique characters.
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        """Return the next (inputs, targets) pair of index lists.

        ``inputs`` encodes ``seq_length`` characters starting at ``pointer``;
        ``targets`` encodes the same window shifted one character to the right.
        Afterwards the pointer advances by ``seq_length`` and wraps to 0 once
        ``pointer + seq_length + 1 >= data_size``.
        """
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start + 1:input_end + 1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        """Return True when the pointer is at the beginning of the data."""
        return self.pointer == 0

    def close(self):
        """Close the underlying file object (a no-op if it is already closed)."""
        self.fp.close()

In [12]:
class recurrent_layer():
    """Single-layer character-level RNN (tanh hidden state, softmax output).

    Parameters held by the layer:
        U: (hidden_size, vocab_size) input-to-hidden weights.
        W: (hidden_size, hidden_size) hidden-to-hidden weights.
        V: (vocab_size, hidden_size) hidden-to-output weights.
        b: (hidden_size, 1) hidden bias.
        c: (vocab_size, 1) output bias.
    """

    def __init__(self, hiden_size, vocab_size, seq_length, learning_rate):
        """Create the layer with uniformly initialised weights and zero biases.

        Args:
            hiden_size: number of hidden units.
                NOTE(review): the name is a misspelling of "hidden_size"; it is
                kept so existing keyword callers do not break.
            vocab_size: number of distinct symbols (size of the one-hot input).
            seq_length: nominal number of time steps per training sequence.
            learning_rate: step size used by update_model().
        """
        # Bind the intended name locally; the body previously read an
        # undefined (or accidental global) `hidden_size`.
        hidden_size = hiden_size

        # hyper params
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate

        # model params: uniform in [-1/sqrt(fan_in), 1/sqrt(fan_in)]
        in_bound = np.sqrt(1. / vocab_size)
        hid_bound = np.sqrt(1. / hidden_size)
        self.U = np.random.uniform(-in_bound, in_bound, (hidden_size, vocab_size))
        self.V = np.random.uniform(-hid_bound, hid_bound, (vocab_size, hidden_size))
        self.W = np.random.uniform(-hid_bound, hid_bound, (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1))  # bias for hidden layer
        self.c = np.zeros((vocab_size, 1))  # bias for output

    def softmax(self, x):
        """Return the softmax of ``x`` taken over all of its elements."""
        # Subtracting the max leaves the result unchanged but keeps exp() from
        # overflowing on large scores.
        exps = np.exp(x - np.max(x))
        return exps / np.sum(exps)

    def loss(self, ycap, targets):
        """Return the summed cross-entropy (natural log) of ``targets``.

        Args:
            ycap: mapping of time step -> (vocab_size, 1) probability column,
                as produced by forward().
            targets: target symbol index for each time step.
        """
        # Natural log matches the gradient used in backward() (ycap - one_hot).
        steps = min(len(ycap), len(targets))
        return sum(-np.log(ycap[t][targets[t], 0]) for t in range(steps))

    def forward(self, inputs, hprev):
        """Run the network over ``inputs`` starting from hidden state ``hprev``.

        Args:
            inputs: sequence of symbol indices.
            hprev: (hidden_size, 1) hidden state preceding the first step.

        Returns:
            (xs, hs, ycap): dicts keyed by time step holding the one-hot
            inputs, hidden states (``hs[-1]`` is a copy of ``hprev``) and
            output probabilities.
        """
        xs, hs, ycap = {}, {}, {}
        hs[-1] = np.copy(hprev)

        for t, ix in enumerate(inputs):
            xs[t] = np.zeros((self.vocab_size, 1))  # one-hot column for this symbol
            xs[t][ix] = 1
            hs[t] = np.tanh(np.dot(self.U, xs[t]) + np.dot(self.W, hs[t - 1]) + self.b)
            scores = np.dot(self.V, hs[t]) + self.c
            ycap[t] = self.softmax(scores)
        return xs, hs, ycap

    def backward(self, xs, hs, ycap, targets):
        """Backpropagate through time and return clipped gradients.

        Args:
            xs, hs, ycap: the dicts returned by forward().
            targets: target symbol index for each time step.

        Returns:
            (dU, dW, dV, db, dc), each clipped element-wise to [-5, 5].
        """
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        dc, db = np.zeros_like(self.c), np.zeros_like(self.b)
        dhnext = np.zeros_like(hs[-1])

        steps = min(len(xs), len(targets))
        for t in reversed(range(steps)):
            # output grad: softmax + cross-entropy gives (probabilities - one_hot)
            dy = np.copy(ycap[t])
            dy[targets[t]] -= 1

            dV += np.dot(dy, hs[t].T)
            dc += dy

            # gradient reaching h_t from the output and from step t+1
            dh = dhnext + np.dot(self.V.T, dy)

            dhrec = (1 - hs[t] * hs[t]) * dh  # tanh_prime
            db += dhrec

            dU += np.dot(dhrec, xs[t].T)
            dW += np.dot(dhrec, hs[t - 1].T)

            dhnext = np.dot(self.W.T, dhrec)

        # clip in place to limit exploding gradients
        for dparam in [dU, dW, dV, db, dc]:
            np.clip(dparam, -5, 5, out=dparam)
        return dU, dW, dV, db, dc

    def update_model(self, dU, dW, dV, db, dc):
        """Apply one gradient-descent step in place.

        The positional order matches the tuple returned by backward(), so
        ``update_model(*backward(...))`` pairs every gradient with its own
        parameter.
        """
        for param, dparam in zip([self.U, self.W, self.V, self.b, self.c], [dU, dW, dV, db, dc]):
            param += -self.learning_rate * dparam

    def predict(self, data_reader, start, n):
        """Sample ``n`` characters that continue the seed text ``start``.

        Args:
            data_reader: object exposing ``char_to_ix`` and ``ix_to_char``.
            start: seed text; every character must be a key of ``char_to_ix``
                (otherwise the lookup raises KeyError).
            n: number of characters to sample.

        Returns:
            The seed followed by the ``n`` sampled characters.
        """
        ixes = [data_reader.char_to_ix[ch] for ch in start]
        h = np.zeros((self.hidden_size, 1))
        x = np.zeros((self.vocab_size, 1))

        # Warm up the hidden state on the seed, one character per step; the
        # last seed character becomes the first input of the sampling loop.
        for ix in ixes[:-1]:
            x = np.zeros((self.vocab_size, 1))
            x[ix] = 1
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
        if ixes:
            x = np.zeros((self.vocab_size, 1))
            x[ixes[-1]] = 1

        for _ in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = self.softmax(y)
            ix = int(np.random.choice(self.vocab_size, p=p.ravel()))
            # fresh one-hot each step so earlier samples do not linger in x
            x = np.zeros((self.vocab_size, 1))
            x[ix] = 1
            ixes.append(ix)

        txt = "".join(data_reader.ix_to_char[i] for i in ixes)
        return txt

In [6]:
# Scratch check: an empty mapping created here is of the built-in dict type.
hs = dict()
type(hs)

dict