### numpy implementatin of a multiplayer LSTM (https://arxiv.org/pdf/1503.04069.pdf)

In [2]:
import numpy as np
from IPython.display import clear_output

Some helper function

In [2]:
def sigmoid(x):
    #clip to prevent overflow
    x = np.clip(x, -709.78, 709.78)
    return 1/(1 + np.exp(-x))

def dsigmoid(y):
    return y*(1-y)

def dtanh(y):
    return 1-y**2

parameter class

In [5]:
class param:
    def __init__(self, shape, is_bias = False):
        if not is_bias:
            self.value = np.random.standard_normal(shape)*0.05
        else:
            self.value = np.zeros(shape)
        self.grad = np.zeros(shape)
        self.m = np.zeros(shape)
        self.v = np.zeros(shape)
    def __call__(self):
        return self.value

In [None]:
#similar to https://pytorch.org/docs/stable/_modules/torch/nn/utils/clip_grad.html#clip_grad_norm_
def clip_grad_norm(grads, max_norm=0.1):
    total_norm = 0.
    
    for grad in grads:
        grad_norm = np.sum(param**2)
        total_norm += grad_norm
    
    total_norm = np.sqrt(total_norm)
    
    clip_coef = max_norm / (total_norm + 1e-6)
    
    if clip_coef < 1:
        for grad in grads:
            grad *= clip_coef
    return grads

pretty straight forward implementation of LSTM layer and fully connected layer

update using adam optimizer

