import numpy as np

from optimizers import SGD


class Model(object):
    """
    An abstract class that represents a machine learning model.
    """

    def __init__(self):
        pass

    def __call__(self, X, *args, **kwargs):
        return self.forward(X, *args, **kwargs)

    def forward(self, X, training=False, *args, **kwargs):
        """
        Computes the model's output (forward pass).
        If training is True, variables that need to be stored during training are kept in a
        cache dictionary for each layer. Also, some layers, such as Dropout, behave
        differently when training is True.
        """
        return X
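
    # Note that __call__ delegates to forward, so model(X) and model.forward(X) are
    # equivalent; pass training=True during training, e.g. model(X_batch, training=True).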

    def configure(self, loss, optimizer=None, metrics=None):
        """
        Configures the model for training or evaluation.
        metrics should be a dictionary with the name of each metric as the key and the
        function that computes the metric as the value.
        """
        self.loss = loss
        # avoid a shared default instance: create a fresh SGD optimizer per model
        self.optimizer = optimizer if optimizer is not None else SGD(learning_rate=0.01)
        self.metrics = metrics if metrics is not None else {}
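
    # Illustrative usage (MSE and accuracy are hypothetical names for a loss object and
    # a metric function assumed to be defined elsewhere in this project):
    #   model.configure(loss=MSE(), metrics={'accuracy': accuracy})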

    def _evaluate(self, Y_train, Y_train_pred, Y_val=None, Y_val_pred=None):
        metrics = {}
        metrics['loss'] = self.loss(Y_train, Y_train_pred)
        if Y_val is not None:
            metrics['val_loss'] = self.loss(Y_val, Y_val_pred)
        for name, metric in self.metrics.items():
            metrics[name] = metric(Y_train, Y_train_pred)
            if Y_val is not None:
                metrics[f"val_{name}"] = metric(Y_val, Y_val_pred)
        return metrics
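
    # The returned dictionary is flat, with validation entries prefixed by "val_",
    # e.g. (illustrative): {'loss': 0.12, 'val_loss': 0.15, 'accuracy': 0.97, 'val_accuracy': 0.95}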

    def evaluate(self, X, Y, X_val=None, Y_val=None):
        """
        Evaluates the configured loss and metrics on the given data batch.
        Returns the loss and the metrics.
        """
        # make predictions on the training set
        Y_pred = self.forward(X)
        # make predictions on the validation set
        if X_val is not None:
            Y_val_pred = self.forward(X_val)
        else:
            Y_val_pred = None
        # compute and print the metrics
        metrics = self._evaluate(Y, Y_pred, Y_val, Y_val_pred)
        eval_info = self.format_metrics(metrics)
        print(eval_info)
        return metrics

    def format_metrics(self, metrics):
        """
        metrics should be a dictionary with the name of each metric as the key and the
        value of the metric as the dictionary value.
        """
        s = '\t'.join([f"{name}={value:.6f}" for name, value in metrics.items()])
        return s
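
    # format_metrics produces a single tab-separated line, e.g. (illustrative):
    # "loss=0.123456\tval_loss=0.234567"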

    def train_step(self, X, Y, *args, **kwargs):
        """
        Performs backpropagation through the model and updates the model's parameters using
        the training batch (X, Y). Subclasses are expected to override this method.
        """
        pass

    def train(self, X, Y, X_val=None, Y_val=None, epochs=10, batch_size=32,
              drop_remainder=True, shuffle=True, verbose=True, *args, **kwargs):
        """
        Performs backpropagation through the model and updates the model's parameters using
        the training set (X, Y) for the given number of epochs.
        - X_val and Y_val can be set to evaluate the model on validation data after each epoch.
        - If drop_remainder is True, the last batch will be dropped if its size is less than
          batch_size.
        - If shuffle is True, the training set will be shuffled before each epoch.
        """
        history = []
        if batch_size > X.shape[0]:
            drop_remainder = False
        for epoch in range(epochs):
            # shuffle the training set
            if shuffle:
                indices = np.random.permutation(X.shape[0])
                X = X[indices]
                Y = Y[indices]
            # drop the last mini-batch if its size is less than batch_size
            if drop_remainder:
                train_size = (X.shape[0] // batch_size) * batch_size
            else:
                train_size = X.shape[0]
            # loop over each batch and train the model
            for batch_start in range(0, train_size, batch_size):
                batch_end = batch_start + batch_size
                X_batch = X[batch_start:batch_end]
                Y_batch = Y[batch_start:batch_end]
                self.train_step(X_batch, Y_batch, *args, **kwargs)
            # make predictions on the training set
            Y_pred = self.forward(X)
            # make predictions on the validation set
            if X_val is not None:
                Y_val_pred = self.forward(X_val)
            else:
                Y_val_pred = None
            # compute the metrics and store them in history
            metrics = self._evaluate(Y, Y_pred, Y_val, Y_val_pred)
            history.append(metrics)
            if verbose:
                epoch_info = f"Epoch {epoch+1:02}\t" + self.format_metrics(metrics)
                print(epoch_info)
        return history
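
    # train returns one metrics dictionary per epoch, so the training curve can be
    # recovered afterwards, e.g. (illustrative): losses = [m['loss'] for m in history]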


class Sequential(Model):
    def __init__(self, layers=None):
        super().__init__()
        self.layers = []
        if layers is not None:
            for layer in layers:
                self.add(layer)

    def add(self, layer):
        """
        Adds a layer to the model.
        """
        if len(self.layers) == 0:
            if layer.input_shape is None:
                raise ValueError("The input shape for the first layer of the model must be "
                                 "specified")
        else:
            layer.input_shape = self.layers[-1].output_shape
        layer.build()
        self.layers.append(layer)
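
    # Illustrative usage (Dense is a hypothetical layer class assumed to live elsewhere
    # in this project); only the first layer needs an explicit input shape:
    #   model.add(Dense(units=16, input_shape=(4,)))
    #   model.add(Dense(units=1))   # input shape inferred from the previous layer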

    def forward(self, X, training=False):
        Y_pred = X
        for layer in self.layers:
            Y_pred = layer(Y_pred, training=training)
        return Y_pred

    def train_step(self, X, Y):
        # forward pass
        Y_pred = self.forward(X, training=True)
        # compute the gradient of the loss with respect to Y_pred
        dJ_dY = self.loss.backward(Y, Y_pred)
        # dJ_dZ holds the gradient of the loss with respect to the current layer's outputs
        dJ_dZ = dJ_dY
        # backpropagate through the layers in reverse order
        for layer in reversed(self.layers):
            # backpropagate through this layer and update its parameters
            grads = layer.backward(dJ_dZ, training=True)
            if isinstance(grads, tuple):
                # the first element is the gradient w.r.t. the layer's inputs; the rest
                # are the gradients w.r.t. the layer's parameters
                dJ_dZ = grads[0]
                layer.update_parameters(*grads[1:], self.optimizer)
            else:
                dJ_dZ = grads
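

# A minimal end-to-end sketch of the intended workflow (illustrative only: Dense, ReLU
# and MSE are hypothetical names for layer/loss classes assumed to be defined elsewhere
# in this project):
#
#   model = Sequential([
#       Dense(units=16, input_shape=(4,)),
#       ReLU(),
#       Dense(units=1),
#   ])
#   model.configure(loss=MSE(), optimizer=SGD(learning_rate=0.01))
#   history = model.train(X_train, Y_train, X_val, Y_val, epochs=20, batch_size=32)
#   model.evaluate(X_test, Y_test)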