<a href="https://colab.research.google.com/github/MeiChenc/Aurevia/blob/main/EEGpreprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip -q install mne h5py

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m22.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
%%writefile preprocess.py
#one-by-one pipeline for chb05~chb23
import os, re, glob, shutil, gc, numpy as np, mne, h5py, subprocess

# Initial settings
# Sampling rate every recording ends up at; files at another rate are resampled to it.
FS_TARGET = 256.0
# (low, high) cut-off frequencies handed to raw.filter() as the band-pass range.
BANDPASS = (0.5, 45.0)
# Notch filter frequency; a falsy value skips the notch step.
NOTCH_HZ = 60.0
# Window length, in seconds, of each extracted segment.
WIN_SEC = 2.0
# Hop between consecutive window starts, in seconds.
STRIDE_SEC = 0.5
# Minutes before each seizure start that are labelled as pre-ictal (label 1).
PRE_ICTAL_MIN = 30
# Only the first N channels of each file are kept; a falsy value keeps all channels.
N_CHANNELS_KEEP = 4


# Remote root that patient folders are downloaded from.
BASE_URL = "https://physionet.org/files/chbmit/1.0.0"
# Raw per-patient downloads (removed again once a patient has been processed).
DATA_DIR = "/content/drive/MyDrive/chbmit"
# Final X_<patient>.npy / y_<patient>.npy files.
OUTPUT_DIR = "/content/drive/MyDrive/processed_data_npy"
# Intermediate per-patient HDF5 files.
CACHE_DIR = "/content/drive/MyDrive/processed_cache"
# The three directories are created as soon as the script starts.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)


def run(cmd):
    """Echo a shell command and execute it; a failing command is reported, never raised.

    The command is passed to the shell as a single string. A non-zero exit
    status only produces a ``[WARN]`` line so the surrounding pipeline keeps
    going.
    """
    print(f"[CMD] {cmd}")
    try:
        subprocess.run(cmd, shell=True, check=True)
    except subprocess.CalledProcessError as failure:
        # Best-effort by design: callers decide afterwards whether the
        # command achieved what they needed.
        print(f"[WARN] command failed: {failure}")
    return None

def free_gb(path="/content/drive/MyDrive"):
    """Return the free space of the filesystem holding *path*, in GiB (1024**3 bytes).

    Computed from ``os.statvfs`` as ``f_bavail * f_frsize``.
    """
    stats = os.statvfs(path)
    free_bytes = stats.f_bavail * stats.f_frsize
    bytes_per_gib = 1024**3
    return free_bytes / bytes_per_gib

def human(n):
    """Render a number as a size string with two decimals and a ``GB`` suffix."""
    return "{:.2f} GB".format(n)

def download_patient(patient: str):
    """Fetch one patient's ``*.edf`` and ``*.txt`` files from BASE_URL into DATA_DIR/<patient>.

    Args:
        patient: patient folder name, e.g. ``"chb05"``.

    Raises:
        RuntimeError: if no ``<patient>_*.edf`` file exists in the patient
            directory after the download attempt.
    """
    patient_dir = os.path.join(DATA_DIR, patient)
    os.makedirs(patient_dir, exist_ok=True)

    # wget is started from DATA_DIR and restricted to EDF and text files.
    change_dir = f'cd "{DATA_DIR}"'
    fetch = (
        'wget -r -np -nH --cut-dirs=3 -N '
        f'-A "*.edf,*.txt" "{BASE_URL}/{patient}/"'
    )
    run(f"{change_dir} && {fetch}")

    # run() does not raise on failure, so success is judged by what is on disk.
    downloaded = glob.glob(os.path.join(patient_dir, f"{patient}_*.edf"))
    if len(downloaded) == 0:
        raise RuntimeError(f"[ERR] {patient}: no EDF downloaded.")
    print(f"[INFO] {patient}: EDF count = {len(downloaded)}")

