# Gene Expression Programming

## Imports
External libraries used:
* AnyTree (packaged locally, no installation required)
* MatPlotLib
* Numpy

In [1]:
from copy import deepcopy
from lib.anytree.node import Node
from lib.anytree.render import RenderTree
import matplotlib.pyplot as plt
import numpy as np
from random import random, randint, shuffle
from warnings import warn
%matplotlib inline

## Chromosome Class - Container for a list of genes (strings)
Contains methods to...
* Build an expression tree from a gene
* Evaluate an expression tree given fitness cases and terminal values
* Display the chromosome's expression tree to the console
* Link chromosomes
* Calculate fitness (several ways)
* Generate random genes/chromosomes

In [2]:
class Chromosome:

    # Functions and Terminals are shared by all chromosomes
    functions = dict()
    terminals = list()
    constants = dict()
    ephemeral_random_constants_range = (-1, 1)
    linking_function = None

    # length of head of chromosome
    num_genes = 3
    head_length = 6
    length = 39

    # list of real-valued tuples of the form (x, f(x))
    fitness_cases = []
    max_fitness = None


    def __init__(self, genes: list):

        # do not let chromosomes be defined without first defining their functions, terminals, and head length
        if not Chromosome.functions:
            raise ValueError("Chromosome class has no functions associated with it.")
        if len(Chromosome.terminals) == 0:
            raise ValueError("Chromosome class has no terminals associated with it.")
        if Chromosome.length is None:
            raise ValueError("Chromosome class has no length defined.")
        if Chromosome.head_length is None:
            raise ValueError("Chromosome class has no head length defined.")
        if Chromosome.linking_function is None and len(genes) > 1:
            raise ValueError("Multigenic chromosome defined with no linking function.")
        if len(genes) != Chromosome.num_genes:
            raise ValueError("Number of genes does not match excpected value in class level variable.")
        if "?" in Chromosome.terminals and Chromosome.ephemeral_random_constants_range is None:
            raise ValueError("Must define ephemeral random constants range if using ephemeral random constants.")

        # initialize chromosomes
        self.genes = genes
        self.trees = []
        self._values_ = {}
        self._fitness_ = None
        self.ephemeral_random_constants = list(np.random.uniform(*Chromosome.ephemeral_random_constants_range, size=Chromosome.length))


    # TODO - put informative error message when terminal_values doesn't have enough entries
    def evaluate(self, terminal_values: dict) -> float:
        """
        Returns the result of evaluating the given chromosome for specified fitness cases.

        :param terminal_values: dictionary mapping all present terminal symbols to real values
        :return: real valued result of evaluating the chromosome
        """

        # memoize value in case the chromosome was already evaluated
        value_fingerprint = tuple(sorted(terminal_values.items()))
        if value_fingerprint in self._values_:
            return self._values_[value_fingerprint]

        # build expression trees for each gene if not already built
        if len(self.trees) == 0:
            self.trees = [Chromosome.build_tree(gene) for gene in self.genes]

        # link expression trees if the chromosome is multigenic, otherwise use first tree
        if self.num_genes > 1:
            expression_tree = Chromosome.link(*self.trees)
        else:
            expression_tree = self.trees[0]

        erc_index = 0

        # recursive inorder tree traversal
        def inorder(start: Node) -> float:
            nonlocal terminal_values, erc_index
            if start.name in Chromosome.terminals:
                if start.name == "?":
                    erc_index += 1
                    return self.ephemeral_random_constants[erc_index - 1]
                if start.name in Chromosome.constants:
                    return Chromosome.constants[start.name]
                return int(start.name) if start.name.isdigit() else terminal_values[start.name]
            if start.name in Chromosome.functions:
                return Chromosome.functions[start.name]["f"](*[inorder(node) for node in start.children])

        try:
            self._values_[value_fingerprint] = inorder(expression_tree)
            if isinstance(self._values_[value_fingerprint], np.complex):
                raise TypeError
        # ZeroDivisionError if tree does something like x/(y-y), TypeError if the takes square root of a negative.
        except (ZeroDivisionError, TypeError):
            self._values_[value_fingerprint] = np.nan

        # noinspection PyTypeChecker
        return self._values_[value_fingerprint]


    def fitness(self) -> float:
        """
        Getter for fitness property to make sure we aren't grabbing uncalculated fitnesses

        :return: fitness of chromosome, or raise a warning and return 0 if fitness hasn't been calculated
        """
        if self._fitness_ is not None:
            return self._fitness_
        warn("Fitness of chromosome has not been properly calculated. Returning 0.")
        return 0


    def print_tree(self) -> None:
        """
        Use AnyTree to display a Chromosome's expression tree(s)

        :return: void
        """
        for t in range(len(self.trees)):
            print("Tree %d" % t)
            for pre, _, node in RenderTree(self.trees[t]):
                print("\t%s%s" % (pre, node.name))
        print(self.ephemeral_random_constants)

    def plot_solution(self, objective_function, x_min: float, x_max: float,
                      avg_fitnesses: list, best_fitnesses: list, variable_name: str) -> None:

        """
        Mostly unused, handy for plotting symbolic regression results though

        :param objective_function: ground truth function to plot
        :param x_min: minimum x value of plot
        :param x_max: maximum x value of plot
        :param avg_fitnesses: list of average fitness values by generation
        :param best_fitnesses: list of best fitness values by generation
        :return: void
        """

        if objective_function is not None:
            # set up subplots
            plt.subplots(1, 2, figsize=(16, 8))

            # Objective function vs Discovered function plot
            xs = np.linspace(x_min, x_max, 100)
            plt.subplot(1, 2, 1)
            plt.title("Discovered function vs. Objective function")
            plt.plot(xs, [objective_function(x) for x in xs],
                     linewidth=2, linestyle='dashed', color='black', label="Objective")
            plt.plot(xs, [self.evaluate({variable_name: x}) for x in xs],
                     linewidth=2, color='blue', label="Discovered")
            plt.legend(loc="upper left")

            # Fitness over time plot
            plt.subplot(1, 2, 2)

            plt.title("Fitness by Generation")
            plt.plot(range(len(avg_fitnesses)), avg_fitnesses, label="Average")
            plt.plot(range(len(best_fitnesses)), best_fitnesses, label="Best")
            plt.legend(loc="upper left")
            plt.show()

        else:
            plt.subplots(1, 1, figsize=(8, 8))
            plt.title("Fitness by Generation")
            plt.plot(range(len(avg_fitnesses)), avg_fitnesses, label="Average")
            plt.plot(range(len(best_fitnesses)), best_fitnesses, label="Best")
            plt.legend(loc="upper left")
            plt.show()


    @staticmethod
    def build_tree(gene: str) -> Node:
        """
        Constructs an expression tree from a gene.

        :param gene: gene to turn into expression tree
        :return: anytree Node of the root of the tree
        """

        # shortcut to get the number of arguments to a function
        def args(f: str) -> int:
            return Chromosome.functions[f]["args"] if f in Chromosome.functions else 0

        # recursively build chromosome tree
        def grab_children(parent: Node, current_level = 1):
            nonlocal levels
            if current_level < len(levels):
                nargs = args(parent.name)
                for i in range(nargs):
                    current_node = Node(levels[current_level][i], parent=parent)
                    grab_children(parent=current_node, current_level=current_level + 1)
                    if current_level < len(levels) - 1:
                        levels[current_level + 1] = levels[current_level + 1][args(current_node.name):]

        # build each level of the tree
        levels = [gene[0]]
        index = 0
        while index < len(gene) and sum([args(f) for f in levels[-1]]) != 0:
            nargs = sum([args(f) for f in levels[-1]])
            levels.append(gene[index + 1: index + 1 + nargs])
            index += nargs

        # intialize tree and parse
        tree = Node(gene[0])
        grab_children(tree)
        return tree


    @staticmethod
    # TODO - verify recursive linking with non-commutative linking functions (e.g. -)
    def link(*args) -> Node:
        """
        Links two trees at their roots using the specified linking function.
        Linking function must take as many arguments as number of args provided.

        :param args: expression trees to link. Must be at least as many expression trees as linking function has arguments.
        :return: expression tree with tree1 and tree2 as subtrees
        """

        if Chromosome.linking_function not in Chromosome.functions:
            raise ValueError("Linking function is not defined in Chromosome.functions.")
        if not all([isinstance(arg, Node) for arg in args]):
            raise TypeError("Can only link expression trees.")

        nargs = Chromosome.functions[Chromosome.linking_function]["args"]

        def link_recursive(*args) -> Node:
            root = Node(Chromosome.linking_function)
            if len(args) == nargs:
                for tree in args:
                    tree.parent = root
                return root
            else:
                return link_recursive(link_recursive(*args[:nargs]), *args[nargs:])

        return link_recursive(*args)


    @staticmethod
    # TODO - calculate using numpy arrays for speed
    def absolute_fitness(M: float, *args) -> np.ndarray:
        """
        Calculate absolute fitness of an arbitrary number of Chromosomes.

        :param M: range of fitness function over domain
        :param args: any number of gene objects
        :return: list of fitnesses of corresponding chromosomes
        """
        fitnesses = []
        for chromosome in args:
            # memoize fitness values
            if chromosome._fitness_ is not None:
                fitnesses.append(chromosome._fitness_)
            else:
                fitness = 0
                for j in range(len(Chromosome.fitness_cases)):
                    C_ij = chromosome.evaluate(Chromosome.fitness_cases[j][0])
                    # assign any chromosome that divides by zero a fitness value of zero
                    if type(C_ij) == np.complex or np.isnan(C_ij) or np.isinf(C_ij) or np.isneginf(C_ij):
                        fitness = 0
                        break
                    T_j = Chromosome.fitness_cases[j][1]
                    fitness += M - abs(C_ij - T_j)
                chromosome._fitness_ = fitness
                fitnesses.append(fitness)
        return np.asarray(fitnesses)


    @staticmethod
    # TODO - calculate using numpy arrays for speed
    def relative_fitness(M: float, *args) -> np.ndarray:
        """
        Calculate relative fitness of an arbitrary number of genes.

        :param M: range of fitness function over domain
        :param args: any number of gene objects
        :return: list of fitnesses of corresponding genes
        """
        fitnesses = []
        for chromosome in args:
            # memoize fitness values
            if chromosome._fitness_ is not None:
                fitnesses.append(chromosome._fitness_)
            else:
                fitness = 0
                for j in range(len(Chromosome.fitness_cases)):
                    C_ij = chromosome.evaluate(Chromosome.fitness_cases[j][0])
                    T_j = Chromosome.fitness_cases[j][1]
                    fitness += M - 100*abs(C_ij / T_j - 1)
                chromosome._fitness_ = fitness
                fitnesses.append(fitness)
        return np.asarray(fitnesses)


    @staticmethod
    def inv_squared_error(*args) -> np.ndarray:
        """
        Classical 1/(1+(squared error) fitness value.

        :param args: list of chromosomes to calculate fitness of
        :return: ndarray of fitness values for each given chromosome
        """
        fitnesses = []
        for chromosome in args:
            # memoize fitness values
            if chromosome._fitness_ is not None:
                fitnesses.append(chromosome._fitness_)
            else:
                fitness = 0
                for j in range(len(Chromosome.fitness_cases)):
                    C_ij = chromosome.evaluate(Chromosome.fitness_cases[j][0])
                    if type(C_ij) == np.complex or np.isnan(C_ij) or np.isinf(C_ij) or np.isneginf(C_ij):
                        fitness = np.inf
                        break
                    T_j = Chromosome.fitness_cases[j][1]
                    fitness += (C_ij - T_j)**2
                chromosome._fitness_ = 1.0/(1+fitness)
                fitnesses.append(chromosome._fitness_)
        return np.asarray(fitnesses)


    @staticmethod
    def centralized_inv_squared_error(center: float, dimension: str, *args) -> np.ndarray:
        """
        Mostly unused, a fitness function that focuses on error near a given point.

        :param center: point around which the fitness values are more important
        :param dimension: which dimension of the fitness case to use, in the case of multivariate functions
        :param args: any chromosomes to calculate fitness of
        :return: ndarray of fitness values
        """
        fitnesses = []
        for chromosome in args:
            # memoize fitness values
            if chromosome._fitness_ is not None:
                fitnesses.append(chromosome._fitness_)
            else:
                fitness = 0
                for j in range(len(Chromosome.fitness_cases)):
                    C_ij = chromosome.evaluate(Chromosome.fitness_cases[j][0])
                    if type(C_ij) == np.complex or np.isnan(C_ij) or np.isinf(C_ij) or np.isneginf(C_ij):
                        fitness = np.inf
                        break
                    T_j = Chromosome.fitness_cases[j][1]
                    fitness += abs(C_ij - T_j)**(1/abs(Chromosome.fitness_cases[j][0][dimension] - center))
                chromosome._fitness_ = 1.0 / (1 + fitness)
                fitnesses.append(chromosome._fitness_)
        return np.asarray(fitnesses)


    @staticmethod
    def generate_random_gene() -> str:
        """
        Generates one random gene based on settings specified in Chromosome class.
        :return: string of valid characters
        """
        possible_chars = list(Chromosome.functions.keys()) + Chromosome.terminals
        head = "".join([possible_chars[randint(0, len(possible_chars) - 1)] for _ in range(Chromosome.head_length)])
        tail = "".join([Chromosome.terminals[randint(0, len(Chromosome.terminals) - 1)] for _ in range(Chromosome.length - Chromosome.head_length)])
        return head + tail


    @staticmethod
    def generate_random_individual() -> 'Chromosome':
        """
        Generates one random individual based on settings specified in Chromosome class.
        :return: new Chromosome
        """
        return Chromosome([Chromosome.generate_random_gene() for _ in range(Chromosome.num_genes)])


