In [6]:
import numpy as np
import random
import pickle 
import matplotlib.pyplot as plt
import math

In [7]:
import pickle
import gzip


def load_data(path="../data/mnist.pkl.gz"):
    """Load the three dataset splits stored in a gzip-compressed pickle.

    Args:
        path: Location of the gzipped pickle file. Defaults to the relative
            path this notebook has always read from, so existing calls with
            no arguments behave exactly as before.

    Returns:
        The three objects stored in the pickle, in order: (train, val, test).

    Raises:
        OSError: If the file cannot be opened or is not valid gzip data.
        ValueError / TypeError: If the pickle does not unpack into exactly
            three items.
    """
    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file. Only point this at a dataset file obtained from a trusted source.
    with gzip.open(path, "rb") as file:
        # latin1 is needed to read pickles that were written by Python 2.
        train, val, test = pickle.load(file, encoding="latin1")
    return train, val, test

def load_data_wrapper():
    """Return the dataset as three lists of (input, label) pairs.

    Every input is reshaped to a (784, 1) column. Training labels are
    converted to one-hot columns with 10 categories; validation and test
    labels are passed through exactly as loaded.

    Returns:
        (train_data, val_data, test_data), each a list of 2-tuples.
    """
    def as_columns(images):
        # One (784, 1) column per image, in the original order.
        return [np.reshape(image, (784, 1)) for image in images]

    raw_train, raw_val, raw_test = load_data()

    train_targets = [one_hot_encode(label, 10) for label in raw_train[1]]
    train_data = list(zip(as_columns(raw_train[0]), train_targets))
    val_data = list(zip(as_columns(raw_val[0]), raw_val[1]))
    test_data = list(zip(as_columns(raw_test[0]), raw_test[1]))

    return train_data, val_data, test_data

def one_hot_encode(y, num_categories):
    """Build a (num_categories, 1) column of zeros with a 1 at row `y`.

    Args:
        y: Row index to mark.
        num_categories: Number of rows in the returned column.

    Returns:
        A float NumPy array of shape (num_categories, 1).
    """
    column = np.zeros(shape=(num_categories, 1))
    column[y] = 1
    return column

In [8]:
def sigmoid(z):
    """Logistic function 1 / (1 + e^(-z)); applied elementwise to arrays."""
    denominator = 1.0 + np.exp(-z)
    return 1.0 / denominator

def sigmoid_derivative(z):
    """Derivative of the sigmoid at z, using sigmoid'(z) = s * (1 - s) with s = sigmoid(z)."""
    s = sigmoid(z)
    return s * (1 - s)

In [14]:
import timeit

