# Utils

### Imports

In [1]:
import random
from typing import List
from gxgp.node import Node
from utils.operations_dict import basic_function_set, complex_function_set

# If updates on imported files aren't detected, restart the kernel (we'll need to find an automatic solution for this)

import numpy as np
from icecream import ic

from typing import List, Tuple, Dict

### Tree Gen

In [2]:
def generate_random_tree(max_height: int, pc: float, terminal_list: List[str],
                         constants: list[float] = None, p_pick_constant: float = 0.2, p_cut_tree: float = 0.2,
                         verbose: bool = False, cur_depth: int = 0) -> Node:
    """
    Generate a random symbolic expression tree.

    Mandatory Parameters
    ----------
    max_height : int
        The maximum height of the tree. The height of a tree is the length of the longest path from the root to a leaf (e.g. height of a leaf is 0).
    pc : float
        The probability of choosing a complex function over a basic function.
    terminal_list : List[str]
        The terminal list to choose from. Example: ['x0', 'x1', 'x2']

    Optional Parameters
    ----------
    constants : list[float]
        A list of constants that can be used in the tree (default is None).
    p_pick_constant : float
        The probability of choosing a constant over a terminal (default is 0.2).
    p_cut_tree : float
        The probability of cutting the tree early (default is 0.2).    
    verbose : bool    
        Whether to print debug information (default is False).
    cur_depth : int
        The exploration depth (e.g. depth of root is 0)

    Returns
    -------
    Node
        A Node object representing the root of the tree.
    """
    indent = ' ' * (cur_depth * 2)

    # Cut the tree early with probability 0.2
    if (random.random() < p_cut_tree) or max_height == 0:  
        # If constants are provided, choose one with probability p_pick_constant
        if constants is not None and random.random() < p_pick_constant: 
            terminal = random.choice(constants) 
        # Otherwise, pick from the terminal set
        else:                                                
            terminal = random.choice(terminal_list)
        
        if verbose: print(f"{indent}Picked terminal: {terminal}")

        # Set the height of the node to 0
        my_node = Node(terminal)
        my_node.set_height(0)
        return my_node
    else:
        # Choose a complex function with probability pc
        if random.random() < pc:                       
            func = random.choice(list(complex_function_set.keys()))
            if verbose: print(f"{indent}Chose complex function {func}")
            num_children = complex_function_set[func].__code__.co_argcount  # Numero di argomenti della funzione
            children = [generate_random_tree(max_height - 1, pc, terminal_list, constants, p_pick_constant, p_cut_tree, verbose, cur_depth + 1)
                        for _ in range(num_children)]
            
            # Set height
            cur_height = max([child.get_height() for child in children]) + 1
            my_node = Node(complex_function_set[func], children, name=func)
            my_node.set_height(cur_height)
            return my_node
        # Otherwise, choose a basic function
        else:                                           
            func = random.choice(list(basic_function_set.keys()))
            if verbose: print(f"{indent}Chose basic function {func}")
            num_children = basic_function_set[func].__code__.co_argcount  # Numero di argomenti della funzione
            children = [generate_random_tree(max_height - 1, pc, terminal_list, constants, p_pick_constant, p_cut_tree, verbose, cur_depth + 1)
                        for _ in range(num_children)]
            # Set height
            cur_height = max([child.get_height() for child in children]) + 1
            my_node = Node(basic_function_set[func], children, name=func)
            my_node.set_height(cur_height)
            return my_node

