## Implementing LSTM from Scratch without any ML Libraries

In [1]:
import numpy as np
from tqdm import tqdm

- ## Getting Data

- One Hot Encoding is a method of converting categorical data into a binary matrix (0s and 1s), where each category is represented by a vector with all zeros except for a 1 in the position corresponding to that category.

In [2]:
with open("./data.txt", 'r', encoding='utf-8') as f:
    data = f.read()
    
data = data.lower()

#Get the number of characters in our data
chars = set(data)

data_size, char_size = len(data), len(chars)
print(f"Data Size:-{data_size}, Char Size:-{char_size}")

#Create 2 dictionaries to get character to index and vice-versa for One-Hot Encoding.
char_to_idx = {c:i for i,c in enumerate(chars)}
idx_to_char = {i:c for i,c in enumerate(chars)}


#X_train & Y_train are our training data and labels. 
#For every character in X_train there is exactly one character in Y_train.
X_train, Y_train = data[:-1], data[1:]


        
        
        

Data Size:-901, Char Size:-35


## Defining Functions

- Xavier Normalized Initialization (also known as Glorot Initialization) is a weight initialization technique designed to keep the scale of the gradients roughly the same across all layers in a deep neural network.
- Helps prevent vanishing/exploding gradients at initialization.

In [32]:
#==============================Helper Function=============================

def one_hot_encoder(text):
    output = np.zeros((char_size, 1))
    
    output[char_to_idx[text]] = 1
    
    return output

#Xavier Normalized Initialization
def initWeights(input_size, output_size):
    output = np.random.uniform(-1,1, (output_size, input_size)) * np.sqrt(6/(input_size + output_size))
    
    return output

#==========================Activation Functions===============================
def sigmoid_activation(input, derivative = False):
    #The sigmoid derivative is used to propagate error backwards through the network and update the weights.
    #When training, you don’t just use the sigmoid function; you also differentiate it to compute gradients.
    #So,the sigmoid derivative appears in the backpropagation step, not in the forward pass.
    
    if derivative:
        return input * (1-input)
    
    output = 1 / (1 + np.exp(-input))
    
    return output   
    
def tanh_activation(input, derivative=False):
    if derivative:
        return 1 - (input)**2
    
    output = np.tanh(input)
    return output
    

def softmax_activation(input, derivatice=False):
    output = np.exp(input) / np.sum(np.exp(input))
    return output


## Main LSTM Class

