# In [25]:
import numpy as np
from itertools import combinations
import random

class LogicFun:
    """Base class for an optionally negated logic function.

    Subclasses implement ``_boolean``, ``_real`` and ``_deriv``. The public
    ``boolean``, ``real`` and ``deriv`` methods call those hooks and then
    apply the negation when ``neg`` is set.
    """

    # Number of input variables the function reads; overridden by subclasses.
    ARGS_COUNT = None

    def __init__(self, var_indexes, neg=False):
        """Remember which inputs are read and whether the result is negated.

        Args:
            var_indexes: Indexes into the input used by the subclass hooks.
            neg: When true, every public evaluation is negated.
        """
        self.neg = neg
        self.var_indexes = var_indexes

    def boolean(self, inputs):
        """Return ``_boolean(inputs)``, logically inverted if ``neg`` is set."""
        result = self._boolean(inputs)
        if self.neg:
            return not result
        return result

    def real(self, inputs):
        """Return ``_real(inputs)``, or ``1 - _real(inputs)`` if ``neg`` is set."""
        result = self._real(inputs)
        if self.neg:
            return 1 - result
        return result

    def deriv(self, inputs):
        """Return ``_deriv(inputs)``, sign-flipped if ``neg`` is set."""
        result = self._deriv(inputs)
        if self.neg:
            return -result
        return result

    def _boolean(self, inputs):
        """Hook: boolean evaluation. Must be overridden."""
        raise NotImplementedError

    def _real(self, inputs):
        """Hook: real-valued evaluation. Must be overridden."""
        raise NotImplementedError

    def _deriv(self, inputs):
        """Hook: derivative of ``_real`` with respect to ``inputs``. Must be overridden."""
        raise NotImplementedError


class TrueFun(LogicFun):
    """Constant function that ignores the input values."""

    ARGS_COUNT = 0

    def _boolean(self, inputs):
        """Return the constant ``True``."""
        return True

    def _real(self, inputs):
        """Return the constant ``1.0``."""
        return 1.0

    def _deriv(self, inputs):
        """Return zeros shaped like ``inputs`` (a constant has no slope)."""
        return np.zeros_like(inputs)


class IdentityFun(LogicFun):
    """Pass-through of the single input selected by ``var_indexes[0]``."""

    ARGS_COUNT = 1

    def _boolean(self, inputs):
        """Return the selected input unchanged."""
        index = self.var_indexes[0]
        return inputs[index]

    def _real(self, inputs):
        """Return the selected input unchanged."""
        index = self.var_indexes[0]
        return inputs[index]

    def _deriv(self, inputs):
        """Return an array shaped like ``inputs``: 1 at the selected index, 0 elsewhere."""
        gradient = np.zeros_like(inputs)
        index = self.var_indexes[0]
        gradient[index] = 1
        return gradient


class AndFun(LogicFun):
    """Conjunction of the two inputs selected by ``var_indexes``.

    The real-valued form is the product of the two inputs.
    """

    ARGS_COUNT = 2

    def _boolean(self, inputs):
        """Return ``first and second`` with the usual short-circuit."""
        first = inputs[self.var_indexes[0]]
        if not first:
            # Short-circuit: the second input is not read.
            return first
        return inputs[self.var_indexes[1]]

    def _real(self, inputs):
        """Return the product of the two selected inputs."""
        first = inputs[self.var_indexes[0]]
        second = inputs[self.var_indexes[1]]
        return first * second

    def _deriv(self, inputs):
        """Return the gradient of the product with respect to ``inputs``."""
        first_idx, second_idx = self.var_indexes[0], self.var_indexes[1]
        gradient = np.zeros_like(inputs)
        # d(a * b)/da = b and d(a * b)/db = a.
        gradient[first_idx] = inputs[second_idx]
        gradient[second_idx] = inputs[first_idx]
        return gradient


