# Import

In [174]:
import numpy as np
from random import choice, choices, randint,random, uniform
import copy
import math
from dataclasses import dataclass

# Operations

In [175]:
#The depth of a tree cannot be greater than 20 OVERALL
MAX_TREE_DEPTH = 20

#Operations with 1 or 2 arguments divided
#Added a field of probabilities: sum and subtraction should be more likely to be chosen
OPERATIONS_BINARY = [ 
    (np.add, 2, "({} + {})",0.13),
    (np.subtract, 2, "({} - {})",0.13),
    (np.divide, 2, "({} / {})",0.1),
    (np.multiply, 2, "({} * {})",0.1),   
    (np.power, 2, "({} ^ {})",0.07),
]

BINARY_WEIGHTS = [op[3] for op in OPERATIONS_BINARY]

OPERATIONS_UNARY = [ 
    (np.sin, 1, "sin({})",0.09),
    (np.cos, 1, "cos({})",0.09),
    (np.tan, 1, "tan({})",0.07),
    (np.exp, 1, "exp({})",0.07),
    (np.log, 1, "log({})",0.07),
    (np.sqrt, 1, "sqrt({})",0.9),
]
UNARY_WEIGHTS = [op[3] for op in OPERATIONS_UNARY]
OPERATIONS = OPERATIONS_BINARY + OPERATIONS_UNARY
WEIGHTS = BINARY_WEIGHTS + UNARY_WEIGHTS    


# Program evaluation
Now we need a function that given the genotype provide us with the output provided by the predicted function. This function must receive the input vector to perform his operation.

In [176]:
def evaluate_program(program, x):
    if isinstance(program, str):  # Leaf node
        #If it's a leaf, it could be a costant or a variable
        if program[0] == 'x':
            return x[int(program[2:-1])]
        else:
            return float(program)
    elif isinstance(program, list): 
        op = next(op for op, _, symbol,p in OPERATIONS if symbol == program[0])
        args = [evaluate_program(child, x) for child in program[1:]]
        try:
            return op(*args)
        except ZeroDivisionError:
            return np.inf

# Mutation utility functions

In [177]:

def get_subtree_points_recursive(prog, path='', index=0, result=None):
    """ Get the points of the subtrees in the program. """
    if result is None:
        result = []
    
    if isinstance(prog, list) and len(prog) > 1:  
        if path:
            result.append(path)
        for i, node in enumerate(prog):
            if isinstance(node, list):  
                new_path = f"{path}[{i}]" if path else f"[{i}]"
                get_subtree_points_recursive(node, new_path, i, result)
    return result

def access_node_by_path(prog, path):
    """Access a node in the program by its path."""
    indices = [int(p.strip('][')) for p in path.split('][') if p]
    current = prog
    for index in indices:
        current = current[index]
    return current


def find_variable_indices(node, result=None):
    """Find the indices of the variables in the program."""
    if result is None:
        result = set()

    if isinstance(node, list):
        for child in node:
            find_variable_indices(child, result)
    
    elif isinstance(node, str) and node.startswith('x['):
        result.add(int(node[2:-1]))

    return result

def set_subtree_at_path(program, path, new_subtree):
    """Set the new subtree at the specified path."""
    if path == '':
        return new_subtree
    current = program
    indices = [int(x) for x in path.strip('][').split('][')]
    for i in indices[:-1]:  
        current = current[i]
    current[indices[-1]] = new_subtree  
    return program


def swap_operation_at_path(program, path, new_op):
    """Swap the operation at the specified path."""
    current = program
    indices = [int(x) for x in path.strip('][').split('][')]
    for i in indices[:-1]: 
        current = current[i]
    if isinstance(current[indices[-1]], list):  
        current[indices[-1]][0] = new_op 
    return program

def cleaned_program(program):
    """Given a program of only costants, substitute it with a single value given by the computation of costants"""
    value = evaluate_program(program, [0])
    return str(value)

