In [None]:
# Code implemented by Andrea Fontana
# Politecnico di Torino, Computational Intelligence

In [None]:
import numpy as np
import random
from typing import List, Dict

## 1. Initial Configuration

In [None]:
# Data Load
problem = np.load('../data/problem_4.npz')
x = problem['x']
y = problem['y']

TRAIN_SIZE = 1

# decide how many data to train the model (0.6 --> 60% of the data, 1 --> 100% of the data)
train_size = int(x.shape[1] * TRAIN_SIZE)

# From (n_features, n_samples) to (n_samples, n_features)
x_train = x[:, :train_size].T

# From 1D array to 2D array with one column: useful to calculate MSE
# -1 is used to arrange the number of rows to have one column
y_train = y[:train_size].reshape(-1, 1)

if TRAIN_SIZE != 1.0:
    x_val = x[:, train_size:].T
    y_val = y[train_size:].reshape(-1, 1)

In [None]:
""" TEST """
print(x_train.shape)
print(y_train.shape)
if TRAIN_SIZE != 1.0:
    print(x_val.shape)
    print(y_val.shape)

## 2. Node and Functions

In [None]:
# Safe functions help to avoid eager calculus of np.where
# np.close absorb values similar to zero to avoid certain RunTimeWarning
def safe_div(x, y):
    if np.isclose(y, 0):
        return np.nan
    return x / y

def safe_tan(x):
    if np.isclose(np.cos(x), 0):
        return np.nan
    return np.tan(x)

def safe_cot(x):
    if np.isclose(np.sin(x), 0):
        return np.nan
    return np.cos(x) / np.sin(x)

def safe_sqrt(x):
    if x < 0:
        return np.nan
    return np.sqrt(x)

def safe_ln(x):
    if x <= 0:
        return np.nan
    return np.log(x)

def safe_arccot(x):
    if np.isclose(x, 0):
        return np.pi / 2
    return np.arctan(1 / x)

In [None]:
# How many dependent variables
INPUT_VARS = x_train.shape[1]
TERMINAL_SET = [f'x{i}' for i in range(INPUT_VARS)]

# Function Set F: Set of functions with metadata for arity and semantic constraints
FUNCTION_METADATA = {
    '+': {'arity': 2, 'func': lambda x, y: x + y},
    '-': {'arity': 2, 'func': lambda x, y: x - y},
    '*': {'arity': 2, 'func': lambda x, y: x * y},
    '/': {'arity': 2, 'func': safe_div},

    'sin': {'arity': 1, 'func': lambda x: np.sin(x)},
    'cos': {'arity': 1, 'func': lambda x: np.cos(x)},
    'tan': {'arity': 1, 'func': safe_tan},
    'cot': {'arity': 1, 'func': safe_cot},

    'sinh': {'arity': 1, 'func': lambda x: np.sinh(np.clip(x, -200, 200))},
    'cosh': {'arity': 1, 'func': lambda x: np.cosh(np.clip(x, -200, 200))},
    'tanh': {'arity': 1, 'func': lambda x: np.tanh(x)},

    'sqrt': {'arity': 1, 'func': safe_sqrt},
    'sqr': {'arity': 1, 'func': lambda x: np.power(np.clip(x, -1e+50, 1e+50), 2)},
    'cube': {'arity': 1, 'func': lambda x: np.power(np.clip(x, -1e+20, 1e+20), 3)},

    'ln': {'arity': 1, 'func': safe_ln},
    'exp': {'arity': 1, 'func': lambda x: np.exp(np.clip(x, -100, 350))},

    'arctan': {'arity': 1, 'func': lambda x: np.arctan(x)},
    'arccot': {'arity': 1, 'func': safe_arccot}
}

FUNCTION_SET_UNARY = [key for key, meta in FUNCTION_METADATA.items() if meta['arity'] == 1]
FUNCTION_SET_BINARY = [key for key, meta in FUNCTION_METADATA.items() if meta['arity'] == 2]
FUNCTION_SET = list(FUNCTION_METADATA.keys())

