# A simple symbolic regression example

This notebook implements a **symbolic regression pipeline** based on **Genetic Programming (GP)** using the `flex` framework and its custom `GPSymbolicRegressor`.

The goal is to discover analytical expressions that best fit a given `PMLB` dataset by:
- evolving symbolic expressions;
- optimizing embedded numerical constants;
- penalizing overly complex expressions;
- evaluating performance on training and test datasets.

The code supports:
- multi-variable regression problems;
- automatic constant tuning using gradient-based optimization;
- parallel execution using Ray.

## Imports and Dependencies

This section imports all the required libraries for:

- **Genetic Programming**: `deap.gp`, custom `flex.gp` utilities
- **Numerical computation**: `numpy`, `mygrad`
- **Optimization**: `pygmo` (for constant fitting)
- **Machine Learning utilities**: `scikit-learn`
- **Parallelism**: `ray`
- **Dataset generation**: custom `generate_dataset` function

The code relies on both evolutionary optimization (for structure) and gradient-based optimization (for constants).

In [1]:
from deap import gp

from flex.gp import regressor as gps
from flex.gp.util import (
    detect_nested_trigonometric_functions,
    load_config_data,
    compile_individual_with_consts,
)
from flex.gp.primitives import add_primitives_to_pset_from_dict
import numpy as np
import ray


import warnings
import pygmo as pg

import re
from sklearn.metrics import r2_score

import time

import mygrad as mg
from mygrad._utils.lock_management import mem_guard_off

from functools import partial


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pmlb import fetch_data

# set up number of cpus per ray worker
num_cpus = 1

In [2]:
# --- Custom generate dataset function ---
def generate_dataset(problem: str="1027_ESL", random_state: int=42, scaleXy: bool=True):
    np.random.seed(42)
    num_variables = 1
    scaler_X = None
    scaler_y = None

    # PMLB datasets
    X, y = fetch_data(problem, return_X_y=True, local_cache_dir="./datasets")

    num_variables = X.shape[1]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=random_state
    )

    y_train = y_train.reshape(-1, 1)
    y_test = y_test.reshape(-1, 1)

    if scaleXy:
        scaler_X = StandardScaler()
        scaler_y = StandardScaler()

        X_train_scaled = scaler_X.fit_transform(X_train)
        y_train_scaled = scaler_y.fit_transform(y_train)
        X_test_scaled = scaler_X.transform(X_test)
    else:
        X_train_scaled = X_train
        y_train_scaled = y_train
        X_test_scaled = X_test

    y_test = y_test.flatten()
    y_train_scaled = y_train_scaled.flatten()

    num_train_points = X_train.shape[0]


    # note y_test and y_train_scaled must be flattened
    return (
        X_train_scaled,
        y_train_scaled,
        X_test_scaled,
        y_test,
        scaler_X,
        scaler_y,
        num_variables,
        num_train_points,
    )

In [3]:
def eval_model(individual, X, consts=[]):
    num_variables = X.shape[1]
    if num_variables > 1:
        X = [X[:, i] for i in range(num_variables)]
    else:
        X = [X]
    warnings.filterwarnings("ignore")
    y_pred = individual(*X, consts)
    return y_pred


def compute_MSE(individual, X, y, consts=[]):
    y_pred = eval_model(individual, X, consts)
    MSE = np.mean((y - y_pred) ** 2)

    if np.isnan(MSE) or np.isinf(MSE):
        MSE = 1e8

    return MSE


