<a href="https://colab.research.google.com/github/asafe-eduardo/colab/blob/master/rnn_from_scratch/lstm_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### IMPORTS

In [0]:
import numpy as np #for maths
import pandas as pd #for data manipulation
import matplotlib.pyplot as plt #for visualization

In [0]:
#data
path = 'NationalNames.csv'
data = pd.read_csv(path)

data = np.array(data['Name'][:10000]).reshape(-1, 1)

data = [x.lower() for x in data[:,0]]

data = np.array(data).reshape(-1, 1)

In [207]:
print("Data Shape = {}".format(data.shape) + "\n")
print("Lets see some names: ")
print(data[1:10])

Data Shape = (10000, 1)

Lets see some names: 
[['anna']
 ['emma']
 ['elizabeth']
 ['minnie']
 ['margaret']
 ['ida']
 ['alice']
 ['bertha']
 ['sarah']]


In [0]:
transform_data = np.copy(data)

max_length = 0
for index in range(len(data)):
  max_length = max(max_length, len(data[index, 0]))
  
for index in range(len(data)):
  length = (max_length - len(data[index, 0]))
  string = '.'*length
  transform_data[index,0] = '.'.join([transform_data[index, 0], string])

In [209]:
print("Transformed Data")
print(transform_data[1:10])

Transformed Data
[['anna........']
 ['emma........']
 ['elizabeth...']
 ['minnie......']
 ['margaret....']
 ['ida.........']
 ['alice.......']
 ['bertha......']
 ['sarah.......']]


In [210]:
vocab = list()
for name in transform_data[:,0]:
  vocab.extend(list(name))
  
vocab = set(vocab)
vocab_size = len(vocab)

print("Vocab size = {}".format(len(vocab)))
print("Vocab      = {}".format(vocab))

Vocab size = 27
Vocab      = set(['.', 'a', 'c', 'b', 'e', 'd', 'g', 'f', 'i', 'h', 'k', 'j', 'm', 'l', 'o', 'n', 'q', 'p', 's', 'r', 'u', 't', 'w', 'v', 'y', 'x', 'z'])


In [211]:
char_id = dict()
id_char = dict()

for i, char in enumerate(vocab):
  char_id[char] = i
  id_char[i] = char
  
print('a-{}, 22-{}'.format(char_id['a'], id_char[22]))

a-1, 22-w


In [0]:
train_dataset = []

batch_size = 20

for i in range(len(transform_data) - batch_size+1):
  start = i*batch_size
  end = start+batch_size
  
  batch_data = transform_data[start:end]
  
  if(len(batch_data) != batch_size):
    break
  
  char_list = []
  
  for k in range(len(batch_data[0][0])):
    batch_dataset = np.zeros([batch_size, len(vocab)])
    for j in range(batch_size):
      name = batch_data[j][0]
      char_index = char_id[name[k]]
      batch_dataset[j, char_index] = 1.0
      
    char_list.append(batch_dataset)
    
  train_dataset.append(char_list)

In [0]:
input_units = 100

hidden_units = 256

output_units = vocab_size

learning_rate = 0.005

beta1 = 0.90
beta2 = 0.99

In [0]:
def sigmoid(X):
  return 1/(1+np.exp(-X))

def tanh_activation(X):
  return np.tanh(X)

def softmax(X):
  exp_X = np.exp(X)
  exp_X_sum = np.sum(exp_X, axis=1).reshape(-1, 1)
  exp_X = exp_X/exp_X_sum
  return exp_X

def tanh_derivative(X):
  return 1-(X**2)

In [0]:
def initialize_parameters():
  mean = 0
  std = 0.01
  
  forget_gate_weights = np.random.normal(mean, std, (input_units + hidden_units, hidden_units))
  input_gate_weights = np.random.normal(mean, std, (input_units + hidden_units, hidden_units))
  output_gate_weights = np.random.normal(mean, std, (input_units + hidden_units, hidden_units))
  gate_gate_weights = np.random.normal(mean, std, (input_units + hidden_units, hidden_units))
  
  hidden_output_weights = np.random.normal(mean, std, (hidden_units, output_units))
  
  parameters = dict()
  parameters['fgw'] = forget_gate_weights
  parameters['igw'] = input_gate_weights
  parameters['ogw'] = output_gate_weights
  parameters['ggw'] = gate_gate_weights
  parameters['how'] = hidden_output_weights
  
  return parameters

