# Symbolic Regression Program
### Computational Intelligence Project 2024/2025
#### Lorenzo Formentin 

---
### Import and Definition

In [None]:
import random
from gp_node import Node
import numpy as np
from typing import Callable, List, Union

np.seterr(all="ignore")


def p_log(x):
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.log(np.abs(x))

def p_sqrt(x):
    return np.sqrt(np.abs(x))

def p_power(x, y):
    with np.errstate(all='ignore'):
        return np.power(np.abs(x), y)
    
# operations list to use with (np_operation, num_of_argument)
operations = [
    (np.negative, 1),
    (p_sqrt, 1),
    (np.exp, 1),
    (p_log, 1),
    (np.sin, 1),
    (np.cos, 1),
    (np.tan, 1),
    (np.square, 1),
    (np.cbrt, 1),
    (np.abs, 1),
    (np.reciprocal, 1),
    (np.add, 2),
    (np.subtract, 2),
    (np.multiply, 2),
    (np.divide, 2),
    (p_power, 2)
]

### GP Implementation

In [None]:
class GP:
    def __init__(self, population_size: int, generations: int, operators: List[Callable], num_var: int, mutation_rate: float, crossover_rate: float, max_depth: int):
        self.population_size = population_size
        self.generations = generations
        self.operators = operators
        self.num_variables = num_var
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.max_depth = max_depth
        self.population = [self.generate_population() for _ in range(population_size)]

    # -- Tree Generation ------------------------------------------------
    def generate_population(self, var: bool = False, depth_limit: int = None) -> Node:
        if depth_limit is None:
            depth_limit = self.max_depth
        has_var = [var]
        
        def generate_tree(depth=0) -> Node:
            if depth >= depth_limit or (depth > 0 and random.random() < 0.4):
                if not has_var[0]:
                    has_var[0] = True
                    return Node(f'x_{random.randint(0, self.num_variables-1)}')

                if random.random() < 0.6:
                    return Node(random.uniform(-5, 5))
                else:
                    return Node(f'x_{random.randint(0, self.num_variables-1)}')

            op, num_op = random.choice(self.operators)
            children = [generate_tree(depth + 1) for _ in range(num_op)]
            return Node(op, children)
        
        return generate_tree()
    
    # -- Fitness Evaluation ---------------------------------------------
    def fitness(self, tree: Node, X: np.ndarray, Y: np.ndarray) -> float:
        predictions = tree(x=X)
        mse = np.mean((Y - predictions)**2)
        if np.isnan(mse) or np.isinf(mse):
            return np.inf
        return mse
    
    # -- Utility Methods ------------------------------------------------
    def clone_tree(self, node: Node) -> Node:
        cloned_children = [self.clone_tree(child) for child in node.children]
        return Node(node.value, cloned_children)
    
    def get_all_nodes_with_parent(self, node: Node, parent=None, index=None) -> List[tuple]:
        nodes = [(parent, index, node)]
        for i, child in enumerate(node.children):
            nodes.extend(self.get_all_nodes_with_parent(child, node, i))
        return nodes
    
    # -- Crossover ------------------------------------------------------
    def crossover(self, tree1: Node, tree2: Node):
        max_attempts = 10
        attempts = 0
        while attempts < max_attempts:
            t1 = self.clone_tree(tree1)
            t2 = self.clone_tree(tree2)
            nodes1 = self.get_all_nodes_with_parent(t1)
            nodes2 = self.get_all_nodes_with_parent(t2)

            # Swap the subtrees
            parent1, index1, subtree1 = random.choice(nodes1)
            parent2, index2, subtree2 = random.choice(nodes2)
            if parent1 is None:
                new_t1 = subtree2
            else:
                parent1.children[index1] = subtree2
                new_t1 = t1

            if parent2 is None:
                new_t2 = subtree1
            else:
                parent2.children[index2] = subtree1
                new_t2 = t2
            
            if new_t1.tree_depth <= self.max_depth and new_t2.tree_depth <= self.max_depth:
                return new_t1, new_t2
            attempts += 1
        
        return self.clone_tree(tree1), self.clone_tree(tree2)
    
    # -- Mutation -------------------------------------------------------
    def mutate(self, tree: Node) -> Node:
        if random.random() > self.mutation_rate:
            return tree 
        
        t = self.clone_tree(tree)
        nodes = self.get_all_nodes_with_parent(t)
        parent, index, node_to_mutate = random.choice(nodes)
        
        if node_to_mutate.is_leaf:
            if isinstance(node_to_mutate.value, (int, float)):
                sigma = 0.5
                new_val = node_to_mutate.value + np.random.normal(0, sigma)
                new_node = Node(new_val)
            else:
                if random.random() < 0.4:
                    new_node = Node(random.uniform(-5, 5))
                else:
                    new_node = Node(node_to_mutate.value)
        else:
            if parent is not None:
                parent_depth = parent.tree_depth
                available_depth = self.max_depth - parent_depth
            else:
                available_depth = self.max_depth
            new_node = self.generate_population(var=True, depth_limit=available_depth)
        
        if parent is None:
            candidate = new_node
        else:
            parent.children[index] = new_node
            candidate = t
        
        if candidate.tree_depth <= self.max_depth:
            return candidate
        else:
            return tree
        
    # -- Evolution ------------------------------------------------------
    def evolve(self, X: np.ndarray, Y: np.ndarray) -> (Node, float):
        best_tree = None
        best_fit = np.inf
        
        for generation in range(self.generations):
            fitness_values = []
            for tree in self.population:
                fit = self.fitness(tree, X, Y)
                if np.isinf(fit):
                    new_tree = self.generate_population()
                    fit = self.fitness(new_tree, X, Y)
                    tree = new_tree
                fitness_values.append(fit)
            
            paired = list(zip(fitness_values, self.population))
            paired.sort(key=lambda x: x[0])
            fitness_values, sorted_population = zip(*paired)
            
            if fitness_values[0] < best_fit:
                best_fit = fitness_values[0]
                best_tree = self.clone_tree(sorted_population[0])
            
            print(f"Generation {generation}: Best fitness = {best_fit}, Expression = {best_tree}")
            
            selection_size = max(2, int(self.population_size * 0.4))
            selected = list(sorted_population[:selection_size])
            
            elite_count = max(1, int(self.population_size * 0.1))
            new_population = [self.clone_tree(ind) for ind in sorted_population[:elite_count]]
            
            while len(new_population) < self.population_size:
                parent1 = random.choice(selected)
                parent2 = random.choice(selected)

                if random.random() < self.crossover_rate:
                    child1, child2 = self.crossover(parent1, parent2)
                else:
                    child1 = self.clone_tree(parent1)
                    child2 = self.clone_tree(parent2)
                
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                new_population.append(child1)
                if len(new_population) < self.population_size:
                    new_population.append(child2)
            
            self.population = new_population[:self.population_size]
        
        return best_tree, best_fit

### Evaluation

In [None]:
problem = np.load('../data/problem_8.npz')
X = problem['x']
Y = problem['y']
num_variables= X.shape[0]

symreg = GP(400, 1000, operations, num_variables, 0.8, 0.5, 6)

best_expression, best_mse = symreg.evolve(X, Y)
print(f'mse: {best_mse}, y = {best_expression}')