def safe_copy(obj):
    """Copy an object safely."""
    if isinstance(obj, list):
        return copy.deepcopy(obj)  
    elif isinstance(obj, str):
        return obj 
    else:
        raise TypeError("Unsupported type, only strings.")
    
def depth(program):
    """Compute the depth of the program (represented by a tree)"""
    if isinstance(program, str):
        return 1
    elif isinstance(program, list):
        return 1 + max(depth(child) for child in program[1:])

# Genothype definition

The recursive function we've developed is designed to create complex mathematical formulas in a structured format, where each formula is represented as a list: `[operator, first_operand, second_operand]`. This format allows the recursive and hierarchical organization of operations, facilitating the computational evaluation and manipulation of the formula.

Key features of this function include:

1. **Use of Input Dimensions:** The function uses a list, `used_indices`, to track which input dimensions (e.g., `0,1`, etc.) have been utilized in the formula. This ensures comprehensive coverage of all available input variables (if the depth make it possible), making the formula relevant to all dimensions of the input data.  

2. **Restrictions on Trigonometric Functions:** To enhance the mathematical sensibility of the formulas generated, the function includes a specific constraint regarding the nesting of trigonometric functions. Once a trigonometric function (such as `sin`, `cos`, etc.) is used, the function prohibits the inclusion of another trigonometric function within it. This constraint helps in preventing mathematically nonsensical expressions like `sin(cos(tan(x)))`, which, while computationally valid, may not be practically meaningful or may complicate the interpretation and analysis of the formula.

This approach not only ensures that each formula is robust and contextually appropriate but also maintains clarity and reduces the computational redundancy that might arise from nested trigonometric operations. Such constraints are particularly important in scientific computing and simulations where the accuracy and interpretability of mathematical expressions are critical.


In [178]:
def random_program(depth,input_dim,unary=False, used_indices=None):
    if used_indices is None:
        used_indices = set()

    # Base case: generate a leaf node
    if depth == 0 or (random()<0.2):
        if len(used_indices) < input_dim and random()<0.7:
            available_indices = list(set(range(input_dim)) - used_indices)
            index = choice(available_indices)
            used_indices.add(index)
            return f"x[{index}]", used_indices
        else: #Only positive float between 0 and 10
            return str(uniform(0, 10)), used_indices
           
    if(depth<=1 and not unary and len(used_indices)!=input_dim):
        operations = OPERATIONS
        weights = WEIGHTS
    else:
        operations = OPERATIONS_BINARY
        weights = BINARY_WEIGHTS
    op, arity, symbol,p = choices(operations,weights=weights, k=1)[0]
    if(unary != True):
        unary = arity==1
    children = []
    for _ in range(arity):
        child, used_indices = random_program(depth - 1, input_dim, unary, used_indices)
        children.append(child)
    return [symbol] + children,used_indices

In [179]:
def generate_program(input_dim):
    """Generate a random program given the input dimension. It ensures all dimensions are used once in the program."""
    programs = []
    for i in range(input_dim):
        flag = False #Evaluate that it does not only contain costants
        used_indices_local = set()
        used_indices_local = set(range(input_dim)) - {i}
        while(not flag):
            program, used_indices = random_program(2, input_dim,used_indices=used_indices_local)
            flag = len(find_variable_indices(program)) == 1
            if not flag:
                program = cleaned_program(program)
        programs.append(program)
    #Combine programs together through a binary operation
    program = programs[0]
    for i in range(1,len(programs)):
        op, arity, symbol,p = choices(OPERATIONS_BINARY,weights=BINARY_WEIGHTS, k=1)[0]
        program = [symbol] + [program, programs[i]]
    
    return program

## Individual

We developed a dataclass to store the fitness value of each individual. This will allow us to avoid to recompute the fitness function for the same individual.

In [180]:
@dataclass
class Individual:
    genome: list
    fitness : float = None

