In [1]:
import numpy as np

from numpy import ndarray

from typing import List, Tuple

In [2]:
from sklearn.datasets import load_boston
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [3]:
def assert_same_shape(array: ndarray,
                      array_grad: ndarray):
    """
    Checks that an array and its gradient have identical shapes.

    Raises an AssertionError that names both shapes when they differ
    and returns None when they agree.
    """
    shapes_match = array.shape == array_grad.shape

    assert shapes_match, \
        """
        Two ndarrays should have the same shape;
        instead, first ndarray's shape is {0}
        and second ndarray's shape is {1}.
        """.format(tuple(array.shape), tuple(array_grad.shape))

### Base Class

A mental model for all the operations we will encounter throughout deep learning: an operation sends inputs forward and gradients backward, and checks that the shapes match.

Operation class is needed for:
   - activation function
   
ParamOperation class is needed for:
   - weight multiplication
   - bias addition

In [4]:
class Operation():
    """
    Base class for a single operation in a neural network.

    Subclasses supply _output and _input_grad; forward and backward
    take care of storing the intermediate values and checking shapes.
    """
    def __init__(self):
        pass

    def forward(self, input_: ndarray) -> ndarray:
        """
        Stores the input in self.input_, computes the result with
        self._output(), stores it in self.output and returns it.
        """
        self.input_ = input_

        result = self._output()
        self.output = result

        return result

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Computes the gradient with respect to the input via self._input_grad,
        stores it in self.input_grad and returns it.

        The incoming gradient must have the shape of self.output and the
        computed gradient must have the shape of self.input_.
        """
        assert_same_shape(self.output, output_grad)

        grad_wrt_input = self._input_grad(output_grad)
        self.input_grad = grad_wrt_input

        assert_same_shape(self.input_, grad_wrt_input)

        return grad_wrt_input

    def _output(self) -> ndarray:
        """Computes the output from self.input_; every subclass must implement it."""
        raise NotImplementedError()

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Computes the input gradient from output_grad; every subclass must implement it."""
        raise NotImplementedError()

In [5]:
class ParamOperation(Operation):
    """
    An Operation that carries a parameter array.

    In addition to the input gradient, backward also computes the gradient
    with respect to the parameter and stores it in self.param_grad.
    """

    def __init__(self, param: ndarray) -> None:
        """Stores the parameter array in self.param."""
        super().__init__()
        self.param = param

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Calls self._input_grad and self._param_grad.

        Checks that output_grad matches self.output, that the input gradient
        matches self.input_ and that the parameter gradient matches self.param.
        Returns the input gradient.
        """
        assert_same_shape(self.output, output_grad)

        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)

        assert_same_shape(self.input_, self.input_grad)
        assert_same_shape(self.param, self.param_grad)

        return self.input_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Computes the parameter gradient from output_grad; every subclass must implement it."""
        raise NotImplementedError()


## Building Blocks

There are three kinds of blocks:

- matrix multiplication of the input with the parameter matrix
- addition of the bias term
- activation function (here sigmoid)

In [6]:
class WeightMultiply(ParamOperation):
    """Multiplies the input by the weight matrix stored in self.param."""

    def __init__(self, W: ndarray):
        """Registers W as the parameter of this operation."""
        super().__init__(W)

    def _output(self) -> ndarray:
        """Returns the matrix product of the input and the weights."""
        weights = self.param
        return np.dot(self.input_, weights)

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Returns output_grad multiplied by the transposed weights."""
        weights_t = np.transpose(self.param, (1, 0))
        return np.dot(output_grad, weights_t)

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Returns the transposed input multiplied by output_grad."""
        input_t = np.transpose(self.input_, (1, 0))
        return np.dot(input_t, output_grad)

In [7]:
class BiasAdd(ParamOperation):
    """Adds the bias stored in self.param to the input."""

    def __init__(self, B: ndarray):
        """Registers B as the parameter; its first dimension must be 1."""
        assert B.shape[0] == 1

        super().__init__(B)

    def _output(self) -> ndarray:
        """Returns the input plus the bias."""
        bias = self.param
        return self.input_ + bias

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Returns output_grad multiplied element-wise by ones shaped like the input."""
        ones = np.ones_like(self.input_)
        return ones * output_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Sums output_grad over axis 0 and returns it with shape (1, n_columns)."""
        per_row_grad = np.ones_like(self.param) * output_grad
        summed = np.sum(per_row_grad, axis=0)
        n_columns = per_row_grad.shape[1]
        return summed.reshape(1, n_columns)

