In [73]:
from src.harness import architecture as arch

import copy
import functools
import numpy as np
import tensorflow as tf
from tensorflow import keras
from typing import Any, Callable, Dict, Iterable, List, Literal, Tuple

In [74]:
# Typedefs
# A genome is a list with one array-like entry per layer.
# NOTE(review): the initialisers defined on Individual build NumPy arrays (one
# per entry of model.get_weights()), while sample_mask returns tf tensors, so
# the element type is looser in practice than tf.Tensor.
Genome = List[tf.Tensor]
# Builds the initial genome for an individual from the model.
GenomeInit = Callable[[keras.Model], Genome]
# Hook invoked once per generation with the metrics dict as first argument.
# NOTE(review): evolutionary_algorithm passes the whole population list as the
# second argument, not a single Genome; confirm the intended signature.
GenomeMetricCallback = Callable[[Dict, Genome], Any]

class Individual:
    """A candidate solution: per-weight probabilities used to sample weight masks.

    The genome holds one array per model weight tensor, with values in [0, 1].
    The architecture, the model (with its original weights) and the dataset are
    stored once on the class, so every individual shares them and copying an
    individual only copies its genome, RNG and metrics.
    """

    # One shared copy throughout the class (all individuals in population share
    # same original weights). Populated by the first instance that is created.
    ARCHITECTURE = None
    MODEL = None
    DATA = None

    def __init__(
        self,
        architecture: arch.Architecture,
        genome_init: GenomeInit,
    ):
        """Create an individual whose genome is produced by ``genome_init``.

        The first instance initialises the class-level architecture, model and
        data from ``architecture``; later instances reuse that shared state.
        """
        # Assign on the class, not on ``self``: an instance attribute would be
        # private to this individual (and deep-copied with it), which defeats
        # the sharing described above.
        if Individual.ARCHITECTURE is None:
            # Build everything first so a failure does not leave the class
            # half-initialised.
            model = architecture.get_model_constructor()()
            data = architecture.load_data()
            Individual.ARCHITECTURE = architecture
            Individual.MODEL = model
            Individual.DATA = data

        self.genome = genome_init(self.model)
        self.rng = np.random.default_rng()
        self.metrics = {}

    @staticmethod
    def copy_from(individual: "Individual") -> "Individual":
        """Return a deep copy of ``individual`` with empty metrics and a fresh RNG."""
        copied = copy.deepcopy(individual)
        copied.metrics.clear()
        copied.rng = np.random.default_rng()
        return copied

    @property
    def fitness(self) -> float | None:
        """Mean accuracy from the last evaluation, or None if not evaluated."""
        return self.metrics.get("mean_accuracy")

    @property
    def architecture(self) -> arch.Architecture | None:
        """The shared architecture (None before the first instance exists)."""
        return self.ARCHITECTURE

    @property
    def model(self) -> keras.Model | None:
        """The shared model holding the original weights."""
        return self.MODEL

    @property
    def data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | None:
        """The shared dataset as returned by ``architecture.load_data()``."""
        return self.DATA

    @property
    def training_data(self) -> Tuple[np.ndarray, np.ndarray] | None:
        """``(X_train, Y_train)`` taken from positions 0 and 2 of ``data``."""
        if self.data is None:
            return None
        X_train, _, Y_train, _ = self.data
        return X_train, Y_train

    @property
    def test_data(self) -> Tuple[np.ndarray, np.ndarray] | None:
        """``(X_test, Y_test)`` taken from positions 1 and 3 of ``data``."""
        if self.data is None:
            return None
        _, X_test, _, Y_test = self.data
        return X_test, Y_test

    def copy_model(self) -> keras.Model | None:
        """Return a deep copy of the shared model, or None if there is none."""
        if self.model is None:
            return None
        return copy.deepcopy(self.model)

    def sample_mask(self) -> Genome:
        """Sample one float32 0/1 mask per genome layer.

        An entry is 1.0 where a fresh uniform draw in [0, 1) exceeds the genome
        value at that position, so a larger genome value makes a 1 less likely.
        NOTE(review): under this convention an all-ones genome yields an
        all-zero mask; confirm that matches the intent of ``full_mask_init``.
        """
        return [
            tf.cast(
                np.random.uniform(low=0, high=1, size=probabilities.shape)
                > probabilities,
                tf.float32,
            )
            for probabilities in self.genome
        ]

    @staticmethod
    def eval_fitness(individual: "Individual", num_evaluations: int = 5):
        """Evaluate ``individual`` on the test data and store the mean metrics.

        For each of ``num_evaluations`` rounds a mask is sampled, applied to the
        shared original weights and the masked model is evaluated. The means
        are written to ``individual.metrics`` under ``"mean_loss"`` and
        ``"mean_accuracy"``; nothing is returned.
        """
        model = individual.copy_model()
        model.compile(
            loss=keras.losses.CategoricalCrossentropy(),
            metrics=[keras.metrics.CategoricalAccuracy()]
        )
        X_test, Y_test = individual.test_data

        # The shared model is never modified, so its weights are loop-invariant.
        original_weights = individual.model.get_weights()

        losses = np.zeros(num_evaluations)
        accuracies = np.zeros(num_evaluations)
        for i in range(num_evaluations):
            mask = individual.sample_mask()
            model.set_weights([w * m for w, m in zip(original_weights, mask)])
            losses[i], accuracies[i] = model.evaluate(X_test, Y_test)
        individual.metrics["mean_loss"] = np.mean(losses)
        individual.metrics["mean_accuracy"] = np.mean(accuracies)

    # Mutation methods

    @staticmethod
    def mutate_perturb(individual: "Individual", rate: float, scale: float):
        """Add Gaussian noise to a random subset of genes, clipping to [0, 1].

        Each gene is selected independently with probability ``rate`` and
        receives noise drawn from N(0, ``scale``). Every genome layer is
        replaced by a new array.
        """
        for layer_index, layer in enumerate(individual.genome):
            # ``< rate`` selects a fraction ``rate`` of the genes; ``> rate``
            # would perturb the complement (1 - rate) instead.
            selected = np.random.uniform(
                low=0,
                high=1,
                size=layer.shape,
            ) < rate
            perturbations = individual.rng.normal(
                loc=0,
                scale=scale,
                size=layer.shape,
            ) * selected
            individual.genome[layer_index] = np.clip(
                layer + perturbations,
                a_min=0,
                a_max=1,
            )

    @staticmethod
    def mutate_resample(individual: "Individual", rate: float = 0.05):
        """Redraw a random subset of genes uniformly from [0, 1), in place.

        Each gene is selected independently with probability ``rate``. The
        genome arrays are modified in place.
        """
        for layer in individual.genome:
            # ``< rate`` selects a fraction ``rate`` of the genes.
            selected = np.random.uniform(
                low=0,
                high=1,
                size=layer.shape,
            ) < rate
            resampled = np.random.uniform(
                low=0,
                high=1,
                size=layer.shape,
            )
            # np.place consumes only as many values as there are selected genes.
            np.place(layer, selected, resampled)

    # Initialization methods

    @staticmethod
    def full_mask_init(model: keras.Model) -> Genome:
        """Return a genome of float32 ones shaped like the model's weights."""
        return [
            np.ones_like(weights, dtype=np.float32)
            for weights in model.get_weights()
        ]

    @staticmethod
    def random_init(model: keras.Model) -> Genome:
        """Return a genome drawn uniformly from [0, 1), shaped like the weights."""
        return [
            np.random.uniform(low=0, high=1, size=weights.shape)
            for weights in model.get_weights()
        ]

    # Crossover methods

    @staticmethod
    def _masked_crossover(
        p1: "Individual",
        p2: "Individual",
        make_mask: Callable[[np.ndarray], np.ndarray],
    ) -> Tuple["Individual", "Individual"]:
        """Build two complementary children using one boolean mask per layer.

        Where the mask is True the first child takes the gene from ``p1`` and
        the second child from ``p2``; elsewhere the roles are swapped.
        """
        child1, child2 = map(Individual.copy_from, (p1, p2))
        for layer_index, (genes1, genes2) in enumerate(zip(p1.genome, p2.genome)):
            from_p1 = make_mask(genes1)
            child1.genome[layer_index] = np.where(from_p1, genes1, genes2)
            child2.genome[layer_index] = np.where(from_p1, genes2, genes1)
        return child1, child2

    @staticmethod
    def layer_crossover(p1: "Individual", p2: "Individual") -> Tuple["Individual", "Individual"]:
        """Cross over whole layers: each layer comes entirely from one parent.

        The two children are complementary; the layers are deep-copied so the
        children never alias the parents' arrays.
        """
        child1, child2 = map(Individual.copy_from, (p1, p2))
        p1_weights = p1.genome
        p2_weights = p2.genome
        # One coin flip per layer: 0 -> child1 inherits from p1, 1 -> from p2.
        parents = np.random.randint(low=0, high=2, size=len(p1_weights))
        child1.genome = copy.deepcopy([
            p1_weights[layer_index] if parent == 0
            else p2_weights[layer_index]
            for layer_index, parent in enumerate(parents)
        ])
        child2.genome = copy.deepcopy([
            p2_weights[layer_index] if parent == 0
            else p1_weights[layer_index]
            for layer_index, parent in enumerate(parents)
        ])
        return child1, child2

    @staticmethod
    def neuron_crossover(p1: "Individual", p2: "Individual") -> Tuple["Individual", "Individual"]:
        """Cross over by row: every index along a layer's first axis picks a parent.

        For a 2-D layer this means all entries in a row (one neuron's outgoing
        synapses) come from the same parent; for a 1-D layer every entry picks
        independently.
        NOTE(review): for layers of rank > 2 the choice is still made per index
        of axis 0 and shared across the remaining axes; confirm that axis 0 is
        the right grouping for such layers.
        """
        def row_mask(weights):
            # Generate a 0/1 for each row, then let broadcasting extend it
            # across all remaining axes.
            row_choice = np.random.randint(low=0, high=2, size=weights.shape[0])
            return row_choice.astype(bool).reshape(
                (-1,) + (1,) * (weights.ndim - 1)
            )

        return Individual._masked_crossover(p1, p2, row_mask)

    @staticmethod
    def synapse_crossover(p1: "Individual", p2: "Individual") -> Tuple["Individual", "Individual"]:
        """Cross over element-wise: every single gene picks its parent independently."""
        def element_mask(weights):
            # Generate an independent 0/1 for every entry of the layer.
            return np.random.randint(low=0, high=2, size=weights.shape).astype(bool)

        return Individual._masked_crossover(p1, p2, element_mask)