## Transform the program into a human readable function

In [181]:
def program_to_string(program):
    if isinstance(program, str):  # leaf
        return program  
    elif isinstance(program, list):  
        try:
            _, _, symbol = next((op, arity, s) for op, arity, s,p in OPERATIONS if s == program[0])
        except StopIteration:
            raise ValueError(f"Not known operation: {program[0]}")
        
        children = [program_to_string(child) for child in program[1:]]
        
        return symbol.format(*children)

In [182]:
print(program_to_string(generate_program(3)))

(((sqrt(x[0]) - (0.7334293407275227 * 1.3626301438307575)) / (sqrt(x[1]) - (4.0852366968840945 * 3.7519342704284284))) - (x[2] / (2.557507469868127 ^ 8.022147612715063)))


## Fitness function

# MAPE
The Mean Absolute Percentage Error (MAPE) is a statistical measure used to evaluate the accuracy of predictions in regression analysis. It quantifies the average magnitude of errors in a set of predictions, expressed as a percentage of the actual values. The formula for MAPE is the average of the absolute differences between the predicted and actual values, divided by the actual values, multiplied by 100 to convert it into a percentage. This metric is particularly useful because it provides a clear, interpretable measure of predictive error relative to the size of the numbers being predicted, making it ideal for comparing the accuracy of prediction models across different data scales

In [183]:
def mape(program, x, y):
    predictions = np.array([evaluate_program(program, x_row) for x_row in x.T])
    return  float(np.mean(np.abs((y - predictions) / y)) * 100)

In [184]:
def fitness_function(program, x, y):
    """Fitness evaluation for a program."""
    try:
        # Evaluation of the program
        predictions = np.array([evaluate_program(program, x_row) for x_row in x.T])
        if np.any(np.isnan(predictions)) or np.any(np.isinf(predictions)):
            return np.inf  # Penalize invalid programs 

        #Compute mape
        error = mape(program, x, y)
        if not math.isfinite(error):
            return np.inf
        #Evaluation of the complexity of a program removed, it is problem dependant, the depth of a program is limited anyway

    except Exception as e:
        # Penalize programs with errors 
        print(f"Error in program evaluation: {e}")
        return np.inf

    return error

## Mutation

The idea of the mutation is that recursively finds all the subtree of the tree. Casually select one of them. It checks which variables were in that subtree and generates a new tree containing (at least) the same variables to be added at that place: it ensures that we always consider all the dimensions of the input. 

If the considered sub tree do not contains any variable's dimension, we can return a constant value.

In [185]:
def mutate(program, input_dim, max_depth=3):
    """Mutation of a program."""
    mutant = copy.deepcopy(program)

    points = get_subtree_points_recursive(program)
    if not points:
        return generate_program(input_dim)

    point = choice(points)
    subtree = access_node_by_path(mutant, point)
    
    #Variable set contains the indices of the variables used in the subtree
    variable_set = find_variable_indices(subtree)
    variables = set(range(input_dim))-variable_set
    new_subtree = []
    if(random()<0.6):
        if(len(variable_set)==0):
            #return generate_program(input_dim)
            new_subtree = str(random()*9+1)
            return set_subtree_at_path(mutant, point, new_subtree)
        elif(len(variable_set)==1):
            new_subtree = random_program(1, input_dim,False,variables)[0]
        else:
            new_subtree = random_program(max_depth, input_dim,False,variables)[0]
    else:
        new_subtree = random_program(max_depth, input_dim,True)[0]
    
    mutant = set_subtree_at_path(mutant, point, new_subtree)
    if(random()<0.4):
            points = get_subtree_points_recursive(mutant)
            if not points:
                return mutant
            point = choice(points)
            #Randomly choose a binary operation
            op, arity, symbol,p = choices(OPERATIONS_BINARY,weights=BINARY_WEIGHTS, k=1)[0]
            if(len(access_node_by_path(mutant, point)) == 3):
                mutant = swap_operation_at_path(mutant, point, symbol)

    return mutant

