In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from numpy.random import uniform
from keras.datasets import mnist

In [None]:
def shuffle(list1, list2):
    temp = list(zip(list1, list2))
    np.random.shuffle(temp)
    res1, res2 = zip(*temp)
    res1, res2 = list(res1), list(res2)
    return res1, res2

def train_speed_dont_decrease_func(epoch_now, decrease_count, train_speed):
    return decrease_count, train_speed

def train_speed_decrease_func(epoch_now, decrease_count, train_speed):
    if (epoch_now % 30 == 0):
             train_speed *= 0.93
             decrease_count += 1
    return decrease_count, train_speed


def sigm(x):
    return 1 / (1 + np.exp(-x))

coeff = 0.1

def der_sigm(x):
    return sigm(x) * (1 - sigm(x))  

def relu(x):
  try:
    a = iter(x)
    array = []
    for i in x:
      if i >= 0:
        array.append(coeff * i)
      else:
        array.append(0)
    return np.array(array)
  except:
    return coeff * x

def der_relu(x):
  try:
    array = []
    for i in x:
      if i >= 0:
        array.append(coeff)
      else:
        array.append(0)
    return np.array(array)
  except:
    return coeff
    

class Net:
    def __init__(self, inputs, neurons_array, activations = None, deriviatives = None, weight_border = 0.5, normalize_weights = True):
        if activations != None: 
            self.acts = activations
            self.ders = deriviatives
        else:
            self.acts = [sigm for i in range(len(neurons_array))]
            self.ders = [der_sigm for i in range(len(neurons_array))]

        self.G = [[0 for n in range(neurons_array[i])] for i in range(len(neurons_array))]
        
        self.train_speed_decrease_func = train_speed_dont_decrease_func
        self.round_training_predict = True

        inputs += 1

        self.weights = [[[uniform(-weight_border, weight_border) for k in range(neurons_array[i - 1] + 1 if i > 0 else inputs)] for j in range(neurons_array[i])] for i in range( len(neurons_array))]
        
        if normalize_weights:
            for i in range(len(neurons_array)):
                inputs = neurons_array[i - 1] + 1 if i > 0 else inputs
                b = 0.7 * (neurons_array[i] ** (1 / inputs))
                for j in range(len(self.weights[i])):
                    sum = np.sum(self.weights[i][j])
                    for k in range(len(self.weights[i][j])):
                        self.weights[i][j][k] = b * self.weights[i][j][k] / sum

        self.fields = [[0 for j in range(neurons_array[i])] for i in range(len(neurons_array))]
        self.J = []
        self.H = []
        self.train_size_epoch = 1

        self._l = len(self.weights) - 1
    
    def _normalize_weights(self, inputs_count):
        for i in range(len(self.weights)):
                inputs = len(self.weights[i - 1]) + 1 if i > 0 else inputs_count
                b = 0.7 * (len(self.weights[i]) ** (1 / inputs))
                for j in range(len(self.weights[i])):
                    sum = np.sum(self.weights[i][j])
                    for k in range(len(self.weights[i][j])):
                        self.weights[i][j][k] = b * self.weights[i][j][k] / sum
        
    def predict(self, signals : any):
        signals = np.append(signals.copy(), 1)

        for i in range(len(self.weights[0])):
            self.fields[0][i] = np.dot(self.weights[0][i], signals)

        for i in range(1, len(self.weights)):
            for j in range(len(self.weights[i])):
                self.fields[i][j] = np.dot(self.weights[i][j], np.append(self.acts[i - 1](np.array(self.fields[i - 1].copy())), 1))

        arr = np.array(self.fields[self._l].copy())
        return self.acts[self._l](arr)
    
    def train(self, x_train, y_train, train_speed, max_epochs,  max_error = 0.1):
        errs_history = []
        hitrate_history = []

        decrease_count = 0

        x_train = x_train.copy()
        y_train = y_train.copy()

        if not isinstance(y_train, np.ndarray):
            y_train = np.array(y_train)

        for epoch in range(max_epochs):

            print(f'epoch {epoch + 1} has started', end='')

            x, y = None, None

            x_train, y_train = shuffle(x_train, y_train)

            if self.train_size_epoch == 1:
                x, y = x_train, y_train
            else:
                x, a, y, b = train_test_split(x_train, y_train, train_size=self.train_size_epoch, random_state=42)

            x, y = shuffle(x, y)

            errs = 0

            for i_x in range(len(x)):
                y_pr = self.predict(x[i_x])

                error = y_pr - y[i_x]

                errs += np.square(error).mean()

                self._back_prop(error, x, i_x, train_speed)
            
            # errs = errs / len(x) if len(x) > 1 else errs * len(self.weights[self._l])
            errs /= len(x)
            errs_history.append(errs)
            
            print(f', error = {errs}')

            if max_error != None and errs <= max_error:
                print(f'fitted by error')
                return

            decrease_count, train_speed = self.train_speed_decrease_func(epoch, decrease_count, train_speed)

        print('ended by epochs')

    def _get_signals(self, x, i, i_x):
        signals = self.acts[i - 1](np.array(self.fields[i - 1].copy())) if i > 0 else x[i_x].copy()
        signals = np.append(signals, 1)
        return signals
    
    def _back_prop(self, error, x, i_x, train_speed):

        for i in range(len(self.weights[self._l])):
            g = self.ders[self._l](self.fields[self._l][i]) * error[i]
            self.G[self._l][i] = g
            signals = self._get_signals(x, self._l, i_x)
            for j in range(len(self.weights[self._l][i])):
                self.weights[self._l][i][j] -= train_speed * self.G[self._l][i] * signals[j]
        
        for i in range(self._l - 1, -1, -1):
            signals = self._get_signals(x, i, i_x)
            for j in range(len(self.weights[i])):
                weighted_sum = 0
                for k in range(len(self.weights[i + 1])):
                    weighted_sum += self.G[i + 1][k] * self.weights[i + 1][k][j]
                g = self.ders[i](self.fields[i][j]) * weighted_sum 

                self.G[i][j] = g

                for k in range(len(self.weights[i][j])):
                    self.weights[i][j][k] -= train_speed * self.G[i][j] * signals[k]

