In [3]:
import random
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor

# Rastrigin function as the fitness function for optimization
def rastrigin_function(x):
    A = 10
    return A * len(x) + sum([(xi ** 2 - A * np.cos(2 * np.pi * xi)) for xi in x])

class GeneticAlgorithmOptimizer:
    def __init__(self, param_ranges, generations=100, population_size=1000, mutation_rate=0.1,
                 mutation_percent_genes=0.2, mutation_by_replacement=True, mutation_range_factor=0.2,
                 mutation_strategy='uniform', max_no_improvement=10, elite_fraction=0.1, 
                 early_stopping_threshold=1e-6, super_mutation_rate=0.05, seed=42):
        self.param_ranges = param_ranges
        self.generations = generations
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.mutation_percent_genes = mutation_percent_genes  # % of genes to mutate
        self.mutation_by_replacement = mutation_by_replacement  # Full gene replacement
        self.mutation_range_factor = mutation_range_factor
        self.mutation_strategy = mutation_strategy
        self.max_no_improvement = max_no_improvement
        self.elite_fraction = elite_fraction
        self.early_stopping_threshold = early_stopping_threshold
        self.super_mutation_rate = super_mutation_rate
        self.seed = seed
        self.cache = {}
        self.evaluated_params = set()
        np.random.seed(seed)
        random.seed(seed)

    # Fitness function evaluation with memoization
    def evaluate_params(self, params):
        key = tuple(params.items())
        if key in self.cache:
            return self.cache[key]

        x = np.array([params[key] for key in params])
        result = rastrigin_function(x)  # Changed to Rastrigin function
        self.cache[key] = result
        return result

    # Initialize population with random individuals
    def initialize_population(self, size):
        population = []
        while len(population) < size:
            individual = {param: random.uniform(self.param_ranges[param][0], self.param_ranges[param][1]) for param in self.param_ranges}
            if tuple(individual.items()) not in self.evaluated_params:
                population.append(individual)
                self.evaluated_params.add(tuple(individual.items()))
        return population

    # Select the best individuals from the population based on their fitness scores
    def select_best_individuals(self, population, scores, k):
        selected = np.argsort(scores)[:k]
        return [population[i] for i in selected]

    # Crossover between two parents to create a child
    def crossover(self, parent1, parent2):
        child = {}
        for param in self.param_ranges:
            alpha = random.random()
            child[param] = alpha * parent1[param] + (1 - alpha) * parent2[param]
        return child

    # Mutation with replacement option and percent-based mutation control
    def mutate(self, individual, mutation_rate, mutation_ranges, strategy='uniform'):
        mutated_individual = individual.copy()
        num_genes_to_mutate = int(self.mutation_percent_genes * len(self.param_ranges))

        for _ in range(num_genes_to_mutate):
            param = random.choice(list(self.param_ranges.keys()))
            if random.random() < mutation_rate:
                if self.mutation_by_replacement:
                    mutated_individual[param] = random.uniform(self.param_ranges[param][0], self.param_ranges[param][1])
                else:
                    mutation_amount = random.uniform(-mutation_ranges[param], mutation_ranges[param])
                    mutated_individual[param] += mutation_amount
                    mutated_individual[param] = max(self.param_ranges[param][0], min(mutated_individual[param], self.param_ranges[param][1]))

        return mutated_individual

    # Super mutation: drastic changes for diversity in the population
    def super_mutate(self, individual):
        super_mutated_individual = individual.copy()
        for param in self.param_ranges:
            if isinstance(self.param_ranges[param][0], int):
                super_mutation_amount = random.randint(self.param_ranges[param][0], self.param_ranges[param][1])
            else:
                super_mutation_amount = random.uniform(self.param_ranges[param][0], self.param_ranges[param][1])
            
            super_mutated_individual[param] = super_mutation_amount
        return super_mutated_individual

    def optimize(self):
        start_time = time.time()
        population = self.initialize_population(self.population_size)
        best_score = float('inf')
        best_params = None
        no_improvement_count = 0
        elite_size = int(self.population_size * self.elite_fraction)
        mutation_ranges = {param: (self.param_ranges[param][1] - self.param_ranges[param][0]) * self.mutation_range_factor for param in self.param_ranges}
        best_generation = 0

        for generation in range(self.generations):
            with ThreadPoolExecutor() as executor:
                scores = list(executor.map(self.evaluate_params, population))

            best_individuals = self.select_best_individuals(population, scores, self.population_size // 2)
            min_score = min(scores)
            if min_score < best_score:
                best_score = min_score
                best_params = population[scores.index(min_score)]
                best_generation = generation + 1
                no_improvement_count = 0
            else:
                no_improvement_count += 1

            if best_score < self.early_stopping_threshold:
                print(f"Early stopping: Optimization reached threshold at generation {generation + 1}.")
                break

            if no_improvement_count >= self.max_no_improvement:
                print(f"Stopping early due to no improvement after {generation + 1} generations.")
                break

            new_population = best_individuals[:elite_size]
            while len(new_population) < self.population_size:
                parent1, parent2 = random.sample(best_individuals, 2)
                child = self.crossover(parent1, parent2)
                if random.random() < self.super_mutation_rate:
                    child = self.super_mutate(child)
                else:
                    child = self.mutate(child, self.mutation_rate, mutation_ranges, self.mutation_strategy)
                if tuple(child.items()) not in self.evaluated_params:
                    new_population.append(child)
                    self.evaluated_params.add(tuple(child.items()))

            while len(new_population) < self.population_size:
                individual = {param: random.uniform(self.param_ranges[param][0], self.param_ranges[param][1]) for param in self.param_ranges}
                if tuple(individual.items()) not in self.evaluated_params:
                    new_population.append(individual)
                    self.evaluated_params.add(tuple(individual.items()))

            population = new_population
            print(f'Generation {generation + 1}, Best Score: {best_score}, Best Params: {best_params}')

        end_time = time.time()
        elapsed_time = end_time - start_time

        print("\nOptimization results:")
        print(f"Best generation: {best_generation}")
        print(f"Time to best generation: {elapsed_time:.2f} seconds")
        print(f"Best function value: {best_score}")
        print(f"Best parameters: {best_params}")

        return best_params, best_score

# Example usage with Rastrigin function

param_ranges = {
    "x0": (-5.12, 5.12),
    "x1": (-5.12, 5.12),
    "x2": (-5.12, 5.12),
    "x3": (-5.12, 5.12),
    "x4": (-5.12, 5.12),
    "x5": (-5.12, 5.12),
    "x6": (-5.12, 5.12),
    "x7": (-5.12, 5.12),
    "x8": (-5.12, 5.12),
    "x9": (-5.12, 5.12),
}

optimizer = GeneticAlgorithmOptimizer(
    param_ranges=param_ranges,
    generations=1000,
    population_size=200,
    mutation_rate=0.1,
    mutation_percent_genes=0.2,  # 20% of genes are mutated
    mutation_by_replacement=True,  # Full replacement mutation
    mutation_range_factor=0.1,
    mutation_strategy='uniform',
    max_no_improvement=100,
    elite_fraction=0.1,
    early_stopping_threshold=1e-6,
    super_mutation_rate=0.1
)

best_params, best_score = optimizer.optimize()


Generation 1, Best Score: 98.0306702305312, Best Params: {'x0': -0.9685338832036123, 'x1': 0.025809625481109322, 'x2': 4.046012106918442, 'x3': 2.084424039447101, 'x4': -1.9355858085410786, 'x5': -3.9176627625494906, 'x6': 4.261175151110506, 'x7': -2.0988149744000326, 'x8': 1.1737645889930137, 'x9': -2.8761231264220783}
Generation 2, Best Score: 71.17663362679693, Best Params: {'x0': 0.9915394990607503, 'x1': -0.11140633908219999, 'x2': 1.1015825113428717, 'x3': 2.4875543156278046, 'x4': -0.05910381647510457, 'x5': 0.03977365149516621, 'x6': 1.3076178124264324, 'x7': -1.0018279635339495, 'x8': -0.0707061858468363, 'x9': -0.4978775490505324}
Generation 3, Best Score: 57.4370483347531, Best Params: {'x0': 0.8544209493111776, 'x1': -0.8933038279681149, 'x2': -1.0732121447124894, 'x3': 2.1127964129550443, 'x4': -0.05488010136036531, 'x5': 1.0832601979320247, 'x6': 0.1341236320809136, 'x7': 0.5020834068488766, 'x8': 1.190577129325689, 'x9': 1.1654890274612613}
Generation 4, Best Score: 23.1

In [2]:
# чуть улучшенная версия 
import random
import numpy as np
import pandas as pd
import gc
import time
from pyDOE import lhs

# Универсальная функция кэширования с возможностью очистки кэша
class CacheFunction:
    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, x):
        key = tuple(x) if isinstance(x, (list, np.ndarray)) else x
        if key in self.cache:
            return self.cache[key]
        result = self.func(x)
        self.cache[key] = result
        return result

    def clear_cache(self):
        self.cache.clear()

