# model_test.ipynb — Hoest vs Normaal (live mic)
Deze notebook laadt de bundel (TFLite + manifest + labels + preproces) en test live via de microfoon met **vensters van 3.0 s** en **0 s overlap** (dus non-overlapping sliding window).

Zorg dat de bundelbestanden beschikbaar zijn (lokaal pad hieronder). Je kunt dit notebook later één-op-één hergebruiken op de Pi; alleen het pad en eventuele device-namen kunnen verschillen.

## 0) Vereisten installeren
Als je deze packages nog niet hebt:
- `tflite-runtime` **of** `tensorflow` (voor de TFLite interpreter)
- `sounddevice` (microfoon)
- `soundfile` (wav I/O voor offline test)
- `librosa`, `scipy` (preprocessing)

Op macOS of Linux moet je mogelijk microfoonrechten toestaan voor je terminal/IDE.

In [None]:
# !pip install --upgrade pip
# Kies bij voorkeur tflite-runtime; zo niet, dan fallback naar tensorflow
# !pip install tflite-runtime==2.14.0 || pip install tensorflow==2.17.0
# !pip install sounddevice soundfile librosa scipy numpy

## 1) Paden & bundel inlezen
Pas `BUNDLE_DIR` aan naar de map waar `cnn_normaal_vs_hoest.tflite`, `labels.json`, `manifest.json` en `preproces.py` staan.

In [2]:
from pathlib import Path
import json, importlib.util, sys

# >>> Paden instellen <<<
# Notebook staat in models/
# Bundel zit in models/bundles/hoest-vs-normaal-1.0.0
# Opnames staan in ../../data/
CANDIDATES = [
    Path("bundles/hart-vs-hoest-1.0.0"),   # hoofdpad van de bundel
    Path("."),                                # fallback: huidige map
    Path("/mnt/data"),                        # fallback: uploadlocatie
]

for c in CANDIDATES:
    if (c / "cnn_hoest_vs_hart.tflite").exists() and (c / "preproces.py").exists():
        BUNDLE_DIR = c
        break
else:
    raise FileNotFoundError(
        "Kan bundelbestanden niet vinden. Zet de 4 bestanden in dezelfde map en pas CANDIDATES aan."
    )

MODEL_PATH   = BUNDLE_DIR / "cnn_hoest_vs_hart.tflite"
LABELS_PATH  = BUNDLE_DIR / "labels.json"
MANIFEST_PATH= BUNDLE_DIR / "manifest.json"
PREPROC_PATH = BUNDLE_DIR / "preproces.py"

print("BUNDLE_DIR =", BUNDLE_DIR.resolve())

labels   = json.loads(LABELS_PATH.read_text(encoding="utf-8"))
manifest = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))

# Dynamisch importeren van bundel-preprocess
spec = importlib.util.spec_from_file_location("bundle_preproces", str(PREPROC_PATH))
bundle_preproces = importlib.util.module_from_spec(spec); sys.modules["bundle_preproces"] = bundle_preproces
spec.loader.exec_module(bundle_preproces)  # type: ignore

# Pad naar dataset met opnames (relatief t.o.v. models/)
DATA_DIR = Path("../../data")
print("DATA_DIR =", DATA_DIR.resolve())

labels, manifest

BUNDLE_DIR = C:\Users\Esmee Werk\Documents\Persoonlijke Projecten\Lungsound-Annotation-Tool\experiments\pipeline-pi\models\bundles\hart-vs-hoest-1.0.0
DATA_DIR = C:\Users\Esmee Werk\Documents\Persoonlijke Projecten\Lungsound-Annotation-Tool\experiments\data


(['Hart', 'Hoest'],
 {'name': 'hart-vs-hoest',
  'version': '1.0.0',
  'preprocess_mode': 'plugin',
  'sample_rate': 15750,
  'raw_window_seconds': 3.0,
  'stft': {'n_fft': 1024, 'hop': 256, 'window': 'hann'},
  'bandpass': {'low_hz': 20.0,
   'high_hz': 600.0,
   'order': 4,
   'zero_phase': True},
  'input_shape': [513, 182, 1],
  'input_dtype': 'float32',
  'num_classes': 2})

## 2) TFLite Interpreter klaarzetten
We verwachten een input-shape zoals in het manifest (`[F, T, 1]`). We voegen zelf batch-dimensie toe naar `[1, F, T, 1]`.

