# Detect Animal and UrbanSound

## Packages and Parameters

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import librosa
import librosa.display
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
import seaborn as sns

# Configuration parameters
# Expand "~" up front: os.listdir / os.path.join do not perform tilde
# expansion, so a raw "~/data" would be looked up as a literal "~" directory.
BASE_PATH = os.path.expanduser("~/data")
SAMPLE_RATE = 44100                       # sample rate passed to librosa.load (Hz)
MAX_DURATION = 5                          # clip length in seconds
MAX_SAMPLES = SAMPLE_RATE * MAX_DURATION  # required number of samples per clip
N_MELS = 32                               # number of mel bands
N_FFT = 256                               # FFT window size
HOP_LENGTH = 64                           # hop between successive frames (samples)
# Folder name under BASE_PATH -> integer class label.
CLASS_MAPPING = {
    "Animals": 0,
    "urban noises": 1,
}
# Derived from the mapping so the two can never drift apart.
N_CLASSES = len(CLASS_MAPPING)

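## Mel spectrogram extraction and data split
- `extract_melspectrogram`: convert one audio clip into a normalized log-mel spectrogram.
- Shuffle the files randomly, then split them into training (augmented), validation, and test sets.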

In [None]:
def extract_melspectrogram(file_path, augment=False):
    """Load one clip and return its standardized log-mel spectrogram.

    The clip is loaded at ``SAMPLE_RATE`` and must contain exactly
    ``MAX_SAMPLES`` samples; clips of any other length are skipped.

    Args:
        file_path: Path of the audio file to load.
        augment: When True, each of three random perturbations (additive
            Gaussian noise, circular time shift, gain change) is applied
            independently with probability 0.3, using NumPy's global RNG.

    Returns:
        The log-mel spectrogram (dB relative to its own maximum), standardized
        to zero mean / unit variance, or ``None`` when the clip has the wrong
        length or any step raises.
    """
    try:
        audio, sr = librosa.load(file_path, sr=SAMPLE_RATE)

        # Validate first: none of the augmentations below changes the number
        # of samples, so checking up front gives the same verdict without
        # spending work (and random draws) on clips that are discarded anyway.
        if len(audio) != MAX_SAMPLES:
            print(f"Skipping {file_path}: audio length {len(audio)} does not match required length {MAX_SAMPLES}")
            return None

        if augment:
            # Additive Gaussian noise. np.random.normal yields float64; cast it
            # to the clip's dtype so augmented and plain clips share one dtype.
            if np.random.random() < 0.3:
                noise = np.random.normal(0, 0.005, audio.shape)
                audio = audio + noise.astype(audio.dtype)

            # Circular time shift by up to +/-20% of the clip length.
            if np.random.random() < 0.3:
                shift = int(np.random.uniform(-0.2, 0.2) * len(audio))
                audio = np.roll(audio, shift)

            # Random gain between 0.8x and 1.2x.
            if np.random.random() < 0.3:
                audio = audio * np.random.uniform(0.8, 1.2)

        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=sr,
            n_fft=N_FFT,
            hop_length=HOP_LENGTH,
            n_mels=N_MELS,
        )

        # Power -> dB, referenced to the loudest bin of this clip.
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

        # Per-clip standardization; the epsilon guards against a zero std
        # (e.g. a completely silent clip).
        mel_spec_db = (mel_spec_db - np.mean(mel_spec_db)) / (np.std(mel_spec_db) + 1e-8)

        return mel_spec_db
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort the
        # whole dataset load, so report it and let the caller skip it.
        print(f"Error processing {file_path}: {e}")
        return None

# load data and subset
def load_data_subset_(file_list, augment_data):
    """Turn (path, label) pairs into feature, label and path arrays.

    Every file is passed through ``extract_melspectrogram``; files for which
    it returns ``None`` are left out of all three outputs.

    Args:
        file_list: Iterable of ``(file_path, class_label)`` pairs.
        augment_data: Forwarded as ``augment`` to ``extract_melspectrogram``.

    Returns:
        Tuple ``(X, y, file_paths)`` of NumPy arrays, aligned by position.
    """
    features, labels, kept_paths = [], [], []

    for path, label in file_list:
        spectrogram = extract_melspectrogram(path, augment=augment_data)
        if spectrogram is None:
            # Extraction failed or the clip was rejected; skip it.
            continue
        features.append(spectrogram)
        labels.append(label)
        kept_paths.append(path)

    return np.array(features), np.array(labels), np.array(kept_paths)


# load_data and split train, validation, test data 
def load_data(data_dir, random_seed=507):
    """Collect the audio files, split them 70/20/10 and extract features.

    Sub-folders of ``data_dir`` whose names are keys of ``CLASS_MAPPING`` are
    scanned for ``.wav`` files (extension matched case-insensitively). The
    file list is shuffled and split into train / validation / test sets.
    Only the training files are augmented.

    Args:
        data_dir: Dataset root; a leading ``~`` is expanded.
        random_seed: Seed for ``random``, NumPy's global RNG, the splits and
            the final shuffles.

    Returns:
        ``(X_train, y_train, file_paths_train, X_val, y_val, file_paths_val,
        X_test, y_test, file_paths_test)``; each ``X_*`` has a trailing
        channel axis appended.

    Raises:
        ValueError: If no ``.wav`` file is found under ``data_dir``.
    """
    import random
    from sklearn.utils import shuffle

    np.random.seed(random_seed)
    random.seed(random_seed)

    # os.listdir does not expand "~"; do it here so "~/data" style roots work.
    data_dir = os.path.expanduser(data_dir)

    # Gather (file path, label) pairs. Directory listings come back in
    # arbitrary, platform-dependent order, so sort them: otherwise the seeded
    # shuffle and splits below would not be reproducible across machines.
    all_files = []
    for class_name in sorted(os.listdir(data_dir)):
        class_path = os.path.join(data_dir, class_name)

        if class_name not in CLASS_MAPPING or not os.path.isdir(class_path):
            continue

        class_label = CLASS_MAPPING[class_name]

        for file_name in sorted(os.listdir(class_path)):
            if file_name.lower().endswith('.wav'):
                all_files.append((os.path.join(class_path, file_name), class_label))

    if not all_files:
        # Fail here with an actionable message instead of an opaque error
        # from train_test_split further down.
        raise ValueError(
            f"No .wav files found under {data_dir!r} for classes {list(CLASS_MAPPING)}"
        )

    # Shuffle randomly
    random.shuffle(all_files)
    print(f"Found {len(all_files)} audio files, shuffled randomly")

    # Hold out 10% for testing ...
    train_val_files, test_files = train_test_split(
        all_files, test_size=0.1, random_state=random_seed)

    # ... then 0.2 / 0.9 of the remainder, i.e. 20% of the whole, for validation.
    train_files, val_files = train_test_split(
        train_val_files, test_size=(0.2 / 0.9), random_state=random_seed)

    # Extract features per split; only the training split is augmented.
    print("\nProcessing training data (with augmentation)...")
    X_train, y_train, file_paths_train = load_data_subset_(train_files, augment_data=True)
    print("Processing validation data (without augmentation)...")
    X_val, y_val, file_paths_val = load_data_subset_(val_files, augment_data=False)
    print("Processing test data (without augmentation)...")
    X_test, y_test, file_paths_test = load_data_subset_(test_files, augment_data=False)

    # Second shuffle, applied consistently to features, labels and paths.
    X_train, y_train, file_paths_train = shuffle(X_train, y_train, file_paths_train, random_state=random_seed)
    X_val, y_val, file_paths_val = shuffle(X_val, y_val, file_paths_val, random_state=random_seed)
    X_test, y_test, file_paths_test = shuffle(X_test, y_test, file_paths_test, random_state=random_seed)

    # Add a trailing channel dimension for the CNN.
    X_train = X_train[..., np.newaxis]
    X_val = X_val[..., np.newaxis]
    X_test = X_test[..., np.newaxis]

    # Display the class distribution of each split.
    print("\n--- Class Distributions ---")
    unique_train, counts_train = np.unique(y_train, return_counts=True)
    print(f"Train set class distribution: {dict(zip(unique_train, counts_train))}")
    unique_val, counts_val = np.unique(y_val, return_counts=True)
    print(f"Validation set class distribution: {dict(zip(unique_val, counts_val))}")
    unique_test, counts_test = np.unique(y_test, return_counts=True)
    print(f"Test set class distribution: {dict(zip(unique_test, counts_test))}")

    return (X_train, y_train, file_paths_train,
            X_val, y_val, file_paths_val,
            X_test, y_test, file_paths_test)

## Balanced Batch Generator

In [None]:
class BalancedBatchGenerator(tf.keras.utils.Sequence):
    """Keras Sequence yielding batches with equally many class-0 and class-1 samples.

    Each batch holds ``batch_size`` samples of class 0 and ``batch_size``
    samples of class 1, i.e. ``2 * batch_size`` samples in total. The number
    of batches per epoch is limited by the smaller class, so in any one epoch
    only part of the larger class is visited.

    Args:
        X: Feature array, indexable by an integer index array.
        y: Label array with values 0 and 1, aligned with ``X``.
        batch_size: Samples drawn *per class* for each batch; must be positive.
        shuffle: When True, the per-class order is randomized at construction
            and after every epoch, and samples are mixed inside each batch.
            When False, no randomization is applied at all, so the batches
            are deterministic.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X, y, batch_size=32, shuffle=True):
        # Newer Keras versions expect Sequence/PyDataset subclasses to call
        # the base constructor; it is a harmless no-op on older ones.
        super().__init__()

        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        self.X = X
        self.y = np.asarray(y)
        self.batch_size = batch_size
        self.shuffle = shuffle

        # Indices of the samples belonging to each class.
        self.class_0_indices = np.where(self.y == 0)[0]
        self.class_1_indices = np.where(self.y == 1)[0]

        # Number of samples taken from each class per batch.
        self.samples_per_class = batch_size

        # Only full batches are produced; the smaller class sets the count.
        min_samples = min(len(self.class_0_indices), len(self.class_1_indices))
        self.n_batches = min_samples // self.samples_per_class

        print("Balanced batch generator created:")
        print(f"  Class 0 samples: {len(self.class_0_indices)}")
        print(f"  Class 1 samples: {len(self.class_1_indices)}")
        print(f"  Samples per class per batch: {self.samples_per_class}")
        print(f"  Total batches: {self.n_batches}")
        if self.n_batches == 0:
            print("  Warning: the smaller class has fewer samples than "
                  "batch_size, so this generator yields no batches")

        self._reset_order()

    def _reset_order(self):
        """(Re)build the per-class visiting order, honoring ``self.shuffle``."""
        if self.shuffle:
            self.shuffled_class_0_indices = np.random.permutation(self.class_0_indices)
            self.shuffled_class_1_indices = np.random.permutation(self.class_1_indices)
        else:
            self.shuffled_class_0_indices = self.class_0_indices.copy()
            self.shuffled_class_1_indices = self.class_1_indices.copy()

    def __len__(self):
        """Return the number of batches per epoch."""
        return self.n_batches

    def __getitem__(self, index):
        """Return batch ``index`` as ``(X_batch, y_batch)``.

        Raises:
            IndexError: If ``index`` is outside ``[0, len(self))``.
        """
        # Without this check an out-of-range index would silently yield an
        # empty batch, and plain ``for batch in seq`` style iteration via
        # __getitem__ would have no stopping condition.
        if not 0 <= index < self.n_batches:
            raise IndexError(
                f"batch index {index} out of range for {self.n_batches} batches"
            )

        start_idx = index * self.samples_per_class
        end_idx = start_idx + self.samples_per_class

        batch_indices = np.concatenate([
            self.shuffled_class_0_indices[start_idx:end_idx],
            self.shuffled_class_1_indices[start_idx:end_idx],
        ])
        if self.shuffle:
            # Interleave the two classes inside the batch.
            np.random.shuffle(batch_indices)

        return self.X[batch_indices], self.y[batch_indices]

    def on_epoch_end(self):
        """Draw a new per-class order for the next epoch when shuffling is on."""
        if self.shuffle:
            self._reset_order()

## Build model 

In [None]:
def build_model(input_shape):
    """Build and compile the CNN classifier.

    Architecture: two convolutional blocks (Conv -> BatchNorm -> Conv ->
    MaxPool -> Dropout), global average pooling, two regularized dense layers
    with dropout, and a softmax layer with ``N_CLASSES`` outputs.

    Args:
        input_shape: Shape of one input sample, passed to the first layer.

    Returns:
        A compiled ``Sequential`` model (Adam with learning rate 1e-4, sparse
        categorical cross-entropy loss, accuracy metric).
    """
    def conv3x3(filters, **extra):
        # 3x3 same-padded ReLU convolution with its own L2 weight penalty.
        return layers.Conv2D(filters, (3, 3), activation='relu', padding='same',
                             kernel_regularizer=regularizers.l2(0.001), **extra)

    def dense_relu(units):
        # Fully connected ReLU layer with its own L2 weight penalty.
        return layers.Dense(units, activation='relu',
                            kernel_regularizer=regularizers.l2(0.001))

    # Convolutional block 1
    first_block = [
        conv3x3(64, input_shape=input_shape),
        layers.BatchNormalization(),
        conv3x3(64),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
    ]

    # Convolutional block 2
    second_block = [
        conv3x3(128),
        layers.BatchNormalization(),
        conv3x3(64),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
    ]

    # Classification head
    head = [
        layers.GlobalAveragePooling2D(),
        dense_relu(64),
        layers.Dropout(0.5),
        dense_relu(32),
        layers.Dropout(0.5),
        layers.Dense(N_CLASSES, activation='softmax'),
    ]

    network = models.Sequential(first_block + second_block + head)

    adam = tf.keras.optimizers.Adam(learning_rate=0.0001)
    network.compile(optimizer=adam,
                    loss='sparse_categorical_crossentropy',
                    metrics=['accuracy'])

    return network

## Train model

In [None]:
def train_model():
    """Load the data, build the CNN and train it with balanced batches.

    Returns:
        Tuple ``(model, history, X_test, y_test, file_paths_test)``: the
        trained model, the Keras ``History`` object, and the held-out test set.
    """
    # Enable memory growth on every visible GPU: TensorFlow requires the
    # setting to be identical across GPUs, and it can no longer be changed
    # once the GPUs are initialized (e.g. when this cell is re-run), in which
    # case a RuntimeError is raised.
    for gpu in tf.config.list_physical_devices('GPU'):
        try:
            tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as err:
            print(f"Could not enable GPU memory growth: {err}")

    print("Loading data ...")
    (X_train, y_train, file_paths_train,
     X_val, y_val, file_paths_val,
     X_test, y_test, file_paths_test) = load_data(BASE_PATH)

    # Balanced batch generators (batch_size is per class, see the generator).
    train_generator = BalancedBatchGenerator(X_train, y_train, batch_size=5, shuffle=True)
    val_generator = BalancedBatchGenerator(X_val, y_val, batch_size=5, shuffle=False)

    # Class weights, keyed by the actual label values. enumerate() would
    # mis-key the weights whenever the labels present in y_train are not
    # exactly 0..k-1 (e.g. a class missing from the training split).
    # NOTE(review): the batches are already class-balanced, so these weights
    # compensate for the imbalance a second time - confirm this is intended.
    present_classes = np.unique(y_train)
    class_weights = compute_class_weight('balanced',
                                         classes=present_classes,
                                         y=y_train)
    class_weight_dict = {
        int(label): float(weight)
        for label, weight in zip(present_classes, class_weights)
    }
    print(f"Class weights: {class_weight_dict}")

    # Build the model from the per-sample shape.
    input_shape = X_train.shape[1:]
    model = build_model(input_shape)
    model.summary()

    # Callbacks: keep the best checkpoint, stop early, decay the learning rate.
    callbacks = [
        ModelCheckpoint('best_audio_model_improved.h5',
                        monitor='val_loss',
                        mode='min',
                        save_best_only=True,
                        verbose=1,
                        save_weights_only=False),
        EarlyStopping(monitor='val_loss',
                      patience=10,
                      restore_best_weights=True,
                      verbose=1),
        ReduceLROnPlateau(monitor='val_loss',
                          factor=0.5,
                          patience=7,
                          min_lr=1e-7,
                          verbose=1),
    ]

    history = model.fit(
        train_generator,
        epochs=30,
        validation_data=val_generator,
        callbacks=callbacks,
        class_weight=class_weight_dict,
        verbose=1,
    )

    return model, history, X_test, y_test, file_paths_test

## Evaluate model

In [None]:
def evaluate_model(model, X_test, y_test, class_names):
    """Print test metrics and plot/save the confusion matrix.

    Args:
        model: Trained model whose ``predict`` returns per-class scores.
        X_test: Test features.
        y_test: True integer labels for ``X_test``.
        class_names: Display names; position ``i`` is used for label ``i``.

    Side effects:
        Prints accuracy, weighted precision/recall/F1 and a classification
        report, shows the confusion matrix and writes it to
        ``confusion_matrix.png``.
    """
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Pin the label set explicitly. Otherwise a class absent from both y_test
    # and the predictions shrinks the confusion matrix (misaligning the tick
    # labels) and makes classification_report reject target_names.
    label_ids = list(range(len(class_names)))

    accuracy = accuracy_score(y_test, y_pred_classes)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, y_pred_classes, labels=label_ids, average='weighted',
        zero_division=0)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

    # Confusion matrix (rows: true labels, columns: predicted labels).
    cm = confusion_matrix(y_test, y_pred_classes, labels=label_ids)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Model - Confusion Matrix')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png', dpi=300)
    plt.show()

    # Per-class precision / recall / F1.
    print("\nDetailed Classification Report:")
    print(classification_report(y_test, y_pred_classes, labels=label_ids,
                                target_names=class_names, zero_division=0))

def plot_history(history):
    """Plot training/validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` dict holds the ``accuracy``,
            ``val_accuracy``, ``loss`` and ``val_loss`` series.

    Side effects:
        Shows the figure and writes it to ``training_history.png``.
    """
    _, axes = plt.subplots(1, 2, figsize=(15, 5))
    records = history.history

    # One entry per panel: (train key, train label, val key, val label,
    # panel title, y-axis label).
    panels = (
        ('accuracy', 'Train Accuracy',
         'val_accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy'),
        ('loss', 'Train Loss',
         'val_loss', 'Validation Loss',
         'Model Loss', 'Loss'),
    )

    for axis, panel in zip(axes, panels):
        train_key, train_label, val_key, val_label, title, y_label = panel
        axis.plot(records[train_key], label=train_label)
        axis.plot(records[val_key], label=val_label)
        axis.set_title(title)
        axis.set_ylabel(y_label)
        axis.set_xlabel('Epoch')
        axis.legend()
        axis.grid(True)

    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300)
    plt.show()

## Main function 

In [None]:
def main():
    """Train the model, plot its history, evaluate it and save it to disk."""
    # The test file paths are returned by train_model() but not needed here.
    model, history, X_test, y_test, _test_paths = train_model()

    plot_history(history)

    # CLASS_MAPPING is declared in label order, so key i names label i.
    class_names = list(CLASS_MAPPING.keys())
    evaluate_model(model, X_test, y_test, class_names)

    # Save the final model.
    model.save('audio_classification_model.h5')
    print("Improved model saved!")