# Symbolic regression of a dynamical system

In this example, Kozax is applied to recover the state equations of the Lotka-Volterra system. The candidate solutions are integrated as a system of differential equations, after which the predictions are compared to the true observations to determine a fitness score.

In [1]:
# Specify the cores to use for XLA
import os
os.environ["XLA_FLAGS"] = '--xla_force_host_platform_device_count=10'

import jax
import diffrax
import jax.numpy as jnp
import jax.random as jr
import diffrax

from kozax.genetic_programming import GeneticProgramming
from kozax.fitness_functions.ODE_fitness_function import ODEFitnessFunction
from kozax.environments.SR_environments.lotka_volterra import LotkaVolterra

These device(s) are detected:  [CpuDevice(id=0), CpuDevice(id=1), CpuDevice(id=2), CpuDevice(id=3), CpuDevice(id=4), CpuDevice(id=5), CpuDevice(id=6), CpuDevice(id=7), CpuDevice(id=8), CpuDevice(id=9)]


First the data is generated, consisting of initial conditions, time points and the true observations. Kozax provides the Lotka-Volterra environment, which is integrated with Diffrax.

In [2]:
def get_data(key, env, dt, T, batch_size=20):
    x0s = env.sample_init_states(batch_size, key)
    ts = jnp.arange(0, T, dt)

    solver = diffrax.Dopri5()
    dt0 = 0.001
    saveat = diffrax.SaveAt(ts=ts)

    system = diffrax.ODETerm(env.drift)

    def solve(ts, x0):
        # Solve the system given an initial conditions
        sol = diffrax.diffeqsolve(system, solver, ts[0], ts[-1], dt0, x0, saveat=saveat, max_steps=500, 
                                  adjoint=diffrax.DirectAdjoint(), stepsize_controller=diffrax.PIDController(atol=1e-7, rtol=1e-7, dtmin=0.001), args=jnp.array([0.0]))
        
        return sol.ys

    ys = jax.vmap(solve, in_axes=[None, 0])(ts, x0s) #Parallelize over the batch dimension
    
    return x0s, ts, ys

key = jr.PRNGKey(0)
data_key, gp_key = jr.split(key)

T = 30
dt = 0.2
env = LotkaVolterra()

# Simulate the data
data = get_data(data_key, env, dt, T, batch_size=4)
x0s, ts, ys = data

For the fitness function, we used the ODEFitnessFunction that uses Diffrax to integrate candidate solutions. It is possible to select the solver, time step, number of steps and a stepsize controller to balance efficiency and accuracy. To ensure convergence of the genetic programming algorithm, constant optimization is applied to the best candidates at every generation. The constant optimization is performed with a couple of simple evolutionary steps that adjust the values of the constants in a candidate. The hyperparameters that define the constant optimization are `constant_optimization_N_offspring` (number of candidates with different constants should be sampled for each candidate), `constant_optimization_steps` (number of iterations of constant optimization for each candidate), `optimize_constants_elite` (number of candidates that constant optimization is applied to), `constant_step_size_init` (initial value of the step size for sampling constants) and `constant_step_size_decay` (the rate of decrease of the step size over generations).

In [3]:
#Define the nodes and hyperparameters
operator_list = [
        ("+", lambda x, y: jnp.add(x, y), 2, 0.5), 
        ("-", lambda x, y: jnp.subtract(x, y), 2, 0.1), 
        ("*", lambda x, y: jnp.multiply(x, y), 2, 0.5), 
    ]

variable_list = [["x" + str(i) for i in range(env.n_var)]]
layer_sizes = jnp.array([env.n_var])

population_size = 100
num_populations = 10
num_generations = 50

#Initialize the fitness function and the genetic programming strategy
fitness_function = ODEFitnessFunction(solver=diffrax.Dopri5(), dt0 = 0.01, stepsize_controller=diffrax.PIDController(atol=1e-6, rtol=1e-6, dtmin=0.001), max_steps=300)

strategy = GeneticProgramming(num_generations, population_size, fitness_function, operator_list, variable_list, layer_sizes, num_populations = num_populations,
                        constant_optimization_method="evolution", constant_optimization_N_offspring = 50, constant_optimization_steps = 3, 
                        optimize_constants_elite=100, constant_step_size_init=0.1, constant_step_size_decay=0.99)

