In [None]:
import numpy as np
from functools import wraps
from enum import Enum
from tqdm import tqdm

In [None]:
def conditional_generator(func):
    """Let a generator method act either as a generator or as a plain call.

    The decorated method must be a generator function defined on an object
    exposing a boolean ``yield_results`` attribute.

    * ``self.yield_results`` truthy: the generator object is returned untouched,
      so the caller can iterate over every yielded item.
    * otherwise: the generator is run to completion and only the last yielded
      item is returned.  If nothing was yielded, ``None`` is returned (the
      previous implementation raised ``UnboundLocalError`` in that case).
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        results = func(self, *args, **kwargs)
        if self.yield_results:
            return results
        # Exhaust the generator, remembering only the most recent item.
        last = None
        for last in results:
            pass
        return last
    return wrapper

In [None]:
class Activation(Enum):
    """Activation functions paired with their derivatives.

    Every member's value is a ``(function, derivative)`` tuple of callables
    that operate on NumPy arrays.  The derivative callables are expressed in
    terms of the activation's *output* (e.g. ``x * (1 - x)`` for the sigmoid),
    so they must be fed already-activated values.
    """

    RELU = (lambda x: (x > 0) * x,
            lambda x: x > 0)

    SIGMOID = (lambda x: 1 / (1 + np.exp(-x)),
               lambda x: x * (1 - x))

    TANH = (lambda x: np.tanh(x),
            lambda x: 1 - (x ** 2))

    # Softmax is taken over the last axis so that it works both for a batch
    # (2-D, one row per sample) and for a single 1-D output vector.  The row
    # maximum is subtracted before exponentiating: the result is mathematically
    # unchanged but np.exp can no longer overflow to inf (which produced nan).
    # The "derivative" raises as soon as it is called, because softmax is only
    # supported as an output activation.
    SOFTMAX = (lambda x: (ex := np.exp(x - np.max(x, axis=-1, keepdims=True))) / np.sum(ex, axis=-1, keepdims=True),
               lambda x: (_ for _ in ()).throw(Exception("activation function softmax is only for outer layer")))

In [None]:
class BasicNeuralNetwork:
    """Dense network with a single hidden layer trained by mini-batch gradient descent.

    Weights live in two matrices created by ``fit``:
    ``coeffs_in_to_hid`` (n_inputs x hid_size) and
    ``coeffs_hid_to_out`` (hid_size x n_outputs).

    Parameters
    ----------
    hid_size : number of hidden units.
    epochs : number of passes over the training data.
    alpha : factor applied to every weight update (learning rate).
    batch_size : number of rows per mini-batch.
    skip_remaining : if True, a trailing batch shorter than ``batch_size`` is skipped.
    hid_activation, out_activation : ``Activation`` member or None (no activation).
    dropout : if True, a random 0/1 mask is applied to the hidden layer while
        training and the surviving activations are doubled.
    as_probs : if True and ``out_activation`` is not ``Activation.SOFTMAX``,
        returned predictions are passed through softmax.  Errors are always
        computed on the raw network output.
    yield_results : if True, ``fit`` and ``evaluate`` return generators that
        yield once per epoch; otherwise they return the last epoch's result.
    silent : if True, the tqdm progress bar is disabled.
    random_seed : seed handed to ``np.random.default_rng``.
    """

    def __init__(self, hid_size, epochs=3, alpha=1e-3, batch_size=32, skip_remaining=True,
                 hid_activation=None, out_activation=None, dropout=False, as_probs=False,
                 yield_results=False, silent=True, random_seed=None):
        self.hid_size = hid_size
        self.epochs = epochs
        self.alpha = alpha
        self.batch_size = batch_size
        self.skip_remaining = skip_remaining
        self.hid_activation = hid_activation
        self.out_activation = out_activation
        self.dropout = dropout
        self.as_probs = as_probs
        self.yield_results = yield_results
        self.silent = silent
        self.random_seed = random_seed

        # Unpack the (function, derivative) pairs once; the output layer's
        # derivative is never used.
        if self.hid_activation is not None:
            self.__hid_activation_fun, self.__hid_activation_deriv = self.hid_activation.value
        if self.out_activation is not None:
            self.__out_activation_fun, _ = self.out_activation.value
        if self.as_probs and self.out_activation is not Activation.SOFTMAX:
            self.__to_probs, _ = Activation.SOFTMAX.value

        self.__rng = np.random.default_rng(seed=self.random_seed)

    def __forward(self, samples, dropout_mask=None):
        """Run one forward pass and return ``(hidden_layer, output_layer)``.

        ``dropout_mask`` is only supplied during training; when given, the
        hidden activations are multiplied by ``dropout_mask * 2`` in place.
        """
        layers_hid = samples.dot(self.coeffs_in_to_hid)
        if self.hid_activation is not None:
            layers_hid = self.__hid_activation_fun(layers_hid)
        if dropout_mask is not None:
            # about half the units are zeroed, so the rest are doubled to compensate
            layers_hid *= dropout_mask * 2
        layers_out = layers_hid.dot(self.coeffs_hid_to_out)
        if self.out_activation is not None:
            layers_out = self.__out_activation_fun(layers_out)
        return layers_hid, layers_out

    @conditional_generator
    def fit(self, train_samples, train_labels):
        """Train the network, re-initialising the weights first.

        Yields one ``(preds, errors)`` tuple per epoch.  ``preds`` stacks the
        predictions of every processed batch (one row per sample).  ``errors``
        holds, for every processed batch, the squared error summed over the
        batch axis, i.e. one value per output column per batch.

        Because this is a generator, nothing (not even the length check) runs
        until it is iterated; with ``yield_results=False`` the decorator
        iterates it immediately and returns the last epoch's tuple.
        """
        assert(len(train_samples) == len(train_labels))

        # initialize coefficients with values between -0.1 and 0.1
        self.coeffs_in_to_hid = 0.2 * self.__rng.random((len(train_samples.T), self.hid_size)) - 0.1
        self.coeffs_hid_to_out = 0.2 * self.__rng.random((self.hid_size, len(train_labels.T))) - 0.1

        for _ in tqdm(range(self.epochs), disable=self.silent):
            preds_iter, errors_iter = [], []
            for i in range(0, len(train_samples), self.batch_size):
                samples = train_samples[i:i + self.batch_size]
                labels = train_labels[i:i + self.batch_size]
                batch_size = len(samples)
                if batch_size < self.batch_size and self.skip_remaining:
                    continue

                # random 0/1 mask that disables approximately half of the hidden units
                dropout_mask = None
                if self.dropout:
                    dropout_mask = self.__rng.integers(2, size=(batch_size, self.hid_size))

                layers_hid, preds = self.__forward(samples, dropout_mask)

                # NOTE(review): axis=0 sums over the batch, giving one error per
                # output column rather than one per sample -- confirm intended.
                errors = ((labels - preds) ** 2).sum(axis=0)
                preds_iter.extend(preds)
                errors_iter.extend(errors)

                deltas_out = labels - preds
                deltas_hid = deltas_out.dot(self.coeffs_hid_to_out.T)
                if self.hid_activation is not None:
                    # NOTE(review): layers_hid has already been scaled by the
                    # dropout mask here, so the derivative sees the scaled values.
                    deltas_hid *= self.__hid_activation_deriv(layers_hid)
                if self.dropout:
                    deltas_hid *= dropout_mask

                self.coeffs_hid_to_out += layers_hid.T.dot(deltas_out) * self.alpha
                self.coeffs_in_to_hid += samples.T.dot(deltas_hid) * self.alpha

            preds_iter = np.array(preds_iter)
            errors_iter = np.array(errors_iter)
            # If every batch was skipped preds_iter is an empty 1-D array, which
            # softmax cannot handle; return it as is in that case.
            if preds_iter.size and self.as_probs and self.out_activation is not Activation.SOFTMAX:
                preds_iter = self.__to_probs(preds_iter)
            yield (preds_iter, errors_iter)

    def predict(self, samples, labels=None):
        """Predict outputs for ``samples`` in a single batch (no dropout).

        Returns ``preds`` alone, or ``(preds, errors)`` when ``labels`` is
        given, where ``errors`` is the squared error summed over the sample
        axis (one value per output column).  Requires ``fit`` to have created
        the weight matrices.
        """
        assert(len(samples.T) == len(self.coeffs_in_to_hid))
        if labels is not None:
            assert(len(samples) == len(labels))
            assert(len(labels.T) == len(self.coeffs_hid_to_out.T))

        # predictions for test samples are made in one batch
        _, preds = self.__forward(samples)

        errors = None
        if labels is not None:
            # errors are measured on the raw output, before any softmax conversion
            errors = ((labels - preds) ** 2).sum(axis=0)
        if self.as_probs and self.out_activation is not Activation.SOFTMAX:
            preds = self.__to_probs(preds)
        if labels is not None:
            return (preds, errors)
        return preds

    @conditional_generator
    def evaluate(self, train_samples, test_samples, train_labels, test_labels):
        """Train on the training set and score the test set.

        Yields ``((train_preds, train_errors), (test_preds, test_errors))``:
        once per epoch when ``yield_results`` is True, otherwise a single time
        after training has finished (the decorator then returns that tuple).
        """
        if self.yield_results:
            # fit() is a generator here: score the test set after every epoch
            for (train_preds, train_errors) in self.fit(train_samples, train_labels):
                (test_preds, test_errors) = self.predict(test_samples, test_labels)
                yield (train_preds, train_errors), (test_preds, test_errors)
        else:
            # fit() already returns the last epoch's tuple here
            (train_preds, train_errors) = self.fit(train_samples, train_labels)
            (test_preds, test_errors) = self.predict(test_samples, test_labels)
            yield (train_preds, train_errors), (test_preds, test_errors)

In [None]:
class BasicSparseNeuralNetwork:
    """One-hidden-layer network for sparse inputs, trained one sample at a time.

    Each sample is read through ``sample.T[0]`` (used as row indices into
    ``coeffs_in_to_hid``) and ``sample.T[1]`` (used as per-row weights when
    ``factor_words_freq`` is True) -- presumably (word id, frequency) pairs
    with integer ids; confirm against the caller.

    Parameters
    ----------
    dict_size : number of rows of ``coeffs_in_to_hid``.
    hid_size : number of hidden units.
    epochs : number of passes over the training data.
    alpha : factor applied to every weight update (learning rate).
    hid_activation, out_activation : ``Activation`` member or None (no activation).
    dropout : if True, a random 0/1 mask is applied to the hidden layer while
        training and the surviving activations are doubled.
    as_probs : if True and ``out_activation`` is not ``Activation.SOFTMAX``,
        returned predictions are passed through softmax.  Errors are always
        computed on the raw network output.
    factor_words_freq : if True the selected rows are weighted by
        ``sample.T[1]`` in the forward pass, otherwise they are simply summed.
    yield_results : if True, ``fit`` and ``evaluate`` return generators that
        yield once per epoch; otherwise they return the last epoch's result.
    silent : if True, the tqdm progress bar is disabled.
    random_seed : seed handed to ``np.random.default_rng``.
    """

    def __init__(self, dict_size, hid_size, epochs=3, alpha=1e-3,
                 hid_activation=None, out_activation=None, dropout=False, as_probs=False,
                 factor_words_freq=True, yield_results=False, silent=True, random_seed=None):
        self.dict_size = dict_size
        self.hid_size = hid_size
        self.epochs = epochs
        self.alpha = alpha
        self.hid_activation = hid_activation
        self.out_activation = out_activation
        self.dropout = dropout
        self.as_probs = as_probs
        self.factor_words_freq = factor_words_freq
        self.yield_results = yield_results
        self.silent = silent
        self.random_seed = random_seed

        # Unpack the (function, derivative) pairs once; the output layer's
        # derivative is never used.
        if self.hid_activation is not None:
            self.__hid_activation_fun, self.__hid_activation_deriv = self.hid_activation.value
        if self.out_activation is not None:
            self.__out_activation_fun, _ = self.out_activation.value
        if self.as_probs and self.out_activation is not Activation.SOFTMAX:
            self.__to_probs, _ = Activation.SOFTMAX.value

        self.__rng = np.random.default_rng(seed=self.random_seed)

    def __forward(self, sample, dropout_mask=None):
        """Run one forward pass for a single sample; return ``(hidden, output)``.

        ``dropout_mask`` is only supplied during training; when given, the
        hidden activations are multiplied by ``dropout_mask * 2`` in place.
        """
        rows = self.coeffs_in_to_hid[sample.T[0]]
        if self.factor_words_freq:
            layer_hid = sample.T[1].dot(rows)
        else:
            layer_hid = rows.sum(axis=0)
        if self.hid_activation is not None:
            layer_hid = self.__hid_activation_fun(layer_hid)
        if dropout_mask is not None:
            # half the weights are disabled, so the result is doubled to make up for it
            layer_hid *= dropout_mask * 2
        layer_out = layer_hid.dot(self.coeffs_hid_to_out)
        if self.out_activation is not None:
            # The output is a 1-D vector; present it as a one-row batch so that
            # row-wise activations (softmax) receive the 2-D input they expect.
            layer_out = self.__out_activation_fun(layer_out[np.newaxis, :])[0]
        return layer_hid, layer_out

    @conditional_generator
    def fit(self, train_samples, train_labels):
        """Train the network, re-initialising the weights first.

        Yields one ``(preds, errors)`` tuple per epoch: the stacked per-sample
        predictions and the per-sample summed squared errors.

        Because this is a generator, nothing (not even the length check) runs
        until it is iterated; with ``yield_results=False`` the decorator
        iterates it immediately and returns the last epoch's tuple.
        """
        assert(len(train_samples) == len(train_labels))

        # initialize coefficients using values between -0.1 and 0.1
        self.coeffs_in_to_hid = 0.2 * self.__rng.random((self.dict_size, self.hid_size)) - 0.1
        self.coeffs_hid_to_out = 0.2 * self.__rng.random((self.hid_size, len(train_labels.T))) - 0.1

        for _ in tqdm(range(self.epochs), disable=self.silent):
            preds_iter, errors_iter = [], []
            for sample, label in zip(train_samples, train_labels):
                dropout_mask = None
                if self.dropout:
                    dropout_mask = self.__rng.integers(2, size=self.hid_size)

                layer_hid, pred = self.__forward(sample, dropout_mask)

                error = ((label - pred) ** 2).sum(axis=0)
                preds_iter.append(pred)
                errors_iter.append(error)

                deltas_out = label - pred
                deltas_hid = deltas_out.dot(self.coeffs_hid_to_out.T)
                if self.hid_activation is not None:
                    deltas_hid *= self.__hid_activation_deriv(layer_hid)
                if self.dropout:
                    deltas_hid *= dropout_mask

                self.coeffs_hid_to_out += np.outer(layer_hid, deltas_out) * self.alpha
                # NOTE(review): every selected row receives the same update even
                # when factor_words_freq weighted the rows by sample.T[1] in the
                # forward pass -- confirm whether the update should be weighted too.
                self.coeffs_in_to_hid[sample.T[0]] += deltas_hid * self.alpha

            preds_iter = np.array(preds_iter)
            errors_iter = np.array(errors_iter)
            # With no training samples preds_iter is an empty 1-D array, which
            # softmax cannot handle; return it as is in that case.
            if preds_iter.size and self.as_probs and self.out_activation is not Activation.SOFTMAX:
                preds_iter = self.__to_probs(preds_iter)
            yield (preds_iter, errors_iter)

    def predict(self, samples, labels=None):
        """Predict outputs for ``samples`` one at a time (no dropout).

        Returns ``preds`` alone, or ``(preds, errors)`` when ``labels`` is
        given, where ``errors`` holds one summed squared error per sample.
        Requires ``fit`` to have created the weight matrices.
        """
        if labels is not None:
            assert(len(samples) == len(labels))
            assert(len(labels.T) == len(self.coeffs_hid_to_out.T))

        # zip() cannot iterate over None, so pair every sample with a
        # placeholder label when no labels were supplied.
        label_seq = labels if labels is not None else [None] * len(samples)

        preds, errors = [], []
        for sample, label in zip(samples, label_seq):
            _, pred = self.__forward(sample)
            preds.append(pred)
            if labels is not None:
                # errors are measured on the raw output, before any softmax conversion
                errors.append(((label - pred) ** 2).sum(axis=0))

        preds = np.array(preds)
        if labels is not None:
            errors = np.array(errors)
        if preds.size and self.as_probs and self.out_activation is not Activation.SOFTMAX:
            preds = self.__to_probs(preds)
        if labels is not None:
            return (preds, errors)
        return preds

    @conditional_generator
    def evaluate(self, train_samples, test_samples, train_labels, test_labels):
        """Train on the training set and score the test set.

        Yields ``((train_preds, train_errors), (test_preds, test_errors))``:
        once per epoch when ``yield_results`` is True, otherwise a single time
        after training has finished (the decorator then returns that tuple).
        """
        if self.yield_results:
            # fit() is a generator here: score the test set after every epoch
            for (train_preds, train_errors) in self.fit(train_samples, train_labels):
                (test_preds, test_errors) = self.predict(test_samples, test_labels)
                yield (train_preds, train_errors), (test_preds, test_errors)
        else:
            # fit() already returns the last epoch's tuple here
            (train_preds, train_errors) = self.fit(train_samples, train_labels)
            (test_preds, test_errors) = self.predict(test_samples, test_labels)
            yield (train_preds, train_errors), (test_preds, test_errors)