-
Notifications
You must be signed in to change notification settings - Fork 4
/
OnlineSystem.py
125 lines (110 loc) · 3.65 KB
/
OnlineSystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import argparse
import time
from multiprocessing import Queue, Process, freeze_support
import keyboard
import Online.Controller as Controller
import Online.Stimulator as Stimulator
from config import cfg, merge_cfg_from_file
def parse_args():
    """Parse the command-line options of the online typing BCI entry point.

    Options are read from ``sys.argv``.

    Returns:
        argparse.Namespace: with the attributes ``cfg_file`` (str or None),
        ``screen_index`` (int, default -1), ``test`` (bool) and
        ``model_date`` (str or None).
    """
    cli = argparse.ArgumentParser(description='Online typing BCI main entry')

    cli.add_argument('--cfg', dest='cfg_file', type=str, default=None,
                     help='Config file for subject')
    cli.add_argument('--screen-index', '-s', dest='screen_index', type=int,
                     default=-1, help='Screen index')

    # testing parameters
    cli.add_argument('--test', '-t', dest='test', action='store_true',
                     help='activate testing mode')
    cli.add_argument('--date', '-d', dest='model_date', type=str,
                     default=None, help='model datetime')

    return cli.parse_args()
def quit_process():
    """Request that the running session stop.

    Registered as the 'q' hotkey callback in ``online_main``; it only raises
    the module-level flag that the main loop there checks on every iteration.
    """
    # The flag lives on the Controller module, not in this file.
    Controller.quit_flag = True
def online_main(screen_index, test, cfg_file=None, model_date=None):
    """Run one online session: spawn the stimulator and drive the controller.

    The stimulator (``Stimulator.run_exp``) runs in a child process and talks
    to the controller in this process through two queues. After the operator
    presses 's' the start signal is sent; the controller is then polled until
    every character of the stimulus string has been handled or the operator
    presses 'q'.

    Args:
        screen_index: Screen index forwarded to the stimulator.
        test: True for testing mode (uses a data client and
            ``TestingController``), False for training mode.
        cfg_file: Optional config file merged into ``cfg`` before anything
            else is read from it.
        model_date: Model datetime forwarded to ``TestingController``; only
            used in testing mode.

    Raises:
        ValueError: In testing mode when ``cfg.amp_info.amp`` is not
            ``'neuracle'``. Raised before the stimulator process is started.
    """
    if cfg_file is not None:
        merge_cfg_from_file(cfg_file)

    # Fail fast: reject an unsupported amplifier before the child process
    # exists, so a bad config cannot leave a stimulator waiting forever.
    if test and cfg.amp_info.amp != 'neuracle':
        raise ValueError("Unexpected amplifier type")

    # q_result: controller -> screen (sending judgement result and start/quit signal)
    # q_stim: screen -> controller (sending event order)
    q_result = Queue()
    q_stim = Queue()

    # create stimulator arguments
    stim_string = cfg.exp_config.test_string if test else cfg.exp_config.train_string
    kwargs = {
        'q_stim': q_stim,
        'q_result': q_result,
        'screen_index': screen_index,
        'stim_string': stim_string,
        'amp': cfg.amp_info.amp,
        'trigger_type': cfg.amp_info.trigger_type,
        # None for bidir
        'stim_dir': cfg.exp_config.stim_dir if not cfg.exp_config.bidir else None,
    }
    print('Configuration finished. Start process.')

    # start stimulator process
    process = Process(target=Stimulator.run_exp, kwargs=kwargs)
    process.start()

    controller = None
    data_client = None
    completed = False
    try:
        if test:
            # testing mode: the controller reads from the amplifier client
            from Online import Neuracle
            n_channel = len(cfg.subj_info.montage)
            data_client = Neuracle.Neuracle(
                n_channel=n_channel + 1,  # +1 for trigger channel
                samplerate=cfg.amp_info.samplerate,
            )
            controller = Controller.TestingController(
                q_stim=q_stim,
                q_result=q_result,
                dataclient=data_client,
                model_date=model_date,
                stim_string=stim_string,
            )
        else:
            # training mode
            controller = Controller.TrainingController(
                q_stim=q_stim,
                q_result=q_result,
                stim_string=stim_string,
            )

        # write exp config info to log file
        controller.write_exp_log()

        # wait for the operator, then put the starting signal into q_result
        keyboard.wait('s')
        q_result.put(-2)

        # set quit hotkey
        keyboard.add_hotkey('q', quit_process)
        while controller.char_cnt < len(controller.stim_string) and not Controller.quit_flag:
            controller.run()
            time.sleep(0.05)
        completed = True
    finally:
        # Cleanup runs on success, on 'q', and on any error / Ctrl+C. The
        # nested finally blocks make sure a failure in one release step does
        # not skip the ones after it.
        try:
            if controller is not None:
                # close events file io
                controller.close()
            if completed and test:
                # writing down itr
                print('accu: %.2f, average time: %.2f, itr: %.2f' % controller.itr())
        finally:
            try:
                if data_client is not None:
                    # turn off data client thread
                    data_client.close()
            finally:
                # On a normal finish the stimulator is left to exit on its
                # own; on quit or failure it is terminated so join() (and
                # interpreter shutdown) cannot block on it.
                if not completed or Controller.quit_flag:
                    process.terminate()
                process.join()
if __name__ == '__main__':
    # freeze_support() has to come first so that a frozen executable can
    # bootstrap its child processes before any other work is done.
    freeze_support()
    cli_args = parse_args()
    online_main(
        screen_index=cli_args.screen_index,
        test=cli_args.test,
        cfg_file=cli_args.cfg_file,
        model_date=cli_args.model_date,
    )