In [1]:
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris, fetch_mldata
import numpy as np
from tqdm import tqdm_notebook as tqdm
import logging
from cost import SquaredError, CrossEntropy


def identity(x, deriv=False):
    """Linear (pass-through) activation.

    Args:
        x: Input value or array.
        deriv: When true, return the derivative instead of the activation.

    Returns:
        ``x`` unchanged, or the constant ``1`` when ``deriv`` is true.
    """
    return 1 if deriv else x


def sigmoid(x, deriv=False):
    """Logistic activation function.

    Args:
        x: Input value or array. With ``deriv=True`` the code evaluates
            ``x * (1 - x)``, which is the sigmoid derivative only when ``x``
            already holds sigmoid *outputs* (Dense.calc_error passes the
            layer's ``activations_out``).
        deriv: When true, return the derivative term instead of the activation.

    Returns:
        ``1 / (1 + exp(-x))``, or ``x * (1 - x)`` when ``deriv`` is true.
    """
    if not deriv:
        denominator = 1 + np.exp(-x)
        return 1 / denominator
    return x * (1 - x)


def softmax(x, deriv=False, axis=1):
    """Numerically stable softmax.

    Args:
        x: Input scores. With ``deriv=True`` this is expected to hold softmax
            *outputs*, mirroring how ``sigmoid`` is used.
        deriv: When true, return ``x * (1 - x)``. This is only the diagonal of
            the softmax Jacobian, see
            https://datascience.stackexchange.com/questions/29735/how-to-apply-the-gradient-of-softmax-in-backprop
        axis: Axis to normalise over. If ``x`` does not have that axis (for
            example a 1-D vector with the default ``axis=1``) the softmax is
            taken over all elements instead.

    Returns:
        Array of the same shape as ``x`` whose entries sum to 1 along ``axis``,
        or the derivative term when ``deriv`` is true.
    """
    if deriv:
        # No need to evaluate the softmax itself for the derivative term.
        return x * (1 - x)

    x = np.asarray(x)

    # Decide the fallback up front instead of catching np.AxisError, which is
    # no longer available in the top-level namespace of NumPy 2.x.
    if isinstance(axis, (int, np.integer)) and not -x.ndim <= axis < x.ndim:
        axis = None

    # Shift by the maximum along the normalised axis (not the global maximum):
    # a row far below the global maximum would otherwise underflow to 0 / 0.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp_shifted = np.exp(shifted)
    return exp_shifted / np.sum(exp_shifted, axis=axis, keepdims=True)

In [2]:
# Sanity check of the cross-entropy cost on a single three-class example:
# the prediction puts 0.8 on the class that the one-hot target marks as correct.
predictions = np.array([0.1, 0.1, 0.8])
targets = np.array([0, 0, 1])
cost = CrossEntropy()
# Last expression of the cell, so its value is displayed as the cell output.
cost.error(predictions, targets)

0.43386458262986227