Input data should be formatted as: ['x0', 'x1'].


In [4]:
# Sample the initial population
population = strategy.initialize_population(gp_key)

# Define the number of timepoints to include in the data
end_ts = int(ts.shape[0]/2)

for g in range(num_generations):
    if g == 25: # After 25 generations, use the full data
        end_ts = ts.shape[0]

    key, eval_key, sample_key = jr.split(key, 3)
    # Evaluate the population on the data, and return the fitness
    fitness, population = strategy.evaluate_population(population, (x0s, ts[:end_ts], ys[:,:end_ts]), eval_key)

    if (g%5)==0:
        print("Generation:", g)
        strategy.print_pareto_front()

    # Evolve the population until the last generation. The fitness should be given to the evolve function.
    if g < (num_generations-1):
        population = strategy.evolve_population(population, fitness, sample_key)

Generation: 0
Complexity: 2, fitness: 6.447543621063232, equations: [1.53, -0.844]
Complexity: 6, fitness: 1.7761852741241455, equations: [-1.19*x0, -1.13*x1]
Complexity: 8, fitness: 1.7118381261825562, equations: [-1.67*x0, -0.306*x0*x1]
Complexity: 10, fitness: 1.64039945602417, equations: [-x0 - 0.941, x0 - x1 + 1.43]
Complexity: 12, fitness: 1.3394683599472046, equations: [1.29 - 2.42*x0, 0.0769 - 0.336*x1]
Generation: 5
Complexity: 2, fitness: 2.925888776779175, equations: [-0.708, -0.546]
Complexity: 4, fitness: 2.0265915393829346, equations: [-0.999*x0, -0.731]
Complexity: 6, fitness: 1.4680837392807007, equations: [1.11 - x0, -0.281*x1]
Complexity: 8, fitness: 1.3401834964752197, equations: [-x0*x1 + 1.56, -0.289*x1]
Complexity: 10, fitness: 1.306777000427246, equations: [-2.15*x0, x0 - 0.412*x1]
Complexity: 12, fitness: 1.2634639739990234, equations: [0.87 - 1.69*x0, 0.653*x0 - 0.499*x1]
Complexity: 14, fitness: 1.222165822982788, equations: [1.18 - 2.26*x0, x0 - 0.459*x1 - 0.

Kozax provides a fit function that receives the data and a random key. However, it is also possible to run Kozax with an easy loop consisting of evaluating and evolving. This is useful as different input data can be provided during evaluation. In symbolic regression of dynamical systems, it helps to first optimize on a small part of the time points, and provide the full data trajectories only after a couple of generations.

In [5]:
strategy.print_pareto_front()

Complexity: 2, fitness: 2.8840174674987793, equations: [-0.611, -0.753]
Complexity: 4, fitness: 2.0265915393829346, equations: [-0.999*x0, -0.731]
Complexity: 6, fitness: 1.4009642601013184, equations: [-1.68*x0, -0.271*x1]
Complexity: 8, fitness: 1.3372949361801147, equations: [-x0*x1 + 0.605, -0.309*x1]
Complexity: 10, fitness: 1.2822456359863281, equations: [-0.389*x0*x1 + 0.487, -0.289*x1]
Complexity: 12, fitness: 1.231080174446106, equations: [0.836 - 2.17*x0, x0 - 0.487*x1]
Complexity: 14, fitness: 1.2212458848953247, equations: [1.11 - 2.28*x0, x0 - 0.452*x1 - 0.264]
Complexity: 16, fitness: 1.0084158182144165, equations: [-0.383*x0*x1 + x0, x0 - 0.326*x1 - 0.337]
Complexity: 18, fitness: 0.1432454138994217, equations: [-0.382*x0*x1 + x0, 0.0999*x0*x1 - 0.429*x1]


Instead of using evolution to optimize the constants, Kozax also offers gradient-based optimization. For gradient optimization, it is possible to specify the optimizer, the number of candidates to apply constant optimization to, the initial learning rate and the learning rate decay over generation. These two methods are provided as either can be more effective or efficient for different problems.

In [6]:
import optax

strategy = GeneticProgramming(num_generations, population_size, fitness_function, operator_list, variable_list, layer_sizes, num_populations = num_populations,
                        constant_optimization_method="gradient", constant_optimization_steps = 15, optimizer_class = optax.adam,
                        optimize_constants_elite=100, constant_step_size_init=0.025, constant_step_size_decay=0.95)

Input data should be formatted as: ['x0', 'x1'].


In [8]:
key = jr.PRNGKey(0)
data_key, gp_key = jr.split(key)

T = 30
dt = 0.2
env = LotkaVolterra()

# Simulate the data
data = get_data(data_key, env, dt, T, batch_size=4)
x0s, ts, ys = data

# Sample the initial population
population = strategy.initialize_population(gp_key)

# Define the number of timepoints to include in the data
end_ts = int(ts.shape[0]/2)

for g in range(num_generations):
    if g == 25: # After 25 generations, use the full data
        end_ts = ts.shape[0]

    key, eval_key, sample_key = jr.split(key, 3)
    # Evaluate the population on the data, and return the fitness
    fitness, population = strategy.evaluate_population(population, (x0s, ts[:end_ts], ys[:,:end_ts]), eval_key)

    if (g%5)==0:
        print("Generation:", g)
        strategy.print_pareto_front()

    # Evolve the population until the last generation. The fitness should be given to the evolve function.
    if g < (num_generations-1):
        population = strategy.evolve_population(population, fitness, sample_key)

Generation: 0
Complexity: 2, fitness: 6.447543621063232, equations: [1.53, -0.844]
Complexity: 6, fitness: 1.7973437309265137, equations: [-1.12*x0, -1.23*x1]
Complexity: 8, fitness: 1.7219312191009521, equations: [-1.28*x0, -0.234*x0*x1]
Complexity: 10, fitness: 1.6546754837036133, equations: [-x0 - 1.02, x0 - x1 + 1.54]
Complexity: 12, fitness: 1.3882397413253784, equations: [1.41 - 2.33*x0, -0.226*x1 - 0.145]
Complexity: 16, fitness: 1.376958966255188, equations: [-0.971*x0*(x0 - 1.42) - 0.491, 0.325 - 0.378*x1]
Generation: 5
Complexity: 2, fitness: 3.1813440322875977, equations: [-0.415, -0.330]
Complexity: 4, fitness: 2.006434440612793, equations: [0.633 - x0, -0.706]
Complexity: 6, fitness: 1.3983227014541626, equations: [-1.7*x0, -0.291*x1]
Complexity: 8, fitness: 1.3975510597229004, equations: [-2.34*x0, -0.289*x1]
Complexity: 10, fitness: 1.345879316329956, equations: [1.19 - 2.32*x0, -0.283*x1]
Complexity: 12, fitness: 1.3137644529342651, equations: [-0.974*x0*x1 + 1.05, 1.85

In [None]:
strategy.print_pareto_front()

Complexity: 2, fitness: 2.889601707458496, equations: [-0.789, -0.746]
Complexity: 4, fitness: 2.231682300567627, equations: [1.05 - x0, -0.336]
Complexity: 6, fitness: 1.392594814300537, equations: [-1.95*x0, -0.291*x1]
Complexity: 8, fitness: 1.3738036155700684, equations: [-1.62*x0, x0 - 0.407*x1]
Complexity: 10, fitness: 1.3001673221588135, equations: [-0.276*x0*x1, x0 - 0.386*x1]
Complexity: 12, fitness: 1.226672649383545, equations: [-0.274*x0*x1 + 0.218, x0 - 0.407*x1]
Complexity: 14, fitness: 0.916778028011322, equations: [x0*(1.33 - 0.49*x1), 0.311*x0 - 0.327*x1]
Complexity: 16, fitness: 0.04582058638334274, equations: [2*x0*(0.537 - 0.202*x1), x1*(0.103*x0 - 0.415)]
Complexity: 18, fitness: 0.04011291265487671, equations: [1.86*x0*(0.574 - 0.215*x1), x1*(0.103*x0 - 0.415)]