class OrFun(LogicFun):
    """Disjunction of the two inputs selected by ``var_indexes``.

    The real-valued form is ``a + b - a * b``.
    """

    ARGS_COUNT = 2

    def _boolean(self, inputs):
        """Return ``first or second`` with the usual short-circuit."""
        first = inputs[self.var_indexes[0]]
        if first:
            # Short-circuit: the second input is not read.
            return first
        return inputs[self.var_indexes[1]]

    def _real(self, inputs):
        """Return ``a + b - a * b`` for the two selected inputs."""
        first = inputs[self.var_indexes[0]]
        second = inputs[self.var_indexes[1]]
        return first + second - first * second

    def _deriv(self, inputs):
        """Return the gradient of ``a + b - a * b`` with respect to ``inputs``."""
        first_idx, second_idx = self.var_indexes[0], self.var_indexes[1]
        gradient = np.zeros_like(inputs)
        # d/da = 1 - b and d/db = 1 - a.
        gradient[first_idx] = 1 - inputs[second_idx]
        gradient[second_idx] = 1 - inputs[first_idx]
        return gradient
    

class Network:
    """Sequential stack of ``Layer`` objects trained one sample at a time.

    The output of each layer is fed to the next one. Training propagates a
    sample forward in real-valued mode, takes the derivative of the squared
    error and hands it to the layers in reverse order.
    """

    def __init__(self, input_size, layers_sizes, connections_rate):
        """Create one layer per entry of ``layers_sizes``.

        Args:
            input_size: Number of values in a single input sample.
            layers_sizes: Gate count of each consecutive layer.
            connections_rate: Passed unchanged to every ``Layer``.
        """
        self.input_size = input_size
        self.layers = self.__create_layers(input_size, layers_sizes, connections_rate)

    def predict_boolean(self, inputs):
        """Threshold ``inputs`` at 0.5 and run each sample in boolean mode.

        Returns:
            Array with one row of layer outputs per input sample.
        """
        samples = np.asarray(inputs) > 0.5
        return np.array([self.__propagate_boolean(sample) for sample in samples])

    def predict_real(self, inputs):
        """Cast ``inputs`` to float64 and run each sample in real-valued mode.

        Returns:
            Array with one row of layer outputs per input sample.
        """
        samples = np.asarray(inputs).astype(np.float64)
        return np.array([self.__propagate_real(sample) for sample in samples])

    def fit(self, inputs, outputs, epochs=1, learning_rate=0.01):
        """Train on ``(inputs, outputs)`` for ``epochs`` shuffled passes.

        Args:
            inputs: One row per training sample.
            outputs: One target row per training sample.
            epochs: Number of passes over the data.
            learning_rate: Step size forwarded to every layer.

        Raises:
            ValueError: If ``inputs`` and ``outputs`` differ in sample count.
        """
        inputs = np.asarray(inputs).astype(np.float64)
        outputs = np.asarray(outputs).astype(np.float64)
        # Without this check surplus targets were silently ignored and missing
        # ones surfaced as an IndexError inside the shuffle.
        if len(inputs) != len(outputs):
            raise ValueError(
                f"inputs and outputs must have the same number of samples, "
                f"got {len(inputs)} and {len(outputs)}"
            )
        for _ in range(epochs):
            self.__learn_epoch(inputs, outputs, learning_rate)

    def __propagate_boolean(self, sample):
        """Run one sample through all layers in boolean mode."""
        signal = sample
        for layer in self.layers:
            signal = layer.propagate_boolean(signal)
        return signal

    def __propagate_real(self, sample):
        """Run one sample through all layers in real-valued mode."""
        signal = sample
        for layer in self.layers:
            signal = layer.propagate_real(signal)
        return signal

    def __learn_epoch(self, inputs, outputs, learning_rate):
        """Visit every sample once, in a freshly shuffled order."""
        shuffled_inputs, shuffled_outputs = self.__shuffle(inputs, outputs)
        for sample, target in zip(shuffled_inputs, shuffled_outputs):
            self.__learn_single(sample, target, learning_rate)

    def __learn_single(self, sample, target, learning_rate):
        """Forward one sample, then backpropagate its loss derivative."""
        prediction = self.__propagate_real(sample)
        gradients = self.__get_loss_deriv(prediction, target)
        self.__backpropagate(gradients, learning_rate)

    def __backpropagate(self, gradients, learning_rate):
        """Pass the gradient through the layers from last to first."""
        for layer in reversed(self.layers):
            gradients = layer.backpropagate(gradients, learning_rate)

    @staticmethod
    def __shuffle(inputs, outputs):
        """Return both arrays reordered by the same random permutation."""
        indices = np.arange(len(inputs))
        np.random.shuffle(indices)
        return inputs[indices], outputs[indices]

    @staticmethod
    def __get_loss_deriv(prediction, target):
        """Return the derivative of ``(prediction - target) ** 2``."""
        return 2 * (prediction - target)

    @staticmethod
    def __create_layers(input_size, layers_sizes, connections_rate):
        """Build the layers; each one takes the previous layer's size as input size."""
        sizes = [input_size, *layers_sizes]
        return [
            Layer(gates_count, fan_in, connections_rate)
            for fan_in, gates_count in zip(sizes, sizes[1:])
        ]