Let's create a new tweak function that is able to tweak a program by adding an unary operation in leaf nodes (if it not a constant value nor already a unary operation).

In [186]:
def tweak_program_2(program):
    """
    Modifica un sottoalbero del programma aggiungendo un operatore unario
    su una foglia, evitando di applicarlo a una foglia già modificata da un operatore unario.
    """
    # Trova tutte le foglie del programma
    def get_leaf_indices(node, path=()):
        """
        Ritorna i percorsi alle foglie dell'albero.
        Una foglia è un valore (stringa o numero) non ulteriormente divisibile.
        """
        if isinstance(node, (str, int, float)):  # Nodo foglia (variabile o costante)
            return [path]
        elif isinstance(node, list) and len(node) > 1:  # Nodo interno valido
            indices = []
            for i, child in enumerate(node[1:], start=1):  # Salta l'operatore
                indices.extend(get_leaf_indices(child, path + (i,)))
            return indices
        return []  # Nodo vuoto o non valido

    # Ottieni tutte le foglie
    leaf_indices = get_leaf_indices(program)
    if not leaf_indices:
        return program  # Nessuna modifica possibile

    # Seleziona casualmente una foglia
    selected_leaf_path = choice(leaf_indices)

    # Accedi alla foglia selezionata
    node = program
    for idx in selected_leaf_path[:-1]:
        node = node[idx]

    # Verifica che il nodo sia valido prima di modificare
    if isinstance(node, list) and len(selected_leaf_path) > 0:
        leaf = node[selected_leaf_path[-1]]
        
        # Verifica se la foglia è modificabile
        if isinstance(leaf, (str, int, float)):
            # Scegli un operatore unario
            unary_operator = choice(["sin({})", "cos({})", "tan({})", "log({})", "sqrt({})"])
            
            # Applica l'operatore unario
            node[selected_leaf_path[-1]] = [unary_operator, leaf]
        elif isinstance(leaf, list) and len(leaf) == 2 and isinstance(leaf[0], str):
            # La foglia è già un operatore unario applicato: non fare nulla
            pass

    return program


Let's also introduce a function that simply search for a leaf node of a constant (numerical) value and multiply it by 1.x, where x is randomicaly selected in order to convert the the number to a float value.

In [187]:
from random import choice, uniform

def tweak_program_with_constant(program):
    """
    Modifica un nodo foglia che sia una costante numerica, moltiplicandolo
    per una costante moltiplicativa del tipo 1,x, dove x è un float casuale.
    """
    # Trova tutte le foglie del programma
    def get_leaf_indices(node, path=()):
        """
        Ritorna i percorsi alle foglie dell'albero.
        Una foglia è un valore (numero o variabile) non ulteriormente divisibile.
        """
        if isinstance(node, (int, float)):  # Nodo foglia costante
            return [path]
        elif isinstance(node, list) and len(node) > 1:  # Nodo interno valido
            indices = []
            for i, child in enumerate(node[1:], start=1):  # Salta l'operatore
                indices.extend(get_leaf_indices(child, path + (i,)))
            return indices
        return []  # Nodo vuoto o non valido

    # Ottieni tutte le foglie
    leaf_indices = get_leaf_indices(program)
    if not leaf_indices:
        return program  # Nessuna modifica possibile

    # Filtra solo le costanti numeriche
    constant_leaf_indices = []
    for path in leaf_indices:
        node = program
        for idx in path[:-1]:
            node = node[idx]
        leaf = node[path[-1]]
        if isinstance(leaf, (int, float)):
            constant_leaf_indices.append(path)

    if not constant_leaf_indices:
        return program  # Nessuna costante da modificare

    # Seleziona casualmente una foglia costante
    selected_leaf_path = choice(constant_leaf_indices)

    # Accedi alla foglia selezionata
    node = program
    for idx in selected_leaf_path[:-1]:
        node = node[idx]

    # Moltiplica la costante per una costante moltiplicativa casuale
    leaf = node[selected_leaf_path[-1]]
    if isinstance(leaf, (int, float)):
        multiplier = uniform(1.0, 2.0)  # Genera un moltiplicatore casuale tra 1.0 e 2.0
        node[selected_leaf_path[-1]] = leaf * multiplier

    return program

