In [18]:
import random
import numpy as np
import math

To run Problem 8, we made some simplifications, such as reducing exploitation and using smaller populations, as overall the computation was slow. Additionally, each problem has its own set of hyperparameters.

In [None]:
data = np.load("problem_5.npz")

x = data["x"]
y = data["y"]
x = x.T
print(x.shape, y.shape)

N = y.shape[0]
D = x.shape[1]

In [None]:
indexes = np.random.choice(range(N), size=int(0.20*N), replace=False)

x_test = x[indexes,:]
x_train = np.delete(x,indexes,axis=0)
y_test = y[indexes]
y_train = np.delete(y,indexes,axis=0)

print(x_train.shape, y_train.shape)


# x_train = x
# x_test = x
# y_train = y
# y_test = y

In [21]:
bin_operations =['+', '-', '*', '/']   #'^'

un_operations = {
    '**2': lambda x: x**2,
    '**3': lambda x: x**3,
    '**4': lambda x: x**4,
    '**5': lambda x: x**5,
    '**6': lambda x: x**6
}

un_operations_ele = {
    'sin': lambda x: np.sin(x),
    'cos': lambda x: np.cos(x),
    'exp': lambda x: np.exp(x),
    'log': lambda x: np.log(x) if np.all(x > 0) else x,      # else returns the value of the original x
    'sqrt': lambda x: np.sqrt(x) if np.all(x > 0) else x
}


In [22]:
###### GLOBAL VARIABLES #######
X_PROB = 0.55               # prob of selectiong an xi instead of a constant
X_EXTINTION = 0.2           # prob of extintion due not all xi present (ex. only x1 but not x2 in a 2D problem)

UN_BIN_PROB = 0.3           # prob of selecting an unary operation instead of binary
P_UN_DECREASE_OP = 0.2      # probability that decreases the chance to have specific unary operations
P_REMOVE_MORE = 0.2         # probability of skipping the exp or **3 (removing from available options)

P_BIN_OP = [0.30, 0.25, 0.40, 0.05]             # prob of +, -, *, /                         # p=[0.3, 0.25, 0.25, 0.05, 0.15]    case with ^ too
P_MUTATION = [0.3, 0.3, 0.2, 0.05, 0.05, 0.1]   # prob of different mutations, ["subtree", "point", "collapse", "hoist", "lr", "exp"]
P_RECOMBINATION = [0.3, 0.7]                    # prob of different recombinations, ["half", "normal"]

In [23]:
class Node:
    def __init__(self, value = None, dim = None, operation = None, left=None, right=None, parent = None):
        self.value = value           # Value for leaf node
        self.x_dim = dim             # Dimension (1=x, 2=y, 3=z ...) 
        self.operation = operation   # Operation for internal nodes
        self.left = left             # Left child
        self.right = right           # Right child
        self.parent = parent         # Parent

        
        
    def str_representation(self):
        if self.operation in bin_operations:
            return f"({self.left.str_representation()} {self.operation} {self.right.str_representation()})"
        elif self.operation is None:        # Leaf node
            if isinstance(self.value, str): 
                return str(self.value) + "_" + str(self.x_dim)
            else:
                return str(self.value)
        else: 
            return f"{self.operation}({self.left.str_representation()})"
       
    
    
    def callable_representation(self): ### To have a python lambda function
        if self.operation is None:  # Leaf node
            if self.value == "x":
                return lambda x: x[self.x_dim-1] # Variable
            else:
                return lambda x: self.value  # Constant
        elif self.operation in bin_operations:  # Binary operation
            left_func = self.left.callable_representation()
            right_func = self.right.callable_representation()
            if self.operation == "+":
                return lambda x: left_func(x) + right_func(x)
            elif self.operation == "-":
                return lambda x: left_func(x) - right_func(x)
            elif self.operation == "*":
                return lambda x: left_func(x) * right_func(x)
            #elif self.operation == "^":
            #    return lambda x: left_func(x) ** right_func(x)
            elif self.operation == "/":
                return lambda x: left_func(x) / right_func(x) if abs(right_func(x)) > 1e-10 else (1e10)   # If it is almost 0 or 0, we return a very big number
        else:  # Unary operation (sqrt, log, **2, ...)
            inner_func = self.left.callable_representation()
            all_operations = un_operations | un_operations_ele   # All operations together
            return lambda x: all_operations[self.operation](inner_func(x))
    
    

    
    def copy(self, visited=None):  # Pass visited set to track copies
        if visited is None:
            visited = set()
        
        # Check if the current node has already been copied
        if id(self) in visited:
            return None  # Return None or self, depending on how you want to handle it
        
        visited.add(id(self))
        
        return Node(
            value=self.value,
            dim=self.x_dim,
            operation=self.operation,
            left=self.left.copy(visited) if self.left else None,
            right=self.right.copy(visited) if self.right else None,
            parent=self.parent.copy(visited) if self.parent else None   # if root
        )


    

    
     

    
    
