Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

## Import

In [2]:
import numpy as np
import random
from copy import deepcopy
from icecream import ic
from tqdm.auto import tqdm
import importlib
from gxgp import gp_common
importlib.reload(gp_common)
import gxgp.node as node_mod
importlib.reload(node_mod)
from gxgp.node import Node

## Data preparation

In [4]:
#choose the problem here
PROBLEM_NAME = 'problem_8.npz'

#estract the data
problem = np.load(PROBLEM_NAME)
x = problem['x']
y = problem['y']

#number of inputs
NUM_INPUT = x.shape[0]

terminals_set = set()

for i in range(NUM_INPUT):
    string = 'x' + '[' + str(i) + ']'
    terminals_set.add(string)

terminals_set


{'x[0]', 'x[1]', 'x[2]', 'x[3]', 'x[4]', 'x[5]'}

## Data structures

In [3]:
#safe division: returns a/b if b is different from 0, 1.0 otherwise
def safe_divide(a, b):
    return np.where(np.abs(b) > 1e-6, np.divide(a, b), 1.0)

#safe log: returns log(x) if x is different from 0, 1e-6 otherwise
def safe_log(x):
    x_safe = np.where(x <= 1e-6, 1e-6, x)
    return np.log(np.abs(x_safe))

def safe_sqrt(x):
    return np.sqrt(np.abs(x))

def safe_reciprocal(x: np.ndarray) -> np.ndarray:
    x = np.asarray(x)  # Assicura che x sia un array numpy
    return np.where(x == 0, 0, np.reciprocal(x))

#set of arithmetic functions
arithmetic_set = [np.add, np.subtract, np.multiply, safe_divide]

#set of unary functions
unary_set = [np.sin, np.cos, np.abs, safe_log, safe_sqrt, safe_reciprocal]




## functions 

In [None]:
#builds a lambda function from a string
def build_function(expr: str):
    ns = {'np': np, 'safe_log': safe_log,'safe_sqrt': safe_sqrt, 'safe_divide': safe_divide, 'safe_reciprocal': safe_reciprocal}
    return eval('lambda x: ' + expr, ns)

#fitness function used was -mse
def fitness_function(y_true, x, tree):
    funz = str(tree)
    func = build_function(funz)
    #penality on fitness for invalid values
    if np.any(np.isnan(func(x))) or np.any(np.isinf(func(x))):
        return -1e12  
    fitness = -100*np.square(y_true-func(x)).sum()/len(y_true)
    return fitness

#generates a random tree
def generate_random_tree(max_depth: int) -> Node:

    if max_depth == 0 or random.random() < 0.3:
        if random.random() < 0.6:
            term = random.choice(list(terminals_set))
        else:
            term = random.uniform(-100, 100)
        return Node(term)
    else:
        if random.random() < 0.5:
            # choose an unary function
            func = random.choice(list(unary_set))
            # Recursively it generates a subtree
            child = generate_random_tree(max_depth - 1)
            return Node(func, [child])
        else:
            # choose a random arithmetic function
            func = random.choice(list(arithmetic_set))
            left = generate_random_tree(max_depth - 1)
            right = generate_random_tree(max_depth - 1)
            return Node(func, [left, right])

#replace subtree
def replace_subtree(current: Node, target: Node, new_subtree: Node) -> bool:
    #list of successors of the current node
    children = current.successors  
    for i, child in enumerate(children):
        if child is target:
            # replace the children with the new subtree
            children[i] = new_subtree
            current.successors = children  
            return True
        else:
            if replace_subtree(child, target, new_subtree):
                return True
    return False

def sub_tree_mutation(individual: Node) -> Node:
  
    mutated = deepcopy(individual)
    
    nodes = list(mutated.subtree)
    
    if not nodes:
        return mutated
    
    target = random.choice(nodes)
    new_subtree = generate_random_tree(2)
    replaced = replace_subtree(mutated, target, new_subtree)
    
    return mutated

def point_mutation(individual: Node) -> Node:

    mutated = deepcopy(individual)
    
    nodes = list(mutated.subtree)
    
    if not nodes:
        return mutated
    
    target = random.choice(nodes)
    
    if len(target._successors) == 2:
        new_func = random.choice(list(arithmetic_set))
        target._func = lambda *args, **kwargs: new_func(*args, **kwargs)
        target._arity = 2
        #differentiate between numpy functions and custom functions
        if hasattr(np, new_func.__name__) and getattr(np, new_func.__name__) is new_func:
            target._str = f'np.{new_func.__name__}'
        else:
            target._str = new_func.__name__
    elif len(target._successors) == 1:
        new_func = random.choice(list(unary_set))
        target._func = lambda *args, **kwargs: new_func(*args, **kwargs)
        target._arity = 1
        if hasattr(np, new_func.__name__) and getattr(np, new_func.__name__) is new_func:
            target._str = f'np.{new_func.__name__}'
        else:
            target._str = new_func.__name__
    elif len(target._successors) == 0:
        try:
            # Try to convert the terminal to a number, if the conversion fails, it is a variable (a string)
            current_value = float(target._str)
            new_value = current_value * random.random()
            target._func = eval(f'lambda **_kw: {new_value}')
            target._str = f'{new_value:g}'
        except ValueError:
            new_terminal = random.choice(list(terminals_set))
            target._func = lambda **_kw: _kw[new_terminal]
            target._str = new_terminal
        target._arity = 0
    else:
        pass

    return mutated

