In [235]:
import os
import logging
import numpy as np
import pandas as pd
import librosa
import librosa.display
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader


In [236]:
def setup_logging():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

setup_logging()
logger = logging.getLogger("Main")

output_dir = "../output/task2a"
os.makedirs(output_dir, exist_ok=True)
metadata_path = "../data/UrbanSound8K/metadata/UrbanSound8K.csv"
audio_dir = "../data/UrbanSound8K/audio"

In [237]:
def load_metadata(metadata_path):
    metadata = pd.read_csv(metadata_path)
    logger.info("Metadata loaded successfully.")
    return metadata

metadata = load_metadata(metadata_path)

le = LabelEncoder()
metadata["encoded_class"] = le.fit_transform(metadata["class"])
logger.info(f"Classes found: {list(le.classes_)}")


train_data, test_data = train_test_split(metadata, test_size=0.2, random_state=42, stratify=metadata["encoded_class"])
logger.info(f"Train samples: {len(train_data)}, Test samples: {len(test_data)}")

2025-02-02 22:18:54,373 - INFO - Metadata loaded successfully.
2025-02-02 22:18:54,374 - INFO - Classes found: ['air_conditioner', 'car_horn', 'children_playing', 'dog_bark', 'drilling', 'engine_idling', 'gun_shot', 'jackhammer', 'siren', 'street_music']
2025-02-02 22:18:54,377 - INFO - Train samples: 6985, Test samples: 1747


In [238]:
def hann_window(N):
    n = np.arange(N)
    return 0.5 * (1 - np.cos(2 * np.pi * n / (N - 1)))

def hamming_window(N):
    n = np.arange(N)
    return 0.54 - 0.46 * np.cos(2 * np.pi * n / (N - 1))

def rectangular_window(N):
    return np.ones(N)

In [239]:
def plot_window_functions(window_funcs, N=1024, output_path=os.path.join(output_dir, "window_functions.png")):

    plt.figure(figsize=(10, 6))
    x = np.arange(N)
    for name, func in window_funcs:
        w = func(N)
        plt.plot(x, w, label=name)
    plt.title("Window Functions")
    plt.xlabel("Sample")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.grid(True)
    plt.savefig(output_path)
    plt.close()
    logger.info(f"Window functions plot saved to {output_path}")

plot_window_functions([("Hann", hann_window), ("Hamming", hamming_window), ("Rectangular", rectangular_window)], N=1024)

2025-02-02 22:18:54,478 - INFO - Window functions plot saved to ../output/task2a/window_functions.png


In [243]:
def compute_stft(y, window_func, N=1024, hop_length=512):
    window = window_func(N)
    return librosa.stft(y, n_fft=N, hop_length=hop_length, window=window)


def extract_features(file_path, window_func, N=1024, hop_length=256, n_mfcc=40):

    try:
        y, sr = librosa.load(file_path, sr=None)
    except Exception as e:
        logger.error(f"Error loading {file_path}: {e}")
        return None
    D = compute_stft(y, window_func, N, hop_length)
    S = np.abs(D)
    
    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(S**2), sr=sr, n_mfcc=n_mfcc)
    return np.mean(mfcc.T, axis=0)

def process_features(data, window_func, N=1024, hop_length=512, n_mfcc=40):

    features = []
    labels = []
    for _, row in tqdm(data.iterrows(), total=len(data), desc="Extracting features"):
        file_path = os.path.join(audio_dir, f"fold{row['fold']}", row["slice_file_name"])
        feat = extract_features(file_path, window_func, N, hop_length, n_mfcc)
        if feat is not None:
            features.append(feat)
            labels.append(row["encoded_class"])
    return np.array(features), np.array(labels)



def estimate_noise_floor(signal, frame_length=1024, hop_length=512):
    
    frames = librosa.util.frame(signal, frame_length=frame_length, hop_length=hop_length)
    
    frame_power = np.mean(frames**2, axis=0)
    
    noise_floor = np.min(frame_power)
    return noise_floor