def eval_MSE_and_tune_constants(tree, toolbox, X, y):
    individual, num_consts = compile_individual_with_consts(tree, toolbox)

    if num_consts > 0:

        eval_MSE = partial(compute_MSE, individual=individual, X=X, y=y)

        x0 = np.ones(num_consts)

        class fitting_problem:
            def fitness(self, x):
                total_err = eval_MSE(consts=x)
                # return [total_err + 0.*(np.linalg.norm(x, 2))**2]
                return [total_err]

            def gradient(self, x):
                with mem_guard_off:
                    xt = mg.tensor(x, copy=False)
                    f = self.fitness(xt)[0]
                    f.backward()
                return xt.grad

            def get_bounds(self):
                return (-5.0 * np.ones(num_consts), 5.0 * np.ones(num_consts))

        # PYGMO SOLVER
        prb = pg.problem(fitting_problem())
        algo = pg.algorithm(pg.nlopt(solver="lbfgs"))
        # algo = pg.algorithm(pg.pso(gen=10))
        # pop = pg.population(prb, size=70)
        algo.extract(pg.nlopt).maxeval = 10
        pop = pg.population(prb, size=1)
        pop.push_back(x0)
        pop = algo.evolve(pop)
        MSE = pop.champion_f[0]
        consts = pop.champion_x

        if np.isinf(MSE) or np.isnan(MSE):
            MSE = 1e8
    else:
        MSE = compute_MSE(individual, X, y)
        consts = []
    return MSE, consts

## Expression Complexity and Structure Checks

These helper functions analyze individuals to:
- count trigonometric functions;
- detect nested trigonometric expressions;
- penalize overly complex or pathological solutions.

They are later used to regularize the fitness function.

In [4]:
def check_trig_fn(ind):
    return len(re.findall("cos", str(ind))) + len(re.findall("sin", str(ind)))


def check_nested_trig_fn(ind):
    return detect_nested_trigonometric_functions(str(ind))

def get_features_batch(
    individuals_batch,
    individ_feature_extractors=[len, check_nested_trig_fn, check_trig_fn],
):
    features_batch = [
        [fe(i) for i in individuals_batch] for fe in individ_feature_extractors
    ]

    individ_length = features_batch[0]
    nested_trigs = features_batch[1]
    num_trigs = features_batch[2]
    return individ_length, nested_trigs, num_trigs

## Fitness and Prediction Functions

The fitness function combines:
- prediction error (MSE),
- structural penalties (expression length),
- functional penalties (nested trigonometric functions).

A **Tarpeian selection** strategy is applied to discard very large trees early.

In [5]:
def predict(individuals_batch, toolbox, X, penalty, fitness_scale):

    predictions = [None] * len(individuals_batch)

    for i, tree in enumerate(individuals_batch):
        callable, _ = compile_individual_with_consts(tree, toolbox)
        predictions[i] = eval_model(callable, X, consts=tree.consts)

    return predictions


def compute_MSEs(individuals_batch, toolbox, X, y, penalty, fitness_scale):

    total_errs = [None] * len(individuals_batch)

    for i, tree in enumerate(individuals_batch):
        callable, _ = compile_individual_with_consts(tree, toolbox)
        total_errs[i] = compute_MSE(callable, X, y, consts=tree.consts)

    return total_errs


def compute_attributes(individuals_batch, toolbox, X, y, penalty, fitness_scale):

    attributes = [None] * len(individuals_batch)

    individ_length, nested_trigs, num_trigs = get_features_batch(individuals_batch)

    for i, tree in enumerate(individuals_batch):

        # Tarpeian selection
        if individ_length[i] >= 50:
            consts = None
            fitness = (1e8,)
        else:
            MSE, consts = eval_MSE_and_tune_constants(tree, toolbox, X, y)
            fitness = (
                fitness_scale
                * (
                    MSE
                    + 100000 * nested_trigs[i]
                    + 0.0 * num_trigs[i]
                    + penalty["reg_param"] * individ_length[i]
                ),
            )
        attributes[i] = {"consts": consts, "fitness": fitness}
    return attributes


def assign_attributes(individuals_batch, attributes):
    for ind, attr in zip(individuals_batch, attributes):
        ind.consts = attr["consts"]
        ind.fitness.values = attr["fitness"]


## Main Training and Evaluation Pipeline

The `eval` function orchestrates the entire symbolic regression process:
1. loads configuration from a YAML file;
2. generates training and test datasets;
3. builds the GP primitive set;
4. initializes the symbolic regressor;
5. trains the model;
6. evaluates performance on training and test sets.