In [3]:
def _get_tflite_interpreter():
    # 1) Probeer tflite-runtime (lichtgewicht)
    try:
        from tflite_runtime.interpreter import Interpreter  # type: ignore
        return Interpreter, "tflite-runtime"
    except ModuleNotFoundError:
        pass

    # 2) Fallback: TensorFlow (heeft altijd tf.lite.Interpreter)
    try:
        import tensorflow as tf
        Interpreter = tf.lite.Interpreter  # correcte import voor TF
        return Interpreter, f"tensorflow {tf.__version__}"
    except Exception as e:
        raise ImportError(
            "Geen TFLite-backend gevonden. Installeer óf 'tflite-runtime' óf 'tensorflow>=2.9'.\n"
            "Voor Windows is 'tensorflow==2.17.1' de makkelijkste optie."
        ) from e

Interpreter, _backend = _get_tflite_interpreter()
print("TFLite backend:", _backend)

interpreter = Interpreter(model_path=str(MODEL_PATH))
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("Model input:", input_details[0]["shape"], input_details[0]["dtype"])
print("Model output:", output_details[0]["shape"], output_details[0]["dtype"])

TFLite backend: tensorflow 2.20.0
Model input: [  1 513 182   1] <class 'numpy.float32'>
Model output: [1 2] <class 'numpy.float32'>


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


In [6]:
# Live reader-factory voor 3.0 s blokken met automatische resampling naar manifest['sample_rate'].
import numpy as np
import sounddevice as sd
import librosa

FS_TARGET = int(manifest.get("sample_rate", 16000))
WIN_SEC   = float(manifest.get("raw_window_seconds", 3.0))

class Mic3sReader:
    def __init__(self, fs_target: int, seconds: float):
        self.fs_target = int(fs_target)
        self.seconds   = float(seconds)
        self.done = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False

    def _record_once(self, fs: int, seconds: float) -> np.ndarray:
        frames = int(round(fs * seconds))
        audio = sd.rec(frames, samplerate=fs, channels=1, dtype="float32", blocking=True)
        sd.wait()
        y = audio[:, 0].astype(np.float32, copy=False)
        return y

    def read_bytes(self) -> bytes:
        if self.done:
            return b""

        # Probeer direct op de target sample rate; zo niet, gebruik device-default en resample.
        try:
            sd.check_input_settings(samplerate=self.fs_target, channels=1, dtype="float32")
            cap_fs = self.fs_target
        except Exception:
            dev = sd.query_devices(kind="input")
            cap_fs = int(round(dev["default_samplerate"]))

        y = self._record_once(fs=cap_fs, seconds=self.seconds)

        if cap_fs != self.fs_target:
            y = librosa.resample(y, orig_sr=cap_fs, target_sr=self.fs_target, res_type="kaiser_best")

        # Forceer exacte lengte (kan door resampling nét afwijken)
        n_need = int(self.fs_target * self.seconds)
        if len(y) < n_need:
            y = np.pad(y, (0, n_need - len(y)))
        elif len(y) > n_need:
            y = y[:n_need]

        # Float32 [-1,1] → int16 PCM bytes
        y = np.clip(y, -1.0, 1.0)
        x_i16 = (y * 32767.0).astype(np.int16, copy=False)
        self.done = True
        return x_i16.tobytes()

def reader_factory_live():
    return Mic3sReader(fs_target=FS_TARGET, seconds=WIN_SEC)

In [7]:
import numpy as np

def softmax(x):
    x = np.array(x, dtype=np.float32)
    x = x - np.max(x, axis=-1, keepdims=True)
    e = np.exp(x)
    return e / np.sum(e, axis=-1, keepdims=True)

def run_inference(x_F_T_1: np.ndarray):
    # Verwacht (F,T,1) float32; we voegen batch toe
    x = np.expand_dims(x_F_T_1, axis=0)
    # Cast/shape naar modelverwachting
    idx = input_details[0]["index"]
    x = x.astype(input_details[0]["dtype"], copy=False)
    interpreter.set_tensor(idx, x)
    interpreter.invoke()
    out = interpreter.get_tensor(output_details[0]["index"])
    out = np.squeeze(out, axis=0)
    return out

## 3) Reader-factory voor LIVE microfoon (3.0 s blokken, 0 s overlap)
De bundel verwacht **int16 mono PCM** op de sample rate uit het manifest. Niet alle laptops ondersteunen precies 15.75 kHz; in dat geval vangen we dit op door op een ondersteunde rate op te nemen en **te resamplen** naar het manifest (`sample_rate`).

- Venster: `raw_window_seconds` uit het manifest (hier 3.0 s)
- Overlap: 0 s → dus één blok per 3 s

In [8]:
# Plaats dit blok vlak vóór je aanroep naar build_preprocessor(...)

# Veiligheidsdefaults + bandpass instellen
manifest.setdefault("raw_window_seconds", 3.0)   # 3.0s vensters, 0 overlap
manifest.setdefault("sample_rate", 16000)        # pas aan indien anders getraind

