forked from many-facedgod/NEAT-RL
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Network.py
146 lines (123 loc) · 3.64 KB
/
Network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import print_function
import networkx as nx
import numpy as np
from Config import *
def sigmoid(x, z):
    """Return the logistic function of ``x`` scaled by the steepness ``z``.

    Computes ``1 / (1 + exp(-z * x))``. The exponent is clamped to
    ``[-700, 700]`` before it reaches ``np.exp`` so that large-magnitude
    inputs saturate towards 0 or 1 instead of overflowing to ``inf`` and
    triggering a NumPy overflow ``RuntimeWarning``. Exponents inside that
    range produce exactly the same value as the unclamped formula.

    Args:
        x: Scalar or NumPy array of pre-activation values.
        z: Multiplier applied to ``x`` inside the exponent.

    Returns:
        The activation value(s), same shape as ``x``.
    """
    # exp(700) is still finite in float64 (overflow starts near 709.78),
    # and 1 / (1 + exp(700)) remains a normal, non-zero float.
    exponent = np.clip(-z * x, -700.0, 700.0)
    return 1.0 / (1 + np.exp(exponent))
class Neuron:
    """A single node of the network.

    Every instance receives a sequential ``id`` taken from the class-level
    ``count``, which is incremented on each construction.

    Attributes:
        id: Sequential identifier assigned at construction time.
        bias: Bias value stored for this neuron.
        current: Value currently held by the neuron; starts at 0.
        is_input: Flag marking the neuron as an input node.
        is_output: Flag marking the neuron as an output node.
    """

    # Number of neurons created so far; also the id handed to the next one.
    count = 0

    def __init__(self, bias, is_input, is_output):
        """Store the bias and role flags and claim the next free id."""
        self.is_input, self.is_output = is_input, is_output
        self.bias = bias
        self.current = 0
        # Take the current counter value as our id, then advance the counter.
        self.id = Neuron.count
        Neuron.count = self.id + 1
class Edge:
    """A weighted connection between two neurons.

    Every instance receives a sequential ``id`` taken from the class-level
    ``count``, which is incremented on each construction.

    Attributes:
        id: Sequential identifier assigned at construction time.
        weight: Weight stored for this connection.
        isrec: The ``rec`` flag supplied by the caller.
    """

    # Number of edges created so far; also the id handed to the next one.
    count = 0

    def __init__(self, weight, rec):
        """Store the weight and ``rec`` flag and claim the next free id."""
        self.weight, self.isrec = weight, rec
        # Take the current counter value as our id, then advance the counter.
        self.id = Edge.count
        Edge.count = self.id + 1
