In [10]:
import numpy as np
import random
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import pickle
import os

# Импорт из ES-HyperNEAT (предполагается, что установлен)
from ES_HyperNEAT.hyperneat import HyperNEAT
from ES_HyperNEAT.substrate import Substrate

# Функция генерации датасета (как раньше)


def generate_feature(feature_type, length):
    if feature_type == "binary":
        return np.random.choice([0, 1], size=length)
    elif feature_type == "nominal":
        return np.random.choice(['A', 'B', 'C', 'D'], size=length)
    elif feature_type == "ordinal":
        return np.random.choice([1, 2, 3, 4, 5], size=length)
    elif feature_type == "quantitative":
        return np.random.uniform(0, 100, size=length)
    else:
        raise ValueError("Unknown feature type")


def create_dataset(num_features, num_samples):
    feature_types = ["binary", "nominal", "ordinal", "quantitative"]
    chosen_types = random.sample(feature_types, k=min(4, num_features))
    while len(chosen_types) < num_features:
        chosen_types.append(random.choice(feature_types))
    random.shuffle(chosen_types)

    data = {}
    for i in range(num_features):
        data[f"Obj1_Feat{i+1}"] = generate_feature(
            chosen_types[i], num_samples)
    for i in range(num_features):
        data[f"Obj2_Feat{i+1}"] = generate_feature(
            chosen_types[i], num_samples)

    labels = []
    for idx in range(num_samples):
        matches = sum(
            data[f"Obj1_Feat{f}"][idx] == data[f"Obj2_Feat{f}"][idx]
            for f in range(1, num_features+1)
        )
        labels.append(1 if matches >= num_features // 2 else 0)  # 1=Yes, 0=No
    return pd.DataFrame(data), np.array(labels)

# Настройка substrate — пример для ES-HyperNEAT


def create_substrate(input_dims, output_dims=1):
    # Входной слой — например, 2D решётка из признаков (зависит от задачи)
    # Для простоты используем одномерную проекцию: (input_dims, 1, 1)
    input_coordinates = [(i, 0, 0) for i in range(input_dims)]

    # Выходной слой — 1 нейрон
    output_coordinates = [(0, 0, 0)]

    return Substrate(input_coordinates, output_coordinates)

# Функция для оценки модели (F1)


def evaluate_hyperneat_model(hyperneat, X_val, y_val):
    preds = []
    for x in X_val:
        output = hyperneat.feed_forward(x)
        preds.append(1 if output[0] > 0.5 else 0)
    return f1_score(y_val, preds)

# Основная функция запуска


def run_es_hyperneat(X_train, y_train, X_val, y_val, input_dim):
    # Создаём substrate с размером входа input_dim
    substrate = create_substrate(input_dim)

    # Настройки HyperNEAT (здесь базовые, можно тонко настраивать)
    hyperneat = HyperNEAT(
        substrate=substrate,
        population_size=50,
        max_generations=50,
        verbosity=2,
    )

    # Запуск эволюции
    winner = hyperneat.run(X_train, y_train)

    # Оценка модели
    f1 = evaluate_hyperneat_model(hyperneat, X_val, y_val)
    print(f"F1-score модели ES-HyperNEAT: {f1:.4f}")

    return winner, hyperneat


# Путь для сохранения моделей
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

# Генерируем данные для малого датасета
df_small, labels_small = create_dataset(num_features=4, num_samples=30)
X_small = df_small.values
y_small = labels_small
X_train_s, X_val_s, y_train_s, y_val_s = train_test_split(
    X_small, y_small, test_size=0.2, random_state=42)

# Генерируем данные для большого датасета
df_big, labels_big = create_dataset(num_features=15, num_samples=1500)
X_big = df_big.values
y_big = labels_big
X_train_b, X_val_b, y_train_b, y_val_b = train_test_split(
    X_big, y_big, test_size=0.2, random_state=42)

# Запускаем ES-HyperNEAT на малом датасете
winner_s, model_s = run_es_hyperneat(
    X_train_s, y_train_s, X_val_s, y_val_s, input_dim=X_train_s.shape[1])
pickle.dump((winner_s, model_s), open(
    f"{save_dir}/es_hyperneat_small.pkl", "wb"))

# Запускаем ES-HyperNEAT на большом датасете
winner_b, model_b = run_es_hyperneat(
    X_train_b, y_train_b, X_val_b, y_val_b, input_dim=X_train_b.shape[1])
pickle.dump((winner_b, model_b), open(
    f"{save_dir}/es_hyperneat_big.pkl", "wb"))

ModuleNotFoundError: No module named 'ES_HyperNEAT'

In [11]:
# Импорт необходимых библиотек
import neat  # Библиотека NEAT (нейроэволюция)
import numpy as np  # Для работы с массивами
import pandas as pd  # Для работы с таблицами
import random  # Для генерации случайных чисел
import pickle  # Для сохранения моделей
# Кодирование категориальных признаков
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score  # Метрика оценки
from sklearn.model_selection import train_test_split  # Разделение данных
from pathlib import Path  # Работа с путями файловой системы


# Функция генерации признаков (дублируется для полноты)
def generate_feature(feature_type: str, length: int):
    """Генерирует массив значений признака заданного типа"""
    if feature_type == "binary":
        return np.random.choice([0, 1], size=length)  # Бинарные значения
    elif feature_type == "nominal":
        return np.random.choice(['A', 'B', 'C', 'D'], size=length)  # Категории
    elif feature_type == "ordinal":
        return np.random.choice([1, 2, 3, 4, 5], size=length)  # Порядковые
    elif feature_type == "quantitative":
        return np.random.uniform(0, 100, size=length)  # Числовые
    else:
        raise ValueError("Unknown feature type")  # Ошибка при неизвестном типе


# Функция создания датасета (дублируется для полноты)
def create_dataset(num_features: int, num_samples: int) -> pd.DataFrame:
    """Создает датасет с заданным числом признаков и образцов"""
    feature_types = ["binary", "nominal", "ordinal", "quantitative"]
    chosen_types = random.sample(feature_types, k=min(4, num_features))
    while len(chosen_types) < num_features:
        chosen_types.append(random.choice(feature_types))
    random.shuffle(chosen_types)

    data = {}
    # Генерация признаков для первого объекта
    for i in range(num_features):
        data[f"Obj1_Feat{i + 1}"] = generate_feature(
            chosen_types[i], num_samples)
    # Генерация признаков для второго объекта
    for i in range(num_features):
        data[f"Obj2_Feat{i + 1}"] = generate_feature(
            chosen_types[i], num_samples)

    # Создание меток (Yes/No) на основе совпадений признаков
    labels = []
    for idx in range(num_samples):
        matches = sum(
            data[f"Obj1_Feat{f}"][idx] == data[f"Obj2_Feat{f}"][idx]
            for f in range(1, num_features + 1)
        )
        labels.append("Yes" if matches >= num_features // 2 else "No")
    data["Collision"] = labels
    return pd.DataFrame(data)


# Функция подготовки данных для обучения
def prepare_dataset(num_features, num_samples):
    """Подготавливает данные для нейросети"""
    df = create_dataset(num_features, num_samples)  # Создаем датасет
    X = df.drop(columns=["Collision"])  # Признаки
    y = df["Collision"].map({"Yes": 1, "No": 0})  # Преобразуем метки в числа

    # Кодируем категориальные признаки
    for col in X.columns:
        if X[col].dtype == object:
            X[col] = LabelEncoder().fit_transform(X[col])
    return X.values, y.values  # Возвращаем numpy массивы


# Функция оценки генома (индивидуального решения)
def eval_genome(genome, config, X_train, y_train, X_val, y_val):
    """Оценивает качество генома на валидационных данных"""
    net = neat.nn.FeedForwardNetwork.create(genome, config)  # Создаем сеть
    predictions = []
    for xi in X_val:
        output = net.activate(xi)  # Получаем предсказание
        # Бинарная классификация
        predictions.append(1 if output[0] > 0.5 else 0)
    return f1_score(y_val, predictions)  # Возвращаем F1-score


# Основная функция запуска NEAT
def run_neat(X_train, y_train, X_val, y_val, config_file, generations=50):
    """Запускает алгоритм NEAT для обучения"""
    print(f"Размер данных: {X_train.shape[1]} признаков")

    # Загружаем конфигурацию NEAT из файла
    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config_file
    )

    # Создаем популяцию
    population = neat.Population(config)

    # Добавляем отчеты для вывода прогресса
    population.add_reporter(neat.StdOutReporter(True))
    population.add_reporter(neat.StatisticsReporter())

    # Функция оценки всех геномов в популяции
    def eval_genomes(genomes, config):
        for genome_id, genome in genomes:
            genome.fitness = eval_genome(
                genome, config, X_train, y_train, X_val, y_val)

    # Запускаем эволюцию
    winner = population.run(eval_genomes, generations)
    return winner, config


# Функция сохранения обученной модели
def save_neat_model(winner, config, filename):
    """Сохраняет лучший геном и конфигурацию в файл"""
    with open(filename, "wb") as f:
        pickle.dump((winner, config), f)


# Основной блок выполнения
if __name__ == "__main__":
    # Подготовка малого датасета (4 признака на объект, всего 8)
    X_small, y_small = prepare_dataset(num_features=4, num_samples=30)
    X_small = np.hstack([X_small[:, :4], X_small[:, 4:8]]
                        )  # Объединяем признаки

    # Подготовка большого датасета (15 признаков на объект, всего 30)
    X_big, y_big = prepare_dataset(num_features=15, num_samples=1500)

    # Разделение на обучающую и валидационную выборки
    X_train_s, X_val_s, y_train_s, y_val_s = train_test_split(
        X_small, y_small, test_size=0.2, random_state=42)
    X_train_b, X_val_b, y_train_b, y_val_b = train_test_split(
        X_big, y_big, test_size=0.2, random_state=42)

    # Пути к файлам конфигурации NEAT
    config_small_path = "config_small"  # Для 8 входных признаков
    config_big_path = "config_big"  # Для 30 входных признаков

    # Запуск NEAT для малого датасета
    winner_small, config_small = run_neat(
        X_train_s, y_train_s, X_val_s, y_val_s, config_small_path)
    print("NEAT для малого датасета завершён.")

    # Запуск NEAT для большого датасета
    winner_big, config_big = run_neat(
        X_train_b, y_train_b, X_val_b, y_val_b, config_big_path)
    print("NEAT для большого датасета завершён.")

    # Сохранение обученных моделей
    save_dir = Path("saved_models")
    save_dir.mkdir(exist_ok=True)  # Создаем директорию, если не существует

    # Сохраняем модель для малого датасета
    save_neat_model(winner_small, config_small, save_dir / "neat_small.pkl")
    # Сохраняем модель для большого датасета
    save_neat_model(winner_big, config_big, save_dir / "neat_big.pkl")

    print("Модели NEAT сохранены.")

Размер данных: 8 признаков


Exception: No such config file: p:\Python\ProgPracticum3\config_small

In [None]:
# Импорт необходимых библиотек
import numpy as np  # Для работы с массивами и математическими операциями
import pandas as pd  # Для работы с табличными данными
import random  # Для генерации случайных чисел
from pathlib import Path  # Для удобной работы с путями файловой системы

# Импорт функций из scikit-learn
from sklearn.model_selection import train_test_split  # Для разделения данных
# Для кодирования и масштабирования
from sklearn.preprocessing import LabelEncoder, StandardScaler
# Модель логистической регрессии
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier  # Модель дерева решений
from sklearn.metrics import f1_score  # Метрика оценки качества

# Импорт библиотеки для оптимизации роевым методом (PSO)
import pyswarms as ps  # Алгоритм оптимизации Particle Swarm Optimization
import pickle  # Для сохранения и загрузки моделей


# Функция генерации признаков заданного типа
def generate_feature(feature_type: str, length: int) -> np.ndarray:
    """Генерирует массив значений признака заданного типа"""
    if feature_type == "binary":
        return np.random.choice([0, 1], size=length)  # Бинарные значения (0/1)
    elif feature_type == "nominal":
        # Категориальные
        return np.random.choice(['A', 'B', 'C', 'D'], size=length)
    elif feature_type == "ordinal":
        return np.random.choice([1, 2, 3, 4, 5], size=length)  # Порядковые
    elif feature_type == "quantitative":
        return np.random.uniform(0, 100, size=length)  # Количественные (0-100)
    else:
        # Ошибка при неверном типе
        raise ValueError("Неизвестный тип признака")


# Функция создания датасета
def create_dataset(num_features: int, num_samples: int) -> pd.DataFrame:
    """Создает датасет с заданным числом признаков и образцов"""
    feature_types = ["binary", "nominal",
                     "ordinal", "quantitative"]  # Доступные типы
    chosen_types = random.sample(feature_types, k=min(
        4, num_features))  # Выбираем случайные типы

    # Дополняем список типов, если нужно больше признаков
    while len(chosen_types) < num_features:
        chosen_types.append(random.choice(feature_types))
    random.shuffle(chosen_types)  # Перемешиваем типы

    data = {}  # Словарь для хранения данных
    # Генерируем признаки для первого объекта
    for i in range(num_features):
        data[f"Obj1_Feat{i + 1}"] = generate_feature(
            chosen_types[i], num_samples)
    # Генерируем признаки для второго объекта
    for i in range(num_features):
        data[f"Obj2_Feat{i + 1}"] = generate_feature(
            chosen_types[i], num_samples)

    # Создаем метки (Yes если >= половины признаков совпадают, иначе No)
    labels = []
    for idx in range(num_samples):
        matches = sum(
            data[f"Obj1_Feat{f}"][idx] == data[f"Obj2_Feat{f}"][idx]
            for f in range(1, num_features + 1)
        )
        labels.append("Yes" if matches >= num_features // 2 else "No")
    data["Collision"] = labels  # Добавляем метки в датасет
    return pd.DataFrame(data)  # Возвращаем DataFrame


# Функция оценки качества для PSO
def pso_fitness(params, model_class, X_train, y_train, X_val, y_val):
    """Вычисляет качество модели для оптимизации PSO"""
    n_particles = params.shape[0]  # Число частиц в рое
    fitness = np.zeros(n_particles)  # Массив для хранения оценок

    for i in range(n_particles):
        p = params[i]  # Параметры текущей частицы
        if model_class == LogisticRegression:
            C = p[0]  # Параметр регуляризации
            max_iter = int(p[1])  # Максимальное число итераций
            max_iter = max(max_iter, 1000)  # Гарантируем минимум 1000 итераций
            model = LogisticRegression(C=C, max_iter=max_iter, solver='lbfgs')
        elif model_class == DecisionTreeClassifier:
            max_depth = int(p[0])  # Максимальная глубина дерева
            # Минимальное число образцов для разделения
            min_samples_split = int(p[1])
            model = DecisionTreeClassifier(
                max_depth=max_depth, min_samples_split=min_samples_split)
        else:
            # Если модель неизвестна, возвращаем худший результат
            fitness[i] = 1.0
            continue

        # Проверка, что есть оба класса
        if len(np.unique(y_train)) < 2:
            fitness[i] = 1.0
            continue

        try:
            model.fit(X_train, y_train)  # Обучаем модель
            preds = model.predict(X_val)  # Прогнозируем на валидации
            score = f1_score(y_val, preds)  # Вычисляем F1-score
            fitness[i] = -score  # Минимизируем -F1 (PSO минимизирует функцию)
        except Exception:
            fitness[i] = 1.0  # При ошибке возвращаем худший результат

    return fitness  # Возвращаем массив оценок


# Функция оптимизации гиперпараметров через PSO
def optimize_pso(model_class, X_train, y_train, X_val, y_val, bounds, options, iters=50):
    """Оптимизирует гиперпараметры модели с помощью PSO"""
    # Инициализируем оптимизатор PSO
    optimizer = ps.single.GlobalBestPSO(
        n_particles=20,  # Число частиц в рое
        dimensions=len(bounds[0]),  # Размерность пространства параметров
        options=options,  # Параметры алгоритма PSO
        bounds=bounds  # Границы пространства поиска
    )

    # Запускаем оптимизацию
    best_cost, best_pos = optimizer.optimize(
        pso_fitness, iters,  # Функция оценки и число итераций
        model_class=model_class, X_train=X_train, y_train=y_train,
        X_val=X_val, y_val=y_val  # Дополнительные аргументы для pso_fitness
    )

    print(f"Лучшие параметры (PSO): {best_pos}, F1: {-best_cost:.4f}")
    return best_pos, -best_cost  # Возвращаем лучшие параметры и F1-score


# Основной блок выполнения
if __name__ == "__main__":
    # Параметры генерации данных
    num_features = 6  # Число признаков у каждого объекта
    num_samples = 200  # Число образцов в датасете

    # Генерируем датасет
    data = create_dataset(num_features, num_samples)
    print(data.head())  # Выводим первые строки датасета

    # Разделяем на признаки (X) и целевую переменную (y)
    X = data.drop(columns=["Collision"])
    y = data["Collision"].map({"Yes": 1, "No": 0})  # Преобразуем метки в числа

    # Кодируем категориальные признаки
    for col in X.columns:
        if X[col].dtype == object:  # Если признак категориальный
            X[col] = LabelEncoder().fit_transform(X[col])  # Кодируем числами

    # Масштабируем количественные признаки
    quant_cols = [col for col in X.columns if X[col].dtype in [
        np.float64, np.float32]]
    scaler = StandardScaler()  # Инициализируем стандартизатор
    if quant_cols:
        X[quant_cols] = scaler.fit_transform(X[quant_cols])  # Масштабируем

    # Разбиваем данные на обучающую, валидационную и тестовую выборки
    X_train_full, X_test, y_train_full, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full, y_train_full, test_size=0.25, random_state=42)

    # Выводим распределение классов
    print("Распределение классов в полном тренировочном наборе:",
          np.unique(y_train_full, return_counts=True))
    print("Распределение классов в тренировочном наборе:",
          np.unique(y_train, return_counts=True))

    # Задаем диапазоны для оптимизации гиперпараметров
    logreg_bounds = (np.array([0.001, 100]), np.array(
        [10.0, 1500]))  # Для логистической регрессии
    dtree_bounds = (np.array([1, 2]), np.array([20, 20]))  # Для дерева решений

    options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}  # Параметры PSO

    # Оптимизируем логистическую регрессию
    best_logreg_params, best_logreg_score = optimize_pso(
        LogisticRegression, X_train, y_train, X_val, y_val,
        bounds=logreg_bounds, options=options, iters=50
    )

    # Оптимизируем дерево решений
    best_dtree_params, best_dtree_score = optimize_pso(
        DecisionTreeClassifier, X_train, y_train, X_val, y_val,
        bounds=dtree_bounds, options=options, iters=50
    )

    # Проверяем распределение классов перед финальным обучением
    print("Распределение классов перед финальным обучением:",
          np.unique(y_train_full, return_counts=True))
    if len(np.unique(y_train_full)) < 2:
        raise ValueError(
            "Ошибка: в полном тренировочном наборе только один класс!")

    # Обучаем финальные модели на всех обучающих данных
    final_logreg = LogisticRegression(
        C=best_logreg_params[0],  # Оптимальный C
        # Оптимальное число итераций
        max_iter=int(max(best_logreg_params[1], 1000)),
        solver='lbfgs'  # Алгоритм оптимизации
    )
    final_logreg.fit(X_train_full, y_train_full)  # Обучаем на всех данных

    final_dtree = DecisionTreeClassifier(
        max_depth=int(best_dtree_params[0]),  # Оптимальная глубина
        # Оптимальный параметр разделения
        min_samples_split=int(best_dtree_params[1])
    )
    final_dtree.fit(X_train_full, y_train_full)  # Обучаем на всех данных

    # Сохраняем обученные модели
    save_dir = Path("saved_models")  # Путь к директории
    save_dir.mkdir(exist_ok=True)  # Создаем директорию, если не существует

    # Сохраняем модель логистической регрессии
    with open(save_dir / "best_logreg_pso_model.pkl", "wb") as f:
        pickle.dump(final_logreg, f)

    # Сохраняем модель дерева решений
    with open(save_dir / "best_dtree_pso_model.pkl", "wb") as f:
        pickle.dump(final_dtree, f)

    print("Модели успешно сохранены в 'saved_models'.")