In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from __future__ import annotations
%matplotlib widget

from typing import TypeAlias
Vector: TypeAlias = np.ndarray  # A 1-D array
Matrix: TypeAlias = np.ndarray  # A 2-D array

In [2]:
class LayerInitializationStrategy:
    """Parent class for initialization strategies of weights and biases in Layer."""
    def run(self, size_of_previous_layer: int, size_of_current_layer: int) -> tuple[np.ndarray, np.ndarray]:
        raise NotImplementedError("The 'run' method is implemented only in child classes")

class RandomUniform(LayerInitializationStrategy):
    """
    Initialization strategy sampling weights and biases uniformly in a given interval.

    Attributes
    ----------
    scale: float
        The half-lenght of the interval [-scale, scale] from which values are sampled.

    Methods
    -------
    run(self, size_of_previous_layer: int, size_of_current_layer: int) -> tuple[np.ndarray, np.ndarray]
        Returns the tuple (random_weights, random_biases), where random_weights and random_biases are np.ndarrays of the appropriate shape.
    """
    def __init__(self, scale_of_interval: float, center_of_interval: float = 0):
        self.scale: float = scale_of_interval
        self.center: float = center_of_interval
        
    
    def run(self, size_of_previous_layer: int, size_of_current_layer: int) -> tuple[np.ndarray, np.ndarray]:
        random_weights: np.ndarray = np.random.uniform(
            -self.scale + self.center, self.scale + self.center,
            (size_of_previous_layer, size_of_current_layer)
            )
        random_biases: np.ndarray = np.random.uniform(
            -self.scale + self.center, self.scale + self.center,
            size_of_current_layer
            )
        return random_weights, random_biases

In [3]:
class ActivationFunction:
    """Parent class for activation functions of neural nodes."""
    def __call__(self, x: np.ndarray) -> np.ndarray:
        raise NotImplementedError("The '__call__' method must be implemented in child classes")

    def derivative(self, x: np.ndarray) -> np.ndarray:
        raise NotImplementedError("The 'derivative' method must be implemented in child classes")

class Sigmoid(ActivationFunction):
    """Sigmoid activation function. f(x) = 1 / (1 + np.exp(-x))"""
    def __call__(self, x: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-x))

    def derivative(self, x: np.ndarray) -> np.ndarray:
        sigmoid = self(x)  # Reuse the __call__ method to compute sigmoid
        return sigmoid * (1 - sigmoid)

In [4]:
class Layer:
    """
    A Layer component of a NeuralNetwork.

    Attributes
    ----------
    unit_number: int
        The number of nodes/units in the Layer.
    init_strat: LayerInitializationStrategy
        The initialization strategy for the weights and biases of the Layer
    activation_function: ActivationFunction
        The function.
    
    I should continue writing docstrings later
    """

    def __init__(self, number_of_units: int,
                 initialization_strategy: LayerInitializationStrategy,
                 activation_function: ActivationFunction):
        self.unit_number: int = number_of_units  # The number of nodes/units in the Layer.
        self.init_strat: LayerInitializationStrategy = initialization_strategy
        self.activation_function: ActivationFunction = activation_function
        self.linear_output: Vector = None
        self.output: Vector = None  # The values computed by the units based on the outputs of the previous layer. Stored for later backprop.
        self.previous_layer: Layer = None  # The layer preceding the current one in the Neural Network. The NN should connect layers during initialization.
        self.next_layer: Layer = None
        self.weights: Matrix = None; self.biases: Vector = None  # Weights and biases connecting the layer with the previous layer of the neural network.
        self.delta: Vector = None
        

    def initialize_weights(self):
        self.weights, self.biases = self.init_strat.run(self.previous_layer.unit_number, self.unit_number)

    def compute_output(self):
        self.linear_output = np.dot(self.previous_layer.output, self.weights) + self.biases
        self.output: Vector = self.activation_function(self.linear_output)
        return self.output
    
    
    # def compute_gradient(self)  # Should this function be here or in a later TrainingAlgorithm class?

class InputLayer(Layer):
    def __init__(self, number_of_units: int):
        super().__init__(number_of_units, None, None)
        # An input layer has no previous layer to connect to, so attributes referring to a previous layer are deleted.
        del self.previous_layer, self.weights, self.biases, self.init_strat, self.activation_function
    
    def feed_input(self, value: Vector) -> None:
        self.output: Vector = value
    
    def initialize_weights(self):
        raise NotImplementedError("InputLayer does not require weight initialization.")

    def compute_output(self):
        return self.output

