In [1]:
import random
from slim_gsgp_lib_np.main_slim import slim
from slim_gsgp_lib_np.utils.utils import train_test_split
from slim_gsgp_lib_np.utils.callbacks import *
from slim_gsgp_lib_np.evaluators.fitness_functions import rmse
from matplotlib.colors import LinearSegmentedColormap
import numpy as np
import time
import os
from tqdm import tqdm
from functions.test_funcs import mape, nrmse, r_squared, mae, standardized_rmse
from matplotlib import pyplot as plt
from slim_gsgp_lib_np.algorithms.SLIM_GSGP.operators.mutators import *
from slim_gsgp_lib_np.algorithms.SLIM_GSGP.operators.simplifiers import *
from slim_gsgp_lib_np.datasets.data_loader import *
from sklearn.preprocessing import MinMaxScaler
from scipy.spatial.distance import pdist, squareform

# Gather every global whose name contains 'load' (presumably the dataset loaders pulled in
# by the wildcard data_loader import) and drop the first two matches.
# NOTE(review): the [2:] slice is positional, so the result depends on the order in which
# names entered globals() — confirm the two skipped entries are the intended ones.
datasets = [globals()[i] for i in globals() if 'load' in i][2:]

# Request a single thread from each numeric backend.
# NOTE(review): numpy is already imported above this point; thread-count variables are
# typically read when the backend library is loaded — confirm these settings take effect.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["BLIS_NUM_THREADS"] = "1"


# -------------------------- # 
from slim_gsgp_lib_np.utils.utils import check_slim_version
from slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.tree import Tree
from slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition import Condition
from slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.population import Population    
from slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.tree_utils import *

In [2]:
# Alternative data source (kept for reference): load a bundled dataset and min-max scale it.
# X, y = datasets[0]()
# X_train, X_test, y_train, y_test = train_test_split(X, y, p_test=0.2)
# X_train = MinMaxScaler().fit_transform(X_train)
# X_test = MinMaxScaler().fit_transform(X_test)
# y_train = MinMaxScaler().fit_transform(y_train.reshape(-1, 1)).ravel()
# y_test = MinMaxScaler().fit_transform(y_test.reshape(-1, 1)).ravel()

seed = 2
N_SPECIALISTS = 5  # how many individuals of the final population become specialists

# Draw the synthetic data from a generator seeded with `seed`, so that a fresh
# "Restart & Run All" reproduces the same X and y (the global NumPy RNG was unseeded before).
rng = np.random.default_rng(seed)
X = rng.random((20, 2))
y = rng.random(20)

X_train, X_test, y_train, y_test = train_test_split(X, y, p_test=0.2)

# agelog = LogAge()
# divlog = LogDiversity()
# early_stop = EarlyStopping_train(patience=2500)

# full_return=True makes slim() hand back both the best individual and the final population.
example_tree, population = slim(X_train=X_train, y_train=y_train,
                    dataset_name='test', test_elite=False, slim_version='SLIM*ABS', # initializer='simple',
                    max_depth=14, init_depth=6, pop_size=100, n_iter=100, seed=seed, verbose=1,
                    p_inflate=0.3, p_struct=0.2, eps_fraction=1e-5,
                    prob_const=0.1, n_elites=1, selector='e_lexicase', 
                    decay_rate=0.2, p_xo=0, p_struct_xo=0, prob_terminal=0.8,
                    # callbacks=[agelog, divlog, early_stop], 
                    mode='exp', tournament_size=2, full_return=True, timeout=200,
    )

preds = example_tree.predict(X_test)
print('RMSE:', rmse(preds, y_test))

# Give every individual the elite's version tag and cache its train/test predictions.
for ind in population.population:
    ind.version = example_tree.version
    ind.train_semantics = ind.predict(X_train)
    ind.test_semantics = ind.predict(X_test)

# Reuse the primitive sets of the evolved tree for the ensemble representation.
FUNCTIONS = example_tree.collection[0].FUNCTIONS
TERMINALS = example_tree.collection[0].TERMINALS
CONSTANTS = example_tree.collection[0].CONSTANTS

# The first N_SPECIALISTS individuals become the specialist terminals 'S_0', 'S_1', ...
SPECIALISTS = {f'S_{i}' : ind for i, ind in enumerate(population.population) if i < N_SPECIALISTS}

