In [None]:
from collections import defaultdict
from sklearn.cluster import KMeans
import numpy as np
import heapq
import os
import librosa
from IPython.display import Audio, display
import time


### Feature Extraction
Usamos Mel-Frequency Cepstral Coefficients (MFCC)

- Extraemos los features en una cierta ventana de tiempo

In [2]:
def generate_descriptors(dataset_path):
    """Extract per-frame MFCC descriptors for every ``.wav`` file in a directory.

    Files are visited in sorted name order so that the returned lists (and
    anything fitted on them, such as the K-Means codebook) do not depend on
    the arbitrary ordering of ``os.listdir``.

    Args:
        dataset_path: Directory containing the ``.wav`` files.

    Returns:
        A ``(descriptors, filenames)`` tuple of parallel lists.
        ``descriptors[i]`` is a float32 array of shape ``(n_frames, 13)`` and
        ``filenames[i]`` is the matching file name without its trailing
        ``.wav`` extension.
    """
    extension = ".wav"
    descriptors = []
    filenames = []

    for fname in sorted(os.listdir(dataset_path)):
        if not fname.endswith(extension):
            continue

        path = os.path.join(dataset_path, fname)
        # sr=None asks librosa to keep the file's own sampling rate.
        y, sr = librosa.load(path, sr=None)

        # mfcc comes back as (13, n_frames); transpose so each row is a frame.
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).T
        descriptors.append(mfcc.astype(np.float32))

        # Strip only the trailing extension: str.replace would also remove a
        # ".wav" appearing earlier in the name, and the result could then no
        # longer be mapped back to the file on disk.
        filenames.append(fname[: -len(extension)])

    return descriptors, filenames


def apply_kmeans(descriptors, n_clusters):
    """Fit the K-Means codebook on all descriptor frames of the dataset.

    Args:
        descriptors: Sequence of 2-D arrays, one per audio file. They are
            stacked row-wise, so every row of every array is one sample.
        n_clusters: Number of clusters, i.e. the size of the codebook.

    Returns:
        The fitted ``KMeans`` model.

    Raises:
        ValueError: If ``descriptors`` is empty.
    """
    if len(descriptors) == 0:
        raise ValueError("apply_kmeans needs at least one descriptor array to fit on")

    # A fixed random_state keeps the codebook reproducible between runs.
    kmeans = KMeans(n_clusters=n_clusters, random_state=0, n_init="auto")
    kmeans.fit(np.vstack(descriptors))
    return kmeans

def construct_bd_histograms(descriptors, kmeans):
    """Build one normalized codeword histogram per database item.

    Each descriptor array is quantized with ``kmeans.predict`` and the
    resulting cluster ids are counted and divided by their total, so every
    non-empty histogram sums to 1.

    Args:
        descriptors: Iterable of arrays of shape ``(n_frames, n_features)``.
            An entry may be ``None`` or have zero frames.
        kmeans: Fitted model exposing ``n_clusters`` and ``predict``.

    Returns:
        A list of float arrays of length ``kmeans.n_clusters``, in the same
        order as ``descriptors``. Items without frames yield an all-zero
        histogram.
    """
    n_words = kmeans.n_clusters
    all_histograms = []

    for descriptor in descriptors:
        if descriptor is None or len(descriptor) == 0:
            # Nothing to quantize; predict() is not called on an empty array.
            all_histograms.append(np.zeros(n_words, dtype=float))
            continue

        cluster_assignments = kmeans.predict(descriptor)
        counts = np.bincount(cluster_assignments, minlength=n_words)
        # At least one frame was assigned, so the total is strictly positive.
        all_histograms.append(counts.astype(float) / counts.sum())

    return all_histograms

def create_query_histogram(query_path, kmeans):
    """Compute the normalized codeword histogram of a query audio file.

    The query goes through the same steps as the database items: 13 MFCCs
    per frame, cast to float32, quantized with ``kmeans`` and counted.

    Args:
        query_path: Path of the audio file to load.
        kmeans: Fitted model exposing ``n_clusters`` and ``predict``.

    Returns:
        A float array of length ``kmeans.n_clusters`` that sums to 1, or an
        all-zero array when the audio produced no frames.
    """
    y, sr = librosa.load(query_path, sr=None)

    # (13, n_frames) -> (n_frames, 13); float32 matches the dtype used for
    # the database descriptors.
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).T.astype(np.float32)

    n_words = kmeans.n_clusters
    if len(mfcc) == 0:
        return np.zeros(n_words, dtype=float)

    counts = np.bincount(kmeans.predict(mfcc), minlength=n_words)
    return counts.astype(float) / counts.sum()



