# MNIST Digit Recognition with N-Layer Neural Net

In [None]:
# import the MNIST data set
import cloudpickle, gzip, numpy

f = gzip.open('data/mnist.pkl.gz', 'rb')
train_set, valid_set, test_set = cloudpickle.load(f, encoding='latin1')
f.close()

print(len(train_set))

In [None]:
import numpy as np

class NN:
    
    def __init__(self):
        np.random.seed()
        self.weights = []
        self.hl_sizes = []
        
    def sigmoid(self, x):
        return 1/(1 + np.exp(-x))
    
    def d_sigmoid_output(self, out):
        return out * (1 - out)
    
    def test(self,X,Y):
        # initialize layers
        layers = []
        
        # how many training examples are provided?
        num_inputs = X.shape[0]
        
        # add a bias node with a value of 1 to
        # the input layer and hidden layers
        X = np.append(X,np.ones((num_inputs,1)),axis=1)
        
        # load examples into input layer
        layers.append(X)
        
        # feed forward to all hidden layers
        for j in range(len(self.hl_sizes)):
            # compute node values
            layer = self.sigmoid(np.dot(layers[j], self.weights[j]))
        
            # reset bias node
            layer[:,layer.shape[1]-1] = 1
            layers.append(layer)

        # feed forward to output layer
        layers.append(self.sigmoid(np.dot(layers[j+1], self.weights[j+1])))
        
        return layers[len(layers)-1]
        
    def train(self, X=np.array([]), Y=np.array([]), iterations=1, alpha=1, bias=1, batch_size=0, hl_sizes=[4], dropout=False, dropout_ratio=0.5):
        if X.size == 0 or Y.size == 0:
            raise Exception("You must specify both X and Y")
        if X.shape[0] != Y.shape[0]:
            raise Exception("Number of rows in X and Y must be the same: %d != %d" % (X.shape[0],Y.shape[0]))
        
        # set hidden layer sizes
        self.hl_sizes = hl_sizes
        
        # how many training examples are provided?
        num_inputs = X.shape[0]
        
        # how many nodes do we need for then input layer?
        input_vector_length = X.shape[1]
        
        # if batch_size is not provided (i.e. is set to zero by default),
        # then default to full batch for gradient descent
        if batch_size <= 0:
            batch_size = num_inputs
        
        # add a bias node with a value of 1 to
        # the input layer and hidden layers
        X = np.append(X,np.ones((num_inputs,1)),axis=1)
        
        # randomly initialize weights between -1 and 1
        # for input_layer -> h1_layer
        self.weights = []
        self.weights.append(2*np.random.random((input_vector_length + 1, self.hl_sizes[0] + 1)) - 1)
        
        # randomly initialize weights between -1 and 1
        # between all hidden layers
        if len(self.hl_sizes) > 1:
            for j in range(1,len(self.hl_sizes)):
                self.weights.append(2*np.random.random((hl_sizes[j-1] + 1, self.hl_sizes[j] + 1)) - 1)
        
        # randomly initialize weights between -1 and 1
        # for last hidden layer -> output_layer
        self.weights.append(2*np.random.random((self.hl_sizes[len(self.hl_sizes)-1] + 1,Y.shape[1])) - 1)
        
        # initialize layers
        layers = []
        
        for i in range(iterations):
            
            batch_start = 0
            
            layers = []
            weight_updates = []
            
            for j in range(len(self.weights)):
                weight_updates.append(np.zeros_like(self.weights[j]))
            
            total_avg_training_error = 0
            
            # --------------------------------------------------------
            # TRAIN IN MINI BATCHES
            # (or full batch if no batch_size was provided)
            #
            # TODO: This current implementation isn't very useful
            #       because it still processes batches serially.
            #       The whole point of mini-batches is so you can
            #       parallelize the gradient decent during backpropagation.
            #       So, the next iteration of this code should take 
            #       adcantage of threading to actually parallelize the
            #       crunching of the mini-batches 
            # --------------------------------------------------------
            
            while batch_start <= (num_inputs - 1):
                
                batch_end = batch_start + batch_size
                
                x = X[batch_start : batch_end]
                y = Y[batch_start : batch_end]
                
                # TODO: make batch selection random so each iteration
                #       isn't always seeing the same mini-batches

                # set input layer equal to the examples in the mini-batch
                layers.append(x)
                
                # -------------------------------
                # FEED FORWARD
                # -------------------------------
                
                # feed forward to all hidden layers
                for j in range(len(self.hl_sizes)):
                    # compute node values
                    layer = self.sigmoid(np.dot(layers[j], self.weights[j]))
                    
                    # perform dropout if requested
                    if dropout == True:
                        
                        # generate an array of 1's and 0's
                        # drawn from a binomial distribution with probability of dropout_ratio
                        dropouts = np.random.binomial(1, 1 - dropout_ratio, (layer.shape[0],layer.shape[1]))
                                           
                        # multiply the node values in the layer by the randomly assigned 
                        # values in dropouts, effectively "turning off" some nodes (they get multiplied by 0)
                        layer = layer * dropouts
                    
                    # reset bias node
                    layer[:,layer.shape[1]-1] = 1
                    layers.append(layer)
                    
                    # reset bias node in the layer that was just computed
                    for k in range(batch_size):
                        try:
                            layers[j+1][k][self.hl_sizes[j]] = 1
                        except IndexError:
                            break
                
                # reset output layer
                layers.append(np.zeros((x.shape[0],10)))
                    
                # feed forward to output layer
                layers[len(layers)-1] = self.sigmoid(np.dot(layers[j+1], self.weights[j+1]))
                
                # measure error
                training_error = layers[len(layers)-1] - y
                total_avg_training_error += np.mean(np.abs(training_error))
                
                # -------------------------------
                # BACKPROPAGATE
                # -------------------------------
                
                deltas = []
                errors = []

                errors.insert(0, training_error)
                deltas.insert(0, training_error * self.d_sigmoid_output(layers[len(layers)-1]))
                
                # continue backwards, calculating the error contribution from each layer
                # and an appropriate delta for determining weight updates
                for j in reversed(range(0,len(self.hl_sizes))):
                    errors.insert(0, np.dot(deltas[0],self.weights[j+1].T))
                    deltas.insert(0, errors[0] * self.d_sigmoid_output(layers[j+1]))
            
                # calculate weight updates       
                for j in range(0,len(self.weights)):
                    weight_updates[j] += np.dot(layers[j].T, deltas[j])
                
                # figure out what index to start grabbing our next batch from
                batch_start += batch_size
            
            # At this point, the batch has been processed, and all the updates
            # for the weights have been calculated
            
            # update weights
            for j in range(len(self.weights)-1):
                self.weights[j] -= (alpha * weight_updates[j])
            
            if i % 100 == 0:
                print("Iteration: %d Error: %.5f" % (i,total_avg_training_error))
        
        print("Iteration: %d Error: %.5f" % (i,total_avg_training_error))