# Typedefs
# A mutation modifies the individual's genome and returns nothing.
Mutation = Callable[[Individual], None]
# A crossover yields the children produced from two parents (the crossover
# methods on Individual each return a pair of children).
Crossover = Callable[[Individual, Individual], Iterable[Individual]]
# A fitness function records its results on the individual (read back through
# Individual.fitness); evolutionary_algorithm ignores any return value.
FitnessFunction = Callable[[Individual], Any]


In [75]:
# Quick manual check: build two randomly initialised individuals for the same
# architecture object and produce two children with layer-wise crossover.
a = arch.Architecture('lenet', 'mnist')
p1 = Individual(a, Individual.random_init)
p2 = Individual(a, Individual.random_init)
c1, c2 = Individual.layer_crossover(p1, p2)

In [78]:
# (µ + λ) strategy
def evolutionary_algorithm(
    num_generations: int,
    num_parents: int,
    num_children: int,
    tournament_size: int,
    num_tournament_winners: int,
    individual_constructor: Callable[[], Individual],
    fitness_eval: FitnessFunction,
    crossover: Crossover | None = None,
    mutations: List[Mutation] | None = None,
    genome_metric_callbacks: List[GenomeMetricCallback] | None = None,
):
    """Run a (µ + λ) evolutionary search and return per-generation metrics.

    Args:
        num_generations: Number of generations to run.
        num_parents: Population size carried between generations (µ).
        num_children: Number of children created per generation (λ).
        tournament_size: Individuals drawn (with replacement) per tournament.
        num_tournament_winners: Top individuals kept from each tournament.
        individual_constructor: Zero-argument factory for new individuals.
        fitness_eval: Called on an individual so that its ``fitness`` is set.
        crossover: Produces children from two parents; when omitted the
            children are plain copies of the parents.
        mutations: Applied in order to every child before evaluation.
        genome_metric_callbacks: Called after each generation with the metrics
            dict and the new population.

    Returns:
        The metrics dict. ``"best_solution_fitness"`` holds the best fitness
        seen so far after each generation; callbacks may add further keys.

    Raises:
        ValueError: If there are more tournament winners than participants, or
            if no winners are kept although more than one parent is needed.
    """
    if num_tournament_winners > tournament_size:
        raise ValueError("Cannot have more tournament winners than participants")
    if num_tournament_winners < 1 and num_parents > 1:
        # Without winners the selection loop below could never fill up.
        raise ValueError("Need at least one tournament winner to fill the next generation")

    # Avoid mutable default arguments; None stands for "no operators".
    mutations = [] if mutations is None else mutations
    genome_metric_callbacks = (
        [] if genome_metric_callbacks is None else genome_metric_callbacks
    )

    best_solution = None
    best_fitness = -np.inf
    genome_metrics = {"best_solution_fitness": np.zeros(num_generations)}

    # Create and evaluate the initial population
    population = []
    for _ in range(num_parents):
        individual = individual_constructor()
        fitness_eval(individual)
        if individual.fitness > best_fitness:
            best_solution = copy.deepcopy(individual)
            best_fitness = best_solution.fitness
        population.append(individual)

    for generation_index in range(num_generations):
        children = []
        while len(children) < num_children:
            # Parents are drawn uniformly with replacement.
            parents = np.random.choice(population, 2)
            new_children = list(
                crossover(*parents)
                if crossover
                else map(Individual.copy_from, parents)
            )
            # Crossover yields children in pairs; drop the surplus before
            # evaluating so exactly num_children are produced.
            new_children = new_children[:num_children - len(children)]

            for child in new_children:
                for mutation in mutations:
                    mutation(child)
                fitness_eval(child)
                if child.fitness > best_fitness:
                    best_solution = child
                    best_fitness = best_solution.fitness
            children.extend(new_children)
        population.extend(children)

        # Seed next generation with best solution found thus far
        next_generation = [best_solution]
        while len(next_generation) < num_parents:
            tournament = sorted(
                np.random.choice(population, size=tournament_size),
                key=lambda x: x.fitness,
                reverse=True,
            )
            next_generation.extend(tournament[:num_tournament_winners])
        # The last tournament may overshoot; keep the population at µ (the
        # elite seed is always retained, as in the unbounded case).
        if len(next_generation) > num_parents >= 1:
            next_generation = next_generation[:num_parents]
        population = next_generation

        for callback in genome_metric_callbacks:
            callback(genome_metrics, population)
        genome_metrics["best_solution_fitness"][generation_index] = best_fitness

    return genome_metrics
    