class HiddenLayer(Layer):
    def backward(self):
        self.delta = np.dot(self.next_layer.weights, self.next_layer.delta) * self.activation_function.derivative(self.linear_output)

class OutputLayer(Layer):
    def __init__(self, number_of_units: int,
                 initialization_strategy: LayerInitializationStrategy,
                 activation_function: ActivationFunction):
        super().__init__(number_of_units, initialization_strategy, activation_function)
        del self.next_layer

In [5]:
class ListOfArrays:
    """
    An utility class for defining element-wise operations on lists containing heteromorphic np.ndarrays.
    Useful for storing network weights and biases in MLP-architecture NeuralNetworks.
    """
    def __init__(self, arrays: list[np.ndarray]):
        self.arrays: list[np.ndarray] = arrays
    
    def __repr__(self):
        return f"ListOfArrays{(self.arrays)}"

    def __getitem__(self, index):
        return self.arrays[index]

    def __setitem__(self, index, value):
        self.arrays[index] = value
    
    def __add__(self, other):
        if not isinstance(other, ListOfArrays): raise TypeError("Operand is not a ListOfArrays")
        return ListOfArrays([x + y for x, y in zip(self.arrays, other.arrays)])
    
    def __sub__(self, other):
        if not isinstance(other, ListOfArrays): raise TypeError("Operand is not a ListOfArrays")
        return ListOfArrays([x - y for x, y in zip(self.arrays, other.arrays)])
    
    def __mul__(self, scalar: float):
        return ListOfArrays([x * scalar for x in self.arrays])
    
    def __rmul__(self, scalar: float):
        return self.__mul__(scalar)

    def __truediv__(self, scalar: float):
        return ListOfArrays([x / scalar for x in self.arrays])
    
    def __pow__(self, power: float):
        return ListOfArrays([x**power for x in self.arrays])
    
    def sum(self) -> float:
        return np.sum([np.sum(array) for array in self.arrays])
    
    def set_all_values_to(self, value: float) -> None:
        for a in self.arrays:
            a = value*np.ones(a.shape)


class ListOfVectors(ListOfArrays):
    def __init__(self, arrays: list[Vector]):
        super().__init__(arrays)

class ListOfMatrices(ListOfArrays):
    pass
    def __init__(self, arrays: list[Matrix]):
        super().__init__(arrays)

In [6]:
class NeuralNetwork:
    def __init__(self, layers: list[Layer]):
        self.layers: list[Layer] = layers
        # Maybe here I should ensure that layers are correctly typed (layers[0] should be an InputLayer, layers[-1] an OutputLayer, all other layers should be HiddenLayer).
        self.input_layer: InputLayer = layers[0]; self.hidden_layers: list[HiddenLayer] = layers[1: -1]; self.output_layer: OutputLayer = layers[-1]
        self.layers_with_weights: list[Layer] = self.layers[1: ]
        self.connect_layers()
        self.initialize_weights()

        #self.weights = ListOfMatrices([l.weights for l in self.layers_with_weights])
        #self.biases = ListOfVectors([l.biases for l in self.layers_with_weights])
    
    def connect_layers(self) -> None:
        for (i, layer) in enumerate(self.layers):
            if not isinstance(layer, InputLayer): layer.previous_layer = self.layers[i - 1]
            if not isinstance(layer, OutputLayer): layer.next_layer = self.layers[i + 1]

    def initialize_weights(self) -> None:
        for layer in self.layers_with_weights: layer.initialize_weights()
    
    def feed_input(self, value: np.ndarray) -> None:
        self.input_layer.feed_input(value)

    def activate_network(self) -> np.ndarray:
        for i in range(len(self.layers)): self.layers[i].compute_output()
        return self.output_layer.output
    
    def compute_output(self, value: np.ndarray) -> np.ndarray:
        self.feed_input(value)
        return self.activate_network()
    
    def backward(self) -> None:
        for l in reversed(self.hidden_layers):
            l.backward()
    
    def compute_multiple_outputs(self, x_data: pd.DataFrame | np.ndarray) -> np.ndarray[np.ndarray]:
        if isinstance(x_data, pd.DataFrame): x_data = x_data.to_numpy()
        outputs = np.array(
            [
                self.compute_output(x_data[i]) for i in range(len(x_data))
            ]
        )
        return outputs

