In [1018]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from sklearn.model_selection import KFold

%matplotlib inline

In [1019]:
class Loss(object):
    
    def __call__(self, predicted, actual):
        """Calculates the loss as a function of the prediction and the actual.
        
        Args:
          predicted (np.ndarray, float): the predicted output labels
          actual (np.ndarray, float): the actual output labels
          
        Returns: (float) 
          The value of the loss for this batch of observations.
        """
        raise NotImplementedError
        
    def derivative(self, predicted, actual):
        """The derivative of the loss with respect to the prediction.
        
        Args:
          predicted (np.ndarray, float): the predicted output labels
          actual (np.ndarray, float): the actual output labels
          
        Returns: (np.ndarray, float) 
          The derivatives of the loss.
        """
        raise NotImplementedError
        
        
class SquaredErrorLoss(Loss):
    
    def __call__(self, predicted, actual):
        return np.sum(
            (predicted - actual) ** 2
        )
    
    def derivative(self, predicted, actual):
        return (
            2 * (predicted - actual)
        )

In [1020]:
class ActivationFunction(object):
        
    def __call__(self, a):
        """Applies activation function to the values in a layer.
        
        Args:
          a (np.ndarray, float): the values from the previous layer (after 
            multiplying by the weights.
          
        Returns: (np.ndarray, float) 
          The values h = g(a).
        """
        return a
    
    def derivative(self, h):
        """The derivatives as a function of the outputs at the nodes.
        
        Args:
          h (np.ndarray, float): the outputs h = g(a) at the nodes.
          
        Returns: (np.ndarray, float) 
          The derivatives dh/da.
        """
        return np.ones(h.shape)
    
class ReLU(ActivationFunction):
    
    def __call__(self, a):
        return np.where(a > 0, a, 0)
    
    def derivative(self, a):
        return np.where(a > 0, 1, 0)

class Sigmoid(ActivationFunction):
    
    def __call__(self, a):
        return 1/(1 + np.exp(-a))
    
    def derivative(self, a):
        return  self.__call__(a) * (1 - self.__call__(a))

In [1021]:
class Layer(object):
    """A data structure for a layer in a neural network.
    
    Attributes:
      num_nodes (int): number of nodes in the layer
      activation_function (ActivationFunction)
      values_pre_activation (np.ndarray, float): most recent values
        in layer, before applying activation function
      values_post_activation (np.ndarray, float): most recent values
        in layer, after applying activation function
    """
    
    def __init__(self, num_nodes, activation_function=ActivationFunction()):
        self.num_nodes = num_nodes
        self.activation_function = activation_function
        
    def get_layer_values(self, values_pre_activation):
        """Applies activation function to values from previous layer.
        
        Stores the values (both before and after applying activation 
        function)
        
        Args:
          values_pre_activation (np.ndarray, float): 
            A (batch size) x self.num_nodes array of the values
            in layer before applying the activation function
        
        Returns: (np.ndarray, float)
            A (batch size) x self.num_nodes array of the values
            in layer after applying the activation function
        """
        self.values_pre_activation = values_pre_activation
        self.values_post_activation = self.activation_function(
            values_pre_activation
        )
        return self.values_post_activation
    
    def get_layer_derivatives(self, values_pre_activation):
        return self.activation_function.derivative(
            values_pre_activation
        )