In [0]:
def lstm_cell(batch_dataset, prev_activation_matrix, prev_cell_matrix, parameters):
  fgw = parameters['fgw']
  igw = parameters['igw']
  ogw = parameters['ogw']
  ggw = parameters['ggw']
  
  concat_dataset = np.concatenate((batch_dataset, prev_activation_matrix), axis=1)
  
  fa = np.matmul(concat_dataset, fgw)
  fa = sigmoid(fa)
  
  ia = np.matmul(concat_dataset, igw)
  ia = sigmoid(ia)
  
  oa = np.matmul(concat_dataset, ogw)
  oa = sigmoid(oa)
  
  ga = np.matmul(concat_dataset, ggw)
  ga = tahn_activation(ga)
  
  cell_memory_matrix = np.multiply(fa,prev_cell_matrix) + np.multiply(ia, ga)
  
  activation_matrix = np.multiply(oa, tanh_activation(cell_memory_matrix))
  
  lstm_activation = dict()
  lstm_activation['fa'] = fa
  lstm_activation['ia'] = ia
  lstm_activation['oa'] = oa
  lstm_activation['ga'] = ga
  
  return lstm_activation,cell_memory_matrix,activation_matrix

In [0]:
def output_cell(activation_matrix, parameters):
  how = parameters['how']
  
  output_matrix = np.matmul(activation_matrix, how)
  output_matrix = softmax(output_matrix)
  
  return output_matrix

In [0]:
def get_embeddings(batch_dataset, embeddings):
  embedding_dataset = np.matmul(batch_daset, embeddings)
  return embedding_dataset

In [0]:
def forward_propagation(batches, parameters, embeddings):
  batch_size = batches[0].shape[0]
  
  lstm_cache = dict()
  activation_cache = dict()
  cell_cache = dict()
  output_cache = dict()
  embedding_cache = dict()
  
  a0 = np.zeros([batch_size, hidden_units], dtype=np.float32)
  c0 = np.zeros([batch_size, hidden_units], dtype=np.float32)
  
  actiation_cache['a0'] = a0
  cell_cache['c0'] = c0
  
  for i in range(len(batches) - 1):
    batch_dataset = batches[i]
    
    batch_dataset = get_embeddings(batch_dataset, embeddings)
    embedding_cache['emb' + str(i)] = batch_dataset
    
    lstm_activation, ct, at = lstm_cell(batch_dataset, a0, c0, parameters)
    
    ot = output_cell(at, parameters)
    
    lstm_cache['lstm' + str(i+1)] = lstm_activations
    activation_cache['a'+str(i+1)] = at
    cell_cache['c' + str(i+1)] = ct
    output_cache['o'+str(i+1)] = ot
    
    a0 = at
    c0 = ct
    
    return embedding,cache, lst_cache, activation_cache, cell_cache, output_cache

In [0]:
def cal_loss_accuracy(batch_labels, output_cache):
  loss = 0
  acc = 0
  prob = 1
  
  batch_size = batch_labels[0].shape[0]
  
  for i in range(1, len(output_cache) + 1):
    labels = batch_labels[i]
    pred = output_cache['0'+str(i)]
    
    prob = np.multiply(prob, np.sum(np.multiply(labels, pred), axis=1).reshape(-1, 1))
    loss += np.sum((np.multiply(labels, np.log(pred)) + np.multiply(1 - labels, np.log(1-pred))), axis=1).reshape
    acc += np.array(np.argmax(labels, 1) == np.argmax(pred, 1), dtype=np.float32).reshape(-1, 1)
    
    perplexity = np.sum((1/prob) ** (1/len(output_cache)))/batch_size
    loss = np.sum(loss)*np(-1/batch_size)
    acc = np.sum(acc)/(batch_size)
    acc = acc/len(output_cache)
    return perplexity, loss, acc

In [0]:
def calculate_output_cell_error(batch_labels, output_cache, parameters):
  output_error_cache = dict()
  activation_error_cache = dict()
  how = parameters['how']
  
  for i in rnage(1, len(output_cache) +1):
    labels = batch_labels[i]
    pred = output_cache['o'+ str(i)]
    
    error_output = pred - labels
    
    error_activation = np.matmul(error_output, how.T)
    
    output_error_cache['eo'+str(i)] = error_output
    activation_error_cache['ea'+str(i)] = error_activation
    
  return output_error_cache,activation_error_cache

