# Grundlegende Algorithmen und Verfahren der KI - Programmentwurf 12
Q1 2024 TIK

Gruppe:
- Simon Seifert
- Jonas Preusch
- Daniel Zängler
- Johannes Hoppe

## Aufgabe: Partitionsproblem
Gegeben ist eine Menge von $N$ Objekten mit den Größen $a_1,a_2,...,a_N \in \mathbb{N}$. Gesucht ist eine Teilmenge $P \subseteq \{1,2,...,N\}$ der Objekte, für welche $\sum{_{i \in P}}\ a_i = \sum{_{i \notin P}}\ a_i$ gilt.
Mit einem evolutionären Algorithmus soll eine solche Partition $P$ gefunden werden.

Mit einem evolutionären Algorithmus soll eine derartige Partition P gefunden werden. Für die
Erstellung einer Implementierung sind folgende Anforderungen zu beachten.
- Ausgangspunkt ist eine Folge von Größen $a_1, a_2,..., a_N \in \mathbb{N}$.
- Als genetische Repräsentation eines Lösungsvorschlags dient eine binäre Folge der Länge $N$. Dabei gilt $i \in P$ genau dann, wenn die $i$-te Stelle der Folge eine 1 ist.
- Implementieren Sie die Transformationen Selektion, Mutation und Rekombination.
- Wählen Sie $N \geq 10$ und starten Sie mit einer initialen Population von mindestens 50
Individuen.

---

## Programmentwurf:
Für die Lösung dieses Problems haben wir uns für die Pogrammiersprache Python entschieden.
Unser Code ist dabei in die folgenden Dateien aufgeteilt:
- [`main.py`](main.py)
- [`back_end.py`](back_end.py)
- [`algorithm.py`](algorithm.py)
- [`util.py`](util.py)
- [`config_storage.yaml`](config_storage.yaml)
- [`set_storage.yaml`](set_storage.yaml)

Sowie diesem Python Notebook, im Jupyter .ipynb Format, welches der Dokumentation des Entwurfs dient.

### Datei erläuterung:

`main.py` dient zum ausführen des Programms.

`back_end.py` enthält die grundlegenden Strukturen, welche wir für einen evolutionären Algorithmus benötigen (Selektion, Mutation, Rekombination), sowie Funktionen zur bestimmung der 'Fitness' der Lösungen.

`algorithm.py` enthält den eigentlichen Teil des Algorithmus und ruft dafür die Funktionen aus `back_end.py` auf.

`util.py` enthält Funktionen, die das Speichern/Laden von verschieden Konfigurationen ermöglichen, sowie eine Funktion um die Ergebnisse schlussendlich in einem 'Pie-Chart' darzustellen.

`config_storage.yaml` und `set_storage.yaml` enthalten die Initiale Konfiguration und Datensatz, für das Programm.

---

## Code:

Die folgende Code Erläuterung findet bestmöglich in der selben Reihenfolge statt, die auch das Programm bei Ausführung folgt.
NOTE: Markdown Links zu verschiedenen Abschnitten scheinen nicht komplett zu funktionieren -> Man landet immer etwas überhalb des eigentlichen Abschnittes

#### `main()`
Beim Ausführen des Programms werden die Funktionen aus `algorithm.py` und `util.py` importiert und die `main()` Funktion aufgerufen, vorrausgesetzt die Datei ist `"__main__"`, also wenn sie ist die Datei, die von Python gestartet wird.

