Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [113]:
import numpy as np
import random

In [None]:
problem = np.load('../data/problem_8.npz')
x= problem['x']
y = problem['y']
print(x.shape)

In [None]:

def add(a, b):
    return a + b

def sub(a, b):
    return a - b

def mul(a, b):
    return a * b

def div(a, b):
    return a / (b + 1e-50)  # Protected division

def log(a):
    return np.log(np.abs(a) + 1e-50)  # Protected log

def sqrt(a):
    return np.sqrt(np.abs(a))  # Protected sqrt

# NumPy ufuncs
def sin(a):
    return np.sin(a)

def cos(a):
    return np.cos(a)

def abs(a):
    return np.abs(a)

# pow and exp excluded for overflow problems
function_set = {
    'add': (add, 2),
    'substract': (sub, 2),
    'multiply': (mul, 2),
    'divide': (div, 2),
    'sin': (sin, 1),
    'cos': (cos, 1),
    'log': (log, 1),
    'sqrt': (sqrt, 1),
    'abs': (abs, 1),
}
num_vars = x.shape[0] 
terminals = [f'x{i}' for i in range(num_vars)] 
terminals.append('const')  

print(terminals)


In [116]:
class Node:
    def __init__(self, value, children=None):
        self.value = value  # Operator or terminal
        self.children = children or []  # Child nodes
    
    def is_terminal(self):
        return len(self.children) == 0
    
    def __str__(self):
        if callable(self.value):
            return f"{self.value.__name__}({', '.join(map(str, self.children))})"
        return str(self.value)
    
    def depth(self):
        if self.is_terminal():
            return 0  # Terminal nodes have depth 0
        return 1 + max(child.depth() for child in self.children)

In [117]:
def generate_random_tree(max_depth, terminals, functions):
    if max_depth == 0 or (random.random() < 0.5):  # Terminal node
        value = random.choice(terminals)
        if value == 'const':  # Random constant
            return Node(np.random.randint(-100, 100)) # integers from  -100 100 seemed proper heuristic to represnt all constants 
        return Node(value)
    else:  # Function node
        func, arity = random.choice(list(functions.values()))
        children = [generate_random_tree(max_depth - 1, terminals, functions) for _ in range(arity)]
        return Node(func, children)

In [118]:
def evaluate_tree(tree, X):
    if tree.is_terminal():
        # Handle constant terminal node
        if isinstance(tree.value, (int, float)):  
            return np.full(X.shape[1], tree.value)  # Return constant value for all samples
        # Handle variable terminal node (e.g., x0, x1)
        elif tree.value.startswith('x'):
            col_idx = int(tree.value[1:])   # Convert x1 -> index 0, x2 -> index 1, etc.
            return X[col_idx, :] # Return the corresponding feature for all samples
    else:
        # Handle function node (apply function to children)
        func = tree.value
        args = [evaluate_tree(child, X) for child in tree.children]
        # Ensure all arguments have the same shape
        return func(*args) 

In [119]:
def fitness(tree, X, y):
    predictions = evaluate_tree(tree, X)
    return np.mean((predictions - y) ** 2)

In [120]:
def crossover(parent1, parent2, max_depth):
    def select_random_subtree(node):
        if not node.children or random.random() < 0.5:  # 50% chance to pick the current node
            return node
        return select_random_subtree(random.choice(node.children))

    if parent1.is_terminal() or parent2.is_terminal():
        return select_random_subtree(parent2)

    # Randomly decide whether to swap subtrees
    if random.random() < 0.5:  # 50% chance to swap subtrees
        return select_random_subtree(parent2)

    # Perform crossover for each child by recursively combining subtrees
    new_children = []
    for child in parent1.children:
        new_children.append(crossover(child, parent2, max_depth))  # Apply crossover recursively on children

    new_tree = Node(parent1.value, new_children)

    # Enforce depth constraint
    if new_tree.depth() > max_depth:
        return parent1  # Revert to parent1 if depth exceeds max_depth

    return new_tree

def subtree_mutation(tree, max_depth, terminals, functions):
    if random.random() < 0.1 or tree.is_terminal() or max_depth == 0:  # Replace subtree
        return generate_random_tree(2, terminals, functions)
    new_tree = Node(tree.value, [subtree_mutation(child, max_depth - 1, terminals, functions) for child in tree.children])
    if(new_tree.depth() > max_depth):
        return tree
    return new_tree

def hoist_mutation(tree, max_depth):
    if tree.is_terminal() or max_depth == 0:
        return tree  # Base case: terminal nodes remain unchanged
    # Select a random subtree
    selected_subtree = random.choice(tree.children)
    if random.random() < 0.7 or selected_subtree.is_terminal():  # Hoist this subtree
        return selected_subtree
    # Recursively apply hoist mutation
    return Node(tree.value, [hoist_mutation(child, max_depth - 1) for child in tree.children])

