In [1]:
import numpy as np
import random
import matplotlib.pyplot as plt
from time import time

In [46]:
class LSTM():
    def __init__(self, 
                 input_shape, 
                 output_shape,
                 o_activation_function,
                 i_activation_function,
                 f_activation_function,
                 c_new_activation_function,
                 h_activation_function,
                 initial_hidden_state,
                 initial_cell_state
                ):
        """
        Input for a model x is a list of samples (x = [sample1, sample2, ...]) where each sample is a list of inputs (sample1 = [input1, input2, ...])
        and each input is a list of features (input1 = [feature1, feature2, ...]). input_shape is length of each input input1, input2, ... . 
        """
        self.input_shape = input_shape
        self.output_shape = output_shape
        
        self.weights_o = [list(np.random.rand(input_shape)) + list(np.random.rand(output_shape)) for i in range(output_shape)]
        self.weights_i = [list(np.random.rand(input_shape)) + list(np.random.rand(output_shape)) for i in range(output_shape)]
        self.weights_f = [list(np.random.rand(input_shape)) + list(np.random.rand(output_shape)) for i in range(output_shape)]
        self.weights_c_new = [list(np.random.rand(input_shape)) + list(np.random.rand(output_shape)) for i in range(output_shape)]
        
        self.biases_o = list(np.random.rand(input_shape)) + list(np.random.rand(output_shape))
        self.biases_i = list(np.random.rand(input_shape)) + list(np.random.rand(output_shape))
        self.biases_f = list(np.random.rand(input_shape)) + list(np.random.rand(output_shape))
        self.biases_c_new = list(np.random.rand(input_shape)) + list(np.random.rand(output_shape))
        
        self.o_activation_function = o_activation_function
        self.i_activation_function = i_activation_function
        self.f_activation_function = f_activation_function
        self.c_new_activation_function = c_new_activation_function
        self.h_activation_function = h_activation_function
        
        # initial_hidden_state and initial_cell_state must have shape = self.output_shape
        self.initial_hidden_state = initial_hidden_state
        self.initial_cell_state = initial_cell_state
        
        
    def predict(self, x, return_hidden_states = False):
        """
        x is a list of samples (x = [sample1, sample2, ...]) where each sample is a list of inputs (sample1 = [input1, input2, ...])
        and each input is a list of features (input1 = [feature1, feature2, ...]).
        If return_hidden_states = True then this function except for prediction returns also all hidden states values and
        cell states which are needed to calculate derivatives.
        """
            
        hidden_states = []
        cell_states = []
        for sample_number in range(len(x)):
            hidden_states.append([])
            cell_states.append([])
            for time_step in range(len(x[sample_number])):
                if time_step == 0:
                    o = self.o_activation_function(np.dot(self.weights_o, np.concatenate([x[sample_number][time_step], self.initial_hidden_state])))
                    i = self.i_activation_function(np.dot(self.weights_i, np.concatenate([x[sample_number][time_step], self.initial_hidden_state])))
                    f = self.f_activation_function(np.dot(self.weights_f, np.concatenate([x[sample_number][time_step], self.initial_hidden_state])))
                    c_new = self.c_new_activation_function(np.dot(self.weights_c_new, np.concatenate([x[sample_number][time_step], self.initial_hidden_state])))

                    cell_states[sample_number].append(np.add(np.multiply(f, self.initial_cell_state), np.multiply(i, c_new)))
                else:
                    o = self.o_activation_function(np.dot(self.weights_o, np.concatenate([x[sample_number][time_step], hidden_states[sample_number][time_step - 1]])))
                    i = self.i_activation_function(np.dot(self.weights_i, np.concatenate([x[sample_number][time_step], hidden_states[sample_number][time_step - 1]])))
                    f = self.f_activation_function(np.dot(self.weights_f, np.concatenate([x[sample_number][time_step], hidden_states[sample_number][time_step - 1]])))
                    c_new = self.c_new_activation_function(np.dot(self.weights_c_new, np.concatenate([x[sample_number][time_step], hidden_states[sample_number][time_step - 1]])))

                    cell_states[sample_number].append(np.add(np.multiply(f, cell_states[sample_number][time_step - 1]), np.multiply(i, c_new)))

                hidden_states[sample_number].append(np.multiply(o, self.h_activation_function(cell_states[sample_number][time_step])))
            
        # hidden states for last time step are predictions
        predictions = [hidden_states[i][-1] for i in range(len(x))]
        if return_hidden_states:
            return predictions, hidden_states, cell_states
        else:
            return predictions
        
        
    def dh_dw(self,
              x,
              cell_states,
              hidden_states,
              time_step,
              weight_mark,
              weight_row,
              weight_column
             ):
        """
        Calculating a derivative of hidden statte where input for a model is equal to x, 
        for time step = time_step w.r.t weight.
        Argument weight_mark can be set to one of the values 'o', 'i', 'f' and 'c_new', and it indicates for which function this weight is (o, i, f or c_new). 
        Arguments weight_row and weight_column indicates w.r.t which weight we calculate a derivative. If for example weight_mark = 'i' then we calculate
        a derivative w.r.t weight = self.weights_i[weight_row][weight_column].
        Argument hidden_states is a list of hidden states when input for a model was equal to x.
        Argument cell_states is a list of cell states when input for a model was equal to x.
        """
        
        
        dh_dw = self.four_func_deriv(x = x,
                                     hidden_states = hidden_states,
                                     cell_states = cell_states,
                                     time_step = time_step,
                                     function_mark = 'o',
                                     weight_mark = weight_mark,
                                     weight_row = weight_row,
                                     weight_column = weight_column
                                    )
        dh_dw = np.multiply(dh_dw, self.h_activation_function(cell_states[time_step], deriv = True))
        dh_dw = np.multiply(dh_dw, self.dc_dw(x = x,
                                              hidden_states = hidden_states,
                                              cell_states = cell_states,
                                              time_step = time_step,
                                              weight_mark = weight_mark,
                                              weight_row = weight_row,
                                              weight_column = weight_column
                                            )
                           )
        
        return dh_dw
    
    
    def dc_dw(self,
              x,
              hidden_states,
              cell_states,
              time_step,
              weight_mark,
              weight_row,
              weight_column,
             ):
        """
        Calculating a derivative of c function where input for a model is equal to x, 
        for time step = time_step w.r.t weight.
        Argument weight_mark can be set to one of the values 'o', 'i', 'f' and 'c_new', and it indicates for which function this weight is (o, i, f or c_new). 
        Arguments weight_row and weight_column indicates w.r.t which weight we calculate a derivative. If for example weight_mark = 'i' then we calculate
        a derivative w.r.t weight = self.weights_i[weight_row][weight_column].
        Argument hidden_states is a list of hidden states when input for a model was equal to x.
        Argument cell_states is a list of cell states when input for a model was equal to x.
        """
        
        sum_element1 = self.four_func_deriv(x = x,
                                             hidden_states = hidden_states,
                                             cell_states = cell_states,
                                             time_step = time_step,
                                             function_mark = 'f',
                                             weight_mark = weight_mark,
                                             weight_row = weight_row,
                                             weight_column = weight_column
                                            )
        if time_step > 0:
            sum_element1 = np.multiply(sum_element1, self.dc_dw(x = x,
                                                                hidden_states = hidden_states,
                                                                cell_states = cell_states,
                                                                time_step = time_step - 1,
                                                                weight_mark = weight_mark,
                                                                weight_row = weight_row,
                                                                weight_column = weight_column
                                                               )
                                      )
        else:
            sum_element1 = np.multiply(sum_element1, self.initial_cell_state)
            
        sum_element2 = self.four_func_deriv(x = x,
                                              hidden_states = hidden_states,
                                              cell_states = cell_states,
                                              time_step = time_step,
                                              function_mark = 'i',
                                              weight_mark = weight_mark,
                                              weight_row = weight_row,
                                              weight_column = weight_column
                                             )
        sum_element2 = np.multiply(sum_element2, self.four_func_deriv(x = x,
                                                                      hidden_states = hidden_states,
                                                                      cell_states = cell_states,
                                                                      time_step = time_step,
                                                                      function_mark = 'c_new',
                                                                      weight_mark = weight_mark,
                                                                      weight_row = weight_row,
                                                                      weight_column = weight_column
                                                                     )
                                  )
        
        return np.add(sum_element1, sum_element2)
        
        
    def four_func_deriv(self,
                        x,
                        hidden_states,
                        cell_states,
                        time_step,
                        function_mark,
                        weight_mark,
                        weight_row,
                        weight_column
                       ):
        """
        Calculating a derivative of choosen function.
        Argument function_mark indicated which function we calculate derivative of (function_mark can be equal to 'o', 'i', 'f' or 'c_new') where input for a model is 
        equal to x for time step = time_step w.r.t weight.
        Argument weight_mark can be set to one of the values 'o', 'i', 'f' and 'c_new', and it indicates for which function this weight is (o, i, f or c_new). 
        Arguments weight_row and weight_column indicates w.r.t which weight we calculate a derivative. If for example weight_mark = 'i' then we calculate
        a derivative w.r.t weight = self.weights_i[weight_row][weight_column].
        Argument hidden_states is a list of hidden states when input for a model was equal to x.
        Argument cell_states is a list of cell states when input for a model was equal to x.
        """
        
        if function_mark == 'o':
            function = self.o_activation_function
            weights = self.weights_o
            biases = self.biases_o
        elif function_mark == 'i':
            function = self.i_activation_function
            weights = self.weights_i
            biases = self.biases_i
        elif function_mark == 'f':
            function = self.f_activation_function
            weights = self.weights_f
            biases = self.biases_f
        elif function_mark == 'c_new':
            function = self.c_new_activation_function
            weights = self.weights_c_new
            biases = self.biases_c_new
            
        derivative = []
        for element_number in range(self.output_shape):
            if weight_mark != function_mark:
                if time_step >= 1:
                    derivative_element = function(np.dot(weights[element_number], 
                                                         np.concatenate([hidden_states[time_step - 1], x[time_step]])
                                                        ) 
                                                  + biases[element_number], 
                                                  deriv = True
                                                 )
                    dh_dw = self.dh_dw(x = x,
                                       cell_states = cell_states,
                                       hidden_states = hidden_states,
                                       time_step = time_step - 1,
                                       weight_mark = weight_mark,
                                       weight_row = weight_row,
                                       weight_column = weight_column
                                      )
                    derivative_element *= np.sum(np.multiply(dh_dw, weights[element_number]))
                    derivative.append(derivative_element)
                elif time_step == 0:
                    derivative.append(0)
                else:
                    raise Exception('time_step argument must be > 0')

            else:
                if time_step >= 1 and weight_row == element_number:
                    derivative_element = function(np.dot(weights[element_number], 
                                                          np.concatenate([hidden_states[time_step - 1], x[time_step]])
                                                         ) 
                                                   + biases[element_number], 
                                                   deriv = True
                                                  )
                    dh_dw = self.dh_dw(x = x,
                                       cell_states = cell_states,
                                       hidden_states = hidden_states,
                                       time_step = time_step - 1,
                                       weight_mark = weight_mark,
                                       weight_row = weight_row,
                                       weight_column = weight_column
                                      )
                    
                    multiplied_vectors = np.multiply(dh_dw, weights[element_number])
                    if weight_column < self.output_shape:
                        multiplied_vectors[weight_column] += hidden_states[time_step - 1][weight_column]
                    else:
                        multiplied_vectors = np.append(multiplied_vectors, np.concatenate([hidden_states[time_step - 1], x[time_step]])[weight_column])
                   
                    derivative_element *= np.sum(multiplied_vectors)             
                    derivative.append(derivative_element)

                elif time_step >= 1 and weight_row != element_number:
                    derivative_element = function(np.dot(weights[element_number], 
                                                          np.concatenate([hidden_states[time_step - 1], x[time_step]])
                                                         ) 
                                                   + biases[element_number], 
                                                   deriv = True
                                                  )
                    dh_dw = self.dh_dw(x = x,
                                         cell_states = cell_states,
                                         hidden_states = hidden_states,
                                         time_step = time_step - 1,
                                         weight_mark = weight_mark,
                                         weight_row = weight_row,
                                         weight_column = weight_column
                                        )
                    derivative_element *= np.sum(np.multiply(dh_dw, weights[element_number]))
                    derivative.append(derivative_element)
                    
                elif time_step == 0 and weight_row == element_number:
                    derivative_element = function(np.dot(weights[element_number], 
                                                          np.concatenate([self.initial_hidden_state, x[time_step]])
                                                         ) 
                                                   + biases[element_number], 
                                                   deriv = True
                                                  )
                    derivative_element *= np.concatenate([self.initial_hidden_state, x[time_step]])[weight_column]
                    derivative.append(derivative_element)
                    
                else:
                    derivative.append(0)
                    
        return derivative

        
    def train(self, x_train, y_train, loss_function, epochs, batch_size, learning_rate):
        for epoch in range(epochs):
            sample_number = 0
            while sample_number < len(x_train):
                # we will use weight_mark, weight_row and weight_column to select weight w.r.t which we will
                # calculate derivative of a loss function and which we will update
                for weight_mark in ['i', 'o', 'f', 'c_new']:
                    for weight_row in range(self.output_shape):
                        for weight_column in range(self.input_shape + self.output_shape):
                            # derivative of a loss function w.r.t weight
                            dl_dw = 0
                            i = 0
                            # for the first iteration and after updating a weight we need to calculate predictions, hidden_states and cell states
                            predictions, hidden_states, cell_states = self.predict(x_train[sample_number + i : sample_number + i + batch_size], 
                                                                                   return_hidden_states = True
                                                                                  )
                            while i < batch_size and sample_number + i < len(x_train):
                                prediction_deriv = self.dh_dw(x = x_train[sample_number + i],
                                                              cell_states = cell_states[i],
                                                              hidden_states = hidden_states[i],
                                                              time_step = len(x_train[sample_number + i]) - 1,
                                                              weight_mark = weight_mark,
                                                              weight_row = weight_row,
                                                              weight_column = weight_column
                                                             )

                                dl_dw += loss_function(y_train[sample_number + i : sample_number + i + batch_size], 
                                                       predictions, 
                                                       prediction_deriv
                                                      )
                                i += 1

                            print('dl_dw: ', dl_dw)
                            # updating a weight
                            if weight_mark == 'i':
                                self.weights_i[weight_row][weight_column] -= dl_dw * learning_rate
                            elif weight_mark == 'o':
                                self.weights_o[weight_row][weight_column] -= dl_dw * learning_rate
                            elif weight_mark == 'f':
                                self.weights_f[weight_row][weight_column] -= dl_dw * learning_rate
                            elif weight_mark == 'c_new':
                                self.weights_c_new[weight_row][weight_column] -= dl_dw * learning_rate

                            i = 0
                        
                sample_number += i + 1