class IndividualTree:
    def __init__(self, root=None, num_dimensions = 1):
        self.root = root    # Will be a reference to the first Node
        self.depth = self.get_depth()
        self.nodes_list = self.get_all_nodes()  # We track the node to a direct access
        self.num_dimensions = num_dimensions    # Numebr of domain dimesion  
        self.num_x_dim = self.get_num_x_dim()  


        
    def get_random_node(self):    # Generate list when it is needed
        if self.nodes_list is None:
            self.nodes_list = self.get_all_nodes(self.root)
        return random.choice(self.nodes_list)

    
    def get_all_nodes(self,  node=None):
        if node is None: 
            node = self.root            
        nodes = [node]
        if node.left:
            nodes.extend(self.get_all_nodes(node.left))
        if node.right:
            nodes.extend(self.get_all_nodes(node.right))
        return nodes
    
    
    def get_depth(self, node=None, current_depth=0):
        if node is None:
            node = self.root
        if not node:  # If there's no root, the depth is 0
            return 0
        # Recursively calculate depth for left and right children
        left_depth = self.get_depth(node.left, current_depth + 1) if node.left else current_depth
        right_depth = self.get_depth(node.right, current_depth + 1) if node.right else current_depth
        return max(left_depth, right_depth)
    
      

    def get_num_x_dim(self):
        max_dims = self.num_dimensions
        found_dims = set()

        def traverse(node):
            if node is None:
                return
            if node.value == "x" and node.x_dim is not None:  # Check if it's an x_i node
                found_dims.add(node.x_dim)
            if len(found_dims) >= max_dims:    # All dimensions are found
                return
            if node.left:
                traverse(node.left)
            if node.right:
                traverse(node.right)

        traverse(self.root)  # Start from the root
        return len(found_dims)
    
    
    
    def str_representation(self):
        return self.root.str_representation()
    
    def callable_representation(self):
        return self.root.callable_representation()

    def evaluate_tree(self, xx):
        func = self.callable_representation()
        return func(xx)
    
    def mse_individual(self, X, y_true):
        y_pred = [self.evaluate_tree(xi) for xi in X]
        ind_mse = np.mean((np.array(y_pred) - np.array(y_true))**2)  # Mean Squared Error (MSE)
        return ind_mse
    
    def update_properties(self):
        self.depth = self.get_depth()
        self.nodes_list = self.get_all_nodes()
    
    def copy(self):
        new_root = self.root.copy() # if self.root else None        
        new_tree = IndividualTree(root=new_root, num_dimensions=self.num_dimensions)
        if self.nodes_list:
            new_tree.nodes_list = new_tree.get_all_nodes(new_root)
        return new_tree
     

In [24]:
def available_un_func(max_grade, flag_un_ele):
    # Only operations up to grade max_grade
    available_un_op = []
    for key in un_operations.keys():
        # Extract the number after '**'
        degree = int(key[2:])
        
        # Check if the degree is less than or equal to max_grade
        if degree <= max_grade:
            available_un_op.append(key)

    if flag_un_ele == True:
        if np.random.uniform(0, 1) < P_UN_DECREASE_OP:   # Additional decrease of the prob to get elementary oeprations
            available_un_op.extend(un_operations_ele)

    if np.random.uniform(0, 1) < P_REMOVE_MORE:    # Additional decrease of the probability to choose exp(x) that often brings to overflow!
        if 'exp' in available_un_op:  # Check if "exp" is in available operations
            available_un_op.remove('exp')
        
    if np.random.uniform(0, 1) < P_REMOVE_MORE:     # Additional decrease of the probability to choose **3 that can bring to overflow!
        if '**3' in available_un_op:    # Check if "**3" is in available operations
            available_un_op.remove('**3')
        
    #print(available_un_op)
    return available_un_op



def assemble_tree(num_dimensions, depth, max_grade=2, flag_un_ele = False, parent=None):    
    available_unary_op = available_un_func(max_grade, flag_un_ele)   # Only operations up to grade max_grade
        
    ### ROOT or LEAF NODE
    if depth == 0:   # Parent will be None if root

        ### Selecting xi (as a string)
        if np.random.rand() < X_PROB:
            x_selected = np.random.randint(1,num_dimensions+1)
            return Node(value="x", dim = x_selected, parent=parent)
        
        ### Selecting a constant
        else:  
            sampled_value = random.choice([0, 1])
            if sampled_value == 0:
                value = round(np.random.uniform(-25,25),4)  # simulate a number between -10 and 10 and rounding it with 3 digits
            else:
                value = round(np.random.uniform(-2,2),4)   # simulate a small number
            return Node(value=value, parent=parent)
    
    else:
        ### UNARY OPERATIONS
        if np.random.rand() < UN_BIN_PROB and list(available_unary_op):   
            op = random.choice(list(available_unary_op))    # choose one randomly
            left = assemble_tree(num_dimensions, depth - 1, max_grade, flag_un_ele)
            return Node(value=None, operation = op, left=left, right = None, parent=parent)
        
        ### BINARY OPERATIONS
        else:             
            op = np.random.choice(bin_operations, p=P_BIN_OP)    

            # Generating substrees in a recursive way
            left = assemble_tree(num_dimensions, depth - 1, max_grade, flag_un_ele)
            right = assemble_tree(num_dimensions, depth - 1, max_grade, flag_un_ele)

            if op == '/' and isinstance(right.value, float):  # small check for zero division
                while op == '/' and abs(right.value) < 1e-10:
                    op = random.choice(bin_operations)
            return Node(value=None, operation = op, left=left, right=right, parent=parent)





def assemble_tree_with_vars(operation=None, num_dim=1, ele1=None, ele2=None):
    dim1 = None
    dim2 = None 
    value1 = None 
    value2 = None 

    if ele2 != None:   # bianry operation 

        if isinstance(ele1, tuple):
            dim1 = ele1[1] 
            value1 = ele1[0]
        else:
            value1 = ele1  

        if isinstance(ele2, tuple):
            dim2 = ele2[1]
            value2 = ele2[0]
        else:
            value2 = ele2
                
        # Create nodes for var1 and var2
        left_node = Node(value=value1, dim=dim1)
        right_node = Node(value=value2, dim=dim2)

        # Create the root node with the operation
        root_node = Node(operation=operation, left=left_node, right=right_node)
        left_node.parent = root_node
        right_node.parent = root_node

        # Create and return the tree
        tree = IndividualTree(root=root_node, num_dimensions=num_dim) 
        tree.update_properties()
        return tree
     
    elif ele1 != None and operation != None:     # unary operation
        if isinstance(ele1, tuple):
            dim1 = ele1[1] 
            value1 = ele1[0]
        else:
            value1 = ele1  


        left_node = Node(value=value1, dim=dim1)
        right_node = None

        root_node = Node(operation=operation, left=left_node, right=right_node)
        left_node.parent = root_node

        tree = IndividualTree(root=root_node, num_dimensions=num_dim) 
        tree.update_properties()
        return tree



    elif (ele1 != None and ele2 == None) and operation == None: # single constant/variable
        dim1 = None
        if isinstance(ele1, tuple):
            dim1 = ele1[1] 
            value1 = ele1[0]
        else:
            value1 = ele1  
 
        root_node = Node(value = value1, dim=dim1, operation=None, left=None, right=None, parent = None) 
        tree = IndividualTree(root=root_node, num_dimensions=num_dim) 
        tree.update_properties()
        return tree

    else:
        print("cannot create tree")
        return 




