In [None]:
def to_mono_float32(arr):
    arr = arr.astype(np.float32)
    if arr.ndim == 1:
        return arr / 32768.0
    elif arr.ndim == 2:
        return arr.mean(axis=1) / 32768.0
    else:
        raise ValueError(f"Formato de audio no soportado: shape={arr.shape}")

class Feedback_Supression(buffer.Buffering):
    """AEC NLMS con VAD basado en frecuencia dominante y SNR, preprocesado HP y sustracción de ruido, logs completos."""

    def __init__(self):
        super().__init__()
        self.fir_length = 128
        self.fir_coeffs = np.zeros(self.fir_length, dtype=np.float32)

        self.mu_base = 0.002
        self.mu_max = 0.005
        self.eps = 1e-9

        self.chunk_history = deque(maxlen=self.fir_length)

        self.fade_in_chunks = 50 * self.CHUNK_NUMBERS
        self.chunk_number = 0

        # --- Parámetros del entorno ---
        self.fs = getattr(minimal.args, "sample_rate", 48000)
        self.frames_per_chunk = getattr(minimal.args, "frames_per_chunk", 1024)

        # --- High-pass first order ---
        self.hp_cutoff = 80.0
        rc = 1.0 / (2 * np.pi * self.hp_cutoff)
        dt = 1.0 / self.fs
        self.hp_alpha = rc / (rc + dt)
        self.hp_prev_x = 0.0
        self.hp_prev_y = 0.0

        # --- Estimador de ruido estacionario ---
        self.noise_beta = 0.995
        self.noise_profile = np.zeros(self.frames_per_chunk, dtype=np.float32)

        # --- FFT thresholds ---
        self.fft_min_energy = 1e-6

    # ===========================================================
    # PREPROCESADO
    # ===========================================================
    def _highpass_chunk(self, x):
        """High-pass primer orden con continuidad entre chunks."""
        y = np.empty_like(x)
        prev_x = self.hp_prev_x
        prev_y = self.hp_prev_y
        a = self.hp_alpha
        for i, xi in enumerate(x):
            yi = a * (prev_y + xi - prev_x)
            y[i] = yi
            prev_x = xi
            prev_y = yi
        self.hp_prev_x = prev_x
        self.hp_prev_y = prev_y
        return y

    def _update_noise_profile_and_subtract(self, x):
        """Actualiza noise_profile con EMA lenta y devuelve x - noise_est."""
        if len(self.noise_profile) != len(x):
            self.noise_profile = np.zeros_like(x)
        self.noise_profile = self.noise_beta * self.noise_profile + (1.0 - self.noise_beta) * x
        return x - self.noise_profile

    def _dominant_freq(self, x):
        """Devuelve frecuencia dominante y magnitud del chunk."""
        N = len(x)
        if N <= 0:
            return 0.0, 0.0
        X = np.fft.rfft(x * np.hanning(N))
        mags = np.abs(X)
        if np.sum(mags**2) < self.fft_min_energy:
            return 0.0, 0.0
        freqs = np.fft.rfftfreq(N, d=1.0/self.fs)
        idx = np.argmax(mags)
        return float(freqs[idx]), float(mags[idx])

    # ===========================================================
    # PROCESAMIENTO PRINCIPAL
    # ===========================================================
    def _record_IO_and_play(self, ADC, DAC, frames, time, status):
        self.chunk_number += 1

        # --- Chunk a reproducir ---
        chunk = self.unbuffer_next_chunk()
        chunk = chunk if chunk is not None else self.zero_chunk
        mono_chunk = to_mono_float32(chunk)
        self.chunk_history.append(mono_chunk)
        while len(self.chunk_history) < self.fir_length:
            self.chunk_history.appendleft(np.zeros_like(mono_chunk))

        ADC_float = to_mono_float32(ADC)

        # --- Preprocesado ---
        ADC_hp = self._highpass_chunk(ADC_float)
        ADC_denoised = self._update_noise_profile_and_subtract(ADC_hp)
        dom_freq, dom_mag = self._dominant_freq(ADC_hp)

        # --- Estimación de eco FIR ---
        echo_est = np.zeros_like(ADC_denoised)
        history = list(self.chunk_history)
        for k in range(self.fir_length):
            x = history[k]
            L = min(len(ADC_denoised), len(x))
            echo_est[:L] += self.fir_coeffs[k] * x[:L]

        error = ADC_denoised - echo_est

        # --- RMS y SNR ---
        rms_error = np.sqrt(np.mean(error**2)) + 1e-9
        rms_voice = np.sqrt(np.mean(ADC_denoised**2)) + 1e-9
        noise_rms = np.sqrt(np.mean(self.noise_profile**2)) + 1e-12
        snr = rms_voice / noise_rms

        fade_in_factor = min(1.0, self.chunk_number / self.fade_in_chunks)

        # --- VAD basado en frecuencia dominante y SNR ---
        voice_detected = (dom_freq > 180.0) and (snr > 15.0)

        # --- NLMS si hay voz ---
        mu_used = []
        if voice_detected:
            for k in range(self.fir_length):
                x = history[k][:len(error)]
                energy = np.dot(x, x) + self.eps
                mu_eff = min(self.mu_base * fade_in_factor / energy, self.mu_max)
                self.fir_coeffs[k] += mu_eff * np.dot(x, error)
                mu_used.append(mu_eff)

        # --- Decaimiento anti-drift ---
        self.fir_coeffs *= 0.9997
        self.fir_coeffs = np.clip(self.fir_coeffs, -0.5, 0.5)

        # --- Normalización salida ---
        if rms_error > 0.4:
            error *= 0.4 / rms_error
        filtered_ADC_stereo = np.column_stack([error, error])
        filtered_ADC_stereo = (filtered_ADC_stereo * 32767).astype(np.int16)

        atten_factor = 0.1 + 0.9 * fade_in_factor
        manual = 1.5
        chunk_to_play = np.clip(chunk.astype(np.float32) * atten_factor * manual, -32768, 32767)

        # Ajuste de forma
        channels = minimal.args.number_of_channels
        frames = minimal.args.frames_per_chunk
        if chunk_to_play.size != frames * channels:
            chunk_to_play = np.resize(chunk_to_play, (frames, channels))
        else:
            chunk_to_play = chunk_to_play.reshape(frames, channels)

        self.play_chunk(DAC, chunk_to_play)

        # --- Logs ---
        fir_energy = np.sum(self.fir_coeffs**2)
        noise_profile_rms = np.sqrt(np.mean(self.noise_profile**2)) + 1e-12
        print("──────────── ITERACIÓN AEC ────────────")
        print(f"Chunks procesados:        {self.chunk_number}")
        print(f"Fade-in factor:           {fade_in_factor:.6f}")
        print(f"RMS voz (mic, preproc):   {rms_voice:.6f}")
        print(f"RMS error:                {rms_error:.6f}")
        print(f"SNR:                      {snr:.2f}")
        print(f"Actividad de voz:         {voice_detected}")
        print(f"Promedio μ aplicado:      {np.mean(mu_used) if mu_used else 0:.6e}")
        print(f"Energía FIR:              {fir_energy:.6f}")
        print(f"Atenuación playback:      {atten_factor:.3f}")
        print(f"Noise profile RMS:        {noise_profile_rms:.6e}")
        print(f"Dominant freq (HP input): {dom_freq:.1f} Hz (mag {dom_mag:.3e})")
        print("────────────────────────────────────────")

        # --- Enviar paquete ---
        self.send(self.pack(self.chunk_number, filtered_ADC_stereo))