In [None]:
""" TEST """
print(INPUT_VARS)

In [None]:
class Node:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right


    def is_leaf(self):
        return self.left is None and self.right is None

    
    # Return depth of the tree
    def get_height(self):
        if self.is_leaf():
            return 1
        if self.value in FUNCTION_SET_UNARY:
            return 1 + self.left.get_height()
        return 1 + max(self.left.get_height(), self.right.get_height())
    
    
    # Return number of the nodes of the tree
    def get_size(self):
        if self.is_leaf():
            return 1
        if self.value in FUNCTION_SET_UNARY:
            return 1 + self.left.get_size()
        return 1 + self.left.get_size() + self.right.get_size()
    
    
    # Return a List of all the Nodes (class Node) of the tree
    def get_all_subtrees(self) -> List:
        subtrees = [self]
        if self.left:
            subtrees.extend(self.left.get_all_subtrees())
        if self.right:
            subtrees.extend(self.right.get_all_subtrees())
        return subtrees

    
    def find_parent_of(self, target_node, current_node=None):
        if current_node is None:
            current_node = self
        if current_node.left == target_node or current_node.right == target_node:
            return current_node
        if current_node.left:
            parent = self.find_parent_of(target_node, current_node.left)
            if parent: return parent
        if current_node.right:
            parent = self.find_parent_of(target_node, current_node.right)
            if parent: return parent
        return None

    
    def copy(self):
        if self.is_leaf():
            return Node(self.value)
        if self.value in FUNCTION_SET_UNARY:
            return Node(self.value, self.left.copy())
        return Node(self.value, self.left.copy(), self.right.copy())
    

    # Batch processing: evaluation for all the points in the dataset
    # - resulting array contains the prediction for each point of the dataset
    def evaluate(self, input_vars: Dict[str, np.ndarray]) -> np.ndarray:
        if self.value in input_vars:
            return input_vars[self.value]
        if self.is_leaf():
            try:
                # np.full_like models the numeric value into an np.ndarray of the same dimension of terminal variables:
                # - (x0 + 0.5) --> ([1, 2, 3] + [0.5, 0.5, 0.5])
                return np.full_like(list(input_vars.values())[0], float(self.value))
            except ValueError:
                raise ValueError(f"Unknown terminal value: {self.value}")

        func = FUNCTION_METADATA[self.value]['func']

        # Unary operators
        if self.value in FUNCTION_SET_UNARY:
            left_val = self.left.evaluate(input_vars)
            return func(left_val)
        # Binary operators
        else:  
            left_val = self.left.evaluate(input_vars)
            right_val = self.right.evaluate(input_vars)
            return func(left_val, right_val)

    
    def __str__(self):
        if self.is_leaf():
            return str(self.value)
        if self.value in FUNCTION_SET_UNARY:
            return f"{self.value}({self.left})"
        return f"({self.left} {self.value} {self.right})"

## 3. Helper functions

In [None]:
def count_constants_and_variables(node):
    #Count constants and variables (with frequency) in a tree.
    #Returns a dict with:
    #  - 'constants': number of constants
    #  - 'variables': total number of variables
    #  - 'variable_freq': frequency of each variable (e.g., {'x0': 2, 'x1': 1})
   
    if node is None:
        return {'constants': 0, 'variables': 0, 'variable_freq': {}}

    counts = {'constants': 0, 'variables': 0, 'variable_freq': {}}

    # If the node is a leaf
    if node.is_leaf():
        try:
            # Try converting to float → constant
            float(node.value)
            counts['constants'] = 1
        except ValueError:
            # It's a variable (e.g., 'x0', 'x1', ...)
            counts['variables'] = 1
            counts['variable_freq'][node.value] = 1
        return counts

    # If the node has children → recursive aggregation
    left_counts = count_constants_and_variables(node.left)
    for key in ['constants', 'variables']:
        counts[key] += left_counts[key]
    for var, freq in left_counts['variable_freq'].items():
        counts['variable_freq'][var] = counts['variable_freq'].get(var, 0) + freq

    if node.right:
        right_counts = count_constants_and_variables(node.right)
        for key in ['constants', 'variables']:
            counts[key] += right_counts[key]
        for var, freq in right_counts['variable_freq'].items():
            counts['variable_freq'][var] = counts['variable_freq'].get(var, 0) + freq

    return counts

