# EEG-BCI Pipeline Documentation & Developer Guide

This notebook documents a full EEG-based Brain–Computer Interface (BCI) pipeline that connects to an OpenBCI Cyton board and uses real-time neural activity to control a wheelchair assembly. It explains how to acquire EEG data, preprocess and train motor imagery models, and deploy them in a live control loop that translates user intent into motion commands. The goal is both to serve as technical documentation and as a step-by-step guide for developers who need to run, modify, or extend the system.

# 1. Data Acquisition (from Cyton)

This script collects EEG trials from the OpenBCI Cyton board and saves them into organized dataset folders for model training. Each trial corresponds to a single mental task (e.g., imagining moving the left or right hand).

# Imports and Functions
- BrainFlow API: handles connection to the Cyton board.
- NumPy: stores EEG samples in .npy format.
- time/os: timestamps and directory management.

In [None]:
from brainflow import BoardShim, BrainFlowInputParams, BoardIds
import numpy as np
import time, os

- Defines how long each trial lasts, how many samples to expect, and where to save the data.

In [None]:
TRIAL_DURATION = 1          # Seconds per trial
SAMPLES_PER_SEC = 250       # Cyton sample rate
NUM_TRIALS = 100            # Trials per class
DATASET_ROOT = r"...\datasets"

Purpose:
- Ensures the folder structure exists (e.g., motor/left/).
- Saves each trial as a timestamped .npy file.
- Prints confirmation.

In [None]:
def save_trial(data, label, region_dir):
    os.makedirs(os.path.join(region_dir, label), exist_ok=True)
    filename = os.path.join(region_dir, label, f"{int(time.time()*1000)}.npy")
    np.save(filename, data)
    print(f"Saved {data.shape} samples to {filename}")

# Board Setup

- Connects to the Cyton board on the specified COM port.
- Starts the EEG data stream.

In [None]:
params = BrainFlowInputParams()
params.serial_port = "COM3"
board = BoardShim(BoardIds.CYTON_BOARD.value, params)
board.prepare_session()
board.start_stream()

- Each subject gets their own dataset folder.
    - Example:
    - datasets/Grey/motor/left/
    - datasets/Grey/motor/right/

In [None]:
name = input("Enter patient name: ")
root_dir = os.path.join(DATASET_ROOT, name)
motor_dir = os.path.join(root_dir, "motor")

- Selects all 8 Cyton EEG channels.
- This preserves raw flexibility for future preprocessing.

In [None]:
selected_channels = BoardShim.get_eeg_channels(BoardIds.CYTON_BOARD.value)

# Trail Collection Loop

How it works:
- Prompts the user to perform the mental task.
- Flushes the buffer to avoid stale samples.
- Waits exactly TRIAL_DURATION seconds.
- Pulls 250 fresh samples from all selected channels.

In [None]:
try:
    motor_labels = ["left", "right"]
    print("Get ready for EEG data collection...")

    for label in motor_labels:
        print(f"\n--- Starting {NUM_TRIALS} {label.upper()} trials ---")
        for t in range(NUM_TRIALS):
            # input(f"Press Enter to start trial {t+1}/{NUM_TRIALS} ({label})")
            print(f"Imagine moving your {label} hand for {TRIAL_DURATION} seconds...")
            board.get_board_data()  # flush
            time.sleep(TRIAL_DURATION)
            eeg_data = board.get_current_board_data(250)[selected_channels, :]

            desired = SAMPLES_PER_SEC
            current = eeg_data.shape[1]

            if current > desired:
                eeg_data = eeg_data[:, -desired:]  # crop extra
            elif current < desired:
                pad = np.tile(eeg_data[:, -1:], (1, desired - current))
                eeg_data = np.concatenate((eeg_data, pad), axis=1)
            
            save_trial(eeg_data, label, motor_dir)

finally:
    board.stop_stream()
    board.release_session()
    print("\nSession finished safely.")

# 2. Inspect Raw Samples (Optional, Helpful for Toubleshooting)

Change the sample data path to where your's are save to, to view it

