In [2]:
from pydub import AudioSegment
import time
import os
import librosa
import pandas as pd
import numpy as np
import warnings

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from pydub import AudioSegment
import time

import joblib

warnings.filterwarnings("ignore")

In [44]:
def createDataset(directory='../audios_modelo/'):
    '''Build a DataFrame that maps each training audio file to a speaker id.

    The speaker name is read from the file name with the fixed slice
    ``file[11:-4]``, i.e. the text between an 11-character prefix (such as
    ``AudioFinal_``) and a 4-character suffix (such as ``.wav``).

    Parameters
    ----------
    directory
        Folder whose entries are listed. Defaults to '../audios_modelo/'.

    Returns
    -------
        A DataFrame with the columns 'Audio' (file name) and 'Id_Persona'
    (0 Gerard, 1 Albert, 2 Adria, 3 Raul, 4 Otros), one row per recognised
    file, ordered by file name.

    '''
    speaker_ids = {'Gerard': 0, 'Albert': 1, 'Adria': 2, 'Raul': 3, 'Otros': 4}

    records = {'Audio': [], 'Id_Persona': []}
    # os.listdir gives no ordering guarantee; sort so the frame is reproducible.
    for file in sorted(os.listdir(directory)):
        speaker = file[11:-4]
        # Entries without a known speaker are skipped: appending only the file
        # name would leave the two columns with different lengths and make
        # pd.DataFrame raise.
        if speaker not in speaker_ids:
            continue
        records['Audio'].append(file)
        records['Id_Persona'].append(speaker_ids[speaker])

    return pd.DataFrame(records)

# Module-level dataset (file name + speaker id); entrenarModelo reads this global.
df = createDataset()


In [45]:
# Show the file-name / speaker-id table as the cell output.
df

Unnamed: 0,Audio,Id_Persona
0,AudioFinal_Adria.wav,2
1,AudioFinal_Albert.wav,1
2,AudioFinal_Gerard.wav,0
3,AudioFinal_Otros.wav,4
4,AudioFinal_Raul.wav,3


In [46]:
def unzipData(data):
    '''Split an iterable of ``(X, y)`` pairs into two numpy arrays.

    Parameters
    ----------
    data
        Iterable of two-element tuples: the input sample first, its target
    second.

    Returns
    -------
        ``(X, y)``: the first elements stacked into one array and the second
    elements stacked into another, both in the order of `data`.

    '''
    inputs, targets = zip(*data)
    return np.array(list(inputs)), np.array(list(targets))


def removeSilence(audio, silence_threshold = 0.05):
    '''Drop the samples of `audio` whose absolute value is below a threshold.

    The filtering is done sample by sample (not per frame or segment), so the
    result is the concatenation of every sample that is loud enough.

    Parameters
    ----------
    audio
        Numpy array with the signal samples (the callers in this notebook pass
    the 1-D array returned by ``librosa.load``).
    silence_threshold
        Amplitude under which a sample counts as silence. Defaults to 0.05.

    Returns
    -------
        The array of retained samples, or `audio` itself (unchanged) when every
    sample would have been removed.

    '''
    # Start from "keep everything" and switch off the silent positions.
    keep = np.ones_like(audio, dtype=bool)
    keep[np.nonzero(np.abs(audio) < silence_threshold)[0]] = False

    voiced = audio[keep]
    # Never hand back an empty signal: fall back to the untouched input.
    return voiced if voiced.size != 0 else audio


def lowPassFilter(audio, sr, cutoff_freq = 3000):
    '''Zero every frequency component of `audio` above `cutoff_freq`.

    The signal is taken to the frequency domain with an FFT, the bins whose
    absolute frequency exceeds the cutoff are multiplied by zero, and the
    result is brought back with the inverse FFT.

    Parameters
    ----------
    audio
        One-dimensional array of samples.
    sr
        Sampling rate of `audio`, used to compute the frequency of each FFT bin.
    cutoff_freq
        Highest frequency that is kept (inclusive). Defaults to 3000.

    Returns
    -------
        The real part of the filtered signal, with the same length as `audio`.

    '''
    spectrum = np.fft.fft(audio)
    bin_freqs = np.fft.fftfreq(len(audio), 1 / sr)

    # Boolean pass-band: True for bins to keep, covering negative frequencies too.
    passband = np.abs(bin_freqs) <= cutoff_freq

    return np.real(np.fft.ifft(spectrum * passband))


