# Data preparation

In [1]:
import os
import numpy as np
import librosa
import soundfile as sf
import random
from glob import glob
from scipy.signal import butter, lfilter

# Sample rate (Hz) every clip is loaded at and written back with.
SR = 16000
TARGET_PER_CLASS = 2000   # desired balanced dataset size
# Fixed seed so the random augmentation choices are repeatable between runs.
SEED = 42

# Root folder that holds one sub-folder of .wav files per class name.
BASE_DIR = "."
# Destination root; one sub-folder per class is created beneath it.
OUT_DIR = "balanced_dataset"
os.makedirs(OUT_DIR, exist_ok=True)

CLASSES = ["background", "explosion", "glass_break", "gunshot", "scream"]

# Every augmentation in this cell draws its randomness from the stdlib
# `random` module, so seeding it here makes the generated dataset reproducible.
random.seed(SEED)


# ===========================
# Audio augmentation methods
# ===========================

def add_noise(y, noise):
    """Mix `noise` into `y` at a random signal-to-noise ratio of 0-15 dB.

    The noise is first fitted to the length of `y` (repeated when it is
    shorter, truncated when it is longer) and its RMS is measured on exactly
    the samples that get mixed in, so the realised SNR matches the drawn one.

    Args:
        y: 1-D array with the clean signal.
        noise: 1-D array with the noise to add; any length.

    Returns:
        A new array with the same length as `y`. If either input is empty,
        a copy of `y` is returned unchanged.
    """
    y = np.asarray(y)
    noise = np.asarray(noise)
    if len(y) == 0 or len(noise) == 0:
        return y.copy()

    # Fit the noise to the signal length before measuring its level.
    if len(noise) < len(y):
        repeats = -(-len(y) // len(noise))  # ceiling division
        noise = np.tile(noise, repeats)
    noise = noise[:len(y)]

    # random SNR 0–15 dB
    snr_db = random.uniform(0, 15)
    rms_y = np.sqrt(np.mean(y**2))
    rms_n = np.sqrt(np.mean(noise**2))
    # SNR(dB) = 20 * log10(rms_y / rms_noise)  =>  rms_noise = rms_y / 10^(SNR/20)
    desired_rms_n = rms_y / (10 ** (snr_db / 20))
    # The small epsilon keeps an all-zero noise clip from dividing by zero.
    return y + noise * (desired_rms_n / (rms_n + 1e-9))

def time_stretch(y):
    """Stretch `y` in time by a random rate in [0.9, 1.1] and return exactly SR samples.

    The stretched signal is zero-padded at the end when it comes out shorter
    than SR samples and truncated when it comes out longer.
    """
    factor = random.uniform(0.9, 1.1)
    stretched = librosa.effects.time_stretch(y=y, rate=factor)
    shortfall = SR - len(stretched)
    if shortfall > 0:
        # pad the tail with zeros up to SR samples
        stretched = np.pad(stretched, (0, shortfall))
    return stretched[:SR]


def pitch_shift(y):
    """Shift the pitch of `y` by a random fractional step count in [-2, 2].

    The step count is handed to librosa as `n_steps` together with the
    module-level sample rate SR; the shifted signal is returned as-is.
    """
    n_steps = random.uniform(-2, 2)
    shifted = librosa.effects.pitch_shift(y=y, sr=SR, n_steps=n_steps)
    return shifted


def lowpass(y, cutoff_hz=3000, order=4, sr=None):
    """Low-pass filter `y` with a Butterworth filter.

    Args:
        y: 1-D signal to filter.
        cutoff_hz: Cut-off frequency in Hz (default 3000, the original value).
        order: Filter order (default 4, the original value).
        sr: Sample rate of `y` in Hz; defaults to the module-level SR.

    Returns:
        The filtered signal, same length as `y`.
    """
    if sr is None:
        sr = SR
    # butter() expects the cut-off normalised by the Nyquist frequency (sr / 2).
    b, a = butter(order, cutoff_hz / (sr / 2), btype='low')
    return lfilter(b, a, y)


def highpass(y, cutoff_hz=200, order=4, sr=None):
    """High-pass filter `y` with a Butterworth filter.

    Args:
        y: 1-D signal to filter.
        cutoff_hz: Cut-off frequency in Hz (default 200, the original value).
        order: Filter order (default 4, the original value).
        sr: Sample rate of `y` in Hz; defaults to the module-level SR.

    Returns:
        The filtered signal, same length as `y`.
    """
    if sr is None:
        sr = SR
    # butter() expects the cut-off normalised by the Nyquist frequency (sr / 2).
    b, a = butter(order, cutoff_hz / (sr / 2), btype='high')
    return lfilter(b, a, y)


def specaugment(logmel):
    """Apply simple SpecAugment masks.

    One block of consecutive columns (axis 1) and one block of consecutive
    rows (axis 0) are set to 0 on a copy of the input. The column block is
    5-15 wide and the row block 1-6 high; each is clamped to the size of its
    axis so inputs smaller than the drawn width are handled instead of
    raising ValueError.

    Args:
        logmel: 2-D array; it is not modified.

    Returns:
        A masked copy with the same shape as `logmel`.
    """
    mel = logmel.copy()
    n_bins, n_frames = mel.shape[0], mel.shape[1]
    # time mask
    t = min(random.randint(5, 15), n_frames)
    t0 = random.randint(0, n_frames - t)
    mel[:, t0:t0+t] = 0
    # freq mask
    f = min(random.randint(1, 6), n_bins)
    f0 = random.randint(0, n_bins - f)
    mel[f0:f0+f, :] = 0
    return mel


# ===========================
# Load 1-second audio
# ===========================

def load_wav(path):
    """Load `path` as mono audio at SR Hz and return exactly SR samples (one second).

    Clips shorter than SR samples are zero-padded at the end; longer clips
    are cut off after the first SR samples.
    """
    samples, _sr = librosa.load(path, sr=SR, mono=True)
    missing = SR - len(samples)
    if missing > 0:
        samples = np.pad(samples, (0, missing))
    return samples[:SR]


# ===========================
# create balanced dataset
# ===========================

def _augment_once(y, background_pool):
    """Return one randomly augmented copy of `y`.

    Each augmentation is applied independently with its own probability:
    background noise 0.7, time stretch 0.5, pitch shift 0.5, low-pass 0.3,
    high-pass 0.3. Noise mixing is skipped when `background_pool` is empty.
    """
    aug = y.copy()

    if background_pool and random.random() < 0.7:
        aug = add_noise(aug, random.choice(background_pool))

    if random.random() < 0.5:
        aug = time_stretch(aug)

    if random.random() < 0.5:
        aug = pitch_shift(aug)

    if random.random() < 0.3:
        aug = lowpass(aug)

    if random.random() < 0.3:
        aug = highpass(aug)

    return aug


def create_balanced_dataset():
    """Write OUT_DIR/<class>/<class>_<n>.wav files for every class in CLASSES.

    For each class, every original clip found in BASE_DIR/<class>/*.wav is
    written first; randomly augmented copies of randomly chosen originals are
    then added until the class holds TARGET_PER_CLASS files. A class that
    already has more originals than the target keeps all of them. A class
    with no input files is reported and skipped.

    NOTE(review): output files are numbered sequentially and do not record
    which original an augmented clip came from, so a later random train/val
    split can place variants of the same recording on both sides.
    """
    # load background noises for mixing
    background_pool = [
        load_wav(bg_file)
        for bg_file in sorted(glob(os.path.join(BASE_DIR, "background", "*.wav")))
    ]
    if not background_pool:
        print("Warning: no background clips found; noise mixing is disabled")

    # process each class
    for cname in CLASSES:
        print(f"\nProcessing class: {cname}")
        in_dir = os.path.join(BASE_DIR, cname)
        out_dir = os.path.join(OUT_DIR, cname)
        os.makedirs(out_dir, exist_ok=True)

        # glob() returns paths in arbitrary order; sorting keeps the output
        # numbering and the random picks stable from run to run.
        files = sorted(glob(os.path.join(in_dir, "*.wav")))
        originals = [load_wav(f) for f in files]

        print(f"Original samples: {len(originals)}")

        if not originals:
            # random.choice() on an empty list would raise IndexError below.
            print(f"Skipping {cname}: no .wav files found in {in_dir}")
            continue

        # Always save original files first
        idx = 0
        for y in originals:
            sf.write(os.path.join(out_dir, f"{cname}_{idx}.wav"), y, SR)
            idx += 1

        # Augment until reaching TARGET_PER_CLASS
        while idx < TARGET_PER_CLASS:
            aug = _augment_once(random.choice(originals), background_pool)
            sf.write(os.path.join(out_dir, f"{cname}_{idx}.wav"), aug, SR)
            idx += 1

        print(f"Final count for {cname}: {idx}")


if __name__ == "__main__":
    create_balanced_dataset()


  "cipher": algorithms.TripleDES,
  "class": algorithms.Blowfish,
  "class": algorithms.TripleDES,



Processing class: background
Original samples: 1920
Final count for background: 2000

Processing class: explosion
Original samples: 40
Final count for explosion: 2000

Processing class: glass_break
Original samples: 40
Final count for glass_break: 2000

Processing class: gunshot
Original samples: 1450
Final count for gunshot: 2000

Processing class: scream
Original samples: 1583
Final count for scream: 2000


In [3]:
import os
import numpy as np
import librosa

# Sample rate (Hz) used both when loading clips and when computing the mel spectrogram.
SR = 16000
# Number of mel bands requested from melspectrogram (n_mels).
N_MELS = 40
# FFT size passed as n_fft.
N_FFT = 512
# Hop length in samples: 160 / 16000 = 10 ms between frames.
HOP = 160
# Window length in samples: 400 / 16000 = 25 ms per frame.
WIN = 400

# Root folder read by extract_all(); expected to hold one sub-folder per label.
BASE = "balanced_dataset"
# Position in this list is the integer class id stored in the label array.
LABELS = ["background", "explosion", "glass_break", "gunshot", "scream"]

def logmel(y):
    """Return the standardised log-mel spectrogram of the signal `y`.

    The mel power spectrogram is computed with the module-level SR, N_FFT,
    HOP, WIN and N_MELS settings, converted to dB, and then shifted and
    scaled to zero mean and unit standard deviation over the whole array.
    """
    mel_power = librosa.feature.melspectrogram(
        y=y,
        sr=SR,
        n_fft=N_FFT,
        hop_length=HOP,
        win_length=WIN,
        n_mels=N_MELS,
    )
    mel_db = librosa.power_to_db(mel_power)
    centered = mel_db - np.mean(mel_db)
    # the epsilon guards against a zero standard deviation
    return centered / (np.std(mel_db) + 1e-9)


def extract_all():
    """Compute log-mel features for every clip under BASE and save them.

    For each label in LABELS, every ``.wav`` file in BASE/<label> is loaded
    at SR, padded or truncated to SR samples, and converted with logmel().
    The stacked features (with a trailing channel axis added) and the integer
    labels are written to ``features_balanced.npz`` together with the label
    names.
    """
    X, y = [], []

    for label_idx, cname in enumerate(LABELS):
        folder = os.path.join(BASE, cname)
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # sample order, and therefore any seeded split made from the saved
        # arrays, identical between runs and machines.
        files = sorted(f for f in os.listdir(folder) if f.endswith(".wav"))

        for f in files:
            y_audio, _ = librosa.load(os.path.join(folder, f), sr=SR, mono=True)
            if len(y_audio) < SR:
                y_audio = np.pad(y_audio, (0, SR - len(y_audio)))
            y_audio = y_audio[:SR]

            feat = logmel(y_audio)
            X.append(feat)
            y.append(label_idx)

    # Append a channel axis so each sample is (rows, cols, 1).
    X = np.array(X)[..., np.newaxis]
    y = np.array(y)

    np.savez("features_balanced.npz", X=X, y=y, labels=LABELS)
    print("\nSaved features_balanced.npz")


if __name__ == "__main__":
    extract_all()



Saved features_balanced.npz


# Training (balanced mini-batches; class weights are computed but not passed to `fit`)

In [6]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from model_audio import build_model
import random

# Seed Python's `random`, NumPy and TensorFlow in one call so the batch
# sampling and weight initialisation are repeatable.
tf.keras.utils.set_random_seed(42)

# The archive holds only numeric and string arrays, so pickle support is not
# needed; leaving it off avoids executing code from a tampered .npz file.
data = np.load("features_balanced.npz")
X = data["X"]
y = data["y"]

labels = data["labels"]

# Train-test split
# NOTE(review): the dataset-preparation step writes augmented copies without
# recording their source clip, so this random split can put variants of the
# same recording in both train and validation — validation accuracy may be
# optimistic. Confirm whether a grouped split is needed.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Class weights (still useful)
# NOTE(review): cw_dict is not passed to model.fit below; the generator
# already samples classes uniformly.
cw = class_weight.compute_class_weight("balanced", classes=np.unique(y), y=y)
cw_dict = dict(enumerate(cw))

# Balanced on-the-fly generator
def balanced_gen():
    """Yield endless (features, labels) batches of 32 with classes drawn uniformly.

    For every slot in a batch a class is picked at random first, then one
    training sample of that class, so each class is equally likely to appear
    regardless of how many samples it has in y_train.
    """
    rows_by_class = {}
    for cls in np.unique(y_train):
        rows_by_class[cls] = np.where(y_train == cls)[0]
    class_ids = list(rows_by_class)

    while True:
        picks = []
        for _ in range(32):  # batch size
            cls = random.choice(class_ids)
            picks.append(random.choice(rows_by_class[cls]))
        batch_x = np.array([X_train[i] for i in picks])
        batch_y = np.array([y_train[i] for i in picks])
        yield batch_x, batch_y


# Build the network for the per-sample feature shape and one output per label name.
model = build_model(num_classes=len(labels), input_shape=X.shape[1:])

# Integer class ids are used as targets, hence the sparse cross-entropy loss.
optimizer = tf.keras.optimizers.Adam(1e-3)
model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
    optimizer=optimizer,
)