In [8]:
class Sigmoid(Operation):
    """Element-wise sigmoid activation."""

    def __init__(self) -> None:
        super().__init__()

    def _output(self) -> ndarray:
        """Returns 1 / (1 + exp(-input_))."""
        exp_term = np.exp(-1.0 * self.input_)
        return 1.0/(1.0 + exp_term)

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Returns output * (1 - output) * output_grad, using the stored forward output."""
        activated = self.output
        return activated * (1 - activated) * output_grad

In [9]:
class Linear(Operation):
    """
    "Identity" activation function: passes values through unchanged.
    """

    def __init__(self) -> None:
        super().__init__()

    def _output(self) -> ndarray:
        """Returns the input as it is."""
        passthrough = self.input_
        return passthrough

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Returns the incoming gradient as it is."""
        passthrough_grad = output_grad
        return passthrough_grad

## Layer Blueprint

- The forward and backward methods simply send the input forward (and the gradient backward) through a series of operations. Beyond that, a layer is responsible for:

    - defining the correct series of operations in a setup_layer function and initializing and storing the parameters in these operations
    
    - storing the correct values in self.input_ and self.output on the forward method
    
    - performing the correct assertion checking in the backward method

In [10]:
class Layer():
    """
    A layer in a neural network: an ordered series of operations.

    Subclasses define the operations in _setup_layer, which runs on the
    first forward pass (while self.first is True).
    """

    def __init__(self, neurons: int):
        """Stores the number of neurons and creates empty containers."""
        self.neurons = neurons
        self.first = True
        self.params: List[ndarray] = []
        # Named param_grads (plural) to match _param_grads() and the readers of
        # this attribute, so that it exists before the first backward pass.
        self.param_grads: List[ndarray] = []
        self.operations: List["Operation"] = []

    def _setup_layer(self, num_in: ndarray) -> None:
        """
        Builds self.operations; every subclass must implement it.

        forward() passes the first input batch as the argument.
        """
        raise NotImplementedError()

    def forward(self, input_: ndarray) -> ndarray:
        """
        Sends the input forward through all operations in order.

        Sets the layer up on the first call, stores the input in self.input_
        and the final result in self.output, and returns the result.
        """
        if self.first:
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_

        for operation in self.operations:

            input_ = operation.forward(input_)

        self.output = input_

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Sends output_grad backward through the operations in reverse order.

        output_grad must have the shape of self.output. Afterwards the
        parameter gradients are collected into self.param_grads. Returns the
        gradient with respect to the layer input.
        """
        assert_same_shape(self.output, output_grad)

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)

        input_grad = output_grad

        self._param_grads()

        return input_grad

    def _param_grads(self) -> None:
        """Collects param_grad from every ParamOperation into self.param_grads."""
        self.param_grads = [operation.param_grad
                            for operation in self.operations
                            if isinstance(operation, ParamOperation)]

    def _params(self) -> None:
        """Collects param from every ParamOperation into self.params."""
        self.params = [operation.param
                       for operation in self.operations
                       if isinstance(operation, ParamOperation)]

### Dense Layer

In [11]:
class Dense(Layer):
    """Fully connected layer: weight multiplication, bias addition, activation."""

    def __init__(self,
                 neurons: int,
                 activation: Operation = None) -> None:
        """
        Stores the number of neurons and the activation operation.

        When no activation is given, a new Sigmoid is created for this layer.
        A default written as `activation=Sigmoid()` would be evaluated only
        once, so all layers relying on it would share one operation object and
        overwrite each other's stored input_ and output.
        """
        super().__init__(neurons)
        self.activation = Sigmoid() if activation is None else activation

    def _setup_layer(self, input_: ndarray) -> None:
        """
        Creates the parameters and operations of a fully connected layer.

        The weights get shape (input_.shape[1], neurons) and the bias gets
        shape (1, neurons); both are drawn with np.random.randn.
        """
        # `seed` is attached from outside (NeuralNetwork sets it with setattr),
        # so it may be missing when the layer is used on its own.
        seed = getattr(self, "seed", None)
        if seed is not None:
            # Each layer reseeds the global NumPy generator with the same
            # value before drawing its parameters.
            np.random.seed(seed)

        self.params = []

        # weights
        self.params.append(np.random.randn(input_.shape[1], self.neurons))

        # bias
        self.params.append(np.random.randn(1, self.neurons))

        self.operations = [WeightMultiply(self.params[0]),
                           BiasAdd(self.params[1]),
                           self.activation]

        return None

## Loss Class

In [12]:
class Loss(object):
    """Base class for the loss calculation of the network."""

    def __init__(self):
        pass

    def forward(self, prediction: ndarray, target: ndarray) -> float:
        """
        Computes the actual loss value.

        prediction and target must have the same shape. Both are stored on
        the instance so that backward() can use them.
        """
        assert_same_shape(prediction, target)

        self.prediction = prediction
        self.target = target

        loss_value = self._output()

        return loss_value

    def backward(self) -> ndarray:
        """
        Computes the gradient of the loss value with respect to the input of
        the loss function (the prediction stored by forward()).
        """
        self.input_grad = self._input_grad()

        # The gradient must have the shape of the prediction it refers to.
        assert_same_shape(self.prediction, self.input_grad)

        return self.input_grad

    def _output(self) -> float:
        """
        Every subclass of "Loss" has to implement the _output function.
        """
        raise NotImplementedError()

    def _input_grad(self) -> ndarray:
        """
        Every subclass of "Loss" has to implement the _input_grad function.
        """
        raise NotImplementedError()

In [13]:
class MeanSquaredError(Loss):
    """Squared-error loss divided by the number of rows in the prediction."""

    def __init__(self):
        super().__init__()

    def _output(self) -> float:
        """
        Sums the squared differences between prediction and target and
        divides the sum by prediction.shape[0].
        """
        residual = self.prediction - self.target
        n_rows = self.prediction.shape[0]

        return np.sum(np.power(residual, 2)) / n_rows

    def _input_grad(self) -> ndarray:
        """
        Gradient of the loss with respect to the prediction:
        2 * (prediction - target) / prediction.shape[0].
        """
        residual = self.prediction - self.target
        n_rows = self.prediction.shape[0]

        return 2.0 * residual / n_rows

## "NeuralNetwork" class

Basically, the class should take batches of observations and targets and learn the relationship between X and y.
With the Layer and Operation classes, it needs to do the following:

   - take X and pass it forward through each layer until the result presents the prediction
   - prediction should be compared to the true value to calculate the loss and loss gradient. (partial derivative of the loss with respect to each element in the last layer of the network)
   - the loss gradient is sent backward through each layer, computing the parameter gradients along the way (the partial derivatives of the loss with respect to each parameter)

In [14]:
class NeuralNetwork():
    """
    A neural network: a list of layers plus a loss.

    Data goes forward through the layers, the loss compares the prediction
    with the target, and the loss gradient goes backward through the layers.
    """

    def __init__(self,
                 layers: List[Layer],
                 loss: Loss,
                 seed: int = 1):
        """
        Stores layers, loss and seed, and hands the seed to every layer.

        The seed is assigned to each layer unconditionally (None included),
        so a layer always has a `seed` attribute and a seed of 0 is not
        silently dropped.
        """
        self.layers = layers
        self.loss = loss
        self.seed = seed
        for layer in self.layers:
            setattr(layer, "seed", self.seed)

    def forward(self, x_batch: ndarray) -> ndarray:
        """
        Passes data forward through the series of layers and returns the
        output of the last layer.
        """
        x_out = x_batch
        for layer in self.layers:
            x_out = layer.forward(x_out)

        return x_out

    def backward(self, loss_grad: ndarray) -> None:
        """
        Passes the loss gradient backward through the series of layers.
        """
        grad = loss_grad

        for layer in reversed(self.layers):
            grad = layer.backward(grad)

        return None

    def train_batch(self,
                    x_batch: ndarray,
                    y_batch: ndarray) -> float:
        """
        Passes data forward through the layers.
        Computes the loss.
        Passes the loss gradient backward through the layers.
        Returns the loss value.
        """
        prediction = self.forward(x_batch)

        loss = self.loss.forward(prediction, y_batch)

        self.backward(self.loss.backward())

        return loss

    def params(self):
        """
        Yields the parameters of the network, layer by layer.
        """
        for layer in self.layers:
            yield from layer.params

    def param_grads(self):
        """
        Yields the gradients of the loss with respect to the parameters,
        layer by layer.
        """
        for layer in self.layers:
            yield from layer.param_grads
        


## Trainer and Optimizer

In [15]:
class Optimizer(object):
    """
    Base class for an optimizer; holds the learning rate.
    """
    def __init__(self,
                 lr: float = 0.01):
        """Stores the learning rate in self.lr."""
        self.lr = lr

    def step(self) -> None:
        """Does nothing in the base class; subclasses perform the update here."""
        return None

In [16]:
class SGD(Optimizer):
    """
    Stochastic gradient descent optimizer.
    """
    def __init__(self,
                 lr: float = 0.01) -> None:
        """Passes the learning rate on to the base class."""
        super().__init__(lr)

    def step(self):
        """
        Subtracts lr * gradient from every parameter of self.net, in place.
        """
        pairs = zip(self.net.params(), self.net.param_grads())

        for weights, weights_grad in pairs:
            # in-place update, so the arrays held by the network change too
            weights -= self.lr * weights_grad

In [17]:
class Trainer(object):
    """
    Trains a neural network with a given optimizer.
    """
    def __init__(self,
                 net: NeuralNetwork,
                 optim: Optimizer) -> None:
        """
        Stores network and optimizer, and attaches the network to the
        optimizer as its `net` attribute.
        """
        self.net = net
        self.optim = optim
        self.optim.net = self.net

    def generate_batches(self,
                         X: ndarray,
                         y: ndarray,
                         size: int = 32) -> Tuple[ndarray]:
        """
        Yields consecutive (X_batch, y_batch) slices of at most `size` rows.

        X and y must have the same number of rows.
        """
        n_rows = X.shape[0]

        assert n_rows == y.shape[0], \
            """
            features and target must have the same number of rows, instead
            features has {0} and target has {1}
            """.format(n_rows, y.shape[0])

        for start in range(0, n_rows, size):
            stop = start + size

            yield X[start:stop], y[start:stop]

    def fit(self, X_train: ndarray, y_train: ndarray,
            X_valid: ndarray, y_valid: ndarray,
            epochs: int=100,
            eval_every: int=10,
            batch_size: int=32,
            seed: int = 1,
            restart: bool = True)-> None:
        """
        Fits the neural network on the training data.

        Each epoch shuffles the training data and performs one optimizer step
        per batch. Every "eval_every" epochs the validation loss is printed.
        With restart=True every layer is flagged to set itself up again.
        """
        np.random.seed(seed)

        if restart:
            for layer in self.net.layers:
                layer.first = True

        for e in range(epochs):

            X_train, y_train = permute_data(X_train, y_train)

            batches = self.generate_batches(X_train, y_train, batch_size)
            for X_batch, y_batch in batches:
                self.net.train_batch(X_batch, y_batch)
                self.optim.step()

            # only report on every eval_every-th epoch
            if (e+1) % eval_every != 0:
                continue

            valid_preds = self.net.forward(X_valid)
            loss = self.net.loss.forward(valid_preds, y_valid)

            print(f"Validation loss after {e+1} epochs is {loss:.3f}")


### helper functions

In [18]:
def permute_data(X, y):
    """
    Shuffles the rows of X and y with one shared random permutation.

    Returns the reordered X and y as a tuple.
    """
    n_rows = X.shape[0]
    shuffled_idx = np.random.permutation(n_rows)
    X_shuffled, y_shuffled = X[shuffled_idx], y[shuffled_idx]
    return X_shuffled, y_shuffled

In [19]:
def to_2d_np(a: ndarray, 
          type: str="col") -> ndarray:
    '''
    Turns a 1D Tensor into 2D

    type="col" returns shape (n, 1), type="row" returns shape (1, n).
    The input must be 1 dimensional. Any other value of `type` raises a
    ValueError instead of silently returning None.
    '''

    assert a.ndim == 1, \
    "Input tensors must be 1 dimensional"
    
    if type == "col":        
        return a.reshape(-1, 1)
    elif type == "row":
        return a.reshape(1, -1)

    raise ValueError(
        "type must be 'col' or 'row', got {!r}".format(type))

In [20]:
def eval_model(model: NeuralNetwork,
                          X_test: ndarray,
                          y_test: ndarray):
    """
    Prints the mean absolute error and the root mean squared error of the
    model's predictions for X_test against y_test.

    The predictions are reshaped into a single column before scoring.
    """
    prediction = model.forward(X_test)
    prediction = prediction.reshape(-1, 1)

    # scikit-learn metrics take (y_true, y_pred) in that order
    mae = mean_absolute_error(y_test, prediction)
    # Take the root explicitly: the `squared=False` switch of
    # mean_squared_error is deprecated in newer scikit-learn releases.
    rmse = np.sqrt(mean_squared_error(y_test, prediction))

    print("Mean absolute error: {:.2f}".format(mae))
    print("\nRoot mean squared error {:.2f}".format(rmse))

### application

In [21]:
# A single linear neuron: plain linear regression.
linear_regresion = NeuralNetwork(
    layers=[Dense(neurons=1, activation=Linear())],
    loss=MeanSquaredError(),
    seed=1,
)

# One hidden sigmoid layer with 13 neurons, followed by a linear output neuron.
neural_network = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=1,
)

# Two hidden sigmoid layers with 13 neurons each, followed by a linear output neuron.
neural_network_2 = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=1,
)

In [22]:
# Load the data set and pull out the feature matrix, the target and the feature names.
# NOTE(review): load_boston is no longer available in scikit-learn 1.2 and later —
# confirm the installed version before running this cell.
boston = load_boston()
data, target, features = boston.data, boston.target, boston.feature_names

In [23]:
# Standardize the feature matrix with scikit-learn's StandardScaler.
# NOTE(review): the scaler is fitted on all rows before the train/validation/test
# split, so the held-out rows influence the scaling statistics — confirm this is intended.
s = StandardScaler()
data = s.fit_transform(data)

In [24]:
# Hold out 30% of all rows as the test set ...
X_train, X_test, y_train, y_test = train_test_split(
    data, target, test_size=0.3, random_state=1)
# ... then hold out 30% of the remaining rows as the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.3, random_state=1)

# make target 2d array
y_train, y_val, y_test = (to_2d_np(y_1d) for y_1d in (y_train, y_val, y_test))

In [25]:
# Train the linear regression model with SGD; validation loss is printed every 10 epochs.
trainer = Trainer(net=linear_regresion, optim=SGD(lr=0.01))

trainer.fit(X_train, y_train, X_val, y_val, epochs=50, eval_every=10, seed=1);

Validation loss after 10 epochs is 41.396
Validation loss after 20 epochs is 21.537
Validation loss after 30 epochs is 22.436
Validation loss after 40 epochs is 23.285
Validation loss after 50 epochs is 23.175


In [26]:
# Print the test-set errors of the trained linear regression model.
eval_model(linear_regresion, X_test, y_test)

Mean absolute error: 3.44

Root mean squared error 4.50


In [27]:
# Train the network with one hidden layer; validation loss is printed every 10 epochs.
trainer = Trainer(net=neural_network, optim=SGD(lr=0.01))

trainer.fit(X_train, y_train, X_val, y_val, epochs=50, eval_every=10, seed=1);

Validation loss after 10 epochs is 31.520
Validation loss after 20 epochs is 26.878
Validation loss after 30 epochs is 22.657
Validation loss after 40 epochs is 20.736
Validation loss after 50 epochs is 18.417


In [28]:
# Print the test-set errors of the trained network with one hidden layer.
eval_model(neural_network, X_test, y_test)

Mean absolute error: 2.92

Root mean squared error 4.02


In [29]:
# Train the network with two hidden layers; validation loss is printed every 10 epochs.
trainer = Trainer(net=neural_network_2, optim=SGD(lr=0.01))

trainer.fit(X_train, y_train, X_val, y_val, epochs=50, eval_every=10, seed=1);

Validation loss after 10 epochs is 38.433
Validation loss after 20 epochs is 28.609
Validation loss after 30 epochs is 19.886
Validation loss after 40 epochs is 18.124
Validation loss after 50 epochs is 15.767


In [30]:
# Print the test-set errors of the trained network with two hidden layers.
eval_model(neural_network_2, X_test, y_test)

Mean absolute error: 2.53

Root mean squared error 3.48
