In [1]:
import time
import sdl2
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

from brainflow.board_shim import BoardShim, BrainFlowInputParams, LogLevels, BoardIds
from brainflow.data_filter import DataFilter, AggOperations, FilterTypes, WindowOperations, NoiseTypes

from scipy import signal

import dearpygui.dearpygui as dpg

In [2]:
BoardShim.get_eeg_names(BoardIds.CROWN_BOARD.value)

['CP3', 'C3', 'F5', 'PO3', 'PO4', 'F6', 'C4', 'CP4']

In [2]:
board_id = BoardIds.CROWN_BOARD.value
# board_id = BoardIds.SYNTHETIC_BOARD.value
params = BrainFlowInputParams()
params.mac_address = "C0:EE:40:84:DD:56"
params.serial_number = "58a99b0107e64cd40ea5e6607882cbe2"
params.board_id = board_id
params.timeout = 5
BoardShim.enable_dev_board_logger()
board = BoardShim(board_id, params)
board.prepare_session()

[2023-01-31 22:52:40.122] [board_logger] [info] incoming json: {
    "board_id": 23,
    "file": "",
    "file_anc": "",
    "file_aux": "",
    "ip_address": "",
    "ip_address_anc": "",
    "ip_address_aux": "",
    "ip_port": 0,
    "ip_port_anc": 0,
    "ip_port_aux": 0,
    "ip_protocol": 0,
    "mac_address": "C0:EE:40:84:DD:56",
    "master_board": -100,
    "other_info": "",
    "serial_number": "58a99b0107e64cd40ea5e6607882cbe2",
    "serial_port": "",
    "timeout": 5
}
[2023-01-31 22:52:40.122] [board_logger] [trace] Board object created 23
[2023-01-31 22:52:40.122] [board_logger] [debug] Use IP port 9000


In [3]:
sdl2.SDL_Init(sdl2.SDL_INIT_JOYSTICK)
joystick = sdl2.SDL_JoystickOpen(0)

def get_direction():
        sdl2.SDL_PumpEvents()
        # depending on the gamepad this gives you a value between -32768 and +32768
        # or between 0 and 32768
        x = sdl2.SDL_JoystickGetAxis(joystick, 0)
        y = sdl2.SDL_JoystickGetAxis(joystick, 1)
        # x_direction = "right" if joy_x / 32768 > 0.3 else "left" if joy_x / 32768 < -0.1 else "center" # WARNING: A bit offset because of drift, should be 0.2 and -0.2 for the deadzone
        # print(f"X: {joy_x} Y: {joy_y} Direction: {x_direction}                    ", end="\r")

        return x, y

In [4]:
def update_series(data):
    for i, channel in enumerate(data):
        dpg.set_value(f'c{i}', [np.arange(0, len(channel)), channel])
        dpg.fit_axis_data(f'y_axis_c{i}')

def update_direction(x, y, direction):
    dpg.set_value("direction_text", f"X: {x} Y: {y} Direction: {direction}")

In [5]:
signal_length = 256 * 5

In [6]:
dpg.create_context()
dpg.create_viewport(title="Recording Data", width=600, height=800)
dpg.setup_dearpygui()

with dpg.window(label="Recording Data", tag="Primary Window"):
    for i in range(8):
        with dpg.plot(height=75, width=-1, no_menus=True, no_mouse_pos=True):
            dpg.add_plot_axis(dpg.mvXAxis, tag=f"x_axis_c{i}", no_gridlines=True, no_tick_marks=True, no_tick_labels=True)
            dpg.add_plot_axis(dpg.mvYAxis, tag=f"y_axis_c{i}", no_gridlines=True, no_tick_marks=True, no_tick_labels=True)
            
            dpg.add_line_series(np.arange(0, signal_length // 2), np.zeros((signal_length // 2)), label="mV", parent=f"y_axis_c{i}", tag=f"c{i}")
            dpg.set_axis_limits(f'x_axis_c{i}', 0, signal_length // 2)
            dpg.fit_axis_data(f'y_axis_c{i}')
    dpg.add_text("X: None Y: None Direction: None", tag="direction_text")

dpg.set_primary_window("Primary Window", True)

In [7]:
train_test = input("Train or Test? ").lower()

text_keys = {"10000": "center", "01000": "up", "00100": "right", "00010": "down", "00001": "left", "01100": "up-right", "00110": "down-right", "00011": "down-left", "01001": "up-left"}

raw_data = np.zeros((8, signal_length + 512), dtype=np.float64)

start_time = time.time()
inputs = np.empty((8,0))
outputs = []
BoardShim.disable_board_logger()
board.start_stream()
time.sleep(1)
dpg.show_viewport()
while(start_time + 60 > time.time()):
    x, y = get_direction()
    x_d = 2 if x / 32768 > 0.2 else 1 if x / 32768 < -0.2 else 0
    y_d = 2 if y / 32768 < -0.2 else 1 if y / 32768 > 0.2 else 0
    direction = [
        1 if x_d == 0 and y_d == 0 else 0,
        1 if y_d == 2 else 0,
        1 if x_d == 2 else 0,
        1 if y_d == 1 else 0,
        1 if x_d == 1 else 0,
    ]
    data = board.get_board_data()[1:9]
    inputs = np.concatenate((inputs, data), axis=1)
    for i in range(data.shape[1]):
        outputs.append(direction)
    text_direction = text_keys[f"{direction[0]}{direction[1]}{direction[2]}{direction[3]}{direction[4]}"]
    update_direction(x, y, text_direction)
    time.sleep(0.05)
    
    if data.shape[1] != 0:
        data_len = data.shape[1]
        raw_data = np.roll(raw_data, -data_len, axis=1)
        raw_data[:, -data_len if data_len < signal_length else signal_length:] = data[:, -signal_length:]
        
        
        gui_data = np.empty((8, raw_data.shape[1]//2))
        
        for i in range(gui_data.shape[0]):
            gui_data[i] = DataFilter.perform_downsampling(raw_data[i], 2, AggOperations.MEAN.value)
            DataFilter.perform_bandpass(gui_data[i], 256//2, 1.0, 50.0, 4, FilterTypes.BUTTERWORTH.value, 0)
            DataFilter.remove_environmental_noise(gui_data[i], 256//2, NoiseTypes.SIXTY.value)
            # for hz in range(60, 121, 60):
            #     b, a = signal.iirnotch(hz, 10, 128)
            #     gui_data[i] = signal.filtfilt(b, a, gui_data[i]) # Americas wires run at 60Hz; Because wires run at 60Hz, there is a 120Hz harmonic
        
        update_series(gui_data[:, 256:])
    dpg.render_dearpygui_frame()

outputs = np.array(outputs)    
board.stop_stream()
dpg.destroy_context()

In [9]:
now = datetime.now()
dt_string = now.strftime("%d:%m:%Y-%H:%M:%S")

dataset = np.array([inputs, outputs])
np.save(f"data/{train_test}/{dt_string}.npy", dataset)

  dataset = np.array([inputs, outputs])


: 

In [None]:
board.release_session()