mne.io.read_raw_edf: Loads the file and read the data

raw.info 

In [None]:
# Before running the code i had to download ipykernel
import os
import mne
import sys
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from mne.preprocessing import ICA, create_eog_epochs
from mpl_toolkits.mplot3d import Axes3D
print(sys.executable)
#%matplotlib notebook
%matplotlib inline


2. Load & combine data by task for 1 subject

Subject 1 data was loaded & combined based on the task
imagined dataset : runs 4, 8, 12
actual movement dataset: runs 3, 7, 11

In [None]:

def process_and_save_subject_task(subject_id, task_name, runs, base_raw_path, base_output_path):
    """
    Loads raw runs for a subject/task, combines them, and saves the result.
    """
    print(f"--- Processing Subject {subject_id}, Task: {task_name} ---")

    # 1. Load and Combine the raw data
    subject_folder = f"S{subject_id:03d}"
    subject_folder_path = os.path.join(base_raw_path, subject_folder)
    
    raw_files = []
    for run_number in runs:
        file_name = f"{subject_folder}R{run_number:02d}.edf"
        file_path = os.path.join(subject_folder_path, file_name)
        raw = mne.io.read_raw_edf(file_path, preload=True, stim_channel='auto')
        raw_files.append(raw)
        
    raw_combined = mne.concatenate_raws(raw_files)

    # 2. Define the output path and save the file
    output_folder = os.path.join(base_output_path, subject_folder)
    os.makedirs(output_folder, exist_ok=True)
    
    output_filename = f"{subject_folder}_{task_name}_raw.fif"
    output_path = os.path.join(output_folder, output_filename)
    
    raw_combined.save(output_path, overwrite=True)
    print(f"âœ… Saved combined data to: {output_path}\n")

# --- Configuration ---
base_raw_path = r"C:\Users\524yu\OneDrive\Documents\VSCODEE\BMI-Robotic-Control\Datasets\raw"
base_output_path = r"C:\Users\524yu\OneDrive\Documents\VSCODEE\BMI-Robotic-Control\Datasets\processed"

TASK_RUNS = {
    'imagined_movement': [4, 8, 12],
    'actual_movement': [3, 7, 11]
}

# --- Now, you can easily process multiple subjects in a loop ---
# Let's process the first 3 subjects as an example
for subject in range(1, 2):
    process_and_save_subject_task(
        subject_id=subject,
        task_name='imagined_movement',
        runs=TASK_RUNS['imagined_movement'],
        base_raw_path=base_raw_path,
        base_output_path=base_output_path
    )
    process_and_save_subject_task(
        subject_id=subject,
        task_name='actual_movement',
        runs=TASK_RUNS['actual_movement'],
        base_raw_path=base_raw_path,
        base_output_path=base_output_path
    )

Load & prepare the reorganised and concatonated data for understanding

In [None]:
subject_id = 1
task_name = 'imagined_movement'

base_output_path = r"C:\Users\524yu\OneDrive\Documents\VSCODEE\BMI-Robotic-Control\Datasets\processed"
subject_folder = f"S{subject_id:03d}"
processed_file = os.path.join(base_output_path, subject_folder, f"{subject_folder}_{task_name}_raw.fif")

raw = mne.io.read_raw_fif(processed_file, preload=True)

print(raw.info)
print(f"Data Duration: {raw.times[-1]/60:.2f} minutes")


Clean channel names & set a 2D montage (togographical map of nodes)

In [None]:
raw.rename_channels(lambda name: name.replace('.', '').strip().upper())

raw.set_channel_types({ch: 'eeg' for ch in raw.ch_names})

montage = mne.channels.make_standard_montage('standard_1020')
raw.set_montage(montage, match_case=False, match_alias=True, on_missing='warn')

missing_positions= []

for ch in raw.info['chs']:
    if np.allclose(ch['loc'][:3], 0.0):
        missing_positions.append(ch['ch_name'])

print("missing positions will be in middle of diagram", missing_positions)


raw.plot_sensors(show_names=True) # 2D montage

Adds a 3D montage visualisation [not working]

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np



# Extract channel positions
pos = np.array([ch['loc'][:3] for ch in raw.info['chs']])
names = raw.ch_names

# Create 3D figure
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection='3d')

# Plot electrode positions
ax.scatter(pos[:, 0], pos[:, 1], pos[:, 2], s=60, c='r')

# Add channel names as labels
for i, name in enumerate(names):
    ax.text(pos[i, 0], pos[i, 1], pos[i, 2], name, fontsize=8)