MÉTODOS:

In [None]:
def to_mono_float32(arr):
    arr = arr.astype(np.float32)
    if arr.ndim == 1:
        return arr / 32768.0
    elif arr.ndim == 2:
        return arr.mean(axis=1) / 32768.0
    else:
        raise ValueError(f"Formato de audio no soportado: shape={arr.shape}")

Este método transforma el audio a mono. Tanto si es para un solo canal, como para 2, en este último caso te fusiona ambos(media = mean()).
Sirve para facilitar el cálculo de RMS,SNR,FIR etc.. evitando hacer calculo para ambos canales.
Pues la cancelación de eco y filtros se aplican por canales y para no tener que hacerlo doble se transforma a mono.

In [None]:
 def _highpass_chunk(self, x):
        """High-pass primer orden con continuidad entre chunks."""
        y = np.empty_like(x)
        prev_x = self.hp_prev_x
        prev_y = self.hp_prev_y
        a = self.hp_alpha
        for i, xi in enumerate(x):
            yi = a * (prev_y + xi - prev_x)
            y[i] = yi
            prev_x = xi
            prev_y = yi
        self.hp_prev_x = prev_x
        self.hp_prev_y = prev_y
        return y

Este es un método de filtro paso alto, quedandose con las frecuencias más altas y eliminando frecuencias bajas como el ruido o zumbidos.
Siendo X el array de chunk, prev_x y prev_y ultimo valor de entrada y salida del chunk,
Va filtrando el array de chunk y lo va introduciendo el Y (el array de chunk resultado)

