In [2294]:
import numpy as np
from icecream import ic
from dataclasses import dataclass
from tqdm.auto import tqdm

# Symbolic regression
Symbolic regression (SR) is a type of regression analysis that searches the space of mathematical expressions to find the model that best fits a given dataset, both in terms of accuracy and simplicity.

## Input dataset

The algorithm takes as input a file containing a dataset with the X values ​​and the expected output Y


In [2295]:
#list of operations that our program supports
double_operation = ['add', 'sub', 'mul', 'div', "pow"]
single_operation = ['neg', 'sin', 'cos', 'tan', 'exp', 'log', 'sqrt']

#we load from a file the dataset of values ​​X and the expected output Y
problem = np.load('data/problem_0.npz')
X = problem['x']
Y = problem['y']
ic(X.shape)
None

ic| X.shape: (2, 1000)


### Utilities

In [2296]:
@dataclass
class Node:
    value: str|int
    left: object = None
    right: object = None

@dataclass
class Individual:
    genome: Node
    num: int
    param: list
    fitness: float = None

#we duplicate the tree
def rec_copy(node):
    left = None
    right = None
    if node.left != None:
        left = rec_copy(node.left)
    if node.right != None:
        right = rec_copy(node.right)
    return Node(node.value, left, right)

#we modify the leaf x with y in the tree
def rec_modify(node, x, y):
    if type(node.value) == str:
        if node.value in double_operation:
            if rec_modify(node.left, x, y) == 1:
                return 1
            if rec_modify(node.right, x, y) == 1:
                return 1
        else:
            if rec_modify(node.left, x, y) == 1:
                return 1
    else:
        if node.value == x:
            node.value = y
            return 1
    return 0

#we generate a random tree with max_depth equals to max_depth
def generate_random_tree(max_depth, val, current_depth=0):
    if current_depth >= max_depth or (current_depth > 0 and np.random.random() < 0.3):
        if len(val) > 0:
            x = np.random.randint(0, len(val))
            v = val.pop(x)
        else:
            v = - np.random.randint(1, 10)
        return Node(v, None, None)
    if np.random.random() > 0.5:
        node =  Node(double_operation[np.random.randint(0, len(double_operation))])
        node.left = generate_random_tree(max_depth, val, current_depth + 1)
        node.right = generate_random_tree(max_depth, val, current_depth + 1)
    else:
        node = Node(single_operation[np.random.randint(0, len(single_operation))])
        node.left = generate_random_tree(max_depth, val, current_depth + 1)
    return node

#we count the number of parameters present in the tree and insert their indicators into a list
def rec_have_param(node, list):
    if type(node.value) == str:
        if node.value in double_operation:
            return rec_have_param(node.left, list) + rec_have_param(node.right, list)
        else:
            return rec_have_param(node.left, list)
    elif node.value < 0:
        list.append(node.value)
        return 1
    else:
        return 0
    
#we count the number of value present in the tree and insert their indicators into a list
def rec_have_value(node, list):
    if type(node.value) == str:
        if node.value in double_operation:
            return rec_have_value(node.left, list) + rec_have_value(node.right, list)
        else:
            return rec_have_value(node.left, list)
    elif node.value >= 0:
        list.append(node.value)
        return 1
    else:
        return 0

#we count the number of operation
def rec_num_operation(node):
    if type(node.value) == str:
        if node.value in double_operation:
            return rec_num_operation(node.left) + rec_num_operation(node.right) + 1
        else:
            return rec_num_operation(node.left) + 1
    else:
        return 0

#generates a string containing the expression that the tree represents
def rec_toString(node, param):
    if type(node.value) == str:
        if node.value in double_operation:
            if node.value == "add":
                return "(" + rec_toString(node.left, param) + "+" + rec_toString(node.right, param) + ")"
            elif node.value == "sub":
                return "(" + rec_toString(node.left, param) + "-" + rec_toString(node.right, param) + ")"
            elif node.value == "mul":
                return "(" + rec_toString(node.left, param) + "*" + rec_toString(node.right, param) + ")"
            elif node.value == "div":
                return "(" + rec_toString(node.left, param) + "/" + rec_toString(node.right, param) + ")"   
            elif node.value == "pow":
                return "(" + rec_toString(node.left, param) + "**" + rec_toString(node.right, param) + ")"   
        else:
            if node.value == "neg":
                return "-(" + rec_toString(node.left, param) + ")"
            elif node.value == "sin":
                return "sin(" + rec_toString(node.left, param) + ")"
            elif node.value == "cos":
                return "cos(" + rec_toString(node.left, param) + ")"
            elif node.value == "tan":
                return "tan(" + rec_toString(node.left, param) + ")"
            elif node.value == "exp":
                return "exp(" + rec_toString(node.left, param) + ")"
            elif node.value == "log":
                return "log(" + rec_toString(node.left, param) + ")"
            elif node.value == "sqrt":
                return "sqrt(" + rec_toString(node.left, param) + ")"
    if node.value < 0:
        return str(param[(- node.value) - 1])
    return "x" + str(node.value)