def compute_snr_comparison(signal, window_func, N):  
    noise_floor = estimate_noise_floor(signal)
    
    signal_power = np.mean(signal ** 2)
    original_snr = 10 * np.log10(signal_power / noise_floor) if noise_floor > 0 else np.inf
    
    
    window = window_func(len(signal))
    windowed_signal = signal * window
    
    
    windowed_power = np.mean(windowed_signal ** 2)
    windowed_snr = 10 * np.log10(windowed_power / noise_floor) if noise_floor > 0 else np.inf
    
    return original_snr, windowed_snr

def compute_rmse(original, windowed):

    scale_factor = np.sqrt(np.sum(original**2) / np.sum(windowed**2))
    windowed_normalized = windowed * scale_factor
    return np.sqrt(np.mean((original - windowed_normalized) ** 2))

def compute_cumulative_error(original, windowed):
    
    time_error = np.sum(np.abs(original - windowed))
    
    orig_fft = np.fft.fft(original)
    wind_fft = np.fft.fft(windowed)
    freq_error = np.sum(np.abs(np.abs(orig_fft) - np.abs(wind_fft)))
    
    return {
        'time_domain_error': time_error,
        'frequency_domain_error': freq_error,
        'total_error': time_error + freq_error
    }


def compute_spectral_leakage(window_func, N, fs=22050):
    
    t = np.arange(N) / fs
    bin_spacing = fs / N
    freq = 5.5 * bin_spacing  
    x = np.sin(2 * np.pi * freq * t)

    w = window_func(N)
    w /= np.sum(w)  
    xw = x * w
    
    X = np.fft.fft(xw, n=8 * N)
    X_mag = np.abs(X) ** 2 

    peak_index = np.argmax(X_mag)
    n_bins = X_mag.size    
    
    main_lobe_start = max(0, peak_index - 5)
    main_lobe_end = min(n_bins, peak_index + 5)
    
    main_lobe_energy = np.sum(X_mag[main_lobe_start:main_lobe_end])
    total_energy = np.sum(X_mag)
    leakage = 1 - (main_lobe_energy / total_energy)
    
    return leakage


def compute_window_metrics(file_path, window_func, N, fs_target=None):

    x, sr = librosa.load(file_path, sr=None)
    if fs_target is None:
        fs_target = sr
    
    
    if len(x) < N:
        x = np.pad(x, (0, N - len(x)), mode='constant')
    else:
        x = x[:N]
    
    w = window_func(N)
    xw = x * w
    
    original_snr, windowed_snr = compute_snr_comparison(x, window_func, N)
    snr_difference = original_snr - windowed_snr
    
    
    rmse_value = compute_rmse(x, xw)
    
    
    error_metrics = compute_cumulative_error(x, xw)
    
    
    leakage = compute_spectral_leakage(window_func, N, fs=fs_target)
    
    
    time_resolution = N / fs_target
    frequency_resolution = fs_target / N
    
    return {
        'original_snr': original_snr,
        'windowed_snr': windowed_snr,
        'snr_difference': snr_difference,
        'rmse': rmse_value,
        'cumulative_error': error_metrics,
        'spectral_leakage': leakage,
        'time_resolution': time_resolution,
        'frequency_resolution': frequency_resolution
    }


#sample for plottign the spectrogram and mfcc
sample_row = train_data.iloc[0]
sample_audio_path = os.path.join(audio_dir, f"fold{sample_row['fold']}", sample_row["slice_file_name"])


def plot_spectrogram(file_path, window_func, N=1024, hop_length=512, output_path="spectrogram.png"):

    y, sr = librosa.load(file_path, sr=None)
    D = compute_stft(y, window_func, N, hop_length)
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)
    plt.figure(figsize=(10, 4))
    librosa.display.specshow(S_db, sr=sr, hop_length=hop_length, x_axis="time", y_axis="log", cmap='viridis')
    plt.colorbar(format="%+2.0f dB")
    plt.title(f"Spectrogram (N={N}, hop_length={hop_length})")
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()
    logger.info(f"Spectrogram saved to {output_path}")




