In [None]:
import numpy as np
import random
import copy
import math

# Define constants, variables, and operators
constant_range = np.linspace(-1, 1, num=25)
CONSTANTS = list(constant_range)
UNARY_OPERATORS = ['sin', 'cos', 'tan', 'sqrt', 'log', 'exp', 'abs']
BINARY_OPERATORS = ['add', 'sub', 'mul', 'div', 'pow']

def safe_divide(a, b):
    """
    Safe division that returns 10^6 if the denominator is 0, else normal division.
    """
    if b == 0:
        return float(10**6)  # Return 10^6 (large penalty) when dividing by zero
    result = a / b
    return np.clip(result, -1000, 1000)

def safe_sqrt(a):
    """
    Safe square root function. Returns a large penalty if the input is negative.
    """
    if a < 0:
        return float(10**6)  # Large penalty for negative values
    result = np.sqrt(a)
    return np.clip(result, -1000, 1000)

def safe_log(a):
    """
    Safe natural log function. Returns a large penalty if the input is less than or equal to 0.
    """
    if a <= 0:
        return float(10**6)  # Large penalty for values <= 0
    result = np.log(a)
    return np.clip(result, -1000, 1000)

def safe_exp(x):
    """
    Safe exponential function. Clamps the result to avoid overflow issues.
    """
    x = np.clip(x, -700, 700)  # Limiting input to prevent overflow
    result = np.exp(x)
    return np.clip(result, -1000, 1000)

def safe_sin(x):
    """
    Safe sine function. Clips input to avoid excessively large values.
    """
    x = np.clip(x, -1000, 1000)  # Clip the input to avoid excessive values
    result = np.sin(x)
    return np.clip(result, -1000, 1000)

def safe_cos(x):
    """
    Safe cosine function. Clips input to avoid excessively large values.
    """
    x = np.clip(x, -1000, 1000)  # Clip the input to avoid excessive values
    result = np.cos(x)
    return np.clip(result, -1000, 1000)

def safe_tan_inv(x):
    """
    Safe inverse tangent function (arctangent).
    Ensures the result is within a manageable range.
    """
    result = np.arctan(x)
    return np.clip(result, -1000, 1000)

# The OPERATORS dictionary, now includes only the necessary operators
OPERATORS = {
    'sin': safe_sin,
    'cos': safe_cos,
    'sqrt': safe_sqrt,
    'log': safe_log,
    'exp': safe_exp,
    'abs': np.abs,
}

class Node:
    def __init__(self, value, is_operator=False):
        """
        Initialize a Node.
        
        :param value: The value of the node (operator, constant, or variable name).
        :param is_operator: True if the node represents an operator, otherwise False.
        """
        self.value = value
        self.is_operator = is_operator
        self.left = None  # Left child of the node (used for binary operators)
        self.right = None  # Right child of the node (used for binary operators)
        self.cached_value = None  # Cache for evaluation results to avoid redundant calculations

    def evaluate(self, variables, depth=0):
        """
        Evaluate the subtree rooted at this node.
        
        :param variables: A dictionary mapping variable names to their values.
        :param depth: The current recursion depth (for debugging and avoiding infinite recursion).
        :return: The result of evaluating the subtree.
        """
        if self.cached_value is not None:
            return self.cached_value  # If the result is cached, return it directly

        if depth > 1000:  # Prevent infinite recursion (safe guard for deeply nested trees)
            return float(100000)  # Return a large penalty value if recursion depth is exceeded
        
        # Check for None values to avoid errors
        if self is None:
            return 0
        
        '''OPERATORS = {
            'sin': safe_sin,
            'cos': safe_cos,
            'sqrt': safe_sqrt,
            'log': safe_log,
            'exp': safe_exp,
            'abs': np.abs,
        }'''

        if self.is_operator:
            # Unary operator (e.g., sin, cos, sqrt, log, exp)
            if self.value in UNARY_OPERATORS:
                if self.left is None:
                    raise ValueError(f"Missing left child for operator {self.value}")
                left_value = self.left.evaluate(variables, depth + 1)
                result = OPERATORS[self.value](left_value)  # Evaluate the unary operator
                self.cached_value = result  # Cache the result
                return result
            
            # Binary operator (e.g., add, sub, mul, div, pow)
            elif self.value in BINARY_OPERATORS:
                if self.left is None or self.right is None:
                    raise ValueError(f"Missing children for operator {self.value}")
                left_value = self.left.evaluate(variables, depth + 1)
                right_value = self.right.evaluate(variables, depth + 1)
                result = OPERATORS[self.value](left_value, right_value)  # Evaluate the binary operator
                self.cached_value = result  # Cache the result
                return result
        
        elif self.value in variables:
            # Variable node (e.g., x0, x1, etc.)
            return variables[self.value]
        
        # Constant node (just return the constant value)
        self.cached_value = self.value
        return self.value


