# Beehive Audio → NumPy Feature Builder (Log-Mel first, MFCC optional)

This notebook cleans WAV files, segments them, extracts **log-Mel** (default) or **MFCC(+Δ,+ΔΔ)** features,
and exports NumPy arrays (`.npy`) into a separate directory for CNN training.

In [22]:
# If you need to install, uncomment:
# !pip install librosa soundfile scipy scikit-learn numpy

import os
from pathlib import Path
import json
import random

import numpy as np
import librosa
import soundfile as sf
from scipy.signal import butter, sosfiltfilt
from sklearn.model_selection import train_test_split

## Configuration

In [23]:
# --- Paths ---
# Set your dataset path; it should contain class subfolders, e.g.:
# data_bees/
#   ├─ queen/*.wav
#   └─ no_queen/*.wav
# --- Paths ---
# Folder containing all WAV files (not separated by class)
DATA_DIR = Path(r"C:\Users\leona\Documents\Thesis_Project_UACH\Temp\Dataset\BeesAnna\sound_files")

# Output directory for the extracted .npy files and meta.json
OUT_DIR = Path(r"C:\Users\leona\Documents\Thesis_Project_UACH\Temp\Dataset\BeesAnna\features_bees")


# --- Audio & preprocessing ---
SR        = 16000                   # target sample rate (Hz)
TRIM_DB   = 30                      # trim leading/trailing silence below this dB
USE_BANDPASS = True                 # light noise control via band-pass
BP_LOW, BP_HIGH = 100, 1000         # Hz (tune after inspecting spectrograms)

# --- Segmentation ---
SEG_SEC   = 2.0                     # segment length (seconds)
HOP_SEC   = 1.0                     # hop between segments (seconds)

# --- Feature Type ---
FEATURE_TYPE = "logmel"             # "logmel" (recommended) or "mfcc"

# Log-Mel parameters
N_MELS    = 64
N_FFT     = int(0.025 * SR)         # 25 ms
HOP_LEN   = int(0.010 * SR)         # 10 ms
FMIN, FMAX = 20, SR // 2

# MFCC parameters
N_MFCC      = 32
ADD_DELTAS  = True                  # stack Δ and ΔΔ for CNN channels

# --- Splits & randomness ---
RANDOM_SEED = 123
TEST_SIZE   = 0.15
VAL_SIZE    = 0.15                  # of remaining after taking out test

## Utilities: file listing, labels, filters, loading, segmentation

In [24]:
def list_wavs(root: Path):
    files = []
    classes = []
    for class_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        label = class_dir.name
        for wav in class_dir.rglob("*.wav"):
            files.append(wav)
            classes.append(label)
    return files, classes

def label_to_index(labels):
    labset = sorted(set(labels))
    lab2idx = {lab:i for i,lab in enumerate(labset)}
    y = np.array([lab2idx[l] for l in labels], dtype=np.int64)
    return y, lab2idx

def peak_normalize(x, eps=1e-9):
    peak = np.max(np.abs(x)) + eps
    return x / peak

def bandpass_sos(sr, low_hz, high_hz, order=4):
    sos = butter(order, [low_hz, high_hz], btype='bandpass', fs=sr, output='sos')
    return sos

def apply_bandpass(x, sr, low, high):
    sos = bandpass_sos(sr, low, high)
    return sosfiltfilt(sos, x)

def load_and_clean(path):
    # Load mono at target SR
    x, _sr = librosa.load(path, sr=SR, mono=True)
    # Trim leading/trailing silence
    x, _ = librosa.effects.trim(x, top_db=TRIM_DB)
    # Peak normalize
    x = peak_normalize(x)
    # Optional band-pass
    if USE_BANDPASS:
        x = apply_bandpass(x, SR, BP_LOW, BP_HIGH)
    return x

def segment_signal(x, sr, seg_sec, hop_sec):
    seg_len = int(seg_sec * sr)
    hop_len = int(hop_sec * sr)
    if len(x) < seg_len:
        # pad (reflect) so we have at least one segment
        pad = seg_len - len(x)
        x = np.pad(x, (0, pad), mode='reflect')
    segments = []
    for start in range(0, max(1, len(x)-seg_len+1), hop_len):
        end = start + seg_len
        seg = x[start:end]
        if len(seg) < seg_len:
            seg = np.pad(seg, (0, seg_len - len(seg)), mode='reflect')
        segments.append(seg)
    return segments

## Feature extraction: log-Mel (default) and MFCC (optional)

In [25]:
def compute_logmel(seg):
    S = librosa.feature.melspectrogram(
        y=seg, sr=SR, n_fft=N_FFT, hop_length=HOP_LEN,
        n_mels=N_MELS, fmin=FMIN, fmax=FMAX, power=2.0
    )
    logS = librosa.power_to_db(S, ref=np.max)
    # (n_mels, T) -> (1, H, W) for CNNs
    return logS[np.newaxis, :, :].astype(np.float32)

def compute_mfcc(seg):
    mfcc = librosa.feature.mfcc(
        y=seg, sr=SR, n_mfcc=N_MFCC, n_fft=N_FFT, hop_length=HOP_LEN,
        fmin=FMIN, fmax=FMAX
    )
    if ADD_DELTAS:
        delta = librosa.feature.delta(mfcc)
        delta2 = librosa.feature.delta(mfcc, order=2)
        feat = np.stack([mfcc, delta, delta2], axis=0)  # (3, n_mfcc, T)
    else:
        feat = mfcc[np.newaxis, :, :]                    # (1, n_mfcc, T)
    return feat.astype(np.float32)

