# Sleep-EDF Preprocessing (EEG → Epochs → NPZ)

This notebook converts **Sleep-EDF** EDF recordings into per-night `.npz` files (arrays `X`, `y`) and an optional combined dataset (arrays `X`, `y`, `night_ids`).

**Pipeline:** EDF files → MNE preprocessing → 30s epochs → stage mapping → saved artifacts.

## Preprocessing specifications (reproducible)

- Dataset: Sleep-EDF Expanded (sleep-cassette subset)
- Input files (not included in repo): `*PSG.edf` and `*Hypnogram.edf`
- Channel: `EEG Fpz-Cz` (configurable via `CHANNEL`)
- Bandpass: 0.5–40 Hz (configurable via `BANDPASS_HZ`)
- Epoching: 30 seconds (configurable via `EPOCH_SEC`)
- Stage mapping: W→0, N1→1, N2→2, N3(3+4 merged)→3, REM→4
- Outputs:
  - Per-night artifacts: `{night_id}.npz` (where `{night_id}` is the PSG file prefix, e.g. `SC4001E0`) with `X`, `y`, plus metadata fields (`channel`, `bandpass`, `epoch_sec`, `sfreq_target`)
  - Optional combined artifact: `sleep_edf_all_with_ids.npz` with `X`, `y`, `night_ids`

### How to run (recommended)

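This notebook is meant to be *readable documentation* of the preprocessing pipeline.
For automation / reproducibility, use the CLI entrypoint defined at the bottom. Python cannot execute an `.ipynb` file directly, so export the notebook to a script first (and remove the notebook-only cells, such as `!pip install` and the "Run" cells, from the exported file):

```bash
# One-time: export the notebook to Sleep_01_preprocessing_CLEAN.py
jupyter nbconvert --to script Sleep_01_preprocessing_CLEAN.ipynb

# Preprocess raw EDF → per-night NPZ
python Sleep_01_preprocessing_CLEAN.py --preprocess --raw_dir <RAW_DIR> --out_dir <OUT_DIR>

# Combine per-night NPZ → one dataset with night ids
python Sleep_01_preprocessing_CLEAN.py --combine --processed_dir <OUT_DIR> --out_path <OUT_PATH>
```

(The same flags also work from a notebook cell, e.g. `cli(["--preprocess", "--raw_dir", "<RAW_DIR>", "--out_dir", "<OUT_DIR>"])`.)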


In [13]:
!pip install numpy mne

In [10]:
# --- Setup: paths & constants ---
import os, glob
import numpy as np
import mne

# Root folder holding both the raw EDF inputs and the processed outputs.
# Override it by setting the SLEEP_BCI_BASEDIR environment variable before this
# cell runs; the fallback below is a Google Drive path as mounted in Colab.
BASE_DIR = os.environ.get("SLEEP_BCI_BASEDIR", "/content/drive/MyDrive/sleep-insights")

RAW_DIR = os.path.join(BASE_DIR, "raw_edf", "sleep-cassette")
OUT_DIR = os.path.join(BASE_DIR, "processed")
os.makedirs(OUT_DIR, exist_ok=True)

# Quick visibility check so a wrong path is noticed before preprocessing starts.
print("RAW_DIR:", RAW_DIR)
print("Exists?", os.path.exists(RAW_DIR))
if os.path.exists(RAW_DIR):
    print("Files:", os.listdir(RAW_DIR)[:5])
else:
    print("Files:", "NO DIR")


# Hypnogram annotation text → integer class label.
# Stages 3 and 4 share label 3 (merged into N3); any annotation text that is
# not a key here is ignored by the preprocessing code.
STAGE_MAP = {
    "Sleep stage W": 0,  # Wake
    "Sleep stage 1": 1,  # N1
    "Sleep stage 2": 2,  # N2
    "Sleep stage 3": 3,  # N3
    "Sleep stage 4": 3,  # N3 (merged)
    "Sleep stage R": 4,  # REM
}

# Signal selection and epoching parameters
CHANNEL = "EEG Fpz-Cz"       # the single EEG channel that is kept
BANDPASS_HZ = (0.5, 40.0)    # (low, high) filter edges in Hz
EPOCH_SEC = 30               # epoch length in seconds
SFREQ_TARGET = None          # None keeps the original rate; set e.g. 100 to resample


RAW_DIR: /content/drive/MyDrive/sleep-insights/raw_edf/sleep-cassette
Exists? True
Files: ['processed']


In [8]:
def build_hypnogram_lookup(raw_dir: str) -> dict:
    """Index the hypnogram EDF files in ``raw_dir`` by recording prefix.

    The key is the part of the filename before the first ``-`` with its last
    character removed, e.g. ``SC4001EC-Hypnogram.edf`` is stored under
    ``SC4001E``. If several files produce the same key, the one returned last
    by ``glob`` wins.

    Raises:
        FileNotFoundError: if ``raw_dir`` does not exist.
    """
    if not os.path.exists(raw_dir):
        raise FileNotFoundError(f"RAW_DIR does not exist: {raw_dir}")

    pattern = os.path.join(raw_dir, "*Hypnogram.edf")
    return {
        os.path.basename(hyp_path).split("-")[0][:-1]: hyp_path
        for hyp_path in glob.glob(pattern)
    }


def process_one_night(psg_path: str, hyp_path: str) -> tuple[np.ndarray, np.ndarray]:
    """Load one PSG recording plus its hypnogram and return labelled epochs.

    Steps: read the PSG EDF, attach the hypnogram annotations, keep only
    ``CHANNEL``, optionally resample to ``SFREQ_TARGET``, band-pass filter with
    ``BANDPASS_HZ``, cut every scored stage into consecutive ``EPOCH_SEC``-second
    epochs and map each epoch's annotation text through ``STAGE_MAP``.

    Args:
        psg_path: path to the ``*PSG.edf`` recording.
        hyp_path: path to the matching ``*Hypnogram.edf`` annotation file.

    Returns:
        ``(X, y)`` where ``X`` is the epoch data as returned by
        ``mne.Epochs.get_data()`` (epochs x channels x samples, with a single
        channel here) and ``y`` is an ``int64`` vector of ``STAGE_MAP`` labels,
        one per epoch.

    Raises:
        ValueError: if the hypnogram contains no annotation listed in
            ``STAGE_MAP``.
    """
    raw = mne.io.read_raw_edf(psg_path, preload=True, verbose=False)

    ann = mne.read_annotations(hyp_path)
    raw.set_annotations(ann)

    # Keep one EEG channel
    raw.pick([CHANNEL])

    # Optional resampling (disabled by default)
    if SFREQ_TARGET is not None:
        raw.resample(SFREQ_TARGET, verbose=False)

    # Basic filtering
    raw.filter(BANDPASS_HZ[0], BANDPASS_HZ[1], verbose=False)

    # Derive the epoch length in samples from the actual sampling rate instead
    # of assuming 100 Hz, so resampling via SFREQ_TARGET keeps working.
    sfreq = float(raw.info["sfreq"])
    n_samples = int(round(EPOCH_SEC * sfreq))

    # Hypnogram annotations span whole stage bouts (often many minutes).
    # Without chunk_duration MNE emits a single event at each annotation onset,
    # which would keep only the first epoch of every bout. Chunking yields one
    # event per EPOCH_SEC window inside each annotation.
    events, event_id = mne.events_from_annotations(
        raw, chunk_duration=float(EPOCH_SEC), verbose=False
    )

    # Keep only labels we can map
    valid_event_id = {k: v for k, v in event_id.items() if k in STAGE_MAP}
    if not valid_event_id:
        raise ValueError("No valid sleep stages found in hypnogram annotations.")

    epochs = mne.Epochs(
        raw,
        events,
        event_id=valid_event_id,
        tmin=0.0,
        # tmax is inclusive in MNE: stop one sample early so each epoch holds
        # exactly EPOCH_SEC * sfreq samples and does not overlap the next one.
        tmax=EPOCH_SEC - 1.0 / sfreq,
        baseline=None,
        preload=True,
        verbose=False,
    )

    # Get epoch data
    X = epochs.get_data()

    # Safety net: trim if rounding still produced an extra trailing sample.
    if X.shape[-1] > n_samples:
        X = X[:, :, :n_samples]

    # Map event integers back to annotation strings, then to STAGE_MAP values
    inv_event_id = {v: k for k, v in valid_event_id.items()}
    y = np.array([STAGE_MAP[inv_event_id[e]] for e in epochs.events[:, 2]], dtype=np.int64)

    return X, y


def preprocess_all_nights(raw_dir: str, out_dir: str) -> tuple[int, int]:
    """Convert every PSG recording in ``raw_dir`` into a per-night ``.npz`` file.

    Each ``*PSG.edf`` is paired with its hypnogram through
    ``build_hypnogram_lookup`` (the key is the PSG prefix without its last
    character), processed with ``process_one_night`` and saved as
    ``<out_dir>/<prefix>.npz`` containing ``X``, ``y`` and the preprocessing
    settings (``channel``, ``bandpass``, ``epoch_sec``, ``sfreq_target``).

    A night with no matching hypnogram, or one whose processing/saving raises,
    is reported and counted as skipped; the remaining nights are still
    processed.

    Args:
        raw_dir: directory containing the ``*PSG.edf`` / ``*Hypnogram.edf`` files.
        out_dir: directory for the per-night files; created if missing.

    Returns:
        ``(kept, skipped)``: number of nights written and number skipped.

    Raises:
        FileNotFoundError: if ``raw_dir`` does not exist.
        ValueError: if ``raw_dir`` contains no PSG files or no hypnogram files.
    """
    if not os.path.exists(raw_dir):
        raise FileNotFoundError(f"RAW_DIR does not exist: {raw_dir}")

    os.makedirs(out_dir, exist_ok=True)

    psg_files = sorted(glob.glob(os.path.join(raw_dir, "*PSG.edf")))
    if not psg_files:
        # Show a few directory entries so a wrong path is easy to spot.
        sample = os.listdir(raw_dir)[:10] if os.path.isdir(raw_dir) else []
        raise ValueError(
            "No PSG EDF files found.\n"
            f"Expected pattern: {os.path.join(raw_dir, '*PSG.edf')}\n"
            f"Directory sample: {sample}"
        )

    hyp_lookup = build_hypnogram_lookup(raw_dir)
    if not hyp_lookup:
        raise ValueError(
            "No hypnogram EDF files found.\n"
            f"Expected pattern: {os.path.join(raw_dir, '*Hypnogram.edf')}"
        )

    print(f"Found {len(psg_files)} PSG files")
    print(f"Found {len(hyp_lookup)} hypnogram keys")

    kept, skipped = 0, 0

    for psg in psg_files:
        base = os.path.basename(psg).replace("-PSG.edf", "")  # SC4001E0
        key = base[:-1]                                       # SC4001E

        hyp = hyp_lookup.get(key)
        if hyp is None:
            print(f"❌ Missing hypnogram for {base}, skipping")
            skipped += 1
            continue

        # Deliberate best-effort: one unreadable night must not abort the
        # whole batch, so any failure is reported and counted as skipped.
        try:
            X, y = process_one_night(psg, hyp)

            out_path = os.path.join(out_dir, f"{base}.npz")
            np.savez(
                out_path,
                X=X,
                y=y,
                channel=CHANNEL,
                bandpass=BANDPASS_HZ,
                epoch_sec=EPOCH_SEC,
                sfreq_target=SFREQ_TARGET,
            )

            print(f"✅ Saved {os.path.basename(out_path)} | epochs: {len(y)}")
            kept += 1

        except Exception as e:
            print(f"❌ Error processing {base}: {e}")
            skipped += 1

    print("\n======================")
    print("✅ Done preprocessing")
    print(f"Kept nights: {kept}")
    print(f"Skipped nights: {skipped}")
    return kept, skipped


In [9]:
# --- Run preprocessing ---
# Processes every *PSG.edf found in RAW_DIR and writes one .npz per night into
# OUT_DIR. `kept` is the number of nights written; `skipped` is the number of
# nights left out (missing hypnogram or an error while processing).
kept, skipped = preprocess_all_nights(RAW_DIR, OUT_DIR)


Found 0 PSG files
Found 0 hypnogram keys

✅ Done preprocessing
Kept nights: 0
Skipped nights: 0


In [None]:
def combine_nights(processed_dir: str, out_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Concatenate the per-night ``.npz`` files into one dataset with night ids.

    Every ``*.npz`` in ``processed_dir`` is treated as one night, except files
    whose name contains ``sleep_edf_all`` and the combined output file itself.
    Nights are taken in sorted filename order and numbered ``0..n-1``.

    Args:
        processed_dir: directory holding the per-night files (arrays ``X``, ``y``).
        out_path: destination of the combined file. As with ``np.savez``,
            ``.npz`` is appended when missing. The parent directory is created
            if needed.

    Returns:
        ``(X_all, y_all, night_ids_all)``: epochs and labels concatenated along
        axis 0, plus an ``int32`` vector giving the night index of each epoch.
        The same arrays are saved under the keys ``X``, ``y``, ``night_ids``.

    Raises:
        FileNotFoundError: if ``processed_dir`` does not exist or holds no
            per-night files.
        ValueError: if a file has a different number of epochs and labels, or
            its per-epoch shape differs from the first night's.
    """
    if not os.path.exists(processed_dir):
        raise FileNotFoundError(f"processed_dir does not exist: {processed_dir}")

    # np.savez appends ".npz" when the suffix is missing; mirror that so the
    # combined artifact is recognised (and not re-ingested as a night) on a
    # re-run, whatever name the caller chose for it.
    out_file = out_path if out_path.endswith(".npz") else out_path + ".npz"
    out_key = os.path.normcase(os.path.abspath(out_file))

    night_files = sorted(
        f for f in glob.glob(os.path.join(processed_dir, "*.npz"))
        if "sleep_edf_all" not in os.path.basename(f)
        and os.path.normcase(os.path.abspath(f)) != out_key
    )

    if not night_files:
        raise FileNotFoundError("No nightly .npz files found. Run preprocessing first.")

    print(f"Found {len(night_files)} nightly files")

    X_all, y_all, night_ids_all = [], [], []

    for night_idx, f in enumerate(night_files):
        # Members of an .npz are read lazily per key, and X / y are plain
        # numeric arrays, so unpickling is not needed here (other members may
        # still be stored as pickled objects). The context manager closes the
        # underlying zip file handle.
        with np.load(f, allow_pickle=False) as data:
            X = data["X"]
            y = data["y"]

        # Fix off-by-one issue if present (3001 -> 3000)
        if X.shape[-1] == 3001:
            X = X[:, :, :3000]

        if X.shape[0] != y.shape[0]:
            raise ValueError(f"Mismatch in {f}: X has {X.shape[0]} epochs but y has {y.shape[0]} labels")

        # Fail with the offending file name instead of an opaque concatenate error.
        if X_all and X.shape[1:] != X_all[0].shape[1:]:
            raise ValueError(
                f"Shape mismatch in {f}: per-epoch shape {X.shape[1:]} "
                f"differs from {X_all[0].shape[1:]} in {night_files[0]}"
            )

        X_all.append(X)
        y_all.append(y)
        night_ids_all.append(np.full(len(y), night_idx, dtype=np.int32))

        print(f"{os.path.basename(f)} → epochs: {len(y)} | night_id: {night_idx}")

    X_all = np.concatenate(X_all, axis=0)
    y_all = np.concatenate(y_all, axis=0)
    night_ids_all = np.concatenate(night_ids_all, axis=0)

    os.makedirs(os.path.dirname(os.path.abspath(out_file)), exist_ok=True)
    np.savez(out_path, X=X_all, y=y_all, night_ids=night_ids_all)

    print("\n✅ Final dataset shapes")
    print("X:", X_all.shape)
    print("y:", y_all.shape)
    print("night_ids:", night_ids_all.shape)
    print(f"✅ Saved combined dataset: {out_path}")

    return X_all, y_all, night_ids_all


In [None]:
# --- Build the combined dataset (optional) ---
# Merges every per-night file in OUT_DIR into a single .npz next to them.
COMBINED_PATH = os.path.join(OUT_DIR, "sleep_edf_all_with_ids.npz")
X_all, y_all, night_ids_all = combine_nights(processed_dir=OUT_DIR, out_path=COMBINED_PATH)

# Sanity check: how many epochs carry each stage label
print(
    "Label counts:",
    {label: count for label, count in zip(*np.unique(y_all, return_counts=True))},
)


In [ ]:
# --- CLI entrypoint (optional, for reproducible runs) ---
# This cell lets you run preprocessing/combining as a script-like interface.
# In a notebook kernel, it will NOT auto-run (guarded by "__file__" check).
#
# Example usage after you export this notebook to a .py script:
#   python Sleep_01_preprocessing_CLEAN.py --preprocess --raw_dir data/raw/sleep-cassette --out_dir data/processed
#   python Sleep_01_preprocessing_CLEAN.py --combine --processed_dir data/processed --out_path data/processed/sleep_edf_all_with_ids.npz

import argparse

def cli(argv=None):
    """Command-line front end for the preprocessing and combine steps.

    Args:
        argv: list of argument strings; ``None`` lets argparse read
            ``sys.argv[1:]``.

    At least one of ``--preprocess`` / ``--combine`` is required. When both are
    given, preprocessing runs first. ``--processed_dir`` defaults to
    ``--out_dir`` so that the combine step reads what preprocessing just wrote,
    and ``--out_path`` defaults to ``sleep_edf_all_with_ids.npz`` inside the
    processed directory.
    """
    combined_name = "sleep_edf_all_with_ids.npz"

    parser = argparse.ArgumentParser(description="Sleep-EDF preprocessing utilities")
    parser.add_argument("--preprocess", action="store_true", help="Run raw EDF → per-night NPZ preprocessing")
    parser.add_argument("--combine", action="store_true", help="Combine per-night NPZ into one dataset with night_ids")

    parser.add_argument("--raw_dir", type=str, default=RAW_DIR, help="Directory containing *PSG.edf and *Hypnogram.edf")
    parser.add_argument("--out_dir", type=str, default=OUT_DIR, help="Output directory for per-night .npz files")
    # These two defaults are resolved after parsing because they depend on
    # other arguments rather than on the notebook-level OUT_DIR.
    parser.add_argument("--processed_dir", type=str, default=None,
                        help="Directory containing per-night .npz files (default: --out_dir)")
    parser.add_argument("--out_path", type=str, default=None,
                        help=f"Path to write combined dataset .npz (default: <processed_dir>/{combined_name})")

    args = parser.parse_args(argv)

    if not (args.preprocess or args.combine):
        parser.error("Choose an action: --preprocess and/or --combine")

    if args.preprocess:
        preprocess_all_nights(args.raw_dir, args.out_dir)

    if args.combine:
        processed_dir = args.processed_dir if args.processed_dir is not None else args.out_dir
        out_path = args.out_path if args.out_path is not None else os.path.join(processed_dir, combined_name)
        combine_nights(processed_dir, out_path)

# Only auto-run when executed as a script (not in an interactive notebook kernel).
# The `__name__` test alone is presumably not enough inside a notebook, so the
# guard additionally requires `__file__` to be defined in the module globals;
# this relies on the notebook kernel not defining `__file__`.
if __name__ == "__main__" and "__file__" in globals():
    cli()
