-
Notifications
You must be signed in to change notification settings - Fork 14
/
openbci_matlab.py
168 lines (146 loc) · 6.22 KB
/
openbci_matlab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#######################################################
# openbci_matlab.py
#
# An interface for live-streaming data from an OpenBCI board into MATLAB
# via Python. This script publishes the board's samples on Lab Streaming
# Layer (LSL) outlets, which a MATLAB script then receives.
#
# For more information or troubleshooting, see README.md or
# https://github.com/OpenBCI/OpenBCI_MATLAB
#
import sys; sys.path.append('lib')
from pylsl import StreamInfo, StreamOutlet
import argparse
import os
import time
import string
import atexit
import threading
import sys
import open_bci_v3 as bci
class StreamerLSL():
    """Forward samples from an OpenBCI board to two LSL outlets.

    The constructor parses the command line and opens the board,
    ``create_lsl`` builds the EEG and AUX outlets, and ``begin`` runs an
    interactive prompt that relays commands to the board.
    """

    def __init__(self, daisy=False):
        """Parse command-line options and connect to the board.

        Args:
            daisy: Enable the Daisy module from code. The module is enabled
                when either this flag or the ``-d`` command-line switch is
                set, so the default keeps the command line in full control.
        """
        parser = argparse.ArgumentParser(description="OpenBCI 'user'")
        parser.add_argument('-p', '--port',
                            help="Port to connect to OpenBCI Dongle " +
                            "( ex /dev/ttyUSB0 or /dev/tty.usbserial-* )")
        parser.add_argument('-d', action="store_true",
                            help="Enable Daisy Module " +
                            "-d")
        args = parser.parse_args()
        port = args.port
        print ("\n-------INSTANTIATING BOARD-------")
        self.board = bci.OpenBCIBoard(port, daisy=(args.d or daisy))
        # Register the exit hook only once a board exists, so the hook
        # always has something to disconnect.
        atexit.register(self.cleanUp)
        self.eeg_channels = self.board.getNbEEGChannels()
        self.aux_channels = self.board.getNbAUXChannels()
        self.sample_rate = self.board.getSampleRate()
        print('{} EEG channels and {} AUX channels at {} Hz'.format(
            self.eeg_channels, self.aux_channels, self.sample_rate))

    def send(self, sample):
        """Print one sample's channel data and push it to both outlets.

        Args:
            sample: Object exposing ``channel_data`` and ``aux_data``;
                passed as the callback argument by the board's streaming
                loop (see ``begin``).
        """
        print(sample.channel_data)
        self.outlet_eeg.push_sample(sample.channel_data)
        self.outlet_aux.push_sample(sample.aux_data)

    def create_lsl(self):
        """Create the EEG and AUX stream outlets used by ``send``."""
        info_eeg = StreamInfo("OpenBCI_EEG", 'EEG', self.eeg_channels,
                              self.sample_rate, 'float32', "openbci_eeg_id1")
        info_aux = StreamInfo("OpenBCI_AUX", 'AUX', self.aux_channels,
                              self.sample_rate, 'float32', "openbci_aux_id1")
        self.outlet_eeg = StreamOutlet(info_eeg)
        self.outlet_aux = StreamOutlet(info_aux)

    def cleanUp(self):
        """Disconnect from the board; registered with ``atexit`` in ``__init__``."""
        self.board.disconnect()
        print ("Disconnecting...")

    @staticmethod
    def _int_after(text, marker):
        """Return the integer that follows ``marker`` in ``text``.

        Everything after the first occurrence of ``marker`` up to the end of
        ``text`` must be a valid integer literal.

        Returns:
            The parsed integer, or ``None`` when ``marker`` is absent or the
            remainder is not an integer.
        """
        position = text.find(marker)
        if position < 0:
            return None
        try:
            return int(text[position + len(marker):])
        except ValueError:
            return None

    def _run_slash_command(self, command):
        """Execute a local ``/command`` (given without its leading slash).

        Returns:
            True when pending serial input should be read silently, which is
            the case after ``stop``; False otherwise.
        """
        recognized = False  # current command is recognized or not
        flush = False

        # A "T:<n>" / "t:<n>" argument counts as a recognized command when
        # <n> is a valid integer.
        # NOTE(review): the parsed duration is not forwarded; streaming below
        # is always started with -1. Confirm whether a timed run was intended.
        if "T:" in command:
            recognized = self._int_after(command, "T:") is not None
        elif "t:" in command:
            recognized = self._int_after(command, "t:") is not None

        if "start" in command:
            # Stream in a separate thread so commands can still be typed here.
            stream_thread = threading.Thread(target=self.board.start_streaming,
                                             args=(self.send, -1))
            stream_thread.daemon = True  # will stop on exit
            stream_thread.start()
            print("Streaming data...")
            recognized = True
        elif 'test' in command:
            signal = self._int_after(command, "test")
            # A missing or malformed number is reported as unrecognized
            # instead of aborting the prompt with ValueError.
            if signal is not None:
                self.board.test_signal(signal)
                recognized = True
        elif 'stop' in command:
            self.board.stop()
            recognized = True
            flush = True

        if not recognized:
            print("Command not recognized...")
        return flush

    def _write_chars(self, chars):
        """Send ``chars`` to the board one character at a time."""
        for c in chars:
            if sys.hexversion > 0x03000000:
                self.board.ser.write(bytes(c, 'utf-8'))
            else:
                self.board.ser.write(bytes(c))
            # Pause between characters so the board's registers can settle.
            time.sleep(0.100)

    def _report_board_output(self, flush):
        """Drain pending serial input and print the trailing partial line.

        Complete lines are discarded rather than echoed, so only the text
        after the last newline is printed. Nothing is printed when ``flush``
        is true.
        """
        line = ''
        time.sleep(0.1)  # Wait to see if the board has anything to report
        while self.board.ser.inWaiting():
            c = self.board.ser.read().decode('utf-8', errors='replace')
            line += c
            time.sleep(0.001)
            if (c == '\n') and not flush:
                line = ''
        if not flush:
            print(line)

    def begin(self):
        """Run the interactive prompt until the user types ``/exit``.

        The board is first sent an initialisation string. After that, input
        starting with ``/`` is handled locally (start, stop, test); any other
        input is written to the board character by character.
        """
        print ("--------------INFO---------------")
        print ("User serial interface enabled...\n" +
               "View command map at http://docs.openbci.com.\n" +
               "Type /start to run -- and /stop before issuing new commands afterwards.\n" +
               "Type /exit to exit. \n" +
               "Board outputs are automatically printed as: \n" +
               "%  <tab>  message\n" +
               "$$$ signals end of message")
        print("\n-------------BEGIN---------------")
        # Init board state
        # s: stop board streaming; v: soft reset of the 32-bit board (no effect with 8bit board)
        s = 'sv'
        # Tell the board to enable or not daisy module
        print(self.board.daisy)
        if self.board.daisy:
            s = s + 'C'
        else:
            s = s + 'c'
        # d: Channels settings back to default
        s = s + 'd'

        while s != "/exit":
            if not s:
                pass
            elif "help" in s:
                print ("View command map at:" +
                       "http://docs.openbci.com/software/01-OpenBCI_SDK.\n" +
                       "For user interface: read README or view" +
                       "https://github.com/OpenBCI/OpenBCI_Python")
            elif self.board.streaming and s != "/stop":
                print ("Error: the board is currently streaming data, please type '/stop' before issuing new commands.")
            else:
                # flush: read incoming packets silently (used when the stream is stopped)
                flush = False
                if '/' == s[0]:
                    flush = self._run_slash_command(s[1:])
                else:
                    # Send chars and wait for registers to set
                    self._write_chars(s)
                self._report_board_output(flush)

            # Take user input
            if sys.hexversion > 0x03000000:
                s = input('--> ')
            else:
                s = raw_input('--> ')
def main():
    """Build the streamer, create its LSL outlets, and start the prompt."""
    streamer = StreamerLSL()
    streamer.create_lsl()
    streamer.begin()
# Script entry point: run the streamer only when executed directly, not on import.
if __name__ == "__main__":
    main()