def BasicLinearPopulation(num_dim):

    population0 = []
    for i in range(1, num_dim + 1):
        tree0 = assemble_tree_with_vars(num_dim = num_dim, ele1=('x', i))
        population0.append(tree0)
 
    population1 = []

    # Generate all combinations for  multiplicative (x_i * x_j)
    for i in range(1, num_dim + 1):
        for j in range(i, num_dim + 1):
            if i != j:  # Exclude cases where i == j
                tree_mul = assemble_tree_with_vars("*", num_dim, ele1=('x', i), ele2=('x', j))
                population1.append(tree_mul)

    population2 = []
    if num_dim > 1:
        for i in range(1, num_dim + 1):
            for j in range(i, num_dim + 1):
                if i != j:  # Exclude cases where i == j
                    tree = assemble_tree_with_vars("+", num_dim, ele1=('x', i), ele2=('x', j))
                    population2.append(tree)  
    population3 = []
    if num_dim > 2:
        for i in range(1, num_dim + 1):
            for j in range(i, num_dim + 1):
                for k in range(j, num_dim + 1):
                    if i != k and j != k and i !=j:  
                        tree1 = assemble_tree_with_vars("+", num_dim, ele1=('x', i), ele2=('x', j))
                        node = Node(value="x", dim=k)
                        root = Node(operation="+", left=tree1.root, right=node)
                        node.parent = root                    
                        tree2 = IndividualTree(num_dimensions = num_dim, root = root)
                        tree2.update_properties()
                        population3.append(tree2)  
    population4 = []            
    for tree in population0 + population1 + population2 + population3:
        t1 = tree.copy()
        node = Node(value=-1)
        root = Node(operation="*", left=t1.root, right=node)
        node.parent = root
        t1.root.parent = root
        t1 = IndividualTree(num_dimensions = num_dim, root = root)
        population4.append(t1)
        
 
    return population0 + population1 + population2 + population3 + population4
    


def BasicExponentialPopulation(num_dim, population):
    pop1 = []
    for tree in population:
        t1 = tree.copy()
        root_tree = t1.root.copy()
        node = Node(operation = "exp", left=root_tree, right=None)
        root_tree.parent = node
        exp_tree = IndividualTree(num_dimensions = num_dim, root = node)
        exp_tree.update_properties()
        pop1.append(exp_tree)
    return pop1

     
 

        
def create_pop(degree = 3, depth = 2, num_dimensions =1, n_pop = 50, flag_un_ele = False):    # only poly if flag_un_ele = False
   
    poly_pop = []
    while len(poly_pop) < n_pop:
        current_d = np.random.randint(1, depth + 1)  # Sampling a random depth
        ind = assemble_tree(num_dimensions, current_d, max_grade=degree, flag_un_ele = flag_un_ele)
        ind_tree = IndividualTree(root = ind, num_dimensions = num_dimensions)
        
        if ind_tree.get_num_x_dim() == 0:  # If there are no 'x' variables
           if random.random() < 0.8:  # Don't add the tree with probability 
               continue
            
        poly_pop.append(ind_tree)
    return poly_pop 



def evaluate_population(population, X, y_true):
    fitness_scores = []
    for individual in population:
        mse = individual.mse_individual(X, y_true)
        fitness_scores.append(mse)
    return fitness_scores

In [25]:
def SubtreeMutation(tree, depth_max, degree = 3, flag_un_ele = False): # Change a subtree 
    t1 = tree.copy()
    new_depth = max(1, int((depth_max - t1.depth)/2))    # smaller depth
    subtree = assemble_tree(x.shape[1], new_depth, degree, flag_un_ele) # create random subtree (or leaf)
    
    not_changed = True   ### True if the individual is the original

    flag = True
    while flag: 
        node = t1.get_random_node()  # find mutating node 
    
        if node.left is not None and node.right is not None:  # the node is a binary operation
            if random.randint(0,1) == 1:
                node.left = subtree
                subtree.parent = node
                flag=False
                not_changed = False
            else:
                node.right = subtree
                subtree.parent = node
                flag=False
                not_changed = False
        elif node.left is not None:  # the node is a unary operation
            node.left = subtree
            subtree.parent = node
            flag = False
            not_changed = False

        elif (node.left is None and node.right is None):  # root or leaf
            flag = False

    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made





def PointMutation(tree, degree=3, flag_un_ele = False): # Change binary operation/unary operation/leaf value 
    t1 = tree.copy()
    node = t1.get_random_node() 

    not_changed = True

    if node.value != None:  # leaf case
        not_changed = False
        ### x
        if (np.random.rand() < X_PROB) and tree.num_dimensions > 1:      # It is possible to find another xi 
            dim_old = node.x_dim
            node.x_dim = np.random.randint(1,t1.num_dimensions+1)
            while (dim_old == node.x_dim):    # I want to find something new
                node.x_dim = np.random.randint(1,t1.num_dimensions+1)
        ### constant
        else: 
            sampled_value = random.choice([0, 1])
            if sampled_value == 0:
                node.value = round(np.random.uniform(-10,10),4)  # simulate a number between -10 and 10 and rounding it with 3 digits
            else:
                node.value = round(np.random.uniform(-1,1),4)   # simulate a small number
    else:
        not_changed = False
        ### unary operation
        if node.right == None: 
            available_un_op = available_un_func(max_grade = degree, flag_un_ele = flag_un_ele)
            if len(available_un_op) > 1:   # If it is possible to find another operation
                old_operation = node.operation
                node.operation = random.choice(list(available_un_op))
                if node.operation == old_operation:    # I want that something changes
                    node.operation = random.choice(list(available_un_op))

        ### binary operation
        else: # binary case
            if len(bin_operations) > 1:   # If it is possible to find another operation
                old_operation = node.operation
                node.operation = np.random.choice(bin_operations, p = P_BIN_OP)
                if node.operation == old_operation:     # I want that something changes
                    node.operation = np.random.choice(bin_operations, p = P_BIN_OP)

    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made