def hoist_mutation(individual: Node) -> Node:
    
    nodes = list(individual.subtree)
    
    #removes the root from the node list
    nodes.remove(individual)
    
    if not nodes:
        return individual
    
    target = random.choice(nodes)
    
    return target

class Individual:
    def __init__(self, genome, fitness=0):
        self.genome = genome
        self.fitness = fitness 

def elitist_selection(population, elite_size=10):
    elites = sorted(population, key=lambda x: x.fitness, reverse=True)[:elite_size]
    return np.random.choice(elites)

def tournament_selection(population, T=3):
    selected = []
    for _ in range(T):
        selected.append(random.choice(population))
    return max(selected, key=lambda x: x.fitness)


## Finding the solution

In [None]:
#this cell is ran only the fist time, than the best individual is saved after every run of the algorithm
genome = generate_random_tree(5)
fitness = fitness_function(y, x, genome)
#best is the all time best individual
best = Individual(genome, fitness)
#bestfitness is the fitness of the best individual in the population in the current generation
bestfitness = fitness

In [None]:
POPULATION_SIZE = 120
MAX_GENERATIONS = 10000
TOURNAMENT_SIZE = 5
MUTATION_RATE = 0.6
OFFSPRING_SIZE = 20
#if the best fitness doesn't change for MAX_COUNTER generations, the population is reinitialized or mutated
MAX_COUNTER = 85
#max length of the tree
MAX_LEN = 200

population = []
population.append(best)


#initialization
for _ in range(0, POPULATION_SIZE):
    tree = generate_random_tree(5)
    fitness = fitness_function(y, x, tree)
    guy = Individual(tree, fitness)
    population.append(guy)

best = max(population, key=lambda x: x.fitness)
COUNTER = 0

for generation in tqdm(range(MAX_GENERATIONS)):
    offspring = list()
    for _ in range(OFFSPRING_SIZE):
        if random.random() < MUTATION_RATE:
            parent = tournament_selection(population, TOURNAMENT_SIZE)
            #point or subtree mutation are used with 90% probability
            if random.random() < 0.9:
                if random.random() < 0.5:
                    child = point_mutation(parent.genome)
                else:
                    child = sub_tree_mutation(parent.genome)
            else:
                child = hoist_mutation(parent.genome)        
        else:
            parent1 = elitist_selection(population)
            parent2 = tournament_selection(population, TOURNAMENT_SIZE)
            if random.random() < 0.5:
                child = gp_common.xover_swap_subtree(parent1.genome, parent2.genome)
            else:
                child = gp_common.xover_swap_subtree(parent2.genome, parent1.genome)

        #contain the bloating
            smallerchild = deepcopy(child)
            while len(smallerchild) > MAX_LEN:
                smallerchild = hoist_mutation(child)
            fitness = fitness_function(y, x, smallerchild)
            offspring.append(Individual(smallerchild, fitness)) 
            
    population.extend(offspring)
    population.sort(key=lambda i: i.fitness, reverse=True)
    population = population[:POPULATION_SIZE]
    if(best.fitness < population[0].fitness):
        best = population[0]
    if(bestfitness == population[0].fitness):
        COUNTER = COUNTER + 1
    else:
        COUNTER = 0
        bestfitness = population[0].fitness
    if(COUNTER > MAX_COUNTER):  
        if random.random() < 0.9:
            for _ in range(0, POPULATION_SIZE):
                if random.random() < 0.6:
                    tree = sub_tree_mutation(population[_].genome)
                else:
                    tree = hoist_mutation(population[_].genome)
                fitness = fitness_function(y, x, tree)
                guy = Individual(tree, fitness)
                population[_] = guy
            population.sort(key=lambda i: i.fitness, reverse=True)
        else:
            population[0] = best
            for _ in range(1, POPULATION_SIZE):
                tree = generate_random_tree(5)
                fitness = fitness_function(y, x, tree)
                guy = Individual(tree, fitness)
                population.append(guy)
        COUNTER = 0

print(best.genome)
print(best.fitness)
    
    


In [None]:
best.fitness

In [None]:
str(best.genome)

In [None]:
len(best.genome)

## Evaluation

In [1]:
import s334034
import importlib
importlib.reload(s334034)


<module 's334034' from 'c:\\Users\\emanu\\CI\\prova3\\s334034.py'>

To set the y change the problem name in the "Data preparation" section

In [None]:
print(f"MSE: {100*np.square(y-s334034.f1(x)).sum()/len(y):g}")

MSE: 7.12594e-32


In [12]:
print(f"MSE: {100*np.square(y-s334034.f2(x)).sum()/len(y):g}")

MSE: 2.03839e+14


In [15]:
print(f"MSE: {100*np.square(y-s334034.f3(x)).sum()/len(y):g}")

MSE: 2697.29


In [17]:
print(f"MSE: {100*np.square(y-s334034.f4(x)).sum()/len(y):g}")

MSE: 4.74355


In [19]:
print(f"MSE: {100*np.square(y-s334034.f5(x)).sum()/len(y):g}")

MSE: 2.5606e-18


In [21]:
print(f"MSE: {100*np.square(y-s334034.f6(x)).sum()/len(y):g}")

MSE: 0.0087182


In [23]:
print(f"MSE: {100*np.square(y-s334034.f7(x)).sum()/len(y):g}")

MSE: 388.378


In [5]:
print(f"MSE: {100*np.square(y-s334034.f8(x)).sum()/len(y):g}")

MSE: 5.176e+07
