# Synthetic Data

# AI Feynman

Load imports and dataset

In [None]:
# Load AI Feynman 
from datasets import load_dataset
import torch 
import numpy as np
import os 
import sys 
import matplotlib.pyplot as plt
import re

ai_feynman_easy = load_dataset("yoshitomo-matsubara/srsd-feynman_easy") # 30 equations
ai_feynman_medium = load_dataset("yoshitomo-matsubara/srsd-feynman_medium") # 40 equations
ai_feynman_hard = load_dataset("yoshitomo-matsubara/srsd-feynman_hard") # 50 equations

Prep the dataset 

In [None]:
ai_feynman = ai_feynman_hard # change to easy or medium as needed
print(ai_feynman)

# dict containing the training data for each equation (every 8000 samples for train, 1000 for validation and test)
equation_data = {}
for i in range(0, len(ai_feynman["train"]) // 8000):
    X_train, y_train = [], []
    X_val, y_val = [], []
    X_test, y_test = [], []

    # Process the training, validation, and test data
    for train_data in ai_feynman["train"]["text"][i * 8000:(i + 1) * 8000]:
        train_values = train_data.split(" ")
        X_train.append([float(v) for v in train_values[:-1]])
        y_train.append(float(train_values[-1]))

    for val_data in ai_feynman["validation"]["text"][i * 1000:(i + 1) * 1000]:
        val_values = val_data.split(" ")
        X_val.append([float(v) for v in val_values[:-1]])
        y_val.append(float(val_values[-1]))

    for test_data in ai_feynman["test"]["text"][i * 1000:(i + 1) * 1000]:
        test_values = test_data.split(" ")
        X_test.append([float(v) for v in test_values[:-1]])
        y_test.append(float(test_values[-1]))

    # Store the data in the equation_data dictionary
    equation_data[i] = {
        "X_train": torch.Tensor(X_train),
        "y_train": torch.Tensor(y_train),
        "X_val": torch.Tensor(X_val),
        "y_val": torch.Tensor(y_val),
        "X_test": torch.Tensor(X_test),
        "y_test": torch.Tensor(y_test)
    }

print(f"Number of equations: {len(equation_data)}")
print(f"Shape of training data for first equation: {equation_data[1]['X_train'].shape}, {equation_data[1]['y_train'].shape}")
print(f"Shape of validation data for first equation: {equation_data[1]['X_val'].shape}, {equation_data[1]['y_val'].shape}")
print(f"Shape of test data for first equation: {equation_data[1]['X_test'].shape}, {equation_data[1]['y_test'].shape}")


Some metrics


In [None]:
def positive_r2(y_true, y_pred):
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    if ss_tot == 0:
        ss_tot = 1e-10  # Avoid division by zero
    r2 = 1 - (ss_res / ss_tot)
    return r2

def mse_loss(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def adjusted_r2(y_true, y_pred, n, k):
    r2 = positive_r2(y_true, y_pred)
    adj_r2 = 1 - (1 - r2) * (n - 1) / (n - k - 1)
    return adj_r2

## GFN-SR

To run it with a LSTM RNN forward policy, make sure to set the policy to 'RNN' in the function call. For transformer forward policy use 'TRN'. 

In [None]:
sys.path.insert(0, os.path.join(os.getcwd(), 'gfn-sr'))
from train import train_gfn_sr_adapted

mse_scores = []
r2_scores = []
adjusted_r2_scores = []
for i in range(0, len(equation_data)): 
    print(f"\nProcessing equation {i}...")
    X_train = equation_data[i]["X_train"]
    y_train = equation_data[i]["y_train"]
    X_val = equation_data[i]["X_val"]
    y_val = equation_data[i]["y_val"]

    batch_size = 64
    num_epochs = 100

    try:
        gfn_sr, env, errs, avg_mses, top_mses = train_gfn_sr_adapted(X_train, y_train, batch_size, num_epochs, show_plot=False, use_gpu=True, policy="RNN", verbose=False)
    except Exception as e:
        print(f"Error training GFN-SR for equation {i}: {e}")
        continue
    expr_gfnsr_trn = str(env.reward_manager.best_expr)
    print(f"Best expression from GFN-SR: {expr_gfnsr_trn}")

    n_terms = X_val.shape[1]  
    eval_dict = {f"x{j+1}": X_val[:, j] for j in range(n_terms)}
    eval_dict.update({
        'cos': torch.cos,
        'sin': torch.sin,
        'exp': torch.exp,
        'sqrt': torch.sqrt,
        'log': torch.log,
        'inv': lambda x: 1/x,
        '0': torch.Tensor([0]),
        'square': torch.square,
    })
    if expr_gfnsr_trn.startswith("square(0)"):
        print(f"GFN-SR output is square(0) for equation {i}. Setting output to zero tensor.")
        y_gfnsr_trn = torch.zeros_like(y_val)
    else:
        y_gfnsr_trn = eval(expr_gfnsr_trn, eval_dict)
    
    # if it is a tensor and it contains nan -> 0
    if isinstance(y_gfnsr_trn, torch.Tensor) and torch.isnan(y_gfnsr_trn).any():
        print(f"GFN-SR output contains NaN for equation {i}. Setting NaN values to 0.")
        y_gfnsr_trn = torch.nan_to_num(y_gfnsr_trn)
    elif not isinstance(y_gfnsr_trn, torch.Tensor) and y_gfnsr_trn == 0:
        print(f"GFN-SR output is 0 for equation {i}. Setting output to zero tensor.")
        y_gfnsr_trn = torch.zeros_like(y_val)

    r2_score_gfnsr = positive_r2(np.array(y_val), np.array(y_gfnsr_trn))
    mse_score_gfnsr = mse_loss(np.array(y_val), np.array(y_gfnsr_trn))
    adj_r2_score_gfnsr = adjusted_r2(np.array(y_val), np.array(y_gfnsr_trn), len(y_val), n_terms)
    mse_scores.append(mse_score_gfnsr)
    r2_scores.append(r2_score_gfnsr)
    adjusted_r2_scores.append(adj_r2_score_gfnsr)
    print(f"GFN-SR R^2 score: {r2_score_gfnsr}")
    print(f"GFN-SR adjusted R^2 score: {adj_r2_score_gfnsr}")
    print(f"GFN-SR MSE loss: {mse_score_gfnsr}")


print(f"Average MSE score across all equations: {np.mean(mse_scores)}")
print(f"Average R^2 score across all equations: {np.mean(r2_scores)}")
print(f"Average adjusted R^2 score across all equations: {np.mean(adjusted_r2_scores)}")
print(f"Standard deviation of MSE scores: {np.std(mse_scores)}")
print(f"Standard deviation of R^2 scores: {np.std(r2_scores)}")
print(f"Standard deviation of adjusted R^2 scores: {np.std(adjusted_r2_scores)}")

# Post-processing
r2_gp_finite = [r2 for r2 in r2_scores if not (np.isinf(r2) or np.isnan(r2))]
r2_gp_capped = [max(0, r2) for r2 in r2_gp_finite]
print(f"R^2 scores for GPLearn (capped): {r2_gp_capped}")


## GPLearn
Has no GPU optimization so very slow.

In [None]:
from gplearn.genetic import SymbolicRegressor

mse_scores = []
r2_scores = []
adjusted_r2_scores = []
for i in range(0, len(equation_data)): 
    print(f"\nProcessing equation {i}...")
    X_train = equation_data[i]["X_train"]
    y_train = equation_data[i]["y_train"]
    X_val = equation_data[i]["X_val"]
    y_val = equation_data[i]["y_val"]

    est_gp = SymbolicRegressor(population_size=5000,
                        generations=20, stopping_criteria=0.01,
                        p_crossover=0.7, p_subtree_mutation=0.1,
                        p_hoist_mutation=0.05, p_point_mutation=0.1,
                        max_samples=0.9, verbose=0,
                        parsimony_coefficient=0.01, random_state=0)
    est_gp.fit(X_train, y_train)
    y_gp = est_gp.predict(X_val)

    r2_gp = positive_r2(np.array(y_val), np.array(y_gp))
    mse_gp = mse_loss(np.array(y_val), np.array(y_gp))
    mse_scores.append(mse_gp)
    r2_scores.append(r2_gp)
    expr_gp = est_gp._program
    
    print(f"Best expression from GPLearn: {expr_gp}")
    print(f"GPLearn R^2 score: {r2_gp}")
    print(f"GPLearn MSE loss: {mse_gp}")

print(f"Average MSE score across all equations: {np.mean(mse_scores)}")
print(f"Average R^2 score across all equations: {np.mean(r2_scores)}")
print(f"Standard deviation of MSE scores: {np.std(mse_scores)}")
print(f"Standard deviation of R^2 scores: {np.std(r2_scores)}")

# Post-processing
r2_gp_finite = [r2 for r2 in r2_scores if not (np.isinf(r2) or np.isnan(r2))]
r2_gp_capped = [max(0, r2) for r2 in r2_gp_finite]
print(f"R^2 scores for GPLearn (capped): {r2_gp_capped}")

## PySR
Ran with niterations = 10 for easy, 3 for medium and hard to save compute. 

In [None]:
from pysr import PySRRegressor

mse_scores = []
r2_scores = []
adjusted_r2_scores = []
for i in range(0, len(equation_data)): 
    print(f"\nProcessing equation {i}...")
    X_train = equation_data[i]["X_train"]
    y_train = equation_data[i]["y_train"]
    X_val = equation_data[i]["X_val"]
    y_val = equation_data[i]["y_val"]

    pysr = PySRRegressor(
        niterations=3,  # < Increase me for better results
        binary_operators=["+", "*"],
        unary_operators=[
            "cos",
            "exp",
            "sin",
            "inv(x) = 1/x",
            # ^ Custom operator (julia syntax)
        ],
        extra_sympy_mappings={"inv": lambda x: 1 / x},
        # ^ Define operator for SymPy as well
        elementwise_loss="loss(prediction, target) = (prediction - target)^2",
        # ^ Custom loss function (julia syntax)
        verbosity=0
    )
    pysr.fit(X_train, y_train)
    y_pysr = pysr.predict(X_val)
    expr_pysr = pysr.get_best().equation
    
    r2_pysr = positive_r2(np.array(y_val), np.array(y_pysr))
    mse_pysr = mse_loss(np.array(y_val), np.array(y_pysr))
    mse_scores.append(mse_pysr)
    r2_scores.append(r2_pysr)


    print(f"Best expression from PySR: {expr_pysr}")
    print(f"PySR R^2 score: {r2_pysr}")
    print(f"PySR MSE loss: {mse_pysr}")


print(f"Average MSE score across all equations: {np.mean(mse_scores)}")
print(f"Average R^2 score across all equations: {np.mean(r2_scores)}")
print(f"Standard deviation of MSE scores: {np.std(mse_scores)}")
print(f"Standard deviation of R^2 scores: {np.std(r2_scores)}")

r2_pysr_capped = [max(0, r2) for r2 in r2_scores]
print(f"R^2 scores for PySR (capped): {r2_pysr_capped}")

## RandomForest + DecisionTree

In [None]:
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor

r2_scores_rf = []
r2_scores_dt = []
for i in range(0, len(equation_data)): 
    print(f"\nProcessing equation {i}...")
    X_train = equation_data[i]["X_train"]
    y_train = equation_data[i]["y_train"]
    X_val = equation_data[i]["X_val"]
    y_val = equation_data[i]["y_val"]

    est_tree = DecisionTreeRegressor()
    est_tree.fit(X_train, y_train) 
    score_tree = est_tree.score(X_val, y_val)
    r2_scores_dt.append(score_tree)

    est_rf = RandomForestRegressor()
    est_rf.fit(X_train, y_train)
    score_rf = est_rf.score(X_val, y_val)
    r2_scores_rf.append(score_rf)

    print(f"Decision Tree R^2 score: {score_tree}")    
    print(f"RF R^2 score: {score_rf}")


print(f"Average R^2 score across all equations for RF: {np.mean(r2_scores_rf)}")
print(f"Average R^2 score across all equations for DT: {np.mean(r2_scores_dt)}")
print(f"Standard deviation of R^2 scores for RF: {np.std(r2_scores_rf)}")
print(f"Standard deviation of R^2 scores for DT: {np.std(r2_scores_dt)}")

r2_rf_capped = [max(0, r2) for r2 in r2_scores_rf]
r2_dt_capped = [max(0, r2) for r2 in r2_scores_dt]
print(f"R^2 scores for RF (capped): {r2_rf_capped}")
print(f"R^2 scores for DT (capped): {r2_dt_capped}")