In [1052]:
class FullyConnectedNeuralNetwork(object):
    """A data structure for a fully-connected neural network.
    
    Attributes:
      layers (Layer): A list of Layer objects.
      loss (Loss): The loss function to use in training.
      learning_rate (float): The learning rate to use in backpropagation.
      weights (list, np.ndarray): A list of weight matrices,
        length should be len(self.layers) - 1
      biases (list, float): A list of bias terms,
        length should be equal to len(self.layers)
    """
    
    def __init__(self, layers, loss, learning_rate):
        self.layers = layers
        self.loss = loss
        self.learning_rate = learning_rate
        
        # initialize weight matrices and biases to zeros
        self.weights = []
        self.updatedWeights = []
        self.biases = []
        self.updatedBiases = []
        mu, sigma = 0, 1
        for i in range(1, len(self.layers)):
            w = np.matrix(np.random.normal(mu, sigma, (self.layers[i - 1].num_nodes, self.layers[i].num_nodes)))
            self.weights.append(w)
            self.updatedWeights.append(w)
            self.biases.append(np.zeros(self.layers[i].num_nodes))
            self.updatedBiases.append(np.zeros(self.layers[i].num_nodes))

    def feedforward(self, inputs):
        """Predicts the output(s) for a given set of input(s).
        
        Args:
          inputs (np.ndarray, float): A (batch size) x self.layers[0].num_nodes array
          
        Returns: (np.ndarray, float) 
          An array of the predicted output labels, length is the batch size
        """
        self.storedValuesZ = [np.mean(inputs, axis = 0)]
        self.storedValuesA = [np.mean(inputs, axis = 0)]
        a = inputs
        self.inputs = inputs
        ## Iterate layers
        for i, layer in enumerate(self.layers):
            ## g(hw + b),  h = previous layer values
            if i != len(self.layers) - 1:
                z = np.matrix(np.add(a * self.weights[i], np.matrix(self.biases[i])))
                self.storedValuesZ.append(np.mean(z, axis = 0))
                a = np.matrix(self.layers[i + 1].get_layer_values(z))
                self.storedValuesA.append(np.mean(a, axis = 0))
        return a
    
    def predict(self, inputs):
        h = inputs
        ## Iterate layers
        for i, layer in enumerate(self.layers):
            ## g(hw + b),  h = previous layer values
            if i != len(self.layers) - 1:
                a = np.matrix(np.add(h * self.updatedWeights[i], np.matrix(self.updatedBiases[i])))
                h = self.layers[i+1].get_layer_values(a)
        return h
    
    def backProp(self, predicted, actual):
        # Update First weights
        dlda = np.mean(self.loss.derivative(predicted, actual), axis = 0).T

        dadz = self.layers[-1].get_layer_derivatives(self.storedValuesZ[-1]).T
        delta = np.multiply(dlda, dadz)
        self.updatedWeights[-1]= self.weights[-1] - np.multiply(self.learning_rate * delta, self.storedValuesA[-2]).T
        self.updatedBiases[-1] = self.biases[-1] - np.multiply(self.learning_rate, delta).T
        # Update rest of the weights
        for l in range(2, len(self.layers)):
            z = self.storedValuesZ[-l]
            dadz = self.layers[-l].get_layer_derivatives(z)
            delta = np.multiply(self.weights[-l + 1] * delta, dadz.T)
            self.updatedBiases[-l] = self.biases[-l] - np.multiply(self.learning_rate, delta).T
            self.updatedWeights[-l] = self.weights[-l] - np.multiply(self.learning_rate, np.dot(delta, self.storedValuesA[-l - 1])).T
        self.weights = self.updatedWeights
        self.biases = self.updatedBiases
        
    def train(self, inputs, labels):
        """Trains neural network based on a batch of training data.
        
        Args:
          inputs (np.ndarray): A (batch size) x self.layers[0].num_nodes array
          labels (np.ndarray): An array of ground-truth output labels, 
            length is the batch size.
        """
        predicted = self.feedforward(inputs)
        self.backProp(predicted, labels)
        return predicted
    
    def train_epochs(self, inputs, labels, epochs = 50, num_batches = 10):
        kf = KFold(n_splits=num_batches)
        random.shuffle(inputs)
        mini_batches = [inputs]
        for i in range(epochs):
            for train_index, test_index in kf.split(X):
                predicted = self.feedforward(X[train_index])
                self.backProp(predicted, y[train_index])
        predicted = self.predict(X)
        return predicted

In [1053]:
network = FullyConnectedNeuralNetwork(
    layers=[Layer(1),Layer(5, ReLU()), Layer(4, ReLU()), Layer(1)],
    loss = SquaredErrorLoss(),
    learning_rate=0.00001
)

In [1054]:
from sklearn import datasets

In [1055]:
diabetes = datasets.load_diabetes()

In [1056]:
X = np.matrix(diabetes.data[:, np.newaxis, 2])

In [1057]:
y = np.matrix(diabetes.target).T

In [1058]:
t = network.train(X,y)

In [1059]:
p = network.predict(X)

In [1060]:
v = network.train_epochs(X,y, 1000)

In [1061]:
v - y

matrix([[-123.4295279 ],
        [ -47.4295279 ],
        [-113.4295279 ],
        [-178.45704117],
        [-107.41792772],
        [ -69.4295279 ],
        [-110.36819166],
        [ -35.41792772],
        [ -82.4295279 ],
        [-282.39006492],
        [ -73.36819166],
        [ -41.39006492],
        [-151.45704117],
        [-157.24424317],
        [ -90.39006492],
        [-143.41792772],
        [-138.35360948],
        [-116.35360948],
        [ -69.45812253],
        [-140.36819166],
        [ -40.41792772],
        [ -21.41792772],
        [ -40.46048032],
        [-217.41792772],
        [-156.41792772],
        [-174.41792772],
        [-109.35360948],
        [ -57.41792772],
        [-103.41792772],
        [-255.41792772],
        [-101.36819166],
        [ -31.41792772],
        [-313.41792772],
        [ -59.35725502],
        [ -37.41792772],
        [ -74.33538176],
        [-237.41792772],
        [-248.41792772],
        [-224.42660115],
        [ -62.40464709],
