In [1]:
import os
import numpy as np
import librosa
import matplotlib
matplotlib.use('Agg')
import pyaudio
import wave
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

In [2]:
# Dataset path and labels
dataset_path = "E:/Campus/Semester/FYP/siren_detection_project/dataset"
classes = ["siren", "non_siren"]

In [3]:
# Augmentation Function
def augment_audio(audio, sr=16000):
    augmented = []
    noise = 0.005 * np.random.randn(len(audio))
    augmented.append(audio + noise)

    shifted = np.roll(audio, np.random.randint(1000))
    augmented.append(shifted)

    pitch_shifted = librosa.effects.pitch_shift(audio, sr=sr, n_steps=np.random.uniform(-1, 1))
    augmented.append(pitch_shifted)

    return augmented

In [4]:
# Feature Extraction
def extract_features(audio, sr=16000):
    audio, _ = librosa.effects.trim(audio)
    audio = audio[:48000] if len(audio) > 48000 else np.pad(audio, (0, 48000 - len(audio)), mode='constant')

    mel = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=128, hop_length=512)
    mel_db = librosa.power_to_db(mel, ref=np.max)

    mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    if mfcc.shape[1] != mel_db.shape[1]:
        mfcc = librosa.util.fix_length(mfcc, size=mel_db.shape[1], axis=1)
        mfcc_delta = librosa.util.fix_length(mfcc_delta, size=mel_db.shape[1], axis=1)
        mfcc_delta2 = librosa.util.fix_length(mfcc_delta2, size=mel_db.shape[1], axis=1)

    features = np.concatenate([mel_db, mfcc, mfcc_delta, mfcc_delta2], axis=0)
    features = (features - np.mean(features)) / (np.std(features) + 1e-6)
    return features

In [5]:
# Load and augment data
X = []
y = []
for idx, class_name in enumerate(classes):
    class_dir = os.path.join(dataset_path, class_name)
    if not os.path.exists(class_dir):
        print(f"Warning: {class_dir} not found.")
        continue
    for file in os.listdir(class_dir):
        file_path = os.path.join(class_dir, file)
        try:
            audio, sr = librosa.load(file_path, sr=16000)
            audios = [audio] + augment_audio(audio, sr)
            for aug_audio in audios:
                X.append(extract_features(aug_audio))
                y.append(idx)
        except Exception as e:
            print(f"Error loading {file_path}: {e}")
            
# Convert to numpy arrays
X_np = np.array(X)[..., np.newaxis]
y_np = np.array(y)

  "cipher": algorithms.TripleDES,
  "class": algorithms.Blowfish,
  "class": algorithms.TripleDES,


In [6]:
# Stratified train-test split
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in sss.split(X_np, y_np):
    X_train, X_test = X_np[train_index], X_np[test_index]
    y_train, y_test = y_np[train_index], y_np[test_index]

In [7]:
# Model Definition
model = models.Sequential([
    layers.Input(shape=X_train.shape[1:]),
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(256, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

In [8]:
# Callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-5)

In [9]:
# Training
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32,
    callbacks=[early_stopping, reduce_lr]
)

Epoch 1/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 672ms/step - accuracy: 0.8500 - loss: 1.0210 - val_accuracy: 0.5169 - val_loss: 3.1077 - learning_rate: 0.0010
Epoch 2/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 666ms/step - accuracy: 0.9621 - loss: 0.1890 - val_accuracy: 0.5339 - val_loss: 1.0184 - learning_rate: 0.0010
Epoch 3/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 674ms/step - accuracy: 0.9653 - loss: 0.2129 - val_accuracy: 0.9407 - val_loss: 0.1835 - learning_rate: 0.0010
Epoch 4/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 696ms/step - accuracy: 0.9800 - loss: 0.1279 - val_accuracy: 0.9305 - val_loss: 0.2618 - learning_rate: 0.0010
Epoch 5/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 677ms/step - accuracy: 0.9797 - loss: 0.1237 - val_accuracy: 0.9881 - val_loss: 0.0479 - learning_rate: 0.0010
Epoch 6/50
[1m74/74[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

In [10]:
# Plot Accuracy
plt.figure(figsize=(10, 5))
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Val Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig('accuracy_plot.png')

In [11]:
# Plot Loss
plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_plot.png')

In [12]:
# Prediction & Evaluation
y_pred_prob = model.predict(X_test)
y_pred = (y_pred_prob > 0.5).astype(int).flatten()
y_true = y_test.flatten()

[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 130ms/step


In [13]:
# Metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print("\nPerformance Metrics:")
print(f"Accuracy :  {accuracy * 100:.2f}%")
print(f"Precision:  {precision * 100:.2f}%")
print(f"Recall   :  {recall * 100:.2f}%")
print(f"F1-Score :  {f1 * 100:.2f}%")


Performance Metrics:
Accuracy :  98.98%
Precision:  98.07%
Recall   :  100.00%
F1-Score :  99.03%


In [14]:
# Confusion Matrix
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))

# Save model
model.save("siren_detection_model_v2.h5")
print("\nModel saved as 'siren_detection_model_v2.h5'")




Confusion Matrix:
[[279   6]
 [  0 305]]

Model saved as 'siren_detection_model_v2.h5'


In [None]:

# Load your trained model
model = load_model("siren_detection_model_v2.h5")

# Feature extraction function (same as training)
def extract_features(audio, sr=16000):
    audio, _ = librosa.effects.trim(audio)
    audio = audio[:48000] if len(audio) > 48000 else np.pad(audio, (0, max(0, 48000 - len(audio))), mode='constant')
    
    mel = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=128, hop_length=512)
    mel_db = librosa.power_to_db(mel, ref=np.max)

    mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
    if mfcc.shape[1] != mel_db.shape[1]:
        mfcc = librosa.util.fix_length(mfcc, size=mel_db.shape[1], axis=1)

    # Combine mel and mfcc
    features = np.concatenate([mel_db, mfcc], axis=0)

    # Resize to (167, 94) to match model input
    desired_shape = (167, 94)
    features_resized = librosa.util.fix_length(features, size=desired_shape[0], axis=0)
    features_resized = librosa.util.fix_length(features_resized, size=desired_shape[1], axis=1)

    return features_resized


# Real-time audio recording setup
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16500
CHUNK = 1024
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
try:
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
except Exception as e:
    print("Failed to open microphone stream:", e)
    exit()

print("Waiting for sound... (Recording 4 seconds)")

frames = []
for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    frames.append(stream.read(CHUNK))

stream.stop_stream()
stream.close()
p.terminate()

# Save the audio temporarily
with wave.open("temp.wav", 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))

# Predict
try:
    audio, sr = librosa.load("temp.wav", sr=16000)
    features = extract_features(audio)
    input_data = features[np.newaxis, ..., np.newaxis]

    prediction = model.predict(input_data)[0][0]
    # print(f"\n Prediction Score: {prediction:.4f}")
    if prediction < 0.9:
        print("Siren Detected!")
    else:
        print("No Siren Detected.")
except Exception as e:
    print("Error in prediction:", e)



Waiting for sound... (Recording 4 seconds)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 163ms/step
Siren Detected!
