#### Imports and global settings

In [11]:
# === Libraries ===
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
import librosa
from IPython.display import Audio, display
from scipy.signal import savgol_filter
import pandas as pd

plt.style.use("ggplot")
plt.rcParams['figure.figsize'] = (15,5)

print("Libraries loaded.")


Libraries loaded.


#### File handling

In [None]:
participants = ["emre", "olena", "narmeen", "sneha", "paula", "yernur", "peyman"]

def get_file(participant, condition):
    """
    condition = 'normal' or 'fatigue'
    """
    return f"{participant}_{condition}.wav"


#### Load audio

In [3]:
def load_audio(filename):
    y, sr = librosa.load(filename, sr=None)
    duration = len(y) / sr
    print(f"Loaded: {filename} | SR = {sr} Hz | Duration = {duration:.2f}s")
    return y, sr


#### Breathing envelop function

In [4]:
def extract_breathing_envelope(y, sr, target_sr=100):
    # Normalize
    y = y / (np.max(np.abs(y)) + 1e-12)

    # Hilbert envelope
    analytic = signal.hilbert(y)
    env = np.abs(analytic)

    # Smooth (SavGol)
    env = savgol_filter(env, 201, 3)

    # Light low-pass (cut = 0.7 Hz)
    b, a = signal.butter(1, 0.7 / (sr/2), btype='low')
    env = signal.filtfilt(b, a, env)

    # Z-score
    env = (env - np.mean(env)) / (np.std(env) + 1e-12)

    # Downsample
    N = int(len(env) * target_sr / sr)
    env = signal.resample(env, N)

    return env, target_sr


#### Breathing rate variability

In [None]:
def compute_brv_from_peaks(peaks, sr):
    """
    Compute breathing rate variability from detected breath peaks.
    peaks: indices of breaths in the envelope
    sr: envelope sampling rate
    """
    if len(peaks) < 3:
        return {"sdnn": 0, "rmssd": 0, "cv": 0, "mean_ibi": 0}

    times = peaks / sr

    ibis = np.diff(times)  

    if len(ibis) < 2:
        return {"sdnn": 0, "rmssd": 0, "cv": 0, "mean_ibi": np.mean(ibis)}

    sdnn = np.std(ibis)           
    rmssd = np.sqrt(np.mean(np.diff(ibis)**2))
    cv = sdnn / np.mean(ibis)      

    return {
        "sdnn": sdnn,
        "rmssd": rmssd,
        "cv": cv,
        "mean_ibi": np.mean(ibis)
    }


#### Cough detection

In [None]:
def detect_coughs(y, sr,
                  lf_cut=300,
                  hf_low=600, hf_high=3000,
                  win_ms=20, hop_ms=10,
                  min_gap_ms=250,
                  energy_thresh_factor=2.5,
                  ratio_thresh=3.0):

    b_lf, a_lf = signal.butter(4, lf_cut/(sr/2), btype='low')
    y_lf = signal.filtfilt(b_lf, a_lf, y)

    b_hf, a_hf = signal.butter(4,
                               [hf_low/(sr/2), hf_high/(sr/2)],
                               btype='band')
    y_hf = signal.filtfilt(b_hf, a_hf, y)

    win = int(sr * win_ms/1000)
    hop = int(sr * hop_ms/1000)

    hf_energy = []
    lf_energy = []

    for i in range(0, len(y)-win, hop):
        f_lf = y_lf[i:i+win]
        f_hf = y_hf[i:i+win]
        lf_energy.append(np.sum(f_lf**2))
        hf_energy.append(np.sum(f_hf**2))

    hf_energy = np.array(hf_energy)
    lf_energy = np.array(lf_energy)

    ratio = hf_energy / (lf_energy + 1e-6)

    thr_energy = np.mean(hf_energy) + energy_thresh_factor*np.std(hf_energy)
    thr_ratio = np.mean(ratio) + ratio_thresh*np.std(ratio)

    candidates = np.where((hf_energy > thr_energy) & (ratio > thr_ratio))[0]

    min_gap_frames = int(min_gap_ms/hop_ms)
    cough_events = []
    last = -9999

    for c in candidates:
        if c - last > min_gap_frames:
            cough_events.append(c)
            last = c

    return len(cough_events), hf_energy, cough_events


#### Method 1: Peak detection

In [None]:
def detect_breath_peaks(env, sr):
    d = np.gradient(env)
    d = signal.medfilt(d, kernel_size=101)

    zc = np.where(np.diff(np.sign(d)) < 0)[0]

    min_dist = int(0.8 * sr)

    peaks = []
    last = -9999

    for z in zc:
        if z - last >= min_dist:
            peaks.append(z)
            last = z

    peaks = np.array(peaks)

    if len(peaks) < 2:
        bpm = 0
    else:
        intervals = np.diff(peaks) / sr
        bpm = 60 / np.mean(intervals)

    brv_metrics = compute_brv_from_peaks(peaks, sr)

    return peaks, bpm, brv_metrics


