<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Testando-Lógica" data-toc-modified-id="Testando-Lógica-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Testando Lógica</a></span></li></ul></div>

Ideias:

* Criar um diretório específico na pasta do projeto para alocar arquivos mp3 alvos do consumo do modelo
* Realizar gravações de áudio usando o app Voice Recorder e automaticamente posicionar os arquivos mp3 no diretório específico criado para leitura e consumo do modelo
* Criar script python com algumas regras de verificação de arquivos nesse diretório, como por exemplo:
    * Laço infinito de validação de presença de arquivos (ou então um start manual após a gravação)
    * Validação da presença de mais de 1 arquivo (warning pro usuário)
    * Validação de extensão do arquivo disponibilizado no diretório (necessariamente mp3)
* Após a leitura e consumo do modelo, propor a exclusão do arquivo a criação de um histórico de predições

In [1]:
# Importação de bibliotecas
import os
import librosa
import time

In [2]:
# Definindo variáveis de caminho
PROJECT_PATH = '/home/paninit/workspaces/voice-unlocker'
TARGET_PATH_NAME = 'predictions/audios'
TARGET_PATH = os.path.join(PROJECT_PATH, TARGET_PATH_NAME)

AUDIO_EXT = '.mp3'

# Testando Lógica

In [3]:
# Criando diretório, caso inexistente
if not os.path.isdir(TARGET_PATH):
    os.makedirs(TARGET_PATH)

In [4]:
# Verificando arquivos no diretório
os.listdir(TARGET_PATH)

['2021_03_11_20_49_08.mp3']

In [5]:
# Regra 1 - quantidade de arquivos no diretório
valid_files = [file for file in os.listdir(TARGET_PATH) if os.path.splitext(file)[-1] == AUDIO_EXT]
qtd_files = len(valid_files)

if qtd_files > 1:
    print(f'Foram encontrados {qtd_files} arquivos {AUDIO_EXT} no diretório. Necessário validar um áudio por vez.')
qtd_files

1

___

In [6]:
# Regra 2 - nenhum arquivo encontrado com a extensão correta
if qtd_files == 0:
    print(f'Nenhum arquivo {AUDIO_EXT} encontrado no diretório. Verificar extensão do áudio disponibilizado.')

___

In [7]:
# Regra 3 - manter mais recente em caso de múltiplos audios
if qtd_files > 1:
    ctimes = [os.path.getctime(os.path.join(TARGET_PATH, file)) for file in valid_files]
    audio_ctimes = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ct)) for ct in ctimes] 
    print(audio_ctimes)

    idx_max_ctime = audio_ctimes.index(max(audio_ctimes))
    last_valid_audio = valid_files[idx_max_ctime]
    print(last_valid_audio)

___

    # Regra 4 - expurgo dos áudios
    for file in os.listdir(TARGET_PATH):
        os.remove(os.path.join(TARGET_PATH, file))

In [38]:
valid_files

['2021_03_11_20_49_08.mp3']

In [37]:
audio_file = valid_files[0]
audio = [librosa.load(os.path.join(TARGET_PATH, audio_file), sr=22050)[0]]
audio



[array([0., 0., 0., ..., 0., 0., 0.], dtype=float32)]

In [39]:
import pandas as pd
audio_df = pd.DataFrame([audio])
audio_df.columns = ['signal']
audio_df

Unnamed: 0,signal
0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [42]:
import joblib
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

# Definindo função para separação de faixas de frequências (BER)
def calc_split_freq_bin(spec, split_freq, sr):
    """
    Função responsável por calcular o índice da frequência de separação F
    no espectro discreto de frequências

    Parâmetros
    ----------
    :param spec: espectrograma calculado via STFT [type: ndarray]
    :param split_freq: frequência de separação F [type: int]
    :param sr: taxa de amostragem do sinal [type: int]

    Retorno
    -------
    :return idx_split_freq: retorna o índice relacionado ao parâmetro F no espectro discreto [type: int]
    :return split_freq_bin: retorna a frequência discreta relacionada ao parâmetro F [type: float]
    """

    # Intervalo de frequências (Nyquist)
    f_range = sr / 2

    # Intervalo de frequências para cada faixa discreta individual
    qtd_freq_bins = spec.shape[0]
    f_delta_bin = f_range / qtd_freq_bins

    # Calculando índice do parâmetro F nas faixas discretas
    idx_split_freq = int(np.floor(split_freq / f_delta_bin))

    # Calculando faixa de frequência presente na matriz espectral
    freq_bins = np.linspace(0, f_range, qtd_freq_bins)
    split_freq_bin = freq_bins[idx_split_freq]

    return idx_split_freq, split_freq_bin