# Register the primitive sets on the representation classes.
Tree.FUNCTIONS = FUNCTIONS
Tree.TERMINALS = TERMINALS
Tree.CONSTANTS = CONSTANTS
Tree.SPECIALISTS = SPECIALISTS
Condition.FUNCTIONS = FUNCTIONS
Condition.TERMINALS = TERMINALS
Condition.CONSTANTS = CONSTANTS

+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|     dataset     |        it       |      train      |       test      |       time      |      nodes      |       div       |     avgStru     |      avgDep     |      struct     |     inflate     |     deflate     |        xo       |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|       test      |        0        |      0.383      |       None      |      0.013      |        5        |        18       |      3.150      |      3.150      |     N/A (0)     |     N/A (0)     |     N/A (0)     |     N/A (0)     |
|-----------------|-----------------|-----------------|-

In [16]:
# Build one random ensemble structure from the registered primitive sets and specialists,
# wrap it in a Tree and show the stored structure.
collection = create_random_tree(2, 2, FUNCTIONS, TERMINALS, CONSTANTS, SPECIALISTS)
tree = Tree(collection)
print(tree.collection)

S_2


In [23]:
# Generate the raw structures first, under their own name, so `pop` only ever
# refers to the Population object.
init_structures = initializer(
    init_pop_size=100,
    depth_condition=2,
    depth_tree=4,
    FUNCTIONS=FUNCTIONS,
    TERMINALS=TERMINALS,
    CONSTANTS=CONSTANTS,
    SPECIALISTS=SPECIALISTS,
    p_c=0.3,
    p_t=0.7,
    p_specialist=0.5,
)

# Wrap every structure in a Tree, then compute semantics and training fitness.
pop = Population([Tree(structure) for structure in init_structures])
pop.calculate_semantics(X_train)
pop.evaluate(y_train, testing=False)

In [None]:
def get_condition_indices(tree, path=None, FUNCTIONS=None):
    """
    Locate the condition slot of every ensemble (conditional) node in a nested-tuple tree.

    A tuple counts as an ensemble node when it has exactly three elements and its first
    element is not the name of a GP function listed in FUNCTIONS. The condition of such a
    node sits at index 0, so the reported path is the node's path followed by 0.

    Parameters
    ----------
    tree : tuple or any
        Nested-tuple tree; anything that is not a tuple is treated as a terminal.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.
    FUNCTIONS : dict, optional
        GP functions keyed by name; each entry must provide an "arity" item.
        With None, every 3-tuple is treated as an ensemble node.

    Returns
    -------
    List[List[int]]
        Paths to condition slots, in depth-first, left-to-right order.
    """
    here = [] if path is None else path
    if not isinstance(tree, tuple):
        # Terminals hold no conditions.
        return []

    head = tree[0]
    head_is_function = isinstance(head, str) and FUNCTIONS is not None and head in FUNCTIONS

    found = []
    if len(tree) == 3 and not head_is_function:
        found.append(here + [0])

    if head_is_function:
        # A function node keeps its name at index 0; its arguments follow.
        child_positions = range(1, FUNCTIONS[head]["arity"] + 1)
    else:
        # Everything else is walked element by element, condition included.
        child_positions = range(len(tree))

    for pos in child_positions:
        found += get_condition_indices(tree[pos], here + [pos], FUNCTIONS)
    return found


def get_specialist_indices(tree, path=None, SPECIALISTS=None):
    """
    Locate every specialist terminal in a nested-tuple tree.

    A specialist terminal is a non-tuple node that is a key of SPECIALISTS.

    Parameters
    ----------
    tree : tuple or any
        Nested-tuple tree.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.
    SPECIALISTS : dict, optional
        Specialists keyed by their terminal symbol. With None, nothing matches.

    Returns
    -------
    List[List[int]]
        Paths to specialist terminals, in depth-first, left-to-right order.
    """
    here = [] if path is None else path

    if isinstance(tree, tuple):
        found = []
        for pos, node in enumerate(tree):
            found += get_specialist_indices(node, here + [pos], SPECIALISTS)
        return found

    # Terminal: it is a hit only when a specialist dictionary exists and lists it.
    if SPECIALISTS is not None and tree in SPECIALISTS:
        return [here]
    return []