def SubTreeCollapse(tree):      # Collapse a subtree to a leaf
    t1 = tree.copy()
    
    not_changed = True
    flag = True
    if len(t1.nodes_list) == 1:  # Is only root
        flag = False

    while flag:
        node = t1.get_random_node()
        if node != t1.root:  # not in the case of root
            not_changed = False
            node.left = None
            node.right = None
            node.operation = None
            
            # Choose x or a constant for the new leaf
            if np.random.rand() < X_PROB: 
                node.x_dim = np.random.randint(1,t1.num_dimensions+1)
                node.value = "x"
            else: 
                sampled_value = random.choice([0, 1])
                if sampled_value == 0:
                    node.value = round(np.random.uniform(-10,10),4)  # simulate a number between -10 and 10 and rounding it with 3 digits
                else:
                    node.value = round(np.random.uniform(-1,1),4)   # simulate a small number
            flag = False
            
    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made





def LRMutations(tree):  # Swap Left and Right
    t1 = tree.copy() 

    not_changed = True

    if len(t1.nodes_list) == 1:    # only root, nothing to swap
        return tree    # Return the original tree

    node = t1.get_random_node()
    r_temp = node.right
    l_temp = node.left

    if r_temp is not None and l_temp is not None:      #binary operation
        not_changed = False
        node.right = l_temp
        node.left = r_temp
        node.left.parent = node  
        node.right.parent = node
        t1.update_properties()
        return t1 
    else:              # no children to swap (unary case or leaf case) => TRY AGAIN
        i = 0
        while  i <= len(t1.nodes_list):  # try again
                node = t1.get_random_node()
                r_temp = node.right
                l_temp = node.left
                if r_temp is not None and l_temp is not None:     # binary operation was found
                    node.right = l_temp
                    node.left = r_temp
                    node.left.parent = node  
                    node.right.parent = node
                    t1.update_properties()
                    not_changed = False
                    return t1 
                i += 1
            
    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made
    



def HoistMutation(tree, max_attempts = 23):
    t1 = tree.copy()
    not_changed = True

    if len(t1.nodes_list) == 1:
        flag = False
        not_changed = True
    i = 0
    while i <= max_attempts:
        node = t1.get_random_node()

        if node.value == None:  # Not a leaf
            # Disconnect the node from its current parent if it has one
            if node.parent:
                if node.parent.left == node:
                    node.parent.left = None
                elif node.parent.right == node:
                    node.parent.right = None

            t1.root = node 
            node.parent = None

            # Attach children if they exist
            if node.left:
                node.left.parent = node
            if node.right:
                node.right.parent = node  

            if node.operation:  # If it's an internal node, update the value if necessary
                node.value = None
            not_changed = False
            break ### Change occurred
        i += 1
    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made







def Expansion(tree, expansion_factor = 1, degree=3, flag_un_ele = False):
    t1 = tree.copy()
    not_changed = True

    max_depth = math.ceil(expansion_factor*t1.depth) if math.ceil(expansion_factor*t1.depth) > 1 else math.ceil(expansion_factor*t1.depth) + 1
    depth = np.random.randint(1,max_depth+1)
    flag = True
    while flag:
        node = t1.get_random_node()
        if node.value != None:   # we are in a leaf
            not_changed = False
            subtree = assemble_tree(t1.num_dimensions, depth, degree, flag_un_ele)
            node.left = subtree.left
            node.right = subtree.right
            node.value = subtree.value
            node.operation = subtree.operation
            
            if node.left:
                node.left.parent = node
            if node.right:
                node.right.parent = node
            flag = False
    if not_changed == False:
        t1.update_properties()  # Recalculate depth and node list
        return t1
    else:
        return tree   # No changes has been made


def HalfRecombination(tree1, tree2):   # left from one parent, right from the other one
    t1 = tree1.copy()
    t2 = tree2.copy()
    
    # Not possilbe if one is leaf
    if t1.root.left is None and t1.root.right is None or t2.root.left is None and t2.root.right is None:   
        return tree1, tree2
    
    # If both have an unary operation or one is binary one unary
    elif (t1.root.right is None) or (t2.root.right is None):
        left_1_temp = t1.root.left
        left_2_temp = t2.root.left
        t1.root.left = left_2_temp
        t2.root.left = left_1_temp
        
        if t1.root.left:
            t1.root.left.parent = t1.root
        if t2.root.left:
            t2.root.left.parent = t2.root
        
        t1.update_properties()
        t2.update_properties()
        return t1,t2
    
    else:
        left_1_temp = t1.root.left
        left_2_temp = t2.root.left
        t1.root.left = left_2_temp
        t2.root.left = left_1_temp
        
        right_1_temp = t1.root.right
        right_2_temp = t2.root.right
        t1.root.right = right_2_temp
        t2.root.right = right_1_temp
        
        if t1.root.left:
            t1.root.left.parent = t1.root
        if t1.root.right:
            t1.root.right.parent = t1.root
        
        if t2.root.left:
            t2.root.left.parent = t2.root
        if t2.root.right:
            t2.root.right.parent = t2.root
            
        t1.update_properties()
        t2.update_properties()
        return t1,t2
        
     

    
def Recombination(tree1, tree2):
    t1 = tree1.copy() 
    t2 = tree2.copy() 
    
    node1 = t1.get_random_node()
    node2 = t2.get_random_node()
    
    if node1.left is None and node1.right is None or node2.left is None and node2.right is None:   
        return tree1, tree2
    
    # If both have an unary operation or one is binary one unary
    elif (node1.right is None) or (node2.right is None):
        left_1_temp = node1.left
        left_2_temp = node2.left
        node1.left = left_2_temp
        node2.left = left_1_temp
        
        if node1.left:
            node1.left.parent = node1
        if node2.left:
            node2.left.parent = node2
            
        t1.update_properties()
        t2.update_properties()
        return t1,t2
    
    else:
        left_1_temp = node1.left
        left_2_temp = node2.left
        node1.left = left_2_temp
        node2.left = left_1_temp
        
        right_1_temp = node1.right
        right_2_temp = node2.right
        node1.right = right_2_temp
        node2.right = right_1_temp
        
        if node1.left:
            node1.left.parent = node1
        if node1.right:
            node1.right.parent = node1
        
        if node2.left:
            node2.left.parent = node2
        if node2.right:
            node2.right.parent = node2
            
        t1.update_properties()
        t2.update_properties()
        return t1,t2
    

    