# Axis labels and title
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title("3D EEG Electrode Layout (Notebook-safe)")

plt.show()


## EEG Signal Visualisation (Raw & Spectral Analysis)

Visualise the EEG data 

Grab Plotting of all regions before filtering
1. Visualise raw EEG signal in time domain.
2. Visualise the power spectral density of the EEG signal (how signal power is distributed across different frequncy)

In [None]:
def plot_eeg_overview(raw_obj):
    print("Showing all channels together...")
    raw.plot(
        duration=5,            # how many seconds per screen
        n_channels=len(raw.ch_names),  # display all channels
        scalings='auto',       # automatically scale signal amplitude
        title='All EEG Channels',
        butterfly=True         # overlay channels for better comparison
    )

    psd = raw.compute_psd(fmax=60)
    psd.plot(picks="eeg", exclude = "bads", average = True, dB = True)
    # Pick - picks what type of signal it has to plot; eeg signal
    # exlude - do not include bad channels
    # average -

    print("Showing channels in their own categories")
    channel_subgroups = {
        "Frontal": [ch for ch in raw.ch_names if ch.startswith(("F", "Fp"))],
        "Central": [ch for ch in raw.ch_names if ch.startswith("C")],
        "Parietal": [ch for ch in raw.ch_names if ch.startswith("P")],
        "Occipital": [ch for ch in raw.ch_names if ch.startswith("O")],
        "Temporal": [ch for ch in raw.ch_names if ch.startswith(("T", "Tp"))]
    }

    for region, channels in channel_subgroups.items():
        if channels:
            print(f"--- {region} Channels ---")
            
            # Raw signal plot
            raw.plot(picks=channels, duration=5, title=f"{region} EEG Channels")
            
            # PSD plot
            psd = raw.compute_psd(picks=channels)
            fig = psd.plot(average=True, dB=True)  # Remove 'title'
            fig.suptitle(f"{region} PSD")           # Set title manually


    plt.show()
plot_eeg_overview(raw)


##Bandpass Filtering & Notch Filter
-Remove slow drifts (DC offsets) & high-frequency noise.

From above, line noise is observed at the 60Hz mark thus a notch filter was used

l_freq = 1 : High-pass filter at 1Hz
h_freq = 40: Low-pass filter at 40Hz

In [None]:
raw_filtered = raw.copy().filter(l_freq=1., h_freq=40.)
raw_filtered.notch_filter(freqs=[50])

Replots the channels & PSD to view the effects of the Bandpass & Notch Filter.

In [None]:
plot_eeg_overview(raw_filtered)

Using the Automated ICA Pipeline (first few subjects first)

In [None]:
ica = ICA(n_components=20, random_state=97, max_iter=800)
ica.fit(raw_filtered)

ica.plot_sources(raw_filtered)
ica.exclude = [0, 4, 11]

raw_cleaned = ica.apply(raw_filtered.copy())

print("Display original data")
raw_filtered.plot(title="Original Data")

print("\nDisplaying data after removing artifacts")
raw_cleaned.plot(title="Cleaned Data", duration=5, n_channels=len(raw.ch_names))


##Define EEG Frequency Bands

Brainwave ranges such as,
delta: 1-4Hz
theta: 4-8Hz
alpha: 8-13Hz
beta:  13-30Hz
gamma: 30-40Hz

In [None]:
bands = {
    'delta': (1, 4),
    'theta': (4, 8),
    'alpha': (8, 13),
    'beta': (13, 30),
    'gamma': (30, 40)
}

## Bandpower Computation Power for Each Channel
- Computes how much power is in each frequency band [main feature for EEG analysis]


#REMINDER THAT I NEED TO MOVE UP IMPORTS TO THE FIRST CODING BLOCK IN THE FUTURE

In [None]:
def bandpower(data, sf, band):
    from scipy.signal import welch
    low, high = band
    freqs, psd = welch(data, sf, nperseg=sf*2)
    freq_res = freqs[1] - freqs[0]
    idx_band = np.logical_and(freqs >= low, freqs <= high)
    return np.sum(psd[idx_band]) * freq_res

sf = raw_cleaned.info['sfreq']
eeg_channels = raw_cleaned.pick_types(eeg=True).ch_names

band_powers = []

for ch in eeg_channels:
    data = raw_cleaned.get_data(picks=ch)[0]
    features = {'channels': ch}
    for band_name, band_range in bands.items():
        features[band_name] = bandpower(data, sf, band_range)
    band_powers.append(features)

band_df = pd.DataFrame(band_powers)
band_df.head()
    