# Comparando audios con descriptores MFCC

**Curso**: CC5213 - Recuperación de Información Multimedia  
**Profesor**: Juan Manuel Barrios  
**Fecha**: 15 de abril de 2025




 Para calcular descriptores de un audio vamos a usar la librería **LibROSA**. Instalar con:
 
 ```
 pip install librosa
 ```

NO USAR `conda install librosa` porque fallará por un error de versiones de librerías (al menos la versión para windows).

LibROSA puede leer archivos de audio formato `wav`. Ver https://librosa.org/doc/main/generated/librosa.load.html:

```
    samples, sample_rate = librosa.load(archivo_wav, sr=None)
```

El argumento `sr` es el sample_rate que se desea leer los samples. Por defecto usa `sr=22050`  (hace una conversion). Con `sr=None` usar el mismo sample_rate del archivo wav.

Para convertir cualquier audio o video a wav usaremos FFmpeg.

Para calcular descriptores usaremos la implementación de MFCC de LibROSA https://librosa.org/doc/main/generated/librosa.feature.mfcc.html:

```
    matriz = librosa.feature.mfcc(y=samples, sr=sr, n_mfcc=dimension, n_fft=ventana, hop_length=salto)
    descriptores_mfcc = matriz.transpose()
```

 * `n_mfcc` es la dimensión de los descriptores a calcular (cantidad de canales a representar). Usualmente un número entre 20 y 30.
 * `n_fft` es el tamaño de ventana a representar (cantidad de samples). Idealmente debe ser potencia de 2 (e.g. 512, 1024, 2048, 4096, 8192) para que el cálculo de la Transformada de Fourier sea rápido.
 * `hop_length` es la cantidad de samples a desplazarse para calcular el siguiente descriptor. Puede ser el mismo número `n_fft` para no tener traslape o `n_fft/2` para tener un traslape de la mitad de la ventana.

Notar que LibROSA retorna **descriptores como columnas**, pero comúnmente para calcular distancias los descriptores se usan como filas, por lo que es recomendable llamar a la función `transpose()` para retornarlos como fila.




# Ejemplo 1: Calcular descriptores MFCC de un archivo de audio


In [None]:
import numpy
import os.path
import subprocess
import librosa


#funcion que recibe un nombre de archivo y llama a FFmpeg para crear un archivo raw
def convertir_a_wav(filename, sample_rate, carpeta_temporal):
    archivo_wav = "{}/{}.{}.wav".format(carpeta_temporal, os.path.basename(filename), sample_rate) 
    if os.path.isfile(archivo_wav):
        return archivo_wav
    os.makedirs(carpeta_temporal, exist_ok=True)
    print("convertir {} a {}  samplerate {}".format(filename, archivo_wav, sample_rate))
    comando = ["ffmpeg", "-i", filename, "-ac", "1", "-ar", str(sample_rate), archivo_wav]
    print("  COMANDO: {}".format(" ".join(comando)))
    code = subprocess.call(comando)
    if code != 0:
        raise Exception("ERROR en comando: " + " ".join(comando))
    return archivo_wav

    
def calcular_descriptores_mfcc(archivo_wav, sample_rate, samples_por_ventana, samples_salto, dimension):
    # leer audio
    samples, sr = librosa.load(archivo_wav, sr=None)
    print("audio samples={} samplerate={} segundos={:.1f}".format(len(samples), sr, len(samples) / sr))
    # calcular MFCC
    mfcc = librosa.feature.mfcc(y=samples, sr=sr, n_mfcc=dimension, n_fft=samples_por_ventana, hop_length=samples_salto)
    # convertir a descriptores por fila
    descriptores = mfcc.transpose()
    # en el descriptor MFCC la primera dimensión está relacionada al volumen global del audio (energía promedio)
    # usualmente es buena idea descartar la primera dimensión para tener descriptores invariantes al volumen global
    return descriptores


def calcular_mfcc_archivo(archivo_audio, sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal):
    archivo_wav = convertir_a_wav(archivo_audio, sample_rate, carpeta_temporal)
    descriptores = calcular_descriptores_mfcc(archivo_wav, sample_rate, samples_por_ventana, samples_salto, dimension)
    return descriptores


sample_rate = 44100         #calidad del audio (44100 es HD, se puede bajar)
samples_por_ventana = 4096  #tamaño de la ventana a la que se calcula un descriptor MFCC (usualmente unas 5 a 10 por segundo)
samples_salto = 4096        #se puede probar con un  el salto es menor al tamaño de la ventana para que haya traslape entre ventanas
dimension = 10              #largo del descriptor MFCC (usualmente entre 10 a 64)

