### 0: import libraries

In [None]:
from pathlib import Path
import re
import pandas as pd
import mne

### 1: Imports & config

In [None]:
def get_project_root() -> Path:
    """Work in both scripts and notebooks: resolve repo root."""
    try:
        return Path(__file__).resolve().parents[1]  # scripts
    except NameError:
        cwd = Path.cwd().resolve()                  # notebooks
        return cwd.parent if cwd.name == "notebooks" else cwd

ROOT = get_project_root()

# Set your data folder here (edit if needed). Path handles spaces just fine.
CANDIDATES = [
    ROOT / "docs" / "EDF filer",
    ROOT / "EDF filer",
    ROOT / "data" / "raw",
]

def pick_data_dir(candidates):
    """Return the first existing directory from candidates; print which one is used."""
    for d in candidates:
        if d.exists():
            print(f"Using data dir: {d}")
            return d
    raise FileNotFoundError(f"No data dir found. Tried: {candidates}")

DATA_DIR = pick_data_dir(CANDIDATES)

Found 161 EDF files


### 2: Find EDF files

In [13]:
# Recursively collect .edf files
edf_files = sorted(DATA_DIR.rglob("*.edf"))
print(f"Found {len(edf_files)} EDF files in: {DATA_DIR}")
assert len(edf_files) > 0, "No EDF files found — adjust DATA_DIR above."

Found 161 EDF files in: \\regionh.top.local\dfs\Logget\AuditData\CONNECT-ME\Bjoern\eeg-bachelor\EDF filer


### 3: Lightweight scan (metadata only; no preprocessing)

In [14]:
def parse_name(p: Path):
    """
    Extract subject ID and optional split-part index from filenames like:
    '03TN-EDF+.edf'  -> subject='03TN', part=0
    '03TN-EDF+4.edf' -> subject='03TN', part=4
    Falls back to stem if pattern doesn't match.
    """
    m = re.match(r"^(.+?)-EDF\+?(\d+)?\.edf$", p.name, flags=re.IGNORECASE)
    subj = m.group(1) if m else p.stem
    part = int(m.group(2)) if (m and m.group(2)) else 0
    return subj, part

rows = []
channel_sets = []
sfreqs = set()
split_map = {}   # subject -> list of split indices

for f in edf_files:
    # Read header only (preload=False means signal is not loaded into RAM)
    raw = mne.io.read_raw_edf(f, preload=False, verbose="ERROR")

    sf = float(raw.info["sfreq"])
    sfreqs.add(sf)

    # EEG channels only
    picks = mne.pick_types(raw.info, eeg=True, meg=False, eog=False, ecg=False, stim=False, exclude=[])
    eeg_chs = [raw.ch_names[i] for i in picks]

    # Duration in seconds
    dur_s = raw.n_times / sf

    # Number of annotations (events/notes in the header)
    n_ann = len(raw.annotations) if raw.annotations is not None else 0

    # Measurement start time if present
    start = raw.info.get("meas_date")

    # Subject ID + split index from filename
    subj, part = parse_name(f)
    split_map.setdefault(subj, []).append(part)

    rows.append({
        "file": f.name,
        "subject": subj,
        "part": part,
        "sfreq_Hz": sf,
        "n_eeg": len(eeg_chs),
        "duration_s": round(dur_s, 2),
        "start": str(start) if start else None,
        "n_annotations": n_ann,
    })
    channel_sets.append(set(eeg_chs))

import pandas as pd
df = pd.DataFrame(rows).sort_values(["subject", "part"], ignore_index=True)
print("Preview (first 12 rows):")
display(df.head(12))


Preview (first 12 rows):


