In [1]:
import numpy as np
from copy import deepcopy

In [2]:
def assert_same_shape(a, b):
    assert a.shape == b.shape

In [3]:
def permute_data(x, y):
    perm = np.random.permutation(x.shape[0])
    
    return x[perm], y[perm]

In [4]:
class Operation():
    '''
    Base class for an operation in a neural network
    '''
    
    def __init__(self):
        pass
    
    
    def forward(self, input_):
        '''
        Stores input in the self.input instance variable
        Calls the self._output function
        '''
        self.input_ = input_
        self.output = self._output()
        
        return self.output
    
    
    def backward(self, output_grad):
        '''
        Calls the self._input_grad() function.
        Chekcs that the appropriate shapes match
        '''
        
        assert_same_shape(self.output, output_grad)
        
        self.input_grad = self._input_grad(output_grad)
        
        assert_same_shape(self.input_, self.input_grad)
        
        return self.input_grad
    
    def _output(self):
        '''
        The _output method must be defined for each Operation
        '''
        raise NotImplementedError()
        
        
    def _input_grad(self, output_grad):
        '''
        The _input_grad method must be defined for each Operation
        '''
        raise NotImplementedError()

In [5]:
class ParamOperation(Operation):
    '''
    An Operation with parameters.
    '''
    
    def __init__(self, param):
        '''
        The ParamOperation method
        '''
        super().__init__()
        self.param = param
        
    def backward(self, output_grad):
        '''
        Calls self.input_grad and self._param_grad.
        Checks appropriate shapes.
        '''
        
        assert_same_shape(self.output, output_grad)
        
        self.input_grad  = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)
        
        assert_same_shape(self.input_, self.input_grad)
        assert_same_shape(self.param, self.param_grad)
        
        return self.input_grad
    
    def _param_grad(self, output_grad):
        '''
        Every subclass of ParamOperation must implement _param_grad
        '''
        raise NotImplementedError()

In [6]:
class WeightMultiply(ParamOperation):
    '''
    Weight multiplication operation for a neural network
    '''
    
    def __init__(self, W):
        '''
        Initialize Operation with self.param = W
        '''
        super().__init__(W) 
    
    def _output(self):
        '''
        Compute output
        '''
        return np.dot(self.input_, self.param)
       
    def _input_grad(self, output_grad):
        '''
        Compute input gradient with respect to loss
        '''
        return np.dot(output_grad, np.transpose(self.param, (1, 0)))    
    
    def _param_grad(self, output_grad):
        '''
        Compute parameter gradient
        '''
        return np.dot(np.transpose(self.input_, (1, 0)), output_grad)

In [7]:
class BiasAdd(ParamOperation):
    '''
    Compute bias addition
    '''
    
    def __init__(self, B):
        '''
        initalize self.param = B
        '''
        assert B.shape[0] == 1
        super().__init__(B)        
        
    def _output(self):
        '''
        Compute output
        '''
        return self.input_ + self.param    
    
    def _input_grad(self, output_grad):
        '''
        Computes input gradient with respect to loss
        '''
        return np.ones_like(self.input_) * output_grad
    
    def _param_grad(self, output_grad):
        '''
        Compute parameter gradient
        '''
        param_grad = output_grad * np.ones_like(self.param)
        return np.sum(param_grad, axis=0).reshape(1, param_grad.shape[1])

In [8]:
class Sigmoid(Operation):
    '''
    Sigmoid activation function
    '''
    
    def __init__(self):
        super().__init__()
        
    def _output(self):
        '''
        Compute output
        '''
        return 1 / (1+np.exp(-1.0 * self.input_))
    
    def _input_grad(self, output_grad):
        '''
        Compute input gradient
        '''
        sigmoid_backward = self.output * (1.0 - self.output)
        input_grad = sigmoid_backward * output_grad
        
        return input_grad

In [9]:
class Linear(Operation):
    '''
    'Identity' activation function
    '''
    
    def __init__(self):
        super().__init__()
        
    def _output(self):
        return self.input_
    
    def _input_grad(self, output_grad):
        return output_grad

In [10]:
class Layer():
    '''
    A "layer" of neurons in a neural network
    '''
    
    def __init__(self, neurons):
        self.neurons = neurons
        self.first = True
        self.params = []
        self.param_grads = []
        self.operations = []
        
    def _setup_layer(self, num_in):
        '''
        The _setup_layer function must be implemented for each layer.
        '''
        raise NotImplementedEror()
        
    def forward(self, input_):
        '''
        Passes input forward through a series of operations.
        '''
        if self.first:
            self._setup_layer(input_)
            self.first = False
            
        self.input_ = input_
        
        for operation in self.operations:
            input_ = operation.forward(input_)
            
        self.output = input_
        
        return self.output
    
    def backward(self, output_grad):
        '''
        Passes output_grad backward through a series of operations.
        Checks appropriate shapes.
        '''
        
        assert_same_shape(self.output, output_grad)
        
        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)
        
        input_grad = output_grad
        
        self._param_grads()
        
        return input_grad
    
    def _param_grads(self):
        '''
        Extracts the _param_grads from a layer's operations
        '''
        
        self.param_grads = []
        for operation in self.operations:
            if issubclass(operation.__class__, ParamOperation):
                self.param_grads.append(operation.param_grad)
    
    def _params(self):
        '''
        Extracts the _params from a layer's operations
        '''
        
        self.params = []
        for operation in self.operations:
            if issubclass(operation.__class__, ParamOperation):
                self.params.append(operation.param)