In [60]:
class Network(object):
    """Fully connected feed-forward network with sigmoid units, trained with mini-batch SGD.

    Attributes set by __init__:
        num_layers: number of layers, including the input layer.
        sizes: the list of layer sizes that was passed in.
        biases: one (n, 1) column per non-input layer.
        weights: one (next_size, prev_size) matrix per pair of adjacent layers.
    """

    def __init__(self, sizes):
        """Create a network with randomly initialised parameters.

        Args:
            sizes: list of ints giving the size of each layer, input layer first.
        """
        self.num_layers = len(sizes)
        self.sizes = sizes
        # Biases are drawn before weights; keep this order so a seeded run is reproducible.
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]  # column vector of length y
        self.weights = [np.random.randn(next_h, prev_h)
                        for next_h, prev_h in zip(sizes[1:], sizes[:-1])]

    def feedforward(self, a):
        """Propagate input `a` through every layer and return the final activation."""
        for w, b in zip(self.weights, self.biases):
            a = sigmoid(np.dot(w, a) + b)
        return a

    def SGD(self, train_data, epochs, batch_size, learning_rate, test_data=None):
        """Train the network with mini-batch stochastic gradient descent.

        Args:
            train_data: iterable of (x, y) training pairs. It is copied once, so
                the caller's sequence is never reordered.
            epochs: number of full passes over the training data.
            batch_size: number of examples per mini-batch.
            learning_rate: step size applied to the batch-averaged gradient.
            test_data: optional iterable of (x, label) pairs. When given and
                non-empty, the number of correct predictions is printed after
                every epoch.
        """
        # Shuffle a private copy: reordering the caller's list in place would
        # silently change notebook state for every later cell that reads it.
        train_data = list(train_data)
        n_train = len(train_data)

        if test_data is not None:
            # Materialise so len() and repeated evaluation work for any iterable.
            test_data = list(test_data)
        n_test = len(test_data) if test_data else 0

        for i in range(epochs):
            random.shuffle(train_data)
            mini_batches = [train_data[j:j + batch_size] for j in range(0, n_train, batch_size)]

            for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, learning_rate)

            if test_data:
                print(f"Epoch {i}: {self.evaluate(test_data)} / {n_test}")
            else:
                print(f"Epoch {i}: complete")

    def update_mini_batch(self, mini_batch, learning_rate):
        """Apply one gradient-descent step using the gradient averaged over `mini_batch`.

        Args:
            mini_batch: sequence of (x, y) pairs.
            learning_rate: step size.

        An empty mini-batch carries no gradient information, so it leaves the
        parameters untouched instead of dividing by zero.
        """
        if len(mini_batch) == 0:
            return

        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        # Accumulate the per-example gradients returned by backprop.
        for x, y in mini_batch:
            delta_nabla_b, delta_nabla_w = self.backprop(x, y)
            nabla_b = [nb + dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]
            nabla_w = [nw + dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]

        step = learning_rate / len(mini_batch)
        self.weights = [w - step * nw for w, nw in zip(self.weights, nabla_w)]
        self.biases = [b - step * nb for b, nb in zip(self.biases, nabla_b)]

    def backprop(self, x, y):
        """Compute the gradient of the cost for a single example.

        Args:
            x: input column for the first layer.
            y: target with the same shape as the output activation.

        Returns:
            (nabla_b, nabla_w): lists of arrays shaped like self.biases and
            self.weights, holding the gradient for each layer.
        """
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        # Forward pass, remembering every pre-activation z and activation a.
        activation = x
        activations = [x]   # activation is sigmoid(z)
        zs = []             # z is W.a + b

        for w, b in zip(self.weights, self.biases):
            z = np.dot(w, activation) + b
            zs.append(z)
            activation = sigmoid(z)
            activations.append(activation)

        # Backward pass. dLoss has the shape of the output activation (one
        # entry per output unit), not a scalar.
        dLoss = self.MSE_derivative(activations[-1], y)
        # Chain rule: dLoss/dz = (dLoss/da) * (da/dz), elementwise.
        dz = sigmoid_derivative(zs[-1]) * dLoss
        nabla_b[-1] = dz
        nabla_w[-1] = dz.dot(activations[-2].T)

        # Walk backwards from the last hidden layer; l counts layers from the end.
        for l in range(2, self.num_layers):
            z = zs[-l]
            d_sigmoid = sigmoid_derivative(z)
            # Pull the error back through the weights of the following layer.
            dz = np.dot(self.weights[-l + 1].T, dz) * d_sigmoid
            nabla_b[-l] = dz
            nabla_w[-l] = dz.dot(activations[-l - 1].T)

        return nabla_b, nabla_w

    def MSE_derivative(self, activation, y):
        """Derivative of the quadratic cost 0.5 * ||activation - y||^2 w.r.t. activation."""
        return activation - y

    def evaluate(self, test_data):
        """Count the examples whose highest-activation output index equals the label.

        Args:
            test_data: iterable of (x, label) pairs.

        Returns:
            The number of pairs for which argmax(feedforward(x)) == label.
        """
        test_results = [(np.argmax(self.feedforward(x)), y)
                        for (x, y) in test_data]
        return sum(predicted == label for predicted, label in test_results)

    def get_activations_for_example(self, x):
        """Return the activations of every layer for input `x`, input first."""
        activations = [x]
        a = x
        for w, b in zip(self.weights, self.biases):
            a = sigmoid(np.dot(w, a) + b)
            activations.append(a)

        return activations

    def plot_first_layer_weights(self, image_shape=(28, 28)):
        """Draw the incoming weights of each first-layer unit as an image.

        Args:
            image_shape: 2-D shape each weight row is reshaped to. Its product
                must equal the input layer size. Defaults to (28, 28).
        """
        fig = plt.figure(figsize=(15, 15))
        # Read from self, not a notebook-level variable, so the plot always
        # shows the network the method was called on.
        first_layer = self.weights[0]
        n_units = first_layer.shape[0]
        # Roughly square grid; max(1, ...) avoids dividing by zero for an empty layer.
        fig_cols = max(1, math.floor(math.sqrt(n_units)))
        fig_rows = math.ceil(n_units / fig_cols)

        for i in range(n_units):
            img = np.reshape(first_layer[i], image_shape)
            ax = fig.add_subplot(fig_rows, fig_cols, i + 1)
            ax.imshow(img, interpolation='nearest')
        fig.suptitle('First Layer Weights', fontsize=40)

In [61]:
# Load the MNIST splits as lists of (input, label) pairs. Inputs are (784, 1) columns;
# training labels are one-hot (10, 1) columns, validation/test labels stay as loaded.
train, val, test = load_data_wrapper()

In [62]:
# Layer sizes: 784 inputs, one hidden layer of 30 units, 10 outputs.
net = Network([784, 30, 10])

In [63]:
import time

# Time one full training run. perf_counter is a monotonic clock intended for
# measuring intervals, so the result is unaffected by system clock adjustments.
start = time.perf_counter()

net.SGD(train, epochs=10, batch_size=20, learning_rate=3.0)

end = time.perf_counter()
# Elapsed time is end - start; the reversed difference printed a negative duration.
print(f"time taken = {end-start}")

TypeError: cannot unpack non-iterable NoneType object