In [None]:
def prune_tree(individual: Node, max_depth, current_depth=0) -> Node:
    # Limits the depth of a tree to max_depth.
    # If the depth is not reached, the function continues to call itself on the left and right children.
    if not individual:
        return None
    
    # We have reached the depth limit --> convert the Node in a leaf
    if current_depth >= (max_depth - 1):
        if random.random() < 0.5:
            new_value = random.choice(TERMINAL_SET)
        else:
            new_value = str(random.uniform(-1, 1))
        
        return Node(new_value)

    # We haven't reached the depth limit
    if individual.left:
        individual.left = prune_tree(individual.left, max_depth, current_depth + 1)
    if individual.right:
        individual.right = prune_tree(individual.right, max_depth, current_depth + 1)
        
    return individual

In [None]:
def simplify_tree(node: Node) -> Node:
    if node.is_leaf():
        return node
    if node.left:
        node.left = simplify_tree(node.left)
    if node.right:
        node.right = simplify_tree(node.right)

    # ========================================
    # Operator(constant) -> constant
    # ========================================
    if node.value in FUNCTION_SET_UNARY:
        if node.left.is_leaf():
            try:
                # Performs the mathematical function on the child's value if it is a constant
                val = float(node.left.value)
                result = FUNCTION_METADATA[node.value]['func'](val)

                if not np.isinf(result) and not np.isnan(result):
                    return Node(str(result))

            except (ValueError, TypeError):
                # This is a terminal, you don't have to simplify
                pass
    
    # ==============================================================================
    # Existing simplification logic (Constant Folding, A+A, etc.)
    # ==============================================================================
    if node.value in FUNCTION_SET_BINARY:
        try:
            left_val = float(node.left.value)
            right_val = float(node.right.value)
            
            if node.value == '+':
                result = left_val + right_val
            elif node.value == '-':
                result = left_val - right_val
            elif node.value == '*':
                result = left_val * right_val
            elif node.value == '/':
                result = left_val / right_val if right_val != 0 else 1.0

            if not np.isinf(result) and not np.isnan(result):
                return Node(str(result))
        except (ValueError, AttributeError):
            pass

        # A + A = 2 * A
        if node.value == '+' and str(node.left) == str(node.right):
            return Node('*', Node('2'), node.left)
        
        # A * 1 = A,    1 * A = A
        if node.value == '*' and (node.right.value == '1' or node.left.value == '1'):
            if node.left.value == '1':
                return node.right
            else:
                return node.left
        
        # A / 1 = A
        if node.value == '/' and node.right.value == '1':
            return node.left
        
        # A - A = 0
        if node.value == '-' and str(node.left) == str(node.right):
            return Node('0')
        
        # A + 0 = A,    0 + A = A
        if node.value == '+':
            if node.left.value == '0':
                return node.right
            if node.right.value == '0':
                return node.left
            
        # A * 0 = 0,    0 * A = 0
        if node.value == '*' and (node.left.value == '0' or node.right.value == '0'):
            return Node('0')

    return node

## 4. Fitness