#calculates the result of the tree expression with the values ​​received as parameters
def generate_exp(node, list, param):
    if type(node.value) == str:
        if node.value in double_operation:
            q1 = generate_exp(node.left, list, param)
            q2 = generate_exp(node.right, list, param)
            if q1 == np.nan or q2 == np.nan or q1 == np.inf or q2 == np.inf:
                return np.inf
            if node.value == "add":
                return q1 + q2
            elif node.value == "sub":
                return q1 - q2
            elif node.value == "mul":
                return q1 * q2
            elif node.value == "div":
                if q2 == 0:
                    return np.inf
                return q1 / q2
            elif node.value == "pow":
                if q1 < 0:
                    return np.inf
                return np.float_power(q1, q2)
        else:
            q = generate_exp(node.left, list, param)
            if q == np.nan or q == np.inf:
                return np.inf
            if node.value == "neg":
                return - q
            elif node.value == "sin":
                return np.sin(q)
            elif node.value == "cos":
                return np.cos(q)
            elif node.value == "tan":
                return np.tan(q)
            elif node.value == "exp":
                return np.exp(q)
            elif node.value == "log":
                if q <= 0:
                    return np.inf
                return np.log(q)
            elif node.value == "sqrt":
                if q < 0:
                    return np.inf
                return np.sqrt(q)
    else:
        if node.value < 0:
            return param[(- node.value) - 1]
        return list[node.value]

### Mutation

The mutation algorithm take in input one individual and generates a new one

In [2297]:
def rec_nodes_mutation(node, prob):
    if type(node.value) == str:
        if node.value in double_operation:
            if np.random.random() < prob:
                node.value = double_operation[np.random.randint(0, len(double_operation))]
            if node.left != None:
                rec_nodes_mutation(node.left, prob)
            if node.right != None:
                rec_nodes_mutation(node.right, prob)
        else:
            if np.random.random() < prob:
                node.value = single_operation[np.random.randint(0, len(single_operation))]
            if node.left != None:
                return rec_nodes_mutation(node.left, prob)
    else:
        return 0

def mutation(p: Individual):
    genome = rec_copy(p.genome)
    new_param = p.param.copy()
    if np.random.random() < 0.4 and p.num != 0:
        #modify a operation with probability 1/p.num
        rec_nodes_mutation(genome, 1/p.num)
    else:
        if np.random.random() < 0.1:
            #add a new operation to the tree
            prec = None
            next1 = genome
            ok = 0
            #select a random leaf
            while ok == 0:
                if type(next1.value) != str:
                    ok = 1
                else:
                    if np.random.random() < 0.5:
                        if next1.left != None:
                            prec = next1
                            next1 = next1.left
                    else:
                        if next1.right != None:
                            prec = next1
                            next1 = next1.right
            #decide whether to add the new operation to the right or left of the node "prec" and with probability 50% add a double_operation (setting a new param or value for the empty leaf) or single_operation
            if prec.left == next1:
                if np.random.random() > 0.5:
                    new_node = Node(-1, None, None)
                    l = []
                    x = rec_have_value(genome, l)
                    if x > 0:
                        for i in range(0, X.shape[0]):
                            if i not in l:
                                new_node.value = i
                    if new_node.value == -1:
                        param = np.random.randint(0, 10)
                        new_param.append(param)
                        new_node.value = -len(new_param)
                    prec.left = Node(double_operation[np.random.randint(0, len(double_operation))], next1, new_node) 
                else:
                    prec.left = Node(single_operation[np.random.randint(0, len(single_operation))], next1, None)
            else:
                if np.random.random() > 0.5:
                    new_node = Node(-1, None, None)
                    l = []
                    x = rec_have_value(genome, l)
                    if x > 0:
                        for i in range(0, X.shape[0]):
                            if i not in l:
                                new_node.value = i
                    if new_node.value == -1:
                        param = np.random.randint(0, 10)
                        new_param.append(param)
                        new_node.value = -len(new_param)
                    prec.right = Node(double_operation[np.random.randint(0, len(double_operation))], next1, new_node) 
                else:
                    prec.right = Node(single_operation[np.random.randint(0, len(single_operation))], next1, None)
            return Individual(genome, p.num+1, new_param)
        else:
            if np.random.random() < 0.5 and len(p.param) != 0:
                #modify a param incrementing or decrementing it
                index = np.random.randint(0, len(p.param))
                if np.random.random() < 0.5:
                    new_param[index] += 1
                else:
                    new_param[index] -= 1
            else:
                #replace a leaf with another leaf
                candidates = np.random.choice([i for i in range(-len(new_param), X.shape[0])], 2)
                rec_modify(genome, candidates[0], candidates[1])
    return Individual(genome, p.num, new_param)

