In [3041]:
import numpy as np
import random
from typing import List, Union, Callable, Optional, Tuple, Dict, Any
import time
import copy
import matplotlib.pyplot as plt
from icecream import ic
from tqdm.auto import tqdm
import sympy as sp
from sympy.parsing.sympy_parser import parse_expr
import re 

### Data Preparation

In [3042]:
def prepare_data(file_path):
    """
    Prepares the data for Symbolic Regression with Genetic Programming
    
    Args:
        file_path: Path to the .npz file containing the data
        
    Returns:
        X: Array of input features
        y: Array of target outputs
        config: Configuration for GP based on the data
    """
    # 1. Loading the data
    print(f"Loading data from {file_path}...")
    data = np.load(file_path)
    X = data['x']
    y = data['y']
    
    # Specific handling for data with transposed dimensions
    # If we have more features than samples, we probably need to transpose
    if X.ndim > 1 and X.shape[1] > X.shape[0] and y.shape[0] == X.shape[1]:
        print("Detected format with features on rows and samples on columns, transposition...")
        X = X.T  # Trasponiamo per avere i campioni sulle righe
    
    # Management of different formats
    if y.ndim > 1:
        if y.shape[0] == 1 or y.shape[1] == 1:  # If y is [1, n_samples] or [n_samples, 1]
            y = y.flatten()
            print(f"y transformed into form {y.shape}")
    
    # Make sure that X is 2D if multidimensional
    if X.ndim == 1:
        X = X.reshape(-1, 1)
        print(f"X rendered 2D with shape{X.shape}")
    
    print(f"Final form: X shape {X.shape}, y shape {y.shape}")
    
    # Check for consistency in the number of samples
    if X.shape[0] != len(y):
        raise ValueError(f"Inconsistent number of samples: X ha {X.shape[0]} campioni, y ne ha {len(y)}")
    
    
    # Determining the dimensionality of the input
    n_features = X.shape[1]
    print(f"Input {n_features}-dimensional with {X.shape[0]} samples")
    
    # Configuration for GP
    # We define the ser of variables base on dimensionality
    # We assume that the variables are named x[0], x[1], ..., x[n_features-1]
    variables = [f'x[{i}]' for i in range(n_features)]
    
    # We define the range of constants based on the data
    const_range = max(np.max(np.abs(X)), np.max(np.abs(y)))
    
    # Complete configuration for GP
    config = {
        'variables': variables,
        'n_features': n_features,
        'const_range': const_range,
        'y_stats': {
            'mean': float(np.mean(y)),
            'std': float(np.std(y)),
            'min': float(np.min(y)),
            'max': float(np.max(y))
        },
        'dataset_size': len(y)
    }
    
    return X, y, config

for i in range(0, 9):
    file_path = f"../data/problem_{i}.npz"
    X, y, config = prepare_data(file_path)
    print(f"\nGP Configuration: {config}")
    print("-" * 50)

Loading data from ../data/problem_0.npz...
Detected format with features on rows and samples on columns, transposition...
Final form: X shape (1000, 2), y shape (1000,)
Input 2-dimensional with 1000 samples

GP Configuration: {'variables': ['x[0]', 'x[1]'], 'n_features': 2, 'const_range': np.float64(3.23346517158693), 'y_stats': {'mean': 0.0740349656325135, 'std': 1.8421127520800509, 'min': -3.208666606823163, 'max': 3.23346517158693}, 'dataset_size': 1000}
--------------------------------------------------
Loading data from ../data/problem_1.npz...
Detected format with features on rows and samples on columns, transposition...
Final form: X shape (500, 1), y shape (500,)
Input 1-dimensional with 500 samples

GP Configuration: {'variables': ['x[0]'], 'n_features': 1, 'const_range': np.float64(0.9954328739932046), 'y_stats': {'mean': -0.028581340125470176, 'std': 0.5035248258112494, 'min': -0.8322110937504172, 'max': 0.8389945887188881}, 'dataset_size': 500}
-----------------------------

### Expression Representation

In [3043]:
class Node:
    """Base class for representing a node in the expression tree"""
    def __init__(self):
        self.depth = 0  # Node depth
    
    def evaluate(self, X: np.ndarray) -> np.ndarray:
        """Evaluates the node given an input X"""
        raise NotImplementedError("You must implement the evaluate method in the subclass")
    def copy(self) -> 'Node':
        """Copy current node"""
        raise NotImplementedError("You must implement the copy method in the subclass")
    
    def to_string(self) -> str:
        """Returns a string representation of the node"""
        raise NotImplementedError("You must implement the to_string method in the subclass")
    
    def get_complexity(self) -> int:
        """Returns the complexity of the node (number of nodes)"""
        return NotImplementedError("You must implement the get_complexity method in the subclass")
    
    def get_height(self) -> int:
        """Returns the height of the node"""
        return NotImplementedError("You must implement the get_height method in the subclass")
    
    def get_nodes(self) -> int:
        """Returns the total number of nodes in the tree"""
        return NotImplementedError("You must implement the get_nodes method in the subclass")
    

In [3044]:
class FunctionNode(Node):
    """Class to represent a function node in the expression tree"""
    def __init__(self, function: Callable, arity: int, symbol: str, children: List[Node] = None):
        super().__init__()
        self.function = function # Function (np) to be applied
        self.arity = arity # Number of arguments (children) the function takes
        self.symbol = symbol # Symbol to represent the function (e.g., '+', '-', '*', '/')
        self.children = children if children is not None else []

    def evaluate(self, X: np.ndarray)->np.ndarray:
        """Evaluate the function by applying it to the children's results"""
        #Evaluate the children
        args= [child.evaluate(X) for child in self.children]
        # Apply the function
        return self.function(*args)
    
    def copy(self) -> 'FunctionNode':
        """Creates a deep copy of the function node"""
        new_children = [child.copy() for child in self.children]
        new_node = FunctionNode(self.function, self.arity, self.symbol, new_children)
        new_node.depth = self.depth
        return new_node
    
    def to_string(self) -> str:
        """Returns a string representation of the function node"""
        if self.arity == 1:
            # Unary function
            return f"{self.symbol}({self.children[0].to_string()})"
        elif self.arity == 2:
            # Binary function (e.g. +, -, *, /)
            return f"({self.children[0].to_string()} {self.symbol} {self.children[1].to_string()})"
        else:
            # Functions with greater arity (although they should not be common)
            args = ", ".join(child.to_string() for child in self.children)
            return f"{self.symbol}({args})"
        
    def get_complexity(self) -> int:
        """Returns the complexity of the node (number of nodes)"""
        return 1 + sum(child.get_complexity() for child in self.children)

    def get_height(self) -> int:
        """Returns the height of the node"""
        return 1 + max((child.get_height() for child in self.children), default=0)

    def get_nodes(self) -> List[Node]:
        """Returns a list of all nodes in the tree"""
        nodes = [self]
        for child in self.children:
            nodes.extend(child.get_nodes())
        return nodes
    


In [3045]:
class TerminalNode(Node):
    """Represents an end node in the tree (variable or constant)"""
    
    def __init__(self, value, is_variable: bool = False, var_index: int = None):
        super().__init__()
        self.value = value
        self.is_variable = is_variable
        self.var_index = var_index  # only used if is_variable is True
    
    def evaluate(self, X: np.ndarray) -> np.ndarray:
        """Evaluate the terminal node"""
        if self.is_variable:
            # If it is a variable, we take the value from the input X
            if X.ndim == 1 and self.var_index == 0:
                return X  # special case for 1D 
            else:
                return X[:, self.var_index]
        else:
            # If it is a constant, we return the value (broadcast on all samples)
            return np.full(X.shape[0] if X.ndim > 1 else len(X), self.value)
    
    def copy(self) -> 'TerminalNode':
        """Creates a copy of the terminal node"""
        new_node = TerminalNode(self.value, self.is_variable, self.var_index)
        new_node.depth = self.depth
        return new_node
    
    def to_string(self) -> str:
        """Returns the string representation of the node"""
        if self.is_variable:
            return f"x[{self.var_index}]"
        else:
            return str(self.value)
    
    def get_complexity(self) -> int:
        """The complexity of a terminal node is 1"""
        return 1
    
    def get_height(self) -> int:
        """The height of an end node is 0"""
        return 0
    
    def get_nodes(self) -> List[Node]:
        """Returns a list containing only this node"""
        return [self]


In [3046]:
class ExpressionTree:
    """It represents a complete expression tree"""
    
    def __init__(self, root: Node):
        self.root = root
        self.update_node_depths()
        self.fitness = None
        self.adjusted_fitness = None  # for fitness sharing
        self.age = 0  # for age of the tree
    
    def evaluate(self, X: np.ndarray) -> np.ndarray:
        """Evaluates the expression tree on input data"""
        return self.root.evaluate(X)
    
    def copy(self) -> 'ExpressionTree':
        """Creates a deep copy of the tree"""
        new_tree = ExpressionTree(self.root.copy())
        new_tree.fitness = self.fitness
        new_tree.adjusted_fitness = self.adjusted_fitness
        new_tree.age = self.age
        return new_tree
    
    def to_string(self) -> str:
        """Returns the string representation of the tree"""
        return self.root.to_string()
    
    def get_complexity(self) -> int:
        """Returns the complexity of the tree"""
        return self.root.get_complexity()
    
    def get_height(self) -> int:
        """Returns the height of the tree"""
        return self.root.get_height()
    
    def get_nodes(self) -> List[Node]:
        """Returns a list of all nodes in the tree"""
        return self.root.get_nodes()
    
    def get_subtree_at_index(self, index: int) -> Node:
        """
        Returns the subtree at the node specified by the index
        Useful for crossover and mutation operations
        """
        nodes = self.get_nodes()
        if 0 <= index < len(nodes):
            return nodes[index]
        return None
    
    def replace_subtree_at_index(self, index: int, new_subtree: Node) -> bool:
        """
        Replaces the subtree at the node specified by the index
        Returns True if the operation is successful, False otherwise
        """
        nodes = self.get_nodes()
        if not (0 <= index < len(nodes)):
            return False
        
        target_node = nodes[index]
        
        # Special case: replacing the tree root
        if target_node == self.root:
            self.root = new_subtree
            self.update_node_depths()
            return True
        
        # Otherwise, we need to find the parent of the target node
        for node in nodes:
            if isinstance(node, FunctionNode):
                for i, child in enumerate(node.children):
                    if child == target_node:
                        node.children[i] = new_subtree
                        self.update_node_depths()
                        return True
        
        return False
    
    def update_node_depths(self):
        """Updates the depth of all nodes in the tree"""
        self._update_depth(self.root, 0)
    
    def _update_depth(self, node: Node, depth: int):
        """Recursive helper to update depth"""
        node.depth = depth
        if isinstance(node, FunctionNode):
            for child in node.children:
                self._update_depth(child, depth + 1)