# Esempio di utilizzo
program = ['+', 3, ['*', ['sin', 'x[0]'], 5], ['-', 7, 2]]
modified_program = tweak_program_with_constant(program)
print(modified_program)


['+', 3, ['*', ['sin', 'x[0]'], 5], ['-', 8.021533052904317, 2]]


## Crossover

We can use a croossover function that receives only 2 parents and, if one of them is a leaf program simply return casually one of the 2 programs (avoiding to perform the operation for programs with no childrens). Otherwise, select random indexes for both the parents and combine the first part of the tree with the second part of the tree of the 2 parents, returning a new individual.

New crossover function that tries to swap two sub-trees only if they uses the same set of variables (to ensure that the result will actually contain each component of the input variable exactly once).

In [188]:
import copy
from random import choice

def check_depth(subtree, max_depth):
    """ Verifica se la profondità del sottoalbero supera max_depth. """
    def depth(tree, current_depth):
        if not isinstance(tree, list) or not tree:
            return current_depth
        return max(depth(child, current_depth + 1) for child in tree)
    return depth(subtree, 0) <= max_depth

def crossover(parent1, parent2, max_depth,input_dim):
    """ Crossover a singolo punto tra due programmi con controllo della profondità e variabili. """
    child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
    
    points1 = get_subtree_points_recursive(child1)
    points2 = get_subtree_points_recursive(child2)
    
    if not points1 or not points2:
        return generate_program(input_dim), generate_program(input_dim)  
    
    valid = False
    attempts = 0
    while not valid and attempts < 100:  # Limita il numero di tentativi per evitare loop infiniti
        point1 = choice(points1)
        point2 = choice(points2)

        subtree1 = access_node_by_path(child1, point1)
        subtree2 = access_node_by_path(child2, point2)

        variable_set_1 = find_variable_indices(subtree1)
        variable_set_2 = find_variable_indices(subtree2)
        # Verifica se i sottoalberi possono essere scambiati rispetto a variabili e profondità
        if (set(find_variable_indices(subtree1)) == set(find_variable_indices(subtree2)) and
            len(variable_set_1)==input_dim and
            check_depth(subtree1, max_depth) and check_depth(subtree2, max_depth)):
            valid = True
        attempts += 1

    if valid:
        child1 = set_subtree_at_path(child1, point1, subtree2)
        child2 = set_subtree_at_path(child2, point2, subtree1)

    return child1, child2


## Genetic algorithm

### Data loading

In [189]:
#load the problem with problem_X, for X that goes from 0 to 8
problem = np.load('data/problem_4.npz')
x = problem['x']
y = problem['y']
print(x.shape)
print(y.shape)

(2, 5000)
(5000,)


## Tournament selection for parents with fitness hole, tau set to 20

In [190]:
def tournament_selection(population,tau=10):
    tau = min(tau, len(population)) #not needed in theory
    tournament_indices = np.random.choice(len(population), tau, replace=False)

    considered_individuals = []
    for index in tournament_indices:
        considered_individuals.append(population[index])
    considered_individuals.sort(key=lambda i: i.fitness)
    if random() < 0.9:
        winner = considered_individuals[0].genome
    else:
        #Select one among the second and the worst
        winner = considered_individuals[randint(1,len(considered_individuals)-1)].genome
    return winner


## Parameters

