1) Plan
For each filtered EEG file:

Run weak stationarity test (t-test for means, Levene for variances).

Compute D₂_eff for the real signal.

Generate n_surrogates IAAFT surrogates and compute D₂_eff for each; take surrogate mean ± std.

Compute Nonlinear Prediction Error (NLE) using local-neighbour prediction (embedding dimension from D₂_opt whenever available).

Save per-file .npz with metrics, and also accumulate a master validation_metrics.csv.

After all files processed:

Train a Random Forest classifier on aggregated features (D2, NLE, basic stats) as a baseline.

Produce segmented .npz datasets for CNN/LSTM (options to save as .npy arrays).

optimize for speed:

Use joblib.Parallel to parallelize files.

Use KDTree for nearest-neighbor queries.

Keep surrogate count modest by default (e.g., 9) to balance speed vs. stat power — you can increase later.

In [9]:
# full_metrics_pipeline.py
import os
import numpy as np
from joblib import Parallel, delayed
from tqdm import tqdm
from scipy.stats import ttest_ind, levene, linregress
from sklearn.neighbors import KDTree
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, StratifiedKFold
import time
import warnings
warnings.filterwarnings("ignore")

# -------------------------
# USER CONFIG
# -------------------------
INPUT_ROOT = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED"   # filtered files: F/,N/,O/,S/,Z/
OUTPUT_ROOT = r"D:\Downloads\EEG_DATA_REPO\EEG_METRICS"      # per-file npz outputs
CSV_OUT = os.path.join(OUTPUT_ROOT, "validation_metrics.csv")
os.makedirs(OUTPUT_ROOT, exist_ok=True)

# D2/NLE params
MAX_M = 12
TAU = 1
THEILER = 5
EPS_POINTS = 25
MIN_SCALING_PTS = 5
N_NEIGHBORS_NLE = 5

# Surrogates
N_SURROGATES = 7            # default; increase to 19 for more power
IAAFT_MAX_ITERS = 100

# Parallel
N_JOBS = -1                # use all cores

# -------------------------
# Helper functions
# -------------------------
def weak_stationarity_test(signal, segments=4):
    N = len(signal)
    seg_len = N // segments
    if seg_len < 10:
        return {'mean_p_vals': [], 'var_p_vals': [], 'stationary': False}
    segs = [signal[i*seg_len:(i+1)*seg_len] for i in range(segments)]
    p_mean = []
    p_var = []
    for i in range(segments):
        for j in range(i+1, segments):
            _, p_m = ttest_ind(segs[i], segs[j], equal_var=False)
            _, p_v = levene(segs[i], segs[j])
            p_mean.append(p_m)
            p_var.append(p_v)
    stationary = (np.min(p_mean) > 0.05) and (np.min(p_var) > 0.05)
    return {'mean_p_vals': p_mean, 'var_p_vals': p_var, 'stationary': stationary}

def embed_time_series(x, m, tau=TAU):
    N = len(x)
    M = N - (m - 1) * tau
    if M <= 0:
        return np.empty((0, m))
    return np.array([x[i:i + m * tau:tau] for i in range(M)])

def correlation_sum_kdtree(X, epsilons, theiler=THEILER):
    N = X.shape[0]
    tree = KDTree(X)
    C = np.zeros(len(epsilons), dtype=np.float64)
    for idx, eps in enumerate(epsilons):
        neighs = tree.query_radius(X, r=eps, return_distance=False)
        count = 0
        for i, neigh in enumerate(neighs):
            count += np.sum(np.abs(neigh - i) > theiler)
        C[idx] = 2.0 * count / (N * (N - 1))
    return C

def find_scaling_region(log_eps, log_C, min_pts=MIN_SCALING_PTS):
    max_r2 = 0.0
    best_slope = None
    best_range = None
    L = len(log_eps)
    for start in range(0, L - min_pts):
        for end in range(start + min_pts, L):
            x = log_eps[start:end]
            y = log_C[start:end]
            slope, _, r_value, _, _ = linregress(x, y)
            if (r_value**2) > max_r2 and 0.1 < slope < 50:
                max_r2 = r_value**2
                best_slope = slope
                best_range = (start, end)
    return best_slope, best_range, max_r2