def plot_audio_analysis(file_path, window_func, N=1024, hop_length=512, n_mfcc=40, output_path="audio_analysis.png"):
    
    y, sr = librosa.load(file_path, sr=None)
    D = compute_stft(y, window_func, N, hop_length)
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)
    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(np.abs(D)**2), sr=sr, n_mfcc=n_mfcc)
    
    fig, ax = plt.subplots(1, 2, figsize=(16, 6))
    
    
    img1 = librosa.display.specshow(S_db, sr=sr, hop_length=hop_length, x_axis="time", y_axis="log", ax=ax[0], cmap='viridis')
    ax[0].set_title(f"Spectrogram (N={N}, hop_length={hop_length})")
    fig.colorbar(img1, ax=ax[0], format="%+2.0f dB")
    
    
    img2 = librosa.display.specshow(mfcc, sr=sr, x_axis="time", ax=ax[1], cmap='viridis')
    ax[1].set_title(f"MFCC (n_mfcc={n_mfcc})")
    fig.colorbar(img2, ax=ax[1])
    
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()
    logger.info(f"Audio analysis plot saved to {output_path}")


plot_audio_analysis(sample_audio_path, hann_window, N=1024, hop_length=512, n_mfcc=40, output_path=os.path.join(output_dir, "audio_analysis.png"))
plot_spectrogram(sample_audio_path, hann_window, N=1024, hop_length=512, output_path=os.path.join(output_dir, "spectrogram.png"))

2025-02-02 22:40:04,245 - INFO - Audio analysis plot saved to ../output/task2a/audio_analysis.png
2025-02-02 22:40:04,510 - INFO - Spectrogram saved to ../output/task2a/spectrogram.png


In [241]:
class AudioClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(AudioClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(64, 32)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(32, num_classes)
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        return x


def plot_training_curves(train_losses, val_losses, window_name, output_dir, params_str):
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label="Training Loss")
    plt.plot(val_losses, label="Validation Loss")
    plt.title(f"Loss Curves - {window_name}\n{params_str}")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    output_path = os.path.join(output_dir, f"loss_curve_{window_name}_{params_str}.png")
    plt.savefig(output_path)
    plt.close()
    logger.info(f"Loss curve saved to {output_path}")