In [79]:
# Number of independent runs of the evolutionary algorithm.
num_runs = 2

# Factory for new individuals: the same Architecture object and the random
# genome initialiser are bound once and reused for every individual.
individual_constructor = functools.partial(
    Individual, 
    architecture=arch.Architecture('lenet', 'mnist'), 
    genome_init=Individual.random_init,
)
# A single masked evaluation per fitness computation.
fitness_eval = functools.partial(Individual.eval_fitness, num_evaluations=1)

# Mutations are applied to every child in this order: resample, then perturb.
mutations = [
    functools.partial(Individual.mutate_resample, rate=0.05),
    functools.partial(Individual.mutate_perturb, rate=0.1, scale=0.025),
]
def dummy_callback(data: Dict, *args):
    """Placeholder metric callback: append a running counter to ``data["dummy"]``.

    The list is created when the key is missing or holds None; every call then
    appends the list's current length. Extra positional arguments are ignored.
    """
    history = data.get("dummy")
    if history is None:
        history = data["dummy"] = []
    history.append(len(history))
    
# Callbacks invoked by evolutionary_algorithm after every generation.
genome_metric_callbacks = [
    dummy_callback,
]
# Keyword arguments shared by every run of evolutionary_algorithm.
kwargs = {
    "num_generations": 2,
    "num_parents": 5,
    "num_children": 5,
    "tournament_size": 4,
    "num_tournament_winners": 2,
    "individual_constructor": individual_constructor,
    # Evaluation, variation operators and per-generation callbacks
    "fitness_eval": fitness_eval,
    "mutations": mutations,
    "crossover": Individual.neuron_crossover,
    "genome_metric_callbacks": genome_metric_callbacks,
}

# Collect one metrics dict per run; appending run by run keeps the results of
# completed runs if a later run fails.
metrics = []
for _ in range(num_runs):
    metrics.append(evolutionary_algorithm(**kwargs))

[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - categorical_accuracy: 0.1222 - loss: 2.3061
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - categorical_accuracy: 0.1474 - loss: 2.2946
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - categorical_accuracy: 0.1317 - loss: 2.3073
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - categorical_accuracy: 0.1134 - loss: 2.3011
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - categorical_accuracy: 0.0713 - loss: 2.3242
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - categorical_accuracy: 0.1028 - loss: 2.3164
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - categorical_accuracy: 0.0803 - loss: 2.3151
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - categorical_accuracy: 0.0820 - loss: 2.3052
[1m313/313[0m [32m━━━━━━━━━━━

In [82]:
metrics

[{'best_solution_fitness': array([0.1468, 0.1468]), 'dummy': [0, 1]},
 {'best_solution_fitness': array([0.14399999, 0.14399999]), 'dummy': [0, 1]}]