In [None]:
import numpy as np
import random
import copy

Define Class Node and Class Individual

In [None]:
class Node:
    def __init__(self, key):
        self.right = None
        self.left = None
        self.value = key # self.data = key

In [None]:
from dataclasses import dataclass

@dataclass
class Individual:
    genome: Node
    fitness: float = None
    age: int = 0

Define Operators and operands

In [None]:
def add(a, b):
    return a + b

def sub(a, b):
    return a - b

def mul(a, b):
    return a * b

def div(a, b):
    return a / (b + 1e-50)  # Avoid zero division

def log(a):
    return np.log(np.abs(a) + 1e-50)  # Avoid zero log

def sqrt(a):
    return np.sqrt(np.abs(a))  # Avoid negative square root

def sin(a):
    return np.sin(a)

def cos(a):
    return np.cos(a)

def abs(a):
    return np.abs(a)

function_set = {
    'add': (add, 2),
    'substract': (sub, 2),
    'multiply': (mul, 2),
    'divide': (div, 2),
    'sin': (sin, 1),
    'cos': (cos, 1),
    'log': (log, 1),
    'sqrt': (sqrt, 1),
    'abs': (abs, 1),
}


Function for generating the tree, manipulating the tree

In [None]:
def generate_random_tree(complexity = 3, force_operator = None):
    if complexity == 1:
        # 0 feature, 1 constant
        value = np.random.choice([0,1,2])
        if value == 1 or value == 2:
            rnd_choice = np.random.choice(VARIABLES)
        else:
            rnd_choice = np.random.choice(CONSTANTS)
        return Node(rnd_choice)
    
    node_operator = np.random.choice(list(function_set.keys()))
    if force_operator != None:
        binary_op = ['add','substract','multiply','divide']
        force_op = [str(force_operator)] * 5
        binary_op.extend(force_op)
        node_operator = np.random.choice(binary_op)
    
    node = Node(node_operator)
    if function_set[node_operator][1] == 2:
         # Binary operators have two children
        left_complexity = np.random.randint(1, complexity)
        right_complexity = complexity - 1 - left_complexity
        
        # Ensure the complexities are valid
        if left_complexity < 1:
            left_complexity = 1
        if right_complexity < 1:
            right_complexity = 1
        
        node.left = generate_random_tree(left_complexity, force_operator)
        node.right = generate_random_tree(right_complexity, force_operator)
    else:
        if complexity == 2:
            value = np.random.choice([0,1])
            if value == 1:
                rnd_choice = np.random.choice(VARIABLES)
            else:
                rnd_choice = np.random.choice(CONSTANTS)
            node.left = Node(rnd_choice)
        else:
            node.left = generate_random_tree(complexity - 1, force_operator)
            
    return node
    
def count_nodes(node):
    if node is None:
        return 0
    return 1 + count_nodes(node.left) + count_nodes(node.right)

def get_subtree(node, point):
    if node is None:
        return None
    if point == 0:
        return node
    left_size = count_nodes(node.left)
    if point <= left_size:
        return get_subtree(node.left, point - 1)
    return get_subtree(node.right, point - left_size - 1)

def replace_subtree(tree, point, subtree):
    if point == 0:
        tree.value = subtree.value
        tree.left = subtree.left
        tree.right = subtree.right
        return
    left_size = count_nodes(tree.left)
    if point <= left_size:
        replace_subtree(tree.left, point - 1, subtree)
    else:
        replace_subtree(tree.right, point - left_size - 1, subtree)

def print_tree(node, level=0):
    if node is not None:
        print_tree(node.right, level + 1)
        print(' ' * 4 * level + '->', node.value)
        print_tree(node.left, level + 1)

def isOperator(n):
    if n in list(function_set.keys()):
        return True
    else:
        return False
     
def check_tree(node, level=0):
    if node is not None:
        res = isOperator(node.value)
        print(f'{node.value} {res}', )
        check_tree(node.right, level + 1)
        check_tree(node.left, level + 1)