In [3]:
def sigmoid(x, deriv = False):
    x = np.array(x)
    if deriv:
        return (np.exp(-x)) / (1 + np.exp(-x)) ** (2)
    else:
        return 1.0 / (1.0 + np.exp(-x))

In [4]:
def tanh(x, deriv = False):
    if deriv:
        return 1 - tanh(x) ** 2
    else:
        return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))

In [5]:
def cross_entropy(real_value, prediction, prediction_deriv = None):
    """
    If prediction_deriv != None then this function returns a derivative of a cross entropy function. In other case
    it returns normal cross entropy value.
    prediction_deriv is a derivative of a model's prediction w.r.t variable w.r.t which we want to calculate
    a derivative of a cross entropy function. This derivative is calculated in point of actual value of the variable.
    """
    
    # real_value = [real_value for real_value in real_value]
    # prediction =  [prediction for prediction in prediction]
    
    if prediction_deriv != None:
        return -np.sum([real_value * prediction ** (-1) * prediction_deriv for real_value, prediction, prediction_deriv in zip(real_value, prediction, prediction_deriv)])
    else:
        return -np.sum([real_value * np.log(prediction) for real_value, prediction in zip(real_value, prediction)])

In [6]:
x_train = [[[1], [1]], [[2], [2]]]
y_train = [1,2]

In [47]:
model = LSTM(input_shape = 1, 
             output_shape = 1,
             o_activation_function = sigmoid,
             i_activation_function = sigmoid,
             f_activation_function = sigmoid,
             c_new_activation_function = tanh,
             h_activation_function = sigmoid,
             initial_hidden_state = [0],
             initial_cell_state = [0]
            )

In [48]:
model.train(x_train = x_train, 
            y_train = y_train, 
            loss_function = cross_entropy, 
            epochs = 50, 
            batch_size = 2, 
            learning_rate = 0.1)

dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_dw:  0.0
dl_d

In [45]:
model.predict(x_train)

[array([0.43953316]), array([0.59027391])]