In [191]:
generations = 200
population_size = 200
p_crossover = 0.6
p_mutation = 0.4
tweak_probability = 0.2
max_depth = 2 
elite_size = 2
offspring_size = population_size 

Different EA aproach:
1. In this version we always include the elite inside the next generation as a first step
2. We extend population with the new population (resulting in having elites twice)
3. We take only the distinct individuals
4. We mantain inside the population only the population_size best individuals.

In [None]:
# Inizializza popolazione
np.seterr(all='ignore')
input_dim = x.shape[0]
population = [Individual(genome=generate_program(input_dim)) for _ in range(population_size)]
for i in population:
    i.fitness=fitness_function(i.genome, x, y)
# Loop principale per le generazioni
def run_genetic_algorithm():
    global population

    for gen in range(generations):
        population.sort(key=lambda i: i.fitness)
        mape_val = mape(population[0].genome, x, y)
        
        #break the cycle if you found the best solution you're able to find with training data
        if(mape_val==0.0000): 
            np.seterr(all='warn')
            print('Best program found with mape=0')
            return population[0].genome

        np.seterr(all='warn')
        print(f"Generazione {gen + 1}, miglior fitness: {mape_val:.6f}")
        #population is already sorted, so:
        print(f"Best formula: {program_to_string(population[0].genome)}")
        np.seterr(all='ignore')
        
        # Crea la nuova generazione
        next_population = []
        next_population.extend(population[:elite_size])  # Mantieni i migliori individui
        
        
        while len(next_population) < offspring_size:
            if random() < p_crossover:
                # Crossover
                #With random choice is much faster than tournament selection
                #Choose the best and the second best parent
                parent1, parent2 = tournament_selection(population), tournament_selection(population)
                child1, child2 = crossover(safe_copy(parent1), safe_copy(parent2),max_depth,input_dim)
                
                if random() < p_mutation:
                    child1 = mutate(safe_copy(child1), input_dim)
                if random() < p_mutation:
                    child2 = mutate(safe_copy(child2), input_dim)
                #let's add the new individuals:
                next_population.append(Individual(genome=child1, fitness=fitness_function(child1, x, y)))
            
                if len(next_population) < offspring_size:
                    next_population.append(Individual(genome=child2, fitness=fitness_function(child2, x, y)))
                if random() < tweak_probability and len(next_population) < offspring_size:
                    new_ind = tweak_program_2(safe_copy(child1))
                    next_population.append(Individual(genome=new_ind, fitness=fitness_function(new_ind, x, y)))
            else:
                # Mutate directly a parent
                parent = tournament_selection(population)
                mutant = mutate(safe_copy(parent), input_dim)
                next_population.append(Individual(genome=mutant, fitness=fitness_function(mutant, x, y)))
                    
        # the new population is the one generated in the offspring
        population.extend(next_population)

        # Remove duplicates
        unique_population = {}
        for prog in population:
            serialized = str(prog)
            if serialized not in unique_population:
                unique_population[serialized] = prog
        
        # update fitness of the new population
        population = list(unique_population.values())
        
        population.sort(key=lambda i: i.fitness)
        population = population[:population_size]
        

    # Identify the best program
    population.sort(key=lambda i: i.fitness)
    best_program = population[0]
    best_mape = mape(best_program.genome, x, y)

    print("Best program:", best_program.genome, "; Fitness:", best_mape)
    return best_program.genome

best_program = run_genetic_algorithm()

Generazione 1, miglior fitness: 100.047507
Best formula: ((7.178981011575169 + (x[0] - 1.619017424605168)) / ((1.331563693440776 + x[1]) * (7.951401588698658 ^ 3.513130369864761)))
Generazione 2, miglior fitness: 99.968047
Best formula: ((7.178981011575169 + (x[0] - 1.619017424605168)) / (2.2030039936067727 * (7.951401588698658 ^ 3.513130369864761)))


In [None]:
program_to_string(best_program)