# read summary
def parse_summary_any(summary_path: str):
    """Parse a ``<patient>-summary.txt`` file into per-file seizure intervals.

    The text is split at every ``File Name:`` marker. Within each section,
    ``Seizure [k] Start Time: <n>`` and ``Seizure [k] End Time: <n>`` entries
    (numbered or not, with or without a trailing ``seconds``) are paired in
    order of appearance.

    Args:
        summary_path: path of the summary text file.

    Returns:
        dict mapping each file name to a list of ``(start_sec, end_sec)`` int
        tuples. Files without seizures map to an empty list. Pairs whose end
        precedes their start are dropped. An empty dict is returned when the
        summary file does not exist.
    """
    if not os.path.exists(summary_path):
        print(f"[WARN] summary not found: {summary_path} -> treat as no-seizure")
        return {}
    with open(summary_path, "r", errors="ignore") as f:
        txt = f.read()

    # Compiled once instead of being rebuilt for every file section.
    start_re = re.compile(
        r"(?i)Seizure(?:\s+\d+)?\s+Start\s+Time:\s*(\d+)\s*(?:sec|seconds)?"
    )
    end_re = re.compile(
        r"(?i)Seizure(?:\s+\d+)?\s+End\s+Time:\s*(\d+)\s*(?:sec|seconds)?"
    )

    seizures = {}
    # Element 0 is whatever precedes the first "File Name:" marker.
    for section in re.split(r"(?i)\bFile Name:\s*", txt)[1:]:
        lines = section.splitlines()
        fname = lines[0].strip() if lines else ""
        if not fname:
            # A "File Name:" marker with nothing after it (e.g. a truncated
            # summary) yields an empty section; indexing its first line used
            # to raise IndexError. There is no file to attach seizures to.
            continue
        starts = [int(x) for x in start_re.findall(section)]
        ends = [int(x) for x in end_re.findall(section)]
        seizures[fname] = [(s, e) for s, e in zip(starts, ends) if e >= s]
    return seizures

# read edfs
def load_raw_mne(edf_path: str):
    """Read one EDF file with MNE and apply the fixed preprocessing chain.

    Steps, in order:
      1. keep only the first N_CHANNELS_KEEP channels when the file has more;
      2. band-pass filter with the BANDPASS cut-offs;
      3. notch filter at NOTCH_HZ (a failure here is only logged);
      4. resample to FS_TARGET when the file's rate differs.

    Returns:
        The preloaded, processed MNE raw object.
    """
    raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)

    channel_names = raw.ch_names
    if N_CHANNELS_KEEP and len(channel_names) > N_CHANNELS_KEEP:
        raw.pick_channels(channel_names[:N_CHANNELS_KEEP])

    low_hz = BANDPASS[0]
    high_hz = BANDPASS[1]
    raw.filter(low_hz, high_hz, fir_design="firwin", verbose=False)

    if NOTCH_HZ:
        try:
            raw.notch_filter(freqs=[NOTCH_HZ], verbose=False)
        except Exception as exc:
            # The notch step is optional: keep the band-passed signal.
            print(f"[WARN] notch_filter failed on {edf_path}: {exc}")

    rate_differs = raw.info["sfreq"] != FS_TARGET
    if rate_differs:
        raw.resample(FS_TARGET, npad="auto", verbose=False)
    return raw

def make_windows_and_labels(raw: mne.io.Raw, ictal_spans_sec, pre_minutes: int):
    """Slice a recording into overlapping, per-channel standardised windows with labels.

    Windows are WIN_SEC long and start every STRIDE_SEC. Each window is
    labelled:
      * 2 if it overlaps any ictal span;
      * 1 otherwise, if it overlaps the ``pre_minutes`` leading up to a
        seizure start (clipped at the beginning of this recording);
      * 0 otherwise.

    Args:
        raw: recording providing ``get_data()`` as a (C, T) array and
            ``info["sfreq"]``.
        ictal_spans_sec: sequence of ``(start_sec, end_sec)`` seizure spans.
        pre_minutes: length of the pre-ictal period, in minutes.

    Returns:
        ``(X, y)`` where X is float32 of shape (n_windows, win, C) and y is
        int64 of shape (n_windows,). When the recording is shorter than one
        window, X has shape (0, 0, 0) and y has shape (0,).
    """
    data = raw.get_data()  # (C, T)
    fs = float(raw.info["sfreq"])
    n_samples = data.shape[1]
    win = int(round(WIN_SEC * fs))
    hop = int(round(STRIDE_SEC * fs))

    if n_samples < win:
        return np.empty((0,0,0), dtype=np.float32), np.empty((0,), dtype=np.int64)

    # Pre-ictal spans are looked up only inside this recording: each one ends
    # at a seizure start and reaches back at most to t=0.
    pre_spans = []
    for onset, _offset in ictal_spans_sec:
        lead_start = max(0.0, float(onset) - pre_minutes*60.0)
        lead_end = float(onset)
        if lead_end > lead_start:
            pre_spans.append((lead_start, lead_end))

    def touches(t0, t1, spans):
        # Half-open style test: intervals that merely meet at an edge do not overlap.
        return any(t1 > a and t0 < b for a, b in spans)

    starts = np.arange(0, n_samples - win + 1, hop, dtype=np.int64)
    # n_samples >= win guarantees at least one window, so the outputs can be
    # allocated up front and filled in place.
    X = np.empty((len(starts), win, data.shape[0]), dtype=np.float32)
    y = np.zeros(len(starts), dtype=np.int64)

    for idx, st in enumerate(starts):
        ed = st + win
        seg = data[:, st:ed]
        centered = seg - seg.mean(axis=1, keepdims=True)
        # NOTE(review): 1e-6 is added in the signal's own units. If get_data()
        # returns volts (presumably, per MNE's SI convention - confirm), this
        # is not negligible next to a typical EEG std and damps low-amplitude
        # windows.
        scale = seg.std(axis=1, keepdims=True) + 1e-6
        X[idx] = (centered / scale).T  # stored as (win, C), cast to float32
        t0, t1 = st/fs, ed/fs
        if ictal_spans_sec and touches(t0, t1, ictal_spans_sec):
            y[idx] = 2
        elif pre_spans and touches(t0, t1, pre_spans):
            y[idx] = 1

    return X, y