Unnamed: 0,file,subject,part,sfreq_Hz,n_eeg,duration_s,start,n_annotations
0,01CX-EDF+.edf,01CX,0,250.0,31,309.0,2020-09-11 16:41:59+00:00,6
1,01CX-EDF+1.edf,01CX,1,250.0,31,3595.0,2020-09-11 16:41:59+00:00,24
2,02IT-EDF+.edf,02IT,0,250.0,32,4578.0,2020-09-24 10:58:24+00:00,29
3,03TN-EDF+.edf,03TN,0,250.0,31,1881.0,2020-09-30 11:19:15+00:00,15
4,03TN-EDF+1.edf,03TN,1,250.0,31,31.0,2020-09-30 11:19:15+00:00,2
5,03TN-EDF+2.edf,03TN,2,250.0,31,50.0,2020-09-30 11:52:04+00:00,2
6,03TN-EDF+3.edf,03TN,3,250.0,31,24.0,2020-09-30 11:19:15+00:00,2
7,03TN-EDF+4.edf,03TN,4,250.0,31,305.0,2020-09-30 11:19:15+00:00,4
8,03TN-EDF+5.edf,03TN,5,250.0,31,2708.0,2020-09-30 11:19:15+00:00,16
9,04IW-EDF+.edf,04IW,0,250.0,25,1577.0,2020-10-01 17:16:25+00:00,12


file: file name.

subject: subject ID (udledt af filnavnet).

part: split-nummer for samme optagelse (0 = første del, +1, +2 … = fortsættelser).

sfreq_Hz: sample rate in Hz.

n_eeg: amount of EEG channels in the file.

duration_s: varighed i sekunder.

start: målingens start-timestamp.

n_annotations: antal annoteringer i headeren (events/noter). Det betyder fx at person X har snakket til ved kommende 10 gange + Y snakkede til patienten 10 gange osv. (se afsnit 3.2).

#### 3.1 Sample rate sanity check (≠ 250 Hz)

In [20]:
# Cell 3.1 — Samplerate sanity check
print("Unique sampling rates (Hz):", sorted(df["sfreq_Hz"].unique()))
print("\nCounts per sampling rate:")
print(df["sfreq_Hz"].value_counts().sort_index())

# Flag everything that is not exactly 250 Hz
non_250 = df[df["sfreq_Hz"] != 250.0]

if non_250.empty:
    print("\n✅ All files are at 250 Hz.")
else:
    print(f"\n⚠️ Found {len(non_250)} files not at 250 Hz:")
    display(
        non_250[["file","subject","part","sfreq_Hz","n_eeg","duration_s"]]
        .sort_values(["sfreq_Hz","subject","part"])
    )
    print("\nPer-subject counts for non-250 Hz:")
    display(non_250.groupby(["subject","sfreq_Hz"]).size().to_frame("n_files"))


Unique sampling rates (Hz): [np.float64(250.0), np.float64(500.0)]

Counts per sampling rate:
sfreq_Hz
250.0    148
500.0     13
Name: count, dtype: int64

⚠️ Found 13 files not at 250 Hz:


Unnamed: 0,file,subject,part,sfreq_Hz,n_eeg,duration_s
49,20UP-EDF+.edf,20UP,0,500.0,26,4535.0
55,24BC-EDF+.edf,24BC,0,500.0,32,1733.0
56,24BC-EDF+1.edf,24BC,1,500.0,32,148.0
60,26DM-EDF+.edf,26DM,0,500.0,26,411.0
61,26DM-EDF+1.edf,26DM,1,500.0,26,2885.0
126,60UP-EDF+.edf,60UP,0,500.0,25,42.0
127,60UP-EDF+1.edf,60UP,1,500.0,24,164.0
128,60UP-EDF+2.edf,60UP,2,500.0,26,632.0
149,76NH-EDF+.edf,76NH,0,500.0,26,2559.0
150,76NH-EDF+1.edf,76NH,1,500.0,26,709.0



Per-subject counts for non-250 Hz:


Unnamed: 0_level_0,Unnamed: 1_level_0,n_files
subject,sfreq_Hz,Unnamed: 2_level_1
20UP,500.0,1
24BC,500.0,2
26DM,500.0,2
60UP,500.0,3
76NH,500.0,5


#### 3.2: Annotation summaries for specific files + comparison

In [None]:
# Inspect and compare annotation labels for selected files (names must match df["file"])
from collections import Counter, defaultdict

def path_by_name(name: str):
    """Return full Path for a filename in edf_files."""
    p = next((p for p in edf_files if p.name == name), None)
    if p is None:
        raise FileNotFoundError(f"EDF '{name}' not found under {DATA_DIR}")
    return p