In [None]:
def _update_noise_profile_and_subtract(self, x):
        """Actualiza noise_profile con EMA lenta y devuelve x - noise_est."""
        if len(self.noise_profile) != len(x):
            self.noise_profile = np.zeros_like(x)
        self.noise_profile = self.noise_beta * self.noise_profile + (1.0 - self.noise_beta) * x
        return x - self.noise_profile

Estima el ruido estacionario y lo elimina del chunk de entrada usando un factor de suavizado, noise_beta.
noise_profile es el array del ruido estimado de la misma longitud que el chunk para poder restarselo.

In [None]:
 def _dominant_freq(self, x):
        """Devuelve frecuencia dominante y magnitud del chunk."""
        N = len(x)
        if N <= 0:
            return 0.0, 0.0
        X = np.fft.rfft(x * np.hanning(N))
        mags = np.abs(X)
        if np.sum(mags**2) < self.fft_min_energy:
            return 0.0, 0.0
        freqs = np.fft.rfftfreq(N, d=1.0/self.fs)
        idx = np.argmax(mags)
        return float(freqs[idx]), float(mags[idx])


Este método detecta la frecuencia dominante en el array de chunk.
Hanning funciona para unir los extremos de las ondas ya que no son periódicas, si no lo hicieramos la transformada de fourier se inventaría los extremos para que fueran periódicas.

Mags es un array de magnitudes de la transformada de fourier anteriormente calculada.
Freqs es un array de frecuencias correspondiente a cada componente de la FFT.
idx -> indice de la magnitud máxima 
Da la frecuencia y magnitud de la magnitud máxima (frecuencia dominante)

CLASE FEEDBACK_SUPRESSION:

In [None]:
  def _record_IO_and_play(self, ADC, DAC, frames, time, status):
        self.chunk_number += 1

        # --- Chunk a reproducir ---
        chunk = self.unbuffer_next_chunk()
        chunk = chunk if chunk is not None else self.zero_chunk
        mono_chunk = to_mono_float32(chunk)
        self.chunk_history.append(mono_chunk)
        while len(self.chunk_history) < self.fir_length:
            self.chunk_history.appendleft(np.zeros_like(mono_chunk))

        ADC_float = to_mono_float32(ADC)

Siendo ADC el audio que entra y DAC el audio que sale:
Se va sumando 1 al número de chunks para ir capturando audio.
Vamos sacando chunk uno por uno en cada iteración  y si está vacío lo ponemos a 0.
Si la longitud del filtro es mayor (pues cada coeficiente va de la mano de un chunk del historial)a la del array (se rellena de 0)

Preparamos la señal de entrada a mono para tratarla.

In [None]:
  # --- Preprocesado ---
        ADC_hp = self._highpass_chunk(ADC_float)
        ADC_denoised = self._update_noise_profile_and_subtract(ADC_hp)
        dom_freq, dom_mag = self._dominant_freq(ADC_hp)

Aquí estamos limpiando la señal para trabajar con ella:
Le quitamos las frecuencias bajas (filtro paso alto) = ADC_hp
De es audio sin frecuencias bajas, le quitamos el ruido o zumbido que pueda tener (noise_profile) = ADC_denoised, señal con la que trabajaremos 
Y de esa señal sin frecuencias bajas y sin ruido sacaremos su frecuencia dominante (reconocimiento de voz)

In [None]:
# --- Estimación de eco FIR ---
        echo_est = np.zeros_like(ADC_denoised)
        history = list(self.chunk_history)
        for k in range(self.fir_length):
            x = history[k]
            L = min(len(ADC_denoised), len(x))
            echo_est[:L] += self.fir_coeffs[k] * x[:L]

        error = ADC_denoised - echo_est


