Importing cellular automata & optimization classes, and other stuff

In [1]:
import os
import sys
import shutil

from typing import List, Type, Callable, Dict
from numpy import int32
from numpy._typing import NDArray
import importlib

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(''))))

#from algorithm.blender import Lattice, clear_initial
from algorithm.genetic import Optimizer, Mutator, ArbitraryRulesetMutator, MutationSet
from algorithm.objectives import sampleobj

import numpy as np
import pandas as pd

import time

Setting up optimizer and data logging code

In [2]:
def log_mutation(data_list: List[Dict], mutations: List[MutationSet], objective_val: float):
    """
    Given the data list reference, the mutation set, and the objective value after applying it, add it to the data logging list
    """
    ic_cell_pos = []
    ic_state_old = []
    ic_state_new = []
    srt_cell_pos = []
    srt_state_old = []
    srt_state_new = []

    """
    Important difference from original genetic algorithm: 
    ic_mut is a 4 membered list showing the mutated position in the IC.
    Example: [0,0,0,1], meaning that at IC pos (0,0) the state 0 is modified to be 1.
    srt_mut is a 6 membered list showing the mutated position in the ruleset.
    Example: [0,0,0,0,0,1], meaning that at SRT rule pos (0,0,0) index 0 the rule 0 is modified to be 1.
    For a 2 state SRT there are 18 different cells for mutation: 0 - 17.
    """
    for ic_mut in mutations.ic_mutations:
        ic_cell_pos.append(tuple(ic_mut[0:2]))
        ic_state_old.append(ic_mut[2])
        ic_state_new.append(ic_mut[3])
    for srt_mut in mutations.srt_mutations:
        srt_cell_pos.append(tuple(srt_mut[0:4]))
        srt_state_old.append(srt_mut[4])
        srt_state_new.append(srt_mut[5])
    
    data_list.append({
        "ic_cell_pos": np.array(ic_cell_pos), 
        "ic_state_old": np.array(ic_state_old), 
        "ic_state_new": np.array(ic_state_new), 
        "srt_cell_pos": np.array(srt_cell_pos), 
        "srt_state_old": np.array(srt_state_old), 
        "srt_state_new": np.array(srt_state_new), 
        "objective": objective_val,
    })

def run_experiment(iters: int, grid_sz: int, opt_func: Callable[[NDArray[int32]], int],
                   srt_num_mutate: int, ic_num_mutate: int, rule_mutate_prob: float, strict: bool = False, states: int = 2, ic_enable: bool = True, srt_enable: bool = True):
    """
    Runs an experiment with the below hyperparameters:

    :param iters: The number of iterations the mutation algorithm (updating both IC and SRT) is going to run for
    :param grid_sz: The size of the square grid that we're going to update each iteration
    :param opt_func: The functions that gives the performance metric we're going to optimize
    :param srt_num_mutate: The number of SRT cells for which we're going to mutate the rule applied, each iteration
    :param ic_num_mutate: The number of IC cells for which we're going to mutate the rule applied, each iteration
    :param rule_mutate_prob: The probability, for each neighbor state tensor of the rule of a cell that's selected to be mutated, the final state is mutated
    
    Params ruleset_mutator_class and rule_set have been deleted due to previous deletion of the RulesetMutator class.
    """
    # TODO: separate SRT and IC mutations to have a certain number of each
    # RESOLVED: add a flag to enable doing only SRT or only IC mutations in an iteration (in optimizer step, and then propagate into mutator)
    ruleset_mutator = ArbitraryRulesetMutator(grid_size=grid_sz, mutate_p=1/(grid_sz**2) * (srt_num_mutate+ic_num_mutate), rule_mutate_p=rule_mutate_prob, strict=strict, states=states, ic_enable=ic_enable, srt_enable = srt_enable)

    optim = Optimizer(mutator=ruleset_mutator, objective=lambda grid: opt_func(grid))
    """
    Pandas Dataframe used to log experiment data is:

    ic_cell_pos (np.array) | ic_state_old (np.array) | ic_state_new (np.array) | srt_cell_pos (np.array) | srt_state_old (np.array) | srt_state_new (np.array) | objective (float)
    
    etc.

    initial state for IC is in entry 0 in ic_state_old, and SRT is in entry 0 in srt_state_old

    ic and srt mutation cell positions and states can have an extra dimension in the beginning to indicate they are batch updates
    """

    init_state = optim.state
    
    data_list = [{"ic_cell_pos": grid_sz, 
                  "ic_state_old": init_state.initial, 
                  "ic_state_new": None, 
                  "srt_cell_pos": -1, 
                  "srt_state_old": init_state.rules, 
                  "srt_state_new": None, 
                  "objective": 0}]

    for it in range(iters):
        #print(f"Iteration {it}")
        # print(f"On iteration {it+1}...")
        accepted, new, old, mutations = optim.step()
        # data logging
        log_mutation(data_list, mutations, optim.objvalue)
        #if accepted:
        #    print("Got a better state!", optim.objvalue)

    # print(data_list)
    df = pd.DataFrame(data_list)
    # print(df)
    return df