def estimate_d2_eff_signal(x, m_max=MAX_M, tau=TAU, theiler=THEILER, eps_points=EPS_POINTS):
    x = np.asarray(x, dtype=np.float64)
    N = len(x)
    if N < 50:
        return {'D2_eff': np.nan, 'm_opt': None, 'log_eps': None, 'log_C': None, 'scaling_range': None, 'valid': False}
    s = np.std(x)
    epsilons = np.logspace(np.log10(max(1e-12, 0.01*s)), np.log10(max(1e-12, 0.5*s)), num=eps_points)
    log_eps = np.log10(epsilons)
    best_slope = np.inf
    best_m = None
    best_log_C = None
    best_range = None
    max_m = min(m_max, (N - 1) // tau)
    for m in range(1, max_m + 1):
        X = embed_time_series(x, m, tau)
        if X.shape[0] < 10:
            continue
        C = correlation_sum_kdtree(X, epsilons, theiler=theiler)
        log_C = np.log10(C + 1e-12)
        slope, eps_range, r2 = find_scaling_region(log_eps, log_C)
        if slope is not None and slope < best_slope:
            best_slope = slope
            best_m = m
            best_log_C = log_C
            best_range = eps_range
    valid = best_m is not None
    return {'D2_eff': float(best_slope) if valid else np.nan,
            'm_opt': int(best_m) if valid else None,
            'log_eps': log_eps if valid else None,
            'log_C': best_log_C if valid else None,
            'scaling_range': best_range if valid else None,
            'valid': bool(valid)}

# -------- IAAFT surrogate generator (vectorized-ish) ----------
def iaaft_surrogate(x, max_iter=IAAFT_MAX_ITERS):
    # Single surrogate; returns same-length vector
    x = np.asarray(x, dtype=np.float64)
    N = len(x)
    sorted_x = np.sort(x)
    # original amplitude spectrum
    orig_fft = np.fft.rfft(x)
    amp = np.abs(orig_fft)
    # initialize surrogate as random shuffle
    surr = np.random.permutation(x)
    for _ in range(max_iter):
        # impose target spectrum
        surr_fft = np.fft.rfft(surr)
        phases = np.angle(surr_fft)
        surr = np.fft.irfft(amp * np.exp(1j * phases), n=N)
        # impose amplitude distribution by rank-order
        surr = sorted_x[np.argsort(np.argsort(surr))]
    return surr

def make_iaaft_surrogates(x, n_surrogates=7):
    return [iaaft_surrogate(x) for _ in range(n_surrogates)]

# -------- NLE ----------
def nonlinear_prediction_error(x, m=3, tau=TAU, k=N_NEIGHBORS_NLE):
    x = np.asarray(x, dtype=np.float64)
    X = embed_time_series(x, m+1, tau)
    if X.shape[0] < (k+3):
        return np.nan
    X_obs = X[:, :-1]
    y = X[:, -1]
    tree = KDTree(X_obs)
    dists, idxs = tree.query(X_obs, k=k+1)
    preds = []
    for i in range(X_obs.shape[0]):
        neigh_idxs = idxs[i, 1:]  # exclude self
        pred = np.mean(X[neigh_idxs, -1])
        preds.append(pred)
    preds = np.array(preds)
    return float(np.sqrt(mean_squared_error(y, preds)))

# -------------------------
# Per-file worker
# -------------------------
def process_file(in_path, out_dir, compute_surrogates=True):
    name = os.path.splitext(os.path.basename(in_path))[0]
    out_path = os.path.join(out_dir, f"{name}.npz")
    if os.path.exists(out_path):
        return {'name': name, 'status': 'exists'}

    try:
        x = np.loadtxt(in_path)
    except Exception as e:
        return {'name': name, 'status': 'load_error', 'error': str(e)}

    # 1. stationarity
    stat = weak_stationarity_test(x)

    # 2. D2 for real
    d2_real = estimate_d2_eff_signal(x)

    # 3. D2 for surrogates (mean+std)
    surr_mean = np.nan
    surr_std = np.nan
    if compute_surrogates:
        sur_d2s = []
        for s in make_iaaft_surrogates(x, n_surrogates=N_SURROGATES):
            dd = estimate_d2_eff_signal(s)
            sur_d2s.append(dd['D2_eff'])
        sur_d2s = np.array(sur_d2s, dtype=np.float64)
        surr_mean = float(np.nanmean(sur_d2s))
        surr_std = float(np.nanstd(sur_d2s))

    # 4. NLE (use d2 m_opt if available)
    m_for_nle = d2_real['m_opt'] if d2_real['m_opt'] is not None else 3
    nle = nonlinear_prediction_error(x, m=max(2, m_for_nle), tau=TAU, k=N_NEIGHBORS_NLE)

    # 5. basic stats
    basic = {'mean': float(np.mean(x)), 'std': float(np.std(x)),
             'min': float(np.min(x)), 'max': float(np.max(x))}

    np.savez_compressed(out_path,
                        name=name,
                        stationarity=stat,
                        d2_real=d2_real,
                        d2_surr_mean=surr_mean,
                        d2_surr_std=surr_std,
                        nle=nle,
                        basic=basic)
    return {'name': name, 'status': 'processed', 'd2_real': d2_real['D2_eff'], 'd2_surr_mean': surr_mean, 'nle': nle}

# -------------------------
# Run across all files in parallel
# -------------------------
def run_all(input_root=INPUT_ROOT, output_root=OUTPUT_ROOT, labels=['F','N','O','S','Z'], compute_surrogates=True):
    tasks = []
    for label in labels:
        in_folder = os.path.join(input_root, label)
        out_folder = os.path.join(output_root, label)
        os.makedirs(out_folder, exist_ok=True)
        for fname in sorted(os.listdir(in_folder)):
            if fname.lower().endswith('.txt') or fname.lower().endswith('.npy'):
                tasks.append((os.path.join(in_folder, fname), out_folder))

    print("Total files:", len(tasks))
    results = Parallel(n_jobs=N_JOBS)(
        delayed(process_file)(t[0], t[1], compute_surrogates) for t in tqdm(tasks)
    )
    return results

# -------------------------
# Aggregate to CSV
# -------------------------
import csv
def aggregate_to_csv(metrics_root=OUTPUT_ROOT, csv_out=CSV_OUT, labels=['F','N','O','S','Z']):
    rows = []
    for label in labels:
        folder = os.path.join(metrics_root, label)
        if not os.path.exists(folder): continue
        for fname in sorted(os.listdir(folder)):
            if not fname.endswith('.npz'): continue
            data = np.load(os.path.join(folder, fname), allow_pickle=True)
            name = data['name'].item()
            stationarity = data['stationarity'].item()
            d2r = data['d2_real'].item()['D2_eff'] if isinstance(data['d2_real'].item(), dict) else float(data['d2_real'].item())
            d2s_mean = float(data['d2_surr_mean'].item())
            nle = float(data['nle'].item())
            basic = data['basic'].item()
            rows.append({'label': label, 'name': name, 'stationary': stationarity['stationary'],
                         'd2_real': d2r, 'd2_surr_mean': d2s_mean, 'd2_surr_std': float(data['d2_surr_std'].item()),
                         'nle': nle,
                         'mean': basic['mean'], 'std': basic['std']})
    # write csv
    keys = ['label','name','stationary','d2_real','d2_surr_mean','d2_surr_std','nle','mean','std']
    with open(csv_out, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        for r in rows:
            writer.writerow(r)
    print("CSV saved:", csv_out)
    return csv_out

# -------------------------
# Random forest baseline training
# -------------------------
def rf_baseline_from_csv(csv_path=CSV_OUT, n_splits=5):
    import pandas as pd
    df = pd.read_csv(csv_path)
    # drop NaNs, simple imputation if needed
    df = df.dropna()
    X = df[['d2_real','d2_surr_mean','nle','mean','std']].values
    y = df['label'].values
    # simple label encoding
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()
    y_enc = le.fit_transform(y)
    rf = RandomForestClassifier(n_estimators=200, n_jobs=-1, random_state=42)
    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = cross_val_score(rf, X, y_enc, cv=cv, scoring='accuracy', n_jobs=-1)
    print("RF baseline accuracy (cv): mean=%.3f std=%.3f" % (scores.mean(), scores.std()))
    return rf, scores, le

# -------------------------
# Example run in __main__
# -------------------------
if __name__ == "__main__":
    t0 = time.time()
    results = run_all(compute_surrogates=True)
    print("Processing done. time:", time.time() - t0)
    aggregate_to_csv()
    rf, scores, le = rf_baseline_from_csv()
    print("Done. RF CV scores:", scores)


Total files: 500


100%|██████████| 500/500 [41:02<00:00,  4.92s/it]  


Processing done. time: 3586.430608510971
CSV saved: D:\Downloads\EEG_DATA_REPO\EEG_METRICS\validation_metrics.csv
RF baseline accuracy (cv): mean=0.836 std=0.055
Done. RF CV scores: [0.8  0.83 0.76 0.92 0.87]


Next steps for CNN / LSTM pipelines:

Prepare your input data:

For CNNs, you typically need fixed-length windows of your EEG signals (e.g., 4096 points per window) and optionally normalized.

For LSTMs, you can feed raw sequential data or preprocessed features (D2, NLE, etc.) in time order.

Label encoding:

Your labels (F, N, O, S, Z) need to be numerically encoded for supervised learning.

Train-test split:

Keep some portion of the data for validation/testing to avoid overfitting.

CNN pipeline:

Input: 1D EEG signal (shape: [samples, timesteps, 1])

Layers: Conv1D → ReLU → Pooling → Flatten → Dense → Softmax

Loss: Categorical Cross-Entropy

Optimizer: Adam

LSTM pipeline:

Input: [samples, timesteps, features]

Layers: LSTM → Dropout → Dense → Softmax

Loss: Categorical Cross-Entropy

Optimizer: Adam

Evaluation:

Accuracy, confusion matrix, F1 score.

Compare CNN/LSTM performance to your Random Forest baseline (≈ 0.836).

4) How to run & expected runtime

Put the script in the same environment you used for filtering (it needs numpy, joblib, scipy, sklearn, tqdm).

Default N_SURROGATES = 7. With D₂ computations
and surrogates, expect this to take minutes per file unless signals are short. Parallelism reduces wall-clock time. For 500 signals, runtime could be several hours depending on CPU. If you want speed:

Lower N_SURROGATES (1–3) for a quick check.

Lower MAX_M to 8.

Or run on a machine with more CPU cores.

The script saves per-file .npz — you can resume by re-running; it skips files if .npz already exists.

the 'N' folder is empty or missing those .txt files altogether under EEGDATA_EXTRACTED.

That means the problem happened at or before extraction from the zip files.

Next immediate steps:

Check the raw zip for 'N' files:

Look inside the original N.zip to verify it actually contains .txt files.

Run this snippet to list contents without extracting:

After running — what to inspect

EEG_METRICS/<label>/file.npz — open few with:

In [2]:
d = np.load("EEG_METRICS/F/S01.npz", allow_pickle=True)
print(d['d2_real'].item())
print("NLE:", d['nle'])


FileNotFoundError: [Errno 2] No such file or directory: 'EEG_METRICS/F/S01.npz'

  3%|▎         | 12/400 [00:20<00:07, 52.42it/s]

validation_metrics.csv — quick view of real vs surrogate D₂_mean and NLE distribution.

RF baseline printed accuracy — gives simple baseline comparing feature-based method.

6) Next steps I can do for you (pick any)