def summarize_annotations(edf_path: Path, top_n: int = 50):
    """Print per-label counts (and total duration per label) for one EDF."""
    raw = mne.io.read_raw_edf(edf_path, preload=False, verbose="ERROR")
    n = len(raw.annotations)
    print(f"\nFile: {edf_path.name} — n_annotations={n}")
    if n == 0:
        print("  (No annotations)")
        return

    # Convert annotations to plain Python types
    labels = list(map(str, raw.annotations.description))
    counts = Counter(labels)

    # Sum durations per label (seconds)
    dur_sum = defaultdict(float)
    for lbl, dur in zip(labels, raw.annotations.duration):
        dur_sum[str(lbl)] += float(dur)

    # Print up to top_n labels by count
    for i, (lbl, cnt) in enumerate(counts.most_common(top_n), start=1):
        print(f"  {i:02d}. {lbl}  —  count={cnt:3d}, total_dur={dur_sum[lbl]:.1f}s")

def compare_two_files(name_a: str, name_b: str):
    """Compare label sets and counts between two EDF files (A vs B)."""
    pa, pb = path_by_name(name_a), path_by_name(name_b)

    # Build summaries
    raw_a = mne.io.read_raw_edf(pa, preload=False, verbose="ERROR")
    raw_b = mne.io.read_raw_edf(pb, preload=False, verbose="ERROR")
    labels_a = list(map(str, raw_a.annotations.description)) if len(raw_a.annotations) else []
    labels_b = list(map(str, raw_b.annotations.description)) if len(raw_b.annotations) else []
    ca, cb = Counter(labels_a), Counter(labels_b)

    set_a, set_b = set(ca), set(cb)
    only_a = sorted(set_a - set_b)
    only_b = sorted(set_b - set_a)
    common = sorted(set_a & set_b)

    print(f"\n=== Compare ===")
    print(f"A: {pa.name}  (n_annotations={len(labels_a)})")
    print(f"B: {pb.name}  (n_annotations={len(labels_b)})")

    print("\nLabels only in A:")
    print("  (none)" if not only_a else "  " + ", ".join(only_a))

    print("\nLabels only in B:")
    print("  (none)" if not only_b else "  " + ", ".join(only_b))

    # Differences in counts for common labels
    diffs = [(lbl, ca[lbl], cb[lbl]) for lbl in common if ca[lbl] != cb[lbl]]
    print("\nCommon labels with different counts:")
    if not diffs:
        print("  (none)")
    else:
        for lbl, a_cnt, b_cnt in diffs[:50]:
            print(f"  {lbl}: A={a_cnt} vs B={b_cnt}")

# ---- Run on specific files ----
files_to_check = [
    "01CX-EDF+.edf",
    "01CX-EDF+1.edf",
    "02IT-EDF+.edf",
]

for fn in files_to_check:
    summarize_annotations(path_by_name(fn))

# Direct comparison between the two 01CX files with different n_annotations
compare_two_files("01CX-EDF+.edf", "01CX-EDF+1.edf")



File: 01CX-EDF+.edf — n_annotations=6
  01. Montage is now: ComAvgLwR  —  count=  2, total_dur=0.0s
  02. Impedance  —  count=  1, total_dur=0.0s
  03. Detections Inactive  —  count=  1, total_dur=0.0s
  04. Montage is now: LongLwR  —  count=  1, total_dur=0.0s
  05. Montage is now: TransLwR  —  count=  1, total_dur=0.0s

File: 01CX-EDF+1.edf — n_annotations=24
  01. Tiltale-X  —  count= 10, total_dur=162.4s
  02. Tiltale-Y  —  count= 10, total_dur=147.6s
  03. 00:11:56  —  count=  1, total_dur=0.0s
  04. Resting  —  count=  1, total_dur=600.7s
  05. spl giver medicin  —  count=  1, total_dur=0.0s
  06. spl tager blodprøve  —  count=  1, total_dur=0.0s

File: 02IT-EDF+.edf — n_annotations=29
  01. Tiltale-X  —  count= 10, total_dur=196.1s
  02. Tiltale-Y  —  count= 10, total_dur=153.2s
  03. Resting  —  count=  3, total_dur=1839.3s
  04. Montage is now: ComAvgLwR  —  count=  2, total_dur=0.0s
  05. Impedance  —  count=  1, total_dur=0.0s
  06. Detections Inactive  —  count=  1, total_

### 4: Global stats (counts, durations, samplerates)