class Layer:
    """A group of ``Gate`` objects that all read the same input vector."""

    def __init__(self, size, input_size, connections_rate):
        """Create ``size`` gates, each built from ``input_size`` and ``connections_rate``."""
        self.size = size
        self.input_size = input_size
        self.connections_rate = connections_rate
        self.gates = [Gate(input_size, connections_rate) for _ in range(size)]

    def propagate_boolean(self, input):
        """Return an array holding every gate's boolean output for ``input``."""
        gate_outputs = [gate.propagate_boolean(input) for gate in self.gates]
        return np.array(gate_outputs)

    def propagate_real(self, input):
        """Return an array holding every gate's real-valued output for ``input``."""
        gate_outputs = [gate.propagate_real(input) for gate in self.gates]
        return np.array(gate_outputs)

    def backpropagate(self, gradient, learning_rate):
        """Backpropagate through every gate and sum the resulting input gradients.

        Args:
            gradient: One gradient value per gate, in gate order.
            learning_rate: Forwarded to each gate.

        Returns:
            Array of length ``input_size``: the per-gate input gradients
            summed over the gates.
        """
        per_gate = np.zeros((self.size, self.input_size), dtype=np.float64)
        # Each ``row`` is a view into ``per_gate``, so the slice assignment
        # fills the matrix in place.
        for row, gate, gate_gradient in zip(per_gate, self.gates, gradient):
            row[:] = gate.backpropagate(gate_gradient, learning_rate)
        return np.sum(per_gate, axis=0)


class Gate:
    """A learnable choice among candidate logic functions.

    In real-valued mode the output is the softmax-weighted sum of all
    candidate functions; in boolean mode only the candidate with the largest
    weight is evaluated.
    """

    # Function classes from which the candidate set of every gate is built.
    FUNCTIONS = [TrueFun, IdentityFun, AndFun, OrFun]

    def __init__(self, input_size, connections_rate):
        """Sample the candidate functions and draw random initial weights.

        Args:
            input_size: Length of the input vector the gate reads from.
            connections_rate: Upper bound on the number of variable
                combinations sampled per function class.
        """
        self.input_size = input_size
        self.connections_rate = connections_rate
        self.funs = self.__create_all_functions_variants()
        self.weights = np.random.randn(len(self.funs))
        # Values cached by the latest propagate_real() for backpropagate().
        self.__input = None
        self.__funs_results = None

    # NOTE: the parameter is named ``input`` (shadowing the builtin) to keep
    # the existing call signature unchanged.
    def propagate_boolean(self, input):
        """Evaluate, in boolean mode, the candidate with the largest weight."""
        best_fun = self.funs[np.argmax(self.weights)]
        return best_fun.boolean(input)

    def propagate_real(self, input):
        """Return the softmax-weighted sum of all candidates' real outputs.

        The input and the per-candidate results are cached for the next
        ``backpropagate`` call.
        """
        funs_results = np.array([fun.real(input) for fun in self.funs])
        soft_weights = self.__softmax(self.weights)
        self.__input = input
        self.__funs_results = funs_results
        return soft_weights @ funs_results

    def backpropagate(self, gradient, learning_rate):
        """Update the weights and return the gradient with respect to the input.

        Args:
            gradient: Loss derivative with respect to this gate's output.
            learning_rate: Step size of the weight update.

        Returns:
            Array of length ``input_size``, computed with the weights as
            they were before this call's update.

        Raises:
            RuntimeError: If ``propagate_real`` has not been called yet.
        """
        if self.__input is None or self.__funs_results is None:
            # Previously this failed deep inside ``fun.deriv(None)`` with an
            # unrelated-looking error.
            raise RuntimeError("backpropagate() called before propagate_real()")
        next_gradient = self.__get_next_gradient(gradient)
        self.__update_weights(gradient, learning_rate)
        return next_gradient

    def __get_next_gradient(self, gradient):
        """Return ``gradient`` times the softmax-weighted sum of candidate derivatives."""
        # One column per candidate, one row per input position.
        derivs = np.column_stack([fun.deriv(self.__input) for fun in self.funs])
        derivs = derivs * self.__softmax(self.weights)
        derivs = np.sum(derivs, axis=1)
        return gradient * derivs

    def __update_weights(self, gradient, learning_rate):
        """Move each raw weight against ``gradient * candidate_result``.

        NOTE(review): this is the derivative with respect to the softmax-ed
        mixing weights applied directly to the raw weights; the softmax
        Jacobian is not applied. Looks intentional (it still decreases the
        loss to first order) - confirm before changing.
        """
        self.weights -= gradient * self.__funs_results * learning_rate

    @staticmethod
    def __softmax(x):
        """Return the softmax of ``x``, shifted by ``max(x)`` for stability."""
        # After the shift the largest exponent is exp(0) == 1, so the sum is
        # at least 1 and no zero-division guard is needed.
        e = np.exp(x - np.max(x))
        return e / np.sum(e)

    def __create_all_functions_variants(self):
        """Return the candidates of every class in ``FUNCTIONS``, concatenated."""
        all_variants = []
        for func_cls in self.FUNCTIONS:
            all_variants.extend(self.__create_function_variants(func_cls))
        return all_variants

    def __create_function_variants(self, func_cls):
        """Sample variable combinations for ``func_cls`` and instantiate them.

        At most ``connections_rate`` combinations of ``ARGS_COUNT`` input
        indexes are drawn without replacement; each yields a negated and a
        plain instance, in that order.
        """
        vars_combinations = list(combinations(range(self.input_size), func_cls.ARGS_COUNT))
        sample_size = min(self.connections_rate, len(vars_combinations))
        vars_combinations = random.sample(vars_combinations, k=sample_size)
        return [
            func_cls(vars_combination, neg)
            for vars_combination in vars_combinations
            for neg in [True, False]
        ]
    

