In [33]:
import numpy as np
from numpy import random
from tqdm import tqdm
import warnings

In [69]:
problem = np.load('../data/problem_2.npz')
x = problem['x']
y = problem['y']
x.shape

(3, 5000)

In [36]:
# Define irreducible functions
FUNCTIONS = [
    np.add, np.subtract, np.multiply, np.divide,
    np.sin, np.cos, np.tan,
    np.exp, np.log, np.sqrt, np.abs
]

In [37]:
def Generate_range(x):
    """
    Calculates the extended range for generating constants.
    """
    x_min, x_max = x.min(), x.max()
    delta = 0.1 * (x_max - x_min)
    extended_min = x_min - delta
    extended_max = x_max + delta
    return int(extended_min), int(extended_max)


def Generate_end_leafs(x, extended_range):
    """
    Generates end leafs: constants or input variables.
    
    Parameters:
        x (np.ndarray): Input array.
        extended_range (tuple): Tuple containing (extended_min, extended_max).
    """
    extended_min, extended_max = extended_range

    # Random integer constant within the extended range
    constant = random.randint(extended_min, extended_max + 1)
    while constant == 0:
        constant = random.randint(extended_min, extended_max + 1)

    # Randomly return either a constant or a variable
    if random.rand() < 0.5:
        return constant  # Return constant
    else:
        return f"x[{random.randint(0, x.shape[0])}]"  # Return variable
    

def Generate_tree(max_depth, x, extended_range):
    """
    Generates a random tree structure.

    Parameters:
        max_depth (int): The maximum depth of the tree.
        x (np.ndarray): Input data.
        extended_range (tuple): Precomputed (extended_min, extended_max) for generating constants.

    Returns:
        tuple or str/int: A tree structure or a terminal node.
    """
    # Base case: generate terminal node
    if max_depth == 0 or random.random() < 0.3:  # Fixed `random.rand` to `random.random`
        return Generate_end_leafs(x, extended_range)

    # Select a random function
    func = random.choice(FUNCTIONS)

    # Generate left and right subtrees
    left_subtree = Generate_tree(max_depth - 1, x, extended_range)
    right_subtree = Generate_tree(max_depth - 1, x, extended_range)

    # Return the tree as a tuple
    return (func, left_subtree, right_subtree)


def Convert_tree_to_expression(tree):
    """
    Converts a tree structure into a NumPy-executable formula.
    """
    if isinstance(tree, (int, str)):
        if isinstance(tree, str):  # It's a variable like "x[0]"
            return f"x[{int(tree[2:-1])}]"
        return str(tree)  # It's a constant value

    func, left, right = tree
    left_expr = Convert_tree_to_expression(left)
    right_expr = Convert_tree_to_expression(right)

    # Get the function name or operator for binary and unary functions
    if func in [np.add, np.subtract, np.multiply, np.divide]:
        operator = {
            np.add: "+",
            np.subtract: "-",
            np.multiply: "*",
            np.divide: "/",
        }[func]
        return f"({left_expr} {operator} {right_expr})"
    else:
        return f"np.{func.__name__}({left_expr})"  # Use NumPy function name   


def Calculate_mse(tree, x: np.ndarray, y: np.ndarray) -> float:
    """
    Calculates the Mean Squared Error (MSE) for a tree representation.
    """
    # Convert the tree into an expression
    expression = Convert_tree_to_expression(tree)
    
    # Construct a Python function dynamically
    formula = f"""
def f(x: np.ndarray) -> np.ndarray:
    return {expression}
"""

    # Map function names dynamically from FUNCTIONS
    func_mapping = {f.__name__: f for f in FUNCTIONS}
    exec_locals = {}
    exec_globals = {"np": np, **func_mapping}  # Include mapped functions

    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)  # Ignore runtime warnings
            exec(formula, exec_globals, exec_locals)  # Dynamically define f
            f = exec_locals["f"]
            predictions = f(x)  # Evaluate predictions using x

            # Check for invalid values in predictions
            if not np.isfinite(predictions).all():
                return float('inf')  # Return infinite MSE if invalid values are found

            # Calculate MSE
            mse = np.mean((predictions - y) ** 2)
            return mse
    except Exception as e:
        # Return infinite MSE for any exceptions during execution
        return float('inf')
        


