-
Notifications
You must be signed in to change notification settings - Fork 3
/
BN3.py
317 lines (258 loc) · 10.7 KB
/
BN3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import pytorch_lightning as pl
import torch
import torch.optim as optim
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torch.utils.data.dataset import random_split
class Net(pl.LightningModule):
    """BN3 convolutional network for binary P300 detection.

    The architecture follows M. Liu et al., "Deep learning based on Batch
    Normalization for P300 signal detection", Neurocomputing, vol. 275,
    pp. 288-297, 2017 (see ``explainModel``).

    The network emits ONE logit per sample. No sigmoid is applied inside the
    model because ``nn.BCEWithLogitsLoss`` applies it internally; predictions
    are obtained by thresholding ``sigmoid(logit)`` at 0.5.

    Batches are ``(x, y)`` pairs. ``x`` is indexed as
    [Batch, Channel, Length]; ``y`` is a 2-D tensor whose column
    ``_LABEL_COLUMN`` holds the binary label and whose column
    ``_SUBJECT_COLUMN`` holds the subject id.

    NOTE(review): the step methods return dictionaries carrying a ``'log'``
    entry, which presumably targets an early pytorch_lightning release --
    confirm the installed version before upgrading.
    """

    # Column layout of the target rows, as indexed by the step methods.
    _SUBJECT_COLUMN = 0
    _LABEL_COLUMN = 4
    # Number of per-subject accuracy slots reported by ``test_epoch_end``
    # (subjects are looked up under the ids 1.._NUM_SUBJECTS).
    _NUM_SUBJECTS = 6

    # +++++++++++++++++++++++++++ Explain architecture +++++++++++++++++++++++++++
    @staticmethod
    def explainModel():
        """Return an HTML description of the model (for the Comet HTML tab).

        Declared as a static method so that it can be called both on the
        class (``Net.explainModel()``) and on an instance.
        """
        model_name = "BN3"
        # Model key points
        key_points = (
            "\n"
            "<p>This model is taken from the paper:<br>\n"
            "<i>M. Liu, W. Wu, Z. Gu, Z. Yu, F. F. Qi, and Y. Li, “Deep learning based on Batch Normalization for P300 signal detection,” Neurocomputing, vol. 275, pp. 288–297, 2017.</i></p>\n"
            "<p>It defines a 6 layers CNN architecture.</p>\n"
        )
        text = "<h1>{}</h1>".format(model_name)
        text += "{}".format(key_points)
        return text

    def get_hyperparams(self):
        """Return the hyper-parameter dictionary (logged in Comet)."""
        return self.hyper_params

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++ Define Architecture +++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    def __init__(self, input_size=None, hyperparams=None):
        """Build the layers and the hyper-parameter dictionary.

        Args:
            input_size: Stored under ``hyper_params['input_size']``; it is not
                used to size any layer.
            hyperparams: Optional mapping whose entries overwrite the default
                hyper-parameters key by key.
        """
        super().__init__()

        # Default hyper-parameters (SGD variant).
        # An Adam configuration would additionally need 'betas' and 'eps'.
        self.hyper_params = {
            'input_size': input_size,
            'batch_size': 64,               # Batch size (try powers of 2)
            'test_batch_size': 1,
            'max_num_epochs': 500,
            'optimizer': 'SGD',             # SGD, Adam, ...
            'weight_decay': 0.00001,        # Weight decay (L2 regularization)
            'momentum': 0.9,                # Momentum for Optimizer
            'learning_rate': 1e-3,          # Learning rate for Optimizer
        }
        # Overwrite hyperparameters if given
        if hyperparams:
            self.hyper_params.update(hyperparams)

        # **************** Declare layers ****************
        # Attribute names are part of the state_dict keys: do not rename.
        self.layer0 = nn.BatchNorm2d(1)
        self.layer1 = nn.Conv2d(1, 16, kernel_size=(64, 1))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 16, kernel_size=(1, 20), stride=(1, 20)),
            nn.BatchNorm2d(16),
            nn.ReLU(),
        )
        self.flatten = nn.Flatten()
        # 240 input features = 16 feature maps x 15 positions, which is what a
        # 64 x 300 input produces -- TODO confirm against the dataset.
        self.layer3 = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(240, 100),
            nn.Tanh(),
        )
        self.layer4 = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(100, 100),
            nn.Tanh(),
        )
        self.layer5 = nn.Sequential(
            nn.Linear(100, 1),
            # Sigmoid is not needed because it is calculated inside
            # BCEWithLogitsLoss. If a Sigmoid were written explicitly here,
            # the BCELoss loss function would have to be used instead.
        )

        # +++++++++++++++++++++++++ Loss function +++++++++++++++++++++++++
        self.loss_function = nn.BCEWithLogitsLoss()

    def forward(self, x):
        """Compute one logit per sample.

        Args:
            x: Tensor indexed as [Batch, Channel, Length].

        Returns:
            The output of the last linear layer (one column of logits).
        """
        # Convert input to float (match network weights type)
        x = x.float()
        # Convert from [Batch, Channel, Length] to [Batch, 1, Height, Width]
        # so that 2D convolutions can be applied. ``reshape`` is used instead
        # of ``view`` so that non-contiguous inputs are accepted as well.
        size = x.shape
        x = x.reshape(size[0], 1, size[1], size[2])

        # Batch Normalization is applied to the raw input first
        out = self.layer0(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.flatten(out)
        out = self.layer3(out)
        out = self.layer4(out)
        return self.layer5(out)

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # +++++++++++++++++++ Train, Validation and Test steps ++++++++++++++++++++++
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    def _evaluate_batch(self, batch):
        """Run the forward pass shared by the train/validation/test steps.

        Returns:
            Tuple ``(y_all, y, y_logits, loss, acc)`` where ``y_all`` is the
            untouched target tensor, ``y`` the label column cast to the logits
            type, ``y_logits`` the flattened logits, ``loss`` the BCE-with-
            logits loss and ``acc`` the batch accuracy in percent.
        """
        x, y_all = batch
        y = y_all[:, self._LABEL_COLUMN]  # get only label
        # output: 1 node (logit). To classify, pass it through a sigmoid and
        # use 0.5 as threshold (done in binary_acc).
        y_logits = self.forward(x)
        y_logits = y_logits.view(y_logits.shape[0])
        y = y.type_as(y_logits)
        loss = self.loss_function(y_logits, y)
        acc = self.binary_acc(y_logits, y)
        return y_all, y, y_logits, loss, acc

    # ---------- Train ----------
    def training_step(self, train_batch, batch_idx):
        """Return the loss and accuracy of one training batch."""
        _, _, _, loss, acc = self._evaluate_batch(train_batch)
        # Define outputs
        logs = {
            'loss_train': loss,
            'acc_train': acc.clone().detach(),
        }
        return {
            'loss_train': loss,
            'acc_train': acc.clone().detach(),
            'loss': loss,
            'log': logs,
        }

    # ---------- Validation ----------
    def validation_step(self, val_batch, batch_idx):
        """Return the loss and accuracy of one validation batch."""
        _, _, _, loss, acc = self._evaluate_batch(val_batch)
        return {
            'loss_val': loss,
            'acc_val': acc.clone().detach(),
        }

    def validation_epoch_end(self, outputs):
        """Average the per-batch validation loss and accuracy."""
        avg_loss = torch.stack([out['loss_val'] for out in outputs]).mean()
        avg_acc = torch.stack([out['acc_val'] for out in outputs]).mean()
        # Define outputs
        logs = {'loss_val': avg_loss, 'acc_val': avg_acc}
        return {'val_loss': avg_loss, 'log': logs}

    # ---------- Test ----------
    def test_step(self, batch, batch_idx):
        """Return accuracy, labels and hard predictions of one test batch.

        The batch accuracy is additionally stored under ``'acc_<subject>'``,
        where the subject id is read from the FIRST row of the batch only;
        this is exact for the default ``test_batch_size`` of 1.
        """
        y_all, y, y_logits, _, acc = self._evaluate_batch(batch)

        # Specify the subject this sample comes from. The id is normalised to
        # an int so that a float target tensor yields 'acc_1' (the key read by
        # test_epoch_end) instead of 'acc_1.0'.
        # NOTE(review): assumes subject ids are whole numbers -- confirm.
        subject_id = int(y_all[0, self._SUBJECT_COLUMN].item())
        subj_str = 'acc_' + str(subject_id)

        # Calculate predicted outputs (0.5 threshold on the sigmoid)
        y_predicted = self._hard_predictions(y_logits)

        return {
            'acc': acc.clone().detach(),
            subj_str: acc.clone().detach(),
            'y_true': y.clone().detach(),
            'y_predicted': y_predicted.clone().detach(),
        }

    def test_epoch_end(self, outputs):
        """Aggregate the test results and keep them on the instance.

        Sets ``test_y_true``, ``test_y_predicted`` and ``test_acc_subj`` and
        returns the overall and per-subject accuracies under ``'log'``.
        Subjects without any sample get an accuracy of 0.
        """
        test_acc = torch.stack([out['acc'] for out in outputs]).mean()
        self.test_y_true = torch.stack([out['y_true'] for out in outputs])
        self.test_y_predicted = torch.stack(
            [out['y_predicted'] for out in outputs])

        # General accuracy
        logs = {'acc_test': test_acc}

        # Accuracy per subject
        self.test_acc_subj = [0] * self._NUM_SUBJECTS
        for index in range(self._NUM_SUBJECTS):
            subject = str(index + 1)
            step_key = 'acc_' + subject
            accuracies = [out[step_key] for out in outputs if step_key in out]
            if accuracies:
                self.test_acc_subj[index] = torch.stack(accuracies).mean()
            logs['acc_subj_' + subject] = self.test_acc_subj[index]

        return {'log': logs}

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++ Optimizer and Scheduler +++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    def configure_optimizers(self):
        """Return the SGD optimizer configured from ``hyper_params``.

        NOTE: Stochastic Gradient Descent is always used; the ``'optimizer'``
        hyper-parameter entry is not consulted here.
        """
        optimizer = torch.optim.SGD(
            self.parameters(),
            lr=self.hyper_params['learning_rate'],
            momentum=self.hyper_params['momentum'],
            weight_decay=self.hyper_params['weight_decay'])
        return [optimizer]

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++++++ Auxiliary functions +++++++++++++++++++++++++++
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    @staticmethod
    def _hard_predictions(y_logits):
        """Map logits to hard 0/1 predictions with a 0.5 sigmoid threshold.

        The logits are detached first: the result is only used for metrics,
        so no autograd graph needs to be built (or modified in place).
        """
        y_sigmoid = torch.sigmoid(y_logits.detach())
        y_sigmoid[y_sigmoid >= 0.5] = 1.0
        y_sigmoid[y_sigmoid < 0.5] = 0.0
        return y_sigmoid

    # Measure binary accuracy
    def binary_acc(self, y_logits, y):
        """Return the percentage of samples whose prediction equals ``y``.

        Args:
            y_logits: Logits from the single output node, one per sample.
            y: Target labels, same shape as ``y_logits``.
        """
        y_predicted = self._hard_predictions(y_logits)
        # Sum of all predictions equal to the targets divided by the total
        # number of targets, expressed in percent.
        acc = 100 * (y_predicted == y).sum().type(torch.float) / len(y)
        return acc

    # Return test y_true and y_predicted vectors for Confusion Matrix in Comet ML
    def get_test_labels_predictions(self):
        """Return ``(test_y_true, test_y_predicted)`` set by ``test_epoch_end``."""
        return (self.test_y_true, self.test_y_predicted)