def ApplyMutation(individual, max_depth, degree, flag_un_ele):
    mutation_choice = np.random.choice(["subtree", "point", "collapse", "hoist", "lr", "exp"], p=P_MUTATION)
    #print('\t\tM ',mutation_choice)
    if mutation_choice == "subtree":
        return SubtreeMutation(individual, max_depth, degree, flag_un_ele)
    elif mutation_choice == "point":
        return PointMutation(individual, degree,flag_un_ele)
    elif mutation_choice == "collapse":
        return SubTreeCollapse(individual)
    elif mutation_choice == "hoist":
        return HoistMutation(individual)
    elif mutation_choice == "lr":
        return LRMutations(individual)
    elif mutation_choice == "exp":
        return Expansion(individual)
    
    
    
def ApplyRecombination(individual1, individual2):
    rec_choice = np.random.choice(["half", "normal"], p = P_RECOMBINATION)
    #print('\t\tR ',rec_choice)
    if rec_choice == "half":
        return HalfRecombination(individual1, individual2)
    elif rec_choice == "normal":
        return Recombination(individual1, individual2)

In [26]:
def TournamentSelection(population, fitnesses, tournament_size, k_ind = 1, temperature = 1):
    ### RANDOM
    if random.random() < temperature / 1.8:
        #print('selected randomly')
        if k_ind == 1:   # mutation
            idx = random.sample(range(len(population)), 1)
            return population[idx[0]]
        else:    # recombination
            idx1, idx2 = random.sample(range(len(population)),2)
            return population[idx1], population[idx2]
    
    ### TURNAMENT
    else:
        #print('tournament')
        selected_idx = random.sample(range(len(population)), min(tournament_size, len(population)))   # We select some individuals from the population
        tournament_population = [population[i] for i in selected_idx]
        tournament_fitnesses = [fitnesses[i] for i in selected_idx]
        #print('\ttournament fitnesses:', tournament_fitnesses)

        sorted_indices = sorted(range(len(tournament_fitnesses)), key=lambda i: tournament_fitnesses[i])
        if k_ind == 1:   # mutation
            #print('\t\t selected: ', tournament_fitnesses[sorted_indices[0]])
            p = tournament_population[sorted_indices[0]]
            return p
        else:    # recombination
            #print('\t\tselected: ', tournament_fitnesses[sorted_indices[0]], 'and', tournament_fitnesses[sorted_indices[1]])
            p1 = tournament_population[sorted_indices[0]]
            p2 = tournament_population[sorted_indices[1]]
            return p1, p2

    
    


    
def ApplyExtinction(X, population, max_depth, extinction_factor=0.2, max_ext_n=10):    
    survived_population = []
    removed_count = 0

    for individual in population:
        removed = False

        ##### Extintion because of depth => the smaller tree has a higher probabilty to survive 
        depth = individual.depth
        extinction_prob = extinction_factor * (depth / max_depth)
        if (np.random.uniform(0, 1) < extinction_prob):   # Dead
            removed_count += 1
            removed = True  # We remove the individual

      

        ##### Too many extintions
        if removed_count >= max_ext_n:    
            break
    
    return survived_population

def OptimalPruning(tree,X,y):
    list_leaves  = []
    for node in tree.nodes_list:
        if node.left == None and node.right == None:
            list_leaves.append(node)
    best_tree = tree 
    best_fit = tree.mse_individual(X,y)
    initial_fit = best_fit

    for n in list_leaves:
        if not (n.right == None and n.left == None):
            temp_tree = tree.copy()
            EliminateNode(temp_tree, n)
            temp_fit = temp_tree.mse_individual(X,y)
            if temp_fit < best_fit:
                best_tree = temp_tree
                best_fit = temp_fit
    best_tree.update_properties() 
    return best_tree

 



def EliminateNode(tree, node):
    father = node.parent  
    if father == None:
        if node.right != None:
            node = node.right
        else:
            node = node.left
        return


    if father.right == node and father.parent != None: # bianry operation -> the operation is useless
        temp = father.parent
        father = father.left
        father.parent = temp
        tree.update_properties()
        return
    elif father.left == node and father.right != None and father.parent != None:
        temp = father.parent
        father = father.right
        father.parent = temp
        tree.update_properties()
        return
    else: # unary operation: we want to remove the single father child,
        if father.parent == None: #check root case
            if father.right == node:
                father = father.left
                father.parent = None
            else:
                father = father.right
                father.parent = None
        else:
            EliminateNode(tree, father)
        tree.update_properties()
    return





def CoefImprovement(tree, X, y):
  
    t1 = tree.copy()
      
    node_list = t1.nodes_list
    leaves = []
    for n in node_list:
        if n.value != None and not isinstance(n.value,str):
            leaves.append(n)
            
    if not leaves:
        return t1
    
    leaf = random.choice(leaves)
    
    initial_score = t1.mse_individual(X,y) 
    best_score =  t1.mse_individual(X,y)
    leaf.value = -leaf.value
    score = t1.mse_individual(X,y)
    if best_score < score:
        leaf.value = -leaf.value 
    else:
        best_score = 1
    
    factors = np.array([0.01, 0.1,0.5,2, 10, 100])
    for _ in range(0,5):
        values = np.array((6,1))
        values = leaf.value
        factors[:3] /= 2  
        factors[3:] *= 2  
        values = values * factors

        best_value = leaf.value
        for v in values:
            leaf.value = v
            score = t1.mse_individual(X,y)
            if score < best_score:
                best_score = score
                best_value = v

        leaf.value = best_value
    
    t1.update_properties()
    return t1

In [27]:
def compare_trees(tree1, tree2):     # compare 3 trees
    if tree1 is None and tree2 is None:
        return True
    if tree1 is None or tree2 is None:
        return False

    # Compare the root node and tree properties (depth, num_dimensions, num_x_dim)
    return compare_nodes(tree1.root, tree2.root) and tree1.depth == tree2.depth and tree1.num_dimensions == tree2.num_dimensions and tree1.num_x_dim == tree2.num_x_dim


