In [14]:
import pandas as pd

import random

import math

from sklearn.preprocessing import StandardScaler

import numpy as np

Необходимо реализовать класс Нейронная сеть, который позволяет инициализировать архитектуру полносвязной сети с возможностью передачи кол-ва нейронов, слоёв, функций активации, а также предполагает методы для обучения и валидации.

- инициализация архитектуры сети
- прямой проход
- обратный
- обучение модели
- предсказание
- валидация

In [15]:
class NeuralNetwork:
    def __init__(self, layers, activation_funcs, lr = 0.001):
        self.layers = layers
        self.activation_funcs = activation_funcs
        self.lr = lr
        self.weights = []
        self.biases = []

        # for i in range(len(layers) - 1):
        #     w = [[random.uniform(-1, 1) for _ in range(layers[i])] for _ in range(layers[i + 1])]
        #     b = [0.0 for _ in range(layers[i + 1])]
        #     self.weights.append(w)
        #     self.biases.append(b)
        for i in range(len(layers) - 1):
            limit = math.sqrt(6 / (layers[i] + layers[i + 1]))
            w = [[random.uniform(-limit, limit) for _ in range(layers[i])] for _ in range(layers[i + 1])]
            b = [0.0 for _ in range(layers[i + 1])]
            self.weights.append(w)
            self.biases.append(b)

    def _activation(self, x, activation_func):
        if activation_func == 'relu':
            return max(0, x)
        elif activation_func == 'sigmoid':
            return 1 / (1 + math.exp(-x))
        elif activation_func == 'tanh':
            return math.tanh(x)
        elif activation_func == 'linear':
            return x

    def _activation_derivative(self, x, activation_func):
        if activation_func == 'relu':
            return 1.0 if x > 0 else 0.0
        elif activation_func == 'sigmoid':
            x = max(min(x, 100), -100)  # ограничиваем диапазон
            return 1 / (1 + math.exp(-x))
        # elif activation_func == 'sigmoid':
        #     sig = self._activation(x, 'sigmoid')
        #     return sig * (1 - sig)
        elif activation_func == 'tanh':
            return 1 - math.tanh(x)**2
        elif activation_func == 'linear':
            return 1.0

    def forward(self, input_vector):
        self.z = []
        self.a = [input_vector]

        for i in range(len(self.weights)):
            z_layer = []
            a_layer = []
            for j in range(len(self.weights[i])):
                z = sum([self.weights[i][j][k] * self.a[-1][k] for k in range(len(self.a[-1]))]) + self.biases[i][j]
                a = self._activation(z, self.activation_funcs[i])
                z_layer.append(z)
                a_layer.append(a)
            self.z.append(z_layer)
            self.a.append(a_layer)

        return self.a[-1]

    def backward(self, input_vector, target):
        output = self.a[-1] # предсказанное значение
        target = [target] if isinstance(target, (int, float)) else target # фактическое значение, в кот-ое д/б попасть
        deltas = []
        
        # дельта (ошибка) выходного слоя
        delta = [(output[i] - target[i]) * self._activation_derivative(self.z[-1][i], self.activation_funcs[-1])
                 for i in range(len(output))]
        deltas.append(delta)

        # дельты (ошибки) скрытых слоёв
        for i in reversed(range(len(self.weights) - 1)):
            delta = []
            for j in range(len(self.weights[i])):
                # смотрим, как каждый нейрон повлиял на ошибку следующего слоя
                error = sum([deltas[0][k] * self.weights[i + 1][k][j] for k in range(len(deltas[0]))]) 
                # домножаем на на производную активации, чтобы узнать дельту нейрона j
                delta_j = error * self._activation_derivative(self.z[i][j], self.activation_funcs[i])
                delta.append(delta_j)
            deltas.insert(0, delta) # добавляем дельту в начало списка, чтобы сохранить порядок слоёв

        for i in range(len(self.weights)):
            for j in range(len(self.weights[i])):
                for k in range(len(self.weights[i][j])):
                    self.weights[i][j][k] -= self.lr * deltas[i][j] * self.a[i][k] # градиентый спуск
                self.biases[i][j] -= self.lr * deltas[i][j]

    def train(self, X_train, Y_train, epochs=1000, batch_size=32):
        n_samples = len(X_train)
        for epoch in range(epochs):
            print (f"Epoch {epoch} started")
            indices = np.arange(n_samples)
            np.random.shuffle(indices)
            total_loss = 0

            for start_idx in range(0, n_samples, batch_size):
                end_idx = start_idx + batch_size
                batch_idx = indices[start_idx:end_idx]
                X_batch = X_train[batch_idx]
                y_batch = Y_train[batch_idx]

                outputs = [self.forward(x.tolist()) for x in X_batch]

                batch_loss = 0
                for i in range(len(X_batch)):
                    loss = sum([
                        (outputs[i][j] - y_batch[i][j])**2
                        if np.isfinite(outputs[i][j]) and np.isfinite(y_batch[i][j])
                        else 0.0
                        for j in range(len(outputs[i]))
                    ])
                    
                    # loss = sum([(outputs[i][j] - y_batch[i][j])**2 for j in range(len(outputs[i]))])
                    batch_loss += loss
                    self.backward(X_batch[i].tolist(), y_batch[i].tolist())

                total_loss += batch_loss

            if epoch % 5 == 0:
                avg_loss = total_loss / n_samples
                print(f"Epoch {epoch}, MSE: {avg_loss:.4f}")

    def predict(self, X):
        return [self.forward(x.tolist()) for x in X]

    def calculate_mae(self, X, y):
        predictions = self.predict(X)
        total_error = 0
        for i in range(len(y)):
            target = [y[i]] if isinstance(y[i], (int, float)) else y[i]
            error = sum([abs(predictions[i][j] - target[j]) for j in range(len(predictions[i]))])
            total_error += error
        return total_error / len(y)

In [16]:
# Загрузка и подготовка данных
df = pd.read_csv("./additional/ParisHousing.csv")
X = df.drop('price', axis=1).values
Y = df['price'].values

In [17]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
Y_log = np.log1p(Y)

In [18]:
indices = list(range(len(X_scaled)))
random.shuffle(indices)
split = int(0.8 * len(X_scaled))
train_idx = indices[:split]
test_idx = indices[split:]

X_train = X_scaled[train_idx]
Y_train = Y_log[train_idx].reshape(-1, 1)
X_test = X_scaled[test_idx]
Y_test = Y_log[test_idx].reshape(-1, 1)

In [19]:
# Инициализация и обучение модели
model = NeuralNetwork(
    layers=[X_train.shape[1], 64, 32, 1],
    activation_funcs=['relu', 'relu', 'linear'],
    lr=0.001
)

In [20]:
model.train(X_train, Y_train, epochs=150, batch_size=100)

Epoch 0 started
Epoch 0, MSE: 82589168507495832008720493638633845211505927531858423000562986088143660526649852409794360571928999716802790966318589687719068168509799132593830804308553366152043644676025858094678560213458330883601647507624802999937268780293276335461794456685926668137259696066938900385916979773440.0000
Epoch 1 started
Epoch 2 started
Epoch 3 started
Epoch 4 started
Epoch 5 started
Epoch 5, MSE: 0.0000
Epoch 6 started
Epoch 7 started
Epoch 8 started
Epoch 9 started
Epoch 10 started
Epoch 10, MSE: 0.0000
Epoch 11 started


KeyboardInterrupt: 

In [None]:
mae_train = model.calculate_mae(X_train, Y_train)
mae_test = model.calculate_mae(X_test, Y_test)