In [46]:
class LSTM:
    def __init__(self, input_size, output_size, hidden_size, num_epochs, learning_rate):
        #Hyperparameters
        self.learning_rate = learning_rate
        self.hidden_size = hidden_size
        self.num_epochs = num_epochs
        
        #Forget Gate weight & bias matrix
        self.weight_forget = initWeights(input_size, hidden_size)
        self.bias_forget = np.zeros((hidden_size,1))
        
        #Input Gate weight & bias matrix
        self.weight_input = initWeights(input_size, hidden_size)
        self.bias_input = np.zeros((hidden_size,1))
        
        #Candidate Gate weight & bias matrix
        self.weight_candidate = initWeights(input_size, hidden_size)
        self.bias_candidate = np.zeros((hidden_size,1))
        
        #Output Gate weight & bias matrix
        self.weight_output = initWeights(input_size, hidden_size)
        self.bias_output = np.zeros((hidden_size,1))
        
        #Final Gate weight & bias matrix
        self.weight_final = initWeights(hidden_size, output_size)
        self.bias_final = np.zeros((output_size,1))
        
        
    #========Reset Network Memory Function====================
    '''
    - Reset Memory Function is used to prevent the network from using information it shouldn’t have, the network’s memory must be wiped before each forward propagation
    - Wiping the network memory prevents the network from cheating but not from learning
    '''
    def reset_memory(self):
        self.concat_inputs = {}
        
        #The network needs hidden_state and cell_state when it starts even though there is not any information to recall from the past.
        #So we initialize hidden_state and cell_state with zeros.
        self.hidden_state = {-1:np.zeros((self.hidden_size, 1))}
        self.cell_state = {-1:np.zeros((self.hidden_size, 1))}
        
        self.activation_outputs = {}
        self.forget_gates = {}
        self.candidate_gates = {}
        self.output_gates = {}
        self.input_gates = {}
        self.output = {}
        
        
    #================Forward Function==========================
    """
    This function will reset the memory and will perform each of the described computations
    """
    def forward(self,inputs):
        self.reset_memory()
        
        outputs = []
        for q in range(len(inputs)):
            self.concat_inputs[q] = np.concatenate((self.hidden_state[q-1], inputs[q]))
            
            self.forget_gates[q] = sigmoid_activation(np.dot(self.weight_forget , self.concat_inputs[q]) + self.bias_forget)
            self.input_gates[q] = sigmoid_activation(np.dot(self.weight_input, self.concat_inputs[q]) + self.bias_input)
            self.candidate_gates[q] = tanh_activation(np.dot(self.weight_candidate, self.concat_inputs[q])+ self.bias_candidate)
            self.output_gates[q] = sigmoid_activation(np.dot(self.weight_output , self.concat_inputs[q]) + self.bias_output)
            
            
            self.cell_state[q] = self.forget_gates[q] * self.cell_state[q -1] + self.input_gates[q] * self.candidate_gates[q]
            self.hidden_state[q] = self.output_gates[q] * tanh_activation(self.cell_state[q])
            
            outputs += [np.dot(self.weight_final, self.hidden_state[q])+ self.bias_final]
            
        return outputs
                
    #====================Backpropagation Function======================
    """
    We implement BPTT(Backpropagation Through Time) which means that the error for each weight and bias is the sum of its error through time
    We will update the weights and biases of the network using the errors found
    """    
    def bptt(self, errors,inputs):
        derivative_weight_forget, derivative_bias_forget = 0,0
        derivative_weight_input, derivative_bias_input = 0,0
        derivative_weight_output, derivative_bias_output = 0,0
        derivative_weight_candidate, derivative_bias_candidate = 0,0
        derivative_weight_final, derivative_bias_final = 0,0
        
        #zeros_like():- will create an array of zeros with the same shape and type as the given a given array. 
        derivative_hidden_next, derivative_cell_next = np.zeros_like(self.hidden_state[0]), np.zeros_like(self.cell_state[0])
        
        for q in reversed(range(len(inputs))):
            error = errors[q]
            
            #Final Gate weights and biases errors computation
            derivative_weight_final += np.dot(error, self.hidden_state[q].T)
            derivative_bias_final += error
            
            #Hidden Error Computation
            derivative_hidden_state = np.dot(self.weight_final.T, error) + derivative_hidden_next
            
            #Output Gate weights and biases error computation
            d_o = tanh_activation(self.cell_state[q]) * derivative_hidden_state * sigmoid_activation(self.output_gates[q] , derivative=True)
            derivative_weight_output += np.dot(d_o , inputs[q].T)
            derivative_bias_output += d_o
            
            #Cell State Error computation
            derivative_cell_state = tanh_activation(tanh_activation(self.cell_state[q]), derivative=True) * self.output_gates[q] * derivative_hidden_state + derivative_cell_next
            
            #Forget Gate weights and biases error computation
            d_f = derivative_cell_state * self.cell_state[q-1] * sigmoid_activation(self.forget_gates[q], derivative=True)
            derivative_weight_forget += np.dot(d_f, inputs[q].T)
            derivative_bias_forget += d_f
            
            #Input Gate weights and biases error computation
            d_i = derivative_cell_state * self.candidate_gates[q] * sigmoid_activation(self.input_gates[q] , derivative= True)
            derivative_weight_input += np.dot(d_i, inputs[q].T)
            derivative_bias_input += d_i
            
            #Candidate Gate weights and biases error computation
            d_c = derivative_cell_state * self.input_gates[q] * tanh_activation(self.candidate_gates[q] , derivative=True)
            derivative_weight_candidate += np.dot(d_c , inputs[q].T)
            derivative_bias_candidate += d_c
            
            #Sum of error at Each State (Concatenated Input error)
            d_z = np.dot(self.weight_forget.T, d_f) + np.dot(self.weight_input.T , d_i) + np.dot(self.weight_candidate.T , d_c) + np.dot(self.weight_output.T , d_o)
            
            #Error of hidden_state and cell_state at Next Time Step
            derivative_hidden_next = d_z[:self.hidden_size, :]
            derivative_cell_next = self.forget_gates[q] * derivative_cell_state
            
        for derivative_ in (derivative_weight_forget, derivative_bias_forget,derivative_weight_input, derivative_bias_input, derivative_weight_candidate, derivative_bias_candidate , derivative_weight_output, derivative_bias_output, derivative_weight_final, derivative_bias_final):
            #np.clip():- A function which clips any value larger than 1 to 1 and smaller than -1 to -1.
            np.clip(derivative_ , -1, 1,out=derivative_)
            
        self.weight_forget += derivative_weight_forget * self.learning_rate
        self.bias_forget += derivative_bias_forget * self.learning_rate
        
        self.weight_input += derivative_weight_input * self.learning_rate
        self.bias_input += derivative_bias_input * self.learning_rate
        
        self.weight_output += derivative_weight_output * self.learning_rate
        self.bias_output += derivative_bias_output * self.learning_rate
            
        self.weight_candidate += derivative_weight_candidate * self.learning_rate
        self.bias_candidate += derivative_bias_candidate * self.learning_rate
        
        self.weight_final += derivative_weight_final * self.learning_rate
        self.bias_final += derivative_bias_final * self.learning_rate
        
    #======================Train Function=====================================
    """ 
    Train function One_Hot Encodes each input and uses forward propagation to obtain the probability of each output being correct.
    The error is calculated so that the prediction plus the error is equal to the desired output or label.
    """
    
    def train(self, inputs, labels):
        
        #One_hot Encode each of the input 
        inputs = [one_hot_encoder(input) for input in inputs]
        
        for _ in tqdm(range(self.num_epochs)):
            predictions = self.forward(inputs)
            
            errors = []
            
            for q in range(len(predictions)):
                errors += [-softmax_activation(predictions[q])]
                
                
                errors[-1][char_to_idx[labels[q]]] += 1
                
            #Backpropagate the errors
            self.bptt(errors, self.concat_inputs)
            
    #=============================Test Function===================================
    """ 
    The test function forward propagates the data to obtain the networks probabilities of each character being correct.
    """
    def test(self,inputs, labels):
        accuracy = 0
        probabilities = self.forward([one_hot_encoder(input) for input in inputs])
        
        output =''
        for q in range(len(labels)):
            prediction = idx_to_char[np.random.choice([*range(char_size)], p = softmax_activation(probabilities[q].reshape(-1)))]
            
            output += prediction
            
            if prediction == labels[q]:
                accuracy += 1
                
        print(f"Original Text:-\n{labels}\n")
        print(f"Predicted Text:- \n{" ".join(output)}\n")
        
        print(f"Accuracy: {round(accuracy * 100 / len(inputs), 2)}%")
        
        

