Copyright **`(c)`** 2024 Andrea Bioddo `<s332145@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# Imports

In [16]:

import numpy as np
from icecream import ic
import operator
import random
import math
import matplotlib.pyplot as plt
import copy
import time
import inspect
import networkx as nx
from networkx.drawing.nx_pydot import graphviz_layout

## Costants

In [17]:
POPULATION_SIZE = 100_000
ELITE_SIZE = 50_000
NUM_RECREATE = POPULATION_SIZE//10
NUM_GENERATIONS = 100
STOP_CASE = 0
DEPTH_MAX = 6
MUTATION_RATE = 0.8
CONSTMAX = 100
CONSTMIN = -100

## Set definition

In [20]:
def save_to_file(output_file, best_tree, num_generation_done, ended_stopcase, mse_val):
    with open(output_file, 'w') as f:
        f.write("Params:\n")
        f.write(f"POPULATION_SIZE: {POPULATION_SIZE}\n")
        f.write(f"ELITE SIZE: {ELITE_SIZE}\n")
        f.write(f"DEPTH_MAX: {DEPTH_MAX}\n")
        f.write(f"Number of gens completed: {num_generation_done}\n")
        f.write(f"Stop case termination: {ended_stopcase}\n\n")
        
        f.write("Best tree found:\n")
        f.write(f"{str(best_tree)}\n\n")
        
        f.write("MSE VALIDATION: \n")
        f.write(f"{str(mse_val)}\n\n")

    print(f"Saved file in {output_file}")

In [21]:
def draw(node):
    G = nx.DiGraph()
    for n1 in list(node.subtree):
        for n2 in list(n1.children):
            G.add_edge(id(n1), id(n2))

    try:
        pos = graphviz_layout(G, prog="dot")
    except Exception as e:
        raise RuntimeError(f"Graphviz layout failed: {e}")
    
    print("Nodes:", G.nodes())
    print("Edges:", G.edges())

    nx.draw_networkx_nodes(
        G,
        nodelist=[id(n) for n in node.subtree if len(n.children) > 0],
        pos=pos,
        node_size=800,
        node_color='lightpink',
        node_shape='o'
    )
    nx.draw_networkx_nodes(
        G,
        nodelist=[id(n) for n in node.subtree if len(n.children) == 0],
        pos=pos,
        node_size=500,
        node_color='lightblue',
        node_shape='s'
    )
    nx.draw_networkx_labels(
        G,
        pos=pos,
        labels={id(n): n.short_name for n in node.subtree},
    )
    nx.draw_networkx_edges(
        G,
        pos=pos,
        node_size=800,
    )


## Structure Definition

In [22]:
class Node:
    def __init__(self, value, children=None):
        if children is not None:
            assert all(isinstance(s, Node) for s in children), "Panic: Children must be `Node`"
        self.value = value
        self.children = children if children else []

    def draw(self):
        try:
            return draw(self)
        except Exception as msg:
            print(f"Drawing not available ({msg})", UserWarning, 2)
            return None

    def __len__(self):
        return 1 + sum(len(c) for c in self.children)

    def evaluate(self, variables):
        # evalueate based on nodetype
        if callable(self.value): 
            children_values = [child.evaluate(variables) for child in self.children]
            if self.value == operator.truediv:
                if children_values[1] == 0:
                    return 1e16
            try:
                result = self.value(*children_values)
                if isinstance(result, complex):
                    return 1e16
                return result
            except (OverflowError, ValueError, Exception):
                return 1e16
        elif isinstance(self.value, str): 
            return variables[self.value]
        else:  # is it is a numeric values
            return self.value

    def __str__(self):
        return self.long_name
    
    def copy(self):
        copied_children = [child.copy() for child in self.children] 
        return Node(self.value, copied_children)
    
    @property
    def subtree(self):
        result = set()
        self._get_subtree(result)
        return result

    def _get_subtree(self, bunch: set):
        bunch.add(self)
        for c in self.children:
            c._get_subtree(bunch)

    @property
    def short_name(self):
        # necessary in order to use "add" and not the all <operator.add> etc..
        if callable(self.value):            
            return self.value.__name__
        return str(self.value)

    @property
    def is_leaf(self):
        return len(self.children) == 0

    @property
    def long_name(self):
        if self.is_leaf:
            return self.short_name
        return f'{self.short_name}(' + ', '.join(c.long_name for c in self.children) + ')'

In [23]:
def generate_random_tree(variables, depth=DEPTH_MAX, constant_max=CONSTMAX, constant_min=CONSTMIN):
    if depth == 0 or (depth > 1 and random.random() < 0.3):
        # leaf (costant or variable)
        if random.random() < 0.5:
            return Node(random.choice(variables))  # variable
        else:
            return Node(random.uniform(constant_min, constant_max))  # costant in range (-10, 10)
    else:
        # Node
        if random.random() < 0.3:  # mono operator
            operation = random.choice([math.sin, math.cos, operator.neg, np.abs, math.log10, math.exp, math.sqrt])
            return Node(operation, [generate_random_tree(variables, depth - 1, constant_max, constant_min)])  # only one child
        else:  # binary operator
            operation = random.choice([operator.add, operator.sub, operator.mul, operator.truediv])
            return Node(operation, [generate_random_tree(variables, depth - 1, constant_max, constant_min) for _ in range(2)])  # two children


## Goal function

In [24]:
def fitness(tree, x_train, y_train, variables):
    try:
        predictions = np.array([tree.evaluate({variables[i]: x[i] for i in range(len(variables))}) for x in x_train.T])
        mse = np.mean((predictions - y_train) ** 2)
        return mse
    except Exception as e:
        return 1e16
    

def fitness_parsimony(tree, x_train, y_train, variables):
    tree_size = count_nodes(tree) 

    try:
        # MSE
        predictions = np.array([tree.evaluate({variables[i]: x[i] for i in range(len(variables))}) for x in x_train.T])
        mse = np.mean((predictions - y_train) ** 2)

        # add parsimony coefficient in order to add penalty to tree too complex
        parsimony_coefficient = 0.001 # scale factor
        fitness_value = mse + parsimony_coefficient * tree_size 
        return fitness_value

    except Exception as e:
        return 1e16
    

def count_nodes(tree):
    if not tree.children:  
        return 1
    return 1 + sum(count_nodes(child) for child in tree.children)


## Simplify tree

In [25]:
# support function that reduce complexity with the tree
def simplify_tree(tree):
    if tree is None or not tree.children:
        return tree

    simplified_children = [simplify_tree(child) for child in tree.children]
    tree.children = simplified_children

    # only attempt to evaluate if all children are numeric constants
    # and the node is  value is a callable like operations
    if (callable(tree.value) and 
        all(child.children == [] and isinstance(child.value, (int, float)) 
            for child in tree.children)):
        try:
            # Evaluate the subtree
            values = [child.value for child in tree.children]
            result = tree.value(*values)
            # Only replace with constant if the result is a number
            if isinstance(result, (int, float)) and not isinstance(result, bool):
                return Node(result)
        except (TypeError, ZeroDivisionError, ValueError, OverflowError):
            pass

    return tree

# Selection of population/crossover/mutation

In [26]:
# recusion in order to count tree depth
def tree_depth(tree):
    if not tree.children:
        return 1
    return 1 + max(tree_depth(child) for child in tree.children)

def tournament_selection(population, fitnesses, num_selected, tournament_size=3):
    selected = []
    for _ in range(num_selected):
        # choose random tournament player
        tournament_indices = random.sample(range(len(population)), tournament_size)
        # find best player
        best_index = min(tournament_indices, key=lambda idx: fitnesses[idx])
        selected.append(population[best_index])
    return selected

def roulette_wheel_selection(population, fitnesses, num_selected):
    total_fitness = sum(fitnesses)
    try:
        if total_fitness == 0:
            return random.choices(population, k=num_selected)
        elif total_fitness == float("inf"):
            return random.choices(population, k=num_selected)
        # calc. relativives probability
        probabilities = [(total_fitness - f) / total_fitness for f in fitnesses]  # lower fitness = more probability
        selected = random.choices(population, probabilities, k=num_selected)
        return selected
    except Exception:
        return []

def select_population(population, fitnesses, num_selected, elite, tournament_size=5):
    if random.random() < 0.5:
        selected = tournament_selection(population, fitnesses, num_selected - ELITE_SIZE, tournament_size)
    else :
        selected = roulette_wheel_selection(population, fitnesses, num_selected - ELITE_SIZE)
    selected += elite
    return selected


In [27]:
# function to create the worst tree based on fitness
def recreate_worst_trees(population, fitnesses, variables, num_to_recreate=10, max_depth=DEPTH_MAX):
    worst_indices = np.argsort(fitnesses)[-num_to_recreate:]
    for idx in worst_indices:
        population[idx] = generate_random_tree(variables, max_depth)
    return population


def count_nodes(tree):
    if not tree.children:
        return 1
    return 1 + sum(count_nodes(child) for child in tree.children)

def crossover(tree1, tree2):

    max_depth = DEPTH_MAX

    def select_random_subtree(tree):
        if not tree.children or random.random() < 0.3:
            return tree
        return random.choice(tree.children)

    def replace_subtree(tree, target, replacement):
        if tree is target:
            return replacement
        new_children = [replace_subtree(child, target, replacement) for child in tree.children]
        tree.children = new_children
        return tree

    subtree1 = select_random_subtree(tree1)
    subtree2 = select_random_subtree(tree2)

    tree1_copy = copy.deepcopy(tree1)
    tree2_copy = copy.deepcopy(tree2)

    tree1_copy = replace_subtree(tree1_copy, subtree1, subtree2)
    tree2_copy = replace_subtree(tree2_copy, subtree2, subtree1)

    if tree_depth(tree1_copy) > max_depth:
        tree1_copy = subtree1 
    if tree_depth(tree2_copy) > max_depth:
        tree2_copy = subtree2

    return tree1_copy, tree2_copy

def tree_depth(tree):
    if not tree.children:
        return 1
    return 1 + max(tree_depth(child) for child in tree.children)


def mutate(tree, variables, mutation_rate=0.7, max_depth=DEPTH_MAX):
    if random.random() < mutation_rate:
        return generate_random_tree(variables, max_depth)

    tree_copy = copy.deepcopy(tree)
    if tree_copy.children:
        child_idx = random.randint(0, len(tree_copy.children) - 1)
        tree_copy.children[child_idx] = mutate(tree_copy.children[child_idx], variables, mutation_rate, max_depth - 1)

    return tree_copy

## Generic Algorithm

In [28]:
def genetic_alg(x, y, population, variables, fitness_passed):
    fitness_avg_history = []
    fitness_min_history = []
    average_size_history = []
    average_depth_history = []

    num_generation_done = 0
    fitness_func = fitness_passed
    
    ended_stopcase = False
    
    num_same_fitness = 0
    old_best_fitness = 1e6
    
    
    for generation in range(NUM_GENERATIONS):
        num_generation_done += 1
        # find fitness for each memmber of population
        fitnesses = [fitness_func(tree, x, y, variables) for tree in population]

        if min(fitnesses) == old_best_fitness:
            num_same_fitness += 1
        else:
            num_same_fitness = 0
            

        # Save for statistics
        fitness_avg_history.append(np.mean(fitnesses))
        fitness_min_history.append(min(fitnesses))
        
        average_depth = np.mean([tree_depth(tree) for tree in population])
        average_size = np.mean([count_nodes(tree) for tree in population])
        average_depth_history.append(average_depth)
        average_size_history.append(average_size)
        
        # save best tree
        best_tree_index = np.argmin(fitnesses)
        best_tree = population[best_tree_index]
            
        print(f"/*************\\\ngeneration: {generation}\nnum_same_fitness: {num_same_fitness}\nold_best_fitness: {old_best_fitness}\nmin(fitnesses): {min(fitnesses)}\naverage_depth: {average_depth}\naverage_size: {average_size}\nstr(best_tree): {str(best_tree)}\n\\*************/\n")
        
        # update old best fitness
        old_best_fitness = min(fitnesses)

        if generation % 5 == 0: # recreate worst trees each 10 gens. 
            population = recreate_worst_trees(population, fitnesses, variables, num_to_recreate=NUM_RECREATE)
            fitnesses = [fitness_func(tree, x, y, variables) for tree in population]
            
        elif num_same_fitness >= 5:  #more aggressive recreate worst trees each 10 same fitness (to reduce and avoid bottle neck).
            population = recreate_worst_trees(population, fitnesses, variables, num_to_recreate=NUM_RECREATE)
            fitnesses = [fitness_func(tree, x, y, variables) for tree in population]

        # best case
        if min(fitnesses) == STOP_CASE:
            print("Finished earlier because got the best MSE possible = 0")
            print(f"generation: {generation}, min(fitnesses): {min(fitnesses)}, max(fitnesses): {max(fitnesses)}")
            ended_stopcase = True
            break
        
        # select the best members 
        elite_indices = np.argsort(fitnesses)[:ELITE_SIZE]
        elite = [population[i] for i in elite_indices]
        
        # choose next population
        selected = select_population(population, fitnesses, POPULATION_SIZE, elite)
        
        # create new population, keeping the best one
        next_population = elite.copy() 
        next_population.append(copy.deepcopy(best_tree))
        
        while len(next_population) < POPULATION_SIZE:
            parent1, parent2 = random.sample(selected, 2)
            child1, child2 = crossover(parent1, parent2)
            parent1Mutated = mutate(child1, variables)
            next_population.append(parent1Mutated)
            if len(next_population) < POPULATION_SIZE:
                parent2Mutated = mutate(child2, variables)
                next_population.append(parent2Mutated)

        # update population
        population = next_population

    best_tree = simplify_tree(population[np.argmin(fitnesses)])
        
    print(f"Formula found: {best_tree}")
    
    # Final metrics
    y_pred = np.array([best_tree.evaluate({variables[i]: x_v[i] for i in range(len(variables))}) for x_v in x.T])
    mse_val = np.mean((y - y_pred) ** 2)
    print(f"MSE VALIDATION: {mse_val}")

    return simplify_tree(best_tree), fitnesses, fitness_avg_history, fitness_min_history, average_size_history, average_depth_history, num_generation_done, ended_stopcase, mse_val



## Main

In [None]:
problems = [
    "problem_1.npz",
    "problem_2.npz",
    "problem_3.npz",
    "problem_4.npz",
    "problem_5.npz",
    "problem_6.npz",
    "problem_7.npz",
    "problem_8.npz"
]

problem_number = 2
num_try = 1
for p in problems:
    print(f"Trying to solve problem: {problem_number}...")
    for try_number in range(num_try):
        print(f"Try number {try_number+1}/{num_try} for problem: {problem_number}")
        problem = np.load(f"../data/{p}")
        x = problem['x']
        y = problem['y']

        x_validation = x
        y_validation = y
        
        print(f"problem size x:{x.shape}, y{y.shape}")

        variables = [f'x{i}' for i in range(x.shape[0])] 
        population = [generate_random_tree(variables) for _ in range(POPULATION_SIZE//2)]
        best_tree, fitnesses, fitness_avg_history, fitness_min_history, average_size_history, average_depth_history, num_generation_done, ended_stopcase, mse_val = genetic_alg(x, y, population, variables, fitness)
        
        print(f"Solved problem {problem_number}, saving results to file: output/problem_{problem_number}_output_try{try_number}_at_{time.time()}.txt\n\n")

        save_to_file(f"../output/problem_{problem_number}_output_try{try_number}_at_{time.time()}.txt",  best_tree, num_generation_done, ended_stopcase, mse_val)
        
        problem_number += 1

## Analysis

In [None]:
"""# Create subplots
fig, axs = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle("Statistics", fontsize=16)

axs[0, 0].plot(range(num_generation_done), fitness_min_history, label="Min Fitness", color="red")
axs[0, 0].set_title("Min fitness")
axs[0, 0].set_xlabel("Generations")
axs[0, 0].set_ylabel("Fitness")
axs[0, 0].grid(True)
axs[0, 0].legend()

axs[0, 1].plot(range(num_generation_done), fitness_avg_history, label="Avg Fitness", color="blue")
axs[0, 1].set_title("Avg Fitness (Log scale)")
axs[0, 1].set_xlabel("Generations")
axs[0, 1].set_ylabel("Fitness")
axs[0, 1].set_yscale('log')
axs[0, 1].grid(True)
axs[0, 1].legend()

axs[1, 0].plot(range(num_generation_done), average_depth_history, label="Avg Depth", color="green")
axs[1, 0].set_title("Avg tree depth size")
axs[1, 0].set_xlabel("Generations")
axs[1, 0].set_ylabel("Depth")
axs[1, 0].grid(True)
axs[1, 0].legend()

axs[1, 1].plot(range(num_generation_done), average_size_history, label="Avg Size", color="purple")
axs[1, 1].set_title("Avg tree size")
axs[1, 1].set_xlabel("Generations")
axs[1, 1].set_ylabel("Size")
axs[1, 1].grid(True)
axs[1, 1].legend()

# Adjust layout
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()"""