def generate_random_tree_with_all_terminal(max_height: int, pc: float, terminal_list: List[str],
                         constants: list[float] = None, p_pick_constant: float = 0.2, p_cut_tree: float = 0.2,
                         verbose: bool = False, cur_depth: int = 0, picked_terminal: set[str]=set()) -> Node:
    """
    Generate a random symbolic expression tree.

    Mandatory Parameters
    ----------
    max_height : int
        The maximum height of the tree. The height of a tree is the length of the longest path from the root to a leaf (e.g. height of a leaf is 0).
    pc : float
        The probability of choosing a complex function over a basic function.
    terminal_list : List[str]
        The terminal list to choose from. Example: ['x0', 'x1', 'x2']

    Optional Parameters
    ----------
    constants : list[float]
        A list of constants that can be used in the tree (default is None).
    p_pick_constant : float
        The probability of choosing a constant over a terminal (default is 0.2).
    p_cut_tree : float
        The probability of cutting the tree early (default is 0.2).    
    verbose : bool    
        Whether to print debug information (default is False).
    cur_depth : int
        The exploration depth (e.g. depth of root is 0)

    Returns
    -------
    Node
        A Node object representing the root of the tree.
    """
    indent = ' ' * (cur_depth * 2)

    # Cut the tree early with probability 0.2
    if (random.random() < p_cut_tree) or max_height == 0:  
        # If constants are provided, choose one with probability p_pick_constant
        if constants is not None and random.random() < p_pick_constant and len(picked_terminal) == len(terminal_list): 
            terminal = random.choice(constants) 
        # Otherwise, pick from the terminal set
        else:                                                
            terminal = random.choice(terminal_list)
            picked_terminal.add(terminal)
        
        if verbose: print(f"{indent}Picked terminal: {terminal}")

        # Set the height of the node to 0
        my_node = Node(terminal)
        my_node.set_height(0)
        return my_node
    else:
        # Choose a complex function with probability pc
        if random.random() < pc:                       
            func = random.choice(list(complex_function_set.keys()))
            if verbose: print(f"{indent}Chose complex function {func}")
            num_children = complex_function_set[func].__code__.co_argcount  # Numero di argomenti della funzione
            children = [generate_random_tree_with_all_terminal(max_height - 1, pc, terminal_list, constants, p_pick_constant, p_cut_tree, verbose, cur_depth + 1,picked_terminal)
                        for _ in range(num_children)]
            
            # Set height
            cur_height = max([child.get_height() for child in children]) + 1
            my_node = Node(complex_function_set[func], children, name=func)
            my_node.set_height(cur_height)
            return my_node
        # Otherwise, choose a basic function
        else:                                           
            func = random.choice(list(basic_function_set.keys()))
            if verbose: print(f"{indent}Chose basic function {func}")
            num_children = basic_function_set[func].__code__.co_argcount  # Numero di argomenti della funzione
            children = [generate_random_tree_with_all_terminal(max_height - 1, pc, terminal_list, constants, p_pick_constant, p_cut_tree, verbose, cur_depth + 1,picked_terminal)
                        for _ in range(num_children)]
            # Set height
            cur_height = max([child.get_height() for child in children]) + 1
            my_node = Node(basic_function_set[func], children, name=func)
            my_node.set_height(cur_height)
            return my_node

### Fitness

In [3]:
import warnings
warnings.simplefilter("error", RuntimeWarning)

# Fitness reverse
def fitness(mytree: Node, vars, labels, verbose=False):
    try:
        output = np.array([mytree(**var) for var in vars])
        if mytree.get_height() > 0:
            mse = 100 * np.square(labels - output).mean()
            mse += mse * 0.01 * mytree.get_height()
        else:
            mse = 100 * np.square(labels - output).mean()
        return mse
    except RuntimeWarning as e:
        if verbose: print(f"caught runtime warning: {e}, setting fitness to inf")
        return np.inf

def fitness_unscaled(mytree: Node, vars, labels, verbose=False):
    try:
        output = np.array([mytree(**var) for var in vars])
        mse = 100 * np.square(labels - output).mean()
        return mse
    except RuntimeWarning as e:
        if verbose: print(f"caught runtime warning: {e}, setting fitness to inf")
        return np.inf
#print(fitness(initialized, vars, labels))

# Inits


### Import a problem

In [4]:
from gxgp import Node
import numpy as np

problem_number = 4
problem = np.load(f'../data/problem_{problem_number}.npz')
input = problem['x']
labels = problem['y']

print("Input shape:", input.shape, " Example of sample: ", input[:, 0])
print("Labels shape:", labels.shape, " Example of label: ", labels[0])

# Terminal set
terminal_list = ['x' + str(i) for i in range(input.shape[0])]

print("terminal_list: ", terminal_list)

print("input shape is ", input.shape)