# Normalisatie afdwingen als dict
norm = manifest.get("normalization")
if not isinstance(norm, dict):
    manifest["normalization"] = {"mean": 0.0, "std": 1.0}
else:
    manifest["normalization"].setdefault("mean", 0.0)
    manifest["normalization"].setdefault("std", 1.0)

# Bandpass: low=50 Hz, high=3000 Hz, orde=4, GEEN zero-phase
bp = manifest.get("bandpass") or {}
bp.update({
    "low_hz": 100.0,
    "high_hz": 3000.0,
    "order": 4,
    "zero_phase": False,   # geen filtfilt: introduceert fase (zoals gevraagd)
})
manifest["bandpass"] = bp

print("Manifest (samenvatting):")
print("  sample_rate         :", manifest["sample_rate"])
print("  raw_window_seconds  :", manifest["raw_window_seconds"])
print("  normalization       :", manifest["normalization"])
print("  bandpass            :", manifest["bandpass"])

# Nu de preprocessor bouwen met deze manifest-instellingen
preproc_live = bundle_preproces.build_preprocessor(reader_factory_live, cfg=None, manifest=manifest)

Manifest (samenvatting):
  sample_rate         : 15750
  raw_window_seconds  : 3.0
  normalization       : {'mean': 0.0, 'std': 1.0}
  bandpass            : {'low_hz': 100.0, 'high_hz': 3000.0, 'order': 4, 'zero_phase': False}


In [9]:
import sounddevice as sd
import numpy as np
import librosa

FS_TARGET = int(manifest.get("sample_rate", 15750))
WIN_SEC   = float(manifest.get("raw_window_seconds", 3.0))
N_SAMPLES_TARGET = int(FS_TARGET * WIN_SEC)

class Mic3sReader:
    def __init__(self, fs_target: int, seconds: float):
        self.fs_target = int(fs_target)
        self.seconds   = float(seconds)
        self.done = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False

    def _record_once(self, fs: int, seconds: float) -> np.ndarray:
        frames = int(round(fs * seconds))
        audio = sd.rec(frames, samplerate=fs, channels=1, dtype="float32", blocking=True)
        sd.wait()
        y = audio[:,0].astype(np.float32, copy=False)
        return y

    def read_bytes(self) -> bytes:
        if self.done:
            return b""
        # Probeer direct op FS_TARGET; als het faalt, valt terug op device-default
        try:
            sd.check_input_settings(samplerate=self.fs_target, channels=1, dtype="float32")
            cap_fs = self.fs_target
        except Exception:
            cap_fs = sd.query_devices(kind="input")["default_samplerate"]
            cap_fs = int(round(cap_fs))  # bv. 48000 of 44100

        y = self._record_once(fs=cap_fs, seconds=self.seconds)
        if cap_fs != self.fs_target:
            # Resample naar target
            y = librosa.resample(y, orig_sr=cap_fs, target_sr=self.fs_target, res_type="kaiser_best")
            # Zorg exact op lengte
            n_need = int(self.fs_target * self.seconds)
            if len(y) < n_need:
                y = np.pad(y, (0, n_need - len(y)))
            elif len(y) > n_need:
                y = y[:n_need]

        # Float32 → int16 PCM bytes
        y = np.clip(y, -1.0, 1.0)
        x_i16 = (y * 32767.0).astype(np.int16, copy=False)
        self.done = True
        return x_i16.tobytes()

def reader_factory_live():
    return Mic3sReader(fs_target=FS_TARGET, seconds=WIN_SEC)

# Bouw de bundel-preprocessor met onze reader-factory
preproc_live = bundle_preproces.build_preprocessor(reader_factory_live, cfg=None, manifest=manifest)

# Test één sample-preprocess om de shape te checken (neemt ~3s op)
print("Neem eenmalig 3.0 s op om preproc-shape te verifiëren…")
arr = preproc_live()
print("Preproc output shape/dtype:", arr.shape, arr.dtype)

Neem eenmalig 3.0 s op om preproc-shape te verifiëren…
Preproc output shape/dtype: (513, 182, 1) float32


## 4) Live inferentie-loop (elke 3.0 s)
Deze loop neemt steeds een nieuw 3.0 s blok op, voert preprocessing uit, en draait het model. Geen overlap.

In [10]:
import sounddevice as sd

sd.default.device = (1, None)  # index 1 = interne mic (MME)
print(sd.query_devices(sd.default.device[0]))

{'name': 'Microfoonmatrix (3- Intel® Smar', 'index': 1, 'hostapi': 0, 'max_input_channels': 4, 'max_output_channels': 0, 'default_low_input_latency': 0.09, 'default_low_output_latency': 0.09, 'default_high_input_latency': 0.18, 'default_high_output_latency': 0.18, 'default_samplerate': 44100.0}