In [None]:
def calculate_fitness(individual, data_x, data_y):
    size_penalty = 0
    unbalanced_penalty = 0
    ratio_penalty = 0
    raw_mse = 0
    
    try:
        # {'x0': [column 0 values], 'x1': [column 1 values]}
        input_vars = {f'x{i}': data_x[:, i] for i in range(data_x.shape[1])}
        y_pred = individual.evaluate(input_vars).reshape(-1, 1)

        # Handling Infinite or NaN Values
        # - sqrt(-1) OR log(0) --> np.nan
        # - the tree will have inf fitness, very unlikely to be selected for subsequent generations
        if np.any(np.isnan(y_pred)) or np.any(np.isinf(y_pred)):
            return (float('inf'), float('inf'))
        
        # Disable RuntimeWarning if it cathces overflow
        with np.errstate(over='ignore'):
            raw_mse = np.mean((y_pred - data_y) ** 2)

        # If the error itself is unstable, it penalizes the individual
        if np.isinf(raw_mse) or np.isnan(raw_mse):
            return (float('inf'), float('inf'))
        
        # Calculate the penalty based on the size of the tree, if it is very large
        size = individual.get_size()
        max_size = (10.5 * INPUT_VARS) // 3
        if size > max_size:
            size_penalty = (size - max_size) * 0.02

        # If a terminal is not present in the tree, its frequency is 0
        terminal_counts = count_constants_and_variables(individual)
        all_terminal_freqs = [terminal_counts['variable_freq'].get(f'x{i}', 0) for i in range(INPUT_VARS)]

        terminal_penalty = 10
        for freq in all_terminal_freqs:
            if freq == 0:  # missing terminal
                terminal_penalty *= 5

        if sum(all_terminal_freqs) > 0:
            std_dev = np.std(all_terminal_freqs)
            unbalanced_penalty = std_dev * 10 + terminal_penalty
        else:
            # Penalty for trees that do not contain variables (constant)
            unbalanced_penalty = 100 + terminal_penalty            

        # penalize solutions with a high constant:variable ratio
        constants = terminal_counts['constants']
        variables = terminal_counts['variables']
        
        if variables > 0:
            ratio = constants / variables
            if ratio > 1:
                # The penalty is proportional to the ratio, with a significant basis
                ratio_penalty = 10 * ratio

        return (raw_mse + size_penalty + unbalanced_penalty + ratio_penalty, raw_mse)
    except (ValueError, ZeroDivisionError, OverflowError):
        return (float('inf'), float('inf'))


## 5. Population Generation (Grow and Full)

In [None]:
# Grow method: generates trees with depth <= max_depth
def grow(max_depth, current_depth=0):
    if current_depth >= (max_depth - 1):
        # Choose an input variable or a random number
        if random.random() < 0.5: 
            return Node(str(random.uniform(-1, 1)))
        else:
            return Node(random.choice(TERMINAL_SET))
    
    # Randomly choose between Function, Terminal or Costant
    r = random.random()
    if r < 0.75:  # 70% function
        choice = random.choice(FUNCTION_SET)
    elif random.random() < 0.5:
        choice = 'number'
    else:
        choice = 'variable'

    if choice in FUNCTION_SET_BINARY:
        return Node(choice, grow(max_depth, current_depth + 1), grow(max_depth, current_depth + 1))
    elif choice in FUNCTION_SET_UNARY:
        return Node(choice, grow(max_depth, current_depth + 1))
    else:
        if choice == 'number':
            return Node(str(random.uniform(-1, 1)))
        else:
            return Node(random.choice(TERMINAL_SET))
        

# Full method: generates trees with depth = max_depth
def full(max_depth, current_depth=0):
    if current_depth >= (max_depth - 1):
        if random.random() < 0.5:
            return Node(str(random.uniform(-1, 1)))
        else:
            return Node(random.choice(TERMINAL_SET))
    
    func_choice = random.choice(FUNCTION_SET)

    if func_choice in FUNCTION_SET_BINARY:
        return Node(func_choice, full(max_depth, current_depth + 1), full(max_depth, current_depth + 1))
    elif func_choice in FUNCTION_SET_UNARY:
        return Node(func_choice, full(max_depth, current_depth + 1))
    