def get_candidate_branch_indices(tree, path=None, FUNCTIONS=None, SPECIALISTS=None):
    """
    List the branch positions at which a specialist could be inserted to prune the tree.

    An ensemble node is a 3-tuple whose first element is not a GP function name from
    FUNCTIONS. Its branches (indices 1 and 2) are candidates unless they already are
    specialist terminals; non-specialist branches are also searched recursively.

    Parameters
    ----------
    tree : tuple or any
        Nested-tuple tree.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.
    FUNCTIONS : dict, optional
        GP functions keyed by name; each entry must provide an "arity" item.
    SPECIALISTS : dict, optional
        Specialists keyed by terminal symbol. With None, no branch counts as a specialist.

    Returns
    -------
    List[List[int]]
        Candidate branch paths. For an ensemble node its own branches are listed first,
        followed by the candidates found below branch 1 and then below branch 2.
    """
    here = [] if path is None else path
    if not isinstance(tree, tuple):
        return []

    def is_specialist(node):
        return isinstance(node, str) and SPECIALISTS is not None and node in SPECIALISTS

    def head_is_function():
        return isinstance(tree[0], str) and FUNCTIONS is not None and tree[0] in FUNCTIONS

    if len(tree) == 3 and not head_is_function():
        # Ensemble node: only the branches that are not yet specialists stay open.
        open_branches = [pos for pos in (1, 2) if not is_specialist(tree[pos])]
        found = [here + [pos] for pos in open_branches]
        for pos in open_branches:
            found += get_candidate_branch_indices(tree[pos], here + [pos], FUNCTIONS, SPECIALISTS)
        return found

    if head_is_function():
        # Function node: the arguments start at index 1.
        child_positions = range(1, FUNCTIONS[tree[0]]["arity"] + 1)
    else:
        child_positions = range(len(tree))

    found = []
    for pos in child_positions:
        found += get_candidate_branch_indices(tree[pos], here + [pos], FUNCTIONS, SPECIALISTS)
    return found

In [28]:
def get_condition_indices(tree, path=None):
    """
    Locate every Condition object held by an ensemble node.

    An ensemble node is a 3-tuple whose first element exposes a `repr_` attribute
    (the marker used here for Condition objects). That element sits at index 0,
    so each reported path is the node's path followed by 0.

    Parameters
    ----------
    tree : tuple or any
        The tree's collection.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.

    Returns
    -------
    List[List[int]]
        Paths to condition objects, in depth-first, left-to-right order.
    """
    prefix = path if path is not None else []
    if not isinstance(tree, tuple):
        return []

    is_ensemble = len(tree) == 3 and hasattr(tree[0], "repr_")
    own = [prefix + [0]] if is_ensemble else []

    # Every element is visited; non-tuple elements simply contribute nothing.
    nested = [
        hit
        for pos, node in enumerate(tree)
        for hit in get_condition_indices(node, prefix + [pos])
    ]
    return own + nested


def get_specialist_indices(tree, path=None, SPECIALISTS=None):
    """
    Collect the path of every specialist terminal in the tree.

    A node is a specialist terminal when it is not a tuple and is a key of SPECIALISTS.

    Parameters
    ----------
    tree : tuple or any
        The tree's collection.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.
    SPECIALISTS : dict, optional
        Specialist objects keyed by terminal symbol. With None, nothing matches.

    Returns
    -------
    List[List[int]]
        Paths to specialist terminals, in depth-first, left-to-right order.
    """
    hits = []

    def visit(node, trail):
        # Tuples are walked element by element; anything else is checked as a terminal.
        if isinstance(node, tuple):
            for pos, sub in enumerate(node):
                visit(sub, trail + [pos])
        elif SPECIALISTS is not None and node in SPECIALISTS:
            hits.append(trail)

    visit(tree, [] if path is None else path)
    return hits


