In [1]:
from node import Node, Operation
from graph import Graph

In [2]:
plus = Operation(arity=2, operation=lambda x,y: x+y)
neg = Operation(arity=1, operation=lambda x: -x)

(plus.func(1,2), neg.func(5)) == (plus(1,2), neg(5))

True

In [3]:
def get_value(self):
    if self.value:
        return self.value
        
    #self.value = self.operation(*[x.get_value() for x in self.inputs]) #if we keep references instead of ids
    self.value = self.operation(*[Node.graph[x].get_value() for x in self.inputs])
    return self.value

Node.get_value = get_value

#operations
incr = Operation(arity=2, operation=lambda x: x+1)
neg  = Operation(arity=1, operation=lambda x: -x)
cte  = Operation(arity=1, operation=lambda x: x)

# input
i0 = Node(terminal=True, operation=cte, value=5)

# middle node
n0 = Node(terminal=False, operation=neg)

# output
o1 = Node(terminal=False, operation=cte)
o2 = Node(terminal=False, operation=incr)

#connections
n0.add_inputs([i0.id])

o1.add_inputs([n0.id])
o2.add_inputs([n0.id])

'''
i0->n0->(o1,o2)
'''
Node.graph = {i0.id: i0,
              n0.id: n0,
              o1.id: o1,
              o2.id: o2}

o1.get_value(), o2.get_value()

(-5, -4)

In [42]:
import numpy as np

class CGP():
    operations = []
    rng = np.random

    def __init__(self, n_in, n_out, n_row, n_col):
        self.nodes: Dict[str, Node] = {}
            
        self.n_in  = n_in
        self.n_out = n_out
        self.n_row = n_row
        self.n_col = n_col
        
        self.columns = []

    def add_input_layer(self):
        cte = Operation(arity=1, operation=lambda x:x)
        inputs = []
        for i in range(self.n_in):
            new_node = Node(terminal=True, operation=cte)
            new_node.active = False
            self.nodes[new_node.id] = new_node
            inputs.append(new_node.id)
        self.columns.append(inputs)
        
    def add_middle_layer(self):
        for i in range(self.n_col):
            col = []
            for j in range(self.n_row):
                new_node = Node(terminal=False, operation=CGP.rng.choice(CGP.operations))
                new_node.active = False
                self.nodes[new_node.id] = new_node
                col.append(new_node.id)
            self.columns.append(col)

    def add_output_layer(self):
        cte = Operation(arity=1, operation=lambda x:x)
        outputs = []
        for i in range(self.n_out):
            new_node = Node(terminal=False, operation=cte)
            new_node.active = True
            self.nodes[new_node.id] = new_node
            outputs.append(new_node.id)
        self.columns.append(outputs)
            
    def make_connections(self):
        # for each column, from output layer to input layer
        for i in range(start, 0, -1):
            # list of all previous nodes flattened
            previous_cols = [value for col in self.columns[:i] for value in col]
            
            # for each node in the layer i
            for j in self.columns[i]:
                current_node = self.nodes[j]
#                 print("node", current_node.id, end=" ")

                if current_node.active:
                    arity  = current_node.operation.arity
#                     print("with arity", arity, end=" ")
                    
                    # pick n=arity nodes randomly
                    inodes_idlist = CGP.rng.choice(previous_cols, arity)
                    
                    # add to the list of inputs
                    current_node.add_inputs(inodes_idlist)
#                     print("added inputs", current_node.inputs)
                    
                    # mark picked nodes as active
                    # (another option, we could process active nodes only)
                    for nodeid in inodes_idlist:
                        self.nodes[nodeid].active = True
        
    def get_node_value(self, node_id: int):
        node = self.nodes[node_id]
        if node.value != None:
            return node.value

        inputs = []
        for i in node.inputs:
            value = self.get_node_value(i)
            inputs.append(value)

        result = node.operation(*inputs)
        node.value = result # this may help avoid recalculating nodes in the same execution
                            # remember to make this value null when reseting the graph

        return result

        
    @staticmethod
    def initialize(n):
        return [CGP() for _ in range(n)]
            
    @staticmethod
    def add_operation(arity, func):
        op = Operation(arity, func)
        CGP.operations.append(op)
        
## set up ##
seed = 2001
CGP.rng = np.random.RandomState(seed)

## parameters ##
n_in  = 2
n_out = 2
max_size = (3, 2) #2 layers with 3 nodes each

## primitives ##
CGP.add_operation(1, lambda x: x+1)
CGP.add_operation(2, lambda x: x+y)
CGP.add_operation(1, lambda x: -x)


n = 4 #population size
population = [CGP(n_in, n_out, *max_size) for i in range(n)]

In [43]:
a = CGP(n_in, n_out, *max_size)

a.add_input_layer()
a.add_middle_layer()
a.add_output_layer()

# print(a.nodes)
print(a.columns)

for c in a.columns:
    for n in c:
        print(a.nodes[n].operation.arity, end=" ")

[[84, 85], [86, 87, 88], [89, 90, 91], [92, 93]]
1 1 1 1 1 1 1 1 1 1 

In [44]:
a.make_connections()

for c in a.columns:
    for n in c:
        mynode = a.nodes[n]
        print(f"id:{mynode.id} inputs{mynode.inputs}")

node 92 with arity 1 added inputs [89]
node 93 with arity 1 added inputs [84]
node 89 with arity 1 added inputs [85]
node 90 node 91 node 86 node 87 node 88 id:84 inputs[]
id:85 inputs[]
id:86 inputs[]
id:87 inputs[]
id:88 inputs[]
id:89 inputs[85]
id:90 inputs[]
id:91 inputs[]
id:92 inputs[89]
id:93 inputs[84]


In [45]:
inode1 = a.nodes[a.columns[0][0]]
inode2 = a.nodes[a.columns[0][1]]

inode1.value = 5
inode2.value = 10

In [47]:
o1 = a.columns[-1][0]
o2 = a.columns[-1][1]

a.get_node_value(o1), a.get_node_value(o2)

(11, 5)

In [49]:
self = a
for i in range(start, 0, -1):
    for j in self.columns[i]:
        current_node = self.nodes[j]

        if current_node.active:
            print(current_node.id, current_node.inputs)

92 [89]
93 [84]
89 [85]


In [None]:
# def evaluate(self):
#     if self.value:
#         return self.value
#     return self.func([x.value for x in self.inputs])


# def get_value(self):
#     if self._value:
#         return self._value

#     self._value = self.func([x.value for x in self.inputs])
#     return self._value

# @property
# def value(self):
#     if not self._value:
#         self._value = self.func([x.value for x in self.inputs])
#     return self._value

# @x.setter
# def x(self, x):
#     #logging.debug("Solution.set_x "+ str(x))
#     self._x      = Solution.repair(x, *Solution.bounds)[:]
#     self.fitness = None 