In [15]:
def fmt_hms(seconds: float) -> str:
    """Pretty-print seconds as H:MM:SS."""
    seconds = int(round(seconds))
    h = seconds // 3600
    m = (seconds % 3600) // 60
    s = seconds % 60
    return f"{h:d}:{m:02d}:{s:02d}"

total_duration = float(df["duration_s"].sum()) if len(df) else 0.0
avg_duration  = float(df["duration_s"].mean()) if len(df) else 0.0
min_dur = float(df["duration_s"].min()) if len(df) else 0.0
max_dur = float(df["duration_s"].max()) if len(df) else 0.0

print(f"Unique sampling rates (Hz): {sorted(sfreqs)}")
print(f"Total files: {len(df)}")
print(f"Total duration (all files): {fmt_hms(total_duration)}")
print(f"Average duration per file:  {fmt_hms(avg_duration)}")
print(f"Shortest / longest file:    {fmt_hms(min_dur)} / {fmt_hms(max_dur)}")


Unique sampling rates (Hz): [250.0, 500.0]
Total files: 161
Total duration (all files): 110:39:16
Average duration per file:  0:41:14
Shortest / longest file:    0:00:06 / 1:47:56


### 5: Per-subject summary (how much per patient)

In [16]:
# Files per subject
counts = df.groupby("subject").size().rename("n_files")

# Total duration per subject
durations = df.groupby("subject")["duration_s"].sum().rename("duration_s")

# Merge into one table
per_subject = pd.concat([counts, durations], axis=1).sort_values("duration_s", ascending=False)
per_subject["duration_hms"] = per_subject["duration_s"].map(fmt_hms)

print("Per-subject summary (top 15 by total duration):")
display(per_subject.head(15))

print(f"Unique subjects: {per_subject.shape[0]}")
if per_subject.shape[0] > 0:
    top = per_subject.iloc[0]
    print(f"Subject with most data: {top.name} — {top['n_files']} files, {top['duration_hms']}")


Per-subject summary (top 15 by total duration):


Unnamed: 0_level_0,n_files,duration_s,duration_hms
subject,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
39IM,10,9098.0,2:31:38
08KQ,1,6476.0,1:47:56
32CC,1,6298.0,1:44:58
23IA,1,6287.0,1:44:47
53BL,1,6266.0,1:44:26
33NI,2,5949.0,1:39:09
15QL,1,5816.0,1:36:56
50NT,1,5805.0,1:36:45
27UF,1,5804.0,1:36:44
07CN,1,5781.0,1:36:21


Unique subjects: 79
Subject with most data: 39IM — 10 files, 2:31:38


### 6: Channel overlap (common channels across all files)

In [23]:
if channel_sets:
    common = sorted(set.intersection(*channel_sets)) if len(channel_sets) > 1 else sorted(next(iter(channel_sets)))
    print(f"Common EEG channels across all files: {len(common)}")
    # Print up to first 30 names, to keep output short
    print(common[:30] + (["..."] if len(common) > 30 else []))
else:
    print("No channel information found (empty channel_sets).")

Common EEG channels across all files: 23
['Bursts', 'ECG EKG-REF', 'EEG C3-REF', 'EEG C4-REF', 'EEG Cz-REF', 'EEG F3-REF', 'EEG F4-REF', 'EEG F7-REF', 'EEG F8-REF', 'EEG Fp1-REF', 'EEG Fp2-REF', 'EEG Fz-REF', 'EEG O1-REF', 'EEG O2-REF', 'EEG P3-REF', 'EEG P4-REF', 'EEG P7-REF', 'EEG P8-REF', 'EEG Pz-REF', 'EEG T7-REF', 'EEG T8-REF', 'IBI', 'Suppr']


#### 6.1: See how frequent each channel appears across files

In [27]:
# See how frequent each channel appears across files (helps choose a near-universal set, e.g. ≥95%)
from collections import Counter

union_counts = Counter()
for s in channel_sets:
    union_counts.update(s)

n_files_total = len(channel_sets)
print(f"Total unique channels across all files: {len(union_counts)}")

# Top 40 most common channels
top = union_counts.most_common(40)
print("\nMost common channels (name : count / coverage):")
for name, cnt in top:
    cov = 100.0 * cnt / n_files_total
    print(f"  {name:>6} : {cnt:3d} / {cov:5.1f}%")

