In [1]:
import numpy as np

In [2]:
class Operation():
    '''
    Base class for an "operation" in a neural network.

    Subclasses provide _output (the forward computation) and _input_grad
    (the gradient with respect to the input); forward and backward defined
    here cache the intermediate values and check shapes.
    '''
    def __init__(self):
        pass

    def forward(self, input_):
        '''
        Run the forward pass.

        Stores input_ on the instance as self.input_, computes the result
        with self._output(), caches it as self.output and returns it.
        '''
        self.input_ = input_
        result = self._output()
        self.output = result
        return result

    def backward(self, output_grad):
        '''
        Run the backward pass.

        output_grad is checked against the cached self.output with
        assert_same_shape (a helper defined outside this class), then
        self._input_grad(output_grad) is stored as self.input_grad, checked
        against self.input_ and returned. forward must have been called
        first, since self.output and self.input_ are set there.
        '''
        assert_same_shape(self.output, output_grad)
        grad = self._input_grad(output_grad)
        self.input_grad = grad
        assert_same_shape(self.input_, grad)
        return grad

    def _output(self):
        '''
        Forward computation hook; each concrete Operation must override it.
        '''
        raise NotImplementedError()

    def _input_grad(self, output_grad):
        '''
        Input-gradient hook; each concrete Operation must override it.
        '''
        raise NotImplementedError()


In [3]:
class ParamOperation(Operation):
    '''
    An Operation that carries a parameter.

    On top of the input gradient, the backward pass also produces the
    gradient with respect to the parameter and stores it as self.param_grad.
    '''
    def __init__(self, param):
        '''
        Keep param on the instance as self.param.
        '''
        super().__init__()
        self.param = param

    def backward(self, output_grad):
        '''
        Compute both gradients and verify their shapes.

        Stores self._input_grad(output_grad) as self.input_grad and
        self._param_grad(output_grad) as self.param_grad. Each gradient is
        checked with assert_same_shape against the value it differentiates
        (self.input_ and self.param respectively). Returns the input
        gradient only.
        '''
        assert_same_shape(self.output, output_grad)

        grad_wrt_input = self._input_grad(output_grad)
        self.input_grad = grad_wrt_input
        grad_wrt_param = self._param_grad(output_grad)
        self.param_grad = grad_wrt_param

        checks = (
            (self.input_, grad_wrt_input),
            (self.param, grad_wrt_param),
        )
        for reference, gradient in checks:
            assert_same_shape(reference, gradient)
        return grad_wrt_input

    def _param_grad(self, output_grad):
        '''
        Parameter-gradient hook; every subclass must override it.
        '''
        raise NotImplementedError()

In [4]:
class WeightMultiply(ParamOperation):
    '''
    Matrix-multiplies the input by a weight matrix held as self.param.
    '''
    def __init__(self, W):
        '''
        Register W as the operation's parameter (self.param).
        '''
        super().__init__(W)

    def _output(self):
        '''
        Return the dot product of the input with the weights.
        '''
        weights = self.param
        return np.dot(self.input_, weights)

    def _input_grad(self, output_grad):
        '''
        Return output_grad multiplied by the transposed weights.
        '''
        weights_t = np.transpose(self.param, (1, 0))
        return np.dot(output_grad, weights_t)

    def _param_grad(self, output_grad):
        '''
        Return the transposed input multiplied by output_grad.
        '''
        input_t = np.transpose(self.input_, (1, 0))
        return np.dot(input_t, output_grad)


In [5]:
class BiasAdd(ParamOperation):
    '''
    Adds a bias row (self.param) to the input.
    '''
    def __init__(self, B):
        '''
        Register B as the operation's parameter (self.param).

        B must have exactly one row; this is enforced with an assert.
        '''
        assert B.shape[0] == 1
        super().__init__(B)

    def _output(self):
        '''
        Return the input with the bias added.
        '''
        bias = self.param
        return self.input_ + bias

    def _input_grad(self, output_grad):
        '''
        Return output_grad multiplied elementwise by ones shaped like the
        input.
        '''
        ones = np.ones_like(self.input_)
        return ones * output_grad

    def _param_grad(self, output_grad):
        '''
        Return output_grad summed over axis 0, reshaped to a single row.
        '''
        broadcast_grad = np.ones_like(self.param) * output_grad
        n_columns = broadcast_grad.shape[1]
        column_totals = np.sum(broadcast_grad, axis=0)
        return column_totals.reshape(1, n_columns)