# Point Mutation
def point_mutation(tree, terminals, functions):
    if random.random() < 0.2 or tree.is_terminal():  # Mutate this node
        if tree.is_terminal():
            # Replace terminal with another random terminal
            new_value = random.choice(terminals)
            if new_value == 'const':  # Generate a random constant if needed
                return Node(np.random.randint(-100, 100))
            return Node(new_value)
        else:
            # Replace function with another function of the same arity
            func, arity = random.choice([f for f in functions.values() if len(tree.children) == f[1]])
            return Node(func, tree.children)  # Keep children the same for point mutation
    # Recur into children
    return Node(tree.value, [point_mutation(child, terminals, functions) for child in tree.children])

def collapse_mutation(tree, terminals):
    if tree.is_terminal():  # Base case: terminal nodes remain unchanged
        return tree
    
    if random.random() < 0.3:  # Collapse this subtree into a terminal
        new_value = random.choice(terminals)
        if new_value == 'const':  # Generate a random constant if needed
            return Node(np.random.randint(-100, 100))
        return Node(new_value)
    
    # Recur into children
    return Node(tree.value, [collapse_mutation(child, terminals) for child in tree.children])

def mutate(tree, max_depth, terminals, functions):
    # Define probabilities for each mutation type
    mutation_probs = {
        "subtree": 0.6,  # Subtree mutation
        "point": 0.2,    # Point mutation
        "hoist": 0.1,    # Hoist mutation
        "collapse" : 0.1
    }

    # Select mutation type based on weighted probability
    mutation_type = random.choices(
        list(mutation_probs.keys()),
        weights=list(mutation_probs.values()),
        k=1
    )[0]

    # Apply the selected mutation type
    if mutation_type == "subtree":
        return subtree_mutation(tree, max_depth, terminals, functions)
    elif mutation_type == "point":
        return point_mutation(tree, terminals, functions)
    elif mutation_type == "hoist":
        return hoist_mutation(tree, max_depth)
    elif mutation_type == "collapse":
        return collapse_mutation(tree,terminals)

In [123]:
def evolve(X, y, terminals, functions, population_size, max_depth, num_generations):
    # Initialize population
    # Initial population with small tress just to cover basic functions
    population = [generate_random_tree(2, terminals, functions) for _ in range(population_size)]
    
    best_fitness = float('inf')  # Track the best fitness score
    generation = 0              # Keep track of generation number
    
    for generation in range(num_generations):
        # Evaluate fitness
        fitness_scores = np.array([fitness(tree, X, y) for tree in population])
        best_idx = np.argmin(fitness_scores)
        current_best_tree, current_best_score = population[best_idx], fitness_scores[best_idx]
        current_best_depth = current_best_tree.depth()
        
        print(f"Generation {generation}, Best Fitness: {current_best_score}, Depth: {current_best_depth}")
        
        # Update best fitness and tree
        if current_best_score < best_fitness:
            best_fitness = current_best_score
            best_tree = current_best_tree
        
        # Selection: Keep top % 20 (elitism)
        sorted_population = [tree for _, tree in sorted(zip(fitness_scores, population), key=lambda x: x[0])]
        next_population = sorted_population[:population_size // 5]
        
        # Generate offspring
        while len(next_population) < population_size:
            if random.random() < 0.5:  # Crossover
                parents = random.sample(population, 2)
                child = crossover(parents[0], parents[1], max_depth)
            else:  # Mutation
                parent = random.choice(population)
                child = mutate(parent, max_depth, terminals, functions)
            next_population.append(child)
        
        population = next_population
    
    print(f"Evolution completed after {num_generations} generations with best fitness: {best_fitness}")
    return best_tree

In [None]:
# depth is limited to avoid overfitting and bloating
best_tree = evolve(x, y, terminals, function_set, population_size=100000, max_depth=6, num_generations=100)

# Display the best formula
print("Best Formula:", best_tree)

# Evaluate on training set
mse_train = np.mean((y - evaluate_tree(best_tree, x)) ** 2)
print("Training MSE:", mse_train)

In [126]:
def tree_to_numpy(node):
    if node.is_terminal():
        # Terminal case: return the variable or constant directly
        return str(node.value)
    
    # Function case: Construct the string representation
    func = node.value  # e.g., "add", "mul"
    children_expr = [tree_to_numpy(child) for child in node.children]
    
    # Map function names to NumPy equivalents
    numpy_func_map = {
        add: 'np.add',
        sub: 'np.subtract',
        mul: 'np.multiply',
        div: 'np.divide',
        sin: 'np.sin',
        cos: 'np.cos',
        log: 'np.log',
        sqrt: 'np.sqrt',
        abs: 'np.abs'
    }
    
    # Generate the NumPy-compatible string
    if func in numpy_func_map:
        if len(children_expr) == 2:  # Binary operator
            return f"{numpy_func_map[func]}({children_expr[0]}, {children_expr[1]})"
        elif len(children_expr) == 1:  # Unary operator
            return f"{numpy_func_map[func]}({children_expr[0]})"
        else:
            raise ValueError("Function arity does not match the tree structure.")
    else:
        raise ValueError(f"Unsupported function: {func}")

In [None]:
print(tree_to_numpy(best_tree))