In [5]:
class LSTM:
    def __init__(self, num_input, num_hidden, lr = 0.01, betas = (0.9, 0.999), epsilon = 1e-8, wd = 0.001):
        self.num_input = num_input
        self.num_hidden = num_hidden
        
        self.lr = lr
        self.betas = betas
        self.epsilon = epsilon
        self.wd = wd
        self.update_iter = 1
        
        #W of shape (num_hidden, num_input)
        self.Wz = param((num_hidden, num_input))
        self.Wi = param((num_hidden, num_input))
        self.Wf = param((num_hidden, num_input))
        self.Wo = param((num_hidden, num_input))
        
        self.Rz = param((num_hidden, num_hidden))
        self.Ri = param((num_hidden, num_hidden))
        self.Rf = param((num_hidden, num_hidden))
        self.Ro = param((num_hidden, num_hidden))

        self.bz = param((num_hidden, 1),True)
        self.bi = param((num_hidden, 1),True)
        self.bf = param((num_hidden, 1),True)
        self.bo = param((num_hidden, 1),True)
        
        self.params = [
            self.Wz, self.Wi, self.Wf, self.Wo, 
            self.Rz, self.Ri, self.Rf, self.Ro, 
            self.bz, self.bi, self.bf, self.bo,
        ]
        
        self.cache = {}
            
    def reset_cache(self):
        """
        delta is deltas passed down from layer above
        X, dX:
        of shape (seq_len, num_input)
        Z,I,F,O,C,Y, 
        dY, dO, dC, dF, dI, dZ, delta:
        of shape (num_hidden, 1)
        """

        for key in ['Z' ,'I' ,'F' ,'C' ,'O' ,'Y' ,'dY' ,'dO' ,'dC' ,'dF' ,'dI' ,'dZ', 'delta']:
            self.cache[key] = [np.zeros((self.num_hidden, 1))] * (self.seq_len+1)
        for key in ['X', 'dX']:
            self.cache[key] = [np.zeros((self.num_input, 1))] * (self.seq_len+1)
        
    def forward_step(self, t):
        xt = self.cache['X'][t]
        Y  = self.cache['Y'][t-1]
        C  = self.cache['C'][t-1]
        #xt of shape (num_input,  1)
        #Y  of shape (num_hidden, 1)
        Z = np.tanh(self.Wz() @ xt + self.Rz() @ Y + self.bz())
        I = sigmoid(self.Wi() @ xt + self.Ri() @ Y + self.bi())
        F = sigmoid(self.Wf() @ xt + self.Rf() @ Y + self.bf())
        C = Z * I + C * F
        O = sigmoid(self.Wo() @ xt + self.Ro() @ Y + self.bo())
        Y = np.tanh(C) * O

        self.cache['Z'][t] = Z
        self.cache['I'][t] = I
        self.cache['F'][t] = F
        self.cache['C'][t] = C
        self.cache['O'][t] = O
        self.cache['Y'][t] = Y
        return

    def backward_step(self, t):
        dY = \
        self.cache['delta'][t] + \
        self.Rz().T @ self.cache['dZ'][t+1] + \
        self.Ri().T @ self.cache['dI'][t+1] + \
        self.Rf().T @ self.cache['dF'][t+1] + \
        self.Ro().T @ self.cache['dO'][t+1]
        
        dO = dY * np.tanh(self.cache['C'][t]) * dsigmoid(self.cache['O'][t])
        dC = dY * self.cache['O'][t] *   dtanh(np.tanh(self.cache['C'][t])) + self.cache['dC'][t+1] * self.cache['F'][t+1]
        dF = dC * self.cache['C'][t-1] * dsigmoid(self.cache['F'][t])
        dI = dC * self.cache['Z'][t] *   dsigmoid(self.cache['I'][t])
        dZ = dC * self.cache['I'][t] *   dtanh(np.tanh(self.cache['Z'][t]))
        
        dX = \
        self.Wz().T @ dZ
        self.Wi().T @ dI
        self.Wf().T @ dF
        self.Wo().T @ dO

        self.cache['dY'][t] = dY
        self.cache['dO'][t] = dO
        self.cache['dC'][t] = dC
        self.cache['dF'][t] = dF
        self.cache['dI'][t] = dI
        self.cache['dZ'][t] = dZ
        self.cache['dX'][t] = dX

        self.Wz.grad += np.outer(dZ, self.cache['X'][t])
        self.Wi.grad += np.outer(dI, self.cache['X'][t])
        self.Wf.grad += np.outer(dF, self.cache['X'][t])
        self.Wo.grad += np.outer(dO, self.cache['X'][t])

        self.Rz.grad += np.outer(self.cache['dZ'][t+1], self.cache['Y'][t])
        self.Ri.grad += np.outer(self.cache['dI'][t+1], self.cache['Y'][t])
        self.Rf.grad += np.outer(self.cache['dF'][t+1], self.cache['Y'][t])
        self.Ro.grad += np.outer(self.cache['dO'][t+1], self.cache['Y'][t])

        self.bz.grad += dZ
        self.bi.grad += dI
        self.bf.grad += dF
        self.bo.grad += dO
        return

    def forward(self, X):
        #X: input of shape (seq_len, num_input)
        seq_len, _ = X.shape
        self.seq_len = seq_len
        self.reset_cache()
        for t in range(seq_len):
            self.cache['X'][t] = X[t][:,np.newaxis]

        #forward propagation
        for t in range(seq_len):
            self.forward_step(t)
        return np.concatenate([ele.T for ele in self.cache['Y']][:-1], axis=0)

    def backward(self, delta):
        #delta: deltas passed down from layer above of shape (seq_len, num_hidden)
        for t in range(self.seq_len):
            self.cache['delta'][t] = delta[t][:,np.newaxis]

        for t in reversed(range(self.seq_len)):
            self.backward_step(t)
        return  np.concatenate([ele.T for ele in self.cache['delta']][:-1], axis=0)

    def zerograd(self):
        for param in self.params:
            param.grad.fill(0)

    def update(self):
        #self.clip_grad()
        for param in self.params:
            param.value -= self.lr * self.wd * param.value
            param.m = self.betas[0] * param.m + (1 - self.betas[0]) * param.grad
            param.v = self.betas[1] * param.v + (1 - self.betas[1]) * param.grad ** 2
            m_corr = param.m / (1 - self.betas[0]**self.update_iter)
            v_corr = param.v / (1 - self.betas[1]**self.update_iter)

            param.value -= self.lr * m_corr / (np.sqrt(v_corr) + self.epsilon)
            
        self.update_iter += 1
        return

    def clip_grad(self):
        grad_list = [param.grad for param in self.params]
        clipped_grad = clip_grad_norm(grad_list)
        for i, param in enumerate(self.params):
            param.grad = clipped_grad[i]
            