def simplify(node):
    if node is None:
        return None
    
    # Recursively simplify left and right subtrees
    node.left = simplify(node.left)
    node.right = simplify(node.right)
    
    # Simplify based on the type of node
    if isinstance(node.value, (int, float)):
        return node
    
    if node.value == 'add':
        if isinstance(node.left.value, (int, float)) and isinstance(node.right.value, (int, float)):
            return Node(node.left.value + node.right.value)
        if node.left.value == 0:
            return node.right
        if node.right.value == 0:
            return node.left
    
    if node.value == 'substract':
        if isinstance(node.left.value, (int, float)) and isinstance(node.right.value, (int, float)):
            return Node(node.left.value - node.right.value)
        if node.right.value == 0:
            return node.left
    
    if node.value == 'multiply':
        if isinstance(node.left.value, (int, float)) and isinstance(node.right.value, (int, float)):
            return Node(node.left.value * node.right.value)
        if node.left.value == 1:
            return node.right
        if node.right.value == 1:
            return node.left
        if node.left.value == 0 or node.right.value == 0:
            return Node(0)
    
    if node.value == 'divide':
        if isinstance(node.left.value, (int, float)) and isinstance(node.right.value, (int, float)):
            return Node(node.left.value / node.right.value if node.right.value != 0 else 1)
        if node.right.value == 1:
            return node.left
        if node.left.value == 0:
            return Node(0)
        if node.left.value == node.right.value:
            return Node(1)
    
    if node.value == 'sqrt':
        if isinstance(node.left.value, (int, float)):
            return Node(np.sqrt(np.abs(node.left.value)))
    return node

Function for Fitness evaluation

In [None]:
def fitness(tree, X, y):
    predictions = evaluateExpressionTree(tree, X)
    return np.mean((predictions - y) ** 2)

def evaluateExpressionTree(root, X):
 
    # empty tree
    if root is None:
        return 0
 
    # leaf node
    if root.left is None and root.right is None:
        if isinstance(root.value, (int, float)):  
            return np.full(X.shape[1], root.value)  # return constant value for all samples
        # if it's a variable
        elif root.value.startswith('x'):
            col_idx = int(root.value[1:])   # converting x1 -> index 0, x2 -> index 1
            return X[col_idx, :] # return the corresponding feature for all samples
 
    # evaluate left tree
    left_sum = evaluateExpressionTree(root.left, X)
 
    # evaluate right tree
    right_sum = evaluateExpressionTree(root.right, X)
 
    # check which operation to apply
    if root.value == 'add':
        return add(left_sum , right_sum)
 
    elif root.value == 'substract':
        return sub(left_sum, right_sum)
 
    elif root.value == 'multiply':
        return mul(left_sum, right_sum)
 
    elif root.value == 'divide':
        return div(left_sum, right_sum)
    
    elif root.value == 'sin':
        return sin(left_sum)
    
    elif root.value == 'cos':
        return cos(left_sum)
    
    elif root.value == 'log':
        return log(left_sum)
    
    elif root.value == 'sqrt':
        return sqrt(left_sum)
    
    elif root.value == 'abs':
        return abs(left_sum)

Function for population generation, types of mutations and mutate function

In [None]:
def generate_population(complexity = 3, population_size = 10, force_operator = None):
    population = []
    for _ in range(population_size):
        tree_complexity = random.randint(1,complexity)
        tree_expression = generate_random_tree(tree_complexity, force_operator)
        person = Individual(tree_expression)
        population.append(person)
    return population

def print_population_islands(islands):
    for island in islands:
        print('*'*40)
        for person in island:
            print_tree(person.genome)
            print(f"Fitness ==> {person.fitness}")
            print('-'*40)
        print('*'*40)

def print_population_island(island):
    for person in island:
        print_tree(person.genome)
        print(f"Fitness ==> {person.fitness}")
        print('-'*40)

def evaluate_individual_fitness(islands):
    for island in islands:
        for person in island:
            person.fitness = fitness(person.genome, X, Y)
            person.age = person.age + 1

def age_everyone(island):
    for person in island:
        person.age = person.age + 1
         
def evaluate_island_fitness(island):
    for person in island:
        person.fitness = fitness(person.genome, X, Y)
        person.age = person.age + 1

def get_similar_operator(operator):
    binary = ['add', 'substract', 'multiply', 'divide']
    n_binary = ['sin', 'cos', 'log', 'sqrt', 'abs']
    if operator in binary:
        binary.remove(operator)
        new_operator = random.choice(binary)
    else:
        n_binary.remove(operator)
        new_operator = random.choice(n_binary)
    return new_operator


def find_operator_positions(node, level=0, position=[]):
    if node is not None:
        if isOperator(node.value):
            position.append((node.value, node))
        find_operator_positions(node.right, level + 1, position)
        find_operator_positions(node.left, level + 1, position)
    return position

def find_constants_positions(node, path=""):
    if node is None:
        return []

    # If the node is a leaf node (operand), check if it's a constant
    if node.left is None and node.right is None:
        try:
            float(node.value)  # Check if the value can be converted to a float
            return [(node.value, node)]
        except ValueError:
            return []

    # Recursively find constants in the left and right subtrees
    left_positions = find_constants_positions(node.left, path + "L")
    right_positions = find_constants_positions(node.right, path + "R")

    return left_positions + right_positions