# Example: channels present in 100% of files (a practical nearly-common set)
near_common = [ch for ch, cnt in union_counts.items() if cnt >= 1 * n_files_total]
near_common = sorted(near_common)
print(f"\nChannels present in 100% of files: {len(near_common)}")
print(near_common[:30] + (['...'] if len(near_common) > 30 else []))


Total unique channels across all files: 37

Most common channels (name : count / coverage):
  ECG EKG-REF : 161 / 100.0%
  Bursts : 161 / 100.0%
  EEG F7-REF : 161 / 100.0%
  EEG Fp2-REF : 161 / 100.0%
  EEG Pz-REF : 161 / 100.0%
  EEG Fp1-REF : 161 / 100.0%
  EEG F3-REF : 161 / 100.0%
  EEG P4-REF : 161 / 100.0%
  EEG F8-REF : 161 / 100.0%
  EEG O1-REF : 161 / 100.0%
  EEG F4-REF : 161 / 100.0%
  EEG Cz-REF : 161 / 100.0%
  EEG T8-REF : 161 / 100.0%
  EEG P8-REF : 161 / 100.0%
     IBI : 161 / 100.0%
  EEG O2-REF : 161 / 100.0%
  EEG Fz-REF : 161 / 100.0%
  EEG P3-REF : 161 / 100.0%
   Suppr : 161 / 100.0%
  EEG C4-REF : 161 / 100.0%
  EEG P7-REF : 161 / 100.0%
  EEG T7-REF : 161 / 100.0%
  EEG C3-REF : 161 / 100.0%
  Photic-REF : 159 /  98.8%
  Pulse Rate : 153 /  95.0%
  EEG P9-REF :  28 /  17.4%
  EEG F9-REF :  27 /  16.8%
  EEG T10-REF :  27 /  16.8%
  EEG F10-REF :  27 /  16.8%
  EEG P10-REF :  27 /  16.8%
  EEG T9-REF :  27 /  16.8%
  EOG AOG-REF :  13 /   8.1%
  EMG2-REF :   3 

### 7: Split recordings (…EDF+, …EDF+1, …EDF+2, …)

In [24]:
# Subjects with more than one part recorded
split_subjects = {k: sorted(v) for k, v in split_map.items() if len(v) > 1}
print(f"Subjects with split recordings: {len(split_subjects)}")
# Show up to 10
for i, (subj, parts) in enumerate(list(split_subjects.items())[:10], start=1):
    print(f"{i:2d}. {subj}: parts {parts}")

Subjects with split recordings: 37
 1. 01CX: parts [0, 1]
 2. 03TN: parts [0, 1, 2, 3, 4, 5]
 3. 04IW: parts [0, 1, 2, 3, 4, 5, 6]
 4. 05IX: parts [0, 1, 2]
 5. 06IP: parts [0, 1]
 6. 10GK: parts [0, 1]
 7. 11JU: parts [0, 1, 2, 3, 4, 5]
 8. 12SG: parts [0, 1, 2]
 9. 13IS: parts [0, 1, 2, 3]
10. 18TH: parts [0, 1, 2, 3]


Here, we can see how many recordings we have for each patient.

### 8: Annotation labels (quick peek on up to 3 files)

In [25]:
# Collect unique annotation descriptions over up to 3 files as a quick glance
labels = set()
for f in edf_files[:3]:
    raw = mne.io.read_raw_edf(f, preload=False, verbose="ERROR")
    if len(raw.annotations) > 0:
        # raw.annotations.description is an array-like of strings
        labels.update(map(str, raw.annotations.description))

if labels:
    print(f"Unique annotation labels in sample (n={len(labels)}):")
    print(sorted(labels)[:30] + (["..."] if len(labels) > 30 else []))
else:
    print("No annotations found in the first few files.")

Unique annotation labels in sample (n=11):
['00:11:56', 'Detections Inactive', 'Impedance', 'Montage is now: ComAvgLwR', 'Montage is now: LongLwR', 'Montage is now: TransLwR', 'Resting', 'Tiltale-X', 'Tiltale-Y', 'spl giver medicin', 'spl tager blodprøve']


Cell 8 quickly lists the unique annotation labels by scanning the first few EDF files (metadata only). It prints label names only—no counts or durations—doesn’t save or modify anything, and helps you see what label types exist; widen the slice to scan more files.