def windows_for(edf_path, spans):
    """Load and window one EDF file; any failure is logged and yields empty arrays.

    Args:
        edf_path: path of the EDF file.
        spans: seizure ``(start_sec, end_sec)`` spans for this file.

    Returns:
        The ``(X, y)`` pair from make_windows_and_labels, or an empty float32
        array of shape (0, 0, 0) and an empty int64 array of shape (0,) if
        loading or windowing raised.
    """
    try:
        raw = load_raw_mne(edf_path)
        result = make_windows_and_labels(raw, spans, PRE_ICTAL_MIN)
    except Exception as exc:
        # One unreadable file must not stop the rest of the patient.
        print(f"[ERROR] {edf_path}: {exc}")
        no_windows = np.empty((0,0,0), dtype=np.float32)
        no_labels = np.empty((0,), dtype=np.int64)
        result = (no_windows, no_labels)
    return result

# HDF5
def h5_init(h5_path, win, C):
    """Create the HDF5 cache with empty, growable ``X`` and ``y`` datasets.

    ``X`` starts as (0, win, C) float32 and ``y`` as (0,) int64; both can grow
    along the first axis only. An already existing file is left untouched.
    """
    if os.path.exists(h5_path):
        return
    with h5py.File(h5_path, "w") as h5:
        h5.create_dataset(
            "X",
            shape=(0, win, C),
            maxshape=(None, win, C),
            dtype="float32",
            chunks=(2048, win, C),
            compression="lzf",
        )
        h5.create_dataset(
            "y",
            shape=(0,),
            maxshape=(None,),
            dtype="int64",
            chunks=(8192,),
            compression="lzf",
        )

def h5_append(h5_path, Xb, yb):
    """Append a batch of windows *Xb* and labels *yb* to the end of the HDF5 cache."""
    with h5py.File(h5_path, "a") as h5:
        x_ds = h5["X"]
        old_rows = x_ds.shape[0]
        new_rows = old_rows + Xb.shape[0]

        # Grow along the first axis, then fill the newly added tail.
        x_ds.resize((new_rows, Xb.shape[1], Xb.shape[2]))
        x_ds[old_rows:new_rows] = Xb

        y_ds = h5["y"]
        y_ds.resize((new_rows,))
        y_ds[old_rows:new_rows] = yb

        h5.flush()

def h5_to_npy(h5_path, out_x, out_y, batch=20000):
    """Copy ``X`` and ``y`` from the HDF5 cache into two ``.npy`` files.

    The destination files are created as memory-mapped arrays of the final
    size and filled *batch* rows at a time.

    Args:
        h5_path: source HDF5 file containing datasets ``X`` and ``y``.
        out_x: destination path for the float32 window array.
        out_y: destination path for the int64 label array.
        batch: number of rows copied per step.

    Returns:
        Number of rows copied.
    """
    with h5py.File(h5_path, "r") as h5:
        src_x = h5["X"]
        N, win, C = src_x.shape
        dst_x = np.lib.format.open_memmap(out_x, mode="w+", dtype="float32", shape=(N, win, C))
        dst_y = np.lib.format.open_memmap(out_y, mode="w+", dtype="int64",   shape=(N,))

        lo = 0
        while lo < N:
            hi = min(lo + batch, N)
            dst_x[lo:hi] = src_x[lo:hi]
            dst_y[lo:hi] = h5["y"][lo:hi]
            lo = hi

        # Dropping the memmaps releases them before the HDF5 file is closed.
        del dst_x, dst_y
    return N

