In [1]:
# This is to set the current working directory (CWD) to the root of the project
# Without, we cannot access DGA module
from pathlib import Path
import os
import sys
old_cwd = os.getcwd()
new_cwd = Path(os.getcwd()).parent.parent
os.chdir(new_cwd)
sys.path.append(str(old_cwd))

In [2]:
import numpy as np
import pickle as pkl
from os.path import join as file_path
from DGA.Algorithm import Plateau_Genetic_Algorithm
from DGA.Model import Model
from DGA.Local import Synchronized
from DGA.Plotting import plot_model_logs

In [3]:
class Custom_Plateau_Algorithm(Plateau_Genetic_Algorithm):

  def __init__(self, penalty_modulator, max_penalty, **kwargs):
    super().__init__(**kwargs)
    self.penalty_modulator = penalty_modulator
    self.max_penalty = max_penalty

  def initial_gene(self, **kwargs):
    return np.random.normal(loc=0, scale=10, size=self.gene_shape)

  # Return mutated gene
  def mutate(self, gene, **kwargs):
    if np.random.rand() < self.mutation_rate:
      mutation_start = np.random.randint(0, np.prod(self.gene_shape))
      mutation_end = np.random.randint(mutation_start+1, mutation_start+2+(self.mutation_rate*5))
      if mutation_end > np.prod(self.gene_shape):
        mutation_end = np.prod(self.gene_shape)
      gene = gene.flatten()
      gene[mutation_start:mutation_end] += np.random.normal(loc=0, scale=self.mutation_rate*5, size=mutation_end-mutation_start)
    return gene.reshape(self.gene_shape)

  def founder_proximity_penalty(self, gene):
    penalty = min(super().founder_proximity_penalty(gene), self.max_penalty) # Cap penalty to radius
    norm_penalty = penalty / self.max_penalty    # Normalize penalty to 0-1
    return norm_penalty * self.penalty_modulator    # Apply modulator penalty

In [4]:
class Vector_Estimation_Model(Model):  # <--- Remember to inherit Model class!
  def __init__(self, num_vectors: int, vector_size: tuple, vector_distribution: float, target_vectors: list = None, **kwargs):
    super().__init__(**kwargs)

    # Initialize target vectors
    self.target_vectors = []

    # Use previously initialized target vectors
    if 'target_vectors' in kwargs:
      self.target_vectors = kwargs['target_vectors']
    else:                             # Initialize targets when starting run

      # Initialize target vectors when passed as args
      if target_vectors is not None:
        self.target_vectors = target_vectors

      # Initialize target vectors randomly
      else:
        for i in range(num_vectors):
          location = np.random.uniform(low=-vector_distribution, high=+vector_distribution)
          self.target_vectors.append(np.random.normal(loc=location, scale=3, size=vector_size))

  def run(self, gene, **kwargs) -> float:
    smallest_diff = np.inf
    for i, targ in enumerate(self.target_vectors):
      diff = np.linalg.norm(gene.flatten() - targ.flatten())
      if diff < smallest_diff:
        smallest_diff = diff
    return -smallest_diff

  def logger(self, fitness, iteration, **kwargs):
    log = super().logger(fitness, iteration, **kwargs)  # Get default log
    return log

In [5]:
def algorithm_tester(gene_size, algorithm_args, num_vectors, run_name, run_dir=None):
	if run_dir is not None:		# Option to save run data to a specific directory
		run_name = file_path(run_dir, run_name)

	alg = Custom_Plateau_Algorithm(**algorithm_args)
	mod = Vector_Estimation_Model(num_vectors=num_vectors, vector_size=(gene_size, gene_size), vector_distribution=3)
	local_runner = Synchronized(run_name=run_name, model=mod, algorithm=alg,)

	# Run model & plot results
	local_runner.run()

	# Find closest target vector for each foundation gene & count how many targets were found
	found_targets = [False, False, False]
	for _, gene_data in alg.founders_pool.items():
		smallest_diff = np.inf
		for i, targ in enumerate(mod.target_vectors):
			diff = np.linalg.norm(gene_data['gene'].flatten() - targ.flatten())
			if diff < smallest_diff:
				smallest_diff = diff
				found_targets[i] = True
	num_targs_found = sum(found_targets)
	target_vectors = [targ for targ in mod.target_vectors]
	founders_pool = [gene_data['gene'] for _, gene_data in alg.founders_pool.items()]
	return num_targs_found, len(local_runner.log), target_vectors, founders_pool


