In [1]:
import os

import random
from functools import partial
from decimal import Decimal

from deap import base
from deap import creator
from deap import tools
from deap_ga import *
from utils import *

import numpy as np
import scipy.io as sio
import pysindy as ps
from tqdm import trange

### Genetic algorithm (GA) settings

In [2]:
# Search-space settings shared by the feature library and the GA cells below.
n_poly = 2         # polynomial degree of the library; upper bound for module polynomial orders
n_derivatives = 3  # derivative order of the library; upper bound for module derivative orders
n_modules = 8      # size argument handed to myInitRepeat when individuals are generated

# Seed the global RNGs so that Restart & Run All reproduces the same run.
# The evolution loop draws from `random` directly; NumPy's global generator is
# seeded as well because the imported helpers and the weak-form library may
# sample from it (NOTE(review): confirm which generator each of them uses).
SEED = 0
random.seed(SEED)
np.random.seed(SEED)

### Data

In [3]:
# Load burgers.mat and keep the real part of the solution and of both axes.
data_path = "./Datasets/"
data = sio.loadmat(os.path.join(data_path, "burgers.mat"))

u_clean = data['usol'].real
u = u_clean.copy()  # working copy; u_clean itself is not modified here
x = data['x'][0].real
t = data['t'][:, 0].real

# Grid spacings taken from neighbouring samples of each axis.
dt = t[1] - t[0]
dx = x[2] - x[1]

# x as a column and t as a row (object array), plus the dense meshgrid.
xt = np.array([x.reshape(-1, 1), t.reshape(1, -1)], dtype=object)
X, T = np.meshgrid(x, t)
# Stack the two meshgrid arrays and reverse the axis order (equivalent to .T).
XT = np.transpose(np.asarray([X, T]))

In [4]:
# Polynomial library (no bias column of its own) wrapped in a weak-form PDE
# library that is evaluated on the XT grid.
function_library = ps.PolynomialLibrary(include_bias=False, degree=n_poly)

_weak_lib_kwargs = dict(
    function_library=function_library,
    derivative_order=n_derivatives,
    spatiotemporal_grid=XT,
    include_bias=True,
    diff_kwargs={"is_uniform": True},
    K=10000,
)
weak_lib = ps.WeakPDELibrary(**_weak_lib_kwargs)

# Append a trailing length-1 axis to u (same view as np.expand_dims(u, -1)).
_u_expanded = u[..., np.newaxis]

# Library features first, then the weak-form time-derivative target.
X_pre = np.array(weak_lib.fit_transform(_u_expanded))
y_pre = weak_lib.convert_u_dot_integral(_u_expanded)

In [5]:
# Build one (polynomial order, derivative order) key per library column, in
# the order: optional bias, pure polynomials, pure derivatives, mixed terms.
base_poly = np.array([[order, 0] for order in range(1, n_poly + 1)])
base_derivative = np.array([[0, order] for order in range(1, n_derivatives + 1)])

modules = []
if weak_lib.include_bias:
    modules.append((0, 0))
modules.extend((p, 0) for p in range(1, n_poly + 1))
modules.extend((0, d) for d in range(1, n_derivatives + 1))
# Mixed terms: derivative order varies slowest, polynomial order fastest.
modules.extend(tuple(p + d) for d in base_derivative for p in base_poly)

# Guard: the key list must line up one-to-one with the library's features.
assert len(modules) == len(weak_lib.get_feature_names())

# Map each key to its feature column; keep a separate copy of the target.
base_features = {module: column for module, column in zip(modules, X_pre.T)}
u_t = y_pre.copy()

In [6]:
# Bind the search-space bounds onto the mutation operators. The bare names are
# rebound to their partially-applied versions.
_bounds = dict(n_poly=n_poly, n_derivatives=n_derivatives)
add_mutate = partial(add_mutate, **_bounds)
order_mutate = partial(order_mutate, **_bounds)
module_mutate = partial(module_mutate, **_bounds)

