<h1>Лабораторная работа №3. "Обучение многослойного перцептрона. Предсказание временных рядов с помощью нейросети"</h1>
<p>Лабораторная работа №3, Мартынов В.В., ИВТ-1, 4 курс, Физ-тех, Основы ИИ, Вариант №6</p>
<hr>

<h2>Чтение и обработка данных</h2>
<p>Метод на возоде получает путь к файлу и начинает его чтение. После извлечения всех строк файла происходит очитска полностью пустых колонок и строк. После очистки данных происходит доазполнение пустых значений методом линейной интерполяции. Последним этапом становиться сглаживание данных методом скользящего среднего.</p>
<hr>

In [None]:
import csv
import os

from tkinter import Tk
from tkinter.filedialog import askopenfilename

import numpy as np

from scipy.ndimage import gaussian_filter1d

In [None]:
def process_csv(filename):
    data = []
    
    # Считывание данных и заполнение массива данными
    with open(filename, 'r', newline='') as file:
        reader = csv.reader(file, delimiter=';')
        next(reader, None)
    
        for row in reader:
            data.append(row)
    
    # Удаление пустых строк
    data = [row for row in data if any(item.strip() for item in row)]
    
    transposed_data = list(zip(*data))
    
    # Удаление пустых столбцов
    transposed_data = [col for col in transposed_data if any(item.strip() for item in col)]
    
    data = list(zip(*transposed_data))
    
    # Преобразование данных в числовой массив NumPy (с обработкой ошибок)
    num_data = []
    for row in data:
        num_row = []
        for item in row:
            try:
                num_row.append(float(item.replace(',', '.')))
            except ValueError:
                num_row.append(np.nan)  # Заменяем нечисловые значения на NaN
        num_data.append(num_row)
    
    num_data = np.array(num_data)
    
    # Заполнение пропусков методом линейной интерполяции
    for i in range(num_data.shape[1]):
        col = num_data[:, i]
        mask = np.isnan(col)
        if np.any(~mask): # Проверка на наличие хотя бы одного не-NaN значения
            col[mask] = np.interp(np.where(mask)[0], np.where(~mask)[0], col[~mask])
        else:
            print(f"Предупреждение: Столбец {i+1} содержит только NaN значения. Заполнение пропусков невозможно.")
    
    # Сглаживание данных методом скользящего среднего
    smoothed_data = gaussian_filter1d(num_data, sigma=1, axis=0) # sigma - параметр сглаживания
    
    return smoothed_data.tolist()

<hr>
<h2>Подготвка данных</h2>
<p>Получает на вход последовательность и количество шагов. Преобразует исходнкю последовательность в обучающую выборку X, Y для модели.</p>
<hr>

In [None]:
def split_sequence(sequence, n_steps):
    X, y = list(), list()
    for i in range(len(sequence)):
        # Нахождение конца последовательности
        end_ix = i + n_steps
        
        # Проверка на выход за пределы последовательности
        if end_ix > len(sequence) - 1:
            break
        
        # Получение входных и выходных значений последовательности
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)

<hr>
<h2>Нахождения максимума нейронов</h2>
<p>При помощи функции (3.3) из теоретических сведений лабораторной работы №3 будут расчиты минимум и максимум нейронов.</p>
<hr>

In [None]:
def calculate_neurons(nx, ny, np_var):
    # Расчет количества синаптических весов
    nw_min = (ny * np_var) / (1 + math.log2(np_var))
    nw_max = ny * ((np_var / ny) + 1) * (nx + ny + 1) + ny
    
    return int(round(nw_min)), int(round(nw_max))

<hr>
<h2>Создание и обучение модели</h2>
<p>Метод принимающий на вход обработанные и подготовленные данные, а так же информацию о нужной модели (количество нейронов, функцию активации и количество шагов). Распределяет полученные данные на входе на тестовую и обучающую выборку (в соотношении 1 (тестовая) к 4 (обучающая)). Создаёт модель с 2-мя слоями и полученным на входе количеством нейронов, указывается функция активации поступившая на входе. В виде оптимизатора был выбрал Adam с начальным шагом 0.05.</p>
<hr>

In [None]:
from sklearn.model_selection import train_test_split
from keras.models import Model
from keras.layers import Dense, Input
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

In [None]:
def build_and_train_model(n_neurons_per_layer, activation, n_steps, X, y, loss_threshold = 0.005):
    batch_size = 28
    num_epochs = 75
    
    # Разделение данных
    trainX, testX, trainY, testY = train_test_split(X, y, test_size = 0.3375)
    
    # Создание модели с использованием функционального API
    inp = Input(shape=(n_steps,))
    hidden = Dense(n_neurons_per_layer, activation=activation)(inp)
    # hidden_1 = Dense(n_neurons_per_layer, activation=activation)(hidden)
    out = Dense(1)(hidden)

    model = Model(inputs=inp, outputs=out)
    optimizer = Adam(learning_rate = 0.05)
    model.compile(loss=tf.keras.losses.MeanSquaredError(), optimizer=optimizer)

    history = model.fit(trainX, trainY,
          batch_size = batch_size, epochs = num_epochs,
          verbose = 1, validation_split = 0.1)
    model.evaluate(testX, testY, verbose = 1)

    return model, history

<hr>
<h2>Запсук последовательности обучения и прогнозирования данных</h2>
<p>Главная функция программы отвечающая за последовательность выполнения всей программы.</p>
<hr>