# Definindo função para o cálculo da Taxa de Energia de Banda (BER)
def calc_ber(spec, split_freq, sr):
    """
    Função responsável por calcular a taxa de energia de banda (BER)

    Parâmetros
    ----------
    :param spec: espectrograma calculado via STFT [type: ndarray]
    :param split_freq: frequência de separação F [type: int]
    :param sr: taxa de amostragem do sinal [type: int]

    Retorno
    -------
    :return ber: taxa de energia de banda para cada frame t [type: np.array]
    """

    # Calculando faixa de frequência discreta do parâmetro F
    idx_split_freq, split_freq_bin = calc_split_freq_bin(spec, split_freq, sr)
    bers = []

    # Transformando amplitudes do espectro em potências
    power_spec = np.abs(spec) ** 2

    # Aplicando transpose para iteração em cada frame
    power_spec = power_spec.T

    # Calculando somatório para cada frame
    for frame in power_spec:
        sum_power_low_freq = frame[:idx_split_freq].sum()
        sum_power_high_freq = frame[idx_split_freq:].sum()
        ber_frame = sum_power_low_freq / sum_power_high_freq
        bers.append(ber_frame)

    return np.array(bers)

# Definindo transformador para envelope de amplitude
class AmplitudeEnvelop(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair o envelope de amplitude de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para o envelope de amplitude [type: pd.DataFrame]

    Aplicação
    ---------
    ae_extractor = AmplitudeEnvelop(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_ae = ae_extractor.fit_transform(X)                          
    """
    
    def __init__(self, frame_size, hop_length, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Retornando o envelope de amplitude para cada frame do sinal
        X['ae'] = X[self.signal_col].apply(lambda x: np.array([max(x[i:i+self.frame_size]) for i in range(0, len(x), self.hop_length)]))
        
        # Criando dicionário com agregações do envelope de amplitude de cada sinal
        X['aggreg_dict'] = X['ae'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['ae_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['ae', 'aggreg_dict'], axis=1)
            
        return X
    
# Definindo transformador para RMS Energy
class RMSEnergy(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair a raíz da energia média quadrática de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para a raíz da energia média quadrática [type: pd.DataFrame]

    Aplicação
    ---------
    rms_extractor = RMSEnergy(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                              signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_rms = rms_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Extraindo feature para cada sinal
        X['rms_engy'] = X[self.signal_col].apply(lambda x: librosa.feature.rms(x, frame_length=self.frame_size, 
                                                                               hop_length=self.hop_length)[0])
        
        # Criando dicionário com agregações
        X['aggreg_dict'] = X['rms_engy'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['rms_engy_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['rms_engy', 'aggreg_dict'], axis=1)
            
        return X
    
# Definindo transformador para Zero Crossing Rate
class ZeroCrossingRate(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair a taxa de cruzamento de zero de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para a taxa de cruzamento de zero [type: pd.DataFrame]

    Aplicação
    ---------
    zcr_extractor = ZeroCrossingRate(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                     signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_zcr = zcr_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Extraindo feature para cada sinal
        X['zcr'] = X[self.signal_col].apply(lambda x: librosa.feature.zero_crossing_rate(x, frame_length=self.frame_size, 
                                                                                         hop_length=self.hop_length)[0])
        
        # Criando dicionário com agregações
        X['aggreg_dict'] = X['zcr'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['zcr_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['zcr', 'aggreg_dict'], axis=1)
            
        return X

# Definindo transformador para BER
class BandEnergyRatio(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair a taxa de energia de banda de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param split_freq: frequência de separação entre altas e baixas frequências [type: int]
    :param sr: taxa de amostragem do sinal de áudio [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para a taxa de energia de banda [type: pd.DataFrame]

    Aplicação
    ---------
    ber_extractor = BandEnergyRatio(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                    signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_ber = ber_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, split_freq, sr, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.split_freq = split_freq
        self.sr = sr
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Calculando espectrograma dos sinais
        X['spec'] = X[self.signal_col].apply(lambda x: librosa.stft(y=x, n_fft=self.frame_size, 
                                                                    hop_length=self.hop_length))
        
        # Calculando BER
        X['ber'] = X['spec'].apply(lambda x: calc_ber(spec=x, split_freq=self.split_freq, sr=self.sr))
        
        # Criando dicionário com agregações
        X['aggreg_dict'] = X['ber'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['ber_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['spec', 'ber', 'aggreg_dict'], axis=1)
            
        return X
    
# Definindo transformador para Spectral Centroid
class SpectralCentroid(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair o centroide espectral de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param sr: taxa de amostragem do sinal de áudio [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para o centroide espectral [type: pd.DataFrame]

    Aplicação
    ---------
    sc_extractor = SpectralCentroid(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                     signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_sc = sc_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, sr, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.sr = sr
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Calculando feature
        X['sc'] = X[self.signal_col].apply(lambda x: librosa.feature.spectral_centroid(y=x, sr=self.sr,
                                                                                       n_fft=self.frame_size,
                                                                                       hop_length=self.hop_length)[0])
        
        # Criando dicionário com agregações
        X['aggreg_dict'] = X['sc'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['sc_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['sc', 'aggreg_dict'], axis=1)
            
        return X
    
# Definindo transformador para BandWidth
class BandWidth(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair a largura de banda de sinais de áudio
    considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param sr: taxa de amostragem do sinal de áudio [type: int]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para a largura de banda [type: pd.DataFrame]

    Aplicação
    ---------
    bw_extractor = BandWidth(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                             signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_bw = bw_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, sr, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.sr = sr
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Calculando feature
        X['bw'] = X[self.signal_col].apply(lambda x: librosa.feature.spectral_bandwidth(y=x, sr=self.sr,
                                                                                        n_fft=self.frame_size,
                                                                                        hop_length=self.hop_length)[0])
        
        # Criando dicionário com agregações
        X['aggreg_dict'] = X['bw'].apply(lambda x: pd.DataFrame(x).agg(self.feature_aggreg))
        
        # Extraindo agregações e enriquecendo dataset
        for agg in self.feature_aggreg:
            X['bw_' + agg] = X['aggreg_dict'].apply(lambda x: x[0][agg])
            
        # Eliminando colunas adicionais
        X = X.drop(['bw', 'aggreg_dict'], axis=1)
            
        return X
    
# Definindo transformador para agregação de espectrograma em grupos
class GroupSpecAggreg(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair a potência espectral de altas e baixas frequências
    de sinais de áudio considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param frame_size: quantidade de amostrar por enquadramento do sinal [type: int]
    :param hop_length: parâmetro de overlapping de quadros do sinal [type: int]
    :param sr: taxa de amostragem do sinal de áudio [type: int]
    :param split_freq: frequência de separação entre altas e baixas frequências [type: int]
    :param freq_cat_aggreg: agregador aplicado no agrupamento das potências [type: int, default='sum']
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para a potência espectral agrupada [type: pd.DataFrame]

    Aplicação
    ---------
    spec_extractor = GroupSpecAggreg(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                     signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_spec = spec_extractor.fit_transform(X)
    """
    
    def __init__(self, frame_size, hop_length, sr, split_freq, freq_cat_aggreg='sum',
                 signal_col='signal', feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.frame_size = frame_size
        self.hop_length = hop_length
        self.sr = sr
        self.split_freq = split_freq
        self.freq_cat_aggreg = freq_cat_aggreg
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Criando DataFrame vazio e aplicando STFT no sinal
        all_spec_agg = pd.DataFrame()
        X['spec'] = X[self.signal_col].apply(lambda x: np.abs(librosa.stft(y=x, n_fft=self.frame_size, 
                                                                           hop_length=self.hop_length))**2)
        
        idx_split, split_freq = calc_split_freq_bin(X['spec'][0], split_freq=self.split_freq, sr=self.sr)
        frequency_bins = np.linspace(0, self.sr/2, 1025)
        
        # Iterando sobre cada espectrograma de cada sinal
        for spec in X['spec']:
            # DataFrame intermediário para agregações de cada sinal
            signal_spec_agg = pd.DataFrame()
            i = 0
            
            # Separando frequências de acordo com threshold estabelecido
            spec_data = pd.DataFrame(spec)
            spec_data.reset_index(inplace=True)
            spec_data['freq_cat'] = spec_data['index'].apply(lambda x: 'low_freq_pwr' if x <= idx_split else 'high_freq_pwr')
            
            # Somando potências de baixas e altas frequências
            spec_data_sum = spec_data.groupby(by='freq_cat').agg(self.freq_cat_aggreg)
            spec_data_sum.drop('index', axis=1, inplace=True)
            
            # Agregando resultado agregado separado por grupo de frequências
            S_aggreg = pd.DataFrame(spec_data_sum).agg(self.feature_aggreg, axis=1)
            #print(spec_data_sum)
            #print(S_aggreg)

            # Iterando sobre cada agregador para gerar um novo DataFrame
            for agg in self.feature_aggreg:
                S_agg = pd.DataFrame(S_aggreg[agg]).T
                S_agg.reset_index(inplace=True, drop=True)
                S_agg.columns = [col + '_' + agg for col in S_agg.columns]
                
                # Unindo agregadores em DataFrame intermediário do sinal
                if i == 0:
                    signal_spec_agg = S_agg.copy()
                else:
                    signal_spec_agg = signal_spec_agg.merge(S_agg, left_index=True, right_index=True)
                i += 1
            
            # Empilhando compilado agregado de cada sinal
            all_spec_agg = all_spec_agg.append(signal_spec_agg)
        
        # Enriquecendo dataset com agregações geradas
        all_spec_agg.reset_index(inplace=True, drop=True)
        X = X.merge(all_spec_agg, left_index=True, right_index=True)
        
        # Dropando colunas auxiliares
        X.drop('spec', axis=1, inplace=True)
        
        return X
    
# Definindo transformador para agregação individual de espectrograma
class MFCCsAggreg(BaseEstimator, TransformerMixin):
    """
    Classe responsável por extrair as componentes MFCCs (primeira e segunda derivada)
    de sinais de áudio considerando agregados estatísticos pré definidos.

    Parâmetros
    ----------
    :param n_mfcc: quantidade de componentes MFCCs extraídas [type: int]
    :param order: ordem das derivadas extraídas [type: int, default=0]
    :param signal_col: referência da coluna de armazenamento do sinal na base [type: string, default='signal']
    :param feature_aggreg: lista de agregadores estatísticos aplicados após a extração da features
        *default=['mean', 'median', 'std', 'var', 'max', 'min']

    Retorno
    -------
    :return X: base de dados contendo os agregados estatísticos para as componentes MFCCs [type: pd.DataFrame]

    Aplicação
    ---------
    mfcc_extractor = MFCCsAggreg(frame_size=FRAME_SIZE, hop_length=HOP_LENGTH, 
                                 signal_col='signal', feature_aggreg=FEATURE_AGGREG)
    X_mfcc = mfcc_extractor.fit_transform(X)
    """
    
    def __init__(self, n_mfcc, order=0, signal_col='signal',
                 feature_aggreg=['mean', 'median', 'std', 'var', 'max', 'min']):
        self.n_mfcc = n_mfcc
        self.order = order
        self.signal_col = signal_col
        self.feature_aggreg = feature_aggreg
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        
        # Criando DataFrame vazio e retornando mfccs
        all_mfcc_agg = pd.DataFrame()
        X['mfcc'] = X[self.signal_col].apply(lambda x: librosa.feature.mfcc(y=x, n_mfcc=self.n_mfcc))
        
        # Calculando derivadas (se aplicáveis)
        if self.order == 1:
            X['d1_mfcc'] = X['mfcc'].apply(lambda x: librosa.feature.delta(x, order=1))
        
        if self.order == 2:
            X['d1_mfcc'] = X['mfcc'].apply(lambda x: librosa.feature.delta(x, order=1))
            X['d2_mfcc'] = X['mfcc'].apply(lambda x: librosa.feature.delta(x, order=2))
        
        # Iterando sobre cada conjunto de coeficientes mfccs de cada sinal
        if self.order == 0:
            for mfcc in X['mfcc']:
                # DataFrame intermediário para agregações de cada sinal
                signal_mfcc_agg = pd.DataFrame()
                i = 0

                # Agregando dimensão temporal do espectrograma (eixo 1)
                M_aggreg = pd.DataFrame(mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg.index = ['mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]

                # Iterando sobre cada agregador para gerar um novo DataFrame
                for agg in self.feature_aggreg:
                    M_agg = pd.DataFrame(M_aggreg[agg]).T
                    M_agg.reset_index(inplace=True, drop=True)
                    M_agg.columns = [col + '_' + agg for col in M_agg.columns]

                    # Unindo agregadores em DataFrame intermediário do sinal
                    if i == 0:
                        signal_mfcc_agg = M_agg.copy()
                    else:
                        signal_mfcc_agg = signal_mfcc_agg.merge(M_agg, left_index=True, right_index=True)
                    i += 1

                # Empilhando compilado agregado de cada sinal
                all_mfcc_agg = all_mfcc_agg.append(signal_mfcc_agg)
                to_drop = ['mfcc']
                
        elif self.order == 1:
            for mfcc, d1_mfcc in X.loc[:, ['mfcc', 'd1_mfcc']].values:
                # DataFrame intermediário para agregações de cada sinal
                signal_mfcc_agg = pd.DataFrame()
                i = 0

                # Agregando dimensão temporal do espectrograma (eixo 1)
                M_aggreg_d0 = pd.DataFrame(mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg_d0.index = ['mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]
                
                M_aggreg_d1 = pd.DataFrame(d1_mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg_d1.index = ['d1_mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]

                # Iterando sobre cada agregador para gerar um novo DataFrame
                for agg in self.feature_aggreg:
                    M_agg_d0 = pd.DataFrame(M_aggreg_d0[agg]).T
                    M_agg_d0.reset_index(inplace=True, drop=True)
                    M_agg_d0.columns = [col + '_' + agg for col in M_agg_d0.columns]
                                       
                    M_agg_d1 = pd.DataFrame(M_aggreg_d1[agg]).T
                    M_agg_d1.reset_index(inplace=True, drop=True)
                    M_agg_d1.columns = [col + '_' + agg for col in M_agg_d1.columns]
                    
                    M_agg = M_agg_d0.merge(M_agg_d1, left_index=True, right_index=True)

                    # Unindo agregadores em DataFrame intermediário do sinal
                    if i == 0:
                        signal_mfcc_agg = M_agg.copy()
                    else:
                        signal_mfcc_agg = signal_mfcc_agg.merge(M_agg, left_index=True, right_index=True)
                    i += 1

                # Empilhando compilado agregado de cada sinal
                all_mfcc_agg = all_mfcc_agg.append(signal_mfcc_agg)
                to_drop = ['mfcc', 'd1_mfcc']
                
        elif self.order == 2:
            for mfcc, d1_mfcc, d2_mfcc in X.loc[:, ['mfcc', 'd1_mfcc', 'd2_mfcc']].values:
                # DataFrame intermediário para agregações de cada sinal
                signal_mfcc_agg = pd.DataFrame()
                i = 0

                # Agregando dimensão temporal do espectrograma (eixo 1)
                M_aggreg_d0 = pd.DataFrame(mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg_d0.index = ['mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]
                
                M_aggreg_d1 = pd.DataFrame(d1_mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg_d1.index = ['d1_mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]
                
                M_aggreg_d2 = pd.DataFrame(d2_mfcc).agg(self.feature_aggreg, axis=1)
                M_aggreg_d2.index = ['d2_mfcc_c' + str(i) for i in range(1, self.n_mfcc + 1)]

                # Iterando sobre cada agregador para gerar um novo DataFrame
                for agg in self.feature_aggreg:
                    M_agg_d0 = pd.DataFrame(M_aggreg_d0[agg]).T
                    M_agg_d0.reset_index(inplace=True, drop=True)
                    M_agg_d0.columns = [col + '_' + agg for col in M_agg_d0.columns]
                                       
                    M_agg_d1 = pd.DataFrame(M_aggreg_d1[agg]).T
                    M_agg_d1.reset_index(inplace=True, drop=True)
                    M_agg_d1.columns = [col + '_' + agg for col in M_agg_d1.columns]
                    
                    M_agg_d2 = pd.DataFrame(M_aggreg_d2[agg]).T
                    M_agg_d2.reset_index(inplace=True, drop=True)
                    M_agg_d2.columns = [col + '_' + agg for col in M_agg_d2.columns]
                    
                    M_agg = M_agg_d0.merge(M_agg_d1, left_index=True, right_index=True)
                    M_agg = M_agg.merge(M_agg_d2, left_index=True, right_index=True)

                    # Unindo agregadores em DataFrame intermediário do sinal
                    if i == 0:
                        signal_mfcc_agg = M_agg.copy()
                    else:
                        signal_mfcc_agg = signal_mfcc_agg.merge(M_agg, left_index=True, right_index=True)
                    i += 1

                # Empilhando compilado agregado de cada sinal
                all_mfcc_agg = all_mfcc_agg.append(signal_mfcc_agg)
                to_drop = ['mfcc', 'd1_mfcc', 'd2_mfcc']
        
        # Enriquecendo dataset com agregações geradas
        all_mfcc_agg.reset_index(inplace=True, drop=True)
        X = X.merge(all_mfcc_agg, left_index=True, right_index=True)
        
        # Dropando colunas auxiliares
        X.drop(to_drop, axis=1, inplace=True)
        
        return X


# Definindo variáveis para extração de artefatos
PIPELINE_PATH = os.path.join(PROJECT_PATH, 'pipelines')
PIPELINE_NAME = 'audio_fe_pipeline.pkl'
MODEL_PATH = os.path.join(PROJECT_PATH, 'model')
MODEL_NAME = 'lgbm_clf.pkl'

# Lendo pipeline de preparação e modelo treinado
pipeline = joblib.load(os.path.join(PIPELINE_PATH, PIPELINE_NAME))
model = joblib.load(os.path.join(MODEL_PATH, MODEL_NAME))

In [43]:
audio_prep = pipeline.fit_transform(audio_df)

In [44]:
audio_prep

Unnamed: 0,signal,ae_mean,ae_median,ae_std,ae_var,ae_max,ae_kurtosis,ae_skew,rms_engy_mean,rms_engy_median,...,d2_mfcc_c4_skew,d2_mfcc_c5_skew,d2_mfcc_c6_skew,d2_mfcc_c7_skew,d2_mfcc_c8_skew,d2_mfcc_c9_skew,d2_mfcc_c10_skew,d2_mfcc_c11_skew,d2_mfcc_c12_skew,d2_mfcc_c13_skew
0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.158322,0.13208,0.134365,0.018054,0.581573,-0.135131,0.750086,0.053612,0.039259,...,0.025019,0.290705,0.130415,0.22064,-0.157529,0.101512,-0.049428,-0.125009,-0.06242,-0.131471


In [49]:
df_audios = pd.DataFrame()
df_audios['audio_file'] = [audio_file]
df_audios

Unnamed: 0,audio_file
0,2021_03_11_20_49_08.mp3
