In [1]:
import numpy as np
from dataclasses import dataclass
from typing import Optional
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings('ignore')

In [2]:
np.random.seed(42)

# Problem encoding in tree structure

In [3]:
TYPES = ['math_const', 'const', 'var', 'unary_op', 'binary_op']

MATH_CONSTANTS = ['pi', 'e', 'euler_gamma']
MATH_CONSTANTS_MAP = {
    'pi': np.pi, 'e': np.e, 'euler_gamma': np.euler_gamma}

UNARY_OPS = ['negative', 'abs', 'sign'
             'exp', 'log', 'sqrt', 'square',
             'sin', 'cos', 'arcsin', 'arccos', 'arctan', 
             'sinh', 'cosh', 'arcsinh', 'arccosh', 'arctanh']

BINARY_OPS = ['add', 'subtract', 'multiply', 'divide', 'power', 
              'mod', 'maximum', 'minimum', 'heaviside']

UNARY_MAP = {
   'negative': np.negative, 'abs': np.abs, 'sign': np.sign,
   'exp': np.exp, 'log': np.log,
   'sqrt': np.sqrt, 'square': np.square,
   'sin': np.sin, 'cos': np.cos,
   'arcsin': np.arcsin, 'arccos': np.arccos, 'arctan': np.arctan, 
   'sinh': np.sinh, 'cosh': np.cosh,
   'arcsinh': np.arcsinh, 'arccosh': np.arccosh, 'arctanh': np.arctanh}

BINARY_MAP = {
   'add': np.add, 'subtract': np.subtract, 'multiply': np.multiply, 'divide': np.divide,
   'power': np.power, 'mod': np.mod,
   'maximum': np.maximum, 'minimum': np.minimum, 'heaviside': np.heaviside}

In [4]:
@dataclass
class Node:
    type: str
    value: str
    parent: Optional['Node'] = None
    left: Optional['Node'] = None 
    right: Optional['Node'] = None
    
    def __eq__(self, other):
        if not isinstance(other, Node):
            return False
        # Compare only type and value, not the tree structure
        return self.type == other.type and self.value == other.value
    
    def copy(self) -> 'Node':
        """Create a deep copy of the node without copying parent references"""
        new_node = Node(self.type, self.value)
        if self.left:
            new_node.left = self.left.copy()
            new_node.left.parent = new_node
        if self.right:
            new_node.right = self.right.copy()
            new_node.right.parent = new_node
        return new_node

def build_tree(num_vars: int, mode: str = 'full', max_depth: int = 20, max_const: int = 10, depth: int = 0) -> Node:
    if depth == max_depth:
        type = np.random.choice(['math_const', 'const', 'var'], \
                                p = [0.1, 0.4, 0.5])
        if type == 'const':
            if max_const <= 1:
                value = np.random.rand()
            else:
                value = np.random.randint(0, max_const)
            return Node('const', str(value), None, None, None)
        elif type == 'math_const':
            return Node('math_const', np.random.choice(MATH_CONSTANTS), None, None, None)
        elif type == 'var':
            return Node('var', 'x' + str(np.random.randint(0, num_vars)), None, None, None)
    else:
        if mode == 'full':
            type = np.random.choice(['unary_op', 'binary_op'])
        elif mode == 'grow':
            type = np.random.choice(TYPES, p = [0.05, 0.2375, 0.2375, 0.2375, 0.2375])

        if type == 'const':
            return Node('const', str(np.random.randint(0, max_const)), None, None, None)
        elif type == 'math_const':
            return Node('math_const', np.random.choice(MATH_CONSTANTS), None, None, None)
        elif type == 'var':
            return Node('var', 'x' + str(np.random.randint(0, num_vars)), None, None, None)
        elif type == 'unary_op':
            left_child = build_tree(num_vars, mode, max_depth, max_const, depth + 1)
            node = Node('unary_op', np.random.choice(UNARY_OPS), None, left_child, None)
            left_child.parent = node
            return node
        elif type == 'binary_op':
            left_child = build_tree(num_vars, mode, max_depth, max_const, depth + 1)
            right_child = build_tree(num_vars, mode, max_depth, max_const, depth + 1)
            node = Node('binary_op', np.random.choice(BINARY_OPS), None, left_child, right_child)
            left_child.parent = node
            right_child.parent = node
            return node

