In [1]:
from abc import abstractmethod, ABC
from typing import Callable, Tuple, List, Optional
from IPython.core.display_functions import display
from sklearn.datasets import make_classification, make_moons
import numpy as np
from numpy import ndarray
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import RidgeClassifier, Perceptron
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, roc_curve, RocCurveDisplay, auc, roc_auc_score, f1_score, balanced_accuracy_score
from time import time_ns, time
import seaborn as sns
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import PolynomialFeatures, OneHotEncoder
plt.rcParams["figure.figsize"] = (20,10)
random_state = 244827
n_samples = 2427
n_trains = 1

In [2]:
class PUMData:
    def __init__(self, gen, random_state=244827, test_size=0.2):
        self.x_train, self.x_test, self.y_train, self.y_test = train_test_split(gen[0], gen[1], test_size=test_size, random_state=random_state, shuffle=True, stratify=gen[1])

    def normalize(self, min_v=0, max_v=1):
        for i in range(self.x_train.shape[1]):
            if self.x_train[:, i].dtype not in [int, float]:
                continue

            x_min = np.min(self.x_train[:, i])
            x_max = np.max(self.x_train[:, i])
            if x_min != x_max:
                self.x_train[:, i] = (self.x_train[:, i] - x_min) / (x_max - x_min)
                self.x_train[:, i] = self.x_train[:, i] * (max_v - min_v) + min_v

                self.x_test[:, i] = (self.x_test[:, i] - x_min) / (x_max - x_min)
                self.x_test[:, i] = self.x_test[:, i] * (max_v - min_v) + min_v
            else:
                self.x_train[:, i] = 1
                self.x_test[:, i] = 1

    def change_labels(self, old: list, new: list):
        for o, n in zip(old, new):
            self.y_test[self.y_test == o] = n
            self.y_train[self.y_train == o] = n

    def winsorize(self):
        for i in range(self.x_train.shape[1]):
            if self.x_train[:, i].dtype not in [int, float] or len(np.unique(self.x_train[:, i])) == 2:
                continue
            q1_train = np.percentile(self.x_train[:, i], 25)
            q3_train = np.percentile(self.x_train[:, i], 75)
            iqr_train = q3_train - q1_train
            self.x_train[:, i] = np.clip(self.x_train[:, i], q1_train - 1.5 * iqr_train, q3_train + 1.5 * iqr_train)

            q1_test = np.percentile(self.x_test[:, i], 25)
            q3_test = np.percentile(self.x_test[:, i], 75)
            iqr_test = q3_test - q1_test
            self.x_test[:, i] = np.clip(self.x_test[:, i], q1_test - 1.5 * iqr_test, q3_test + 1.5 * iqr_test)

    def make_polynomial(self, poly_degree=2, include_bias=False):
        self.x_test = PolynomialFeatures(degree=poly_degree, include_bias=include_bias).fit_transform(self.x_test)
        self.x_train = PolynomialFeatures(degree=poly_degree, include_bias=include_bias).fit_transform(self.x_train)

In [3]:
class ActivationFunction(ABC):
    threshold = 0

    @abstractmethod
    def __call__(self, x: ndarray) -> ndarray:
        raise NotImplementedError

    @abstractmethod
    def derivative(self, x: ndarray) -> ndarray:
        raise NotImplementedError

    def d(self, x: ndarray) -> ndarray:
        return self.derivative(x)

class Heaviside(ActivationFunction):
    def __call__(self, x: ndarray) -> ndarray:
        return np.where(x >= 0, 1, 0)

    def derivative(self, x: ndarray) -> ndarray:
        return np.ones_like(x)

class Sin(ActivationFunction):
    def __call__(self, x: ndarray) -> ndarray:
        return np.sin(x)

    def derivative(self, x: ndarray) -> ndarray:
        return np.cos(x)

