In [1]:
import os
import sys
nb_dir = os.path.split(os.getcwd())[0]
if nb_dir not in sys.path:
    sys.path.append(nb_dir)

In [2]:
from __future__ import annotations
import abc
import typing

from typing import Optional
from typing import Tuple
from typing import Dict
from typing import Generic
from typing import Type
from dataclasses import dataclass

from typing import Union
from typing import List
from typing import Any
from typing import Self
from typing import Callable
from inspect import signature

from core.variator import Variator
from core.selector import NaiveElitistSelector
from core.evaluator import Evaluator
from core.controller import Controller
from core.population import Population
from core.population import Genome




from random import choice









T = typing.TypeVar("T")

class ArityMismatch(Exception):

    def __init__(self, expr, expected:Optional[int], given: Optional[int]):
        super().__init__(f"Arity mismatch: while evaluating {str(expr)}, "
                         f"{expected} arguments are expected, while "
                         f"{given} are given")

def get_arity(fun: Any) -> int:
    if (callable(fun)):
        return len(signature(fun).parameters)                 
    else:
        return 0

# class VariadicFunction(typing.Generic[T]):
#     def __init__(self, function : Union[typing.Callable[..., T], T], arity: Optional[int] = None):
#         self._function = function
#         self.arity = arity
        
#         # If the arity is not given, get cute and try to infer it from function signature.
#         # At this point (2023-10-31, 14:44), I do not plan to implement the ability to allow the user to explicitly give arity.
#         # This generalization (from 1, 2 trees to 1, n tree) is quite generous as it is.
#         # It is also not possible to retrieve the signature of built-in functions.
#         if self.arity is None:
#             if (callable(self._function)):
#                 self.arity = get_arity(self._function)
#             else:
#                 self.arity = 0

#         # Sanity check: if a constant is given and arity is not 0, make noise
#         if (not callable(self._function) and self.arity != 0):
#             raise ArityMismatch(self._function, self.arity, arity)

#     def __call__(self, *args: T) -> T:
#         if (callable(self._function)):
#             # If the function is callable, check if its arity matches that of the object.
#             if (len(args) != self.arity):
#                 raise ArityMismatch(self._function, self.arity, len(args))
            
#             return self._function(*args)
#         else:
#             # If the function is a value, assert that its arity is 0.
#             if (len(args) != 0):
#                 raise ArityMismatch(self._function, self.arity, len(args))
#             return self._function
        
#     def __str__(self):
#         return str(self._function.__name__)

class Expression(abc.ABC, typing.Generic[T]):
    def __init__(self, function: T | typing.Callable[..., T], *children: Expression[T]):
        self._function = function
        self.children : List[Expression[T]] = list(children)
    
    def evaluate(self) -> T:
        expected_arity = get_arity(self._function)
        children_arity = len(self.children)
        results = tuple(x.evaluate() for x in self.children)
        if callable(self._function):
            if (expected_arity == children_arity):
                return self._function(*results)
            else:
                raise ArityMismatch(self._function, expected_arity, children_arity)
        elif expected_arity == 0 and children_arity == 0:
            return self._function
        else:
            raise ArityMismatch(self._function, expected_arity, children_arity)
        
        return self._function(*results)
    
    def copy(self) -> Self:
        children_copies : Tuple[Expression[T], ...] = tuple(x.copy() for x in self.children)
        return self.__class__(self._function, *children_copies)
    
    # Beware this function leans to the left subtree
    def nodes(self) -> List[Expression[T]]:
        return [self] + list(self.children)
    
    def __str__(self) -> str:
        children_names : str = ''
        for c in self.children:
            children_names = children_names +" "+ str(c)
        my_name: str = ""
        if callable(self._function):
            my_name = self._function.__name__
        else:
            my_name = str(self._function)
        return (f"{str(my_name)} ({children_names})")

    __copy__ = copy
    __deepcopy__ = copy


class ExpressionFactory(typing.Generic[T]):
    def __init__(self, functions: Tuple[Callable[..., T], ...]):

        # Build a pool of functions, as a dictionary where each index corresponds to an arity, and each 
        #   value is a list of functions (or terminals!) with that arity.
        self.function_pool: Dict[int, List[Callable[..., T]]] = {}
        for fun in functions:
            arity: int = get_arity(fun)
            if arity in self.function_pool:
                self.function_pool[arity].append(fun)
            else:
                self.function_pool[arity] = [fun]

        self.depth_cap: int = 0
        self.budget_cap: int = 0
        self.depth = 0
        self.budget = 0

    def build(self, depth: int, budget: int) -> Expression:
        target_function = self.poll_function()
        arity = get_arity(target_function)

        children : List [Expression]= []
        for i in range(0, arity):
            children.append(self._build_recurse(depth-1))

        root = Expression(target_function, *children)
        return root
        

    def _build_recurse(self, depth_left: int) -> Expression:
        if (depth_left < 1):
            target_function = self.poll_arity(0)
            return Expression(target_function)
        else:
            target_function = self.poll_function()
            arity = get_arity(target_function)
            children : List [Expression]= []
            for i in range(0, arity):
                children.append(self._build_recurse(depth_left-1))
            base = Expression[T](target_function, *children)
            return base

    def budget_check(self, budget_increment: int) -> bool:
        self.budget = self.budget + budget_increment
        return self.budget > self.budget_cap
    
    def poll_function(self) -> Callable[..., T]:
        return choice(choice(self.function_pool))

    def poll_arity(self, arity: int) -> Callable[..., T]:
        return choice(self.function_pool[arity])
        