# Two-objective minimisation; an individual is a frozenset carrying a fitness.
creator.create("FitnessMin", base.Fitness, weights=(-1.0, -1.0))
creator.create("Individual", frozenset, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
toolbox.register("PdeModule", generate_module, n_poly, n_derivatives)
toolbox.register(
    "individual", myInitRepeat, creator.Individual, toolbox.PdeModule, n_modules
)
# The population container is a set, so equal individuals collapse to one.
toolbox.register("population", tools.initRepeat, set, toolbox.individual)

# First fitness component with epsilon=0, wrapped as a 1-tuple.
mse = lambda _: (
    evaluate_genome(_, base_features=base_features, target=u_t, epsilon=0)[0],
)

n_pop = 300
pop = toolbox.population(n=n_pop)

# epsilon = 10 ** (second item returned by sci_format for the median
# epsilon-free error over the initial population).
_median_mse = np.median([mse(member)[0] for member in pop])
epi = 10 ** (sci_format(_median_mse)[1])
penalized_mse = partial(
    evaluate_genome, base_features=base_features, target=u_t, epsilon=epi
)

# NOTE: the "order_mutate" alias is bound to module_mutate; the order_mutate
# partial defined above is not registered in this cell.
for _alias, _fn in (
    ("evaluate", penalized_mse),
    ("cross", crossover_permutation),
    ("add_mutate", add_mutate),
    ("del_mutate", del_mutate),
    ("order_mutate", module_mutate),
    ("select", tools.selNSGA2),
):
    toolbox.register(_alias, _fn)

# Score every member of the initial population.
for ind in pop:
    ind.fitness.values = toolbox.evaluate(ind)

In [7]:
# Run settings: number of generations, survivors kept per generation, and the
# per-offspring probability of each mutation.
n_generations = 100
MU = 100
add_rate, del_rate, order_rate = 0.4, 0.5, 0.4

# Selection with k = len(pop): nothing is dropped at this point.
pop = toolbox.select(pop, len(pop))

# Mutations are tried in this fixed order for every offspring.
mutation_schedule = (
    (add_rate, toolbox.add_mutate),
    (del_rate, toolbox.del_mutate),
    (order_rate, toolbox.order_mutate),
)

print("Learning PDEs...")
for generation in trange(n_generations):
    offspring = tools.selTournament(pop, len(pop), tournsize=2)

    # Crossover on consecutive pairs; with an odd count the last one is skipped.
    for left in range(0, len(offspring) - 1, 2):
        right = left + 1
        offspring[left], offspring[right] = toolbox.cross(
            offspring[left], offspring[right]
        )

    # Each mutation fires independently with its own probability.
    for idx in range(len(offspring)):
        for mutation_rate, mutation_op in mutation_schedule:
            if random.random() < mutation_rate:
                (offspring[idx],) = mutation_op(offspring[idx])

    # Drop empty genomes; re-wrap and score the rest. The list is walked from
    # the back and then flipped so evaluation order and list order are kept.
    survivors = []
    for child in reversed(offspring):
        if len(child) > 0:
            child = creator.Individual(child)
            child.fitness.values = toolbox.evaluate(child)
            survivors.append(child)
    survivors.reverse()
    offspring = list(set(survivors))  # remove duplicate genomes

    # Parents and offspring compete for the MU slots of the next generation.
    pop = toolbox.select(pop + offspring, MU)

# Deduplicate, then order by the sum of both fitness components (ascending).
pop = list(set(pop))
pop.sort(key=lambda member: sum(member.fitness.values))
max_ss = max(map(len, pop)) # for best subset selection
pop[0], pop[0].fitness.values[0]

Learning PDEs...


100%|█████████████████████████████████████████| 100/100 [00:04<00:00, 22.59it/s]


(frozenset({(0, 2), (1, 1)}), 5.492652162115227e-10)