def generate_population(pop_size, max_depth, x_train, y_train):
    population = []
    num_grow = int(pop_size * 0.6)
    num_full = pop_size - num_grow

    for _ in range(num_grow):
        individual = grow_and_simplify(max_depth, x_train, y_train)
        population.append(individual)
    for _ in range(num_full):
        individual = full_and_simplify(max_depth, x_train, y_train)
        population.append(individual)
    return population


# Wrapper function that generates a tree using the Grow method and simplifies it
def grow_and_simplify(max_depth, x_train, y_train):
    while True:
        # Generate the tree
        tree = grow(max_depth, current_depth=0)
        # Apply simplification
        simplified_tree = simplify_tree(tree)
        # Calculate fitness to check validity
        fitness, _ = calculate_fitness(simplified_tree, x_train, y_train)
        
        # If fitness is valid, exit the loop and return the tree
        if not np.isnan(fitness) and not np.isinf(fitness):
            return simplified_tree
        

# Wrapper function that generates a tree using the Full method and simplifies it
def full_and_simplify(max_depth, x_train, y_train):
    while True:
        # Generate the tree
        tree = full(max_depth, current_depth=0)
        # Apply simplification
        simplified_tree = simplify_tree(tree)
        # Calculate fitness to check validity
        fitness, _ = calculate_fitness(simplified_tree, x_train, y_train)
        
        # If fitness is valid, exit the loop and return the tree
        if not np.isnan(fitness) and not np.isinf(fitness):
            return simplified_tree


## 6. Genetic Operators (Crossover and Mutation)

In [None]:
def subtree_crossover(parent1: Node, parent2: Node) -> tuple[Node, Node]:
    # Two exact copies of the two parent trees are created.
    child1 = parent1.copy()
    child2 = parent2.copy()

    # All the lists of nodes that make up each tree are generated
    subtrees1 = child1.get_all_subtrees()
    subtrees2 = child2.get_all_subtrees()

    # Two nodes are chosen randomly, representing the roots of the sub-trees to be exchanged
    subtree1_to_swap = random.choice(subtrees1)
    subtree2_to_swap = random.choice(subtrees2)

    # --- Editing child1 ---
    if subtree1_to_swap == child1:  
        # If the root is selected, the new tree is directly a copy of the other subtree.
        child1 = subtree2_to_swap.copy()
    else:
        # Find the parent of the subtree to swap (parent_of_swap1)
        parent_of_swap1 = child1.find_parent_of(subtree1_to_swap)
        # Replaces the reference to the old subtree with a copy of the new subtree from the other parent.
        if parent_of_swap1.left == subtree1_to_swap:
            parent_of_swap1.left = subtree2_to_swap.copy()
        else:
            parent_of_swap1.right = subtree2_to_swap.copy()

    # --- Editing child2 ---
    if subtree2_to_swap == child2:
        child2 = subtree1_to_swap.copy()
    else:
        parent_of_swap2 = child2.find_parent_of(subtree2_to_swap)
        if parent_of_swap2.left == subtree2_to_swap:
            parent_of_swap2.left = subtree1_to_swap.copy()
        else:
            parent_of_swap2.right = subtree1_to_swap.copy()
    
    return child1, child2


In [None]:
def subtree_mutation(parent, max_depth):
    individual = parent.copy()
    all_nodes = individual.get_all_subtrees()
    
    # Subtree Mutation: Replaces a subtree with a new tree
    if len(all_nodes) > 1:
        node_to_replace = random.choice(all_nodes[1:])
    else:
        node_to_replace = individual

    new_subtree = grow_and_simplify(max_depth, x_train, y_train)

    if node_to_replace == individual:
        return new_subtree
    parent_of_replace = individual.find_parent_of(node_to_replace)
    if parent_of_replace.left == node_to_replace:
        parent_of_replace.left = new_subtree
    else:
        parent_of_replace.right = new_subtree

    return prune_tree(individual, max_depth)