In [None]:
sample = np.load("datasets/name/motor/left/sample_001.npy")

plt.figure(figsize=(14, 6))
# TODO: Update these channels to the correct positions on the headset
for i, label in enumerate(["C3", "Cz", "C4", "CPz", "P3", "Pz", "P4", "jaw_connection"]):
    plt.plot(sample[i], label=label, alpha=0.8)

plt.title("Raw Motor EEG Trial (8 channels)")
plt.legend()
plt.show()

# 3. Train a Model

# Imports & Basic Setup
prints if there are any GPUs the workstation can use for training the nerual network

In [None]:
from sklearn.model_selection import KFold
from functions import split_data, standardize, load_data, preprocess_raw_eeg, ACTIONS
from neural_nets import cris_net, res_net, TA_CSPNN, EEGNet
import os

from matplotlib import pyplot as plt
from tensorflow import keras
import tensorflow as tf
import numpy as np
import time

print(tf.__version__)
print("Num GPUs Available:", len(tf.config.experimental.list_physical_devices('GPU')))

# Fit & Save Function
Saves the current model's validation and training is above 77% accuracy 

In [None]:
def fit_and_save(model, epochs, train_X, train_y, validation_X, validation_y, batch_size):
    # fits the network epoch by epoch and saves only accurate models

    train_loss = []
    train_acc = []
    val_loss = []
    val_acc = []

    for epoch in range(epochs):
        history = model.fit(train_X, train_y, epochs=1, batch_size=batch_size,
                            validation_data=(validation_X, validation_y))

        train_acc.append(history.history["accuracy"][-1])
        train_loss.append(history.history["loss"][-1])
        val_acc.append(history.history["val_accuracy"][-1])
        val_loss.append(history.history["val_loss"][-1])

        MODEL_NAME = f"models/Grey/{round(val_acc[-1] * 100, 2)}-{epoch}epoch-{int(time.time())}-loss-{round(val_loss[-1], 2)}.keras"

        if round(val_acc[-1] * 100, 4) >= 77 and round(train_acc[-1] * 100, 4) >= 77:
            # saving & plotting only relevant models
            model.save(MODEL_NAME)
            print("saved: ", MODEL_NAME)

            # Create combined plot with accuracy and loss
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

            # TODO: Only save this once training is over
            
            # Accuracy subplot
            ax1.plot(np.arange(len(val_acc)), val_acc, label='val', linewidth=2)
            ax1.plot(np.arange(len(train_acc)), train_acc, label='train', linewidth=2)
            ax1.set_title('Model Accuracy', fontsize=14, fontweight='bold')
            ax1.set_ylabel('Accuracy', fontsize=12)
            ax1.set_xlabel('Epoch', fontsize=12)
            ax1.legend(loc='lower right')
            ax1.grid(True, alpha=0.3)
            
            # Loss subplot
            ax2.plot(np.arange(len(val_loss)), val_loss, label='val', linewidth=2)
            ax2.plot(np.arange(len(train_loss)), train_loss, label='train', linewidth=2)
            ax2.set_title('Model Loss', fontsize=14, fontweight='bold')
            ax2.set_ylabel('Loss', fontsize=12)
            ax2.set_xlabel('Epoch', fontsize=12)
            ax2.legend(loc='upper right')
            ax2.grid(True, alpha=0.3)
            
            plt.tight_layout()
            plt.savefig("pictures/motor_training_curves.png", dpi=150)
            plt.show()

# Dataset Paths, Splitting, and Loading

In [None]:
# Setup paths for user's dataset
DATASET_DIR = os.path.join("datasets", "Grey", "motor")
MODELS_DIR = os.path.join("models", "Grey")

# Create directories
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs("pictures", exist_ok=True)

split_data(starting_dir=DATASET_DIR, shuffle=True, splitting_percentage=(70, 20, 10), division_factor=0, coupling=False)

# loading dataset
tmp_train_X, train_y = load_data(starting_dir="training_data", shuffle=True, balance=True)
tmp_validation_X, validation_y = load_data(starting_dir="validation_data", shuffle=True, balance=True)

