diff --git a/cgp/genome.py b/cgp/genome.py index b32405da..fde198e0 100644 --- a/cgp/genome.py +++ b/cgp/genome.py @@ -107,7 +107,7 @@ def dna(self) -> List[int]: def dna(self, value: List[int]) -> None: self._validate_dna(value) self._dna = value - self._initialize_unkown_parameters() + self._initialize_unknown_parameters() @property def _n_hidden(self) -> int: @@ -332,6 +332,9 @@ def _is_output_region(self, region_idx: int) -> bool: def _is_function_gene(self, gene_idx: int) -> bool: return (gene_idx % self._length_per_region) == 0 + def _is_hidden_input_gene(self, gene_idx: int, region_idx: int) -> bool: + return self._is_hidden_region(region_idx) & ((gene_idx % self._length_per_region) != 0) + def _is_active_input_gene(self, gene_idx: int) -> bool: input_index = gene_idx % self._length_per_region assert input_index > 0 @@ -346,6 +349,27 @@ def _is_active_input_gene(self, gene_idx: int) -> bool: else: assert False # should never be reached + def _select_gene_indices_for_mutation(self, mutation_rate, len_dna, rng): + """Selects the gene indices for mutations + + Parameters + ---------- + mutation_rate : float + Proportion of genes to be mutated, between 0 and 1. + len_dna : int + Length of the genome dna. + rng : numpy.random.RandomState + Random number generator instance to use for selecting the indices. + + Returns + ---------- + selected_gene_indices: numpy array + indices of the genes selected for mutation. + """ + + selected_gene_indices = np.nonzero(rng.rand(len_dna) < mutation_rate)[0] + return selected_gene_indices + def mutate(self, mutation_rate: float, rng: np.random.RandomState): """Mutate the genome. @@ -354,7 +378,7 @@ def mutate(self, mutation_rate: float, rng: np.random.RandomState): mutation_rate : float Proportion of genes to be mutated, between 0 and 1. rng : numpy.random.RandomState - Random number generator instance to use for crossover. + Random number generator instance to use for mutation. Returns ---------- @@ -362,71 +386,77 @@ def mutate(self, mutation_rate: float, rng: np.random.RandomState): True if only inactive regions of the genome were mutated, False otherwise. """ - def count_dna_differences(dna0: List[int], dna1: List[int]) -> int: - return len([1 for gene0, gene1 in zip(dna0, dna1) if gene0 != gene1]) - - n_mutations = int(mutation_rate * len(self.dna)) - assert n_mutations > 0 - graph = CartesianGraph(self) active_regions = graph.determine_active_regions() - dna = list(self._dna) - only_silent_mutations = True - while count_dna_differences(self.dna, dna) < n_mutations: - gene_idx = rng.randint(0, self._n_genes) - region_idx = gene_idx // self._length_per_region + selected_gene_indices = self._select_gene_indices_for_mutation( + mutation_rate, len(dna), rng + ) - if self._is_input_region(region_idx): - continue # nothing to do here + for (gene_idx, allele) in zip(selected_gene_indices, np.array(dna)[selected_gene_indices]): - elif self._is_output_region(region_idx): - silent = self._mutate_output_region(dna, gene_idx, rng) + region_idx = gene_idx // self._length_per_region - elif self._is_hidden_region(region_idx): - silent = self._mutate_hidden_region(dna, gene_idx, active_regions, rng) + permissible_values = self._determine_alternative_permissible_values( + gene_idx, allele, region_idx + ) + if len(permissible_values) > 0: - else: - assert False # should never be reached + dna[gene_idx] = rng.choice(permissible_values) + silent = region_idx not in active_regions - only_silent_mutations = only_silent_mutations and silent + only_silent_mutations = only_silent_mutations and silent self.dna = dna return only_silent_mutations - def _mutate_output_region( - self, dna: List[int], gene_idx: int, rng: np.random.RandomState - ) -> bool: - assert self._is_gene_in_output_region(gene_idx) + def _determine_alternative_permissible_values( + self, gene_idx: int, gene: int, region_idx: int + ) -> List[int]: - if not self._is_function_gene(gene_idx) and self._is_active_input_gene(gene_idx): - permissible_inputs = self._permissible_inputs_for_output_region() - dna[gene_idx] = rng.choice(permissible_inputs) - return False - else: - return True + if self._is_input_region(region_idx): + return [] # genes in input regions have no alternative permissible values - def _mutate_hidden_region( - self, dna: List[int], gene_idx: int, active_regions: List[int], rng: np.random.RandomState - ) -> bool: + elif self._is_hidden_region(region_idx): + return self._determine_alternative_permissible_values_hidden_gene( + gene_idx, gene, region_idx + ) - assert self._is_gene_in_hidden_region(gene_idx) + elif self._is_output_region(region_idx): + return self._determine_alternative_permissible_values_output_gene(gene_idx, gene) - region_idx = gene_idx // self._length_per_region - silent_mutation = region_idx not in active_regions + else: + assert False # should never be reached + def _determine_alternative_permissible_values_hidden_gene( + self, gene_idx: int, gene: int, region_idx: int + ) -> List[int]: if self._is_function_gene(gene_idx): - dna[gene_idx] = self._primitives.sample_allele(rng) - return silent_mutation + permissible_values = list(np.arange(len(self._primitives._primitives))) + elif self._is_hidden_input_gene(gene_idx, region_idx): + permissible_values = self._permissible_inputs(region_idx) + + else: + assert False + permissible_values.remove(gene) + return permissible_values + + def _determine_alternative_permissible_values_output_gene( + self, gene_idx: int, gene: int + ) -> List[int]: + input_index = ( + gene_idx % self._length_per_region + ) # assumes that the second gene in all output regions is the index of the input + if input_index == 1: + permissible_values = self._permissible_inputs_for_output_region() + permissible_values.remove(gene) else: - permissible_inputs = self._permissible_inputs(region_idx) - dna[gene_idx] = rng.choice(permissible_inputs) + permissible_values = [] - silent_mutation = silent_mutation or (not self._is_active_input_gene(gene_idx)) - return silent_mutation + return permissible_values @property def primitives(self) -> Primitives: @@ -484,7 +514,7 @@ def update_parameters_from_torch_class(self, torch_cls: "torch.nn.Module") -> bo return any_parameter_updated - def _initialize_unkown_parameters(self) -> None: + def _initialize_unknown_parameters(self) -> None: for region_idx, region in self.iter_hidden_regions(): node_id = region[0] node_type = self._primitives[node_id] diff --git a/cgp/population.py b/cgp/population.py index 9d0e668c..0df00d2b 100755 --- a/cgp/population.py +++ b/cgp/population.py @@ -34,7 +34,7 @@ def __init__( """ self.n_parents = n_parents # number of individuals in parent population - if not (0.0 < mutation_rate and mutation_rate < 1.0): + if not (0.0 < mutation_rate and mutation_rate <= 1.0): raise ValueError("mutation rate needs to be in (0, 1)") self._mutation_rate = mutation_rate # probability of mutation per gene diff --git a/test/test_ea_mu_plus_lambda.py b/test/test_ea_mu_plus_lambda.py index dc316c71..d7518554 100644 --- a/test/test_ea_mu_plus_lambda.py +++ b/test/test_ea_mu_plus_lambda.py @@ -203,6 +203,8 @@ def objective(individual): individual.fitness = float(individual.idx) return individual + population_params["mutation_rate"] = 1.0 # ensures every offspring has mutations + pop = cgp.Population(**population_params, genome_params=genome_params) ea = cgp.ea.MuPlusLambda(**ea_params) diff --git a/test/test_genome.py b/test/test_genome.py index 55d1a892..8c65b155 100644 --- a/test/test_genome.py +++ b/test/test_genome.py @@ -3,6 +3,7 @@ import cgp from cgp.genome import ID_INPUT_NODE, ID_OUTPUT_NODE, ID_NON_CODING_GENE +from cgp.cartesian_graph import CartesianGraph def test_check_dna_consistency(): @@ -346,89 +347,203 @@ def test_is_gene_in_output_region(rng_seed): assert not genome._is_gene_in_output_region(11) -def test_mutate_hidden_region(rng_seed): +def test_mutation_rate(rng_seed, mutation_rate): + n_inputs = 1 + n_outputs = 1 + n_columns = 4 + n_rows = 3 + genome = cgp.Genome(n_inputs, n_outputs, n_columns, n_rows, None, (cgp.Add, cgp.Sub)) rng = np.random.RandomState(rng_seed) - genome = cgp.Genome(1, 1, 3, 1, None, (cgp.Add, cgp.ConstantFloat)) - dna = [ - ID_INPUT_NODE, - ID_NON_CODING_GENE, - ID_NON_CODING_GENE, + genome.randomize(rng) + + def count_n_immutable_genes(n_inputs, n_output, n_row): + length_per_region = genome.primitives.max_arity + 1 # function gene + input gene addresses + n_immutable_genes = n_inputs * length_per_region # none of the input genes are mutable + n_immutable_genes += n_output * ( + length_per_region - 1 + ) # only one gene per output can be mutated + if n_inputs == 1: + n_immutable_genes += ( + n_row * 2 + ) # input gene addresses in the first hidden layer can't be mutated in that case + return n_immutable_genes + + def count_mutations(dna0, dna1): + n_differences = 0 + for (allele0, allele1) in zip(dna0, dna1): + if allele0 != allele1: + n_differences += 1 + return n_differences + + n_immutable_genes = count_n_immutable_genes(n_inputs, n_outputs, n_rows) + n_mutations_mean_expected = mutation_rate * (len(genome.dna) - n_immutable_genes) + n_mutations_std_expected = np.sqrt( + (len(genome.dna) - n_immutable_genes) * mutation_rate * (1 - mutation_rate) + ) + + n = 10000 + n_mutations = [] + for _ in range(n): + dna_old = genome.dna + genome.mutate(mutation_rate, rng) + n_mutations.append(count_mutations(dna_old, genome.dna)) + + assert np.mean(n_mutations) == pytest.approx(n_mutations_mean_expected, rel=0.04) + assert np.std(n_mutations) == pytest.approx(n_mutations_std_expected, rel=0.04) + + +def test_only_silent_mutations(genome_params, mutation_rate, rng_seed): + genome = cgp.Genome(**genome_params) + rng = np.random.RandomState(rng_seed) + genome.randomize(rng) + + only_silent_mutations = genome.mutate(mutation_rate=0, rng=rng) + assert only_silent_mutations is True + + only_silent_mutations = genome.mutate(mutation_rate=1, rng=rng) + assert not only_silent_mutations + + dna_fixed = [ + -1, + -3, + -3, + -1, + -3, + -3, + 2, 1, 0, 0, - 1, 0, 0, 0, 0, - 2, - ID_OUTPUT_NODE, - 3, - ID_NON_CODING_GENE, - ] - genome.dna = list(dna) - active_regions = cgp.CartesianGraph(genome).determine_active_regions() - - # mutating any gene in inactive region returns True - assert genome._mutate_hidden_region(list(dna), 3, active_regions, rng) is True - assert genome._mutate_hidden_region(list(dna), 4, active_regions, rng) is True - assert genome._mutate_hidden_region(list(dna), 5, active_regions, rng) is True - - # mutating function gene in active region returns False - assert genome._mutate_hidden_region(list(dna), 6, active_regions, rng) is False - # mutating inactive genes in active region returns True - assert genome._mutate_hidden_region(list(dna), 7, active_regions, rng) is True - assert genome._mutate_hidden_region(list(dna), 8, active_regions, rng) is True - - # mutating any gene in active region without silent genes returns False - assert genome._mutate_hidden_region(list(dna), 9, active_regions, rng) is False - assert genome._mutate_hidden_region(list(dna), 10, active_regions, rng) is False - assert genome._mutate_hidden_region(list(dna), 11, active_regions, rng) is False - - -def test_mutate_output_region(rng_seed): - rng = np.random.RandomState(rng_seed) - genome = cgp.Genome(1, 1, 2, 1, None, (cgp.Add,)) - dna = [ - ID_INPUT_NODE, - ID_NON_CODING_GENE, - ID_NON_CODING_GENE, 0, 0, 0, 0, 0, 0, - ID_OUTPUT_NODE, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + -2, 2, - ID_NON_CODING_GENE, + -3, ] + genome.dna = dna_fixed + graph = CartesianGraph(genome) + active_regions = graph.determine_active_regions() + length_per_region = genome.primitives.max_arity + 1 # function gene + input gene addresses + gene_to_be_mutated_non_active = ( + 3 * length_per_region + ) # operator gene of 2nd internal unit, a silent unit in this dna + + def select_gene_indices_silent(mutation_rate, dna, rng): + selected_gene_indices = [gene_to_be_mutated_non_active] + return selected_gene_indices + + genome._select_gene_indices_for_mutation = \ + select_gene_indices_silent # monkey patch the selection of indices to selecting a silent gene + genome.dna = dna_fixed + only_silent_mutations = genome.mutate(mutation_rate, rng) + assert only_silent_mutations is True + + gene_to_be_mutated_active = ( + active_regions[-1] * length_per_region + ) # function gene of the 1st active internal gene, should always be mutable + + def select_gene_indices_non_silent(mutation_rate, dna, rng): + selected_gene_indices = [gene_to_be_mutated_active] + return selected_gene_indices + + genome._select_gene_indices_for_mutation = select_gene_indices_non_silent + only_silent_mutations = genome.mutate(mutation_rate, rng) + assert not only_silent_mutations + + +def test_permissible_values_hidden(rng_seed): + genome_params = { + "n_inputs": 2, + "n_outputs": 1, + "n_columns": 3, + "n_rows": 3, + "levels_back": 2, + "primitives": (cgp.Add, cgp.Sub, cgp.ConstantFloat), + } + + genome = cgp.Genome(**genome_params) + + # test function gene + gene_idx = 6 # function gene of first hidden region + allele = 0 # + region_idx = None # can be any number, since not touched for function gene + permissible_function_gene_values = \ + genome._determine_alternative_permissible_values_hidden_gene( + gene_idx, allele, region_idx + ) + assert permissible_function_gene_values == [ + 1, + 2, + ] # function idx 1,2 (cgp.Sub, cgp.ConstantFloat) + + # test input gene + gene_idx = 16 # first input gene in second column of hidden region + allele = 0 + region_idx = gene_idx // ( + genome.primitives.max_arity + 1 + ) # function gene + input gene addresses + permissible_hidden_input_gene_values = \ + genome._determine_alternative_permissible_values_hidden_gene( + gene_idx, allele, region_idx + ) - assert genome._mutate_output_region(list(dna), 9, rng) is True - assert genome._mutate_output_region(list(dna), 10, rng) is False - assert genome._mutate_output_region(list(dna), 11, rng) is True + assert permissible_hidden_input_gene_values == [ + 1, + 2, + 3, + 4, + ] # gene indices of input, 1st hidden layer -@pytest.mark.parametrize("mutation_rate", [0.02, 0.05, 0.2]) -def test_correct_number_of_mutations(mutation_rate, rng_seed): +def test_permissible_values_output(rng_seed): + genome_params = { + "n_inputs": 2, + "n_outputs": 1, + "n_columns": 3, + "n_rows": 3, + "levels_back": 2, + "primitives": (cgp.Add, cgp.Sub, cgp.ConstantFloat), + } + genome = cgp.Genome(**genome_params) - n_inputs = 2 - n_outputs = 1 - n_columns = 10 - n_rows = 2 - levels_back = 5 - primitives = (cgp.Add, cgp.Sub, cgp.Mul, cgp.Div, cgp.ConstantFloat) + gene_idx_function = 33 + gene_idx_input0 = 34 + gene_idx_input1 = 35 - rng = np.random.RandomState(rng_seed) - genome = cgp.Genome(n_inputs, n_outputs, n_columns, n_rows, levels_back, primitives) - genome.randomize(rng) + allele = 5 # set current value for input0 -> should be excluded from permissible values - n_mutations = 0 - genome_new = genome.clone() - genome_new.mutate(mutation_rate, rng) - for (gene_0, gene_1) in zip(genome.dna, genome_new.dna): - if gene_0 != gene_1: - n_mutations += 1 + permissible_values = genome._determine_alternative_permissible_values_output_gene( + gene_idx_function, allele + ) + assert permissible_values == [] - n_mutations_expected = int(mutation_rate * len(genome.dna)) - assert n_mutations == n_mutations_expected + permissible_values = genome._determine_alternative_permissible_values_output_gene( + gene_idx_input0, allele + ) + assert permissible_values == [0, 1, 2, 3, 4, 6, 7, 8, 9, 10] + + permissible_values = genome._determine_alternative_permissible_values_output_gene( + gene_idx_input1, allele + ) + assert permissible_values == [] diff --git a/test/test_hl_api.py b/test/test_hl_api.py index 9ea76dd1..28c8069b 100644 --- a/test/test_hl_api.py +++ b/test/test_hl_api.py @@ -65,6 +65,8 @@ def test_parallel_population(population_params, genome_params, ea_params): fitness_per_n_processes.append( _test_population(population_params, genome_params, ea_params) ) + + assert fitness_per_n_processes[1] == pytest.approx(fitness_per_n_processes[2]) assert fitness_per_n_processes[0] == pytest.approx(fitness_per_n_processes[1]) assert fitness_per_n_processes[0] == pytest.approx(fitness_per_n_processes[2])