def point_mutation(parent, max_depth=None):
    individual = parent.copy()
    all_nodes = individual.get_all_subtrees()
    
    # Point Mutation: Replaces a single node with a new node of its same type
    if not all_nodes:
        return individual
    
    node_to_mutate = random.choice(all_nodes)

    if node_to_mutate.is_leaf():
        # The node is a leaf. Determines whether it is a constant or a variable.
        try:
            # Case 1: the node is a constant
            float(node_to_mutate.value)
            original_value = node_to_mutate.value
            new_value = original_value
            while new_value == original_value:
                new_value = str(random.uniform(-1, 1))
            node_to_mutate.value = new_value
        except ValueError:
            # Case 2: If the conversion fails, it is a variable
            original_value = node_to_mutate.value
            new_value = original_value
            if len(TERMINAL_SET) > 1:
                while new_value == original_value:
                    new_value = random.choice(TERMINAL_SET)
            else:
                # Replace with a constant
                new_value = str(random.uniform(-1, 1))
            node_to_mutate.value = new_value
    else:
        # The node is an (internal) operator
        if node_to_mutate.value in FUNCTION_SET_UNARY:
            new_value = random.choice(FUNCTION_SET_UNARY)
        else:
            new_value = random.choice(FUNCTION_SET_BINARY)
        node_to_mutate.value = new_value
    
    return individual


def hoist_mutation(parent, max_depth=None):
    individual = parent.copy()
    all_nodes = individual.get_all_subtrees()
    
    # Hoist Mutation: Replaces the root with a random subtree
    # Creates a list of all nodes within the tree that are not leaves or roots
    internal_nodes = [n for n in all_nodes if not n.is_leaf() and n != individual]
    if internal_nodes:
        node_to_hoist = random.choice(internal_nodes)
        return node_to_hoist.copy()
    
    return individual


def expansion_mutation(parent, max_depth):
    # Expansion Mutation: Replaces a leaf with a function node
    individual = parent.copy()
    all_nodes = individual.get_all_subtrees()
    
    leaf_nodes = [n for n in all_nodes if n.is_leaf()]
    if leaf_nodes:
        node_to_expand = random.choice(leaf_nodes)
        func = random.choice(FUNCTION_SET)
        arity = FUNCTION_METADATA[func]['arity']
        node_to_expand.value = func
        if arity == 1:
            if random.random() < 0.5:
                node_to_expand.left = Node(str(random.uniform(-1, 1)))
            else:
                node_to_expand.left = Node(random.choice(TERMINAL_SET))
        else:
            if random.random() < 0.5:
                node_to_expand.left = Node(str(random.uniform(-1, 1)))
            else:
                node_to_expand.left = Node(random.choice(TERMINAL_SET))
            if random.random() < 0.5:
                node_to_expand.right = Node(str(random.uniform(-1, 1)))
            else:
                node_to_expand.right = Node(random.choice(TERMINAL_SET))
            
    return prune_tree(individual, max_depth)


def collapse_mutation(parent, max_depth=None):
    # Collapse Mutation: Replaces a function node with a leaf
    individual = parent.copy()
    all_nodes = individual.get_all_subtrees()
    
    internal_nodes = [n for n in all_nodes if not n.is_leaf()]
    if internal_nodes:
        node_to_collapse = random.choice(internal_nodes)
        node_to_collapse.value = random.choice(TERMINAL_SET)
        node_to_collapse.left = None
        node_to_collapse.right = None
        
    return individual


# Funzione per mutare un albero con uno dei 5 metodi
def mutate(parent: Node, max_depth) -> Node:
    mutation_functions = [
        subtree_mutation, 
        point_mutation, 
        hoist_mutation, 
        expansion_mutation, 
        collapse_mutation
    ]
    
    # Scegli una funzione di mutazione a caso e applicala
    chosen_mutation = random.choice(mutation_functions)
    return chosen_mutation(parent, max_depth)