In [6]:
def eval(problem, cfgfile, seed=42):

    regressor_params, config_file_data = load_config_data(cfgfile)

    scaleXy = config_file_data["gp"]["scaleXy"]

    # generate training and test datasets
    (
        X_train_scaled,
        y_train_scaled,
        X_test_scaled,
        y_test,
        _,
        scaler_y,
        num_variables,
        _,
    ) = generate_dataset(problem, scaleXy=scaleXy, random_state=seed)

    if num_variables == 1:
        pset = gp.PrimitiveSetTyped("Main", [float], float)
        pset.renameArguments(ARG0="x")
    elif num_variables == 2:
        pset = gp.PrimitiveSetTyped("Main", [float, float], float)
        pset.renameArguments(ARG0="x")
        pset.renameArguments(ARG1="y")
    else:
        pset = gp.PrimitiveSetTyped("Main", [float] * num_variables, float)

    pset = add_primitives_to_pset_from_dict(pset, config_file_data["gp"]["primitives"])

    batch_size = config_file_data["gp"]["batch_size"]
    if config_file_data["gp"]["use_constants"]:
        pset.addTerminal(object, float, "c")

    callback_func = assign_attributes
    fitness_scale = 1.0

    penalty = config_file_data["gp"]["penalty"]
    common_params = {"penalty": penalty, "fitness_scale": fitness_scale}

    gpsr = gps.GPSymbolicRegressor(
        pset_config=pset,
        fitness=compute_attributes,
        predict_func=predict,
        score_func=compute_MSEs,
        common_data=common_params,
        callback_func=callback_func,
        print_log=True,
        num_best_inds_str=1,
        save_best_individual=False,
        output_path="./",
        seed_str=None,
        batch_size=batch_size,
        num_cpus=num_cpus,
        remove_init_duplicates=True,
        **regressor_params,
    )

    
    est = gpsr

    tic = time.time()
    est.fit(X_train_scaled, y_train_scaled)
    toc = time.time()

    best = est.get_best_individuals(n_ind=1)[0]

    if hasattr(best, "consts"):
        print("Best parameters = ", best.consts)

    print("Elapsed time = ", toc - tic)
    individuals_per_sec = (
        (est.get_last_gen() + 1)
        * gpsr.num_individuals
        * gpsr.num_islands
        / (toc - tic)
    )
    print("Individuals per sec = ", individuals_per_sec)

    u_best = est.predict(X_test_scaled)

    # de-scale outputs before computing errors
    if scaleXy:
        u_best = scaler_y.inverse_transform(u_best.reshape(-1, 1)).flatten()

    MSE = np.mean((u_best - y_test) ** 2)
    r2_test = r2_score(y_test, u_best)
    print("MSE on the test set = ", MSE)
    print("R^2 on the test set = ", r2_test)

    pred_train = est.predict(X_train_scaled)

    if scaleXy:
        pred_train = scaler_y.inverse_transform(pred_train.reshape(-1, 1)).flatten()
        y_train_scaled = scaler_y.inverse_transform(
            y_train_scaled.reshape(-1, 1)
        ).flatten()

    MSE = np.mean((pred_train - y_train_scaled) ** 2)
    r2_train = r2_score(y_train_scaled, pred_train)
    print("MSE on the training set = ", MSE)
    print("R^2 on the training set = ", r2_train)

    return r2_train, r2_test

In [7]:
problem = "1027_ESL"
cfgfile = "simple_sr.yaml"


#seeds = [29802, 22118, 860, 15795, 21575, 5390, 11964, 6265, 23654, 11284]
seed = 29802

r2_tests = []

# possibly add rmse_train, rmse_test, rmse_val
header = ["problem", "trial", "r2_train", "r2_test", "seed"]