def get_accuracy(prediction, target):
    """Return the fraction of samples whose thresholded outputs all match.

    Both arguments are thresholded at 0.5. A sample counts as correct only
    when every one of its output values agrees with the target.

    Args:
        prediction: Array-like with one entry (scalar or row) per sample.
        target: Array-like of expected values, one entry per sample.

    Returns:
        ``correct / total``, or ``0`` when ``target`` has no samples.
    """
    prediction = np.asarray(prediction) > 0.5
    target = np.asarray(target) > 0.5
    total_count = len(target)
    if total_count == 0:
        return 0
    # Give both arrays an explicit (samples, outputs) layout. This makes 1-D
    # label vectors work and stops an (n, 1) prediction from broadcasting
    # against an (n,) target into an (n, n) comparison.
    prediction = prediction.reshape(len(prediction), -1)
    target = target.reshape(total_count, -1)
    correct_count = np.sum(np.all(prediction == target, axis=1))
    return correct_count / total_count

# In [5]:
import numpy as np
from tensorflow.keras.datasets.mnist import load_data as load_data_MNIST
from einops import rearrange

# Load MNIST as (images, labels) pairs for the train and test splits.
(x_train, y_train), (x_test, y_test) = load_data_MNIST()

# Flatten every image: (batch, height, width) -> (batch, height * width).
x_train, x_test = (
    rearrange(images, 'b h w -> b (h w)') for images in (x_train, x_test)
)

# Binary target column: is the digit a 5?
outputs = (y_test == 5).reshape(-1, 1)
# Binary features: pixel intensity above 100.
# NOTE(review): both features and targets come from the *test* split, and
# x_train / y_train are unused in the visible code - confirm this is intended.
features = x_test > 100

# In [7]:
# 784 inputs followed by layers of 40, 50, 50, 40 and 1 gates.
network = Network(
    input_size=784,
    layers_sizes=[40, 50, 50, 40, 1],
    connections_rate=5,
)
# A single pass over the data.
network.fit(inputs=features, outputs=outputs, epochs=1, learning_rate=0.1)

# In [None]:
from sklearn.metrics import accuracy_score
# Real-valued predictions for the same samples the network was fitted on.
predictions = network.predict_real(inputs=features)

# In [26]:
# Share of samples whose thresholded prediction matches the target.
# NOTE(review): the positive class is a single digit, so a constant
# "not 5" predictor may already score high - compare against that baseline.
accuracy = get_accuracy(prediction=predictions, target=outputs)
print(accuracy)

# Output: 0.9108
