In [2]:
import os
from glob import glob
from time import time, sleep
from random import choice

import numpy as np
from pandas import DataFrame
from psychopy import visual, core, event
from psychopy.hardware import keyboard

from brainflow import DataFilter, BoardShim, BoardIds, BrainFlowInputParams

from utils import get_fns
from psychopy.hardware import keyboard


pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
# Serial port that get_board_info() falls back to when no usb_port is supplied.
STD_PORT = '/dev/ttyUSB0'


def get_board_info(board_type='synthetic', connection='usb', usb_port=None):
    """Resolve the BrainFlow board id and input parameters for an OpenBCI board.

    Parameters:
        board_type (str): Type of base OpenBCI board (options: synthetic, cyton, daisy)
        connection (str): Connection method, either 'usb' (USB dongle) or 'wifi'
            (Wifi board). Not consulted for the synthetic board.
        usb_port (str or None): Serial port used for USB connections; falls back
            to STD_PORT when None.

    Returns:
        tuple: (board_id, params) where board_id (int) is the id value for the
            board in the BrainFlow API and params is the BrainFlowInputParams
            instance to hand to BoardShim.

    Raises:
        ValueError: If board_type is not supported, or if connection is not
            supported for a cyton/daisy board.
    """
    # Validate first so a bad argument yields a clear error instead of an
    # unbound `board_id` at the return statement.
    if board_type not in ('synthetic', 'cyton', 'daisy'):
        raise ValueError(
            "Unsupported board_type %r; expected 'synthetic', 'cyton' or 'daisy'"
            % (board_type,))
    if board_type != 'synthetic' and connection not in ('usb', 'wifi'):
        raise ValueError(
            "Unsupported connection %r; expected 'usb' or 'wifi'" % (connection,))

    params = BrainFlowInputParams()

    if board_type == 'synthetic':
        # The synthetic board needs no connection settings.
        return BoardIds.SYNTHETIC_BOARD.value, params

    hardware_ids = {
        ('cyton', 'usb'): BoardIds.CYTON_BOARD,
        ('cyton', 'wifi'): BoardIds.CYTON_WIFI_BOARD,
        ('daisy', 'usb'): BoardIds.CYTON_DAISY_BOARD,
        ('daisy', 'wifi'): BoardIds.CYTON_DAISY_WIFI_BOARD,
    }
    board_id = hardware_ids[(board_type, connection)].value

    if connection == 'usb':
        params.serial_port = STD_PORT if usb_port is None else usb_port
    # NOTE(review): no IP address/port is configured for wifi connections here;
    # confirm whether BrainFlow requires them for the Wifi shield.

    return board_id, params

