# TinyML Emergency Intent Detection — Data Collection & Processing Pipeline

**Version**: 1.0 (Frozen for reproducibility)
**Target**: On-device emergency intent classification from raw audio
**Runtime**: Kaggle Notebook (all data from `/kaggle/input/`); also supports local workspace paths

---

## Pipeline Overview

| Stage | Purpose |
|-------|---------|
| **1. Configuration** | Fix every hyperparameter, path, and label mapping once |
| **2. Dataset Adapters** | Per-source logic that emits `(file_path, intent_label)` tuples and drops ambiguous samples |
| **3. Audio Canonicalization** | Load → mono → 16 kHz → duration gate → DC-offset removal → RMS normalisation → peak rescale |
| **4. Sliding-Window Segmentation** | 2.0 s windows, 1.0 s hop, per-window silence and clipping rejection |
| **5. Pipeline Execution** | Run every adapter, canonicalise, window, and record per-dataset audit counters |
| **6. Unified Assembly** | Hash-based deduplication, deterministic shuffle, class-distribution report (no resampling) |
| **7. Persist Outputs** | `windows.npy`, `manifest.csv`, `pipeline_config.json` |
| **8. Verification** | Automated integrity assertions on the files reloaded from disk |
| **9. Summary & EDA** | Class / dataset / RMS plots and the per-dataset audit trail |

### Canonical Label Schema

| int | label | meaning |
|-----|-------|---------|
| 0 | `non_emergency` | Calm / neutral / everyday speech |
| 1 | `general_distress` | Fear, anger, panic — acoustic distress |
| 2 | `threat_keyword` | Explicit threat/help phrases (verbal) |
| 3 | `emergency_call` | Real 911 / emergency-call audio |

### Audio Contract

| Parameter | Value |
|-----------|-------|
| Sample rate | 16 000 Hz |
| Channels | 1 (mono) |
| Window length | 2.0 s (32 000 samples) |
| Hop length | 1.0 s (16 000 samples) |
| Dtype | `float32` (normalised to ±1.0) |
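
As a quick sanity check on the contract above, the number of full windows a clip yields follows from the window length W and hop H: for a clip of N ≥ W samples it is ⌊(N − W) / H⌋ + 1. The sketch below works this out for a few durations; the helper name `count_full_windows` is illustrative and not part of the pipeline.

```python
def count_full_windows(n_samples: int, window: int = 32_000, hop: int = 16_000) -> int:
    """Number of full windows of length `window` taken every `hop` samples (no padding)."""
    if n_samples < window:
        return 0
    return (n_samples - window) // hop + 1


SR = 16_000
for seconds in (1.5, 2.0, 3.5, 6.0):
    n = count_full_windows(int(seconds * SR))
    print(f"{seconds:>4} s -> {n} full window(s)")
```

Under these settings a 6 s clip gives five overlapping windows, while anything shorter than 2 s gives no full window and falls under the short-file padding rule of the segmentation stage.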

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 1 · Imports & Global Configuration
# ──────────────────────────────────────────────────────────────
from __future__ import annotations

import hashlib, json, math, os, platform, sys, warnings
from dataclasses import dataclass, field, asdict
from enum import IntEnum
from pathlib import Path
from typing import Dict, Generator, List, Optional, Tuple

import librosa
import numpy as np
import pandas as pd
import soundfile as sf

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# ── Deterministic seed (used for all stochastic operations) ──
GLOBAL_SEED: int = 42
_rng = np.random.default_rng(GLOBAL_SEED)

print(f"Python  : {sys.version}")
print(f"NumPy   : {np.__version__}")
print(f"librosa : {librosa.__version__}")
print(f"pandas  : {pd.__version__}")
print(f"Platform: {platform.platform()}")
print(f"Seed    : {GLOBAL_SEED}")

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 1 (cont.) · Canonical Labels & Audio Contract
# ──────────────────────────────────────────────────────────────

class Intent(IntEnum):
    """Canonical intent labels — the ONLY labels the model will ever see."""
    NON_EMERGENCY    = 0   # calm / neutral / everyday speech
    GENERAL_DISTRESS = 1   # fear, anger, panic — acoustic distress
    THREAT_KEYWORD   = 2   # explicit threat / help phrases (verbal)
    EMERGENCY_CALL   = 3   # real 911 / emergency-call audio

INTENT_NAMES: Dict[int, str] = {i.value: i.name.lower() for i in Intent}


@dataclass(frozen=True)
class AudioContract:
    """Immutable audio-processing constants — set once, used everywhere."""
    sr: int            = 16_000          # sample rate (Hz)
    window_sec: float  = 2.0             # context window (seconds)
    hop_sec: float     = 1.0             # hop between windows (seconds)
    rms_floor: float   = 0.005           # silence gate (RMS below → discard)
    rms_ceiling: float = 0.90            # clipping gate on window PEAK, despite the name (peak above → discard)
    target_rms: float  = 0.1             # target RMS after normalisation
    min_src_dur: float = 0.5             # shortest source file accepted (sec)
    max_src_dur: float = 600.0           # longest source file accepted (sec)

    # ── derived (computed once) ────────────────────────────────
    @property
    def window_samples(self) -> int:
        return int(self.sr * self.window_sec)           # 32 000

    @property
    def hop_samples(self) -> int:
        return int(self.sr * self.hop_sec)               # 16 000


AC = AudioContract()
print(f"Window : {AC.window_sec}s = {AC.window_samples} samples")
print(f"Hop    : {AC.hop_sec}s = {AC.hop_samples} samples")
print(f"Labels : {INTENT_NAMES}")

## Stage 2 · Dataset Path Registry & Adapter Declarations

Every dataset is registered here **once**. The adapter knows:
- where files live (Kaggle path + local fallback)
- what audio extensions to scan
- which canonical `Intent` label to assign and how

| Dataset | Source | Files | Default Intent |
|---------|--------|-------|----------------|
| 911 Recordings | 6-second 911 call clips with CSV metadata | ~707 | `EMERGENCY_CALL` |
| RAVDESS | Acted emotional speech (8 emotions × 24 actors) | ~1 440 | mapped per emotion code |
| Threat-keyword | Spoken distress phrases (7 categories, EN + HI) | ~360 | `THREAT_KEYWORD` |
| Custom SER | Emergency-call emotional speech (18 speakers) | ~316 | `GENERAL_DISTRESS` |
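
To make the "mapped per emotion code" entry concrete: RAVDESS filenames carry seven hyphen-separated two-digit fields, and the third field is the emotion code. The following is a small worked example of that lookup on a single name. The helper `parse_ravdess_name` and the sample filenames are illustrative only; the label strings follow the canonical schema above.

```python
from pathlib import Path
from typing import Optional

# Emotion codes this pipeline keeps; every other code is dropped as ambiguous.
EMOTION_TO_LABEL = {
    "01": "non_emergency",     # neutral
    "02": "non_emergency",     # calm
    "03": "non_emergency",     # happy
    "05": "general_distress",  # angry
    "06": "general_distress",  # fearful
}


def parse_ravdess_name(path: str) -> Optional[str]:
    """Return the canonical label for a RAVDESS file, or None if the file is skipped."""
    parts = Path(path).stem.split("-")
    if len(parts) != 7:
        return None                       # malformed filename
    modality, vocal_channel, emotion = parts[:3]
    if modality != "03" or vocal_channel != "01":
        return None                       # keep audio-only speech
    return EMOTION_TO_LABEL.get(emotion)  # None for sad / disgust / surprised


print(parse_ravdess_name("Actor_01/03-01-06-01-02-01-01.wav"))  # fearful
print(parse_ravdess_name("Actor_01/03-01-04-01-02-01-01.wav"))  # sad
```

The first name maps to `general_distress`; the second returns `None`, which is how an ambiguous emotion is kept out of the training set.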

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 2 · Dataset Path Registry
# ──────────────────────────────────────────────────────────────
# Detect runtime: Kaggle vs local development
IS_KAGGLE: bool = Path("/kaggle/input").exists()

@dataclass(frozen=True)
class DatasetEntry:
    """Registry entry for one dataset source."""
    name: str                          # short identifier
    kaggle_root: str                   # absolute path on Kaggle
    local_root: str                    # relative path for local dev
    audio_exts: Tuple[str, ...] = (".wav",)

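    @property
    def root(self) -> Path:
        """Return the dataset root for the current environment (Kaggle or local)."""
        if IS_KAGGLE:
            return Path(self.kaggle_root)
        # `__file__` is undefined in notebooks. Check module globals: a bare dir()
        # inside a method lists only local names and would never find it.
        if "__file__" in globals():
            return Path(__file__).resolve().parent / self.local_root
        return Path(self.local_root)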

# ── Registry ─────────────────────────────────────────────────
DS_911 = DatasetEntry(
    name="911_calls",
    kaggle_root="/kaggle/input/911-calls-dataset-eda/911_first6sec",
    local_root="../datasets/911_recordings/911_first6sec",
)

DS_RAVDESS = DatasetEntry(
    name="ravdess",
    kaggle_root="/kaggle/input/ravdess-emotional-speech-audio/audio_speech_actors_01-24",
    # NOTE: We use ONLY the nested copy to avoid duplicates
    local_root="../datasets/ravdess_emotional_speech_audio/audio_speech_actors_01-24",
)

DS_THREAT = DatasetEntry(
    name="threat_keyword",
    kaggle_root="/kaggle/input/threat-detection-audio-dataset/Audio_Dataset3",
    local_root="../datasets/threat detection audio dataset/Audio_Dataset3",
)

DS_CUSTOM_SER = DatasetEntry(
    name="custom_ser",
    kaggle_root="/kaggle/input/speech-emotion-recognition-emergency/CUSTOM_DATASET",
    local_root="../datasets/speech emotional recognition for emergency calls/CUSTOM_DATASET",
)

ALL_DATASETS: List[DatasetEntry] = [DS_911, DS_RAVDESS, DS_THREAT, DS_CUSTOM_SER]

print(f"Runtime       : {'Kaggle' if IS_KAGGLE else 'Local'}")
for ds in ALL_DATASETS:
    exists = ds.root.exists()
    print(f"  {ds.name:20s} → {ds.root}  {'✓' if exists else '✗ NOT FOUND'}")

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 2 (cont.) · Dataset Adapter Functions
# ──────────────────────────────────────────────────────────────
# Each adapter yields (file_path: Path, intent: Intent) tuples.
# Adapters encapsulate ALL dataset-specific logic — label parsing,
# filename conventions, metadata lookups, and ambiguity rejection.
# ──────────────────────────────────────────────────────────────

def _scan_audio(root: Path, exts: Tuple[str, ...] = (".wav",)) -> List[Path]:
    """Recursively collect audio files under *root*, sorted for determinism."""
    files = []
    for ext in exts:
        files.extend(root.rglob(f"*{ext}"))
    return sorted(files)


# ── 911 Calls Adapter ────────────────────────────────────────
def adapt_911(ds: DatasetEntry) -> Generator[Tuple[Path, Intent], None, None]:
    """
    911-call clips (≈6 s each).
    Metadata CSV contains a `false_alarm` column — we use it to reject
    prank / non-emergency 911 recordings.  Remaining clips → EMERGENCY_CALL.

    Conservative rule:
        • false_alarm == 1  → SKIP  (not a real emergency)
        • false_alarm == 0  → EMERGENCY_CALL
    """
    csv_path = ds.root / "911_metadata.csv"
    if csv_path.exists():
        meta = pd.read_csv(csv_path)
        # Build filename → false_alarm lookup
        fa_lookup: Dict[str, float] = {}
        for _, row in meta.iterrows():
            fname = str(row.get("filename", ""))
            # filename column has relative path like '911_first6sec/call_2_0.wav'
            basename = Path(fname).name
            fa_lookup[basename] = float(row.get("false_alarm", 0))
    else:
        fa_lookup = {}

    for fp in _scan_audio(ds.root, ds.audio_exts):
        basename = fp.name
        # Reject known false-alarm / prank calls
        if fa_lookup.get(basename, 0) == 1.0:
            continue
        yield fp, Intent.EMERGENCY_CALL


# ── RAVDESS Adapter ──────────────────────────────────────────
# Filename schema: MM-VC-EM-IN-ST-RE-AC.wav
#   MM = modality (03 = audio-only)
#   VC = vocal channel (01 = speech, 02 = song)
#   EM = emotion  (01-08)
#   IN = intensity (01 = normal, 02 = strong)
#   ST = statement (01 / 02)
#   RE = repetition (01 / 02)
#   AC = actor (01-24, odd = male, even = female)
#
# Emotion mapping (conservative):
RAVDESS_EMOTION_MAP: Dict[str, Optional[Intent]] = {
    "01": Intent.NON_EMERGENCY,       # neutral
    "02": Intent.NON_EMERGENCY,       # calm
    "03": Intent.NON_EMERGENCY,       # happy
    "04": None,                        # sad — ambiguous → DROP
    "05": Intent.GENERAL_DISTRESS,    # angry
    "06": Intent.GENERAL_DISTRESS,    # fearful
    "07": None,                        # disgust — ambiguous → DROP
    "08": None,                        # surprised — ambiguous → DROP
}

def adapt_ravdess(ds: DatasetEntry) -> Generator[Tuple[Path, Intent], None, None]:
    """
    RAVDESS acted emotional speech.
    We use ONLY speech modality (vocal channel 01) and audio-only files.
    Ambiguous emotions (sad, disgust, surprised) are DROPPED.
    """
    for fp in _scan_audio(ds.root, ds.audio_exts):
        parts = fp.stem.split("-")
        if len(parts) != 7:
            continue                    # malformed filename → skip

        modality, vocal_ch, emotion = parts[0], parts[1], parts[2]

        # Accept only audio-only (03) speech (01) files
        if modality != "03" or vocal_ch != "01":
            continue

        intent = RAVDESS_EMOTION_MAP.get(emotion)
        if intent is None:
            continue                    # ambiguous emotion → drop

        yield fp, intent


# ── Threat-Keyword Adapter ───────────────────────────────────
# Folder names encode the spoken phrase.  ALL folders represent
# explicit distress / threat keywords → THREAT_KEYWORD.
THREAT_FOLDER_INTENTS: Dict[str, Intent] = {
    "call police":         Intent.THREAT_KEYWORD,
    "help me":             Intent.THREAT_KEYWORD,
    "i need help":         Intent.THREAT_KEYWORD,
    "madat karo":          Intent.THREAT_KEYWORD,   # Hindi: "help me"
    "mujhe_bachao":        Intent.THREAT_KEYWORD,   # Hindi: "save me"
    "palice call martini": Intent.THREAT_KEYWORD,   # Hindi: "call the police"
    "Sendhelp":            Intent.THREAT_KEYWORD,
}

def adapt_threat(ds: DatasetEntry) -> Generator[Tuple[Path, Intent], None, None]:
    """
    Threat-keyword dataset — spoken distress phrases.
    Label is determined by the parent folder name.
    Files whose parent folder is unknown are DROPPED.
    """
    for fp in _scan_audio(ds.root, ds.audio_exts):
        folder = fp.parent.name
        intent = THREAT_FOLDER_INTENTS.get(folder)
        if intent is None:
            continue                    # unknown folder → skip
        yield fp, intent


# ── Custom SER Adapter ───────────────────────────────────────
# Filename schema: EE_SS_CC_DD_RR.wav (emotion_speaker_…)
# No external documentation — we label conservatively as GENERAL_DISTRESS
# because the dataset is specifically curated for "emergency call" emotion.
def adapt_custom_ser(ds: DatasetEntry) -> Generator[Tuple[Path, Intent], None, None]:
    """
    Custom emergency-call SER dataset (18 speakers).
    All files are emotional speech in an emergency-call context.
    Conservative label: GENERAL_DISTRESS (acoustic emotional distress).
    """
    for fp in _scan_audio(ds.root, ds.audio_exts):
        yield fp, Intent.GENERAL_DISTRESS


# ── Adapter dispatch table ───────────────────────────────────
ADAPTERS: Dict[str, callable] = {
    "911_calls":       adapt_911,
    "ravdess":         adapt_ravdess,
    "threat_keyword":  adapt_threat,
    "custom_ser":      adapt_custom_ser,
}

print(f"Adapters registered: {list(ADAPTERS.keys())}")

## Stage 3 · Audio Canonicalization

Each source file goes through **one deterministic path**:

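1. `librosa.load` → mono, 16 kHz, float32
2. Duration gate (reject files < 0.5 s or > 600 s)
3. DC-offset removal (`y -= mean`)
4. Silence rejection (whole-file RMS < `rms_floor` → reject)
5. RMS normalisation to a fixed target (0.1 RMS)
6. Peak rescale: if the peak exceeds 1.0 after normalisation, divide by the peak (this lowers that file's RMS below the target; it is not a hard clip)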

The result is a numpy array ready for windowing.
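
The post-load steps can be tried on a synthetic signal without any audio files. This is a minimal sketch, assuming the same defaults as the audio contract (`target_rms = 0.1`, `rms_floor = 0.005`); the function name `canonicalise` is illustrative, and the peak step divides by the peak, as `load_canonical()` does.

```python
import numpy as np


def canonicalise(y: np.ndarray, target_rms: float = 0.1, rms_floor: float = 0.005):
    """DC removal, silence rejection, RMS normalisation, then peak rescale."""
    y = y.astype(np.float64) - np.mean(y)       # remove DC offset
    rms = float(np.sqrt(np.mean(y ** 2)))
    if rms < rms_floor:
        return None                             # near-silent signal is rejected
    y = y * (target_rms / rms)                  # scale to the target RMS
    peak = float(np.max(np.abs(y)))
    if peak > 1.0:
        y = y / peak                            # rescale; RMS ends up below target
    return y.astype(np.float32)


sr = 16_000
t = np.arange(sr) / sr
tone = 0.3 * np.sin(2 * np.pi * 440.0 * t) + 0.2   # 440 Hz tone riding on a DC offset
out = canonicalise(tone)
print(f"mean = {out.mean():+.5f}, rms = {np.sqrt(np.mean(out ** 2)):.4f}")
print(canonicalise(np.full(sr, 0.5)))              # constant signal: zero RMS after DC removal
```

The tone comes out centred on zero with an RMS close to 0.1, while the constant signal is rejected because nothing is left once its offset is removed.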

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 3 · Audio Canonicalization Functions
# ──────────────────────────────────────────────────────────────

def load_canonical(path: Path, ac: AudioContract = AC) -> Optional[np.ndarray]:
    """
    Load an audio file and return a canonicalised float32 numpy array,
    or *None* if the file should be rejected.

    Steps:
        1. librosa.load → mono, target SR, float32
        2. Duration gate
        3. DC-offset removal
        4. Silence rejection (file RMS < rms_floor)
        5. RMS normalisation → target_rms
        6. Peak rescale into [-1, 1] if the peak exceeds 1.0
    """
    try:
        y, _ = librosa.load(path, sr=ac.sr, mono=True)
    except Exception:
        return None                     # corrupt / unreadable

    # ── duration gate ─────────────────────────────────────────
    dur = len(y) / ac.sr
    if dur < ac.min_src_dur or dur > ac.max_src_dur:
        return None

    # ── DC-offset removal ─────────────────────────────────────
    y = y - np.mean(y)

    # ── RMS check — reject near-silence ───────────────────────
    rms = float(np.sqrt(np.mean(y ** 2)))
    if rms < ac.rms_floor:
        return None

    # ── RMS normalisation ─────────────────────────────────────
    y = y * (ac.target_rms / rms)

    # ── peak rescale (divide by peak; not a hard clip) ────────
    # Rescaling lowers this file's RMS below target_rms.
    peak = float(np.max(np.abs(y)))
    if peak > 1.0:
        y = y / peak                    # bring back into [-1, 1]

    return y.astype(np.float32)


print("✓ load_canonical() ready")

## Stage 4 · Sliding-Window Segmentation

Each canonicalised audio array is cut into **2.0 s windows** with a **1.0 s hop**.

Per-window quality gates applied:
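- **Silence gate**: window RMS < `rms_floor` → discard
- **Clipping gate**: window peak > `rms_ceiling` → discard (despite its name, `rms_ceiling` is a peak threshold)
- **Zero-crossing rate outlier**: optional check for digital artefacts; not implemented in this version

Short files (< 2.0 s) are **zero-padded on the right** and accepted only
if the original audio fills ≥ 50 % of the window, i.e. the file is at least 1.0 s long. For longer files, a trailing remainder shorter than one window is dropped.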
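
The windowing rules can be exercised on synthetic audio. This sketch assumes the contract values (32 000-sample window, 16 000-sample hop, RMS floor 0.005, peak ceiling 0.90); `split_windows` is an illustrative name, and Gaussian noise stands in for speech.

```python
import numpy as np

W, H = 32_000, 16_000                  # window / hop in samples at 16 kHz
RMS_FLOOR, PEAK_CEILING = 0.005, 0.90


def split_windows(y: np.ndarray):
    """Yield (start_sample, window) pairs that pass the silence and peak gates."""
    if len(y) < W:
        if len(y) / W < 0.5:           # under 1.0 s of audio: reject outright
            return
        y = np.pad(y, (0, W - len(y)))  # zero-pad on the right
    for start in range(0, len(y) - W + 1, H):
        w = y[start:start + W]
        rms = float(np.sqrt(np.mean(w ** 2)))
        peak = float(np.max(np.abs(w)))
        if rms >= RMS_FLOOR and peak <= PEAK_CEILING:
            yield start, w


rng = np.random.default_rng(0)
noise = np.clip(0.1 * rng.standard_normal(3 * 16_000), -0.8, 0.8).astype(np.float32)  # 3 s
with_silence = np.concatenate([noise, np.zeros(2 * 16_000, dtype=np.float32)])        # + 2 s silence

print([start for start, _ in split_windows(with_silence)])
print(len(list(split_windows(noise[:24_000]))))   # 1.5 s clip
print(len(list(split_windows(noise[:8_000]))))    # 0.5 s clip
```

In this run the 5 s signal keeps the windows starting at 0, 16 000 and 32 000 samples and loses the final all-silent one; the 1.5 s clip yields one padded window and the 0.5 s clip yields none.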

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 4 · Sliding-Window Segmentation
# ──────────────────────────────────────────────────────────────

@dataclass
class WindowMeta:
    """Metadata for one extracted audio window."""
    window_id: str              # globally unique identifier
    dataset: str                # source dataset name
    source_file: str            # original filename (basename only)
    intent: int                 # canonical intent label (int)
    intent_name: str            # human-readable label
    window_idx: int             # window index within source file
    rms: float                  # per-window RMS
    peak: float                 # per-window peak absolute value
    sha256: str                 # first 16 hex chars of the SHA-256 of the float32 bytes


def _sha256(arr: np.ndarray) -> str:
    """Truncated SHA-256 (first 16 hex chars = 64 bits) of a numpy array's raw bytes."""
    return hashlib.sha256(arr.tobytes()).hexdigest()[:16]


def window_generator(
    y: np.ndarray,
    intent: Intent,
    dataset_name: str,
    source_file: str,
    file_idx: int,
    ac: AudioContract = AC,
) -> Generator[Tuple[np.ndarray, WindowMeta], None, None]:
    """
    Yield (window_array, WindowMeta) tuples from a canonicalised audio array.

    Rules:
        • Files with len(y) >= window_samples yield full windows only;
          a trailing remainder shorter than one window is dropped.
        • If len(y) < window_samples, zero-pad RIGHT and accept only
          if the original audio covers >= 50 % of the window.
        • Per-window silence & clipping gates applied.
    """
    W = ac.window_samples
    H = ac.hop_samples

    # ── Handle short files (< 1 window) → zero-pad ───────────
    if len(y) < W:
        non_silent_frac = len(y) / W
        if non_silent_frac < 0.50:
            return                      # too short even after padding
        padded = np.zeros(W, dtype=np.float32)
        padded[: len(y)] = y
        y = padded
        # Only yield this one padded window
        positions = [0]
    else:
        positions = list(range(0, len(y) - W + 1, H))

    for win_idx, start in enumerate(positions):
        w = y[start : start + W]

        # ── per-window quality gates ──────────────────────────
        rms = float(np.sqrt(np.mean(w ** 2)))
        peak = float(np.max(np.abs(w)))

        if rms < ac.rms_floor:
            continue                    # silence → skip
        if peak > ac.rms_ceiling:
            continue                    # clipping → skip

        wid = f"{dataset_name}_f{file_idx:05d}_w{win_idx:03d}"
        meta = WindowMeta(
            window_id=wid,
            dataset=dataset_name,
            source_file=source_file,
            intent=int(intent),
            intent_name=INTENT_NAMES[int(intent)],
            window_idx=win_idx,
            rms=round(rms, 6),
            peak=round(peak, 6),
            sha256=_sha256(w),
        )
        yield w, meta


print("✓ window_generator() ready")

## Stage 5 · Full Pipeline Execution

For each registered dataset:
1. Run its adapter to get `(path, intent)` pairs
2. Load & canonicalise each file via `load_canonical()`
3. Segment into 2.0 s windows via `window_generator()`
4. Collect the windows and their metadata into two parallel lists

Result: `all_windows` (a list of `np.ndarray`) and `all_meta` (a list of `WindowMeta`), aligned index by index and ready for assembly.

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 5 · Run All Adapters → Canonicalise → Window
# ──────────────────────────────────────────────────────────────

all_windows: List[np.ndarray] = []
all_meta:    List[WindowMeta] = []

# Per-dataset counters for audit
audit: Dict[str, Dict[str, int]] = {}

for ds in ALL_DATASETS:
    adapter_fn = ADAPTERS.get(ds.name)
    if adapter_fn is None:
        print(f"⚠ No adapter for {ds.name}, skipping")
        continue
    if not ds.root.exists():
        print(f"⚠ Path not found for {ds.name}: {ds.root}")
        continue

    ds_stats = {"files_scanned": 0, "files_loaded": 0,
                "files_rejected": 0, "windows_emitted": 0}

    print(f"\n{'─'*60}")
    print(f"Processing: {ds.name}")
    print(f"  Root: {ds.root}")

    file_idx = 0
    for fp, intent in adapter_fn(ds):
        ds_stats["files_scanned"] += 1
        y = load_canonical(fp, AC)
        if y is None:
            ds_stats["files_rejected"] += 1
            continue
        ds_stats["files_loaded"] += 1

        win_count_before = len(all_windows)
        for w, m in window_generator(y, intent, ds.name, fp.name, file_idx, AC):
            all_windows.append(w)
            all_meta.append(m)
        ds_stats["windows_emitted"] += len(all_windows) - win_count_before
        file_idx += 1

        # Progress indicator every 100 files
        if ds_stats["files_scanned"] % 100 == 0:
            print(f"    … {ds_stats['files_scanned']} files scanned")

    audit[ds.name] = ds_stats
    print(f"  Scanned : {ds_stats['files_scanned']}")
    print(f"  Loaded  : {ds_stats['files_loaded']}")
    print(f"  Rejected: {ds_stats['files_rejected']}")
    print(f"  Windows : {ds_stats['windows_emitted']}")

# ── Summary ───────────────────────────────────────────────────
print(f"\n{'━'*60}")
print(f"Total windows collected: {len(all_windows)}")
print(f"Total metadata records : {len(all_meta)}")
assert len(all_windows) == len(all_meta), "Window/meta count mismatch!"
print(f"{'━'*60}")

## Stage 6 · Unified Assembly — Manifest, Shuffle, Balance

Build a pandas DataFrame from all collected window metadata, apply:
1. **Duplicate-hash rejection** — no two windows share the same SHA-256
2. **Deterministic shuffle** — fixed seed for train/val reproducibility
3. **Class-distribution report** — expose any severe class imbalance
4. **Manifest CSV** — the single source of truth for downstream training

> **No class resampling is applied here** — that is the training script's
> responsibility.  The manifest records the natural distribution so the
> trainer can decide on oversampling / loss weighting.
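
The key invariant in this stage is that the manifest rows and the audio arrays stay aligned through both deduplication and shuffling. A minimal sketch on toy data, assuming the same truncated-hash scheme as `_sha256()`; the four tiny arrays stand in for real 32 000-sample windows.

```python
import hashlib

import numpy as np
import pandas as pd


def short_hash(arr: np.ndarray) -> str:
    """First 16 hex characters of the SHA-256 of the array's raw bytes."""
    return hashlib.sha256(arr.tobytes()).hexdigest()[:16]


# Toy data: the third "window" is a byte-for-byte copy of the first.
windows = [np.full(4, v, dtype=np.float32) for v in (0.1, 0.2, 0.1, 0.3)]
manifest = pd.DataFrame({
    "window_id": ["a", "b", "c", "d"],
    "intent": [0, 1, 0, 2],
    "sha256": [short_hash(w) for w in windows],
})

# 1. Deduplicate: one boolean mask drives both the manifest and the array list.
keep = ~manifest.duplicated(subset=["sha256"], keep="first").to_numpy()
manifest = manifest[keep].reset_index(drop=True)
windows = [w for w, k in zip(windows, keep) if k]

# 2. Shuffle: one permutation from a freshly seeded generator, applied to both.
order = np.random.default_rng(42).permutation(len(manifest))
manifest = manifest.iloc[order].reset_index(drop=True)
windows = [windows[i] for i in order]

# Rows and arrays still correspond after both steps.
assert all(short_hash(w) == h for w, h in zip(windows, manifest["sha256"]))
print(sorted(manifest["window_id"]))
```

Seeding a fresh generator at the point of use, rather than sharing a module-level one, keeps the shuffle identical even if the cell is re-run in the same session.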

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 6 · Unified Assembly — Build Manifest DataFrame
# ──────────────────────────────────────────────────────────────

# ── Build DataFrame from WindowMeta records ───────────────────
manifest = pd.DataFrame([asdict(m) for m in all_meta])
print(f"Raw manifest rows: {len(manifest)}")

# ── Duplicate detection via SHA-256 ───────────────────────────
dup_mask = manifest.duplicated(subset=["sha256"], keep="first")
n_dups = int(dup_mask.sum())
print(f"Duplicate windows (by SHA-256): {n_dups}")

# ── Drop duplicates from the manifest and the audio list ──────
# dup_mask is positional over all_meta / all_windows, so the same mask is
# applied to both before the manifest index is reset.
keep_mask = (~dup_mask).to_numpy()
surviving_windows = [w for w, keep in zip(all_windows, keep_mask) if keep]
manifest = manifest[keep_mask].reset_index(drop=True)
print(f"After deduplication           : {len(manifest)}")

assert len(surviving_windows) == len(manifest), \
    f"Window/manifest mismatch after dedup: {len(surviving_windows)} vs {len(manifest)}"

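# ── Deterministic shuffle ─────────────────────────────────────
# Seed a fresh generator here so re-running this cell gives the same order;
# the shared _rng would advance its state on every run.
shuffle_idx = np.random.default_rng(GLOBAL_SEED).permutation(len(manifest))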
manifest = manifest.iloc[shuffle_idx].reset_index(drop=True)
surviving_windows = [surviving_windows[i] for i in shuffle_idx]

# ── Class distribution report ─────────────────────────────────
print(f"\n{'─'*60}")
print("Class distribution (natural, no resampling):")
class_counts = manifest["intent_name"].value_counts()
for name, count in class_counts.items():
    pct = 100.0 * count / len(manifest)
    print(f"  {name:25s}  {count:6d}  ({pct:5.1f}%)")

# ── Per-dataset breakdown ─────────────────────────────────────
print(f"\nPer-dataset breakdown:")
cross = pd.crosstab(manifest["dataset"], manifest["intent_name"], margins=True)
print(cross.to_string())

print(f"\n{'━'*60}")
print(f"Final manifest size: {len(manifest)} windows")
print(f"{'━'*60}")

## Stage 7 · Persist Outputs

Save:
- **`windows.npy`** — stacked `(N, 32000)` float32 array of all windows
- **`manifest.csv`** — full metadata manifest
- **`pipeline_config.json`** — frozen configuration for reproducibility

On Kaggle the output goes to `/kaggle/working/processed/`.
Locally it goes to `./processed_data/`.
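
For orientation, here is one way a downstream training script might consume these files: memory-map `windows.npy`, read `manifest.csv`, and derive class weights from the natural distribution. This is a sketch under assumptions. The helper names are hypothetical, the toy data is written to a temporary directory, and inverse-frequency weighting is only one common choice that the pipeline itself does not prescribe.

```python
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd


def load_processed(output_dir: Path):
    """Memory-map the window array and load the manifest that indexes it."""
    X = np.load(output_dir / "windows.npy", mmap_mode="r")   # rows are read lazily
    manifest = pd.read_csv(output_dir / "manifest.csv")
    if X.shape[0] != len(manifest):
        raise ValueError("windows.npy and manifest.csv disagree on row count")
    return X, manifest


def inverse_frequency_weights(intents: pd.Series) -> dict:
    """Weight each class by N / (n_classes * count)."""
    counts = intents.value_counts()
    return {int(c): len(intents) / (len(counts) * n) for c, n in counts.items()}


# Demo on a toy dataset written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    np.save(out / "windows.npy", np.zeros((4, 32_000), dtype=np.float32))
    pd.DataFrame({"intent": [0, 0, 0, 1]}).to_csv(out / "manifest.csv", index=False)

    X, manifest = load_processed(out)
    weights = inverse_frequency_weights(manifest["intent"])
    shape = X.shape
    del X   # release the memory map before the directory is removed

print(shape, weights)
```

Memory-mapping avoids loading the full `(N, 32000)` array into RAM, which matters once N grows into the tens of thousands.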

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 7 · Save Outputs
# ──────────────────────────────────────────────────────────────

OUTPUT_DIR = Path("/kaggle/working/processed") if IS_KAGGLE else Path("processed_data")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# ── 1. Stack audio windows into a single (N, 32000) array ────
X = np.stack(surviving_windows, axis=0)          # (N, 32000) float32
print(f"Audio tensor shape : {X.shape}  dtype: {X.dtype}")
print(f"Audio tensor size  : {X.nbytes / 1e6:.1f} MB")

npy_path = OUTPUT_DIR / "windows.npy"
np.save(npy_path, X)
print(f"Saved: {npy_path}")

# ── 2. Save manifest CSV ─────────────────────────────────────
csv_path = OUTPUT_DIR / "manifest.csv"
manifest.to_csv(csv_path, index=False)
print(f"Saved: {csv_path}")

# ── 3. Save frozen pipeline configuration ────────────────────
config_snapshot = {
    "audio_contract": asdict(AC),
    "intent_labels": INTENT_NAMES,
    "global_seed": GLOBAL_SEED,
    "ravdess_emotion_map": {k: (v.name if v is not None else "DROP")
                            for k, v in RAVDESS_EMOTION_MAP.items()},
    "threat_folder_intents": {k: v.name for k, v in THREAT_FOLDER_INTENTS.items()},
    "audit_per_dataset": audit,
    "final_manifest_rows": len(manifest),
    "duplicates_removed": n_dups,
}

config_path = OUTPUT_DIR / "pipeline_config.json"
with open(config_path, "w") as f:
    json.dump(config_snapshot, f, indent=2, default=str)
print(f"Saved: {config_path}")

print(f"\n✅ All outputs saved to {OUTPUT_DIR}/")
print(f"   windows.npy           — {X.shape[0]} windows × {X.shape[1]} samples")
print(f"   manifest.csv          — {len(manifest)} rows")
print(f"   pipeline_config.json  — frozen config + audit trail")

## Stage 8 · Verification & Integrity Assertions

Automated checks that MUST pass before this dataset can be used for training:

| Check | Description |
|-------|-------------|
| **Shape** | Every row in `windows.npy` has exactly 32 000 samples |
| **Dtype** | All audio is float32 |
| **Range** | All values in [−1.0, +1.0] |
| **Labels** | All `intent` values ∈ {0, 1, 2, 3} |
| **Uniqueness** | No duplicate `window_id` or `sha256` |
| **Manifest ↔ Array** | Row count matches between CSV and .npy |
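| **No NaN** | The manifest contains no missing values |
| **Round-trip** | All checks run on files reloaded from disk, not on the in-memory objects |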
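
A stricter form of the round-trip check would compare the reloaded array against the in-memory one and re-hash each row against the manifest. The sketch below illustrates that idea on toy data; `roundtrip_ok` is a hypothetical helper, and the hashing mirrors the truncated SHA-256 used for the `sha256` column.

```python
import hashlib
import tempfile
from pathlib import Path

import numpy as np


def short_hash(arr: np.ndarray) -> str:
    """First 16 hex characters of the SHA-256 of the array's raw bytes."""
    return hashlib.sha256(arr.tobytes()).hexdigest()[:16]


def roundtrip_ok(X_mem: np.ndarray, hashes: list, npy_path: Path) -> bool:
    """True if the file reproduces X_mem exactly and every row matches its recorded hash."""
    X_disk = np.load(npy_path)
    same_bits = X_disk.dtype == X_mem.dtype and np.array_equal(X_disk, X_mem)
    same_hashes = len(hashes) == len(X_disk) and all(
        short_hash(row) == h for row, h in zip(X_disk, hashes)
    )
    return bool(same_bits and same_hashes)


rng = np.random.default_rng(0)
X = rng.uniform(-1.0, 1.0, size=(3, 32_000)).astype(np.float32)
hashes = [short_hash(row) for row in X]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "windows.npy"
    np.save(path, X)
    ok = roundtrip_ok(X, hashes, path)
    misaligned = roundtrip_ok(X, hashes[::-1], path)   # manifest order no longer matches rows

print(ok, misaligned)
```

Re-hashing catches the case where the array and the manifest are each internally valid but have drifted out of row alignment, which the row-count check alone cannot detect.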

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 8 · Verification & Integrity Assertions
# ──────────────────────────────────────────────────────────────

def verify_dataset(output_dir: Path, ac: AudioContract = AC) -> None:
    """Run all integrity checks. Raises AssertionError on any failure."""

    print("Running verification checks …\n")

    # ── Reload from disk ──────────────────────────────────────
    X_disk = np.load(output_dir / "windows.npy")
    m_disk = pd.read_csv(output_dir / "manifest.csv")

    # Check 1: manifest ↔ array row count
    assert X_disk.shape[0] == len(m_disk), \
        f"FAIL: array rows ({X_disk.shape[0]}) ≠ manifest rows ({len(m_disk)})"
    print(f"  ✓ Manifest rows ({len(m_disk)}) == array rows ({X_disk.shape[0]})")

    # Check 2: window sample count
    assert X_disk.shape[1] == ac.window_samples, \
        f"FAIL: sample dim ({X_disk.shape[1]}) ≠ expected ({ac.window_samples})"
    print(f"  ✓ Window length = {ac.window_samples} samples ({ac.window_sec}s)")

    # Check 3: dtype
    assert X_disk.dtype == np.float32, f"FAIL: dtype is {X_disk.dtype}"
    print(f"  ✓ Dtype = float32")

    # Check 4: value range
    assert float(np.min(X_disk)) >= -1.0, "FAIL: values below −1.0"
    assert float(np.max(X_disk)) <=  1.0, "FAIL: values above +1.0"
    print(f"  ✓ Value range [{np.min(X_disk):.4f}, {np.max(X_disk):.4f}] ⊂ [−1, +1]")

    # Check 5: labels are valid
    valid_intents = set(INTENT_NAMES.keys())
    actual_intents = {int(v) for v in m_disk["intent"].unique()}   # plain ints for clean comparison / printing
    assert actual_intents.issubset(valid_intents), \
        f"FAIL: invalid intents {actual_intents - valid_intents}"
    print(f"  ✓ All intent labels valid: {sorted(actual_intents)}")

    # Check 6: unique window IDs
    assert m_disk["window_id"].is_unique, "FAIL: duplicate window_id"
    print(f"  ✓ All window_id values unique")

    # Check 7: unique SHA-256
    assert m_disk["sha256"].is_unique, "FAIL: duplicate sha256"
    print(f"  ✓ All SHA-256 hashes unique (no duplicate audio content)")

    # Check 8: no NaN in manifest
    nan_count = int(m_disk.isna().sum().sum())
    assert nan_count == 0, f"FAIL: {nan_count} NaN values in manifest"
    print(f"  ✓ No NaN values in manifest")

    # ── Summary ───────────────────────────────────────────────
    print(f"\n{'━'*60}")
    print("✅  ALL 8 VERIFICATION CHECKS PASSED")
    print(f"    Dataset is ready for downstream training.")
    print(f"{'━'*60}")


verify_dataset(OUTPUT_DIR)

## Stage 9 · Dataset Summary & Quick EDA

Quick visual / tabular inspection of the final processed dataset.

In [None]:
# ──────────────────────────────────────────────────────────────
# Stage 9 · Quick EDA on final processed dataset
# ──────────────────────────────────────────────────────────────
import matplotlib
matplotlib.use("Agg")           # headless-safe backend
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# ── 1. Class distribution bar chart ──────────────────────────
intent_counts = manifest["intent_name"].value_counts()
colors = ["#2ecc71", "#e67e22", "#e74c3c", "#8e44ad"]
intent_counts.plot.bar(ax=axes[0], color=colors[:len(intent_counts)])
axes[0].set_title("Class Distribution (natural)")
axes[0].set_ylabel("Window count")
axes[0].tick_params(axis="x", rotation=30)

# ── 2. Per-dataset contribution ──────────────────────────────
dataset_counts = manifest["dataset"].value_counts()
dataset_counts.plot.bar(ax=axes[1], color="steelblue")
axes[1].set_title("Windows per Dataset Source")
axes[1].set_ylabel("Window count")
axes[1].tick_params(axis="x", rotation=30)

# ── 3. RMS distribution (quality check) ─────────────────────
axes[2].hist(manifest["rms"], bins=80, color="gray", edgecolor="white")
axes[2].axvline(AC.rms_floor, color="red", ls="--", label=f"silence floor ({AC.rms_floor})")
axes[2].set_title("Per-Window RMS Distribution")
axes[2].set_xlabel("RMS")
axes[2].legend()

plt.tight_layout()
plt.savefig(OUTPUT_DIR / "eda_summary.png", dpi=150)
plt.show()

# ── Manifest sample ──────────────────────────────────────────
print("\nManifest sample (first 10 rows):")
print(manifest.head(10).to_string(index=False))

# ── Full audit trail ─────────────────────────────────────────
print(f"\n{'─'*60}")
print("AUDIT TRAIL (per-dataset)")
for ds_name, stats in audit.items():
    print(f"\n  {ds_name}:")
    for k, v in stats.items():
        print(f"    {k:20s}: {v}")

print(f"\n{'━'*60}")
print("Pipeline complete. Dataset is ready for model training.")
print(f"{'━'*60}")