Permalink
Cannot retrieve contributors at this time
Fetching contributors…
| #!/usr/bin/env python2.7 | |
| import argparse # new in Python2.7 | |
| import os | |
| import time | |
| import string | |
| import atexit | |
| import threading | |
| import logging | |
| import sys | |
| logging.basicConfig(level=logging.ERROR) | |
| from yapsy.PluginManager import PluginManager | |
| # Load the plugins from the plugin directory. | |
| manager = PluginManager() | |
def build_arg_parser():
    """Build the command-line parser for the user.py script.

    Returns:
        argparse.ArgumentParser: parser exposing the board, port, filtering,
        daisy/aux, plugin and logging options.
    """
    parser = argparse.ArgumentParser(description="OpenBCI 'user'")
    parser.add_argument('--board', default="cyton",
                        help="Choose between [cyton] and [ganglion] boards.")
    parser.add_argument('-l', '--list', action='store_true',
                        help="List available plugins.")
    parser.add_argument('-i', '--info', metavar='PLUGIN',
                        help="Show more information about a plugin.")
    parser.add_argument('-p', '--port', default="AUTO",
                        help="For Cyton, port to connect to OpenBCI Dongle " +
                        "( ex /dev/ttyUSB0 or /dev/tty.usbserial-* ). For Ganglion, MAC address of the board. For both, AUTO to attempt auto-detection.")
    # baud rate is not currently used
    parser.add_argument('-b', '--baud', default=115200, type=int,
                        help="Baud rate (not currently used)")
    parser.add_argument('--no-filtering', dest='filtering',
                        action='store_false', default=True,
                        help="Disable notch filtering")
    parser.add_argument('-d', '--daisy', dest='daisy',
                        action='store_true', default=False,
                        help="Force daisy mode (cyton board)")
    parser.add_argument('-x', '--aux', dest='aux',
                        action='store_true',
                        help="Enable accelerometer/AUX data (ganglion board)")
    # first argument: plugin name, then parameters for plugin
    parser.add_argument('-a', '--add', metavar=('PLUGIN', 'PARAM'),
                        action='append', nargs='+',
                        help="Select which plugins to activate and set parameters.")
    parser.add_argument('--log', dest='log', action='store_true', default=False,
                        help="Log program")
    parser.add_argument('--plugins-path', dest='plugins_path', nargs='+',
                        help="Additional path(s) to look for plugins")
    return parser


def parse_lapse(command):
    """Extract the optional "T:<n>" / "t:<n>" argument from a command.

    Everything following the marker up to the end of the command is parsed as
    an integer. "T:" takes precedence over "t:" when both are present.

    Args:
        command (str): user command with the leading '/' already removed.

    Returns:
        tuple: (lapse, recognized). lapse is the parsed integer, or -1 when no
        marker is present; recognized is True only when a marker was found.

    Raises:
        ValueError: the text after the marker is not an integer.
    """
    for marker in ("T:", "t:"):
        position = command.find(marker)
        if position != -1:
            return int(command[position + len(marker):]), True
    return -1, False


def initial_board_command(daisy):
    """Return the characters sent to the board before the first prompt.

    s: stop board streaming; v: soft reset of the 32-bit board (no effect with
    8bit board); C / c: enable or not the daisy module; d: channel settings
    back to default.
    """
    return 'sv' + ('C' if daisy else 'c') + 'd'


def start_streaming_thread(board, callbacks, lapse):
    """Start board.start_streaming(callbacks, lapse) in a daemon thread.

    Streaming runs in a separate thread so that commands can still be typed
    at the prompt. The thread is a daemon so it stops when the program exits.

    Returns:
        threading.Thread or None: the started thread, or None (after printing
        a notice) when no callback is available.
    """
    if callbacks is None:
        print("No function loaded")
        return None
    stream_thread = threading.Thread(target=board.start_streaming,
                                     args=(callbacks, lapse))
    stream_thread.daemon = True  # will stop on exit
    stream_thread.start()
    return stream_thread