In [3]:
class NeuralNetwork:
    """Feed-forward network trained with plain gradient descent.

    The network is an ordered list of layer objects. Every layer is expected
    to expose ``size``, ``input_size``, ``weights``, ``error`` and
    ``activations_out`` plus the methods ``forward``, ``calc_error`` and
    ``update_weights``.

    Typical use: construct, ``load_data``, ``init_weights``, then ``train``
    (full batch) or ``train_sgd`` (one example at a time).
    """

    # Class Attributes
    # Kept for backward compatibility only: the list is shared by all
    # instances and is not used by the class. Per-instance errors are stored
    # in ``self.l_error``.
    train_error = list()

    def __init__(self, layers, cost_function):
        """Store the layers and the cost function.

        Args:
            layers: List of at least two layer objects, input side first.
            cost_function: Callable ``cost_function(prediction, target)``
                returning the error value recorded during training.
        """
        assert isinstance(layers, list), "Input needs to be a list of Layers"
        assert len(layers) > 1, "Input needs to be a list of at least two Layers"
        assert callable(cost_function), "Chose a valid error function"
        self.layers = layers
        self.cost_function = cost_function
        self.x = np.zeros(1)
        self.target = np.zeros(1)
        self.current_state = np.zeros(1)
        self.l_error = list()  # Error over time is saved here

    def load_data(self, x: np.ndarray, target: np.ndarray):
        """Attach copies of the training inputs and targets to the network.

        Args:
            x: Numeric array of inputs, one case per entry along axis 0.
            target: Numeric array of target outputs with as many cases as ``x``.
        """
        # Check if input and output have the same amount of cases
        assert len(x) == len(target), f"Input and target output contain a different number of cases ({len(x)} vs. {len(target)})"
        # Check if x and target are numeric numpy arrays
        assert np.issubdtype(x.dtype, np.number) and np.issubdtype(target.dtype, np.number), "Both input and target need to be numeric arrays"

        self.x = x.copy()
        self.target = target.copy()

    def init_weights(self, sigma=0.03, seed=42):
        """Infer layer input sizes and draw reproducible random weights.

        Args:
            sigma: Standard deviation of the Gaussian noise used for the weights.
            seed: Seed of the private random generator; the same seed always
                yields the same weights.
        """
        # First we infer the input size for each of the layer, except the first one
        for i, layer in enumerate(self.layers):
            if i == 0:
                assert layer.input_size, "The first layer need to be initialized with the parameter 'input_size'"
            else:
                layer.input_size = self.layers[i - 1].size

        # A dedicated generator is used for drawing: creating a RandomState
        # without keeping it does not seed ``np.random``.
        rng = np.random.RandomState(seed)

        # Then we initialize the weights by using the input size (+1 bias Unit) and amount of units
        for layer in self.layers:
            layer.weights = sigma * rng.randn(layer.input_size + 1, layer.size)

    # The method used to be private although the notebook calls the public
    # name; both spellings are supported.
    _init_weights = init_weights

    def _backpropagate(self, target, alpha):
        """Run one backward pass for the output held in ``self.current_state``."""
        # The output-layer error is taken to be (output - target), regardless
        # of the configured cost function.
        self.layers[-1].error = np.subtract(self.current_state, target)

        # Walk from the second-to-last layer towards the input; each layer
        # derives its error from the layer that follows it.
        for idx in range(len(self.layers) - 2, -1, -1):
            following = self.layers[idx + 1]
            self.layers[idx].calc_error(prev_error=following.error, prev_weights=following.weights)

        # Then calculate the partial derivative and update the weights
        for layer in self.layers:
            layer.update_weights(alpha)

    def train(self, n_epochs: int, alpha=0.01):
        """Full-batch gradient descent over all loaded data.

        Args:
            n_epochs: Number of passes; each pass performs one weight update.
            alpha: Learning rate.
        """
        for epoch in tqdm(range(n_epochs)):
            # Calculate forward
            self.current_state = self.calc_output(self.x)

            error_epoch = self.cost_function(self.current_state, self.target)
            logging.debug(f"Error in epoch {epoch}: {error_epoch}")
            self.l_error.append(error_epoch)

            # Calculate backwards
            self._backpropagate(self.target, alpha)

    def train_sgd(self, n_epochs: int, alpha=0.01):
        """Gradient descent with one weight update per training example.

        The cost of every single example is appended to ``self.l_error``.

        Args:
            n_epochs: Number of passes over the loaded data.
            alpha: Learning rate.
        """
        for epoch in tqdm(range(n_epochs)):
            for i_step, example in enumerate(self.x):
                example_target = self.target[i_step]

                # Calculate forward
                self.current_state = self.calc_output(example)

                # Use the configured cost function; the previously referenced
                # ``mse`` helper is not defined anywhere in this notebook.
                self.l_error.append(self.cost_function(self.current_state, example_target))

                # Calculate backwards
                self._backpropagate(example_target, alpha)

    def plot_error(self):
        """Plot the recorded error values in the order they were collected."""
        plt.plot(range(len(self.l_error)), self.l_error)
        plt.xlabel("Training step")
        plt.ylabel("Cost")
        plt.show()

    def calc_output(self, _input):
        """Propagate ``_input`` through all layers and return the final activations."""
        current_state = _input
        for layer in self.layers:
            layer.forward(current_state)
            current_state = layer.activations_out
        return current_state
    