In [7]:
class RegularizationTerm:
    def set_network(self, network: NeuralNetwork) -> None:
        self.network = network

    def __call__(self) -> float:
        pass

    def gradient(self) -> tuple[ListOfArrays, ListOfArrays]:
        pass

class NoRegularization(RegularizationTerm):
    def __init__(self):
        pass

    def __call__(self) -> float:
        return 0
    
    def gradient(self) -> tuple[ListOfArrays, ListOfArrays]:
        layers = self.network.layers_with_weights
        return ListOfArrays([np.zeros_like(l.weights) for l in layers]), ListOfArrays([np.zeros_like(l.biases) for l in layers])

class Tikhonov(RegularizationTerm):
    def __init__(self, penalty: float):
        self.penalty: float = penalty
        self.network: NeuralNetwork = None

    def __call__(self) -> float:
        layers: list[Layer] = self.network.layers_with_weights
        weights_term = np.sum([np.sum(layer.weigths**2) for layer in layers])  # The sum of squares of all the weights in the NN.
        biases_term = np.sum([np.sum(layer.biases**2) for layer in layers])
        return self.penalty * (weights_term + biases_term) / 2

    def gradient(self) -> tuple[ListOfArrays, ListOfArrays]:
        layers: list[Layer] = self.network.layers_with_weights
        gradient_on_weights: ListOfArrays = ListOfArrays([-self.penalty * l.weights for l in layers])
        gradient_on_biases: ListOfArrays = ListOfArrays([-self.penalty * l.biases for l in layers])
        return gradient_on_weights, gradient_on_biases

In [8]:
class MomentumRule:
    pass

In [9]:
#class TrainingAlgorithm: pass

class StoppingCondition:
    def __init__(self):
        self.alg: TrainingAlgorithm = None
    
    def set_alg(self, alg: TrainingAlgorithm) -> None:
        self.alg = alg

    @property
    def is_satisfied(self) -> bool:
        pass

class ThresholdOnTrainingError(StoppingCondition):
    def __init__(self, threshold: float, patience: int):
        super().__init__()
        self.threshold: float = threshold
        self.patience: int = patience
    
    @property
    def is_satisfied(self) -> bool:
        current_training_error: float = self.alg.current_tr_err
        if current_training_error < self.threshold:
            self.consecutive_epochs += 1
            return self.consecutive_epochs > self.patience
        else:
            self.consecutive_epochs = 0
            return False

In [10]:
class ErrorFunction:
    def __call__(self, y_data: np.ndarray, y_predicted: np.ndarray) -> float:
        pass
    
    def simple_gradient(self, y_data: np.ndarray, y_predicted: np.ndarray) -> np.ndarray:
        pass

class MSE(ErrorFunction):
    def __call__(self, y_data: np.ndarray, y_predicted: np.ndarray) -> float:
        """
        Returns the average over the dataset of the square euclidean distance between the training outputs and the predictions.
        """
        num_patterns = 1 if y_data.ndim == 1 else len(y_data)
        return 0.5 * np.sum((y_data - y_predicted)**2) / num_patterns
    
    def simple_gradient(self, y_data: np.ndarray, y_predicted: np.ndarray) -> np.ndarray:
        """
        Returns y_data - y_predicted. It is meant to be used on a single pattern at a time, during backpropagation.
        """
        return (y_data - y_predicted)

In [11]:
class Dataset:
    def __init__(self, x_data: Matrix, y_data: Matrix):
        """
        Initializes the dataset with input (x_data) and output (y_data).
        """
        if x_data.ndim == 1 or y_data.ndim == 1:
            raise ValueError(f"x_data and y_data should be matrices, where each row represents a pattern and each column a feature, but got arguments of shape {x_data.shape} and {y_data.shape}")

        self.x: Matrix = x_data
        self.y: Matrix = y_data
    
    def __len__(self):
        """
        Returns the number of patterns in the dataset
        """
        return len(self.x)
    
    def __getitem__(self, index):
        """
        Retrieves the input-output pair at the specified index.
        """
        return self.x[index], self.y[index]  


