Base for this notebook: https://colah.github.io/posts/2015-08-Understanding-LSTMs/ <br>
Got the derivatives from: https://medium.com/@aidangomez/let-s-do-this-f9b699de31d9
https://mmuratarat.github.io/2019-01-19/dimensions-of-lstm <br>
Blog Post used for debugging: https://medium.com/@aidangomez/let-s-do-this-f9b699de31d9

In [122]:
import numpy as np
import random as rd
from keras.datasets import imdb

np.random.seed(42)
rd.seed(42)


In [123]:
def sigmoid(n: float):
    return 1.0/(1.0+np.exp(-n))

def sigmoid_derivative(n: float):
    """Derivative of the sigmoid function."""
    return sigmoid(n)*(1-sigmoid(n))
    
def tanh(n: float):
    return np.tanh(n)
    

In [124]:
def padding(X, maxLen):
    for i in range(len(X)):
        if len(X[i]) > maxLen:
            X[i] = X[i][:maxLen]
        else:
            X[i] = np.pad(X[i], (0 ,maxLen - len(X[i])), 'constant', constant_values=(0, 0))
    return X

In [207]:
maxLen = 80
(train_X, train_y), (test_X, test_y) = imdb.load_data(seed=42, num_words=500)
print(np.shape(train_X))
print(np.shape(train_y))
train_X = padding(train_X, maxLen)




(25000,)
(25000,)


In [214]:
# transforming the list to numpy arrays
aux = np.zeros((len(train_X), maxLen))
for i in range(25000):
    aux[i] = np.asarray(train_X[i])
train_X = aux

In [None]:
# np.save('train_X', train_X, allow_pickle=True)
# np.save('train_y', train_y, allow_pickle=True)