vars = []
for j in range(input.shape[1]):
    cur_vars = {'x'+str(i): input[i][j] for i in range(input.shape[0])}
    # print("cur_vars is ", cur_vars)
    vars.append(cur_vars)
vars = np.array(vars)

print("vars shape is ", vars.shape)

Input shape: (2, 5000)  Example of sample:  [ 3.15087424 -1.73013313]
Labels shape: (5000,)  Example of label:  1.8823292724754674
terminal_list:  ['x0', 'x1']
input shape is  (2, 5000)
vars shape is  (5000,)


### Generate a tree

In [5]:
from utils.terminal_constants import crammed_constants

height = 10
my_tree = generate_random_tree(height, 0.2, terminal_list, constants=crammed_constants, p_pick_constant=0.4, p_cut_tree=0.01, verbose=True)
# my_tree.draw()

Chose complex function pow
  Chose basic function sub
    Chose complex function log
      Chose basic function mul
        Chose basic function sub
          Chose basic function mul
            Chose complex function log
              Chose complex function inv
                Chose basic function sqrt
                  Chose basic function add
                    Picked terminal: 16.0
                    Picked terminal: 3.0
            Chose basic function sub
              Chose complex function exp
                Chose basic function div
                  Chose complex function sin
                    Picked terminal: x0
                  Chose basic function mul
                    Picked terminal: x0
                    Picked terminal: x0
              Chose basic function sub
                Chose complex function inv
                  Chose basic function div
                    Picked terminal: x1
                    Picked terminal: x1
                Chose basic function

### Extract a formula

In [6]:
formula = str(my_tree)
print(formula)