def compare_nodes(node1, node2):
    # If both nodes are None, they are equal
    if node1 is None and node2 is None:
        return True
    
    # If one node is None and the other is not, they are not equal
    if node1 is None or node2 is None:
        return False
    
    # Compare the value, dimension, and operation of both nodes
    if node1.value != node2.value:
        return False
    if node1.x_dim != node2.x_dim:
        return False
    if node1.operation != node2.operation:
        return False
    
    # Recursively compare the left and right children
    left_equal = compare_nodes(node1.left, node2.left)
    right_equal = compare_nodes(node1.right, node2.right)
    
    return left_equal and right_equal

In [28]:
def remove_empty(population):
    counter = 0
    for i in range(len(population) - 1, -1, -1):   #starting from the end
        if population[i] == None:
            del population[i]
            counter += 1
        elif population[i].root == None:
            del population[i]
            counter += 1
        elif population[i].root.operation == None and population[i].root.value == None: 
            del population[i]
            counter += 1
        elif (population[i].root.left == None and population[i].root.right == None and population[i].root.value == None):
            del population[i]
            counter += 1
        elif (population[i].root.left == None and population[i].root.right == None and population[i].root.operation != None):
            del population[i]
            counter += 1
        elif (population[i].root.left == None and population[i].root.right != None):
            del population[i]
            counter += 1

    if counter>0:
        print('c:', counter)        
    return population



def EvolvePopulation(X, y_true, population, n_pop, fitness_scores, recombination_rate, max_depth, degree, 
                     current_t, tournament_size, extintion_coeff, flag_un_ele = False):
  

    ####### Select parents, number proportional to temperature
    num_new_individuals = int((current_t * len(population))*1.5)      # At the beginning higher temperature, so more offsprings
    new_individuals = []
    n=0
    while n < num_new_individuals:
        if random.random() < recombination_rate:
            parent1, parent2 = TournamentSelection(population, fitness_scores, tournament_size, k_ind=2)
            child1, child2 = ApplyRecombination(parent1, parent2) 
            #if(child1 == parent1):
                #print('no change')
            if child1 != parent1:   # Does not have the same pointer, meaning that changes have been made
                new_individuals.append(child1)

            #if(child2 == parent2):
                #print('no change')
            if child2 != parent2:
                new_individuals.append(child2)
    
        else:
            parent = TournamentSelection(population, fitness_scores, tournament_size, k_ind=1)
            child = ApplyMutation(parent, max_depth, degree, flag_un_ele)
            #if(child == parent):
                #print('no change')
            if child != parent: 
                new_individuals.append(child)
        n +=1

        
    population.extend(new_individuals)
    print("\t\t Num offsprings:", len(new_individuals))

    population = remove_empty(population)    # Clean empty trees after changes
    



    ####### Evaluate population
    new_fitnesses = evaluate_population(population, X, y_true)
    sorted_indices = np.argsort(new_fitnesses)
    sorted_population = [population[i] for i in sorted_indices]

    n_equal = 0
    i = 0
    while i < len(sorted_population)-1:    # comparison of the individuals that might be equal! If they are equal after the sort they will be near!
        tree1 = sorted_population[i]
        tree2 = sorted_population[i+1]
        if compare_trees(tree1, tree2): 
            #print(tree1.str_representation(),' and ', tree2.str_representation())
            n_equal += 1
            sorted_population.pop(i)     # Remove the duplicate tree from the list
            i = i   # Same i as now the t+1 individual is in i 
        else:
            i += 1
    print("\t\t Number removed as identical:", n_equal)

    n_best = 5
    population_best = sorted_population[:n_best]   # the best will always survive
    population_rest = sorted_population[n_best:]   # the others might not

    
    ####### Extintion (with a certain probability)
    before = len(population)    # n_population (whole) before
    max_ext = max(0, len(population_rest) - (n_pop - n_best) - 10)   # Not too many extintions
    #print('max', max_ext)
    if random.choice([0,1])==1:   # We apply extintion
        population_after_ext = ApplyExtinction(X, population_rest, max_depth, extintion_coeff, max_ext)               
    else:
        population_after_ext = population_rest
    final_population = population_best + population_after_ext    # No need to do sorting again
    after = len(final_population)
    print("\t\t Extinguished:", before-after)
    
    
    
    ####### Survivor selection
    num_best = int(n_pop * (0.9 * current_t))    # Always more best individuals
    num_random = n_pop - num_best
    top_individuals = final_population[:num_best]    # The best ones
    remaining_population = final_population[num_best:]   # The others
    random_individuals = random.sample(remaining_population, min(num_random, len(remaining_population)))
    survived_population = top_individuals + random_individuals   # We choose some that are not the best too
    population = remove_empty(population)


    return survived_population


def Exploitation(population, X, y_true):  
    for _ in range(int(len(population)/5)):   # Repeat for 50% of the population
        n = random.randint(0, len(population)-1)   #Chose one individual
        population[n] = CoefImprovement(population[n],  X, y_true)

         
    for _ in range(int(len(population)/5)):    # Repeat for 50% of the population
        n = random.randint(0, len(population)-1)    # Chose one individual
        population[n] = OptimalPruning(population[n], X, y_true)  

    fitness_scores = evaluate_population(population, X, y_true)
    #print(min(fitness_scores))
    return population
    

In [29]:
def print_tree_population_statistics(population):
    # Collect all depths
    depths = [tree.depth for tree in population]
    
    # Calculate statistics
    mean_depth = np.mean(depths)
    max_depth = np.max(depths)
    min_depth = np.min(depths)
    median_depth = np.median(depths)
    std_depth = np.std(depths)
    
    # Print the statistics
    print(f"Mean Depth: {mean_depth}")
    print(f"Max Depth: {max_depth}")
    print(f"Min Depth: {min_depth}")
    return int(mean_depth)
 