class Tanh(ActivationFunction):
    def __call__(self, x: ndarray) -> ndarray:
        return np.tanh(x)

    def derivative(self, x: ndarray) -> ndarray:
        return 1 / (np.cosh(x) ** 2)

class Sign(ActivationFunction):
    def __call__(self, x: ndarray) -> ndarray:
        return np.sign(x)

    def derivative(self, x: ndarray) -> ndarray:
        return np.ones_like(x)

class Relu(ActivationFunction):
    def __call__(self, x: ndarray) -> ndarray:
        return np.where(x >= 0, x, 0)

    def derivative(self, x: ndarray) -> ndarray:
        return np.where(x >= 0, 1, 0)

class LeakyRelu(ActivationFunction):
    threshold = 0.01

    def __call__(self, x: ndarray) -> ndarray:
        return np.where(x >= 0, x, 0.01 * x)

    def derivative(self, x: ndarray) -> ndarray:
        return np.where(x >= 0, 1, 0.01)

class Sigmoid(ActivationFunction):
    threshold = 0.5

    def __call__(self, x: ndarray) -> ndarray:
        return 1 / (1 + np.exp(-x))

    def derivative(self, x: ndarray) -> ndarray:
        return self(x) * (1 - self(x))

In [5]:
class DenseLayer:
    _accepted_activation_functions = ['Heaviside', 'sin', 'tanh', 'sign', 'Relu', 'LeakyRelu', 'sigmoid']
    def __init__(self, input_features: int, n_neurons: int, activation: str):
        self.n_neurons = n_neurons
        self.input_features = input_features
        self.activation: ActivationFunction = {'Heaviside': Heaviside(),
                           'sin': Sin(),
                           'tanh': Tanh(),
                           'sign': Sign(),
                           'Relu': Relu(),
                           'LeakyRelu': LeakyRelu(),
                           'sigmoid': Sigmoid()}[activation]

        self.estimators: ndarray = np.random.normal(loc=1, scale=.15, size=(n_neurons, input_features+1))

    def forward(self, X: ndarray) -> ndarray:
        X = self.expand_features(X)
        return X.dot(self.estimators.T)

    def activate(self, S: ndarray) -> ndarray:
        return self.activation(S)

    def predict(self, X: ndarray) -> ndarray:
        X = self.expand_features(X)
        return self.activate(X.dot(self.estimators.T))

    @staticmethod
    def expand_features(X: ndarray) -> ndarray:
        return np.c_[np.ones(X.shape[0]) * -1, X]