def print_tree(node: Node, depth: int = 0):
    if node is None:
        return
    print('- ' * depth*2 + node.value)
    print_tree(node.left, depth + 1)
    print_tree(node.right, depth + 1)

def get_formula(node: Node) -> str:
    if node is None:
        return ""
    if node.type == 'const':
        return node.value
    elif node.type == 'math_const':
        return f"np.{node.value}"
    elif node.type == 'var':
        return f"x[{node.value[1:]}]"
    elif node.type == 'unary_op':
        return f"np.{node.value}({get_formula(node.left)})"
    elif node.type == 'binary_op':
        return f"np.{node.value}({get_formula(node.left)}, {get_formula(node.right)})"

In [5]:
def evaluate_tree(node: Node, x: np.ndarray) -> np.ndarray:
    """Given tree and x, compute values of formula encoded in the tree for each sample in x"""
    
    try:
        if node.type == 'const':
            result = np.full(x.shape[1], float(node.value))
        elif node.type == 'math_const':
            result = np.full(x.shape[1], MATH_CONSTANTS_MAP[node.value])
        elif node.type == 'var':
            result = x[int(node.value[1:])]
        elif node.type == 'unary_op':
            left = evaluate_tree(node.left, x)
            result = UNARY_MAP[node.value](left)
        else:  # binary_op
            left = evaluate_tree(node.left, x)
            right = evaluate_tree(node.right, x)
            result = BINARY_MAP[node.value](left, right)
            
        if np.any(np.isnan(result)) or np.any(np.isinf(result)):
            result = np.full(x.shape[1], np.inf)
            
        return result
    except:
        return np.full(x.shape[1], np.inf)

In [6]:
def collect_nodes(node: Node, take_root: bool = False) -> list:
    nodes = []
    if node is not None:
        if node.parent is not None or take_root:
            nodes.append(node)
        nodes.extend(collect_nodes(node.left, take_root))
        nodes.extend(collect_nodes(node.right, take_root))
    return nodes

def tree_size(node: Node) -> int:
    if node is None:
        return 0
    return 1 + tree_size(node.left) + tree_size(node.right)

def fitness(individual: Node, x: np.ndarray, y: np.ndarray, parsimony_pressure: bool = True) -> tuple[float, int]:
    """
    Calculate fitness as the tuple of 
    [0] MSE between the true values and the predicted values obtained through formula encoded in the individual
    and [1] size of the individual (number of nodes).
    When parsimony pressure is enabled, the first component is adjusted to penalize larger trees.
    """
    y_pred = evaluate_tree(individual, x)
    mse = np.mean((y - y_pred) ** 2)
    size = tree_size(individual)
    weighted_fitness = mse
    if parsimony_pressure and size > 7:
        if mse < 1:
            weighted_fitness += 0.01*size
        elif mse < 10:
            weighted_fitness += 0.1*size
        else:
            weighted_fitness += np.floor_divide(mse, 10)*size
    return weighted_fitness, size

# Genetic operators

In [7]:
def recombination(parent1: Node, parent2: Node) -> tuple[Node, Node]:
    """Create 2 offsprings by swapping subtrees between 2 parents"""
    
    child1 = parent1.copy()
    child2 = parent2.copy()

    if tree_size(child1) == 1 or tree_size(child2) == 1:
        return child1, child2
    
    node1 = np.random.choice(collect_nodes(child1, take_root=False))
    node2 = np.random.choice(collect_nodes(child2, take_root=False))

    if node1.parent:
        if node1.parent.left is node1:
            node1.parent.left = node2
        else:
            node1.parent.right = node2
    if node2.parent:
        if node2.parent.left is node2:
            node2.parent.left = node1
        else:
            node2.parent.right = node1
    node1.parent, node2.parent = node2.parent, node1.parent

    return child1, child2

In [8]:
MUTATIONS = ['point', 'permutation', 'hoist', 'collapse']

