### Full Pipeline Testing Notebook
Working on integrating the SSVEP stimulus with the EEG processing classes/modules

In [1]:
import sys
# import os
sys.path.append("..")
# import time
# import numpy as np
# from psychopy import visual, event, core, monitors
# from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds
# from mvlearn.embed import CCA
# from multiprocessing import Process, Queue

# Assuming the other modules (brainflow_stream, filtering, segmentation, classification) are available
from modules.brainflow_stream import *
from modules.filtering import *
from modules.segmentation import *
from modules.classification import *



import threading
import time
from brainflow.board_shim import BoardShim
import numpy as np
from multiprocessing import Process, Queue
from psychopy import visual, event, core, monitors
import warnings
warnings.filterwarnings("ignore", message="elementwise comparison failed; returning scalar instead")

class SSVEPStimulus:
    def __init__(self, box_frequencies, box_texts=None, box_text_indices=None, display_index=0, display_mode=None, monitor_name='testMonitor'):
        self.box_frequencies = box_frequencies
        self.box_texts = box_texts
        self.box_text_indices = box_text_indices
        self.display_mode = display_mode

        if box_texts and len(box_texts) != len(box_text_indices):
            raise ValueError("The length of box_texts and box_text_indices must be the same if box_texts is provided.")
        
        monitor = monitors.Monitor(name=monitor_name)
        self.win = visual.Window(
            monitor=monitor, 
            screen=display_index, 
            fullscr=True, 
            color='black', 
            units='pix', 
            allowGUI=False, 
            winType='pyglet',
            autoLog=False
        )

        refresh_rates = []
        for _ in range(2):
            refresh_rate = self.win.getActualFrameRate(nIdentical=80, nWarmUpFrames=200, threshold=1)
            if refresh_rate is not None:
                refresh_rates.append(refresh_rate)
            core.wait(0.1)

        self.refresh_rate = round(np.mean(refresh_rates), 0)  
        print(f"Measured Refresh Rate: {self.refresh_rate:.2f} Hz")

        self.actual_frequencies = self.calculate_actual_frequencies(box_frequencies)
        print(f"Calculated Actual Frequencies: {self.actual_frequencies}")

        sorted_indices = sorted(range(len(self.actual_frequencies)), key=lambda i: self.actual_frequencies[i])
        interleaved_indices = []
        left, right = 0, len(sorted_indices) - 1
        while left <= right:
            if left == right:
                interleaved_indices.append(sorted_indices[left])
            else:
                interleaved_indices.append(sorted_indices[left])
                interleaved_indices.append(sorted_indices[right])
            left += 1
            right -= 1

        self.boxes = []
        self.frame_count = 0

        centerX, centerY = 0, 0
        radius = min(self.win.size) // 3
        num_boxes = len(self.actual_frequencies)

        for i, idx in enumerate(interleaved_indices):
            angle = 2 * np.pi * i / num_boxes
            pos = (centerX + int(radius * np.cos(angle)), centerY + int(radius * np.sin(angle)))
            box = visual.Rect(win=self.win, width=150, height=150, fillColor='white', lineColor='white', pos=pos)
            
            box_info = {
                "box": box,
                "frequency": self.actual_frequencies[idx],
                "frame_count": 0,
                "on": True
            }

            if self.display_mode in ["freq", "both"]:
                freq_text_stim = visual.TextStim(win=self.win, text=f"{self.actual_frequencies[idx]:.2f} Hz", color='black', pos=pos)
                box_info["text"] = freq_text_stim

            if self.display_mode in ["both", "text"] and self.box_texts and idx in box_text_indices:
                box_text = box_texts[box_text_indices.index(idx)]
                text_pos = (pos[0], pos[1] + 30) if self.display_mode == "both" else pos
                box_text_stim = visual.TextStim(win=self.win, text=box_text, color='black', pos=text_pos)
                box_info["box_text"] = box_text_stim
            
            self.boxes.append(box_info)

        self.has_started = False
        self.start_button = visual.Rect(win=self.win, width=300, height=100, fillColor='green', pos=(0, 0))
        self.start_text = visual.TextStim(win=self.win, text='Press Space/Enter to Start', color='white', pos=(0, 0))

    def calculate_actual_frequencies(self, desired_frequencies):
        actual_frequencies = []
        for freq in desired_frequencies:
            frames_per_cycle = round(self.refresh_rate / freq)
            actual_freq = round(self.refresh_rate / frames_per_cycle, 2)
            actual_frequencies.append(actual_freq)
        return actual_frequencies

    def run(self):
        while True:
            keys = event.getKeys()
            if 'escape' in keys:
                break
            elif 'space' in keys or 'return' in keys:
                self.has_started = True
            
            if not self.has_started:
                self.start_button.draw()
                self.start_text.draw()
            else:
                self.frame_count += 1
                for box in self.boxes:
                    flicker_period = self.refresh_rate / box["frequency"]
                    if (self.frame_count % flicker_period) < (flicker_period / 2):
                        if not box["on"]:
                            box["on"] = True
                            box["box"].setAutoDraw(True)
                            if "text" in box:
                                box["text"].setAutoDraw(True)
                            if "box_text" in box:
                                box["box_text"].setAutoDraw(True)
                    else:
                        if box["on"]:
                            box["on"] = False
                            box["box"].setAutoDraw(False)
                            if "text" in box:
                                box["text"].setAutoDraw(False)
                            if "box_text" in box:
                                box["box_text"].setAutoDraw(False)

            self.win.flip()
        
        self.win.close()
        core.quit()

    def stop(self):
        self.has_started = False
        self.win.close()
        core.quit()

