In [1]:
# %% import modules
import geppy as gep
from deap import creator, base, tools
import numpy as np
import random
import operator 
import pickle
from fractions import Fraction
import scipy.io as scio
import time
import sympy as sp
import copy
import DHC_MGEP as dm

# For reproduction
s = 0
random.seed(s)
np.random.seed(s)

In [2]:
# %% define functions for tensor operators 
def tensor_add(a, b):
    if isinstance(a, np.ndarray) and isinstance(b, np.ndarray) and a.shape == b.shape:
        return a + b
    else:
        return False

def tensor_sub(a, b):
    if isinstance(a, np.ndarray) and isinstance(b, np.ndarray) and a.shape == b.shape:
        return a - b
    else:
        return False

def tensor_inner_product(a, b):
    if isinstance(a, np.ndarray) and isinstance(b, np.ndarray) and a.shape == b.shape:
        result = np.empty_like(a)
        for i in range(a.shape[0]):
            result[i] = np.matmul(a[i], b[i])
        return result
    else:
        return False

def p_(tensor):
    '''
    need to define a global variable ---- plasmid_library through function Y_pretend
    '''
    if isinstance(tensor, np.ndarray):
        scalar = eval(plasmid_library.pop(0))
        if isinstance(scalar, np.ndarray):
            return scalar[:, None] * tensor
        return tensor * scalar
    else:
        return False


def p_symbol(tensor):
    '''
    need to define a global variable ---- plasmid_library through function: my_compile
    '''
    scalar_expression = plasmid_library.pop(0)
    return tensor * scalar_expression
        
# define a protected division to avoid dividing by zero
def protected_div(x1, x2):
    if abs(x2) < 1e-10:
        return 1
    return x1 / x2


def add(a,b):
    return a + b

def sub(a,b):
    return a - b

def mul(a,b):
    return a * b

symbolic_function_map = {
        'tensor_add': operator.add,
        'tensor_sub': operator.sub,
        'tensor_inner_product': operator.mul,
        'p_': p_symbol,
        operator.add.__name__: operator.add,
        operator.sub.__name__: operator.sub,
        operator.mul.__name__: operator.mul,
        'protected_div': operator.truediv,
    }



In [3]:
# %% generate data

# target equation: tau_ij = 2*miu*S_ij - (2/3)*miu*D_ii*delta_ij
#                         = miu*(u_i__j+u_j__i) - (2/3)*miu*D_ii*delta_ij
# 即静压为0时的牛顿流体本构方程
data = scio.loadmat('data/Taylor_vortex_flow.mat')

u_x = data['u_x'] # u_x__x, s-1
u_y = data['u_y'] # u_x__y, s-1
v_x = data['v_x'] # u_y__x, s-1
v_y = data['v_y'] # u_y__y, s-1

# subsample
indices = np.random.choice(u_x.shape[0], 100, replace=False)
u_x = u_x[indices]
u_y = u_y[indices]
v_x = v_x[indices]
v_y = v_y[indices]

u_i__j = np.stack((np.stack((u_x, u_y), axis=1), np.stack((v_x, v_y), axis=1)), axis=2)
u_j__i = np.stack((np.stack((u_x, v_x), axis=1), np.stack((u_y, v_y), axis=1)), axis=2)
u_i__j = u_i__j.reshape((len(u_x), 2, 2))
u_j__i = u_j__i.reshape((len(u_x), 2, 2))

S_ij = 0.5 * (u_i__j + u_j__i)
Omega_ij = 0.5 * (u_i__j - u_j__i)

df_c = 1.399e-05 # Diffusion coefficient
miu = 2.079e-5 # Viscosity coefficient

#u_ij的两个主不变量
D_ii = u_x + v_y
det___D_ij = u_x * v_y - u_y * v_x

shape = (len(u_x),1)
ones = np.ones(shape, dtype = np.float64)
zeros = np.zeros(shape, dtype = np.float64)
delta_ij = np.stack((np.stack((ones, zeros), axis=1), np.stack((zeros, ones), axis=1)), axis=2)
delta_ij = delta_ij.reshape((len(u_x), 2, 2))
                            