## GeneExpressionProgram Class - Container for Program Execution
Why do you have a class where every method is static?

Because I don't like polluting the global namespace, and I like grouping variables by their function.

Main driver for Gene Expression Programming. Contains methods for:
* Evolving a configured program
* Roulette wheel selection
* Mutation
* All kinds of recombination
* Plotting the results of evolution

In [3]:
class GeneExpressionProgram:

    ### Hyperparameters ###
    NUM_RUNS = 5
    NUM_GENERATIONS = 500
    POPULATION_SIZE = 100
    NUM_FITNESS_CASES = 10
    ERROR_TOLERANCE = 0.0000001

    ### Reproduction ###
    MUTATION_RATE = 0.051
    ONE_POINT_CROSSOVER_RATE, TWO_POINT_CROSSOVER_RATE, GENE_CROSSOVER_RATE = 0.2, 0.5, 0.1
    IS_TRANSPOSITION_RATE, IS_ELEMENTS_LENGTH = 0.1, [1, 2, 3]
    RIS_TRANSPOSITION_RATE, RIS_ELEMENTS_LENGTH = 0.1, [1, 2, 3]
    GENE_TRANSPOSITION_RATE = 0.1

    ### Fitness Evaluation ###
    OBJECTIVE_FUNCTION = None
    FITNESS_FUNCTION = None
    FITNESS_FUNCTION_ARGS = list()
    OBJECTIVE_MIN, OBJECTIVE_MAX = None, None
    FUNCTION_Y_RANGE = None


    def __init__(self):
        pass


    @staticmethod
    def evolve() -> (Chromosome, list, list):
        """
        Execute Gene Expression Programming algorithm

        :return: tuple of:
            the best fit chromosome,
            list of average fitness by generation (for plotting),
            list of best fitness by generation (for plotting
        """

        # create initial population
        population = [Chromosome.generate_random_individual() for _ in range(GeneExpressionProgram.POPULATION_SIZE)]

        generation = 0
        best_fit_individual = None
        average_fitness_by_generation = []
        best_fitness_by_generation = []
        while generation < GeneExpressionProgram.NUM_GENERATIONS:

            ### EVALUATION ###

            # calcluate fitnesses for population
            population_fitnesses = GeneExpressionProgram.FITNESS_FUNCTION(*GeneExpressionProgram.FITNESS_FUNCTION_ARGS, *population)

            # find best fit individual
            # noinspection PyTypeChecker
            best_fit_generation = population[np.argmax(population_fitnesses)]
            if generation == 0 or best_fit_individual.fitness() < best_fit_generation.fitness():
                best_fit_individual = deepcopy(best_fit_generation)

            # skip rest of loop if we have found optimal solution
            if abs(best_fit_individual.fitness() - Chromosome.max_fitness) <= GeneExpressionProgram.ERROR_TOLERANCE:
                average_fitness_generation = float(np.mean(population_fitnesses))
                average_fitness_by_generation.append(average_fitness_generation)
                best_fitness_by_generation.append(best_fit_individual.fitness())
                break

            next_generation = list()


            ### SELECTION (roulette wheel with simple elitism) ###

            # copy best individual to next generation
            next_generation.append(deepcopy(best_fit_individual))

            # select the rest of the next generation with roulette wheel selection
            all_parents = GeneExpressionProgram.roulette_wheel_selection(population, len(population))

            # Mutation
            all_parents = list(map(GeneExpressionProgram.mutate, all_parents))

            # IS Transposition
            all_parents = list(map(GeneExpressionProgram.is_transposition, all_parents))

            # RIS Transposition
            all_parents = list(map(GeneExpressionProgram.ris_transposition, all_parents))

            # Gene Transposition
            all_parents = list(map(GeneExpressionProgram.gene_transposition, all_parents))

            # Recombination
            shuffle(all_parents)
            for i in range(1, GeneExpressionProgram.POPULATION_SIZE, 2):

                # in case we don't have a pair to check for crossover, avoid index error
                if i + 1 >= GeneExpressionProgram.POPULATION_SIZE:
                    next_generation.append(all_parents[i])
                    break

                child1, child2 = all_parents[i], all_parents[i+1]

                # One-point Recombination
                if random() < GeneExpressionProgram.ONE_POINT_CROSSOVER_RATE:
                    child1, child2 = GeneExpressionProgram.one_point_recombination(child1, child2)

                # Two-point Recombination
                elif random() < GeneExpressionProgram.TWO_POINT_CROSSOVER_RATE:
                    child1, child2 = GeneExpressionProgram.two_point_recombination(child1, child2)

                # Gene Recombination
                elif random() < GeneExpressionProgram.GENE_CROSSOVER_RATE:
                    child1, child2 = GeneExpressionProgram.gene_recombination(child1, child2)

                # Include children in next generation
                next_generation.append(child1)
                next_generation.append(child2)

            # prepare for next iteration
            population = next_generation
            generation += 1

            average_fitness_generation = float(np.mean(population_fitnesses))
            average_fitness_by_generation.append(average_fitness_generation)
            best_fitness_by_generation.append(best_fit_individual.fitness())
            print("Generation: %d\tPopulation Size: %d\tAverage Fitness: %.5f\tBest Fitness (overall): %.5f" %
                  (generation, len(population), average_fitness_generation, best_fit_individual.fitness()))

        return best_fit_individual, average_fitness_by_generation, best_fitness_by_generation


    @staticmethod
    def random_search(num_generations: int, fitness_function: callable, fitness_function_args: list) -> (Chromosome, list, list):
        best = None
        best_fitness = 0
        average_fitnesses = []
        best_fitnesses = []
        for gen in range(num_generations):
            generation_fitnesses = []
            for individual in range(GeneExpressionProgram.POPULATION_SIZE):
                current = Chromosome.generate_random_individual()
                current_fitness = fitness_function(*fitness_function_args, current)
                generation_fitnesses.append(current_fitness)
                if best is None or best_fitness <= current_fitness:
                    best = deepcopy(current)
                    best_fitness = best.fitness()
            average_fitnesses.append(np.mean(generation_fitnesses))
            best_fitnesses.append(best_fitness)
            if best_fitness >= Chromosome.max_fitness:
                break
        return best, average_fitnesses, best_fitnesses


    @staticmethod
    def roulette_wheel_selection(chromosomes: list, n: int) -> list:
        """
        Returns n randomly selected chromosomes using roulette wheel selection.
        Adapted from:
        https://stackoverflow.com/questions/2140787/select-k-random-elements-from-a-list-whose-elements-have-weights

        :param chromosomes: list of chromosomes to select from
        :param n: number of samples to retrieve
        :return: list of chosen chromosome(s)
        """
        total = float(sum(c.fitness() for c in chromosomes))
        i = 0
        fitness = chromosomes[0].fitness()
        while n:
            x = total * (1 - random() ** (1.0 / n))
            total -= x
            while x > fitness:
                x -= fitness
                i += 1
                fitness = chromosomes[i].fitness()
            fitness -= x
            yield chromosomes[i]
            n -= 1


    @staticmethod
    def mutate(chromosome: Chromosome) -> Chromosome:
        """
        Randomly mutate genes in a chromosome.

        :param chromosome: chromosome to mutate
        :return: mutated chromosome
        """

        head_characters = list(Chromosome.functions.keys()) + Chromosome.terminals

        new_genes = []
        # for each gene (multigenic chromosomes)
        for gene in range(Chromosome.num_genes):
            new_gene = ""
            # for each character in the gene
            for i in range(len(chromosome.genes[gene])):
                # if we are to mutate it
                if random() < GeneExpressionProgram.MUTATION_RATE:
                    # if we are mutating the head
                    if i < Chromosome.head_length:
                        new_gene += head_characters[randint(0, len(head_characters) - 1)]
                    else:
                        new_gene += Chromosome.terminals[randint(0, len(Chromosome.terminals) - 1)]
                else:
                    new_gene += chromosome.genes[gene][i]
            new_genes.append(new_gene)

        # create new chromosome to ensure memoized fitness values are recalculated.
        new_chromosome = Chromosome(new_genes)
        new_chromosome.ephemeral_random_constants = chromosome.ephemeral_random_constants

        # mutate ephemeral random constants
        for constant in range(len(new_chromosome.ephemeral_random_constants)):
            if random() < GeneExpressionProgram.MUTATION_RATE:
                new_chromosome.ephemeral_random_constants[constant] = np.random.uniform(*Chromosome.ephemeral_random_constants_range)

        return new_chromosome


    @staticmethod
    def is_transposition(chromosome: Chromosome) -> Chromosome:
        """
        Insertion Sequence transposition.

        :param chromosome: chromosome to perform IS transposition on
        :return: new chromosome
        """

        # TODO - properly transpose ephemeral random constants

        if random() < GeneExpressionProgram.IS_TRANSPOSITION_RATE:

            # determine parameters of transposition
            length = np.random.choice(GeneExpressionProgram.IS_ELEMENTS_LENGTH)
            source_gene = randint(0, len(chromosome.genes) - 1)
            target_gene = randint(0, len(chromosome.genes) - 1)
            target_position = randint(1, Chromosome.head_length - length)
            sequence_start = randint(0, len(chromosome.genes[source_gene]))

            transposition_string = chromosome.genes[source_gene][sequence_start:min(Chromosome.length, sequence_start + length)]

            # make substitution
            new_chromosome = Chromosome(chromosome.genes)
            new_chromosome.ephemeral_random_constants = chromosome.ephemeral_random_constants
            new_chromosome.genes[target_gene] = new_chromosome.genes[target_gene][:target_position] + \
                                                transposition_string + \
                                                new_chromosome.genes[target_gene][target_position + length:]

            return new_chromosome

        else:

            return chromosome


    @staticmethod
    def ris_transposition(chromosome: Chromosome) -> Chromosome:
        """
        Root Insertion Sequence transposition.

        :param chromosome: chromosome to perform RIS transposition on
        :return: new chromosome
        """

        # TODO - properly transpose ephemeral random constants

        start_point = randint(0, Chromosome.head_length - 1)
        gene = randint(0, Chromosome.num_genes - 1)
        while start_point < Chromosome.head_length and chromosome.genes[gene][start_point] not in Chromosome.functions:
            start_point += 1

        if random() < GeneExpressionProgram.RIS_TRANSPOSITION_RATE and chromosome.genes[gene][start_point] in Chromosome.functions:
            ris_length = np.random.choice(GeneExpressionProgram.RIS_ELEMENTS_LENGTH)
            ris_string = chromosome.genes[gene][start_point:start_point+ris_length]

            new_chromosome = Chromosome(chromosome.genes)
            new_chromosome.ephemeral_random_constants = chromosome.ephemeral_random_constants
            old_head = new_chromosome.genes[gene][:Chromosome.head_length]
            new_head = old_head[:start_point] + ris_string + old_head[start_point:]
            new_chromosome.genes[gene] = new_head[:Chromosome.head_length] + new_chromosome.genes[gene][Chromosome.head_length:]

            return new_chromosome

        else:

            return chromosome


    @staticmethod
    def gene_transposition(chromosome: Chromosome) -> Chromosome:
        """
        Gene Insertion Sequence transposition.

        :param chromosome: chromosome to perform gene transposition on
        :return: new chromosome
        """

        # TODO - properly transpose ephemeral random constants

        if Chromosome.num_genes > 1 and random() < GeneExpressionProgram.GENE_TRANSPOSITION_RATE:

            index = randint(0, Chromosome.num_genes - 1)
            temp = chromosome.genes[index]
            chromosome.genes[index] = chromosome.genes[0]
            chromosome.genes[0] = temp
            new_chromosome = Chromosome(chromosome.genes)
            new_chromosome.ephemeral_random_constants = chromosome.ephemeral_random_constants
            return new_chromosome

        else:

            return chromosome


    @staticmethod
    def one_point_recombination(chromosome1: Chromosome, chromosome2: Chromosome) -> (Chromosome, Chromosome):
        """
        Classical one point recombination.

        :param chromosome1: parent 1
        :param chromosome2: parent 2
        :return: offspring 1, offspring 2
        """
        gene = randint(0, Chromosome.num_genes - 1)
        position = randint(0, Chromosome.length)

        child1_split_gene = chromosome1.genes[gene][:position] + chromosome2.genes[gene][position:]
        child2_split_gene = chromosome2.genes[gene][:position] + chromosome1.genes[gene][position:]

        child1_genes = chromosome1.genes[:gene] + [child1_split_gene] + (chromosome2.genes[gene+1:] if gene < Chromosome.num_genes - 1 else [])
        child2_genes = chromosome2.genes[:gene] + [child2_split_gene] + (chromosome1.genes[gene + 1:] if gene < Chromosome.num_genes - 1 else [])

        child1, child2 = Chromosome(child1_genes), Chromosome(child2_genes)

        constants_split_position = randint(0, Chromosome.length - 1)
        child1.ephemeral_random_constants = chromosome1.ephemeral_random_constants[:constants_split_position] + \
                                            chromosome2.ephemeral_random_constants[constants_split_position:]

        child2.ephemeral_random_constants = chromosome2.ephemeral_random_constants[:constants_split_position] + \
                                            chromosome1.ephemeral_random_constants[constants_split_position:]

        return child1, child2


    @staticmethod
    def two_point_recombination(chromosome1: Chromosome, chromosome2: Chromosome) -> (Chromosome, Chromosome):
        """
        Classical two point recombination.

        :param chromosome1: parent 1
        :param chromosome2: parent 2
        :return: offspring 1, offsprint 2
        """

        # generate crossover points
        position1, position2 = sorted([randint(0, Chromosome.length*Chromosome.num_genes - 1), randint(0, Chromosome.length*Chromosome.num_genes - 1)])

        # join genes into single string for ease of manipulation
        child1_genes_str = "".join(chromosome1.genes)
        child2_genes_str = "".join(chromosome2.genes)

        # perform crossover
        child1_genes = child1_genes_str[:position1] + child2_genes_str[position1:position2] + child1_genes_str[position2:]
        child2_genes = child2_genes_str[:position1] + child1_genes_str[position1:position2] + child2_genes_str[position2:]

        # split genes from string into list
        child1_genes = [child1_genes[i:i + Chromosome.length] for i in range(0, Chromosome.num_genes * Chromosome.length, Chromosome.length)]
        child2_genes = [child2_genes[i:i + Chromosome.length] for i in range(0, Chromosome.num_genes * Chromosome.length, Chromosome.length)]

        child1, child2 = Chromosome(child1_genes), Chromosome(child2_genes)
        split_positions = sorted([randint(0, Chromosome.length - 1), randint(0, Chromosome.length - 1)])

        child1.ephemeral_random_constants = chromosome1.ephemeral_random_constants[:split_positions[0]] + \
                                            chromosome2.ephemeral_random_constants[split_positions[0]:split_positions[1]] + \
                                            chromosome1.ephemeral_random_constants[split_positions[1]:]
        child2.ephemeral_random_constants = chromosome2.ephemeral_random_constants[:split_positions[0]] + \
                                            chromosome1.ephemeral_random_constants[split_positions[0]:split_positions[1]] + \
                                            chromosome2.ephemeral_random_constants[split_positions[1]:]
        return child1, child2


    @staticmethod
    def gene_recombination(chromosome1: Chromosome, chromosome2: Chromosome) -> (Chromosome, Chromosome):
        """
        Two point recombination that occurs along gene boundaries for multigenic chromosomes.

        :param chromosome1: parent 1
        :param chromosome2: parent 2
        :return: offspring 1, offspring 2
        """

        # choose gene to swap
        gene = randint(0, Chromosome.num_genes - 1)

        # initialize children genes
        child1_genes = chromosome1.genes
        child2_genes = chromosome2.genes

        # perform swap
        child1_genes[gene] = chromosome2.genes[gene]
        child2_genes[gene] = chromosome1.genes[gene]

        return Chromosome(child1_genes), Chromosome(child2_genes)


    @staticmethod
    def plot_reps(avg_fitnesses: list, best_fitnesses: list, random_search_avg: list = None, random_search_best: list = None) -> None:
        """
        Plot all reps with global best solutions.

        :param avg_fitnesses: list of lists containing average fitnesses by generation for each rep
        :param best_fitnesses: same as avg_fitnesses but best fitnesses
        :param random_search_avg: average fitness value for random search across generations
        :param random_search_best: best fitness value for random search by generation
        :return: void
        """

        is_random_search = not (random_search_avg is None or random_search_best is None)

        plt.subplots(1, 2, figsize=(16, 8))

        plt.subplot(1, 2, 1)
        plt.title("Average Fitness by Generation")
        plt.xlabel("Generation")
        plt.ylabel("Average Fitness")

        # plot each rep
        for rep in range(GeneExpressionProgram.NUM_RUNS):
            plt.plot(range(len(avg_fitnesses[rep])), avg_fitnesses[rep], label="Rep %d Average" % (rep + 1))

        if is_random_search:
            plt.plot(range(len(random_search_avg)), random_search_avg, label="Random Search Average")
            #plt.plot(range(len(random_search_best)), random_search_best, label="Random Search Best")

        plt.legend(loc="upper left")

        plt.subplot(1, 2, 2)
        plt.title("Best Fitness by Generation")
        plt.xlabel("Generation")
        plt.ylabel("Best Fitness")

        # plot each rep
        for rep in range(GeneExpressionProgram.NUM_RUNS):
            plt.plot(range(len(best_fitnesses[rep])), best_fitnesses[rep], label="Rep %d Best" % (rep + 1))

        if is_random_search:
            plt.plot(range(len(random_search_best)), random_search_best, label="Random Search Best")

        plt.legend(loc="upper left")
        plt.show()