# Пример целевой функции (Растригина с 10 переменными)
def rastrigin_function(x):
    A = 10
    return A * len(x) + np.sum(x**2 - A * np.cos(2 * np.pi * x))

# Кэшированная версия целевой функции
cached_objective_function = CacheFunction(rastrigin_function)

# Инициализация популяции (выбор между случайной и латинским гиперкубом)
def initialize_population(pop_size, x_range, num_dimensions, method='random'):
    if method == 'random':
        return np.random.uniform(x_range[0], x_range[1], (pop_size, num_dimensions))
    elif method == 'lhs':  # Латинский гиперкуб
        lhs_samples = lhs(num_dimensions, samples=pop_size)
        population = lhs_samples * (x_range[1] - x_range[0]) + x_range[0]
        return population

# Селекция лучших индивидов с элитизмом
def select_top_individuals_with_elitism(population, fitness_values, selection_percentage, elite_count=1):
    num_selected = int(len(population) * selection_percentage) - elite_count
    sorted_indices = np.argsort(fitness_values)
    elite_indices = sorted_indices[:elite_count]
    elite_population = population[elite_indices]
    elite_fitness = fitness_values[elite_indices]
    selected_indices = sorted_indices[elite_count:num_selected + elite_count]
    selected_population = population[selected_indices]
    selected_fitness = fitness_values[selected_indices]
    final_population = np.concatenate([elite_population, selected_population])
    final_fitness = np.concatenate([elite_fitness, selected_fitness])
    return final_population, final_fitness