tau_ij = - (2/3)*miu*D_ii[:, None]*delta_ij

def add_noise(array, snr):
    mean_power = np.mean(array**2)
    noise_power = mean_power / (10**(snr / 10))
    noise = np.random.normal(0, np.sqrt(noise_power), array.shape)
    noisy_array = array + noise
    return noisy_array

def add_gaussian_noise(array, mean=0, std=1):
    noise = np.random.normal(mean, std, array.shape)
    return array + noise

def add_uniform_noise(array, low=-1, high=1):
    noise = np.random.uniform(low, high, array.shape)
    return array + noise

Y = tau_ij

In [4]:
# %% Assign number tags

# Assign prime number tags to base dimensions
L,M,T,I,Theta,N,J = 2,3,5,7,11,13,17

# Derive the tags for dirived physical quantities according to their dimensions
# Note that the tags are always in the form of fractions, instead of floats, which avoids introducing any truncation error. 
# Therefore, we use 'Fraction' function here.
dict_of_dimension = {'S_ij':Fraction(1,T),
                     'Omega_ij':Fraction(1,T),
                     # 'u_i,j':Fraction(1,T),
                     # 'u_j,i':Fraction(1,T),
                     'delta_ij':Fraction(1),
                     'D_ii':Fraction(1,T),
                     'det___D_ij':Fraction(1,(T**2)),
                     'miu':Fraction(M,L*T),
                     'df_c':Fraction((L**2),T)} 

# Assign number tags to taget variable
target_dimension = Fraction(M,L*((T)**(2)))

In [5]:
# %% Creating the primitives set
# Define the operators
host_pset = gep.PrimitiveSet('Host', input_names=['S_ij','Omega_ij','delta_ij'])
host_pset.add_function(tensor_add, 2)
host_pset.add_function(tensor_sub, 2)
host_pset.add_function(tensor_inner_product, 2)
host_pset.add_function(p_, 1)


plasmid_pset = gep.PrimitiveSet('Plasmid', input_names=['D_ii','det___D_ij'])
plasmid_pset.add_symbol_terminal('df_c', df_c)
plasmid_pset.add_symbol_terminal('miu', miu)
plasmid_pset.add_function(operator.add, 2)
plasmid_pset.add_function(operator.sub, 2)
plasmid_pset.add_function(operator.mul, 2)
plasmid_pset.add_function(protected_div, 2)
plasmid_pset.add_rnc_terminal() # Add random numerical constants (RNC).
# %% Create the individual and population

# Define the indiviudal class, a subclass of gep.Chromosome
creator.create("FitnessMax", base.Fitness, weights=(1,))  # weights=(-1,)/weights=(1,) means to minimize/maximize the objective (fitness).
creator.create("Host_Individual", gep.Chromosome, fitness=creator.FitnessMax, plasmid=[])
creator.create("Plasmid_Individual", gep.Chromosome) 

# Register the individual and population creation operations
h = 15           # head length
n_genes = 2      # number of genes in a chromosome
r = 15          # length of the RNC array
# enable_ls = True # whether to apply the linear scaling technique

toolbox = gep.Toolbox()

toolbox.register('host_gene_gen', gep.Gene, pset=host_pset, head_length=h)
toolbox.register('host_individual', creator.Host_Individual, gene_gen=toolbox.host_gene_gen, n_genes=n_genes, linker=tensor_add)
toolbox.register("host_population", tools.initRepeat, list, toolbox.host_individual)

def random_float(a, b):
    # Generate floating-point numbers that retain five decimal places
    num = random.uniform(a, b)
    num = round(num, 5)
    return num
    
toolbox.register('rnc_gen', random_float, a=-5, b=5)   # each RNC is random float within [-0.2, 0.2]
toolbox.register('plasmid_gene_gen', gep.GeneDc, pset=plasmid_pset, head_length=h, rnc_gen=toolbox.rnc_gen, rnc_array_length=r)
toolbox.register('plasmid_individual', creator.Plasmid_Individual, gene_gen=toolbox.plasmid_gene_gen, n_genes=n_genes, linker=operator.add)
toolbox.register("plasmid_population", dm.plasmid_generate, toolbox.plasmid_individual)