def get_candidate_branch_indices(tree, path=None, SPECIALISTS=None, FUNCTIONS=None):
    """
    List the branch positions where a specialist could replace a subtree (pruning sites).

    An ensemble node is a 3-tuple whose first element exposes a `repr_` attribute
    (the marker used here for Condition objects). Its branches at indices 1 and 2 are
    candidates unless they are already specialist terminals; branches that are not
    specialists are searched recursively as well.

    Parameters
    ----------
    tree : tuple or any
        The tree's collection.
    path : list of int, optional
        Path from the root to `tree`. Defaults to an empty path.
    SPECIALISTS : dict, optional
        Specialist objects keyed by terminal symbol.
    FUNCTIONS : dict, optional
        GP functions keyed by name; each entry must provide an "arity" item.

    Returns
    -------
    List[List[int]]
        Candidate branch paths. For an ensemble node its own branches come first,
        followed by the candidates found below branch 1 and then below branch 2.
    """
    base = path if path is not None else []
    if not isinstance(tree, tuple):
        return []

    collected = []
    if len(tree) == 3 and hasattr(tree[0], "repr_"):
        # Ensemble node: record the open branches now, descend into them afterwards.
        descend = []
        for slot in (1, 2):
            branch = tree[slot]
            already_specialist = (
                isinstance(branch, str) and SPECIALISTS is not None and branch in SPECIALISTS
            )
            if not already_specialist:
                collected.append(base + [slot])
                descend.append(slot)
    elif isinstance(tree[0], str) and FUNCTIONS is not None and tree[0] in FUNCTIONS:
        # GP function node: the arguments start at index 1.
        descend = range(1, FUNCTIONS[tree[0]]["arity"] + 1)
    else:
        descend = range(len(tree))

    for slot in descend:
        collected.extend(
            get_candidate_branch_indices(tree[slot], base + [slot], SPECIALISTS, FUNCTIONS)
        )
    return collected


In [32]:
# Paths to the condition slot (index 0) of every ensemble node in individual 88.
get_condition_indices(pop.population[88].collection)

[[0], [1, 0], [1, 1, 0], [1, 2, 0], [2, 0], [2, 1, 0], [2, 2, 0]]

In [33]:
# Paths to every specialist terminal in individual 88.
get_specialist_indices(pop.population[88].collection, SPECIALISTS=SPECIALISTS)  

[[1, 1, 1],
 [1, 1, 2],
 [1, 2, 1],
 [1, 2, 2],
 [2, 1, 1],
 [2, 1, 2],
 [2, 2, 1],
 [2, 2, 2]]

In [34]:
# Branch positions of individual 88 that are not yet specialists (possible pruning sites).
get_candidate_branch_indices(pop.population[88].collection, SPECIALISTS=SPECIALISTS, FUNCTIONS=FUNCTIONS)

[[1], [2], [1, 1], [1, 2], [2, 1], [2, 2]]

In [27]:
# Raw nested-tuple structure of individual 88, for checking the paths against it.
pop.population[88].collection

(<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec5010>,
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec5090>,
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec5110>,
   'S_1',
   'S_2'),
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec51d0>,
   'S_4',
   'S_4')),
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec52d0>,
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec5350>,
   'S_2',
   'S_1'),
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ec5410>,
   'S_2',
   'S_3')))

In [None]:
def replace_subtree(tree, path, new_subtree):
    """
    Return a copy of `tree` in which the node at `path` is swapped for `new_subtree`.

    The input tuples are never modified; every tuple along the path is rebuilt.

    Parameters
    ----------
    tree : tuple or any
        The original tree.
    path : list of int
        Indices leading from the root to the node to replace. An empty path
        replaces the whole tree.
    new_subtree : any
        The node to put in place (e.g. a specialist terminal).

    Returns
    -------
    The rebuilt tree.

    Raises
    ------
    ValueError
        If the path continues past a non-tuple node.
    """
    if not path:
        return new_subtree
    if not isinstance(tree, tuple):
        raise ValueError("Path leads into a terminal; cannot replace further.")

    position, remainder = path[0], path[1:]
    rebuilt = list(tree)
    rebuilt[position] = replace_subtree(tree[position], remainder, new_subtree)
    return tuple(rebuilt)
    
def get_subtree(tree, path):
    """
    Follow `path` from the root of `tree` and return the node found there.

    If the walk reaches a non-tuple node before the path is used up, that node is
    returned and the remaining indices are ignored.
    """
    node = tree
    remaining = path
    while remaining and isinstance(node, tuple):
        node = node[remaining[0]]
        remaining = remaining[1:]
    return node
    
def collect_valid_subtrees(tree):
    """
    Gather the subtrees that hoist mutation may promote to the root.

    Two kinds of node are collected:
      - string terminals, and
      - complete ensemble nodes, i.e. 3-tuples whose first element exposes a
        `repr_` attribute (the marker used here for Condition objects).

    Parameters
    ----------
    tree : tuple or any
        The tree's collection.

    Returns
    -------
    list
        Collected subtrees in depth-first, left-to-right order; an ensemble node
        comes before the subtrees found inside it.
    """
    if not isinstance(tree, tuple):
        # Only string terminals qualify; other leaves (e.g. condition objects) do not.
        return [tree] if isinstance(tree, str) else []

    gathered = [tree] if len(tree) == 3 and hasattr(tree[0], "repr_") else []
    for node in tree:
        gathered += collect_valid_subtrees(node)
    return gathered