class Individual:
    def __init__(self, root):
        """
        Initialize an Individual (a tree).
        
        :param root: The root node of the tree.
        """
        self.root = root
        self.fitness_value = None
        self.mse_percentage = None

    def evaluate(self, variables):
        """
        Evaluate the tree by calling the root node's evaluate method.
        
        :param variables: A dictionary mapping variable names to their values.
        :return: The result of evaluating the tree.
        """
        return self.root.evaluate(variables)

    def fitness(self, file_path):
        """
        Calculate the fitness of the individual by evaluating its performance on data.
        
        :param file_path: Path to the data file containing input-output pairs.
        """
        data = np.load(file_path)
        x = data['x']
        y = data['y']

        # Initialize variables for prediction
        num_features = x.shape[0]
        variables = {f'x{i}': None for i in range(num_features)}

        # Compute predictions
        y_pred = []
        for i in range(x.shape[1]):  # Iterate over each column
            for j in range(num_features):  # Set variable values for this row
                variables[f'x{j}'] = x[j, i]
            y_pred.append(self.evaluate(variables))
        y_pred = np.array(y_pred)

        # Calculate MSE (Mean Squared Error)
        mse = np.mean((y - y_pred) ** 2)
        self.fitness_value = mse
        
        # Check if MSE is NaN (due to overflow), set to a large penalty if so
        if np.isnan(mse):
            self.fitness_value = float(10**6)

        # Get the range of y values to determine the scale for percentage calculation
        y_min = np.min(y)
        y_max = np.max(y)

        # Calculate MSE percentage relative to the range of y values
        mse_percentage = (mse / (y_max - y_min)) * 100
        self.mse_percentage = mse_percentage

        return self.fitness_value 

    def __str__(self):
        """
        Return a string representation of the tree.
        """
        return self._str_helper(self.root)

    def _str_helper(self, node, visited=None):
        """Helper function for string representation. Recursively traverse the tree."""
        if visited is None:
            visited = set()  # Initialize visited set for the first call

        # If node is None, return an empty string
        if node is None:
            return ""

        # Check if we've already visited this node to detect cycles. It should not happen in general.
        if id(node) in visited:
            return "(...)  # Cycle detected"
        
        # Mark the current node as visited
        visited.add(id(node))

        if node.is_operator:
            # Handle binary operators (e.g., add, sub, mul, div)
            if node.value in ['add', 'sub', 'mul', 'div']:
                left_str = self._str_helper(node.left, visited)
                right_str = self._str_helper(node.right, visited)
                return f"({left_str} {node.value} {right_str})"
            
            # Handle unary operators (e.g., sin, cos, sqrt, log, exp)
            elif node.value in ['sin', 'cos', 'sqrt', 'log', 'exp']:
                left_str = self._str_helper(node.left, visited)
                return f"({node.value} {left_str})"
        
        # For variable and constant nodes, just return their value as a string
        return str(node.value)
    
    def count_nodes(self, node, depth=0, max_depth=1000):
        """Count the number of nodes in the tree while limiting recursion depth."""
        if node is None or depth > max_depth:
            return 0
        if node.is_operator:
            return 1 + self.count_nodes(node.left, depth + 1, max_depth) + self.count_nodes(node.right, depth + 1, max_depth)
        return 1  # It's a leaf node (constant or variable)