In [0]:
def calculate_single_lstm_cell_error(activation_output_error,next_activation_error,
                                     next_cell_error,parameters, lstm_activation, 
                                     cell_activation, prev_cell_activation):
  
  activation_error = activation_output_error + next_activation_error
  
  oa = lstm_activation['oa']
  eo = np.multiply(activation_error, tanh_activation(cell_activation))
  eo = np.multiply(np.multiply(eo,oa),1-oa)
  
  cell_error = np.multiply(activation_error,oa)
  cell_error = np.multiply(cell_error, tanh_derivate(tanh_activation(cell_activation)))
  cell_error += next_cell_error
  
  ia = lstm_activation['ia']
  ga = lstm_activation['ga']
  ei = np.multiply(cell_error, ga)
  ei = np.multiply(np.multiply(ei, ia), 1-ia)
  
  eg = np.multiply(cell_error, ia)
  eg = np.multiply(eg, tanh_derivative(ga))
  
  fa = lstm_activation['fa']
  ef = np.multiply(cell_error, prev_cell_activation)
  ef = np.multiply(np.multiply(ef, fa), 1-fa)
  
  prev_cell_error = np.multiply(cell_error, fa)
  
  fgw = parameters['fgw']
  igw = parameters['igw']
  ggw = parameters['ggw']
  ogw = parameters['ogw']
  
  embed_activation_error = np.matmul(ef,fgw.T)
  embed_activation_error = np.matmul(ei,igw.T)
  embed_activation_error = np.matmul(eo,ogw.T)
  embed_activation_error = np.matmul(eg,ggw.T)
  
  input_hidden_units = fgw.shape[0]
  hidden_units = fgw.shape[1]
  input_units = input_hidden_units - hidden_units
  
  prev_activation_error = embed_activation_error[:, :input_units]
  
  lstm_error = dict()
  lstm_error['ef'] = ef
  lstm_error['ei'] = ei
  lstm_error['eo'] = eo
  lstm_error['eg'] = eg
  
  return prev_activation_error, prev_cell_error, embed_error, lstm_error

In [0]:
def calculate_output_cell_derivatives(output_error_cache, activation_cache, parameters):
  dhow = np.zeros(parameters['how'].shape)
  
  batch_size = activation_cache['a1'].shape[0]
  
  for i in range(1, len(output_error_cache)+1):
    
    output_error = output_error_cache['eo' + str(i)]
    
    activation = activation_cache['a'+str(i)]
    
    dhow = np.matmul(activation.T, output_error) / batch_size
    
  return dhow

In [0]:
def calculate_single_lstm_cell_derivatives(lstm_error, embedding_matrix, activation_matrix):
  ef = lstm_error['ef']
  ei = lstm_error['ei']
  eo = lstm_error['eo']
  eg = lstm_error['eg']
  
  concat_matrix = np.concatenate((embedding_matrix, activation_matrix), axis=1)
  
  batch_size = embedding_matrix.shape[0]
  
  dfgw = np.matmul(concat_matrix.T, ef) / batch_size
  digw = np.matmul(concat_matrix.T, ei) / batch_size
  dogw = np.matmul(concat_matrix.T, eo) / batch_size
  dggw = np.matmul(concat_matrix.T, eg) / batch_size
  
  derivatives = dict()
  derivatives['dfgw'] = dfgw
  derivatives['digw'] = digw
  derivatives['dogw'] = dogw
  derivatives['dggw'] = dggw
  
  return derivatives