def point_mutation(individual: Node, num_vars: int, max_const: int = 100) -> Node:
    """Mutate a random node in the individual, the type of node is preserved in case of unary and binary operators"""
    nodes = collect_nodes(individual, take_root=True)
    mutation_node = np.random.choice(nodes)
    if mutation_node.type in ['math_const', 'const', 'var']:
        new_type = np.random.choice(['math_const', 'const', 'var'], \
                                    p=[0.1, 0.4, 0.5])
        if new_type == 'const':
            mutation_node.type = 'const'
            if max_const <= 1:
                value = np.random.rand()
            else:
                value = np.random.randint(0, max_const)
            mutation_node.value = str(value)
        elif new_type == 'math_const':
            mutation_node.type = 'math_const'
            mutation_node.value = np.random.choice(MATH_CONSTANTS)
        elif new_type == 'var':
            mutation_node.type = 'var'
            mutation_node.value = 'x' + str(np.random.randint(0, num_vars))
    elif mutation_node.type == 'unary_op':
        mutation_node.value = np.random.choice(UNARY_OPS)
    elif mutation_node.type == 'binary_op':
        mutation_node.value = np.random.choice(BINARY_OPS)
    return individual


def permutation_mutation(individual: Node) -> Node:
    """Swap left and right subtrees of a random binary operator node"""
    binary_nodes = [node for node in collect_nodes(individual, take_root=True) if node.type == 'binary_op']
    if len(binary_nodes) != 0:
        mutation_node = np.random.choice(binary_nodes)
        mutation_node.left, mutation_node.right = mutation_node.right, mutation_node.left
    return individual

def hoist_mutation(individual: Node) -> Node:
    """Select a random subtree and return it as a new individual"""
    nodes = collect_nodes(individual, take_root=False)
    if len(nodes) == 0:
        return individual
    mutation_node = np.random.choice(nodes)
    mutation_node.parent = None
    return mutation_node

def collapse_mutation(individual: Node) -> Node:
    """Select a random non-leaf node and replace it with its left-most leaf child"""
    nonleaf_nodes = [node for node in collect_nodes(individual, take_root=True) if node.type in ['unary_op', 'binary_op']]
    if len(nonleaf_nodes) == 0:
        return individual
    mutation_node = np.random.choice(nonleaf_nodes)
    replacement_node = mutation_node
    while replacement_node.left is not None:
        replacement_node = replacement_node.left
    replacement_node.parent = mutation_node.parent
    if mutation_node.parent:
        if mutation_node.parent.left == mutation_node:
            mutation_node.parent.left = replacement_node
        else:
            mutation_node.parent.right = replacement_node
    return individual

# Generation

In [9]:
def tournament_selection(population: list[Node], x: np.ndarray, y: np.ndarray, 
                         tournament_size: int = 2, fitness_hole_prob: float = 0.1) -> Node:
    """Select the best individual from a random subset of the population"""
    tournament = np.random.choice(population, tournament_size, replace=False)
    if np.random.rand() < fitness_hole_prob:
        # select the "smallest" individual, additionally giving preference to the one whose fitness is not inf
        valid_individuals = [ind for ind in tournament if fitness(ind, x, y)[0] != np.inf]
        if valid_individuals:
            best_individual = min(valid_individuals, key=lambda i: fitness(i, x, y)[1])
        else:
            best_individual = min(tournament, key=lambda i: fitness(i, x, y)[1])
    else:
        # select the "fittest" individual
        best_individual = min(tournament, key=lambda i: fitness(i, x, y)[0])
    return best_individual


def survival_selection(population: list[Node], offspring: list[Node], 
                       x: np.ndarray, y: np.ndarray, pop_size: int) -> list[Node]:
    """Select the best individuals from the population and the offspring"""
    population.extend(offspring)
    return sorted(population, key=lambda i: fitness(i, x, y)[0])[:pop_size]

