Skip to content

Commit

Permalink
Change the logic in genome.mutate: loop over genes for which a …
Browse files Browse the repository at this point in the history
…random prob is below the mutation rate, evaluate permissible values for every gene,

 make a mutation always change the gene value, if more than one value is possible for this gene.
Add tests: the number of mutations in a large population (must be close to the expected value), silent-only mutations (by monkey-patching the gene index selection function), and the permissible values for hidden and output regions.
Allow mutation_rate = 1.0.
  • Loading branch information
Henrik Mettler committed Jul 20, 2020
1 parent 66a8e5b commit 742cc9f
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 109 deletions.
120 changes: 75 additions & 45 deletions cgp/genome.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def dna(self) -> List[int]:
def dna(self, value: List[int]) -> None:
self._validate_dna(value)
self._dna = value
self._initialize_unkown_parameters()
self._initialize_unknown_parameters()

@property
def _n_hidden(self) -> int:
Expand Down Expand Up @@ -332,6 +332,9 @@ def _is_output_region(self, region_idx: int) -> bool:
def _is_function_gene(self, gene_idx: int) -> bool:
return (gene_idx % self._length_per_region) == 0

def _is_hidden_input_gene(self, gene_idx: int, region_idx: int) -> bool:
    """Return whether a gene is an input gene of a hidden region.

    Parameters
    ----------
    gene_idx : int
        Index of the gene in the dna.
    region_idx : int
        Index of the region, checked with `_is_hidden_region`. It is not
        verified here that this is the region containing `gene_idx`;
        presumably the caller passes `gene_idx // self._length_per_region`.

    Returns
    -------
    bool
        True if `region_idx` is a hidden region and the gene is not the
        function gene (first gene) of its region, False otherwise.
    """
    # Logical `and` (not bitwise `&`) expresses the boolean intent and
    # short-circuits; the function-gene test is delegated to the existing
    # helper instead of repeating the modulo arithmetic.
    return self._is_hidden_region(region_idx) and not self._is_function_gene(gene_idx)

def _is_active_input_gene(self, gene_idx: int) -> bool:
input_index = gene_idx % self._length_per_region
assert input_index > 0
Expand All @@ -346,6 +349,27 @@ def _is_active_input_gene(self, gene_idx: int) -> bool:
else:
assert False # should never be reached

def _select_gene_indices_for_mutation(self, mutation_rate, len_dna, rng):
    """Draw the indices of the genes that will be mutated.

    One uniform random number is drawn per gene; a gene is selected when
    its draw is strictly below `mutation_rate`.

    Parameters
    ----------
    mutation_rate : float
        Proportion of genes to be mutated, between 0 and 1.
    len_dna : int
        Length of the genome dna.
    rng : numpy.random.RandomState
        Random number generator instance to use for selecting the indices.

    Returns
    -------
    numpy.ndarray
        Indices of the genes selected for mutation, in ascending order.
    """
    # A single vectorised draw keeps the consumption of the rng stream
    # independent of how many genes end up being selected.
    draws = rng.rand(len_dna)
    is_selected = draws < mutation_rate
    return np.flatnonzero(is_selected)

def mutate(self, mutation_rate: float, rng: np.random.RandomState):
"""Mutate the genome.
Expand All @@ -354,79 +378,85 @@ def mutate(self, mutation_rate: float, rng: np.random.RandomState):
mutation_rate : float
Proportion of genes to be mutated, between 0 and 1.
rng : numpy.random.RandomState
Random number generator instance to use for crossover.
Random number generator instance to use for mutation.
Returns
----------
bool
True if only inactive regions of the genome were mutated, False otherwise.
"""

def count_dna_differences(dna0: List[int], dna1: List[int]) -> int:
return len([1 for gene0, gene1 in zip(dna0, dna1) if gene0 != gene1])

n_mutations = int(mutation_rate * len(self.dna))
assert n_mutations > 0

graph = CartesianGraph(self)
active_regions = graph.determine_active_regions()

dna = list(self._dna)

only_silent_mutations = True
while count_dna_differences(self.dna, dna) < n_mutations:

gene_idx = rng.randint(0, self._n_genes)
region_idx = gene_idx // self._length_per_region
selected_gene_indices = self._select_gene_indices_for_mutation(
mutation_rate, len(dna), rng
)

if self._is_input_region(region_idx):
continue # nothing to do here
for (gene_idx, allele) in zip(selected_gene_indices, np.array(dna)[selected_gene_indices]):

