Implementing a Neural Network with negative sampling for word2vec task

In [189]:
import numpy as np
import random as rd
import time


In [190]:
class Network:
    """
    The main object we're going to use across this notebook.
    It's a neural network built from a list with the number of neurons
    of each layer.

    Ex: [2, 3, 1] is a 3 layer network, with 2 neurons of input, 3 neurons
    in the hidden layer and 1 for the output layer

    Attributes:
        b: list of bias vectors, b[i][j] -> "i" is which layer, "j" which neuron
        w: list of weight matrices, w[i][j][k] -> "i" is which layer,
           "j" which neuron of the first layer, "k" which neuron of the second layer
        a: list with one activation vector per layer (initialized to zeros)
        z: list with one weighted-input vector per layer (initialized to zeros)
        nLayers: the number of layers
        layers: the layers list the network was built from

    The biases are drawn from a Gaussian with standard deviation 1, the weights
    from a Gaussian with standard deviation 1/sqrt(n_in), with
    n_in = number of neurons in the layer feeding the weights.
    """
    def __init__(self, layers: list):
        # NOTE: this reseeds NumPy's *global* random generator, so every new
        # network starts from the same weights and any later np.random call
        # continues from this fixed state.
        np.random.seed(42)
        self.b = []
        self.w = []
        # weights and biases sit *between* consecutive layers, so there is
        # one fewer of each than there are layers
        for nIn, nOut in zip(layers, layers[1:]):
            self.b.append(np.random.normal(loc=0, scale=1, size=nOut))
            wScale = 1/np.sqrt(nIn)
            self.w.append(np.random.normal(loc=0, scale=wScale, size=[nIn, nOut]))
        self.a = [np.zeros(n) for n in layers]
        self.z = [np.zeros(n) for n in layers]
        self.nLayers = len(layers)
        self.layers = layers

    @staticmethod
    def copy(net):
        """
        Returns an independent copy of `net` (same layer sizes, copied
        weights, biases, activations and z).

        The copy is assembled without calling __init__, so it works for any
        layer sizes and does not reseed the global random generator.
        """
        copiedNet = Network.__new__(Network)
        copiedNet.layers = list(net.layers)
        copiedNet.nLayers = net.nLayers
        # each layer is copied on its own: the per-layer arrays have different
        # lengths, so they cannot be copied as a single NumPy array
        copiedNet.a = [np.copy(layerA) for layerA in net.a]
        copiedNet.z = [np.copy(layerZ) for layerZ in net.z]
        copiedNet.w = [np.copy(layerW) for layerW in net.w]
        copiedNet.b = [np.copy(layerB) for layerB in net.b]
        return copiedNet
            

In [191]:
def sigmoid(n: float):
    """Logistic function 1 / (1 + e^-n), applied element-wise by NumPy."""
    denominator = 1.0 + np.exp(-n)
    return 1.0/denominator

def sigmoid_derivative(n: float):
    """Derivative of the sigmoid function: sigmoid(n) * (1 - sigmoid(n))."""
    activation = sigmoid(n)
    return activation*(1-activation)

def softmax(n: float):
    """
    Softmax of a vector of scores: exp(n) normalized so the result sums to 1.

    The maximum is subtracted before exponentiating. This does not change the
    result mathematically, but keeps np.exp from overflowing (and the output
    from becoming nan) when the scores are large.

    `n` is expected to be array-like; the normalization runs along the first axis.
    """
    scores = np.asarray(n, dtype=float)
    exps = np.exp(scores - np.max(scores, axis=0))
    return exps/np.sum(exps, axis=0)
    
    

In [192]:
def feedForward(net: Network) -> Network:
    """
    Feedforwarding the activations to the next layers

    It takes as input the network with the one-hot input already stored in
    net.z[0] / net.a[0] and overwrites net.z and net.a of every later layer.

    The first hidden layer and every intermediate layer use the sigmoid; the
    last layer uses softmax when the network has more than 2 layers (with
    exactly 2 layers the only computed layer uses the sigmoid).

    It returns the same network object with all the activations set
    """
    # the input is a oneHotVector, so only the weight row of the active
    # position contributes to the first hidden layer
    oneHotInput = np.argmax(net.z[0])
    net.z[1] = net.a[0][oneHotInput] * net.w[0][oneHotInput] + net.b[0]
    net.a[1] = sigmoid(net.z[1])

    lastLayer = net.nLayers-1
    for l in range(1, lastLayer):
        # w[l][givingNeuron][receivingNeuron], so a[l] . w[l] sums over the
        # giving neurons for every receiving neuron at once
        net.z[l+1] = np.dot(net.a[l], net.w[l]) + net.b[l]

        # if its the last layer it will apply softmax
        if l+1 == lastLayer:
            net.a[l+1] = softmax(net.z[l+1])
        else:
            net.a[l+1] = sigmoid(net.z[l+1])

    return net
    
    