class DataManager:
    def __init__(self, dataset: Dataset, minibatch_size: int = None, shuffle: bool = False):
        self.dataset: Dataset = dataset
        self.minibatch_size: int = minibatch_size or len(dataset)
        self.shuffle: bool = shuffle
    
    def __iter__(self):
        """
        An iterator yielding minibatches
        """
        indices = np.arange(len(self.dataset))
        
        if self.shuffle:
            np.random.shuffle(indices)
        
        for start in range(0, len(indices), self.minibatch_size):
            minibatch_indices = indices[start:start + self.minibatch_size]
            minibatch_x = self.dataset.x[minibatch_indices, :]
            minibatch_y = self.dataset.y[minibatch_indices, :]
            yield minibatch_x, minibatch_y

In [12]:
class TrainingAlgorithm:
    def __init__(self, x_train: pd.DataFrame, y_train: pd.DataFrame, network: NeuralNetwork):
        self.network: NeuralNetwork = network

        if isinstance(x_train, pd.DataFrame): x_train = x_train.to_numpy()
        if isinstance(y_train, pd.DataFrame): y_train = y_train.to_numpy()
        self.training_set = Dataset(x_train, y_train)

        self.current_tr_err: float = float('inf')

        self.history: dict[list] = {'training error': []}
        self.cache: dict = {}

class Backprop(TrainingAlgorithm):
    def __init__(self, x_train: pd.DataFrame, y_train: pd.DataFrame, network: NeuralNetwork,
                 learning_rate: float,
                 error_function: ErrorFunction,
                 stopping_condition: StoppingCondition,
                 regularization_term: RegularizationTerm = None,
                 minibatch_size: int = None
                 ):
        super().__init__(x_train, y_train, network)

        self.learning_rate: float = learning_rate

        self.err_fun: ErrorFunction = error_function
        
        self.stop_cond: StoppingCondition = stopping_condition
        self.stop_cond.set_alg(self)

        self.weights_gradient: ListOfMatrices = ListOfMatrices([np.zeros_like(l.weights) for l in self.network.layers_with_weights])
        self.biases_gradient: ListOfVectors = ListOfVectors([np.zeros_like(l.biases) for l in self.network.layers_with_weights])

        self.regularization_term: RegularizationTerm = regularization_term or NoRegularization()
        self.regularization_term.set_network(self.network)

        self.minibatch_size: int = minibatch_size or len(self.training_set)
        self.minibatch_generator: DataManager = DataManager(self.training_set, self.minibatch_size,
                                                            shuffle = (self.minibatch_size != len(self.training_set))
                                                            )
        self.current_mb_size: int = None
        self.current_mb_x: Matrix = None; self.current_mb_y: Matrix = None
        
    
    def run(self, max_epochs: int) -> None:
        epoch: int = 0
        while epoch < max_epochs:
            epoch += 1

            for minibatch_x, minibatch_y in self.minibatch_generator:
                self.update_minibatch_metadata(minibatch_x, minibatch_y)
                self.update_gradients()
                self.update_network_parameters()
            
            self.compute_training_error()
            if self.stop_cond.is_satisfied: break

    def update_minibatch_metadata(self, minibatch_x: Matrix, minibatch_y: Matrix):
        self.current_mb_x = minibatch_x; self.current_mb_y = minibatch_y
        self.current_mb_size = len(minibatch_x)
    
    def update_gradients(self):
        self.reset_gradients()

        for x, y in zip(self.current_mb_x, self.current_mb_y):
            predicted_y = self.network.compute_output(x)
            out_l = self.network.output_layer
            out_l.delta = self.err_fun.simple_gradient(y, predicted_y)*out_l.activation_function.derivative(out_l.linear_output)
            self.network.backward()
            self.weights_gradient += ListOfMatrices([np.outer(l.previous_layer.output, l.delta) for l in self.network.layers_with_weights])
            self.biases_gradient += ListOfVectors([l.delta for l in self.network.layers_with_weights])
        self.weights_gradient /= self.current_mb_size; self.biases_gradient /= self.current_mb_size

        self.add_regul_contribution()
        self.add_momentum_contribution()
    
    def reset_gradients(self) -> None:
        self.weights_gradient.set_all_values_to(0)
        self.biases_gradient.set_all_values_to(0)
    
    def add_regul_contribution(self) -> None:
        contribution_to_w, contribution_to_b = self.regularization_term.gradient()
        self.weights_gradient += contribution_to_w; self.biases_gradient += contribution_to_b

    def add_momentum_contribution(self) -> None:
        pass
    
    def update_network_parameters(self) -> None:
        factor = self.learning_rate * self.current_mb_size / len(self.training_set)

        for i, l in enumerate(self.network.layers_with_weights):
            l.weights += factor * self.weights_gradient[i]
            l.biases += factor * self.biases_gradient[i]
        #self.network.weights += factor * self.weights_gradient
        #self.network.biases += factor * self.biases_gradient
        

    def compute_training_error(self) -> None:
        y_prediction = self.network.compute_multiple_outputs(self.current_mb_x)
        self.current_tr_err = self.err_fun(self.current_mb_y, y_prediction)
        self.history['training error'] += [self.current_tr_err]



