In [670]:
import numpy as np
import random
from dataclasses import dataclass
from tqdm.auto import tqdm

In [671]:
problem = np.load('../data/problem_1.npz')
x = problem['x']
y = problem['y']
print(x.shape)
print(y.shape)
PROBLEM_SIZE = x.shape[0]

POPULATION_SIZE = PROBLEM_SIZE*10
OFFSPRING_SIZE = int(POPULATION_SIZE*0.5)
MAX_GENERATIONS = 50

(1, 500)
(500,)


In [672]:
rng = np.random.Generator(np.random.PCG64([POPULATION_SIZE, OFFSPRING_SIZE, MAX_GENERATIONS]))

In [673]:
# Operators

BINARY_OPERATORS = ["+", "-", "*", "/"]
UNARY_OPERATORS = ["sin", "cos", "exp", "log"]
VARIABLES = ["X_"+str(i) for i in range(PROBLEM_SIZE)]

In [674]:
@dataclass
class Individual:
    genome: list
    fitness: float = None

In [675]:
def compute_MSE(Y_pred, Y_real):
    MSE = 100*np.square(Y_real-Y_pred).sum()/len(Y_real)
    return MSE

### Logic

We have to find a combination of X with np operands that gives as result Y.
Each formula will have the form: A * Op(X1) Op1 B * Op(X2) Op2 ... N * Op(Xn), where A, B, ..N are positive costants, Op(Xn) are unary operators, Opn binary operators 

Example of formula expression for 2 variable problem:
["1", "", "X_0", "+", "0.2", "sin", "X_1", "+"]

In [676]:
def compute_FX(F,X)->np.ndarray:
    
    def solve_row(row):
        Y_i = 0
        Term_i = 0
        last_unary = ""
        for token in F:
            if token in BINARY_OPERATORS:
                Y_i = eval(f"Y_i {token} Term_i") if Term_i !=0 else Y_i

            elif token in UNARY_OPERATORS + [""]:
                last_unary = token

            elif token in VARIABLES:
                idx = int(token.split("_")[1])
                if last_unary == "":
                    Term_i *= X[idx,row]
                else:
                    Term_i *= getattr(np, last_unary)(X[idx,row] if last_unary!= "log" else np.abs(X[idx,row]))
            else:
                Term_i += float(token)
        return Y_i 

    Y_pred = [solve_row(i) for i in range(x.shape[1])]
    mse = compute_MSE(Y_pred,y)
    return mse

In [677]:
def generate_individual()->Individual:
    individual_gen = []
    for n in range(PROBLEM_SIZE):
       
        costant = (np.mean(y)/(np.mean(x,1)*PROBLEM_SIZE))[0]
        individual_gen.append(costant)

        unary_op = "" if rng.random() < 0.5 else str(rng.choice(UNARY_OPERATORS,1)[0])
        individual_gen.append(unary_op)

        var = "X_" + str(n)
        individual_gen.append(var)

        bin_op = rng.choice(BINARY_OPERATORS,1) if n !=0 else rng.choice(["+","-"],1)
        individual_gen.append(str(bin_op[0]))
        
    return Individual(individual_gen,compute_FX(individual_gen,x))

In [678]:
def parent_selection(population):
    candidates = sorted(rng.choice(population,4), key=lambda e: e.fitness)
    return rng.choice(candidates,2)

In [679]:
def xover(p1:Individual,p2:Individual)->Individual:
    ''' Exchange terms between two individuals'''

    # Divide genomes for each parent in the terms of the formula
    blocks1 = [p1.genome[i:i+4] for i in range(0, len(p1.genome), 4)]
    blocks2 = [p2.genome[i:i+4] for i in range(0, len(p2.genome), 4)]

    # Choose N random Terms to switch
    xover_blocks = rng.choice(range(len(blocks1)),rng.integers(0,len(blocks1)))

    # Exchange terms
    for b_i in xover_blocks:
        blocks1[b_i] = blocks2[b_i]

    # Rebuild genome
    child_genome = [elem for block in blocks1 for elem in block]

    return Individual(child_genome)

In [680]:
def mutation(p:Individual)->Individual:
    ''' Mutate terms of one individual'''

    # Divide genome in the terms of the formula
    blocks = [p.genome[i:i+4] for i in range(0, len(p.genome), 4)]
    
    # Choose N random Terms to mutate
    mutate_blocks = rng.choice(range(len(blocks)),rng.integers(0,len(blocks)))

    for b_i in mutate_blocks:
        # Choose between the elements what to mutate 
        elems = rng.choice(range(4),rng.integers(0,4))

        for e_i in elems:
            if e_i == 0:
                blocks[b_i][e_i] = blocks[b_i][e_i] + blocks[b_i][e_i]*0.01 if rng.random()<0.5 else blocks[b_i][e_i] - blocks[b_i][e_i]*0.01
            elif e_i == 1:
                new_unary_op = "" if rng.random() < 0.5 else str(rng.choice(UNARY_OPERATORS,1)[0])
                blocks[b_i][e_i] = new_unary_op
            elif e_i == 3:
                new_bin_op = rng.choice(BINARY_OPERATORS,1) if b_i !=0 else rng.choice(["+","-"],1)
                blocks[b_i][e_i] = str(new_bin_op[0])  
            else:
                continue
    
    child_genome = [elem for block in blocks for elem in block]
    return Individual(child_genome)

In [681]:
def EA_resolution(population):
    for _ in tqdm(range(MAX_GENERATIONS)):
        offspring = list()

        for _ in range(OFFSPRING_SIZE):
            
            p1,p2 = parent_selection(population[:int(POPULATION_SIZE/2)])
            
            if rng.random() <0.5:
                o = xover(p1,p2)
            else:
                o = mutation(p1)

            offspring.append(o)

        for i in offspring:
            i.fitness = compute_FX(i.genome,x)

        population.extend(offspring)
        population.sort(key=lambda i: i.fitness)
        population = population[:POPULATION_SIZE]

    return population[0]

In [682]:
POPULATION = [generate_individual() for _ in range(POPULATION_SIZE)]
winner = EA_resolution(POPULATION)
print(winner)

100%|██████████| 50/50 [00:10<00:00,  4.61it/s]

Individual(genome=[np.float64(0.9718797984090422), 'sin', 'X_0', '+'], fitness=np.float64(0.02011294545641478))