def train_and_evaluate_nn(X_train, X_test, y_train, y_test, window_name,
                          num_epochs=20, batch_size=32, learning_rate=0.001, val_split=0.2):
    logger.info(f"Training and evaluating NN model for {window_name}")
    
    
    X_train_final, X_val, y_train_final, y_val = train_test_split(X_train, y_train, test_size=val_split, random_state=42, stratify=y_train)
    
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    X_train_tensor = torch.tensor(X_train_final, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train_final, dtype=torch.long)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    
    
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    
    input_dim = X_train.shape[1]
    num_classes = len(np.unique(y_train))
    model = AudioClassifier(input_dim, num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    train_losses = []
    val_losses = []
    
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch_features, batch_labels in train_loader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            optimizer.zero_grad()
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * batch_features.size(0)
        epoch_train_loss = running_loss / len(train_loader.dataset)
        train_losses.append(epoch_train_loss)
        
        
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for batch_features, batch_labels in val_loader:
                batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
                outputs = model(batch_features)
                loss = criterion(outputs, batch_labels)
                running_val_loss += loss.item() * batch_features.size(0)
        epoch_val_loss = running_val_loss / len(val_loader.dataset)
        val_losses.append(epoch_val_loss)
        
        logger.info(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}")
    
    
    params_str = f"epochs{num_epochs}_bs{batch_size}"
    plot_training_curves(train_losses, val_losses, window_name, output_dir, params_str)
    
    
    model.eval()
    with torch.no_grad():
        outputs = model(X_test_tensor.to(device))
        _, predicted = torch.max(outputs, 1)
    y_pred = predicted.cpu().numpy()
    acc = accuracy_score(y_test, y_pred)
    logger.info(f"Test Accuracy with {window_name}: {acc:.4f}")
    report = classification_report(y_test, y_pred)
    logger.info(f"Classification Report for {window_name}:\n{report}")
    
    #confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=le.classes_, yticklabels=le.classes_)
    plt.title(f"Confusion Matrix - {window_name}")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    cm_path = os.path.join(output_dir, f"confusion_matrix_{window_name}.png")
    plt.savefig(cm_path)
    plt.close()
    logger.info(f"Confusion matrix saved to {cm_path}")
    
    return model, acc, json.dumps(report)



In [242]:

results = []

N_list = [1024]
hop_length_list = [512]
n_mfcc_list = [40]

window_functions = [("Hann", hann_window), ("Hamming", hamming_window), ("Rectangular", rectangular_window)]


for window_name, window_func in window_functions:
    for N_val in N_list:
        for hop_val in hop_length_list:
            for n_mfcc_val in n_mfcc_list:
                current_config = f"N{N_val}_hop{hop_val}_mfcc{n_mfcc_val}"
                current_window_name = f"{window_name}_{current_config}"
                logger.info(f"Evaluating: {current_window_name}")
                
                
                metrics = compute_window_metrics(sample_audio_path, window_func, N_val)
                
                snr_val = metrics['snr_difference']
                rmse_val = metrics['rmse']
                leakage_val = metrics['spectral_leakage']
                time_res  = metrics['time_resolution']
                freq_res  = metrics['frequency_resolution']
                
                logger.info(f"Metrics for {current_window_name} -- SNR Diff: {snr_val:.2f} dB, RMSE: {rmse_val:.4f}, " +
                            f"Spectral Leakage: {leakage_val:.4f}, Time Res: {time_res:.4f}s, Freq Res: {freq_res:.2f}Hz")
                
                
                X_train_features, y_train_labels = process_features(train_data, window_func, N=N_val, hop_length=hop_val, n_mfcc=n_mfcc_val)
                X_test_features, y_test_labels = process_features(test_data, window_func, N=N_val, hop_length=hop_val, n_mfcc=n_mfcc_val)
                
                
                model, acc, report_json = train_and_evaluate_nn(
                    X_train_features, X_test_features, y_train_labels, y_test_labels,
                    window_name=current_window_name,
                    num_epochs=20, batch_size=32, learning_rate=0.001
                )
                
                results.append({
                    "Window": window_name,
                    "N": N_val,
                    "hop_length": hop_val,
                    "n_mfcc": n_mfcc_val,
                    "SNR (dB)": snr_val,
                    "RMSE": rmse_val,
                    "Spectral Leakage": leakage_val,
                    "Time Resolution (s)": time_res,
                    "Frequency Resolution (Hz)": freq_res,
                    "Test Accuracy": acc,
                    "Report": report_json
                })
                
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()


results_df = pd.DataFrame(results)
results_csv_path = os.path.join(output_dir, "parameter_experiment_results.csv")
results_df.to_csv(results_csv_path, index=False)
logger.info(f"Experiment results saved to {results_csv_path}")


2025-02-02 22:18:55,210 - INFO - Evaluating: Hann_N1024_hop512_mfcc40
2025-02-02 22:18:55,215 - INFO - Metrics for Hann_N1024_hop512_mfcc40 -- SNR Diff: 3.30 dB, RMSE: 0.0202, Spectral Leakage: 0.6458, Time Res: 0.0213s, Freq Res: 46.88Hz
Extracting features: 100%|██████████| 6985/6985 [00:43<00:00, 160.77it/s]
Extracting features: 100%|██████████| 1747/1747 [00:10<00:00, 165.12it/s]
2025-02-02 22:19:49,249 - INFO - Training and evaluating NN model for Hann_N1024_hop512_mfcc40
2025-02-02 22:19:51,559 - INFO - Epoch [1/20], Train Loss: 2.4560, Val Loss: 1.6460
2025-02-02 22:19:53,737 - INFO - Epoch [2/20], Train Loss: 1.5586, Val Loss: 1.5123
2025-02-02 22:19:55,975 - INFO - Epoch [3/20], Train Loss: 1.4270, Val Loss: 1.3554
2025-02-02 22:19:58,202 - INFO - Epoch [4/20], Train Loss: 1.3151, Val Loss: 1.3953
2025-02-02 22:20:00,389 - INFO - Epoch [5/20], Train Loss: 1.2674, Val Loss: 1.2480
2025-02-02 22:20:02,618 - INFO - Epoch [6/20], Train Loss: 1.2073, Val Loss: 1.1519
2025-02-02 22: