# Cough Detection (5s) - Clean Training Pipeline

This notebook follows a simple flow similar to the old 9s notebook:
1. Import and label dataset from metadata.
2. Truncate each clip to 5 seconds.
3. Extract MFCC features.
4. Train the model.
5. Evaluate and export (`.h5` and int8 `.tflite`).

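Robustness is added in training through conservative waveform augmentation (random gain, small time shifts, and occasional mild hiss). Mixing in recorded background noise is optional and disabled by default (`USE_BACKGROUND_NOISE_MIX`).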


In [9]:
import os
import json
import random
from pathlib import Path

import numpy as np
import pandas as pd
import librosa
from tqdm import tqdm


import tensorflow as tf
import keras
from keras import layers, models
from keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support, roc_auc_score

print('TensorFlow:', tf.__version__)


TensorFlow: 2.10.0


In [10]:
# =====================
# Configuration
# =====================
# A single seed is applied to NumPy's global RNG, Python's `random` module and
# TensorFlow so that repeated runs start from the same random state.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# Candidate locations for each input directory, listed in priority order.
# `first_existing` (defined below) picks the first one that exists.
PUBLIC_DATASET_CANDIDATES = [Path('../public_dataset'), Path('public_dataset')]
ESP32_NOISE_CANDIDATES = [Path('./esp32_dataset/non_cough'), Path('model/esp32_dataset/non_cough')]


def first_existing(paths):
    """Return the first candidate in `paths` that exists on disk.

    When none of the candidates exist, the first entry is returned unchanged,
    so the caller always gets a path back (which may itself be missing).
    """
    return next((candidate for candidate in paths if candidate.exists()), paths[0])


# Resolve the input directories from the candidate lists above.
PUBLIC_DATASET_DIR = first_existing(PUBLIC_DATASET_CANDIDATES)
ESP32_NOISE_DIR = first_existing(ESP32_NOISE_CANDIDATES)

# File-name stem shared by the exported `.h5` and `_int8.tflite` artefacts.
OUTPUT_PREFIX = 'cough_cnn_5s_base'

# Strong labels from public metadata
# Clips whose score falls strictly between the two thresholds are discarded.
POS_THRESHOLD = 0.80  # cough_detected >= 0.80 -> cough
NEG_THRESHOLD = 0.20  # cough_detected <= 0.20 -> non-cough

# Audio + MFCC
SR = 16000  # sample rate (Hz) every clip is loaded at
DURATION = 5.0  # clip length in seconds
TARGET_SAMPLES = int(SR * DURATION)  # samples per clip after pad/trim

N_MFCC = 40
N_MELS = 128
N_FFT = 1024
HOP_LENGTH = 512
# Frame count for uncentered framing: 1 + floor((samples - n_fft) / hop) = 155.
# NOTE(review): librosa's STFT centers frames by default, which would yield
# more frames than this for a 5 s clip; `extract_mfcc_2d` trims any surplus to
# EXPECTED_FRAMES. Confirm this matches the on-device feature front end.
EXPECTED_FRAMES = 1 + int(np.floor((TARGET_SAMPLES - N_FFT) / float(HOP_LENGTH)))

# Split
TEST_SIZE = 0.15
# Fraction of the remaining (non-test) data used for validation.
VAL_SIZE_FROM_TRAIN = 0.1765  # gives ~15% total val when test is 15%

# Training
BATCH_SIZE = 32
EPOCHS = 40
LEARNING_RATE = 3e-4

# Optional caps for quick experiments (set None for full dataset)
MAX_TRAIN_PER_CLASS = None
MAX_VAL_PER_CLASS = None
MAX_TEST_PER_CLASS = None

# Data augmentation (training only)
# IMPORTANT: keep this conservative to avoid learning "noise => cough" shortcuts.
AUG_PER_SAMPLE = 1  # augmented copies generated per training clip
USE_BACKGROUND_NOISE_MIX = False  # set True only if you want explicit background-noise mixing

# Echo the resolved configuration so the run is self-describing.
print('Public dataset:', PUBLIC_DATASET_DIR.resolve())
print('ESP32 noise dir:', ESP32_NOISE_DIR.resolve() if ESP32_NOISE_DIR.exists() else ESP32_NOISE_DIR)
print('Target clip:', DURATION, 'seconds')
print('Expected MFCC shape:', (EXPECTED_FRAMES, N_MFCC))
print('USE_BACKGROUND_NOISE_MIX:', USE_BACKGROUND_NOISE_MIX)


Public dataset: E:\minor-project\public_dataset
ESP32 noise dir: E:\minor-project\model\esp32_dataset\non_cough
Target clip: 5.0 seconds
Expected MFCC shape: (155, 40)
USE_BACKGROUND_NOISE_MIX: False


In [11]:
# =====================
# Step 1: Build metadata table
# =====================

def build_dataframe(dataset_dir, pos_threshold=0.8, neg_threshold=0.2):
    """Build the labelled clip table from `<name>.wav` / `<name>.json` pairs.

    Each `.wav` directly inside `dataset_dir` is paired with the `.json` of the
    same stem, and the JSON's `cough_detected` value decides the label.

    Args:
        dataset_dir: Directory to scan (``str`` or ``Path``); not recursive.
        pos_threshold: Scores ``>=`` this value are labelled 1 (cough).
        neg_threshold: Scores ``<=`` this value are labelled 0 (non-cough).

    Returns:
        DataFrame with columns ``wav_path`` (resolved path as ``str``),
        ``cough_score`` (float) and ``label`` (0/1), ordered by file path.
        Clips without a JSON file, with a score strictly between the two
        thresholds, or with unusable metadata are left out.

    Warns:
        UserWarning: when at least one clip was dropped because its metadata
        could not be read or had no numeric ``cough_detected`` value.
    """
    import warnings

    dataset_dir = Path(dataset_dir)
    records = []
    unreadable = 0

    for wav_path in sorted(dataset_dir.glob('*.wav')):
        json_path = wav_path.with_suffix('.json')
        if not json_path.exists():
            continue

        try:
            meta = json.loads(json_path.read_text(encoding='utf-8'))
            score = float(meta.get('cough_detected'))
        except (OSError, ValueError, TypeError, AttributeError):
            # Best-effort: unreadable file, malformed JSON / bad encoding
            # (ValueError), missing or non-numeric score (TypeError/ValueError),
            # or a JSON payload that is not an object (AttributeError).
            unreadable += 1
            continue

        if score >= pos_threshold:
            label = 1
        elif score <= neg_threshold:
            label = 0
        else:
            # Ambiguous (or NaN) score: not a strong label, so drop it.
            continue

        records.append((str(wav_path.resolve()), score, label))

    if unreadable:
        warnings.warn(
            f"build_dataframe: skipped {unreadable} clip(s) whose JSON metadata "
            "could not be read or lacks a numeric 'cough_detected' value",
            stacklevel=2,
        )

    return pd.DataFrame(records, columns=['wav_path', 'cough_score', 'label'])


df = build_dataframe(PUBLIC_DATASET_DIR, POS_THRESHOLD, NEG_THRESHOLD)

# Fail fast with an actionable message: `first_existing` falls back to its
# first candidate even when that path does not exist, in which case the table
# is empty and the stratified split that follows would fail far less clearly.
if df.empty:
    raise FileNotFoundError(
        f'No labelled .wav/.json pairs found in {PUBLIC_DATASET_DIR.resolve()}'
    )

print('Total usable samples:', len(df))
print('Class distribution:', df['label'].value_counts().to_dict())
print(df.head(3))


Total usable samples: 17839
Class distribution: {1: 12290, 0: 5549}
                                            wav_path  cough_score  label
0  E:\minor-project\public_dataset\2d22665d-5ca0-...       0.8796      1
1  E:\minor-project\public_dataset\2d25616a-2bee-...       0.0099      0
2  E:\minor-project\public_dataset\2d26833b-337e-...       0.9817      1


In [12]:
# =====================
# Step 2: Split train/val/test
# =====================

# Two-stage stratified split sharing one seed: first hold out the test set,
# then carve the validation set out of the remainder.
trainval_df, test_df = train_test_split(
    df, test_size=TEST_SIZE, random_state=SEED, stratify=df['label'],
)
train_df, val_df = train_test_split(
    trainval_df, test_size=VAL_SIZE_FROM_TRAIN, random_state=SEED, stratify=trainval_df['label'],
)


def cap_per_class(split_df, max_per_class, seed=42):
    """Limit a split to at most `max_per_class` rows per label.

    Args:
        split_df: DataFrame with a ``label`` column.
        max_per_class: Row cap per label; ``None`` disables capping.
        seed: ``random_state`` used for every per-label sample.

    Returns:
        A new DataFrame with a fresh 0..n-1 index. When capping is active the
        rows are grouped by label in sorted label order, and labels with fewer
        rows than the cap are kept whole (in sampled order).
    """
    if max_per_class is None:
        return split_df.reset_index(drop=True)

    cap = int(max_per_class)
    # Sample each label group explicitly rather than via `groupby(...).apply`:
    # recent pandas deprecates handing the grouping column to the applied
    # function and later versions exclude it, which would silently drop
    # `label` from the result.
    sampled = [
        group.sample(n=min(len(group), cap), random_state=seed)
        for _, group in split_df.groupby('label')
    ]
    if not sampled:
        # Empty split: nothing to sample, and `pd.concat([])` would raise.
        return split_df.reset_index(drop=True)

    return pd.concat(sampled).reset_index(drop=True)


# Optionally shrink each split; with the caps left at None this only resets the index.
train_df, val_df, test_df = (
    cap_per_class(split, limit, SEED)
    for split, limit in (
        (train_df, MAX_TRAIN_PER_CLASS),
        (val_df, MAX_VAL_PER_CLASS),
        (test_df, MAX_TEST_PER_CLASS),
    )
)

# Report the size and label balance of each split.
for tag, split in (('Train:', train_df), ('Val:  ', val_df), ('Test: ', test_df)):
    print(tag, split.shape, split['label'].value_counts().to_dict())

Train: (12486, 3) {1: 8602, 0: 3884}
Val:   (2677, 3) {1: 1844, 0: 833}
Test:  (2676, 3) {1: 1844, 0: 832}


In [13]:
# =====================
# Step 3: Noise pool + augmentation
# =====================

# Background-noise candidates: non-cough clips from the training split only,
# plus any ESP32 recordings found on disk.
is_non_cough = train_df['label'] == 0
PUBLIC_NOISE_PATHS = train_df.loc[is_non_cough, 'wav_path'].tolist()

if ESP32_NOISE_DIR.exists():
    ESP32_NOISE_PATHS = sorted(str(wav.resolve()) for wav in ESP32_NOISE_DIR.glob('*.wav'))
else:
    ESP32_NOISE_PATHS = []

print('Public non-cough noise pool:', len(PUBLIC_NOISE_PATHS))
print('ESP32 non-cough noise pool:', len(ESP32_NOISE_PATHS))


def pad_or_trim(y, target_samples):
    """Return `y` as float32 with exactly `target_samples` samples.

    Shorter inputs are zero-padded at the end; longer inputs keep only their
    first `target_samples` samples.
    """
    missing = target_samples - len(y)
    if missing > 0:
        y = np.pad(y, (0, missing))
    elif missing < 0:
        y = y[:target_samples]
    return y.astype(np.float32)


def rms(x):
    """Root-mean-square level of `x` as a Python float.

    The mean is accumulated in float64 and a 1e-10 offset is added before the
    square root, so an all-zero input yields a small positive value, not 0.
    """
    mean_power = np.mean(np.square(x), dtype=np.float64)
    return float(np.sqrt(mean_power + 1e-10))


def mix_at_snr(clean, noise, snr_db):
    """Add `noise` to `clean`, scaled to sit `snr_db` dB below the clean RMS.

    Both RMS levels are floored (1e-4 for the signal, 1e-6 for the noise) so
    that near-silent inputs cannot produce extreme scale factors.
    """
    signal_level = max(rms(clean), 1e-4)
    noise_level = max(rms(noise), 1e-6)
    wanted_noise_level = signal_level / (10.0 ** (snr_db / 20.0))
    gain = wanted_noise_level / noise_level
    return clean + noise * gain


def sample_noise_clip(target_samples, rng):
    """Draw a random background-noise excerpt of exactly `target_samples` samples.

    A file is chosen uniformly from the combined public and ESP32 noise pools
    and loaded at `SR` as mono. Files longer than the target contribute a
    random window; shorter files are repeated end-to-end and cut to length.

    Args:
        target_samples: Number of samples to return.
        rng: ``numpy.random.Generator`` used for the file and window choice.

    Returns:
        float32 array of length `target_samples`. All zeros when both pools
        are empty or the chosen file holds no samples.
    """
    pool = [*PUBLIC_NOISE_PATHS, *ESP32_NOISE_PATHS]
    if not pool:
        return np.zeros(target_samples, dtype=np.float32)

    chosen = pool[rng.integers(0, len(pool))]
    clip, _ = librosa.load(chosen, sr=SR, mono=True)
    clip = clip.astype(np.float32)

    if len(clip) == 0:
        # A zero-length recording cannot be tiled (the repeat count below
        # would divide by zero), so treat it as silence.
        return np.zeros(target_samples, dtype=np.float32)

    if len(clip) >= target_samples:
        start = int(rng.integers(0, len(clip) - target_samples + 1))
        return clip[start:start + target_samples]

    reps = int(np.ceil(target_samples / len(clip)))
    return np.tile(clip, reps)[:target_samples]


def augment_waveform(y, rng):
    """Return a lightly perturbed float32 copy of waveform `y`.

    Applied in order: a random gain, an optional time shift, an optional
    background-noise mix (only when `USE_BACKGROUND_NOISE_MIX` is True) and
    optional low-level Gaussian hiss; the result is clipped to [-1, 1].

    Args:
        y: Input waveform; it is copied, so the caller's array is not modified.
        rng: ``numpy.random.Generator`` driving every random choice. The order
            of draws below determines the output for a given seed.

    Returns:
        float32 array with the same length as `y`.
    """
    y = y.astype(np.float32).copy()

    # Conservative augmentation to avoid learning loudness shortcuts.
    y *= rng.uniform(0.80, 1.20)

    # 70% of the time, shift by up to +/-0.15 s.
    if rng.random() < 0.70:
        max_shift = int(0.15 * SR)
        shift = int(rng.integers(-max_shift, max_shift + 1))
        # np.roll is circular: samples pushed past one end reappear at the other.
        # NOTE(review): confirm the wrap-around is intended rather than zero-fill.
        y = np.roll(y, shift)

    # Optional background noise mix (disabled by default).
    # The short-circuit means no random number is drawn when the flag is off.
    if USE_BACKGROUND_NOISE_MIX and rng.random() < 0.50:
        n = sample_noise_clip(len(y), rng)
        snr_db = rng.uniform(8.0, 25.0)
        y = mix_at_snr(y, n, snr_db)

    # Tiny sensor-like hiss only (very mild).
    if rng.random() < 0.20:
        y += rng.uniform(0.0002, 0.0020) * rng.normal(0.0, 1.0, len(y)).astype(np.float32)

    return np.clip(y, -1.0, 1.0).astype(np.float32)



Public non-cough noise pool: 3884
ESP32 non-cough noise pool: 288


In [14]:
# =====================
# Step 4: MFCC extraction (5s)
# =====================

def extract_mfcc_2d(wav_path=None,
                    y=None,
                    sr=SR,
                    n_mfcc=N_MFCC,
                    n_mels=N_MELS,
                    n_fft=N_FFT,
                    hop_length=HOP_LENGTH,
                    duration=DURATION,
                    max_frames=EXPECTED_FRAMES,
                    normalise=True):
    """
    Extract MFCC as (frames, n_mfcc) from a fixed-length clip.

    Args:
        wav_path: Audio file to load; used only when `y` is None.
        y: Waveform already in memory; takes precedence over `wav_path`.
        sr: Sample rate used for loading and for the MFCC computation.
        n_mfcc, n_mels, n_fft, hop_length: Passed to `librosa.feature.mfcc`.
        duration: Clip length in seconds; the waveform is padded or trimmed
            to `int(sr * duration)` samples first.
        max_frames: When not None, the time axis is zero-padded or truncated
            to exactly this many frames.
        normalise: When True, each coefficient is standardised over the time
            axis of this clip (zero mean, unit variance, 1e-6 added to std).

    Returns:
        float32 array of shape (frames, n_mfcc).

    Raises:
        ValueError: if neither `wav_path` nor `y` is provided.

    NOTE(review): librosa centers STFT frames by default, so a 5 s clip
    presumably yields more frames than EXPECTED_FRAMES (computed for uncentered
    framing) and the trailing frames are dropped by the truncation below.
    Confirm this matches the feature extraction used on the device.
    """
    if y is None:
        if wav_path is None:
            raise ValueError('extract_mfcc_2d needs either `wav_path` or `y`')
        y, _ = librosa.load(wav_path, sr=sr, mono=True, offset=0.0, duration=duration)

    y = pad_or_trim(y, int(sr * duration))

    mfcc = librosa.feature.mfcc(
        y=y,
        sr=sr,
        n_mfcc=n_mfcc,
        n_mels=n_mels,
        n_fft=n_fft,
        hop_length=hop_length,
        htk=False
    )

    # librosa returns (n_mfcc, frames); the model consumes (frames, n_mfcc).
    mfcc = mfcc.T.astype(np.float32)

    if max_frames is not None:
        if mfcc.shape[0] < max_frames:
            pad = np.zeros((max_frames - mfcc.shape[0], n_mfcc), dtype=np.float32)
            mfcc = np.vstack([mfcc, pad])
        elif mfcc.shape[0] > max_frames:
            mfcc = mfcc[:max_frames, :]

    if normalise:
        # Per-clip, per-coefficient standardisation over the time axis.
        mean = np.mean(mfcc, axis=0, keepdims=True)
        std = np.std(mfcc, axis=0, keepdims=True) + 1e-6
        mfcc = (mfcc - mean) / std

    return mfcc.astype(np.float32)


In [15]:
def extract_features(df,
                     duration=DURATION,
                     normalise=True,
                     augment=False,
                     aug_per_sample=0,
                     seed=42):
    """
    Turn a clip table into MFCC features and integer labels.

    Every row of `df` (columns `wav_path`, `label`) contributes its original
    clip. With `augment=True`, `aug_per_sample` extra augmented copies of that
    clip follow it immediately, each carrying the same label.

    Returns:
        (X, y): X is float32 of shape (n, EXPECTED_FRAMES, N_MFCC) and y is
        int32 of shape (n,).
    """
    rng = np.random.default_rng(seed)

    n_files = len(df)
    copies_per_clip = 1 + aug_per_sample if augment else 1
    capacity = n_files * copies_per_clip

    # Pre-allocate the outputs; `cursor` tracks how many slots are filled.
    features = np.zeros((capacity, EXPECTED_FRAMES, N_MFCC), dtype=np.float32)
    targets = np.zeros((capacity,), dtype=np.int32)
    cursor = 0

    def _store(wave, label):
        """Write the MFCC of `wave` and its label into the next free slot."""
        nonlocal cursor
        features[cursor] = extract_mfcc_2d(y=wave, duration=duration, normalise=normalise)
        targets[cursor] = label
        cursor += 1

    for row in tqdm(df.itertuples(index=False), total=n_files):
        label = int(row.label)

        clip, _ = librosa.load(row.wav_path, sr=SR, mono=True, offset=0.0, duration=duration)
        clip = pad_or_trim(clip, TARGET_SAMPLES)

        # The untouched clip always comes first.
        _store(clip, label)

        # Augmented copies are generated for the training split only.
        if augment:
            for _ in range(aug_per_sample):
                _store(augment_waveform(clip, rng).astype(np.float32), label)

    return features[:cursor], targets[:cursor]


In [16]:
# Extract features
# Only the training split receives augmented copies.
X_train, y_train = extract_features(
    train_df, duration=DURATION, normalise=True,
    augment=True, aug_per_sample=AUG_PER_SAMPLE, seed=SEED,
)

# Validation and test are extracted clean, with identical settings.
clean_feature_kwargs = dict(duration=DURATION, normalise=True, augment=False, aug_per_sample=0, seed=SEED)
X_val, y_val = extract_features(val_df, **clean_feature_kwargs)
X_test, y_test = extract_features(test_df, **clean_feature_kwargs)

print('X_train:', X_train.shape, 'y_train:', y_train.shape)
print('X_val:  ', X_val.shape, 'y_val:  ', y_val.shape)
print('X_test: ', X_test.shape, 'y_test: ', y_test.shape)

# Per-class sample counts after extraction (train includes augmented copies).
for tag, split_labels in (
    ('Train class counts:', y_train),
    ('Val class counts:  ', y_val),
    ('Test class counts: ', y_test),
):
    print(tag, {cls: int(np.sum(split_labels == cls)) for cls in (0, 1)})


100%|██████████| 12486/12486 [02:00<00:00, 103.36it/s]
100%|██████████| 2677/2677 [00:14<00:00, 186.70it/s]
100%|██████████| 2676/2676 [00:12<00:00, 213.32it/s]

X_train: (24972, 155, 40) y_train: (24972,)
X_val:   (2677, 155, 40) y_val:   (2677,)
X_test:  (2676, 155, 40) y_test:  (2676,)
Train class counts: {0: 7768, 1: 17204}
Val class counts:   {0: 833, 1: 1844}
Test class counts:  {0: 832, 1: 1844}





In [17]:
# Prepare labels for categorical training
# One-hot targets for the two-unit softmax output.
y_train_onehot, y_val_onehot, y_test_onehot = (
    to_categorical(split_labels, num_classes=2)
    for split_labels in (y_train, y_val, y_test)
)

# 'balanced' weights computed from the training labels, keyed by class index.
class_weights = compute_class_weight(class_weight='balanced', classes=np.array([0, 1]), y=y_train)
class_weight = {cls: float(weight) for cls, weight in enumerate(class_weights)}

print('Class weights:', class_weight)


Class weights: {0: 1.607363542739444, 1: 0.7257614508253895}


In [18]:
# =====================
# Step 5: Model + training
# =====================

def build_tinyml_cnn(input_shape):
    """Build the (uncompiled) 1-D CNN classifier.

    Layout: two Conv1D -> BatchNorm -> MaxPool stages (32 and 64 filters), a
    third Conv1D -> BatchNorm stage (128 filters) followed by global average
    pooling, then Dropout -> Dense(64) -> Dropout -> 2-way softmax.
    """
    inputs = tf.keras.Input(shape=input_shape)

    x = inputs
    # Layers are created in the same order as before so auto-generated layer
    # names stay the same.
    for n_filters in (32, 64):
        x = layers.Conv1D(n_filters, 3, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling1D(2)(x)

    x = layers.Conv1D(128, 3, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.GlobalAveragePooling1D()(x)

    # Classifier head.
    x = layers.Dropout(0.30)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.20)(x)
    outputs = layers.Dense(2, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)


# Per-sample input shape: everything after the batch axis of X_train.
input_shape = X_train.shape[1:]
model = build_tinyml_cnn(input_shape)

# Categorical cross-entropy (with light label smoothing) matches the one-hot
# targets and the two-unit softmax output.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.05),
    metrics=['accuracy']
)

model.summary()

# All three callbacks watch validation loss.
callbacks = [
    # Write the model to `<OUTPUT_PREFIX>.h5` only when val_loss improves.
    keras.callbacks.ModelCheckpoint(
        filepath=f'{OUTPUT_PREFIX}.h5',
        monitor='val_loss',
        mode='min',
        save_best_only=True,
        verbose=1,
    ),
    # Halve the learning rate after 4 epochs without improvement (floor 1e-5).
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        mode='min',
        factor=0.5,
        patience=4,
        min_lr=1e-5,
        verbose=1,
    ),
    # Stop after 10 epochs without improvement and restore the best weights.
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        mode='min',
        patience=10,
        restore_best_weights=True,
        verbose=1,
    ),
]

history = model.fit(
    X_train, y_train_onehot,
    validation_data=(X_val, y_val_onehot),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    class_weight=class_weight,
    callbacks=callbacks,
    verbose=1
)

# Continue with the checkpointed model loaded back from disk.
model = keras.models.load_model(f'{OUTPUT_PREFIX}.h5')


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 155, 40)]         0         
                                                                 
 conv1d (Conv1D)             (None, 155, 32)           3872      
                                                                 
 batch_normalization (BatchN  (None, 155, 32)          128       
 ormalization)                                                   
                                                                 
 max_pooling1d (MaxPooling1D  (None, 77, 32)           0         
 )                                                               
                                                                 
 conv1d_1 (Conv1D)           (None, 77, 64)            6208      
                                                                 
 batch_normalization_1 (Batc  (None, 77, 64)           256   

In [19]:
# =====================
# Step 6: Validation threshold + Test evaluation
# =====================
# Probability of the cough class (column 1) on the validation split.
val_prob = model.predict(X_val, batch_size=BATCH_SIZE, verbose=1)[:, 1]


def _val_scores_at(threshold):
    """Return (threshold, precision, recall, f1) on validation for one cut-off."""
    decisions = (val_prob >= threshold).astype(np.int32)
    precision, recall, f_score, _ = precision_recall_fscore_support(
        y_val, decisions, average='binary', zero_division=0
    )
    return float(threshold), float(precision), float(recall), float(f_score)


rows = [_val_scores_at(threshold) for threshold in np.linspace(0.10, 0.90, 161)]
thr_df = pd.DataFrame(rows, columns=['threshold', 'precision', 'recall', 'f1'])

# prefer thresholds with precision >= 0.80 to reduce false cough detections;
# if none qualify, fall back to ranking every threshold.
candidates = thr_df[thr_df['precision'] >= 0.80]
ranking_pool = candidates if len(candidates) > 0 else thr_df
best_row = ranking_pool.sort_values(['f1', 'precision', 'recall'], ascending=False).iloc[0]
best_thr = float(best_row['threshold'])

print('Best threshold from val:', best_thr)
print('Val metrics @best threshold:', {
    metric: float(best_row[metric]) for metric in ('precision', 'recall', 'f1')
})

# Apply the validation-selected threshold to the held-out test split.
test_prob = model.predict(X_test, batch_size=BATCH_SIZE, verbose=1)[:, 1]
test_pred = (test_prob >= best_thr).astype(np.int32)

print('Confusion matrix (test):')
print(confusion_matrix(y_test, test_pred))

print('Classification report (test):')
print(classification_report(y_test, test_pred, digits=4))

print('Test AUC:', roc_auc_score(y_test, test_prob))


Best threshold from val: 0.315
Val metrics @best threshold: {'precision': 0.9172450613988254, 'recall': 0.9316702819956616, 'f1': 0.9244013989776702}
Confusion matrix (test):
[[ 679  153]
 [ 116 1728]]
Classification report (test):
              precision    recall  f1-score   support

           0     0.8541    0.8161    0.8347       832
           1     0.9187    0.9371    0.9278      1844

    accuracy                         0.8995      2676
   macro avg     0.8864    0.8766    0.8812      2676
weighted avg     0.8986    0.8995    0.8988      2676

Test AUC: 0.9526257195895211


In [20]:
# =====================
# Step 7: Convert to int8 TFLite
# =====================

def representative_data_gen():
    """Yield calibration samples for post-training int8 quantisation.

    Up to 300 distinct training examples are yielded one at a time, each as a
    single-element list holding a float32 array with a leading batch axis of 1.

    The subset is drawn from a generator seeded with `SEED`, not from the
    global NumPy RNG, so every invocation yields the same samples no matter
    what random state earlier cells consumed.
    """
    n = min(300, len(X_train))
    rng = np.random.default_rng(SEED)
    for i in rng.choice(len(X_train), size=n, replace=False):
        yield [X_train[i:i + 1].astype(np.float32)]


# Convert the trained Keras model to TFLite, calibrating value ranges with
# `representative_data_gen`.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Restrict conversion to int8 builtin ops and make the input and output
# tensors int8 as well.
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

quant_tflite = converter.convert()

# Write the serialized model next to the notebook as `<OUTPUT_PREFIX>_int8.tflite`.
model_tflite_path = Path(f'{OUTPUT_PREFIX}_int8.tflite')
model_tflite_path.write_bytes(quant_tflite)
print('Saved TFLite model:', model_tflite_path.resolve())




INFO:tensorflow:Assets written to: C:\Users\Aman\AppData\Local\Temp\tmpdnf5p1e6\assets


INFO:tensorflow:Assets written to: C:\Users\Aman\AppData\Local\Temp\tmpdnf5p1e6\assets


Saved TFLite model: E:\minor-project\model\cough_cnn_5s_base_int8.tflite


In [21]:
# Check quantization parameters (needed for Arduino)
# Load the exported model to read back its tensor metadata.
interpreter = tf.lite.Interpreter(model_path=f'{OUTPUT_PREFIX}_int8.tflite')
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()[0]
output_details = interpreter.get_output_details()[0]

# Shape, (scale, zero_point) and dtype for the input tensor, then the output.
for tensor_tag, tensor_info in (('Input', input_details), ('Output', output_details)):
    print(f'{tensor_tag} shape:', tensor_info['shape'])
    print(f'{tensor_tag} quantization (scale, zero_point):', tensor_info['quantization'])
    print(f'{tensor_tag} dtype:', tensor_info['dtype'])


Input shape: [  1 155  40]
Input quantization (scale, zero_point): (0.09420423954725266, -1)
Input dtype: <class 'numpy.int8'>
Output shape: [1 2]
Output quantization (scale, zero_point): (0.00390625, -128)
Output dtype: <class 'numpy.int8'>