In [None]:
class SymbolicRegression:
    def __init__(self, population_size, num_generations, mutation_rate, crossover_rate, file_path):
        """
        Initialize the Symbolic Regression algorithm with the necessary parameters.
        
        :param population_size: Number of individuals in the population (must be an integer).
        :param num_generations: Number of generations to evolve the population (must be an integer).
        :param mutation_rate: Probability (between 0 and 1) of mutation occurring in each individual.
        :param crossover_rate: Probability (between 0 and 1) of crossover between two individuals.
        :param file_path: Path to the .npz file containing the dataset 'x' and 'y'.
        """
        self.population_size = population_size
        self.num_generations = num_generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.file_path = file_path
        self.population = []

        # Load data from the provided file
        data = np.load(self.file_path)
        self.x_data = data['x']
        self.y_data = data['y']
        self.num_vars = self.x_data.shape[0]  # Get the number of variables (rows in x)
        self.VARIABLES = [f'x{i}' for i in range(self.num_vars)]  # Dynamically generate variable names

    def create_tree(self, num_nodes, x_data):
        """
        Recursively create a random symbolic tree with the specified number of nodes.
        
        :param num_nodes: The number of nodes to be included in the tree (typically between 4-10).
        :param x_data: The input data used to determine variable names.
        :return: An Individual object representing the symbolic regression tree.
        """
        def build_tree(nodes_left, is_root=True):
            """Build the tree structure recursively."""
            if nodes_left == 1:
                # Leaf node: either a variable or constant
                if random.random() < 0.5:
                    return Node(random.choice(self.VARIABLES)), 1
                else:
                    return Node(random.choice(CONSTANTS)), 1
            else:
                # Internal node: operator (binary/unary)
                operator = random.choice(list(OPERATORS.keys()))
                
                # Unary operator requires only one child (left node)
                if operator in UNARY_OPERATORS:
                    right_subtree_size = nodes_left - 1
                    node = Node(operator, is_operator=True)
                    right_child, _ = build_tree(right_subtree_size, is_root=False)
                    node.left = right_child
                    return node, 1 + right_subtree_size
                
                # Binary operator requires two children (left and right nodes)
                elif operator in BINARY_OPERATORS:
                    right_subtree_size = random.randint(1, nodes_left - 2)
                    left_subtree_size = nodes_left - 1 - right_subtree_size
                    node = Node(operator, is_operator=True)
                    left_child, left_size = build_tree(left_subtree_size, is_root=False)
                    right_child, right_size = build_tree(right_subtree_size, is_root=False)
                    node.left = left_child
                    node.right = right_child
                    return node, 1 + left_size + right_size
                
                
                    

        # Create the tree starting from the root node
        root_node, _ = build_tree(num_nodes)
        # print("here")
        if root_node is None:
            print(f"Warning: Tree creation failed with num_nodes={num_nodes}. Returning None.")
        return Individual(root_node)

    def create_initial_population(self):
        """Create the initial population of individuals with random symbolic trees."""
        for _ in range(self.population_size):
            num_nodes = random.randint(4, 10)  # Randomly decide the number of nodes
            individual = self.create_tree(num_nodes=num_nodes, x_data=self.x_data)
            self.population.append(individual)

    def tournament_selection(self, tournament_size):
        """
        Tournament selection: Randomly select individuals for a tournament and return the best individual.
        
        :param tournament_size: The size of the tournament (number of individuals to randomly pick).
        :return: List of selected parents for reproduction.
        """
        selected_parents = []
        for _ in range(self.population_size):
            tournament = random.sample(self.population, tournament_size)
            best_individual = min(tournament, key=lambda ind: ind.fitness_value)  # Select the individual with the lowest fitness
            selected_parents.append(best_individual)
        return selected_parents

    def swap_subtrees(self, node1, node2):
        """
        Swap the subtrees of two selected nodes.
        
        :param node1: First node (subtree root).
        :param node2: Second node (subtree root).
        """
        # Leaf node swap: swapping variable or constant
        if not node1.is_operator and not node2.is_operator:
            node1.value, node2.value = node2.value, node1.value
            return
        
        # Operator node swap: perform according to compatibility
        if node1.is_operator and node2.is_operator:
            if node1.value in ['add', 'sub', 'mul', 'div', 'pow'] and node2.value in ['add', 'sub', 'mul', 'div', 'pow']:
                node1.value, node2.value = node2.value, node1.value
                node1.left, node2.left = node2.left, node1.left
                node1.right, node2.right = node2.right, node1.right
            elif node1.value in ['sin', 'cos', 'tan', 'sqrt'] and node2.value in ['sin', 'cos', 'tan', 'sqrt']:
                node1.value, node2.value = node2.value, node1.value
                node1.left, node2.left = node2.left, node1.left
            else:
                return
            
    def get_random_node(self, root_node):
        """
        Ritorna un nodo casuale (Node) dall'albero radicato in root_node.

        :param root_node: Nodo radice dell'albero da cui scegliere.
        :return: Un nodo scelto casualmente.
        """
        nodes = []

        def traverse(node):
            if node is None:
                return
            nodes.append(node)
            traverse(node.left)
            traverse(node.right)

        traverse(root_node)

        if not nodes:
            return None  # albero vuoto, nessun nodo da restituire

        return random.choice(nodes)
    
    def get_tree_depth(self, node):
        """
        Calcola la profondità massima dell'albero radicato in 'node'.

        :param node: Nodo radice dell'albero.
        :return: Intero, profondità dell'albero (1 per un singolo nodo).
        """
        if node is None:
            return 0
        left_depth = self.get_tree_depth(node.left)
        right_depth = self.get_tree_depth(node.right)
        return 1 + max(left_depth, right_depth)
    
    def select_random_node(self, root):
        """
        Seleziona casualmente un nodo dall'albero radicato in 'root'.

        :param root: Nodo radice dell'albero.
        :return: Nodo scelto casualmente.
        """
        all_nodes = []

        def traverse(node):
            if node is None:
                return
            all_nodes.append(node)
            traverse(node.left)
            traverse(node.right)

        traverse(root)
        return random.choice(all_nodes) if all_nodes else None

    def crossover(self, parent1, parent2, min_depth=3, max_depth=9):
        """
        Perform a crossover between two parents to create offspring.
        
        :param parent1: First parent individual.
        :param parent2: Second parent individual.
        :param min_depth: Minimum tree depth for the offspring.
        :param max_depth: Maximum tree depth for the offspring.
        :return: A tuple of two offspring individuals after crossover.
        """
        # Create deep copies of the parents to avoid modifying the originals
        child1_before = copy.deepcopy(parent1)
        child2_before = copy.deepcopy(parent2)

        # Select random crossover points in both trees
        crossover_point1 = self.get_random_node(child1.root)
        crossover_point2 = self.get_random_node(child2.root)

        # Swap the subtrees at the selected points
        self.swap_subtrees(crossover_point1, crossover_point2)

        # Ensure the resulting trees meet the depth requirements
        if self.get_tree_depth(child1.root) < min_depth or self.get_tree_depth(child1.root) > max_depth:
            child1 = child1_before #copy.deepcopy(parent1)
        if self.get_tree_depth(child2.root) < min_depth or self.get_tree_depth(child2.root) > max_depth:
            child2 = child2_before #copy.deepcopy(parent2)

        return child1, child2

    def mutate(self, individual):
        """
        Perform mutation on a random node in the individual's tree by changing its operator or leaf.
        
        :param individual: The individual to mutate.
        :return: The mutated individual.
        """
        node_to_mutate = self.select_random_node(individual.root)
        
        if node_to_mutate is None:
            return individual  # If no node selected, return the individual as is

        if not node_to_mutate.is_operator:
            # Mutate leaf nodes: change between variable and constant
            if random.random() < 0.5:
                node_to_mutate.value = random.choice(CONSTANTS)
            else:
                node_to_mutate.value = random.choice(self.VARIABLES)
        else:
            # Mutate operator nodes: change the operation but keep the operator type (binary or unary)
            if node_to_mutate.value in ['add', 'sub', 'mul', 'div', 'pow']:
                possible_operators = ['add', 'sub', 'mul', 'div', 'pow']
                possible_operators.remove(node_to_mutate.value)
                node_to_mutate.value = random.choice(possible_operators)
            elif node_to_mutate.value in ['sin', 'cos', 'tan', 'sqrt']:
                possible_operators = ['sin', 'cos', 'tan', 'sqrt']
                possible_operators.remove(node_to_mutate.value)
                node_to_mutate.value = random.choice(possible_operators)
        
        return individual

    def evolve(self):
        """Evolve the population over multiple generations."""
        # Step 1: Create the initial population
        self.create_initial_population()

        current_generation = 0
        total_generations = self.num_generations

        # Main evolution loop
        while current_generation < total_generations:
            print(f"Starting Generation {current_generation + 1}")

            # Evaluate fitness for each individual in the population
            for individual in self.population:
                try:
                    individual.fitness(file_path=self.file_path)
                except Exception as e:
                    print(f"Error evaluating fitness for individual: {e}")
                    individual.fitness_value = float(1000000)

            # Step 2: Ensure population validity (more than one node)
            valid_population = [ind for ind in self.population if ind.count_nodes(ind.root) > 1]
            while len(valid_population) < self.population_size:
                new_individual = self.create_tree(num_nodes=random.randint(4, 10), x_data=self.x_data)
                if new_individual.count_nodes(new_individual.root) > 1:
                    valid_population.append(new_individual)
            self.population = valid_population

            # Step 3: Elitism - Carry the best individuals to the next generation
            elitism_size = max(1, self.population_size // 10)
            best_individuals = sorted(self.population, key=lambda ind: ind.fitness_value)[:elitism_size] #elitism_sizeindividuals
            next_generation = best_individuals[:]

            # Step 4: Tournament selection and crossover to generate offspring
            unique_individuals = set()
            for ind in next_generation:
                unique_individuals.add(hash(str(ind)))

            while len(next_generation) < self.population_size:
                parents = self.tournament_selection(tournament_size=3)
                parent1, parent2 = random.sample(parents, 2)

                # Apply crossover with a certain probability
                if random.random() < self.crossover_rate:
                    offspring1, offspring2 = self.crossover(parent1, parent2)
                    for offspring in [offspring1, offspring2]:
                        if hash(str(offspring)) not in unique_individuals:
                            next_generation.append(offspring)
                            unique_individuals.add(hash(str(offspring)))
                else:
                    for parent in [parent1, parent2]:
                        if hash(str(parent)) not in unique_individuals:
                            next_generation.append(parent)
                            unique_individuals.add(hash(str(parent)))

            # Step 5: Mutation - Apply mutation with a certain probability
            for ind in next_generation:
                self.mutate(ind)

            # Replace old population with new generation
            self.population = next_generation[:self.population_size]

            current_generation += 1

            # Output the best individual of the generation
            best_individual = min(self.population, key=lambda x: x.fitness_value)
            for i, ind in enumerate(self.population):
                print(f"Individual {i} fitness: {ind.fitness(self.file_path)}")

            print(f"Best Individual of Generation {current_generation}: {best_individual.fitness_value}")





In [None]:
# Usage example
file_path = '../data/problem_1.npz'  # Path to the problem_1.npz file # then change to problem_2.npz, problem_3.npz, problem_4.npz, ... and it adapts automatically to the number of variables.

# Initialize the Symbolic Regression algorithm
sr = SymbolicRegression(
    population_size=200, 
    num_generations=50,
    mutation_rate=0.2,
    crossover_rate=0.7, 
    file_path=file_path
)

# Evolve the population
sr.evolve()