In [233]:
class LSTM:
    """
    h = array of outputs
    x = array of inputs
    """

    def __init__(self, nInputs, nFeatures, nUnits, nOutputs, batchSize):
        nGates = 4
        if nUnits < nOutputs:
            print('the number of cells cannot be less than the number of outputs')

        # [t][i][j], t = timestep, i = which batch, j = which cell or feature
        x = np.zeros((nInputs, batchSize, nFeatures))
        h = np.zeros((nInputs, batchSize, nUnits))
        i = np.zeros((nInputs, batchSize, nUnits))
        f = np.zeros((nInputs, batchSize, nUnits))
        o = np.zeros((nInputs, batchSize, nUnits))
        tildeC = np.zeros((nInputs, batchSize, nUnits))
        C = np.zeros((nInputs, batchSize, nUnits))
        wxScale = 1 / np.sqrt(nFeatures * nUnits)
        whScale = 1 / np.sqrt(nUnits * nUnits)
        Wxi = np.random.normal(loc=0, scale=wxScale, size=[nFeatures, nUnits])
        Wxf = np.random.normal(loc=0, scale=wxScale, size=[nFeatures, nUnits])
        Wxc = np.random.normal(loc=0, scale=wxScale, size=[nFeatures, nUnits])
        Wxo = np.random.normal(loc=0, scale=wxScale, size=[nFeatures, nUnits])
        Whi = np.random.normal(loc=0, scale=whScale, size=[nUnits, nUnits])
        Whf = np.random.normal(loc=0, scale=whScale, size=[nUnits, nUnits])
        Whc = np.random.normal(loc=0, scale=whScale, size=[nUnits, nUnits])
        Who = np.random.normal(loc=0, scale=whScale, size=[nUnits, nUnits])
        bi = np.random.normal(loc=0, scale=1, size=[nUnits])
        bf = np.random.normal(loc=0, scale=1, size=[nUnits])
        bc = np.random.normal(loc=0, scale=1, size=[nUnits])
        bo = np.random.normal(loc=0, scale=1, size=[nUnits])
        self.x = x
        self.h = h
        self.i = i
        self.f = f
        self.o = o
        self.tildeC = tildeC
        self.C = C
        self.Wxi = Wxi
        self.Wxf = Wxf
        self.Wxc = Wxc
        self.Wxo = Wxo
        self.Whi = Whi
        self.Whf = Whf
        self.Whc = Whc
        self.Who = Who


        self.Wx = np.array([self.Wxc, self.Wxi, self.Wxf, self.Wxo])
        self.Wh = np.array([self.Whc, self.Whi, self.Whf, self.Who])

        self.bi = bi
        self.bf = bf
        self.bc = bc
        self.bo = bo

        self.b = np.array([self.bc, self.bi, self.bf, self.bo])
        self.nUnits = nUnits
        self.nGates = nGates
        self.nInputs = nInputs
        self.batchSize = batchSize
        self.nFeatures = nFeatures



    def forgetGate(self, t):
        if t == 0:
            self.f[t] = sigmoid(np.dot(self.x[t], self.Wxf) + self.bf)
        else:
            self.f[t] = sigmoid(np.dot(self.h[t - 1], self.Whf) + np.dot(self.x[t], self.Wxf) + self.bf)
        return self.f[t]

    def inputGate(self, t):
        if t == 0:
            self.i[t] = sigmoid(np.dot(self.x[t], self.Wxi) + self.bi)
            self.tildeC[t] = tanh(np.dot(self.x[t], self.Wxc) + self.bc)
        else:
            self.i[t] = sigmoid(np.dot(self.h[t - 1], self.Whi) + np.dot(self.x[t], self.Wxi) + self.bi)
            self.tildeC[t] = tanh(np.dot(self.h[t - 1], self.Whc) + np.dot(self.x[t], self.Wxc) + self.bc)
        return self.i[t] * self.tildeC[t]

    def outputGate(self, t):
        if t == 0:
            self.o[t] = sigmoid(np.dot(self.x[t], self.Wxo) + self.bo)
        else:
            self.o[t] = sigmoid(np.dot(self.h[t - 1], self.Who) + np.dot(self.x[t], self.Wxo) + self.bo)
        self.h[t] = self.o[t] * tanh(self.C[t])
        return self.h[t]

    def getNewState(self, t):
        self.forgetGate(t)
        self.inputGate(t)
        self.C[t] = self.C[t - 1] * self.f[t] + self.i[t] * self.tildeC[t]
        self.h[t] = self.outputGate(t)
        return self.C[t], self.h[t]

    def setInput(self, x):
        for t in range(0, self.nInputs):
            self.x[t] = x[t]
            self.getNewState(t)
        return

    def testNetwork(self, test_X, test_y, nTests: int):
        """
        A function to test our network

        It returns the overall accuracy and the numbers our network guessed
        """

        X = test_X[:nTests]
        y = test_y[:nTests]
        outputs = np.zeros(2)
        mse = 0
        for i in range(0, nTests, self.batchSize):
            # shaping the input to have the same dimensions as x[t] and h[t]
            batchX = np.zeros((self.nInputs, self.batchSize, self.nFeatures))
            for j in range(self.batchSize):
                batchX[:, j] = np.reshape(X[i + j], (-1, self.nFeatures))
            batchY = np.reshape(y[i:i + self.batchSize], (-1, self.nUnits))
            self.setInput(batchX)
            networkOutputs = np.where(self.h[-1] < 0.5, 0, 1)
            for out in networkOutputs:
                outputs[out] += 1
            # print(f"number: {y[i]}, networkOutput: {networkOutput}, activations: {net.a[-1]}")
            for j in range(self.batchSize):
                mse += pow((batchY[j] - networkOutputs[j]), 2)
        mse = mse/self.batchSize
        return mse, outputs

    def backProp(self, y):
        i = self.i
        f = self.f
        C = self.C
        tildeC = self.tildeC
        o = self.o
        Wx = self.Wx
        Wh = self.Wh


        deltaH = np.zeros_like(self.h)
        triangle = np.zeros((self.nInputs, self.batchSize, self.nUnits))
        triangleH = np.zeros_like(self.h)
        deltaC = np.zeros_like(C)
        deltaTildeC = np.zeros_like(tildeC)
        deltaI = np.zeros_like(i)
        deltaF = np.zeros_like(f)
        deltaO = np.zeros_like(o)
        deltaX = np.zeros_like(self.x)
        deltaGates = np.zeros((self.nInputs, self.nGates, np.shape(self.C[0])[0], np.shape(self.C[0])[1]))

        # appending one extra element because of the first interaction in the if below
        # deltaState and f have t + 1
        deltaC     = np.append(deltaC, np.zeros((1,self.batchSize, self.nUnits)), axis=0)
        f          = np.append(f, np.zeros((1,self.batchSize, self.nUnits)), axis=0)
        deltaGates = np.append(deltaGates, np.zeros((self.nGates, np.shape(self.C[0])[0], np.shape(self.C[0])[1])), axis=0)

        # C has t - 1 in deltaF
        C = np.append(C, np.zeros((1,self.batchSize, self.nUnits)), axis=0)

        # not too sure about delta because I only have 1 output
        for t in range(self.nInputs - 1, -1, -1):
            # I need the dot product to have dimensions batchsize, 1
            triangle[t] = self.h[t] - y[t]
            for j in range(self.batchSize):
                triangleH[t, j] = np.dot(np.transpose(Wh), deltaGates[t + 1,:,j])
            deltaH[t] = triangle[t] + triangleH[t]
            deltaC[t] = deltaH[t] * o[t] * (1 - pow(tanh(C[t]), 2)) + deltaC[t + 1] * f[t + 1]
            deltaTildeC[t] = deltaC[t] * i[t] * (1 - pow(tildeC[t], 2))
            deltaI[t] = deltaC[t] * tildeC[t] * i[t] * (1 - i[t])
            deltaF[t] = deltaC[t] * C[t - 1] * f[t] * (1 - f[t])
            deltaO[t] = deltaH[t] * tanh(C[t]) * o[t] * (1 - o[t])
            deltaGates[t] = np.array([deltaTildeC[t], deltaI[t], deltaF[t], deltaO[t]])
            for j in range(self.batchSize):
                deltaX[t, j] = np.dot(np.transpose(Wx), deltaGates[t, :, j])[:, 0]
                # even though eventually it will calculate triangleH[-1], it will not be used
                triangleH[t - 1, j] = np.dot(np.transpose(Wh), deltaGates[t, :, j])[:, 0]
        deltaWx = np.zeros_like(self.Wx)
        deltaWh = np.zeros_like(self.Wh)
        deltaB = np.zeros_like(self.b)
        for t in range(self.nInputs):
            for j in range(self.batchSize):
                for k in range(self.nUnits):
                    deltaWx += np.outer(deltaGates[t, :, j, k], self.x[t, j])
                    if t < self.nInputs - 1: # deltaWh is until T - 1
                        deltaWh += np.outer(deltaGates[t + 1, :, j, k], self.h[t, j])
                    #deltaWx[:, :, k] += np.outer(deltaGates[t, :, j, k], self.x[t, j])
                    #deltaWh[:, :, k] += np.outer(deltaGates[t + 1, :, j, k], self.h[t, j])
                # deltaB is from zero to T, different from many posts
                deltaB += deltaGates[t, :, j]
        return deltaWx, deltaWh, deltaB

    def SGD(self, X: list, y: list, SGDbatchSize: int, nEpochs: int, learningRate, lamb):
        """
        Implementation of Stochastic Gradient Descent

        It takes as input the network, the MNIST dataset, the MNIST labels of the dataset,
        the size of the batch to do gradient descent, the number of epochs it should run,
        the learning rate eta (I found the best eta to be in the order of 1s)
        and the regularization term lambda

        It returns a trained network
        """
        bestMse = 0
        bestEpoch = 0
        eta = learningRate
        etaChangeEpoch = 0
        for epoch in range(nEpochs):
            batch = rd.sample(range(np.shape(X)[0]), SGDbatchSize)
            nablaWx = np.zeros_like(self.Wx)
            nablaWh = np.zeros_like(self.Wh)
            nablaB = np.zeros_like(self.b)
            for i in range(0, SGDbatchSize, self.batchSize):

                # getting the batches
                batchX = []
                batchY = []
                for j in range(self.batchSize):
                    batchX.append(X[batch[i+j]])
                    batchY.append(y[batch[i+j]])

                # shaping the input to have the same dimensions as x[t]
                shapedBatchX = np.zeros((self.nInputs, self.batchSize, self.nFeatures))
                for j in range(self.batchSize):
                    if self.nFeatures == 1:
                        shapedBatchX[:, j, 0] = batchX[j]
                    else:
                        shapedBatchX[:, j] = batchX[j]
                        
                # shaping the output to have the same dimensions as h[t]
                shapedBatchY = np.zeros((self.nInputs, self.batchSize, self.nUnits))
                for j in range(self.batchSize):
                    shapedBatchY[:, j] = batchY[j]

                self.setInput(shapedBatchX)
                # finding what should be modified based on this particular example
                deltaNablaWx, deltaNablaWh, deltaNablaB = self.backProp(shapedBatchY)
                # passing this modifications to our overall modifications matrices
                nablaWx += deltaNablaWx
                nablaWh += deltaNablaWh
                nablaB += deltaNablaB

            # applying the changes to our network
            self.b = self.b - eta * (nablaB / SGDbatchSize)
            self.Wx = self.Wx - eta * (nablaWx / SGDbatchSize) - eta * (lamb / SGDbatchSize) * self.Wx
            self.Wh = self.Wh - eta * (nablaWh / SGDbatchSize) - eta * (lamb / SGDbatchSize) * self.Wh
            mse, outputs = self.testNetwork(X, y, nTests=SGDbatchSize)
            if mse > bestMse:
                bestMse = mse
                bestEpoch = epoch
            if epoch%100 == 0:
                print(f'learningRate: {learningRate} epochs: {epoch} mse: {mse}, outputs: {outputs}')
        print(f'best acc: {bestMse} on epoch: {bestEpoch}')

