### SETUP

In [None]:
# Colab-only: mount Google Drive at /content/drive so that files under
# /content/drive/MyDrive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!wget -c -O EEG.zip "https://zenodo.org/records/1199011/files/DATA_preproc.zip?download=1"

--2026-02-16 21:09:34--  https://zenodo.org/records/1199011/files/DATA_preproc.zip?download=1
Resolving zenodo.org (zenodo.org)... 137.138.52.235, 188.185.43.153, 188.185.48.75, ...
Connecting to zenodo.org (zenodo.org)|137.138.52.235|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1838947136 (1.7G) [application/octet-stream]
Saving to: ‘EEG.zip’


2026-02-16 21:10:52 (22.4 MB/s) - ‘EEG.zip’ saved [1838947136/1838947136]



In [None]:
!unzip -q EEG.zip -d EEG_data
!ls EEG_data

S10_data_preproc.mat  S16_data_preproc.mat  S4_data_preproc.mat
S11_data_preproc.mat  S17_data_preproc.mat  S5_data_preproc.mat
S12_data_preproc.mat  S18_data_preproc.mat  S6_data_preproc.mat
S13_data_preproc.mat  S1_data_preproc.mat   S7_data_preproc.mat
S14_data_preproc.mat  S2_data_preproc.mat   S8_data_preproc.mat
S15_data_preproc.mat  S3_data_preproc.mat   S9_data_preproc.mat


In [None]:
!git clone https://github.com/MHM-Rajpoot/SMM.git