def unison_shuffled_copies(a, b):
    """Return shuffled copies of ``a`` and ``b`` that share one random order.

    Both arrays are indexed with the same permutation, so entries that were
    paired before the shuffle stay paired afterwards. The permutation comes
    from NumPy's global random generator.
    """
    n_items = len(a)
    assert n_items == len(b)
    order = np.random.permutation(n_items)
    shuffled_a = a[order]
    shuffled_b = b[order]
    return shuffled_a, shuffled_b

In [4]:
class Layer:
    """Base class holding the state shared by all layer types.

    Attributes set here are placeholders (zero arrays) until the layer is
    wired into a network and its weights are initialised.
    """

    def __init__(self, size: int, activation, input_size=False):
        """Create a layer.

        Args:
            size: Number of units in the layer.
            activation: Callable activation function.
            input_size: Number of inputs feeding the layer. A truthy value
                also marks the layer as the first one (``isfirst``).
        """
        assert isinstance(size, int), "The number of nodes needs to be of type int"
        assert callable(activation), "Chose a valid activation function"

        self.size = size
        self.activation = activation
        self.input_size = input_size
        # Only a layer that was given an explicit input size counts as first.
        self.isfirst = bool(input_size)

        self.activations_in = np.zeros(1)
        self.activations_out, self.error, self.weights = (
            np.zeros(size) for _ in range(3)
        )

    def __len__(self):
        """Number of units in the layer."""
        return self.size


class Dense(Layer):
    """Fully connected layer with a bias unit prepended to its input.

    ``weights`` has one row per input plus a leading row for the bias unit and
    one column per unit of this layer. Inputs may be a single example (1-D) or
    a batch with one example per row (2-D).
    """

    def forward(self, activations_in):
        """Compute and store the layer output for ``activations_in``.

        The incoming activations, extended by the bias unit, are stored in
        ``self.activations_in`` for the later weight update; the result is
        stored in ``self.activations_out``.
        """
        # Save incoming activations for later backpropagation and add bias unit
        if activations_in.ndim == 1:
            ones_shape = 1
        else:
            ones_shape = (len(activations_in), 1) + activations_in.shape[2:]
        self.activations_in = np.hstack((np.ones(shape=ones_shape), activations_in))
        self.activations_out = self.activation(np.dot(self.activations_in, self.weights))

    def calc_error(self, prev_error, prev_weights):
        """Derive this layer's error from the layer that follows it.

        Args:
            prev_error: Error of the following layer (the one processed just
                before this one during the backward pass).
            prev_weights: Weights of that following layer.
        """
        # Row 0 of the following layer's weights belongs to its bias unit,
        # which has no counterpart among this layer's units, so it is dropped.
        back_propagated = np.dot(prev_error, prev_weights.T[:, 1:])
        self.error = back_propagated * self.activation(self.activations_out, deriv=True)

    def update_weights(self, alpha):
        """Apply one gradient-descent step using the stored input and error.

        Every layer, including the first one, receives a bias unit in
        ``forward``, so the same gradient formula applies to all layers. The
        previous special case for the first layer used only the bias column
        and broadcast that gradient onto every weight row.

        Args:
            alpha: Learning rate.
        """
        if self.activations_in.ndim == 1:
            # Single example: outer product of input (incl. bias) and error.
            gradient = self.activations_in[:, np.newaxis] * self.error[np.newaxis, :]
        else:
            # Batch: average of the per-example outer products, written as one
            # matrix product so no (batch, inputs, units) tensor is built.
            gradient = np.dot(self.activations_in.T, self.error) / len(self.activations_in)
        self.weights += -alpha * gradient
        

def batch_gd(weights, alpha, gradient):
    """Return the weights after one gradient-descent step.

    Args:
        weights: Current weights.
        alpha: Learning rate.
        gradient: Gradient of the cost with respect to the weights.

    Returns:
        ``weights`` moved by ``-alpha * gradient``; the inputs are not modified.
    """
    step = -alpha * gradient
    return step + weights
    