## 7. Symbolic Regression

In [None]:

def genetic_programming(pop_size, max_generations, max_depth, xover_rate, mut_rate, limit_impr, elite_rate, imm_rate):
    population = generate_population(pop_size, max_depth, x_train, y_train)
    
    best_individual_ever = None
    best_fitness_ever = np.finfo(np.float64).max
    no_improvement_count = 0

    elite_size = int(pop_size * elite_rate)
    num_immigrants = int(pop_size * imm_rate)
    
    for gen in range(max_generations):
        new_population = []
        fitness_scores = []
        pure_scores = []

        for ind in population:
            mse_score, raw_mse = calculate_fitness(ind, x_train, y_train)
            fitness_scores.append(mse_score)
            pure_scores.append(raw_mse)

        # retian only sorted tree individuals, not the corresponding fitness
        sorted_population = [ind for _, ind in sorted(zip(fitness_scores, population), key=lambda pair: pair[0])]

        best_fitness_current_gen = sorted(pure_scores)[0]

        if best_fitness_current_gen < best_fitness_ever:
            best_fitness_ever = best_fitness_current_gen
            best_individual_ever = sorted_population[0].copy()
            no_improvement_count = 0
        else:
            no_improvement_count += 1
            
        print(f"Gen_{gen}: Best_MSE_so_far = {best_fitness_ever:.7f} | Best_Tree_so_far: {best_individual_ever}")

        if best_fitness_ever < 1e-10:
            break

        # Parent Selection: Over-selection
        overselection_group1 = sorted_population[:elite_size] # Elite individuals
        overselection_group2 = sorted_population[elite_size:] # Non-Elite individuals
        
        # Stagnation logic
        if no_improvement_count >= limit_impr:
            print(f"Stagnazione rilevata.")

            # 1. Retain N best individuals
            retained_size = 15
            elite_group = overselection_group1[:retained_size]
            new_population = [ind.copy() for ind in elite_group]

            # 2. Immigration for the rest of the population
            new_individuals_size = pop_size - len(new_population)
            new_population.extend(generate_population(new_individuals_size, max_depth, x_train, y_train))

            population = new_population
            no_improvement_count = 0
            continue

        # Survivor Selection: Elitism
        retained_size = int(len(overselection_group1) * 0.08)
        elite_group = overselection_group1[:retained_size]
        new_population = [ind.copy() for ind in elite_group]       

        while len(new_population) < (pop_size - num_immigrants):
            # Function to select a parent with Over-selection
            def select_parent():
                if random.random() < 0.8:
                    return random.choice(overselection_group1)
                else:
                    return random.choice(overselection_group2)

            # Select parents
            parent1 = select_parent()
            parent2 = select_parent()

            # Perform either mutation or crossover
            if random.random() < xover_rate:
                child1, child2 = subtree_crossover(parent1, parent2)
                # truncation after crossover
                child1 = prune_tree(child1, max_depth)
                child2 = prune_tree(child2, max_depth)
            else:
                child1 = parent1.copy()
                child2 = parent2.copy()
                
            if random.random() < mut_rate:
                child1 = mutate(child1, max_depth)
            elif random.random() < mut_rate:
                child2 = mutate(child2, max_depth)

            # Simplification after crossover or mutation
            child1 = simplify_tree(child1)
            child2 = simplify_tree(child2)

            # Child assessment and validation: Add your child only if his fitness is valid
            for child in [child1, child2]:
                if len(new_population) < (pop_size - num_immigrants):
                    fitness, _ = calculate_fitness(child, x_train, y_train)

                    if not np.isnan(fitness) and not np.isinf(fitness):
                        new_population.append(child)

        # Add IMMIGRANTS to the population to avoid getting stuck in local optima
        new_population.extend(generate_population(num_immigrants, max_depth, x_train, y_train))

        # Force mutation to all the best individuals every 10 generations to escape local optima
        if (gen + 1) % 25 == 0:
            print(f"Gen_{gen}: Applico mutazione all'intera popolazione...")
            
            for ind_to_mutate in elite_group:
                original_fitness = fitness_scores[population.index(ind_to_mutate)]
        
                func_mutate = random.choice([point_mutation, subtree_mutation])
                mutated_ind = func_mutate(ind_to_mutate, max_depth)
                mutated_ind = simplify_tree(mutated_ind)
                
                mutated_fitness, _ = calculate_fitness(mutated_ind, x_train, y_train)
                
                if not np.isnan(mutated_fitness) and not np.isinf(mutated_fitness) and mutated_fitness < original_fitness:
                    try:
                        idx_in_new_pop = new_population.index(ind_to_mutate)
                        new_population[idx_in_new_pop] = mutated_ind
                    except ValueError:
                        pass

        population = new_population
        
    return best_individual_ever, best_fitness_ever