# def start_ssvep_stimulus(queue, box_frequencies, box_texts=None, box_text_indices=None, display_index=0, display_mode=None, monitor_name='testMonitor'):
#     try:
#         print("Starting SSVEP Stimulus")
#         stimulus = SSVEPStimulus(box_frequencies, box_texts, box_text_indices, display_index, display_mode, monitor_name)
#         print("Putting actual frequencies into the queue")
#         queue.put(stimulus.actual_frequencies)
#         print("Actual frequencies put into the queue")
#         stimulus.run()
#     except Exception as e:
#         print(f"Error in SSVEP stimulus: {e}")
#         queue.put(None)  # Indicate failure by putting None into the queue

# def run_ssvep_stimulus_in_process(box_frequencies, box_texts=None, box_text_indices=None, display_index=0, display_mode=None, monitor_name='testMonitor'):
#     queue = Queue()
#     stimulus_process = Process(target=start_ssvep_stimulus, args=(queue, box_frequencies, box_texts, box_text_indices, display_index, display_mode, monitor_name))
#     stimulus_process.start()
#     try:
#         print("Waiting to get actual frequencies from the queue")
#         actual_frequencies = queue.get(timeout=30)  # Increase timeout to 30 seconds
#         if actual_frequencies is None:
#             raise Exception("Failed to initialize SSVEP stimulus process.")
#         print("Got actual frequencies from the queue")
#     except Exception as e:
#         print(f"Failed to get actual frequencies: {e}")
#         stimulus_process.terminate()
#         stimulus_process.join()
#         raise
#     return stimulus_process, actual_frequencies

def start_ssvep_stimulus(box_frequencies, box_texts=None, box_text_indices=None, display_index=0, display_mode=None, monitor_name='testMonitor'):

    stimulus = SSVEPStimulus(box_frequencies, box_texts, box_text_indices, display_index, display_mode, monitor_name)
    stimulus.run()

def run_ssvep_stimulus_in_process(box_frequencies, box_texts=None, box_text_indices=None, display_index=0, display_mode=None, monitor_name='testMonitor'):
    
    stimulus_process = Process(target=start_ssvep_stimulus, args=(box_frequencies, box_texts, box_text_indices, display_index, display_mode, monitor_name))
    stimulus_process.start()
    return stimulus_process

pygame 2.5.2 (SDL 2.28.3, Python 3.8.18)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
box_frequencies = [9.25, 11.25, 13.25, 15.25]
box_texts = ['Right', 'Left', 'Up', 'Down']
box_text_indices = [0, 1, 2, 3]
display_index = 0
display_mode = 'both'

run_ssvep_stimulus_in_process(box_frequencies, box_texts, box_text_indices, display_index, display_mode)

NameError: name 'stimulus' is not defined

In [2]:
box_frequencies = [9.25, 11.25, 13.25, 15.25]
box_texts = ['Right', 'Left', 'Up', 'Down']
box_text_indices = [0, 1, 2, 3]
display_index = 0
display_mode = 'both'

