In [2]:
import pandas as pd
import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
import joblib
import pygad
import numpy as np
import pyswarms as ps
from neat import Config, Population, nn, statistics
import neat
import pickle
from pureples.shared.substrate import Substrate
from pureples.es_hyperneat.es_hyperneat import ESNetwork
from pureples.hyperneat.hyperneat import create_phenotype_network

## Лабораторная работа 2

### Задание 1

In [3]:
def generate_dataset(num_samples):
    data = []
    for _ in range(num_samples):
        object1_state = [
            random.choice([0, 1]),
            random.choice(['Сфера', 'Куб', 'Пирамида']),  # Номинальный признак
            random.choice(['Вперед', 'Назад', 'Лево', 'Право']),  # Порядковый признак
            random.uniform(0, 360)  # Количественный признак
        ]
        
        # Состояние объекта 2
        object2_state = [
            random.choice([0, 1]),  # Бинарный признак
            random.choice(['Сфера', 'Куб', 'Пирамида']),  # Номинальный признак
            random.choice(['Вперед', 'Назад', 'Лево', 'Право']),  # Порядковый признак
            random.uniform(0, 360)  # Количественный признак
        ]
        
        collision = random.choice(['Да', 'Нет'])
        
        data.append(object1_state + object2_state + [collision])
    
    columns = [
        'Объект1_Бинарный', 'Объект1_Форма', 'Объект1_Направление', 'Объект1_Угол',
        'Объект2_Бинарный', 'Объект2_Форма', 'Объект2_Направление', 'Объект2_Угол',
        'Коллизия'
    ]
    return pd.DataFrame(data, columns=columns)

datasets = []
sample_ranges = [(30, 100), (100, 500), (500, 1000), (1000, 2000)]
feature_ranges = ['4-7', '8-10', '10+']

for sample_range in sample_ranges:
    for _ in range(3):
        num_samples = random.randint(*sample_range)
        dataset = generate_dataset(num_samples)
        datasets.append(dataset)

for i, dataset in enumerate(datasets):
    dataset.to_csv(f'dataset_{i+1}.csv', index=False)

### Задание 2

In [4]:
df = datasets[0]

X = df.drop('Коллизия', axis=1)
y = df['Коллизия']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

numeric_features = ['Объект1_Угол', 'Объект2_Угол']
numeric_transformer = StandardScaler()

categorical_features = ['Объект1_Форма', 'Объект1_Направление', 'Объект2_Форма', 'Объект2_Направление']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

binary_features = ['Объект1_Бинарный', 'Объект2_Бинарный']
binary_transformer = 'passthrough'

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
        ('bin', binary_transformer, binary_features)
    ])

models = {
    # Логистическая регрессия - быстрый и интерпретируемый алгоритм
    'Logistic Regression': LogisticRegression(max_iter=1000),
    # Решающее дерево - быстрый и не требует масштабирования данных
    'Decision Tree': DecisionTreeClassifier(),
    # Случайный лес - устойчив к переобучению, хорош для разнородных данных
    'Random Forest': RandomForestClassifier(),
    # SVM - хорош для сложных границ решений, но требует больше ресурсов
    'SVM': SVC(probability=True),
    # Наивный Байес - очень быстрый, хорошо работает с категориальными данными
    'Naive Bayes': GaussianNB()
}

results = {}
for name, model in models.items():
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    results[name] = accuracy
    
    joblib.dump(pipeline, f'{name.replace(" ", "_")}_v2_model.pkl')
    print(f'{name}: Accuracy = {accuracy:.4f}')

print("\nРезультаты точности моделей:")
for name, accuracy in results.items():
    print(f"{name}: {accuracy:.4f}")

Logistic Regression: Accuracy = 0.5000
Decision Tree: Accuracy = 0.5000
Random Forest: Accuracy = 0.3889
SVM: Accuracy = 0.5000
Naive Bayes: Accuracy = 0.3889