In [None]:
# ------------------------------------------------------------ PRUNE MUTATION -------------------------------------------------------------- # 
def mutate_prune(tree, SPECIALISTS, FUNCTIONS):
    """
    Prune mutation: overwrite one non-specialist branch with a random specialist.

    Candidate positions come from get_candidate_branch_indices. When there is no
    candidate, the whole tree collapses to a single random specialist key.

    Parameters
    ----------
    tree : tuple or any
        The tree representation.
    SPECIALISTS : dict
        Specialist individuals; the keys are the terminal symbols inserted.
    FUNCTIONS : dict
        Dictionary of GP functions.

    Returns
    -------
    The pruned tree (a new object; the input is not modified).
    """
    prune_sites = get_candidate_branch_indices(tree, path=[], FUNCTIONS=FUNCTIONS, SPECIALISTS=SPECIALISTS)
    if prune_sites:
        # Draw the site first and the specialist second.
        site = random.choice(prune_sites)
        replacement = random.choice(list(SPECIALISTS.keys()))
        return replace_subtree(tree, site, replacement)
    # Nothing left to prune below the root: the tree itself becomes a specialist.
    return random.choice(list(SPECIALISTS.keys()))

# ------------------------------------------------------------ EXPAND MUTATION -------------------------------------------------------------- #
def mutate_expand(tree, SPECIALISTS, depth_condition, p_c=0.3, p_t=0.5,
                  functions=None, terminals=None, constants=None):
    """
    Expand mutation: turn one specialist terminal into a new ensemble node.

    One specialist position is drawn at random and replaced by the tuple
    (condition, specialist, specialist), where the condition is a freshly grown
    tree wrapped in a Condition and both branches are random specialist keys.

    Parameters
    ----------
    tree : tuple or any
        The tree representation.
    SPECIALISTS : dict
        Specialist individuals; the keys are used as terminal symbols.
    depth_condition : int
        Depth passed to create_grow_random_tree for the new condition.
    p_c : float, optional
        Constant probability passed to create_grow_random_tree. Default is 0.3.
    p_t : float, optional
        Terminal probability passed to create_grow_random_tree. Default is 0.5.
    functions, terminals, constants : dict, optional
        Primitive sets used to grow the condition. Each one falls back to the
        notebook-level FUNCTIONS / TERMINALS / CONSTANTS when left as None, which
        is what the function used unconditionally before these parameters existed.

    Returns
    -------
    The mutated tree, or the unchanged input when it holds no specialist terminal.
    """
    # Resolve the primitive sets: explicit arguments win, notebook globals are the fallback.
    functions = FUNCTIONS if functions is None else functions
    terminals = TERMINALS if terminals is None else terminals
    constants = CONSTANTS if constants is None else constants

    # Get all paths at which a specialist terminal is located.
    candidate_indices = get_specialist_indices(tree, path=[], SPECIALISTS=SPECIALISTS)
    if not candidate_indices:
        # No specialist found; nothing to expand.
        return tree

    chosen_path = random.choice(candidate_indices)
    new_condition = Condition(
        create_grow_random_tree(depth_condition, functions, terminals, constants,
                                p_c=p_c, p_t=p_t, first_call=True)
    )
    specialist_keys = list(SPECIALISTS.keys())
    new_spec1 = random.choice(specialist_keys)
    new_spec2 = random.choice(specialist_keys)
    return replace_subtree(tree, chosen_path, (new_condition, new_spec1, new_spec2))

