In [1]:
import numpy as np
from numpy import ndarray

from typing import List

In [2]:
class Operation(object):
    """
    Base class for one differentiable step of a network.

    Subclasses implement `_output` (computes the result from `self.input_`)
    and `_input_grad` (maps a gradient w.r.t. the output to a gradient
    w.r.t. the input).
    """

    def __init__(self):
        pass


    def forward(self,
                input_: ndarray):
        """
        Store `input_` on the instance, compute the output via `_output`,
        store it in `self.output` and return it.
        """
        self.input_ = input_

        self.output = self._output()

        return self.output


    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Compute and return the gradient with respect to the stored input.

        Must be called after `forward`, since it reads `self.output` and
        `self.input_`. Raises AssertionError when `output_grad` does not have
        the shape of `self.output`, or when the computed input gradient does
        not have the shape of `self.input_`.
        """
        # Shape checks are written inline so the class does not rely on a
        # helper function that is not defined alongside it.
        assert self.output.shape == output_grad.shape, \
            "output_grad shape {0} does not match output shape {1}".format(
                output_grad.shape, self.output.shape)

        self._compute_grads(output_grad)

        assert self.input_.shape == self.input_grad.shape, \
            "input_grad shape {0} does not match input shape {1}".format(
                self.input_grad.shape, self.input_.shape)
        return self.input_grad


    def _compute_grads(self, output_grad: ndarray) -> None:
        """Populate `self.input_grad` from `output_grad`."""
        self.input_grad = self._input_grad(output_grad)


    def _output(self) -> ndarray:
        """
        Compute the output from `self.input_`; must be overridden.

        Takes no arguments because `forward` calls it as `self._output()`.
        """
        raise NotImplementedError()


    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Compute the gradient w.r.t. the input; must be overridden."""
        raise NotImplementedError()

In [3]:
class ParamOperation(Operation):
    """
    An Operation that additionally holds a parameter array.

    Subclasses implement `_param_grad` on top of the hooks required by
    `Operation`.
    """

    def __init__(self, param: ndarray) -> None:
        super().__init__()
        self.param = param

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Compute both the input gradient and the parameter gradient.

        Stores them in `self.input_grad` and `self.param_grad` and returns
        the input gradient. Raises AssertionError when `output_grad` does not
        have the shape of `self.output`, or when the input gradient does not
        have the shape of the stored input.
        """
        # Inline shape checks keep the class independent of any helper
        # function defined outside it.
        assert self.output.shape == output_grad.shape, \
            "output_grad shape {0} does not match output shape {1}".format(
                output_grad.shape, self.output.shape)

        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)

        # The forward pass stores the input as `input_` (trailing
        # underscore), so that is the attribute to compare against.
        assert self.input_.shape == self.input_grad.shape, \
            "input_grad shape {0} does not match input shape {1}".format(
                self.input_grad.shape, self.input_.shape)

        return self.input_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Compute the gradient w.r.t. `self.param`; must be overridden."""
        raise NotImplementedError()

In [4]:
class WeightMultiply(ParamOperation):
    """
    Matrix-multiplies the input by a weight array held as `self.param`.
    """

    def __init__(self, W: ndarray):
        super().__init__(W)

    def _output(self) -> ndarray:
        """Return `np.dot(input, W)`."""
        # The hook must be named `_output` and read `self.input_`, which are
        # the names the base class's forward pass uses.
        return np.dot(self.input_, self.param)

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Return `np.dot(output_grad, W^T)`, the gradient w.r.t. the input."""
        return np.dot(output_grad, np.transpose(self.param, (1, 0)))

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Return `np.dot(input^T, output_grad)`, the gradient w.r.t. W."""
        return np.dot(np.transpose(self.input_, (1, 0)), output_grad)

In [5]:
# Scratch arrays: a 2x2 matrix and a length-2 vector.
# `b` is assigned before it is printed so the cell runs on a fresh kernel.
a = np.array([[1,2], [3,4]])
b = np.array([5,6])
print(a)
print(b)

