In [2]:
import matplotlib
import pathlib
matplotlib.use('Qt5Agg')

# Load Data

In [34]:
import mne

# Load EDF file (BIDS formatted)
raw = mne.io.read_raw_edf('/home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject01_2.edf', preload=True)
raw.set_eeg_reference('average')  # Re-reference to average

Extracting EDF parameters from /home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject01_2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 30999  =      0.000 ...    61.998 secs...
EEG channel type selected for re-referencing
Applying average reference.
Applying a custom ('EEG',) reference.


Unnamed: 0,General,General.1
,Filename(s),Subject01_2.edf
,MNE object type,RawEDF
,Measurement date,2011-01-01 at 00:00:00 UTC
,Participant,1
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,00:01:02 (HH:MM:SS)
,Sampling frequency,500.00 Hz
,Time points,31000
,Channels,Channels


# Bandpass Filter

In [35]:
# Bandpass filter (0.5–45 Hz)
raw.filter(0.5, 45, fir_design='firwin', phase='zero-double')  # Zero-phase FIR

Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 45 Hz

FIR filter parameters
---------------------
Designing a two-pass forward and reverse, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-12 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 45.00 Hz
- Upper transition bandwidth: 11.25 Hz (-12 dB cutoff frequency: 50.62 Hz)
- Filter length: 3301 samples (6.602 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


Unnamed: 0,General,General.1
,Filename(s),Subject01_2.edf
,MNE object type,RawEDF
,Measurement date,2011-01-01 at 00:00:00 UTC
,Participant,1
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,00:01:02 (HH:MM:SS)
,Sampling frequency,500.00 Hz
,Time points,31000
,Channels,Channels


# Notch Filter (50 Hz)

In [5]:
raw.notch_filter(50, notch_widths=1)  # Adjust width if needed

Filtering raw data in 1 contiguous segment
Setting up band-stop filter from 49 - 51 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandstop filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 49.00
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 48.75 Hz)
- Upper passband edge: 51.00 Hz
- Upper transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 51.25 Hz)
- Filter length: 3301 samples (6.602 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


Unnamed: 0,General,General.1
,Filename(s),Subject00_1.edf
,MNE object type,RawEDF
,Measurement date,2011-01-01 at 00:00:00 UTC
,Participant,0
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,00:03:02 (HH:MM:SS)
,Sampling frequency,500.00 Hz
,Time points,91000
,Channels,Channels


# Inspect Filter Effects

In [6]:
raw.plot_psd(fmin=0.5, fmax=60)  # Before/after comparison

NOTE: plot_psd() is a legacy function. New code should use .compute_psd().plot().
Effective window size : 4.096 (s)
Plotting power spectral density (dB=True).


  raw.plot_psd(fmin=0.5, fmax=60)  # Before/after comparison


<MNELineFigure size 1000x350 with 1 Axes>

# High-Pass for ERPs

In [7]:
raw.filter(1.0, None)  # High-pass at 1 Hz for ERP studies

Filtering raw data in 1 contiguous segment
Setting up high-pass filter at 1 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal highpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Filter length: 1651 samples (3.302 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.1s


Unnamed: 0,General,General.1
,Filename(s),Subject00_1.edf
,MNE object type,RawEDF
,Measurement date,2011-01-01 at 00:00:00 UTC
,Participant,0
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,00:03:02 (HH:MM:SS)
,Sampling frequency,500.00 Hz
,Time points,91000
,Channels,Channels


# Adaptive Filtering: Artifact Subspace Reconstruction (ASR)
### When to Use: When your EEG has non-stationary artifacts (e.g., muscle bursts, movement).

In [8]:
import numpy as np
from scipy.signal import butter, lfilter

def apply_asr(data, sfreq, cutoff=20):
    """
    Apply Artifact Subspace Reconstruction (ASR) to EEG data.

    Parameters:
        data (ndarray): EEG data (channels x samples)
        sfreq (float): Sampling frequency
        cutoff (float): Cutoff frequency for ASR

    Returns:
        clean_data (ndarray): Cleaned EEG data
    """
    # Design a high-pass filter
    nyquist = sfreq / 2
    b, a = butter(4, cutoff / nyquist, btype='highpass')

    # Apply the filter to each channel
    clean_data = lfilter(b, a, data, axis=1)
    return clean_data

# Example usage
eeg_data = raw.get_data()  # Get EEG data from MNE Raw object
sfreq = raw.info['sfreq']  # Sampling frequency
cleaned_data = apply_asr(eeg_data, sfreq, cutoff=20)

# Replace the raw data with cleaned data
raw._data = cleaned_data
raw.plot(title='After ASR', n_channels=10, duration=5)


Using matplotlib as 2D backend.


<MNEBrowseFigure size 800x800 with 4 Axes>

# Epochs

The Task Structure

The dataset includes:

    Resting EEG: 180 seconds (last 3 minutes of rest).

    Task EEG: 60 seconds (first 1 minute of mental arithmetic).

    Events: Serial subtraction tasks (no explicit markers, but timings are fixed).

Key Timing:

    Task starts at 180 seconds (after 3 min rest).

    Task duration: 60 seconds (1 min).

In [9]:
# Create epochs from 180s to 240s (task period)
events = mne.make_fixed_length_events(
    raw, 
    start=180,  # Start of task (in seconds)
    stop=240,   # End of task
    duration=2.0,  # Epoch duration (2s)
    overlap=1.0    # 50% overlap (optional)
)

# Define epoch windows
epochs = mne.Epochs(
    raw,
    events,
    tmin=0,      # Start of epoch (relative to event)
    tmax=2.0,    # End of epoch (2s later)
    baseline=None,  # No baseline (or use pre-task rest)
    preload=True
)

Not setting metadata
1 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 1 events and 1001 original time points ...
1 bad epochs dropped


You might need to alter reject/flat-criteria or drop bad channels to avoid this. You can use Epochs.plot_drop_log() to see which channels are responsible for the dropping of epochs.
  epochs = mne.Epochs(


### Resting State as Baseline

In [10]:
rest_epochs = mne.make_fixed_length_events(raw, start=0, stop=180, duration=2.0)
task_epochs = mne.make_fixed_length_events(raw, start=180, stop=240, duration=2.0)

# Baseline correct using rest
epochs = mne.Epochs(
    raw,
    task_epochs,
    tmin=0,
    tmax=2.0,
    baseline=(0, 0.5),  # Use first 500ms of rest epochs
    preload=True
)

Not setting metadata
1 matching events found
Applying baseline correction (mode: mean)
0 projection items activated
Using data from preloaded Raw for 1 events and 1001 original time points ...
1 bad epochs dropped


You might need to alter reject/flat-criteria or drop bad channels to avoid this. You can use Epochs.plot_drop_log() to see which channels are responsible for the dropping of epochs.
  epochs = mne.Epochs(


### Group-Specific Epoching (Good vs. Bad Counters)

In [11]:
import pandas as pd

# Define the correct path to the CSV file
csv_path = "/home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/subject-info.csv"

# Read the participants data with the correct delimiter (comma in this case)
participants = pd.read_csv(csv_path, sep=",")  # Use ',' as the delimiter

# Print column names to verify
print("Columns in the CSV file:", participants.columns)

# Separate participants into groups based on Count Quality
group_b = participants[participants["Count quality"] == 0]["Subject"].tolist()  # Bad counters
group_g = participants[participants["Count quality"] == 1]["Subject"].tolist()  # Good counters

# Print the groups for verification
print("Bad counters:", group_b)
print("Good counters:", group_g)


'''# Function to process EEG data for a group
def process_group(group, label):
    print(f"Processing {label} group...")
    for subject in group:
        print(f"Processing EEG data for {subject}")
        
        # Apply preprocessing steps to the previously defined raw data
        raw.filter(0.5, 45, fir_design="firwin", phase="zero-double")  # Bandpass filter
        raw.notch_filter(50, notch_widths=1)  # Notch filter
        
        # Plot PSD for inspection
        raw.plot_psd(fmin=0.5, fmax=60)
        
        # Create epochs for task period (180s to 240s)
        events = mne.make_fixed_length_events(
            raw, start=180, stop=240, duration=2.0, overlap=1.0
        )
        epochs = mne.Epochs(
            raw, events, tmin=0, tmax=2.0, baseline=None, preload=True
        )
        
        # Save processed epochs
        #output_path = f"{data_dir}/processed/{subject}_epochs.fif"
        #epochs.save(output_path, overwrite=True)
        #print(f"Saved processed epochs for {subject} to {output_path}")

# Process both groups
process_group(group_b, "Bad counters")
process_group(group_g, "Good counters")'''

Columns in the CSV file: Index(['Subject', 'Age', 'Gender', 'Recording year', 'Number of subtractions',
       'Count quality'],
      dtype='object')
Bad counters: ['Subject00', 'Subject04', 'Subject06', 'Subject09', 'Subject10', 'Subject14', 'Subject19', 'Subject21', 'Subject22', 'Subject30']
Good counters: ['Subject01', 'Subject02', 'Subject03', 'Subject05', 'Subject07', 'Subject08', 'Subject11', 'Subject12', 'Subject13', 'Subject15', 'Subject16', 'Subject17', 'Subject18', 'Subject20', 'Subject23', 'Subject24', 'Subject25', 'Subject26', 'Subject27', 'Subject28', 'Subject29', 'Subject31', 'Subject32', 'Subject33', 'Subject34', 'Subject35']


'# Function to process EEG data for a group\ndef process_group(group, label):\n    print(f"Processing {label} group...")\n    for subject in group:\n        print(f"Processing EEG data for {subject}")\n        \n        # Apply preprocessing steps to the previously defined raw data\n        raw.filter(0.5, 45, fir_design="firwin", phase="zero-double")  # Bandpass filter\n        raw.notch_filter(50, notch_widths=1)  # Notch filter\n        \n        # Plot PSD for inspection\n        raw.plot_psd(fmin=0.5, fmax=60)\n        \n        # Create epochs for task period (180s to 240s)\n        events = mne.make_fixed_length_events(\n            raw, start=180, stop=240, duration=2.0, overlap=1.0\n        )\n        epochs = mne.Epochs(\n            raw, events, tmin=0, tmax=2.0, baseline=None, preload=True\n        )\n        \n        # Save processed epochs\n        #output_path = f"{data_dir}/processed/{subject}_epochs.fif"\n        #epochs.save(output_path, overwrite=True)\n        #pri

### Epoch by Group

In [12]:
import mne
from pathlib import Path

# Path to your task-only .edf files
DATA_PATH = Path("/home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files")

def get_all_subjects(data_path=DATA_PATH):
    """
    Extracts subject IDs from available task files like SubjectXX_2.edf
    """
    edf_files = list(data_path.glob("Subject??_2.edf"))
    subject_ids = [f.stem.replace("_2", "") for f in edf_files]
    return sorted(subject_ids)

def load_task_eeg(subject_id, data_path=DATA_PATH):
    """
    Loads only the 60s task EEG file for a subject.
    """
    task_file = data_path / f"{subject_id}_2.edf"
    raw_task = mne.io.read_raw_edf(task_file, preload=True)
    return raw_task

def epoch_task_eeg(subject_id, data_path=DATA_PATH):
    """
    Epochs the 60s task EEG into fixed 2-second windows.
    """
    raw = load_task_eeg(subject_id, data_path)
    events = mne.make_fixed_length_events(raw, start=0, stop=60, duration=2.0)
    epochs = mne.Epochs(raw, events, tmin=0.0, tmax=2.0, baseline=None, preload=True)
    return epochs

def epoch_all_task_subjects(data_path=DATA_PATH):
    """
    Epochs all task EEG files into 2-second segments.
    Returns: dict of {subject_id: epochs}
    """
    all_subjects = get_all_subjects(data_path)
    all_epochs = {}

    for subject_id in all_subjects:
        try:
            print(f"Epoching {subject_id}...")
            epochs = epoch_task_eeg(subject_id, data_path)
            all_epochs[subject_id] = epochs
        except Exception as e:
            print(f"⚠️ Error processing {subject_id}: {e}")

    return all_epochs


In [13]:
task_epochs = epoch_all_task_subjects()

# For example, get Subject13 epochs:
epochs_sub13 = task_epochs["Subject13"]

# Or combine all subjects into a list:
all_epochs_list = list(task_epochs.values())


Epoching Subject00...
Extracting EDF parameters from /home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject00_2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 30999  =      0.000 ...    61.998 secs...
Not setting metadata
30 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 30 events and 1001 original time points ...
0 bad epochs dropped
Epoching Subject01...
Extracting EDF parameters from /home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject01_2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 30999  =      0.000 ...    61.998 secs...
Not setting metadata
30 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 30 events and 1001 original time points ...
0 bad epochs dropped
Epoching Subject02...
Extracting EDF param

Channels marked as bad:
none


### Compare groups

In [14]:
# Constants
DATA_PATH = Path("/home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files")
CSV_PATH = DATA_PATH / "subject-info.csv"  # Update if CSV is elsewhere

def load_task_eeg(subject_id, data_path=DATA_PATH):
    """
    Loads task EEG for one subject.
    """
    file = data_path / f"{subject_id}_2.edf"
    raw = mne.io.read_raw_edf(file, preload=True)
    return raw

def epoch_task_eeg(subject_id, data_path=DATA_PATH):
    """
    Epochs 60s task EEG into 2-second segments.
    """
    raw = load_task_eeg(subject_id, data_path)
    events = mne.make_fixed_length_events(raw, start=0, stop=60, duration=2.0)
    return mne.Epochs(raw, events, tmin=0, tmax=2.0, baseline=None, preload=True)

def load_group_epochs(csv_path=CSV_PATH, data_path=DATA_PATH):
    """
    Loads group assignments from CSV and returns separate lists of epochs for group B and G.
    """
    df = pd.read_csv(csv_path)
    epochs_b, epochs_g = [], []

    for _, row in df.iterrows():
        subject_id = row['Subject']
        group = row['Count quality']

        try:
            print(f"Epoching {subject_id} [Group {group}]...")
            epochs = epoch_task_eeg(subject_id, data_path)
            if group == 0:
                epochs_b.append(epochs)
            elif group == 1:
                epochs_g.append(epochs)
        except Exception as e:
            print(f"⚠️ Error with {subject_id}: {e}")

    return epochs_b, epochs_g


In [15]:
epochs_b, epochs_g = load_group_epochs()

# Then your comparison code will work:
evoked_b = mne.grand_average([e.average() for e in epochs_b])
evoked_g = mne.grand_average([e.average() for e in epochs_g])

# Verify available channel names
print("Available channels:", evoked_b.info['ch_names'])

# Use a valid channel name from the list
valid_channel = evoked_b.info['ch_names'][0]  # Replace with an appropriate channel name

mne.viz.plot_compare_evokeds(
    {"Bad Counters": evoked_b, "Good Counters": evoked_g},
    picks=valid_channel,
    title=f"ERP at {valid_channel}: Group B vs. G"
)


Epoching Subject00 [Group 0]...
Extracting EDF parameters from /home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject00_2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 30999  =      0.000 ...    61.998 secs...
Not setting metadata
30 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 30 events and 1001 original time points ...
0 bad epochs dropped
Epoching Subject01 [Group 1]...
Extracting EDF parameters from /home/donaf-strange/LAB_WORK/eeg_arithmetic_project/data/files/Subject01_2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 30999  =      0.000 ...    61.998 secs...
Not setting metadata
30 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 30 events and 1001 original time points ...
0 bad epochs dropped
Epoching Subject02 [Gr

  mne.viz.plot_compare_evokeds(


[<Figure size 800x600 with 1 Axes>]

1. What You Can Tentatively Conclude
(If the Plot Shows Clear Trends)

    Group Differences in Early Processing (0–300 ms):

        If Group G (Good Counters) shows a larger P1/N1 (early positive/negative peaks) at Fp1, it suggests:

            Faster attentional engagement to the arithmetic task.

            More efficient sensory processing in prefrontal regions.

    Late Components (300–2000 ms):

        If Group B (Bad Counters) has a sustained negative shift (or reduced positivity), it implies:

            Higher cognitive load (linked to frontal theta/alpha suppression).

            Inefficient working memory maintenance (Fp1 is near dorsolateral prefrontal cortex).

Example Observation:

    "Good counters exhibit a stronger early P2 component (~200 ms) at Fp1, possibly reflecting rapid task engagement, while bad counters show a prolonged frontal negativity (>500 ms), suggesting sustained effort or cognitive strain."

2. What You Cannot Conclude Yet

    Statistical Significance: Without p-values or confidence intervals, differences might be noise.

    Spatial Specificity: Fp1 alone can’t reveal network dynamics (need other electrodes).

    Cognitive Mechanism: Is the effect due to attention, memory, or error monitoring?

3. Next Steps to Strengthen Conclusions
A. Add Statistical Validation

    Cluster-based permutation test (for time windows 

In [16]:
from mne.stats import permutation_cluster_test

# Combine data from all Epochs objects in each group
X_b = np.concatenate([e.get_data()[:, 0, :] for e in epochs_b], axis=0)  # Fp1 = channel 0
X_g = np.concatenate([e.get_data()[:, 0, :] for e in epochs_g], axis=0)  # Fp1 = channel 0

# Perform permutation cluster test
t_obs, clusters, p_values, _ = permutation_cluster_test([X_b, X_g], n_permutations=1000)

# Mark significant clusters (p < 0.05) on the ERP plot
for cluster, p_val in zip(clusters, p_values):
	if p_val < 0.05:
		tmin, tmax = cluster[0][0] / sfreq, cluster[0][-1] / sfreq  # Convert sample indices to time
		plt.fill_betweenx([-5, 5], tmin, tmax, color='gray', alpha=0.3, label='p < 0.05')

Using a threshold of 3.850099
stat_fun(H1): min=8.538678864112182e-07 max=5.402692811264322
Running initial clustering …
Found 4 clusters


  t_obs, clusters, p_values, _ = permutation_cluster_test([X_b, X_g], n_permutations=1000)
  t_obs, clusters, p_values, _ = permutation_cluster_test([X_b, X_g], n_permutations=1000)


  0%|          | Permuting : 0/999 [00:00<?,       ?it/s]

In [17]:
# Ensure both evoked_b and evoked_g have the same channels
common_channels = set(evoked_b.info['ch_names']).intersection(evoked_g.info['ch_names'])
evoked_b.pick_channels(list(common_channels))
evoked_g.pick_channels(list(common_channels))

# Verify available channel names after alignment
print("Available channels after alignment:", evoked_b.info['ch_names'])

# Use a valid channel name from the aligned list
valid_channel = evoked_b.info['ch_names'][0]  # Replace with an appropriate channel name

# Plot comparison using a valid channel name
import matplotlib.pyplot as plt
mne.viz.plot_compare_evokeds(
    {"Bad Counters": evoked_b, "Good Counters": evoked_g},
    picks=valid_channel,  # Use a valid channel name
    title=f"ERP at {valid_channel}: Group B vs. G",
    axes=plt.gca(),
    show=True,
    truncate_yaxis=False,
    ylim=dict(eeg=[-5, 5]),  # Set Y-axis range
    ci=True  # Add confidence intervals
)

# Add significance markers (if you have p-values)
plt.fill_betweenx([-5, 5], 0.3, 0.6, color='gray', alpha=0.3, label='p < 0.05')
plt.legend()
plt.show()

NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Available channels after alignment: ['EEG Pz', 'EEG Fp1', 'EEG F4', 'EEG O2', 'ECG ECG', 'EEG F3', 'EEG O1', 'EEG T3', 'EEG A2-A1', 'EEG T4', 'EEG Fp2', 'EEG T6', 'EEG C4', 'EEG P4', 'EEG C3', 'EEG F8', 'EEG Cz', 'EEG Fz', 'EEG T5', 'EEG F7', 'EEG P3']


  mne.viz.plot_compare_evokeds(


In [18]:
from mne.stats import permutation_cluster_test
import numpy as np

# Combine data from all Epochs objects in each group
data_b = np.concatenate([e.get_data()[:, 0, :] for e in epochs_b], axis=0)  # Fp1 is channel 0
data_g = np.concatenate([e.get_data()[:, 0, :] for e in epochs_g], axis=0)  # Fp1 is channel 0

# Reshape data: (n_epochs × n_times) for Fp1
X = [data_b, data_g]
t_obs, clusters, p_values, _ = permutation_cluster_test(X, n_permutations=1000)

Using a threshold of 3.850099
stat_fun(H1): min=8.538678864112182e-07 max=5.402692811264322
Running initial clustering …
Found 4 clusters


  t_obs, clusters, p_values, _ = permutation_cluster_test(X, n_permutations=1000)
  t_obs, clusters, p_values, _ = permutation_cluster_test(X, n_permutations=1000)


  0%|          | Permuting : 0/999 [00:00<?,       ?it/s]

In [19]:
# Add a standard montage to ensure sensor locations are defined
montage = mne.channels.make_standard_montage('standard_1020')

# Rename channels in evoked_b to match the montage nomenclature
rename_dict = {
	'EEG Fp1': 'Fp1', 'EEG Fp2': 'Fp2', 'EEG F3': 'F3', 'EEG F4': 'F4',
	'EEG F7': 'F7', 'EEG F8': 'F8', 'EEG T3': 'T3', 'EEG T4': 'T4',
	'EEG C3': 'C3', 'EEG C4': 'C4', 'EEG T5': 'T5', 'EEG T6': 'T6',
	'EEG P3': 'P3', 'EEG P4': 'P4', 'EEG O1': 'O1', 'EEG O2': 'O2',
	'EEG Fz': 'Fz', 'EEG Cz': 'Cz', 'EEG Pz': 'Pz', 'EEG A2-A1': 'A2-A1',
	'ECG ECG': 'ECG'
}

# Ensure the rename dictionary keys match the existing channel names in evoked_b.info['ch_names']
existing_channels = evoked_b.info['ch_names']
rename_dict = {k: v for k, v in rename_dict.items() if k in existing_channels}

evoked_b.rename_channels(rename_dict)

# Set channel types for missing channels
evoked_b.set_channel_types({'A2-A1': 'misc', 'ECG': 'misc'})

# Set the montage, allowing missing channels
evoked_b.set_montage(montage, on_missing='ignore')

# Plot the topomap
evoked_b.plot_topomap(times=[0.3, 0.6, 0.9], ch_type='eeg')

  evoked_b.set_channel_types({'A2-A1': 'misc', 'ECG': 'misc'})


<MNEFigure size 450x220 with 4 Axes>

In [20]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''  # Disable GPU usage

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
import numpy as np

# Combine data from all epochs in one group (e.g., epochs_b)
if len(epochs_b) > 0:
    X_raw = np.concatenate([e.get_data().reshape(len(e), -1) for e in epochs_b], axis=0)
elif len(epochs_g) > 0:
    X_raw = np.concatenate([e.get_data().reshape(len(e), -1) for e in epochs_g], axis=0)
else:
    raise ValueError("Both 'epochs_b' and 'epochs_g' are empty. Ensure valid data is loaded.")

# Define autoencoder
input_layer = Input(shape=(X_raw.shape[1],))
encoded = Dense(64, activation='relu')(input_layer)
encoded = Dense(3, activation='relu')(encoded)  # Latent space (3D)
decoded = Dense(64, activation='relu')(encoded)
decoded = Dense(X_raw.shape[1], activation='linear')(decoded)

autoencoder = Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.fit(X_raw, X_raw, epochs=50, batch_size=32)

# Extract latent features
encoder = Model(input_layer, encoded)
X_ae = encoder.predict(X_raw)

print("Latent features extracted successfully.")

2025-07-01 08:12:21.370267: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


Epoch 1/50


2025-07-01 08:12:22.367476: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 25225200 exceeds 10% of free system memory.
2025-07-01 08:12:22.445254: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 25225200 exceeds 10% of free system memory.


[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 55ms/step - loss: 6.7569e-10
Epoch 2/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 53ms/step - loss: 6.6733e-10
Epoch 3/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 58ms/step - loss: 6.5861e-10
Epoch 4/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 60ms/step - loss: 6.6809e-10
Epoch 5/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 59ms/step - loss: 6.6402e-10
Epoch 6/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 52ms/step - loss: 6.8459e-10
Epoch 7/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 52ms/step - loss: 6.4998e-10
Epoch 8/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 54ms/step - loss: 6.6874e-10
Epoch 9/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 51ms/step - loss: 6.7210e-10
Epoch 10/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m

2025-07-01 08:12:58.956884: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 25225200 exceeds 10% of free system memory.


[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
Latent features extracted successfully.


In [23]:
from sklearn.decomposition import PCA
from scipy.stats import mannwhitneyu


# Ensure X_ae contains valid latent features
if 'X_ae' not in locals() or X_ae is None:
    raise ValueError("The variable 'X_ae' is not defined or contains no data. Ensure latent features are extracted.")

# Perform PCA on the latent features
pca = PCA(n_components=2)  # Reduce to 2 principal components for testing
X_pca = pca.fit_transform(X_ae)

# Define labels for the groups (e.g., 'B' for Bad counters, 'G' for Good counters)
# Ensure labels correspond to the data in X_pca
num_samples = X_pca.shape[0]
labels = np.array(['B'] * (num_samples // 2) + ['G'] * (num_samples // 2))  # Adjust to match the number of samples

# Test PC1 differences between groups
try:
    u, p = mannwhitneyu(X_pca[labels == 'B', 0], X_pca[labels == 'G', 0])
    print(f"PC1 U={u}, p={p:.4f}")
except Exception as e:
    print(f"Error performing Mann-Whitney U test: {e}")

PC1 U=10915.5, p=0.6566


In [24]:
# Energy landscape from PCA
from scipy.stats import gaussian_kde
z = gaussian_kde(X_pca.T)(X_pca.T)  # Density estimate
plt.tricontourf(X_pca[:, 0], X_pca[:, 1], z, levels=20, cmap='viridis')

<matplotlib.tri._tricontour.TriContourSet at 0x792e901e2c30>

In [26]:
from numpy import mean, std
from scipy.stats import norm
from math import sqrt

def cohens_d(x, y):
    return (mean(x) - mean(y)) / sqrt((std(x, ddof=1)**2 + std(y, ddof=1)**2) / 2)

d = cohens_d(X_pca[labels=='B', 0], X_pca[labels=='G', 0])
print(f"Cohen's d: {d:.3f}")

Cohen's d: -0.049
