# EEG Data Pipeline & Characterization

This exploratory notebook summarizes the structured EEG-related CSV files that are available locally and describes the data pipeline for the assignment. The EEG waveforms (EDF) and audio (WAV) are currently on OneDrive and not present locally; the slides in `data/slides/` provide experiment context and protocols.


## Environment

Create/activate the repo venv and install deps:

```
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
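Run the commands above from the repo root. The notebook itself must run with a working directory one level below the repo root, because the next cell builds `BASE` from the relative path `../data` (the recorded output shows it resolving to `.../awaken-ai/data`).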
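
Because `BASE` depends on the working directory, a lookup that walks upward until it finds the data folder would be less fragile. The following is a minimal sketch, not part of the notebook; the helper name `find_data_dir` is hypothetical, and it assumes the folder is called `data` and sits at or above the working directory.

```python
from pathlib import Path


def find_data_dir(start: Path, name: str = "data") -> Path:
    """Walk upward from `start` until a directory called `name` is found."""
    start = start.resolve()
    # Check the starting directory first, then each parent in turn.
    for candidate in (start, *start.parents):
        data_dir = candidate / name
        if data_dir.is_dir():
            return data_dir
    raise FileNotFoundError(f"no '{name}' directory found at or above {start}")
```

With this, `BASE = find_data_dir(Path.cwd())` would give the same result from the repo root or from any subdirectory of it.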


In [36]:
from __future__ import annotations
import json
import os
from pathlib import Path
import pandas as pd

BASE = Path("../data").resolve()
BASE


PosixPath('/Users/ayush/Desktop/Capstone/awaken-ai/data')

In [37]:
# List key CSV assets (excluding slide decks and external EDF/WAV on OneDrive)
paths = [
    "patient_df.csv", "patient_df (1).csv", "patient_df (3).csv",
    "patient_df_043025.csv", "patient_df_052225.csv",
    "patient_history.csv", "patient_history (1).csv",
    "patient_notes.csv", "patient_notes (3).csv",
    "CON008_2025-08-14_stimulus_results.csv",
    "CON009_2025-08-26_stimulus_results.csv",
    "CON010_2025-10-31_stimulus_results.csv",
    "TESTpjs091725_2025-09-17_stimulus_results.csv",
    "stimuli_record/CON008_2025-08-14_stimulus_results.csv",
    "stimuli_record/CON009_2025-08-26_stimulus_results.csv",
    "stimuli_record/TESTpjs091725_2025-09-17_stimulus_results.csv",
    "stimuli_record/old stimulus software/CON001a_20240917.csv",
    "stimuli_record/old stimulus software/CON001b_20240917.csv",
    "stimuli_record/old stimulus software/CON002_20240924.csv",
    "stimuli_record/old stimulus software/CON003_20250114.csv",
    "stimuli_record/old stimulus software/patient_df_043025.csv",
]

files = [BASE / p for p in paths if (BASE / p).exists()]
len(files), files[:3]


(21,
 [PosixPath('/Users/ayush/Desktop/Capstone/awaken-ai/data/patient_df.csv'),
  PosixPath('/Users/ayush/Desktop/Capstone/awaken-ai/data/patient_df (1).csv'),
  PosixPath('/Users/ayush/Desktop/Capstone/awaken-ai/data/patient_df (3).csv')])

In [38]:
def summarize_csv(path: Path) -> dict:
    df = pd.read_csv(path)
    return {
        "file": path.relative_to(BASE).as_posix(),
        "rows": len(df),
        "cols": df.shape[1],
        "columns": list(df.columns),
    }

summaries = pd.DataFrame([summarize_csv(p) for p in files]).sort_values("file")
summaries


Unnamed: 0,file,rows,cols,columns
9,CON008_2025-08-14_stimulus_results.csv,82,7,"[patient_id, date, trial_type, sentences, star..."
10,CON009_2025-08-26_stimulus_results.csv,182,7,"[patient_id, date, trial_type, sentences, star..."
11,CON010_2025-10-31_stimulus_results.csv,24,8,"[patient_id, date, trial_type, sentences, star..."
12,TESTpjs091725_2025-09-17_stimulus_results.csv,182,7,"[patient_id, date, trial_type, sentences, star..."
1,patient_df (1).csv,672,8,"[Unnamed: 0, patient_id, date, trial_type, sen..."
2,patient_df (3).csv,420,8,"[Unnamed: 0, patient_id, date, trial_type, sen..."
0,patient_df.csv,588,8,"[Unnamed: 0, patient_id, date, trial_type, sen..."
3,patient_df_043025.csv,1120,11,"[Unnamed: 0, patient_id, date, trial_type, sen..."
4,patient_df_052225.csv,8,7,"[patient_id, date, trial_type, sentences, star..."
6,patient_history (1).csv,3,2,"[patient_id, date]"


In [39]:
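# Trial-level characterization across patient logs
# Note: this name filter skips the legacy CON001a/CON001b/CON002/CON003 logs under
# "stimuli_record/old stimulus software/", whose file names contain neither substring.
trial_logs = [p for p in files if "patient_df" in p.name or "stimulus_results" in p.name]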

frames = []
for p in trial_logs:
    df = pd.read_csv(p)
    # Standardize columns that may be missing
    for col in ["trial_type", "patient_id", "duration", "date"]:
        if col not in df.columns:
            df[col] = pd.NA
    df["source_file"] = p.relative_to(BASE).as_posix()
    frames.append(df)
all_trials = pd.concat(frames, ignore_index=True)

trial_type_counts = (
    all_trials
    .groupby(["source_file", "trial_type"], dropna=False)
    .size()
    .reset_index(name="count")
    .sort_values(["source_file", "count"], ascending=[True, False])
)

trial_type_counts.head(20)


Unnamed: 0,source_file,trial_type,count
0,CON008_2025-08-14_stimulus_results.csv,language,72
2,CON008_2025-08-14_stimulus_results.csv,oddball+p,4
1,CON008_2025-08-14_stimulus_results.csv,left_command+p,3
3,CON008_2025-08-14_stimulus_results.csv,right_command+p,3
5,CON009_2025-08-26_stimulus_results.csv,language,72
4,CON009_2025-08-26_stimulus_results.csv,control,50
7,CON009_2025-08-26_stimulus_results.csv,loved_one_voice,50
8,CON009_2025-08-26_stimulus_results.csv,oddball+p,4
6,CON009_2025-08-26_stimulus_results.csv,left_command+p,3
9,CON009_2025-08-26_stimulus_results.csv,right_command+p,3


In [40]:
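# Duration stats by trial_type (across combined logs)
# "duration" always exists here: the previous cell adds it to any log that lacks it.
all_trials["duration_sec"] = pd.to_numeric(all_trials["duration"], errors="coerce")
duration_stats = (
    all_trials
    .groupby("trial_type")["duration_sec"]
    .agg(["count", "mean", "median", "min", "max"])
    .reset_index()
    .sort_values("count", ascending=False)
)
# Keep the display expression at top level: Jupyter renders only the last
# top-level expression of a cell, not one nested inside an `if` block.
duration_stats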


## Schema reconciliation & missingness


In [41]:
# Compare column sets across files
superset_cols = sorted({c for p in files for c in pd.read_csv(p, nrows=0).columns})

schema_records = []
for p in files:
    cols = list(pd.read_csv(p, nrows=0).columns)
    missing_vs_superset = [c for c in superset_cols if c not in cols]
    schema_records.append({
        "file": p.relative_to(BASE).as_posix(),
        "cols": len(cols),
        "missing_cols": missing_vs_superset,
    })
schema_df = pd.DataFrame(schema_records).sort_values("file")
schema_df


Unnamed: 0,file,cols,missing_cols
9,CON008_2025-08-14_stimulus_results.csv,7,"[Unnamed: 0, notes, paradigm, stimulus_details..."
10,CON009_2025-08-26_stimulus_results.csv,7,"[Unnamed: 0, notes, paradigm, stimulus_details..."
11,CON010_2025-10-31_stimulus_results.csv,8,"[Unnamed: 0, paradigm, stimulus_details, trial..."
12,TESTpjs091725_2025-09-17_stimulus_results.csv,7,"[Unnamed: 0, notes, paradigm, stimulus_details..."
1,patient_df (1).csv,8,"[notes, paradigm, stimulus_details, trial_index]"
2,patient_df (3).csv,8,"[notes, paradigm, stimulus_details, trial_index]"
0,patient_df.csv,8,"[notes, paradigm, stimulus_details, trial_index]"
3,patient_df_043025.csv,11,[notes]
4,patient_df_052225.csv,7,"[Unnamed: 0, notes, paradigm, stimulus_details..."
6,patient_history (1).csv,2,"[Unnamed: 0, duration, end_time, notes, paradi..."


In [42]:
# Missingness per file (fraction of nulls per column)
miss_tables = []
for p in files:
    df = pd.read_csv(p)
    miss = df.isna().mean().reset_index()
    miss.columns = ["column", "missing_frac"]
    miss["file"] = p.relative_to(BASE).as_posix()
    miss_tables.append(miss)
missingness = pd.concat(miss_tables, ignore_index=True)
missingness.head(20)


Unnamed: 0,column,missing_frac,file
0,Unnamed: 0,1.0,patient_df.csv
1,patient_id,0.0,patient_df.csv
2,date,0.0,patient_df.csv
3,trial_type,0.0,patient_df.csv
4,sentences,0.0,patient_df.csv
5,start_time,0.0,patient_df.csv
6,end_time,0.0,patient_df.csv
7,duration,0.0,patient_df.csv
8,Unnamed: 0,1.0,patient_df (1).csv
9,patient_id,0.0,patient_df (1).csv


## Participant/session coverage


In [43]:
# Counts by patient and by patient-date
counts_patient = (
    all_trials.groupby("patient_id").size().reset_index(name="rows").sort_values("rows", ascending=False)
)
counts_session = (
    all_trials.groupby(["patient_id", "date"]).size().reset_index(name="rows").sort_values("rows", ascending=False)
)
counts_patient.head(10), counts_session.head(10)


(       patient_id  rows
 6          CON006   448
 13       test_new   448
 0         CON001a   420
 1         CON001b   420
 2          CON002   420
 3          CON003   420
 12          khanh   420
 8          CON009   364
 10  TESTpjs091725   364
 4          CON004   336,
        patient_id        date  rows
 7          CON006  2025-04-30   448
 14       test_new  2025-04-29   448
 0         CON001a  2024-09-17   420
 1         CON001b  2024-09-17   420
 2          CON002  2024-09-24   420
 3          CON003  2025-01-14   420
 13          khanh  2024-08-29   420
 9          CON009  2025-08-26   364
 11  TESTpjs091725  2025-09-17   364
 4          CON004  2025-01-30   336)

## Trial-type & duration by patient


In [44]:
# Trial-type counts by patient
trial_counts_patient = (
    all_trials.groupby(["patient_id", "trial_type"], dropna=False).size()
    .reset_index(name="count")
    .sort_values(["patient_id", "count"], ascending=[True, False])
)

# Duration stats by patient and trial_type
# "duration" always exists in all_trials (added during loading when missing).
all_trials["duration_sec"] = pd.to_numeric(all_trials["duration"], errors="coerce")
duration_by_patient = (
    all_trials.groupby(["patient_id", "trial_type"])["duration_sec"]
    .agg(["count", "mean", "median", "min", "max"])
    .reset_index()
    .sort_values("count", ascending=False)
)
# Display at top level so Jupyter renders the result; inside an `if` block
# the tuple would be evaluated but never shown.
duration_by_patient.head(20), trial_counts_patient.head(20)


## Assignment answers (data pipeline & characterization)

**Data streams in use**
- Structured trial/event logs (CSV): the `patient_df*` and `*_stimulus_results` variants capture trial_type, sentence lists, start/end times, and durations; older logs under `data/stimuli_record/old stimulus software/` keep the same structure.
- Patient metadata (CSV): `patient_history*` and `patient_notes*`, with sparse visit dates and notes.
- Contextual protocol slides (PowerPoint/PDF) describing acquisition and paradigms: the Claassen2019 pipeline (EEG acquisition → power spectral density (PSD) → SVM detection of cognitive motor dissociation (CMD)) and Sokoliuk2021 language tracking (72 trials of 4-word sentences, inter-trial phase coherence (ITPC) analyses). See [Claassen2019 Implementation](file:///Users/ayush/Desktop/Capstone/awaken-ai/data/slides/Claassen2019%20Implementation.pdf) and [Sokoliuk2021 language processing](file:///Users/ayush/Desktop/Capstone/awaken-ai/data/slides/Implementation%20of%20Sokoliuk2021_languageprocessingTBI.pdf).
- Raw EEG (EDF) and audio (WAV) are **not** in the repo; they remain on OneDrive.

**How many data points?**
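- File sizes (rows): see the `summaries` output. The largest file is `patient_df_043025.csv` (1,120 rows); the other files shown there range from 3 to 672 rows; stimulus_results files have 24–182 rows; older logs have 84 rows each.
- Per-patient counts (`counts_patient`): the top subjects show hundreds of rows, but these totals include files that are loaded more than once. For example, CON009 has 364 rows in `counts_patient`, exactly twice the 182 rows of its stimulus_results file, which is listed under both `data/` and `data/stimuli_record/`. Per-session counts (`counts_session`) show repeated dates (e.g., multiple paradigms on the same day; CON001a and CON001b both fall on 2024-09-17).
- Trial-type counts: language dominates (e.g., 72 trials per stimulus_results session); control and loved_one_voice have ~50 each; commands ~3 per side; oddball/beep ~4 per session; legacy `patient_df` files use the labels “lang/rcmd/lcmd/beep” for the same paradigms.
- Durations: command blocks ~200 s; language ~15–16 s; oddball/beep ~30–34 s; summarized in `duration_stats` and `duration_by_patient`.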

**Where does the data reside?**
- CSVs and slides live under `data/` in the repo.
- EDF/WAV reside externally on OneDrive and must be synced before signal processing.
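
Before any signal processing, it helps to confirm whether the OneDrive sync has actually placed raw files on disk. The sketch below is one way to check, under the assumption that the raw files use `.edf` and `.wav` extensions (case-insensitive); the function name `find_raw_files` and the idea of scanning a single root folder are illustrative, not something the repo already provides.

```python
from __future__ import annotations

from pathlib import Path

RAW_SUFFIXES = (".edf", ".wav")  # raw formats named in this notebook


def find_raw_files(root: Path) -> dict[str, list[Path]]:
    """Return raw EEG/audio files found under `root`, grouped by suffix."""
    found: dict[str, list[Path]] = {suffix: [] for suffix in RAW_SUFFIXES}
    for path in sorted(root.rglob("*")):
        suffix = path.suffix.lower()
        if path.is_file() and suffix in found:
            found[suffix].append(path)
    return found
```

An empty list for either suffix means that stream has not been synced into the scanned folder yet.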

**Software used to access data**
- Python (pandas) via this Jupyter notebook; environment pinned in `requirements.txt` and run inside `.venv`.
- Slides opened with PowerPoint/Keynote; future signal work will use MNE/NumPy/SciPy per slide pipelines (PSD, ITPC, SVM).

**If creating/augmenting data**
- Planned products: cleaned/epoched EEG aligned to trial logs; PSD features (Claassen-style CMD pipeline); ITPC features for language tracking (Sokoliuk2021); classifier-ready tables with labels and outcomes.
- Formats: HDF5/Parquet for tabular features; EDF-derived arrays saved as NumPy; plots as PNG/HTML; processed tables under `data/processed/` (to be created) with fields: patient_id, date, trial_type, stimulus, start/end, duration, QC flags.
- Generation location/storage: run locally/institutional compute; write outputs back into repo’s `data/processed/` (or cloud bucket) with readme + schema versioning.
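
To make the planned ITPC feature concrete, here is a minimal NumPy sketch of the textbook definition: take each trial's phase at a target frequency and measure the length of the mean unit phasor across trials (1 means identical phase on every trial, values near 0 mean no consistent phase). This is an illustration under stated assumptions, not the Sokoliuk2021 implementation: it assumes single-channel epochs of equal length, and the sampling rate and target frequency in the usage example are made up.

```python
import numpy as np


def itpc(epochs: np.ndarray, sfreq: float, freq: float) -> float:
    """Inter-trial phase coherence at one frequency.

    epochs: array of shape (n_trials, n_samples) for a single channel.
    sfreq:  sampling rate in Hz.
    freq:   target frequency in Hz (the nearest FFT bin is used).
    """
    n_samples = epochs.shape[1]
    freqs = np.fft.rfftfreq(n_samples, d=1.0 / sfreq)
    bin_idx = int(np.argmin(np.abs(freqs - freq)))
    # Complex Fourier coefficient of every trial at the chosen bin.
    coeffs = np.fft.rfft(epochs, axis=1)[:, bin_idx]
    # Discard amplitude, keep phase only, then average across trials.
    unit_phasors = np.exp(1j * np.angle(coeffs))
    return float(np.abs(unit_phasors.mean()))


# Hypothetical usage: 20 phase-locked trials of a 4 Hz sinusoid at 250 Hz.
t = np.arange(500) / 250.0
locked = np.tile(np.sin(2 * np.pi * 4.0 * t), (20, 1))
locked_itpc = itpc(locked, sfreq=250.0, freq=4.0)  # close to 1
```

A production version would more likely use MNE's epoching and time-frequency tools, as the software notes in this notebook anticipate.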

**Weak points / risks**
- Missing raw EDF/WAV locally; cannot validate timing or artifacts yet.
- Schema drift: multiple `patient_df*` versions with different columns (`trial_index`, `paradigm`, `stimulus_details` only in some); need harmonization.
- Small N per subject and unbalanced trial_types (few command trials vs many language); some sessions tiny (24 rows).
- Sparse metadata: clinical/demographic fields absent; notes minimal.
- Potential time-sync drift between logs and raw signals; duplicate/overlapping sessions possible; missingness (see `missingness` table) in optional columns.
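
The duplicate-session risk can be checked cheaply at the file level before any row-level reconciliation. The sketch below groups files by a SHA-256 digest of their bytes, so byte-identical copies (for example, a log stored in two folders) surface as one group. The function name is hypothetical, and the approach only catches exact copies; overlapping but edited versions would still need a row-level comparison.

```python
from __future__ import annotations

import hashlib
from collections import defaultdict
from pathlib import Path


def group_identical_files(paths: list[Path]) -> list[list[Path]]:
    """Return groups of paths whose file contents are byte-identical."""
    by_digest: dict[str, list[Path]] = defaultdict(list)
    for path in paths:
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        by_digest[digest].append(path)
    # Only digests shared by two or more files indicate duplicates.
    return [group for group in by_digest.values() if len(group) > 1]
```

In this notebook it could be called as `group_identical_files(files)`; keeping one path per group before concatenation would stop exact copies from being counted twice.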

**Next actions**
- Pull EDF/WAV from OneDrive; standardize schema to a superset; reconcile duplicate `patient_df*` versions; align events with signals; add QC flags; reproduce PSD/SVM (Claassen2019) and ITPC (Sokoliuk2021) analyses once signals are available.
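
Aligning events with signals mostly comes down to converting the epoch-second timestamps in the logs into sample indices of the recording. The sketch below shows that arithmetic under two loud assumptions: the recording start time (in epoch seconds) and the sampling rate are known, for example from the EDF header, and the stimulus computer and EEG system share a clock. Neither has been verified for this dataset, and the function and column names `start_sample`/`end_sample` are illustrative.

```python
import pandas as pd


def add_sample_indices(
    events: pd.DataFrame, recording_start: float, sfreq: float
) -> pd.DataFrame:
    """Add start_sample/end_sample columns derived from epoch-second times."""
    out = events.copy()
    for col in ("start_time", "end_time"):
        # Seconds elapsed since the (assumed) recording start.
        offset_sec = pd.to_numeric(out[col], errors="coerce") - recording_start
        # Nullable integers keep rows with missing times as <NA>.
        out[col.replace("_time", "_sample")] = (
            (offset_sec * sfreq).round().astype("Int64")
        )
    return out


# Hypothetical values: recording starts at t=1000.0 s, sampled at 250 Hz.
demo = pd.DataFrame({"start_time": [1000.0, 1002.0], "end_time": [1001.0, 1004.5]})
aligned = add_sample_indices(demo, recording_start=1000.0, sfreq=250.0)
```

Negative indices, or indices beyond the recording length, would be a quick signal of a clock offset between the log and the recording.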

**Preliminary Results slides**
- Use to cross-check early outcomes/QC expectations (e.g., trial completeness, PSD/ITPC targets) alongside the above pipelines; helps define success metrics and gaps before model training.


## Data dictionary (proposed canonical schema)

| column            | meaning / examples                                   |
|-------------------|-------------------------------------------------------|
| patient_id        | Subject ID (e.g., CON008, CON010, TESTpjs091725)     |
| date              | Session date (YYYY-MM-DD)                            |
| trial_type        | Paradigm label (language, control, loved_one_voice, left_command+p/right_command+p, oddball+p, beep; legacy lang/rcmd/lcmd and numbered `lang_<n>` variants) |
| sentences         | Stimulus sequence indices per trial (list-like string) |
| start_time        | Trial start (epoch seconds)                           |
| end_time          | Trial end (epoch seconds)                             |
| duration          | Trial duration (seconds)                              |
| trial_index       | Optional integer index within session                |
| paradigm          | Optional paradigm grouping (if provided)             |
| stimulus_details  | Optional detailed stimulus description               |
| notes             | Optional free-text notes                              |
| source_file       | File of origin (for traceability)                     |

Notes: some delivered files omit optional columns; harmonization should add them with nulls where absent.
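
Since `trial_type` mixes current and legacy labels, harmonization will also need a label mapping. The sketch below is one possible mapping, offered as an assumption to review rather than a confirmed equivalence: it treats `lang` and the numbered `lang_<n>` labels as `language`, `rcmd`/`lcmd` as the right/left command paradigms, and `beep` as the oddball paradigm. The meaning of the `+p` suffix is not documented in the files at hand, so it is simply dropped here; the target names and the function name are hypothetical.

```python
import re

import pandas as pd

# Assumed mapping from observed labels to a normalized family name.
LABEL_MAP = {
    "lang": "language",
    "language": "language",
    "rcmd": "right_command",
    "right_command+p": "right_command",
    "lcmd": "left_command",
    "left_command+p": "left_command",
    "beep": "oddball",
    "oddball+p": "oddball",
    "control": "control",
    "loved_one_voice": "loved_one_voice",
}


def normalize_trial_type(label):
    """Map a raw trial_type label to a normalized family, or 'unknown'."""
    if pd.isna(label):
        return pd.NA
    text = str(label).strip()
    # Numbered legacy labels such as lang_0, lang_17.
    if re.fullmatch(r"lang_\d+", text):
        return "language"
    return LABEL_MAP.get(text, "unknown")
```

Applied as `harmonized["trial_type"].map(normalize_trial_type)`, any label that comes back as `unknown` is a candidate typo or a new paradigm to add to the map.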


In [45]:
# Harmonize all logs to a canonical schema (in-memory only)
canonical_cols = [
    "patient_id",
    "date",
    "trial_type",
    "sentences",
    "start_time",
    "end_time",
    "duration",
    "trial_index",
    "paradigm",
    "stimulus_details",
    "notes",
]

harmonized_frames = []
for p in files:
    df = pd.read_csv(p)
    # ensure required columns exist
    for col in canonical_cols:
        if col not in df.columns:
            df[col] = pd.NA
    df = df[canonical_cols].copy()
    df["source_file"] = p.relative_to(BASE).as_posix()
    harmonized_frames.append(df)

harmonized = pd.concat(harmonized_frames, ignore_index=True)
canonical_preview = harmonized.head()
unique_trial_types = harmonized["trial_type"].dropna().unique()
canonical_preview, unique_trial_types



(  patient_id        date trial_type  \
 0      khanh  2024-08-29       rcmd   
 1      khanh  2024-08-29       lang   
 2      khanh  2024-08-29       lang   
 3      khanh  2024-08-29       lang   
 4      khanh  2024-08-29       lang   
 
                                        sentences    start_time      end_time  \
 0                                             []  1.724958e+09  1.724958e+09   
 1      [2, 0, 17, 23, 21, 8, 6, 29, 7, 26, 1, 3]  1.724958e+09  1.724958e+09   
 2    [29, 32, 19, 0, 30, 13, 8, 5, 33, 2, 17, 7]  1.724958e+09  1.724958e+09   
 3   [16, 30, 2, 0, 19, 32, 14, 1, 23, 33, 27, 5]  1.724958e+09  1.724958e+09   
 4  [20, 30, 3, 26, 22, 24, 7, 29, 25, 18, 2, 27]  1.724958e+09  1.724958e+09   
 
      duration  trial_index paradigm stimulus_details notes     source_file  
 0  208.290775          NaN      NaN              NaN   NaN  patient_df.csv  
 1   15.601367          NaN      NaN              NaN   NaN  patient_df.csv  
 2   15.579206          NaN      NaN

In [46]:
# QC: basic checks
issues = {}

# durations missing or non-positive
if "duration" in harmonized:
    durations = pd.to_numeric(harmonized["duration"], errors="coerce")
    issues["missing_duration_rows"] = int(durations.isna().sum())
    issues["nonpositive_duration_rows"] = int((durations <= 0).sum())
    issues["duration_quantiles"] = durations.quantile([0, 0.25, 0.5, 0.75, 0.9, 0.99, 1]).to_dict()

# unexpected trial types (relative to observed set)
observed_types = set(harmonized["trial_type"].dropna().unique())
# define a canonical expected set based on current data
expected_types = {
    "language", "lang",
    "control",
    "loved_one_voice",
    "left_command+p", "right_command+p", "rcmd", "lcmd",
    "oddball+p", "beep",
}
issues["unexpected_trial_types"] = sorted(observed_types - expected_types)
issues


{'missing_duration_rows': 16,
 'nonpositive_duration_rows': 0,
 'duration_quantiles': {0.0: 9.5367431640625e-07,
  0.25: 15.437801718711853,
  0.5: 15.55670404217102,
  0.75: 15.57464152692917,
  0.9: 22.205912113189697,
  0.99: 208.340878,
  1.0: 213.1348922252655},
 'unexpected_trial_types': ['lang_0',
  'lang_1',
  'lang_10',
  'lang_11',
  'lang_12',
  'lang_13',
  'lang_14',
  'lang_15',
  'lang_16',
  'lang_17',
  'lang_18',
  'lang_19',
  'lang_2',
  'lang_20',
  'lang_21',
  'lang_22',
  'lang_23',
  'lang_24',
  'lang_25',
  'lang_26',
  'lang_27',
  'lang_28',
  'lang_29',
  'lang_3',
  'lang_30',
  'lang_31',
  'lang_32',
  'lang_33',
  'lang_34',
  'lang_35',
  'lang_36',
  'lang_37',
  'lang_38',
  'lang_39',
  'lang_4',
  'lang_40',
  'lang_41',
  'lang_42',
  'lang_43',
  'lang_44',
  'lang_45',
  'lang_46',
  'lang_47',
  'lang_48',
  'lang_49',
  'lang_5',
  'lang_50',
  'lang_51',
  'lang_52',
  'lang_53',
  'lang_54',
  'lang_55',
  'lang_56',
  'lang_57',
  'lang_58

In [47]:
# Class balance overview on harmonized data
trial_balance = (
    harmonized.groupby(["trial_type"]).size().reset_index(name="count")
    .sort_values("count", ascending=False)
)
trial_balance


Unnamed: 0,trial_type,count
2,lang,2880
75,language,432
1,control,411
0,beep,264
79,loved_one_voice,210
...,...,...
36,lang_39,4
35,lang_38,4
34,lang_37,4
33,lang_36,4


## How to interpret the notebook outputs

- **Canonical preview & trial types**: The harmonized preview shows example rows after forcing all files into the canonical schema (missing optional columns are null). The `unique_trial_types` list reveals all labels present; use it to spot typos or legacy variants to standardize.
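- **QC issues dict**: Reports counts of missing or non-positive durations and duration quantiles (0–100%). In the recorded run, 16 rows lack a duration; at least some of these come from metadata files such as `patient_history (1).csv`, which has only `patient_id` and `date` but is harmonized alongside the trial logs. The minimum duration is about 1e-6 s, which passes the non-positive check yet is effectively zero, so a small positive threshold would be a stricter test. The dict also flags unexpected `trial_type` labels relative to the current expected set (here the numbered `lang_<n>` variants); standardize or map these before modeling.
- **Class balance table**: Summarizes counts per `trial_type` across all logs. Legacy and current labels are counted separately (`lang` 2,880 vs `language` 432), so merge them before judging balance. Use the table to understand class imbalance (notably few command trials vs many language trials) and to guide sampling/weighting in downstream models.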
- **Processed layout plan**: Outlines where to store harmonized events, signal epochs, features, and QC plots once raw EDF/WAV are synced, with a README/manifest for reproducibility.
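
As a concrete example of the weighting mentioned for the class balance table, the sketch below computes inverse-frequency class weights, `n_samples / (n_classes * count)`, which is the same heuristic scikit-learn uses for `class_weight="balanced"`. It is a starting point only; the function name is hypothetical and the labels in the usage lines are made up.

```python
import pandas as pd


def balanced_class_weights(labels: pd.Series) -> pd.Series:
    """Inverse-frequency weights: n_samples / (n_classes * class_count)."""
    counts = labels.dropna().value_counts()
    return counts.sum() / (len(counts) * counts)


# Hypothetical labels: 6 language trials vs 2 command trials.
demo_labels = pd.Series(["language"] * 6 + ["left_command"] * 2)
weights = balanced_class_weights(demo_labels)
```

Rare classes receive weights above 1 and common classes below 1, so the weighted total per class is equal.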


## Processed data layout (plan)

- `processed/events/`: harmonized event logs with canonical schema, one file per session plus a manifest (versioned).
- `processed/signals/`: EDF-derived epochs (e.g., NumPy/HDF5) aligned to events; include channel lists and sampling rates.
- `processed/features/`: PSD/ITPC and other features, Parquet/HDF5 with patient_id, date, trial_type, stimulus, timings, QC flags.
- `processed/plots/`: QC plots (durations, artifacts, class balance) and feature visualizations (PNG/HTML).
- `processed/README.md`: schema dictionary, version history, and checksum/row-count manifest for reproducibility.
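
The checksum/row-count manifest could be generated with the standard library alone. The sketch below covers CSV outputs only (Parquet/HDF5 would need their own row counters) and assumes every CSV has a single header row; the function names and the `manifest.json` file name are illustrative choices, not an existing convention in the repo.

```python
from __future__ import annotations

import csv
import hashlib
import json
from pathlib import Path


def build_manifest(processed_dir: Path) -> list[dict]:
    """List every CSV under `processed_dir` with its SHA-256 and row count."""
    entries = []
    for path in sorted(processed_dir.rglob("*.csv")):
        with path.open(newline="") as handle:
            # Count parsed records, then subtract the assumed header row.
            n_rows = max(sum(1 for _ in csv.reader(handle)) - 1, 0)
        entries.append({
            "file": path.relative_to(processed_dir).as_posix(),
            "sha256": hashlib.sha256(path.read_bytes()).hexdigest(),
            "rows": n_rows,
        })
    return entries


def write_manifest(processed_dir: Path, name: str = "manifest.json") -> Path:
    """Write the manifest as JSON next to the processed data."""
    target = processed_dir / name
    target.write_text(json.dumps(build_manifest(processed_dir), indent=2))
    return target
```

Regenerating the manifest after each processing run and committing it alongside the README gives a simple way to detect silent changes in the processed files.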