import math



class BadSymbolError(Exception):
    def __init__(self, name: str):
        super().__init__(f"The symbol {name} is used but not assigned.")


class Symbol(typing.Generic[T]):
    def __init__(self, name: str = "default_symbol_name", value: Optional[T] = None):
        self.value : Optional[T] = value
        self.__name__ : str = name

    def __call__(self) -> T:
        if (self.value is None):
            raise BadSymbolError(self.__name__)
        return self.value
    
    def assign(self, val: T) -> None:
        self.value = val
    
    def __str__(self)-> str:
        return self.__name__

# a = Symbol[float](1, "var_a")
# b = Symbol[float](2, "var_b")
# c = Symbol[float](3, 'var_c')


# exprfactory = ExpressionFactory[float]((math.sin, math.sqrt, math.pow, add, sub, a, b, c, 4))

# e = exprfactory.build(10, 0)

# print (e.evaluate())

# a.value = 4

# print (e.evaluate())

class ProgramFactory(Generic[T]):
    def __init__(self, functions: Tuple[Callable[..., T], ...], arity):
        self.arity = arity
        self._symbol_count = 0 # only used to keep track of symbol names. Does not relate to arity
        self.symbol_deposit : List[Symbol]= []
        
        for i in range(0, arity):
            self.deposit_symbol(self.next_symbol())

        self.exprfactory = ExpressionFactory[T](functions + tuple(self.symbol_deposit))

    def next_symbol_name(self) -> str:
        self._symbol_count = self._symbol_count + 1
        return str(self._symbol_count)

    def deposit_symbol(self, s: Symbol[T]) -> None:
        self.symbol_deposit.append(s)
        
    def next_symbol(self) -> Symbol[T]:
        return Symbol("sym_" + self.next_symbol_name())
    
    def build(self, depth: int, budget: int) -> Program:
        return Program(self.exprfactory.build(depth, budget), self.symbol_deposit)
    
    
    

# Note that programs from the same factory share the same set of argument values.
# This should be desirable - and well hidden - if the object is only shared by one thread.
# I'm not prepared to deal with concurrency.

class ProgramArityMismatchError(Exception):

    def __init__(self, expected:Optional[int], given: Optional[int]):
        super().__init__(f"The program is expecting {expected} arguments, only {given} are given.")
        

class Program(Genome[T]):
    def __init__(self, expr: Expression, symbols: List[Symbol]):
        self.expr = expr
        self.symbols = symbols
    
    def evaluate(self, *args: T) -> T:
        if (len(args) != len(self.symbols)):
            raise ProgramArityMismatchError(len(self.symbols), len(args))
        self.__class__._assign_values(self.symbols, args)
        return self.expr.evaluate()

    @staticmethod
    def _assign_values(symbols: List[Symbol[T]], values: Tuple[T, ...]) -> None:
        # This is exceptionally unpythonic
        for i in range (0, len(symbols)):
            symbols[i].assign(values[i])

    def __str__(self) -> str:
        return str(self.expr)
    
    # Warning! Breaking design patterns.
    def nodes(self) -> List[Expression[T]]:
        return self.expr.nodes()
    
    def copy(self) -> Self:
        return self.__class__(self.expr.copy(), self.symbols)

def sin(x: float) -> float:
    return math.sin(x)

def cos(x: float) -> float:
    return math.cos(x)

def tan(x: float) -> float:
    return math.tan(x)

def add (x:float, y:float):
    return x + y

def sub (x:float, y:float):
    return x - y

def mul (x:float, y:float):
    return x * y

def div (x:float, y:float):
    if y == 0:
        return 1
    return x / y

def lim(x: float, max_val:float, min_val:float) -> float:
    return max(min(max_val, x), min_val)


import random