Cloning into 'SMM'...
remote: Enumerating objects: 17, done.[K
remote: Counting objects: 100% (17/17), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 17 (delta 2), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (17/17), 13.78 KiB | 3.44 MiB/s, done.
Resolving deltas: 100% (2/2), done.


In [None]:
!pip install soundfile librosa matplotlib --quiet

### Data Structure

In [None]:
import scipy.io
import numpy as np
from pathlib import Path

# Load the first file returned by the sorted glob and unwrap the top-level
# MATLAB struct stored under the 'data' key.
mat_files = sorted(Path('EEG_data').glob('S*_data_preproc.mat'))
raw = scipy.io.loadmat(mat_files[0])
data = raw['data'][0, 0]

# 1. 'event' structure: shape, field names, then up to five entries
print("=== EVENT ===")
event = data['event']
print("event shape:", event.shape)
print("event dtype:", event.dtype.names)

n_preview = min(5, event.shape[1])
for i, e in enumerate(event[0, :n_preview]):
    fields = {name: e[name] for name in event.dtype.names}
    print(f"  event[{i}]:", fields)

# 2. 'cfg' structure: field names only
print("\n=== CFG ===")
cfg = data['cfg']
print("cfg dtype names:", cfg.dtype.names)

# 3. wavA / wavB: dump both fields to see whether they encode the attended
#    stimulus
print("\n=== wavA / wavB ===")
for key in ('wavA', 'wavB'):
    print(f"{key}:", data[key])

=== EVENT ===
event shape: (1, 1)
event dtype: ('eeg',)
  event[0]: {'eeg': array([[(array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8), array([[array([[2]], dtype=uint8)]], dtype=object)),
        (array([[1]], dtype=uint8)

In [None]:
# ── DIAGNOSTIC: inspect fsample structure for the first subject file ─────────
# Note: the sorted glob is lexicographic, so mat_files[0] is S10, not S1.
import numpy as np          # needed by the isinstance checks below
import scipy.io
from pathlib import Path

mat_files = sorted(Path('EEG_data').glob('S*_data_preproc.mat'))
raw  = scipy.io.loadmat(mat_files[0])
data = raw['data'][0, 0]

fs_val = data['fsample']
print(f"Type:  {type(fs_val)}")
print(f"Value: {fs_val}")
print(f"dtype: {getattr(fs_val, 'dtype', 'N/A')}")
print(f"shape: {getattr(fs_val, 'shape', 'N/A')}")

# Walk every level until a plain scalar is reached.
# A structured record (np.void) must be entered through one of its named
# fields: calling .flat[0] on it just returns the same record again, which is
# why the previous version of this loop printed the same np.void eight times.
node = fs_val
for i in range(8):
    if isinstance(node, np.void) and node.dtype.names:
        field = node.dtype.names[0]
        print(f"  level {i}: struct with fields {node.dtype.names} "
              f"→ descending into '{field}'")
        node = node[field]
        continue

    item = node.flat[0] if isinstance(node, np.ndarray) else node
    print(f"  flat[0] level {i}: type={type(item)}  repr={repr(item)[:120]}")
    if isinstance(item, (int, float, np.integer, np.floating)):
        print(f"  ✓ scalar found at level {i}: {item}")
        break
    node = item

Type:  <class 'numpy.ndarray'>
Value: [[(array([[64]], dtype=uint8), array([[64]], dtype=uint8), array([[64]], dtype=uint8))]]
dtype: [('eeg', 'O'), ('wavA', 'O'), ('wavB', 'O')]
shape: (1, 1)
  flat[0] level 0: type=<class 'numpy.void'>  repr=np.void((array([[64]], dtype=uint8), array([[64]], dtype=uint8), array([[64]], dtype=uint8)), dtype=[('eeg', 'O'), ('wav
  flat[0] level 1: type=<class 'numpy.void'>  repr=np.void((array([[64]], dtype=uint8), array([[64]], dtype=uint8), array([[64]], dtype=uint8)), dtype=[('eeg', 'O'), ('wav
  flat[0] level 2: type=<class 'numpy.void'>  repr=np.void((array([[64]], dtype=uint8), array([[64]], dtype=uint8), array([[64]], dtype=uint8)), dtype=[('eeg', 'O'), ('wav
  flat[0] level 3: type=<class 'numpy.void'>  repr=np.void((array([[64]], dtype=uint8), array([[64]], dtype=uint8), array([[64]], dtype=uint8)), dtype=[('eeg', 'O'), ('wav
  flat[0] level 4: type=<class 'numpy.void'>  repr=np.void((array([[64]], dtype=uint8), array([[64]], dtype=uint8), arr

In [None]:
# ── CONFIRM the structure and find ALL top-level fields ──────────────────────
import scipy.io
import numpy as np
from pathlib import Path

# First file of the sorted glob, unwrapped to the top-level struct.
mat_files = sorted(Path('EEG_data').glob('S*_data_preproc.mat'))
raw  = scipy.io.loadmat(mat_files[0])
data = raw['data'][0, 0]

print("=== Top-level fields in data ===")
print(data.dtype.names)

# fsample: one record whose three fields each hold the sampling rate
print("\n=== fsample field ===")
fs_raw = data['fsample']
print(f"  dtype fields: {fs_raw.dtype.names}")
fs_void = fs_raw[0, 0]   # np.void
fs_eeg, fs_wav_a, fs_wav_b = (fs_void[k] for k in ('eeg', 'wavA', 'wavB'))
print(f"  'eeg' field:  {fs_eeg}  → fs = {int(fs_eeg.flat[0])}")
print(f"  'wavA' field: {fs_wav_a} → fs = {int(fs_wav_a.flat[0])}")
print(f"  'wavB' field: {fs_wav_b} → fs = {int(fs_wav_b.flat[0])}")

# eeg: container of trials, plus the shape of the first trial
print("\n=== eeg trials shape ===")
eeg_trials = data['eeg']
print(f"  data['eeg'].shape: {eeg_trials.shape}")
print(f"  one trial shape:   {eeg_trials[0,0].shape}")

# event: nested struct; look at the first per-trial entry
print("\n=== event structure ===")
ev = data['event'][0, 0]
print(f"  event dtype fields: {ev.dtype.names}")
ev_eeg = ev['eeg']
print(f"  event['eeg'].shape: {ev_eeg.shape}")
ev0 = ev_eeg[0, 0]
ev0_desc = ev0.dtype.names if hasattr(ev0, 'dtype') else type(ev0)
print(f"  event['eeg'][0,0] dtype: {ev0_desc}")
print(f"  event['eeg'][0,0]['value']: {ev0['value']}")

=== Top-level fields in data ===
('dim', 'fsample', 'event', 'eeg', 'cfg', 'wavA', 'wavB')

=== fsample field ===
  dtype fields: ('eeg', 'wavA', 'wavB')
  'eeg' field:  [[64]]  → fs = 64
  'wavA' field: [[64]] → fs = 64
  'wavB' field: [[64]] → fs = 64

=== eeg trials shape ===
  data['eeg'].shape: (1, 60)
  one trial shape:   (3200, 66)

=== event structure ===
  event dtype fields: ('eeg',)
  event['eeg'].shape: (1, 60)
  event['eeg'][0,0] dtype: ('sample', 'value')
  event['eeg'][0,0]['value']: [[array([[2]], dtype=uint8)]]


### EEG to FRQMAP and TOPOMAP

In [None]:
# ============================================================================
# EEG TO 2D CONVERTER — FULL REWRITE (SMM + INCREMENTAL SAVE)
# Confirmed dataset structure:
#   fsample  : (1,1) struct  dtype=('eeg','wavA','wavB')  → each [[64]] uint8
#   eeg      : (1,60) trials → each (3200, 66) float  [50s @ 64Hz, 66ch]
#   event    : (1,1) struct  → ['eeg'] (1,60)
#                             → [0,t] struct dtype=('sample','value')
#                             → ['value'] [[array([[1or2]])]]  (3× flat[0])
#
# Key constraint:
#   fs=64Hz, SMM default nperseg=256 → window must be ≥ 256/64 = 4.0s
#   WINDOW_SEC=4.0 / STEP_SEC=0.5 satisfies this safely.
# ============================================================================

import sys
import tempfile
import scipy.io
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm

# ── SMM import ───────────────────────────────────────────────────────────────
# Put the local clone (if present) at the front of sys.path before importing.
_SMM_PATH = Path("SMM")
if _SMM_PATH.exists():
    sys.path.insert(0, str(_SMM_PATH))

# Import the SMM entry point; on failure re-raise with install instructions.
try:
    from SMM.signal_maps.pipeline import generate_both_maps
except ImportError:
    raise ImportError(
        "SMM not found — install with:\n"
        "  pip install git+https://github.com/MHM-Rajpoot/SMM.git"
    )
else:
    print("✓ SMM library loaded")


# ============================================================================
# CONFIG
# ============================================================================
class Config:
    """Static settings shared by the loader, sampler, converter and pipeline."""

    # ── Paths and SMM options ────────────────────────────────────────────────
    DATA_DIR    = "EEG_data"          # folder holding S*_data_preproc.mat
    OUTPUT_DIR  = "eeg_2d_outputs"    # root folder for generated maps + CSVs
    TF_METHOD   = "cwt"               # passed to SMM as tf_method
    OUTPUT_MODE = "medium"            # passed to SMM as output_mode

    # ── Window sizing ────────────────────────────────────────────────────────
    # Author's note: at fs = 64 Hz and with SMM's default nperseg of 256, a
    # window must span at least 256 / 64 = 4.0 s. A previous 2.0 s window
    # (128 samples) crashed SMM; the step was reduced from 1.0 s to 0.5 s to
    # keep temporal coverage dense with the larger window.
    WINDOW_SEC = 4.0
    STEP_SEC   = 0.5

    # ── Sampling ─────────────────────────────────────────────────────────────
    # Named fractions of (trial, channel) pairs converted per subject.
    SAMPLING_STRATEGIES = dict(
        full=1.0,
        half=0.5,
        quarter=0.25,
        tenth=0.1,
        hundredth=0.01,
    )

    # ── Dataset constants (confirmed by the diagnostic cells) ────────────────
    N_TRIALS   = 60
    N_CHANNELS = 66

    # Shortest signal, in samples, that the converter accepts.
    SMM_MIN_SAMPLES = 256


# ============================================================================
# DATA LOADER
# ============================================================================
class EEGDataLoader:
    """
    Loads `S*_data_preproc.mat` subject files with the following confirmed
    layout (see the diagnostic cells):

        raw['data'][0,0]
        ├── fsample  (1,1) struct  dtype=('eeg','wavA','wavB')
        │                          each field: array([[64]], uint8)
        ├── eeg      (1,60)        each element: (3200, 66)
        └── event    (1,1) struct
                └── ['eeg']  (1,60)
                        └── [0,t]  struct dtype=('sample','value')
                                └── ['value']  [[array([[1or2]])]]

    Note: `mat_files` is sorted lexicographically, so S10…S18 come before
    S1…S9. Never derive a subject number from a file's position in the list.
    """

    def __init__(self, data_dir: str = "EEG_data"):
        """
        Index the subject files under `data_dir`.

        Raises:
            FileNotFoundError: if no `S*_data_preproc.mat` file is present.
        """
        self.data_dir  = Path(data_dir)
        self.mat_files = sorted(self.data_dir.glob("S*_data_preproc.mat"))
        if not self.mat_files:
            raise FileNotFoundError(f"No S*_data_preproc.mat files found in '{data_dir}'")
        print(f"✓ Found {len(self.mat_files)} subject files")

    # ── public API ───────────────────────────────────────────────────────────

    def load_subject(self, file_path) -> dict:
        """
        Read one subject file and return its sampling rate, trials and labels.

        Returns a dict with keys:
            subject    : file stem up to the first underscore, e.g. "S10"
            fs         : EEG sampling rate as float
            eeg_trials : the raw `data['eeg']` container, indexed [0, trial]
            labels     : 1-D int array, one attended label per trial

        Raises:
            ValueError: if a label cannot be unwrapped or is not 1 or 2.
        """
        raw  = scipy.io.loadmat(str(file_path))
        data = raw["data"][0, 0]

        return {
            "subject":    Path(file_path).stem.split("_")[0],
            "fs":         self._extract_fsample(data),
            "eeg_trials": data["eeg"],
            "labels":     self._extract_labels(data),
        }

    def get_signal(self, subj_data: dict, trial_idx: int, ch_idx: int) -> np.ndarray:
        """Return a copy of the 1-D signal for (trial, channel). Shape: (n_samples,)."""
        return subj_data["eeg_trials"][0, trial_idx][:, ch_idx].copy()

    # ── private helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _extract_fsample(data) -> float:
        """
        Return the EEG sampling rate.

        `fsample` is a (1,1) struct with fields 'eeg', 'wavA', 'wavB'; only
        the 'eeg' field is used.
        """
        fs_struct = data["fsample"][0, 0]   # np.void record
        return float(fs_struct["eeg"].flat[0])

    @staticmethod
    def _extract_labels(data) -> np.ndarray:
        """
        Return the per-trial attended labels as a 1-D int array.

        `event['eeg'][0, t]['value']` is a nested single-element array; it is
        unwrapped level by level until a scalar remains, so the code does not
        depend on a fixed nesting depth.

        Raises:
            ValueError: on an empty label container or a label outside {1, 2}.
                Explicit exceptions are used rather than `assert` so the
                checks survive `python -O`.
        """
        event_eeg = data["event"][0, 0]["eeg"]   # (1, n_trials)
        n_trials  = event_eeg.shape[1]

        labels = []
        for t in range(n_trials):
            val = event_eeg[0, t]["value"]
            while isinstance(val, np.ndarray):
                if val.size == 0:
                    raise ValueError(f"Empty label container for trial {t}")
                val = val.flat[0]
            labels.append(int(val))

        labels = np.array(labels, dtype=int)

        unexpected = set(labels.tolist()) - {1, 2}
        if unexpected:
            raise ValueError(f"Unexpected label values: {sorted(unexpected)}")

        return labels


# ============================================================================
# SIGNAL SAMPLER
# ============================================================================
class SignalSampler:
    """
    Randomly selects a fraction of (trial, channel) pairs per subject.

    Selection is reproducible: the sampler owns a private legacy
    `np.random.RandomState(seed)`, which yields the same draws as
    `np.random.seed(seed)` followed by `np.random.choice`. It neither reseeds
    the global NumPy RNG nor is affected by other code that uses it.
    """

    def __init__(self, strategy: str = "tenth", seed: int = 42):
        """
        Args:
            strategy: a key of `Config.SAMPLING_STRATEGIES`, or a value
                convertible to a float in [0, 1] giving a custom ratio.
            seed: seed for the sampler's private random generator.

        Raises:
            ValueError: if `strategy` is neither a known name nor a number,
                or if the custom ratio lies outside [0, 1].
        """
        self._rng  = np.random.RandomState(seed)
        strategies = Config.SAMPLING_STRATEGIES

        if strategy in strategies:
            self.ratio = strategies[strategy]
            self.name  = strategy
        else:
            try:
                self.ratio = float(strategy)
            except (TypeError, ValueError):
                raise ValueError(
                    f"Unknown strategy '{strategy}'. "
                    f"Choose from: {list(strategies.keys())} or a float 0–1."
                ) from None
            # Reject ratios that would ask for more signals than exist (which
            # fails later inside choice()) or that make no sense (negative/NaN).
            if not 0.0 <= self.ratio <= 1.0:
                raise ValueError(
                    f"Sampling ratio must be within 0–1, got {self.ratio}."
                )
            self.name = f"custom({self.ratio:.1%})"

        print(f"Sampling strategy : {self.name} ({self.ratio:.1%})")

    def sample(self, n_subjects: int, n_trials: int, n_channels: int) -> list:
        """
        Returns list of (subj_idx, trial_idx, ch_idx) tuples, grouped by
        subject in ascending `subj_idx` order.

        Each subject contributes `floor(n_trials × n_channels × ratio)`
        signals (at least one, never more than are available), drawn without
        replacement.
        """
        population = n_trials * n_channels
        per_subj   = min(population, max(1, int(population * self.ratio)))

        selected = []
        for s in range(n_subjects):
            indices = self._rng.choice(population, per_subj, replace=False)
            # Flat index → (trial, channel): trial-major, channel-minor.
            selected.extend(
                (s, *divmod(int(idx), n_channels)) for idx in indices
            )

        print(f"→ {len(selected):,} signals selected "
              f"({per_subj} per subject × {n_subjects} subjects)")
        return selected


# ============================================================================
# EEG → WAV → 2D CONVERTER
# ============================================================================
class EEG2DConverter:
    """
    Converts a 1-D EEG signal to 2-D maps via SMM's `generate_both_maps`.

    SMM takes a file path, so each signal is first written to an intermediate
    WAV file in a per-instance temp directory. That WAV is deleted as soon as
    the conversion finishes, whether it succeeded or raised.
    """

    def __init__(self, output_dir: str = "eeg_2d_outputs"):
        """Create the output root (if needed) and a fresh temp dir for WAVs."""
        self.out_root = Path(output_dir)
        self.out_root.mkdir(parents=True, exist_ok=True)
        self.temp_dir = Path(tempfile.mkdtemp(prefix="eeg_wav_"))
        self._cfg     = Config()

    # ── public API ───────────────────────────────────────────────────────────

    def convert(self, signal: np.ndarray, fs: float, signal_id: str) -> dict:
        """
        signal    : 1-D float array
        fs        : sampling rate (Hz)
        signal_id : unique string identifier used for filenames / folders

        Returns dict with keys 'output_dir' and 'preview' (both str). The
        preview path is a best guess and may not exist on disk.

        Raises ValueError if the signal has fewer than
        `Config.SMM_MIN_SAMPLES` samples. Any exception raised by SMM
        propagates unchanged.
        """
        n_samples   = len(signal)
        min_samples = self._cfg.SMM_MIN_SAMPLES
        if n_samples < min_samples:
            raise ValueError(
                f"Signal too short: {n_samples} samples < "
                f"SMM minimum {min_samples}. "
                f"Increase WINDOW_SEC above "
                f"{min_samples / fs:.2f}s at fs={fs:.0f}Hz."
            )

        wav_path = self.temp_dir / f"{signal_id}.wav"
        out_dir  = self.out_root / signal_id

        try:
            self._write_wav(signal, fs, wav_path)
            out_dir.mkdir(parents=True, exist_ok=True)

            generate_both_maps(
                input_file  = str(wav_path),
                output_dir  = str(out_dir),
                tf_method   = self._cfg.TF_METHOD,
                window_sec  = self._cfg.WINDOW_SEC,
                step_sec    = self._cfg.STEP_SEC,
                output_mode = self._cfg.OUTPUT_MODE,
            )
        finally:
            # Always remove the temp WAV. Without this, every failed
            # conversion would leave a file behind in temp_dir.
            wav_path.unlink(missing_ok=True)

        preview = self._find_preview(out_dir)
        return {"output_dir": str(out_dir), "preview": str(preview)}

    # ── private helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _write_wav(signal: np.ndarray, fs: float, path: Path) -> None:
        """Peak-normalise `signal` and write it as 16-bit PCM at int(fs) Hz."""
        from scipy.io import wavfile
        # The small epsilon keeps an all-zero signal from dividing by zero.
        norm = signal / (np.max(np.abs(signal)) + 1e-10)
        wavfile.write(str(path), int(fs), (norm * 32767).astype(np.int16))

    @staticmethod
    def _find_preview(out_dir: Path) -> Path:
        """Return the first known preview image found in `out_dir`."""
        for name in ("combined_frames_preview.png", "time_frequency_single.png"):
            p = out_dir / name
            if p.exists():
                return p
        return out_dir / "combined_frames_preview.png"   # fallback (may not exist)


# ============================================================================
# PIPELINE
# ============================================================================
class EEGPipeline:
    """
    Orchestrates loading → sampling → conversion → incremental CSV save.

    Output files (written to output_dir):
        results.csv  — full metadata per signal
        labels.csv   — minimal signal_id / attended label lookup

    Rows are appended as each signal finishes, so an interrupted run keeps
    everything converted so far. Constructing a new pipeline truncates both
    files.
    """

    _RESULTS_COLS = [
        "signal_id", "subject", "trial", "channel",
        "attended", "status", "output_dir", "preview",
    ]
    _LABELS_COLS = ["signal_id", "attended"]

    def __init__(
        self,
        data_dir:   str = "EEG_data",
        output_dir: str = "eeg_2d_outputs",
        strategy:   str = "tenth",
    ):
        """Build loader, sampler and converter, then write fresh CSV headers."""
        self.loader    = EEGDataLoader(data_dir)
        self.sampler   = SignalSampler(strategy)
        self.converter = EEG2DConverter(output_dir)
        self.out_dir   = Path(output_dir)
        self.out_dir.mkdir(parents=True, exist_ok=True)

        self.results_file = self.out_dir / "results.csv"
        self.labels_file  = self.out_dir / "labels.csv"

        # Most recently loaded subject; see _get_subject().
        self._cached_path    = None
        self._cached_subject = None

        # Write headers (overwrites any previous run)
        pd.DataFrame(columns=self._RESULTS_COLS).to_csv(self.results_file, index=False)
        pd.DataFrame(columns=self._LABELS_COLS ).to_csv(self.labels_file,  index=False)

    # ── public API ───────────────────────────────────────────────────────────

    def run(self) -> pd.DataFrame:
        """
        Convert every sampled (subject, trial, channel) signal.

        A failure on one signal is reported, recorded as a "failed" row, and
        does not stop the run. Returns results.csv read back as a DataFrame.
        """
        n_subj   = len(self.loader.mat_files)
        selected = self.sampler.sample(
            n_subj, Config.N_TRIALS, Config.N_CHANNELS
        )

        ok, err = 0, 0
        print("\nStarting conversion — results saved incrementally...\n")

        for subj_idx, trial_idx, ch_idx in tqdm(selected, desc="Converting"):
            mat_path = self.loader.mat_files[subj_idx]
            # Take the subject name from the file itself. The file list is
            # sorted lexicographically (S10 … S18, S1 … S9), so the list
            # index does not correspond to the subject number.
            subject = Path(mat_path).stem.split("_")[0]
            sig_id  = f"{subject}_T{trial_idx:02d}_C{ch_idx:02d}"
            try:
                subj_data = self._get_subject(mat_path)
                signal    = self.loader.get_signal(subj_data, trial_idx, ch_idx)
                conv      = self.converter.convert(signal, subj_data["fs"], sig_id)
                label     = int(subj_data["labels"][trial_idx])

                self._append_row(
                    results={
                        "signal_id":  sig_id,
                        "subject":    subject,
                        "trial":      trial_idx,
                        "channel":    ch_idx,
                        "attended":   label,
                        "status":     "ok",
                        "output_dir": conv["output_dir"],
                        "preview":    conv["preview"],
                    },
                    label={"signal_id": sig_id, "attended": label},
                )
                ok += 1

            except Exception as exc:
                tqdm.write(f"[ERROR] {sig_id}: {exc}")
                self._append_row(
                    results={
                        "signal_id":  sig_id,
                        "subject":    subject,
                        "trial":      trial_idx,
                        "channel":    ch_idx,
                        "attended":   None,
                        "status":     "failed",
                        "output_dir": None,
                        "preview":    None,
                    },
                    label={"signal_id": sig_id, "attended": None},
                )
                err += 1

        print(f"\n{'✅' if err == 0 else '⚠️ '} Done — "
              f"{ok:,} converted  |  {err:,} errors")
        print(f"   Results : {self.results_file}")
        print(f"   Labels  : {self.labels_file}")

        return pd.read_csv(self.results_file)

    # ── private helpers ──────────────────────────────────────────────────────

    def _get_subject(self, mat_path) -> dict:
        """
        Return the loaded subject for `mat_path`, reusing the previous load
        when the path is unchanged.

        The sampler emits signals grouped by subject, so a one-entry cache
        reads each .mat file once per run instead of once per signal.
        """
        if self._cached_subject is None or mat_path != self._cached_path:
            self._cached_subject = self.loader.load_subject(mat_path)
            self._cached_path    = mat_path
        return self._cached_subject

    def _append_row(self, results: dict, label: dict) -> None:
        """Append one row to results.csv and one to labels.csv."""
        # Passing `columns=` pins the column order to the header written in
        # __init__, regardless of the key order of the dicts.
        pd.DataFrame([results], columns=self._RESULTS_COLS).to_csv(
            self.results_file, mode="a", header=False, index=False
        )
        pd.DataFrame([label], columns=self._LABELS_COLS).to_csv(
            self.labels_file,  mode="a", header=False, index=False
        )


# ============================================================================
# CONVENIENCE ENTRY POINT
# ============================================================================
def run_conversion(
    data_dir:   str = "EEG_data",
    output_dir: str = "eeg_2d_outputs",
    strategy:   str = "tenth",
) -> pd.DataFrame:
    """
    Build an EEGPipeline from the three arguments and run it to completion.

    Typical use:
        results = run_conversion(strategy='tenth')
    """
    pipeline = EEGPipeline(data_dir, output_dir, strategy)
    return pipeline.run()


# ============================================================================
# EXECUTE
# ============================================================================
# Run when executed as a script or inside an IPython/Jupyter session.
if __name__ == "__main__" or "get_ipython" in dir():
    results = run_conversion(strategy="tenth")
    try:
        # Rich table when `display` is available (Jupyter) ...
        display(results.head(10))
    except NameError:
        # ... plain text otherwise.
        print(results.head(10))

✓ SMM library loaded
✓ Found 18 subject files
Sampling strategy : tenth (10.0%)
→ 7,128 signals selected (396 per subject × 18 subjects)

Starting conversion — results saved incrementally...



Converting:  43%|████▎     | 3069/7128 [4:07:52<5:27:49,  4.85s/it]


KeyboardInterrupt: 

In [None]:
!cp -r /content/eeg_2d_outputs "/content/drive/MyDrive/Colab Notebooks/EEG /Data"

### Data Exploration

In [None]:
!ls -al /content/eeg_2d_outputs/labels.csv

-rw-r--r-- 1 root root 42985 Feb 17 01:19 /content/eeg_2d_outputs/labels.csv


In [None]:
import pandas as pd

# Preview the label lookup written by the conversion run, if it exists.
labels_path = '/content/eeg_2d_outputs/labels.csv'
if not Path(labels_path).exists():
    print(f'File not found: {labels_path}')
else:
    df_labels = pd.read_csv(labels_path)
    display(df_labels.head())

Unnamed: 0,signal_id,attended
0,S10_T02_C17,2
1,S10_T15_C35,2
2,S10_T27_C64,1
3,S10_T10_C60,2
4,S10_T04_C61,2


In [None]:
# Class balance: how many rows in labels.csv carry each attended label.
# NOTE(review): value_counts() presumably skips rows whose `attended` is empty
# (failed conversions are written with attended=None), so the total may be
# smaller than len(df_labels) — confirm.
attended_counts = df_labels['attended'].value_counts()
print('Counts of unique values for attended:')
display(attended_counts)

Counts of unique values for attended:


Unnamed: 0_level_0,count
attended,Unnamed: 1_level_1
1,1536
2,1533