Increase surrogate count and/or compute p-values per-file (e.g., real D₂ > 95th percentile of surrogates).

Produce visualization code: histograms, boxplots of D₂_real vs D₂_surr across labels.

Build and train full CNN and LSTM on segmented raw data created from filtered signals (I’ll produce the segmentation & model scripts).

Tune RF classifier (feature selection, hyperparameter search) or try XGBoost.

If you’d like, I can now:

paste the script into your environment and walk you through running it step-by-step, OR

immediately produce the CNN/LSTM dataset & model scripts (with training/eval loops) after this pipeline finishes.

In [3]:
import zipfile

zip_path = r"D:\Downloads\EEG_DATA_REPO\EEGDATA\N.zip"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    print("Files inside N.zip:")
    print(zip_ref.namelist())


Files inside N.zip:
['N001.TXT', 'N099.TXT', 'N098.TXT', 'N097.TXT', 'N096.TXT', 'N095.TXT', 'N094.TXT', 'N093.TXT', 'N092.TXT', 'N091.TXT', 'N090.TXT', 'N089.TXT', 'N088.TXT', 'N087.TXT', 'N086.TXT', 'N085.TXT', 'N084.TXT', 'N083.TXT', 'N082.TXT', 'N081.TXT', 'N080.TXT', 'N079.TXT', 'N078.TXT', 'N077.TXT', 'N076.TXT', 'N075.TXT', 'N074.TXT', 'N073.TXT', 'N072.TXT', 'N071.TXT', 'N070.TXT', 'N069.TXT', 'N068.TXT', 'N067.TXT', 'N066.TXT', 'N065.TXT', 'N064.TXT', 'N063.TXT', 'N062.TXT', 'N061.TXT', 'N060.TXT', 'N059.TXT', 'N058.TXT', 'N057.TXT', 'N056.TXT', 'N055.TXT', 'N054.TXT', 'N053.TXT', 'N052.TXT', 'N051.TXT', 'N050.TXT', 'N049.TXT', 'N048.TXT', 'N047.TXT', 'N046.TXT', 'N045.TXT', 'N044.TXT', 'N043.TXT', 'N042.TXT', 'N041.TXT', 'N040.TXT', 'N039.TXT', 'N038.TXT', 'N037.TXT', 'N036.TXT', 'N035.TXT', 'N034.TXT', 'N033.TXT', 'N032.TXT', 'N031.TXT', 'N030.TXT', 'N029.TXT', 'N028.TXT', 'N027.TXT', 'N026.TXT', 'N025.TXT', 'N024.TXT', 'N023.TXT', 'N022.TXT', 'N021.TXT', 'N020.TXT', 'N019.T

If the zip is empty or missing expected .txt files, you’ll need to re-download or source a proper archive.

If files are present in zip, re-extract carefully:

Make sure you are extracting the zip correctly. Here’s a robust extract snippet:

In [4]:
import zipfile
import os

zip_path = r"D:\Downloads\EEG_DATA_REPO\EEGDATA\N.zip"
extract_dir = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED\N"

os.makedirs(extract_dir, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print(f"Extracted N.zip contents to: {extract_dir}")


Extracted N.zip contents to: D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED\N


After extraction, check the folder again:

in EEGDATA_EXTRACTED THE N files were written as N001.TXT and so on. we need to re convert them to N....txt and run the d2eff and the steps on them

In [5]:
print(os.listdir(extract_dir))


['n001.txt', 'n002.txt', 'n003.txt', 'n004.txt', 'n005.txt', 'n006.txt', 'n007.txt', 'n008.txt', 'n009.txt', 'n010.txt', 'n011.txt', 'n012.txt', 'n013.txt', 'n014.txt', 'n015.txt', 'n016.txt', 'n017.txt', 'n018.txt', 'n019.txt', 'n020.txt', 'n021.txt', 'n022.txt', 'n023.txt', 'n024.txt', 'n025.txt', 'n026.txt', 'n027.txt', 'n028.txt', 'n029.txt', 'n030.txt', 'n031.txt', 'n032.txt', 'n033.txt', 'n034.txt', 'n035.txt', 'n036.txt', 'n037.txt', 'n038.txt', 'n039.txt', 'n040.txt', 'n041.txt', 'n042.txt', 'n043.txt', 'n044.txt', 'n045.txt', 'n046.txt', 'n047.txt', 'n048.txt', 'n049.txt', 'n050.txt', 'n051.txt', 'n052.txt', 'n053.txt', 'n054.txt', 'n055.txt', 'n056.txt', 'n057.txt', 'n058.txt', 'n059.txt', 'n060.txt', 'n061.txt', 'n062.txt', 'n063.txt', 'n064.txt', 'n065.txt', 'n066.txt', 'n067.txt', 'n068.txt', 'n069.txt', 'n070.txt', 'n071.txt', 'n072.txt', 'n073.txt', 'n074.txt', 'n075.txt', 'n076.txt', 'n077.txt', 'n078.txt', 'n079.txt', 'n080.txt', 'n081.txt', 'n082.txt', 'n083.txt', 'n0

Your filtering and analysis code is looking for files ending with .txt (lowercase) — so N001.TXT (uppercase) won't be picked up.

Also, you might want consistent filenames (N001.txt) to keep your pipeline smooth.

In [7]:
import os

n_dir = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED\N"

for fname in os.listdir(n_dir):
    if fname.endswith('.TXT'):
        old_path = os.path.join(n_dir, fname)
        new_fname = fname.lower()  # e.g., N001.TXT -> n001.txt
        new_path = os.path.join(n_dir, new_fname)
        os.rename(old_path, new_path)
        print(f"Renamed: {fname} -> {new_fname}")
print("Renaming complete.")


Renaming complete.


After renaming:

Your filtering and subsequent steps will find and process the N channel files seamlessly.

Make sure your scripts are case-insensitive when reading files or that you consistently lowercase filenames.

Optional improvement:

If you want, I can help you update the file loading code to automatically accept any case (.TXT or .txt) to avoid this issue in the future.

Once renamed, just rerun your filtering, feature extraction, and classification pipeline including the N files.

Here’s a Python script to verify all your zip files for presence of .txt files and then extract only those missing or incomplete into your EEGDATA_EXTRACTED folder. This way you won't miss any channel:

In [1]:
import zipfile
import os

zip_root = r"D:\Downloads\EEG_DATA_REPO\EEGDATA"
extract_root = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED"

channels = ['F', 'N', 'O', 'S', 'Z']

for ch in channels:
    zip_path = os.path.join(zip_root, f"{ch}.zip")
    extract_dir = os.path.join(extract_root, ch)

    print(f"Checking {ch}...")

    # Check zip contents
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        txt_files_in_zip = [f for f in zip_ref.namelist() if f.endswith('.txt')]
        if not txt_files_in_zip:
            print(f"  ⚠️ WARNING: No .txt files found inside {ch}.zip!")
            continue

    # Check if extraction folder exists and has .txt files
    if os.path.exists(extract_dir):
        extracted_txt_files = [f for f in os.listdir(extract_dir) if f.endswith('.txt')]
    else:
        extracted_txt_files = []

    # Decide whether to extract or skip
    if len(extracted_txt_files) >= len(txt_files_in_zip):
        print(f"  ✅ Extraction folder already has all files for {ch}, skipping extraction.")
    else:
        print(f"  ⏳ Extracting {ch}.zip to {extract_dir} ...")
        os.makedirs(extract_dir, exist_ok=True)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        print(f"  ✅ Extraction done for {ch}.")


Checking F...
  ✅ Extraction folder already has all files for F, skipping extraction.
Checking N...
Checking O...
  ✅ Extraction folder already has all files for O, skipping extraction.
Checking S...
  ✅ Extraction folder already has all files for S, skipping extraction.
Checking Z...
  ✅ Extraction folder already has all files for Z, skipping extraction.


In [8]:

import os
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt
import matplotlib.pyplot as plt
from tqdm import tqdm

# Paths
EXTRACTED_DIR =  r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED\N"
FILTERED_DIR = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N"

os.makedirs(FILTERED_DIR, exist_ok=True)

# --- Bandpass Filter ---
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype="band")
    return b, a

def bandpass_filter(data, lowcut=0.5, highcut=40.0, fs=173.61, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = filtfilt(b, a, data)
    return y

# --- Load and filter N files ---
def process_N_files():
    all_files = [f for f in os.listdir(EXTRACTED_DIR) if f.startswith("n") and f.endswith(".txt")]
    
    print(f"Found {len(all_files)} N files")
    for fname in tqdm(all_files, desc="Filtering N files"):
        filepath = os.path.join(EXTRACTED_DIR, fname)
        
        # Load data
        data = np.loadtxt(filepath)
        
        # Apply bandpass filter
        filtered = bandpass_filter(data, fs=173.61)
        
        # Save filtered file
        savepath = os.path.join(FILTERED_DIR, fname.replace(".txt", ".txt"))
        np.savetxt(savepath, filtered)

    print(f"✅ All N files filtered & saved in {FILTERED_DIR}")

# --- Run ---
process_N_files()


Found 100 N files


Filtering N files: 100%|██████████| 100/100 [00:00<00:00, 114.00it/s]

✅ All N files filtered & saved in D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N





In [9]:
import os
import glob

# Path to the N folder inside EEGDATA_FILTERED
folder = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N"

# Pattern: n + 4 characters + _filtered.txt
pattern = os.path.join(folder, "n???_filtered.txt")

# Find matching files
files_to_delete = glob.glob(pattern)

print(f"Found {len(files_to_delete)} files to delete.")

# Delete them
for file in files_to_delete:
    try:
        os.remove(file)
        print(f"Deleted: {file}")
    except Exception as e:
        print(f"Error deleting {file}: {e}")

Found 100 files to delete.
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n001_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n002_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n003_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n004_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n005_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n006_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n007_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n008_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n009_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n010_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n011_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n012_filtered.txt
Deleted: D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N\n013_filtered.txt
Deleted: D:\Downloads\EE

In [2]:
import os
import time

# Path to N folder inside EEGDATA_FILTERED
folder = "EEGDATA_FILTERED/N"

# Loop through all files in the folder
for file in os.listdir(folder):
    if "_filtered.txt" in file:   # only match the unwanted files
        file_path = os.path.join(folder, file)
        os.remove(file_path)      # delete the file
        print(f"Deleted: {file_path}")
        time.sleep(0.1)  # small delay for safety (optional)


In [8]:
import os
import pandas as pd

# Define your output root (replace with your actual path)
OUTPUT_ROOT = r" D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED\N"   # <-- update this

# Paths to metrics files
csv_main = os.path.join(OUTPUT_ROOT, "validation_metrics.csv")
csv_n    = os.path.join(OUTPUT_ROOT, "metrics_N.csv")

csv_n = csv_n.strip()

# Load both
df_main = pd.read_csv(csv_main)
df_n    = pd.read_csv(csv_n)

# Compare or merge as needed
print(df_main.head())
print(df_n.head())

OSError: [Errno 22] Invalid argument: ' D:\\Downloads\\EEG_DATA_REPO\\EEGDATA_FILTERED\\N\\validation_metrics.csv'

In [5]:
if __name__ == "__main__":
    t0 = time.time()
    # run only for N
    results = run_all(labels=['N'], compute_surrogates=True)
    print("Processing done. time:", time.time() - t0)

    # aggregate only N into a temporary CSV
    aggregate_to_csv(labels=['N'], csv_out=os.path.join(OUTPUT_ROOT, "metrics_N.csv"))

NameError: name 'run_all' is not defined

In [None]:
import os
import numpy as np
from scipy.signal import butter, filtfilt
from multiprocessing import Pool, cpu_count

# === Step 1: Rename .TXT to .txt in 'N' folder ===
def rename_uppercase_txt_to_lower(directory):
    renamed_files = 0
    for fname in os.listdir(directory):
        if fname.endswith('.TXT'):
            old_path = os.path.join(directory, fname)
            new_fname = fname.lower()
            new_path = os.path.join(directory, new_fname)
            os.rename(old_path, new_path)
            renamed_files += 1
            print(f"Renamed: {fname} -> {new_fname}")
    print(f"Renaming complete. Total files renamed: {renamed_files}")

# === Step 2: Filtering setup ===
Fs = 173.16
Fc = 40
order = 4
Wn = Fc / (Fs / 2)
b, a = butter(order, Wn, btype='low', analog=False)

def filter_and_save(task):
    in_path, out_path = task
    signal = np.loadtxt(in_path)
    filtered = filtfilt(b, a, signal)
    np.savetxt(out_path, filtered)
    return out_path


# === Step 3: Run filtering only for N ===
def run_filtering_for_N(input_root, output_root):
    input_dir = os.path.join(input_root, 'N')
    output_dir = os.path.join(output_root, 'N')
    os.makedirs(output_dir, exist_ok=True)

    tasks = []
    for fname in sorted(os.listdir(input_dir)):
        if fname.lower().endswith('.txt'):
            in_path = os.path.join(input_dir, fname)
            out_path = os.path.join(output_dir, fname.lower())
            tasks.append((in_path, out_path))

    print(f"Total N files to filter: {len(tasks)}")

    with Pool(cpu_count()) as pool:
        for result in pool.imap_unordered(filter_and_save, tasks):
            print(f"Saved: {result}")

if __name__ == "__main__":
    extracted_root = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED"
    filtered_root  = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED"

    # Step 1: Fix .TXT case problem in N folder
    rename_uppercase_txt_to_lower(os.path.join(extracted_root, 'N'))

    # Step 2: Run filtering only for N
    run_filtering_for_N(extracted_root, filtered_root)

    print("✅ Filtering complete for N channel only!")

Renaming complete. Total files renamed: 0
Total N files to filter: 100


In [None]:
import os
import numpy as np
from scipy.signal import butter, filtfilt
from multiprocessing import Pool, cpu_count

# === Step 1: Rename .TXT to .txt in 'N' folder ===
def rename_uppercase_txt_to_lower(directory):
    renamed_files = 0
    for fname in os.listdir(directory):
        if fname.endswith('.TXT'):
            old_path = os.path.join(directory, fname)
            new_fname = fname.lower()
            new_path = os.path.join(directory, new_fname)
            os.rename(old_path, new_path)
            renamed_files += 1
            print(f"Renamed: {fname} -> {new_fname}")
    print(f"Renaming complete. Total files renamed: {renamed_files}")

# === Step 2: Filtering function ===
Fs = 173.16
Fc = 40
order = 4
Wn = Fc / (Fs / 2)
b, a = butter(order, Wn, btype='low', analog=False)

def filter_and_save(task):
    in_path, out_path = task
    signal = np.loadtxt(in_path)
    filtered = filtfilt(b, a, signal)
    np.savetxt(out_path, filtered)
    return out_path

# === Step 3: Prepare and run filtering on all channels ===
def run_filtering(input_root, output_root, channels):
    os.makedirs(output_root, exist_ok=True)
    tasks = []

    for ch in channels:
        input_dir = os.path.join(input_root, ch)
        output_dir = os.path.join(output_root, ch)
        os.makedirs(output_dir, exist_ok=True)

        for fname in sorted(os.listdir(input_dir)):
            # Accept .txt or .TXT but after renaming ideally .txt only
            if fname.lower().endswith('.txt'):
                in_path = os.path.join(input_dir, fname)
                out_path = os.path.join(output_dir, fname.lower())
                tasks.append((in_path, out_path))

    print(f"Total files to filter: {len(tasks)}")

    with Pool(cpu_count()) as pool:
        for result in pool.imap_unordered(filter_and_save, tasks):
            print(f"Saved: {result}")

if __name__ == "__main__":
    # Change these paths to your environment
    extracted_root = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_EXTRACTED"
    filtered_root = r"D:\Downloads\EEG_DATA_REPO\EEGDATA_FILTERED"
    channel_list = ['F', 'N', 'O', 'S', 'Z']

    # Step 1: Rename in 'N'
    rename_uppercase_txt_to_lower(os.path.join(extracted_root, 'N'))

    # Step 2 & 3: Run filtering on all channels
    run_filtering(extracted_root, filtered_root, channel_list)

    print("✅ All filtering complete!")


Renaming complete. Total files renamed: 0
Total files to filter: 500


Set Z (Healthy, eyes open)

EEG from healthy volunteers in a resting state with eyes open.

Recorded from the surface electrodes.

Considered baseline normal activity.


Set O (Healthy, eyes closed)

EEG from the same healthy volunteers, but with eyes closed.

Also surface recordings.

Typically has stronger alpha rhythms (8–12 Hz) compared to Set Z.

Set N (Interictal, opposite hemisphere)

EEG from epileptic patients, recorded during seizure-free intervals (interictal state).

Taken from the hippocampal formation of the opposite hemisphere (not the epileptogenic zone).

Intracranial recording (depth electrode).

Set F (Interictal, epileptogenic zone)

EEG from epileptic patients, also in seizure-free intervals.

Recorded from the epileptogenic zone (where seizures originate).

Intracranial (depth electrode).

Often shows subtle pathological activity.


🔹 Set S (Seizure activity / Ictal EEG)

EEG from epileptic patients during seizures (ictal periods).

Intracranial recordings from within the epileptogenic zone.

Contains clear seizure discharges.

Clinical Terms in EEG Context

Ictal
👉 Refers to the actual seizure event (from onset until termination).
When an EEG segment is labeled ictal, it means the patient is actively experiencing a seizure in that recording window.

“Ictus” = Latin for “strike/attack.”

EEG shows rhythmic, abnormal discharges (spikes, sharp waves, spike-and-wave complexes).

Interictal
👉 EEG recorded between seizures.
The patient is not seizing, but there may be abnormal background activity (epileptiform discharges like spikes or sharp waves).
Important for diagnosis — many epilepsy patients show abnormalities even when not seizing.

Preictal
👉 The time before a seizure starts.
EEG may begin showing gradual changes (slowing, rhythmic buildup, nonlinear shifts). Used in seizure prediction studies.

Postictal
👉 The time right after a seizure ends.
EEG often shows slowing, suppression, or diffuse abnormalities as the brain recovers.

🔹 Mapping to the Bonn Dataset Sets

The Bonn dataset has 5 subsets (Z, O, F, N, S):

Z – Healthy volunteers, eyes open. (Normal baseline)

O – Healthy volunteers, eyes closed. (Normal baseline, less visual artifact)

N – Epileptic patients, interictal, EEG recorded from the healthy hemisphere.

F – Epileptic patients, interictal, EEG recorded from the epileptogenic zone (but not during seizure).

S – Epileptic patients, ictal — EEG recorded during seizures.

🔹 Practical Use

Z & O → Normal control signals.

N & F → Interictal (important for detecting abnormalities outside seizures).

S → Ictal (active seizure events).

So in summary:

Ictal = seizure,

Interictal = between seizures,

Preictal = before seizure,

Postictal = after seizure.

timeline diagram (preictal → ictal → postictal → interictal)

1. Ictal

Meaning: “During a seizure.”

In medical terms, “ictus” = seizure.

So, ictal EEG = EEG recorded while the patient is actively experiencing a seizure.

Characteristics:

Sudden, abnormal, high-amplitude discharges.

Spikes, sharp waves, rhythmic activity.

Distinct from normal background EEG.

2. Pre-ictal

Meaning: The time just before a seizure begins.

Sometimes considered the “warning phase.”

Researchers are especially interested here → because predicting seizures requires recognizing pre-ictal patterns.

Characteristics:

Gradual build-up of abnormal discharges.

Subtle shifts in frequency or synchrony.

3. Inter-ictal

Meaning: The period between seizures (patient is not seizing).

But still: there can be abnormal EEG signatures even when the patient is not having a seizure.

Characteristics:

May show “spike-and-wave” discharges.

Background EEG can be relatively normal or mildly disturbed.

This is important because an inter-ictal patient can look clinically normal but EEG shows epileptiform activity.


4. Normal (Baseline/Healthy)

EEG of subjects with no epilepsy at all.

Used as control group in datasets.


5. Bonn Dataset Sets (Z, O, N, F, S)

Specifically in the Bonn dataset, which you are working on:

Set Z: Healthy, eyes open.

Set O: Healthy, eyes closed.

Set N: Interictal (epileptic patient, opposite hemisphere, not seizing).

Set F: Interictal (epileptic patient, within epileptogenic zone, not seizing).

Set S: Ictal (epileptic seizure activity).

So:

Z, O = healthy (normal EEGs)

N, F = interictal (abnormal but no seizure)

S = ictal (seizure present)