In [7]:
import random

import keras
import numpy as np
import tensorflow as tf

In [3]:
class GeneticLocalSearch:
    """A genetic local search metaheuristic.

    Genetic local search generates a population of randomly seeded MLPs
    and trains them on the given data. It then selects the models with the
    lowest loss to serve as 'parents' for the next generation of models.
    The 'child' models are generated by choosing between the weights of
    pairs of parent models and mutating a small number of weights. This
    process is repeated until the specified number of generations is
    reached. The model with the lowest loss in the final generation is
    returned.

    Parameters
    ----------
    generations: int (default=100)
        The number of times the metaheuristic is applied after the initial
        population has been trained. Must be >= 0.

    pop_size: int (default=100)
        The number of models in each generation. Must be >= 1.

    tournament_size: int (default=5)
        The number of models to compare when choosing each parent.
        Must be >= 1.

    crossover_type: str (default='uniform')
        How weights are chosen when generating child models.

        - If 'uniform', each weight is independently selected from
        one of the two parents.
        - If '2-point', all weights between two randomly selected
        indices (of the flattened weight array) are chosen from one
        parent, and all weights outside those indices are chosen from
        the other.

    mutation_rate: float (default=0.05)
        The probability (between 0 and 1) that a weight will be mutated.

    mutation_sd: float (default=0.05)
        The standard deviation of the normal distribution from which
        the new weight is selected. The distribution is centered on the
        old weight value. The default value was chosen to be the same as
        the value Keras uses for its Gaussian initializers.

    nodes: int (default=64)
        The number of nodes in the hidden layer of each model.

    optimizer: str (default='sgd')
        The optimizer used when training each model. Passed to Keras'
        model.compile(). See Keras' documentation on optimizers.

    loss: str (default='mean_squared_error')
        The loss used when training each model. Passed to Keras'
        model.compile(). See Keras' documentation on losses.

    metrics: list or None (default=None)
        The metrics used when training each model. Passed to Keras'
        model.compile(). See Keras' documentation on metrics. ``None``
        means ``['mean_squared_error']``; a fresh list is created per
        instance so instances never share a mutable default.

    epochs: int (default=10)
        The number of epochs to train each model. Passed to Keras'
        model.fit().

    validation_split: float (default=0.2)
        The proportion of training data to be used to evaluate each
        model. Passed to Keras' model.fit().

    seed: int or None (default=None)
        Seed for the random generator that drives tournament selection,
        crossover and mutation. It does not seed Keras' own weight
        initialization or data shuffling.

    Attributes
    ----------
    best_loss_: float
        Loss of the model returned by the most recent call to fit().
    """

    # Crossover strategies understood by _crossover().
    _CROSSOVER_TYPES = ('uniform', '2-point')

    def __init__(self, generations=100, pop_size=100, tournament_size=5,
                 crossover_type='uniform', mutation_rate=0.05, mutation_sd=0.05,
                 nodes=64, optimizer='sgd', loss='mean_squared_error',
                 metrics=None, epochs=10, validation_split=0.2, seed=None):
        # Fail fast: a bad setting would otherwise only surface after a whole
        # generation of models has already been trained.
        if crossover_type not in self._CROSSOVER_TYPES:
            raise ValueError(
                "crossover_type must be one of %r, got %r"
                % (self._CROSSOVER_TYPES, crossover_type))
        if generations < 0:
            raise ValueError("generations must be >= 0")
        if pop_size < 1 or tournament_size < 1:
            raise ValueError("pop_size and tournament_size must be >= 1")
        if not 0.0 <= mutation_rate <= 1.0:
            raise ValueError("mutation_rate must be between 0 and 1")
        if mutation_sd < 0:
            raise ValueError("mutation_sd must be >= 0")

        self.generations = generations
        self.pop_size = pop_size
        self.tournament_size = tournament_size
        self.crossover_type = crossover_type
        self.mutation_rate = mutation_rate
        self.mutation_sd = mutation_sd
        self.nodes = nodes
        self.optimizer = optimizer
        self.loss = loss
        self.metrics = (['mean_squared_error'] if metrics is None
                        else list(metrics))
        self.epochs = epochs
        self.validation_split = validation_split
        self.seed = seed
        self._rng = np.random.default_rng(seed)

    def _build_model(self, num_inputs, num_outputs):
        """Return a compiled, untrained MLP with a single hidden layer."""
        model = keras.Sequential([
            keras.layers.Dense(self.nodes,
                               activation=tf.nn.relu,
                               input_shape=(num_inputs,)),
            keras.layers.Dense(num_outputs)
        ])
        model.compile(loss=self.loss,
                      optimizer=self.optimizer,
                      metrics=self.metrics)
        return model

    def _initialize_population(self, num_inputs, num_outputs):
        """Return the initial population as a list of untrained models."""
        return [self._build_model(num_inputs, num_outputs)
                for _ in range(self.pop_size)]

    def _local_search(self, population, data, target):
        """Train each model and return a list of (loss, model) pairs.

        The loss is the validation loss after the final epoch. When Keras
        reports no validation loss (e.g. validation_split=0) the training
        loss is used instead. Non-finite losses (diverged models) are
        replaced by +inf so they always lose tournaments.
        """
        trained_population = []
        for model in population:
            history = model.fit(data,
                                target,
                                epochs=self.epochs,
                                validation_split=self.validation_split,
                                verbose=1,
                                callbacks=[])
            losses = history.history.get('val_loss') or history.history['loss']
            # Rank on the final epoch only; a scalar is also hashable and
            # comparable, unlike the per-epoch list.
            final_loss = float(losses[-1])
            if not np.isfinite(final_loss):
                final_loss = float('inf')
            trained_population.append((final_loss, model))
        return trained_population

    def _select_parents(self, population):
        """Perform tournament selection and return the winning pairs.

        The (loss, model) pairs are shuffled and split into disjoint groups
        of `tournament_size` (the last group may be smaller); the pair with
        the lowest loss in each group is a winner.
        """
        population = list(population)
        order = self._rng.permutation(len(population))
        winners = []
        for start in range(0, len(order), self.tournament_size):
            group = [population[k]
                     for k in order[start:start + self.tournament_size]]
            # Compare on the loss only: models themselves are not orderable,
            # so plain tuple comparison would fail on tied losses.
            winners.append(min(group, key=lambda pair: pair[0]))
        return winners

    def _crossover(self, arr_0, arr_1):
        """Return a child weight array combining two parent arrays.

        Works for arrays of any rank (kernels and biases alike). The result
        has the shape and dtype of `arr_0`.
        """
        arr_0 = np.asarray(arr_0)
        arr_1 = np.asarray(arr_1)
        if arr_0.shape != arr_1.shape:
            raise ValueError("parent weight arrays must have the same shape")

        if self.crossover_type == 'uniform':
            take_second = self._rng.random(arr_0.shape) < 0.5
        elif self.crossover_type == '2-point':
            # Draw the two cut points once per array; the half-open segment
            # [lo, hi) of the flattened array comes from the second parent.
            lo, hi = np.sort(self._rng.integers(0, arr_0.size + 1, size=2))
            take_second = np.zeros(arr_0.size, dtype=bool)
            take_second[lo:hi] = True
            take_second = take_second.reshape(arr_0.shape)
        else:
            raise ValueError(
                "crossover_type must be one of %r, got %r"
                % (self._CROSSOVER_TYPES, self.crossover_type))
        return np.where(take_second, arr_1, arr_0).astype(arr_0.dtype,
                                                          copy=False)

    def _mutate(self, weights):
        """Return a copy of `weights` with random Gaussian mutations.

        Each weight is independently replaced, with probability
        `mutation_rate`, by a draw from a normal distribution centered on
        its current value with standard deviation `mutation_sd`.
        """
        weights = np.asarray(weights)
        mutate = self._rng.random(weights.shape) < self.mutation_rate
        noise = self._rng.normal(0.0, self.mutation_sd, size=weights.shape)
        return np.where(mutate, weights + noise, weights).astype(
            weights.dtype, copy=False)

    def _make_children(self, parents, num_inputs, num_outputs):
        """Perform crossover and mutation and return the next generation.

        `parents` is a sequence of (loss, model) pairs. Returns a list of
        `pop_size` untrained child models. With a single parent, children
        are mutated copies of that parent.
        """
        parents = list(parents)
        if not parents:
            raise ValueError("at least one parent is required")

        # get_weights() returns NumPy arrays in exactly the order that
        # set_weights() expects; read them once per parent, not per child.
        parent_weights = [model.get_weights() for _, model in parents]

        children = []
        while len(children) < self.pop_size:
            if len(parent_weights) > 1:
                first, second = self._rng.choice(len(parent_weights),
                                                 size=2, replace=False)
            else:
                first = second = 0
            child_weights = [
                self._mutate(self._crossover(arr_0, arr_1))
                for arr_0, arr_1 in zip(parent_weights[first],
                                        parent_weights[second])
            ]
            model = self._build_model(num_inputs, num_outputs)
            model.set_weights(child_weights)
            children.append(model)
        return children

    def fit(self, data, target, num_outputs=1):
        """Perform genetic local search and return the best model.

        Parameters
        ----------
        data: 2-D array of training data, shaped (samples, features).
            Passed to Keras' model.fit(); data.shape[1] sets the size of
            each model's input layer.

        target: vector, matrix, or array of target data.
            Passed to Keras' model.fit()

        num_outputs: int (default=1)
            The size of each model's output layer. If performing a
            classification task, this value corresponds to the number of
            classes.

        Returns
        -------
        The trained model with the lowest loss in the final generation.
        Its loss is stored in `best_loss_`.
        """
        num_inputs = data.shape[1]
        population = self._initialize_population(num_inputs, num_outputs)
        trained_population = self._local_search(population, data, target)
        for _ in range(self.generations):
            parents = self._select_parents(trained_population)
            population = self._make_children(parents, num_inputs, num_outputs)
            trained_population = self._local_search(population, data, target)
        best_loss, best_model = min(trained_population,
                                    key=lambda pair: pair[0])
        self.best_loss_ = best_loss
        return best_model


In [8]:
# TODO (next steps):
# - write test cases
# - implement adaptive algorithms
# - make the neural-network parameters adjustable
# - write a genetic algorithm to determine the best neural-network parameters
# - keep track of population losses for plotting
# - make it easier to alter the network for regression vs. classification
#   problems, e.g. adding a softmax activation for the output layer
# - add more callbacks
# - add error handling
# - determine a good value for mutation_sd
# - determine whether mutating biases makes sense
# - add early stopping