In [11]:
class Dense(Layer):
    '''
    A fully conneted layer that inherits from "Layer."
    '''
    
    def __init__(self, neurons, activation):
        '''
        Requires an activation function upon intialization.
        this program uses sigmoid as the activation function
        '''
        super().__init__(neurons)
        self.activation = activation
        
    def _setup_layer(self, input_):
        '''
        Defines the operations of a fully connected layer.
        '''
        if self.seed:
            np.random.seed(self.seed)
        
        self.params = []
        
        # weights
        self.params.append(np.random.randn(input_.shape[1], self.neurons))
        
        # bias
        self.params.append(np.random.randn(1, self.neurons))
        
        self.operations = [WeightMultiply(self.params[0]), BiasAdd(self.params[1]), self.activation]
        
        return None

In [12]:
class Loss():
    '''
    Objectifies loss of the neural network
    '''
    
    def __init__(self):
        pass
    
    def forward(self, prediction, target):
        '''
        computes actual loss
        '''
        assert_same_shape(prediction, target)
        
        self.prediction = prediction
        self.target = target
        
        loss_value = self._output()
        
        return loss_value
    
    def backward(self):
        '''
        Computes gradients of loss function with respect to its input
        '''
        self.input_grad = self._input_grad()
        
        assert_same_shape(self.prediction, self.input_grad)
        
        return self.input_grad
        
    
    def _output(self):
        '''
        Every subclass should have its own _output() method
        '''
        raise NotImplementedError()
        
    def _input_grad(self):
        '''
        Every subclass should have its own _input_grad() method
        '''
        raise NotImplementedError()

In [13]:
class MeanSquaredError(Loss):
    '''
    The meansquarederror loss of a neural network
    '''     
    def __init__(self):
        super().__init__()
    
    def _output(self):
        
        loss = np.sum(np.power(self.prediction-self.target, 2)) / self.prediction.shape[0]
        return loss
    
    def _input_grad(self):
        '''
        returns gradient of loss with respect to input for MSE loss'''
        
        return (2.0 * (self.prediction - self.target) / self.prediction.shape[0])

In [14]:
class NeuralNetwork():
    
    def __init__(self, layers, loss, seed):
        self.layers = layers
        self.loss = loss
        self.seed = seed
        
        if seed:
            for layer in self.layers:
                setattr(layer, "seed", self.seed)
                
    def forward(self, x_batch):
        '''
        Passes data through a series of layers
        '''
        
        x_input = x_batch
        
        for layer in self.layers:
            x_input = layer.forward(x_input)
        
        return x_input
    
    def backward(self, loss_grad):
        '''
        computes gradient in a backward pass
        '''
        
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
            
        return None
    
    def train_batch(self, x_batch, y_batch):
        '''
        passes data forward through layers
        returns loss of a layer
        passes data backward through the layers
        '''
        
        predictions = self.forward(x_batch)
        
        loss = self.loss.forward(predictions, y_batch)
        
        # self.loss.backward() function call is passed through to seflf.backward()
        # and is the first value for 'grad' ebefore it is looped
        self.backward(self.loss.backward())
        
        return loss
    
    def param(self):
        '''
        Gets the parameters for the network
        '''
        for layer in self.layers:
            yield from layer.params
            
    def param_grad(self):
        '''
        Gets the parameter gradient for the the network
        '''
        for layer in self.layers:
            yield from layer.param_grads

In [15]:
class Optimizer():
    '''
    Base class for a neural network optimizer
    '''
    
    def __init__(self, init_learning_rate):
        # Every optimizer must have an initial learning rate
        self.init_learning_rate = init_learning_rate
        
    def set(self):
        '''
        Subclass should have its own defined .set() method
        '''
        raise NotImplementedError()

In [16]:
class SGD(Optimizer):
    
    def __init__(self, init_learning_rate):
        super().__init__(init_learning_rate)
        
    def step(self):
        '''
        For each parameter, adjust in the appropriate direction, with the magnitude
        of the adjusment based of the learning rate
        '''
        for param, param_grad in zip(self.net.param() , self.net.param_grad()):
            param -= self.init_learning_rate * param_grad

