# Import and Hyperparameters

---



In [None]:
import os
import random
import pickle
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import librosa.display
import tensorflow as tf

from collections import Counter
from functools import partial
from sklearn.metrics import (accuracy_score, recall_score, f1_score,
                             precision_score, classification_report, confusion_matrix)
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight

# Keras Imports
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Dense, Flatten, Conv1D, GlobalAveragePooling1D,
                                     LayerNormalization, Input, MaxPooling1D, Multiply,
                                     Reshape, Permute, Add, Activation, UpSampling1D)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.regularizers import l2

In [None]:
def set_global_determinism(seed=42):
    """
    Seed every random number generator used in this notebook.

    Sets PYTHONHASHSEED in the process environment, then seeds Python's
    `random`, NumPy and TensorFlow with the same value and prints a
    confirmation line.

    Args:
        seed: Integer seed shared by all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Same order as before: Python -> NumPy -> TensorFlow.
    for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
        seed_fn(seed)
    print(f"Global seed set to: {seed}")


# Single seed value reused across the notebook.
SEED = 42
set_global_determinism(SEED)

In [None]:
# Assumption: The 'data' folder is in the same directory as this notebook in the Git repo
BASE_DIR = './data'

# One sub-folder per dataset split.
TRAIN_DIR = os.path.join(BASE_DIR, 'TRAIN')
TEST_DIR = os.path.join(BASE_DIR, 'TEST')
VAL_DIR = os.path.join(BASE_DIR, 'VALIDATION')

# Where to save the trained model and logs locally
OUTPUT_DIR = './output'
# exist_ok=True makes creation idempotent and avoids the check-then-create race.
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
# Audio / training hyperparameters shared by the whole notebook.
SAMPLING_RATE = 48000
INPUT_SHAPE = (SAMPLING_RATE, 1) # 1 second audio
BATCH_SIZE = 32
# The training cell reads EPOCHS (upper case, like the other constants);
# the lower-case name is kept as an alias for any code that still uses it.
EPOCHS = 20
epochs = EPOCHS
LEARNING_RATE = 0.001
NUM_CLASSES = 2

In [None]:
# Stop training once validation loss has not improved for 3 consecutive
# epochs and roll the model back to the best weights seen so far.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=1
)

# Keep only the best model (lowest validation loss) on disk.
# NOTE: the checkpoint is written under OUTPUT_DIR, the local output folder
# configured above; the previously referenced WEIGHTS_DIR is not defined
# anywhere in this notebook.
model_checkpoint = ModelCheckpoint(
    os.path.join(OUTPUT_DIR, 'best_model.keras'),
    monitor='val_loss',
    save_best_only=True,
    verbose=1
)

In [None]:
# ==========================================
# 6. Helper Functions
# ==========================================
def save_training_history(history, filename):
    """
    Pickle the metrics dictionary of a training run for later plotting.

    Args:
        history: Object exposing a `.history` attribute (as returned by
            `model.fit`); only that attribute is serialized.
        filename: File name joined onto OUTPUT_DIR.
    """
    destination = os.path.join(OUTPUT_DIR, filename)
    with open(destination, 'wb') as handle:
        pickle.dump(history.history, handle)
    print(f"History saved to {destination}")

Mounted at /content/drive


# Preprocessing & Augmentation Functions

---



In [None]:
# ==========================================
# 1. Data Augmentation Utilities (Noise Injection)
# ==========================================

def load_noise_file(file_path):
    """
    Read a WAV file from disk and return its full waveform as a 1-D tensor.

    Intended to be called once, outside the dataset pipeline, so the noise
    recording is decoded a single time and then reused.

    Args:
        file_path: Path of the background-noise WAV file.

    Returns:
        Tensor of shape (N,) holding the mono waveform.
    """
    raw_bytes = tf.io.read_file(file_path)
    # desired_samples=-1 keeps the whole recording; a single channel is requested.
    waveform = tf.audio.decode_wav(raw_bytes, desired_channels=1, desired_samples=-1)[0]
    # Drop the trailing channel axis: (N, 1) -> (N,)
    return tf.squeeze(waveform, axis=-1)

def add_background_noise(clean_audio, label, noise_tensor, target_snr):
    """
    Injects a random segment of background noise into the clean audio based on a target SNR.

    The noise recording may be longer than, equal to, or shorter than the
    clean clip: a shorter recording is repeated end-to-end until it covers the
    clip, and every valid start offset (including the last one) can be drawn.

    Args:
        clean_audio: Tensor of shape (samples,)
        label: The associated class label
        noise_tensor: The full pre-loaded noise tensor, shape (noise_samples,).
            Must contain at least one sample.
        target_snr: Linear Signal-to-Noise Ratio (ratio of powers, not dB)

    Returns:
        noisy_audio: Audio tensor with added noise, clipped to [-1, 1]
        label: Unchanged label
    """
    # Ensure inputs are float32
    clean_audio = tf.cast(clean_audio, tf.float32)
    full_noise = tf.cast(noise_tensor, tf.float32)

    sample_length = tf.shape(clean_audio)[0]
    noise_length = tf.shape(full_noise)[0]

    # 1. Make sure the noise source is at least as long as the clip.
    def _tile_to_cover():
        # Ceil-division gives the number of repeats needed to cover the clip;
        # the maximum() guards the division against an empty noise tensor.
        safe_length = tf.maximum(noise_length, 1)
        repeats = (sample_length + safe_length - 1) // safe_length
        return tf.tile(full_noise, [repeats])

    # Only tile when required so a long noise recording is never copied.
    noise_source = tf.cond(
        noise_length < sample_length,
        _tile_to_cover,
        lambda: full_noise,
    )

    # 2. Select a random noise segment matching the audio length.
    # For integer dtypes `maxval` is exclusive, so +1 keeps the range non-empty
    # when both lengths are equal and lets the final valid offset be chosen.
    max_offset = tf.shape(noise_source)[0] - sample_length
    random_offset = tf.random.uniform(
        shape=(), minval=0, maxval=max_offset + 1, dtype=tf.int32
    )
    noise_segment = tf.slice(noise_source, [random_offset], [sample_length])

    # 3. Calculate signal and noise power
    signal_power = tf.reduce_mean(tf.square(clean_audio))
    noise_power = tf.reduce_mean(tf.square(noise_segment))

    # 4. Calculate scaling factor for the noise to achieve target SNR
    # Formula: SNR = P_signal / P_noise_scaled
    # Therefore: P_noise_scaled = P_signal / SNR
    # scale = sqrt(P_noise_scaled / P_noise_original)
    epsilon = 1e-10  # Avoid division by zero
    target_noise_power = signal_power / (target_snr + epsilon)
    scale_factor = tf.sqrt(target_noise_power / (noise_power + epsilon))

    # 5. Add weighted noise
    noisy_audio = clean_audio + (noise_segment * scale_factor)

    # 6. Clip to valid audio range [-1.0, 1.0]
    noisy_audio = tf.clip_by_value(noisy_audio, -1.0, 1.0)

    return noisy_audio, label

In [None]:
# Decode the background-noise recording a single time and keep it in memory.
# NOTE(review): NOISE_FILE_PATH and SNR are not defined in the cells shown
# above - confirm they are set before this cell runs.
background_noise = load_noise_file(NOISE_FILE_PATH)

# Bind the noise tensor and the target SNR up front so the resulting callable
# only takes (audio, label) and can be passed straight to Dataset.map().
add_noise_map_fn = partial(add_background_noise, noise_tensor=background_noise, target_snr=SNR)

# Data Loading Pipeline
---

In [None]:
def load_data(root_dir):
    """
    Scans the directory for .wav files and assigns labels based on folder names.

    Every immediate sub-directory of `root_dir` is treated as one class; all
    WAV files found anywhere below it (extension matched case-insensitively)
    receive that class. Directories and files are visited in sorted order so
    the returned lists are identical across runs and filesystems.

    Args:
        root_dir: Directory that contains one folder per class.

    Returns:
        file_paths: List of paths to the discovered WAV files.
        labels: List of integer class indices, aligned with `file_paths`.
        unique_labels: Sorted list of class names; a label value is an index
            into this list.
        All three are empty lists when `root_dir` does not exist.
    """
    file_paths = []
    labels = []

    # Check if directory exists to avoid errors
    if not os.path.exists(root_dir):
        print(f"Warning: Directory not found: {root_dir}")
        return [], [], []

    # Iterate over classes (folders)
    for class_name in sorted(os.listdir(root_dir)):
        class_path = os.path.join(root_dir, class_name)
        if not os.path.isdir(class_path):
            continue

        for subdir, dirs, files in os.walk(class_path):
            # os.walk yields entries in arbitrary order; sorting `dirs` in
            # place fixes the traversal order, sorting `files` fixes the
            # order within each folder.
            dirs.sort()
            for file in sorted(files):
                if file.lower().endswith(".wav"):
                    file_paths.append(os.path.join(subdir, file))
                    labels.append(class_name)

    # Convert labels to categorical indices
    unique_labels = sorted(set(labels))
    label_to_index = {label: index for index, label in enumerate(unique_labels)}
    labels = [label_to_index[label] for label in labels]

    return file_paths, labels, unique_labels

In [None]:
def preprocess_audio(file_path):
    """
    Load one WAV file and return its mono waveform as a 1-D tensor.

    Args:
        file_path: Path of the WAV file to decode.

    Returns:
        Tensor with the channel axis removed; SAMPLING_RATE samples are
        requested from the decoder.
    """
    raw_bytes = tf.io.read_file(file_path)
    # The clip length is taken from the global SAMPLING_RATE config value.
    waveform = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=SAMPLING_RATE
    )[0]
    # (N, 1) -> (N,)
    return tf.squeeze(waveform, axis=-1)

# Dataset.map() adapter: (path, label) -> (waveform, label)
def load_audio_and_label(file_path, label):
    """
    Decode the audio at `file_path` and pass `label` through untouched.

    Returns:
        Tuple of (audio tensor from preprocess_audio, label).
    """
    return preprocess_audio(file_path), label

In [None]:
# Load file paths and labels
train_paths, train_labels, class_names = load_data(TRAIN_DIR)
val_paths, val_labels, _ = load_data(VAL_DIR)

# Calculate class weights to address dataset imbalance
# Note: Uses 'balanced' mode to automatically adjust weights inversely proportional to class frequencies
present_classes = np.unique(train_labels)
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=train_labels
)
# Key each weight by the class id it was computed for (not by its position in
# the array) and convert to plain Python numbers for a clean printout.
class_weight_dict = {
    int(class_id): float(weight)
    for class_id, weight in zip(present_classes, class_weights)
}

print(f"Class Weights: {class_weight_dict}")

In [None]:
# One-hot encode the integer class indices of both splits (NUM_CLASSES columns each).
y_train, y_val = (
    tf.keras.utils.to_categorical(split_labels, num_classes=NUM_CLASSES)
    for split_labels in (train_labels, val_labels)
)

In [None]:

# TOGGLE THIS FLAG to switch between "Noise Robustness" and "Clean" experiments
USE_NOISE_AUGMENTATION = True

print(f"Building Dataset... (Noise Augmentation: {USE_NOISE_AUGMENTATION})")

# 1. Create Base Dataset from paths and labels
train_dataset = tf.data.Dataset.from_tensor_slices((train_paths, y_train))

# 2. Load and Preprocess Audio
train_dataset = train_dataset.map(load_audio_and_label, num_parallel_calls=tf.data.AUTOTUNE)

# 3. Cache the decoded, still-clean audio.
# The cache must sit BEFORE shuffling and augmentation: everything upstream of
# cache() is computed once and replayed, so caching later would freeze the
# batch order and the injected noise after the first epoch.
train_dataset = train_dataset.cache()

# 4. Shuffle (a new order is drawn every epoch)
train_dataset = train_dataset.shuffle(buffer_size=len(train_paths))

# 5. Apply Noise Augmentation (Conditionally)
if USE_NOISE_AUGMENTATION:
    print("Log: Injecting background noise into training data.")
    # Note: add_noise_map_fn must be defined in previous steps
    train_dataset = train_dataset.map(add_noise_map_fn, num_parallel_calls=tf.data.AUTOTUNE)

# 6. Batch and Prefetch
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

# ==========================================
# Validation Set (Always Clean)
# ==========================================
# No randomness in this pipeline, so caching the finished batches is safe.
validation_dataset = tf.data.Dataset.from_tensor_slices((val_paths, y_val))
validation_dataset = validation_dataset.map(load_audio_and_label, num_parallel_calls=tf.data.AUTOTUNE)
validation_dataset = validation_dataset.batch(BATCH_SIZE)
validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

print("Datasets ready.")

# Model Architecture & Training
---




In [None]:
# ==========================================
# Model Definitions
# ==========================================

def build_efficient_cnn(input_shape, num_classes):
    """
    Standard Efficient-CNN baseline.

    Two small Conv1D blocks followed by global average pooling and a dense
    classification head with a softmax output.

    Args:
        input_shape: Shape of one input example, e.g. (samples, channels).
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled Keras Sequential model named "Efficient_CNN".
    """
    model = Sequential([
        # Explicit Input layer instead of passing `input_shape=` to the first
        # Conv1D, which newer Keras versions warn about.
        Input(shape=input_shape),

        Conv1D(16, 3, activation='relu'),
        LayerNormalization(),
        MaxPooling1D(2),

        Conv1D(16, 3, activation='relu'),
        MaxPooling1D(2),

        GlobalAveragePooling1D(),
        Dense(32, activation='relu'),
        Dense(16, activation='relu'),
        Dense(num_classes, activation='softmax')
    ], name="Efficient_CNN")
    return model


def build_dilated_cnn(input_shape, num_classes):
    """
    Dilated-CNN to capture wider receptive fields.

    A plain Conv1D block followed by two dilated convolutions (rates 2 and 4),
    global average pooling and a dense classification head.

    Args:
        input_shape: Shape of one input example, e.g. (samples, channels).
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled Keras Sequential model named "Dilated_CNN".
    """
    model = Sequential([
        # Explicit Input layer instead of passing `input_shape=` to the first
        # Conv1D, which newer Keras versions warn about.
        Input(shape=input_shape),

        Conv1D(16, 3, activation='relu', padding='same'),
        LayerNormalization(),
        MaxPooling1D(2),

        # Dilation blocks
        Conv1D(32, 3, activation='relu', padding='same', dilation_rate=2),
        LayerNormalization(),

        Conv1D(32, 3, activation='relu', padding='same', dilation_rate=4),
        LayerNormalization(),
        MaxPooling1D(2),

        GlobalAveragePooling1D(),
        Dense(32, activation='relu'),
        Dense(16, activation='relu'),
        Dense(num_classes, activation='softmax')
    ], name="Dilated_CNN")
    return model


def build_se_cnn(input_shape, num_classes, reduction_ratio=4):
    """
    Build the proposed SE-CNN (Squeeze-and-Excitation) model.

    Layout: conv block -> SE channel re-weighting -> conv block -> dense head.

    Args:
        input_shape: Shape of one input example, e.g. (samples, channels).
        num_classes: Number of units in the softmax output layer.
        reduction_ratio: Divisor applied to the channel count to size the
            bottleneck Dense layer of the SE block.

    Returns:
        An uncompiled Keras functional Model named "SE_CNN_Ours".
    """

    def conv_block(tensor):
        # Conv(32, k=3, same padding) -> LayerNorm -> MaxPool(2)
        tensor = Conv1D(32, 3, activation='relu', padding='same')(tensor)
        tensor = LayerNormalization()(tensor)
        return MaxPooling1D(2)(tensor)

    inputs = Input(shape=input_shape)

    # First Conv Block
    features = conv_block(inputs)

    # --- Squeeze-and-Excitation ---
    channels = features.shape[-1]
    # Squeeze: one averaged value per channel
    gate = GlobalAveragePooling1D()(features)
    # Excitation: bottleneck, then a sigmoid gate per channel
    gate = Dense(channels // reduction_ratio, activation='relu')(gate)
    gate = Dense(channels, activation='sigmoid')(gate)
    gate = Reshape((1, channels))(gate)
    # Scale: re-weight the feature map channel-wise
    features = Multiply()([features, gate])
    # --- End of Squeeze-and-Excitation ---

    # Second Conv Block
    features = conv_block(features)

    # Classification Head
    head = GlobalAveragePooling1D()(features)
    head = Dense(32, activation='relu')(head)
    head = Dense(16, activation='relu')(head)
    outputs = Dense(num_classes, activation='softmax')(head)

    return Model(inputs=inputs, outputs=outputs, name="SE_CNN_Ours")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [None]:
# Select the model architecture here: 'Efficient', 'Dilated', or 'SE'
MODEL_TYPE = 'SE'

# Name -> zero-argument factory; only the selected factory is invoked.
model_builders = {
    'Efficient': lambda: build_efficient_cnn(INPUT_SHAPE, NUM_CLASSES),
    'Dilated': lambda: build_dilated_cnn(INPUT_SHAPE, NUM_CLASSES),
    'SE': lambda: build_se_cnn(INPUT_SHAPE, NUM_CLASSES, reduction_ratio=4),
}

if MODEL_TYPE not in model_builders:
    raise ValueError("Unknown Model Type")
model = model_builders[MODEL_TYPE]()

model.summary()

# Adam + categorical cross-entropy (labels are one-hot encoded); accuracy,
# recall and precision are tracked during training.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.Precision(name='precision'),
    ],
)

In [None]:
# Train the model. Class weights compensate for the class imbalance; the
# callbacks handle early stopping and saving the best checkpoint.
history = model.fit(
    x=train_dataset,
    epochs=EPOCHS,
    validation_data=validation_dataset,
    callbacks=[early_stopping, model_checkpoint],
    class_weight=class_weight_dict,
    verbose=1,
)


In [None]:
# Persist the final weights and the training curves under OUTPUT_DIR.
# The previously referenced weights_save_dir / history_save_dir are not
# defined anywhere in this notebook. save_training_history() already joins
# OUTPUT_DIR, so it receives bare file names.
model.save_weights(os.path.join(OUTPUT_DIR, 'for_deployment.weights.h5'))
save_training_history(history, 'for_deployment._history.pkl')
save_training_history(history, 'training_history.pkl')

# Model Evaluation
---

In [None]:
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

def plot_training_history(history):
    """
    Draw side-by-side loss and accuracy curves for a finished training run.

    Args:
        history: Object whose `.history` dict holds the 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' series.
    """
    _fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # (train key, val key, train legend, val legend, title, y-axis label)
    panels = (
        ('loss', 'val_loss', 'Train Loss', 'Val Loss', 'Loss Curve', 'Loss'),
        ('accuracy', 'val_accuracy', 'Train Acc', 'Val Acc', 'Accuracy Curve', 'Accuracy'),
    )

    for axis, (train_key, val_key, train_legend, val_legend, title, y_label) in zip(axes, panels):
        # Training curve: solid blue; validation curve: dashed red.
        axis.plot(history.history[train_key], label=train_legend, color='blue')
        axis.plot(history.history[val_key], label=val_legend, color='red', linestyle='--')
        axis.set_title(title)
        axis.set_xlabel('Epochs')
        axis.set_ylabel(y_label)
        axis.legend()
        axis.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def evaluate_and_report(model, dataset, class_names):
    """
    Runs inference, calculates metrics, and displays confusion matrix.

    Args:
        model: Trained Keras model that outputs one score per class.
        dataset: Batched dataset yielding (audio_batch, one_hot_label_batch).
        class_names: Class names ordered by class index; used for the report
            rows and for the confusion-matrix tick labels.
    """
    print("Running inference on Test Set...")

    # 1. Get Predictions
    y_true = []
    y_pred = []

    # Iterate batch by batch. predict_on_batch runs one forward pass per
    # batch without the per-call setup overhead of predict().
    for audio_batch, label_batch in dataset:
        preds = model.predict_on_batch(audio_batch)
        y_pred.extend(np.argmax(preds, axis=1))
        y_true.extend(np.argmax(label_batch.numpy(), axis=1))

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Pin the label set to every known class so the report and the matrix keep
    # their full shape (and match `class_names`) even when a class never
    # occurs in the evaluated data.
    label_ids = list(range(len(class_names)))

    # 2. Print Classification Report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, digits=4))

    # 3. Plot Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()

# ==========================================
# Execute Evaluation
# ==========================================
plot_training_history(history)

# Load the held-out test split; it was never loaded earlier in the notebook.
# The class-name list from the training split is reused for reporting.
test_paths, test_labels, _ = load_data(TEST_DIR)
y_test = tf.keras.utils.to_categorical(test_labels, num_classes=NUM_CLASSES)

# Same clean (no augmentation) pipeline as the validation set.
test_dataset = tf.data.Dataset.from_tensor_slices((test_paths, y_test))
test_dataset = test_dataset.map(load_audio_and_label, num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(BATCH_SIZE).cache()

evaluate_and_report(model, test_dataset, class_names)

Report saved to /content/model_report.docx