class FC:
    def __init__(self, num_input, num_output, lr = 0.01, betas = (0.9, 0.999), epsilon = 1e-8, wd = 0.001):
        self.num_input = num_input
        self.num_output = num_output
        
        self.lr = lr
        self.betas = betas
        self.epsilon = epsilon
        self.wd = wd
        self.update_iter = 1
        
        #W of shape (num_hidden, num_input)
        self.W = param((num_input, num_output))
        self.b = param((num_output, 1),True)
        
        self.params = [
            self.W,
            self.b,
        ]
        
        self.cache = {}
            
    def reset_cache(self):
        """
        X:
        of shape (num_feature, num_input)
        Y:
        of shape (num_feature, num_output)
        """
        for key in ['X', 'dX']:
            self.cache[key] = np.zeros((self.num_feature, self.num_input))
        for key in ['Y']:
            self.cache[key] = np.zeros((self.num_feature, self.num_output))

    def forward(self, X):
        #X: input of shape (num_feature, num_input)
        num_feature, _ = X.shape
        self.num_feature = num_feature
        self.reset_cache()
        self.cache['X'] = X
        
        Y = X @ self.W()
        self.cache['Y'] = Y
        return self.cache['Y']

    def backward(self, dY):
        #dY: deltas passed down from layer above of shape (num_feature, num_output)
        self.W.grad += self.cache['X'].T @ dY
        self.b.grad += np.mean(dY, axis=0, keepdims=True).T
        
        dX = dY @ self.W().T
        return dX

    def zerograd(self):
        for param in self.params:
            param.grad.fill(0)

    def update(self):
        #self.clip_grad()
        for param in self.params:
            param.value -= self.lr * self.wd * param.value
            param.m = self.betas[0] * param.m + (1 - self.betas[0]) * param.grad
            param.v = self.betas[1] * param.v + (1 - self.betas[1]) * param.grad ** 2
            m_corr = param.m / (1 - self.betas[0]**self.update_iter)
            v_corr = param.v / (1 - self.betas[1]**self.update_iter)

            param.value -= self.lr * m_corr / (np.sqrt(v_corr) + self.epsilon)
            
        self.update_iter += 1
        return

    def clip_grad(self):
        grad_list = [param.grad for param in self.params]
        clipped_grad = clip_grad_norm(grad_list)
        for i, param in enumerate(self.params):
            param.grad = clipped_grad[i]
            
class build_model:
    def __init__(self, num_input, num_hidden, lr = 0.01, batch_size = 8):
        self.batch_size = batch_size
        self.layer1 = LSTM(num_input ,num_hidden, lr = lr)
        self.layer2 = LSTM(num_hidden ,num_hidden, lr = lr)
        self.layer3 = LSTM(num_hidden ,num_hidden, lr = lr)
        self.head   = FC(num_hidden,num_input, lr = lr)
        self.cache = {}
        
        self.layers = [self.layer1, self.layer2, self.layer3, self.head]
    
    def __call__(self, X):
        out = X
        for layer in self.layers:
            out = layer.forward(out)
        self.cache['output'] = out
        return out
    
    def loss(self, label):
        loss = 0
        dx = np.copy(self.cache['output'])
        for i in range(dx.shape[0]):
            dx[i] = np.exp(dx[i]) / np.sum(np.exp(dx[i]))
            loss += -np.log(dx[i][label[i]]) / dx.shape[0]
            dx[i][label[i]]-=1
        self.cache['dx'] = dx / self.batch_size
        return loss
    
    def backward(self):
        dx = self.cache['dx']
        for layer in reversed(self.layers):
            dx = layer.backward(dx)
        return
    
    def zerograd(self):
        for layer in self.layers:
            layer.zerograd()

    def update(self):
        for layer in self.layers:
            layer.update()
        self.zerograd()