In [None]:
import matplotlib.pyplot as plt
import math

import itertools

In [None]:
def main():
    # 1. Ввод параметров пользователем
    activation = input("Введите функцию активации ('tanh' или 'sigmoid'): ")
    n_steps = int(input("Введите длину временного шага (n_steps): "))

    # 2. Загрузка и подготовка данных
    filename = askopenfilename(filetypes=[("CSV files", "*.csv")])
    data = process_csv(filename)

    # 3. Расчёт количества нейронов для слоём
    series = np.array(data)[:, 14]  
    X, y = split_sequence(series, n_steps)

    # 4. Расчёт максимального количества нейронов скрытого слоя
    nx = n_steps
    ny = 1
    np_var = len(X)
    nw_min, nw_max = calculate_neurons(nx, ny, np_var)

    # 5. Ввод количества нейронов пользователем
    n_neurons_per_layer = int(input(f"Введите количество нейронов в каждом из скрытых слоев (минимум {nw_min}, максимум {nw_max}): "))
    
    # Проверка на корректность ввода
    if n_neurons_per_layer < nw_min or n_neurons_per_layer > nw_max:
        print("Ошибка ввода числа нейронов. Завершение работы.")
        return

    # Исходя и формулы 3.3 расчитываем итоговое количество нейронов в скрытыхъ слоях
    n_neurons_per_layer = n_neurons_per_layer / (nx + ny)
    n_neurons_per_layer = int(round(n_neurons_per_layer))
    
    # 6. Создание и обучение модели
    model, history = build_and_train_model(n_neurons_per_layer, activation, n_steps, X, y)

    # 7. Анализ эффективности и вывод результатов
    print(model.summary())
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper right')
    plt.show()

    # 8. Сохранение обученной модели
    model.save('best_model.h5')

In [None]:
def main_test():
    # 2. Загрузка и подготовка данных
    filename = askopenfilename(filetypes=[("CSV files", "*.csv")])
    data = process_csv(filename)

    # Комбинации для обучения
    n_neurons_combinations = [10, 15, 20, 25]
    n_steps_combinations = [4]

    # Функция активации
    activation_function_list = ['tanh', 'sigmoid']
    
    best_model = None
    min_val_loss = float('inf') # Начальное значение, гарантирующее, что val_loss любой модели будет меньше

    # 3. Цикл по комбинациям
    for activation_function, n_neurons, n_steps in itertools.product(activation_function_list, n_neurons_combinations, n_steps_combinations):
        print(f"Функция активации: {activation_function}, Количество нейронов: {n_neurons}, Количество временных шагов: {n_steps}")
        
        series = np.array(data)[:, 14]
        X, y = split_sequence(series, n_steps)
        
        nx = n_steps
        ny = 1
        np_var = len(X)
        nw_min, nw_max = calculate_neurons(nx, ny, np_var)

        model, history = build_and_train_model(n_neurons, activation_function, n_steps, X, y)
        print(model.summary())

        # ... (остальной вывод результатов) ...
        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Test'], loc='upper right')
        plt.show()

        val_loss = history.history['val_loss'][-1] # Берем val_loss из последней эпохи
        
        if val_loss < min_val_loss:
            min_val_loss = val_loss
            best_model = model

    # Сохранение лучшей модели (еще один вариант - проверка на наличие файла)
    if best_model:
        best_model.save('best_model.h5')
        print(f"Лучшая модель сохранена в best_model.h5 (val_loss = {min_val_loss})")

In [None]:
if __name__ == "__main__":
    # main()
    main_test()

<hr>
<h2>Предсказание данных</h2>
<p>Функция выполняющаяся после обучения и сохранения модели. Находит лучшую сохранённую модель и на основе входных данных предсказывает будущее значение.</p>
<hr>

In [None]:
from tensorflow.keras.models import load_model

In [None]:
# Загрузка и тестирование лучшей модели
if os.path.exists('best_model.h5'):
    best_model = load_model('best_model.h5')
    # Ниже указаны данные для предсказания. На вход модели подавался набор из 4 данных.
    X_test = [0.7568, 0.7119, 0.6406, 0.7139]

    X_test = np.array([X_test])
    predictions = best_model.predict(X_test)[0][0]

    class_boundaries = [0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5]
    predicted_class_index = -1 # Инициализация
    
    if predictions <= class_boundaries[0]:
        predicted_class_index = 0
    elif predictions <= class_boundaries[1]:
        predicted_class_index = 1
    elif predictions <= class_boundaries[2]:
        predicted_class_index = 2
    elif predictions <= class_boundaries[3]:
        predicted_class_index = 3
    elif predictions <= class_boundaries[4]:
        predicted_class_index = 4
    elif predictions <= class_boundaries[5]:
        predicted_class_index = 5
    else:
        predicted_class_index = 6

    predicted_class = predicted_class_index + 1
    upper_bound = class_boundaries[predicted_class_index + 1] if predicted_class_index + 1 < len(class_boundaries) else "∞"  # Разделение на два этапа
    predicted_interval = f"{class_boundaries[predicted_class_index]:.2f} - {upper_bound}"
    
    print(f"Предсказанное значение: {predicted_value:.4f}")
    print(f"Предсказанный класс (индекс): {predicted_class_index}")
    print(f"Предсказанный класс: {predicted_class}")
    print(f"Предсказанный интервал: {predicted_interval}")