# Using run_ssvep_stimulus_in_process to run the stimulus in a separate process and get actual frequencies
stimulus_process, actual_frequencies = run_ssvep_stimulus_in_process(box_frequencies, box_texts, box_text_indices, display_index, display_mode)
print("Actual Frequencies:", actual_frequencies)

# Let the stimulus run for 10 seconds
time.sleep(10)

# Stop the stimulus process
stimulus_process.terminate()
stimulus_process.join()

Waiting to get actual frequencies from the queue
Failed to get actual frequencies: 


Empty: 

In [2]:
serial_port = 'COM4' 
board_id = BoardIds.SYNTHETIC_BOARD # BoardIds.PLAYBACK_FILE_BOARD -> only if recorded with a support board (I believe...)
frequencies = [9.25, 11.25, 13.25, 15.25] # Stimulus frequencies; used for CCA & harmonic generation
buttons = ['Right', 'Left', 'Up', 'Down'] # Adds custom text to each box - must be same length as frequencies 
button_pos = [0, 2, 3, 1] # Assigns positions to custom text - must be same length as buttons
segment_duration = 4 # seconds
display = 0 # Which screen to display the stimulus paradigm on --> 0 is default
display_mode = "both"

# Static Variables - Probably don't need to touch :)
harmonics = np.arange(1, 4) # Generates the 1st, 2nd, & 3rd Harmonics
sampling_rate = BoardShim.get_sampling_rate(board_id)
n_samples = sampling_rate * segment_duration 


eeg_channels = BoardShim.get_eeg_names(board_id)
channel_names = ["O1", "O2", "Oz", "Pz", "P3", "P4", "POz", "P1"]
channel_mapping = dict(zip(eeg_channels, channel_names))

In [3]:
import threading

# Function to run the EEG processing loop
def eeg_processing_loop(segmentation_time_wait, cca_classifier):
    while True:
        segment = segmentation_time_wait.get_segment_time()
        if segment is None:
            time.sleep(0.1)  # Wait a little before trying again
            continue

        eeg_segment = segment[:8, :]  # Assuming 8 channels, adjust if necessary
        detected_freq, correlation = cca_classifier(eeg_segment)
        print(f"Detected frequency using CCA: {detected_freq} Hz with correlation: {correlation:.3f}")

        time.sleep(0.1)

# Setup the board and segmentation
board = BrainFlowBoardSetup(board_id, serial_port)
board.setup()

segmentation_time_wait = Segmentation(board, segment_duration=2)

print('test')

# Run the SSVEP Stimulus in a separate process and get actual frequencies
stimulus_process, actual_freqs = run_ssvep_stimulus_in_process(box_frequencies=frequencies, 
                                                              box_texts=buttons, 
                                                              box_text_indices=button_pos,
                                                              display_index=display, 
                                                              display_mode=display_mode)



cca_classifier = SSVEPClassifier(frequencies=actual_freqs, 
                                 harmonics=np.arange(1, 4), 
                                 sampling_rate=BoardShim.get_sampling_rate(board_id), 
                                 n_samples=BoardShim.get_sampling_rate(board_id) * segment_duration, 
                                 method='CCA', 
                                 stack_harmonics=True)

# Start the EEG processing loop in a separate thread
eeg_thread = threading.Thread(target=eeg_processing_loop, args=(segmentation_time_wait, cca_classifier))
eeg_thread.start()

# Wait for the EEG processing thread to finish (it won't in this example, so you might want to add a condition to break the loop)
eeg_thread.join()

# Ensure to terminate the stimulus process when done
stimulus_process.terminate()
stimulus_process.join()

Board setup and streaming started successfully
test


## Older Test:

In [2]:
from modules.multiprocessing_psychopy_ssvep_stim import *

In [3]:
serial_port = 'COM4' 
board_id = BoardIds.SYNTHETIC_BOARD # BoardIds.PLAYBACK_FILE_BOARD -> only if recorded with a support board (I believe...)
frequencies = [9.25, 11.25, 13.25, 15.25] # Stimulus frequencies; used for CCA & harmonic generation
buttons = ['Right', 'Left', 'Up', 'Down'] # Adds custom text to each box - must be same length as frequencies 
button_pos = [0, 2, 3, 1] # Assigns positions to custom text - must be same length as buttons
segment_duration = 4 # seconds
display = 0 # Which screen to display the stimulus paradigm on --> 0 is default
display_mode = "both"