def calcular_tf_idf(descriptors, all_histograms, kmeans):
    """Compute TF-IDF weights for the database histograms.

    The histograms are already normalized, so they are used directly as the
    term-frequency matrix. The inverse document frequency of word ``j`` is
    ``log(N / df_j)``, where ``df_j`` is the number of histograms in which
    the word has a positive count.

    Words that appear in no histogram get an IDF of 0. Dividing by a tiny
    epsilon instead would hand them the largest weight in the vocabulary,
    which then dominates the norm of any query that contains them.

    Args:
        descriptors: Database descriptors; only ``len(descriptors)`` is used,
            as the number of documents ``N``.
        all_histograms: Sequence of histograms, one per document, each of
            length ``kmeans.n_clusters``.
        kmeans: Model exposing ``n_clusters``.

    Returns:
        ``(tf_idf, idf_np)``: a float32 matrix of shape
        ``(n_documents, n_clusters)`` and the float32 IDF vector, returned so
        the same weighting can be applied to a query histogram.
    """
    n_docs = len(descriptors)
    n_words = kmeans.n_clusters

    # reshape keeps the matrix 2-D even when there are no histograms at all.
    tf = np.asarray(all_histograms, dtype=np.float32).reshape(-1, n_words)

    # Document frequency: in how many histograms each word occurs.
    df = np.count_nonzero(tf > 0, axis=0)

    idf = np.zeros(n_words, dtype=np.float64)
    seen = df > 0
    idf[seen] = np.log(n_docs / df[seen])
    idf_np = idf.astype(np.float32)

    # tf_idf[i][j] is the weight of word j in document i.
    tf_idf = tf * idf_np
    return tf_idf, idf_np

def knn_secuencial(query_hist, k, idf_np, all_histograms, image_paths):
    """Exhaustive top-k search by cosine similarity.

    The query histogram is weighted by ``idf_np`` and compared against every
    entry of ``all_histograms`` as given (no weighting is applied to them
    here).

    Args:
        query_hist: Query histogram (array of length ``n_clusters``).
        k: Number of results to return.
        idf_np: IDF vector applied to the query.
        all_histograms: Iterable of database vectors, parallel to
            ``image_paths``.
        image_paths: Identifier of each database item.

    Returns:
        Up to ``k`` ``(similarity, identifier)`` tuples, most similar first.
        Pairs involving a zero-norm vector get similarity 0.
    """
    query = query_hist * idf_np
    # The query does not change across the scan, so its norm is computed once.
    norm_query = np.linalg.norm(query)

    def _cosine(hist):
        norm_hist = np.linalg.norm(hist)
        if norm_query == 0 or norm_hist == 0:
            return 0.0
        return np.dot(query, hist) / (norm_query * norm_hist)

    scored = (
        (_cosine(hist), image_paths[i]) for i, hist in enumerate(all_histograms)
    )
    # nlargest keeps only k items in memory and returns them in descending
    # order, which is what the manual bounded heap plus final sort produced.
    return heapq.nlargest(k, scored)

#### KNN INVERTIDO ####
def construir_indice_invertido(all_histograms):
    """Build the inverted index ``word id -> set of document positions``.

    A document position is listed under a word when that word's entry in the
    document's histogram is greater than zero.

    Args:
        all_histograms: Iterable of histograms, one per document.

    Returns:
        A ``defaultdict(set)`` keyed by word id.
    """
    postings = defaultdict(set)
    for doc_pos, row in enumerate(all_histograms):
        present_words = (word for word, weight in enumerate(row) if weight > 0)
        for word in present_words:
            postings[word].add(doc_pos)
    return postings

def obtener_candidatos(query_hist, inverted_index):
    """Collect the documents that share at least one word with the query.

    Args:
        query_hist: Query histogram; positions with a value greater than zero
            are the words present in the query.
        inverted_index: Mapping ``word id -> iterable of document positions``.
            It is read with ``.get`` and never modified.

    Returns:
        The set of document positions listed under any of the query's words.
    """
    query_words = [word for word, weight in enumerate(query_hist) if weight > 0]
    posting_lists = (inverted_index.get(word, []) for word in query_words)
    return set().union(*posting_lists)