# ----------------------------------------------------- SPECIALIST MUTATION ---------------------------------------------------------- #
def mutate_specialist(tree, SPECIALISTS):
    """
    Specialist mutation: swap one specialist terminal for a different specialist.

    A specialist position is drawn at random, the key currently stored there is
    excluded, and one of the remaining keys is written in its place.

    Parameters
    ----------
    tree : tuple or any
        The tree representation.
    SPECIALISTS : dict
        Specialist individuals; the keys are used as terminal symbols.

    Returns
    -------
    The mutated tree, or the unchanged input when the tree holds no specialist
    or when no alternative specialist exists.
    """
    specialist_paths = get_specialist_indices(tree, path=[], SPECIALISTS=SPECIALISTS)
    if not specialist_paths:
        return tree

    target_path = random.choice(specialist_paths)
    occupant = get_subtree(tree, target_path)

    # Every key except the one already in place is eligible.
    alternatives = list(SPECIALISTS.keys())
    try:
        alternatives.remove(occupant)
    except ValueError:
        pass
    if not alternatives:
        return tree

    return replace_subtree(tree, target_path, random.choice(alternatives))

# ----------------------------------------------------- CONDITION MUTATION ---------------------------------------------------------- #
def mutate_condition(tree, max_depth_condition, TERMINALS, CONSTANTS, FUNCTIONS, p_c=0.3, p_t=0.5, decay_rate=0.2):
    """
    Condition mutation: rewrite part of one condition of the tree.

    Process:
      1. Collect the paths of all conditions with get_condition_indices; if there
         are none, the tree is returned unchanged.
      2. Pick one path at random and fetch the object stored there. Its `repr_`
         and `depth` attributes are read, so it is expected to be a Condition.
      3. Call get_indices_with_levels on the condition's `repr_` and pass the result
         to `exp`, which returns a position inside the condition and a depth.
         (NOTE(review): presumably `exp` picks the mutation point and the depth allowed
         for the replacement, weighted by decay_rate — confirm against its definition.)
      4. Build the replacement: for depth 1 a single symbol (a constant key with
         probability p_c, otherwise a terminal key); for any other depth a tree
         from create_grow_random_tree.
      5. Swap the replacement into the condition's `repr_` with swap_sub_tree, wrap
         the result in a new Condition and put it back at the chosen path.

    Parameters
    ----------
    tree : tuple or any
        The original tree representation.
    max_depth_condition : int
        Maximum condition depth, forwarded to `exp`.
    TERMINALS : dict
        Dictionary of terminal symbols.
    CONSTANTS : dict
        Dictionary of constant symbols.
    FUNCTIONS : dict
        Dictionary of allowed GP functions.
    p_c : float
        Constant probability; used for the depth-1 draw and forwarded to tree generation.
    p_t : float
        Terminal probability, forwarded to tree generation.
    decay_rate : float
        Forwarded to `exp`.

    Returns
    -------
    The mutated tree (a new object), or the unchanged input when it holds no condition.
    """
    candidate_paths = get_condition_indices(tree, path=[])
    if not candidate_paths:
        return tree
    
    candidate_path = random.choice(candidate_paths)
    candidate_tree = get_subtree(tree, candidate_path)

    # Choose where inside the condition to mutate and how deep the replacement may be.
    indices_with_levels = get_indices_with_levels(candidate_tree.repr_)
    random_index, depth = exp(candidate_tree.depth, max_depth_condition, indices_with_levels, decay_rate)

    # Depth 1 means a single symbol: draw a constant with probability p_c, else a terminal.
    if depth == 1: 
        if random.random() < p_c:
            new_subtree = random.choice(list(CONSTANTS.keys()))
        else:
            new_subtree = random.choice(list(TERMINALS.keys()))    
    else: 
        new_subtree = create_grow_random_tree(depth, FUNCTIONS, TERMINALS, CONSTANTS, p_c, p_t, first_call=True)

    # Rebuild the condition around the new subtree and splice it back into the tree.
    new_condition = swap_sub_tree(candidate_tree.repr_, new_subtree, list(random_index))   
    new_condition = Condition(new_condition)
    new_tree = replace_subtree(tree, candidate_path, new_condition)
    return new_tree

In [None]:
def hoist_mutation(tree):
    """
    Hoist mutation: replace the whole tree by one of its own subtrees.

    The pool of replacements comes from collect_valid_subtrees (complete ensemble
    nodes and string terminals). The tree itself is removed from the pool, and one
    of the remaining entries is drawn at random.

    Parameters
    ----------
    tree : tuple or any
        The original tree's collection.

    Returns
    -------
    new_tree : tuple or any
        The drawn subtree, or the original tree when the pool is empty.
    """
    pool = collect_valid_subtrees(tree)
    try:
        # Drop the whole tree so the mutation cannot return its own input.
        pool.remove(tree)
    except ValueError:
        pass
    return random.choice(pool) if pool else tree

