In [1]:
import pandas as pd
import numpy as np

In [2]:
class Layer:
    def __init__(self):
        self.input = None
        self.output = None
        
    # computes the output Y of a layer for a given input X
    def forward_propagation(self, input):
        raise NotImplementedError
        
    # computes the dE/dX for a given dE/dY (and update parameters if any)
    def backward_propagation(self, output_error, learning_rate):
        raise NotImplementedError

In [3]:
# inherit from base class Layer
class FCLayer(Layer):
    # input_size = number of input neurons
    # output_size = number of output neurons
    def __init__(self, input_size, output_size):
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5
    
    # returns output for a given input
    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output
    
    # computes dE/dW, dE/dB for a given output_error=dE/dY. Returns input_error=dE/dX.
    def backward_propagation(self, output_error, learning_rate):
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)
        # dBias = output_error
        
        # update parameters
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * output_error
        return input_error

In [4]:
class ActivationLayer(Layer):
    def __init__(self, activation, activation_prime):
        self.activation = activation
        self.activation_prime = activation_prime
        
    # returns the activated input
    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output
    
    # returns input_error=dE/dX for a given output_error=dE/dY.
    # learning_rate is not used because there is no "learnable" parameters
    def backward_propagation(self, output_error, learning_rate):
        return self.activation_prime(self.input) * output_error

In [5]:
# activation function and its derivative
def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1 - np.tanh(x)**2

In [6]:
# loss function and its derivative
def mse(y_true, y_pred):
    return np.mean(np.power(y_true-y_pred, 2))

def mse_prime(y_true, y_pred):
    return 2*(y_pred-y_true)/y_true.size

In [22]:
class Network:
    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None
        
    # add layer to network
    def add(self, layer):
        self.layers.append(layer)
        
    # set loss to use
    def use(self, loss, loss_prime):
        self.loss = loss
        self.loss_prime = loss_prime
        
    # predict output for given input
    def predict(self, input_data):
        # sample dimension first
        samples = len(input_data)
        result = []
    
        # run network over all samples
        for i in range(samples):
            # forward propagation
            output = input_data[i]
            for layer in self.layers:
                output = layer.forward_propagation(output)
            result.append(output)
        
        return result
    
    # train the network
    def fit(self, x_train, y_train, epochs, learning_rate):
        # sample dimension first
        samples = len(x_train)
        
        # training loop
        for i in range(epochs):
            err = 0
            for j in range(samples):
                output = x_train[j]
                for layer in self.layers:
                    output = layer.forward_propagation(output)
                    
                # compute loss (for display purpose only)
                err += self.loss(y_train[j], output)
                
                # backward propagation
                error = self.loss_prime(y_train[j], output)
                for layer in reversed(self.layers):
                    error = layer.backward_propagation(error, learning_rate)
            
            # calculate average error on all samples
            err /= samples
            if i%5==0:
                print('epoch %d/%d  error=%f' % (i+1, epochs, err))
        print('epoch %d/%d  error=%f' % (epochs, epochs, err))

In [8]:
# training data
x_train = np.array([[[0,0]], [[0,1]], [[1,0]], [[1,1]]])
y_train = np.array([[[0]], [[1]], [[1]], [[0]]])

# network
net = Network()
net.add(FCLayer(2, 3))
net.add(ActivationLayer(tanh, tanh_prime))
net.add(FCLayer(3, 1))
net.add(ActivationLayer(tanh, tanh_prime))

# train
net.use(mse, mse_prime)
net.fit(x_train, y_train, epochs=1000, learning_rate=0.1)

# test
out = net.predict(x_train)
print(out)

epoch 1/1000  error=0.517518
epoch 101/1000  error=0.250011
epoch 201/1000  error=0.142480
epoch 301/1000  error=0.005787
epoch 401/1000  error=0.002192
epoch 501/1000  error=0.001295
epoch 601/1000  error=0.000902
epoch 701/1000  error=0.000686
epoch 801/1000  error=0.000550
epoch 901/1000  error=0.000457
[array([[0.00072843]]), array([[0.97357667]]), array([[0.9707143]]), array([[-0.00089326]])]


In [9]:
train = np.array(pd.read_csv("train.csv")).astype('float32')
test = np.array(pd.read_csv("test.csv")).astype('float32')

In [10]:
train.shape, test.shape

((42000, 785), (28000, 784))

In [11]:
def to_categorical(data):
    col = np.max(data)+1
    rows = len(data)
    result = np.zeros((rows,col))
    for i in range(data.shape[0]):
        result[i,data[i]] = 1
    return result

In [12]:
X_train, y_train = train[:,1:]/255, to_categorical(train[:,0].astype(int))
y_train = y_train.reshape(1,-1,10)
X_test = test/255
X_train = X_train.reshape(X_train.shape[0], 1, 28*28)
X_test = X_test.reshape(X_test.shape[0], 1, 28*28)

In [13]:
y_train.shape

(1, 42000, 10)

In [23]:
mnist_net = Network()
mnist_net.add(FCLayer(28*28, 100))
mnist_net.add(ActivationLayer(tanh, tanh_prime))
mnist_net.add(FCLayer(100, 50))
mnist_net.add(ActivationLayer(tanh, tanh_prime))
mnist_net.add(FCLayer(50, 10))
mnist_net.add(ActivationLayer(tanh, tanh_prime))

In [24]:
mnist_net.use(mse, mse_prime)
mnist_net.fit(X_train[0:1000], y_train[0], epochs=35, learning_rate=0.1)

epoch 1/35  error=0.228520
epoch 6/35  error=0.055211
epoch 11/35  error=0.032248
epoch 16/35  error=0.022475
epoch 21/35  error=0.016669
epoch 26/35  error=0.013295
epoch 31/35  error=0.010810
epoch 35/35  error=0.009445


In [26]:
X_train.shape

(42000, 1, 784)

In [27]:
mnist_net.predict(X_train[10000])

[array([[0.02440131, 0.04731472, 0.26015316, 0.06391321, 0.13102897,
         0.06844612, 0.10711165, 0.07204907, 0.20143757, 0.06395159]])]