Результаты точности моделей:
Logistic Regression: 0.5000
Decision Tree: 0.5000
Random Forest: 0.3889
SVM: 0.5000
Naive Bayes: 0.3889


In [5]:
best_model_name = max(results, key=results.get)
best_model_accuracy = results[best_model_name]
print(f"\nЛучшая модель: {best_model_name} с точностью {best_model_accuracy:.4f}")

loaded_model = joblib.load(f'{best_model_name.replace(" ", "_")}_v2_model.pkl')
sample_data = X_test.iloc[[0]]
prediction = loaded_model.predict(sample_data)
print(f"\nПример предсказания для строки:\n{sample_data}\nПредсказание: {prediction[0]}")
print(f"Реальное значение: {y_test.iloc[0]}")


Лучшая модель: Logistic Regression с точностью 0.5000

Пример предсказания для строки:
    Объект1_Бинарный Объект1_Форма Объект1_Направление  Объект1_Угол  \
44                 0      Пирамида               Назад    128.604488   

    Объект2_Бинарный Объект2_Форма Объект2_Направление  Объект2_Угол  
44                 0         Сфера              Вперед    308.682133  
Предсказание: Нет
Реальное значение: Нет


## Лабораторная работа 3

### PyGAD

In [17]:
# ===== БЛОК 1: Оптимизация лучших моделей =====
# Вместо Naive Bayes будем оптимизировать Random Forest и SVM

def optimize_with_pygad(model_name, X_train, y_train, X_test, y_test):
    # Пространство параметров для Random Forest
    if model_name == 'Random Forest':
        gene_space = [
            {'low': 10, 'high': 200},    # n_estimators
            {'low': 2, 'high': 20},       # max_depth
            {'low': 2, 'high': 20}        # min_samples_split
        ]
    # Пространство параметров для SVM
    elif model_name == 'SVM':
        gene_space = [
            {'low': 0.1, 'high': 10},   # C
            {'low': 0, 'high': 1},       # kernel (0-linear, 1-rbf)
            {'low': 0, 'high': 1}        # gamma (0-scale, 1-auto)
        ]

    # Функция приспособленности
    def fitness_func(ga_instance, solution, solution_idx):
        if model_name == 'Random Forest':
            model = RandomForestClassifier(
                n_estimators=int(solution[0]),
                max_depth=int(solution[1]),
                min_samples_split=int(solution[2]),
                random_state=42
            )
        elif model_name == 'SVM':
            model = SVC(
                C=solution[0],
                kernel=['linear', 'rbf'][int(solution[1])],
                gamma=['scale', 'auto'][int(solution[2])],
                probability=True
            )
        
        pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('classifier', model)
        ])
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        return accuracy

    # Настройка алгоритма
    ga = pygad.GA(
        num_generations=15,
        num_parents_mating=3,
        sol_per_pop=8,
        num_genes=len(gene_space),
        gene_space=gene_space,
        fitness_func=fitness_func,
        mutation_num_genes=1
    )
    ga.run()
    
    # Сохранение лучшей модели
    best_params = ga.best_solution()[0]
    if model_name == 'Random Forest':
        best_model = RandomForestClassifier(
            n_estimators=int(best_params[0]),
            max_depth=int(best_params[1]),
            min_samples_split=int(best_params[2]),
            random_state=42
        )
    else:
        best_model = SVC(
            C=best_params[0],
            kernel=['linear', 'rbf'][int(best_params[1])],
            gamma=['scale', 'auto'][int(best_params[2])],
            probability=True
        )
    
    best_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', best_model)
    ])
    best_pipeline.fit(X_train, y_train)
    joblib.dump(best_pipeline, f'{model_name.replace(" ", "_")}_pygad_optimized.pkl')
    
    # Оценка точности
    y_pred = best_pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Оптимизированная {model_name} (PyGAD) точность: {accuracy:.4f}")
    return best_params

