In [1]:
import pickle
import numpy as np
import heapq

from gradio import DataFrame
from rich.progress import track

# load the data
with open("pca_features.pkl", "rb") as file:
    track_ids = pickle.load(file)
    features_pca = pickle.load(file)

In [3]:
import rtree
print(rtree.__version__)

1.3.0


In [4]:
print(len(track_ids))
print((features_pca).shape)

11883
(237660, 154)


In [28]:
print(type(track_ids))
print(track_ids[0])

<class 'list'>
7pse475uICmWRY5hEkvPvI



# Rtree and linear knn


In [5]:
from rtree import index
from tqdm import tqdm
import os


p = index.Property()
p.dimension = 154  

index_name = "rtree_index_spotify"
if os.path.exists(f"{index_name}.dat") and os.path.exists(f"{index_name}.idx"):
    rtree_index = index.Index(index_name, properties=p)
    print("Indice cargado")
else:
 
    rtree_index = index.Index(index_name, properties=p)
    total_vectors = len(track_ids) * 20  # Total de vectores para indexar
    with tqdm(total=total_vectors, desc="Indexando vectores en R-Tree") as pbar:
        for song_idx, track_id in enumerate(track_ids):  # Iterando en las canciones
            for local_idx in range(20):  # Cada canci칩n tiene 20 descriptores
                global_idx = song_idx * 20 + local_idx  # Indice global en features_pca
                vector = features_pca[global_idx]  # Obtener el vector local
                rtree_index.insert(global_idx, vector)  # Insertar en el R-Tree
                pbar.update(1)  # Incrementar la barra de progreso
    print("Indice guardado")

Indice cargado


