In [97]:
import numpy as np
from scipy.fft import fft, ifft
import os
import librosa
from scipy.signal import convolve

In [98]:
def deconvolve_signals(input_signals, output_signals, method="fft", regularization=None, alpha=1e-3):
    """
    Perform deconvolution for n input signals and n corresponding output signals.

    Parameters:
        input_signals (list of np.ndarray): List of n input signals.
        output_signals (list of np.ndarray): List of n corresponding output signals.
        method (str): Deconvolution method ("fft" for frequency-domain, "wiener" for Wiener deconvolution).
        regularization (str): Type of regularization (e.g., None or "wiener").
        alpha (float): Regularization parameter for Wiener deconvolution.

    Returns:
        list of np.ndarray: List of deconvolved impulse responses.
    """
    
    impulse_responses = []
    
    for input_signal, output_signal in zip(input_signals, output_signals):
        # Zero-pad the input signal to match the length of the output signal
        padded_input = np.pad(input_signal, (0, len(output_signal) - len(input_signal)))
        
        # Perform FFT
        X = fft(padded_input)
        Y = fft(output_signal)
        
        if method == "fft":
            # Frequency-domain deconvolution
            H = Y / X
        elif method == "wiener":
            # Wiener deconvolution
            H = (Y * np.conj(X)) / (np.abs(X)**2 + alpha)
        else:
            raise ValueError("Unsupported method. Use 'fft' or 'wiener'.")
        
        # Inverse FFT to compute the impulse response
        impulse_response = np.real(ifft(H))
        impulse_responses.append(impulse_response)
    
    return impulse_responses



In [99]:
def load_audio_files(folder_path, sr=44100):
    """
    Load all audio files from a folder.
    
    Parameters:
        folder_path (str): Path to the folder containing audio files.
        sr (int): Sampling rate for loading audio files.
    
    Returns:
        list of np.ndarray: List of loaded audio signals.
    """
    # List all files in the folder
    audio_files = [
        os.path.join(folder_path, f) 
        for f in os.listdir(folder_path) 
        if os.path.isfile(os.path.join(folder_path, f))
    ]
    
    # Load audio files
    signals = [librosa.load(file, sr=sr)[0] for file in audio_files]
    return signals

In [100]:
def test_impulse_responses(input_signals, output_signals, impulse_responses):
    """
    Test the accuracy of the impulse responses by reconstructing the output signals.

    Parameters:
        input_signals (list of np.ndarray): List of input signals.
        output_signals (list of np.ndarray): List of corresponding output signals.
        impulse_responses (list of np.ndarray): List of computed impulse responses.

    Returns:
        list of float: RMSE values for each signal pair.
        list of float: Accuracy percentages for each signal pair.
    """
    rmse_values = []
    accuracy_percentages = []

    for i, (input_signal, output_signal, impulse_response) in enumerate(
        zip(input_signals, output_signals, impulse_responses)
    ):
        # Reconstruct the output signal by convolving input with impulse response
        predicted_output = convolve(input_signal, impulse_response, mode="full")

        # Truncate predicted output to match actual output length
        predicted_output = predicted_output[:len(output_signal)]

        # Compute RMSE
        rmse = np.sqrt(np.mean((predicted_output - output_signal) ** 2))
        rmse_values.append(rmse)

        # Compute percentage accuracy
        range_actual_signal = np.max(output_signal) - np.min(output_signal)
        if range_actual_signal > 0:  # Avoid division by zero
            accuracy = (1 - rmse / range_actual_signal) * 100
        else:
            accuracy = 0  # Undefined when the range is zero
        accuracy_percentages.append(accuracy)


    return rmse_values, accuracy_percentages


In [101]:
def average_impulse_responses(impulse_responses):
    """
    Compute the average of all impulse responses.

    Parameters:
        impulse_responses (list of np.ndarray): List of computed impulse responses.

    Returns:
        np.ndarray: Average impulse response.
    """
    # Ensure all impulse responses are of the same length (optional check)
    impulse_lengths = [len(ir) for ir in impulse_responses]
    if len(set(impulse_lengths)) > 1:
        print("Warning: Impulse responses have different lengths. Average may not be accurate.")
    
    # Sum all impulse responses element-wise
    sum_responses = np.sum(impulse_responses, axis=0)

    # Compute the average by dividing by the number of impulse responses
    average_response = sum_responses / len(impulse_responses)

    return average_response