In [0]:
def backward_propagation(batch_labels,embedding_cache,lstm_cache,activation_cache,cell_cache,output_cache,parameters):
    output_error_cache,activation_error_cache = calculate_output_cell_error(batch_labels,output_cache,parameters)
    
    lstm_error_cache = dict()
    
    embedding_error_cache = dict()
    
    eat = np.zeros(activation_error_cache['ea1'].shape)
    ect = np.zeros(activation_error_cache['ea1'].shape)
    
    for i in range(len(lstm_cache),0,-1):
        pae,pce,ee,le = calculate_single_lstm_cell_error(activation_error_cache['ea'+str(i)],eat,ect,parameters,lstm_cache['lstm'+str(i)],cell_cache['c'+str(i)],cell_cache['c'+str(i-1)])
        
        lstm_error_cache['elstm'+str(i)] = le
        
        embedding_error_cache['eemb'+str(i-1)] = ee
        
        eat = pae
        ect = pce
    
    
    derivatives = dict()
    derivatives['dhow'] = calculate_output_cell_derivatives(output_error_cache,activation_cache,parameters)
    
    lstm_derivatives = dict()
    for i in range(1,len(lstm_error_cache)+1):
        lstm_derivatives['dlstm'+str(i)] = calculate_single_lstm_cell_derivatives(lstm_error_cache['elstm'+str(i)],embedding_cache['emb'+str(i-1)],activation_cache['a'+str(i-1)])
    
    derivatives['dfgw'] = np.zeros(parameters['fgw'].shape)
    derivatives['digw'] = np.zeros(parameters['igw'].shape)
    derivatives['dogw'] = np.zeros(parameters['ogw'].shape)
    derivatives['dggw'] = np.zeros(parameters['ggw'].shape)
    
    for i in range(1,len(lstm_error_cache)+1):
        derivatives['dfgw'] += lstm_derivatives['dlstm'+str(i)]['dfgw']
        derivatives['digw'] += lstm_derivatives['dlstm'+str(i)]['digw']
        derivatives['dogw'] += lstm_derivatives['dlstm'+str(i)]['dogw']
        derivatives['dggw'] += lstm_derivatives['dlstm'+str(i)]['dggw']
    
    return derivatives,embedding_error_cache

In [0]:
def update_parameters(parameters,derivatives,V,S,t):
    dfgw = derivatives['dfgw']
    digw = derivatives['digw']
    dogw = derivatives['dogw']
    dggw = derivatives['dggw']
    dhow = derivatives['dhow']
    
    fgw = parameters['fgw']
    igw = parameters['igw']
    ogw = parameters['ogw']
    ggw = parameters['ggw']
    how = parameters['how']
    
    vfgw = V['vfgw']
    vigw = V['vigw']
    vogw = V['vogw']
    vggw = V['vggw']
    vhow = V['vhow']
    
    sfgw = S['sfgw']
    sigw = S['sigw']
    sogw = S['sogw']
    sggw = S['sggw']
    show = S['show']
    
    vfgw = (beta1*vfgw + (1-beta1)*dfgw)
    vigw = (beta1*vigw + (1-beta1)*digw)
    vogw = (beta1*vogw + (1-beta1)*dogw)
    vggw = (beta1*vggw + (1-beta1)*dggw)
    vhow = (beta1*vhow + (1-beta1)*dhow)
    
    sfgw = (beta2*sfgw + (1-beta2)*(dfgw**2))
    sigw = (beta2*sigw + (1-beta2)*(digw**2))
    sogw = (beta2*sogw + (1-beta2)*(dogw**2))
    sggw = (beta2*sggw + (1-beta2)*(dggw**2))
    show = (beta2*show + (1-beta2)*(dhow**2))
    
    fgw = fgw - learning_rate*((vfgw)/(np.sqrt(sfgw) + 1e-6))
    igw = igw - learning_rate*((vigw)/(np.sqrt(sigw) + 1e-6))
    ogw = ogw - learning_rate*((vogw)/(np.sqrt(sogw) + 1e-6))
    ggw = ggw - learning_rate*((vggw)/(np.sqrt(sggw) + 1e-6))
    how = how - learning_rate*((vhow)/(np.sqrt(show) + 1e-6))
    
    parameters['fgw'] = fgw
    parameters['igw'] = igw
    parameters['ogw'] = ogw
    parameters['ggw'] = ggw
    parameters['how'] = how
    
    V['vfgw'] = vfgw 
    V['vigw'] = vigw 
    V['vogw'] = vogw 
    V['vggw'] = vggw
    V['vhow'] = vhow
    
    S['sfgw'] = sfgw 
    S['sigw'] = sigw 
    S['sogw'] = sogw 
    S['sggw'] = sggw
    S['show'] = show
    
    return parameters,V,S    


In [0]:
def update_embedding(embedings, embedding_error_cache, batch_labels):
  embedding_derivatives = np.zeros(embeddings.shape)
  
  batch_size = batch_labels[0].shape[0]
  
  for i in range(len(embedding_error_cache)):
    embedding_derivatives += np.matmul(batch_labels[i].T, embedding_error_cache['eemb'+str(i)])/batch_size
  embeddings = embeddings - learning_rate*embedding_derivatives
  return embeddings

In [0]:
def initialize_V(parameters):
  Vfgw = np.zeros(parameters['fgw'].shape)
  Vigw = np.zeros(parameters['igw'].shape)
  Vogw = np.zeros(parameters['ogw'].shape)
  Vggw = np.zeros(parameters['ggw'].shape)
  Vhow = np.zeros(parameters['how'].shape)
  
  
  V = dict()
  V['vfgw'] = Vfgw
  V['vigw'] = Vigw
  V['vogw'] = Vogw
  V['vggw'] = Vggw
  V['vhow'] = Vhow
  
  return V