class Sigmoid(Operation):
    '''
    Sigmoid activation function, applied elementwise to the input.
    '''
    def __init__(self):
        '''No state beyond what Operation keeps.'''
        super().__init__()

    def _output(self):
        '''
        Return 1 / (1 + exp(-input)).
        '''
        denominator = 1.0 + np.exp(-1.0 * self.input_)
        return 1.0 / denominator

    def _input_grad(self, output_grad):
        '''
        Return output_grad scaled by the sigmoid derivative.

        The derivative is computed from the cached forward result as
        output * (1 - output).
        '''
        local_slope = self.output * (1.0 - self.output)
        return local_slope * output_grad

In [6]:
class Layer(object):
    '''
    A "layer" of neurons in a neural network.

    A layer is an ordered list of operations. The list is built lazily by
    _setup_layer the first time forward is called.
    '''
    def __init__(self, neurons):
        '''
        neurons is stored as given; it roughly corresponds to the "breadth"
        of the layer.
        '''
        self.neurons = neurons
        # True until the first forward call has built the operations.
        self.first = True
        self.params = []
        self.param_grads = []
        self.operations = []

    def _setup_layer(self, num_in):
        '''
        Hook that builds the layer's operations; every concrete layer must
        override it. forward passes the input batch as the argument.
        '''
        raise NotImplementedError()

    def forward(self, input_):
        '''
        Feed input_ through each operation in order and return the result.

        On the first call (self.first is True) the layer is set up from
        input_ before anything is computed. The input is kept as
        self.input_ and the final result as self.output.
        '''
        if self.first:
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_
        activation = input_
        for op in self.operations:
            activation = op.forward(activation)
        self.output = activation
        return activation

    def backward(self, output_grad):
        '''
        Send output_grad back through the operations in reverse order.

        output_grad is first checked against self.output with
        assert_same_shape. After the pass, the parameter gradients are
        collected into self.param_grads. Returns the gradient produced by
        the first operation, i.e. the gradient with respect to the layer's
        input.
        '''
        assert_same_shape(self.output, output_grad)

        grad = output_grad
        for op in reversed(self.operations):
            grad = op.backward(grad)

        self._param_grads()
        return grad

    def _param_grads(self):
        '''
        Rebuild self.param_grads from the ParamOperations in this layer.
        '''
        self.param_grads = [
            op.param_grad
            for op in self.operations
            if isinstance(op, ParamOperation)
        ]

    def _params(self):
        '''
        Rebuild self.params from the ParamOperations in this layer.
        '''
        self.params = [
            op.param
            for op in self.operations
            if isinstance(op, ParamOperation)
        ]


In [7]:
class Dense(Layer):
    '''
    A fully connected layer that inherits from "Layer."

    The layer applies a weight multiplication, a bias addition and then an
    activation operation.
    '''
    def __init__(self, neurons: int, activation: Operation = None):
        '''
        neurons: number of output columns of the layer.
        activation: the Operation applied after the bias is added. When
            omitted, a new Sigmoid is created for this layer.

        The default is built per instance rather than in the signature: a
        default of Sigmoid() would be evaluated once and shared by every
        Dense layer, and because an Operation caches its input and output,
        layers sharing one activation would overwrite each other's cached
        values and corrupt the backward pass.
        '''
        super().__init__(neurons)
        self.activation = Sigmoid() if activation is None else activation

    def _setup_layer(self, input_):
        '''
        Defines the operations of a fully connected layer.

        Draws a weight matrix of shape (input_.shape[1], neurons) and a bias
        of shape (1, neurons) with np.random.randn, stores them in
        self.params and builds self.operations from them.
        '''
        # The seed attribute is attached from outside (NeuralNetwork does so
        # in its constructor); a layer used without one must not fail here.
        seed = getattr(self, "seed", None)
        if seed:
            np.random.seed(seed)

        # Weights are drawn before the bias so that seeded runs reproduce
        # the same values.
        weights = np.random.randn(input_.shape[1], self.neurons)
        bias = np.random.randn(1, self.neurons)
        self.params = [weights, bias]

        self.operations = [
            WeightMultiply(weights),
            BiasAdd(bias),
            self.activation,
        ]
        return None

