|
|
@@ -23,7 +23,8 @@ def handle_sample(sample): |
|
|
import timeit
|
|
|
import atexit
|
|
|
import logging
|
|
|
-
|
|
|
+import threading
|
|
|
+import sys
|
|
|
|
|
|
SAMPLE_RATE = 250.0 # Hz
|
|
|
START_BYTE = 0xA0 # start of data packet
|
|
|
@@ -64,14 +65,14 @@ class OpenBCIBoard(object): |
|
|
"""
|
|
|
|
|
|
def __init__(self, port=None, baud=115200, filter_data=True,
|
|
|
- scaled_output=True, daisy=False, log=True):
|
|
|
+ scaled_output=True, daisy=False, log=True, timeout=None):
|
|
|
if not port:
|
|
|
port = find_port()
|
|
|
if not port:
|
|
|
raise OSError('Cannot find OpenBCI port')
|
|
|
|
|
|
print("Connecting to %s" %(port))
|
|
|
- self.ser = serial.Serial(port, baud)
|
|
|
+ self.ser = serial.Serial(port= port, baudrate = baud, timeout=timeout)
|
|
|
print("Serial established...")
|
|
|
|
|
|
#Initialize 32-bit board, doesn't affect 8bit board
|
|
|
@@ -91,6 +92,10 @@ def __init__(self, port=None, baud=115200, filter_data=True, |
|
|
self.last_odd_sample = OpenBCISample(-1, [], []) # used for daisy
|
|
|
self.log = log
|
|
|
self.log_packet_count = 0
|
|
|
+ self.attempt_reconnect = False
|
|
|
+ self.last_reconnect = 0
|
|
|
+ self.reconnect_freq = 5
|
|
|
+ self.packets_dropped = 0
|
|
|
|
|
|
#Disconnects from board when terminated
|
|
|
atexit.register(self.disconnect)
|
|
|
@@ -129,7 +134,12 @@ def start_streaming(self, callback, lapse=-1): |
|
|
if not isinstance(callback, list):
|
|
|
callback = [callback]
|
|
|
|
|
|
+
|
|
|
+ #Initialize check connection
|
|
|
+ self.check_connection()
|
|
|
+
|
|
|
while self.streaming:
|
|
|
+
|
|
|
# read current sample
|
|
|
sample = self._read_serial_binary()
|
|
|
# if a daisy module is attached, wait to concatenate two samples (main board + daisy) before passing it to callback
|
|
|
@@ -147,12 +157,11 @@ def start_streaming(self, callback, lapse=-1): |
|
|
else:
|
|
|
for call in callback:
|
|
|
call(sample)
|
|
|
+
|
|
|
if(lapse > 0 and timeit.default_timer() - start_time > lapse):
|
|
|
self.stop();
|
|
|
if self.log:
|
|
|
self.log_packet_count = self.log_packet_count + 1;
|
|
|
-
|
|
|
-
|
|
|
|
|
|
|
|
|
"""
|
|
|
@@ -166,28 +175,30 @@ def start_streaming(self, callback, lapse=-1): |
|
|
def _read_serial_binary(self, max_bytes_to_skip=3000):
|
|
|
def read(n):
|
|
|
b = self.ser.read(n)
|
|
|
- # print b
|
|
|
- return b
|
|
|
+ if not b:
|
|
|
+ self.warn('Device appears to be stalled. Quitting...')
|
|
|
+ sys.exit()
|
|
|
+ raise Exception('Device Stalled')
|
|
|
+ sys.exit()
|
|
|
+ return '\xFF'
|
|
|
+ else:
|
|
|
+ return b
|
|
|
|
|
|
for rep in xrange(max_bytes_to_skip):
|
|
|
|
|
|
#---------Start Byte & ID---------
|
|
|
if self.read_state == 0:
|
|
|
+
|
|
|
b = read(1)
|
|
|
- if not b:
|
|
|
- if not self.ser.inWaiting():
|
|
|
- self.warn('Device appears to be stalled. Restarting...')
|
|
|
- self.ser.write('b') # restart if it's stopped...
|
|
|
- time.sleep(.100)
|
|
|
- continue
|
|
|
- else:
|
|
|
- if struct.unpack('B', b)[0] == START_BYTE:
|
|
|
- if(rep != 0):
|
|
|
- self.warn('Skipped %d bytes before start found' %(rep))
|
|
|
- rep = 0;
|
|
|
- packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255
|
|
|
+
|
|
|
+ if struct.unpack('B', b)[0] == START_BYTE:
|
|
|
+ if(rep != 0):
|
|
|
+ self.warn('Skipped %d bytes before start found' %(rep))
|
|
|
+ rep = 0;
|
|
|
+ packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255
|
|
|
+ log_bytes_in = str(packet_id);
|
|
|
|
|
|
- self.read_state = 1
|
|
|
+ self.read_state = 1
|
|
|
|
|
|
#---------Channel Data---------
|
|
|
elif self.read_state == 1:
|
|
|
@@ -198,6 +209,7 @@ def read(n): |
|
|
literal_read = read(3)
|
|
|
|
|
|
unpacked = struct.unpack('3B', literal_read)
|
|
|
+ log_bytes_in = log_bytes_in + '|' + str(literal_read);
|
|
|
|
|
|
#3byte int in 2s compliment
|
|
|
if (unpacked[0] >= 127):
|
|
|
@@ -225,6 +237,8 @@ def read(n): |
|
|
|
|
|
#short = h
|
|
|
acc = struct.unpack('>h', read(2))[0]
|
|
|
+ log_bytes_in = log_bytes_in + '|' + str(acc);
|
|
|
+
|
|
|
if self.scaling_output:
|
|
|
aux_data.append(acc*scale_fac_accel_G_per_count)
|
|
|
else:
|
|
|
@@ -234,28 +248,23 @@ def read(n): |
|
|
#---------End Byte---------
|
|
|
elif self.read_state == 3:
|
|
|
val = struct.unpack('B', read(1))[0]
|
|
|
+ log_bytes_in = log_bytes_in + '|' + str(val);
|
|
|
self.read_state = 0 #read next packet
|
|
|
if (val == END_BYTE):
|
|
|
sample = OpenBCISample(packet_id, channel_data, aux_data)
|
|
|
+ self.packets_dropped = 0
|
|
|
return sample
|
|
|
else:
|
|
|
- self.warn("Warning: Unexpected END_BYTE found <%s> instead of <%s>,\
|
|
|
- discarted packet with id <%d>"
|
|
|
- %(val, END_BYTE, packet_id))
|
|
|
+ self.warn("ID:<%d> <Unexpected END_BYTE found <%s> instead of <%s>"
|
|
|
+ %(packet_id, val, END_BYTE))
|
|
|
+ logging.debug(log_bytes_in);
|
|
|
+ self.packets_dropped = self.packets_dropped + 1
|
|
|
|
|
|
"""
|
|
|
|
|
|
- Used by exit clean up function (atexit)
|
|
|
+ Clean Up (atexit)
|
|
|
|
|
|
"""
|
|
|
- def warn(self, text):
|
|
|
- if self.log:
|
|
|
- if self.log_packet_count:
|
|
|
- logging.info('Data packets received:'+str(self.log_packet_count))
|
|
|
- self.log_packet_count = 0;
|
|
|
- logging.warning(text)
|
|
|
- print("Warning: %s" % text)
|
|
|
-
|
|
|
def stop(self):
|
|
|
print("Stopping streaming...\nWait for buffer to flush...")
|
|
|
self.streaming = False
|
|
|
@@ -267,15 +276,25 @@ def disconnect(self): |
|
|
if(self.streaming == True):
|
|
|
self.stop()
|
|
|
if (self.ser.isOpen()):
|
|
|
- self.warn("Closing Serial...")
|
|
|
+ print("Closing Serial...")
|
|
|
self.ser.close()
|
|
|
+ logging.warning('serial closed')
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
SETTINGS AND HELPERS
|
|
|
|
|
|
"""
|
|
|
+ def warn(self, text):
|
|
|
+ if self.log:
|
|
|
+ #log how many packets where sent succesfully in between warnings
|
|
|
+ if self.log_packet_count:
|
|
|
+ logging.info('Data packets received:'+str(self.log_packet_count))
|
|
|
+ self.log_packet_count = 0;
|
|
|
+ logging.warning(text)
|
|
|
+ print("Warning: %s" % text)
|
|
|
+
|
|
|
|
|
|
def print_incoming_text(self):
|
|
|
"""
|
|
|
@@ -309,6 +328,94 @@ def print_bytes_in(self): |
|
|
while self.streaming:
|
|
|
print(struct.unpack('B',self.ser.read())[0]);
|
|
|
|
|
|
+ '''Incoming Packet Structure:
|
|
|
+ Start Byte(1)|Sample ID(1)|Channel Data(24)|Aux Data(6)|End Byte(1)
|
|
|
+ 0xA0|0-255|8, 3-byte signed ints|3 2-byte signed ints|0xC0'''
|
|
|
+
|
|
|
+ def print_packets_in(self):
|
|
|
+ if not self.streaming:
|
|
|
+ self.ser.write('b')
|
|
|
+ self.streaming = True
|
|
|
+ skipped_str = ''
|
|
|
+ while self.streaming:
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+
|
|
|
+ if b == START_BYTE:
|
|
|
+ self.attempt_reconnect = False
|
|
|
+ if skipped_str:
|
|
|
+ logging.debug('SKIPPED\n' + skipped_str + '\nSKIPPED')
|
|
|
+ skipped_str = ''
|
|
|
+
|
|
|
+ packet_str = "%03d"%(b) + '|';
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+ packet_str = packet_str + "%03d"%(b) + '|';
|
|
|
+
|
|
|
+ #data channels
|
|
|
+ for i in xrange(24-1):
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b);
|
|
|
+
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b) + '|';
|
|
|
+
|
|
|
+ #aux channels
|
|
|
+ for i in xrange(6-1):
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b);
|
|
|
+
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b) + '|';
|
|
|
+
|
|
|
+ #end byte
|
|
|
+ b = struct.unpack('B', self.ser.read())[0];
|
|
|
+
|
|
|
+ #Valid Packet
|
|
|
+ if b == END_BYTE:
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b) + '|VAL';
|
|
|
+ print(packet_str)
|
|
|
+ #logging.debug(packet_str)
|
|
|
+
|
|
|
+ #Invalid Packet
|
|
|
+ else:
|
|
|
+ packet_str = packet_str + '.' + "%03d"%(b) + '|INV';
|
|
|
+ #Reset
|
|
|
+ self.attempt_reconnect = True
|
|
|
+
|
|
|
+
|
|
|
+ else:
|
|
|
+ print(b)
|
|
|
+ if b == END_BYTE:
|
|
|
+ skipped_str = skipped_str + '|END|'
|
|
|
+ else:
|
|
|
+ skipped_str = skipped_str + "%03d"%(b) + '.'
|
|
|
+
|
|
|
+ if self.attempt_reconnect and (timeit.default_timer()-self.last_reconnect) > self.reconnect_freq:
|
|
|
+ self.last_reconnect = timeit.default_timer()
|
|
|
+ self.warn('Reconnecting')
|
|
|
+ self.reconnect()
|
|
|
+
|
|
|
+
|
|
|
+ def check_connection(self, interval = 2, max_packets_to_skip=10):
|
|
|
+ #check number of dropped packages and establish connection problem if too large
|
|
|
+ if self.packets_dropped > max_packets_to_skip:
|
|
|
+ #if error, attempt to reconect
|
|
|
+ self.reconnect()
|
|
|
+ # check again again in 2 seconds
|
|
|
+ threading.Timer(interval, self.check_connection).start()
|
|
|
+
|
|
|
+ def reconnect(self):
|
|
|
+ self.packets_dropped = 0
|
|
|
+ self.warn('Reconnecting')
|
|
|
+ self.stop()
|
|
|
+ time.sleep(0.5)
|
|
|
+ self.ser.write('v')
|
|
|
+ time.sleep(0.5)
|
|
|
+ self.ser.write('b')
|
|
|
+ time.sleep(0.5)
|
|
|
+ self.streaming = True
|
|
|
+ #self.attempt_reconnect = False
|
|
|
+
|
|
|
+
|
|
|
#Adds a filter at 60hz to cancel out ambient electrical noise
|
|
|
def enable_filters(self):
|
|
|
self.ser.write('f')
|
|
|
|
0 comments on commit
38d65e4