In [193]:
def setInput(net: Network, oneHotTokens):
    """
    Loads a one-hot encoded token into the input layer of the network and
    runs the feedforward step.

    The first layer's z and activation are replaced by fresh vectors with
    the length of oneHotTokens, holding a 1 at the position of its largest
    value and 0 everywhere else.

    Returns the network with the activations of all layers set, since it
    passes through the feedforward step
    """
    inputSize = len(oneHotTokens)
    for perLayerValues in (net.z, net.a):
        perLayerValues[0] = np.zeros(inputSize)

    activePosition = np.argmax(oneHotTokens)
    net.z[0][activePosition] = 1
    net.a[0][activePosition] = 1

    return feedForward(net)

In [194]:
def testNetwork(net: Network, test_X, test_y, nTests: int):
    """
    A function to test our network on the first nTests examples
    (or on all of them, when fewer than nTests are available)

    For every example the input is fed through the network and the squared
    difference between the label and the output activations, averaged over
    the output layer size, is added to the error.

    It returns the accumulated error and an array counting how many times
    each output position was the network's strongest activation
    """
    vocabSize = net.layers[-1]
    outputs = np.zeros(vocabSize)
    error = 0
    # slicing + zip stops at the available data instead of indexing past it
    for tokens, target in zip(test_X[:nTests], test_y[:nTests]):
        net = setInput(net, tokens)
        networkOutput = np.argmax(net.a[-1])
        outputs[networkOutput] += 1
        error += np.sum((target - net.a[-1]) ** 2)/vocabSize
    return error, outputs


In [195]:
def gridSearch(train_X, train_y, test_X, test_y, batchSize: int, learningRates: list, epochs: int, lamb,
               nNegSamples=5, unigramTable=None, SIdic=None, layers=None):
    """
    A function to perform a gridSearch in order to find the best learning rate

    It takes as input the training inputs and labels, the test inputs and
    labels, the batchSize for SGD (also used as the number of test examples),
    a list of learningRates to search, the number of epochs to perform SGD
    and the regularization term lamb.

    nNegSamples, unigramTable and SIdic are forwarded to SGD for the negative
    sampling; unigramTable and SIdic must be provided.
    layers is the list of layer sizes of the networks being trained; when
    omitted it falls back to [784, 30, 10].

    It returns the network with the lowest test error across the
    learning rates list

    Raises ValueError when learningRates is empty or when unigramTable / SIdic
    are missing.
    """
    if unigramTable is None or SIdic is None:
        raise ValueError("gridSearch needs unigramTable and SIdic to run SGD with negative sampling")
    if len(learningRates) == 0:
        raise ValueError("gridSearch needs at least one learning rate")
    if layers is None:
        layers = [784, 30, 10]

    bestNet = None
    bestError = np.inf
    for eta in learningRates:
        # resetting the network
        net = Network(list(layers))
        net = SGD(net, train_X, train_y, nNegSamples, unigramTable, SIdic,
                  batchSize=batchSize, nEpochs=epochs, learningRate=eta, lamb=lamb)
        # testNetwork reports an error, so the best network is the one that minimizes it
        error, outputs = testNetwork(net, test_X, test_y, batchSize)
        if bestNet is None or error < bestError:
            bestNet = net
            bestError = error
    return bestNet


The equations below were used to compute the errors and propagate them through the network:

To calculate the error on the last layer: 
$$\delta^L = (a^L - y)\odot \sigma'(z^L)$$

To calculate the error on the other layers:
$$\delta^l = ((w^{l+1})^T\delta^{l+1})\odot \sigma'(z^l)$$

To propagate the error to the biases:
$$\frac{\partial C}{\partial b^l_j} = \delta^l_j$$

To propagate the error to the weights:
$$\frac{\partial C}{\partial w^l_{jk}} = a^{l-1}_k\delta^l_j$$