def Select_random_subtree(tree):
    """
    Selects a random subtree (or terminal node) from the given tree.

    Parameters:
        tree: The tree to select a random subtree from.

    Returns:
        The selected subtree and its parent structure (None if it's the root).
    """
    if isinstance(tree, (int, str)):
        # Base case: tree is a terminal node
        return tree, None

    func, left, right = tree

    # Decide to pick the current node or descend into subtrees
    if random.random() < 0.33:  # 33% chance to pick the current node
        return tree, None
    elif random.random() < 0.5:  # 50% chance to go to the left subtree
        subtree, parent = Select_random_subtree(left)
        return subtree, (func, subtree, right) if parent is None else (func, parent, right)
    else:  # 50% chance to go to the right subtree
        subtree, parent = Select_random_subtree(right)
        return subtree, (func, left, subtree) if parent is None else (func, left, parent)


def Mutate_tree(tree, x, extended_range):
    """
    Mutates a random subtree or terminal node in the tree.

    Parameters:
        tree: The tree to mutate.
        x (np.ndarray): Input data for generating terminals.
        extended_range (tuple): The range for generating constants.
        
    Returns:
        The mutated tree.
    """
    subtree, parent = Select_random_subtree(tree)

    if isinstance(subtree, (int, str)):
        # Subtree is a terminal node; replace it
        new_terminal = Generate_end_leafs(x, extended_range)
        if parent is None:
            return new_terminal  # If root node, return the new terminal
        else:
            func, left, right = parent
            if subtree == left:
                return (func, new_terminal, right)
            else:
                return (func, left, new_terminal)
    else:
        # Subtree is a function node; replace the function
        func, left, right = subtree
        new_func = random.choice(FUNCTIONS)
        while new_func == func:
            new_func = random.choice(FUNCTIONS)
        new_subtree = (new_func, left, right)
        if parent is None:
            return new_subtree  # If root node, return the new subtree
        else:
            func, left, right = parent
            if subtree == left:
                return (func, new_subtree, right)
            else:
                return (func, left, new_subtree)


def Recombinate_trees(tree1, tree2):
    """
    Swaps a random subtree between two trees.

    Parameters:
        tree1: The first tree.
        tree2: The second tree.

    Returns:
        A tuple of the two new trees after recombination.
    """
    subtree1, parent1 = Select_random_subtree(tree1)
    subtree2, parent2 = Select_random_subtree(tree2)

    # Swap subtrees
    if parent1 is None and parent2 is None:
        # Both are root nodes
        return subtree2, subtree1
    elif parent1 is None:
        # tree1's root node is replaced
        return subtree2, tree2
    elif parent2 is None:
        # tree2's root node is replaced
        return tree1, subtree1
    else:
        # Replace in respective parents
        func1, left1, right1 = parent1
        func2, left2, right2 = parent2

        if subtree1 == left1:
            parent1 = (func1, subtree2, right1)
        else:
            parent1 = (func1, left1, subtree2)

        if subtree2 == left2:
            parent2 = (func2, subtree1, right2)
        else:
            parent2 = (func2, left2, subtree1)

        return parent1, parent2


def Evaluate_population(population):
        """

        Evaluate MSE for all individuals

        """
        return [(tree, Calculate_mse(tree, x, y)) for tree in population]


