In [1]:
from gplearn.utils import check_random_state


rng = check_random_state(0)

# Training samples
X_train = rng.uniform(-1, 1, 100).reshape(50, 2)
y_train = X_train[:, 0] * X_train[:, 1] + X_train[:, 1] - 1
# print("shapes:", X_train.shape, y_train.shape)

# Testing samples
X_test = rng.uniform(-1, 1, 100).reshape(50, 2)
y_test = X_test[:, 0] * X_test[:, 1] + X_test[:, 1] - 1

In [None]:
from gplearn.genetic import SymbolicRegressor


est_gp = SymbolicRegressor(population_size=100,
                           generations=20, stopping_criteria=0.01,
                           init_depth=(2, 5),
                           optimize_constants=True,
                           p_crossover=0.7, p_subtree_mutation=0.1,
                           p_hoist_mutation=0.05, p_point_mutation=0.1,
                           max_samples=0.9, verbose=1,
                           parsimony_coefficient=0.01, random_state=42)
# est_gp.fit(X_train, y_train)

# 1. Todo: make executable program have input for constants

# automatische differentation (execute)
# JIT compiler for execute
# stop random variables mutation
# use leaset squres sparringly 
#   (wenige iterationen, grobe Toleranzen, residuen skalieren um größenordnungen zu normalisieren)

# Auswertung: gegeben zeitbudget: GN-verfahren gegen populationsgröße, was performt besser?


    |   Population Average    |             Best Individual              |
---- ------------------------- ------------------------------------------ ----------
 Gen   Length          Fitness   Length          Fitness      OOB Fitness  Time Left


RecursionError: maximum recursion depth exceeded

In [None]:
import itertools
import numpy as np
from gplearn.functions import _Function
from scipy.optimize import least_squares

class TestProgram(object):
    def __init__(self, transformer=None, metric=None):
        self.transformer = transformer
        self.metric = metric
        self.function = None
        self.raw_fitness = None
        add2 = _Function(function=np.add, name='add', arity=2)
        sub2 = _Function(function=np.subtract, name='sub', arity=2)
        mul2 = _Function(function=np.multiply, name='mul', arity=2)  
        # y_test = X_test[:, 0] * X_test[:, 1] + X_test[:, 1] - 1  
        self.program = [add2, mul2, 0, 1, sub2, 1, 0.5]
    
    def build_callable_program(self):
        # compiles single terminals, usable for lambdas
        constant_counter = itertools.count()    
        def translate_terminal(terminal, constants):
            if isinstance(terminal, int):
                return lambda X: X[:, terminal]
            if isinstance(terminal, float):
                return lambda X: np.repeat(constants[next(constant_counter)], X.shape[0])
            if callable(terminal):
                return lambda X, constants: terminal(X, constants)
            

        # Check for single-node programs
        node = self.program[0]
        if isinstance(node, float):
            return lambda X: np.repeat(node, X.shape[0])
        if isinstance(node, int):
            return lambda X: X[:, node]

        apply_stack = []

        for node in self.program:
            if isinstance(node, _Function):
                apply_stack.append([node])
            else:
                # Lazily evaluate later
                apply_stack[-1].append(node)
            
            while len(apply_stack[-1]) == apply_stack[-1][0].arity + 1:
                # Apply functions that have sufficient arguments
                print(apply_stack)
                function = apply_stack[-1][0]
                terminals = apply_stack[-1][1:]
                # turn terminals into lambdas
                intermediate_function = lambda X, constants: function(
                    *[translate_terminal(t, constants)(X, constants) for t in terminals])
                if len(apply_stack) != 1:
                    apply_stack.pop()
                    apply_stack[-1].append(intermediate_function)
                else:
                    return intermediate_function

        
    def optimized_fitness(self, X, y, sample_weight):
        # Check for single-node programs
        # time0 = time.time()
        node = self.program[0]
        if isinstance(node, float) or isinstance(node, int):
            return self.raw_fitness(X, y, sample_weight)
        
        self.function = self.build_callable_program()
        
        # build objective function for optimization
        def objective(constants):
            y_pred = self.function(X, constants)
            if self.transformer:
                y_pred = self.transformer(y_pred)
            return self.metric(y, y_pred, sample_weight)

        # Extract initial constants from the program
        initial_constants = [node for node in self.program if isinstance(node, float)]

        if initial_constants:
            result = least_squares(objective, initial_constants) #, method="lm", verbose=2, ftol=10e-4, gtol=10e-4, xtol=10e-4, jac="cs")
            optimized_constants = result.x
            print("optimized constants:", optimized_constants)

            # Update the program with optimized constants
            const_idx = 0
            for i, node in enumerate(self.program):
                if isinstance(node, float):
                    self.program[i] = optimized_constants[const_idx]
                    const_idx += 1

            # time1 = time.time()
            # if time1 - time0 > 2:
            # print(f'Optimized fitness took {time1 - time0} seconds')
            fitness = result.fun[0]
            # print(type(fitness))
            return fitness
        else:
            fitness = self.raw_fitness(X, y, sample_weight)
            # print("no constants to optimize for")
            return fitness

        # optimized_fitness = self.raw_fitness(X, y, sample_weight)
        # print(f'Unoptimized fitness: {unoptimized_fitness}')
        # print(f'Optimized fitness: {optimized_fitness}')

program = TestProgram()
test_fit = program.optimized_fitness(X_train, y_train, None)

[[<gplearn.functions._Function object at 0x7f47244e8230>], [<gplearn.functions._Function object at 0x7f4724566660>, 0, 1]]
[[<gplearn.functions._Function object at 0x7f47244e8230>, <function TestProgram.build_callable_program.<locals>.<lambda> at 0x7f471fb444a0>], [<gplearn.functions._Function object at 0x7f472442f920>, 1, 0.5]]
[[<gplearn.functions._Function object at 0x7f47244e8230>, <function TestProgram.build_callable_program.<locals>.<lambda> at 0x7f471fb444a0>, <function TestProgram.build_callable_program.<locals>.<lambda> at 0x7f471fb45300>]]