In [6]:
gene_sizes = [5, 10, 15, 20]		# Gene sizes to test (square matrix)
max_penaltys = [25, 75, 200, 400, 800, 1500] 	# Max distance for penalty
penalty_modulators = [10, 20, 40, 80, 120, 180, 250, 350]		# Penalty modulator
tests_per_param = 8		# Number of tests to run per parameter combination
num_vectors = 3		# Number of target vectors to find

# Loop for Grid Search
GS_out_dir = 'scripts/GS_outputs_V3'
for g_size in gene_sizes:
	for max_p in max_penaltys:
		for p_mod in penalty_modulators:
			search_results = []
			test_name = f"GS_size{g_size}_pmod{p_mod}_max_p{max_p}"
			test_out_dir = file_path(GS_out_dir, test_name)

			algorithm_args = {'gene_shape':(g_size, g_size),
											'num_genes':10,
											'mutation_rate':1,
											'mutation_decay':.99995,
											'iterations_per_epoch':70_000,
											'epochs':num_vectors,		# 1 epoch per vector
											'plateau_sensitivity':1e-4,
											'plateau_sample_size':2000,
											'warmup':2000,
											'penalty_modulator':p_mod,
											'max_penalty':max_p,}
			
			for i in range(tests_per_param):
				# Test algorithm
				num_targs_found, num_iters, target_vectors, founders_pool = algorithm_tester(g_size, 
																																										algorithm_args=algorithm_args,
																																										num_vectors=num_vectors, 
																																										run_name=f"RUN_{i}", 
																																										run_dir=test_out_dir)
				# Plot learning curve
				plot_model_logs(run_dir=file_path(test_out_dir, f"RUN_{i}"), num_models=1, show_plot=False, save_plot=True)
				# Append results
				search_results.append({
					'num_targs_found': num_targs_found,
					'num_iters': num_iters,
					'target_vectors': target_vectors,
					'founders_pool': founders_pool,
				})

			# Print
			avg_num_targs_found = sum([res['num_targs_found'] for res in search_results]) / len(search_results)
			avg_num_iters = sum([res['num_iters'] for res in search_results]) / len(search_results)
			print(f"size: {g_size}, pmod: {p_mod}, max_penalty: {max_p}, avg_targs_found: {avg_num_targs_found}, avg_iters: {avg_num_iters}")

			# Save search results to disk
			# Note: Search results is a list of dicts, each dict contains the targets, the targets found, number of iterations, and the founders pool
			file_data = []
			for i in range(len(search_results)):
				file_data.append(search_results[i])
			with open(file_path(test_out_dir, 'search_results.pkl'), 'wb') as f:
				pkl.dump(file_data, f)

size: 5, pmod: 10, max_penalty: 25, avg_targs_found: 2.375, avg_iters: 46431.25
size: 5, pmod: 20, max_penalty: 25, avg_targs_found: 2.375, avg_iters: 43920.5
size: 5, pmod: 40, max_penalty: 25, avg_targs_found: 2.5, avg_iters: 46328.75
size: 5, pmod: 80, max_penalty: 25, avg_targs_found: 2.375, avg_iters: 44009.75
size: 5, pmod: 120, max_penalty: 25, avg_targs_found: 2.625, avg_iters: 41867.375
size: 5, pmod: 180, max_penalty: 25, avg_targs_found: 2.75, avg_iters: 42134.125
size: 5, pmod: 250, max_penalty: 25, avg_targs_found: 2.5, avg_iters: 42428.5
size: 5, pmod: 350, max_penalty: 25, avg_targs_found: 2.75, avg_iters: 40991.875
size: 5, pmod: 10, max_penalty: 50, avg_targs_found: 2.75, avg_iters: 50908.125
size: 5, pmod: 20, max_penalty: 50, avg_targs_found: 2.375, avg_iters: 50588.125
size: 5, pmod: 40, max_penalty: 50, avg_targs_found: 2.625, avg_iters: 48216.0
size: 5, pmod: 80, max_penalty: 50, avg_targs_found: 2.75, avg_iters: 40777.625
size: 5, pmod: 120, max_penalty: 50, avg_