# The generator never ends, so the length of an epoch is set by steps_per_epoch.
model.fit(
    balanced_gen(),
    epochs=40,
    steps_per_epoch=200,    # enough to train properly
    validation_data=(X_val, y_val),
)

model.save("audio_balanced_model.h5")
print("\nSaved audio_balanced_model.h5")

Epoch 1/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 20ms/step - accuracy: 0.3608 - loss: 1.4197 - val_accuracy: 0.6135 - val_loss: 0.9862
Epoch 2/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 18ms/step - accuracy: 0.6077 - loss: 0.9843 - val_accuracy: 0.6965 - val_loss: 0.7962
Epoch 3/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 18ms/step - accuracy: 0.7052 - loss: 0.7601 - val_accuracy: 0.7365 - val_loss: 0.6855
Epoch 4/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 18ms/step - accuracy: 0.7235 - loss: 0.7028 - val_accuracy: 0.7550 - val_loss: 0.6404
Epoch 5/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 18ms/step - accuracy: 0.7591 - loss: 0.6354 - val_accuracy: 0.7615 - val_loss: 0.6113
Epoch 6/40
[1m200/200[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 18ms/step - accuracy: 0.7508 - loss: 0.6572 - val_accuracy: 0.7795 - val_loss: 0.5748
Epoch 7/40
[1m200/200




Saved audio_balanced_model.h5


In [11]:
pip install fastapi unicorn pillow opencv-python-headless ultralytics facenet-pytorch torchvision sqlalchemy aiofiles python-multipart opencv-contrib-python-headless

Collecting fastapi
  Downloading fastapi-0.122.0-py3-none-any.whl.metadata (30 kB)
Collecting unicorn
  Downloading unicorn-2.1.4-cp37-abi3-win_amd64.whl.metadata (4.3 kB)
Collecting opencv-python-headless
  Downloading opencv_python_headless-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (20 kB)
Collecting ultralytics
  Downloading ultralytics-8.3.231-py3-none-any.whl.metadata (37 kB)
Collecting facenet-pytorch
  Downloading facenet_pytorch-2.6.0-py3-none-any.whl.metadata (12 kB)
Collecting torchvision
  Downloading torchvision-0.24.1-cp312-cp312-win_amd64.whl.metadata (5.9 kB)
Collecting aiofiles
  Downloading aiofiles-25.1.0-py3-none-any.whl.metadata (6.3 kB)
Collecting python-multipart
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting opencv-contrib-python-headless
  Downloading opencv_contrib_python_headless-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (20 kB)
Collecting starlette<0.51.0,>=0.40.0 (from fastapi)
  Downloading starlette-0.50.0-py3-none-an

  You can safely remove it manually.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gensim 4.3.3 requires numpy<2.0,>=1.18.5, but you have numpy 2.2.6 which is incompatible.
numba 0.60.0 requires numpy<2.1,>=1.22, but you have numpy 2.2.6 which is incompatible.
tensorflow-intel 2.18.0 requires numpy<2.1.0,>=1.26.0, but you have numpy 2.2.6 which is incompatible.