In [18]:
# Some fake training data
x = np.array(
    [[1, 0],
     [1, 0],
     [1, 0]], dtype = float
)
y = np.array(
    [[1, 0],
     [1, 0],
     [1, 0]], dtype = float
)


# Define and initialize the nn manually
layers = [InputLayer(2), HiddenLayer(3, RandomUniform(0.0), Sigmoid()), OutputLayer(2, RandomUniform(0.0), Sigmoid())]
nn = NeuralNetwork(layers)

# Initialize layers
nn.hidden_layers[0].weights = np.array(
    [[1, 0.5, 2],
    [-1, 0, -1.5]], dtype = float
)
nn.hidden_layers[0].biases = np.array([1, 2, 3], dtype=float)

nn.output_layer.weights = np.array(
    [[1, 3],
     [2, 3],
     [-1, 0]], dtype = float
)
nn.output_layer.biases = np.array([0,2], dtype=float)

# Test forward pass
nn.compute_output(x[0])

print(nn.input_layer.output)
print(nn.hidden_layers[0].linear_output)
print(nn.hidden_layers[0].output)
print(nn.output_layer.linear_output)
print(nn.output_layer.output)


# Test packward pass
nn.output_layer.delta = (y[0] - nn.compute_output(x[0])) * nn.output_layer.activation_function.derivative(nn.output_layer.linear_output)
nn.backward()
print(nn.output_layer.delta)
print(nn.hidden_layers[0].delta)

[1. 0.]
[2.  2.5 5. ]
[0.88079708 0.92414182 0.99330715]
[1.73577357 7.41481669]
[0.85014943 0.9993981 ]
[ 0.01909027 -0.00060118]
[ 0.001815    0.00255016 -0.00012691]


In [19]:
training_alg = Backprop(x, y, nn, 200.0, MSE(), ThresholdOnTrainingError(0.00001, 10), regularization_term = Tikhonov(0.000), minibatch_size = None)

In [20]:
old_weights = ListOfArrays([np.copy(nn.hidden_layers[0].weights), np.copy(nn.output_layer.weights)])
old_biases = ListOfArrays([np.copy(nn.hidden_layers[0].biases), np.copy(nn.output_layer.biases)])

# Test one epoch update
training_alg.run(1)
new_weights = ListOfArrays([np.copy(nn.hidden_layers[0].weights), np.copy(nn.output_layer.weights)])
new_biases = ListOfArrays([np.copy(nn.hidden_layers[0].biases), np.copy(nn.output_layer.biases)])



print(new_weights - old_weights)


print(
    training_alg.learning_rate * np.outer(nn.input_layer.output, nn.hidden_layers[0].delta)
)
print(
    training_alg.learning_rate * np.outer(nn.hidden_layers[0].output, nn.output_layer.delta)
)

ListOfArrays[array([[ 0.36299938,  0.51003272, -0.02538264],
       [ 0.        ,  0.        ,  0.        ]]), array([[ 3.3629306 , -0.1059028 ],
       [ 3.52842316, -0.11111436],
       [ 3.79250011, -0.11943047]])]
[[ 0.36299938  0.51003272 -0.02538264]
 [ 0.          0.         -0.        ]]
[[ 3.58340937 -0.11284595]
 [ 3.70829744 -0.11677882]
 [ 3.79117874 -0.11938886]]