#### Method 2: FFT

In [22]:
def breathing_rate_fft(env, sr, fmin=0.1, fmax=0.7):
    n = len(env)
    freqs = np.fft.rfftfreq(n, 1/sr)
    spectrum = np.abs(np.fft.rfft(env))

    mask = (freqs >= fmin) & (freqs <= fmax)
    freqs_band = freqs[mask]
    spec_band = spectrum[mask]

    if len(spec_band) == 0:
        return 0

    peak_freq = freqs_band[np.argmax(spec_band)]
    bpm = peak_freq * 60
    return bpm


#### Method 3: Local autocorrelation

In [None]:
def breathing_rate_local_ac(env, sr,
                            window_sec=10,
                            hop_sec=2,
                            min_bpm=8,
                            max_bpm=40,
                            rhythm_thresh=1.0): 
    win = int(window_sec * sr)
    hop = int(hop_sec * sr)

    bpm_list = []

    for start in range(0, len(env) - win, hop):
        seg = env[start:start+win]
        seg = seg - np.mean(seg)

        ac = np.correlate(seg, seg, mode='full')
        ac = ac[len(ac)//2:]   

        min_lag = int((60/max_bpm) * sr)
        max_lag = int((60/min_bpm) * sr)

        ac_seg = ac[min_lag:max_lag]
        lags = np.arange(min_lag, max_lag)

        if len(ac_seg) == 0:
            continue

        peak = np.max(ac_seg)
        avg = np.mean(ac_seg)
        score = peak / (avg + 1e-12)

        if score < rhythm_thresh:
            continue

        best_lag = lags[np.argmax(ac_seg)]
        bpm = 60 / (best_lag / sr)
        bpm_list.append(bpm)

    if len(bpm_list) == 0:
        return 0, {"sdnn":0, "rmssd":0, "cv":0, "mean_ibi":0}

    bpm_avg = np.mean(bpm_list)

    ibis = 60 / np.array(bpm_list)

    sdnn = np.std(ibis)
    rmssd = np.sqrt(np.mean(np.diff(ibis)**2)) if len(ibis) > 1 else 0
    cv = sdnn / np.mean(ibis)

    brv_metrics = {
        "sdnn": sdnn,
        "rmssd": rmssd,
        "cv": cv,
        "mean_ibi": np.mean(ibis)
    }

    return bpm_avg, brv_metrics


#### Process one file

In [None]:
def expand_labels_to_samples(labels, y_len, win, hop):
    """
    Convert frame-level labels into sample-level labels.
    labels: array of length N_frames
    returns mask of length y_len
    """
    mask = np.zeros(y_len)

    for i, lab in enumerate(labels):
        start = i * hop
        end = start + win
        if start >= y_len:
            break
        mask[start:min(end, y_len)] = lab 

    return mask


In [53]:
def analyze_file(filename):
    y, sr = load_audio(filename)
    env, sr_env = extract_breathing_envelope(y, sr)

    peaks, bpm_peak, brv_peak = detect_breath_peaks(env, sr_env)
    bpm_fft = breathing_rate_fft(env, sr_env)
    bpm_ac, brv_ac = breathing_rate_local_ac(env, sr_env)

    coughs, energy_env, cough_peaks = detect_coughs(y, sr)

    return {
        "file": filename,
        "bpm_peak": bpm_peak,
        "brv_peak_sdnn": brv_peak["sdnn"],
        "brv_peak_rmssd": brv_peak["rmssd"],
        "brv_peak_cv": brv_peak["cv"],
        "bpm_fft": bpm_fft,
        "bpm_ac": bpm_ac,
        "brv_ac_sdnn": brv_ac["sdnn"],
        "brv_ac_rmssd": brv_ac["rmssd"],
        "brv_ac_cv": brv_ac["cv"],
        "cough_count": coughs
    }


#### Classify all

In [None]:
conditions = ["normal", "fatigue", "cough"]

results = []

for p in participants:
    for cond in conditions:
        fname = get_file(p, cond) 
        out = analyze_file(fname)
        out["participant"] = p
        out["condition_true"] = cond
        results.append(out)

df = pd.DataFrame(results)
df


Loaded: emre_normal.wav | SR = 16000 Hz | Duration = 63.03s
Loaded: emre_fatigue.wav | SR = 16000 Hz | Duration = 60.68s
Loaded: emre_cough.wav | SR = 16000 Hz | Duration = 43.40s
Loaded: olena_normal.wav | SR = 16000 Hz | Duration = 72.39s
Loaded: olena_fatigue.wav | SR = 16000 Hz | Duration = 65.51s
Loaded: olena_cough.wav | SR = 16000 Hz | Duration = 48.70s
Loaded: narmeen_normal.wav | SR = 16000 Hz | Duration = 32.24s
Loaded: narmeen_fatigue.wav | SR = 16000 Hz | Duration = 32.63s
Loaded: narmeen_cough.wav | SR = 16000 Hz | Duration = 34.97s
Loaded: sneha_normal.wav | SR = 16000 Hz | Duration = 62.70s
Loaded: sneha_fatigue.wav | SR = 16000 Hz | Duration = 61.90s
Loaded: sneha_cough.wav | SR = 16000 Hz | Duration = 37.79s
Loaded: paula_normal.wav | SR = 16000 Hz | Duration = 62.62s
Loaded: paula_fatigue.wav | SR = 16000 Hz | Duration = 55.66s
Loaded: paula_cough.wav | SR = 16000 Hz | Duration = 25.02s
Loaded: yernur_normal.wav | SR = 16000 Hz | Duration = 70.27s
Loaded: yernur_fatig

Unnamed: 0,file,bpm_peak,brv_peak_sdnn,brv_peak_rmssd,brv_peak_cv,bpm_fft,bpm_ac,brv_ac_sdnn,brv_ac_rmssd,brv_ac_cv,cough_count,participant,condition_true
0,emre_normal.wav,18.417462,0.424062,0.515638,0.130169,6.663494,0.0,0.0,0.0,0.0,0,emre,normal
1,emre_fatigue.wav,26.732673,0.443466,0.590326,0.197584,28.675016,27.720871,0.035,0.07,0.016166,0,emre,fatigue
2,emre_cough.wav,16.140866,0.194987,0.232809,0.052454,16.589862,17.647059,0.0,0.0,0.0,4,emre,cough
3,olena_normal.wav,18.231804,0.290236,0.319186,0.088192,17.405719,19.292605,0.0,0.0,0.0,10,olena,normal
4,olena_fatigue.wav,25.882729,0.552275,0.778089,0.23824,6.411235,21.165221,1.048337,1.302027,0.331869,0,olena,fatigue
5,olena_cough.wav,13.129103,1.391079,1.03082,0.304394,11.090573,0.0,0.0,0.0,0.0,1,olena,cough
6,narmeen_normal.wav,11.385199,0.800479,1.18628,0.151894,11.169718,0.0,0.0,0.0,0.0,6,narmeen,normal
7,narmeen_fatigue.wav,22.587269,0.217183,0.370365,0.08176,22.065584,24.108145,0.055498,0.071414,0.022288,0,narmeen,fatigue
8,narmeen_cough.wav,14.176019,0.580619,0.87178,0.137181,15.441807,0.0,0.0,0.0,0.0,3,narmeen,cough
9,sneha_normal.wav,18.59184,0.546644,0.936348,0.169385,8.61244,18.927445,0.0,0.0,0.0,9,sneha,normal


#### classify

In [55]:
def classify_bpm(bpm):
    if bpm < 20:
        return "normal"
    else:
        return "fatigue"

df["pred_peak"] = df["bpm_peak"].apply(classify_bpm)
df["pred_fft"] = df["bpm_fft"].apply(classify_bpm)
df["pred_ac"]   = df["bpm_ac"].apply(classify_bpm)

df[["participant", "file", "condition_true",
    "bpm_peak", "pred_peak",
    "bpm_fft", "pred_fft",
    "bpm_ac", "pred_ac"]]


Unnamed: 0,participant,file,condition_true,bpm_peak,pred_peak,bpm_fft,pred_fft,bpm_ac,pred_ac
0,emre,emre_normal.wav,normal,18.417462,normal,6.663494,normal,0.0,normal
1,emre,emre_fatigue.wav,fatigue,26.732673,fatigue,28.675016,fatigue,27.720871,fatigue
2,emre,emre_cough.wav,cough,16.140866,normal,16.589862,normal,17.647059,normal
3,olena,olena_normal.wav,normal,18.231804,normal,17.405719,normal,19.292605,normal
4,olena,olena_fatigue.wav,fatigue,25.882729,fatigue,6.411235,normal,21.165221,fatigue
5,olena,olena_cough.wav,cough,13.129103,normal,11.090573,normal,0.0,normal
6,narmeen,narmeen_normal.wav,normal,11.385199,normal,11.169718,normal,0.0,normal
7,narmeen,narmeen_fatigue.wav,fatigue,22.587269,fatigue,22.065584,fatigue,24.108145,fatigue
8,narmeen,narmeen_cough.wav,cough,14.176019,normal,15.441807,normal,0.0,normal
9,sneha,sneha_normal.wav,normal,18.59184,normal,8.61244,normal,18.927445,normal


#### Accuracy of methods

In [58]:
methods = ["pred_peak", "pred_fft", "pred_ac"]

df_temp = df[df["condition_true"] != "cough"]
for m in methods:
    acc = (df_temp[m] == df_temp["condition_true"]).mean()
    print(f"{m} accuracy: {acc*100:.1f}%")


pred_peak accuracy: 92.9%
pred_fft accuracy: 85.7%
pred_ac accuracy: 92.9%