def spec(y, sr, spec = 'wavelet'):
    '''Compute a spectrogram of the requested type with librosa.

    Parameters
    ----------
    y
        Audio signal handed to the librosa transforms.
    sr
        Sampling rate of `y`; used by the 'mel', 'cqt' and 'wavelet' types.
    spec, optional
        Spectrogram type. Supported values:
        'linear' and 'log' (both return the STFT magnitude; this function
        applies no log scaling), 'mel' (``librosa.feature.melspectrogram``),
        'cqt' (constant-Q magnitude) and 'wavelet' (constant-Q magnitude with
        ``fmin`` set explicitly to note C1). Defaults to 'wavelet'.

    Returns
    -------
        The spectrogram array, or None when `spec` is not a supported type.

    '''
    if spec in ('linear', 'log'):
        return np.abs(librosa.stft(y))

    if spec == 'mel':
        return librosa.feature.melspectrogram(y=y, sr=sr)

    if spec == 'cqt':
        return np.abs(librosa.cqt(y, sr=sr))

    if spec == 'wavelet':
        return np.abs(librosa.cqt(y, sr=sr, fmin=librosa.note_to_hz('C1')))

    return None



def windowing(image, max_size, despl=0):
    '''Turn the columns of a 2D array into the rows of a zero-padded window matrix.

    Parameters
    ----------
    image
        2D numpy array (for example a spectrogram); windows are taken along
    its second axis.
    max_size
        Number of window slots in the output. Slots without a window stay
    filled with zeros; if `image` yields more windows than `max_size`, only the
    first `max_size` are kept (previously this raised an IndexError).
    despl
        Window width in columns. With 0 (default) each window is a single
    column; otherwise window ``i`` is ``image[:, i:i + despl]``.

    Returns
    -------
        An array of shape ``(max_size, image.shape[0])`` when `despl` is 0, or
    ``(max_size, image.shape[0], despl)`` otherwise.

    '''
    n_cols = image.shape[1]

    if despl == 0:
        windows = np.zeros((max_size, image.shape[0]))
        # Cap at max_size so a long input is truncated instead of overflowing.
        n_windows = min(n_cols, max_size)
        windows[:n_windows] = image[:, :n_windows].T
        return windows

    windows = np.zeros((max_size, image.shape[0], despl))
    # Same stop rule as before (windows start while i + despl < n_cols), now
    # also capped at max_size.
    n_windows = min(max(n_cols - despl, 0), max_size)
    for i in range(n_windows):
        windows[i] = image[:, i:i + despl]
    return windows



  

In [47]:
def compute_rowSpecWindowing(row, specType='mel', despl=0, smoothSpec = False, filter = False, silence = False, max_size=21000):
    '''Load the audio file of a dataset row and turn it into labelled spectrogram windows.

    The steps are: load the file from ``../audios_modelo/``, optionally remove
    silent samples, optionally low-pass filter, compute the spectrogram,
    optionally smooth it, and split it into windows.

    Parameters
    ----------
    row
        Mapping (e.g. a DataFrame row) with the keys 'Audio' (file name) and
    'Id_Persona' (label).
    specType, optional
        Spectrogram type forwarded to `spec`. Defaults to 'mel'.
    despl, optional
        Window width forwarded to `windowing`. Defaults to 0.
    smoothSpec, optional
        When True the spectrogram is passed through ``smooth_image(image, 1.5)``.
    filter, optional
        When True `lowPassFilter` is applied to the signal.
    silence, optional
        When True `removeSilence` is applied to the signal (before filtering).
    max_size, optional
        Number of window slots (and of labels) produced. Defaults to 21000.

    Returns
    -------
        ``(windows, labels)``: the array returned by `windowing` and a 1-D array
    of length `max_size` filled with ``row['Id_Persona']``.

    '''
    # os.path.join picks the right separator for the platform, which replaces
    # the former "try backslashes, on any error retry with slashes" workaround
    # and no longer hides unrelated loading errors.
    audio_path = os.path.join('..', 'audios_modelo', row['Audio'])
    signal, sr = librosa.load(audio_path)

    if silence:
        signal = removeSilence(signal)

    if filter:
        signal = lowPassFilter(signal, sr)

    image = spec(signal, sr, spec=specType)
    # Kept on purpose: the training cell output lists one shape per audio file.
    print(image.shape)
    if smoothSpec:
        # NOTE(review): smooth_image is not defined in the visible part of this
        # notebook - confirm it exists before enabling smoothSpec.
        image = smooth_image(image, 1.5)

    windows = windowing(image, max_size, despl = despl)
    # One label per window slot, padding slots included; callers drop the
    # all-zero rows afterwards.
    labels = np.full(max_size, row['Id_Persona'])

    return windows, labels