In [17]:
class Trainer():
    '''
    Trains a neural network
    '''
    
    def __init__(self, net, optim):
        self.net = net
        self.optim = optim
        self.start = True
        
        setattr(self.optim, "net", self.net)
        
        
        
    def generate_batches(self, x, y, batch_size):
        
        assert x.shape[0] == y.shape[0]
        
        for i in range(0, x.shape[0], batch_size):
            
            x_batch = x[i:i + batch_size]
            y_batch = y[i:i + batch_size]
            
        yield x_batch, y_batch
        
        
    def fit(self,
            x_train,
            y_train,
            x_test,
            y_test,
            epochs,
            eval_every,
            seed,
            batch_size):
        '''
        Fits the neural network on the training data for a certain number of epochs
        Evaluates the neurl network every 'evel_every' epochs
        '''
    
        np.random.seed(seed)
    
        if self.start:
            for layer in self.net.layers:
                layer.first = True
            self.start = False

            self.least_loss = 1e9

        for epoch in range(epochs):

            if (epoch+1) % eval_every:

                previous_model = deepcopy(self.net)

            x_train, y_train = permute_data(x_train, y_train)

            batches = self.generate_batches(x_train, y_train, batch_size)

            for i, (x_batch, y_batch) in enumerate(batches):
                
                loss = self.net.train_batch(x_batch, y_batch)

                self.optim.step()

            if (epoch+1) % eval_every == 0:

                preds = self.net.forward(x_test)
                loss = self.net.loss.forward(preds, y_test)

                if loss < self.least_loss:
                    print(f"Validation loss after epoch {epoch+1} is {loss:.3f}")
                    self.least_loss = loss
                else:
                    print(f"Loss increased after epoch {epoch+1}, final lass was {self.least_loss:.3f}; using model from epoch {epoch+1-eval_every}")
                    self.net = previous_model
                    setattr(self.optim, "net", self.net)

In [18]:
def eval_regression_model(model, x_test, y_test):
    preds = model.forward(x_test)
    
    mae = np.mean(np.abs(preds - y_test))
    rmse = np.sqrt(np.mean(np.power(preds - y_test, 2)))
    
    print(f"Mean Absolute Error: {mae:.2f}")
    print(f"Root Mean Square Error: {rmse:.2f}")

In [19]:
linear_regression = NeuralNetwork(
                                layers = [Dense(neurons = 1, activation=Linear())],
                                loss = MeanSquaredError(),
                                seed=20190501
                                )

neural_network = NeuralNetwork(
                            layers = [
                                Dense(neurons = 13, activation = Sigmoid()),
                                Dense(neurons = 1, activation = Linear())],
                            loss = MeanSquaredError(),
                            seed=20190501
                                )

deep_network = NeuralNetwork(
                            layers = [
                                Dense(neurons = 13, activation = Sigmoid()),
                                Dense(neurons = 13, activation = Sigmoid()),
                                Dense(neurons = 1, activation = Linear())],
                            loss = MeanSquaredError(),
                            seed=20190501
                                )

In [20]:
def to_2d_np(a):
    assert a.ndim == 1
    
    return a.reshape(-1, 1)

In [21]:
from sklearn.datasets import load_boston

boston = load_boston()

data = boston.data
target = boston.target
features = boston.feature_names

In [22]:
from sklearn.preprocessing import StandardScaler
s = StandardScaler()
data = s.fit_transform(data)

In [23]:
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=.3, random_state=80718)

In [24]:
y_train, y_test = to_2d_np(y_train), to_2d_np(y_test)

In [25]:
trainer = Trainer(linear_regression, SGD(init_learning_rate=0.01))

trainer.fit(x_train, y_train, x_test, y_test,
           epochs = 50,
           eval_every = 10,
           batch_size = 32,
           seed = 20190501)
print()
eval_regression_model(linear_regression, x_test, y_test)

Validation loss after epoch 10 is 404.322
Validation loss after epoch 20 is 294.336
Validation loss after epoch 30 is 230.459
Validation loss after epoch 40 is 163.284
Validation loss after epoch 50 is 103.427

Mean Absolute Error: 8.19
Root Mean Square Error: 10.17


In [26]:
trainer = Trainer(neural_network, SGD(init_learning_rate = 0.01))

trainer.fit(x_train, y_train, x_test, y_test,
           epochs = 50,
           eval_every = 10,
           batch_size = 32,
           seed = 20190501)
print()
eval_regression_model(neural_network, x_test, y_test)

Validation loss after epoch 10 is 129.548
Validation loss after epoch 20 is 61.646
Validation loss after epoch 30 is 57.538
Validation loss after epoch 40 is 50.866
Validation loss after epoch 50 is 43.314

Mean Absolute Error: 4.53
Root Mean Square Error: 6.58


In [27]:
trainer = Trainer(deep_network, SGD(init_learning_rate=0.01))

trainer.fit(x_train, y_train, x_test, y_test,
           epochs = 50,
           eval_every = 10,
           batch_size = 32,
           seed = 20190501)
print()
eval_regression_model(deep_network, x_test, y_test)

Validation loss after epoch 10 is 109.131
Validation loss after epoch 20 is 83.087
Validation loss after epoch 30 is 78.645
Validation loss after epoch 40 is 74.414
Validation loss after epoch 50 is 71.426

Mean Absolute Error: 6.67
Root Mean Square Error: 8.45