### Crossover

The crossover algorithm take in input two individuals and generates a new one, for generating the new individual we take the first double operation in the first Individual and set in its right subtree a random subtree of the second Individual

In [2298]:
def crossover(p1: Individual, p2: Individual):
    new_param = []
    l = []
    #we copy the p1 tree, find the next node where we perform crossover and set next.right with a temporary value
    tree1 = rec_copy(p1.genome)
    next = tree1
    while type(next.value) == str and next.value not in double_operation:
        next = next.left
    next.right = Node(0, None, None)
    #we search for the parameters used in the truncated tree tree1 and save the ones used in new_param and we go to modify their indicator in the tree based on the new list
    x = rec_have_param(tree1, l)
    if x > 0:
        for i in l:
            new_param.append(p1.param[(- i) - 1])
            rec_modify(tree1, i, -len(new_param))
    #we copy the right subtree of p2 and save the parameters used inside it and we add them to the list of new_param and we modify their indicator in the tree based on the new list
    l.clear()
    next1 = p2.genome
    if type(next1.value) == str and next1.value in double_operation:
        if np.random.random() > 0.5:
            subt = rec_copy(next1.left)
        else:
            subt = rec_copy(next1.right)
    elif type(next1.value) == str:
        subt = rec_copy(next1.left)
    else:
        subt = Node(single_operation[np.random.randint(0, len(single_operation))], rec_copy(next1), None)
    x = rec_have_param(subt, l)
    if x > 0:
        for i in l:
            new_param.append(p2.param[(- i) - 1])
            rec_modify(subt, i, -len(new_param))
    #we merge the truncated tree tree1 with the subtree subt
    next.right = subt
    num_nodi = rec_num_operation(tree1)
    return Individual(tree1, num_nodi, new_param)

### Fitness

The fitness function is composed by the MSE (Mean Square Error) and the number of operations present in the Individual

In [2299]:
def fitness(individual: Individual):
    value = 0
    c = 0
    num_elementi = X.shape[1]
    for riga in X:
        value += np.square(Y[c] - generate_exp(individual.genome, riga, individual.param))
        c += 1
    return round(-float(100*value/num_elementi), 10), -int(individual.num)

### Parent Selection

For parent selection we use Tournament Selection

In [2300]:
def parent_selection(population):
    candidates = sorted(np.random.choice(population, 4), key=lambda e: e.fitness, reverse = True)
    return candidates[0]

### Simplify the tree

Algorithm that takes an Individual as input and modifies it by eliminating useless operations that may be present in the tree

In [2301]:
def rec_simplify_tree(node, param):
    if type(node.value) == str:
        if node.value in double_operation:
            if node.value == "add":
                if type(node.left.value) != str and node.left.value < 0 and param[-node.left.value - 1] == 0:
                    node.value = node.right.value
                    node.left = node.right.left
                    node.right = node.right.right
                    return 1
                if type(node.right.value) != str and node.right.value < 0 and param[-node.right.value - 1] == 0:
                    node.value = node.left.value
                    node.right = node.left.right
                    node.left = node.left.left
                    return 1
            if node.value == "sub":
                if type(node.right.value) != str and node.right.value < 0 and param[-node.right.value - 1] == 0:
                    node.value = node.left.value
                    node.right = node.left.right
                    node.left = node.left.left
                    return 1
            if node.value == "mul":
                if type(node.left.value) != str and node.left.value < 0 and param[-node.left.value - 1] == 1:
                    node.value = node.right.value
                    node.left = node.right.left
                    node.right = node.right.right
                    return 1
                if type(node.right.value) != str and node.right.value < 0 and param[-node.right.value - 1] == 1:
                    node.value = node.left.value
                    node.right = node.left.right
                    node.left = node.left.left
                    return 1
            if node.value == "div":
                if type(node.right.value) != str and node.right.value < 0 and param[-node.right.value - 1] == 1:
                    node.value = node.left.value
                    node.right = node.left.right
                    node.left = node.left.left
                    return 1
            return rec_simplify_tree(node.left, param) + rec_simplify_tree(node.right, param)
        else:
            if node.value == "neg":
                if type(node.left.value) == str and node.left.value == "neg":
                    node.value = node.left.left.value
                    node.right = node.left.left.right
                    node.left = node.left.left.left
                    return 0
            if node.value == "exp":
                if type(node.left.value) == str and node.left.value == "log":
                    node.value = node.left.left.value
                    node.right = node.left.left.right
                    node.left = node.left.left.left
                    return 0
            if node.value == "log":
                if type(node.left.value) == str and node.left.value == "exp":
                    node.value = node.left.left.value
                    node.right = node.left.left.right
                    node.left = node.left.left.left
                    return 0
            return rec_simplify_tree(node.left, param)
    else:
        return 0