def run_slash_command(board, command, callbacks):
    """Execute one '/'-prefixed user command (prefix already stripped).

    Handles startimp, start, test<n> and stop, each optionally combined with
    a "T:<n>" lapse argument. Malformed numbers are reported to the user
    instead of aborting the interactive session.

    Args:
        board: board object created by the selected OpenBCI API module.
        command (str): the command text without its leading '/'.
        callbacks: list of plugin objects handed to start_streaming, or None.

    Returns:
        bool: True when pending board output should be read silently (set by
        the stop command), False otherwise.
    """
    try:
        lapse, recognized = parse_lapse(command)
    except ValueError:
        print("Error: the value following 'T:' must be an integer.")
        return False

    flush = False
    if 'startimp' in command:
        # The command itself is known even when the board cannot honour it.
        recognized = True
        if board.getBoardType() == "cyton":
            print("Impedance checking not supported on cyton.")
        else:
            board.setImpedance(True)
            start_streaming_thread(board, callbacks, lapse)
    elif "start" in command:
        recognized = True
        board.setImpedance(False)
        start_streaming_thread(board, callbacks, lapse)
    elif 'test' in command:
        recognized = True
        try:
            test_id = int(command[command.find("test") + 4:])
        except ValueError:
            print("Error: 'test' must be followed by an integer.")
        else:
            board.test_signal(test_id)
    elif 'stop' in command:
        recognized = True
        board.stop()
        flush = True

    if not recognized:
        print("Command not recognized...")
    return flush