from typing import Iterable
class ProgramCrossoverVariator(Variator[Program[T]]):
    def vary(self, rate: float, *program: Program[T]) -> Iterable[Program[T]]:
        root1: Program = program[0]
        root2: Program = program[1]

        expression_nodes_from_root_1 = root1.nodes()
        expression_nodes_from_root_2 = root2.nodes()

        # this relies on a very ad-hoc implementation that provides access to expressions though the program. 
        # This should not be possible. But I am tired and cannot think of better ways to do it.
        expression_internal_nodes_from_root_1 = [x for x in expression_nodes_from_root_1 if len(x.children) > 0]
        expression_internal_nodes_from_root_2 = [x for x in expression_nodes_from_root_2 if len(x.children) > 0]

        if (len(expression_internal_nodes_from_root_1) >= 1 and len(expression_internal_nodes_from_root_2) >= 1):
            expression_node_from_root_1_to_swap = random.choice(expression_internal_nodes_from_root_1)
            expression_node_from_root_2_to_swap = random.choice(expression_internal_nodes_from_root_2)
            self.__class__.swap_children(expression_node_from_root_1_to_swap, expression_node_from_root_2_to_swap)

        return [root1, root2]

    
    @staticmethod
    def swap_children(expr1: Expression[T], expr2: Expression[T]) -> None:
        child_nodes = list(expr1.children + expr2.children)
        random.shuffle(child_nodes)

        for i in range(0,len(expr1.children)):
            expr1.children[i] = child_nodes[i].copy()

        for i in range(-1,-(len(expr2.children) + 1), -1):
            expr2.children[i] = child_nodes[i].copy()



pop_size = 100
tree_depth = 5

class PendulumEvaluator(Evaluator[Program[float]]):
    def evaluate(self, s1: Program[float]) -> float:
        score = evaluate_program_against_environment(s1, "Pendulum-v1") 
        # print ("evaluation done, score is " + str(score))
        return score

# Build the population of ternary programs
progf = ProgramFactory((add, sub, mul, div, sin, cos, mul, div, lim), 3)

pops: Population[Program[float]] = Population()
for i in range(0, pop_size):
    pops.add(progf.build(tree_depth,tree_depth))

# Prepare the variator
variator = ProgramCrossoverVariator[Program[float]](2)

# The evaluaor is ready
evaluator = PendulumEvaluator()

# Selector on standby
import gymnasium as gym
sel = NaiveElitistSelector[Program[float]](evaluator, outsize = pop_size)

def pendulum_wrapper(f: float) -> float:
    return max(min(2, f), -2)

def evaluate_program_against_environment(program, name, visible: bool = False) -> float:
    score = 0. 
    iter_bound = 10
    for i in range(0,iter_bound):
        score = score + evaluate_program_against_environment_once(program, name, visible)
    return score / iter_bound

def evaluate_program_against_environment_once(program, name, visible: bool = False) -> float:
    if visible:
        env = gym.make(name, render_mode="human")
    else:
        env = gym.make(name)
    step_result = env.reset()
    score = step_result[1]
    for i in range(0,100):
        step_result = env.step([pendulum_wrapper(program.evaluate(*step_result[0]))])
        score = step_result[1]
    return score


ctrl = Controller[Program[float]](
    population = pops,
    evaluator = evaluator,
    variator = variator,
    survivor_selector = sel
)

for i in range(0, 20):
    ctrl.step()


parent pool size:100
50
ppppppp length: 50
survivor pool size:101
generation done, best:div ( sin ( cos ( mul ( lim ( sym_1 () sym_3 () sym_2 ()) cos ( sym_3 ())))) sym_3 ()) with score -7.5563435855071095, re-evaluate: -4.032903883878151
parent pool size:101
51
ppppppp length: 51
survivor pool size:101
generation done, best:sin ( div ( sym_1 () lim ( sin ( sym_1 ()) lim ( sym_1 () sym_3 () sym_3 ()) cos ( sym_3 ())))) with score -4.654132782892806, re-evaluate: -5.683927893598097
parent pool size:101
51
ppppppp length: 51
survivor pool size:101
generation done, best:sin ( div ( sym_1 () lim ( sin ( sym_1 ()) lim ( sym_1 () sym_3 () sym_3 ()) cos ( sym_3 ())))) with score -4.654132782892806, re-evaluate: -6.082883607857674
parent pool size:101
51
ppppppp length: 51
survivor pool size:101
generation done, best:sin ( div ( sym_1 () lim ( sin ( sym_1 ()) lim ( sym_1 () sym_3 () sym_3 ()) cos ( sym_3 ())))) with score -4.654132782892806, re-evaluate: -6.87890990114648
parent pool size:101


In [6]:

evaluate_program_against_environment_once(sel.best, "Pendulum-v1", visible = True)

-4.492797153994267

In [3]:
env = gym.make("Pendulum-v1", render_mode="human")
step_result = env.reset()
score = step_result[1]
for i in range(0,1000):
    step_result = env.step([pendulum_wrapper(sel.best.evaluate(*step_result[0]))])
    score = step_result[1]
print(score)
env.close()


-1.0038693006888348


In [4]:
visible = True
if visible:
    env = gym.make("Pendulum-v1", render_mode="human")
else:
    env = gym.make("Pendulum-v1")
step_result = env.reset()
score = step_result[1]
for i in range(0,1000):
    step_result = env.step([-1])
    score = step_result[1]
print(score)

-7.943672213016889


: 