print("PROBLEM: ", problem)
print("seed: ", seed)
r2_train, r2_test = eval(problem=problem, cfgfile=cfgfile, seed=seed)
r2_tests.append(r2_test)

stats = {
    "problem": problem,
    "r2_train": r2_train,
    "r2_test": r2_test,
    "seed": seed,
}

print(stats)

# ray is explicitly shut down at the end of the execution to release computational resources
ray.shutdown()

PROBLEM:  1027_ESL
seed:  29802


2026-01-13 15:07:11,631	INFO worker.py:2007 -- Started a local Ray instance.


Generating initial population(s)...
 Removing duplicates from initial population(s)...




 DONE.
DONE.
Evaluating initial population(s)...
DONE.
 -= START OF EVOLUTION =- 
   	     	           fitness            	             size             
   	     	------------------------------	------------------------------
gen	evals	min   	avg   	max   	std   	min	avg   	max	std   
1  	2000 	0.1663	0.6675	1.1616	0.2429	2  	8.0545	21 	3.8996
Best individuals of this generation:
aq(add(add(ARG0, ARG2), ARG3), c)
2  	2000 	0.1663	0.4828	0.6572	0.0924	2  	7.7195	22 	3.9847
Best individuals of this generation:
aq(add(add(ARG0, ARG2), ARG3), c)
3  	2000 	0.1663	0.4286	0.5213	0.0732	2  	7.942 	25 	4.1436
Best individuals of this generation:
aq(add(add(ARG0, ARG2), ARG3), c)
4  	2000 	0.1617	0.389 	0.4812	0.0637	2  	8.824 	28 	4.4514
Best individuals of this generation:
mul(c, add(add(ARG2, ARG3), add(ARG1, ARG0)))
5  	2000 	0.1617	0.3546	0.4292	0.0545	2  	9.465 	28 	4.8756
Best individuals of this generation:
mul(c, add(add(ARG2, ARG3), add(ARG1, ARG0)))
6  	2000 	0.1617	0.3282	0.3986	0.04

[36m(pid=gcs_server)[0m [2026-01-13 15:07:38,404 E 2414 2414] (gcs_server) gcs_server.cc:303: Failed to establish connection to the event+metrics exporter agent. Events and metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


7  	2000 	0.1617	0.3078	0.3661	0.043 	3  	11.3225	32 	5.2708
Best individuals of this generation:
mul(c, add(add(ARG2, ARG3), add(ARG1, ARG0)))


[33m(raylet)[0m [2026-01-13 15:07:40,702 E 2562 2562] (raylet) main.cc:1032: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


8  	2000 	0.1617	0.2892	0.3431	0.0391	3  	12.388 	33 	5.5686
Best individuals of this generation:
mul(c, add(add(ARG2, ARG3), add(ARG1, ARG0)))


[36m(compute_attributes pid=2726)[0m [2026-01-13 15:07:42,777 E 2726 2851] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
[2026-01-13 15:07:43,034 E 2361 2722] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


9  	2000 	0.1511	0.273 	0.3322	0.0355	5  	13.485 	39 	5.4463
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
10 	2000 	0.1511	0.2572	0.3142	0.0305	5  	13.8875	39 	5.6526
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
11 	2000 	0.1511	0.245 	0.287 	0.0266	5  	13.785 	36 	5.5292
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
12 	2000 	0.1511	0.2354	0.2715	0.0245	5  	13.875 	35 	5.4484
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
13 	2000 	0.1511	0.2256	0.2577	0.0225	5  	14.247 	34 	5.6068
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
14 	2000 	0.1511	0.2168	0.2453	0.0203	5  	14.9135	38 	5.7694
Best individuals of this generation:
mul(c, add(ARG3, add(add(ARG0, ARG2), add(ARG1, ARG3))))
15 	2000 	0.1511	0.2088	0.2325	0.0186	5  	15.1545	44 	5.9342
Best indi

[36m(compute_attributes pid=2737)[0m [2026-01-13 15:07:42,982 E 2737 3416] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14[32m [repeated 15x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