In [217]:
train_X[0]

array([  1.,  11.,   2.,  11.,   4.,   2.,   2.,   2., 299.,   2.,   2.,
         2.,   2.,  37.,  47.,  27.,   2.,   2.,   2.,  19.,   6.,   2.,
        15.,   2.,   2.,  17.,   2.,   2.,   2.,   2.,   2.,  46.,   4.,
       232.,   2.,  39., 107.,   2.,  11.,   4.,   2., 198.,  24.,   4.,
         2., 133.,   4., 107.,   7.,  98., 413.,   2.,   2.,  11.,  35.,
         2.,   8., 169.,   4.,   2.,   5., 259., 334.,   2.,   8.,   4.,
         2.,  10.,  10.,  17.,  16.,   2.,  46.,  34., 101.,   2.,   7.,
        84.,  18.,  49.])

In [225]:
shapedBatchX = np.zeros((80, 10, 1))
np.shape(shapedBatchX[:, 0, 0])

(80,)

In [234]:
etas = [0.1, 1, 10]
features = 1
if features == 1:
    # adding a dimension so the operations can be done
    train_X = np.expand_dims(train_X, axis=1)
    
for l in etas:
    print(f"learning rate: {l}")
    lstm = LSTM(nInputs=80, nFeatures=features, nUnits=80, nOutputs=1, batchSize=10)
    lstm.SGD(train_X, train_y, SGDbatchSize=10, nEpochs=200, learningRate=l, lamb=0)

learning rate: 0.1


ValueError: all the input arrays must have same number of dimensions, but the array at index 0 has 4 dimension(s) and the array at index 1 has 3 dimension(s)