def knn_con_indice(query_hist, k, idf_np, all_histograms, image_paths, inverted_index):
    """Top-k cosine search restricted to candidates from the inverted index.

    Only documents that share at least one word with the query are scored.
    The query is weighted by ``idf_np``; the vectors in ``all_histograms``
    are used as given.

    Args:
        query_hist: Query histogram (array of length ``n_clusters``).
        k: Number of results to return.
        idf_np: IDF vector applied to the query.
        all_histograms: Database vectors, indexable by document position.
        image_paths: Identifier of each database item, parallel to
            ``all_histograms``.
        inverted_index: Mapping ``word id -> set of document positions``.

    Returns:
        Up to ``k`` ``(similarity, identifier)`` tuples, most similar first.
    """
    query = query_hist * idf_np
    norm_query = np.linalg.norm(query)

    # Words present in the query.
    visual_words = np.flatnonzero(np.asarray(query_hist) > 0)

    # Look the words up with .get(): indexing a defaultdict would insert an
    # empty posting list for every unseen word (growing the caller's index),
    # and indexing a plain dict would raise KeyError.
    candidatos = set()
    for word in visual_words:
        candidatos.update(inverted_index.get(int(word), ()))
    print(f"Candidatos a comparar: {len(candidatos)} de {len(image_paths)}")

    def _cosine(hist):
        norm_hist = np.linalg.norm(hist)
        if norm_query == 0 or norm_hist == 0:
            return 0.0
        return np.dot(query, hist) / (norm_query * norm_hist)

    scored = ((_cosine(all_histograms[idx]), image_paths[idx]) for idx in candidatos)
    # nlargest returns the k best tuples already sorted in descending order.
    return heapq.nlargest(k, scored)

def knn_con_indice2(query_hist, k, idf_np, all_histograms, image_paths, inverted_index):
    """Vectorized top-k cosine search over inverted-index candidates.

    Candidates are the documents sharing at least one word with the query;
    their similarities are computed in a single matrix product. The query is
    weighted by ``idf_np``; the rows of ``all_histograms`` are used as given.

    Args:
        query_hist: Query histogram (array of length ``n_clusters``).
        k: Number of results to return. Values ``<= 0`` yield an empty list.
        idf_np: IDF vector applied to the query.
        all_histograms: 2-D array-like of database vectors, one per row.
        image_paths: Identifier of each database item, parallel to the rows
            of ``all_histograms``.
        inverted_index: Mapping ``word id -> set of document positions``.

    Returns:
        Up to ``k`` ``(similarity, identifier)`` tuples, most similar first,
        or ``[]`` when there is nothing to return.
    """
    # With k == 0 the slice [-k:] below would select *every* candidate.
    if k <= 0:
        return []

    query = query_hist * idf_np
    norm_query = np.linalg.norm(query)

    # Words present in the query.
    visual_words = np.flatnonzero(np.asarray(query_hist) > 0)

    # .get() leaves the caller's index untouched (indexing a defaultdict would
    # insert empty entries) and also works with a plain dict.
    candidatos = set()
    for word in visual_words:
        candidatos.update(inverted_index.get(int(word), ()))
    print(f"Candidatos a comparar: {len(candidatos)} de {len(all_histograms)}")

    if len(candidatos) == 0:
        print("No se encontraron candidatos para comparar.")
        return []

    # Sorted positions make the result independent of set iteration order.
    candidatos = np.array(sorted(candidatos))

    # asarray lets callers pass a list of histograms as well as a matrix.
    cand_histograms = np.asarray(all_histograms)[candidatos]

    # Cosine similarity; the small constant avoids dividing by zero when a
    # norm is zero.
    cand_norms = np.linalg.norm(cand_histograms, axis=1)
    sim = np.dot(cand_histograms, query) / (cand_norms * norm_query + 1e-10)

    # Stable sort so that ties are ordered deterministically.
    top_k_indices = np.argsort(sim, kind="stable")[-k:][::-1]
    return [(sim[i], image_paths[candidatos[i]]) for i in top_k_indices]

def print_top_results(top_k):
    """Print a header followed by one ``name: similarity`` line per result.

    Args:
        top_k: Iterable of ``(similarity, name)`` pairs, printed in the order
            given with the similarity shown to four decimals.
    """
    print("\nTop audios más similares:")
    # Lazy generator: each line is formatted right before it is printed.
    rows = (f"{name}: similitud = {sim:.4f}" for sim, name in top_k)
    for row in rows:
        print(row)