def stochastic_gd():
    """Placeholder for stochastic gradient descent; not implemented yet."""
    return None

    
def mini_batch_gd():
    """Placeholder for mini-batch gradient descent; not implemented yet."""
    return None

    
class Activision(Layer):
    """Layer that applies its activation to the weighted input, without a bias unit.

    Work in progress: only the forward pass is implemented.
    """

    def forward(self, activations_in):
        """Store ``activation(activations_in . weights)`` in ``self.activations_out``."""
        # Use the layer's own weights; the bare name ``weights`` is not
        # defined in this scope and raised a NameError.
        self.activations_out = self.activation(np.dot(activations_in, self.weights))

    def backward(self):
        """Placeholder for the backward pass; not implemented yet."""
        pass


In [5]:
class Optimizer:
    """Empty base class for weight-update strategies."""

class SGD(Optimizer):
    """Gradient-descent optimizer skeleton.

    Only ``gradient_decent`` does any work; the constructor and ``momentum``
    are placeholders.
    """

    def __init__(self, batch_size, momentum=0.9):
        # Placeholder: neither argument is stored or used yet.
        return None

    def gradient_decent(self, weights, gradient, learning_rate=0.03):
        """Return ``weights`` after one step against ``gradient``."""
        scaled_gradient = learning_rate * gradient
        return weights - scaled_gradient

    def momentum(self, gradient, rate=0.9):
        """Placeholder for a momentum update; not implemented yet."""
        return None
        
    
        
        

In [None]:
# Train a small network on the Iris data set with full-batch gradient descent.
# Only log messages of level ERROR and above are shown, so the per-epoch
# debug output of NeuralNetwork.train stays hidden.
logging.basicConfig(level='ERROR')

data = load_iris()
x = data['data']
target = data['target']

# One-Hot-Encoding the output (since its categorical)
n_categories = 3
# Row i of the identity matrix is the one-hot vector of class i.
y = np.eye(n_categories)[target.astype(int)]

#x, y = unison_shuffled_copies(x, y)

# 4 input features -> 6 sigmoid units -> 3 softmax outputs.
nn = NeuralNetwork([Dense(6, sigmoid, input_size=4), Dense(3, softmax)], cost_function=SquaredError())
nn.load_data(x, y)
# NOTE(review): requires NeuralNetwork to expose a public init_weights() method.
nn.init_weights()

nn.train(50, alpha=0.1)

In [None]:
# Train a larger network on MNIST with full-batch gradient descent.
# NOTE(review): fetch_mldata was removed from scikit-learn in 0.22;
# fetch_openml('mnist_784') is the documented replacement - confirm which
# scikit-learn version this notebook is pinned to.
data = fetch_mldata('MNIST original', data_home='./')

x = data['data']
target = data['target']

# One-Hot-Encoding the output (since its categorical)
n_categories = 10
# Row i of the identity matrix is the one-hot vector of digit i.
y = np.eye(n_categories)[target.astype(int)]

# 784 inputs -> 100 sigmoid units -> 10 softmax outputs.
# NOTE(review): x is passed to the network unscaled; presumably these are raw
# pixel intensities, which would saturate the sigmoid units - verify.
nn = NeuralNetwork([Dense(100, sigmoid, input_size=784), Dense(n_categories, softmax)], cost_function=CrossEntropy())
nn.load_data(x, y)
# NOTE(review): requires NeuralNetwork to expose a public init_weights() method.
nn.init_weights()

nn.train(500, alpha=0.3)



HBox(children=(IntProgress(value=0, max=500), HTML(value='')))

In [None]:
# Plot the cost values recorded in nn.l_error during training.
nn.plot_error()

In [None]:
# Evaluate on the same data the network was trained on (no held-out split).
prediction = nn.calc_output(x)
# Predicted class = index of the largest output per example.
prediction = np.argmax(prediction, axis=1)
# Fraction of examples whose predicted class equals the label (accuracy);
# last expression of the cell, so it is displayed as the cell output.
(prediction == target).sum() / len(target)