# Оптимизируем две лучшие модели из ЛР2
best_rf_params = optimize_with_pygad('Random Forest', X_train, y_train, X_test, y_test)
best_svm_params = optimize_with_pygad('SVM', X_train, y_train, X_test, y_test)

Оптимизированная Random Forest (PyGAD) точность: 0.6364
Оптимизированная SVM (PyGAD) точность: 0.3636


### PySwarms

In [None]:
# ===== БЛОК 4: Исправленная оптимизация PySwarms =====

def optimize_with_pyswarms(model_name, X_train, y_train, X_test, y_test):
    # Пространство поиска для Random Forest
    if model_name == 'Random Forest':
        dimensions = 3
        bounds = ([10, 2, 2], [200, 20, 20])  # n_estimators, max_depth, min_samples_split
    # Пространство поиска для SVM
    else:
        dimensions = 3
        bounds = ([0.1, 0, 0], [10, 1, 1])  # C, kernel, gamma

    # Функция стоимости
    def f(x):
        scores = []
        for params in x:
            try:
                if model_name == 'Random Forest':
                    model = RandomForestClassifier(
                        n_estimators=int(params[0]),
                        max_depth=int(params[1]),
                        min_samples_split=int(params[2]),
                        random_state=42
                    )
                else:
                    model = SVC(
                        C=params[0],
                        kernel=['linear', 'rbf'][int(params[1])],
                        gamma=['scale', 'auto'][int(params[2])],
                        probability=True
                    )
                
                pipeline = Pipeline([
                    ('preprocessor', preprocessor),
                    ('classifier', model)
                ])
                pipeline.fit(X_train, y_train)
                y_pred = pipeline.predict(X_test)
                accuracy = accuracy_score(y_test, y_pred)
                scores.append(1 - accuracy)  # Минимизируем ошибку
            except Exception as e:
                scores.append(1)  # Штраф за невалидные параметры
        return np.array(scores)

    # Запуск оптимизации
    optimizer = ps.single.GlobalBestPSO(
        n_particles=10,
        dimensions=dimensions,
        options={'c1': 0.5, 'c2': 0.3, 'w': 0.9},
        bounds=bounds
    )
    best_cost, best_pos = optimizer.optimize(f, iters=20)
    
    # Сохранение и оценка модели
    if model_name == 'Random Forest':
        best_model = RandomForestClassifier(
            n_estimators=int(best_pos[0]),
            max_depth=int(best_pos[1]),
            min_samples_split=int(best_pos[2]),
            random_state=42
        )
    else:
        best_model = SVC(
            C=best_pos[0],
            kernel=['linear', 'rbf'][int(best_pos[1])],
            gamma=['scale', 'auto'][int(best_pos[2])],
            probability=True
        )
    
    best_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', best_model)
    ])
    best_pipeline.fit(X_train, y_train)
    joblib.dump(best_pipeline, f'{model_name.replace(" ", "_")}_pyswarms_optimized.pkl')
    
    y_pred = best_pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Оптимизированная {model_name} (PSO) точность: {accuracy:.4f}")
    return best_pos

# Оптимизация двух лучших моделей
best_swarm_rf = optimize_with_pyswarms('Random Forest', X_train, y_train, X_test, y_test)
best_swarm_svm = optimize_with_pyswarms('SVM', X_train, y_train, X_test, y_test)