print(f"Train samples: {len(tmp_train_X)}, Val samples: {len(tmp_validation_X)}")

# cleaning the raw data (bandpass 8-30 Hz for motor imagery)
train_X, fft_train_X = preprocess_raw_eeg(tmp_train_X, lowcut=8, highcut=30, coi3order=0)
validation_X, fft_validation_X = preprocess_raw_eeg(tmp_validation_X, lowcut=8, highcut=30, coi3order=0)

# check_other_classifiers(train_X, train_y, validation_X, validation_y)

# reshaping
train_X = train_X.reshape((len(train_X), len(train_X[0]), len(train_X[0, 0]), 1))
validation_X = validation_X.reshape((len(validation_X), len(validation_X[0]), len(validation_X[0, 0]), 1))

# Model Selection & Compilation

In [None]:
model = EEGNet(nb_classes=len(ACTIONS), Chans=8, Samples=250)
model.summary()
model.compile(loss='categorical_crossentropy',
              optimizer='nadam',
              metrics=['accuracy'])

keras.utils.plot_model(model, "pictures/net.png", show_shapes=True)

# Training Loop

In [None]:
batch_size = 32
epochs = 500
fit_and_save(model, epochs, train_X, train_y, validation_X, validation_y, batch_size)

# 4. Test Using Real-Time-Plot

# Imports and Globals
- Introduces all Python modules used in the realtime BCI system.
- Clarifies why each import is needed.
- Sets expectations for dependencies and external libraries.

In [6]:
# This is intended for OpenBCI Cyton Board
# For other boards: https://brainflow.readthedocs.io

from functions import ACTIONS, preprocess_raw_eeg

from brainflow import BoardShim, BrainFlowInputParams, BoardIds
from matplotlib import pyplot as plt
from timeit import default_timer as timer
from tensorflow import keras
import numpy as np
import threading
import argparse
import time
import cv2
import os

# Shared Memory Class (Thread Communication) & Graphical Interface (GUI) Class
Shared Memory Class holds two values:
- sample: the most recent 1-second EEG window (8×250)
- key: keyboard input used for quitting the GUI

Avoids using global variables for thread safety.

Synchronization between threads is handled using a shared mutex lock.

GUI Class:
- Built using OpenCV for lightweight 2D graphics.
- Displays a simple box on a colored background that moves left/right.

Movement is controlled by the model’s predictions:
- “left” → box moves left
- “right” → box moves right
- Has a central horizontal and vertical line as visual anchors.
- Uses random colors for box/lines to visually differentiate elements.
- Resets automatically after a certain number of frames (count_down).

In [7]:
class Shared:
    def __init__(self):
        self.sample = None
        self.key = None


class GraphicalInterface:
    def __init__(self, WIDTH=500, HEIGHT=500, SQ_SIZE=40, MOVE_SPEED=5):
        self.WIDTH = WIDTH
        self.HEIGHT = HEIGHT
        self.SQ_SIZE = SQ_SIZE
        self.MOVE_SPEED = MOVE_SPEED

        self.square = {'x1': int(WIDTH / 2 - SQ_SIZE / 2),
                       'x2': int(WIDTH / 2 + SQ_SIZE / 2),
                       'y1': int(HEIGHT / 2 - SQ_SIZE / 2),
                       'y2': int(HEIGHT / 2 + SQ_SIZE / 2)}

        self.box = np.ones((self.SQ_SIZE, self.SQ_SIZE, 3)) * np.random.uniform(size=(3,))
        self.horizontal_line = np.ones((HEIGHT, 10, 3)) * np.random.uniform(size=(3,))
        self.vertical_line = np.ones((10, WIDTH, 3)) * np.random.uniform(size=(3,))


# Acquisition Thread (Pulling Cyton Samples)
- Runs in a dedicated thread.
- Continuously reads 250 samples (1 second) from the Cyton board buffer.
- Uses BoardShim.get_current_board_data for overlapping windows.
- Extracts EEG channel data from all 8 Cyton channels.
- Writes the newest EEG window into shared_vars.sample.
- Uses a mutex lock to ensure safe shared memory access.