# Скрещивание двух родителей
def crossover(parent1, parent2):
    return np.random.uniform(parent1, parent2)

# Создание потомков
def generate_offspring(selected_population, num_offspring):
    offspring = []
    for _ in range(num_offspring):
        parents = np.random.choice(len(selected_population), 2, replace=False)
        child = crossover(selected_population[parents[0]], selected_population[parents[1]])
        offspring.append(child)
    return np.array(offspring)

# Мутация потомков с новыми гиперпараметрами
def mutate(offspring, mutation_rate, mutation_strength, mutation_percent_genes, mutation_by_replacement, x_range):
    num_to_mutate = int(len(offspring) * mutation_rate)
    indices_to_mutate = np.random.choice(len(offspring), num_to_mutate, replace=False)
    for i in indices_to_mutate:
        num_genes_to_mutate = int(offspring.shape[1] * mutation_percent_genes)
        genes_to_mutate = np.random.choice(offspring.shape[1], num_genes_to_mutate, replace=False)
        for gene_idx in genes_to_mutate:
            if mutation_by_replacement:
                offspring[i, gene_idx] = np.random.uniform(x_range[0], x_range[1])
            else:
                mutation = np.random.uniform(-mutation_strength, mutation_strength) * (x_range[1] - x_range[0])
                offspring[i, gene_idx] += mutation
                offspring[i, gene_idx] = np.clip(offspring[i, gene_idx], x_range[0], x_range[1])
    return offspring

# Оптимизация генетического алгоритма
class GeneticAlgorithmOptimizer:
    def __init__(self, param_ranges, population_size, offspring_ratio, mutation_rate, mutation_strength, mutation_percent_genes,
                 mutation_by_replacement, elite_fraction, initialization_method, selection_percentage, early_stopping_patience, generations=400):
        self.param_ranges = param_ranges
        self.population_size = population_size
        self.offspring_ratio = offspring_ratio
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.mutation_percent_genes = mutation_percent_genes
        self.mutation_by_replacement = mutation_by_replacement
        self.elite_fraction = elite_fraction
        self.initialization_method = initialization_method
        self.selection_percentage = selection_percentage
        self.early_stopping_patience = early_stopping_patience
        self.generations = generations
    
    def optimize(self):
        population = initialize_population(self.population_size, (-5.12, 5.12), 10, method=self.initialization_method)
        fitness_values = np.array([cached_objective_function(ind) for ind in population])

        best_solution = None
        best_fitness = float('inf')
        no_improvement_count = 0

        global_start_time = time.time()

        for generation in range(self.generations):
            selected_population, selected_fitness = select_top_individuals_with_elitism(
                population, fitness_values, self.selection_percentage, elite_count=int(self.elite_fraction * self.population_size)
            )
            num_offspring = int(len(selected_population) * self.offspring_ratio)
            offspring = generate_offspring(selected_population, num_offspring)
            offspring = mutate(offspring, self.mutation_rate, self.mutation_strength, self.mutation_percent_genes,
                               self.mutation_by_replacement, (-5.12, 5.12))

            combined_population = np.concatenate([selected_population, offspring])
            combined_fitness = np.array([cached_objective_function(ind) for ind in combined_population])

            final_population, final_fitness = select_top_individuals_with_elitism(
                combined_population, combined_fitness, self.population_size / len(combined_population),
                elite_count=int(self.elite_fraction * self.population_size)
            )

            population = final_population
            fitness_values = final_fitness

            current_best_fitness = np.min(final_fitness)
            current_best_solution = final_population[np.argmin(final_fitness)]

            if current_best_fitness < best_fitness:
                best_fitness = current_best_fitness
                best_solution = current_best_solution
                no_improvement_count = 0
            else:
                no_improvement_count += 1

            if no_improvement_count >= self.early_stopping_patience:
                break

        elapsed_time = time.time() - global_start_time
        return best_solution, best_fitness, generation + 1, elapsed_time