def initialize_S(parameters):
  Sfgw = np.zeros(parameters['fgw'].shape)
  Sigw = np.zeros(parameters['igw'].shape)
  Sogw = np.zeros(parameters['ogw'].shape)
  Sggw = np.zeros(parameters['ggw'].shape)
  Show = np.zeros(parameters['how'].shape)
  
  S = dict()
  S['sfgw'] = Sfgw
  S['sigw'] = Sigw
  S['sogw'] = Sogw
  S['sggw'] = Sggw
  S['show'] = Show
  
  return S


In [0]:
def train(train_dataset, iters=1000, batch_size=20):
  parameters = initialize_parameters()
  
  V = initialize_V(parameters)
  S = initialize_S(parameters)
  
  embeddings = np.random.normal(0, 0.01, (len(vocab), input_units))
  
  J = []
  P = []
  A = []
  
  for step in range(iters):
    index = step%len(train_dataset)
    batches = train_dataset[index]
    
    embedding_cache,lstm_cache,activation_cache,cell_cache,output_cache = forward_propagation(batches, parameters, embeddings)
    
    perplexity, loss, acc = cal_loss_accuracy(batches, output_cache)
    
    derivatives, embedding, error_cache = backward_propagation(batches, embedding_cache, lstm_cache, activation_cache,cell_cache,output_cache,parameters)
    
    parameters, V, S = update_parameters(parameters, derivatives, V, S, step)
    
    embeddings = update_embeddings(embeddings, embedding_error_cache, batches)
    
    J.append(loss)
    P.append(perplexity)
    A.append(acc)
    
    if(step%1000==0):
      print("For Single Batch :")
      print('Step       ={}'.format(step))
      print('Loss       ={}'.format(round(loss,2)))
      print('Perplexity ={}'.format(round(perplexity,2)))
      print('Accuracy   ={}'.format(round(acc*100,2)))
      print('\n')
    
    return embeddings, parameters,J,P,A    

In [0]:
embeddings, parameters, J, P, A = train(train_dataset, iters=8001)

In [0]:
avg_loss = list()
avg_acc = list()
avg_perp = list()
i = 0
while(i<len(J)):
    avg_loss.append(np.mean(J[i:i+30]))
    avg_acc.append(np.mean(A[i:i+30]))
    avg_perp.append(np.mean(P[i:i+30]))
    i += 30

plt.plot(list(range(len(avg_loss))),avg_loss)
plt.xlabel("x")
plt.ylabel("Loss (Avg of 30 batches)")
plt.title("Loss Graph")
plt.show()

plt.plot(list(range(len(avg_perp))),avg_perp)
plt.xlabel("x")
plt.ylabel("Perplexity (Avg of 30 batches)")
plt.title("Perplexity Graph")
plt.show()

plt.plot(list(range(len(avg_acc))),avg_acc)
plt.xlabel("x")
plt.ylabel("Accuracy (Avg of 30 batches)")
plt.title("Accuracy Graph")
plt.show()    

In [0]:
def predict(parameters,embeddings,id_char,vocab_size):
    names = []
    
    for i in range(20):
        a0 = np.zeros([1,hidden_units],dtype=np.float32)
        c0 = np.zeros([1,hidden_units],dtype=np.float32)

        name = ''
        
        batch_dataset = np.zeros([1,vocab_size])
        
        index = np.random.randint(0,27,1)[0]
        
        batch_dataset[0,index] = 1.0
        
        name += id_char[index]
        
        char = id_char[index]
        
        while(char!='.'):
            batch_dataset = get_embeddings(batch_dataset,embeddings)

            lstm_activations,ct,at = lstm_cell(batch_dataset,a0,c0,parameters)

            ot = output_cell(at,parameters)
            
            pred = np.random.choice(27,1,p=ot[0])[0]
                
            name += id_char[pred]
            
            char = id_char[pred]
            
            batch_dataset = np.zeros([1,vocab_size])
            batch_dataset[0,pred] = 1.0

            a0 = at
            c0 = ct
            
        names.append(name)
        
    return names

In [0]:
predict(parameters,embeddings,id_char,vocab_size)

In [0]:
predict(parameters,embeddings,id_char,vocab_size)