In [None]:
X = train_set[0][:1000]
_Y = train_set[1][:1000]
Y = np.empty((len(_Y),10))

print(len(train_set[0]))

for idx, y in enumerate(_Y):
    if y == 0:
         Y[idx] = [1,0,0,0,0,0,0,0,0,0]
    elif y == 1:
        Y[idx] = [0,1,0,0,0,0,0,0,0,0]
    elif y == 2:
        Y[idx] = [0,0,1,0,0,0,0,0,0,0]
    elif y == 3:
        Y[idx] = [0,0,0,1,0,0,0,0,0,0]
    elif y == 4:
        Y[idx] = [0,0,0,0,1,0,0,0,0,0]
    elif y == 5:
        Y[idx] = [0,0,0,0,0,1,0,0,0,0]
    elif y == 6:
        Y[idx] = [0,0,0,0,0,0,1,0,0,0]
    elif y == 7:
        Y[idx] = [0,0,0,0,0,0,0,1,0,0]
    elif y == 8:
        Y[idx] = [0,0,0,0,0,0,0,0,1,0]
    elif y == 9:
        Y[idx] = [0,0,0,0,0,0,0,0,0,1]
        

input_size = len(X[0])

net = NN()
net.train(X=X,Y=Y,iterations=10000,hl_sizes=[input_size*2, input_size/2], alpha=0.01, dropout=True, dropout_ratio=0.5)

In [134]:
X = valid_set[0]
Y = valid_set[1]

results = net.test(X,Y)
_results = []

for i, result in enumerate(results):
    
    for j,el in enumerate(result):
        result[j] = int(el+0.5)
    
    if np.array_equal(result,[1,0,0,0,0,0,0,0,0,0]):
        _results.append(0)
    elif np.array_equal(result,[0,1,0,0,0,0,0,0,0,0]):
        _results.append(1)
    elif np.array_equal(result,[0,0,1,0,0,0,0,0,0,0]):
        _results.append(2)
    elif np.array_equal(result,[0,0,0,1,0,0,0,0,0,0]):
        _results.append(3)
    elif np.array_equal(result,[0,0,0,0,1,0,0,0,0,0]):
        _results.append(4)
    elif np.array_equal(result,[0,0,0,0,0,1,0,0,0,0]):
        _results.append(5)
    elif np.array_equal(result,[0,0,0,0,0,0,1,0,0,0]):
        _results.append(6)
    elif np.array_equal(result,[0,0,0,0,0,0,0,1,0,0]):
        _results.append(7)
    elif np.array_equal(result,[0,0,0,0,0,0,0,0,1,0]):
        _results.append(8)
    elif np.array_equal(result,[0,0,0,0,0,0,0,0,0,1]):
        _results.append(9)
    else:
        _results.append("n/a")

correct = 0
for i, y in enumerate(Y):
    if y == _results[i]:
        correct += 1
        
print("%d/%d correct" % (correct,len(X)))
print("Test accuracy: %.2f" % (correct/len(X)))
    

8684/10000 correct
Test accuracy: 0.87