def Sym_reg(x, y, max_depth=3, population_size=1000, generations=10, mutation_rate=0.5):
    """
    Symbolic regression using genetic programming principles.

    Parameters:
        x (np.ndarray): Input features.
        y (np.ndarray): Target values.
        max_depth (int): Maximum depth of the tree.
        population_size (int): Number of individuals in the population.
        generations (int): Number of generations to evolve the population.
        mutation_rate (float): Probability of mutating an individual.

    Returns:
        tuple: The best tree and its MSE.
    """
    extended_range = Generate_range(x)

    # Initialize population with random trees
    population = [
        Generate_tree(max_depth, x, extended_range)
        for _ in range(population_size)
    ]

    
    # Evaluate initial population
    evaluated_population = Evaluate_population(population)
    

    for gen in range(generations):
        print(f"Generation {gen + 1}/{generations}")

        
        survivors = []

        # Tournament selection
        while len(evaluated_population) > 1:
            # Randomly pick two distinct individuals
            idx1, idx2 = np.random.choice(len(evaluated_population), size=2, replace=False)
            
            # Compare their fitness (lower MSE is better)
            if evaluated_population[idx1][1] <= evaluated_population[idx2][1]:
                winner = evaluated_population[idx1]  # The one with the better (lower) MSE
            else:
                winner = evaluated_population[idx2]
            
            # Add the winner to survivors
            survivors.append(winner[0])  # Append the tree (not MSE)
            
            # Remove both individuals from evaluated_population
            del evaluated_population[max(idx1, idx2)]  # Remove the higher index first
            del evaluated_population[min(idx1, idx2)]  # Then remove the lower index

        # Reproduce new individuals by mutation or recombination
        offspring = []

        # Perform mutation or recombination to generate 80% of the new individuals
        while len(offspring) < ((population_size // 2) * 0.8):
            if np.random.rand() < mutation_rate:
                # Select a random parent from survivors for mutation
                parent = survivors[np.random.choice(len(survivors))]
                offspring.append(Mutate_tree(parent, x, extended_range))
            else:
                # Select two random parents from survivors for recombination
                parent1_index, parent2_index = np.random.choice(len(survivors), size=2, replace=False)
                parent1, parent2 = survivors[parent1_index], survivors[parent2_index]
                child1, child2 = Recombinate_trees(parent1, parent2)
                offspring.extend([child1, child2])


        brand_new_population = [
            Generate_tree(max_depth, x, extended_range)
            for _ in range(int((population_size // 2) * 0.2))
        ]

        # Update population with survivors and offspring
        population = survivors + offspring + brand_new_population

        # Evaluate the new population
        evaluated_population = Evaluate_population(population)
        evaluated_population.sort(key=lambda ind: ind[1])  # Sort by MSE

        # Print the best solution of the generation
        best_tree, best_mse = evaluated_population[0]
        print(f"Best MSE: {best_mse:.6f}")

    # Return the best solution overall
    best_tree, best_mse = evaluated_population[0]
    best_formula = Convert_tree_to_expression(best_tree)
    print("Best Tree:", best_tree)
    print("Best Formula:", f"def f(x): return {best_formula}")
    print("Best MSE:", best_mse)

    return best_tree, best_mse


In [77]:
# Run symbolic regression
best_tree, best_mse = Sym_reg(x, y, max_depth=7, population_size=10000, generations=100, mutation_rate=0.3)

Generation 1/100
Best MSE: 19920285033546.093750
Generation 2/100
Best MSE: 19920285033546.093750
Generation 3/100
Best MSE: 19920285033546.093750
Generation 4/100
Best MSE: 19920285033546.093750
Generation 5/100
Best MSE: 19920285033546.093750
Generation 6/100
Best MSE: 19920285033546.093750
Generation 7/100
Best MSE: 19920285033546.093750
Generation 8/100
Best MSE: 19920285033546.093750
Generation 9/100
Best MSE: 19920285033546.093750
Generation 10/100
Best MSE: 19920285033546.093750
Generation 11/100
Best MSE: 19920285033546.093750
Generation 12/100
Best MSE: 19920285033546.093750
Generation 13/100
Best MSE: 19920285033546.093750
Generation 14/100
Best MSE: 19920285033546.093750
Generation 15/100
Best MSE: 19920285033546.093750
Generation 16/100
Best MSE: 19920285033546.093750
Generation 17/100
Best MSE: 19920285033546.093750
Generation 18/100
Best MSE: 19920285033546.093750
Generation 19/100
Best MSE: 19920285033546.093750
Generation 20/100
Best MSE: 19920285033546.093750
Generatio