# Генерация случайных гиперпараметров
def generate_random_hyperparameters():
    hyperparameters = {
        "population_size": random.randint(100, 500),
        "offspring_ratio": random.uniform(1.5, 3.0),
        "mutation_rate": random.uniform(0.1, 0.5),
        "mutation_strength": random.uniform(0.1, 0.5),
        "mutation_percent_genes": random.uniform(0.1, 0.2),
        "mutation_by_replacement": random.choice([True]),
        "elite_fraction": random.uniform(0.05, 0.2),
        "initialization_method": random.choice(['random', 'lhs'])
    }
    return hyperparameters

# Количество внешних и внутренних прогонов
num_outer_runs = 30 # разные гиперпараметры
num_inner_runs = 30 # одинаковые гиперпараметры
summary_results = []
header_saved = False
csv_filename = 'cust_rastrigin_upd.csv'

# Внешний цикл для различных гиперпараметров
for outer_run in range(num_outer_runs):
    print(f"\n=== Внешний прогон {outer_run + 1} ===\n")
    
    hyperparams = generate_random_hyperparameters()
    
    results = []

    # Внутренний цикл для одного набора гиперпараметров
    for inner_run in range(num_inner_runs):
        optimizer = GeneticAlgorithmOptimizer(
            param_ranges={
                "x0": (-5.12, 5.12),
                "x1": (-5.12, 5.12),
                "x2": (-5.12, 5.12),
                "x3": (-5.12, 5.12),
                "x4": (-5.12, 5.12),
                "x5": (-5.12, 5.12),
                "x6": (-5.12, 5.12),
                "x7": (-5.12, 5.12),
                "x8": (-5.12, 5.12),
                "x9": (-5.12, 5.12),
            },
            **hyperparams,
            generations=100,
            selection_percentage=0.5,
            early_stopping_patience=10
        )

        best_solution, best_fitness, best_generation, best_generation_time = optimizer.optimize()

        run_data = {
            'Значение функции Растригина': best_fitness,
            'Лучшая итерация': best_generation,
            'Время до лучшей итерации': best_generation_time
        }
        results.append(run_data)

    df_results = pd.DataFrame(results)
    median_fitness = df_results['Значение функции Растригина'].median()
    mode_iteration = df_results['Лучшая итерация'].mode()[0]

    summary_row = {
        'Медиана значения функции Растригина': median_fitness,
        'Мода лучшей итерации': mode_iteration,
        'Медиана времени до лучшей итерации': df_results['Время до лучшей итерации'].median()
    }

    for param, value in hyperparams.items():
        summary_row[param] = value

    # Добавляем строку в csv-файл
    df_summary_row = pd.DataFrame([summary_row])
    df_summary_row.to_csv(csv_filename, mode='a', header=not header_saved, index=False, float_format='%.6f')
    header_saved = True  # Чтобы сохранить заголовок только один раз

    # Вывод строки перед записью в файл (или после)
    print(f"Результаты для внешнего прогона {outer_run + 1}:")
    print(df_summary_row.to_string(index=False))

    # Очистка кэша и ручное удаление крупных объектов
    cached_objective_function.clear_cache()
    del df_results, df_summary_row, results
    gc.collect()

# Итоговая таблица не создается, поскольку все записывается в файл
print(f"\nВсе результаты сохранены в файл '{csv_filename}'.")



=== Внешний прогон 1 ===

Результаты для внешнего прогона 1:
 Медиана значения функции Растригина  Мода лучшей итерации  Медиана времени до лучшей итерации  population_size  offspring_ratio  mutation_rate  mutation_strength  mutation_percent_genes  mutation_by_replacement  elite_fraction initialization_method
                            0.334563                   100                            1.195499              231         1.881476       0.215606           0.340511                0.148254                     True        0.126963                random

=== Внешний прогон 2 ===

Результаты для внешнего прогона 2:
 Медиана значения функции Растригина  Мода лучшей итерации  Медиана времени до лучшей итерации  population_size  offspring_ratio  mutation_rate  mutation_strength  mutation_percent_genes  mutation_by_replacement  elite_fraction initialization_method
                            0.027891                   100                            2.472513              467         1.6197