data used here is Shakespeare's sonnets from https://www.gutenberg.org/ebooks/1041

In [6]:
with open('./data/sonnet.txt') as f:
    lines = f.readlines()
lines = [l.strip().lower() for l in lines]
sonnets = []
start = 'i'
end = 'cliv'
start_index = lines.index(start)
end_index = lines.index(end)+17
i = start_index
while i < end_index:
    if len(lines[i]) > 20:
        for j in range(14):
            sonnets.append(lines[i+j])
        i+=14
    else:
        i+=1
        
all_ = ' '.join(sonnets)
idx_to_char = list(set(all_))
idx_to_char.sort()
char_to_idx = dict()
for i in range(len(idx_to_char)):
    char_to_idx[idx_to_char[i]] = i
    
num_char = len(idx_to_char)
max_len = 60
all_ = all_[:(len(all_) // max_len) * max_len]

sonnets = [all_[i:i+max_len] for i in range(0, len(all_), max_len)]

In [7]:
def onehot_encode(tokens,max_len,num_char):
    one_hot = np.zeros((max_len, num_char))
    for i in range(len(tokens)):
        one_hot[i][tokens[i]] = 1
    return one_hot

def tokenize_and_label(inp,max_len,num_char):
    tokens = [char_to_idx[' ']]*max_len
    labels = [char_to_idx[' ']]*max_len
    
    for i in range(len(inp)):
        tokens[i] = char_to_idx[inp[i]]
        if i != 0 :
            labels[i-1] = char_to_idx[inp[i]]
    return (onehot_encode(tokens,max_len,num_char), np.array(labels))

def predict(model,max_len = 30,seed_str = ''):
    start_index = len(seed_str)
    tokens = [char_to_idx[' ']]*max_len
    for i in range(len(seed_str)):
        tokens[i] = char_to_idx[seed_str[i]]
    while start_index+1 < max_len:
        inp = onehot_encode(tokens,max_len,num_char)
        out = model(inp)
        pred = np.copy(out)
        for i in range(pred.shape[0]):
            pred[i] = np.exp(pred[i]) / np.sum(np.exp(pred[i]))
        pred_token = np.argmax(pred, axis=1)[start_index-1]
        tokens[start_index] = pred_token
        start_index+=1
    pred_string = ''.join([idx_to_char[token] for token in tokens if idx_to_char[token] != '='])
    return pred_string

In [8]:
bs, hidden_size = 32, 512
model = build_model(num_char,hidden_size,lr = 1e-4,batch_size = bs)

In [9]:
iter_count = 0
num_epoches = 500

standard training loop with gradient accumulation to simulate batched input

In [10]:
pred_string = ''
for i in range(num_epoches):
    for sonnet in sonnets:
        tokens, labels = tokenize_and_label(sonnet, max_len, num_char)
        
        output = model(tokens)
        
        loss = model.loss(labels)
        
        clear_output(wait=True)
        print(f'epoch: {iter_count//len(sonnets)}\t iter: {model.head.update_iter}\t loss: {loss}')
        print(pred_string)
            
        if iter_count % (1*bs) == 0:
            pred_string = predict(model,max_len)
        
        model.backward()
        if (iter_count+1)%bs ==0:
            model.update()
        
        iter_count+=1

epoch: 24	 iter: 1208	 loss: 2.5494861344586384
a the the the the the the the the the the the the the the t 


KeyboardInterrupt: 