In [4]:
class eventRelatedPotential:
    """Records an EEG session for a visual event-related-potential experiment.

    Typical use: construct with the paradigm name, call initialize_eeg() to
    connect a board, then call run_trial() to stream, record and save data.

    Attributes:
        erp (str): Paradigm name, 'n170' or 'p300'.
        max_trials (int): Number of rows pre-allocated in the trial table.
        board, board_id, params: BrainFlow handles set by initialize_eeg().
        board_prepared (bool): True once prepare_session() has been called.
        image_type (ndarray): Random 0/1 draw per trial.
        trials (DataFrame): Columns 'image_type' and 'timestamp' (zeros).
    """

    # Marker names for each supported paradigm, in the order of the marker
    # codes [1, 2] assigned by _setup_task().
    _MARKER_NAMES = {
        'n170': ['houses', 'faces'],
        'p300': ['nontargets', 'targets'],
    }

    def __init__(self, erp='n170', max_trials=500):
        """Create the experiment and pre-generate its trial table.

        Parameters:
            erp (str): Paradigm to run, 'n170' or 'p300'.
            max_trials (int): Number of trials to pre-generate.

        Raises:
            ValueError: If erp is not a supported paradigm.
        """
        # Reject unknown paradigms here; otherwise _setup_trial() would leave
        # image_type undefined and fail later with an AttributeError.
        if erp not in self._MARKER_NAMES:
            raise ValueError(
                "Unsupported erp %r; expected one of %s"
                % (erp, sorted(self._MARKER_NAMES)))
        self.erp = erp
        self.board_prepared = False
        self.board_id = None
        self.params = None
        self.board = None
        self.max_trials = max_trials
        self._setup_trial()

    def initialize_eeg(self, board_type='synthetic', connection_method='usb', usb_port=None):
        """Create the BrainFlow board handle and prepare its session.

        Parameters:
            board_type (str): Forwarded to get_board_info().
            connection_method (str): Forwarded to get_board_info() as the connection.
            usb_port (str or None): Forwarded to get_board_info().
        """
        self.board_id, self.params = get_board_info(board_type, connection_method, usb_port)
        self.board = BoardShim(self.board_id, self.params)
        self.board.prepare_session()
        self.board_prepared = True

    def _setup_trial(self):
        """Draw the per-trial stimulus class and build the trial table."""
        # Both supported paradigms draw the 0/1 class with equal probability.
        self.image_type = np.random.binomial(1, 0.5, self.max_trials)
        self.trials = DataFrame(dict(image_type=self.image_type,
                                     timestamp=np.zeros(self.max_trials)))

    def _setup_task(self):
        """Set the marker names and marker codes for the current paradigm.

        Not called by any other method of this class.
        """
        # Copy so callers cannot mutate the class-level table.
        self.markernames = list(self._MARKER_NAMES[self.erp])
        self.markers = [1, 2]

    def _setup_graphics(self):
        """Open the PsychoPy window used during the recording."""
        self.mywin = visual.Window([1600, 900], monitor='testMonitor', units="deg")

    def run_trial(self, duration, subject, run):
        """Stream EEG for `duration` seconds, then save the data and trial table.

        Parameters:
            duration (number): Recording length in seconds, measured from the
                first board timestamp read after the 5 second settling wait.
            subject: Passed to get_fns() to build the output file names.
            run: Passed to get_fns() to build the output file names.

        Raises:
            RuntimeError: If initialize_eeg() has not been called.
        """
        if self.board is None:
            raise RuntimeError(
                "EEG board is not initialised; call initialize_eeg() before run_trial().")
        if not self.board_prepared:
            self.board.prepare_session()
            self.board_prepared = True

        record_duration = np.float32(duration)
        print("Beginning EEG Stream; Wait 5 seconds for signal to settle... \n")
        self.board.start_stream()
        try:
            sleep(5)

            # Get starting time-stamp by pulling the last sample from the board.
            # Ask BrainFlow which row holds the timestamp rather than assuming
            # it is the last one.
            timestamp_row = BoardShim.get_timestamp_channel(self.board_id)
            last_sample = self.board.get_current_board_data(1)
            if last_sample.shape[1] > 0:
                # presumably Unix time in seconds, comparable to time() -- confirm
                start = last_sample[timestamp_row][0]
            else:
                # No sample buffered yet; fall back to the local clock.
                start = time()

            # setup graphics
            self._setup_graphics()
            try:
                while (time() - start) < record_duration:
                    # getKeys() drains the key buffer, so read it exactly once
                    # per iteration and report whatever was pressed.
                    pressed = event.getKeys()
                    if pressed:
                        print(pressed)
                    # Yield briefly so the wait loop does not spin a full core.
                    sleep(0.001)

                event.clearEvents()
            finally:
                # Close the window even if the loop is interrupted.
                self.mywin.close()
        finally:
            # Always stop streaming; the session stays prepared for further runs.
            self.board.stop_stream()

        # Save everything buffered during the stream plus the trial table.
        data = self.board.get_board_data()
        data_fn, event_fn = get_fns(subject, run, self.erp)
        DataFilter.write_file(data, data_fn, 'w')
        self.trials.to_csv(event_fn)

In [5]:
# Settings for this recording run.
subject_name = 'kb_test'
duration = 10
trial_num = 3

# Build the N170 experiment, connect it to the 'synthetic' board, and record.
n170_exp = eventRelatedPotential(erp='n170')
n170_exp.initialize_eeg(board_type='synthetic')
n170_exp.run_trial(duration=duration, subject=subject_name, run=trial_num)

Beginning EEG Stream; Wait 5 seconds for signal to settle... 