# build the .npy files for each patient
def process_patient(patient: str):
    """Run the whole pipeline for one patient and leave only its ``.npy`` files behind.

    Steps: skip if both outputs exist; download the patient's files when no
    EDF is present locally; window every EDF and append the result to a
    per-patient HDF5 cache; export the cache to ``X_<patient>.npy`` and
    ``y_<patient>.npy``; delete the raw patient directory and the cache.

    Args:
        patient: patient folder name, e.g. ``"chb05"``.

    Raises:
        RuntimeError: if no EDF file produced any window (also propagated
            from download_patient when the download yields no EDF).
    """
    from collections import Counter

    print("="*90)
    print(f"[START] {patient} | free={human(free_gb())}")

    pdir = os.path.join(DATA_DIR, patient)
    h5_path = os.path.join(CACHE_DIR, f"{patient}.h5")
    out_x = os.path.join(OUTPUT_DIR, f"X_{patient}.npy")
    out_y = os.path.join(OUTPUT_DIR, f"y_{patient}.npy")

    # Skip patients whose two output files are already present.
    if os.path.exists(out_x) and os.path.exists(out_y):
        print(f"[SKIP] {patient} npy already exists.")
        return

    edf_pattern = os.path.join(pdir, f"{patient}_*.edf")
    if not os.path.isdir(pdir) or not glob.glob(edf_pattern):
        print(f"[DL] downloading {patient} …")
        download_patient(patient)

    # Seizure spans per EDF file name; files missing from the summary are
    # treated as seizure-free below.
    summary_path = os.path.join(pdir, f"{patient}-summary.txt")
    seizures_by_file = parse_summary_any(summary_path)

    # A cache left behind by an interrupted run cannot be resumed: nothing
    # records which EDFs were already appended, and h5_init() keeps an
    # existing file. Appending to it again would duplicate windows, so start
    # from a clean cache.
    if os.path.exists(h5_path):
        print(f"[WARN] {patient}: discarding stale cache {h5_path}")
        os.remove(h5_path)

    # Window each EDF and append it to the HDF5 cache.
    h5_ready = False
    for edf_path in sorted(glob.glob(edf_pattern)):
        fname = os.path.basename(edf_path)
        spans = seizures_by_file.get(fname, [])
        Xb, yb = windows_for(edf_path, spans)
        if Xb.size == 0:
            print(f"[INFO] {fname}: windows=0 (skip)")
            continue
        if not h5_ready:
            # The first non-empty file fixes the window length and channel count.
            win, C = Xb.shape[1], Xb.shape[2]
            h5_init(h5_path, win, C)
            h5_ready = True
        h5_append(h5_path, Xb, yb)
        cnt = dict(zip(*np.unique(yb, return_counts=True)))
        print(f"[OK] {fname}: windows={Xb.shape[0]}, labels={cnt}")
        del Xb, yb, cnt; gc.collect()

    if not h5_ready:
        raise RuntimeError(f"[ERR] {patient}: no data appended to H5.")

    # Export under temporary names and rename only after the copy finished.
    # h5_to_npy() creates both destination files before copying any data, so
    # writing straight to out_x/out_y would let an interrupted export pass
    # the "already exists" check above on the next run.
    tmp_x = out_x + ".tmp"
    tmp_y = out_y + ".tmp"
    N = h5_to_npy(h5_path, tmp_x, tmp_y, batch=20000)
    os.replace(tmp_x, out_x)
    os.replace(tmp_y, out_y)

    y = np.load(out_y, mmap_mode="r")
    print(f"[SAVE] {patient}: X.shape={[N, win, C]}, y.shape={(N,)}, labels={Counter(np.asarray(y))}")

    # Delete the raw download and the cache; only the .npy files are kept.
    print(f"[CLEAN] deleting raw EDF and cache for {patient} …")
    try:
        shutil.rmtree(pdir)
    except Exception as e:
        print(f"[WARN] remove {pdir} failed: {e}")
    try:
        os.remove(h5_path)
    except Exception as e:
        print(f"[WARN] remove {h5_path} failed: {e}")

    print(f"[DONE] {patient} | free={human(free_gb())}")

# run the pipeline for all patients
# Patient folders chb05 through chb23.
patients = [f"chb{idx:02d}" for idx in range(5, 24)]

for patient in patients:
    # Check the free space before each patient; stop the whole run once it
    # drops below 9 GB.
    if free_gb() >= 9.0:
        process_patient(patient)
        continue
    print(f"[WAIT] free space={human(free_gb())} too low; please free space and rerun this cell.")
    break


Writing preprocess.py


In [4]:
!ls /content/drive/MyDrive/processed_data_npy

X_chb05.npy  X_chb07.npy  y_chb04.npy  y_chb06.npy  y_chb08.npy
X_chb06.npy  X_chb08.npy  y_chb05.npy  y_chb07.npy  y_chb09.npy
