In [1]:
import os
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = ".20"

In [2]:
import pandas as pd
from model import Brain
from submodels import factories
import matplotlib.pyplot as plt
import pandas as pd
from jf.db import DB
from lib.plot import plot_2d_positions_colors
from lib.sde.grn2 import GRNMain2
from lib.sde.mutate import mutate_grn2
from jf.profiler import Profiler
from tqdm import tqdm

In [3]:
from itertools import accumulate
import numpy as np
from collections import defaultdict

In [4]:
HISTORY = defaultdict(dict)

In [5]:
END_TIME = 53

In [6]:
HALL_OF_FAME = []

In [7]:
REF = pd.read_csv("output/results/setup_basic/export/ref_basic.csv")  # ref is a mean

In [8]:
def individual_generator():
    return Solution(GRNMain2(5, 0, 5))

In [9]:
if False:
    grn = GRNMain2(5, 0, 5)
    ccls = factories["grn2"](grn=grn)

    bb = Brain(time_step=0.5, verbose=False, start_population=4, max_pop_size=1e3,
            cell_cls=ccls.generate, end_time=60, start_time=50, silent=False)
    bb.run()

# cell = bb.population[0]
# cell.cell_program.print_gene_info(2)

# cell.cell_program.grn.print_trees()

In [10]:
def r():
    return np.random.uniform(0, 1)

class Solution:
    def __init__(self, grn):
        self.grn = grn
        
    def copy(self):
        return Solution(self.grn.copy())
        
    def mutate(self):
        mutate_grn2(self.grn)

In [11]:
individual_generator().grn

>> G_0: init: 0.50; noise: 0.70; b: 2.07; m: 9.56; expr: 1.62; deg: 7.15; thr: 4.19; theta: 3.11; tree : (NOT NOT 2 AND 1)
>> G_1: init: 1.59; noise: 3.89; b: 7.35; m: 1.60; expr: 5.29; deg: 0.85; thr: 4.88; theta: 3.10; tree : NOT ((2 AND 3) OR 1)
>> G_2: init: 1.05; noise: 5.51; b: 7.72; m: 5.24; expr: 3.78; deg: 7.10; thr: 2.95; theta: 3.76; tree : (2 AND 4)
>> G_3: init: 1.16; noise: 1.79; b: 6.12; m: 1.36; expr: 0.92; deg: 9.04; thr: 8.00; theta: 8.93; tree : 3
>> G_4: init: 0.99; noise: 8.65; b: 5.82; m: 7.69; expr: 0.36; deg: 0.20; thr: 5.48; theta: 5.35; tree : 0

In [12]:
def shrink_and_align_stats(stats, ref, max_step=None):
    min_step = max(min(stats.time), min(ref.time))

    if max_step is None:
        max_step = max(ref.time)
    
    new_stats = stats[(stats.time >= min_step) & (stats.time <= max_step)].set_index("time")
    new_ref = ref[(ref.time >= min_step) & (ref.time <= max_step)].set_index("time")
    
    return new_stats, new_ref

In [13]:
# compare only the progenitor_pop_size
def evaluating_function(stats, ref, max_step=None):
    """
    The evaluation function for progenitor population size.
    The lower is the better
    :param stats: the stats of the bb after running
    """
    colname_ref = "size_type_Cycling"
    colname_stats = "progenitor_pop_size"
    # scale
    stats, ref = shrink_and_align_stats(stats, ref, max_step=max_step)
    
    min_step = min(ref.index)  # ref.index is the time
    stats[colname_stats] = stats[colname_stats] / stats[colname_stats].get(min_step)
    ref[colname_ref] = ref[colname_ref] / ref[colname_ref].get(min_step)
    
    err = 0
    for t in ref.index:
        val_ref = ref[colname_ref].get(t, 0)
        if val_ref == 0:
            raise ValueError("Found val_ref to 0, this should not be possible")
        val_stats = stats[colname_stats].get(t, 0)
        err += (val_ref - val_stats)**2 / val_ref
        
    return err

In [14]:
def run_grn(grn):
    ccls = factories["grn2"](grn=grn)
    bb = Brain(time_step=0.5, verbose=False, start_population=4, max_pop_size=1e3,
            cell_cls=ccls, end_time=END_TIME, start_time=50, silent=True)
    bb.run()
    return bb