def show_audio_results(query_path, top_k_results, dataset_path, sample_rate=22050):
    """Display an audio player for the query and for each retrieved result.

    Args:
        query_path: Path of the query audio file.
        top_k_results: Iterable of ``(similarity, filename)`` pairs, where
            ``filename`` has no extension; ``".wav"`` is appended and the
            file is looked up inside ``dataset_path``.
        dataset_path: Directory that holds the database audio files.
        sample_rate: Value forwarded to ``Audio`` as ``rate``.

    Results whose file does not exist are reported with a message instead of
    a player.
    """
    print("Query audio:")
    query_player = Audio(query_path, rate=sample_rate)
    display(query_player)

    print("Top-k Similar Audios:")
    for i, (sim, filename) in enumerate(top_k_results, start=1):
        audio_path = os.path.join(dataset_path, filename + ".wav")
        if os.path.exists(audio_path):
            print(f"[{i}] {filename} (similitud: {sim:.3f})")
            display(Audio(audio_path, rate=sample_rate))
        else:
            print(f"[{i}] No se encontró el file {filename}")

In [None]:
# Configuration: database directory and codebook size.
dataset_path = "../data/audios_1000/"
n_clusters = 100

# Offline stage: MFCC descriptors -> K-Means codebook -> one normalized
# histogram per audio -> TF-IDF weights. `image_paths` holds the audio file
# names without extension.
descriptors, image_paths = generate_descriptors(dataset_path)
kmeans = apply_kmeans(descriptors, n_clusters)
all_histograms = construct_bd_histograms(descriptors, kmeans)
# NOTE(review): `tf_idf` is not used by the search cells visible below; they
# score the IDF-weighted query against the raw `all_histograms`. Confirm that
# this is the intended weighting.
tf_idf, idf = calcular_tf_idf(descriptors, all_histograms, kmeans)

# Query stage: number of neighbours and the query audio's histogram.
k = 5
query_path = "../data/audio_tests/011799_3.wav"
query_histogram = create_query_histogram(query_path, kmeans)

In [8]:
# Time only the sequential search itself; playback happens after the timer.
start_time_seq = time.perf_counter()
top_k_results_seq = knn_secuencial(query_histogram, k, idf, all_histograms, image_paths)
end_time_seq = time.perf_counter()
print(f"\n Tiempo KNN Secuencial: {end_time_seq - start_time_seq:.4f} segundos")
show_audio_results(query_path, top_k_results_seq, dataset_path)



 Tiempo KNN Secuencial: 0.0262 segundos
Query audio:


Top-k Similar Audios:
[1] 036589_3 (similitud: 0.804)


[2] 146707_1 (similitud: 0.754)


[3] 075551_1 (similitud: 0.732)


[4] 103957_1 (similitud: 0.716)


[5] 033559_1 (similitud: 0.716)


In [9]:


# === KNN with inverted index ===
# knn_con_indice2 selects candidate rows with fancy indexing, so it is given
# the histograms as a 2-D array.
all_histograms_np = np.array(all_histograms, dtype=np.float32)

# The measured time covers building the index as well as running the query.
start_time_inv = time.perf_counter()
inverted_index = construir_indice_invertido(all_histograms)
top_k_results_inv = knn_con_indice2(query_histogram, k, idf, all_histograms_np, image_paths, inverted_index)

end_time_inv = time.perf_counter()

print(f"\n Tiempo KNN con Índice Invertido: {end_time_inv - start_time_inv:.4f} segundos")
print_top_results(top_k_results_inv)
# Play the results of THIS search (the inverted-index one), not the
# sequential results computed in an earlier cell.
show_audio_results(query_path, top_k_results_inv, dataset_path)


Candidatos a comparar: 848 de 990

 Tiempo KNN con Índice Invertido: 0.0292 segundos

Top audios más similares:
036589_3: similitud = 0.8039
146707_1: similitud = 0.7540
075551_1: similitud = 0.7317
103957_1: similitud = 0.7162
033559_1: similitud = 0.7159
Query audio:


Top-k Similar Audios:
[1] 036589_3 (similitud: 0.804)


[2] 146707_1 (similitud: 0.754)


[3] 075551_1 (similitud: 0.732)


[4] 103957_1 (similitud: 0.716)


[5] 033559_1 (similitud: 0.716)