## 8. Execution

In [None]:
# The R-squared value is a statistical metric that indicates the proportion of variance in the dependent variable that can be predicted 
# from the independent variable(s). R-squared measures how well a regression model fits the data.
# - A value of 1 means the model explains 100% of the variability, while 0 means it explains nothing.
def calculate_r_squared(y_true, y_pred):

    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    ss_res = np.sum((y_true - y_pred) ** 2)

    # dependent variable has no variability (it is costant!)
    if ss_tot == 0:
        # If the model predicts that constant perfectly
        if np.allclose(y_true, y_pred): # returns True if all elements are enough close with a certain tolerance
            r2 = 1.0
        # If the model does NOT predict correctly
        else:
            r2 = 0.0
    else:
        r2 = 1 - (ss_res / ss_tot)
    
    return r2

In [None]:
POP_SIZE = 1000
if INPUT_VARS == 6:
    MAX_DEPTH = 10
    MAX_GENERATIONS = 2000
else:
    MAX_DEPTH = INPUT_VARS * 2 + 1
    MAX_GENERATIONS = 3500
PROB_CROSSOVER = 0.8
PROB_MUTATION = 0.4
MAX_NO_IMPROVEMENT_GENS = 15
ELITE_RATE = 0.3
IMMIGRANT_RATIO = 0.15

if __name__ == "__main__":
    print(f"Start Genetic Programming with Population {POP_SIZE} and Depth {MAX_DEPTH}")
    best_solution, best_fitness = genetic_programming(
        POP_SIZE, 
        MAX_GENERATIONS, 
        MAX_DEPTH,
        PROB_CROSSOVER,
        PROB_MUTATION,
        MAX_NO_IMPROVEMENT_GENS,
        ELITE_RATE,
        IMMIGRANT_RATIO,
    )
    
    if best_solution:
        print("\n--- Final Results ---")
        
        # Calculate the prediction on the training set
        input_vars = {f'x{i}': x_train[:, i] for i in range(x_train.shape[1])}
        y_pred_train = best_solution.evaluate(input_vars).reshape(-1, 1)
        
        # Calculate the R-squared for the training set
        r2_train = calculate_r_squared(y_train, y_pred_train)

        print(f"Best expression found: {best_solution}")
        print(f"MSE on the training set: {best_fitness:.7f}")
        print(f"R-squared on the training set: {r2_train:.7f}")

        if TRAIN_SIZE != 1.0:
            # Calculate the prediction on the validation set
            input_vars = {f'x{i}': x_val[:, i] for i in range(x_val.shape[1])}
            y_pred_val = best_solution.evaluate(input_vars).reshape(-1, 1)

            # Calculate the R-squared for the validation set
            r2_val = calculate_r_squared(y_val, y_pred_val)

            print(f"MSE on the validation set: {calculate_fitness(best_solution, x_val, y_val)[1]:.7f}")
            print(f"R-squared on the validation set: {r2_val:.7f}") 