In [12]:
import os
import glob
import re
import h5py
import numpy as np
from typing import Tuple, Literal, Optional

In [None]:

# Matches a path/file name that ends in "_<digits>.h5"; group 1 captures the
# digits. load_participant_arrays converts that group to an int and uses it as
# the sort key when ordering a task's chunks.
_CHUNK_RE = re.compile(r'_(\d+)\.h5$')          # capture the trailing “…_<chunk>.h5”

def _get_dataset_name(path: str) -> str:
    """Return the internal dataset name inside the HDF5 file."""
    return "_".join(os.path.basename(path).split('_')[:-1])

def load_participant_arrays(participant_id: int, base_dir: str = "train"):
    """
    Load every HDF5 chunk of one participant and stitch the chunks of each
    task together along the time axis.

    Files are discovered with the pattern ``<base_dir>/*_<participant_id>_*.h5``
    (e.g. ``rest_105923_1.h5``).  A file is assigned to the first task whose
    name prefixes its dataset name (see ``_get_dataset_name``); files that
    match no task are skipped.  Within a task, chunks are ordered by the
    trailing chunk number parsed with ``_CHUNK_RE`` (0 if it cannot be parsed).

    Parameters
    ----------
    participant_id : int
        Identifier embedded in the file names.
    base_dir : str
        Directory that holds the ``.h5`` files.  It is taken literally, so
        glob metacharacters in the directory name are safe.

    Returns
    -------
    tuple
        ``(rest, task_motor, task_story_math, task_working_memory)``.
        Each entry is the chunks concatenated along axis 1 -- documented by
        the file as ``(n_nodes, total_timepoints)`` -- or ``None`` when no
        file for that task was found.

    Raises
    ------
    ValueError
        If the chunks of one task do not all share the same first-axis size.
    """
    # buckets: task -> list[(chunk_number, matrix)]; insertion order defines
    # the order of the returned tuple.
    buckets = {
        "rest":               [],
        "task_motor":         [],
        "task_story_math":    [],
        "task_working_memory": []
    }

    # Escape base_dir so characters such as "[" or "*" in the directory name
    # are not interpreted as glob wildcards (which would silently match nothing).
    pattern = os.path.join(glob.escape(base_dir), f"*_{participant_id}_*.h5")

    # glob returns paths in arbitrary order; sorting makes the result
    # deterministic even when two files end up with the same chunk number.
    for path in sorted(glob.glob(pattern)):
        ds_name = _get_dataset_name(path)

        # identify task & chunk number
        task = next((t for t in buckets if ds_name.startswith(t)), None)
        if task is None:
            continue  # skip unrecognised file

        chunk_match = _CHUNK_RE.search(path)
        chunk_num = int(chunk_match.group(1)) if chunk_match else 0

        # read the whole dataset into memory; the file is closed right after
        with h5py.File(path, "r") as f:
            matrix = f[ds_name][()]        # (nodes, timepoints)

        buckets[task].append((chunk_num, matrix))

    # concatenate chunks for each task
    out = []
    for task, lst in buckets.items():
        if not lst:
            out.append(None)
            continue

        # stable sort by chunk number keeps temporal order (ties stay in
        # path order thanks to the sorted() above)
        lst.sort(key=lambda item: item[0])
        matrices = [m for _, m in lst]

        # sanity-check dimensionality: all chunks must share the node axis size
        first_rows = matrices[0].shape[0]
        if not all(mat.shape[0] == first_rows for mat in matrices):
            raise ValueError(f"Inconsistent node counts in {task} chunks for participant {participant_id}")

        # concat along time axis (axis=1)
        out.append(np.concatenate(matrices, axis=1))

    return tuple(out)  # (rest, motor, story_math, working_memory)


In [11]:
# Load the four task recordings of participant 105923 from the "train" folder.
rest_arr, motor_arr, story_math_arr, wm_arr = load_participant_arrays(
    105923,
    base_dir="train",
)

# Report the shape of each array; a task without any file comes back as None.
print("Rest shape:", rest_arr.shape if rest_arr is not None else None)
print("Motor shape:", motor_arr.shape if motor_arr is not None else None)
print("Story‑Math shape:", story_math_arr.shape if story_math_arr is not None else None)
print("Working‑Memory shape:", wm_arr.shape if wm_arr is not None else None)


Rest shape: (248, 284992)
Motor shape: (248, 284992)
Story‑Math shape: (248, 284992)
Working‑Memory shape: (248, 284992)