def find_x_positions(node, level=0, position=[]):
    if node is not None:
        if not isinstance(node.value, (int, float)) and  node.value.startswith('x'):
            position.append((node.value, node))
        find_x_positions(node.right, level + 1, position)
        find_x_positions(node.left, level + 1, position)
    return position

def mutate(Expressions, Temperature, max_depth=7):
    cond = False
    while (not cond):
        E = copy.deepcopy(Expressions)
        i = random.randint(1,8)
        match i:
            case 1:
                # Mutate random constant
                alpha = np.power((0.1 + 0.1 * Temperature + 0.1),(random.randint(1,5) - 1))
                constant_positions = find_constants_positions(E.genome)
                res_len = len(constant_positions)
                if res_len > 0:
                    const_value, const_node = constant_positions[random.randint(0,res_len-1)]
                    const_node.value = const_value * alpha
            case 2:
                # Mutate random Operator
                operator_positions = find_operator_positions(E.genome, 0, [])
                res_len = len(operator_positions)
                if res_len > 0:
                    const_value, const_node = operator_positions[random.randint(0,res_len-1)]
                    const_node.value = get_similar_operator(const_value)
            case 3:
                # Delete subtree and replace with constant
                point1 = np.random.randint(0, count_nodes(E.genome))
                new_const = Node(random.choice(CONSTANTS))
                replace_subtree(E.genome, point1, new_const)
            case 4:
                # Delete subtree and replace with random tree
                point1 = np.random.randint(0, count_nodes(E.genome))
                new_tree = generate_random_tree(3)
                replace_subtree(E.genome, point1, new_tree)
            case 5:
                E.genome = simplify(E.genome)
            case 6:
                E.genome = generate_random_tree(random.randint(1,5))
            case 7:
                pos = find_x_positions(E.genome, 0, [])
                res_len = len(pos)
                if res_len > 0:
                    const_value, const_node = pos[random.randint(0,res_len-1)]
                    aus_var = copy.copy(VARIABLES)
                    aus_var.remove(const_value)
                    const_node.value = random.choice(aus_var)
            case 8:
                ... #do nothing
        #print("genome"+E.genome)
        n_nodes = count_nodes(E.genome)
        if n_nodes <= max_depth:
            cond = True
    if random.random() < 0.01 :
        # Random do not apply any mutation
        E = copy.deepcopy(Expressions)
    
    return E

def one_point_crossover(tree1, tree2):
    """
    Perform one-point crossover between two parent trees.
    
    Parameters:
    tree1 (Node): Root node of the first parent tree.
    tree2 (Node): Root node of the second parent tree.
    
    Returns:
    offspring1, offspring2 (tuple): Two offspring trees.
    """
    point1 = random.randint(0, count_nodes(tree1) - 1)
    point2 = random.randint(0, count_nodes(tree2) - 1)
    
    subtree1 = get_subtree(tree1, point1)
    subtree2 = get_subtree(tree2, point2)
    
    offspring1 = copy.deepcopy(tree1)
    offspring2 = copy.deepcopy(tree2)
    
    replace_subtree(offspring1, point1, subtree2)
    replace_subtree(offspring2, point2, subtree1)
    
    return offspring1, offspring2


In [None]:
def expression_to_string(node):
    numpy_func_map = {
        'add': '+',
        'substract': '-',
        'multiply': '*',
        'divide': '/',
        'sin': 'np.sin',
        'cos': 'np.cos',
        'log': 'np.log',
        'sqrt': 'np.sqrt',
        'abs': 'np.abs'
    }
    if node is None:
        return ""
    
    # If the node is a leaf node (i.e., no children), return its value
    if node.left is None and node.right is None:
        return str(node.value)
    
    # Recursively get the string representation of the left and right subtrees
    left_str = expression_to_string(node.left)
    right_str = expression_to_string(node.right)
    
    # Return the string representation of the current node in the desired format
    if node.value in ['add', 'substract', 'divide', 'multiply']:
        return f"{numpy_func_map[node.value]}({left_str},{right_str})"
    elif node.value in ['sin', 'cos', 'abs', 'sqrt', 'log']:
        return f"{numpy_func_map[node.value]}({left_str})"

Define the Evolve function