In [6]:
def my_compile(Individual, usage):
    global plasmid_library
    if usage == 'show':
        plasmid_library = []
        for plasmid in Individual.plasmid:
            scalar_expression = gep.simplify(plasmid, symbolic_function_map)
            plasmid_library.append(scalar_expression)
        tensor_expression = gep.simplify(Individual, symbolic_function_map)
        return str(tensor_expression)
    else:
        raise ValueError("usage should be 'dimensional_verification','fitness_evaluation', or 'show'. ")

def Y_pretend(individual):
    global plasmid_library
    plasmid_library = [str(plasmid) for plasmid in individual.plasmid]
    host_str = str(individual)
    return eval(host_str)

def dimensional_verification(individual, dict_of_dimension, target_dimension):
    plasmid_library = [str(plasmid).replace('\t','').replace('\n','')
                       .replace('add','my_add').replace('sub','my_sub')
                       .replace('mul','my_mul').replace('truediv','my_truediv')
                       .replace('protected_div','my_protected_div')
                       for plasmid in individual.plasmid]
    
    def my_add(a,b):

        if isinstance(a,bool) or isinstance(b,bool):
            return False
        if isinstance(a,int) or isinstance(a,float):
            a = Fraction(1)
        if isinstance(b,int) or isinstance(b,float):
            b = Fraction(1)

        if a == b:
            return a
        else:
            return False

    def my_sub(a,b):
        
        if isinstance(a,bool) or isinstance(b,bool):
            return False
        if isinstance(a,int) or isinstance(a,float):
            a = Fraction(1)
        if isinstance(b,int) or isinstance(b,float):
            b = Fraction(1)

        if a == b:
            return a
        else:
            return False

    def my_mul(a,b):
        
        if isinstance(a,bool) or isinstance(b,bool):
            return False
        if isinstance(a,int) or isinstance(a,float):
            a = Fraction(1)
        if isinstance(b,int) or isinstance(b,float):
            b = Fraction(1)
            
        return a * b

    def my_truediv(a,b):
        
        if isinstance(a,bool) or isinstance(b,bool) or b == 0:
            return False
        if isinstance(a,int) or isinstance(a,float):
            a = Fraction(1)
        if isinstance(b,int) or isinstance(b,float):
            b = Fraction(1)

        return a / b

    def my_protected_div(a,b):
        
        if isinstance(a,bool) or isinstance(b,bool) or b == 0:
            return False
        if isinstance(a,int) or isinstance(a,float):
            a = Fraction(1)
        if isinstance(b,int) or isinstance(b,float):
            b = Fraction(1)

        return a / b
    
    def my_p_(tensor):
        if isinstance(tensor, bool):
            return False
        scalor = plasmid_library.pop(0)
        scalor = eval(scalor, create_var)
        return scalor * tensor

        
    create_var = locals()
    create_var.update(dict_of_dimension)
    namespace = {'my_add': my_add, 'my_sub': my_sub,'my_mul': my_mul,'my_truediv': my_truediv, 'my_protected_div': my_protected_div}
    create_var.update(namespace)
    
    individual_expr = str(individual).replace('\t','').replace('\n','').replace('tensor_add','my_add').replace('tensor_sub','my_sub').replace('tensor_inner_prduct','my_mul').replace('p_','my_p_')
    dimension_of_DDEq = eval(individual_expr)
    if dimension_of_DDEq == target_dimension:
        return True
    else:
        return False
        
toolbox.register('compile',my_compile)
toolbox.register('dimensional_verification',dimensional_verification)

In [7]:
# %% Define the loss function