Los chunks es lo que recoge el micrófono ya reproducido.
Por tanto este método estima el eco gracias a ellos y se lo resta a la señal que se va a reproducir (ADC_denoised).
Gracias a la multiplicación por el coeficiente FIR con el chunk, le damos más o menos sonido a ese chunk dependiendo del eco que tenga en el sonido.

In [None]:
# --- RMS y SNR ---
        rms_error = np.sqrt(np.mean(error**2)) + 1e-9
        rms_voice = np.sqrt(np.mean(ADC_denoised**2)) + 1e-9
        noise_rms = np.sqrt(np.mean(self.noise_profile**2)) + 1e-12
        snr = rms_voice / noise_rms

        fade_in_factor = min(1.0, self.chunk_number / self.fade_in_chunks)

rms_error es el eco real que detecta
rms_voice es la energía que detecta del eco
 Si el rms_error y rms_voice es bajo significa que apenas hay eco porque no lo detecta bien,
 pero si rms_error es bajo y rms_voice es alto, está eliminando eco bien y detecta bien la voz.
 noise_rms es la energía del ruido

 snr es el % de señal útil en comparación con el ruido, si es alta, estamos eliminando bien el ruido.

 fade_in_factor decide la rapidez con la que el algoritmo funciona y cambia la señal. Para que vaya incrementando poco a poco y el sonido no se vea demasiado afectado para que se distorsione, va incrementando con el número de chunks que capta.

In [None]:
        voice_detected = (dom_freq > 180.0) and (snr > 15.0)


Variable flag que determina si hay voz o no.
Si el dominio de fre calculado y el snr (sonido útil), es mayor a esos parámetros (aproximado para cualquier voz), da true

In [None]:
# --- NLMS si hay voz ---
        mu_used = []
        if voice_detected:
            for k in range(self.fir_length):
                x = history[k][:len(error)]
                energy = np.dot(x, x) + self.eps
                mu_eff = min(self.mu_base * fade_in_factor / energy, self.mu_max)
                self.fir_coeffs[k] += mu_eff * np.dot(x, error)
                mu_used.append(mu_eff)


Este método ajusta el coefciente fir dependiendo del error y energía del chunk.
El cambio radical depende del mu_eff y del fade_in_factor, con un límite en la velocidad del mu para que la adaptación no sea muy agresiva y haya inestabilidad en el sonido.

In [None]:
# --- Decaimiento anti-drift ---
        self.fir_coeffs *= 0.9997
        self.fir_coeffs = np.clip(self.fir_coeffs, -0.5, 0.5)

Esto estandariza los coeficientes para que en la adaptación de los chunks no se desborden o acumulen errores. Limita su valor a (-0.5, 0.5) y multiplica por 0.9997. Des esta manera es como hacer un pequeño reset por si se ha estimado mal el eco y estan aprendiendo de ruido.

In [None]:
# --- Normalización salida ---
        if rms_error > 0.4:
            error *= 0.4 / rms_error
        filtered_ADC_stereo = np.column_stack([error, error])
        filtered_ADC_stereo = (filtered_ADC_stereo * 32767).astype(np.int16)

        atten_factor = 0.1 + 0.9 * fade_in_factor
        manual = 1.5
        chunk_to_play = np.clip(chunk.astype(np.float32) * atten_factor * manual, -32768, 32767)


Si el error llega a subir mucho se regula y se rebaja a la mitad para no dar lugar a distorsiones o inestabilidad en el sistema. Se dividen los audios en estéreo para poder reproducirlo de nuevo.
Además atenua el sonido del chunk para reproducirlo con el menos eco posible que tenga.


In [None]:
 # Ajuste de forma
        channels = minimal.args.number_of_channels
        frames = minimal.args.frames_per_chunk
        if chunk_to_play.size != frames * channels:
            chunk_to_play = np.resize(chunk_to_play, (frames, channels))
        else:
            chunk_to_play = chunk_to_play.reshape(frames, channels)

        self.play_chunk(DAC, chunk_to_play)


Aquí se preparan el array de doble dimensión (dos canales) para poder reproducirlos, si hay algún problema se hace reshape.
Finalmente se reproduce el audio de los chunks al DAC (audio de salida)