In [77]:
# Raw nested-tuple structure of individual 66, the test subject for the mutation calls.
pop.population[66].collection

(<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea8210>,
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea89d0>,
  'S_1',
  'S_0'),
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea96d0>,
  'S_2',
  'S_4'))

In [390]:
# NOTE(review): collect_subtrees is not defined in the visible part of this notebook.
# The helper defined here is collect_valid_subtrees, which skips bare Condition objects,
# so it would not reproduce the recorded output — confirm which function is intended.
collect_subtrees(pop.population[66].collection)

[(<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea8210>,
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea89d0>,
   'S_1',
   'S_0'),
  (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea96d0>,
   'S_2',
   'S_4')),
 <slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea8210>,
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea89d0>,
  'S_1',
  'S_0'),
 <slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea89d0>,
 'S_1',
 'S_0',
 (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea96d0>,
  'S_2',
  'S_4'),
 <slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea96d0>,
 'S_2',
 'S_4']

In [443]:
# Hoist one randomly drawn subtree of individual 66 to the root.
hoist_mutation(pop.population[66].collection)

(<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition at 0x218a8ea96d0>,
 'S_2',
 'S_4')

In [243]:
# Prune individual 66, wrap the result in a Tree and report its training RMSE.
# NOTE(review): prune_with_specialist is not defined in the visible part of this notebook;
# mutate_prune accepts the same arguments — confirm which name is intended.
psp = prune_with_specialist(pop.population[66].collection, SPECIALISTS=SPECIALISTS, FUNCTIONS=FUNCTIONS)
psp = Tree(psp)
print(psp)
psp.evaluate(rmse, X_train, y_train, testing=False)
print(psp.fitness)

(('subtract', 'x0', 'x0'), S_1, (('subtract', 'constant_0.84', 'x1'), S_2, S_4))
0.46605944513968284


In [256]:
# Expand one specialist of individual 66, wrap the result in a Tree and report its training RMSE.
# The results are deliberately not stored under the name `exp`: mutate_condition calls a
# function named `exp`, and rebinding that name here would break every later call to it.
# NOTE(review): mutation_expand_specialist is not defined in the visible part of this
# notebook; mutate_expand accepts the same arguments — confirm which name is intended.
expanded_struct = mutation_expand_specialist(pop.population[66].collection, SPECIALISTS=SPECIALISTS, depth_condition=2)
expanded_tree = Tree(expanded_struct)
print(expanded_tree)
expanded_tree.evaluate(rmse, X_train, y_train, testing=False)
print(expanded_tree.fitness)

(('subtract', 'x0', 'x0'), (('add', 'x1', 'x0'), (('multiply', 'x1', 'constant_0.84'), S_4, S_4), S_0), (('subtract', 'constant_0.84', 'x1'), S_2, S_4))
0.46605944513968284


In [291]:
# Swap one specialist of individual 66, wrap the result in a Tree and report its training RMSE.
# NOTE(review): mutation_specialist is not defined in the visible part of this notebook;
# mutate_specialist accepts the same arguments — confirm which name is intended.
spm = mutation_specialist(pop.population[66].collection, SPECIALISTS=SPECIALISTS)
spm = Tree(spm)
print(spm)
spm.evaluate(rmse, X_train, y_train, testing=False)
print(spm.fitness)

(('subtract', 'x0', 'x0'), (('add', 'x1', 'x0'), S_1, S_0), (('subtract', 'constant_0.84', 'x1'), S_2, S_0))
0.3776030533260461


In [350]:
# Mutate one condition of individual 66, then print the raw structure, the training RMSE
# and the readable form of the resulting Tree.
# NOTE(review): mutate_condition_tree is not defined in the visible part of this notebook;
# mutate_condition takes the same positional arguments — confirm which name is intended.
mut_condition = mutate_condition_tree(pop.population[66].collection, 3, TERMINALS, CONSTANTS, FUNCTIONS, 0.3, 0.5, 0.2)
print(mut_condition)
tmut = Tree(mut_condition)
tmut.evaluate(rmse, X_train, y_train, testing=False)
print(tmut.fitness)
print(tmut)