In [48]:
def entrenarModelo():
    '''Train the windowing classifier on mel-spectrogram frames and report its accuracy.

    Reads the module-level DataFrame `df`, extracts the frames of every audio
    file (low-pass filtered, silence removed), drops the all-zero padding rows,
    splits the frames 80/20 (stratified), standardises them and fits each model
    in `models`.

    Side effects: writes the fitted scaler to
    '../raspi/modelos/StandardScaler.pkl' and the fitted model to
    '../raspi/modelos/ModeloWindowing.pkl'.

    Returns
    -------
        A DataFrame with one row per trained model and the columns 'Model',
    'filter', 'silence' and 'Accuracy' (accuracy on the held-out 20%).

    '''

    models = {
        # Fixed seed so a re-run trains the same forest.
        "RandomForest": RandomForestClassifier(random_state=42),
        #"GradientBoosting": GradientBoostingClassifier(),
        #"XGBoost": XGBClassifier(),
        #"LightGBM": LGBMClassifier(),
    }

    # Feature extraction does not depend on the model, so it runs once.
    # NOTE(review): training uses removeSilence's default threshold (0.05) while
    # predict() calls it with 0.01 - confirm the mismatch is intended.
    data = df.apply(lambda row: compute_rowSpecWindowing(row, specType='mel',despl=0,  filter=True, silence=True), axis=1)

    X, y = unzipData(data)

    # Flatten (file, window, feature) into (frame, feature) with one label per frame.
    X = np.reshape(X, (X.shape[0] * X.shape[1], X.shape[2]))
    y = np.reshape(y, (y.shape[0] * y.shape[1]))

    # Rows with zero mean are the zero padding added by windowing.
    non_empty = np.mean(X, axis=1) != 0
    X, y = X[non_empty], y[non_empty]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # Fit the scaler on the training split only, so the test frames do not leak
    # into the statistics used to evaluate the model.
    sc = StandardScaler()
    X_train = sc.fit_transform(X_train)
    X_test = sc.transform(X_test)

    # NOTE(review): predict() loads from '../raspi/modelos/{dataType}/{algorithm}/',
    # not from this folder - confirm how the files get there.
    joblib.dump(sc, '../raspi/modelos/StandardScaler.pkl')

    results = []
    for model_name, model in models.items():
        model.fit(X_train, y_train)

        # NOTE(review): every model is stored under the same file name and
        # reported as 'rf'; revisit if more entries of `models` are enabled.
        joblib.dump(model, '../raspi/modelos/ModeloWindowing.pkl')

        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)

        results.append({
                'Model': 'rf',
                'filter': True,
                'silence': True,
                'Accuracy': accuracy,
        })

    return pd.DataFrame(results)

In [49]:
# Train the model; the shapes printed below come from compute_rowSpecWindowing
# (one line per audio file).
resultados = entrenarModelo()

(128, 6957)
(128, 6869)
(128, 5529)
(128, 20964)
(128, 1967)


In [50]:
def _loadPredictionAudio(audio, dataType):
    """
    Loads the audio file and applies the extra 'ruidoNorm' conditioning.

    Parameters:
    - audio (str): Path handed to librosa.load.
    - dataType (str): When equal to 'ruidoNorm', the stored average RMS is loaded
      and the signal goes through normalize_audio and apply_compression.

    Returns:
    - (y, sr): The signal and sampling rate from librosa.load, with the
      'ruidoNorm' steps applied when requested.
    """
    y, sr = librosa.load(audio)
    if dataType == 'ruidoNorm':
        # NOTE(review): normalize_audio / apply_compression are not defined in the
        # visible part of this notebook - confirm they exist before using 'ruidoNorm'.
        average_rms = joblib.load(f'./modelos/{dataType}/average_rms.pkl')
        y = normalize_audio(y, average_rms)
        y = apply_compression(y)
    return y, sr