class Network:
    """Directed network of neurons that may contain recurrent connections.

    Two graphs are kept side by side. ``graph`` holds every connection.
    ``DAG`` holds only the connections that did not close a cycle at the
    moment they were added, so it can be topologically sorted.
    """

    # Number of networks created so far; also the id handed to the next one.
    count = 0

    def __init__(self):
        """Create an empty network and claim the next sequential id."""
        self.graph = nx.DiGraph()   # all connections, recurrent ones included
        self.DAG = nx.DiGraph()     # connections that were not flagged recurrent
        self.id = Network.count
        Network.count += 1
        self.isrec = False          # set once any recurrent edge is added
        self.inputs = []            # input neurons, in insertion order
        self.outputs = []           # output neurons, in insertion order
        self.nodes = []             # every neuron, in insertion order
        self.topocache = []         # cached topological order of ``DAG``

    def node_count(self):
        """Return the number of neurons in the network."""
        return len(self.nodes)

    def edge_count(self):
        """Return the number of connections, recurrent ones included."""
        return self.graph.number_of_edges()

    def add_edge(self, weight, source, dest):
        """Connect ``source`` to ``dest`` with the given weight.

        The connection is flagged recurrent when ``dest`` can already reach
        ``source`` through ``DAG``; such edges are stored in ``graph`` only,
        and ``isrec`` is set on the network.

        Raises:
            AssertionError: If the edge ``source -> dest`` already exists.
        """
        assert not self.graph.has_edge(source, dest), "Edge already present"
        rec = nx.has_path(self.DAG, dest, source)
        e = Edge(weight, rec)
        self.graph.add_edge(source, dest, object=e)
        if rec:
            self.isrec = True
        else:
            self.DAG.add_edge(source, dest, object=e)
        # The topology changed, so any cached ordering is stale.
        self.topocache = []

    def add_node(self, is_input, is_output, bias=0.0):
        """Create a neuron, register it in the network and return it.

        Args:
            is_input: Whether the neuron is appended to ``inputs``.
            is_output: Whether the neuron is appended to ``outputs``.
            bias: Bias given to the new neuron.

        Returns:
            The newly created ``Neuron``.
        """
        n = Neuron(bias, is_input, is_output)
        self.graph.add_node(n)
        self.DAG.add_node(n)
        if is_input:
            self.inputs.append(n)
        if is_output:
            self.outputs.append(n)
        self.nodes.append(n)
        # The topology changed, so any cached ordering is stale.
        self.topocache = []
        return n

    def eval_node(self, node):
        """Return the activation of ``node`` computed from its predecessors.

        Sums ``source.current * weight`` over every incoming edge in
        ``graph``, adds the node's bias and passes the total through
        ``sigmoid`` with ``SIGMOID_POWER``. ``node.current`` is not modified.

        Raises:
            AssertionError: If ``node`` is an input neuron.
        """
        assert not node.is_input, "Input nodes cannot be evaluated"
        incoming = self.graph.in_edges(nbunch=[node], data=True)
        total = sum(source.current * data["object"].weight
                    for source, _, data in incoming)
        total += node.bias
        return sigmoid(total, SIGMOID_POWER)

    def set_input(self, inputs):
        """Assign ``inputs`` to the input neurons, pairwise in order."""
        for n, v in zip(self.inputs, inputs):
            n.current = v

    def eval_synch(self, k=5):
        """Run ``k`` synchronous update rounds over all non-input neurons.

        In each round every activation is computed from the values of the
        previous round; the new values are written back only after all of
        them have been computed.
        """
        targets = [n for n in self.nodes if not n.is_input]
        # ``range`` instead of ``xrange`` so this also runs on Python 3.
        for _ in range(k):
            new_vals = [self.eval_node(n) for n in targets]
            for n, v in zip(targets, new_vals):
                n.current = v

    def eval_asynch(self):
        """Update every non-input neuron once, in topological order of ``DAG``.

        Each neuron's new value is stored immediately, so neurons later in
        the order see it during the same pass.
        """
        if not self.topocache:
            # NetworkX >= 2.0 returns a one-shot generator here; materialise
            # it so the cached order can be iterated on every call.
            self.topocache = list(nx.topological_sort(self.DAG))
        for n in self.topocache:
            if not n.is_input:
                n.current = self.eval_node(n)

    def get_current_output(self):
        """Return the current values of the output neurons as a list."""
        return [n.current for n in self.outputs]

    def clear_state(self):
        """Reset the current value of every neuron to 0.0."""
        for n in self.nodes:
            n.current = 0.0
"""net=Network()
n1=net.add_node(True, False)
n2=net.add_node(True, False)
n3=net.add_node(False, False, 0.0)
n4=net.add_node(False, False, 0.0)
n5=net.add_node(False, True, 0.0)
net.add_edge(1, n1, n3)
net.add_edge(1, n1, n4)
net.add_edge(1, n2, n3)
net.add_edge(1, n2, n4)
net.add_edge(1, n3, n5)
net.add_edge(-1, n5, n3)
net.add_edge(1, n4, n5)
print(net.isrec)
net.set_input([0.1, 0.2])
net.eval_asynch()
print(net.get_current_output())
net.set_input([0.1, 0.2])
net.eval_asynch()
print(net.get_current_output())
net.clear_state()
net.set_input([0.0, -0.5])
net.eval_asynch()
print(net.get_current_output())
net.set_input([0.1, 0.2])
net.eval_asynch()
print(net.get_current_output())
net.set_input([0.4, -1])
net.eval_asynch()
print(net.get_current_output())"""