if __name__ == '__main__':
    print("------------user.py-------------")

    args = build_arg_parser().parse_args()

    if not args.add:
        print("WARNING: no plugin selected, you will only be able to communicate with the board. You should select at least one plugin with '--add [plugin_name]'. Use '--list' to show available plugins or '--info [plugin_name]' to get more information.")

    # The board API module is imported lazily so only the selected backend's
    # requirements have to be installed.
    if args.board == "cyton":
        print("Board type: OpenBCI Cyton (v3 API)")
        import open_bci_v3 as bci
    elif args.board == "ganglion":
        print("Board type: OpenBCI Ganglion")
        import open_bci_ganglion as bci
    else:
        raise ValueError("Board type %r was not recognized. Known are 'cyton' and 'ganglion'" % args.board)

    # Check AUTO port selection, a "None" parameter for the board API
    if "AUTO" == args.port.upper():
        print("Will try do auto-detect board's port. Set it manually with '--port' if it goes wrong.")
        args.port = None
    else:
        # Single string so the output is the same under Python 2 and 3.
        print("Port: %s" % args.port)

    plugins_paths = ["plugins"]
    if args.plugins_path:
        plugins_paths += args.plugins_path
    manager.setPluginPlaces(plugins_paths)
    manager.collectPlugins()

    # Print list of available plugins and exit
    if args.list:
        print("Available plugins:")
        for plugin in manager.getAllPlugins():
            print("\t- " + plugin.name)
        sys.exit()

    # User wants more info about a plugin...
    if args.info:
        plugin = manager.getPluginByName(args.info)
        if plugin is None:
            # eg: if an import fail inside a plugin, yapsy skip it
            print("Error: [ " + args.info + " ] not found or could not be loaded. Check name and requirements.")
        else:
            print(plugin.description)
            plugin.plugin_object.show_help()
        sys.exit()

    print("\n------------SETTINGS-------------")
    print("Notch filtering:" + str(args.filtering))

    # Logging
    if args.log:
        print("Logging Enabled: " + str(args.log))
        # logging.basicConfig() does nothing once the root logger has a
        # handler, so drop any handler installed earlier before configuring
        # the log file.
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
        logging.basicConfig(filename="OBCI.log", format='%(asctime)s - %(levelname)s : %(message)s', level=logging.DEBUG)
        logging.getLogger('yapsy').setLevel(logging.DEBUG)
        logging.info('---------LOG START-------------')
        logging.info(args)
    else:
        print("user.py: Logging Disabled.")

    print("\n-------INSTANTIATING BOARD-------")
    board = bci.OpenBCIBoard(port=args.port,
                             daisy=args.daisy,
                             filter_data=args.filtering,
                             scaled_output=True,
                             log=args.log,
                             aux=args.aux)

    # Info about effective number of channels and sampling rate
    if board.daisy:
        print("Force daisy mode:")
    else:
        print("No daisy:")
    print("%s EEG channels and %s AUX channels at %s Hz."
          % (board.getNbEEGChannels(), board.getNbAUXChannels(), board.getSampleRate()))

    print("\n------------PLUGINS--------------")
    # Loop round the plugins and print their names.
    print("Found plugins:")
    for plugin in manager.getAllPlugins():
        print("[ " + plugin.name + " ]")
    print("\n")

    # Fetch plugins, try to activate them, add to the list if OK
    plug_list = []
    for plug_candidate in (args.add or []):
        # first value: plugin name, then optional arguments
        plug_name = plug_candidate[0]
        plug_args = plug_candidate[1:]
        # Try to find name
        plug = manager.getPluginByName(plug_name)
        if plug is None:
            # eg: if an import fail inside a plugin, yapsy skip it
            print("Error: [ " + plug_name + " ] not found or could not be loaded. Check name and requirements.")
            continue
        print("\nActivating [ " + plug_name + " ] plugin...")
        activated = plug.plugin_object.pre_activate(
            plug_args,
            sample_rate=board.getSampleRate(),
            eeg_channels=board.getNbEEGChannels(),
            aux_channels=board.getNbAUXChannels(),
            imp_channels=board.getNbImpChannels())
        if not activated:
            print("Error while activating [ " + plug_name + " ], check output for more info.")
            continue
        print("Plugin [ " + plug_name + "] added to the list")
        plug_list.append(plug.plugin_object)

    # Callbacks handed to the streaming thread; None when nothing was activated.
    fun = list(plug_list) or None

    def cleanUp():
        """Disconnect the board and deactivate every active plugin at exit."""
        board.disconnect()
        print("Deactivating Plugins...")
        for plug in plug_list:
            plug.deactivate()
        print("User.py exiting...")

    atexit.register(cleanUp)

    print("--------------INFO---------------")
    print("User serial interface enabled...\n"
          "View command map at http://docs.openbci.com.\n"
          "Type /start to run (/startimp for impedance \n"
          "checking, if supported) -- and /stop\n"
          "before issuing new commands afterwards.\n"
          "Type /exit to exit. \n"
          "Board outputs are automatically printed as: \n"
          "% <tab> message\n"
          "$$$ signals end of message")

    print("\n-------------BEGIN---------------")
    # Init board state
    s = initial_board_command(board.daisy)

    is_py3 = sys.version_info[0] >= 3

    while s != "/exit":
        # Send char and wait for registers to set
        if not s:
            pass
        elif "help" in s:
            print("View command map at: "
                  "http://docs.openbci.com/software/01-OpenBCI_SDK.\n"
                  "For user interface: read README or view "
                  "https://github.com/OpenBCI/OpenBCI_Python")
        elif board.streaming and s != "/stop":
            print("Error: the board is currently streaming data, please type '/stop' before issuing new commands.")
        else:
            # read silently incoming packet if set (used when stream is stopped)
            flush = False

            if s[0] == '/':
                flush = run_slash_command(board, s[1:], fun)
            else:
                # Raw characters are forwarded to the board one at a time.
                for c in s:
                    if is_py3:
                        board.ser_write(bytes(c, 'utf-8'))
                    else:
                        board.ser_write(bytes(c))
                    time.sleep(0.100)

            line = ''
            time.sleep(0.1)  # Wait to see if the board has anything to report
            # The Cyton nicely return incoming packets -- here supposedly messages -- whereas the Ganglion prints incoming ASCII message by itself
            if board.getBoardType() == "cyton":
                while board.ser_inWaiting():
                    # we're supposed to get UTF8 text, but the board might behave otherwise
                    c = board.ser_read().decode('utf-8', errors='replace')
                    line += c
                    time.sleep(0.001)
                    if c == '\n' and not flush:
                        print('%\t' + line[:-1])
                        line = ''
            elif board.getBoardType() == "ganglion":
                while board.ser_inWaiting():
                    board.waitForNotifications(0.001)

            if not flush:
                print(line)

        # Take user input
        try:
            if is_py3:
                s = input('--> ')
            else:
                s = raw_input('--> ')
        except EOFError:
            # End of input (e.g. Ctrl-D or a closed pipe): leave cleanly.
            s = "/exit"