# Static Variables - Probably don't need to touch :)
harmonics = np.arange(1, 4) # Generates the 1st, 2nd, & 3rd Harmonics
sampling_rate = BoardShim.get_sampling_rate(board_id)
n_samples = sampling_rate * segment_duration 


eeg_channels = BoardShim.get_eeg_names(board_id)
channel_names = ["O1", "O2", "Oz", "Pz", "P3", "P4", "POz", "P1"]
channel_mapping = dict(zip(eeg_channels, channel_names))

### Testing:

In [4]:
board = BrainFlowBoardSetup(board_id, serial_port)
board.setup()

segmentation_time_wait = Segmentation(board, segment_duration=2)
actual_freqs = frequencies

cca_classifier = SSVEPClassifier(frequencies=actual_freqs, 
                                    harmonics=np.arange(1, 4), 
                                    sampling_rate=BoardShim.get_sampling_rate(board_id), 
                                    n_samples=BoardShim.get_sampling_rate(board_id) * segment_duration, 
                                    method='CCA', 
                                    stack_harmonics=True)

# Run the SSVEP Stimulus in a seperate process
run_ssvep_stimulus_in_process(box_frequencies=frequencies, 
                              box_texts=buttons, 
                              box_text_indices=button_pos,
                              display_index=display, 
                              display_mode=display_mode)

while True:
    segment = segmentation_time_wait.get_segment_time()
    if segment is None:
        time.sleep(0.1)  # Wait a little before trying again
        continue

    eeg_segment = segment[:8, :]  # Assuming 8 channels, adjust if necessary
    detected_freq, correlation = cca_classifier(eeg_segment)
    print(f"Detected frequency using CCA: {detected_freq} Hz with correlation: {correlation:.3f}")

    time.sleep(0.1)

Board setup and streaming started successfully
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000
Detected frequency using CCA: None Hz with correlation: 0.000


KeyboardInterrupt: 

In [None]:
board.stop()


Streaming stopped
Session released


### Reference/Old Code:

In [None]:

# Show board information
print(f"Sampling Rate: {sampling_rate}")
print(f"Default Channels: {eeg_channels}")
print(f"Channel Mapping: {channel_mapping}")

board = BrainFlowBoardSetup(board_id, serial_port)
# board.show_params() # Logger shows this info by default - this is another method to show
board.setup()

Sampling Rate: 250
Default Channels: ['Fz', 'C3', 'Cz', 'C4', 'Pz', 'PO7', 'Oz', 'PO8', 'F5', 'F7', 'F3', 'F1', 'F2', 'F4', 'F6', 'F8']
Channel Mapping: {'Fz': 'O1', 'C3': 'O2', 'Cz': 'Oz', 'C4': 'Pz', 'Pz': 'P3', 'PO7': 'P4', 'Oz': 'POz', 'PO8': 'P1'}
Board setup and streaming started successfully


In [None]:
segmenter = PreProcess(board, segment_duration=segment_duration)
classifier = ClassifySSVEP(frequencies, harmonics, sampling_rate, n_samples, stack_harmonics=False)
classifier_stacked = ClassifySSVEP(frequencies, harmonics, sampling_rate, n_samples, stack_harmonics=True)

while True:
    segment = segmenter.get_segment()
    if segment is not None:
        
        # print(f"Segment shape: {segment.shape}")
        eeg_segment = segment[0:8, :]

        # Step 2: Filter the data
        filtered_segment = segmenter.filter_data(eeg_segment)
        print("Filtered data shape:", filtered_segment.shape)

        # Step 3: Use CCA to match the EEG & Reference (harmonic) signals
            # Unstacked Harmonics (testing)
        detected_freq, correlation = classifier.cca_analysis(filtered_segment)
            # Stacked Harmonics (testing)
        detected_freq_stacked, correlation_stacked = classifier_stacked.cca_analysis(filtered_segment)
        
        print(f"Detected frequency: {detected_freq} Hz with correlation: {correlation}")

        # Optionally save or process the data further
        # segmenter.save_data(filtered_data, "filtered_data.csv")
        # segmenter.save_data(features, "features.csv")

    # Sleep for a while to collect new data
    time.sleep(segmenter.segment_duration)

NameError: name 'PreProcess' is not defined