### Using the LSTM class on our data

In [47]:
hidden_size = 25

lstm = LSTM(input_size= char_size + hidden_size, output_size=char_size ,hidden_size=hidden_size, num_epochs=1_000, learning_rate=0.05)

#Training 
lstm.train(X_train, Y_train)

#Test
lstm.test(X_train, Y_train)

100%|██████████| 1000/1000 [02:02<00:00,  8.18it/s]

Original Text:-
""to be, or not to be, that is the question: whether \
'tis nobler in the mind to suffer the slings and arrows of ou\
trageous fortune, or to take arms against a sea of troubles a\
nd by opposing end them. to die—to sleep, no more; and by a s\
leep to say we end the heart-ache and the thousand natural sh\
ocks that flesh is heir to: 'tis a consummation devoutly to b\
e wish'd. to die, to sleep; to sleep, perchance to dream—ay, \
there's the rub: for in that sleep of death what dreams may c\
ome, when we have shuffled off this mortal coil, must give us\
 pause—there's the respect that makes calamity of so long lif\
e. for who would bear the whips and scorns of time, th'oppres\
sor's wrong, the proud man's contumely, the pangs of dispriz'\
d love, the law's delay, the insolence of office, and the spu\
rns that patient merit of th'unworthy takes, when he himself \
might his quietus make""".

Predicted Text:- 
" " t o   b e ,   o r   n o t   t o   b e ,   t h a t   i s   t 