In [196]:
def backProp(net: "Network", y, nNegSamples, unigramTable, SIdic) -> tuple:
    """
    The backpropagation step with negative sampling: the error on the last
    layer is computed only for the correct word and for a few randomly drawn
    "negative" words, and is then passed to the previous layers while the
    gradients of the weights and biases are collected.

    It takes as input
      net: the network, already activated on the input word (see feedForward)
      y: the oneHotVector of the correct output word
      nNegSamples: how many positions of unigramTable are drawn
      unigramTable: table of words to draw the negative samples from
      SIdic: maps a word of unigramTable to its position on the oneHotVector

    It returns (nablaB, nablaW), the gradients for the biases and weights,
    each a list with the same shapes as net.b and net.w
    """
    nablaB = [np.zeros(i.shape) for i in net.b]
    nablaW = [np.zeros(i.shape) for i in net.w]
    delta = np.zeros(net.layers[-1])

    # finding the correct word on the oneHotVector y
    correctWord = np.argmax(y)

    # finding n random words from the table, translated to their position on
    # the oneHotVector; the correct word is never used as a negative sample
    randomWords = np.random.randint(len(unigramTable), size=nNegSamples)
    negSamples = [SIdic[unigramTable[i]] for i in randomWords]
    negSamples = [sample for sample in negSamples if sample != correctWord]

    # error on the last layer: (activation - target), with target 1 for the
    # correct word and 0 for the negative samples; every other output keeps
    # a zero error and therefore receives no update
    delta[correctWord] += (net.a[-1][correctWord] - 1)
    for sample in negSamples:
        delta[sample] += (net.a[-1][sample] - 0)

    inputWord = np.argmax(net.z[0])
    for l in range(net.nLayers-1, 0, -1):
        # nablaB and nablaW have -1 because they have one entry fewer than the layers
        nablaB[l-1] = delta

        if l == 1:
            # the input is a oneHotVector, so only the weights leaving the
            # input word have a non-zero gradient; there is no layer behind
            # the input to propagate the error to
            nablaW[0][inputWord] += net.a[0][inputWord]*delta
            break

        # nablaW[l-1][k][j] = a[l-1][k] * delta[j]; on the last layer delta is
        # zero outside the sampled words, so only their columns are filled
        nablaW[l-1] += np.outer(net.a[l-1], delta)

        # finding the error one layer behind
        # in the book it needs a transpose because its weight[layer][receivingNeuron][givingNeuron]
        # but this implementation uses weight[layer][givingNeuron][receivingNeuron] so it's not necessary
        delta = (np.dot(net.w[l-1], delta))*sigmoid_derivative(net.z[l-1])

    return nablaB, nablaW


In [197]:
def SGD(net: Network, X: list, y: list, nNegSamples, unigramTable, SIdic, batchSize: int, nEpochs: int, 
        learningRate, lamb) -> Network:
    """
    Implementation of Stochastic Gradient Descent

    It takes as input the network, the training inputs X and their labels y,
    the negative sampling settings forwarded to backProp (nNegSamples,
    unigramTable, SIdic), the size of the batch to do gradient descent,
    the number of epochs it should run, the learning rate eta
    and the regularization term lambda

    On every epoch one random batch is drawn from X, the gradients of its
    examples are averaged and applied to the weights and biases, and the
    network is evaluated with testNetwork on the first batchSize examples.
    When batchSize is larger than the dataset, the whole dataset is used as
    the batch.

    It returns the trained network (as it is after the last epoch)

    Raises ValueError when there is no example to train on.
    """
    # rd.sample cannot draw more items than the population holds
    samplesPerBatch = min(batchSize, len(X))
    if samplesPerBatch <= 0:
        raise ValueError("SGD needs at least one training example and a positive batchSize")

    bestEpoch = 0
    minError = np.inf
    eta = learningRate
    for epoch in range(nEpochs):
        batch = rd.sample(range(len(X)), samplesPerBatch)
        nablaB = [np.zeros(i.shape) for i in net.b]
        nablaW = [np.zeros(i.shape) for i in net.w]
        for i in batch:
            start_time = time.time()
            net = setInput(net, X[i])
            print(f'setInput time: {time.time() - start_time}')
            # finding what should be modified based on this particular example
            start_time = time.time()
            deltaNablaB, deltaNablaW = backProp(net, y[i], nNegSamples, unigramTable, SIdic)
            print(f'backProp time: {time.time() - start_time}')
            # passing this modifications to our overall modifications matrices
            for l in range(net.nLayers-1):
                nablaB[l] += deltaNablaB[l]
                nablaW[l] += deltaNablaW[l]

        # applying the averaged changes to our network; the weights also
        # shrink by the regularization term
        for l in range(net.nLayers-1):
            net.b[l] = net.b[l] - eta * (nablaB[l]/samplesPerBatch) 
            net.w[l] = net.w[l] - eta * (nablaW[l]/samplesPerBatch) - eta * (lamb/samplesPerBatch) *  net.w[l]
        error, outputs = testNetwork(net, X, y, nTests=batchSize)
        if error < minError:
            minError = error
            bestEpoch = epoch
        print(f'learningRate: {learningRate} epochs: {epoch} error: {error}, outputs: {outputs}')
    print(f'min error: {minError} on epoch: {bestEpoch}')
    return net
        