-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLinearRegressionNetwork.py
172 lines (148 loc) · 6.78 KB
/
LinearRegressionNetwork.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from Node import Node
from Connection import Connection
from Layer import Layer
from time import time
import random
class LinearRegressionNetwork:
    """A fully-connected linear network (no activation functions) trained
    with per-sample gradient descent on a squared-error loss.

    shape        -- list of layer sizes, e.g. [2, 3, 1]
    learningRate -- step size applied by step()
    initWeights  -- optional constant used for every weight; when None,
                    weights are drawn uniformly from [0, 1)
    """
    def __init__(self, shape, learningRate, initWeights=None):
        self.m_shape = shape
        self.m_learningRate = learningRate
        self.m_initWeights = initWeights
        self.m_layers = []
        self.m_nodes = []
        # Nested lookup: toNode -> {fromNode: connection}
        self.m_connectionsDict = {}
        self.m_connections = []
        self.initNetwork()
    def addLayer(self, layer):
        self.m_layers.append(layer)
    def addNode(self, node):
        self.m_nodes.append(node)
    def addConnection(self, connection):
        """Register a connection in both the flat list and the nested dict.

        setdefault replaces the original bare try/except, which would have
        silently swallowed *any* error (not just a missing key).
        """
        toNode = connection.getToNode()
        fromNode = connection.getFromNode()
        self.m_connectionsDict.setdefault(toNode, {})[fromNode] = connection
        self.m_connections.append(connection)
    def createNodesAndLayers(self):
        """Build one Layer per entry in m_shape.

        Every layer except the last also gets a bias node, placed after the
        regular nodes (its index equals the layer's nominal size).
        """
        for shapeIndex, shape in enumerate(self.m_shape):
            currentLayer = Layer([], self, shapeIndex, shape)
            for nodeIndex in range(shape):
                currentNode = Node(currentLayer, self, nodeIndex)
                currentLayer.addNode(currentNode)
                self.addNode(currentNode)
            if shapeIndex < len(self.m_shape) - 1:
                biasNode = Node(currentLayer, self, shape, True)
                self.addNode(biasNode)
                currentLayer.addNode(biasNode)
            self.addLayer(currentLayer)
    def createConnections(self):
        """Fully connect each layer to the next.

        Bias nodes never receive incoming connections.  The original code
        reseeded the global RNG with time() here; that is dropped because
        reseeding with low-resolution wall-clock time can give identical
        weights to networks created within the same tick, and the module's
        RNG is already auto-seeded.
        """
        allLayersButLast = self.m_layers[:-1]
        for layerIndex, currentLayer in enumerate(allLayersButLast):
            nextLayer = self.m_layers[layerIndex + 1]
            for fromNode in currentLayer.m_nodes:
                for toNode in nextLayer.m_nodes:
                    if toNode.isBias():
                        continue
                    currentConnection = Connection(fromNode, toNode)
                    if self.m_initWeights is not None:
                        currentConnection.setWeight(self.m_initWeights)
                    else:
                        currentConnection.setWeight(random.random())
                    self.addConnection(currentConnection)
                    fromNode.addConnectionUp(currentConnection)
                    toNode.addConnectionDown(currentConnection)
    def initNetwork(self):
        """Create the node/layer structure, then wire up the connections."""
        self.createNodesAndLayers()
        self.createConnections()
    def getInputSize(self):
        return self.m_shape[0]
    def getOutputSize(self):
        return self.m_shape[-1]
    def getLayer(self, index=-1):
        return self.m_layers[index]
    def getLearningRate(self):
        return self.m_learningRate
    def setLearningRate(self, learningRate):
        self.m_learningRate = learningRate
    def getConnectionsDict(self):
        return self.m_connectionsDict
    def getConnections(self):
        return self.m_connections
    def getNodes(self):
        return self.m_nodes
    def setInputLayer(self, input):
        """Copy the input values onto the first layer; bias nodes are fixed at 1."""
        for i, node in enumerate(self.getLayer(0).getNodes()):
            if not node.isBias():
                node.setValue(input[i])
            else:
                node.setValue(1)
    def forward(self, input):
        """Propagate *input* through the network.

        Each non-bias node becomes the weighted sum of the previous layer's
        node values.  Returns the output layer as a list.
        Raises IOError when *input* does not match the input layer size.
        """
        if len(input) != self.getInputSize():
            raise IOError("Input is not in the right size")
        self.setInputLayer(input)
        for currentLayer in self.m_layers[1:]:
            for toNode in currentLayer.getNodes():
                if toNode.isBias():
                    continue
                incoming = self.m_connectionsDict[toNode]
                toNode.setValue(sum(connection.getWeight() * fromNode.getValue()
                                    for fromNode, connection in incoming.items()))
        return self.getLayer(-1).getLayerInArrayFormat()
    def calculateLoss(self, expectedOutput):
        """Return the sum of squared errors over the output layer.

        NOTE(review): this is the *sum*, while calculateGradientForOutputLayer
        divides by the output size (a mean) — the reported loss and the
        gradient differ by a constant factor of getOutputSize().  Preserved
        as-is to keep behavior unchanged; confirm which convention is wanted.
        Raises IOError when *expectedOutput* does not match the output size.
        """
        if self.getOutputSize() != len(expectedOutput):
            raise IOError(f"Output is not in the right size. Output: {self.getOutputSize()}, Expected output: {expectedOutput}")
        predictions = self.getLayer(-1).getLayerInArrayFormat()
        return sum((value - expected) ** 2
                   for value, expected in zip(predictions, expectedOutput))
    def calculateGradientForOutputLayer(self, expectedOutput):
        """Store dLoss/dNode on every output node.

        Uses the mean-squared-error convention: 2 * (value - expected),
        divided by the number of outputs.
        Raises IOError when *expectedOutput* does not match the output size.
        """
        if self.getOutputSize() != len(expectedOutput):
            raise IOError("Output is not in the right size")
        lastLayer = self.getLayer(-1)
        lastLayerArray = lastLayer.getLayerInArrayFormat()
        outputSize = self.getOutputSize()
        for index, value in enumerate(lastLayerArray):
            expectedValue = expectedOutput[index]
            currentNode = lastLayer.getNodes()[index]
            # dLoss/dNode = 2 * (value - expectedValue), averaged over outputs
            currentNode.setGradient(2 * (value - expectedValue) / outputSize)
    def backwardNodeGradient(self):
        """Backpropagate node gradients from the output layer toward the input.

        dLoss/dFromNode = SUM(dLoss/dToNode * weight), averaged over the
        layer length (the original's convention, preserved as-is).
        """
        allLayersButLast = self.m_layers[:-1]
        for layer in reversed(allLayersButLast):
            for fromNode in layer.getNodes():
                if fromNode.isBias():
                    continue
                # 'total' instead of the original 'sum', which shadowed the builtin
                total = 0
                for connectionUp in fromNode.getConnectionsUp():
                    toNode = connectionUp.getToNode()
                    total += toNode.getGradient() * connectionUp.getWeight()
                fromNode.setGradient(total / layer.getLength())
    def updateWeightsGradient(self):
        """Store dLoss/dWeight on every connection.

        dLoss/dWeight = dLoss/dToNode * value(fromNode).
        """
        for connection in self.getConnections():
            toNode = connection.getToNode()
            fromNode = connection.getFromNode()
            connection.setGradient(toNode.getGradient() * fromNode.getValue())
    def step(self):
        """Apply one gradient-descent update to every weight."""
        for connection in self.getConnections():
            newWeight = connection.getWeight() - connection.getGradient() * self.getLearningRate()
            connection.setWeight(newWeight)
    def train(self, input, expectedOutput):
        """One full training iteration: forward pass, backward pass, weight update."""
        self.forward(input)
        self.calculateGradientForOutputLayer(expectedOutput)
        self.backwardNodeGradient()
        self.updateWeightsGradient()
        self.step()