In [6]:
class BiasAdd(ParamOperation):
    """
    Adds a bias array, held as `self.param`, to the input.
    """

    def __init__(self,
                 B: ndarray):
        # The bias must have a leading dimension of exactly 1.
        assert B.shape[0] == 1

        super().__init__(B)

    def _output(self) -> ndarray:
        """Return `input + B`."""
        # Hook name and `self.input_` match what the base class's forward
        # pass calls and stores.
        # presumably self.input_.shape[1] == self.param.shape[1] — TODO confirm
        return self.input_ + self.param

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Return the gradient w.r.t. the input: ones times `output_grad`."""
        return np.ones_like(self.input_) * output_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """
        Return the gradient w.r.t. the bias: `output_grad` summed over
        axis 0 and reshaped to a single row.
        """
        # NumPy, not torch: torch is never imported in this notebook.
        param_grad = np.ones_like(self.param) * output_grad
        return np.sum(param_grad, axis=0).reshape(1, param_grad.shape[1])

In [7]:
class Layer(object):
    """
    An ordered sequence of operations applied one after another.

    `self.operations` is expected to be filled in by `_setup_layer`, which
    runs once on the first forward pass.
    """

    def __init__(self,
                 neurons: int) -> None:
        self.neurons = neurons
        self.first = True
        self.params: List[ndarray] = []
        self.param_grads: List[ndarray] = []
        self.operations: List[Operation] = []

    def _setup_layer(self, num_in: ndarray) -> None:
        """
        Hook run once, on the first `forward` call; does nothing here.

        Note that `forward` passes the input array itself as `num_in`.
        """
        pass

    def forward(self, input_: ndarray) -> ndarray:
        """
        Pass `input_` through every operation in order and return the result.

        The input and the final output are stored on the instance for use by
        `backward`.
        """
        if self.first:
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_

        for operation in self.operations:

            input_ = operation.forward(input_)

        self.output = input_

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Pass `output_grad` backwards through the operations in reverse order.

        Returns the gradient with respect to the layer input and refreshes
        `self.param_grads`. Raises AssertionError on a shape mismatch between
        `output_grad` and the stored output, or between the resulting input
        gradient and the stored input.
        """
        # Inline shape checks keep the class independent of any helper
        # function defined outside it.
        assert self.output.shape == output_grad.shape, \
            "output_grad shape {0} does not match output shape {1}".format(
                output_grad.shape, self.output.shape)

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)

        input_grad = output_grad

        assert self.input_.shape == input_grad.shape, \
            "input_grad shape {0} does not match input shape {1}".format(
                input_grad.shape, self.input_.shape)

        # Collect the freshly computed gradients; without this call
        # `update_params` would have nothing to apply.
        self._param_grads()

        return input_grad

    def _param_grads(self) -> None:
        """Rebuild `self.param_grads` from every ParamOperation, in order."""
        self.param_grads = [operation.param_grad
                            for operation in self.operations
                            if isinstance(operation, ParamOperation)]

    def _params(self) -> None:
        """Rebuild `self.params` from every ParamOperation, in order."""
        self.params = [operation.param
                       for operation in self.operations
                       if isinstance(operation, ParamOperation)]


    def update_params(self,
                      learning_rate: float) -> None:
        """
        Subtract `learning_rate * gradient` from every parameter, in place.

        Uses the gradients collected by the most recent `backward` call.
        """
        # Refresh the parameter list so it pairs up with `self.param_grads`.
        self._params()

        for param, param_grad in zip(self.params, self.param_grads):
            # In-place update: `param` is the same array object the operation
            # holds, so the operation sees the new values.
            param -= learning_rate * param_grad