Zunächst wird mit [`load_set()`](#load_set) der Datensatz, über welchen der Algorithmus laufen soll, sowie mit [`load_config()`](#load_config) die initialen Werte aus `config_storage.yaml` geladen.
Diese sind:

- `amount_generations` : Anzahl an Generationen, für die der Algorithmus laufen soll
- `fitness_selection` : Anzahl an Lösungen einer Generation, die verändert und für die nächste verwendet werden sollen.
- `generation_size` : Anzahl der Individuen pro Generation
- `good_enough_number` : Fitness-Wert, ab dem eine (ausreichende / 'good enough') Lösung gefunden wurde, sodass vor erreichen der festgelegten maximalen Laufzeit abgebrochen werden kann.
- `mutation_factor` : Anzahl der Mutationen, pro Durchführung der Mutations-Funktion
- `stagnation_threshold` : Wert, mit dem eine Stagnation der Population definiert wird (siehe [`break_stagnation()`](#break_stagnation))

In [None]:
import algorithm
import util


def main():
    # One can create a new configuration one load one of the previously used once.
    initial_set = util.random_set(100, 10, 400)
    print(initial_set)
    util.save_set(initial_set)
    generation_size = 50
    amount_generations = 50
    fitness_selection = 5
    mutation_factor = 1
    stagnation_threshold = 1
    num_runs = 10
    good_enough_number = 1
    run_config_1 = util.RunConfig(generation_size=generation_size, amount_generations=amount_generations,
                                fitness_selection=fitness_selection, mutation_factor=mutation_factor,
                               stagnation_threshold=stagnation_threshold, num_runs=num_runs, good_enough_number=good_enough_number)
    util.save_config(run_config=run_config_1)
    print(util.get_all_configs())
    set_1 = util.load_set(1)
    run_config2 = util.load_config(1)
    results = algorithm.run_algorithm_multiple_times(set_1, *run_config2.values())
    util.plot_generations_needed(results)

if __name__ == "__main__":
    main()

Außerdem sind hier noch Funktionen zum Generieren eines zufälligen Datensatzes, und zum Speichern dieses, sowie speichern einer neuen Konfiguration. (siehe [Utilities](#utilities))

[`run_algorithm_multiple_times()`](#run_algorithm_multiple_times) wird mit den zuvor geladenen Werten aufgerufen.
Und [`plot_generations_needed`](#plot_generations_needed) wird zur Ausgabe der Ergebnissene verwendet.

#### `run_algorithm_multiple_times()`
`run_algorithm_multiple_times()` ruft den eigetntlichen Algorithmus mit `run_evolutionary_algorithm()` mehrfach auf, bis zur Anzahl definitert in `num_runs`. Dabei werden die Restlichen Werte einfach an die Funktion weitergegeben.
Die Ergebnisse werden in einer Liste gespeichert und zurück gegeben.  

In [None]:
def run_algorithm_multiple_times(initial_set, generation_size, amount_generations, fitness_selection, mutation_factor,
                                 stagnation_threshold, num_runs, good_enough_number):
    results = []

    for _ in range(num_runs):
        result = run_evolutionary_algorithm(initial_set, generation_size, amount_generations, fitness_selection,
                                            mutation_factor, stagnation_threshold,
                                            good_enough_number)
        results.append(result)

    return results

In [None]:
import back_end


def run_evolutionary_algorithm(initial_set, amount_solutions, generations, fitness_selection=10, mutation_factor=4,
                               stagnation_threshold=3, good_enough_number=2):
    # Initialize the population
    population = back_end.initialization(initial_set, amount_solutions)
    best_solution = back_end.sort_by_fitness(population)[0]  # Get the best solution from the sorted population
    stagnation_tracker = 0

    # Evolve the population for the specified number of generations
    for gen in range(generations):
        population = back_end.sort_by_fitness(population)
        # if we achieved a good enough solution abort
        if best_solution.fitness <= good_enough_number:  # Check the fitness of the best solution
            print(f"found good enough solution: {best_solution.fitness}")

            return {"solution": best_solution, "generations_needed": gen + 1}
        if stagnation_tracker >= stagnation_threshold:
            print("stagnation threshold reached")
            population = back_end.break_stagnation(population)
            stagnation_tracker = 0
        stagnation_tracker += 1
        # Select the best individuals fraction
        population = back_end.best_fitness_selection(population, fitness_selection)
        print(f"best Score: {best_solution.fitness}")
        # Apply mutation to the selected individuals
        population = back_end.mutate(population, mutation_factor=mutation_factor)

        # Perform crossover to generate the next generation
        population = back_end.random_crossover(population, amount_solutions)
        population = back_end.sort_by_fitness(population)

        # Update the best solution if necessary
        if population[0].fitness < best_solution.fitness:  # Check the fitness of the best solution
            best_solution = population[0]

    return {"solution": best_solution, "generations_needed": generations}

#### `initialization()`

-> Bringt Werte aus dem initialen Datensatz in Form der Klasse [`Solution`](#solution)

In [None]:
def initialization(initial_set, amount_solutions):
    return [Individual(initial_set) for _ in range(amount_solutions)]

#### `Individual`
Die Klasse besteht aus den Teilen:
- `initial_set` : Initialer Datensatzt
- `solution` : Zufällig generierte binäre Abfolge, durch [`generate_solutions()`](#generate_solutions)
- `total_sum` : Summe der Werte aus `initial_set` (zur Berechnung der Fitness)
- `partial_sum` : Summe der Werte aus `initial_set`, die die Belegung '1' in der Binären Abfolge haben (zur Berechnung der Fitness)
- `fitness` : Bewertung der Lösung im Vergleich zum ideal Ergebnis, durch [`calculate_fitness()`](#calculate_fitness)

Auch enthält sie einen Teil um als 'String' ausgegeben werden zu können.

In [None]:
import random

class Individual:
    def __str__(self):
        return (
            f"Solution: initial_set={self.initial_set}, solution={self.solution}, total_sum={self.total_sum},"
            f"partial_sum={self.partial_sum}, fitness={self.fitness}"
        )

    # first implementation of the fitness function
    def calculate_fitness(self):
        if (self.partial_sum - self.total_sum / 2) == 0:
            return 100
        fitness = (abs(self.partial_sum - (self.total_sum / 2)))
        return fitness

    def calculate_partial_sum(self):
        partial_sum = 0
        for i in range(len(self.initial_set)):
            if self.solution[i] == 1:
                partial_sum += self.initial_set[i]
        return partial_sum

    def generate_solution(self):
        return [random.choice([0, 1]) for _ in self.initial_set]

    # initial Set is always bigger 10 so no division by 0 possible in fitness
    def __init__(self, initial_set):
        self.initial_set = initial_set
        self.solution = self.generate_solution()
        self.total_sum = sum(initial_set)
        self.partial_sum = self.calculate_partial_sum()
        self.fitness = self.calculate_fitness()

#### `break_stagnation()`
-> Ersetzt einen Teil der Population durch neue zufällige Individuen, zur verhinderung von Stagnation

In [None]:
def break_stagnation(population, new_population_output=20):
    new_population = initialization(population[0].initial_set, new_population_output)
    population = population[
                 :-new_population_output
                 ]  # Remove the last new_population_output elements
    population.extend(new_population)  # Append the new_population to the population
    return population

#### `generate_solution()`
-> Generiert eine pseudo-zufällige binäre Abfolge

In [None]:
def generate_solution(self):
        return [random.choice([0, 1]) for _ in self.initial_set]

#### `calculate_parital_sum()`
-> Berechnet die Summe der Elemente, mit Belegung '1'

In [None]:
def calculate_partial_sum(self):
        partial_sum = 0
        for i in range(len(self.initial_set)):
            if self.solution[i] == 1:
                partial_sum += self.initial_set[i]
        return partial_sum

#### `calculate_fitness()`
-> Berechnung der Fitness



In [None]:
def calculate_fitness(self):
        if (self.partial_sum - self.total_sum / 2) == 0:
            return 100
        fitness = (abs(self.partial_sum - (self.total_sum / 2)))
        return fitness

#### `best_fitness_selection()`
Diese Funktion ist für den Selektion in unserem Algorithmus zuständig.
`best_fitness_selection()` sortiert die Lösungen anhand ihrer 'Fitness' und gibt standarmäßig die Top-10 dieser zurück.

In [None]:
def best_fitness_selection(solutions, output_amount):
    return sort_by_fitness(solutions)[:output_amount]

#### `sort_by_fitness`

In [None]:
def sort_by_fitness(solutions):
    sorted_list = sorted(solutions, key=lambda x: x.fitness, reverse=False)
    return sorted_list

#### `mutate()`
Diese Funktion ist für Mutation in unserem Algorithmus zuständig.
`mutate()` ändert die Belegung zufälliger Stellen der binären Abfolge (0->1 oder 1->0)

#### `run_evolutionary_algorithm()`
`run_evolutionary_algorithm()` ist die hautsächliche Funktion des Programms.

1. Variablen werden definiert und zunächst gesetzt:
    - Die `population` (durch [`initialization()`](#initialization)) geladen.
    - Die Lösung mit der momentan besten Fitness wird als `best_solution` gesetzt.
    - `stagnation_tracker` wird definiert.
2. Die Funktionen werden `generation_size` oft wiederholt:
    - Die Population wird nach Fitness sortiert.
    - Falls bereits eine Lösung mit ausreichender Fitness gefunden wurde, wird diese zurück gegeben und die Funktion beendet.
    - `stagnation_tracker` wird überprüft, dass im Fall einer Stagnation der Population die Funktion [`break_stagnation()`](#break_stagnation) ausgeführt wird, welche Individuen in der Pouplation durch neue zufällige ersetzt.
    - Die besten Individuen werden für die Mutation und Rekombination ausgewählt ([`best_fitness_selection()`](#best_fitness_selection)
    - Diese Individuen werden mutiert ([`mutate()`](#mutate))
    - Diese Individuen werden rekombiniert ([`random_crossover()`](#mutate))
    - Danach erneut sortiert
    - Falls die beste dadurch entstandene Lösung besser ist, als die momentan Beste in `best_solution`, wird `best_solution` mit der neuen überschrieben.

In [None]:
def mutate(solutions, mutation_factor=1):
    # chose a mutation_factor between 0 and length of the solution. we will then choose random positions of solution to
    # redraw the values,the amount based on the mutation_factor
    # we use range() plus len() to choose which positions of the solutions to change
    for solution in solutions:
        mutations = random.sample(range(len(solution.solution)), mutation_factor)
        for i in mutations:
            solution.solution[i] == abs(solution.solution[i]-1)
                
    return solutions

#### `random_crossover()`
Diese Funktion ist für die Rekombination in unserem Algorithmus zuständig.
`random_crossover()` wählt aus der Population zufällig zwei 'Eltern' aus, von denen zufällige 'Gen-Paare' miteinader getauscht werden um eine neue Lösung zu generieren.

In [None]:
def random_crossover(parent_solutions, num_children):
    """
    Perform random crossover between a list of parent solutions.

    This method randomly selects each gene from either parent with equal probability
    to produce a specified number of child solutions.

    Args:
    - parent_solutions (list): A list of parent solutions, where each solution is represented as a list.
    - num_children (int): The number of child solutions to generate.

    Returns:
    - list: A list of child solutions resulting from the crossover operation.
    """

    children = []
    for _ in range(num_children):
        # Randomly select parents
        parent1 = random.choice(parent_solutions)
        parent2 = random.choice(parent_solutions)

        # Perform random crossover
        child = Individual(initial_set=parent1.initial_set)
        child.solution = [
            random.choice(gene_pair)
            for gene_pair in zip(parent1.solution, parent2.solution)
        ]

        children.append(child)

    return children

### Utilities:

#### `load_sets()`
-> Einlesen der Daten aus `set_storage.yaml`

TODO: Rename `load_sets()`, because of simmilar function name to `load_set()`

In [None]:
import yaml
from typing import List


def load_sets() -> List[List[int]]:
    with open(SetStorage, "r") as infile:
        return yaml.safe_load(infile) or []
    
    
SetStorage = "set_storage.yaml"

#### `save_set()`
-> Speichern von Daten in `set_storage.yaml`

In [None]:
def save_set(new_set: List[int]) -> None:
    sets = load_sets()
    sets.append(new_set)
    with open(SetStorage, "w") as outfile:
        yaml.dump(sets, outfile, default_flow_style=False, allow_unicode=True)

#### `save_config()`
-> Speichern von Konfigurationen in `config_storage.yaml`

In [None]:
ConfigStorage = "config_storage.yaml"


def save_config(run_config) -> None:
    config_to_save = run_config.to_dict()
    configs = get_all_configs()
    configs.append(config_to_save)

    with open(ConfigStorage, "w") as outfile:
        yaml.dump(configs, outfile, default_flow_style=False, allow_unicode=True)

#### `load_set()`
-> Laden eines gespeicherten Datensatzes

In [None]:
def load_set(number: int) -> List[int]:
    sets = load_sets()
    if 0 <= number < len(sets):
        return sets[number]
    else:
        return None

#### `load_config()`
-> Laden einer gespeicherten Konfiguration

In [None]:
def load_config(number: int) -> dict:
    with open(ConfigStorage, "r") as infile:
        configs = yaml.safe_load(infile)
        if configs and 0 <= number < len(configs):
            return configs[number]
        else:
            return {}

#### `get_all_configs()`

In [None]:
def get_all_configs() -> List[dict]:
    with open(ConfigStorage, "r") as outfile:
        return yaml.safe_load(outfile) or []

#### `plot_generations_needed()`
-> Darstellen der Ergebnisse als Pie-Chart, mit Hilfe von 'matplotlib'

In [None]:
from matplotlib import pyplot as plt


def plot_generations_needed(results):
    generations_needed = [result["generations_needed"] for result in results]
    num_runs = len(results)

    # Count the frequency of each generation needed
    generation_counts = {
        gen: generations_needed.count(gen) for gen in set(generations_needed)
    }

    # Sort the generations by their counts
    sorted_generations = sorted(
        generation_counts.items(), key=lambda x: x[1], reverse=True
    )
    labels, counts = zip(*sorted_generations)

    max_generations = max(generations_needed)
    max_label = f"Max Gens: {max_generations}"

    # Create a dictionary to store the average fitness score for each generation
    average_fitness_scores = {
        gen: sum(
            result["solution"].fitness
            for result in results
            if result["generations_needed"] == gen
        ) / generation_counts[gen]
        for gen in set(generations_needed)
    }

    # Create the pie chart
    plt.figure(figsize=(8, 8))
    patches, _, _ = plt.pie(
        counts,
        labels=labels,
        autopct="%1.1f%%",
        startangle=140,
        textprops={"fontsize": 12},
    )

    # Add the average fitness score inside each pie slice
    for patch, (gen, count) in zip(patches, sorted_generations):
        patch.set_label(f"{gen}\n(Avg. Fitness: {average_fitness_scores[gen]:.2f})")

    plt.title(
        f"Distribution of Generations Needed\nTotal Runs: {num_runs}\n{max_label}",
        fontsize=16,
    )
    plt.axis("equal")  # Equal aspect ratio ensures that pie is drawn as a circle
    plt.subplots_adjust(left=0.1, right=0.7, top=0.9, bottom=0.1)  # Adjust layout

    plt.legend(
        title="Average Fitness Score",
        loc="center left",
        fontsize=10,
        bbox_to_anchor=(1, 0.5),
    )

    plt.show()

---

## Tests

### Aufgaben:
- Überlegen Sie sich ein Testszenario und führen Sie die Tests durch. Bewerten Sie die
Ergebnisse.
- Lassen Sie die Evolution über eine variable Anzahl an Generationen laufen, um eine Lösung des Problems zu finden Dokumentieren und interpretieren Sie die erhaltenen Ergebnisse.

Die Visualisierung der Ergebnisse durch plot_generations_needed in util.py zeigt, wie viele Generationen benötigt wurden, um eine Lösung zu finden, und die durchscnittliche Fitness der Generationen.

### Definitionen:
#### Fitness:
Die Fitness wurde als absolute Differenz der Summe der beiden Teilmengen definiert. Eine kleinere Fitness ist also besser, wobei 0 das optimalste Ergebnis darstellt. Die Fitness einer Generation entspricht die der besten Lösung.

#### Good_enough_factor:
Da es nicht immer eine optimale Lösung gibt, wurde ein Cutoff-Punkt eingefügt, an dem der Algorithmus abbricht, sofern das Ergebnis zufriedenstellend ist. Dieser wurde als Zahl definiert, könnte jedoch auch als Prozentanteil der Gesamtsumme festgelegt werden.

### Ergebnisse:
Es wurde eine Funktion implementiert, welche den Algorithmus mehrmals ausführt und die Ergebnisse in einem Pie-Chart visualisiert.
Je nach Menge wurden sehr unterschiedliche Ergebnisse festgestellt. Manchmal konnte nach 50 Generationen (als maximale Anzahl festgelegt) kein zufriedenstellendes Ergebnis erzielt werden. Dies geschah natürlich bei sehr ungünstigen Mengen, bei denen eine Zahl größer als die Summe der restlichen war. Auch bei einer zu großen Anzahl von Elementen in einer Menge traten häufiger unbefriedigende Ergebnisse auf. Bei kleineren Mengen wurden oft zufriedenstellende Ergebnisse erzielt.

Die erste Implementation des Algorithmus beinhaltete lediglich die notwendigen Schritte Evaluierung, Mutation und Rekombination. Nach einigen Durchläufen zeigte sich, dass der Algorithmus in der Lage war, semi-regelmäßig sinnvolle Ergebnisse zu liefern, jedoch nicht konsistent.

Daher wurden einige Optimierungen eingeführt. Durch das Plotten des Fitness-Scores verdeutlichte sich, dass die Fitness generell anstieg, sich jedoch gelegentlich auch verringerte. 
Die Lösung bestand darin, jedes mal das beste Ergebnis zu speichern und der nächsten Generation hinzuzufügen.
Dies bewirkte nicht nur, dass die Fitness nicht mehr sank, sondern auch eine allgemeine Verbesserung.

Das zweite Problem war, dass der Algorithmus öfters bei einem lokalen Minimum stagnierte. Um dieses Problem zu minimieren, wurde ein Stagnations-Threshold hinzugefügt, welcher bei Erreichen die Funktion "break_stagnation" aufruft. Diese Funktion nimmt als Parameter die Population und die Anzahl der zu ersetzenden schlechtesten Lösungen entgegen. Die Funktion gibt dann eine neue Population zurück, bei der die schlechtesten Lösungen durch neue zufällige ersetzt wurden. Es wurde beobachtet, dass dieser Schritt wirksam war, um die Stagnation zu brechen.
Diese Optimierung war hilfreich. Es wurde auch erkannt, dass eine weitere Verbesserung erzielt werden konnte, wenn bis zu ungefähr 90% anstatt der anfänglichen 10% bis 50% der schlechtesten Lösungen ersetzt wurden. Ab 90% jedoch zeigte sich eine Verschlechterung, da die Rekombination mit den besten 10% festgesetzt wurde und bei zufälligen Lösungen die Vorteile der Kombination guter Lösungen verloren gingen.

Bezüglich des Mutationsfaktors wurden ähnliche Beobachtungen gemacht. Ist der Wert zu niedrig, stagniert der Algorithmus vermehrt in einem lokalen Optimum. Ist er jedoch zu hoch, gehen wie bei der Stagnation auch hier die Vorteile guter Lösungen durch zu viel Chaos verloren. Auch hier wurde ein fester Wert gefunden, der für den Umfang von Mengen gut funktionierte. Bei deutlich größeren Mengen (200+) war der Mutationsfaktor relativ zu der Menge zu niedrig. Für einen allgemeineren Ansatz könnte man dies ebenfalls prozentual zusammen mit der floor-funktion lösen.

Abschließend lässt sich zusammenfassen, dass oft ein Mittelmaß für jegliche Parameter gefunden werden muss. Dieses multiparametrische Optimierungsproblem ist schwer zu lösen. Um diesen wiederum zu optimieren, wäre erneut ein evolutionärer Algorithmus nötig. Dies könnte ad infinitum weitergeführt werden, um sich dem optimalen Ergebnis anzunähern. Jedoch wurde festgestellt, dass durch die Deklarierung des "good_enough_threshold" oft deutlich schneller ein gutes Ergebnis erzielt werden konnte. Es wird angenommen, dass nach der Evaluation mehrerer Durchläufe und deren Dokumentation, die Treshhold für dieses Projekt erreicht wurde.
