In [1]:
!pip install librosa
!pip install torch
!pip install torchaudio
!pip install noisereduce



In [2]:
import datetime
from IPython.display import Audio
import librosa
import librosa.display
import matplotlib.pyplot as plt
import noisereduce as nr
import numpy as np
import os
import random
from scipy.io import wavfile
import shutil
import soundfile as sf
from tqdm import tqdm

In [12]:
class AudioAnalyzer:
    def __init__(self, file_path):
        self.file_path = file_path
        self.date = None # String: Date of the audio recording
        self.time = None # String: Time of the audio recording
        self.alarms = [] # Starting times of alarm events
        self.mfccs_features = [] # Mel-frequency cepstral coefficients. Must be a list of lists.
        self.sbw_features = [] # Spectral bandwidth. Must be a list of lists.

    def get_audio_date_and_time(self):
        """
        Extract the date and time from the file name and assign them to the instance variables.
        
        The file name format is expected to be "YYYYMMDD-HHMMSS".
        """
        file_name = os.path.basename(self.file_path)
        file_name_without_extension = os.path.splitext(file_name)[0]
        
        # Split the filename into date and time
        self.date, self.time = file_name_without_extension.split('-')
        # Convert the date and time to integers
        self.date = int(self.date)
        # Convert the time to an integer while preserving leading zeros
        self.time = self.time.lstrip('0')  # Remove leading zeros
        self.time = int(self.time)  # Convert to integer
        self.time = f"{self.time:06d}"  # Re-add leading zeros if necessary

    def denoise_auxiliary_function(self):
        """
        Perform noise reduction on the audio file.

        Returns:
        - y_denoised (ndarray): Denoised audio signal.
        - sr (int): Sample rate of the audio signal.
        """
        # Load data
        y, sr = librosa.load(self.file_path)

        # Perform noise reduction
        y_denoised = nr.reduce_noise(y=y, sr=sr)

        return y_denoised, sr
    
    def denoise_and_plot(self, save_denoised=False, save_time_plot=False, output_folder=None):
        """
        Perform noise reduction on the audio file and optionally save the denoised audio and/or plot the waveforms.

        Args:
            save_denoised (bool, optional): Whether to save the denoised audio to a file. Defaults to False.
            save_time_plot (bool, optional): Whether to plot the original and denoised waveforms. Defaults to False.
            output_folder (str, optional): The folder where the denoised audio and/or plot will be saved. If not specified,
                a new folder will be created in the current working directory. Defaults to None.

        Returns:
            tuple: A tuple containing the denoised audio signal and the sample rate.

        """
        # Perform noise reduction using noisereduce
        y_denoised, sr = self.denoise_auxiliary_function()

        # Check if plots or denoised audio are required
        if save_time_plot or save_denoised:
            # If the output folder is not specified, create a new folder in the current working directory
            if output_folder is None:
                output_folder = os.path.join(os.getcwd(), "output_folder")
            # Ensure the output folder exists
            os.makedirs(output_folder, exist_ok=True)

        if save_time_plot == True:
            # Plot the original and denoised waveforms
            plt.figure(figsize=(10, 8))
            plt.subplot(2, 1, 1)
            y, _ = librosa.load(self.file_path)
            plt.plot(y)
            plt.title('Original Waveform')
            plt.xlabel('Time (samples)')
            plt.ylabel('Amplitude')

            plt.subplot(2, 1, 2)
            plt.plot(y_denoised)
            plt.title('Denoised Waveform')
            plt.xlabel('Time (samples)')
            plt.ylabel('Amplitude')

            plt.tight_layout()

            # Get the name of the original file
            file_name = os.path.splitext(os.path.basename(self.file_path))[0]
            # Name the plot file
            plot_path = os.path.join(output_folder, f"{file_name}_time_plot.png")
            # Save the plot
            plt.savefig(plot_path)
            # Close the plot to avoid displaying it
            plt.close()

        # Write the denoised audio to a file
        if save_denoised == True:
            # Get the name of the original file
            file_name = os.path.splitext(os.path.basename(self.file_path))[0]
            # Name the denoised file
            denoised_path = os.path.join(output_folder, f"{file_name}.wav")
            # Save the denoised audio
            sf.write(denoised_path, y_denoised, sr)

        # Return the denoised audio signal and the sample rate so that we can compute features on it
        return y_denoised, sr
    
    def compute_mfccs_on_window(self, window, sr):
        """
        Compute the mel-frequency cepstral coefficients (MFCCs) for a given audio signal.

        Parameters:
        - y_denoised (ndarray): Denoised audio signal.
        - sr (int): Sampling rate of the audio signal.

        Returns:
        - None

        This method calculates the MFCCs for the given audio signal and stores the resulting feature vector in the `mfccs_features` attribute of the class.
        The feature vector includes the mean, standard deviation, minimum, maximum, and percentiles of each MFCC coefficient.
        """
        # Compute the mel-frequency cepstral coefficients (MFCCs)
        mfccs = librosa.feature.mfcc(y=window, sr=sr, n_mfcc=13)
        
        # Calculate the mean, std, min, max, and percentiles for each MFCC coefficient
        mfccs_mean = np.mean(mfccs, axis=1)
        mfccs_std = np.std(mfccs, axis=1)
        mfccs_min = np.min(mfccs, axis=1)
        mfccs_max = np.max(mfccs, axis=1)
        mfccs_percentiles = np.percentile(mfccs, [25, 50, 75], axis=1)

        # Combine all statistics into a single feature vector
        feature_vector = np.hstack([
            mfccs_mean,
            mfccs_std,
            mfccs_min,
            mfccs_max,
            mfccs_percentiles.flatten()
        ])

        # Append the feature vector to the list
        self.mfccs_features.append(feature_vector)

    def compute_spectral_bandwidth_on_window(self, window, sr):
        """
        Compute the spectral bandwidth of an audio signal.

        Parameters:
        - y_denoised (ndarray): Denoised audio signal.
        - sr (int): Sampling rate of the audio signal.

        Returns:
        - None

        This method computes the spectral bandwidth of an audio signal using the librosa library.
        It calculates the mean, standard deviation, minimum, maximum, and percentiles of the spectral bandwidth,
        and combines them into a single feature vector.

        The computed feature vector is stored in the `sbw_features` attribute of the class instance.
        """

        # Compute the spectral bandwidth
        sbw = librosa.feature.spectral_bandwidth(y=window, sr=sr)

        # Aggregate the spectral bandwidth
        mean_bandwidth = np.mean(sbw)
        std_bandwidth = np.std(sbw)
        min_bandwidth = np.min(sbw)
        max_bandwidth = np.max(sbw)
        percentiles_bandwidth = np.percentile(sbw, [25, 50, 75])

        # Combine all features into a single feature vector
        feature_vector = np.hstack([
            mean_bandwidth,
            std_bandwidth,
            min_bandwidth,
            max_bandwidth,
            percentiles_bandwidth
        ])

        self.sbw_features.append(feature_vector)

    def get_alarms_and_features_and_plot_spectrograms(self, y_denoised, sr, plot_spectrograms=False, output_folder=None,
                                         pre_max=7500, post_max=7500, pre_avg=1000, post_avg=1000, delta=0.4, wait=10000):
        """
        Extracts alarms from the denoised audio signal and plots the corresponding spectrograms.

        Parameters:
        - y_denoised (ndarray): The denoised audio signal.
        - sr (int): The sample rate of the audio signal.
        - plot_spectrograms (bool): Whether to plot and save the spectrograms. Default is False.
        - output_folder (str): The folder path to save the spectrograms. If not specified, a new folder will be created in the current working directory.
        - pre_max (int): The number of samples before the peak to consider for peak picking. Default is 7500.
        - post_max (int): The number of samples after the peak to consider for peak picking. Default is 7500.
        - pre_avg (int): The number of samples before the peak to consider for peak averaging. Default is 1000.
        - post_avg (int): The number of samples after the peak to consider for peak averaging. Default is 1000.
        - delta (float): The threshold for peak picking. Default is 0.2.
        - wait (int): The number of samples to wait before picking another peak. Default is 10000.

        Returns:
        - None

        """
        # Check if plots are required
        if plot_spectrograms:
            # If the output folder is not specified, create a new folder in the current working directory
            if output_folder is None:
                output_folder = os.path.join(os.getcwd(), "output_folder")
            # Ensure the output folder exists
            os.makedirs(output_folder, exist_ok=True)

        # Extract the peaks from the denoised audio signal
        peaks = librosa.util.peak_pick(y_denoised, pre_max=pre_max, post_max=post_max, pre_avg=pre_avg, post_avg=post_avg, delta=delta, wait=wait)

         # Iterate over each peak
        for peak in peaks:
            # Extract a window centered at the peak with a duration of 1.6 seconds
            window_size = int(sr * 1.6) # int
            window_start = max(0, peak - window_size // 2) # int
            window_end = min(len(y_denoised), peak + window_size // 2) # int
            window = y_denoised[window_start:window_end] # ndarray

            # Compute mfccs for the window
            self.compute_mfccs_on_window(window, sr)
            # Compute spectral bandwidth for the window
            self.compute_spectral_bandwidth_on_window(window, sr)
            
            # Compute the mel-spectrogram of the window
            S = librosa.feature.melspectrogram(y=window, sr=sr, n_mels=128)
            # Compute the spectrogram of the window
            S_dB = librosa.power_to_db(S, ref=np.max)
            
            # Compute the start time of the window in seconds
            start_time_in_seconds = window_start / sr
            self.alarms.append(np.floor(start_time_in_seconds))
            # Express it in minutes for better readability
            minutes = int(start_time_in_seconds // 60)
            remaining_seconds = int(start_time_in_seconds % 60)
            start_time_in_minutes =  f"{minutes}.{remaining_seconds:02d}min"
            
            if plot_spectrograms:
                # Display the melspectrogram
                plt.figure(figsize=(10, 4))
                librosa.display.specshow(S_dB, sr=sr, x_axis='time', y_axis='log')
                plt.colorbar(format='%+2.0f dB')
                plt.title(f"Mel-Spectrogram for Alarm Event in {os.path.basename(self.file_path)} (Start Time: {start_time_in_minutes} seconds)")
                
                # Save the spectrogram as an image
                file_name = os.path.splitext(os.path.basename(self.file_path))[0]
                spectrogram_path = os.path.join(output_folder, f"{file_name}_spectrogram_{minutes}{remaining_seconds:02d}.png")
                plt.savefig(spectrogram_path)
                plt.close()
                print(f"Mel-Spectrogram saved as: {spectrogram_path}")

    # These 2 last functions must be called excplicitly by the user
    def extract_features_from_audio(self,
                                    save_denoised=False, save_time_plot=False, output_folder=None,
                                    plot_spectrograms=False, 
                                    pre_max=7500, post_max=7500, pre_avg=1000, post_avg=1000, delta=0.5, wait=10000):
        """
        Extracts features from audio data saving them as attributes of the AudioAnalyzer object.

        Args:
            save_denoised (bool, optional): Whether to save the denoised audio. Defaults to False.
            save_time_plot (bool, optional): Whether to save the time plot of the audio waveform. Defaults to False.
            output_folder (str, optional): The folder path to save the denoised audio and time plot. Defaults to None.
            plot_spectrograms (bool, optional): Whether to plot spectrograms of the audio. Defaults to False.
            pre_max (int, optional): Number of samples to include before the maximum value in spectrogram plots. Defaults to 7500.
            post_max (int, optional): Number of samples to include after the maximum value in spectrogram plots. Defaults to 7500.
            pre_avg (int, optional): Number of samples to include before the average value in spectrogram plots. Defaults to 1000.
            post_avg (int, optional): Number of samples to include after the average value in spectrogram plots. Defaults to 1000.
            delta (float, optional): Threshold for detecting alarms in spectrogram plots. Defaults to 0.2.
            wait (int, optional): Time in milliseconds to wait between plotting spectrograms. Defaults to 10000.
        """
        # Perform noise reduction and optionally save the denoised audio and/or plot the waveforms
        y_denoised, sr = self.denoise_and_plot(save_denoised, save_time_plot, output_folder)

        # Extract the date and time from the file name
        self.get_audio_date_and_time()

        # Extract alarms and plot spectrograms
        self.get_alarms_and_features_and_plot_spectrograms(y_denoised, sr, plot_spectrograms, output_folder,
                                         pre_max, post_max, pre_avg, post_avg, delta, wait)
        
    def create_alarm_feature_array(self):
        """
        Creates a numpy array where each row corresponds to an alarm event, and columns include date, time, MFCCs, and spectral bandwidth features.
        
        Returns:
        - alarm_feature_array (ndarray): A numpy array with n rows (one for each alarm event) and columns representing the features.
        """
        # Initialize an empty list to hold the rows of the feature array
        rows = []
        
        # Convert the date to a datetime object
        self.date = datetime.datetime.strptime(str(self.date), "%Y%m%d").date()
        # Convert the time to a datetime object
        self.time = datetime.datetime.strptime(str(self.time), "%H%M%S").time()

        # Iterate over the alarms
        for i, alarm_time in enumerate(self.alarms):
            alarm_event_time = datetime.datetime.combine(self.date, self.time) + datetime.timedelta(seconds=alarm_time)
            # convert the alarm event time to a string
            alarm_event_time = alarm_event_time.strftime("%Y-%m-%d %H:%M:%S")
            
            # Get the MFCC and spectral bandwidth features for the current alarm event
            mfcc_features = self.mfccs_features[i]
            sbw_features = self.sbw_features[i]
            
            # Flatten the MFCC and spectral bandwidth arrays to ensure they are one-dimensional
            mfcc_features_flat = mfcc_features.flatten()
            sbw_features_flat = sbw_features.flatten()
            
            # Create a row with the date, alarm event time, and the feature arrays
            row = [alarm_event_time] + mfcc_features_flat.tolist() + sbw_features_flat.tolist()
            
            # Append the row to the list of rows
            rows.append(row)
        
        # Convert the list of rows to a NumPy array
        alarm_feature_array = np.array(rows)
        
        return alarm_feature_array

In [13]:
# Example usage
#file_path = 'example.wav'
#audio_analyzer = AudioAnalyzer(file_path)
#audio_analyzer.extract_features_from_audio()
#alarms = audio_analyzer.alarms
#h_array1 = audio_analyzer.mfccs_features
#h_array2 = audio_analyzer.sbw_features

In [14]:
def analyze_random_audio_files(input_folder, num_files=15):
    # Get a list of all files in the input folder
    file_list = []
    if os.path.isdir(input_folder):
        for root, dirs, files in os.walk(input_folder):
            for file in files:
                file_list.append(os.path.join(root, file))
    else:
        file_list.append(input_folder)

    # Select a random subset of files
    random_files = random.sample(file_list, num_files)

    # Create AudioAnalyzer objects and compute spectral bandwidth for each file
    for file_path in random_files:
        audio_analyzer = AudioAnalyzer(file_path)
        audio_analyzer.extract_features_from_audio(save_denoised=True, save_time_plot=True, plot_spectrograms=True)

In [15]:
#input_folder = r'\\10.100.113.14\2024scs4_genetail\Audio_Suini_Gorza\20240405'
#analyze_random_audio_files(input_folder)

In [16]:
def find_audio_files(folder, extensions=['.wav', '.mp3', '.flac']):
    """
    Recursively find all audio files in the given folder.

    Args:
    - folder (str): The root folder to search for audio files.
    - extensions (list): A list of file extensions to look for.

    Returns:
    - audio_files (list): A list of paths to audio files.
    """
    audio_files = []
    for root, _, files in os.walk(folder):
        for file in files:
            if any(file.lower().endswith(ext) for ext in extensions):
                audio_files.append(os.path.join(root, file))
    return audio_files


In [17]:
def process_audio_files(folder):
    """
    Process all audio files in the specified folder, extract features, and stack them horizontally.

    Args:
    - folder (str): The root folder to search for audio files.

    Returns:
    - combined_matrix_with_header (ndarray): A combined matrix of alarm features from all audio files,
      including the header row.
    """
    # Find all audio files in the folder
    audio_files = find_audio_files(folder)

    # Initialize an empty list to hold all the alarm feature arrays
    feature_matrices = []

    # Initialize a progress bar
    pbar = tqdm(total=len(audio_files))

    # Process each audio file
    for file_path in audio_files:
        # Create an AudioAnalyzer object
        audio_analyzer = AudioAnalyzer(file_path)

        # Extract features from the audio file
        audio_analyzer.extract_features_from_audio(save_denoised=True)

        # Create the alarm feature array
        alarm_feature_array = audio_analyzer.create_alarm_feature_array()

        # Update the progress bar
        pbar.update(1)

        if alarm_feature_array.shape[0] > 0:
            # Append the feature array to the list
            feature_matrices.append(alarm_feature_array)

    # Close the progress bar
    pbar.close()

    # Check if feature_matrices is not empty
    if feature_matrices:
        # Stack all feature matrices vertically to create a combined matrix
        combined_matrix = np.vstack(feature_matrices)
    else:
        # If no features were extracted, return an empty array with a message
        print("No features extracted from any files.")
        return np.array([])

    # Define the header row
    header_row = "Date, "
    for i in range(1, 14):
        header_row += f"mfcc_mean{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_std{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_min{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_max{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_perc25_{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_perc50_{i}, "
    for i in range(1, 14):
        header_row += f"mfcc_perc75_{i}, "
    header_row += "sbw_mean, sbw_std, sbw_min, sbw_max, sbw_perc25, sbw_perc50, sbw_perc75"

    # Split the header row into a list of column names
    header_row_list = header_row.split(', ')

    # Insert the header row as the first row in the combined_matrix
    combined_matrix_with_header = np.vstack([header_row_list] + combined_matrix.tolist())

    return combined_matrix_with_header

In [18]:
input_folder = r"\\10.100.113.14\2024scs4_genetail\Audio_Suini_Gorza\Ciclo2\20240514"
data_array = process_audio_files(input_folder)

  0%|          | 0/288 [00:00<?, ?it/s]

100%|██████████| 288/288 [18:41<00:00,  3.89s/it]


In [19]:
print(data_array.shape)

(179, 99)


In [20]:
np.savetxt('data_array.csv', data_array, delimiter=',', fmt='%s', comments='')