In [10]:
def genetic_programming(x: np.ndarray, y: np.ndarray, pop_size: int = 5000, max_generations: int = 30, 
                        max_depth: int = 10, max_const: int = 100, tournament_size: int = 2, 
                        mutation_prob: float = 0.1, fitness_hole_prob: float = 0.1, stagnation_window: int = 10) -> Node:
    
    population = []
    pbar = tqdm(total=pop_size, desc="Creating initial population")
    while len(population) < pop_size:
        tree = build_tree(x.shape[0],'grow' if np.random.random() < 0.5 else 'full',
                        max_depth, max_const)
        # Skip trees that give only np.inf
        if np.all(np.isinf(evaluate_tree(tree, x))):
            continue
        else:
            population.append(tree)
            pbar.update(1)
    pbar.close()

    
    best_fitness = (np.inf, np.inf)
    best_individual = None
    generations_without_improvement = 0

    for generation in tqdm(range(max_generations)):
        offspring = []
        for _ in tqdm(range(pop_size)):
            if np.random.rand() < mutation_prob:
                mutation_type = np.random.choice(MUTATIONS)
                parent = tournament_selection(population, x, y, tournament_size, fitness_hole_prob)
                if mutation_type == 'point':
                    offspring.append(point_mutation(parent.copy(), x.shape[0], max_const))
                elif mutation_type == 'permutation':
                    offspring.append(permutation_mutation(parent.copy()))
                elif mutation_type == 'hoist':
                    offspring.append(hoist_mutation(parent.copy()))
                elif mutation_type == 'collapse':
                    offspring.append(collapse_mutation(parent.copy()))
            else: #XOver
                parent1 = tournament_selection(population, x, y, tournament_size, fitness_hole_prob)
                parent2 = tournament_selection(population, x, y, tournament_size, fitness_hole_prob)
                child1, child2 = recombination(parent1, parent2)
                offspring.extend([child1, child2])

        population = survival_selection(population, offspring, x, y, pop_size)

        current_best = min(population, key=lambda i: fitness(i, x, y)[0])
        current_fitness = fitness(current_best, x, y)
        
        if current_fitness < best_fitness:
            best_fitness = current_fitness
            best_individual = current_best
            generations_without_improvement = 0
        else:
            generations_without_improvement += 1
        
        print(f"Generation {generation}: best fitness = {best_fitness}")
        print(f"Generation {generation}: best formula = {get_formula(best_individual)}")

        if best_fitness[0] < 1e-6:
            print(f"Early stopping: found a formula with MSE < 1e-6")
            break
        
        if generations_without_improvement >= stagnation_window:
            print(f"Early stopping: no improvement for {stagnation_window} generations")
            break
            
    return best_individual

# Problem creation and running GP

In [11]:
problem = np.load('../data/problem_1.npz')
x = problem['x']
y = problem['y']

num_vars = x.shape[0]
max_depth = np.minimum(10, num_vars*3)
pop_size = 2500
max_const = np.rint(np.maximum(np.max(np.abs(x)),np.max(np.abs(y))))

solution = genetic_programming(x, y, pop_size=pop_size, max_depth=max_depth, max_const=max_const, mutation_prob=0.1)

Creating initial population:   0%|          | 0/2500 [00:00<?, ?it/s]

  0%|          | 0/30 [00:00<?, ?it/s]

  0%|          | 0/2500 [00:00<?, ?it/s]

Generation 0: best fitness = (np.float64(7.125940794232773e-34), 2)
Generation 0: best formula = np.sin(x[0])
Early stopping: found a formula with MSE < 1e-6


In [12]:
print(f"Formula found: {get_formula(solution)}")
print(f"Training MSE: {fitness(solution, x, y, parsimony_pressure=False)[0]}")

Formula found: np.sin(x[0])
Training MSE: 7.125940794232773e-34


In [14]:
def constant_tuning(gp_solution: Node, x: np.ndarray, y: np.ndarray) -> Node:
    """Tune constants in the formula to minimise MSE if MSE of GP solution is too large"""
    best_fitness = fitness(gp_solution, x, y, parsimony_pressure=False)
    best_individual = gp_solution.copy()
    for node in collect_nodes(best_individual, take_root=True):
        if node.type == 'const':
            best_individual_value = node.value
            multiplier = np.random.normal(1, 1)
            for _ in range(5000):
                new_value = float(node.value) * multiplier
                node.value = str(new_value)
                current_fitness = fitness(best_individual, x, y, parsimony_pressure=False)
                if current_fitness < best_fitness:
                    best_fitness = current_fitness
                    best_individual_value = node.value
                else:
                    node.value = best_individual_value
                    multiplier = np.random.normal(1, 1)
    return best_individual

In [None]:
tuned_solution = constant_tuning(solution, x, y)
print(f"Formula tuned: {get_formula(tuned_solution)}")
print(f"MSE on tuned formula: {fitness(tuned_solution, x, y, parsimony_pressure=False)[0]}")