In [30]:
def Pop1(xx, yy, population1, population_size=50,  recombination_rate=0.7, max_depth=100, 
                       degree=3,  flag_un_ele_in_between = False, current_t = 0.1, 
                       tournament_size_coeff = 0.1, extintion_coeff = 0.1, best_fitness1 = float("inf"), best_individual1 = None):
        
        print("---------------------")
        pop1_mean = print_tree_population_statistics(population1)
        print("---------------------")
        
        tournament_size = int(population_size*tournament_size_coeff)    # size of the coefficient 
        fitness_score1 = evaluate_population(population1, xx, yy)

        
        n_dim = xx.shape[1]   # n of columns
        
        
        
        ### ADD RANDOM POPULATION
        num_random_individuals = int((len(population1))/5)   
        population_linear = BasicLinearPopulation(n_dim)
        population_exponential = BasicExponentialPopulation(n_dim, population_linear)
        random_pop = create_pop(num_dimensions=n_dim, degree=2, depth=max(2,pop1_mean-1), n_pop=num_random_individuals,  flag_un_ele= flag_un_ele_in_between) 
        random_pop = random_pop + population_linear
        for i,rand_tree in enumerate(random_pop):
            random_pop[i] = CoefImprovement(rand_tree,  xx, yy)
        random_pop = random_pop + population_linear + population_exponential

 
        fitness_scores_random_pop = evaluate_population(random_pop, xx, yy)
        population1.extend(random_pop)
        fitness_score1.extend(fitness_scores_random_pop)
        

        if np.random.uniform(0, 1) < 1: #current_t:    # Chance of exploration very high at the beginning
            print("\t Exploration Pop1")
            depthEVO=int(max(3,pop1_mean*2))
            population1 = EvolvePopulation(xx, yy, population1, population_size, fitness_score1, recombination_rate, depthEVO, 
                                        degree, current_t, tournament_size, extintion_coeff, flag_un_ele_in_between)
        
 
        fitness_score1 = evaluate_population(population1, xx, yy)
        min_fitness1 = min(fitness_score1)
        if min_fitness1 < best_fitness1:
            best_fitness1 = min_fitness1
            best_individual1 = population1[np.argmin(fitness_score1)].copy()

        #print(f"Best MSE (POP 1) = {best_fitness1:.6f}")
 
        return population1, best_individual1, best_fitness1

In [31]:
def Pop2(xx, yy, population, population_size=50,  recombination_rate=0.7, max_depth=100, 
                       degree=3,  flag_un_ele_in_between = False, current_t = 0.1, 
                       tournament_size_coeff = 0.1, extintion_coeff = 0.1, best_fitness2 = float("inf"), best_individual2 = None):
        
        print("---------------------")
        pop_mean = print_tree_population_statistics(population)
        print("---------------------")
        
        tournament_size = int(population_size*tournament_size_coeff)    # size of the coefficient 

        n_dim = xx.shape[1]   # n of columns
        
        
        fitness_score = evaluate_population(population, xx, yy)
 
        

        if np.random.uniform(0, 1) < 1: #current_t:    # Chance of exploration very high at the beginning
            print("\t Exploration Pop 2")
            
            depthEVO=int(max(3,pop_mean*1.5))
            population = EvolvePopulation(xx, yy, population, population_size, fitness_score, recombination_rate, depthEVO, 
                                        degree+1, current_t, tournament_size, extintion_coeff, flag_un_ele_in_between)
        
 

        fitness_score = evaluate_population(population, xx, yy)
        min_fitness2 = min(fitness_score)
        if min_fitness2 < best_fitness2:
            best_fitness2 = min_fitness2
            best_individual2 = population[np.argmin(fitness_score)].copy()
        
        #print(f"Best MSE (POP 2) = {best_fitness2:.6f}")
        
         

        return population, best_individual2, best_fitness2

In [32]:
def Pop3(xx, yy, population, population_size=50,  recombination_rate=0.7, max_depth=100, 
                       degree=3,  flag_un_ele_in_between = False, current_t = 0.1, 
                       tournament_size_coeff = 0.1, extintion_coeff = 0.1, best_fitness3 = float("inf"), best_individual3 = None):
        
        print("---------------------")
        pop_mean = print_tree_population_statistics(population)
        print("---------------------")
        
        tournament_size = int(population_size*tournament_size_coeff)    # size of the coefficient 

        n_dim = xx.shape[1]   # n of columns
        
        
        fitness_score = evaluate_population(population, xx, yy)
        

        if np.random.uniform(0, 1) < current_t:    # Chance of exploration very high at the beginning
            print("\t Exploration POP 3")
            depthEVO = int(pop_mean)
            population = EvolvePopulation(xx, yy, population, population_size, fitness_score, recombination_rate, depthEVO, 
                                        degree, current_t, tournament_size, extintion_coeff, flag_un_ele_in_between)
        
        if np.random.uniform(0, 1) < ((1 - current_t/3)):   # Chance of exploitation, lower at the beginning   
            print("\t 3rd Exploitation POP 3 (on survivors)")
            population = Exploitation(population, xx, yy)


        fitness_score = evaluate_population(population, xx, yy)
        min_fitness3 = min(fitness_score)
        if min_fitness3 < best_fitness3:
            best_fitness3 = min_fitness3
            best_individual3 = population[np.argmin(fitness_score)].copy()
        
        #print(f"Best MSE (POP 3)= {best_fitness3:.6f}")
        
         

        return population, best_individual3, best_fitness3

In [33]:

def SymbolicRegressionMULTIPOP(xx, yy, population_size=50, max_generations=100, recombination_rate=0.7, max_depth=100, 
                       degree=3,  flag_un_ele_beginning = False, flag_un_ele_in_between = False, t_coeff = 0.1, 
                       tournament_size_coeff = 0.1, extintion_coeff = 0.1):   
    



    
    t_start = 1.0
    current_t = t_start
    n_dim = xx.shape[1]   # n of columns
    


    # INIZIALIZZAZIONE POPOLAZIONE INIZIALE
    population_linear = BasicLinearPopulation(num_dim=n_dim)
    population_tree = create_pop(degree = 2, depth = 3, num_dimensions =n_dim, n_pop = population_size, flag_un_ele = flag_un_ele_beginning)
    population1 = population_linear + population_tree

    best_individual1 = None
    best_individual2 = None
    best_individual3 = None
    best_fitness1 = float("inf")
    best_fitness2 =  float("inf")
    best_fitness3 =  float("inf")
    population2 = []
    population3 = []
    
    for generation in range(max_generations):
        
        population1 = remove_empty(population1)
        population2 = remove_empty(population2)
        population3 = remove_empty(population3)



        # POPULATION 1 -------------------------------------------------------------------------------
        print("\n\n******************************************************")
        print(f"Generation {generation + 1}")
        print("\n*********************")
        print("POPULATION 1:")

 
        population1, best_individual1, best_fitness1 = Pop1(xx = xx,yy= yy,population1=population1, population_size= population_size,recombination_rate= recombination_rate, max_depth=max_depth, 
                       degree=degree,  flag_un_ele_in_between=flag_un_ele_in_between , current_t=current_t, 
                       tournament_size_coeff=tournament_size_coeff , extintion_coeff=extintion_coeff,best_fitness1=best_fitness1,best_individual1= best_individual1)
 
        
        print("\n----") 
        print("migliore pop 1:")
        print(best_individual1.str_representation())
        print(f"mse train: {best_fitness1}")
        print(f"mse test: {best_individual1.mse_individual(x_test, y_test)}")
        print("----")
        print("----")


        # POPOLAZIONE 2-------------------------------------------------------------------------------
        print("\n*********************")
        print("POPULATION 2:")
        
        if len(population2) >= population_size:
            population2 = population2[:population_size] 

            population2, best_individual2, best_fitness2 = Pop2(xx, yy,population2,  population_size, recombination_rate, max_depth, 
                       degree,  flag_un_ele_in_between , current_t, 
                       tournament_size_coeff , extintion_coeff,best_fitness2, best_individual2)
            

                
            print("\n----") 
            print("migliore pop 2:")
            print(best_individual2.str_representation())
            print(best_fitness2)
            print(f"mse train: {best_fitness2}")
            print(f"mse test: {best_individual2.mse_individual(x_test, y_test)}")
            print("----")
 

        else:
            print(f"filling... [{len(population2)}/{population_size}]")
            
        
        

        # POPOLAZIONE 3-------------------------------------------------------------------------------
        print("\n*********************")
        print("POPULATION 3:")
        pop3to1 = []
        pop2to3 = []

        if len(population3) >= population_size:
            population3 = population3[:population_size] 
            population3, best_individual3, best_fitness3 = Pop3(xx, yy,population3,  population_size, recombination_rate, max_depth, 
                    degree,  flag_un_ele_in_between , current_t, 
                    tournament_size_coeff , extintion_coeff,best_fitness3, best_individual3)
            
    
            print("\n----") 
            print("migliore pop 3:")
            print(best_individual3.str_representation())
            print(f"mse train: {best_fitness3}")
            print(f"mse test: {best_individual3.mse_individual(x_test, y_test)}")
            print("----")

        else:
                print(f"filling... [{len(population3)}/{population_size}]")
            


        # WE EXANGE THE DEEPEST ONES - from POP1 to POP2
        percentage_exchanged = 0.2   
        num_exhanged = int(percentage_exchanged * population_size)
        sorted_trees = sorted(population1, key=lambda tree: tree.depth, reverse=True)
        temp = sorted_trees[:num_exhanged]
        pop1to2 = [tree.copy() for tree in temp]
        
        if len(population3) >= population_size:
            population1 = sorted_trees[num_exhanged:]


        # WE TAKE THE BEST TREES and WE PRUNE AND ELEIMINATE SUBTREES TO FIND BETTER SMALLER TREES - those will go from POP2 to POP3 
        if len(population2) >= population_size:
            fitness_score_pop2 = evaluate_population(population2, xx, yy)
            best_indices = np.argsort(fitness_score_pop2)[:num_exhanged*2] 
            best_indices = random.sample(list(best_indices), num_exhanged)
            pop2to3 = [tree.copy() for idx, tree in enumerate(population2) if idx in best_indices] #consideriamo solo quelli estratti

            for tree in pop2to3: 
                temp = tree.copy()
                temp = SubTreeCollapse(temp)
                for _ in range(0,5):
                    temp = SubTreeCollapse(tree)
                    if temp.mse_individual(xx,yy) < tree.mse_individual(xx,yy):
                        tree = temp

            population2 = sorted(population2, key=lambda tree: tree.depth, reverse=True)
            population2 = population2[num_exhanged:]

    
        # FROM POP3 to POP1, by TOURNAMENT SELECTION
        if len(population3) >= population_size:
            fitness_score_pop2 = evaluate_population(population3, xx, yy)
            best_indices = np.argsort(fitness_score_pop2)[:num_exhanged*2]
            best_indices = random.sample(list(best_indices), num_exhanged)
            pop3to1 = [tree.copy() for idx, tree in enumerate(population3) if idx in best_indices] #consideriamo solo quelli estratti
    
            population3 = [tree for idx, tree in enumerate(population3) if idx not in best_indices] #rimuoviamo

 
        population1 = population1 + pop3to1
        population2 = population2 + pop1to2
        population3 = population3 + pop2to3

        #fitness_score_pop = evaluate_population(population1, xx, yy)
        current_t -= t_coeff
        
        
    individuals = [best_individual1,best_individual2, best_individual3 ]
    top_fitnesses= [best_fitness1, best_fitness2,best_fitness3]
    i = np.argmin(top_fitnesses)
    return individuals[i]

In [35]:
### We might have used different seed as we tested separatedly on different PC
# np.random.seed(23)
# random.seed(23)


POP_SIZE = 80
MAX_GENERATIONS = 100

TEMP_COEFF = 1/MAX_GENERATIONS   # If higher more EXPLORATION, if lower more EXPLOITATION

REC_RATE = 0.3                   # MUTATION_RATE = 1 - RECOMBINATION_RATE
MAX_DEPTH = 10                   # int((10 * D)/4)
MAX_DEGREE = 4
TOURNAMENT_COEFF = 0.2           # 1 means all the population
EXTINTION_COEFF = 0.1

best_individual = SymbolicRegressionMULTIPOP(
    xx=x_train,  
    yy=y_train,  
    population_size=POP_SIZE,
    max_generations=MAX_GENERATIONS,
    recombination_rate=REC_RATE,
    max_depth=MAX_DEPTH,
    degree=MAX_DEGREE,
    flag_un_ele_beginning = True,      # With True we have also elementary unary functions at the beginning
    flag_un_ele_in_between = True,     # With True we have also elementary unary functions in evolving
    t_coeff=TEMP_COEFF,
    tournament_size_coeff=TOURNAMENT_COEFF,
    extintion_coeff=EXTINTION_COEFF
)
