From 38d65e46d5f41cdbe2c5ce00d046b05c51fb3e2c Mon Sep 17 00:00:00 2001 From: Rodrigo Ceballos Date: Mon, 27 Jul 2015 12:35:18 -0400 Subject: [PATCH] testing --- open_bci_v3.py | 173 ++++++++++++++++++++++++++++++++++++++++++++----------- plugins/print.py | 9 +-- scripts/test.py | 13 ++++- test_log.py | 33 +++++++++++ user.py | 10 +++- 5 files changed, 195 insertions(+), 43 deletions(-) create mode 100644 test_log.py diff --git a/open_bci_v3.py b/open_bci_v3.py index 1e6137a..74135e0 100644 --- a/open_bci_v3.py +++ b/open_bci_v3.py @@ -23,7 +23,8 @@ def handle_sample(sample): import timeit import atexit import logging - +import threading +import sys SAMPLE_RATE = 250.0 # Hz START_BYTE = 0xA0 # start of data packet @@ -64,14 +65,14 @@ class OpenBCIBoard(object): """ def __init__(self, port=None, baud=115200, filter_data=True, - scaled_output=True, daisy=False, log=True): + scaled_output=True, daisy=False, log=True, timeout=None): if not port: port = find_port() if not port: raise OSError('Cannot find OpenBCI port') print("Connecting to %s" %(port)) - self.ser = serial.Serial(port, baud) + self.ser = serial.Serial(port= port, baudrate = baud, timeout=timeout) print("Serial established...") #Initialize 32-bit board, doesn't affect 8bit board @@ -91,6 +92,10 @@ def __init__(self, port=None, baud=115200, filter_data=True, self.last_odd_sample = OpenBCISample(-1, [], []) # used for daisy self.log = log self.log_packet_count = 0 + self.attempt_reconnect = False + self.last_reconnect = 0 + self.reconnect_freq = 5 + self.packets_dropped = 0 #Disconnects from board when terminated atexit.register(self.disconnect) @@ -129,7 +134,12 @@ def start_streaming(self, callback, lapse=-1): if not isinstance(callback, list): callback = [callback] + + #Initialize check connection + self.check_connection() + while self.streaming: + # read current sample sample = self._read_serial_binary() # if a daisy module is attached, wait to concatenate two samples (main board + daisy) before passing it to callback @@ -147,12 +157,11 @@ def start_streaming(self, callback, lapse=-1): else: for call in callback: call(sample) + if(lapse > 0 and timeit.default_timer() - start_time > lapse): self.stop(); if self.log: self.log_packet_count = self.log_packet_count + 1; - - """ @@ -166,28 +175,30 @@ def start_streaming(self, callback, lapse=-1): def _read_serial_binary(self, max_bytes_to_skip=3000): def read(n): b = self.ser.read(n) - # print b - return b + if not b: + self.warn('Device appears to be stalled. Quitting...') + sys.exit() + raise Exception('Device Stalled') + sys.exit() + return '\xFF' + else: + return b for rep in xrange(max_bytes_to_skip): #---------Start Byte & ID--------- if self.read_state == 0: + b = read(1) - if not b: - if not self.ser.inWaiting(): - self.warn('Device appears to be stalled. Restarting...') - self.ser.write('b') # restart if it's stopped... - time.sleep(.100) - continue - else: - if struct.unpack('B', b)[0] == START_BYTE: - if(rep != 0): - self.warn('Skipped %d bytes before start found' %(rep)) - rep = 0; - packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255 + + if struct.unpack('B', b)[0] == START_BYTE: + if(rep != 0): + self.warn('Skipped %d bytes before start found' %(rep)) + rep = 0; + packet_id = struct.unpack('B', read(1))[0] #packet id goes from 0-255 + log_bytes_in = str(packet_id); - self.read_state = 1 + self.read_state = 1 #---------Channel Data--------- elif self.read_state == 1: @@ -198,6 +209,7 @@ def read(n): literal_read = read(3) unpacked = struct.unpack('3B', literal_read) + log_bytes_in = log_bytes_in + '|' + str(literal_read); #3byte int in 2s compliment if (unpacked[0] >= 127): @@ -225,6 +237,8 @@ def read(n): #short = h acc = struct.unpack('>h', read(2))[0] + log_bytes_in = log_bytes_in + '|' + str(acc); + if self.scaling_output: aux_data.append(acc*scale_fac_accel_G_per_count) else: @@ -234,28 +248,23 @@ def read(n): #---------End Byte--------- elif self.read_state == 3: val = struct.unpack('B', read(1))[0] + log_bytes_in = log_bytes_in + '|' + str(val); self.read_state = 0 #read next packet if (val == END_BYTE): sample = OpenBCISample(packet_id, channel_data, aux_data) + self.packets_dropped = 0 return sample else: - self.warn("Warning: Unexpected END_BYTE found <%s> instead of <%s>,\ - discarted packet with id <%d>" - %(val, END_BYTE, packet_id)) + self.warn("ID:<%d> instead of <%s>" + %(packet_id, val, END_BYTE)) + logging.debug(log_bytes_in); + self.packets_dropped = self.packets_dropped + 1 """ - Used by exit clean up function (atexit) + Clean Up (atexit) """ - def warn(self, text): - if self.log: - if self.log_packet_count: - logging.info('Data packets received:'+str(self.log_packet_count)) - self.log_packet_count = 0; - logging.warning(text) - print("Warning: %s" % text) - def stop(self): print("Stopping streaming...\nWait for buffer to flush...") self.streaming = False @@ -267,8 +276,9 @@ def disconnect(self): if(self.streaming == True): self.stop() if (self.ser.isOpen()): - self.warn("Closing Serial...") + print("Closing Serial...") self.ser.close() + logging.warning('serial closed') """ @@ -276,6 +286,15 @@ def disconnect(self): SETTINGS AND HELPERS """ + def warn(self, text): + if self.log: + #log how many packets where sent succesfully in between warnings + if self.log_packet_count: + logging.info('Data packets received:'+str(self.log_packet_count)) + self.log_packet_count = 0; + logging.warning(text) + print("Warning: %s" % text) + def print_incoming_text(self): """ @@ -309,6 +328,94 @@ def print_bytes_in(self): while self.streaming: print(struct.unpack('B',self.ser.read())[0]); + '''Incoming Packet Structure: + Start Byte(1)|Sample ID(1)|Channel Data(24)|Aux Data(6)|End Byte(1) + 0xA0|0-255|8, 3-byte signed ints|3 2-byte signed ints|0xC0''' + + def print_packets_in(self): + if not self.streaming: + self.ser.write('b') + self.streaming = True + skipped_str = '' + while self.streaming: + b = struct.unpack('B', self.ser.read())[0]; + + if b == START_BYTE: + self.attempt_reconnect = False + if skipped_str: + logging.debug('SKIPPED\n' + skipped_str + '\nSKIPPED') + skipped_str = '' + + packet_str = "%03d"%(b) + '|'; + b = struct.unpack('B', self.ser.read())[0]; + packet_str = packet_str + "%03d"%(b) + '|'; + + #data channels + for i in xrange(24-1): + b = struct.unpack('B', self.ser.read())[0]; + packet_str = packet_str + '.' + "%03d"%(b); + + b = struct.unpack('B', self.ser.read())[0]; + packet_str = packet_str + '.' + "%03d"%(b) + '|'; + + #aux channels + for i in xrange(6-1): + b = struct.unpack('B', self.ser.read())[0]; + packet_str = packet_str + '.' + "%03d"%(b); + + b = struct.unpack('B', self.ser.read())[0]; + packet_str = packet_str + '.' + "%03d"%(b) + '|'; + + #end byte + b = struct.unpack('B', self.ser.read())[0]; + + #Valid Packet + if b == END_BYTE: + packet_str = packet_str + '.' + "%03d"%(b) + '|VAL'; + print(packet_str) + #logging.debug(packet_str) + + #Invalid Packet + else: + packet_str = packet_str + '.' + "%03d"%(b) + '|INV'; + #Reset + self.attempt_reconnect = True + + + else: + print(b) + if b == END_BYTE: + skipped_str = skipped_str + '|END|' + else: + skipped_str = skipped_str + "%03d"%(b) + '.' + + if self.attempt_reconnect and (timeit.default_timer()-self.last_reconnect) > self.reconnect_freq: + self.last_reconnect = timeit.default_timer() + self.warn('Reconnecting') + self.reconnect() + + + def check_connection(self, interval = 2, max_packets_to_skip=10): + #check number of dropped packages and establish connection problem if too large + if self.packets_dropped > max_packets_to_skip: + #if error, attempt to reconect + self.reconnect() + # check again again in 2 seconds + threading.Timer(interval, self.check_connection).start() + + def reconnect(self): + self.packets_dropped = 0 + self.warn('Reconnecting') + self.stop() + time.sleep(0.5) + self.ser.write('v') + time.sleep(0.5) + self.ser.write('b') + time.sleep(0.5) + self.streaming = True + #self.attempt_reconnect = False + + #Adds a filter at 60hz to cancel out ambient electrical noise def enable_filters(self): self.ser.write('f') diff --git a/plugins/print.py b/plugins/print.py index d540d5a..b895d70 100644 --- a/plugins/print.py +++ b/plugins/print.py @@ -6,10 +6,11 @@ def activate(self): # called with each new sample def __call__(self, sample): - sample_string = "ID: %f\n%s\n%s" %(sample.id, str(sample.channel_data)[1:-1], str(sample.aux_data)[1:-1]) - print "---------------------------------" - print sample_string - print "---------------------------------" + if sample: + sample_string = "ID: %f\n%s\n%s" %(sample.id, str(sample.channel_data)[1:-1], str(sample.aux_data)[1:-1]) + print "---------------------------------" + print sample_string + print "---------------------------------" # DEBBUGING # try: diff --git a/scripts/test.py b/scripts/test.py index d32d20f..a8cdc76 100644 --- a/scripts/test.py +++ b/scripts/test.py @@ -1,6 +1,8 @@ import sys; sys.path.append('..') # help python find open_bci_v3.py relative to scripts folder import open_bci_v3 as bci import os +import logging +import time def printData(sample): #os.system('clear') @@ -13,7 +15,12 @@ def printData(sample): if __name__ == '__main__': - port = '/dev/ttyUSB0' + port = '/dev/tty.usbserial-DN0096XA' baud = 115200 - board = bci.OpenBCIBoard(port=port) - board.start_streaming(printData) + logging.basicConfig(filename="test.log",format='%(asctime)s - %(levelname)s : %(message)s',level=logging.DEBUG) + logging.info('---------LOG START-------------') + board = bci.OpenBCIBoard(port=port, scaled_output=False, log=True) + board.ser.write('v') + time.sleep(0.100) + #board.start_streaming(printData) + board.print_bytes_in() diff --git a/test_log.py b/test_log.py new file mode 100644 index 0000000..e45a1e9 --- /dev/null +++ b/test_log.py @@ -0,0 +1,33 @@ +import sys; sys.path.append('..') # help python find open_bci_v3.py relative to scripts folder +import open_bci_v3 as bci +import os +import logging +import time + +def printData(sample): + #os.system('clear') + print "----------------" + print("%f" %(sample.id)) + print sample.channel_data + print sample.aux_data + print "----------------" + + + +if __name__ == '__main__': + port = '/dev/tty.usbserial-DN0096XA' + baud = 115200 + logging.basicConfig(filename="test.log",format='%(message)s',level=logging.DEBUG) + logging.info('---------LOG START-------------') + board = bci.OpenBCIBoard(port=port, scaled_output=False, log=True) + + #32 bit reset + board.ser.write('v') + time.sleep(0.100) + + #connect pins to vcc + board.ser.write('p') + time.sleep(0.100) + + #board.start_streaming(printData) + board.print_packets_in() \ No newline at end of file diff --git a/user.py b/user.py index d224f64..4a272e9 100644 --- a/user.py +++ b/user.py @@ -7,6 +7,7 @@ import atexit import threading import logging +import sys from yapsy.PluginManager import PluginManager @@ -71,13 +72,13 @@ #Logging if args.log: print "Logging Enabled" - logging.basicConfig(filename="OBCI.log",format='%(asctime)s - %(levelname)s - %(message)s',level=logging.DEBUG) + logging.basicConfig(filename="OBCI.log",format='%(asctime)s - %(levelname)s : %(message)s',level=logging.DEBUG) logging.info('---------LOG START-------------') logging.info(args) print "\n-------INSTANTIATING BOARD-------" - board = bci.OpenBCIBoard(port=args.port, daisy=args.daisy, filter_data=args.filtering, scaled_output=True, log=args.log) + board = bci.OpenBCIBoard(port=args.port, daisy=args.daisy, filter_data=args.filtering, scaled_output=False, log=args.log) # Info about effective number of channels and sampling rate if board.daisy: @@ -183,7 +184,10 @@ def cleanUp(): # start streaming in a separate thread so we could always send commands in here boardThread = threading.Thread(target=board.start_streaming, args=(fun, lapse)) boardThread.daemon = True # will stop on exit - boardThread.start() + try: + boardThread.start() + except: + raise else: print "No function loaded" rec = True