In [14]:
def minmax_scale(
        arr: np.ndarray,
        axis: int = 1,
        eps: float = 1e-12
) -> np.ndarray:
    """
    Min-max scale `arr` along `axis` so each slice spans exactly [0, 1].

    With the default ``axis=1`` every row is scaled independently using its
    own minimum and maximum.

    Parameters
    ----------
    arr : np.ndarray
        Input data.
    axis : int
        Axis along which the minimum and maximum are taken.
    eps : float
        Denominator used ONLY for constant slices (max == min), which
        therefore map to 0 instead of dividing by zero.  It is no longer
        added to every denominator: an absolute offset made the result depend
        on the data's scale (a slice with a range of 2e-12 was scaled to a
        maximum of about 0.67 instead of 1).

    Returns
    -------
    np.ndarray
        Array of the same shape as `arr`.
    """
    mins = arr.min(axis=axis, keepdims=True)
    span = arr.max(axis=axis, keepdims=True) - mins
    # min and max are actual elements of the slice, so span is exactly 0
    # if and only if the slice is constant -- no tolerance needed.
    safe_span = np.where(span == 0, eps, span)
    return (arr - mins) / safe_span

def zscore(
        arr: np.ndarray,
        axis: int = 1,
        eps: float = 1e-12
) -> np.ndarray:
    """
    Standardise `arr` along `axis`: subtract each slice's mean and divide by
    its standard deviation (NumPy's default ``std``) plus `eps`.

    With the default ``axis=1`` every row is standardised independently.
    `eps` keeps constant slices from dividing by zero.

    NOTE: `eps` is an absolute offset added to every denominator, so for data
    whose standard deviation is not much larger than `eps` the output is
    shrunk below unit variance.
    """
    reduce_kwargs = {"axis": axis, "keepdims": True}
    centred = arr - arr.mean(**reduce_kwargs)
    spread = arr.std(**reduce_kwargs)
    return centred / (spread + eps)

def downsample(
        arr: np.ndarray,
        *,
        factor: Optional[int] = None,
        target_rate: Optional[int] = None,
        orig_rate: int = 2034,
        axis: int = 1,
        method: Literal["slice", "decimate"] = "slice"
) -> np.ndarray:
    """
    Reduce the temporal resolution of `arr` along `axis`.

    Provide EITHER `factor`  (e.g. 16 → 1 sample every 16)
           OR    `target_rate` (e.g. 128 Hz from 2034 Hz → factor≈16).
    If both are given, `factor` wins and `target_rate` is ignored.

    Parameters
    ----------
    arr : np.ndarray
        Input data.
    factor : int, optional
        Keep one sample out of every `factor`.
    target_rate : int, optional
        Desired sampling rate; converted to
        ``factor = round(orig_rate / target_rate)``, so the rate actually
        obtained is only approximately `target_rate`.
    orig_rate : int
        Sampling rate of `arr`; only used together with `target_rate`.
    axis : int
        Axis to downsample (default 1).
    method : {"slice", "decimate"}
        `"slice"` is lightning-fast subsampling without any filtering; the
        result is a basic-slicing view of `arr`.
        `"decimate"` (needs SciPy) applies an anti-aliasing filter via a
        single ``scipy.signal.decimate`` call with its default filter type.
        SciPy's documentation recommends calling decimate several times when
        an IIR factor exceeds 13; this function does not cascade.

    Returns
    -------
    np.ndarray
        The downsampled array.  When the effective factor is <= 1 the input
        object itself is returned unchanged.

    Raises
    ------
    ValueError
        If neither `factor` nor `target_rate` is given, if a rate is not
        positive, or if `method` is unknown.
    """
    if factor is None:
        if target_rate is None:
            raise ValueError("Specify either factor or target_rate")
        # A zero rate used to surface as a bare ZeroDivisionError and a
        # negative one silently disabled downsampling; reject both explicitly.
        if target_rate <= 0 or orig_rate <= 0:
            raise ValueError("target_rate and orig_rate must be positive")
        factor = int(round(orig_rate / target_rate))

    if factor <= 1:
        return arr  # already at (or above) target resolution

    if method == "slice":
        # Build an index that takes every `factor`-th element on `axis`
        # and everything on the other axes.
        slicer = [slice(None)] * arr.ndim
        slicer[axis] = slice(None, None, factor)
        return arr[tuple(slicer)]

    elif method == "decimate":
        # Imported lazily so SciPy is only required when this method is used.
        from scipy.signal import decimate
        return decimate(arr, factor, axis=axis, zero_phase=True)

    else:
        raise ValueError("method must be 'slice' or 'decimate'")

In [15]:
# `rest_arr` is the (n_nodes, n_timepoints) array loaded earlier in the
# notebook, e.g. (248, 284992) in the output printed above.

# Per-row min-max scaling to the unit interval.
rest_scaled = minmax_scale(rest_arr)

# Per-row standardisation (approximately zero mean and unit std).
rest_standardised = zscore(rest_arr)

# 2034 Hz -> factor round(2034 / 128) = 16 -> every 16th sample (≈128 Hz via slicing).
rest_down128 = downsample(rest_arr, target_rate=128)