print("ventana={} samples ({:.3f} segundos)".format(samples_por_ventana, samples_por_ventana/sample_rate))
print("salto  ={} samples ({:.3f} segundos)".format(samples_salto, samples_salto/sample_rate))

# donde crear archivos intermedios
carpeta_temporal = "audios_temporales"

archivo_audio = "vivaldi.mp3"
descriptores_mfcc = calcular_mfcc_archivo(archivo_audio, sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal)

print()
print("matriz de descriptores MFCC")
print("  filas={} columnas={} tipo={}".format(descriptores_mfcc.shape[0], descriptores_mfcc.shape[1], descriptores_mfcc.dtype))

descriptores_mfcc


# Ejemplo 2: Buscar segmentos de audios conocidos dentro de un audio desconocido

## 2.a) Primero se calculan descriptores de los audios conocidos (conjunto R)


In [None]:
class Ventana:
    def __init__(self, nombre_archivo, segundos_desde, segundos_hasta):
        self.nombre_archivo = nombre_archivo
        self.segundos_desde = segundos_desde
        self.segundos_hasta = segundos_hasta
    
    def __str__(self):
        return "{} [{:6.3f}-{:6.3f}]".format(self.nombre_archivo, self.segundos_desde, self.segundos_hasta)

def lista_ventanas(nombre_archivo, numero_descriptores, sample_rate, samples_por_ventana):
    # tantas ventanas como numero de descriptores
    tiempos = []
    for i in range(0, samples_por_ventana * numero_descriptores, samples_por_ventana):
        # tiempo de inicio de la ventana
        segundos_desde = i / sample_rate
        # tiempo de fin de la ventana
        segundos_hasta = (i + samples_por_ventana - 1) / sample_rate
        # crear objeto
        v = Ventana(nombre_archivo, segundos_desde, segundos_hasta)
        # agregar a la lista
        tiempos.append(v)
    return tiempos


def calcular_mfcc_varios_archivos(lista_archivos, sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal):
    descriptores_mfcc = []
    descriptores_ventanas = []
    for nombre_archivo in lista_archivos:
        audio_mfcc = calcular_mfcc_archivo(nombre_archivo, sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal)
        audio_ventanas = lista_ventanas(nombre_archivo, audio_mfcc.shape[0], sample_rate, samples_por_ventana)
        print("  descriptores: {}".format(audio_mfcc.shape))
        if len(descriptores_mfcc) == 0:
            descriptores_mfcc = audio_mfcc
        else:
            # agregar como filas
            descriptores_mfcc = numpy.vstack([descriptores_mfcc, audio_mfcc])
        # agregar al final
        descriptores_ventanas.extend(audio_ventanas)
    return descriptores_ventanas, descriptores_mfcc
   
def imprimir_ventanas(ventanas, mfcc, muestreo_ventanas=1):
    print("ventanas={} descriptores={}".format(len(ventanas), mfcc.shape))
    print("mostrando algunas ventanas:")
    for i in range(0, len(ventanas), muestreo_ventanas):
        print(" {:4d}) {} descriptor={}".format(i, ventanas[i], mfcc[i].shape))
    
archivos_conocidos = ["vivaldi.mp3", "jaivas.mp3"]

ventanas_conocidos, mfcc_conocidos = calcular_mfcc_varios_archivos(archivos_conocidos, sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal)

#escribir los descriptores de audios conocidos
print()
print("descriptores conocidos")
imprimir_ventanas(ventanas_conocidos, mfcc_conocidos, 1)


## 2.b) Se calculan descriptores de un audio desconocido (conjunto Q)


In [None]:
#audio consulta
query_archivo = "varios.mp3"

ventanas_query, mfcc_query = calcular_mfcc_varios_archivos([query_archivo], sample_rate, samples_por_ventana, samples_salto, dimension, carpeta_temporal)

print("Query: ventanas={} descriptores={}".format(len(ventanas_query), mfcc_query.shape))
#escribir los descriptores de audios conocidos
print()
print("descriptores audio desconocido")
imprimir_ventanas(ventanas_query, mfcc_query, 1)


## 2.c) Comparar descriptores del audio desconocido (Q) con los descriptores de los audios conocidos (R)