def featurize(seg):
    if FEATURE_TYPE == "logmel":
        return compute_logmel(seg)
    elif FEATURE_TYPE == "mfcc":
        return compute_mfcc(seg)
    else:
        raise ValueError(f"Unknown FEATURE_TYPE: {FEATURE_TYPE}")

def standardize_features(train_X, val_X, test_X, eps=1e-6):
    # Per-channel standardization (mean/std over all samples and time-frequency bins)
    C = train_X.shape[1]
    means = []
    stds = []
    train_X_std = train_X.copy()
    val_X_std = val_X.copy()
    test_X_std = test_X.copy()
    for c in range(C):
        mu = train_X[:, c].mean()
        sigma = train_X[:, c].std() + eps
        means.append(float(mu))
        stds.append(float(sigma))
        train_X_std[:, c] = (train_X[:, c] - mu) / sigma
        val_X_std[:, c]   = (val_X[:, c]   - mu) / sigma
        test_X_std[:, c]  = (test_X[:, c]  - mu) / sigma
    stats = {"channel_means": means, "channel_stds": stds}
    return train_X_std, val_X_std, test_X_std, stats

## Build dataset and export `.npy` files

In [26]:
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

OUT_DIR.mkdir(parents=True, exist_ok=True)

wavs, labels = list_wavs(DATA_DIR)
if not wavs:
    raise SystemExit(f"No WAV files found under {DATA_DIR}. Expected class subfolders with .wav files.")

y_all, lab2idx = label_to_index(labels)
idx2lab = {v:k for k,v in lab2idx.items()}
print("Classes:", idx2lab)

# Stratified split *by file* (not by segments)
from sklearn.model_selection import train_test_split
wavs = np.array(wavs)
y_all = np.array(y_all)

wavs_train, wavs_tmp, y_train, y_tmp = train_test_split(
    wavs, y_all, test_size=TEST_SIZE, random_state=RANDOM_SEED, stratify=y_all
)

val_ratio_of_remaining = VAL_SIZE / (1.0 - TEST_SIZE)
wavs_val, wavs_test, y_val, y_test = train_test_split(
    wavs_tmp, y_tmp, test_size=1.0 - val_ratio_of_remaining,
    random_state=RANDOM_SEED, stratify=y_tmp
)

def process_many(file_list, labels_list):
    feats = []
    labs  = []
    for path, lab in zip(file_list, labels_list):
        x = load_and_clean(str(path))
        segments = segment_signal(x, SR, SEG_SEC, HOP_SEC)
        for seg in segments:
            f = featurize(seg)  # (C,H,W)
            feats.append(f)
            labs.append(lab)
    X = np.stack(feats, axis=0)   # (N,C,H,W)
    y = np.array(labs, dtype=np.int64)
    return X, y

print("Processing TRAIN...")
X_train, y_train = process_many(wavs_train, y_train)
print("Processing VAL...")
X_val, y_val     = process_many(wavs_val, y_val)
print("Processing TEST...")
X_test, y_test   = process_many(wavs_test, y_test)

print("Standardizing features (train stats only)...")
X_train, X_val, X_test, stats = standardize_features(X_train, X_val, X_test)

# Save arrays and metadata
np.save(OUT_DIR / "X_train.npy", X_train)
np.save(OUT_DIR / "y_train.npy", y_train)
np.save(OUT_DIR / "X_val.npy",   X_val)
np.save(OUT_DIR / "y_val.npy",   y_val)
np.save(OUT_DIR / "X_test.npy",  X_test)
np.save(OUT_DIR / "y_test.npy",  y_test)

with open(OUT_DIR / "meta.json", "w") as f:
    json.dump({
        "sr": SR,
        "segment_seconds": SEG_SEC,
        "hop_seconds": HOP_SEC,
        "feature_type": FEATURE_TYPE,
        "n_mels": int(N_MELS) if FEATURE_TYPE=="logmel" else None,
        "n_mfcc": int(N_MFCC) if FEATURE_TYPE=="mfcc" else None,
        "add_deltas": bool(ADD_DELTAS) if FEATURE_TYPE=="mfcc" else None,
        "classes": {int(k): v for k, v in idx2lab.items()},
        "standardization": stats,
        "bandpass": {"enabled": bool(USE_BANDPASS), "low": int(BP_LOW), "high": int(BP_HIGH)}
    }, f, indent=2)

print("Saved features to:", OUT_DIR.resolve())

SystemExit: No WAV files found under C:\Users\leona\Documents\Thesis_Project_UACH\Temp\Dataset\BeesAnna\sound_files. Expected class subfolders with .wav files.

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


## Sanity check

In [None]:
# Reload and inspect shapes to verify downstream compatibility
Xt = np.load(OUT_DIR / "X_train.npy")
yt = np.load(OUT_DIR / "y_train.npy")
print("X_train shape:", Xt.shape, "(N, C, H, W)")
print("y_train shape:", yt.shape)
print("Feature type:", FEATURE_TYPE)

## (Optional) Visual preview of one feature map

In [None]:
# %matplotlib inline
import matplotlib.pyplot as plt

Xt = np.load(OUT_DIR / "X_train.npy")
yt = np.load(OUT_DIR / "y_train.npy")

i = 0  # change index to preview other samples
feat = Xt[i]  # (C,H,W)

# show first channel
plt.figure()
plt.imshow(feat[0], aspect="auto", origin="lower")
plt.title(f"Channel 0 - {FEATURE_TYPE}")
plt.xlabel("Frames"); plt.ylabel("Bins")
plt.colorbar()
plt.show()