elif self._is_output_region(region_idx):
silent = self._mutate_output_region(dna, gene_idx, rng)
region_idx = gene_idx // self._length_per_region

elif self._is_hidden_region(region_idx):
silent = self._mutate_hidden_region(dna, gene_idx, active_regions, rng)
permissible_values = self._determine_alternative_permissible_values(
gene_idx, allele, region_idx
)
if len(permissible_values) > 0:

else:
assert False # should never be reached
dna[gene_idx] = rng.choice(permissible_values)
silent = region_idx not in active_regions

only_silent_mutations = only_silent_mutations and silent
only_silent_mutations = only_silent_mutations and silent

self.dna = dna
return only_silent_mutations

def _mutate_output_region(
self, dna: List[int], gene_idx: int, rng: np.random.RandomState
) -> bool:
assert self._is_gene_in_output_region(gene_idx)
def _determine_alternative_permissible_values(
self, gene_idx: int, gene: int, region_idx: int
) -> List[int]:

if not self._is_function_gene(gene_idx) and self._is_active_input_gene(gene_idx):
permissible_inputs = self._permissible_inputs_for_output_region()
dna[gene_idx] = rng.choice(permissible_inputs)
return False
else:
return True
if self._is_input_region(region_idx):
return [] # genes in input regions have no alternative permissible values

def _mutate_hidden_region(
self, dna: List[int], gene_idx: int, active_regions: List[int], rng: np.random.RandomState
) -> bool:
elif self._is_hidden_region(region_idx):
return self._determine_alternative_permissible_values_hidden_gene(
gene_idx, gene, region_idx
)

assert self._is_gene_in_hidden_region(gene_idx)
elif self._is_output_region(region_idx):
return self._determine_alternative_permissible_values_output_gene(gene_idx, gene)

region_idx = gene_idx // self._length_per_region
silent_mutation = region_idx not in active_regions
else:
assert False # should never be reached

def _determine_alternative_permissible_values_hidden_gene(
self, gene_idx: int, gene: int, region_idx: int
) -> List[int]:
if self._is_function_gene(gene_idx):
dna[gene_idx] = self._primitives.sample_allele(rng)
return silent_mutation
permissible_values = list(np.arange(len(self._primitives._primitives)))

elif self._is_hidden_input_gene(gene_idx, region_idx):
permissible_values = self._permissible_inputs(region_idx)

else:
assert False
permissible_values.remove(gene)
return permissible_values

def _determine_alternative_permissible_values_output_gene(
self, gene_idx: int, gene: int
) -> List[int]:
input_index = (
gene_idx % self._length_per_region
) # assumes that the second gene in all output regions is the index of the input
if input_index == 1:
permissible_values = self._permissible_inputs_for_output_region()
permissible_values.remove(gene)
else:
permissible_inputs = self._permissible_inputs(region_idx)
dna[gene_idx] = rng.choice(permissible_inputs)
permissible_values = []

silent_mutation = silent_mutation or (not self._is_active_input_gene(gene_idx))
return silent_mutation
return permissible_values

@property
def primitives(self) -> Primitives:
Expand Down Expand Up @@ -484,7 +514,7 @@ def update_parameters_from_torch_class(self, torch_cls: "torch.nn.Module") -> bo

return any_parameter_updated

def _initialize_unkown_parameters(self) -> None:
def _initialize_unknown_parameters(self) -> None:
for region_idx, region in self.iter_hidden_regions():
node_id = region[0]
node_type = self._primitives[node_id]
Expand Down
2 changes: 1 addition & 1 deletion cgp/population.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
"""
self.n_parents = n_parents # number of individuals in parent population

if not (0.0 < mutation_rate and mutation_rate < 1.0):
if not (0.0 < mutation_rate and mutation_rate <= 1.0):
raise ValueError("mutation rate needs to be in (0, 1)")
self._mutation_rate = mutation_rate # probability of mutation per gene

Expand Down
2 changes: 2 additions & 0 deletions test/test_ea_mu_plus_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ def objective(individual):
individual.fitness = float(individual.idx)
return individual

population_params["mutation_rate"] = 1.0 # ensures every offspring has mutations

pop = cgp.Population(**population_params, genome_params=genome_params)
ea = cgp.ea.MuPlusLambda(**ea_params)

Expand Down
Loading

0 comments on commit 742cc9f

Please sign in to comment.