Con cdist se comparan todos los descriptores de Q contra todos los descriptores de R y entrega la matriz de distancias.


In [None]:
import scipy

matriz_distancias = scipy.spatial.distance.cdist(mfcc_query, mfcc_conocidos, metric='euclidean')
print(matriz_distancias.shape)
matriz_distancias


## 2.d) Para cada descriptor de Q mostrar la ventana más parecida de R

Cada ventana se identifica por el nombre del archivo y el intervalo de tiempo que representa.


In [None]:
#obtener la posicion del mas cercano por fila
posicion_min = numpy.argmin(matriz_distancias, axis=1)
minimo = numpy.amin(matriz_distancias, axis=1)

for i in range(len(ventanas_query)):
    query = ventanas_query[i]
    conocido = ventanas_conocidos[posicion_min[i]]
    diferencia = (conocido.segundos_desde - query.segundos_desde)
    print(" {:4d}) {} se parece a  {}    (diferencia de tiempos={:4.1f} seg.)".format(i, query, conocido, diferencia))


Notar que  en las zonas donde hay coincidencia de audio, los tiempos de ambas ventanas van avanzando al mismo tiempo, es decir, la diferencia entre los tiempos de sus ventanas `conocido.segundos_desde - query.segundos_desde` se mantiene constante.  

Si se agrupan las diferencias de tiempo que más se repiten, se pueden localizar las regiones comunes entre audios.

Por ejemplo, viendo la lista anterior se puede concluir que:

 * En `varios.mp3` entre [0.279 y 5.759] se escucha el audio de `vivaldi.mp3` entre los segundos 47.926 y 53.313 (es decir, tienen un desfase de 47.6 segundos).
 * En `varios.mp3` entre [5.851 y 8.916] se escucha `jaivas.mp3` entre [29.536 y 32.601]  (es decir, tienen un desfase de 23.7 segundos)





## 2.e) Votación entre ventanas parecidas para encontrar zonas que coinciden

Se buscan las zonas donde se repita más el trio (audioQ, audioR, offset). Se usará un diccionario y se acumularán votos.



In [None]:
class Votos:
    def __init__(self, name, query, conocido):
        self.name = name
        self.query_nombre = query.nombre_archivo
        self.query_inicio = query.segundos_desde
        self.query_fin = query.segundos_hasta
        self.conocido_nombre = conocido.nombre_archivo
        self.conocido_inicio = conocido.segundos_desde
        self.conocido_fin = conocido.segundos_hasta
        self.numVotos = 1

    def addVoto(self, query, conocido):
        # muevo el final de la zona y sumo un voto
        self.query_fin = query.segundos_hasta
        self.conocido_fin = conocido.segundos_hasta
        self.numVotos += 1

    def __str__(self):
        return "{} entre [{:6.3f}-{:6.3f}]  se parece a  {} entre [{:6.3f}-{:6.3f}]  ({} votos)".format(
            self.query_nombre, self.query_inicio, self.query_fin, 
            self.conocido_nombre, self.conocido_inicio, self.conocido_fin,
            self.numVotos)

contadores = dict()

for i in range(len(ventanas_query)):
    query = ventanas_query[i]
    conocido = ventanas_conocidos[posicion_min[i]]
    diferencia = (conocido.segundos_desde - query.segundos_desde)
    # llave para acumular (se podría mejorar la acumulación si la diferencia se redondea)
    key = "{}-{}-{:4.1f}".format(query.nombre_archivo, conocido.nombre_archivo, diferencia)
    # ver si hay votos anteriores
    votos = contadores.get(key)
    if votos is None:
        # se inicia votacion por ese desfase
        votos = Votos(key, query, conocido)
        contadores[key] = votos
    else:
        # suma un voto a una deteccion encontrada previamente con el mismo desfase
        votos.addVoto(query, conocido)

# mostrar las mayores votaciones
allVotos = list(contadores.values())
for v in sorted(allVotos, key = lambda x : x.numVotos, reverse=True):
    if v.numVotos > 20:
        print(v)


## Propuesto:

En la votacion anterior se encontraron dos zonas similares.

Notar que `varios.mp3` tiene **cinco zonas** parecidas (tres para `vivaldi.mp3` y dos para `jaivas.mp3`)

¿Se pueden detectar las cinco zonas? ¿Qué habría que modificar?

Ver el material el curso de esta semana y de la semana 07 para ideas sobre cómo se puede mejorar esta detección.