(<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition object at 0x00000218A8EA8210>, (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition object at 0x00000218A9A88750>, 'S_1', 'S_0'), (<slim_gsgp_lib_np.algorithms.MULTI_SLIM.representations.condition.Condition object at 0x00000218A8EA96D0>, 'S_2', 'S_4'))
0.46605944513968284
(('subtract', 'x0', 'x0'), (('add', 'x1', 'x1'), S_1, S_0), (('subtract', 'constant_0.84', 'x1'), S_2, S_4))


In [348]:
# Readable form of the unmutated individual 66, for comparison with the mutated trees.
print(pop.population[66])

(('subtract', 'x0', 'x0'), (('add', 'x1', 'x0'), S_1, S_0), (('subtract', 'constant_0.84', 'x1'), S_2, S_4))


In [421]:
# Hand-written structure whose conditions are plain nested tuples of symbols
# rather than Condition objects.
struc = (('divide', ('divide', 'x0', 'x0'), ('add', 'x1', 'constant_0.54')), 'S_2', (('multiply', 'constant_0.19', 'constant__1.0'), 'S_0', 'S_3'))
print(struc)

(('divide', ('divide', 'x0', 'x0'), ('add', 'x1', 'constant_0.54')), 'S_2', (('multiply', 'constant_0.19', 'constant__1.0'), 'S_0', 'S_3'))


In [187]:
# Candidate pruning sites of the hand-written structure.
# NOTE(review): the later definition of get_candidate_branch_indices recognises ensemble
# nodes by a `repr_` attribute on their first element, which plain-tuple conditions lack,
# so it would not reproduce the recorded output — confirm which definition this cell targets.
candidates = get_candidate_branch_indices(struc, FUNCTIONS=FUNCTIONS)
print("Candidate branch indices:", candidates)

Candidate branch indices: [[1], [2], [2, 1], [2, 2], [2, 1, 1], [2, 1, 2]]


In [371]:
# Condition positions of the hand-written structure.
# NOTE(review): this call passes FUNCTIONS=, which only the first definition of
# get_condition_indices accepts; the later definition takes (tree, path) only —
# confirm which definition this cell targets.
condition_candidates = get_condition_indices(struc, path=[], FUNCTIONS=FUNCTIONS)
print(condition_candidates)

[[0], [2, 0]]


### ------------------------------------------------------------------------------------------------

In [364]:
# Draw a fresh random structure with explicit specialist and terminal probabilities.
struc = create_random_tree(3, 3, FUNCTIONS, TERMINALS, CONSTANTS, SPECIALISTS, p_specialist=0.7, p_t=0.7)
print(struc)

(('divide', 'x0', ('subtract', 'x1', 'x0')), 'S_0', 'S_2')


In [9]:
# Build a small random ensemble tree and compare its RMSE with that of the single
# evolved individual (example_tree) on the same train/test split.
struc = create_random_tree(2, 1, FUNCTIONS, TERMINALS, CONSTANTS, SPECIALISTS)
tree = Tree(struc)
# The first call evaluates on the training data, the second on the test data.
tree.evaluate(rmse, X_train, y_train, testing=False)
tree.evaluate(rmse, X_test, y_test, testing=True)   
tree.print_tree_representation()
print('------------------/-----------------')
print('Train RMSE (ensemble):', tree.fitness)
print('Test RMSE (ensemble):', tree.test_fitness)
print('Train RMSE (individual):', rmse(example_tree.predict(X_train), y_train))
print('Test RMSE (individual):', rmse(example_tree.predict(X_test), y_test))
print('------------------/-----------------')
# Size of the ensemble tree as reported by its nodes_count and total_nodes attributes.
print('Nodes:', tree.nodes_count)   
print('Total Nodes:', tree.total_nodes)


if (
  multiply(
    constant_0.41
    x0
  )
) > 0 then
  S_3
else
  S_2
endif

------------------/-----------------
Train RMSE (ensemble): 0.5768332842641796
Test RMSE (ensemble): 0.6984042546297496
Train RMSE (individual): 0.14272031425948212
Test RMSE (individual): 0.27550132633252133
------------------/-----------------
Nodes: 5
Total Nodes: 103


In [13]:
# Load the MULTI_SLIM configuration module and instantiate SlimParameters.
# NOTE(review): this wildcard import sits at the end of the notebook and can rebind
# names defined in earlier cells — consider moving it to the import cell.
from slim_gsgp_lib_np.config.multi_slim_config import * 
slim_params = SlimParameters()