def simplify_tree(individual: Individual):
    old_genome = rec_copy(individual.genome)
    val = rec_simplify_tree(individual.genome, individual.param)
    if val > 0:
        #setting the parameters
        new_param = []
        l = []
        x = rec_have_param(individual.genome, l)
        if x > 0:
            for i in l:
                new_param.append(individual.param[(- i) - 1])
                rec_modify(individual.genome, i, -len(new_param))
        individual.param = new_param
    individual.num = rec_num_operation(individual.genome)
    if individual.num == 0:
        individual.genome = old_genome 
    return individual

## Initial Population

The algorithm generate a random initial population with minimal dimension of the Individual equal to the number of value in the X dataset

In [2302]:
INITIAL_POPULATION = 30

#generate the initial population
population = []
while len(population) < INITIAL_POPULATION:
    tree = generate_random_tree(X.shape[0], [i for i in range(0, X.shape[0])], 0)
    l = []
    new_param = []
    x = rec_have_param(tree, l)
    if x > 0:
        for i in l:
            new_param.append(np.random.randint(0, 10))
            rec_modify(tree, i, -len(new_param))
    i = Individual(tree, rec_num_operation(tree), new_param)
    i = simplify_tree(i)
    i.fitness = fitness(i)
    if i.fitness[0] != np.nan and i.fitness[0] != np.inf and i.fitness[0] != -np.inf:
        population.append(i)
population.sort(key=lambda i: i.fitness, reverse = True)
population

[Individual(genome=Node(value='add', left=Node(value='mul', left=Node(value=0, left=None, right=None), right=Node(value=1, left=None, right=None)), right=Node(value='sqrt', left=Node(value=-1, left=None, right=None), right=None)), num=3, param=[9], fitness=(-0.1901464407, -3)),
 Individual(genome=Node(value='cos', left=Node(value=1, left=None, right=None), right=None), num=1, param=[], fitness=(-0.8556113548, -1)),
 Individual(genome=Node(value='cos', left=Node(value=0, left=None, right=None), right=None), num=1, param=[], fitness=(-0.8759265341, -1)),
 Individual(genome=Node(value='cos', left=Node(value=0, left=None, right=None), right=None), num=1, param=[], fitness=(-0.8759265341, -1)),
 Individual(genome=Node(value='cos', left=Node(value='neg', left=Node(value=0, left=None, right=None), right=None), right=None), num=2, param=[], fitness=(-0.8759265341, -2)),
 Individual(genome=Node(value='tan', left=Node(value='mul', left=Node(value=0, left=None, right=None), right=Node(value=1, le

## Genetic Programming Algorithm

We use a Tree-based Genetic Programming to find the solution

In [2303]:
POPULATION_SIZE = 40
OFFSPRING_SIZE = 8
MAX_GENERATIONS = 1000

for g in tqdm(range(MAX_GENERATIONS)):
    #generate the offspring
    offspring = list()
    for _ in range(OFFSPRING_SIZE//2):
        #parent selection
        i1 = parent_selection(population)
        i2 = parent_selection(population)
        #crossover
        o1 = crossover(i1, i2)
        o2 = crossover(i2, i1)
        #mutation
        o1 = mutation(o1)
        o2 = mutation(o2)
        #simplify the tree
        o1 = simplify_tree(o1)
        o1 = simplify_tree(o2)
        #evaluate the genome by calculating fitness and remove duplicates
        o1.fitness = fitness(o1)
        if o1.fitness != np.nan:
            ok = 0
            for x in population:
                if x.fitness == o1.fitness:
                    ok = 1
            if ok == 0:
                offspring.append(o1)
        o2.fitness = fitness(o2)
        if o2.fitness != np.nan:
            ok = 0
            for x in population:
                if x.fitness == o2.fitness:
                    ok = 1
            if ok == 0:
                offspring.append(o2)
    #reintroduce the offspring into the population and keep only the first POPULATION_SIZE individuals
    population.extend(offspring)
    population.sort(key=lambda i: i.fitness, reverse = True)
    population = population[:POPULATION_SIZE]
#best individual
ic(population[0].fitness)
print(rec_toString(population[0].genome, population[0].param))

  0%|          | 0/1000 [00:00<?, ?it/s]

ic| population[0].fitness: (-0.1100761314, -3)


((x0*x1)+sqrt(13))