In [None]:
def evolve(Population, X, Y):
    nc = 500
    for k in range(nc):
        # Order by age and 
        ordered_data = sorted(Population, key=lambda x: (x.age, x.fitness))
        if random.random() > 0.02:
            # Mutation
            # Tournament Selection
            tournamen_selection = random.sample(Population, 12)
            expression = min(tournamen_selection , key=lambda x: x.fitness)
            temperature = 1 - (k / nc)
            new_expression = mutate(expression, temperature, 7)
            new_expression.fitness = fitness(new_expression.genome, X, Y)
            new_expression.age = 1
            # Replace with oldest
            oldest_indices = sorted(range(len(Population)), key=lambda i: Population[i].age, reverse=True)[:1]
            # Remove oldest
            ordered_data.pop(oldest_indices[0])
            ordered_data.append(new_expression)
        else:
            # Crossover
            parent_1 = min(random.sample(Population, 12) , key=lambda x: x.fitness)
            parent_2 = min(random.sample(Population, 12) , key=lambda x: x.fitness)
            offspring1, offspring2 = one_point_crossover(parent_1.genome, parent_2.genome)
            new_individual_1 = Individual(offspring1)
            new_individual_1.fitness = fitness(new_individual_1.genome, X, Y)
            new_individual_1.age = 1
            new_individual_2 = Individual(offspring2)
            new_individual_2.fitness = fitness(new_individual_2.genome, X, Y)
            new_individual_1.age = 1
            # Replace the 
            oldest_indices = sorted(range(len(Population)), key=lambda i: Population[i].age, reverse=True)[:2]
            ordered_data.pop(oldest_indices[0])
            ordered_data.append(new_individual_1)
            ordered_data.pop(oldest_indices[1])
            ordered_data.append(new_individual_2)
        Population = ordered_data
        #evaluate_island_fitness(Population)
        age_everyone(Population)
        
    
    return Population

Load the data of the problem

In [None]:
path = "c:\\Users\\tedib\\Desktop\\Scuola\\Magistrale Intelligenza Artificiale\\Secondo Anno\\Computational Intelligence\\vs_code\\DEF-Project\\data\\"
n_problem = "2"
problem = np.load(path+'problem_'+n_problem+'.npz')
X= problem['x']
Y = problem['y']

constant_range = np.linspace(-100, 100, num=30)
CONSTANTS = list(constant_range)
VARIABLES = [f'x{i}' for i in range(X.shape[0])]
values_choice = VARIABLES.copy()
values_choice.extend(CONSTANTS)

print(X.shape, Y.shape)

MAIN, Execute the Symbolic Regression Algorithm

In [None]:
n_islands = 15
iteration = 3
n_population = 25
Islands = []
max_depth = 7
operators = list(function_set.keys())
for i in range(n_islands):
    force_operator = None
    if i < len(operators):
        force_operator = operators[i]
    new_population = generate_population(max_depth, n_population, force_operator)
    Islands.append(new_population)

evaluate_individual_fitness(Islands)
#print_population_islands(Islands)

best_expression_overall = Individual(Node('10'))
best_expression_island = [Individual(Node('10'))]*n_islands
print(best_expression_overall.fitness)
for n in range(40):
    print(f"*"*30+f" Generation {n} "+ "*"*30)
    i = 0
    for island in Islands:
        print("-"*40)
        print(f'Init Island {i}')
        population = evolve(island, X, Y)
        ordered_data = sorted(population, key=lambda x: (x.fitness))
        Islands[i] = ordered_data[:n_population]
        population = Islands[i]
        print("End evolve")
        
        possible_best_individual = min(population, key=lambda x: (x.fitness))
        if best_expression_overall.fitness is not None:
            if possible_best_individual.fitness < best_expression_overall.fitness:
                best_expression_overall = possible_best_individual
        else:
            best_expression_overall = possible_best_individual
        if best_expression_island[i].fitness is not None:
            if possible_best_individual.fitness < best_expression_island[i].fitness:
                best_expression_island[i] = possible_best_individual
        else:
            best_expression_island.append(possible_best_individual)
        
        # Migration
        for indiv in population:
            island_used = [i]
            cond = True
            while(cond):
                rnd_island = random.randint(0,n_islands-1)
                if rnd_island not in island_used:
                    cond = False
                    if random.random() < 0.05:
                        island_used.append(rnd_island)
            if random.random() < 0.3:
                indiv.genome = simplify(indiv.genome)
            if random.random() < 0.05:
                indiv = possible_best_individual
            if random.random() < 0.05:
                indiv = best_expression_island[rnd_island]
            if random.random() < 0.05:
                # Replace with random individual
                indiv = random.sample(Islands[rnd_island],1)
                
        print(f'End Island {i}')
        print("-"*40)
        print()
        i += 1
    print(best_expression_overall)
    print_tree(best_expression_overall.genome)
    print("-"*20)
    
    


Print best expression of the evaluation

In [None]:
print(best_expression_overall)
print_tree(best_expression_overall.genome)
print(expression_to_string(best_expression_overall.genome))