In [8]:
class Loss(object):
    '''
    The "loss" of a neural network.

    Subclasses provide _output (the loss value) and _input_grad (the
    gradient of the loss with respect to the prediction).
    '''
    def __init__(self):
        '''Nothing to initialise.'''
        pass

    def forward(self, prediction, target):
        '''
        Return the loss value for prediction against target.

        The two are first checked with assert_same_shape and are then kept
        on the instance (self.prediction, self.target) for the backward
        pass.
        '''
        assert_same_shape(prediction, target)
        self.prediction, self.target = prediction, target
        return self._output()

    def backward(self):
        '''
        Return the gradient of the loss with respect to the prediction.

        The gradient is stored as self.input_grad and checked with
        assert_same_shape against the stored prediction.
        '''
        grad = self._input_grad()
        self.input_grad = grad
        assert_same_shape(self.prediction, grad)
        return grad

    def _output(self):
        '''
        Loss-value hook; every subclass of "Loss" must override it.
        '''
        raise NotImplementedError()

    def _input_grad(self):
        '''
        Loss-gradient hook; every subclass of "Loss" must override it.
        '''
        raise NotImplementedError()

In [9]:
class MeanSquaredError(Loss):
    '''
    Squared-error loss averaged over the rows of the prediction.
    '''
    def __init__(self):
        '''Nothing beyond the base Loss initialisation.'''
        super().__init__()

    def _output(self):
        '''
        Return the sum of squared differences between prediction and target,
        divided by the number of rows in the prediction.
        '''
        residual = self.prediction - self.target
        n_rows = self.prediction.shape[0]
        return np.sum(np.power(residual, 2)) / n_rows

    def _input_grad(self):
        '''
        Return the gradient of the loss with respect to the prediction:
        2 * (prediction - target) / number of rows.
        '''
        residual = self.prediction - self.target
        n_rows = self.prediction.shape[0]
        doubled = 2.0 * residual
        return doubled / n_rows


In [11]:
class NeuralNetwork(object):
    '''
    The class for a neural network: an ordered list of layers plus a loss.
    '''
    def __init__(self, layers, loss, seed=1):
        '''
        layers: the layers, in forward order.
        loss: the loss object used by train_batch.
        seed: when truthy, copied onto every layer as its "seed" attribute.

        When seed is falsy, each layer still ends up with a "seed"
        attribute (None unless the layer already had one). Dense reads
        self.seed during setup, and without this it would raise
        AttributeError for a network built with no seed.
        '''
        self.layers = layers
        self.loss = loss
        self.seed = seed
        for layer in self.layers:
            if seed:
                layer.seed = seed
            elif not hasattr(layer, "seed"):
                # Do not overwrite a seed the caller set on the layer.
                layer.seed = None

    def forward(self, x_batch):
        '''
        Pass x_batch through every layer in order and return the result.
        '''
        x_out = x_batch
        for layer in self.layers:
            x_out = layer.forward(x_out)
        return x_out

    def backward(self, loss_grad):
        '''
        Pass loss_grad backward through the layers in reverse order.

        The layers record their own gradients; nothing is returned.
        '''
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return None

    def train_batch(self, x_batch, y_batch):
        '''
        Run one forward pass, compute the loss against y_batch, then run
        the backward pass starting from the loss gradient.

        Returns the loss value. Parameters are not updated here.
        '''
        predictions = self.forward(x_batch)
        loss = self.loss.forward(predictions, y_batch)
        self.backward(self.loss.backward())
        return loss

    def params(self):
        '''
        Yield each entry of every layer's params list, layer by layer.
        '''
        for layer in self.layers:
            yield from layer.params

    def param_grads(self):
        '''
        Yield each entry of every layer's param_grads list, layer by layer,
        i.e. the gradients of the loss with respect to the parameters.
        '''
        for layer in self.layers:
            yield from layer.param_grads

In [12]:

class Optimizer(object):
    '''
    Base class for a neural network optimizer.
    '''
    def __init__(self, lr: float = 0.01):
        '''
        lr: the learning rate, stored as self.lr.
        '''
        self.lr = lr

    def step(self):
        '''
        Update hook that concrete optimizers are expected to override.

        The base implementation does nothing.
        '''
        return None

In [13]:
class SGD(Optimizer):
    '''
    Stochastic gradient descent optimizer.
    '''
    def __init__(self, lr: float = 0.01):
        '''lr: the learning rate, passed on to Optimizer.'''
        super().__init__(lr)

    def step(self):
        '''
        Subtract lr times its gradient from every parameter of self.net.

        self.net is not set by this class; it must be attached before step
        is called (Trainer does this in its constructor).
        '''
        pairs = zip(self.net.params(), self.net.param_grads())
        for weights, gradient in pairs:
            # Augmented assignment is kept deliberately: the update is meant
            # to modify the parameter object the network holds, which
            # assumes the parameters support in-place subtraction (as numpy
            # arrays do).
            weights -= self.lr * gradient