In [8]:
class Loss(object):
    """
    Base class for a loss computed from a prediction and a target.

    Subclasses implement `_output` (the loss value) and `_input_grad`
    (the gradient of the loss with respect to the prediction); both read
    `self.prediction` and `self.target`.
    """

    def __init__(self):
        pass

    def forward(self, prediction: ndarray, target: ndarray) -> float:
        """
        Store `prediction` and `target`, compute the loss and return it.

        Raises AssertionError when the two arrays differ in shape.
        """
        # Inline shape check keeps the class independent of any helper
        # function defined outside it.
        assert prediction.shape == target.shape, \
            "prediction shape {0} does not match target shape {1}".format(
                prediction.shape, target.shape)

        self.prediction = prediction
        self.target = target

        self.output = self._output()

        return self.output

    def backward(self) -> ndarray:
        """
        Return the gradient of the loss with respect to the prediction.

        Takes no arguments: it uses the arrays stored by the last `forward`
        call. Raises AssertionError when the gradient does not have the
        shape of the prediction.
        """
        self.input_grad = self._input_grad()

        assert self.prediction.shape == self.input_grad.shape, \
            "input_grad shape {0} does not match prediction shape {1}".format(
                self.input_grad.shape, self.prediction.shape)

        return self.input_grad

    def _output(self) -> float:
        """Compute the loss value; must be overridden."""
        raise NotImplementedError()

    def _input_grad(self) -> ndarray:
        """Compute the gradient w.r.t. the prediction; must be overridden."""
        raise NotImplementedError()

In [9]:
class MeanSquaredError(Loss):
    """
    Squared-error loss averaged over the first axis of the prediction.

    NOTE(review): the first axis is assumed to index observations — confirm
    against the callers.
    """

    def __init__(self) -> None:
        super().__init__()

    def _output(self) -> float:
        """Return the sum of squared errors divided by `prediction.shape[0]`."""
        # Dividing by the number of rows is what makes this a *mean* squared
        # error; without it the loss grows with the batch size.
        squared_error = np.power(self.prediction - self.target, 2)

        loss = np.sum(squared_error) / self.prediction.shape[0]

        return loss

    def _input_grad(self) -> ndarray:
        """Return the gradient of the loss w.r.t. the prediction."""
        # Same normalisation as `_output`, so the gradient matches the loss.
        return 2.0 * (self.prediction - self.target) / self.prediction.shape[0]

In [10]:
class NeuralNetwork(object):
    """
    A stack of layers trained against a loss with a fixed learning rate.
    """

    def __init__(self, layers: "List[Layer]",
                 loss: "Loss",
                 learning_rate: float = 0.01) -> None:
        self.layers = layers
        self.loss = loss
        self.learning_rate = learning_rate

    def forward(self, x_batch: ndarray) -> ndarray:
        """Pass `x_batch` through every layer in order and return the result."""
        x_out = x_batch
        for layer in self.layers:
            x_out = layer.forward(x_out)

        return x_out

    def batch_loss(self,
                   prediction: ndarray,
                   y_batch: ndarray) -> ndarray:
        """
        Return the gradient of the loss with respect to `prediction`.

        Runs the loss's forward pass first, which stores `prediction` and
        `y_batch` on the loss object. Raises AssertionError when the two
        arrays differ in shape.
        """
        assert prediction.shape == y_batch.shape, \
            "prediction shape {0} does not match target shape {1}".format(
                prediction.shape, y_batch.shape)

        # The loss exposes forward/backward only; its backward pass reads the
        # arrays stored by forward and takes no arguments.
        self.loss.forward(prediction, y_batch)

        return self.loss.backward()

    def backward(self, loss_grad: ndarray) -> None:
        """Pass `loss_grad` backwards through the layers in reverse order."""
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

        return None

    def train_batch(self,
                    x_batch: ndarray,
                    y_batch: ndarray) -> float:
        """
        Run one training step on a batch and return the loss value.

        Steps: forward pass, loss, backward pass, parameter update.
        Raises AssertionError when the two batches differ in their first
        dimension.
        """
        assert x_batch.shape[0] == y_batch.shape[0]

        predictions = self.forward(x_batch)

        loss = self.loss.forward(predictions, y_batch)

        # The loss's backward pass takes no arguments; it uses the
        # prediction and target stored by the forward call just above.
        self.backward(self.loss.backward())

        self.update_params()

        return loss

    def update_params(self) -> None:
        """Ask every layer to update its parameters with the learning rate."""
        for layer in self.layers:
            layer.update_params(self.learning_rate)