In [8]:
def acquire_signals():
    count = 0
    while True:
        with mutex:
            if count == 0:
                time.sleep(2)
                count += 1
            time.sleep(0.5)

            data = board.get_current_board_data(250)

            eeg_channels = BoardShim.get_eeg_channels(BoardIds.CYTON_BOARD.value)
            sample = [data[ch] for ch in eeg_channels]
            shared_vars.sample = np.array(sample)

            if shared_vars.key == ord("q"):
                break

        time.sleep(0.1)


# Compute Thread (Inference + GUI Control)
How it works:
- Runs in parallel with the acquisition thread.
- Loads the trained Keras model at startup.

Each iteration:
1. Reads the newest EEG window from shared_vars.sample
2. Preprocesses it using the same filters used during training
3. Runs the model to obtain class probabilities
4. Smooths predictions using an exponential moving average (EMA)
5. Applies confidence thresholding
6. Updates GUI movement accordingly
- Prevents jittery or unstable predictions using the EMA filter.
- Displays real-time FPS for performance monitoring.
- Resets the GUI after 100 cycles for a consistent visual experience.
- Exits immediately if the user presses “q”.

In [9]:
def compute_signals():
    MODEL_NAME = "models/Grey/85.0-76epoch-1763074954-loss-0.43.keras"
    model = keras.models.load_model(MODEL_NAME)

    count_down = 100
    EMA = [-1, -1]
    alpha = 0.4
    gui = GraphicalInterface()
    first_run = True

    while True:
        with mutex:

            if count_down == 0:
                gui = GraphicalInterface()
                count_down = 100

            env = np.zeros((gui.WIDTH, gui.HEIGHT, 3))

            nn_input, _ = preprocess_raw_eeg(
                shared_vars.sample.reshape((1, 8, 250)),
                fs=250, lowcut=8, highcut=30, coi3order=0
            )
            nn_input = nn_input.reshape((1, 8, 250, 1))

            nn_out = model.predict(nn_input, verbose=0)[0]

            if EMA[0] == -1:
                EMA = nn_out.copy()
            else:
                EMA = alpha * nn_out + (1 - alpha) * np.array(EMA)

            predicted_action = ACTIONS[np.argmax(EMA)]
            conf = EMA[np.argmax(EMA)]

            if conf > 0.67:
                if predicted_action == "left":
                    gui.square['x1'] -= gui.MOVE_SPEED
                    gui.square['x2'] -= gui.MOVE_SPEED
                elif predicted_action == "right":
                    gui.square['x1'] += gui.MOVE_SPEED
                    gui.square['x2'] += gui.MOVE_SPEED

            count_down -= 1

            # Draw GUI
            env[:, gui.HEIGHT // 2 - 5:gui.HEIGHT // 2 + 5] = gui.horizontal_line
            env[gui.WIDTH // 2 - 5:gui.WIDTH // 2 + 5, :] = gui.vertical_line

            env[
                gui.square['y1']:gui.square['y2'],
                gui.square['x1']:gui.square['x2']
            ] = gui.box

            cv2.imshow('EEG BCI', env)

            if first_run:
                first_run = False
                start = timer()
            else:
                end = timer()
                print("\rFPS:", 1 / (end - start + 1e-6), end='')
                start = timer()

            shared_vars.key = cv2.waitKey(1) & 0xFF
            if shared_vars.key == ord("q"):
                cv2.destroyAllWindows()
                break


# Main Execution Block

In [None]:
params = BrainFlowInputParams()
params.serial_port = "COM3"

board = BoardShim(BoardIds.CYTON_BOARD.value, params)
board.prepare_session()

shared_vars = Shared()
mutex = threading.Lock()

board.start_stream()

acquisition = threading.Thread(target=acquire_signals)
computing = threading.Thread(target=compute_signals)

acquisition.start()
computing.start()

acquisition.join()
computing.join()

board.stop_stream()
board.release_session()

# 5. Move Wheelchair