timelogs = []

def repeat_experiment(experiment_name: str, num_expers: int, *args):
    """
    Perform (sequentially) multiple experiments that return a Pandas DataFrame and save all the data

    :param experiment_name: The name of the experiment to save the file
    :param num_expers: Number of times to run the experiment (and save all the data in one file)
    :param *args: The arguments to be passed to the experiment function
    """
    for i in range(num_expers):
        init = time.time()
        print(f'REPETITION {i}')
        ret_data = run_experiment(*args)
        timelogs.append(time.time() - init)
        print(f"Finished rep {i} in {time.time() - init}s")
        ret_data.to_hdf(f'test/{states}/{experiment_name}_{i}_{states}states.h5', key='data', mode='a')


Setting up experiments and gathering data

In [None]:
ITERATIONS_SET = [100]
GRID_SIZE_SET = [32]
NUM_REPEAT = 11
EXPERIMENT_NAME = "statetimetest"
state_set = [2,3,4,5,6,7,8,9,10,11,12]
for iters in ITERATIONS_SET:
    for grid_sz in GRID_SIZE_SET:
        for states in state_set:
            print(f"RUNNING EXPERIMENT {EXPERIMENT_NAME} WITH {iters} ITERATIONS AND {grid_sz} SIZE GRID AND {states} STATES")
            #Total param is set default to false meaning that the probability is not the total probability; 
            #so that the program aligns closer to that of the original genetic algorithm
            
            #def probability for Strict Mode = 2/3; def probability for Non-Strict Mode = 5/384
            repeat_experiment(f"{EXPERIMENT_NAME}_{iters}ITERS_{grid_sz}GRID", NUM_REPEAT, iters, grid_sz, sampleobj, 10, 10, 2/3, True, states, True, True)

RUNNING EXPERIMENT statetimetest WITH 100 ITERATIONS AND 32 SIZE GRID AND 5 STATES
REPETITION 0


KeyboardInterrupt: 

In [None]:
print(timelogs)

[5.291915655136108, 5.077759504318237, 5.131318092346191, 5.062913656234741, 5.0701470375061035, 5.078394412994385, 5.144408226013184, 5.069453477859497, 5.09482479095459, 5.157494783401489, 5.0829973220825195, 29.191088438034058, 28.601072311401367, 28.45984387397766, 28.516353845596313, 28.60829997062683, 28.611929655075073, 28.624119997024536, 28.67142963409424, 28.38516592979431, 28.427666664123535, 28.40297842025757, 138.85115361213684, 137.45857954025269, 137.62925052642822, 137.99586701393127, 136.76956725120544, 137.24541521072388, 136.7518277168274, 137.19946575164795, 137.24821710586548, 137.55703115463257, 137.07272791862488, 470.3409218788147]


In [None]:
np.set_printoptions(legacy='1.25')

print(stat(timelogs[0:11]))
print(stat(timelogs[11:22]))
print(stat(timelogs[22:33]))
print(stat(timelogs[33:]))

[5.114693359895186, 0.06409675948192313]
[28.59090443090959, 0.21227816643929287]
[137.43446389111605, 0.567315317165921]
[470.3409218788147, 0.0]