In [15]:
from copy import deepcopy
from typing import Tuple

class Trainer(object):
    '''
    Trains a neural network with an optimizer, stopping early when the loss
    on the test data stops improving.
    '''
    def __init__(self,
                 net: NeuralNetwork,
                 optim: Optimizer):
        '''
        Requires a neural network and an optimizer in order for training to
        occur. The network is also attached to the optimizer as its "net"
        attribute.
        '''
        self.net = net
        self.optim = optim
        self.best_loss = 1e9
        self.optim.net = self.net

    def generate_batches(self,
                         X,
                         y,
                         size: int = 32):
        '''
        Yield consecutive (X_batch, y_batch) slices of at most "size" rows.

        X and y must have the same number of rows; this is enforced with an
        assert. The last batch may be shorter than "size".
        '''
        assert X.shape[0] == y.shape[0], \
        '''
        features and target must have the same number of rows, instead
        features has {0} and target has {1}
        '''.format(X.shape[0], y.shape[0])

        N = X.shape[0]

        for ii in range(0, N, size):
            yield X[ii:ii+size], y[ii:ii+size]

    def fit(self, X_train, y_train,
            X_test, y_test,
            epochs: int=100,
            eval_every: int=10,
            batch_size: int=32,
            seed: int = 1,
            restart: bool = True):
        '''
        Fits the neural network on the training data for a certain number of
        epochs. Every "eval_every" epochs, it evaluates the neural network
        on the testing data.

        If an evaluation does not improve on self.best_loss, training stops
        and the network is rolled back to the copy saved at the previous
        evaluation (or at the start of this call if this was the first
        evaluation).

        seed: passed to np.random.seed before training starts.
        restart: when True, every layer is flagged to be set up again on its
            next forward pass and self.best_loss is reset.
        '''
        np.random.seed(seed)
        if restart:
            for layer in self.net.layers:
                layer.first = True

            self.best_loss = 1e9

        # Copy of the network as of the last evaluation that improved the
        # loss; starts as the "epoch 0" model. This is what the early-stop
        # message below refers to, so it must be captured at evaluation time
        # rather than one epoch before the failing evaluation.
        last_model = deepcopy(self.net)

        for e in range(epochs):

            X_train, y_train = permute_data(X_train, y_train)

            batch_generator = self.generate_batches(X_train, y_train,
                                                    batch_size)

            for X_batch, y_batch in batch_generator:
                self.net.train_batch(X_batch, y_batch)
                self.optim.step()

            if (e+1) % eval_every != 0:
                continue

            test_preds = self.net.forward(X_test)
            loss = self.net.loss.forward(test_preds, y_test)

            if loss < self.best_loss:
                print(f"Validation loss after {e+1} epochs is {loss:.3f}")
                self.best_loss = loss
                last_model = deepcopy(self.net)
            else:
                print(f"""Loss increased after epoch {e+1}, final loss was {self.best_loss:.3f}, using the model from epoch {e+1-eval_every}""")
                self.net = last_model
                # ensure self.optim is still updating self.net
                self.optim.net = self.net
                break

In [18]:
def mae(y_true, y_pred):
    '''
    Return the mean absolute error between y_true and y_pred.
    '''
    absolute_errors = np.abs(y_true - y_pred)
    return np.mean(absolute_errors)

def rmse(y_true, y_pred):
    '''
    Return the root mean squared error between y_true and y_pred.
    '''
    squared_errors = np.power(y_true - y_pred, 2)
    mean_squared = np.mean(squared_errors)
    return np.sqrt(mean_squared)

def eval_regression_model(model: NeuralNetwork,
                          X_test,
                          y_test):
    '''
    Print the mean absolute error and root mean squared error of the
    model's predictions on X_test against y_test.

    The predictions are reshaped into a single column before being
    compared. Nothing is returned.
    '''
    column_preds = model.forward(X_test).reshape(-1, 1)

    mae_line = "Mean absolute error: {:.2f}".format(mae(column_preds, y_test))
    print(mae_line)
    print()
    rmse_line = "Root mean squared error {:.2f}".format(rmse(column_preds, y_test))
    print(rmse_line)