class NeuralNetwork:
    def __init__(self, layers: List[DenseLayer], learning_rate: float = 0.01, min_lr: float=0.001, max_lr=0.01, batch_size=64, max_epochs=1000, verbose=False, dynamic_lr=False):
        self.learning_rate = learning_rate
        self.min_learning_rate = min_lr
        self.max_learning_rate = max_lr
        self.batch_size = batch_size
        self.verbose = verbose
        self.dynamic_lr = dynamic_lr
        self.epochs = max_epochs
        self.layers = layers

    def _forward(self, X: ndarray) -> Tuple[ndarray, ndarray, ndarray]:
        pre_activations = []
        activated = []
        for layer in self.layers:
            X = layer.forward(X)
            pre_activations.append(X)
            X = layer.activate(X)
            activated.append(X)
        return X, pre_activations, activated

    def _deltas(self, pre_activations: ndarray, Y: ndarray, predicted: ndarray):
        d_loss = self._derivative_loss_function(Y, predicted) * self.layers[-1].activation.d(pre_activations[-1])
        deltas = [0] * (len(self.layers) - 1)
        deltas[-1] = d_loss
        for l in range(len(deltas)-2, -1, -1):
            delta = self.layers[l + 1].estimators.T.dot(deltas[l + 1]) * self.layers[l].activation.d(pre_activations[l])
            deltas[l] = delta
        return deltas

    def _back_propagate(self, deltas , activations):
        dw = []
        deltas = [0] + deltas
        for l in range(1, len(self.layers)):
            dw_l = deltas[l].dot(activations[l -1].T)
            dw.append(dw_l)

        return dw

    def fit(self, X, Y):
        Y = Y.reshape(-1, 1)
        n = X.shape[0]
        batch_divisible = n - n % self.batch_size

        for e in tqdm(range(self.epochs), disable=not self.verbose):
            indexes = np.arange(0, n, 1)
            np.random.shuffle(indexes)
            batched_X = X[indexes]
            batched_Y = Y[indexes]

            batched_X = batched_X[:batch_divisible]
            batched_Y = batched_Y[:batch_divisible]

            for i in range(0, batch_divisible, self.batch_size):
                X_batch = batched_X[i:i + self.batch_size]
                Y_batch = batched_Y[i:i + self.batch_size]

                preds, pre_activations, activations = self._forward(X_batch)
                deltas = self._deltas(pre_activations=pre_activations, Y=Y_batch, predicted=preds)
                dW = self._back_propagate(deltas=deltas, activations=activations)

                print(dW.shape)

                # dW = (Y_batch - preds) * self.activation.d(X_batch.dot(self.coef_.T)) * X_batch
                # self.coef_ += self.learning_rate * dW.mean(axis=0)

            if self.dynamic_lr:
                self.learning_rate = self.min_learning_rate + (self.max_learning_rate - self.min_learning_rate) * (
                            1 + np.cos(e / self.epochs * np.pi))

        return self

    def decision_function(self, X: ndarray) -> ndarray:
        output = X
        for estimator in self.layers:
            output = estimator.predict(X)

        return output

    def predict(self, X: ndarray) -> ndarray:
        return np.where(self.decision_function(X) > self.layers[-1].activation.threshold, 1, 0)

    @staticmethod
    def _loss_function(true: ndarray, predicted: ndarray) -> ndarray:
        n = predicted.shape[1]
        cost = (1. / (2 * n)) * np.sum((true - predicted) ** 2)
        return cost

    @staticmethod
    def _derivative_loss_function(true: ndarray, predicted: ndarray) -> ndarray:
        return predicted - true


single_mod_data = PUMData(make_classification(n_samples=n_samples, n_features=2, n_redundant=0, n_classes=2, n_clusters_per_class=1, random_state=random_state), random_state=random_state)
data_moons = PUMData(make_moons(n_samples=n_samples, noise=0.05, random_state=random_state))

model = NeuralNetwork([
    DenseLayer(2, 4, 'sigmoid'),
    DenseLayer(4, 8, 'sigmoid'),
    DenseLayer(8, 1, 'sigmoid')
])
model.fit(single_mod_data.x_train, single_mod_data.y_train)



ValueError: shapes (3,2) and (64,1) not aligned: 2 (dim 1) != 64 (dim 0)

Traceback (most recent call last):
  File "_pydevd_bundle\pydevd_cython_win32_38_64.pyx", line 1035, in _pydevd_bundle.pydevd_cython_win32_38_64.PyDBFrame.trace_dispatch
  File "D:\JetBrains\apps\PyCharm-P\ch-0\213.7172.26\plugins\python\helpers-pro\jupyter_debug\pydev_jupyter_plugin.py", line 144, in cmd_step_over
    if _is_inside_jupyter_cell(frame, pydb):
  File "D:\JetBrains\apps\PyCharm-P\ch-0\213.7172.26\plugins\python\helpers-pro\jupyter_debug\pydev_jupyter_plugin.py", line 209, in _is_inside_jupyter_cell
    if is_cell_filename(filename):
  File "D:\JetBrains\apps\PyCharm-P\ch-0\213.7172.26\plugins\python\helpers-pro\jupyter_debug\pydev_jupyter_plugin.py", line 220, in is_cell_filename
    ipython_shell = get_ipython()
NameError: name 'get_ipython' is not defined