### Function Set

In [3047]:
def safe_div(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Protected division: returns a/b or 1 when b is close to zero"""
    return np.divide(a, b, out=np.ones_like(a), where=np.abs(b) > 1e-8)

def safe_log(a: np.ndarray) -> np.ndarray:
    """Protected logarithm: returns log(|a|) or 0 for a close to zero"""
    return np.log(np.abs(a), out=np.zeros_like(a), where=np.abs(a) > 1e-10)

def safe_sqrt(a: np.ndarray) -> np.ndarray:
    """Square root protected: returns sqrt(|a|)"""
    return np.sqrt(np.abs(a))

def safe_exp(a: np.ndarray) -> np.ndarray:
    """Protected exponential: limits input to avoid overflow"""
    return np.exp(np.clip(a, -200, 200))

def safe_sin(a: np.ndarray) -> np.ndarray:
    """Protected sin"""
    return np.sin(np.clip(a, -1000, 1000))

def safe_cos(a: np.ndarray) -> np.ndarray:
    """Protected cos"""
    return np.cos(np.clip(a, -1000, 1000))

def safe_tan(a: np.ndarray) -> np.ndarray:
    """Protected tangent: limits outputs to avoid extreme values"""
    return np.clip(np.tan(a), -200, 200)


In [3048]:
def create_function_set(use_trig: bool = True, use_exp_log: bool = True) -> List[Dict[str, Any]]:
    """
    Creates a set of functions to be used in the expression tree.
    
    Args:
        use_trig: Whether to include trigonometric functions.
        use_exp_log: Whether to include exponential and logarithmic functions.
        
    Returns:
        List of dictionaries, each containing:
            - function: the Python function to call.
            - arity: the number of arguments required.
            - symbol: the symbol for display.
            - weight: selection weight (relative probability).
    """
    # Basic arithmetic functions (always included)
    functions = [
        {'function': np.add, 'arity': 2, 'symbol': '+', 'weight': 1.0},
        {'function': np.subtract, 'arity': 2, 'symbol': '-', 'weight': 1.0},
        {'function': np.multiply, 'arity': 2, 'symbol': '*', 'weight': 1.0},
        {'function': safe_div, 'arity': 2, 'symbol': '/', 'weight': 0.7},  # Peso più basso per la divisione
    ]
    
    # Trigonometric functions (optional)
    if use_trig:
        functions.extend([
            {'function': safe_sin, 'arity': 1, 'symbol': 'sin', 'weight': 0.6},
            {'function': safe_cos, 'arity': 1, 'symbol': 'cos', 'weight': 0.6},
            {'function': safe_tan, 'arity': 1, 'symbol': 'tan', 'weight': 0.5},  # Peso più basso per la tangente
        ])
    
    # Exponential and logarithmic functions (optional)
    if use_exp_log:
        functions.extend([
            {'function': safe_exp, 'arity': 1, 'symbol': 'exp', 'weight': 0.4},
            {'function': safe_log, 'arity': 1, 'symbol': 'log', 'weight': 0.5},
            {'function': safe_sqrt, 'arity': 1, 'symbol': 'sqrt', 'weight': 0.6},
        ])
    
    return functions


### Terminal Set

In [3049]:
def create_variable_terminals(n_features: int, variable_weight: float = 1.0) -> List[Dict[str, Any]]:
    """
    Create terminals for input variables
    
    Args:
        n_features: Number of input variables
        variable_weight: Weight assigned to the variables
        
    Returns:
        List of dictionaries for variable terminals
    """
    return [
        {
            'is_variable': True, 
            'var_index': i, 
            'weight': variable_weight
        } for i in range(n_features)
    ]

In [3050]:
def create_constant_terminals(const_range: float, n_constants: int = 10, 
                             standard_weight: float = 0.3,
                             zero_weight: float = 0.5,
                             one_weight: float = 0.5,
                             minus_one_weight: float = 0.3,
                             pi_weight: float = 0.2,
                             e_weight: float = 0.2) -> List[Dict[str, Any]]:
    """
    Creates terminals for constants
    
    Args:
        const_range: Range for random constants
        n_constants: Number of pre-generated constants
        standard_weight: Weight for random constants
        zero_weight: Weight for the constant 0
        one_weight: Weight for the constant 1
        minus_one_weight: Weight for the constant -1
        pi_weight: Weight for the constant π
        e_weight: Weight for the constant e
        
    Returns:
        List of dictionaries for constant terminals
    """
    # Costanti fisse importanti
    fixed_constants = [
        {'is_variable': False, 'value': 1.0, 'weight': one_weight},
        {'is_variable': False, 'value': -1.0, 'weight': minus_one_weight},
        {'is_variable': False, 'value': np.pi, 'weight': pi_weight},
        {'is_variable': False, 'value': np.e, 'weight': e_weight},
    ]
    
    # Random constants pre-generated
    random_constants = [
        {
            'is_variable': False, 
            'value': random.uniform(-const_range, const_range) if abs(random.uniform(-const_range, const_range)) > 1e-8 else 1.0, # Avoid zero
            'weight': standard_weight
        } for _ in range(n_constants)
    ]
    
    return fixed_constants + random_constants


In [3051]:
def generate_ephemeral_constant(const_range: float) -> float:
    """
    Generates an ephemeral random constant
    avoiding values too close to zero
    """
    value = random.uniform(-const_range, const_range)
    # Avoid values too close to zero
    if abs(value) < 1e-8:
        if random.random() < 0.5:
            value = 1e-8
        else:
            value = -1e-8
    return value

In [3052]:
class GPConfig:
    """Class for managing the configuration of the GP algorithm"""
    
    def __init__(self, 
                 n_features: int,
                 const_range: float,
                 use_trig: bool = True,
                 use_exp_log: bool = True,
                 min_depth: int = 2,
                 max_depth: int = 6,
                 pop_size: int = 500,
                 generations: int = 50,
                 tournament_size: int = 5,
                 crossover_prob: float = 0.7,
                 mutation_prob: float = 0.2,
                 elitism_rate: float = 0.1,
                 max_tree_size: int = 50,
                 parsimony_coef: float = 0.01,
                function_weights: dict = None,
                terminal_weights: dict = None):
        
        # Function Set Configuration and Terminals
        self.function_set = create_function_set(use_trig, use_exp_log)
        self.variable_terminals = create_variable_terminals(n_features)
        self.constant_terminals = create_constant_terminals(const_range)
        
        # Apply any custom weights to functions
        if function_weights:
            self._apply_function_weights(function_weights)
            
        # Apply any customised weights to the terminals
        if terminal_weights:
            self._apply_terminal_weights(terminal_weights)



        # Calculates cumulative weights for weighted selection
        self._calculate_weights()
        
        # Tree size limits
        self.min_depth = min_depth
        self.max_depth = max_depth
        self.max_tree_size = max_tree_size
        
        # Parameters of the evolutionary algorithm
        self.pop_size = pop_size
        self.generations = generations
        self.tournament_size = tournament_size
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        self.elitism_rate = elitism_rate
        
        # Bloat control and diversity maintenance
        self.parsimony_coef = parsimony_coef  # penalty for complexity
        
        # Other parameters
        self.n_features = n_features
        self.const_range = const_range


    def _apply_function_weights(self, function_weights):
        """Apply custom weights to functions"""
        for func in self.function_set:
            symbol = func['symbol']
            if symbol in function_weights:
                func['weight'] = function_weights[symbol]
                print(f"Personalised weight for {symbol}: {func['weight']}")
    
    def _apply_terminal_weights(self, terminal_weights):
        """Apply custom weights to terminals"""
        # For variables
        if 'variables' in terminal_weights:
            for var in self.variable_terminals:
                var['weight'] = terminal_weights['variables']
                
        
        if 'constants' in terminal_weights:
            for const in self.constant_terminals:
                if isinstance(const['value'], float) and not (const['value'] in [1.0, -1.0, np.pi, np.e]):
                    const['weight'] = terminal_weights['constants']
                    
        if 'one' in terminal_weights:
            for const in self.constant_terminals:
                if const['value'] == 1.0:
                    const['weight'] = terminal_weights['one']
                    
        if 'minus_one' in terminal_weights:
            for const in self.constant_terminals:
                if const['value'] == -1.0:
                    const['weight'] = terminal_weights['minus_one']
        
        if 'pi' in terminal_weights:
            for const in self.constant_terminals:
                if const['value'] == np.pi:
                    const['weight'] = terminal_weights['pi']
                    
        if 'e' in terminal_weights:
            for const in self.constant_terminals:
                if const['value'] == np.e:
                    const['weight'] = terminal_weights['e']
        
                


    def _calculate_weights(self):
        """Calculates cumulative weights for weighted selection of functions and terminals"""
        # Cumulative weights for functions
        cum_weight = 0
        self.function_cumulative_weights  = []
        for func in self.function_set:
            cum_weight += func['weight']
            self.function_cumulative_weights.append(cum_weight)
        
        # Normalise weights
        if cum_weight > 0:
            self.function_cumulative_weights  = [w / cum_weight for w in self.function_cumulative_weights]
        
        # Cumulative weights for variable terminalsi
        cum_weight = 0
        self.variable_weights = []
        for var in self.variable_terminals:
            cum_weight += var['weight']
            self.variable_weights.append(cum_weight)
        
        # Normalise weights
        if cum_weight > 0:
            self.variable_weights = [w / cum_weight for w in self.variable_weights]
        
        # Cumulative weights for constant terminals
        cum_weight = 0
        self.constant_weights = []
        for const in self.constant_terminals:
            cum_weight += const['weight']
            self.constant_weights.append(cum_weight)
        
        # Normalise weights
        if cum_weight > 0:
            self.constant_weights = [w / cum_weight for w in self.constant_weights]
    
    def get_random_function(self) -> Dict[str, Any]:
        """Randomly selects a function from the set, based on weights"""
        r = random.random()
        for i, w in enumerate(self.function_cumulative_weights):
            if r <= w:
                return self.function_set[i]
        return self.function_set[-1]  # fallback
    
    def get_random_variable(self) -> Dict[str, Any]:
        """Randomly selects a terminal variable from the set, based on the weights"""
        if not self.variable_terminals:
            raise ValueError("No variables available")
        
        r = random.random()
        for i, w in enumerate(self.variable_weights):
            if r <= w:
                return self.variable_terminals[i]
        return self.variable_terminals[-1]  # fallback
    
    def get_random_constant(self) -> Dict[str, Any]:
        """Randomly selects a terminal constant from the set, based on the weights"""
        if not self.constant_terminals:
            # Generates a new ephemeral constant if no pre-defined constants are available
            return {'is_variable': False, 'value': generate_ephemeral_constant(self.const_range)}
        
        # Occasionally generates a new efimera constant instead of using a default one
        if random.random() < 0.3:  # 30% probability of generating a new constant
            return {'is_variable': False, 'value': generate_ephemeral_constant(self.const_range)}
        
        # Otherwise, select from the default constants
        r = random.random()
        for i, w in enumerate(self.constant_weights):
            if r <= w:
                return self.constant_terminals[i]
        return self.constant_terminals[-1]  # fallback
    
    def get_random_terminal(self) -> Dict[str, Any]:
        """Randomly selects a terminal (variable or constant)"""
        # Probability of selecting a variable vs. a constant
        # We generally want to give more weight to the variable
        if random.random() < 0.7:  # 70% probability of selecting a variable
            try:
                return self.get_random_variable()
            except ValueError:
                return self.get_random_constant()
        else:
            return self.get_random_constant()
        
    def print_function_weights(self):
        """Print current function weights"""
        print("Function weights:")
        for func in self.function_set:
            print(f"  {func['symbol']}: {func['weight']:.2f}")
    
    def print_terminal_weights(self):
        """Print current terminal weights"""
        print("Variable terminal weights:")
        for var in self.variable_terminals:
            print(f"  x[{var['var_index']}]: {var['weight']:.2f}")
        
        print("Constant terminal weights:")
        for const in self.constant_terminals:
            value_str = f"{const['value']}" if not const['is_variable'] else f"x[{const['var_index']}]"
            print(f"  {value_str}: {const['weight']:.2f}")    

### Initial Population

In [3053]:
def grow_tree(config: GPConfig, max_depth: int, min_depth: int = 1, current_depth: int = 0) -> Node:
    """
    Grow' method for generating a tree with variable depth
    
    Args:
        config: GP configuration
        max_depth: Maximum depth of the tree
        min_depth: Minimum depth of the tree
        current_depth: Current depth of the node
        
    Returns:
        Root node of the generated tree
    """
    # If we are at the maximum depth, we can only create terminal nodes
    if current_depth >= max_depth:
        terminal_info = config.get_random_terminal()
        if terminal_info['is_variable']:
            return TerminalNode(None, is_variable=True, var_index=terminal_info['var_index'])
        else:
            return TerminalNode(terminal_info['value'], is_variable=False)
    
    # If we have not yet reached the minimum depth, we only create function nodes
    if current_depth < min_depth:
        function_info = config.get_random_function()
        children = [grow_tree(config, max_depth, min_depth, current_depth + 1) for _ in range(function_info['arity'])]
        return FunctionNode(function_info['function'], function_info['arity'], function_info['symbol'], children)
    
    # Otherwise, we randomly choose between functions and terminals
    if random.random() < 0.5:  # 50% probability for functions or terminals
        function_info = config.get_random_function()
        children = [grow_tree(config, max_depth, min_depth, current_depth + 1) for _ in range(function_info['arity'])]
        return FunctionNode(function_info['function'], function_info['arity'], function_info['symbol'], children)
    else:
        terminal_info = config.get_random_terminal()
        if terminal_info['is_variable']:
            return TerminalNode(None, is_variable=True, var_index=terminal_info['var_index'])
        else:
            return TerminalNode(terminal_info['value'], is_variable=False)

In [3054]:
def full_tree(config: GPConfig, max_depth: int, current_depth: int = 0) -> Node:
    """
    Full' method for generating a tree with all branches at the same depth
    
    Args:
        config: GP configuration
        max_depth: Maximum depth of the tree
        current_depth: Current depth of the node
        
    Returns:
        Root node of the generated tree
    """
    # If we are at the maximum depth, we can only create terminal nodes
    if current_depth >= max_depth:
        terminal_info = config.get_random_terminal()
        if terminal_info['is_variable']:
            return TerminalNode(None, is_variable=True, var_index=terminal_info['var_index'])
        else:
            return TerminalNode(terminal_info['value'], is_variable=False)
    
    # Otherwise, we create only function nodes
    function_info = config.get_random_function()
    children = [full_tree(config, max_depth, current_depth + 1) for _ in range(function_info['arity'])]
    return FunctionNode(function_info['function'], function_info['arity'], function_info['symbol'], children)


In [3055]:
def ramped_half_and_half(config: GPConfig, min_depth: int, max_depth: int) -> ExpressionTree:
    """
    Ramped half-and-half' initialisation method
    Combines grow and full for greater diversity
    
    Args:
        config: GP configuration
        min_depth: Minimum tree depth
        max_depth: Maximum depth of trees
        
    Returns:
        A new expression tree
    """
    # Choose a random depth between min_depth and max_depth
    depth = random.randint(min_depth, max_depth)
    
    # Choose randomly between ‘grow’ and ‘full’.
    if random.random() < 0.5:
        root = grow_tree(config, depth, min_depth)
    else:
        root = full_tree(config, depth)
    
    return ExpressionTree(root)

In [3056]:
def initialize_population(config: GPConfig) -> List[ExpressionTree]:
    """
    Creates the initial population of expression trees
    
    Args:
        config: GP configuration
        
    Returns:
        List of expression trees
    """
    population = []
    unique_expressions = set()  # For tracking unique expressions
    
    print(f"Population initialisation of  {config.pop_size} individuals...")
    start_time = time.time()
    
    # Generate individuals until the population is full
    while len(population) < config.pop_size:
        tree = ramped_half_and_half(config, config.min_depth, config.max_depth)
        
        # Check if the expression is unique to contribute to inizial diversity
        expr_str = tree.to_string()
        if expr_str not in unique_expressions:
            unique_expressions.add(expr_str)
            population.append(tree)
            
            # We update the status every 100 individuals
            if len(population) % 1000 == 0:
                elapsed = time.time() - start_time
                print(f"  Generated  {len(population)} individuals in  {elapsed:.2f} seconds")
    
    elapsed = time.time() - start_time
    print(f"Population initialised in {elapsed:.2f} seconds")
    
    # Initial population statistics
    heights = [tree.get_height() for tree in population]
    sizes = [tree.get_complexity() for tree in population]
    print(f"Initial population statistics:")
    print(f"  - Average height: {np.mean(heights):.2f} (min: {min(heights)}, max: {max(heights)})")
    print(f"  - Average size: {np.mean(sizes):.2f} nodes (min: {min(sizes)}, max: {max(sizes)})")
    
    return population

### Fitness Evaluation

In [3057]:
def calculate_fitness(tree: ExpressionTree, X: np.ndarray, y: np.ndarray, 
                      parsimony_coef: float = 0.001) -> float:
    """
    Calculates the fitness of an individual
    
    Args:
        tree: The expression tree to be evaluated
        X: Input features
        y: Output target
        parsimony_coef: Penalty coefficient for the complexity of the tree
        
    Returns:
        Fitness value (the lower the bett
    """
    try:
        # Evaluates the tree on input data        
        predictions = tree.evaluate(X)        # In the case of NaN or infinite values, it assigns a very high (bad) fitness
        if np.any(np.isnan(predictions)) or np.any(np.isinf(predictions)):
            return float('inf')
        
        # Calculate the mean square error (MSE)
        mse = np.mean((predictions - y) ** 2)
        # add a small penalty to encourage more complex solutions
        complexity = tree.get_complexity()
        complexity_penalty = 0.0
        if complexity < 1:
            complexity_penalty = parsimony_coef * (complexity - 2)
        
        # The final fitness is MSE + penalty (lower is better)
        fitness = mse + complexity_penalty
        return fitness
    
    except Exception as e:
        # In case of errors during the evaluation, it assigns a very high fitness
        print(f"Errore durante la valutazione: {e}")
        return float('inf')

In [3058]:
def evaluate_population(population: List[ExpressionTree], X: np.ndarray, y: np.ndarray, 
                       config: GPConfig) -> None:
    """
    Evaluates all individuals in the population
    
    Args:
        population: List of expression trees
        X: Input features
        y: Output target
        config: GP configuration
    """
    for i, tree in enumerate(population):
        tree.fitness = calculate_fitness(tree, X, y, config.parsimony_coef)

In [3059]:
def apply_fitness_sharing(population: List[ExpressionTree], sigma: float = 0.1, 
                         use_sympy: bool = True) -> None:
    """
    Apply fitness sharing to foster diversity in the population
    
    Args:
    population: List of expression trees already evaluated
    sigma: Radius of the sharing kernel
    use_sympy: Whether to use sympy to simplify expressions
    
    """
    n = len(population)
    
    # Pre-calculates expression strings (with sympy if required)
    expressions = []
    for tree in population:
        expr = tree.to_string()
        if use_sympy:
            try:
                #print("Tree Complexity: ", tree.get_complexity())
                expr = sympy_simplify_expression(expr)

            except Exception as e:
                # Fallback to basic simplification in the event of an error
                expr = simplify_expression(expr)
        else:
            expr = simplify_expression(expr)
        expressions.append(expr)
    
    # Calculate the sharing factors
    for i in range(n):
        sharing_factor = 1.0
        
        for j in range(n):
            if i != j:
                # If the expressions are identical after simplification,
                # the distance is 0 and the sharing factor is maximum
                if expressions[i] == expressions[j]:
                    sharing_factor += 1.0
                else:
                    # For different expressions, calculate a similarity measure
                    expr_i = expressions[i]
                    expr_j = expressions[j]
                    
                    # Jaccard similarity on terms
                    terms_i = set(re.findall(r'[\w\.]+|\[\d+\]', expr_i))
                    terms_j = set(re.findall(r'[\w\.]+|\[\d+\]', expr_j))
                    if terms_i and terms_j:
                        jaccard_terms = len(terms_i & terms_j) / len(terms_i | terms_j)
                    else:
                        jaccard_terms = 0
                    
                    # Structural similarity (based on operators)
                    ops_i = set(re.findall(r'[\+\-\*\/\^\(\)]|sin|cos|exp|log', expr_i))
                    ops_j = set(re.findall(r'[\+\-\*\/\^\(\)]|sin|cos|exp|log', expr_j))
                    if ops_i and ops_j:
                        structure_sim = len(ops_i & ops_j) / len(ops_i | ops_j)
                    else:
                        structure_sim = 0
                    
                    # Complexity similarity (based on the number of nodes)
                    len_i = len(expr_i)
                    len_j = len(expr_j)
                    len_ratio = min(len_i, len_j) / max(len_i, len_j) if max(len_i, len_j) > 0 else 1
                    
                    # Combines the three similarity measures with weights
                    similarity = (0.5 * jaccard_terms + 0.3 * structure_sim + 0.2 * len_ratio)
                    
                    # Apply the sharing formula only if similar enough
                    if similarity > 1 - sigma:
                        d = 1 - similarity
                        sharing_factor += max(0, 1 - (d/sigma)**2)
        
        # Limits the sharing factor
        sharing_factor = min(10.0, max(1.0, sharing_factor))
        
        # Adjust fitness
        if population[i].fitness != float('inf'):
            population[i].adjusted_fitness = population[i].fitness * sharing_factor
        else:
            population[i].adjusted_fitness = float('inf')

In [3060]:
def calculate_semantic_distance(tree1: ExpressionTree, tree2: ExpressionTree, 
                               X_sample: np.ndarray) -> float:
    """
    Calculates the semantic distance between two expression trees
    based on the behaviour on a data sample
    
    Args:
        tree1, tree2: Expression trees to compare
        X_sample: Input data sample to test behaviour
        
    Returns:
        Semantic distance (0 = identical behaviour)
    """
    # We evaluate trees on the data sample
    try:
        output1 = tree1.evaluate(X_sample)
        output2 = tree2.evaluate(X_sample)
        
        # Distance calculation (mean square error)
        if np.any(np.isnan(output1)) or np.any(np.isinf(output1)) or \
           np.any(np.isnan(output2)) or np.any(np.isinf(output2)):
            return float('inf')
        
        # Normalisation of outputs for fairer comparison
        if np.std(output1) > 0 and np.std(output2) > 0:
            output1 = (output1 - np.mean(output1)) / np.std(output1)
            output2 = (output2 - np.mean(output2)) / np.std(output2)
        
        # Calculate distance
        return np.mean((output1 - output2) ** 2)
    
    except Exception:
        # In case of errors, we consider the trees far apart
        return float('inf')

### Selection Method

In [3061]:
def tournament_selection(population: List[ExpressionTree], tournament_size: int, 
                         use_adjusted_fitness: bool = False) -> ExpressionTree:
    """
    Select an individual by tournament selection
    
    Args:
        population: List of expression trees
        tournament_size: Number of tournament participants
        use_adjusted_fitness: Whether to use diversity-adjusted fitness
        
    Returns:
        The selected individual
    """
    # Randomly selects tournament_size individuals from the population
    contestants = random.sample(population, min(tournament_size, len(population)))
    
    # Find the individual with the best (lowest) fitness
    if use_adjusted_fitness:
        # Use fitness adjusted for diversity, if available
        best = min(contestants, key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
    else:
        best = min(contestants, key=lambda x: float('inf') if x.fitness is None else x.fitness)
    
    return best


In [3062]:
def age_weighted_selection(population: List[ExpressionTree], 
                          max_age: int = 10, 
                          young_advantage: float = 0.3) -> ExpressionTree:
    """
    Selection favouring younger individuals (with less age)
    
    Args:
        population: List of expression trees
        max_age: Maximum age considered for advantage
        young_advantage: Percentage advantage for young individuals
        
    Returns:
        The selected individual
    """
    # Calculates weights based on age
    age_weights = [max(0.1, 1.0 - (tree.age / max_age) * young_advantage) 
                  for tree in population]
    
    # Normalise weights
    total_weight = sum(age_weights)
    if total_weight > 0:
        norm_weights = [w / total_weight for w in age_weights]
    else:
        norm_weights = [1.0 / len(population)] * len(population)
    
    # Weighted selection
    return random.choices(population, weights=norm_weights, k=1)[0]


In [3063]:
def select_parents(population: List[ExpressionTree], config: GPConfig, 
                  X_sample: np.ndarray) -> Tuple[ExpressionTree, ExpressionTree]:
    """
    Select two parents from the population
    
    Args:
        population: List of expression trees
        config: GP configuration
        X_sample: Data sample to calculate semantic diversity
        
    Returns:
        Parent pair
    """
    # We randomly choose the selection method
    selection_r = random.random()
    
    if selection_r < 0.9:  # 90% chance of using the standard tournament with adjusted fitness
        parent1 = tournament_selection(population, config.tournament_size, use_adjusted_fitness=True)
        parent2 = tournament_selection(population, config.tournament_size, use_adjusted_fitness=True)
    else:  # 10% probability of using age-based selection
        parent1 = age_weighted_selection(population)
        parent2 = age_weighted_selection(population)
    
    # Make sure the parents are different
    attempts = 0
    while parent1 == parent2 and attempts < 5:
        parent2 = tournament_selection(population, config.tournament_size)
        attempts += 1
    
    return parent1, parent2

### Genetic Operator

In [3064]:
def subtree_crossover(parent1: ExpressionTree, parent2: ExpressionTree, 
                     max_tries: int = 5, max_depth: int = 10) -> Tuple[ExpressionTree, ExpressionTree]:
    """
    Crossover for trees: exchanging subtrees between parents
    
    Args:
        parent1, parent2: Parent trees
        max_tries: Maximum number of attempts to find valid crossover points
        max_depth: Maximum depth allowed for the resulting tree
        
    Returns:
        Two child trees generated by the crossover
    """
    # We create copies of parents
    child1 = parent1.copy()
    child2 = parent2.copy()
    
    # Get all nodes in trees
    nodes1 = child1.get_nodes()
    nodes2 = child2.get_nodes()
    
    if not nodes1 or not nodes2:
        return child1, child2  # We cannot crossover if one of the trees is empty
    
    # Attempts crossover a limited number of times
    for _ in range(max_tries):
        # Randomly choose crossover points
        crossover_point1 = random.randrange(len(nodes1))
        crossover_point2 = random.randrange(len(nodes2))
        
        # Obtain the subtrees to be exchanged
        subtree1 = nodes1[crossover_point1]
        subtree2 = nodes2[crossover_point2]
        
        # Create copies of the subtrees to avoid modifying the originals
        subtree1_copy = subtree1.copy()
        subtree2_copy = subtree2.copy()
        
        # Replace the subtrees in the children
        child1.replace_subtree_at_index(crossover_point1, subtree2_copy)
        child2.replace_subtree_at_index(crossover_point2, subtree1_copy)
        
        # Update the depths of the nodes in the children
        child1.update_node_depths()
        child2.update_node_depths()
        
        # Check if the resulting trees are within the allowed depth
        if child1.get_height() <= max_depth and child2.get_height() <= max_depth:
            break
        else:
            # Restore children from parental copies
            child1 = parent1.copy()
            child2 = parent2.copy()
            nodes1 = child1.get_nodes()
            nodes2 = child2.get_nodes()
    
    # Increase age
    child1.age = 0
    child2.age = 0
    
    return child1, child2

In [3065]:
def subtree_mutation(tree: ExpressionTree, config: GPConfig, 
                    max_depth: int = 10) -> ExpressionTree:
    """
    Subtree mutation: replaces a random subtree with a new one
    
    Args:
        tree: Tree to be mutated
        config: GP configuration
        max_depth: Maximum depth allowed for the resulting tree
        
    Returns:
        Shaft mutated
    """
    # Create a copy of the tree
    mutated = tree.copy()
    
    # Get all nodes in the tree
    nodes = mutated.get_nodes()
    
    if not nodes:
        return mutated  # We cannot mutate an empty tree
    
    # Randomly select a mutation point
    mutation_point = random.randrange(len(nodes))
    
    # Calculate the maximum depth for the new subtree
    node_depth = nodes[mutation_point].depth
    remaining_depth = max_depth - node_depth
    
    if remaining_depth < 1:
        return mutated  # We cannot change if there is no room to grow
    
    # Generates a new random subtree
    new_subtree = grow_tree(config, remaining_depth, min_depth=1)
    
    # Replace the subtree
    mutated.replace_subtree_at_index(mutation_point, new_subtree)
    
    # Updates node depths
    mutated.update_node_depths()
    
    # Reset age
    mutated.age = 0
    
    return mutated

In [3066]:
def point_mutation(tree: ExpressionTree, config: GPConfig) -> ExpressionTree:
    """
    Point mutation: changes a single node while maintaining the tree structure
    
    Args:
        tree: Tree to be mutated
        config: GP configuration
        
    Returns:
        Tree mutated
    """
    # Create a copy of the tree
    mutated = tree.copy()
    
    # Obtain all nodes in the tree
    nodes = mutated.get_nodes()
    
    if not nodes:
        return mutated  # We cannot mutate an empty tree
    
    # Randomly select a mutation point
    mutation_point = random.randrange(len(nodes))
    node = nodes[mutation_point]
    
    # Mutation based on node type
    if isinstance(node, FunctionNode):
        # Replace with another function of the same arity
        compatible_functions = [f for f in config.function_set if f['arity'] == node.arity]
        if compatible_functions:
            function_info = random.choice(compatible_functions)
            new_node = FunctionNode(function_info['function'], 
                                   function_info['arity'], 
                                   function_info['symbol'],
                                   node.children.copy())  # reuses the same children
            
            # Replace the node
            mutated.replace_subtree_at_index(mutation_point, new_node)
    
    elif isinstance(node, TerminalNode):
        if node.is_variable:
            # Replace with another variable
            if len(config.variable_terminals) > 1:
                terminal_info = config.get_random_variable()
                while terminal_info['var_index'] == node.var_index:
                    terminal_info = config.get_random_variable()
                
                new_node = TerminalNode(None, True, terminal_info['var_index'])
                mutated.replace_subtree_at_index(mutation_point, new_node)
        else:
            # We could replace it with another constant or slightly modify the valu
            if random.random() < 0.5:  # 50% probability of changing the value
                # Change existing value (small perturbation)
                new_value = node.value * (1.0 + random.uniform(-0.1, 0.1))
                new_node = TerminalNode(new_value, False)
                #Avoid zero values
                if abs(new_value) < 1e-8:
                    new_value = 1e-8 if new_value >= 0 else -1e-8
            else:
                # Replace with a new constant
                terminal_info = config.get_random_constant()
                new_node = TerminalNode(terminal_info['value'], False)
            
            mutated.replace_subtree_at_index(mutation_point, new_node)
    
    # Updates node depths
    mutated.update_node_depths()
    
    # Reset age
    mutated.age = 0
    
    return mutated


In [3067]:
def deterministic_crowding(parent1: ExpressionTree, parent2: ExpressionTree,
                          child1: ExpressionTree, child2: ExpressionTree,
                          X: np.ndarray, y: np.ndarray, config: GPConfig) -> List[ExpressionTree]:
    """
    Deterministic crowding: children replace parents only if they have better fitness
    
    Args:
        parent1, parent2: Parents
        child1, child2: Children generated by parents
        X, y: Data for evaluation
        config: GP configuration
        
    Returns:
        List of selected individuals
    """
    # Calculate children's fitness
    child1.fitness = calculate_fitness(child1, X, y, config.parsimony_coef)
    child2.fitness = calculate_fitness(child2, X, y, config.parsimony_coef)
    
    # Apply fitness sharing to parents and children
    # We can create a small temporary population with children to calculate their adjusted_fitness
    temp_pop = [parent1, parent2, child1, child2]
    apply_fitness_sharing(temp_pop)

    # Calculates similarities between parents and children
    X_sample = X[:min(len(X), 100)]  # Use a sample for efficiency
    
    dist_p1c1 = calculate_semantic_distance(parent1, child1, X_sample)
    dist_p1c2 = calculate_semantic_distance(parent1, child2, X_sample)
    
    # Decide which parent-child pairings to compare
    if dist_p1c1 <= dist_p1c2:
        # parent1 vs child1, parent2 vs child2
        competition1 = (parent1, child1)
        competition2 = (parent2, child2)
    else:
        # parent1 vs child2, parent2 vs child1
        competition1 = (parent1, child2)
        competition2 = (parent2, child1)
    
    result = []
    
    # First competition
    if competition1[1].adjusted_fitness <= competition1[0].adjusted_fitness:
        result.append(competition1[1])  # the child wins
    else:
        result.append(competition1[0])  # the parent wins
    
    # Second competition
    if competition2[1].adjusted_fitness <= competition2[0].adjusted_fitness:
        result.append(competition2[1])  # The child wins
    else:
        result.append(competition2[0])  # The parent wins
    
    return result

###  Genetic Operator

In [3068]:
def apply_genetic_operators(population: List[ExpressionTree], X: np.ndarray, y: np.ndarray, 
                           config: GPConfig) -> List[ExpressionTree]:
    """
    Apply genetic operators to create a new population
    
    Args:
        population: List of expression trees
        X, y: Evaluation data
        config: GP configuration
        
    Returns:
        New population
    """
    # Sort population by fitness (best first)
    sorted_population = sorted(population, key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
    
    # Number of individuals to be selected by elitism
    n_elite = int(config.pop_size * config.elitism_rate)
    
    # Sample to calculate semantic diversity
    X_sample = X[:min(len(X), 100)]  # Use a sample for efficiency
    
    # Select elites
    new_population = [tree.copy() for tree in sorted_population[:n_elite]]
    
    # Increase the age of each elite individual
    for tree in new_population:
        tree.age += 1
    
    # Complete the population with new individual
    while len(new_population) < config.pop_size:
        # Select genetic operation (crossover or mutation)
        op_choice = random.random()
        
        if op_choice < config.crossover_prob:
            # Crossover
            parent1, parent2 = select_parents(population, config, X_sample)
            child1, child2 = subtree_crossover(parent1, parent2, max_depth=config.max_depth)
            
            # Use deterministic crowding to decide which individuals to keep
            selected = deterministic_crowding(parent1, parent2, child1, child2, X, y, config)
            
            # Add to new population
            new_population.extend(selected)
            if len(new_population) > config.pop_size:
                new_population = new_population[:config.pop_size]
        
        elif op_choice < config.crossover_prob + config.mutation_prob:
            # Mutation
            parent = tournament_selection(population, config.tournament_size)
            
            # Rnandomly choose between subtree mutation and point mutation
            mutation_choice = random.random()
            
            if mutation_choice < 0.7:  # 70% subtree mutation
                child = subtree_mutation(parent, config, max_depth=config.max_depth)
            else:  # 30% point mutation
                child = point_mutation(parent, config)
            
            # Calculate fitness of the child
            child.fitness = calculate_fitness(child, X, y, config.parsimony_coef)
            temp_pop = [parent, child]
            apply_fitness_sharing(temp_pop)
            # Compare parent and child
            if child.adjusted_fitness <= parent.adjusted_fitness:
                new_population.append(child)
            else:
                # Still add the son with some probability
                if random.random() < 0.1:  # 10% probability
                    new_population.append(child)
                else:
                    new_population.append(parent.copy())
        
        else:
            # Playback (direct copy)
            parent = tournament_selection(population, config.tournament_size)
            offspring = parent.copy()
            offspring.age += 1  # Increase age
            new_population.append(offspring)
    
    # Make sure the population is exactly the right size
    if len(new_population) > config.pop_size:
        new_population = new_population[:config.pop_size]
    
    # Fitness adjusted for diversity
    apply_fitness_sharing(new_population)
    
    return new_population

### Diversity Maintenance

In [3069]:
class Island:
    """Representing an island in the Island Model"""
    
    def __init__(self, population: List[ExpressionTree], config: GPConfig, island_id: int):
        self.population = population
        self.config = config
        self.id = island_id
        self.best_individual = None
        self.best_fitness = float('inf')
        self.generations_without_improvement = 0
    
    def evolve(self, X: np.ndarray, y: np.ndarray, generation: int) -> None:
        """
        Evolving the island for a generation
        
        Args:
            X, y: Training data
            generation: Number of the current generatione
        """
        # Apply genetic operators
        self.population = apply_genetic_operators(self.population, X, y, self.config)
        
        # Update the best solution
        current_best = min(self.population, key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
        if current_best.adjusted_fitness < self.best_fitness:
            self.best_individual = current_best.copy()
            self.best_fitness = current_best.adjusted_fitness
            self.generations_without_improvement = 0
        else:
            self.generations_without_improvement += 1
        
        # Progress log
        if generation % 100 == 0:  # Every 100 generations
            print(f"Island {self.id} | Generation {generation} | Best Fitness: {self.best_fitness}")

In [3070]:
def initialize_islands(total_population: List[ExpressionTree], config: GPConfig, n_islands: int = 3) -> List[Island]:
    """
    Initialise islands by dividing the population
    
    Args:
        total_population: Full list of expression trees
        config: GP configuration
        n_islands: Number of islands to create
        
    Returns:
        List of Island objects
    """
    islands = []
    # Randomly mixing the population
    shuffled_population = random.sample(total_population, len(total_population))
    
    # Calculate the size of each island
    island_size = len(shuffled_population ) // n_islands
    
    # Distribute the population among the islands
    for i in range(n_islands):
        start_idx = i * island_size
        end_idx = start_idx + island_size if i < n_islands - 1 else len(shuffled_population)
        island_population = shuffled_population[start_idx:end_idx]
        
        # Create island-specific configuration
        island_config = copy.deepcopy(config)
        island_config.pop_size = len(island_population)
        
        # Create isaland
        island = Island(island_population, island_config, i)
        islands.append(island)
    
    return islands


In [3071]:
def migration(islands: List[Island], migration_rate: float = 0.2, mutate_migrants: bool = True, mutation_strength: float = 1.5) -> None:
    """
    Allows migration of individuals between islands, with the possibility of mutating migrants
    
    Args:
        islands: List of Island objects
        migration_rate: Percentage of population migrating
        mutate_migrants: Whether to apply mutation to migrants
        mutation_strength: Factor amplifying the mutation probability
    """
    if len(islands) <= 1:
        return
    
    print("Performing inter-island migration...")
    
    for i, source_island in enumerate(islands):
        # Calculates the destination island (the next, or the first if it is the last)
        dest_idx = (i + 1) % len(islands)
        dest_island = islands[dest_idx]
        
        # Number of individuals to migrate
        n_migrants = max(1, int(source_island.config.pop_size * migration_rate))
        
        # Select migrants (half best, half random)
        n_best = n_migrants // 2
        n_random = n_migrants - n_best
        
        # Sort by fitness
        sorted_pop = sorted(source_island.population, 
                          key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
        
        # Get the best
        migrants_best = [ind.copy() for ind in sorted_pop[:n_best]]
        
        # Take some random
        migrants_random = [ind.copy() for ind in random.sample(source_island.population, n_random)]
        
        migrants = migrants_best + migrants_random
        # Mutation of migrants if requested
        if mutate_migrants:
            for j, migrant in enumerate(migrants):
                # Apply mutation with increased probability
                if random.random() < source_island.config.mutation_prob * mutation_strength:
                    # Randomly choose the mutation type
                    mutation_choice = random.random()
                    
                    if mutation_choice < 0.7:  # 70% subtree mutation
                        migrants[j] = subtree_mutation(migrant, source_island.config, 
                                                     max_depth=source_island.config.max_depth)
                    else:  # 30% point mutation
                        migrants[j] = point_mutation(migrant, source_island.config)
        
        # Replace the worst in the destination island
        dest_sorted = sorted(dest_island.population, 
                           key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness, 
                           reverse=True)  # decrescent order
        
        # Remove the worst individuals from the destination island
        for j in range(min(n_migrants, len(dest_sorted))):
            dest_island.population.remove(dest_sorted[j])
        
        # Add the migrants to the destination island
        dest_island.population.extend(migrants)
        
        print(f"  Migration: {n_migrants} individuals from island {i} to island {dest_idx}" + 
              (", with mutation" if mutate_migrants else ""))

### Bloat Control

In [3072]:
def apply_bloat_control(population: List[ExpressionTree], config: GPConfig) -> None:
    """
    Applies various bloat control techniques
    
    Args:
        population: List of expression trees
        config: GP configuration
    """
    # Check for oversized individuals
    oversized = [i for i, tree in enumerate(population) if tree.get_complexity() > config.max_tree_size]
    
    if oversized:
        print(f"Bloat control: {len(oversized)} individuals exceed the maximum size")
        
        for idx in oversized:
            # Attempt to reduce size by replacing sub-shafts with terminals
            tree = population[idx]
            nodes = tree.get_nodes()
            
            # Find non-terminal knots at greater depths
            non_terminal_nodes = [i for i, node in enumerate(nodes) 
                                if isinstance(node, FunctionNode) and node.depth > 2]
            
            if non_terminal_nodes:
                # Replace a random node with a terminal
                node_idx = random.choice(non_terminal_nodes)
                
                # Create a new terminal node
                terminal_info = config.get_random_terminal()
                if terminal_info['is_variable']:
                    new_node = TerminalNode(None, True, terminal_info['var_index'])
                else:
                    new_node = TerminalNode(terminal_info['value'], False)
                
                # Replace the subtree
                tree.replace_subtree_at_index(node_idx, new_node)
                tree.update_node_depths()
    
    # Apply lexicographic parsimony pressure
    # When two individuals have similar fitness, prefer the simpler one
    epsilon = 0.0000001  # threshold for considering similar fitness
    
    for i in range(len(population)):
        for j in range(i + 1, len(population)):
            tree_i = population[i]
            tree_j = population[j]
            
            # If the fitnesses are similar
            if abs(tree_i.adjusted_fitness - tree_j.adjusted_fitness) < epsilon:
                # Get complexity
                complexity_i = tree_i.get_complexity()
                complexity_j = tree_j.get_complexity()
                
                # If tree j is significantly more complex, penalise it
                if complexity_j > complexity_i * 1.5:
                    # Add a small fitness penalty
                    tree_j.adjusted_fitness += epsilon
                
                # If tree i is significantly more complex, penalise it
                elif complexity_i > complexity_j * 1.5:
                    # Add a small fitness penalty
                    tree_i.adjusted_fitness += epsilon

### Simplify Expression

In [3073]:
def simplify_expression(expression: str) -> str:
    """
    Simplifies a mathematical expression using basic algebraic rules.
    
    Args:
        expression: Expression as string
        
    Returns:
        Simplified expression
    """
    # For complete simplification you would need a library like sympy
    # This is an improved version that implements more simplification rules
    
    # Initial cleaning: removes excess spaces
    expression = expression.strip()
    
    # Liste di pattern di semplificazione in ordine di applicazione
    simplification_rules = [
        # Operations with 0
        (r'\(\s*0\.0+\s*\+\s*(.*?)\s*\)', r'(\1)'),  # (0.0 + x) -> (x)
        (r'\(\s*(.*?)\s*\+\s*0\.0+\s*\)', r'(\1)'),  # (x + 0.0) -> (x)
        (r'\(\s*0\.0+\s*\-\s*(.*?)\s*\)', r'(0.0 - \1)'),  # (0.0 - x) -> (0.0 - x) [keep]
        (r'\(\s*(.*?)\s*\-\s*0\.0+\s*\)', r'(\1)'),  # (x - 0.0) -> (x)
        (r'\(\s*(.*?)\s*\*\s*0\.0+\s*\)', r'0.0000'),  # (x * 0.0) -> 0.0
        (r'\(\s*0\.0+\s*\*\s*(.*?)\s*\)', r'0.0000'),  # (0.0 * x) -> 0.0
        (r'\(\s*0\.0+\s*\/\s*(.*?)\s*\)', r'0.0000'),  # (0.0 / x) -> 0.0
        (r'\(\s*(.*?)\s*\/\s*0\.0+\s*\)', r'inf'),  # (x / 0.0) -> inf [capture division by zero]
        
        # Operations with 1
        (r'\(\s*(.*?)\s*\*\s*1\.0+\s*\)', r'(\1)'),  # (x * 1.0) -> (x)
        (r'\(\s*1\.0+\s*\*\s*(.*?)\s*\)', r'(\1)'),  # (1.0 * x) -> (x)
        (r'\(\s*(.*?)\s*\/\s*1\.0+\s*\)', r'(\1)'),  # (x / 1.0) -> (x)
        (r'\(\s*1\.0+\s*\/\s*(.*?)\s*\)', r'(1.0 / \1)'),  # (1.0 / x) keep
        
        # Operations with himself
        (r'\(\s*(.*?)\s*\-\s*\1\s*\)', r'0.0000'),  # (x - x) -> 0.0
        (r'\(\s*(.*?)\s*\/\s*\1\s*\)', r'1.0000'),  # (x / x) -> 1.0 [dangerous for x=0, but we handle it in safe_div]
        
        # Functional simplifications
        (r'sin\(\s*0\.0+\s*\)', r'0.0000'),  # sin(0.0) -> 0.0
        (r'cos\(\s*0\.0+\s*\)', r'1.0000'),  # cos(0.0) -> 1.0
        (r'exp\(\s*0\.0+\s*\)', r'1.0000'),  # exp(0.0) -> 1.0
        (r'log\(\s*1\.0+\s*\)', r'0.0000'),  # log(1.0) -> 0.0
        
        # Nested operations
        (r'\(\(\s*(.*?)\s*\)\)', r'(\1)'),  # ((x)) -> (x) Remove double parentheses
        
        # More specific nested operations
        (r'\(\(([^()]*)\)\s*\+\s*\(([^()]*)\)\)', r'((\1) + (\2))'),  # ((a) + (b)) -> (a + b)
        (r'\(\(([^()]*)\)\s*\-\s*\(([^()]*)\)\)', r'((\1) - (\2))'),  # ((a) - (b)) -> (a - b)
        
        # Other algebraic simplifications
        (r'\(\s*(.*?)\s*\+\s*\(\s*\-\s*(.*?)\s*\)\s*\)', r'(\1 - \2)'),  # (a + (-b)) -> (a - b)
        (r'\(\s*(.*?)\s*\-\s*\(\s*\-\s*(.*?)\s*\)\s*\)', r'(\1 + \2)'),  # (a - (-b)) -> (a + b)
    ]
    
    # Final cleaning pattern (to be applied at the end)
    cleanup_rules = [
        # Removes outer brackets if possible
        (r'^\((.*)\)$', r'\1'),  # (x) -> x for the entire expression
        
        # Number Formatting
        (r'0\.0+', r'0.0'),  # 0.0000 -> 0.0
        (r'1\.0+', r'1.0'),  # 1.0000 -> 1.0
        
        # Removal of unnecessary brackets
        (r'\(\s*x\[(\d+)\]\s*\)', r'x[\1]'),  # (x[0]) -> x[0]
    ]
    
    # Applies simplification rules repeatedly
    prev_expr = ""
    while prev_expr != expression:
        prev_expr = expression
        for pattern, replacement in simplification_rules:
            expression = re.sub(pattern, replacement, expression)
    
    # Applies final cleaning rules
    for pattern, replacement in cleanup_rules:
        expression = re.sub(pattern, replacement, expression)
    
    return expression

In [3074]:
def sympy_simplify_expression(expression: str) -> str:
    """
    Simplifies an expression using the sympy library for symbolic calculation.
    
    Args:
        expression: Expression as string
        
    Returns:
        Simplified expression
    """
    try:
        #print(f"Expression: {expression}")
        # Prepare the expression for sympy (replacing x[0] with x_0, etc.).
        prepared_expr = re.sub(r'x\[(\d+)\]', r'x_\1', expression)
        
        #print(f"Prepared expression: {prepared_expr}")
        # Define Symbols
        symbol_names = set(re.findall(r'x_(\d+)', prepared_expr))
        symbols = {f'x_{i}': sp.Symbol(f'x_{i}', real=True) for i in symbol_names}  # Forza i simboli ad essere reali
        
        # Analyses and simplifies the expression
        parsed_expr = parse_expr(prepared_expr, local_dict=symbols)
        simplified = sp.expand(parsed_expr) #or expand 
        
        # Checks whether the expression contains complex numbers or special symbols such as zoo
        if "zoo" in str(simplified) or "I" in str(simplified) or "oo" in str(simplified):
            # Fallback to basic simplification if we obtain problematic results
            return simplify_expression(expression)
        #print(f"Sympy simplification: {simplified}")
        # Convert back to original format
        result = str(simplified)
        result = re.sub(r'x_(\d+)', r'x[\1]', result)
        return result
    except Exception as e:
        # Fallback to basic simplification if sympy is not available or there is an error
        print(f"Invalid expression for sympy: {expression}")
        print(f"Preapred expression: {prepared_expr}")
        print(f"Error in simplification sympy: {str(e)}")
        return simplify_expression(expression)

### Princial Algorithm

In [3075]:
def genetic_programming(X: np.ndarray, y: np.ndarray, config: GPConfig, 
                       # Parameters for the island model
                       use_islands: bool = False,
                       n_islands: int = 5, 
                       migration_interval: int = 10,
                       migration_rate: float = 0.1,
                       
                       # Parameters for bloat control
                       bloat_control_interval: int = 5,
                       
                       # Other parameters
                       early_stopping_generations: int = 20) -> ExpressionTree:
    """
    Main Genetic Programming Algorithm for Symbolic Regression
    
    Args:
        X: Input features
        y: Output target
        config: GP configuration
        
        # Parameters for the island model
        use_islands: Whether to use the island model
        n_islands: Number of islands (if use_islands is True)
        migration_interval: Interval of generations between migrations
        migration_rate: Percentage of population migrating
        
        # Parameters for bloat control
        bloat_control_interval: Interval of generations for bloat control
        
        # Other parameters
        early_stopping_generations: Number of generations without improvement before stopping
        
    Returns:
        Best expression tree found
    """
    start_time = time.time()
    print(f"Starting Genetic Programming for Symbolic Regression...")
    print(f"Configuration: pop_size={config.pop_size}, max_depth={config.max_depth}, "
          f"generations={config.generations}")
    if use_islands:
        print(f"Island model: {n_islands} islands, migration every {migration_interval} generations")
        config.print_function_weights()
    
    # Initialise the population
    initial_population = initialize_population(config)
    
    # Assess the initial population
    evaluate_population(initial_population, X, y, config)
    
    # Apply fitness sharing for diversity
    apply_fitness_sharing(initial_population) 
    
    # Initialise islands or single population
    if use_islands:
        islands = initialize_islands(initial_population, config, n_islands)
        best_individual = min([island.best_individual for island in islands if island.best_individual], 
                            key=lambda x: x.adjusted_fitness, default=None)
        best_fitness = float('inf') if best_individual is None else best_individual.adjusted_fitness
    else:
        population = initial_population
        best_individual = min(population, key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
        best_fitness = best_individual.adjusted_fitness
    
    # Statistics for monitoring
    stats = {
        'best_fitness': [],
        'avg_fitness': [],
        'avg_size': [],
        'best_size': [],
        'diversity': []
    }
    
    # Principal loop of the algorithm
    generations_without_improvement = 0
    for generation in tqdm(range(config.generations)):
        generation_start = time.time()
        diversity = 0  
        
        if use_islands:
            # Evolve each island separately
            for island in islands:
                island.evolve(X, y, generation)
            
            # Collect all individuals for statistics and migration
            all_individuals = []
            for island in islands:
                all_individuals.extend(island.population)
            
            # Calculates diversity (number of unique expressions) once
            all_expressions = [hash(sympy_simplify_expression(tree.to_string())) for tree in all_individuals]
            unique_expressions = set(all_expressions)
            diversity = len(unique_expressions) / len(all_individuals)
            
            # Periodic migration
            if (generation + 1) % migration_interval == 0:
                # If diversity is low, it activates migrant mutation
                if diversity < 0.2:  # Arbitrary threshold to be adjusted
                    migration(islands, migration_rate=migration_rate, mutate_migrants=True, 
                             mutation_strength=1.5)
                else:
                    migration(islands, migration_rate=migration_rate, mutate_migrants=False)
            
            # Calculate the best overall individual
            current_best = min([island.best_individual for island in islands if island.best_individual], 
                             key=lambda x: x.adjusted_fitness)
            
            #  Periodic bloat control
            if generation % bloat_control_interval == 0:
                for island in islands:
                    apply_bloat_control(island.population, config)

            # Calculate other statistics
            avg_fitness = np.mean([tree.adjusted_fitness for tree in all_individuals if tree.adjusted_fitness != float('inf')])
            avg_size = np.mean([tree.get_complexity() for tree in all_individuals])
            
        else:
            # Apply genetic operators
            population = apply_genetic_operators(population, X, y, config)
                        
            # Periodic bloat control
            if generation % bloat_control_interval == 0:
                apply_bloat_control(population, config)
            
            # Calculate the best individual
            current_best = min(population, key=lambda x: float('inf') if x.adjusted_fitness is None else x.adjusted_fitness)
            
            # Calculate statistics
            avg_fitness = np.mean([tree.adjusted_fitness for tree in population if tree.adjusted_fitness != float('inf')])
            avg_size = np.mean([tree.get_complexity() for tree in population])
            
            # Calculate diversity (number of unique expressions)
            expressions = [hash(sympy_simplify_expression(tree.to_string())) for tree in population]
            unique_expressions = set(expressions)
            diversity = len(unique_expressions) / len(population)
        
        # Upgrade the best global individual
        if current_best.adjusted_fitness < best_fitness:
            best_individual = current_best.copy()
            best_fitness = current_best.adjusted_fitness
            generations_without_improvement = 0
            print(f"New best solution found:")
            print(f"  Expression: {best_individual.to_string()}")
            print(f"  Simplified Expression: {sympy_simplify_expression(best_individual.to_string())}")
            print(f"  Fitness: {best_fitness}")
            print(f"  Complexity: {best_individual.get_complexity()} nodes")
        else:
            generations_without_improvement += 1
        
        # Store statistics
        stats['best_fitness'].append(best_fitness)
        stats['avg_fitness'].append(avg_fitness)
        stats['avg_size'].append(avg_size)
        stats['best_size'].append(best_individual.get_complexity())
        stats['diversity'].append(diversity)
        
        # Generation log
        generation_time = time.time() - generation_start
        if generation % 5 == 0 or generation == config.generations - 1:
           print(f"Generazione {generation}, Best Fitness: {best_fitness}, Diversità: {diversity:.2f}, Tempo: {generation_time:.2f}s")
        
        # Early Termination Criterion
        if generations_without_improvement >= early_stopping_generations:
            print(f"Terminazione anticipata: nessun miglioramento nelle ultime {early_stopping_generations} generazioni")
            break
        #!!todo: si potrebbe pensare di ristaratare il tutto con una nuova popolazione ma con pesi sulle funzionie variabili e coistanti diverse
    
    total_time = time.time() - start_time
    print(f"Algorithm completed in {total_time:.2f} seconds")
    print(f"Best solution found:")
    print(f"  Simplified Expression: {sympy_simplify_expression(best_individual.to_string())}")
    print(f"  Expression: {best_individual.to_string()}")
    print(f"  Fitness: {best_fitness}")
    print(f"  Complexity: {best_individual.get_complexity()} nodes")
    
    # Visualizza statistiche
    plot_statistics(stats)
    
    return best_individual

### Termination AND Evaluetion

In [3076]:

def plot_statistics(stats: Dict) -> None:
    """
    View execution statistics
    
    Args:
        stats: Dictionary with collected statistics
    """
    fig, axs = plt.subplots(3, 1, figsize=(10, 12))
    
    # Fitness plot
    axs[0].plot(stats['best_fitness'], label='Best fitness')
    axs[0].plot(stats['avg_fitness'], label='Average fitness')
    axs[0].set_title('Evolution of fitness')
    axs[0].set_xlabel('Generation')
    axs[0].set_ylabel('Fitness')
    axs[0].legend()
    axs[0].grid(True)
    
    # Size plot
    axs[1].plot(stats['best_size'], label='Best individual size')
    axs[1].plot(stats['avg_size'], label='Average size')
    axs[1].set_title('Evolution of size')
    axs[1].set_xlabel('Generation')
    axs[1].set_ylabel('Numebr of nodes')
    axs[1].legend()
    axs[1].grid(True)
    
    # Diversity plot
    axs[2].plot(stats['diversity'], label='Diversity')
    axs[2].set_title('Evolution of diversity')
    axs[2].set_xlabel('Generation')
    axs[2].set_ylabel('Proportion of unique expressions')
    axs[2].legend()
    axs[2].grid(True)
    
    plt.tight_layout()
    plt.savefig('gp_statistics.png')
    plt.show()


def plot_predictions(tree: ExpressionTree, X: np.ndarray, y: np.ndarray, title: str) -> None:
    """
    View model predictions compared with actual data
    
    Args:
        tree: Expression tree
        X: Input features
        y: Output target
        title: Title of graph
    """
    # Calculate forecasts
    predictions = tree.evaluate(X)
    
    plt.figure(figsize=(10, 6))
    
    if X.ndim == 1 or (X.ndim == 2 and X.shape[1] == 1):
        # For 1D data, display predictions vs. actual data.
        x_plot = X.flatten() if X.ndim == 2 else X
        sort_idx = np.argsort(x_plot)
        x_plot = x_plot[sort_idx]
        y_plot = y[sort_idx]
        predictions = predictions[sort_idx]
        
        plt.scatter(x_plot, y_plot, alpha=0.5, label='Dati reali')
        plt.plot(x_plot, predictions, 'r-', linewidth=2, label='Modello GP')
        plt.xlabel('X')
        plt.ylabel('y')
    else:
        # For multidimensional data, display predictions vs. actuals.
        plt.scatter(y, predictions, alpha=0.5)
        plt.plot([min(y), max(y)], [min(y), max(y)], 'r--', linewidth=2)
        plt.xlabel('Target reale')
        plt.ylabel('Previsione')
    
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.savefig('gp_predictions.png')
    plt.show()

### Configuration and Execution 

In [3077]:
def run_gp_on_problem(file_path: str, config_overrides: Dict = None,
    # Additional parameters
    function_weights: Dict = None,
    terminal_weights: Dict = None,
                      
    # Parameters for the island model
    use_islands: bool = False,
    n_islands: int = 5,
    migration_interval: int = 50,
    migration_rate: float = 0.1,
                      
    # Parameters for bloat control
    bloat_control_interval: int = 5,
                      
    # Other parameters
    early_stopping_generations: int = 100) -> ExpressionTree:
    """
    Runs the GP algorithm on a specific problem with various configurable options
    
    Args:
        file_path: Path to the problem file
        config_overrides: Overrides the default configuration
        
        # Parameters for function weights and terminals
        function_weights: Dictionary of custom weights for functions, ex: {'+': 1.5, '*': 0.8, 'sin': 0.3}
        terminal_weights: Dictionary of custom weights for terminals, ex: {'variables': 2.0, 'constants': 0.5}
        
        # Parameters for island model.
        use_islands: Whether to use the island model
        n_islands: Number of islands
        migration_interval: Interval of generations between migrations
        migration_rate: Percentage of population that migrates
  
        
        # Parameters for bloat control
        bloat_control_interval: Interval of generations for bloat control.
        
        # Other parameters
        early_stopping_generations: Number of generations without improvement before stopping
        
    Returns:
        Best expression tree found
    """
    # Upload and prepare data
    X, y, data_config = prepare_data(file_path)
    
    # Create the basic configuration
    config = GPConfig(
        n_features=data_config['n_features'],
        const_range=data_config['const_range'],
        use_trig=True,
        use_exp_log=True,
        min_depth=2,
        max_depth=6,
        pop_size=500,
        generations=50,
        tournament_size=100,
        crossover_prob=0.7,
        mutation_prob=0.05,
        elitism_rate=0.1,
        max_tree_size=50,
        parsimony_coef=0.01,
        function_weights=function_weights,
        terminal_weights=terminal_weights
    )
    
    # Apply overwrites to configuration
    if config_overrides:
        for key, value in config_overrides.items():
            if hasattr(config, key):
                setattr(config, key, value)
    
    # Run the algorithm
    print(f"\nGP execution on {file_path}...")
    best_tree = genetic_programming(X, y, config, 
        use_islands=use_islands,
        n_islands=n_islands,
        migration_interval=migration_interval,
        migration_rate=migration_rate,
        bloat_control_interval=bloat_control_interval,
        early_stopping_generations=early_stopping_generations,)
    
    # Visualize results
    plot_predictions(best_tree, X, y, f"Previsioni GP su {file_path}")
    
    # Simplify the best expression
    simplified_expr = sympy_simplify_expression(best_tree.to_string())
    print(f"Original expression: {best_tree.to_string()}")
    print(f"Simplified expression: {simplified_expr}")
    
    return best_tree


In [3078]:
problems = [

    {"file_path": "../data/problem_0.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 5000, 
         "generations": 1000,
         "max_tree_size": 70
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 100,
     "migration_rate": 0.1
    },
    

    {"file_path": "../data/problem_1.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 70
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 100,
     "migration_rate": 0.1
    },
    

    {"file_path": "../data/problem_2.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 60
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 80,
     "migration_rate": 0.12
    },
    
 
    {"file_path": "../data/problem_3.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 65,
     },
     "use_islands": True,
     "n_islands": 4,
     "migration_interval": 50,
     "migration_rate": 0.2
    },

    {"file_path": "../data/problem_4.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 100,
     },
     "use_islands": True,
     "n_islands": 5,  
     "migration_interval": 50,
     "migration_rate": 0.08
    },

    {"file_path": "../data/problem_5.npz", 
     "config": {
         "max_depth": 10,  
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 80
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 50,
     "migration_rate": 0.15
    },
    

    {"file_path": "../data/problem_6.npz", 
     "config": {
         "max_depth": 10,  
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 40,
         "parsimony_coef": 0.05, 
     },
     "use_islands": True, 
     "early_stopping_generations": 50 
    },
    

    {"file_path": "../data/problem_7.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 60,
         "tournament_size": 100, 
         "elitism_rate": 0.2,    
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 50,
     "migration_rate": 0.25  
    },
    
    {"file_path": "../data/problem_8.npz", 
     "config": {
         "max_depth": 8, 
         "pop_size": 10000, 
         "generations": 1000,
         "max_tree_size": 50,
     },
     "use_islands": True,
     "n_islands": 5,
     "migration_interval": 50,
     "migration_rate": 0.15
    }
]

In [None]:
print(f"\n=== Esecuzione GP su {problems[0]['file_path']} ===")
best_tree = run_gp_on_problem(
    problems[0]['file_path'], 
    problems[0]['config'],
    function_weights=problems[0]['config'].get('function_weights'),
    terminal_weights=problems[0]['config'].get('terminal_weights'),
    use_islands=problems[0].get('use_islands', False),
    n_islands=problems[0].get('n_islands', 5),
    migration_interval=problems[0].get('migration_interval', 10),
    migration_rate=problems[0].get('migration_rate', 0.1),
    early_stopping_generations=problems[0].get('early_stopping_generations', 100)
)


=== Esecuzione GP su ../data/problem_0.npz ===
Loading data from ../data/problem_0.npz...
Detected format with features on rows and samples on columns, transposition...
Final form: X shape (1000, 2), y shape (1000,)
Input 2-dimensional with 1000 samples

GP execution on ../data/problem_0.npz...
Starting Genetic Programming for Symbolic Regression...
Configuration: pop_size=5000, max_depth=8, generations=1000
Island model: 5 islands, migration every 100 generations
Function weights:
  +: 1.00
  -: 1.00
  *: 1.00
  /: 0.70
  sin: 0.60
  cos: 0.60
  tan: 0.50
  exp: 0.40
  log: 0.50
  sqrt: 0.60
Population initialisation of  5000 individuals...
  Generated  1000 individuals in  0.03 seconds
  Generated  2000 individuals in  0.07 seconds
  Generated  3000 individuals in  0.09 seconds
  Generated  4000 individuals in  0.12 seconds
  Generated  5000 individuals in  0.15 seconds
Population initialised in 0.15 seconds
Initial population statistics:
  - Average height: 4.40 (min: 2, max: 8)
  

In [None]:
print(f"\n=== Esecuzione GP su {problems[1]['file_path']} ===")
best_tree = run_gp_on_problem(
    problems[1]['file_path'], 
    problems[1]['config'],
    function_weights=problems[1]['config'].get('function_weights'),
    terminal_weights=problems[1]['config'].get('terminal_weights'),
    use_islands=problems[1].get('use_islands', False),
    n_islands=problems[1].get('n_islands', 5),
    migration_interval=problems[1].get('migration_interval', 10),
    migration_rate=problems[1].get('migration_rate', 0.1),
    early_stopping_generations=problems[1].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[2]['file_path']} ===")
best_tree = run_gp_on_problem(
    problems[2]['file_path'], 
    problems[2]['config'],
    function_weights=problems[2]['config'].get('function_weights'),
    terminal_weights=problems[2]['config'].get('terminal_weights'),
    use_islands=problems[2].get('use_islands', False),
    n_islands=problems[2].get('n_islands', 5),
    migration_interval=problems[2].get('migration_interval', 10),
    migration_rate=problems[2].get('migration_rate', 0.1),
    early_stopping_generations=problems[2].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[3]['file_path']} ===")
best_tree = run_gp_on_problem(
    problems[3]['file_path'], 
    problems[3]['config'],
    function_weights=problems[3]['config'].get('function_weights'),
    terminal_weights=problems[3]['config'].get('terminal_weights'),
    use_islands=problems[3].get('use_islands', False),
    n_islands=problems[3].get('n_islands', 5),
    migration_interval=problems[3].get('migration_interval', 10),
    migration_rate=problems[3].get('migration_rate', 0.1),
    early_stopping_generations=problems[3].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[4]['file_path']} ===")
best_tree = run_gp_on_problem(problems[4]['file_path'],
    problems[4]['config'],
    function_weights=problems[4]['config'].get('function_weights'),
    terminal_weights=problems[4]['config'].get('terminal_weights'),
    use_islands=problems[4].get('use_islands', False),
    n_islands=problems[4].get('n_islands', 5),
    migration_interval=problems[4].get('migration_interval', 10),
    migration_rate=problems[4].get('migration_rate', 0.1),
    early_stopping_generations=problems[4].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[5]['file_path']} ===")
best_tree = run_gp_on_problem(problems[5]['file_path'],
    problems[5]['config'],
    function_weights=problems[5]['config'].get('function_weights'),
    terminal_weights=problems[5]['config'].get('terminal_weights'),
    use_islands=problems[5].get('use_islands', False),
    n_islands=problems[5].get('n_islands', 5),
    migration_interval=problems[5].get('migration_interval', 10),
    migration_rate=problems[5].get('migration_rate', 0.1),
    early_stopping_generations=problems[5].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[6]['file_path']} ===")
best_tree = run_gp_on_problem(problems[6]['file_path'],
    problems[6]['config'],
    function_weights=problems[6]['config'].get('function_weights'),
    terminal_weights=problems[6]['config'].get('terminal_weights'),
    use_islands=problems[6].get('use_islands', False),
    n_islands=problems[6].get('n_islands', 5),
    migration_interval=problems[6].get('migration_interval', 10),
    migration_rate=problems[6].get('migration_rate', 0.1),
    early_stopping_generations=problems[6].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[7]['file_path']} ===")
best_tree = run_gp_on_problem(problems[7]['file_path'],
    problems[7]['config'],
    function_weights=problems[7]['config'].get('function_weights'),
    terminal_weights=problems[7]['config'].get('terminal_weights'),
    use_islands=problems[7].get('use_islands', False),
    n_islands=problems[7].get('n_islands', 5),
    migration_interval=problems[7].get('migration_interval', 10),
    migration_rate=problems[7].get('migration_rate', 0.1),
    early_stopping_generations=problems[7].get('early_stopping_generations', 100)
)

In [None]:
print(f"\n=== Esecuzione GP su {problems[8]['file_path']} ===")
best_tree = run_gp_on_problem(problems[8]['file_path'],
    problems[8]['config'],
    function_weights=problems[8]['config'].get('function_weights'),
    terminal_weights=problems[8]['config'].get('terminal_weights'),
    use_islands=problems[8].get('use_islands', False),
    n_islands=problems[8].get('n_islands', 5),
    migration_interval=problems[8].get('migration_interval', 10),
    migration_rate=problems[8].get('migration_rate', 0.1),
    early_stopping_generations=problems[8].get('early_stopping_generations', 100)
)  