In [1]:
import mne
import numpy as np
import matplotlib.pyplot as plt
import nolds

from mne.datasets import eegbci
from mne.io import concatenate_raws, read_raw_edf
from mne.channels import make_standard_montage


from scipy.signal import detrend
from tqdm import tqdm
from scipy.stats import linregress

from mne.time_frequency import tfr_array_morlet
from scipy.signal import hilbert, butter, filtfilt


In [2]:
import my_functions2 as my_fun2

In [3]:
subject = 1
runs = {
    "rest": [1],
    "motor_execution": [3, 7, 11],
    "motor_imagery": [4, 8, 12]
}

subject_data = my_fun2.load_and_preprocess_subject(subject, runs)


➡️ Loading REST | Runs: [1]
Extracting EDF parameters from C:\Users\flavi\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R01.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 9759  =      0.000 ...    60.994 secs...
EEG channel type selected for re-referencing
Adding average EEG reference projection.
1 projection items deactivated
Average reference projection was added, but has not been applied yet. Use the apply_proj method to apply it.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 40.00 Hz
- Upper transition bandwidth: 10.00 Hz (-6 dB cu

[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


EEG channel type selected for re-referencing
Adding average EEG reference projection.
1 projection items deactivated
Average reference projection was added, but has not been applied yet. Use the apply_proj method to apply it.
Filtering raw data in 3 contiguous segments
Setting up band-pass filter from 1 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 40.00 Hz
- Upper transition bandwidth: 10.00 Hz (-6 dB cutoff frequency: 45.00 Hz)
- Filter length: 529 samples (3.306 s)

✅ motor_execution | Shape: (64, 60000) | Duration: 6.25 min

➡️ Loading MOTOR_IMAGERY | Runs: [4, 8, 12]
Extracting EDF parameters from C:\Users\flavi\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R04.ed

[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\flavi\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R08.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\flavi\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R12.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
EEG channel type selected for re-referencing
Adding average EEG reference projection.
1 projection items deactivated
Average reference projection was added, but has not been applied yet. Use the apply_proj method to apply it.
Filtering raw data in 3 contiguous segments
Setting up band-pass filter from 1 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain

[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


In [None]:
%matplotlib qt

# Debug plot (rest condition)
my_fun2.quick_plot(subject_data["rest"], title=f"Subject {subject} | REST")

# Quick check of motor
my_fun2.quick_plot(subject_data["motor_execution"], title=f"Subject {subject} | MOTOR EXECUTION")

Using matplotlib as 2D backend.


Channels marked as bad:
none
Channels marked as bad:
none


In [5]:
# Extract epochs from each condition
epochs_rest = my_fun2.extract_clean_epochs(subject_data["rest"])
epochs_exec = my_fun2.extract_clean_epochs(subject_data["motor_execution"])
epochs_imag = my_fun2.extract_clean_epochs(subject_data["motor_imagery"])

Used Annotations descriptions: ['T0']

⏺️ Used Annotations descriptions: ['T0']
Not setting metadata
1 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 1)
1 projection items activated
Using data from preloaded Raw for 1 events and 641 original time points ...
0 bad epochs dropped
⚠️ No T1/T2 (TASK) events found.
📊 Extracted 1 REST epochs & 0 TASK epochs
Used Annotations descriptions: ['T0', 'T1', 'T2']

⏺️ Used Annotations descriptions: ['T0', 'T1', 'T2']
Not setting metadata
45 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 1)
1 projection items activated
Using data from preloaded Raw for 45 events and 641 original time points ...
0 bad epochs dropped
Not setting metadata
45 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 1)
1 projection items activated
Using data from preloaded Raw for 45 events and 641 original time points ...
0 bad

In [6]:
combined_rest = mne.concatenate_epochs([
    epochs_exec['rest'], 
    epochs_imag['rest'], 
    epochs_rest['rest']
])

combined_task = mne.concatenate_epochs([
    epochs_exec['task'], 
    epochs_imag['task']
])

# Shuffle task epochs 
shuffled_indices = np.random.permutation(len(combined_task))
combined_task = combined_task[shuffled_indices]

Not setting metadata
91 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 1)
Not setting metadata
90 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 1)


  combined_rest = mne.concatenate_epochs([
  combined_task = mne.concatenate_epochs([


In [None]:
%matplotlib qt 
#combined_task.plot(n_channels=8, title="Combined Task Epochs")

# DFA Analysis

α ≈ 0.7–1.0: Indicates long-range temporal correlations (LRTC) and near-critical dynamics

In [7]:
alpha_rest = my_fun2.compute_dfa_from_epochs(combined_rest, picks=['C3', 'Cz', 'C4'])
alpha_task = my_fun2.compute_dfa_from_epochs(combined_task, picks=['C3', 'Cz', 'C4'])

print("Rest Alpha Values:", alpha_rest)
print("Task Alpha Values:", alpha_task)

Rest Alpha Values: {'C3': 0.48820945778786223, 'Cz': 0.4947144071605881, 'C4': 0.49333113617619306}
Task Alpha Values: {'C3': 0.4848297426072331, 'Cz': 0.4788085346613098, 'C4': 0.4854755032495091}


In [8]:
# Extract EEG signal (flatten Cz channel from rest condition)
signal = combined_task.get_data(picks="C3").reshape(-1)

# Compute DFA
alpha = my_fun2.compute_dfa(signal)
print(f"DFA exponent (Cz, rest): {alpha:.3f}")

DFA exponent (Cz, rest): 0.485


In [None]:
# Extract data (e.g., rest condition)
data_rest = combined_rest.get_data(picks=['C3', 'Cz', 'C4'])  # shape: (n_epochs, n_channels, n_times)
X_rest = data_rest.transpose(1, 0, 2).reshape(data_rest.shape[1], -1)  # (channels, time)

# Define segment sizes
segment_sizes = np.geomspace(30, X_rest.shape[1] // 5, 20).astype(int)

# Compute DFA
F_rest = my_fun2.compute_F(X_rest, segment_sizes)
alpha_rest, intercept_rest, _, _ = my_fun2.compute_DFA(segment_sizes, F_rest, fitting_range=(50, 10000))

# Plot
my_fun2.plot_dfa_fit(segment_sizes, F_rest, alpha_rest, intercept_rest, fitting_range=(50, 10000))

Computing F(n) per channel:  33%|███▎      | 1/3 [01:41<03:22, 101.10s/it]

In [None]:
# Extract data (e.g., rest condition)
data_rest = combined_task.get_data(picks=['C3', 'Cz', 'C4'])  # shape: (n_epochs, n_channels, n_times)
X_rest = data_rest.transpose(1, 0, 2).reshape(data_rest.shape[1], -1)  # (channels, time)

# Define segment sizes
segment_sizes = np.geomspace(30, X_rest.shape[1] // 5, 20).astype(int)

# Compute DFA
F_rest = my_fun2.compute_F(X_rest, segment_sizes)
alpha_rest, intercept_rest, _, _ = my_fun2.compute_DFA(segment_sizes, F_rest, fitting_range=(50, 10000))

# Plot
my_fun2.plot_dfa_fit(segment_sizes, F_rest, alpha_rest, intercept_rest, fitting_range=(50, 10000))

In [None]:
alpha_beta_rest = my_fun2.run_band_limited_dfa(combined_rest, channel="Cz", band=(13, 30))
alpha_beta_task = my_fun2.run_band_limited_dfa(combined_task, channel="Cz", band=(13, 30))

print(f"Beta DFA (Rest - Cz): {alpha_beta_rest:.3f}")
print(f"Beta DFA (Task - Cz): {alpha_beta_task:.3f}")


In [None]:
# 1. Get envelope from Cz, 13 Hz (alpha)
envelope = my_fun2.xtract_wavelet_envelope(combined_rest, channel="Cz", sfreq=combined_rest.info['sfreq'], freq=13)

# 2. Define segment sizes
segment_sizes = np.unique(np.logspace(2, np.log10(len(envelope)/5), num=20, base=10).astype(int))

# 3. Compute F(n)
F = my_fun2.compute_fluctuation_function(envelope, segment_sizes)

# 4. Fit alpha
alpha, _ = my_fun2.compute_dfa_scaling(segment_sizes, F)

# 5. Plot
plt.figure(figsize=(8, 5))
plt.loglog(segment_sizes, F, 'o-', label=f"Cz (α = {alpha:.3f})")
plt.xlabel("log(Window size)")
plt.ylabel("log(Fluctuation)")
plt.title("DFA on Alpha Envelope – Cz (Rest)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()