In [102]:
def pad_impulse_responses(impulse_responses):
    """
    Pad impulse responses to make them all the same length (the length of the longest impulse response).

    Parameters:
        impulse_responses (list of np.ndarray): List of impulse responses with different lengths.

    Returns:
        list of np.ndarray: List of padded impulse responses.
    """
    # Find the length of the longest impulse response
    max_length = max(len(ir) for ir in impulse_responses)

    # Pad each impulse response to the maximum length
    padded_responses = [
        np.pad(ir, (0, max_length - len(ir)), mode='constant') for ir in impulse_responses
    ]

    return padded_responses

def average_impulse_responses(impulse_responses):
    """
    Compute the average of all impulse responses after padding them to the same length.

    Parameters:
        impulse_responses (list of np.ndarray): List of computed impulse responses.

    Returns:
        np.ndarray: Average impulse response.
    """
    # Pad all impulse responses to the same length
    padded_responses = pad_impulse_responses(impulse_responses)

    # Sum all padded impulse responses element-wise
    sum_responses = np.sum(padded_responses, axis=0)

    # Compute the average by dividing by the number of impulse responses
    average_response = sum_responses / len(padded_responses)

    return average_response

In [103]:
def test_impulse_response(input_signals, output_signals, impulse_response):
    """
    Test the accuracy of the impulse response by reconstructing the output signals.

    Parameters:
        input_signals (list of np.ndarray): List of input signals.
        output_signals (list of np.ndarray): List of corresponding output signals.
        impulse_responses A computed impulse response.

    Returns:
        list of float: RMSE values for each signal pair.
        list of float: Accuracy percentages for each signal pair.
    """
    rmse_values = []
    accuracy_percentages = []

    for i, (input_signal, output_signal) in enumerate(
        zip(input_signals, output_signals)
    ):
        # Reconstruct the output signal by convolving input with impulse response
        predicted_output = convolve(input_signal, impulse_response, mode="full")

        # Truncate predicted output to match actual output length
        predicted_output = predicted_output[:len(output_signal)]

        # Compute RMSE
        rmse = np.sqrt(np.mean((predicted_output - output_signal) ** 2))
        rmse_values.append(rmse)

        # Compute percentage accuracy
        range_actual_signal = np.max(output_signal) - np.min(output_signal)
        if range_actual_signal > 0:  # Avoid division by zero
            accuracy = (1 - rmse / range_actual_signal) * 100
        else:
            accuracy = 0  # Undefined when the range is zero
        accuracy_percentages.append(accuracy)

    return rmse_values, accuracy_percentages

In [104]:
if __name__ == "__main__":
    # Define folder paths
    new_claps_folder_path = '../data/new_claps/'
    new_echoes_folder_path = '../data/new_echoes/'
    
    # Load input and output signals
    input_signals = load_audio_files(new_claps_folder_path)
    output_signals = load_audio_files(new_echoes_folder_path)
    
    # Perform deconvolution
    impulse_responses = deconvolve_signals(input_signals, output_signals, method="fft")

    # Test impulse responses
    rmse_values, accuracy_percentages = test_impulse_responses(input_signals, output_signals, impulse_responses)

    # Best impulse response
    best_index = np.argmax(accuracy_percentages)
    best_impulse_response = impulse_responses[best_index]
    rmse_values, accuracy_percentages = test_impulse_response(input_signals, output_signals, best_impulse_response)

    print(f"Best Impulse Response Accuracy - Min: {min(accuracy_percentages)}, Max: {max(accuracy_percentages)}")

    # Compute average impulse response
    # avg_impulse_response = average_impulse_responses(impulse_responses)
    # rmse_values, accuracy_percentages = test_impulse_response(input_signals, output_signals, avg_impulse_response)
    # print(min(accuracy_percentages))
    # print(max(accuracy_percentages))


Best Impulse Response Accuracy - Min: 87.7069041132927, Max: 98.4475745819509