In [18]:
from collections import Counter
import numpy as np
def knn_top_R_tree(rtree_index, features_pca, track_ids, query_vectors, k=2):
    """
    Realiza una b칰squeda KNN en un R-Tree para 20 vectores de una canci칩n y retorna las `k` canciones m치s cercanas.

    Par치metros:
        rtree_index (rtree.index.Index): 칈ndice R-Tree construido con los vectores PCA.
        features_pca (np.ndarray): Matriz de vectores PCA.
        track_ids (list): Lista de IDs de canciones, uno por canci칩n.
        query_vectors (np.ndarray): 20 vectores de consulta en el espacio PCA (de una canci칩n).
        k (int): N칰mero de canciones m치s cercanas a retornar.

    Retorna:
        list: Lista de las `k` canciones m치s cercanas, en formato:
              [(track_id1, votos1), (track_id2, votos2), ...].
    """
    start_time = time.time()
    # Acumular resultados de los 20 vectores
    all_neighbors = []

    for query_vector in query_vectors:
        # B칰squeda KNN para cada vector de consulta
        nearest_neighbors = list(rtree_index.nearest(query_vector, num_results=20))  # Buscar 20 vecinos por vector ya que cada cancion tiene 20 vectores caracteristicos
        all_neighbors.extend(nearest_neighbors)

    # Asociar vecinos con sus canciones
    song_indices = [neighbor // 20 for neighbor in all_neighbors]  # 칈ndices de canciones
    track_id_votes = [track_ids[song_idx] for song_idx in song_indices]  # Mapear a track_ids

    # Votaci칩n mayoritaria
    song_counts = Counter(track_id_votes)
    top_songs = song_counts.most_common(k)  # Seleccionar las `k` canciones m치s votadas
    
    search_time = time.time() - start_time
    return top_songs, search_time
def knn_top_lineal(cancionesc,track_ids,consultas, top_k):
    start_time = time.time()
    
    all_neighbors=[]
    
    for consulta in consultas:
        nearest_neighbors = [(-float('inf'), -1)] * 20 #20 caracteristicas
        for i, embedding in enumerate(cancionesc):
            dist = -np.linalg.norm(consulta - embedding)#consigo mi distancia euclidiana invertida para usar como max-heap
            if dist > nearest_neighbors[0][0]:
                heapq.heappop(nearest_neighbors)
                heapq.heappush(nearest_neighbors, (dist, i))
        nearest_neighbors = [i for dit, i in nearest_neighbors]
        all_neighbors.extend(nearest_neighbors)

    song_indices = [neighbor // 20 for neighbor in all_neighbors]
    track_id_votes = [track_ids[song_idx] for song_idx in song_indices]

    song_counts = Counter(track_id_votes)
    top_songs = song_counts.most_common(top_k)  
    
    search_time = time.time() - start_time
    return top_songs, search_time
def knn_busquedarango_lineal(cancionesc, track_ids, consultas,radio):
    all_neighbors = []
    distancias = [] 

    for consulta in consultas:
        nearest_neighbors = [(-float('inf'), -1)] * 20  # 20 caracter칤sticas
        for i, embedding in enumerate(cancionesc):
            dist = -np.linalg.norm(consulta - embedding)  # Distancia euclidiana invertida para max-heap
            if dist > nearest_neighbors[0][0]:
                heapq.heappop(nearest_neighbors)
                heapq.heappush(nearest_neighbors, (dist, i))
        nearest_neighbors = [(dit, i) for dit, i in nearest_neighbors]
        all_neighbors.extend([i for dit, i in nearest_neighbors])
        distancias.extend([-dit for dit, i in nearest_neighbors])  

    song_indices = [neighbor // 20 for neighbor in all_neighbors]
    track_id_votes = [track_ids[song_idx] for song_idx in song_indices] 
    song_counts = Counter(track_id_votes)

    track_id_distancias = {}
    for idx, track_id in enumerate(track_id_votes):
        if track_id not in track_id_distancias:
            track_id_distancias[track_id] = []
        track_id_distancias[track_id].append(distancias[idx])
    distancias_canciones = [(song_id, votes, np.mean(track_id_distancias[song_id])) for song_id, votes in song_counts.most_common()]
    top_songs = []
    for i, (song_id, votes, avg_dist) in enumerate(distancias_canciones):
        if i < 10 or avg_dist <= radio:
            top_songs.append((song_id, votes))

    return top_songs




In [19]:
song_idx = 0
query_vectors = features_pca[song_idx * 20 : (song_idx + 1) * 20]
result, time_serach_rtree = knn_top_R_tree(rtree_index, features_pca, track_ids, query_vectors, k=8) # Llamar a la funci칩n con k=3
resultado_final=[]
# Mostrar resultados
for i, (song_id, votes) in enumerate(result, 1):
    resultado_final.append(song_id)
print(resultado_final)

['7pse475uICmWRY5hEkvPvI', '5CwOUooch74h0XarhDfAQK', '3bZCS8ThTAxMJZavYWOY1z', '1U2xFfjK1QUuicENnW0iwv', '6MGryNr7aENIEfPUV1cHyg', '3OiEY2VLzrTyCoU8q2SQpe', '1oy6EH41CdAido7rIuuFzY', '2vPZ4Lklyu75zBR3SgbFNI']


# LSH

In [20]:
# Indice LSH
# Librer칤a faiss

import faiss
import numpy as np
import pickle
import time

# N = 11903  # N칰mero de canciones (features)


# Cargar los datos

"""
CHANGE PATH
"""
# with open("feature_spotify.pkl", "rb") as file:
with open(r"..\data_extraction\feature_spotify.pkl", "rb") as file:

    track_ids = pickle.load(file)
    features = pickle.load(file)

# Validar que track_ids y features tengan el mismo tama침o
assert len(track_ids) == len(features), "track_ids y features no est치n alineados."

# Aplanar los vectores y alinear los IDs
features_flat = []
track_ids_flat = []

for idx, (track_id, feature_vectors) in enumerate(zip(track_ids, features)):
    if len(feature_vectors) > 0 and len(feature_vectors[0]) == 1280:  # Validar dimensi칩n
        features_flat.extend(feature_vectors)  # Aplanar vectores
        track_ids_flat.extend([track_id] * len(feature_vectors))  # Repetir track_id

features_flat = np.array(features_flat, dtype="float32")  # Convertir a arreglo numpy

# Crear el 칤ndice LSH
n_bits = 1024  # N칰mero de bits para el hash LSH
index_lsh = faiss.IndexLSH(1280, n_bits)

# Indexaci칩n
start_time = time.time()
index_lsh.add(features_flat)
indexation_time = time.time() - start_time
print(f"Tiempo de indexaci칩n (LSH): {indexation_time:.2f} segundos")


Tiempo de indexaci칩n (LSH): 2.68 segundos


In [21]:
# Funci칩n de b칰squeda K-NN
def knn_search_lsh(query_index, k):
    query_vector = features_flat[query_index].reshape(1, -1)
    start_time = time.time()
    D, I = index_lsh.search(query_vector, k)
    search_time = time.time() - start_time
    
    print(f"B칰squeda k-NN para el objeto en el 칤ndice {query_index} (k={k}):")
    print("Distancias:", D[0])
    print("칈ndices:", I[0])
    
    # Imprimir los track_ids correspondientes
    result_ids = [track_ids_flat[idx] for idx in I[0] if idx != -1]  # Ignorar 칤ndices inv치lidos
    print("Track IDs:", result_ids)
    print(f"Tiempo de b칰squeda k-NN: {search_time:.2f} segundos")
    
    return result_ids, search_time


# 칈ndice de consulta
# query_index = 100 # Canci칩n de consulta
# k = 8       # N칰mero de vecinos m치s cercanos

# print("Buscar: ", track_ids_flat[query_index])

# print()
# # Ejecutar b칰squeda K-NN
# knn_res = knn_search_lsh(query_index, k)


# Parser


In [80]:
import pandas as pd

df_songs = pd.read_csv(r"..\data_extraction\spotify_songs.csv")
# map the track_ids to the song names
track_id_to_name = df_songs.set_index("track_id")["track_name"].to_dict()

In [95]:

# track_name_to_id = df_songs.set_index("track_name")["track_id"].to_dict()
track_name_to_id = {}
song_names_ids = os.listdir(r"..\data_extraction\data_previews")
for song_name_id in song_names_ids:
    song_name = song_name_id[:-27]
    song_id = song_name_id[-26:-4] 
    track_name_to_id[song_name] = song_id


In [96]:
print(track_name_to_id['Rasputin - Single Version'])

67hbP9PFQZrb4XZc3TzB0s


In [82]:
print(track_name_to_id['$20 Fine'])

7pse475uICmWRY5hEkvPvI


In [83]:
n = 5
for it in track_id_to_name:
    print(it)
    n-=1
    if n==0:
        break

0017A6SJgTbfQVU2EtsPNo
004s3t0ONYlzxII9PLgU6z
00chLpzhgVjxs1zKC9UScL
00cqd6ZsSkLZqGMlQCR0Zo
00emjlCv9azBN0fzuuyLqy


In [84]:
print(track_id_to_name['0017A6SJgTbfQVU2EtsPNo'])


Pangarap


In [None]:

# Adapters
def track_id_to_index(track_id):
    # map the track_id to its index, track_id is a list
    for idx, id in enumerate(track_ids):
        if id == track_id:
            return idx

In [85]:
import re
def parser(consulta):
    resultado_final=[]
    metodo = r"using\s+([a-zA-Z0-9_]+)" 
    song_name__ = r"where song_name\s*=\s+'(.*?)'\s+"
    
    top_k = r"LIMIT\s+([\d.]+)" 

    # Extraemos los valores usando `re.search`
    metodoknn = re.search(metodo, consulta)
    song_id_pre = re.search(song_name__, consulta)
    pre_top_k = re.search(top_k, consulta)
    metodofinal = metodoknn.group(1) 
    song_name_final = song_id_pre.group(1)
    final_top_k = pre_top_k.group(1)
    
    song_id_final = get_pos_by_name(song_name_final) # adapted to support song names
    
    if metodofinal=='knn_top_R_tree':
        song_idx = int(song_id_final)
        query_vectors = features_pca[song_idx * 20 : (song_idx + 1) * 20]
        result, time_search = knn_top_R_tree(rtree_index, features_pca, track_ids, query_vectors, k=int(final_top_k)) # Llamar a la funci칩n con k=3
        # agregar los resultados
        for i, (song_id, votes) in enumerate(result, 1):
            resultado_final.append(song_id)
    if metodofinal=='knn_top_lineal':
        indicecancion = int(song_id_final)
        query = features_pca[indicecancion * 20 : (indicecancion + 1) * 20]  # consulta
        top_k =int(final_top_k)  #cuantos k vecinos cercanos quiero
        result, time_search = knn_top_lineal(features_pca, track_ids, query, top_k)
        for i, (song_id, votes) in enumerate(result, 1):
            resultado_final.append(song_id)
    if metodofinal=='knn_busquedarango_lineal':
        indicecancion = int(song_id_final)
        query = features_pca[indicecancion * 20 : (indicecancion + 1) * 20]
        radio = float(final_top_k) 
        result = knn_busquedarango_lineal(features_pca, track_ids, query,radio)
        for i, (song_id, votes) in enumerate(result, 1):
            resultado_final.append(song_id)
    if metodofinal=='knn_search_lsh':
        query_index = int(song_id_final)
        k = int(final_top_k) 
        resultado_final, time_search = knn_search_lsh(query_index, k)
       
    track_names_result = [track_id_to_name[track_id] for track_id in resultado_final]
  
    return pd.DataFrame.from_dict({'Indices': track_names_result}), time_search
    


consultas para utilizar:
"select song_id from spotify_songs using knn_top_lineal where song_name = '0' LIMIT 7"


"select song_id from spotify_songs using knn_top_R_tree where song_name = '0' LIMIT 7"


"select song_id from spotify_songs using knn_top_R_tree where song_name = '5' LIMIT 7"


"select song_id from spotify_songs using knn_top_lineal where song_name = '5' LIMIT 7"


"select song_id from spotify_songs using knn_busquedarango_lineal where song_name = '0' LIMIT 0.3"


"select song_id from spotify_songs using knn_top_R_tree where song_name = '12' LIMIT 5"


"select song_id from spotify_songs using knn_top_lineal where song_name = '12' LIMIT 5"


"select song_id from spotify_songs using knn_top_R_tree where song_name = '40' LIMIT 12"


"select song_id from spotify_songs using knn_top_lineal where song_name = '40' LIMIT 12"

In [86]:
name_song = "'98 Freestyle"
consulta = f"select song_id from spotify_songs using knn_search_lsh where song_name = '{name_song}' LIMIT 5"
list_result, time_exec = parser(consulta)
print(type(list_result), type(time_exec))


B칰squeda k-NN para el objeto en el 칤ndice 4 (k=5):
Distancias: [  0. 408. 410. 410. 411.]
칈ndices: [     4  47516  89032 201079 147197]
Track IDs: ['7pse475uICmWRY5hEkvPvI', '1OBD6ZyBF8oYm4PRHt42zv', '3p7byaSR5J1he8vvDkxirp', '3ZjnFYlal0fXN6t61wdxhl', '4FtuDjqmTPP9mjxygLKzEm']
Tiempo de b칰squeda k-NN: 0.02 segundos
<class 'pandas.core.frame.DataFrame'> <class 'float'>


# FRONT

In [43]:
import gradio as gr
import os

def load_full_path_songs(directory):

    full_paths = {}
    for archivo in os.listdir(directory):
        try:
            name_id = archivo[:-27] # remove id.mp3
            
            full_paths[name_id] = os.path.join(directory, archivo)
        except :
            print(f"Error en {archivo}")
    # map the track_names to the song path
    return full_paths

def get_audio_by_name(name):
    # map the name to the id
    return full_path_songs[name]

full_path_songs = load_full_path_songs(r'..\data_extraction\data_previews')

def get_pos_by_name(name):
    song_id_ = track_name_to_id[name]
    return track_id_to_index(song_id_)

In [40]:
print(full_path_songs['$20 Fine'])

..\data_extraction\data_previews\$20 Fine_7pse475uICmWRY5hEkvPvI.mp3


In [41]:
itt = 5
for it in full_path_songs:
    print(it)
    itt-=1
    if itt==0:
        break

$20 Fine
$ave Dat Money (feat. Fetty Wap & Rich Homie Quan)
$Dreams
$ENHOR
'98 Freestyle


In [101]:

def interface_song_names():
    # Lista ordenada de nombres de canciones
    all_song_names = sorted(list(full_path_songs.keys()))

    with gr.Blocks(theme='HaleyCH/HaleyCH_Theme') as interfaz_final:
        gr.Markdown("# Spotify Songs Search Engine 游꿨")

        with gr.Row():
            # Reproductor de canciones
            with gr.Column(scale=1):
                gr.Markdown("## 游꿧 Song Player")
                nombre_dropdown = gr.Dropdown(
                    choices=all_song_names,
                    label="Select a Song",
                    allow_custom_value=True
                )
                audio_output = gr.Audio(
                    type="filepath",
                    label="Song Preview"
                )

                # Boton para reproducir
                play_btn = gr.Button("Play Preview", variant="primary")
                play_btn.click(
                    fn=get_audio_by_name,
                    inputs=nombre_dropdown,
                    outputs=audio_output
                )

            # Parser de busqueda
            with gr.Column(scale=1):
                gr.Markdown("## 游댌 Song Search")
                parser_input = gr.Textbox(
                    label="Search Query",
                    placeholder="Enter your search query"
                )
                parser_output = gr.Dataframe(
                    label="Similar Song IDs",
                    headers=None,
                    datatype="str",
                )
                tiempo_ejecucion = gr.Textbox(
                    label="Execution Time (seconds)",
                    interactive=False
                )

                # Boton para buscar
                search_btn = gr.Button("Search", variant="primary")
                search_btn.click(
                    fn=parser,
                    inputs=parser_input,
                    outputs=[parser_output, tiempo_ejecucion]
                )
                examples = gr.Examples(examples=[
                    """
                    select song_id from spotify_songs using knn_search_lsh where song_name = '$20 Fine' LIMIT 5 
                    """,
                    """
                    select song_id from spotify_songs using knn_search_lsh where song_name = ''98 Freestyle' LIMIT 5 
                    """,
                ], inputs=[parser_input])

    return interfaz_final


In [102]:

interfaz = interface_song_names()
interfaz.launch( 
    debug=True ,
    allowed_paths=[r"..\data_extraction\data_previews"]
)


* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


Keyboard interruption in main thread... closing server.




In [104]:
interfaz.close()

Closing server running on port: 7860