pow(sub(log(mul(sub(mul(log(inv(sqrt(add(16, 3)))), sub(exp(div(sin(x0), mul(x0, x0))), sub(inv(div(x1, x1)), sqrt(cos(x1))))), div(add(mul(add(sin(x1), div(x0, x0)), add(add(7, 7), sub(14, x0))), sub(add(add(14, x0), inv(x1)), abs(pow(x1, 20)))), add(add(div(add(19, x0), abs(x0)), sub(sqrt(x0), abs(x0))), pow(sub(inv(x0), div(8, x0)), pow(sin(18), pow(x0, x0)))))), sqrt(add(x0, neg(mul(sqrt(sqrt(x1)), cos(sub(x0, x1)))))))), mul(div(mul(abs(sin(abs(div(sqrt(x1), div(x1, x1))))), mul(sub(mul(div(add(x0, x1), sqrt(12)), abs(sqrt(x1))), add(add(sqrt(5), abs(x1)), sqrt(div(11, x1)))), sqrt(sub(sub(add(x1, x1), mul(x1, 12)), neg(neg(6)))))), inv(mul(mul(sqrt(sub(add(x0, x0), sub(x1, 3))), pow(sqrt(abs(x0)), mul(sqrt(x0), add(4, 9)))), add(mul(mul(sqrt(x0), cos(17)), mul(abs(x1), add(x1, x0))), abs(sqrt(add(x1, x1))))))), sqrt(x1))), div(add(mul(mul(mul(add(x0, add(sub(abs(17), sqrt(x1)), sqrt(add(15, x0)))), neg(add(add(exp(x0), sqrt(x0)), add(cos(x1), add(16, 14))))), sub(sin(inv(div(x0, 

# Test

### Testing All Problems

In [7]:
import numpy as np
from gxgp import Node
from utils.terminal_constants import crammed_constants
import sys
sys.path.append('..')
import s328890


for problem_number, my_function in zip([1, 2, 3, 4, 5, 6, 7, 8], [s328890.f1, s328890.f2, s328890.f3, s328890.f4, s328890.f5, s328890.f6, s328890.f7, s328890.f8]):
    problem = np.load(f'../data/problem_{problem_number}.npz')
    input = problem['x']
    labels = problem['y']
    # Terminal set
    terminal_list = ['x' + str(i) for i in range(input.shape[0])]

    vars = []
    for j in range(input.shape[1]):
        cur_vars = {'x'+str(i): input[i][j] for i in range(input.shape[0])}
        # print("cur_vars is ", cur_vars)
        vars.append(cur_vars)
    vars = np.array(vars)

    height = 10
    random_population = [generate_random_tree_with_all_terminal(height, 0.15, terminal_list, constants=crammed_constants, p_pick_constant=0.4, p_cut_tree=0.05) 
                         for _ in range(100)]
    
    random_pop_fitness = [fitness_unscaled(tree, vars, labels) for tree in random_population]
    random_pop_fitness.sort()
    best_fitness = random_pop_fitness[0]
    random_pop_fitness = [f for f in random_pop_fitness if f != np.inf]
    random_pop_mean_fitness = np.mean(random_pop_fitness)

    my_output = my_function(input)
    if my_output is not None:
        print("=======================")
        print(f"Function f{problem_number}() has")
        my_best_fitness = 100*np.square(labels-my_output).sum()/len(labels)
        print(f"\tMSE (train): {my_best_fitness:g}")
        print(f"\tRandom population (with height < {height}) best fitness: {best_fitness:g}")
        diff = best_fitness - my_best_fitness
        print(f"\tDifference is {diff:g}")
    else :
        print("=======================")
        print(f"Function f{problem_number}() has")
        print("\txxxxxxxxxxxxxxxx")
        print(f"\tRandom population (with height < {height}) mean fitness: ", best_fitness)

    # print("=======================")
    # print("Labels Comparison")
    # print("mine - ground truth")
    # for my, gt in zip(my_output, labels):
    #     print(f"{my} - {gt}")

Function f1() has
	MSE (train): 7.12594e-32
	Random population (with height < 10) best fitness: 0.328155
	Difference is 0.328155
Function f2() has
	MSE (train): 1.77627e+15
	Random population (with height < 10) best fitness: 2.96134e+15
	Difference is 1.18507e+15
Function f3() has
	MSE (train): 29.9537
	Random population (with height < 10) best fitness: 275572
	Difference is 275542
Function f4() has
	MSE (train): 7.67489
	Random population (with height < 10) best fitness: 2288.9
	Difference is 2281.22
Function f5() has
	MSE (train): 5.11477e-20
	Random population (with height < 10) best fitness: 18.5668
	Difference is 18.5668
Function f6() has
	MSE (train): 0.00512497
	Random population (with height < 10) best fitness: 826.894
	Difference is 826.888
Function f7() has
	MSE (train): 1945.31
	Random population (with height < 10) best fitness: 75853.9
	Difference is 73908.6
Function f8() has
	MSE (train): 6.70698e+07
	Random population (with height < 10) best fitness: 2.29858e+09
	Differen

# Older Tests

In [8]:
stringa = 'mul(sqrt(mul(mul(mul(mul(x0, 13), sqrt(sub(1.03972, abs(log(sub(x0, x1)))))), add(div(x1, 8), abs(log(sub(x0, x1))))), sqrt(sub(1.71699, abs(add(add(sub(x0, x0), abs(sqrt(sub(add(sub(x0, x0), abs(sqrt(sub(1.03972, abs(log(sub(x0, x1))))))), abs(abs(log(sub(x0, x1)))))))), x0)))))), abs(sub(mul(div(x1, 8), sub(sqrt(sub(mul(div(x1, mul(div(x1, 8), x1)), sub(sub(x1, sub(div(x1, 31), sub(sub(x1, x1), sub(mul(mul(mul(sub(x0, x1), x0), x0), abs(log(x0))), x1)))), sub(div(x1, mul(div(x1, 8), x1)), x1))), sub(1.03972, log(sub(x1, x0))))), sub(div(sub(div(sub(sqrt(abs(x1)), sub(abs(sqrt(sub(1.03972, abs(div(x1, 8))))), x0)), x1), sub(mul(13, abs(add(x0, x0))), x1)), 8), sub(mul(13, abs(add(x0, x1))), div(x1, 8))))), mul(div(x1, 8), sub(x0, sub(mul(mul(mul(div(x1, 8), 31), x0), abs(log(sub(x1, x0)))), abs(x0)))))))'


### Test loading a whole population

In [None]:
import re
from joblib import Parallel, delayed
from tqdm import tqdm

def reload_population(checkpoint_name: str, verbose=True) -> list:
    helper_tree = generate_random_tree(1, 0.2, terminal_list, constants=crammed_constants, p_pick_constant=0.4, p_cut_tree=0.01, verbose=False)
    with open(checkpoint_name, 'r') as f:
        input_text = f.read()

    # Extract formulas using regex
    formula_pattern = re.compile(r'Tree \d+:\n(.+)\nFitness:')
    formulas = formula_pattern.findall(input_text)
    # for i, formula in enumerate(formulas):
    #     print("formula ", i)
    #     print(formula)

    # Parse each formula and recreate the tree
    trees = [helper_tree.parse_formula(formula) for formula in formulas]
    if verbose:
        print("Loaded a population of ", len(trees), " trees from ", checkpoint_name)
    
    return trees

reloaded_population = reload_population("./checkpoints/2025-01-20_16-35-37_p4_gen50.txt")
reloaded_fitnesses = fitnesses = np.array(Parallel(n_jobs=-1)(delayed(fitness_unscaled)(tree, vars, labels) for tree in tqdm(reloaded_population[:10], desc="Evaluating population")))
print(reloaded_fitnesses)

### Testing conversion from formula to tree

In [None]:
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from utils.terminal_constants import crammed_constants

# Parameters
OFFSPRING_SIZE = 200
POPULATION_SIZE = 100
INITIAL_PM = 0.2
FINAL_PM = 0.05                                          # CHANGE:: before we had 0.05 here
x_elitism = 0.08
MAX_GENERATIONS = 100
HEIGHT = 15
PC = 0.5                                           # CHANGE:: before we had 0.1 here
P_PICK_CONSTANT = 0.4
P_CUT_TREE = 0.05
CONSTANTS = crammed_constants

def initialize_population(_):
    return generate_random_tree_with_all_terminal(HEIGHT, PC, terminal_list, constants=CONSTANTS, p_pick_constant=P_PICK_CONSTANT, p_cut_tree=P_CUT_TREE)

with ThreadPoolExecutor() as executor:
    population = list(tqdm(executor.map(initialize_population, range(POPULATION_SIZE)), desc="Initializing population", total=POPULATION_SIZE))
original_formulas = [str(tree) for tree in population]

print("population size: ", len(population))
print("original_formulas size: ", len(original_formulas))
print()

population_fromckp = [my_tree.parse_formula(formula) for formula in original_formulas]
final_formulas = [str(tree) for tree in population_fromckp]

print("population_fromckp size: ", len(population_fromckp))
print("final_formulas size: ", len(final_formulas))
print()

for i, (original_formula, final_formula) in enumerate(zip(original_formulas, final_formulas)):
    if original_formula != final_formula:
        print(f"formulas at position {i} differ, see original and final respectively:")
        print(f"> {original_formula}")
        print(f"> {final_formula}")
        print()

### Testing if formula works (on problem 6 best result)

In [None]:
import numpy as np

print("input is:")
print(input)

def my_sqrt(x: np.ndarray) -> np.ndarray:
    return np.sqrt(np.abs(x))

def my_log(x: np.ndarray) -> np.ndarray:
    return np.log(np.abs(x))

def my_reciprocal(x: np.ndarray) -> np.ndarray:
    if not isinstance(x, np.ndarray):
        if x == 0:
            return 0
        else:
            return np.reciprocal(x)
        
    out = []
    for el in x:
        if el == 0:
            out.append(el)
        else:
            out.append(np.reciprocal(el))
    return np.array(out)

def f6(x: np.ndarray) -> np.ndarray:
    return np.divide(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.subtract(np.add(x[1], 96), np.exp(np.add(x[1], x[0]))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.add(x[1], x[0]))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.abs(np.abs(np.add(x[1], x[0]))))), np.exp(np.abs(np.add(x[1], x[0])))), np.exp(np.power(30, np.power(np.exp(np.abs(np.add(x[1], np.exp(x[0])))), np.subtract(x[0], x[0])))))# Resulting fitness
# print(f"shape of input is {input.shape}")

# str_output = f6(input)
# str_output = [(float(out)) for out in str_output]
# for str_out, tree_out in zip(str_output, tree_output):
#     print(f"str_out is {str_out}")

print(f"MSE (train): {100*np.square(labels-f6(input)).sum()/len(labels):g}")

### Check datasets

In [None]:
from gxgp import Node
import numpy as np
import matplotlib.pyplot as plt

problem_numbers = [1, 2, 3, 4, 5, 6, 7, 8]
for problem_number in problem_numbers:
    print("=====================================")
    print("Problem number: ", problem_number)

    problem = np.load(f'./data/problem_{problem_number}.npz')
    input = problem['x']
    labels = problem['y']

    print("Input shape:", input.shape, " Example of sample: ", input[:, 0])
    print("Labels shape:", labels.shape, " Example of label: ", labels[0])

    # Terminal set
    terminal_list = ['x' + str(i) for i in range(input.shape[0])]

    print("terminal_list: ", terminal_list)

    print("input shape is ", input.shape)

    vars = []
    for j in range(input.shape[1]):
        cur_vars = {'x'+str(i): input[i][j] for i in range(input.shape[0])}
        # print("cur_vars is ", cur_vars)
        vars.append(cur_vars)
    vars = np.array(vars)

    print("vars shape is ", vars.shape)

    # Plot the distribution of the labels as a histogram
    plt.figure(figsize=(10, 6))
    plt.hist(labels, bins=30, alpha=0.7, edgecolor='black')
    plt.title(f'Distribution of Labels for Problem {problem_number}')
    plt.xlabel('Label Value')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.show()

    # Plot the distribution of the labels as a hexbin plot
    plt.figure(figsize=(10, 6))
    plt.hexbin(labels, np.zeros_like(labels), gridsize=50, cmap='viridis', mincnt=1)
    plt.colorbar(label='Counts')
    plt.title(f'Density of Labels for Problem {problem_number}')
    plt.xlabel('Label Value')
    plt.yticks([])  # Hide y-axis ticks
    plt.grid(True)
    plt.show()

### Test the training loop's reuse

In [None]:
from joblib import Parallel, delayed
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import math

### SETTINGS ###
USE_JOBLIB_INSTEAD_OF_THREADPOOL = True # I modified fitness computation only, but the others would benefit too IMO if you want to try

ALREADY_INITIALIZED = False
if ALREADY_INITIALIZED:
    START_FROM_GENERATION = 100
    END_AT_GENERATION = 150
else:
    START_FROM_GENERATION = 0
### END SETTINGS ###


# Parameters
# crossover = recombination_crossover
OFFSPRING_SIZE = 200
POPULATION_SIZE = 100
OUTSIDER_SIZE = math.ceil(OFFSPRING_SIZE*0.1)
INITIAL_PM = 0.2
FINAL_PM = 0.05                                          # CHANGE:: before we had 0.05 here
x_elitism = 0.08
MAX_GENERATIONS = 100
HEIGHT = 5
PC = 0.15                                           # CHANGE:: before we had 0.1 here
P_PICK_CONSTANT = 0.4
P_CUT_TREE = 0.05
CONSTANTS = crammed_constants

if ALREADY_INITIALIZED:
    MAX_GENERATIONS = END_AT_GENERATION

conf = {
    "problem": problem_number,
    # "crossover": crossover,
    "OFFSPRING_SIZE": OFFSPRING_SIZE,
    "POPULATION_SIZE": POPULATION_SIZE,
    "OUTSIDER_SIZE": OUTSIDER_SIZE,
    "INITIAL_PM": INITIAL_PM,
    "FINAL_PM": FINAL_PM,
    "x_elitism": x_elitism,
    "MAX_GENERATIONS": MAX_GENERATIONS,
    "HEIGHT": HEIGHT,
    "PC": PC,
    "P_PICK_CONSTANT": P_PICK_CONSTANT,
    "P_CUT_TREE": P_CUT_TREE,
    "CONSTANTS": CONSTANTS,
}

for generation in range(START_FROM_GENERATION, MAX_GENERATIONS):
    print(f"Generation {generation}")