In [None]:
(X_trn, y_trn), (X_tst, y_tst) = mnist.load_data()

In [None]:
def make_samples(x, y, trn_show, n_samples):
  dictionary = {}
  for i in range(10):
    dictionary[i] = 0
  # x, y = shuffle(x, y)
  y_train = []
  x_train = []
  show = []
  for i in range(len(x)):
    number = y[i]
    if dictionary[number] < n_samples:
      x_train.append(x[i])
      y_train.append(y[i])
      show.append(trn_show[i])
      dictionary[number] += 1
    if np.sum(list(dictionary.values())) == n_samples * 10:
      break
  # print(dictionary)
  return np.array(x_train), np.array(y_train), show

In [None]:
X_train = X_trn.reshape([-1, 28 * 28]) / 255.0
X_test = X_tst.reshape([-1, 28 * 28]) / 255.0

y_train = y_trn.copy()
y_test = y_tst.copy()

In [None]:
X_train, y_train, X_train_show = make_samples(X_train, y_train, X_trn, 40)

print(X_train.shape)
print(X_test.shape)

(400, 784)
(10000, 784)


In [None]:
inputs = 28 * 28
neurons = [128, 32, 1]
acts = [sigm, sigm, relu]
ders = [der_sigm, der_sigm, der_relu]

In [None]:
model = Net(inputs, neurons, activations=acts, deriviatives=ders)
model.train_size_epoch = 1

In [None]:
model.train(X_train, y_train, 0.1, 30, max_error = None)

epoch 1 has started, error = 8.301958924855319
epoch 2 has started, error = 4.807637134365201
epoch 3 has started, error = 3.544028381110672
epoch 4 has started, error = 3.1583208253603
epoch 5 has started, error = 2.838472656593547
epoch 6 has started, error = 2.295031290463058
epoch 7 has started, error = 1.85892250497766
epoch 8 has started, error = 1.593174743247502
epoch 9 has started, error = 1.3036913820046945
epoch 10 has started, error = 0.9774737931715606
epoch 11 has started, error = 0.6548803820713686
epoch 12 has started, error = 0.5212990218432035
epoch 13 has started, error = 0.40360794415327256
epoch 14 has started, error = 0.34697116952217705
epoch 15 has started, error = 0.25815251818275053
epoch 16 has started, error = 0.19336130494113501
epoch 17 has started, error = 0.15639103858239933
epoch 18 has started, error = 0.13870952273634982
epoch 19 has started, error = 0.10993001858728339
epoch 20 has started, error = 0.11333752971051168
epoch 21 has started, error = 0.

In [None]:
acc = 0
mse = 0
for i in range(len(X_test)):
  pred = model.predict(X_test[i])
  if round(pred[0]) == y_test[i]:
    acc += 1
  mse += (pred - y_test[i]) ** 2
acc /= len(X_test)
mse /= len(X_test)
print('Hitrate is {0}. Mean squared error is {1}'.format(acc, mse))

Hitrate is 0.5465. Mean squared error is [2.54127612]