# Register the dimensional verification operation
# Define the fitness for individuals that apply the linear scaling technique
def evaluate(individual):
    """
    First verify whether the individuals satisfy dimensional homogeneity.
    If it is not dimensional homogeneous, we would identify it as an invalid individual and directly assign a significant error to it.
    Otherwise, we would apply linear scaling (ls) to the individual, 
    and then evaluate its loss: MRE (mean relative error)
    """
    validity = toolbox.dimensional_verification(individual, dict_of_dimension, target_dimension)
    if not validity:
        return -1.,
    else:
        Yp = Y_pretend(individual)
        print(individual)
        print(individual.plasmid[0])
        print(individual.plasmid[1])
        print(Yp)
        if isinstance(Yp, np.ndarray):
            sum = 0
            for k in range(Y.shape[0]):
                y = Y[k]
                yp = Yp[k]
                sum += (
                    (y[0][0] * yp[0][0] + y[0][1] * yp[0][1] + y[1][0] * yp[1][0] + y[1][1] * yp[1][1]) / 
                    (      (abs((y[0][0])**2 + 2 * y[0][1] * y[1][0] + (y[1][1])**2)**0.5) *
                    (abs((yp[0][0])**2 + 2 * yp[0][1] * yp[1][0] + (yp[1][1])**2)**0.5)     )
                )
            fitness = sum / Y.shape[0]
            print(fitness)
            return fitness,
            
        else:
            raise ValueError("type of Yp must be np.ndarray.")

# todo: ls-evaluate
toolbox.register('evaluate', evaluate)

In [8]:
# %% Register genetic operators
toolbox.register('select', tools.selTournament, tournsize=2) # Selection operator
# 1. general operators
toolbox.register('mut_uniform', gep.mutate_uniform, pset=host_pset, ind_pb=0.05, pb=1)
toolbox.register('mut_invert', gep.invert, pb=0.1)
toolbox.register('mut_is_transpose', gep.is_transpose, pb=0.1)
toolbox.register('mut_ris_transpose', gep.ris_transpose, pb=0.1)
# toolbox.register('mut_gene_transpose', gep.gene_transpose, pb=0.1)
toolbox.register('cx_1p', gep.crossover_one_point, pb=0.3)
toolbox.register('cx_2p', gep.crossover_two_point, pb=0.2)
toolbox.register('cx_gene', gep.crossover_gene, pb=0.1)
# 2. Dc-specific operators
toolbox.register('mutdc_dc', gep.mutate_uniform_dc, ind_pb=0.05, pb=1)
toolbox.register('mutdc_invert_dc', gep.invert_dc, pb=0.1)
toolbox.register('mutdc_transpose_dc', gep.transpose_dc, pb=0.1)
toolbox.register('mutdc_rnc_array_dc', gep.mutate_rnc_array_dc, rnc_gen=toolbox.rnc_gen, ind_pb='0.5p')
toolbox.pbs['mutdc_rnc_array_dc'] = 1 

# %% Statistics to be inspected
stats = tools.Statistics(key=lambda ind: ind.fitness.values[0])
stats.register("avg", np.mean)
# stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)

# %% Launch evolution

# Define size of population and number of generations
n_pop = 5000             # Number of individuals in a host_population
n_gen = 1000000             # Maximum Generation
tol = 1e-4               # Threshold to terminate the evolution
output_type = 'Constitutive_equation_of_Newtonian_fluid'     # Name of the problem
isRestart = False        # 'True'/'False' for initializing the population with given/random individuals.

# If isRestart is 'True', read the given .pkl file to load the individuals as the first generation population.
# If isRestart is 'False', initialize the first generation population with random individuals.
if isRestart:
    with open("pkl/Constitutive_equation_of_Newtonian_fluid.pkl",'rb') as file:
        host_pop  = pickle.loads(file.read())
        for ind in host_pop:
            print(ind)
else:
    host_pop = toolbox.host_population(n=n_pop) 
    plasmid_pop = toolbox.plasmid_population(host_pop)
    for ind_host, ind_plasmid in zip(host_pop, plasmid_pop):
        ind_host.plasmid = ind_plasmid 

# Only record the best three individuals ever found in all generations
champs = 3 
hof = tools.HallOfFame(champs)   



In [9]:
# Evolve
start_time = time.time()
pop, log = dm.gep_simple(host_pop, plasmid_pop, toolbox, n_generations=n_gen, n_elites=1,
                          stats=stats, hall_of_fame=hof, verbose=True,tolerance = tol,GEP_type = output_type)

gen	nevals	avg	min	max
0  	5000  	-1 	-1 	-1 
1  	4999  	-1 	-1 	-1 
2  	4999  	-1 	-1 	-1 


KeyboardInterrupt: 