def predict(algorithm='windowing', dataType='ruido', model='rf', filter=False, audio='aux.mp3'):
    """
    Predicts the speaker of an audio file with one of the stored models.

    Parameters:
    - algorithm (str): 'windowing', 'specsModel' or 'featureModel'. Default is 'windowing'.
    - dataType (str): Selects the model folder; 'ruidoNorm' also enables RMS
      normalisation and compression of the signal. Default is 'ruido'.
    - model (str): Model family. 'windowing' accepts only 'rf', 'specsModel' only
      'cnn', 'featureModel' accepts 'svc', 'lr' or 'rf'. Default is 'rf'.
    - filter (bool): Selects which stored 'specsModel'/'featureModel' files are
      loaded (it is part of their file names); 'specsModel' rejects it together
      with dataType 'ruidoNorm'. The low-pass filter itself is always applied.
      Default is False.
    - audio (str): The path to the audio file. Default is 'aux.mp3'.

    Returns:
    - -1 when the audio file does not exist or the algorithm/model combination is
      not supported (no exception is raised for these cases).
    - 'windowing': a tuple (y_pred, winner) with the per-frame predictions and the
      most frequent predicted label.
    - 'specsModel' / 'featureModel': the first element of the model's prediction.

    """
    if not os.path.exists(audio):
        return -1

    if algorithm == 'windowing':
        if model != 'rf':
            return -1

        classifier = joblib.load(f'../raspi/modelos/{dataType}/{algorithm}/ModeloWindowing.pkl')
        scaler = joblib.load(f'../raspi/modelos/{dataType}/{algorithm}/StandardScaler.pkl')

        y, sr = _loadPredictionAudio(audio, dataType)

        y = lowPassFilter(y, sr)
        y = removeSilence(y, 0.01)
        image = spec(y, sr, spec='mel')
        # Size the buffer to the spectrogram so clips with more than 320 frames
        # no longer overflow it; the zero padding of shorter clips is removed by
        # the mean filter below, so their result is unchanged.
        windows = windowing(image, max(320, image.shape[1]), despl=0)
        X = windows[np.mean(windows, axis=1) != 0]
        X = scaler.transform(X)
        y_pred = np.array(classifier.predict(X))
        # Majority vote over the frames.
        return y_pred, np.bincount(y_pred).argmax()

    if algorithm == 'specsModel':
        if filter and dataType == 'ruidoNorm':
            return -1
        if model != 'cnn':
            return -1

        classifier = joblib.load(f'./modelos/{dataType}/{algorithm}/modelos/cnn_{algorithm}_filter_{str(filter)}.pkl')

        y, sr = _loadPredictionAudio(audio, dataType)

        y = removeSilence(y, 0.01)
        y = lowPassFilter(y, sr)
        # NOTE(review): extrapolate_audio is not defined in the visible part of
        # this notebook - confirm it exists.
        y, sr = extrapolate_audio(y, sr, 6)
        image = spec(y, sr, spec='wavelet')

        size = {'original': 32, 'ruido': 128, 'ruidoNorm': 256}[dataType]

        # The spectrogram goes in slot 0 of a zero batch of `size` items; only
        # the prediction for that slot is returned.
        batch = np.zeros((size, image.shape[0], image.shape[1]))
        batch[0] = image
        y_pred = classifier.predict(batch)
        return y_pred[0]

    if algorithm == 'featureModel':
        if model not in ['svc', 'lr', 'rf']:
            return -1

        classifier = joblib.load(f'./modelos/{dataType}/{algorithm}/modelos/{model}_{algorithm}_filter_{str(filter)}.pkl')
        scaler = joblib.load(f'./modelos/{dataType}/{algorithm}/scalers/scaler_{algorithm}_filter_{str(filter)}.pkl')

        y, sr = _loadPredictionAudio(audio, dataType)

        y = lowPassFilter(y, sr)
        y = removeSilence(y, 0.02)
        # NOTE(review): extract_features is not defined in the visible part of
        # this notebook - confirm it exists.
        features = extract_features(y, sr)
        features = np.array(features).reshape(1, -1)
        features = scaler.transform(features)
        y_pred = classifier.predict(features)
        return y_pred[0]

    # Unknown algorithm: same -1 as every other unsupported request (this used
    # to fall through and return None).
    return -1

In [51]:
# Default 'windowing' prediction; on success the result is the tuple
# (per-frame predictions, most frequent label), otherwise -1.
res  = predict(audio='../01_Gerard.mp3')
print(res)

(array([4, 0, 2, 0, 2, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0,
       2, 2, 4, 4, 0]), 0)