## Symbolic Regression: $f(a) = a^4 + a^3 + a^2 + a$
Run this function to have GEP attempt to discover the function
$$ f(a) = a^4 + a^3 + a^2 + a $$
across 10 fitness cases. The fitness cases are of the form $\left(a, f(a)\right)$ for randomly generated $a$ between `OBJECTIVE_MIN` and `OBJECTIVE_MAX`. 

In [4]:
def a4_a3_a2_a1():

    # define objective function
    GeneExpressionProgram.OBJECTIVE_FUNCTION = staticmethod(lambda a: a ** 4 + a ** 3 + a ** 2 + a)
    GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX = 0, 20

    # Define terminals and functions
    Chromosome.terminals = ["a"]
    Chromosome.functions = {
        "+": {"args": 2, "f": lambda x, y: x + y},
        "-": {"args": 2, "f": lambda x, y: x - y},
        "*": {"args": 2, "f": lambda x, y: x * y},
        "/": {"args": 2, "f": lambda x, y: x / y}
    }
    Chromosome.fitness_cases = [
        ({"a": a}, GeneExpressionProgram.OBJECTIVE_FUNCTION(a))
        for a in np.random.rand(GeneExpressionProgram.NUM_FITNESS_CASES) * (
        GeneExpressionProgram.OBJECTIVE_MAX - GeneExpressionProgram.OBJECTIVE_MIN) + GeneExpressionProgram.OBJECTIVE_MIN
    ]

    GeneExpressionProgram.FUNCTION_Y_RANGE = \
        GeneExpressionProgram.OBJECTIVE_FUNCTION(GeneExpressionProgram.OBJECTIVE_MAX) - \
        GeneExpressionProgram.OBJECTIVE_FUNCTION(GeneExpressionProgram.OBJECTIVE_MIN)

    Chromosome.linking_function = "+"
    Chromosome.max_fitness = 1 #GeneExpressionProgram.FUNCTION_Y_RANGE * GeneExpressionProgram.NUM_FITNESS_CASES

    ans = GeneExpressionProgram.evolve()
    ans.print_tree()

## Symbolic Regression: $f(x) = \sin(x)$
Watch as GEP attempts to approximate $f(x) = sin(x)$ using only addition, subtraction, multiplication, division, the variable $x$, and the number 6! If you're lucky, it may come up with the second order Taylor approximation centered at $x=0$:
$$\sin(x) \approx x - \frac{x^3}{6}$$
However, it often finds the first order Taylor approximation, calls it good enough, and quits. The first order Taylor approximation is
$$\sin(x) \approx x$$
for $x \approx 0$.

In [5]:
def sinx_polynomial():
    # define objective function
    GeneExpressionProgram.OBJECTIVE_FUNCTION = staticmethod(lambda a: np.sin(a))
    GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX = -5, 5
    GeneExpressionProgram.FITNESS_FUNCTION = Chromosome.centralized_inv_squared_error
    GeneExpressionProgram.FITNESS_FUNCTION_ARGS = [0, "x"]
    GeneExpressionProgram.NUM_FITNESS_CASES = 10
    GeneExpressionProgram.NUM_GENERATIONS = 150
    GeneExpressionProgram.POPULATION_SIZE = 200
    GeneExpressionProgram.MUTATION_RATE = 0.05

    Chromosome.head_length = 8
    Chromosome.num_genes = 3
    Chromosome.length = 24

    # Define terminals and functions
    Chromosome.terminals = ["x", "6", "!"]
    Chromosome.constants = {"!": 120}
    Chromosome.functions = {
        "+": {"args": 2, "f": lambda x, y: x + y},
        "-": {"args": 2, "f": lambda x, y: x - y},
        "*": {"args": 2, "f": lambda x, y: x * y},
        "/": {"args": 2, "f": lambda x, y: x / y}
        #"^": {"args": 2, "f": lambda x, y: x ** y}
    }
    Chromosome.fitness_cases = [
        ({"x": x}, GeneExpressionProgram.OBJECTIVE_FUNCTION(x))
        for x in np.random.rand(GeneExpressionProgram.NUM_FITNESS_CASES) * (
        GeneExpressionProgram.OBJECTIVE_MAX - GeneExpressionProgram.OBJECTIVE_MIN) + GeneExpressionProgram.OBJECTIVE_MIN
    ]

    GeneExpressionProgram.FUNCTION_Y_RANGE = 2
    Chromosome.linking_function = "*"
    Chromosome.max_fitness = 1 #GeneExpressionProgram.FUNCTION_Y_RANGE * GeneExpressionProgram.NUM_FITNESS_CASES

    ans, average_fitnesses, best_fitnesses = GeneExpressionProgram.evolve()
    ans.print_tree()
    ans.plot_solution(GeneExpressionProgram.OBJECTIVE_FUNCTION,
                      GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX,
                      average_fitnesses, best_fitnesses)