In [15]:
def fitness_func(grn):
    bb = run_grn(grn)
    output = evaluating_function(bb.stats, REF, max_step=END_TIME)
    fitness = 1.0 / output
    return fitness

In [16]:
def get_sample_index(sample, acc):
    val = min(filter(lambda x: x > sample, acc))
    return acc.index(val)

def weighted_selection(population, fitness_values, create_func, new_fitness=1.):
    """
    :param population: list of individuals
    :param fit_values: fitness values for each individual (same length with population)
    :param create_func: function to create new individual
    """
    pop_sel = []
    history_sel = []
    acc = list(accumulate(fitness_values))
    total_population_fitness = acc[-1]
    total_fitness = total_population_fitness + new_fitness
    for i in range(POP_SIZE):
        sample = np.random.uniform(0, total_fitness)
        if sample >= total_population_fitness:
            pop_sel.append(create_func())
            history_sel.append(-1)
        else:
            chosen_id = get_sample_index(sample, acc)
            pop_sel.append(population[chosen_id].copy())
            history_sel.append(chosen_id)
            
    HISTORY[args.generation] = history_sel
            
    return pop_sel

In [17]:
get_sample_index(0.5, [0.1, 0.4, 0.6, 1.2])  # to the lib !

2

In [18]:
# generation
POP_SIZE = 50
N_GEN = 10

def do_init_pop():
    return [individual_generator() for i in range(POP_SIZE)]

def do_fitness(pop):
    return [fitness_func(sol.grn) for sol in tqdm(pop)]

def do_selection(pop_fit, pop):
    # print("Fit score : ", pop_fit)
    pop_sel = []
    acc = list(accumulate(pop_fit))
    best = max(pop_fit)
    best_sol = pop[pop_fit.index(best)]
    
    print("Total fitness :", acc[-1])

    pop_sel = weighted_selection(pop, pop_fit, individual_generator, new_fitness=1.)
        
    return pop_sel, best, best_sol

def do_mutation(pop_sel):
    [p.mutate() for p in pop_sel]
    return pop_sel

In [19]:
class O:
    pass

args = O()

def main():
    print("Setup GA")
    bbest = 0
    ttbest = None
    pop = do_init_pop()
    for generation in range(N_GEN):
        args.generation = generation
        fit = do_fitness(pop)
        sel, best, best_sol = do_selection(fit, pop)
        if best > bbest:
            print(f"Best {best}")
            if best > 10:
                return ttbest
            bbest = best
            ttbest = best_sol
            HALL_OF_FAME.append(best_sol.copy())
        else:
            print("--")
        pop = do_mutation(sel)
        
    return ttbest

In [20]:
sols = main()

  0%|          | 0/50 [00:00<?, ?it/s]

Setup GA


100%|██████████| 50/50 [00:53<00:00,  1.06s/it]
  0%|          | 0/50 [00:00<?, ?it/s]

Total fitness : 26.297208240303924
Best 5.731468013891973


100%|██████████| 50/50 [01:07<00:00,  1.35s/it]

Total fitness : 74.10483463054896
Best 10.219603728890295





In [21]:
for sol in sols:
    print("=============")
    print(sol.grn)

TypeError: 'Solution' object is not iterable

In [None]:
for one in HALL_OF_FAME:
    print(fitness_func(one.grn))

In [None]:
HALL_OF_FAME[0].grn

In [None]:
HALL_OF_FAME[-1].grn

In [None]:
bb = run_grn(sol.grn)

In [None]:
bb.stats

In [None]:
def show_population_curve(stats, ref, max_step=None, show=True):
    """
    The evaluation function for progenitor population size.
    The lower is the better
    :param stats: the stats of the bb after running
    """
    colname_ref = "size_type_Cycling"
    colname_stats = "progenitor_pop_size"
    # scale
    stats, ref = shrink_and_align_stats(stats, ref, max_step=max_step)
    
    min_step = min(ref.index)  # ref.index is the time
    stats[colname_stats] = stats[colname_stats] / stats[colname_stats].get(min_step)
    ref[colname_ref] = ref[colname_ref] / ref[colname_ref].get(min_step)
    
    plt.plot(ref.index, [ref[colname_ref].get(t, 0) for t in ref.index], label="Reference")
    plt.plot(ref.index, [stats[colname_stats].get(t, 0) for t in ref.index], label="Simulation")
    if show:
        plt.show()

In [None]:
show_population_curve(bb.stats, REF, max_step=END_TIME)