In [2]:
import time
import numpy as np
from pylsl import StreamInlet, resolve_stream
import mne
# Parameters
number_of_trials_per_type = 25  # Number of trials for each type (right, left)
trial_duration = 4  # seconds
rest_duration = 2   # seconds
trial_types = ['right', 'left']  # Order of trial types
streams = resolve_stream()


In [None]:


def bandpass_filter(data, low_freq, high_freq, sampling_rate):
    # Create a filter
    filter = mne.filter.create_filter(data, sampling_rate, l_freq=low_freq, h_freq=high_freq)
    # Apply the filter
    filtered_data = mne.filter.filter_data(data, sampling_rate, l_freq=low_freq, h_freq=high_freq)
    return filtered_data

In [3]:
for stream in streams:
    print(f"Stream found: {stream.name()} - {stream.type()} - {stream.channel_count()} channels")
# Resolve an EEG stream on the network and create an inlet
print("Looking for an EEG stream...")


streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print("Stream found. Starting experiment...")


Stream found: NR-2022.07.11 - EEG - 8 channels
Looking for an EEG stream...
Stream found. Starting experiment...


In [5]:
# Initialize a list to store trial data and labels
all_trials_data = []
trials_info = []

# Experiment loop
for trial_type in trial_types:
    for trial in range(number_of_trials_per_type):
        # Present cue for the trial
        cue = f"Trial {trial + 1}/{number_of_trials_per_type}, {trial_type} hand imagery"
        print(cue)
        
        # Record the start timestamp
        start_timestamp = time.time()
        
        # Collect EEG data for the duration of the trial
        trial_data = []
        while time.time() - start_timestamp < trial_duration:
            sample, timestamp = inlet.pull_sample()
            trial_data.append(sample)

        trial_data = np.array(trial_data)
        low_freq = 8
        high_freq = 30
        sampling_rate = 250
        trial_data_filtered = bandpass_filter(trial_data, low_freq, high_freq, sampling_rate)
        end_timestamp = time.time()

        all_trials_data.append(np.array(trial_data_filtered))
        
        # Store the trial data and label
        
        trials_info.append((start_timestamp, end_timestamp, trial_type))
        
        # Rest period
        print("Rest")
        time.sleep(rest_duration)

Trial 1/25, right hand imagery
Rest
Trial 2/25, right hand imagery
Rest
Trial 3/25, right hand imagery
Rest
Trial 4/25, right hand imagery
Rest
Trial 5/25, right hand imagery
Rest
Trial 6/25, right hand imagery
Rest
Trial 7/25, right hand imagery
Rest
Trial 8/25, right hand imagery
Rest
Trial 9/25, right hand imagery
Rest
Trial 10/25, right hand imagery
Rest
Trial 11/25, right hand imagery
Rest
Trial 12/25, right hand imagery
Rest
Trial 13/25, right hand imagery
Rest
Trial 14/25, right hand imagery
Rest
Trial 15/25, right hand imagery
Rest
Trial 16/25, right hand imagery
Rest
Trial 17/25, right hand imagery
Rest
Trial 18/25, right hand imagery
Rest
Trial 19/25, right hand imagery
Rest
Trial 20/25, right hand imagery
Rest
Trial 21/25, right hand imagery
Rest
Trial 22/25, right hand imagery
Rest
Trial 23/25, right hand imagery
Rest
Trial 24/25, right hand imagery
Rest
Trial 25/25, right hand imagery
Rest
Trial 1/25, left hand imagery
Rest
Trial 2/25, left hand imagery
Rest
Trial 3/25, le

In [16]:

#segmenting the data according to the timestamps
sampling_rate = 250  # Hz, adjust this to match your data's sampling rate

# Initialize a list to store segmented trial data
segmented_trials = []

# Segment the data based on trials_info
for start_time, end_time, label in trials_info:
    start_index = int(start_time * sampling_rate)
    end_index = int(end_time * sampling_rate)
    trial_segment = all_trials_data[start_index:end_index, :]
    segmented_trials.append((trial_segment, label))


Experiment completed.


In [1]:
from mne.decoding import CSP

# Assume segmented_trials contains tuples of (segmented_data, label) for each trial
# and labels are 'left' or 'right' for the two classes

# Separate the data into two classes
left_trials = [trial_data for trial_data, label in segmented_trials if label == 'left']
right_trials = [trial_data for trial_data, label in segmented_trials if label == 'right']

# Stack the trials to create 3D arrays (trials x channels x samples)
left_data = np.stack(left_trials)
right_data = np.stack(right_trials)

# Create and fit the CSP model
X = np.concatenate([left_data, right_data], axis=0)
y = np.array(['left'] * len(left_trials) + ['right'] * len(right_trials))

# Initialize CSP
csp = CSP(n_components=4, norm_trace=False)

# Fit CSP on the data
csp.fit(X, y)

# Transform the data using CSP filters
X_csp = csp.transform(X)


NameError: name 'np' is not defined

In [19]:
# Save the CSP features to a file
np.save('csp_features.npy', X_csp)

# Optionally, save the labels as well
np.save('csp_labels.npy', y)