2025-05-24 07:05:07,099 - pyswarms.single.global_best - INFO - Optimize for 50 iters with {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
pyswarms.single.global_best: 100%|██████████|50/50, best_cost=0.5
2025-05-24 07:05:15,700 - pyswarms.single.global_best - INFO - Optimization finished | best cost: 0.5, best pos: [0.69053755 0.38016987 0.79807297]


Лучшие параметры SVM (PSO): [0.69053755 0.38016987 0.79807297]


### NEAT

In [19]:
# ===== БЛОК 2: Исправленная реализация NEAT =====

def run_neat(dataset, config_path, output_name):
    # Предобработка данных
    X = dataset.drop('Коллизия', axis=1)
    y = dataset['Коллизия'].map({'Да': 1, 'Нет': 0})  # Преобразуем в числовой формат
    
    # Разделение данных
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Преобразование данных
    X_train_transformed = preprocessor.fit_transform(X_train)
    X_test_transformed = preprocessor.transform(X_test)
    
    # Функция оценки генома
    def eval_genomes(genomes, config):
        for genome_id, genome in genomes:
            net = nn.FeedForwardNetwork.create(genome, config)
            genome.fitness = 0
            
            correct = 0
            for i in range(len(X_train_transformed)):
                inputs = X_train_transformed[i]
                output = net.activate(inputs)
                pred = 1 if output[0] > 0.5 else 0
                if pred == y_train.iloc[i]:
                    correct += 1
            
            genome.fitness = correct / len(X_train_transformed)
    
    # Загрузка конфигурации
    config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                         neat.DefaultSpeciesSet, neat.DefaultStagnation,
                         config_path)
    
    pop = neat.Population(config)
    pop.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()
    pop.add_reporter(stats)
    
    # Запуск NEAT (меньше поколений для скорости)
    winner = pop.run(eval_genomes, 20)
    
    # Сохранение лучшей сети
    winner_net = nn.FeedForwardNetwork.create(winner, config)
    with open(f'best_{output_name}.pkl', 'wb') as f:
        pickle.dump(winner_net, f)
    
    # Оценка точности
    correct = 0
    for i in range(len(X_test_transformed)):
        inputs = X_test_transformed[i]
        output = winner_net.activate(inputs)
        pred = 1 if output[0] > 0.5 else 0
        if pred == y_test.iloc[i]:
            correct += 1
    
    accuracy = correct / len(X_test_transformed)
    print(f"Точность NEAT ({output_name}): {accuracy:.4f}")
    
    return winner_net, accuracy

# Загрузка минимального и максимального датасетов
min_dataset = datasets[0]
max_dataset = datasets[-1]

# Запуск NEAT для обоих датасетов
neat_min, acc_min = run_neat(min_dataset, 'neat_config.txt', 'neat_min')
neat_max, acc_max = run_neat(max_dataset, 'neat_config.txt', 'neat_max')

# Сравнение моделей
print(f"\nСравнение моделей NEAT:")
print(f"Минимальный датасет ({len(min_dataset)} строк): Точность = {acc_min:.4f}")
print(f"Максимальный датасет ({len(max_dataset)} строк): Точность = {acc_max:.4f}")
print(f"Разница в точности: {abs(acc_min - acc_max):.4f}")


 ****** Running generation 0 ****** 

Population's average fitness: 0.50333 stdev: 0.06988
Best fitness: 0.69048 - size: (1, 18) - species 1 - id 55
Average adjusted fitness: 0.194
Mean genetic distance 1.153, standard deviation 0.348
Population of 150 members in 1 species:
   ID   age  size  fitness  adj fit  stag
     1    0   150      0.7    0.194     0
Total extinctions: 0
Generation time: 0.067 sec

 ****** Running generation 1 ****** 

Population's average fitness: 0.54619 stdev: 0.06443
Best fitness: 0.69048 - size: (1, 18) - species 1 - id 55
Average adjusted fitness: 0.189
Mean genetic distance 1.148, standard deviation 0.388
Population of 150 members in 1 species:
   ID   age  size  fitness  adj fit  stag
     1    1   150      0.7    0.189     1
Total extinctions: 0
Generation time: 0.066 sec (0.066 average)

 ****** Running generation 2 ****** 

Population's average fitness: 0.58222 stdev: 0.06480
Best fitness: 0.76190 - size: (2, 18) - species 1 - id 421
Average adjusted 