In [11]:
import numpy as np, soundfile as sf

fs = 16000
dur = 3
print("Opnemen via interne microfoon...")
audio = sd.rec(int(fs*dur), samplerate=fs, channels=1, dtype="float32")
sd.wait()
print("Klaar. RMS:", np.sqrt(np.mean(audio**2)))

sf.write("mic_test.wav", audio, fs)
print("Bestand opgeslagen als mic_test.wav")

Opnemen via interne microfoon...
Klaar. RMS: 0.0001586376
Bestand opgeslagen als mic_test.wav


In [12]:
import time
import numpy as np
from collections import deque, Counter

# Controleer even dat de labelvolgorde klopt
print("Labels:", labels)  # verwacht zoiets als ["Hoest", "Normaal"] of ["Normaal", "Hoest"]

CONF_THRESH = 0.60     # minimale confidence voor een enkele beslissing
RMS_THRESH  = 0.010    # minimale RMS van het ruwe 3s-segment (0..1 float, tuneer op jouw microfoon)
VOTE_K      = 3        # majority vote over de laatste K beslissingen

recent = deque(maxlen=VOTE_K)

def segment_rms_from_preproc_input(x):
    """
    Schatting van input-energie. Als je preproces x=(F,T,1) features geeft,
    is de RMS van features minder betekenisvol. Daarom vragen we de preproc
    om ook de ruwe PCM te geven (optioneel), maar als dat niet kan,
    gebruiken we een proxy: gemiddelde log-magnitude over het featureframe.
    Pas dit aan als jouw preproces ruwe audio kan teruggeven.
    """
    # Proxy: gemiddelde absolute waarde van featuretensor
    return float(np.mean(np.abs(x)))

def predict_once(preproc_callable):
    x = preproc_callable()                 # verwacht (F,T,1)
    rms_proxy = segment_rms_from_preproc_input(x)
    logits = run_inference(x)              # (num_classes,)
    probs  = softmax(logits)
    top_i  = int(np.argmax(probs))
    top_p  = float(probs[top_i])
    return top_i, top_p, probs, rms_proxy

print("Start live inferentie. Druk op Stop (interrupteer de cel) om te beëindigen.")
t0 = time.time()
while True:
    t_start = time.time()
    i, p, probs, rms = predict_once(preproc_live)
    stamp = time.time() - t0

    # Energie-gate: negeer stille/heel lage-energie segmenten
    gated = rms < RMS_THRESH

    # Beslissing op dit segment
    decision = labels[i]
    confident = (p >= CONF_THRESH) and (not gated)

    # Majority vote buffer bijhouden alleen als we niet ‘gaten’
    if not gated:
        recent.append(decision)

    # Majority vote over laatste K
    voted = None
    if len(recent) == VOTE_K:
        cnt = Counter(recent)
        voted, n = cnt.most_common(1)[0]
        # optioneel: eis dat majority ook met confidence van dit segment samengaat
        # (kun je weglaten als je K al beschermend genoeg vindt)

    # Logging
    print(
        f"[{stamp:7.2f}s] "
        f"Top={decision:<7} p={p:.3f}  "
        f"voted={voted or '-':<7}  "
        f"rms≈{rms:.3f}  "
        f"{'GATED' if gated else ''}"
    )

    # Eenvoudige trigger (bijv. voor live feedback):
    # Als majority op 'Hoest' uitkomt én niet gated, dan 'Hoest' melden
    if voted == "Hoest" and not gated:
        print(">>> Detectie: Hoest")

    # 0 s overlap ⇒ geen extra slaap; opname+preprocess beslaat ~WIN_SEC zelf

Labels: ['Hart', 'Hoest']
Start live inferentie. Druk op Stop (interrupteer de cel) om te beëindigen.
[   3.67s] Top=Hart    p=0.703  voted=-        rms≈0.000  GATED
[   7.05s] Top=Hart    p=0.702  voted=-        rms≈0.000  GATED
[  10.44s] Top=Hart    p=0.709  voted=-        rms≈0.022  
[  13.79s] Top=Hart    p=0.706  voted=-        rms≈0.004  GATED
[  17.17s] Top=Hart    p=0.726  voted=-        rms≈0.093  
[  20.54s] Top=Hart    p=0.703  voted=-        rms≈0.000  GATED
[  23.91s] Top=Hart    p=0.725  voted=Hart     rms≈0.108  
[  27.27s] Top=Hart    p=0.701  voted=Hart     rms≈0.000  GATED


KeyboardInterrupt: 