## CartPole v1
GEP evolves a real-valued function which is positive when the cart should move to the right and negative when the cart should move to the left.

In [6]:
def cart_pole_real():

    import gym
    env = gym.make('CartPole-v1')

    def fitness(num_trials: int, render: bool = False, doPrint = False, *args) -> np.ndarray:
        fitnesses = []
        for chromosome_index in range(len(args)):
            chromosome = args[chromosome_index]
            total_reward = 0
            if doPrint:
                print("\tCalculating chromosome %d across %d trials." % (chromosome_index, num_trials), end="\t")
            for trial in range(num_trials):
                x, theta, dx, dtheta = env.reset()
                t = 0
                while True:
                    if render: env.render()
                    action = chromosome.evaluate({"x": x, "v": dx, "t": theta, "u": dtheta})
                    observation, reward, done, info = env.step(action > 0)
                    x, theta, dx, dtheta = observation
                    total_reward += reward
                    if done:
                        break
                    t += 1
            chromosome._fitness_ = total_reward / float(num_trials)
            fitnesses.append(total_reward / float(num_trials))
            if doPrint:
                print("Fitness: %.5f" % (total_reward / float(num_trials)))
        return np.asarray(fitnesses)

    Chromosome.functions = {
        "+": {"args": 2, "f": lambda x, y: x + y},
        "-": {"args": 2, "f": lambda x, y: x - y},
        "*": {"args": 2, "f": lambda x, y: x * y},
        "/": {"args": 2, "f": lambda x, y: x / y}
    }

    Chromosome.terminals = [
        "x",    # cart x-value
        "v",    # cart velocity
        "t",    # pole angle
        "u"     # rate of change of pole angle
    ]

    GeneExpressionProgram.FITNESS_FUNCTION = fitness
    GeneExpressionProgram.FITNESS_FUNCTION_ARGS = [10, False, False]
    GeneExpressionProgram.POPULATION_SIZE = 10
    GeneExpressionProgram.ERROR_TOLERANCE = 0
    GeneExpressionProgram.NUM_RUNS = 5
    GeneExpressionProgram.NUM_GENERATIONS = 15

    Chromosome.max_fitness = 500
    Chromosome.num_genes = 3
    Chromosome.length = 30
    Chromosome.head_length = 10
    Chromosome.linking_function = "+"

    average_fitnesses = []
    best_fitnesses = []

    for rep in range(GeneExpressionProgram.NUM_RUNS):
        ans, gen_average_fitnesses, gen_best_fitnesses = GeneExpressionProgram.evolve()
        average_fitnesses.append(gen_average_fitnesses)
        best_fitnesses.append(gen_best_fitnesses)
        ans.print_tree()
        print(ans.genes)

        if fitness(100, False, True, ans)[0] == Chromosome.max_fitness:
            fitness(5, True, False, ans)
        

    GeneExpressionProgram.plot_reps(average_fitnesses, best_fitnesses)

    # Very good genes (500 fitness over 100 attempts):
    # ['-u-uvx*txtxxutuutvtvtvttttutvu', '+*vxu/x/-/tutxtxxxuvuuxvtuvvtu', '+ut*vv*vtvxvxvuxvtxvtvuutvtttt']
    # ['t/v*/xtttvuxtvvttutxxtuvxuvttt', '-v*+v*+t+tuxtvvttuuvxvtuvuvxtt', 'u++tu+uvtvtxtvtvtuvxxttvxvxvv']
    # ['-t-*+u*u-*tuutxxuutxtxttxttvxu', '+-u**+-/vxvxxxvtxvxuxvxvvxxtvx', '+xv-xvx-xtuutxxtxxxtxxuttvxxut']

## Hard Symbolic Regression
GEP attempts to find the equation
$$ f(a) = 4.251a^2 + \ln(a^2) + 7.243e^a $$
using ephemeral random constants across 5000 generations and 5 reps. This often gets stuck in local optima, and hence does not always attain a large fitness value. The success of this algorithm depends greatly on the luck of the initial population and the probability of favorable mutations.

In [8]:
def hard_regression():
    # TODO - crossover and mutation of constants
    # define objective function
    GeneExpressionProgram.OBJECTIVE_FUNCTION = staticmethod(lambda a: 4.251*np.power(a, 2) + np.log(np.power(a, 2)) + 7.243*np.exp(a))
    GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX = -1, 1
    GeneExpressionProgram.NUM_FITNESS_CASES = 20
    GeneExpressionProgram.FITNESS_FUNCTION = Chromosome.inv_squared_error
    GeneExpressionProgram.NUM_GENERATIONS = 5000
    GeneExpressionProgram.NUM_RUNS = 5
    GeneExpressionProgram.MUTATION_RATE = 0.041

    # Define terminals and functions
    Chromosome.terminals = ["a", "?"]
    Chromosome.ephemeral_random_constants_range = (-1, 1)
    Chromosome.functions = {
        "+": {"args": 2, "f": lambda x, y: x + y},
        "-": {"args": 2, "f": lambda x, y: x - y},
        "*": {"args": 2, "f": lambda x, y: x * y},
        "/": {"args": 2, "f": lambda x, y: x / y if y != 0 else np.nan},
        "L": {"args": 1, "f": lambda x: np.log(x) if x > 0 else np.nan},
        "E": {"args": 1, "f": lambda x: np.exp(x)},
        #"K": {"args": 1, "f": lambda x: np.log10(x) if x > 0 else np.nan},
        #"~": {"args": 1, "f": lambda x: np.power(10, x)},
        #"S": {"args": 1, "f": lambda x: np.sin(x)},
        #"C": {"args": 1, "f": lambda x: np.cos(x)}
    }
    Chromosome.fitness_cases = [
        ({"a": a}, GeneExpressionProgram.OBJECTIVE_FUNCTION(a))
        for a in np.random.rand(GeneExpressionProgram.NUM_FITNESS_CASES) * (
        GeneExpressionProgram.OBJECTIVE_MAX - GeneExpressionProgram.OBJECTIVE_MIN) + GeneExpressionProgram.OBJECTIVE_MIN
    ]

    Chromosome.linking_function = "+"
    Chromosome.length = 60
    Chromosome.head_length = 20
    Chromosome.max_fitness = 1 #GeneExpressionProgram.FUNCTION_Y_RANGE * GeneExpressionProgram.NUM_FITNESS_CASES

    average_fitnesses = []
    best_fitnesses = []
    answers = []

    ans = None
    for rep in range(GeneExpressionProgram.NUM_RUNS):
        print("====================================== Rep %d ======================================" % rep)
        ans, gen_average_fitnesses, gen_best_fitnesses = GeneExpressionProgram.evolve()
        average_fitnesses.append(gen_average_fitnesses)
        best_fitnesses.append(gen_best_fitnesses)
        ans.print_tree()
        print(ans.genes)
        answers.append(deepcopy(ans))

    print("====================================== Random Search ======================================")
    random_search_individual, random_search_avg, random_search_best = GeneExpressionProgram.random_search(
        GeneExpressionProgram.NUM_GENERATIONS,
        GeneExpressionProgram.FITNESS_FUNCTION,
        GeneExpressionProgram.FITNESS_FUNCTION_ARGS
    )

    # Good genes:
    #ans = Chromosome(['+/aE?aa?aaEa*Ea+E/?L?aa??aaaaa???aaaaa???aa?a??a?aa???aaa??a?a??a?aaaaaa?aaa??aa', '?a???L*Laa+++L-?///aaa??????a?aa?aaa?aaa?aaa?aa??a?a?a?aa??aaa?a?aaaaaa?aa???aaa', '+L**E*aaEaa+E??*/a*-???a?aa?aaaa?aaaaaaa?a?aaaa?aa?aaaaaaa???aa?aaaa?aa??aaa????'])
    #ans.ephemeral_random_constants = [0.15711028370174618, 0.8582913293794245, 0.08834476189713181, -0.6507871955790874, -0.5001482762755984, -0.36339831251011967, 0.49239731282122023, -0.7707384044151064, 0.33957437653782097, -0.050909069821311936, 0.40688394042469067, 0.8838615430620207, -0.03950637280673064, -0.8398663459127116, -0.9701111175669401, 0.16516078130630563, -0.5163031755060181, 0.26930803528455916, -0.7833159749333989, -0.7075160969083776, -0.6751546227334948, 0.1505636368911023, -0.16805240822390255, -0.34424168190370485, -0.8544980338428079, -0.01217703484745547, -0.24005751860391533, -0.5077198074421936, -0.47443544577074226, 0.5247967085947582, -0.22543318048008576, 0.4938865002308659, -0.6465093618701077, -0.19098460727467326, -0.2944062401077585, 0.7016839377380519, -0.14341637591100542, 0.23227088210671476, 0.36051772215302, 0.6509343605188611, -0.332150327502315, 0.3171544096208032, 0.2676850827993431, -0.46506262502073414, 0.843996276413379, -0.5614960005334659, 0.47891344757490373, -0.5575325624206526, -0.8156525364269045, -0.31652263727243746, 0.06884531540832572, 0.8836222097723032, 0.6601557412383419, -0.7869161076217597, -0.16110865846423783, -0.06702662866877018, -0.22568995470545228, 0.9438977429740325, -0.6410133183089668, 0.045353882523733624, 0.16786587502585948, -0.9008872395302681, 0.004355424045595413, 0.9179463218466064, 0.4105708444235643, -0.001799675111191501, -0.4201794697378056, -0.37672122028632216, 0.5906938113771141, 0.6004718433032417, -0.4698494772153414, 0.04238505345795085, 0.1657146428146341, 0.5148585050576182, 0.6955837854892111, -0.08812485635975653, -0.4101965485774235, 0.7234363214796748, 0.14285945798729927, 0.6450352601522822]
    #ans.plot_solution(GeneExpressionProgram.OBJECTIVE_FUNCTION, GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX, [], [], "a")

    #ans = Chromosome([0.10325095527275208, -0.6353207023355358, 0.3087002846077995, -0.646063772861873, -0.7604999404652737, -0.7026365243175745, -0.8546384172276964, -0.9210656698793773, -0.92473371383499159, 0.38475054329790037, 0.011392225394319944, -0.18840805047095377, 0.40103873272978197, 0.06328742279657229, 0.7176441901359412, -0.26941389839428265, 0.8330939983953638, -0.3291581824265808, -0.6081511326274629, 0.9437496769675557, 0.14649019501924365, -0.026586568971478375, 0.6942711369955727, -0.4952751886918654, 0.49348107956565457, 0.6373366861063929, -0.4180513467203788, 0.37222546291909575, 0.09407582333744235, -0.7976697480943422, -0.04875126785507633, 0.0738340898048051, 0.4574594498229325, 0.6176526203069368, -0.8316660105318336, 0.328772037314927, 0.854958551073552, -0.3935592840377773, -0.1523385500807699, -0.8272148377421589, 0.8580710774374207, -0.18672644199209687, 0.5393722078778671, -0.6906345459192962, -0.22443846331305117, 0.34512085908760115, 0.21516058072431332, -0.8499353974445536, 0.26989917691422316, -0.44201601204751584, 0.00887253706744362, -0.7836088648008288, -0.8679234491746306, -0.07209104779770104, 0.16000238129327515, 0.8890857194001152, -0.6325595149575627, -0.11804832230841544, -0.018168330413408817, -0.6561487673112327, 0.18667288624065526, -0.21171583904469493, -0.7074160949096726, -0.5647179047402893, 0.6466522462832285, 0.17157152187819658, -0.55051406791476754, -0.32459344245456756, -0.73568042741430184, 0.39543504084587644, -0.9332865781958235, 0.9903268434472272, 0.05041449572068979, 0.0798497529910911, -0.66617358746171074, 0.5071192429292173, 0.38014218205802264, -0.929868126429702, -0.23615736970440504, 0.5939936181262968])
    #ans.ephemeral_random_constants = ['+EEEaa/L-+L-EL-L?aLEaa??????aa?aa?????aa??a???aaaaaaa????aa?aa??aa??a?a??aaa?aaa', '+EEE*aa?+?*/LLa/*+*-a?a?aaaa??a?aa?a??aa???aaaaa????a???????a?aaa????????aaaaaa?', '-L***aaa--+*+*+?+?--?aa??aa?aa?aa??a??a?aa??aaaaa?a??a?aa?aa?????aaaa??aa?aaa?a?']

    best = max(answers, key=lambda c:c.fitness())
    ans.plot_solution(GeneExpressionProgram.OBJECTIVE_FUNCTION, GeneExpressionProgram.OBJECTIVE_MIN, GeneExpressionProgram.OBJECTIVE_MAX, average_fitnesses[0], best_fitnesses[0], "a")
    GeneExpressionProgram.plot_reps(average_fitnesses, best_fitnesses, random_search_avg, random_search_best)